Projeção de séries de treino
Por que esta etapa existe
O snapshot canônico responde à pergunta: “qual foi o input de demanda aceito para este ciclo?”. A projeção de séries responde a outra pergunta: “qual é a unidade de treino que esta configuração de modelo quer aprender?”.
Essas perguntas são separadas no código. materialize_snapshot valida o arquivo
bruto e grava o frame canônico. project_training_series lê esse frame, aplica a
series_projection_recipe resolvida para o ModelDefinition e grava artefatos
prontos para o treino.
O ModelDefinition é o seletor operacional desta etapa. A DAG não recebe uma
lista de séries nem consulta uma atribuição granular por SeriesKey. Ela recebe
organization_id, model_definition_id e snapshot_id; a configuração ativa
desse ModelDefinition define a series_projection_recipe; e a projeção grava
todas as séries que essa receita consegue derivar do snapshot.
Assim, uma organização pode ter múltiplos ModelDefinitions, mas cada execução
de treino ou predição escolhe um deles. Se a organização precisa de políticas
distintas para populações diferentes, ela deve manter definições distintas e
disparar o workflow com o model_definition_id correspondente. No estado atual,
filtros por série individual, listas manuais de SeriesKey para treino e regras
de roteamento como “categoria X usa definição A” ainda não são conceitos
first-class do fluxo.
Essa separação evita dois problemas:
- o snapshot canônico não fica acoplado ao nível de série escolhido por um modelo;
- o treino não precisa projetar e revalidar o dataset inteiro dentro da task que executa candidatas.
Fluxo
Entry point
O job vive em src/infra/jobs/project_training_series.py.
Ele recebe somente strings vindas do Airflow:
organization_idmodel_definition_idsnapshot_id
Dentro do job:
JobSettingscompõe ClickHouse, storage e Spark;ActiveConfigurationResolverresolve a configuração de treino vigente para a versão atual doModelDefinition;DemandSnapshotManifestRepositoryPort.get_snapshot_manifest(...)carrega o manifest criado pela materialização;- o job valida que o tenant do comando, do modelo e do manifest é o mesmo;
SparkParquetStorelêmanifest.data_file_uris;write_projected_series_frame(...)aplica aseries_projection_recipe;- o manifest é salvo de novo com
projected_series_data_urieprojected_series_index_uri.
O resultado é uma projeção completa para a população dessa definição no snapshot
atual. A etapa seguinte, train_model_candidates, consome o índice projetado e
treina por série; ela não reduz a população a partir de outro seletor.
O que entra
Entrada persistida:
- snapshot em estado
ready; manifest.jsoncomdata_file_uris;ModelDefinitionexistente no tenant;- revisão ativa que define
series_projection_recipe.
Entrada em memória:
- frame Spark carregado a partir dos arquivos canônicos;
- configuração de treino resolvida.
O job não aceita payload rico do Airflow. O payload da DAG continua JSON-serializável e pequeno; composição, validação e leitura de dados acontecem dentro do processo Python.
O que sai
Saída no storage:
- dataset projetado particionado por
series_id; - índice das séries projetadas.
Saída no manifest:
projected_series_data_uri;projected_series_index_uri.
Esses campos são opcionais no domínio porque o manifest existe antes da projeção.
Depois que project_training_series termina, train_model_candidates espera
encontrá-los para montar o TrainingDataset.
Relação com SeriesKey
series_projection_recipe transforma colunas canônicas de demanda em uma
identidade semântica. O resultado é um SeriesKey com:
organization_id;series_kind;series_id;grain;dimensions.
series_id vira a chave de partição física. O objeto completo continua sendo a
identidade de domínio usada por treino, promoção, artefatos, forecast e
auditoria.
O vínculo entre SeriesKey e ModelDefinition surge da execução: uma série
pertence ao workflow de uma definição quando aparece no índice projetado por
aquela definição. Depois disso, o ModelInstance treinado carrega tanto o
model_definition_id quanto o SeriesKey. O domínio garante uma campeã ativa
por série dentro de uma definição, não uma campeã global por série em toda a
organização.
Relação com Spark e Polars
Spark é usado aqui porque a etapa opera sobre o snapshot inteiro e escreve artefatos particionados. Polars é usado depois, quando o treino precisa carregar partições específicas por série e aplicar receitas de features por candidata.
Essa divisão é intencional:
| Etapa | Engine | Motivo |
|---|---|---|
| materialização do snapshot | Spark | leitura bruta, validação e escrita canônica |
| projeção de séries | Spark | transformação do dataset inteiro e escrita particionada |
| treino e predição | Polars | execução por série, split temporal, features e estratégias |
Falhas esperadas
| Cenário | Comportamento |
|---|---|
organization_id diverge entre comando e modelo | job falha antes de ler o snapshot |
organization_id diverge entre comando e manifest | job falha antes de gravar projeção |
series_projection_recipe inválida para o frame canônico | task falha no Airflow |
| storage indisponível | task falha no Airflow e segue a política de retry da DAG |