Execução programática local
Quando usar
Use esta página para três formas locais de executar treino:
- notebook Jupyter chamando o entrypoint direto;
- script direto sem API nem Airflow;
- HTTP + Airflow, que preserva o caminho durável de produção.
Os três caminhos usam ClickHouse para metadados e MLflow para runs de treino. O
caminho direto usa storage local em tmp/ para snapshots e artefatos. O caminho
HTTP + Airflow usa a stack do compose e, por padrão, grava snapshots e artefatos
no MinIO local.
Pré-requisitos
uv synccp .env.local.example .envdocker compose -f compose.yml up -d clickhouse minio mlflow minio-initjust db-initjust seed-local-model-definitionPara trabalhar em Jupyter, registre um kernel que aponte para o ambiente do
uv:
uv run python -m ipykernel install --user --name atreides-local --display-name "Atreides local"No notebook, selecione o kernel Atreides local. Quando abrir o notebook fora da
raiz do repositório, adicione a raiz do repo e src ao sys.path na primeira
célula:
from pathlib import Pathimport sys
repo_root = Path.cwd()sys.path.insert(0, str(repo_root))sys.path.insert(0, str(repo_root / "src"))Arquivo de demanda
Os caminhos locais recebem um input_path próprio e um source_format explícito:
source_format="csv"para CSV;source_format="parquet"para Parquet.
O arquivo precisa respeitar as colunas canônicas de demanda:
organization_id,sku_id,store_id,sku_system,sku_key,store_system,store_key,grain,period_start,quantity,uom,source,created_atGerar CSV em Jupyter
Uma sessão de notebook pode preparar dados canônicos e depois chamar o entrypoint direto ou o endpoint HTTP. Esta célula cria um CSV diário com histórico suficiente para o holdout padrão:
from datetime import date, datetime, timedelta, timezonefrom pathlib import Path
import pandas as pd
organization_id = "01956c73-9c1f-7fb7-94b1-a2c5bc65d901"rows = []start = date(2026, 1, 1)for sku in ("SKU-1", "SKU-2"): for store in ("STORE-1", "STORE-2"): for offset in range(45): rows.append( { "organization_id": organization_id, "sku_id": sku, "store_id": store, "sku_system": "local", "sku_key": sku, "store_system": "local", "store_key": store, "grain": "daily", "period_start": (start + timedelta(days=offset)).isoformat(), "quantity": 10 + offset % 7, "uom": "unit", "source": "notebook", "created_at": datetime.now(timezone.utc).isoformat(), } )
path = Path("tmp/local-demand.csv")path.parent.mkdir(exist_ok=True)pd.DataFrame(rows).to_csv(path, index=False)pathNotebook direto
Use o caminho direto quando quiser depurar comportamento de treino sem autenticar HTTP, disparar DAG ou abrir a UI do Airflow. O entrypoint é importável e retorna um resumo simples para inspeção no notebook:
from pathlib import Path
from examples.run_direct_training import run_local_training
result = run_local_training( input_path=Path("tmp/local-demand.csv"), config_manifest=Path("examples/local_model_definition_config.json"), demand_storage_root=Path("tmp/demand-storage"), model_artifact_root=Path("tmp/model-artifacts"), generate_sample=False,)
resultEsse entrypoint cria ou reutiliza o CSV local, semeia a definição determinística,
aplica o manifesto de configuração no ClickHouse, materializa o snapshot com
Spark, projeta as séries de treino e chama diretamente
control.modeling.training.use_cases.train_model_candidates(...) e
control.modeling.promotion.use_cases.promote_trained_candidates(...).
As settings de job carregam .env automaticamente, então o mesmo código funciona
fora do just quando o notebook roda a partir da raiz do repositório.
O mesmo caminho roda como script:
just example-direct-trainingUse esse modo para depurar imports, composição local, configuração efetiva, feature recipes, splits de holdout, treino Polars e persistência ClickHouse. Ele não testa JWT, payload de DAG, retries do Airflow ou ordenação operacional das tasks.
HTTP + Airflow com configuração por manifesto
Para exercitar o caminho durável local, suba a stack com Airflow, aplique as migrations e semeie a definição determinística:
docker compose -f compose.yml up -d clickhouse minio mlflow minio-init airflowjust db-initjust seed-local-model-definitionO manifesto de exemplo fica em examples/local_model_definition_config.json.
Ele ativa uma configuração diária sku_store, com regras de validação que usam
o DemandSnapshotManifest materializado pelo job.
Valide e aplique a configuração no ClickHouse:
just config-validate examples/local_model_definition_config.jsonjust config-apply examples/local_model_definition_config.jsonSuba a API HTTP em outro terminal:
just serve-httpNo notebook, gere o mesmo CSV canônico da seção anterior e dispare o treino pelo endpoint HTTP. A API autentica o tenant pelo JWT local e apenas enfileira o DAG; Airflow materializa o snapshot, projeta as séries conforme a configuração ativa, lê o manifesto atualizado e executa os jobs duráveis.
O mesmo fluxo está disponível como script em examples/run_http_training.py:
just example-http-trainingimport jsonimport subprocessimport urllib.requestfrom pathlib import Path
organization_id = "01956c73-9c1f-7fb7-94b1-a2c5bc65d901"model_definition_id = "01956c73-9c1f-7fb7-94b1-a2c5bc65d940"
auth_header = subprocess.check_output( [ "just", "setup-local-auth", organization_id, "notebook-operator", ], text=True,).strip()
payload = json.dumps( { "input_path": str((Path.cwd() / "tmp/local-demand.csv").resolve()), "source_format": "csv", }).encode("utf-8")
request = urllib.request.Request( f"http://127.0.0.1:8000/v1/model-definitions/{model_definition_id}/training-jobs", data=payload, headers={ "Authorization": auth_header.removeprefix("Authorization: "), "Content-Type": "application/json", }, method="POST",)
with urllib.request.urlopen(request) as response: queued = json.loads(response.read().decode("utf-8"))
queuedAbra http://127.0.0.1:8080 e acompanhe o run_id retornado no DAG
model_training. Para depuração, use docker compose -f compose.yml logs -f airflow.
O que esperar no HTTP + Airflow
Depois da execução, você deve ter:
- metadados do modelo nas tabelas ClickHouse locais;
- snapshot canônico e séries projetadas no bucket MinIO
atreides-demand; - artefatos no bucket MinIO
atreides-artifacts; - runs de treino no MLflow local;
- instâncias promovidas para
ACTIVEdentro doModelDefinitionpersistido em ClickHouse.
Quando preferir Airflow
Se a dúvida for sobre JWT, retries, payload de DAG, ordenação das tarefas ou estado operacional do workflow, use o caminho HTTP + Airflow e acompanhe os logs:
docker compose -f compose.yml up -d clickhouse minio mlflow minio-init airflowjust db-initdocker compose -f compose.yml logs -f airflowObservações
- mantenha pelo menos algumas semanas de histórico por série;
- o CSV canônico atual representa séries
sku_store, derivadas deorganization_id + sku_id + store_id + grain; outros agrupamentos devem produzir umSeriesKeycomseries_kind,series_ide dimensões próprias antes do treino; - o caminho direto exercita materialização Spark, projeção Spark, treino Polars, configuração ativa, persistência ClickHouse e MLflow;
- o caminho HTTP + Airflow também exercita JWT, rotas HTTP, payload de DAG, retries do orquestrador e estado operacional do workflow.