Purpose: Download raw data from external sources

Responsibilities:
- Resolve data source URLs (handle versioning, mirrors)
- Download files with retry logic and timeout handling
- Verify checksums and file integrity (see the sketch after this list)
- Extract archives (.zip, .gz)
- Store raw data in data/{source}/raw/
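The downloader itself is not excerpted here; below is a minimal sketch of the retry-and-verify pattern the list above describes. The function names, backoff parameters, and the way the expected digest arrives are illustrative assumptions, not the actual download_cnpj.py API:

```python
import hashlib
import time
from pathlib import Path

import requests


def download_with_retry(url: str, dest: Path, retries: int = 3, timeout: int = 60) -> Path:
    """Download url to dest, retrying with exponential backoff on network errors."""
    for attempt in range(1, retries + 1):
        try:
            with requests.get(url, stream=True, timeout=timeout) as resp:
                resp.raise_for_status()
                with dest.open("wb") as fh:
                    for block in resp.iter_content(chunk_size=1 << 20):  # 1 MiB blocks
                        fh.write(block)
            return dest
        except requests.RequestException:
            if attempt == retries:
                raise
            time.sleep(2 ** attempt)  # back off: 2s, 4s, ...
    raise RuntimeError("unreachable")


def verify_sha256(path: Path, expected_hex: str) -> None:
    """Raise if the file's SHA-256 digest does not match the expected hex string."""
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        for block in iter(lambda: fh.read(1 << 20), b""):
            digest.update(block)
    if digest.hexdigest() != expected_hex:
        raise RuntimeError(f"Checksum mismatch for {path}")
```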
Example (CNPJ pipeline at etl/scripts/download_cnpj.py:89):
```python
def resolve_rf_release(year_month: str | None = None) -> str:
    """Resolve the Receita Federal CNPJ release URL.

    Strategy:
    1. Try Nextcloud share (primary since Jan 2026)
    2. Fall back to legacy dadosabertos.rfb.gov.br paths
    3. Raise RuntimeError if nothing works (fail-closed)
    """
    # Probe Nextcloud tokens
    for token in tokens_to_try:
        if _check_nextcloud_token(token):
            return NEXTCLOUD_BASE.format(token=token)

    # Try legacy URLs with current/previous month
    for ym in candidates:
        url = LEGACY_NEW_BASE_PATTERN.format(year_month=ym)
        if _check_url_accessible(url):
            return url

    raise RuntimeError("Could not resolve CNPJ release")
```
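The probe helpers are referenced but not shown in the excerpt. A plausible `_check_url_accessible`, assuming a cheap HEAD request is enough to validate a mirror (the real implementation may differ):

```python
import requests


def _check_url_accessible(url: str, timeout: int = 10) -> bool:
    """Probe a candidate URL with a HEAD request; treat any non-error status as usable."""
    try:
        resp = requests.head(url, timeout=timeout, allow_redirects=True)
        return resp.status_code < 400
    except requests.RequestException:
        return False
```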
Use case: Datasets that don't fit in memory (CNPJ: 50M+ records)

Implementation (CNPJ pipeline at etl/src/bracc_etl/pipelines/cnpj.py:1068):
```python
def run_streaming(self, start_phase: int = 1) -> None:
    """Stream-process data files chunk-by-chunk."""
    loader = Neo4jBatchLoader(self.driver, batch_size=self.chunk_size)

    # Phase 1: Build lookup tables (load all estabelecimentos)
    if start_phase <= 1:
        for f in estabelecimento_files:
            for chunk in pd.read_csv(f, chunksize=50_000):
                self._build_estab_lookup(chunk)

    # Phase 2: Stream empresas → transform → load
    if start_phase <= 2:
        for f in empresas_files:
            for chunk in pd.read_csv(f, chunksize=50_000):
                companies = self._transform_empresas_rf(chunk)
                loader.load_nodes("Company", companies, key_field="cnpj")

    # Phase 3: Stream socios → transform → load
    for f in socios_files:
        for chunk in pd.read_csv(f, chunksize=50_000):
            partners, relationships = self._transform_socios_rf(chunk)
            loader.load_nodes("Person", partners, key_field="cpf")
            loader.load_relationships("SOCIO_DE", relationships, ...)
```
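`Neo4jBatchLoader` itself is not shown. Here is a minimal sketch of `load_nodes`, assuming it wraps the standard UNWIND-plus-MERGE batching pattern; the class shape mirrors the calls above but is an assumption, and `load_relationships` is omitted:

```python
from neo4j import Driver


class Neo4jBatchLoader:
    """Load node dicts into Neo4j in fixed-size batches via UNWIND + MERGE."""

    def __init__(self, driver: Driver, batch_size: int = 50_000) -> None:
        self.driver = driver
        self.batch_size = batch_size

    def load_nodes(self, label: str, rows: list[dict], key_field: str) -> None:
        # Labels and property keys cannot be query parameters in Cypher,
        # so they are interpolated; row values travel as parameters.
        query = (
            f"UNWIND $rows AS row "
            f"MERGE (n:{label} {{{key_field}: row.{key_field}}}) "
            f"SET n += row"
        )
        with self.driver.session() as session:
            for start in range(0, len(rows), self.batch_size):
                session.run(query, rows=rows[start : start + self.batch_size])
```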
Advantages:

- Fixed memory footprint (about 2 GB regardless of dataset size)
- Incremental progress (can resume from phase N)
- Real-time monitoring of row counts
CLI:

```bash
bracc-etl run --source cnpj --streaming --start-phase 2
```
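Starting at phase 2 skips the phase-1 estabelecimento lookup build and resumes with the empresas load; this presumes the phase-1 state is either persisted or not required by the remaining phases.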
Use case: Temporal data (track ownership changes over time)

Implementation (CNPJ pipeline at etl/src/bracc_etl/pipelines/cnpj.py:773):
```python
def _build_snapshot_relationships(self, pf_rels, partial_rels, pj_rels):
    """Build historical SOCIO_DE_SNAPSHOT relationships."""
    rows = []
    for rel in pf_rels:
        rows.append({
            "source_key": rel["source_key"],
            "target_key": rel["target_key"],
            "snapshot_date": rel["snapshot_date"],  # YYYY-MM-01
            "data_entrada": rel["data_entrada"],
            "membership_id": _make_membership_id(...),
        })
    return rows


def _rebuild_latest_projection_from_snapshots(self):
    """Rebuild factual SOCIO_DE from latest snapshot per pair."""
    with self.driver.session() as session:
        session.run("""
            MATCH (a)-[r:SOCIO_DE_SNAPSHOT]->(b:Company)
            WITH a, b, max(r.snapshot_date) AS max_snapshot
            MATCH (a)-[r2:SOCIO_DE_SNAPSHOT]->(b)
            WHERE r2.snapshot_date = max_snapshot
            WITH a, b, collect(r2)[0] AS latest
            MERGE (a)-[s:SOCIO_DE]->(b)
            SET s = latest
        """)
```
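`_make_membership_id(...)` is elided in the source. One plausible construction (an assumption, not the pipeline's actual scheme) is a deterministic hash over the identifying fields, so re-ingesting the same month produces the same IDs:

```python
import hashlib


def _make_membership_id(source_key: str, target_key: str, data_entrada: str) -> str:
    """Deterministic membership ID: identical inputs always hash to the same ID."""
    raw = f"{source_key}|{target_key}|{data_entrada}"
    return hashlib.sha1(raw.encode("utf-8")).hexdigest()
```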
Graph structure:

- SOCIO_DE_SNAPSHOT: Immutable historical records (one per month)
- SOCIO_DE: Factual "current state" projection, rebuilt from the latest snapshot per (partner, company) pair
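Because snapshots are immutable and keyed by first-of-month dates, ownership as of any past month can be reconstructed with a plain filter on snapshot_date. A sketch using the official neo4j driver; the connection details, credentials, and as_of value are illustrative:

```python
from neo4j import GraphDatabase

AS_OF = "2025-06-01"  # first-of-month, per the YYYY-MM-01 convention above

QUERY = """
MATCH (p)-[r:SOCIO_DE_SNAPSHOT]->(c:Company)
WHERE r.snapshot_date = $as_of
RETURN p, r, c
"""

# URI and auth are placeholders for whatever the deployment uses.
with GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password")) as driver:
    result = driver.execute_query(QUERY, as_of=AS_OF)
    for record in result.records:
        print(record["p"], record["c"])
```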