Skip to content

Airflow e jobs

Princípio

A API HTTP não executa materialização de snapshot nem treino no request. Ela:

  1. autentica o comando;
  2. cria um DemandSnapshot em processing;
  3. dispara uma DAG no Airflow com um payload JSON-serializável;
  4. devolve o identificador do run para rastreabilidade operacional.

O Airflow passa a ser o dono de retries, scheduling, observabilidade de DAG run e reexecução de tarefas. O Atreides mantém apenas as unidades de trabalho que uma DAG chama: os entrypoints em src/infra/jobs.

flowchart LR HTTP["API HTTP"] --> PG["ClickHouse<br/>snapshots + modelos"] HTTP --> AF["Airflow DAG run"] AF --> MAT["job: materialize_snapshot"] AF --> PROJ["job: project_training_series"] AF --> TRN["job: train_model_candidates"] AF --> PUB["job: publish_training_champions"] PUB --> COM["job: commit_training_champion_promotions"] COM --> SYN["job: sync_training_champion_aliases"] MAT --> PG PROJ --> PG PROJ --> SNAP TRN --> PG COM --> PG MAT --> SNAP["Snapshot storage"] TRN --> ART["Artifact store"] TRN --> MLF["MLflow tracking"] PUB --> MLF SYN --> MLF

Contrato da DAG

A DAG principal é model_training, em src/infra/dags/model_training.py.

Ela espera dag_run.conf com:

CampoOrigemUso
organization_idJWT do request HTTPtenant scope do snapshot
model_definition_idrota HTTPmodelo que será treinado
snapshot_idsnapshot criado pelo HTTPentrada durável das etapas
requested_bysub do JWTauditoria da promoção

Todas as tarefas usam retry explícito no Airflow:

  • retries=3
  • retry_delay=2 minutos

Esses valores vivem na DAG porque retry agora é política do orquestrador, não do HTTP nem de uma fila interna.

O campo model_definition_id é mais do que um identificador técnico. Ele seleciona a política operacional que será aplicada ao snapshot. A DAG não recebe lista de séries; project_training_series resolve a configuração ativa desse ModelDefinition, aplica sua series_projection_recipe e cria o índice de todas as séries que entram no treino. train_model_candidates consome esse índice como população do run.

No desenho atual, atribuição granular por SeriesKey não existe na DAG nem no HTTP. Para rodar políticas diferentes, a organização mantém ModelDefinitions diferentes e dispara workflows separados com o identificador da definição correta.

A DAG de predição é model_prediction, em src/infra/dags/model_prediction.py. Ela usa o mesmo contrato de tenant e snapshot, materializa a entrada se necessário, executa execute_model_forecasts.py para gerar frames de forecast por série ativa e grava esses resultados em artefatos intermediários. A task seguinte, persist_forecast_results.py, lê o manifesto desses artefatos e faz um único insert batch dos resultados no ClickHouse. A execução usa o resolved_config_snapshot gravado na referência de treino da campeã ativa, valida o hash congelado e só então chama a estratégia. Instâncias ativas sem snapshot de configuração resolvida falham de forma explícita; mudanças de configuração em runtime ficam restritas aos próximos treinos. Quando não há model_version explícito no comando, a predição aceita campeãs ativas de versões diferentes, desde que as instâncias selecionadas compartilhem a mesma receita congelada de projeção canônica para série alvo. Falhas durante a execução de uma série fazem a task de execução falhar antes da publicação do manifesto de resultados.

Entry points de job

Cada tarefa da DAG chama um arquivo dedicado em src/infra/jobs:

ArquivoResponsabilidade
materialize_snapshot.pycarrega o snapshot processing, valida o input bruto e publica data.parquet + manifest.json canônico
project_training_series.pyresolve a configuração ativa do modelo, valida a projeção em Spark e publica os Parquet particionados por series_id
train_model_candidates.pycarrega séries projetadas, resolve estratégias, treina candidatas e persiste instâncias DRAFT + artefatos
execute_model_forecasts.pycarrega snapshot pronto, executa a campeã ativa por série e publica frames de forecast em Parquet + manifesto intermediário
persist_forecast_results.pylê o manifesto intermediário, carrega os frames de forecast e persiste todos os resultados via Spark ClickHouse Catalog API
publish_training_champions.pyrecarrega candidatas duráveis, seleciona campeãs e publica versões imutáveis no registry externo
commit_training_champion_promotions.pypromove campeãs para ACTIVE em uma única gravação ClickHouse com auditoria
sync_training_champion_aliases.pysincroniza aliases externos depois do commit local
bootstrap.pycompõe repositórios ClickHouse, storage, MLflow, registry de estratégias, logger e telemetria OpenTelemetry para os jobs

Os jobs recebem a mesma porta de observabilidade operacional usada pelo HTTP adapter. As métricas são registradas no runtime OpenTelemetry/Prometheus em infra/observability/otel.py, enquanto o control layer conhece apenas OperationalObservabilityPort. Isso mantém o boundary hexagonal: objetos de runtime OpenTelemetry são criados em bootstrap/composição e injetados nas operações que precisam registrar início, conclusão, falha, duração e volume de séries.

Os entrypoints recebem apenas strings/JSON simples vindos da DAG e convertem para value objects dentro do processo do job. Isso mantém o payload da DAG estável e serializável.

O que acontece no request HTTP

  1. POST /v1/model-definitions/{model_definition_id}/training-jobs recebe input_path e source_format.
  2. A rota resolve o ModelDefinition dentro do tenant do JWT.
  3. submit_model_training_workflow(...) cria o DemandSnapshot em processing.
  4. O adapter AirflowDagRunTrigger chama a API do Airflow.
  5. Se o trigger falhar antes de existir um DAG run, o snapshot é marcado como failed e a API responde 503.
  6. Se o trigger for aceito, a API responde 202 com run_id, dag_id, orchestrator, snapshot_id e status.

O request não informa SeriesKeys. O máximo de escolha neste boundary é qual ModelDefinition será usado. A granularidade da execução vem da projeção definida nessa configuração e do conteúdo do snapshot materializado.

O dag_run_id é determinístico por snapshot:

model_training__<snapshot_id>

Isso deixa o reenvio idempotente no limite HTTP/Airflow: se a mesma tentativa for repetida com o mesmo snapshot, ela aponta para o mesmo run id.

O que acontece no Airflow

sequenceDiagram participant API as API HTTP participant AF as Airflow participant MAT as materialize_snapshot participant PROJ as project_training_series participant TRN as train_model_candidates participant PUB as publish_training_champions participant COM as commit_training_champion_promotions participant SYN as sync_training_champion_aliases participant EXT as ClickHouse / S3 / MLflow API->>AF: POST dagRuns(conf) AF->>MAT: snapshot_id MAT->>EXT: publish snapshot canonical AF->>PROJ: model_definition_id + snapshot_id PROJ->>EXT: projected series partitions + manifest update AF->>TRN: model_definition_id + snapshot_id TRN->>EXT: metrics, artifacts, DRAFT instances AF->>PUB: model_definition_id + snapshot_id + requested_by PUB->>EXT: immutable registry versions AF->>COM: model_definition_id + snapshot_id + requested_by COM->>EXT: ACTIVE champions + audit AF->>SYN: model_definition_id + snapshot_id SYN->>EXT: champion aliases

Falhas depois que o DAG run existe são falhas do Airflow. O Atreides não mantém um segundo ledger de step, fila, lease ou heartbeat.

Na promoção, promote_champions é um TaskGroup: cada fase aparece como task separada no Airflow e tem retry próprio. A mudança local de campeãs acontece apenas em commit_training_champion_promotions, como uma única gravação ClickHouse do ModelDefinition com todos os registros de auditoria. Quando MLflow registry está configurado, versões imutáveis são publicadas antes dessa gravação, e aliases externos são sincronizados depois do commit. Assim, ClickHouse continua sendo a fonte de verdade, enquanto o alias externo pode ser reexecutado pela task se a sincronização falhar.

O treino retorna a model_version congelada e as três tasks de promoção usam esse valor explicitamente. A promoção nunca recalcula a versão atual do ModelDefinition, porque a configuração pode mudar entre o treino e um retry tardio de promoção. A seleção de campeãs também usa a política persistida no resolved_config_snapshot de cada candidata treinada; mudanças posteriores em thresholds ou métrica primária só valem para novos treinos.

Separação de responsabilidades

ComponenteResponsabilidade
API HTTPJWT, tenant scope, criação do snapshot e trigger da DAG
AirflowDAG run, ordering, retries, scheduling e estado operacional do workflow
src/infra/jobsentrypoints executáveis por tarefa
Control layercasos de uso de materialização, treino, seleção e promoção
ClickHousesnapshots, ModelDefinition, instâncias, auditoria e metadados
Storage / MLflowpayloads canônicos, artefatos, tracking, packaging e registry

Boundaries fora deste serviço

O serviço não mantém uma fila própria de background jobs, ledger paralelo de etapas, lease, heartbeat, retry HTTP, cancelamento interno de task ou healthcheck de worker. Qualquer retry ou reexecução de etapa deve ser feito pelo Airflow, reexecutando a task ou o DAG run correspondente.

Quando ler esta página

Use esta página quando estiver:

  • depurando por que um request HTTP não criou DAG run;
  • validando payload de dag_run.conf;
  • adicionando uma nova etapa no workflow de treino;
  • ajustando retry ou operação local do Airflow.

Onde continuar