Skip to content

Execução programática local

Quando usar

Use esta página para três formas locais de executar treino:

  • notebook Jupyter chamando o entrypoint direto;
  • script direto sem API nem Airflow;
  • HTTP + Airflow, que preserva o caminho durável de produção.

Os três caminhos usam ClickHouse para metadados e MLflow para runs de treino. O caminho direto usa storage local em tmp/ para snapshots e artefatos. O caminho HTTP + Airflow usa a stack do compose e, por padrão, grava snapshots e artefatos no MinIO local.

Pré-requisitos

Terminal window
uv sync
cp .env.local.example .env
docker compose -f compose.yml up -d clickhouse minio mlflow minio-init
just db-init
just seed-local-model-definition

Para trabalhar em Jupyter, registre um kernel que aponte para o ambiente do uv:

Terminal window
uv run python -m ipykernel install --user --name atreides-local --display-name "Atreides local"

No notebook, selecione o kernel Atreides local. Quando abrir o notebook fora da raiz do repositório, adicione a raiz do repo e src ao sys.path na primeira célula:

from pathlib import Path
import sys
repo_root = Path.cwd()
sys.path.insert(0, str(repo_root))
sys.path.insert(0, str(repo_root / "src"))

Arquivo de demanda

Os caminhos locais recebem um input_path próprio e um source_format explícito:

  • source_format="csv" para CSV;
  • source_format="parquet" para Parquet.

O arquivo precisa respeitar as colunas canônicas de demanda:

organization_id,sku_id,store_id,sku_system,sku_key,store_system,store_key,grain,period_start,quantity,uom,source,created_at

Gerar CSV em Jupyter

Uma sessão de notebook pode preparar dados canônicos e depois chamar o entrypoint direto ou o endpoint HTTP. Esta célula cria um CSV diário com histórico suficiente para o holdout padrão:

from datetime import date, datetime, timedelta, timezone
from pathlib import Path
import pandas as pd
organization_id = "01956c73-9c1f-7fb7-94b1-a2c5bc65d901"
rows = []
start = date(2026, 1, 1)
for sku in ("SKU-1", "SKU-2"):
for store in ("STORE-1", "STORE-2"):
for offset in range(45):
rows.append(
{
"organization_id": organization_id,
"sku_id": sku,
"store_id": store,
"sku_system": "local",
"sku_key": sku,
"store_system": "local",
"store_key": store,
"grain": "daily",
"period_start": (start + timedelta(days=offset)).isoformat(),
"quantity": 10 + offset % 7,
"uom": "unit",
"source": "notebook",
"created_at": datetime.now(timezone.utc).isoformat(),
}
)
path = Path("tmp/local-demand.csv")
path.parent.mkdir(exist_ok=True)
pd.DataFrame(rows).to_csv(path, index=False)
path

Notebook direto

Use o caminho direto quando quiser depurar comportamento de treino sem autenticar HTTP, disparar DAG ou abrir a UI do Airflow. O entrypoint é importável e retorna um resumo simples para inspeção no notebook:

from pathlib import Path
from examples.run_direct_training import run_local_training
result = run_local_training(
input_path=Path("tmp/local-demand.csv"),
config_manifest=Path("examples/local_model_definition_config.json"),
demand_storage_root=Path("tmp/demand-storage"),
model_artifact_root=Path("tmp/model-artifacts"),
generate_sample=False,
)
result

Esse entrypoint cria ou reutiliza o CSV local, semeia a definição determinística, aplica o manifesto de configuração no ClickHouse, materializa o snapshot com Spark, projeta as séries de treino e chama diretamente control.modeling.training.use_cases.train_model_candidates(...) e control.modeling.promotion.use_cases.promote_trained_candidates(...). As settings de job carregam .env automaticamente, então o mesmo código funciona fora do just quando o notebook roda a partir da raiz do repositório.

O mesmo caminho roda como script:

Terminal window
just example-direct-training

Use esse modo para depurar imports, composição local, configuração efetiva, feature recipes, splits de holdout, treino Polars e persistência ClickHouse. Ele não testa JWT, payload de DAG, retries do Airflow ou ordenação operacional das tasks.

HTTP + Airflow com configuração por manifesto

Para exercitar o caminho durável local, suba a stack com Airflow, aplique as migrations e semeie a definição determinística:

Terminal window
docker compose -f compose.yml up -d clickhouse minio mlflow minio-init airflow
just db-init
just seed-local-model-definition

O manifesto de exemplo fica em examples/local_model_definition_config.json. Ele ativa uma configuração diária sku_store, com regras de validação que usam o DemandSnapshotManifest materializado pelo job.

Valide e aplique a configuração no ClickHouse:

Terminal window
just config-validate examples/local_model_definition_config.json
just config-apply examples/local_model_definition_config.json

Suba a API HTTP em outro terminal:

Terminal window
just serve-http

No notebook, gere o mesmo CSV canônico da seção anterior e dispare o treino pelo endpoint HTTP. A API autentica o tenant pelo JWT local e apenas enfileira o DAG; Airflow materializa o snapshot, projeta as séries conforme a configuração ativa, lê o manifesto atualizado e executa os jobs duráveis.

O mesmo fluxo está disponível como script em examples/run_http_training.py:

Terminal window
just example-http-training
import json
import subprocess
import urllib.request
from pathlib import Path
organization_id = "01956c73-9c1f-7fb7-94b1-a2c5bc65d901"
model_definition_id = "01956c73-9c1f-7fb7-94b1-a2c5bc65d940"
auth_header = subprocess.check_output(
[
"just",
"setup-local-auth",
organization_id,
"notebook-operator",
],
text=True,
).strip()
payload = json.dumps(
{
"input_path": str((Path.cwd() / "tmp/local-demand.csv").resolve()),
"source_format": "csv",
}
).encode("utf-8")
request = urllib.request.Request(
f"http://127.0.0.1:8000/v1/model-definitions/{model_definition_id}/training-jobs",
data=payload,
headers={
"Authorization": auth_header.removeprefix("Authorization: "),
"Content-Type": "application/json",
},
method="POST",
)
with urllib.request.urlopen(request) as response:
queued = json.loads(response.read().decode("utf-8"))
queued

Abra http://127.0.0.1:8080 e acompanhe o run_id retornado no DAG model_training. Para depuração, use docker compose -f compose.yml logs -f airflow.

O que esperar no HTTP + Airflow

Depois da execução, você deve ter:

  • metadados do modelo nas tabelas ClickHouse locais;
  • snapshot canônico e séries projetadas no bucket MinIO atreides-demand;
  • artefatos no bucket MinIO atreides-artifacts;
  • runs de treino no MLflow local;
  • instâncias promovidas para ACTIVE dentro do ModelDefinition persistido em ClickHouse.

Quando preferir Airflow

Se a dúvida for sobre JWT, retries, payload de DAG, ordenação das tarefas ou estado operacional do workflow, use o caminho HTTP + Airflow e acompanhe os logs:

Terminal window
docker compose -f compose.yml up -d clickhouse minio mlflow minio-init airflow
just db-init
docker compose -f compose.yml logs -f airflow

Observações

  • mantenha pelo menos algumas semanas de histórico por série;
  • o CSV canônico atual representa séries sku_store, derivadas de organization_id + sku_id + store_id + grain; outros agrupamentos devem produzir um SeriesKey com series_kind, series_id e dimensões próprias antes do treino;
  • o caminho direto exercita materialização Spark, projeção Spark, treino Polars, configuração ativa, persistência ClickHouse e MLflow;
  • o caminho HTTP + Airflow também exercita JWT, rotas HTTP, payload de DAG, retries do orquestrador e estado operacional do workflow.