Overview

This guide walks through creating a production-grade ETL pipeline from scratch. We’ll build a pipeline for ANATEL (Brazilian telecom regulator) as an example.

Prerequisites

  • Python 3.12+
  • Neo4j 5.x running locally
  • Basic understanding of ETL concepts
  • Familiarity with Pandas for data processing

Pipeline Template

All pipelines inherit from the Pipeline base class:
from bracc_etl.base import Pipeline
from bracc_etl.loader import Neo4jBatchLoader
from bracc_etl.transforms import (
    format_cnpj,
    normalize_name,
    parse_date,
    deduplicate_rows,
)

class AnatelPipeline(Pipeline):
    """ETL pipeline for ANATEL telecom licenses."""
    
    name = "anatel"
    source_id = "anatel"
    
    def __init__(self, driver, data_dir="./data", **kwargs):
        super().__init__(driver, data_dir, **kwargs)
        self.licenses = []
        self.companies = []
        self.relationships = []
    
    def extract(self) -> None:
        """Download raw data from ANATEL API."""
        pass
    
    def transform(self) -> None:
        """Normalize and deduplicate data."""
        pass
    
    def load(self) -> None:
        """Load data into Neo4j."""
        pass

Step 1: Create Download Script

Create etl/scripts/download_anatel.py:
#!/usr/bin/env python3
"""Download ANATEL telecom license data.

Usage:
    python etl/scripts/download_anatel.py --output-dir ./data/anatel
"""

import logging
import sys
from pathlib import Path

import click
import httpx

sys.path.insert(0, str(Path(__file__).parent))
from _download_utils import download_file

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

ANATEL_API = "https://sistemas.anatel.gov.br/stel/api/licencas"


@click.command()
@click.option("--output-dir", default="./data/anatel", help="Output directory")
@click.option("--timeout", type=int, default=600, help="Download timeout")
def main(output_dir: str, timeout: int) -> None:
    """Download ANATEL license data."""
    out = Path(output_dir)
    out.mkdir(parents=True, exist_ok=True)
    
    # Download license registry
    dest = out / "licencas.json"
    logger.info("Downloading from %s", ANATEL_API)
    
    try:
        with httpx.stream("GET", ANATEL_API, timeout=timeout) as response:
            response.raise_for_status()
            with open(dest, "wb") as f:
                for chunk in response.iter_bytes(chunk_size=8192):
                    f.write(chunk)
        logger.info("Downloaded: %s", dest)
    except httpx.HTTPError as e:
        logger.error("Download failed: %s", e)
        sys.exit(1)


if __name__ == "__main__":
    main()
Download utilities (etl/scripts/_download_utils.py); see the extract_zip sketch after this list:
  • download_file(url, dest, timeout) — HTTP download with progress
  • extract_zip(zip_path, extract_dir) — Safe ZIP extraction
  • validate_csv(path, expected_cols) — Column validation
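These helpers aren't defined in this guide. For orientation, a safe ZIP extraction that guards against path traversal could look like this (a sketch, not the actual _download_utils code):
import zipfile
from pathlib import Path


def extract_zip(zip_path: Path, extract_dir: Path) -> None:
    """Extract a ZIP archive, refusing entries that escape extract_dir."""
    extract_dir = extract_dir.resolve()
    with zipfile.ZipFile(zip_path) as zf:
        for member in zf.namelist():
            target = (extract_dir / member).resolve()
            # Reject absolute paths and ../ traversal outside the target dir
            if not target.is_relative_to(extract_dir):
                raise ValueError(f"Unsafe path in archive: {member}")
        zf.extractall(extract_dir)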

Step 2: Implement Extract Phase

In etl/src/bracc_etl/pipelines/anatel.py:
import json
import logging
from pathlib import Path
import pandas as pd

logger = logging.getLogger(__name__)

class AnatelPipeline(Pipeline):
    # ... (name, source_id, __init__)
    
    def extract(self) -> None:
        """Download and read ANATEL license data."""
        anatel_dir = Path(self.data_dir) / "anatel"
        anatel_dir.mkdir(parents=True, exist_ok=True)
        
        # Option 1: Download via script
        # Run: python etl/scripts/download_anatel.py
        
        # Option 2: Download within pipeline
        import httpx
        url = "https://sistemas.anatel.gov.br/stel/api/licencas"
        dest = anatel_dir / "licencas.json"
        
        if not dest.exists():
            logger.info("Downloading from %s", url)
            try:
                response = httpx.get(url, timeout=600)
                response.raise_for_status()
                dest.write_bytes(response.content)
                logger.info("Downloaded: %s", dest)
            except httpx.HTTPError as e:
                logger.error("Download failed: %s", e)
                raise
        
        # Read JSON data
        with open(dest, "r", encoding="utf-8") as f:
            data = json.load(f)
        
        # Convert to DataFrame
        self._raw_licenses = pd.DataFrame(data["licencas"])
        logger.info("Extracted %d licenses", len(self._raw_licenses))
Key patterns:
  • Create data directory structure
  • Check if file exists (skip re-download)
  • Handle HTTP errors gracefully
  • Log progress

Step 3: Implement Transform Phase

from bracc_etl.transforms import (
    format_cnpj,
    normalize_name,
    parse_date,
    deduplicate_rows,
)

class AnatelPipeline(Pipeline):
    # ... (extract)
    
    def transform(self) -> None:
        """Normalize and deduplicate ANATEL data."""
        df = self._raw_licenses.copy()
        
        # 1. Normalize company data
        df["cnpj"] = df["cnpj"].astype(str).map(format_cnpj)
        df["razao_social"] = df["razao_social"].astype(str).map(normalize_name)
        
        # 2. Parse dates
        df["data_emissao"] = df["data_emissao"].astype(str).map(parse_date)
        df["data_validade"] = df["data_validade"].astype(str).map(parse_date)
        
        # 3. Build company nodes
        company_cols = ["cnpj", "razao_social"]
        self.companies = deduplicate_rows(
            df[company_cols].to_dict("records"),
            key_fields=["cnpj"],
        )
        
        # 4. Build license nodes
        license_cols = [
            "numero_licenca", "tipo", "servico", "tecnologia",
            "data_emissao", "data_validade", "status",
        ]
        self.licenses = df[license_cols].to_dict("records")
        
        # 5. Build relationships (Company -[:POSSUI_LICENCA]-> License)
        self.relationships = [
            {
                "source_key": row["cnpj"],
                "target_key": row["numero_licenca"],
                "data_emissao": row["data_emissao"],
            }
            for _, row in df.iterrows()
        ]
        
        logger.info(
            "Transformed: %d companies, %d licenses, %d relationships",
            len(self.companies),
            len(self.licenses),
            len(self.relationships),
        )
Transform best practices:
  1. Copy the DataFrame: df = self._raw_licenses.copy() avoids mutating the extracted data
  2. Elementwise transforms: .map() applies a Python function per element; it is not vectorized, so prefer the pandas .str accessors for simple string cleanup
  3. Deduplicate: always deduplicate nodes on their primary keys (see the sketch after this list)
  4. Type safety: call .astype(str) before string transforms to guard against mixed dtypes
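deduplicate_rows ships with bracc_etl.transforms; its exact behavior isn't shown here, but a first-seen-wins version is a reasonable mental model (a sketch, assuming hashable key values):
def deduplicate_rows(rows: list[dict], key_fields: list[str]) -> list[dict]:
    """Keep the first row seen for each combination of key_fields."""
    seen: set[tuple] = set()
    unique: list[dict] = []
    for row in rows:
        key = tuple(row.get(f) for f in key_fields)
        if key not in seen:
            seen.add(key)
            unique.append(row)
    return unique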

Step 4: Implement Load Phase

from bracc_etl.loader import Neo4jBatchLoader

class AnatelPipeline(Pipeline):
    # ... (extract, transform)
    
    def load(self) -> None:
        """Load data into Neo4j."""
        loader = Neo4jBatchLoader(self.driver, batch_size=self.chunk_size)
        
        # 1. Load Company nodes
        if self.companies:
            loader.load_nodes(
                label="Company",
                rows=self.companies,
                key_field="cnpj",
            )
            logger.info("Loaded %d Company nodes", len(self.companies))
        
        # 2. Load License nodes
        if self.licenses:
            loader.load_nodes(
                label="License",
                rows=self.licenses,
                key_field="numero_licenca",
            )
            logger.info("Loaded %d License nodes", len(self.licenses))
        
        # 3. Load relationships
        if self.relationships:
            loader.load_relationships(
                rel_type="POSSUI_LICENCA",
                rows=self.relationships,
                source_label="Company",
                source_key="cnpj",
                target_label="License",
                target_key="numero_licenca",
                properties=["data_emissao"],
            )
            logger.info("Loaded %d POSSUI_LICENCA relationships", len(self.relationships))
        
        # Track metrics
        self.rows_in = len(self._raw_licenses)
        self.rows_loaded = len(self.companies) + len(self.licenses)
Load patterns:
  • Use Neo4jBatchLoader for efficient bulk inserts
  • Always specify key_field so the loader can MERGE idempotently (see the sketch after this list)
  • Load nodes before relationships, since relationship MERGE must match existing endpoint nodes
  • Track rows_in and rows_loaded for monitoring
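Neo4jBatchLoader's internals aren't documented here, but batched MERGE loading in Neo4j conventionally reduces to an UNWIND query; a minimal sketch with the official neo4j Python driver (an illustration, not the loader's actual code):
def merge_nodes_batch(driver, label: str, rows: list[dict], key_field: str) -> None:
    """MERGE a batch of nodes in one round trip; idempotent on key_field."""
    # Labels and property keys can't be query parameters in Cypher, so they
    # are interpolated here; only pass trusted, hard-coded values.
    query = (
        f"UNWIND $rows AS row "
        f"MERGE (n:{label} {{{key_field}: row.{key_field}}}) "
        f"SET n += row"
    )
    with driver.session() as session:
        session.run(query, rows=rows)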

Step 5: Register Pipeline

Add to etl/src/bracc_etl/runner.py:54:
from bracc_etl.pipelines.anatel import AnatelPipeline

PIPELINES: dict[str, type] = {
    # ... existing pipelines
    "anatel": AnatelPipeline,
}
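The runner resolves the --source flag against this dict; conceptually something like this (an assumption about runner.py, shown for orientation only):
def build_pipeline(source: str, driver, **kwargs):
    """Instantiate the pipeline class registered under a source id."""
    try:
        pipeline_cls = PIPELINES[source]
    except KeyError:
        raise ValueError(f"Unknown source {source!r}; known: {sorted(PIPELINES)}")
    return pipeline_cls(driver, **kwargs)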

Step 6: Test the Pipeline

Run with Limit

bracc-etl run \
  --source anatel \
  --neo4j-password your-password \
  --data-dir ./data \
  --limit 1000
This processes only 1,000 licenses for quick testing.

Verify Results

// Count nodes
MATCH (l:License)
RETURN count(l) AS total_licenses

// Sample licenses
MATCH (c:Company)-[r:POSSUI_LICENCA]->(l:License)
RETURN c.razao_social, l.tipo, l.servico, r.data_emissao
LIMIT 10

Check IngestionRun

MATCH (r:IngestionRun {source_id: 'anatel'})
RETURN r.status, r.rows_in, r.rows_loaded, r.started_at
ORDER BY r.started_at DESC
LIMIT 1
Expected: status = 'loaded'

Advanced Patterns

Pattern 1: Streaming for Large Datasets

For datasets > 10GB, implement run_streaming():
class AnatelPipeline(Pipeline):
    # ... (extract, transform, load)
    
    def run_streaming(self, start_phase: int = 1) -> None:
        """Stream-process large datasets chunk-by-chunk."""
        loader = Neo4jBatchLoader(self.driver, batch_size=self.chunk_size)
        anatel_dir = Path(self.data_dir) / "anatel"
        
        # Phase 1: Stream licenses from JSONL file
        file_path = anatel_dir / "licencas.jsonl"
        total_companies = 0
        total_licenses = 0
        
        with open(file_path, "r") as f:
            chunk = []
            for line in f:
                chunk.append(json.loads(line))
                
                if len(chunk) >= self.chunk_size:
                    # Transform chunk
                    df = pd.DataFrame(chunk)
                    companies, licenses, rels = self._transform_chunk(df)
                    
                    # Load chunk
                    loader.load_nodes("Company", companies, key_field="cnpj")
                    loader.load_nodes("License", licenses, key_field="numero_licenca")
                    loader.load_relationships(
                        "POSSUI_LICENCA", rels,
                        "Company", "cnpj",
                        "License", "numero_licenca",
                    )
                    
                    total_companies += len(companies)
                    total_licenses += len(licenses)
                    chunk = []
                    
                    logger.info(
                        "Progress: %d companies, %d licenses",
                        total_companies, total_licenses,
                    )

            # Flush the final partial chunk left over when the loop ends
            if chunk:
                df = pd.DataFrame(chunk)
                companies, licenses, rels = self._transform_chunk(df)
                loader.load_nodes("Company", companies, key_field="cnpj")
                loader.load_nodes("License", licenses, key_field="numero_licenca")
                loader.load_relationships(
                    "POSSUI_LICENCA", rels,
                    "Company", "cnpj",
                    "License", "numero_licenca",
                )
                total_companies += len(companies)
                total_licenses += len(licenses)

        logger.info("Streaming complete: %d companies, %d licenses",
                    total_companies, total_licenses)
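The loop above delegates per-chunk work to a _transform_chunk helper that the snippet doesn't define. A minimal sketch, assuming it mirrors the batch transform() shown earlier:
    def _transform_chunk(self, df: pd.DataFrame):
        """Per-chunk version of transform(): same normalization, local dedup."""
        df["cnpj"] = df["cnpj"].astype(str).map(format_cnpj)
        df["razao_social"] = df["razao_social"].astype(str).map(normalize_name)
        df["data_emissao"] = df["data_emissao"].astype(str).map(parse_date)

        companies = deduplicate_rows(
            df[["cnpj", "razao_social"]].to_dict("records"), key_fields=["cnpj"],
        )
        licenses = df[["numero_licenca", "tipo", "data_emissao"]].to_dict("records")
        rels = [
            {"source_key": r["cnpj"], "target_key": r["numero_licenca"]}
            for _, r in df.iterrows()
        ]
        return companies, licenses, rels
Note that deduplication here is per-chunk only; cross-chunk duplicates are absorbed by the loader's MERGE semantics.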
Run:
bracc-etl run --source anatel --streaming --neo4j-password pass

Pattern 2: Multi-File Processing

For sources with multiple files (like CNPJ):
def extract(self) -> None:
    """Extract from multiple CSV files."""
    anatel_dir = Path(self.data_dir) / "anatel"
    
    # Find all CSV files matching pattern
    files = sorted(anatel_dir.glob("licencas_*.csv"))
    if not files:
        raise FileNotFoundError("No license files found")
    
    frames = []
    for f in files:
        logger.info("Reading %s", f.name)
        df = pd.read_csv(f, dtype=str, keep_default_na=False)
        frames.append(df)
    
    self._raw_licenses = pd.concat(frames, ignore_index=True)
    logger.info("Extracted %d licenses from %d files", 
                len(self._raw_licenses), len(files))

Pattern 3: API Pagination

For paginated APIs:
import time

import httpx

def extract(self) -> None:
    """Extract from paginated API."""
    base_url = "https://api.anatel.gov.br/v1/licencas"
    all_records = []
    page = 1
    
    while True:
        logger.info("Fetching page %d", page)
        response = httpx.get(
            base_url,
            params={"page": page, "per_page": 1000},
            timeout=60,
        )
        response.raise_for_status()
        data = response.json()
        
        records = data.get("results", [])
        if not records:
            break
        
        all_records.extend(records)
        page += 1
        
        # Rate limiting
        time.sleep(1)
    
    self._raw_licenses = pd.DataFrame(all_records)
    logger.info("Extracted %d licenses from API", len(self._raw_licenses))

Pattern 4: Schema Validation

Add Pandera schema for validation:
import pandera as pa

class AnatelLicenseSchema(pa.DataFrameModel):
    numero_licenca: str = pa.Field(str_length={"min_value": 1})
    cnpj: str = pa.Field(str_matches=r"^\d{14}$")
    tipo: str = pa.Field(isin=["SCM", "SMP", "STFC", "SeAC"])
    data_emissao: str = pa.Field(str_matches=r"^\d{4}-\d{2}-\d{2}$")

def transform(self) -> None:
    """Transform with validation."""
    df = self._raw_licenses.copy()
    
    # Normalize
    df["cnpj"] = df["cnpj"].map(format_cnpj)
    df["data_emissao"] = df["data_emissao"].map(parse_date)
    
    # Validate schema
    try:
        AnatelLicenseSchema.validate(df, lazy=True)
    except pa.errors.SchemaErrors as e:
        logger.error("Schema validation failed:\n%s", e)
        raise
    
    # Continue transformation...
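If dropping bad rows beats failing the whole run, SchemaErrors carries a failure_cases frame whose index column points at the offending rows (a sketch, assuming the default integer index):
try:
    AnatelLicenseSchema.validate(df, lazy=True)
except pa.errors.SchemaErrors as e:
    bad_idx = e.failure_cases["index"].dropna().unique()
    logger.warning("Dropping %d invalid rows", len(bad_idx))
    df = df.drop(index=bad_idx)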

Pattern 5: Reference Table Resolution

For coded values that need human-readable labels:
def _load_reference_tables(self) -> None:
    """Load ANATEL reference tables."""
    ref_dir = Path(self.data_dir) / "anatel" / "reference"
    
    # Load service type codes
    df = pd.read_csv(ref_dir / "tipos_servico.csv", dtype=str)
    self._service_types = dict(zip(df["codigo"], df["descricao"]))
    logger.info("Loaded %d service type codes", len(self._service_types))

def _resolve_service_type(self, code: str) -> str:
    """Look up service type description."""
    return self._service_types.get(code.strip(), code) if code else code

def transform(self) -> None:
    """Transform with reference resolution."""
    self._load_reference_tables()
    
    df = self._raw_licenses.copy()
    df["servico"] = df["tipo_servico"].map(self._resolve_service_type)
    # ...

Testing

Unit Tests

Create etl/tests/test_anatel.py:
import pandas as pd
import pytest
from bracc_etl.pipelines.anatel import AnatelPipeline


@pytest.fixture
def sample_data():
    # transform() selects all of these columns, so the fixture must supply them
    return pd.DataFrame({
        "numero_licenca": ["123456789", "987654321"],
        "cnpj": ["12345678000190", "98765432000110"],
        "razao_social": ["EMPRESA A", "EMPRESA B"],
        "tipo": ["SMP", "SCM"],
        "servico": ["Telefonia Movel", "Comunicacao Multimidia"],
        "tecnologia": ["LTE", "FTTH"],
        "data_emissao": ["2024-01-15", "2024-02-20"],
        "data_validade": ["2034-01-15", "2034-02-20"],
        "status": ["ATIVA", "ATIVA"],
    })


def test_transform(sample_data, neo4j_driver):
    """Test transform phase."""
    pipeline = AnatelPipeline(neo4j_driver, data_dir="./test_data")
    pipeline._raw_licenses = sample_data
    pipeline.transform()
    
    assert len(pipeline.companies) == 2
    assert len(pipeline.licenses) == 2
    assert len(pipeline.relationships) == 2
    
    # Check CNPJ formatting
    assert pipeline.companies[0]["cnpj"] == "12.345.678/0001-90"
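It's also worth pinning the dedup behavior; this hypothetical test (mirroring the transform shown above) catches regressions where one company holding two licenses yields duplicate nodes:
def test_transform_deduplicates_companies(neo4j_driver):
    """Two licenses under one CNPJ should produce a single Company node."""
    pipeline = AnatelPipeline(neo4j_driver, data_dir="./test_data")
    pipeline._raw_licenses = pd.DataFrame({
        "numero_licenca": ["111", "222"],
        "cnpj": ["12345678000190"] * 2,
        "razao_social": ["EMPRESA A"] * 2,
        "tipo": ["SMP", "SCM"],
        "servico": ["Telefonia Movel", "Comunicacao Multimidia"],
        "tecnologia": ["LTE", "FTTH"],
        "data_emissao": ["2024-01-15", "2024-02-20"],
        "data_validade": ["2034-01-15", "2034-02-20"],
        "status": ["ATIVA", "ATIVA"],
    })
    pipeline.transform()

    assert len(pipeline.companies) == 1
    assert len(pipeline.licenses) == 2
    assert len(pipeline.relationships) == 2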

Integration Tests

Run full pipeline with test data:
# Create test data
mkdir -p ./test_data/anatel
cat > ./test_data/anatel/licencas.json << EOF
{
  "licencas": [
    {"numero_licenca": "123456789", "cnpj": "12345678000190", ...}
  ]
}
EOF

# Run pipeline
bracc-etl run \
  --source anatel \
  --neo4j-password test \
  --data-dir ./test_data

# Verify
bracc-etl sources --status --neo4j-password test

Production Checklist

Before merging your pipeline:
  • Download script works with real data source
  • Extract phase handles missing files gracefully
  • Transform phase normalizes all key fields (CNPJ, names, dates)
  • Load phase uses MERGE (not CREATE) for idempotency
  • Deduplication on all node primary keys
  • Error handling with try/except and logging
  • Progress logging every 10k-100k rows
  • Schema validation with Pandera (optional but recommended)
  • Unit tests for transform logic
  • Integration test with sample data
  • Documentation in source registry (source_registry_br_v1.csv)
  • Registered in runner.py PIPELINES dict

Complete Example

Full working pipeline: etl/src/bracc_etl/pipelines/anatel.py
from __future__ import annotations

import json
import logging
from pathlib import Path
from typing import TYPE_CHECKING, Any

import httpx
import pandas as pd

from bracc_etl.base import Pipeline
from bracc_etl.loader import Neo4jBatchLoader
from bracc_etl.transforms import (
    deduplicate_rows,
    format_cnpj,
    normalize_name,
    parse_date,
)

if TYPE_CHECKING:
    from neo4j import Driver

logger = logging.getLogger(__name__)

ANATEL_API = "https://sistemas.anatel.gov.br/stel/api/licencas"


class AnatelPipeline(Pipeline):
    """ETL pipeline for ANATEL telecom licenses."""

    name = "anatel"
    source_id = "anatel"

    def __init__(
        self,
        driver: Driver,
        data_dir: str = "./data",
        limit: int | None = None,
        chunk_size: int = 50_000,
        **kwargs: Any,
    ) -> None:
        super().__init__(driver, data_dir, limit=limit, chunk_size=chunk_size, **kwargs)
        self._raw_licenses = pd.DataFrame()
        self.companies: list[dict[str, Any]] = []
        self.licenses: list[dict[str, Any]] = []
        self.relationships: list[dict[str, Any]] = []

    def extract(self) -> None:
        """Download and read ANATEL license data."""
        anatel_dir = Path(self.data_dir) / "anatel"
        anatel_dir.mkdir(parents=True, exist_ok=True)
        dest = anatel_dir / "licencas.json"

        if not dest.exists():
            logger.info("Downloading from %s", ANATEL_API)
            try:
                response = httpx.get(ANATEL_API, timeout=600)
                response.raise_for_status()
                dest.write_bytes(response.content)
                logger.info("Downloaded: %s", dest)
            except httpx.HTTPError as e:
                logger.error("Download failed: %s", e)
                raise

        with open(dest, "r", encoding="utf-8") as f:
            data = json.load(f)

        self._raw_licenses = pd.DataFrame(data["licencas"])
        if self.limit:
            self._raw_licenses = self._raw_licenses.head(self.limit)
        logger.info("Extracted %d licenses", len(self._raw_licenses))

    def transform(self) -> None:
        """Normalize and deduplicate ANATEL data."""
        df = self._raw_licenses.copy()

        df["cnpj"] = df["cnpj"].astype(str).map(format_cnpj)
        df["razao_social"] = df["razao_social"].astype(str).map(normalize_name)
        df["data_emissao"] = df["data_emissao"].astype(str).map(parse_date)
        df["data_validade"] = df["data_validade"].astype(str).map(parse_date)

        company_cols = ["cnpj", "razao_social"]
        self.companies = deduplicate_rows(
            df[company_cols].to_dict("records"), key_fields=["cnpj"],
        )

        license_cols = [
            "numero_licenca", "tipo", "servico", "tecnologia",
            "data_emissao", "data_validade", "status",
        ]
        self.licenses = df[license_cols].to_dict("records")

        self.relationships = [
            {
                "source_key": row["cnpj"],
                "target_key": row["numero_licenca"],
                "data_emissao": row["data_emissao"],
            }
            for _, row in df.iterrows()
        ]

        logger.info(
            "Transformed: %d companies, %d licenses, %d relationships",
            len(self.companies), len(self.licenses), len(self.relationships),
        )

    def load(self) -> None:
        """Load data into Neo4j."""
        loader = Neo4jBatchLoader(self.driver, batch_size=self.chunk_size)

        if self.companies:
            loader.load_nodes("Company", self.companies, key_field="cnpj")
            logger.info("Loaded %d Company nodes", len(self.companies))

        if self.licenses:
            loader.load_nodes("License", self.licenses, key_field="numero_licenca")
            logger.info("Loaded %d License nodes", len(self.licenses))

        if self.relationships:
            loader.load_relationships(
                rel_type="POSSUI_LICENCA",
                rows=self.relationships,
                source_label="Company",
                source_key="cnpj",
                target_label="License",
                target_key="numero_licenca",
                properties=["data_emissao"],
            )
            logger.info("Loaded %d POSSUI_LICENCA relationships", len(self.relationships))

        self.rows_in = len(self._raw_licenses)
        self.rows_loaded = len(self.companies) + len(self.licenses)

Next Steps

  • Running Pipelines: test your new pipeline locally
  • Pipeline Architecture: learn advanced design patterns
  • Data Sources: add your pipeline to the source registry
  • Overview: back to the ETL framework overview