Skip to content

Fluxos principais

Mapa rápido

FluxoCaso de uso / entrypointResultado
Submissão HTTP de treinosubmit_model_training_workflowsnapshot processing e DAG run no Airflow
Materializaçãoinfra/jobs/materialize_snapshot.pysnapshot canônico ready ou failed
Projeção de sériesinfra/jobs/project_training_series.pyartefatos particionados por series_id e manifest atualizado
Treino de candidatasinfra/jobs/train_model_candidates.pyinstâncias DRAFT, métricas, runs e artefatos
Promoçãopromote_champions TaskGrouppublish, commit transacional e sync de aliases
Predição batchmodel_prediction DAGForecastRuns persistidos por série ativa

Fluxo ponta a ponta

flowchart LR A["HTTP input_path"] --> B["Snapshot processing"] B --> C["Airflow DAG run"] C --> D["materialize_snapshot"] D --> E["project_training_series"] E --> F["train_model_candidates"] F --> G["promote_training_champions"] G --> H["Campeãs ativas"]

1. Comando HTTP e snapshot

  • entrada: um model_definition_id na rota, um input_path absoluto local ou s3://, apontando para o arquivo, e source_format explícito (csv ou parquet);
  • organization_id efetivo vem do JWT;
  • a API cria um DemandSnapshot em processing;
  • a API dispara a DAG model_training no Airflow;
  • se o trigger do Airflow falhar, o snapshot é marcado como failed e a API responde 503;
  • a resposta HTTP inclui run_id, dag_id, orchestrator, snapshot_id e status.

O HTTP não executa treino, materialização, retry ou polling.

O model_definition_id é a escolha operacional do forecast que será treinado. Ele aponta para uma política de treino/projeção/promoção já configurada para a organização. O request não escolhe séries individuais; a população processada é derivada depois, pela series_projection_recipe resolvida para essa definição.

2. DAG Airflow

A DAG recebe dag_run.conf com:

  • organization_id
  • model_definition_id
  • snapshot_id
  • requested_by

Ela executa materialização, treino e um grupo semântico de promoção:

materialize_snapshot
-> project_training_series
-> train_model_candidates
-> promote_champions.publish_training_champions
-> promote_champions.commit_training_champion_promotions
-> promote_champions.sync_training_champion_aliases

Cada task tem retry explícito na própria DAG:

  • retries=3
  • retry_delay=2 minutos

3. Materialização de snapshot

O job materialize_snapshot:

  1. carrega o snapshot criado pelo HTTP;
  2. valida o input bruto antes de persistir artefatos preparados;
  3. aplica canonicalização/preparação;
  4. escreve data.parquet e manifest.json no storage canônico;
  5. marca o snapshot como ready ou failed.

Garantias:

  • somente o materializador promove input_path para snapshot consumível;
  • tenant scope é validado no frame canônico;
  • o restante do treino consome apenas o snapshot preparado.

4. Projeção de séries de treino

O job project_training_series:

  1. carrega o ModelDefinition e a revisão ativa congelada para o treino;
  2. valida que organization_id do comando, do modelo e do manifest batem;
  3. lê os arquivos canônicos indicados por DemandSnapshotManifest.data_file_uris;
  4. aplica a series_projection_recipe em Spark;
  5. deriva o SeriesKey.series_id usado por treino, artefatos, versionamento e promoção;
  6. grava o dataset particionado por series_id;
  7. atualiza o manifest.json com projected_series_data_uri e projected_series_index_uri.

Essa etapa existe para separar duas responsabilidades que antes ficavam misturadas: materializar o snapshot bruto validado e construir a visão de treino do modelo ativo. O snapshot canônico continua sendo a fonte congelada do ciclo; a projeção é uma visão derivada, específica da configuração de treino.

Essa visão derivada é completa para o ModelDefinition escolhido. Se a receita alvo for sku_store diária, o job grava todas as séries SKU-loja diárias presentes no snapshot. Se a receita alvo for category_store semanal, ele grava todas as séries categoria-loja semanais deriváveis do mesmo snapshot. O fluxo atual não tem parâmetro de “treinar só esta lista de séries”.

5. Treino de candidatas

O job train_model_candidates:

  1. carrega ModelDefinition, manifest e os artefatos projetados por série;
  2. monta um TrainingRunContext com a configuração congelada;
  3. resolve ResolvedStrategyCandidate para cada estratégia habilitada;
  4. carrega somente as partições necessárias por series_id;
  5. aplica a feature_recipe da candidata ao frame da série;
  6. decide a ação de cada série (SEARCH_CANDIDATES ou REFRESH_CHAMPION);
  7. agrupa as decisões em StrategyTrainingWorkItem executáveis;
  8. avalia cada candidata em holdout temporal por série;
  9. persiste artefatos, métricas, child runs de MLflow e candidatas DRAFT.

Regras relevantes:

  • o use case de treino abre uma sessão lógica de treino quando nenhuma sessão pai é injetada; cada série cria um run pai de MLflow com nome derivado do SeriesKey, e cada candidata dessa série recebe mlflow.parentRunId;
  • o conjunto de séries vem do índice projetado pela definição escolhida no trigger; não há roteamento granular adicional entre projeção e treino;
  • séries sem campeã ativa rodam busca completa;
  • campeãs ativas só passam por refresh quando a política permite e os hashes de configuração resolvida, validação, projeção, features, parâmetros e seleção continuam compatíveis com a configuração atual;
  • se a configuração mudou, se o reaproveitamento foi desabilitado ou se a estratégia campeã saiu do catálogo habilitado, a série volta para busca completa;
  • falha de estratégia ou treino falha claramente a task, sem falso sucesso terminal;
  • SeriesKey continua sendo a identidade semântica usada para versionamento, promoção, artefatos e auditoria;
  • paralelismo de treino é configuração de execução (execution.max_workers), mantendo uma chamada de ModelStrategy.train(...) por série.

6. Promoção

O TaskGroup promote_champions:

  1. publish_training_champions recarrega candidatas duráveis, seleciona campeãs e publica versões imutáveis no registry quando MLflow está configurado;
  2. commit_training_champion_promotions promove todas as campeãs para ACTIVE em uma única gravação ClickHouse com auditoria;
  3. sync_training_champion_aliases sincroniza aliases do registry depois do commit local.

Regras relevantes:

  • o domínio permite uma única instância ativa por série;
  • a promoção usa as métricas retornadas pela estratégia e persistidas nas candidatas;
  • a promoção recebe a model_version congelada pelo treino e usa a política persistida no resolved_config_snapshot da candidata, não thresholds ou métrica primária carregados da configuração ativa no momento do retry;
  • ClickHouse é a fonte de verdade da campeã ativa; aliases do registry são uma projeção externa retryable.

A composição in-process control/modeling/workflows.py também monta um training_report.json quando ela é usada para treinar e promover no mesmo processo. O caminho Airflow atual divide treino e promoção em tasks separadas; ele não depende desse relatório para persistir candidatas, campeãs ou auditoria. Quando gerado, o relatório consolida:

  • perfil de cada série (n_periods, janela de datas, média/desvio, percentual de zeros, ADI, CV2 e segmento);
  • candidatas treinadas, métricas, tags, URI do artefato, run ID, versão da instância e configuração resolvida;
  • flags de campeã (is_champion, champion_run_id, champion_strategy_key);
  • explicação opcional do modelo, como colunas de feature e coeficientes/importâncias quando a estratégia fornece.

O relatório não define uma segunda forma de identidade de série: ele serializa o SeriesKey completo no campo series_key, incluindo organização, series_id, granularidade, tipo de série e dimensões.

Esse relatório em MLflow é observabilidade e debugging. Ele não é fonte de verdade para campeãs, auditoria de promoção, referências de serving, fingerprints de configuração, URIs necessários para serving ou métricas usadas em promoções futuras; esses dados continuam no domínio e na persistência command-side.

A classificação de séries (smooth, intermittent, erratic, lumpy, no_demand) é regra de domínio em domain.demand.training. O cálculo do perfil numérico do frame passa pela porta profile_training_series(...) de data-preparation, para que cada runtime use operações nativas e eficientes de dataframe e entregue as estatísticas ao classificador de domínio.

API HTTP

No caminho HTTP atual:

  • /v1/* exige JWT bearer;
  • o organization_id efetivo vem do token;
  • a API expõe comandos de configuração e submissão de treino;
  • treino e predição são disparados por model_definition_id;
  • não há seletor HTTP para atribuir séries individuais a outra definição ou limitar o treino a uma lista de SeriesKey;
  • não há endpoint de previsão online;
  • não há endpoint de retry/cancelamento de job interno;
  • inferência batch/offline existe como exceção explícita do command side, via POST /v1/model-definitions/{model_definition_id}/prediction-jobs e DAG model_prediction.

Falhas esperadas e tratamento

CenárioComportamento esperado
Airflow indisponível antes de criar DAG runsnapshot marcado como failed, HTTP responde 503
Materialização falhatask falha no Airflow; snapshot fica failed
Estratégia falha no treinotask de treino falha no Airflow; erro aparece no DAG run
Sem candidata elegíveljob de promoção conclui sem promoção para a série
Airflow retryreexecuta a task conforme política da DAG

Onde ver detalhes