Airflow e jobs
Princípio
A API HTTP não executa materialização de snapshot nem treino no request. Ela:
- autentica o comando;
- cria um
DemandSnapshotemprocessing; - dispara uma DAG no Airflow com um payload JSON-serializável;
- devolve o identificador do run para rastreabilidade operacional.
O Airflow passa a ser o dono de retries, scheduling, observabilidade de DAG run
e reexecução de tarefas. O Atreides mantém apenas as unidades de trabalho que
uma DAG chama: os entrypoints em src/infra/jobs.
Contrato da DAG
A DAG principal é model_training, em src/infra/dags/model_training.py.
Ela espera dag_run.conf com:
| Campo | Origem | Uso |
|---|---|---|
organization_id | JWT do request HTTP | tenant scope do snapshot |
model_definition_id | rota HTTP | modelo que será treinado |
snapshot_id | snapshot criado pelo HTTP | entrada durável das etapas |
requested_by | sub do JWT | auditoria da promoção |
Todas as tarefas usam retry explícito no Airflow:
retries=3retry_delay=2 minutos
Esses valores vivem na DAG porque retry agora é política do orquestrador, não do HTTP nem de uma fila interna.
O campo model_definition_id é mais do que um identificador técnico. Ele
seleciona a política operacional que será aplicada ao snapshot. A DAG não recebe
lista de séries; project_training_series resolve a configuração ativa desse
ModelDefinition, aplica sua series_projection_recipe e cria o índice de
todas as séries que entram no treino. train_model_candidates consome esse
índice como população do run.
No desenho atual, atribuição granular por SeriesKey não existe na DAG nem no
HTTP. Para rodar políticas diferentes, a organização mantém ModelDefinitions
diferentes e dispara workflows separados com o identificador da definição
correta.
A DAG de predição é model_prediction, em src/infra/dags/model_prediction.py.
Ela usa o mesmo contrato de tenant e snapshot, materializa a entrada se
necessário, executa execute_model_forecasts.py para gerar frames de forecast
por série ativa e grava esses resultados em artefatos intermediários. A task
seguinte, persist_forecast_results.py, lê o manifesto desses artefatos e faz
um único insert batch dos resultados no ClickHouse. A execução usa o
resolved_config_snapshot gravado na
referência de treino da campeã ativa, valida o hash congelado e só então chama a
estratégia. Instâncias ativas sem snapshot de configuração resolvida falham de
forma explícita; mudanças de configuração em runtime ficam restritas aos
próximos treinos. Quando não há model_version explícito no comando, a predição
aceita campeãs ativas de versões diferentes, desde que as instâncias
selecionadas compartilhem a mesma receita congelada de projeção canônica para
série alvo. Falhas durante a execução de uma série fazem a task de execução
falhar antes da publicação do manifesto de resultados.
Entry points de job
Cada tarefa da DAG chama um arquivo dedicado em src/infra/jobs:
| Arquivo | Responsabilidade |
|---|---|
materialize_snapshot.py | carrega o snapshot processing, valida o input bruto e publica data.parquet + manifest.json canônico |
project_training_series.py | resolve a configuração ativa do modelo, valida a projeção em Spark e publica os Parquet particionados por series_id |
train_model_candidates.py | carrega séries projetadas, resolve estratégias, treina candidatas e persiste instâncias DRAFT + artefatos |
execute_model_forecasts.py | carrega snapshot pronto, executa a campeã ativa por série e publica frames de forecast em Parquet + manifesto intermediário |
persist_forecast_results.py | lê o manifesto intermediário, carrega os frames de forecast e persiste todos os resultados via Spark ClickHouse Catalog API |
publish_training_champions.py | recarrega candidatas duráveis, seleciona campeãs e publica versões imutáveis no registry externo |
commit_training_champion_promotions.py | promove campeãs para ACTIVE em uma única gravação ClickHouse com auditoria |
sync_training_champion_aliases.py | sincroniza aliases externos depois do commit local |
bootstrap.py | compõe repositórios ClickHouse, storage, MLflow, registry de estratégias, logger e telemetria OpenTelemetry para os jobs |
Os jobs recebem a mesma porta de observabilidade operacional usada pelo HTTP
adapter. As métricas são registradas no runtime OpenTelemetry/Prometheus em
infra/observability/otel.py, enquanto o control layer conhece apenas
OperationalObservabilityPort. Isso mantém o boundary hexagonal: objetos de
runtime OpenTelemetry são criados em bootstrap/composição e injetados nas
operações que precisam registrar início, conclusão, falha, duração e volume de
séries.
Os entrypoints recebem apenas strings/JSON simples vindos da DAG e convertem para value objects dentro do processo do job. Isso mantém o payload da DAG estável e serializável.
O que acontece no request HTTP
POST /v1/model-definitions/{model_definition_id}/training-jobsrecebeinput_pathesource_format.- A rota resolve o
ModelDefinitiondentro do tenant do JWT. submit_model_training_workflow(...)cria oDemandSnapshotemprocessing.- O adapter
AirflowDagRunTriggerchama a API do Airflow. - Se o trigger falhar antes de existir um DAG run, o snapshot é marcado como
failede a API responde503. - Se o trigger for aceito, a API responde
202comrun_id,dag_id,orchestrator,snapshot_idestatus.
O request não informa SeriesKeys. O máximo de escolha neste boundary é qual
ModelDefinition será usado. A granularidade da execução vem da projeção
definida nessa configuração e do conteúdo do snapshot materializado.
O dag_run_id é determinístico por snapshot:
model_training__<snapshot_id>Isso deixa o reenvio idempotente no limite HTTP/Airflow: se a mesma tentativa for repetida com o mesmo snapshot, ela aponta para o mesmo run id.
O que acontece no Airflow
Falhas depois que o DAG run existe são falhas do Airflow. O Atreides não mantém um segundo ledger de step, fila, lease ou heartbeat.
Na promoção, promote_champions é um TaskGroup: cada fase aparece como task
separada no Airflow e tem retry próprio. A mudança local de campeãs acontece
apenas em commit_training_champion_promotions, como uma única gravação ClickHouse
do ModelDefinition com todos os registros de auditoria. Quando MLflow registry
está configurado, versões imutáveis são publicadas antes dessa gravação, e aliases
externos são sincronizados depois do commit. Assim, ClickHouse continua sendo a
fonte de verdade, enquanto o alias externo pode ser reexecutado pela task se a
sincronização falhar.
O treino retorna a model_version congelada e as três tasks de promoção usam
esse valor explicitamente. A promoção nunca recalcula a versão atual do
ModelDefinition, porque a configuração pode mudar entre o treino e um retry
tardio de promoção. A seleção de campeãs também usa a política persistida no
resolved_config_snapshot de cada candidata treinada; mudanças posteriores em
thresholds ou métrica primária só valem para novos treinos.
Separação de responsabilidades
| Componente | Responsabilidade |
|---|---|
| API HTTP | JWT, tenant scope, criação do snapshot e trigger da DAG |
| Airflow | DAG run, ordering, retries, scheduling e estado operacional do workflow |
src/infra/jobs | entrypoints executáveis por tarefa |
| Control layer | casos de uso de materialização, treino, seleção e promoção |
| ClickHouse | snapshots, ModelDefinition, instâncias, auditoria e metadados |
| Storage / MLflow | payloads canônicos, artefatos, tracking, packaging e registry |
Boundaries fora deste serviço
O serviço não mantém uma fila própria de background jobs, ledger paralelo de etapas, lease, heartbeat, retry HTTP, cancelamento interno de task ou healthcheck de worker. Qualquer retry ou reexecução de etapa deve ser feito pelo Airflow, reexecutando a task ou o DAG run correspondente.
Quando ler esta página
Use esta página quando estiver:
- depurando por que um request HTTP não criou DAG run;
- validando payload de
dag_run.conf; - adicionando uma nova etapa no workflow de treino;
- ajustando retry ou operação local do Airflow.