Skip to content

Data-preparation Flow

Objetivo

Descrever de forma direta como o dado entra, é validado, vira snapshot canônico em Spark, é projetado por série ainda em Spark e depois é consumido por execução Polars de treino e predição.

Fluxo ponta a ponta

flowchart LR A["Raw demand input<br/>(csv/parquet/local/s3)"] --> B["SparkDemandSnapshotMaterializer"] B --> C["SparkDemandIngestionPipeline.run(context)"] C --> D["Canonical frame validated"] D --> E["Canonical Parquet snapshot<br/>+ manifest.json"] E --> F["project_training_series job<br/>resolve model config"] F --> F2["SparkDemandIngestionPipeline<br/>with series_projection_recipe"] F2 --> F3["Spark Parquet write<br/>partitionBy(series_id)"] F3 --> G["DemandSnapshotManifestRepositoryPort.save_snapshot_manifest<br/>(projected data + index URIs)"] G --> H["PolarsTrainingDatasetOperations<br/>read projected series index"] H --> J["Load each series_id partition"] J --> K["Per-series TrainingDataset"] K --> L["ResolvedStrategyCandidate<br/>(strategy + feature_recipe)"] L --> M["prepare_feature_frame_for_candidate"] M --> N["split_series_for_holdout"] N --> O["SeriesHoldoutSplit<br/>(train_frame + holdout_frame)"] O --> P["ModelStrategy.train/predict<br/>(holdout validity path)"] P --> Q["ModelStrategy.train<br/>(final candidate)"] Q --> R["Strategy metrics"]

Responsabilidades dos componentes

ComponenteCamadaResponsabilidade
SparkDemandSnapshotMaterializerinfra/data_preparation/sparkValidar e materializar input bruto em snapshot canônico interno.
DemandDataPreparationPipeline[TContext, TFrame]control/data_preparationOrquestrar chain-of-responsibility genérica sobre contexto e engine de frame.
SparkDemandIngestionPipelineinfra/data_preparation/sparkPipeline Spark concreta para validação e canonicalização de input bruto; no job de projeção, também valida a series_projection_recipe antes da escrita particionada.
DemandSnapshotManifestRepositoryinfra/ingestionPersistir e resolver DemandSnapshotManifest.
SparkTabularDataRepositoryinfra/data_preparation/spark/snapshot_frame_repository.pyLer input bruto e persistir o dataset Parquet diretamente pelo Spark.
project_training_series.pyinfra/jobsResolver a configuração do ModelDefinition, carregar o snapshot canônico, rodar validação Spark de projeção e atualizar o manifest com os artefatos projetados.
write_projected_series_frameinfra/data_preparation/spark/training_series_projection.pyAgregar o frame já validado conforme series_projection_recipe, derivar o SeriesKey.series_id usado no resto do app e escrever Parquet particionado por series_id.
PolarsTrainingDatasetOperationsinfra/data_preparation/polarsLer o índice de séries projetadas, carregar somente as partições por series_id, aplicar feature recipe de execução e fazer split mecânico de frames.
TrainingDatasetdomain/demand/training.pyRepresentar o dataset de treino como conjunto semântico de contextos por série, sem embutir política de iteração ou batching.
SeriesTrainingContextdomain/demand/training.pyContexto semântico por série (SeriesKey, expected_uom, observed_points, frame genérico de treino).
DemandSnapshotManifestdomain/demand/training.pySnapshot congelado que ancora o treino command-side com identidade, contagens e materialização associada.
split_series_for_holdoutcontrol/data_preparationPolítica temporal de split train/holdout no command side.
DemandValidationCatalogcontrol/data_preparation/validationExecutar regras de validação já tipadas pelo domínio contra executores registrados pelo adapter sem mutar o frame; o contexto tipado carrega a fase e a strategy da fase resolve params configurados ou derivados do runtime.
ForecastingDatasetOperationsPortcontrol/data_preparationConstruir dataset projetado a partir dos artefatos por série, aplicar feature recipe por candidata, expor frames por série e fazer split de holdout.
train_model_candidatescontrol/modeling/trainingPreparar o run, aplicar feature recipes, treinar candidatas e persistir instâncias DRAFT.
publish/commit/sync_training_championscontrol/modeling/promotionSelecionar campeãs, promover instâncias ACTIVE e sincronizar aliases externos depois do commit local.

Holdout e métricas

No caminho atual, o holdout temporal continua frame-first, mas o boundary de data-preparation não calcula métricas:

  • o control não conhece a biblioteca concreta do payload;
  • o control modela separadamente o repositório de frames e a porta de operações sobre dataset;
  • o ForecastingDatasetOperationsPort expõe leitura dos artefatos projetados, preparo de features por candidata, frame por série e split train/holdout;
  • a engine pode treinar/predizer contra o prefixo de holdout para validar o caminho de artefato e forecast;
  • as métricas persistidas no metric_snapshot vêm de TrainResult.metrics, não de conversões de forecast dentro de data-preparation.

Na predição, o output canônico é uma sequência JSON-serializable de linhas de forecast (period_start, value, uom, lower, upper, confidence). O domínio do run valida essas linhas diretamente; não há mapper Polars -> ponto de forecast no caminho definitivo.

Chain of responsibility atual

O pipeline de data-preparation já roda como chain of responsibility:

Materialização canônica passa explicitamente:

  1. ValidateIngestionFrameStep
  2. CanonicalizeColumnsStep

O job project_training_series passa explicitamente:

  1. ValidateIngestionFrameStep
  2. CanonicalizeColumnsStep
  3. ValidateProjectionFrameStep(series_projection_recipe)

Cada handler recebe um SparkIngestionContext e retorna novo contexto para o próximo elo. O boundary público em control/data_preparation continua sem expor a biblioteca de frame, enquanto os conceitos semânticos que ele carrega (DemandSnapshotManifest, TrainingDataset, SeriesTrainingContext) vivem em domain/demand/training.py e só exigem que payloads ausentes sejam rejeitados cedo.

O fluxo de leitura também fica direto: o repositório de manifests resolve snapshot_id -> DemandSnapshotManifest, a porta de operações Polars lê o índice de séries projetadas e carrega somente a partição Parquet de cada SeriesKey.series_id.

No caminho de materialização, os custom steps recebem um frame já canônico e validado pela política configurada. A validação de projeção específica do Spark roda antes da escrita dos artefatos particionados, quando há series_projection_recipe.

O ValidationContext genérico carrega somente frame e organization_id. Contagens, schema version e receita de projeção entram como params tipados da regra, então Polars e Spark implementam a mesma chave semântica sem depender de manifest fake ou de campos escondidos no contexto. Não há mais paridade artificial por pasta: Spark é o caminho canônico de materialização, projeção e escrita particionada; Polars é o caminho de execução de treino/predição, inclusive o forecast recursivo que reaplica feature_recipe.

feature_recipe segue o mesmo desenho: o domínio expõe definições FeatureStep canônicas, cada uma com seu schema de params. Chamar a definição com um payload retorna um step configurado e tipado para a FeatureRecipe; o FeatureEngineeringCatalog do adapter resolve esses step keys para executores Polars/Spark frame-specific.

Extensão prática:

  • para adicionar novas fases (ex.: enrich de features canônicas), basta compor um novo handler e encadear depois da validação padrão;
  • para backend alternativo, mantenha o contrato de contexto e troque apenas a implementação de handlers, reaproveitando a pipeline genérica.