Data-preparation Flow
Objetivo
Descrever de forma direta como o dado entra, é validado, vira snapshot canônico em Spark, é projetado por série ainda em Spark e depois é consumido por execução Polars de treino e predição.
Fluxo ponta a ponta
Responsabilidades dos componentes
| Componente | Camada | Responsabilidade |
|---|---|---|
SparkDemandSnapshotMaterializer | infra/data_preparation/spark | Validar e materializar input bruto em snapshot canônico interno. |
DemandDataPreparationPipeline[TContext, TFrame] | control/data_preparation | Orquestrar chain-of-responsibility genérica sobre contexto e engine de frame. |
SparkDemandIngestionPipeline | infra/data_preparation/spark | Pipeline Spark concreta para validação e canonicalização de input bruto; no job de projeção, também valida a series_projection_recipe antes da escrita particionada. |
DemandSnapshotManifestRepository | infra/ingestion | Persistir e resolver DemandSnapshotManifest. |
SparkTabularDataRepository | infra/data_preparation/spark/snapshot_frame_repository.py | Ler input bruto e persistir o dataset Parquet diretamente pelo Spark. |
project_training_series.py | infra/jobs | Resolver a configuração do ModelDefinition, carregar o snapshot canônico, rodar validação Spark de projeção e atualizar o manifest com os artefatos projetados. |
write_projected_series_frame | infra/data_preparation/spark/training_series_projection.py | Agregar o frame já validado conforme series_projection_recipe, derivar o SeriesKey.series_id usado no resto do app e escrever Parquet particionado por series_id. |
PolarsTrainingDatasetOperations | infra/data_preparation/polars | Ler o índice de séries projetadas, carregar somente as partições por series_id, aplicar feature recipe de execução e fazer split mecânico de frames. |
TrainingDataset | domain/demand/training.py | Representar o dataset de treino como conjunto semântico de contextos por série, sem embutir política de iteração ou batching. |
SeriesTrainingContext | domain/demand/training.py | Contexto semântico por série (SeriesKey, expected_uom, observed_points, frame genérico de treino). |
DemandSnapshotManifest | domain/demand/training.py | Snapshot congelado que ancora o treino command-side com identidade, contagens e materialização associada. |
split_series_for_holdout | control/data_preparation | Política temporal de split train/holdout no command side. |
DemandValidationCatalog | control/data_preparation/validation | Executar regras de validação já tipadas pelo domínio contra executores registrados pelo adapter sem mutar o frame; o contexto tipado carrega a fase e a strategy da fase resolve params configurados ou derivados do runtime. |
ForecastingDatasetOperationsPort | control/data_preparation | Construir dataset projetado a partir dos artefatos por série, aplicar feature recipe por candidata, expor frames por série e fazer split de holdout. |
train_model_candidates | control/modeling/training | Preparar o run, aplicar feature recipes, treinar candidatas e persistir instâncias DRAFT. |
publish/commit/sync_training_champions | control/modeling/promotion | Selecionar campeãs, promover instâncias ACTIVE e sincronizar aliases externos depois do commit local. |
Holdout e métricas
No caminho atual, o holdout temporal continua frame-first, mas o boundary de data-preparation não calcula métricas:
- o
controlnão conhece a biblioteca concreta do payload; - o
controlmodela separadamente o repositório de frames e a porta de operações sobre dataset; - o
ForecastingDatasetOperationsPortexpõe leitura dos artefatos projetados, preparo de features por candidata, frame por série e split train/holdout; - a engine pode treinar/predizer contra o prefixo de holdout para validar o caminho de artefato e forecast;
- as métricas persistidas no
metric_snapshotvêm deTrainResult.metrics, não de conversões de forecast dentro dedata-preparation.
Na predição, o output canônico é uma sequência JSON-serializable de linhas de
forecast (period_start, value, uom, lower, upper, confidence). O
domínio do run valida essas linhas diretamente; não há mapper Polars -> ponto de
forecast no caminho definitivo.
Chain of responsibility atual
O pipeline de data-preparation já roda como chain of responsibility:
Materialização canônica passa explicitamente:
ValidateIngestionFrameStepCanonicalizeColumnsStep
O job project_training_series passa explicitamente:
ValidateIngestionFrameStepCanonicalizeColumnsStepValidateProjectionFrameStep(series_projection_recipe)
Cada handler recebe um SparkIngestionContext e retorna novo contexto para o
próximo elo. O boundary público em control/data_preparation continua sem expor a biblioteca de frame,
enquanto os conceitos semânticos que ele carrega (DemandSnapshotManifest,
TrainingDataset, SeriesTrainingContext) vivem em domain/demand/training.py
e só exigem que payloads ausentes sejam rejeitados cedo.
O fluxo de leitura também fica direto: o repositório de manifests resolve snapshot_id -> DemandSnapshotManifest, a porta de operações Polars lê o índice de séries
projetadas e carrega somente a partição Parquet de cada SeriesKey.series_id.
No caminho de materialização, os custom steps recebem um frame já canônico e
validado pela política configurada. A validação de projeção específica do Spark
roda antes da escrita dos artefatos particionados, quando há
series_projection_recipe.
O ValidationContext genérico carrega somente frame e organization_id. Contagens,
schema version e receita de projeção entram como params tipados da regra, então
Polars e Spark implementam a mesma chave semântica sem depender de manifest fake ou
de campos escondidos no contexto. Não há mais paridade artificial por pasta: Spark é
o caminho canônico de materialização, projeção e escrita particionada; Polars é o
caminho de execução de treino/predição, inclusive o forecast recursivo que reaplica
feature_recipe.
feature_recipe segue o mesmo desenho: o domínio expõe definições FeatureStep
canônicas, cada uma com seu schema de params. Chamar a definição com um payload
retorna um step configurado e tipado para a FeatureRecipe; o
FeatureEngineeringCatalog do adapter resolve esses step keys para executores
Polars/Spark frame-specific.
Extensão prática:
- para adicionar novas fases (ex.: enrich de features canônicas), basta compor um novo handler e encadear depois da validação padrão;
- para backend alternativo, mantenha o contrato de contexto e troque apenas a implementação de handlers, reaproveitando a pipeline genérica.