Skip to content

Projeção de séries de treino

Por que esta etapa existe

O snapshot canônico responde à pergunta: “qual foi o input de demanda aceito para este ciclo?”. A projeção de séries responde a outra pergunta: “qual é a unidade de treino que esta configuração de modelo quer aprender?”.

Essas perguntas são separadas no código. materialize_snapshot valida o arquivo bruto e grava o frame canônico. project_training_series lê esse frame, aplica a series_projection_recipe resolvida para o ModelDefinition e grava artefatos prontos para o treino.

O ModelDefinition é o seletor operacional desta etapa. A DAG não recebe uma lista de séries nem consulta uma atribuição granular por SeriesKey. Ela recebe organization_id, model_definition_id e snapshot_id; a configuração ativa desse ModelDefinition define a series_projection_recipe; e a projeção grava todas as séries que essa receita consegue derivar do snapshot.

Assim, uma organização pode ter múltiplos ModelDefinitions, mas cada execução de treino ou predição escolhe um deles. Se a organização precisa de políticas distintas para populações diferentes, ela deve manter definições distintas e disparar o workflow com o model_definition_id correspondente. No estado atual, filtros por série individual, listas manuais de SeriesKey para treino e regras de roteamento como “categoria X usa definição A” ainda não são conceitos first-class do fluxo.

Essa separação evita dois problemas:

  • o snapshot canônico não fica acoplado ao nível de série escolhido por um modelo;
  • o treino não precisa projetar e revalidar o dataset inteiro dentro da task que executa candidatas.

Fluxo

flowchart LR A["DemandSnapshotManifest<br/>data_file_uris"] --> B["SparkParquetStore.read_frame"] B --> C["write_projected_series_frame"] C --> D["series_projection_recipe"] D --> E["projected-series<br/>partitionBy(series_id)"] D --> F["projected-series-index"] E --> G["manifest.projected_series_data_uri"] F --> H["manifest.projected_series_index_uri"] G --> I["PolarsTrainingDatasetOperations"] H --> I

Entry point

O job vive em src/infra/jobs/project_training_series.py.

Ele recebe somente strings vindas do Airflow:

  • organization_id
  • model_definition_id
  • snapshot_id

Dentro do job:

  1. JobSettings compõe ClickHouse, storage e Spark;
  2. ActiveConfigurationResolver resolve a configuração de treino vigente para a versão atual do ModelDefinition;
  3. DemandSnapshotManifestRepositoryPort.get_snapshot_manifest(...) carrega o manifest criado pela materialização;
  4. o job valida que o tenant do comando, do modelo e do manifest é o mesmo;
  5. SparkParquetStoremanifest.data_file_uris;
  6. write_projected_series_frame(...) aplica a series_projection_recipe;
  7. o manifest é salvo de novo com projected_series_data_uri e projected_series_index_uri.

O resultado é uma projeção completa para a população dessa definição no snapshot atual. A etapa seguinte, train_model_candidates, consome o índice projetado e treina por série; ela não reduz a população a partir de outro seletor.

O que entra

Entrada persistida:

  • snapshot em estado ready;
  • manifest.json com data_file_uris;
  • ModelDefinition existente no tenant;
  • revisão ativa que define series_projection_recipe.

Entrada em memória:

  • frame Spark carregado a partir dos arquivos canônicos;
  • configuração de treino resolvida.

O job não aceita payload rico do Airflow. O payload da DAG continua JSON-serializável e pequeno; composição, validação e leitura de dados acontecem dentro do processo Python.

O que sai

Saída no storage:

  • dataset projetado particionado por series_id;
  • índice das séries projetadas.

Saída no manifest:

  • projected_series_data_uri;
  • projected_series_index_uri.

Esses campos são opcionais no domínio porque o manifest existe antes da projeção. Depois que project_training_series termina, train_model_candidates espera encontrá-los para montar o TrainingDataset.

Relação com SeriesKey

series_projection_recipe transforma colunas canônicas de demanda em uma identidade semântica. O resultado é um SeriesKey com:

  • organization_id;
  • series_kind;
  • series_id;
  • grain;
  • dimensions.

series_id vira a chave de partição física. O objeto completo continua sendo a identidade de domínio usada por treino, promoção, artefatos, forecast e auditoria.

O vínculo entre SeriesKey e ModelDefinition surge da execução: uma série pertence ao workflow de uma definição quando aparece no índice projetado por aquela definição. Depois disso, o ModelInstance treinado carrega tanto o model_definition_id quanto o SeriesKey. O domínio garante uma campeã ativa por série dentro de uma definição, não uma campeã global por série em toda a organização.

Relação com Spark e Polars

Spark é usado aqui porque a etapa opera sobre o snapshot inteiro e escreve artefatos particionados. Polars é usado depois, quando o treino precisa carregar partições específicas por série e aplicar receitas de features por candidata.

Essa divisão é intencional:

EtapaEngineMotivo
materialização do snapshotSparkleitura bruta, validação e escrita canônica
projeção de sériesSparktransformação do dataset inteiro e escrita particionada
treino e prediçãoPolarsexecução por série, split temporal, features e estratégias

Falhas esperadas

CenárioComportamento
organization_id diverge entre comando e modelojob falha antes de ler o snapshot
organization_id diverge entre comando e manifestjob falha antes de gravar projeção
series_projection_recipe inválida para o frame canônicotask falha no Airflow
storage indisponíveltask falha no Airflow e segue a política de retry da DAG

Onde continuar