Parsing AWS CUR Parquet Files with Python Pandas

Production-grade FinOps automation routinely fractures at the ingestion layer when engineers rely on default pandas.read_parquet() behavior against AWS Cost and Usage Report (CUR) exports. The core bottleneck is rarely storage throughput; it is uncontrolled memory allocation during schema resolution and aggressive JSON tag expansion. When orchestrating an AWS CUR to Data Lake Pipeline, naive parsing strategies trigger MemoryError exceptions on files exceeding 2GB, silently drop decimal precision on unblended cost columns, and fail to handle incremental partition drift. Resolving this requires a deterministic, PyArrow-backed parsing workflow that enforces explicit schemas, safely normalizes nested resource tags, and implements partition-aware chunking.

The Memory & Schema Bottleneck in Production

AWS CUR Parquet exports use a dynamic schema that gains columns over time. In the exports this article targets, line_item_resource_tags is serialized as a JSON string, while financial metrics (line_item_unblended_cost, line_item_blended_cost) use DECIMAL(18,6) precision. Pandas has no native NumPy-backed decimal dtype: depending on the read path, decimal columns either arrive as object columns of Python decimal.Decimal values or are cast to float64 downstream for speed. Combined with eager tag expansion, this causes two critical failures in production environments:

  1. Tag Explosion & OOM: pd.read_parquet() loads the JSON string column as-is, and a downstream pd.json_normalize() call then materializes every unique tag key as a separate column. In multi-account organizations with 50,000+ unique tag keys, this creates a sparse matrix that exhausts RAM before type casting completes. The dense frame holds rows × unique tag keys cells, almost all of them empty, so memory grows with tag cardinality even when each row carries only a few tags, and the job no longer fits inside standard Kubernetes worker pods.
  2. Decimal Precision Loss: Financial reconciliation requires exact decimal arithmetic. Casting DECIMAL columns to float64, the usual shortcut when a pipeline wants fast numeric columns, replaces exact base-10 values with binary approximations. Each individual error is far below a cent, but across millions of line items the errors accumulate and totals stop matching invoice figures exactly, which breaks cross-cloud cost reconciliation engines and can fail audit checks.
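
The rounding drift described in the second failure can be reproduced with the standard library alone. This is a minimal sketch with made-up amounts (ten line items of $0.10 each), not CUR data; the explicit loop is deliberate, since newer Python versions compensate for rounding inside the built-in sum() for floats.

```python
from decimal import Decimal

# Hypothetical line items: ten charges of $0.10 each.
amounts = ["0.10"] * 10

# Accumulate as binary floats, the way a float64 column would.
float_total = 0.0
for amount in amounts:
    float_total += float(amount)

# Accumulate as exact base-10 decimals.
decimal_total = Decimal("0")
for amount in amounts:
    decimal_total += Decimal(amount)

print(float_total == 1.0)               # False: binary rounding has crept in
print(decimal_total == Decimal("1"))    # True: the decimal total is exact
```

The same effect applies to a float64 cost column: the total is close to the invoice figure but not guaranteed to equal it.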

The solution is to bypass pandas’ inference engine entirely. By using pyarrow.dataset for partition filtering and reading with dtype_backend="pyarrow", you retain exact decimal precision, defer tag expansion until after filtering, and keep the data in Arrow memory instead of copying it into NumPy arrays. This approach aligns with modern Cloud Billing Data Ingestion & Parsing best practices, ensuring deterministic behavior across daily CUR partitions.

Deterministic Ingestion Architecture

A production-ready CUR parser must decouple schema definition from data scanning. The architecture relies on three pillars:

  • Explicit Schema Enforcement: Define the expected column types upfront using PyArrow’s type system. This prevents silent type coercion and guarantees DECIMAL(18,6) preservation.
  • Partition-Aware Scanning: Use pyarrow.dataset to push down filter expressions. This allows the scanner to skip irrelevant partitions (e.g., historical months) without loading them into memory.
  • Deferred Tag Normalization: Parse JSON tags row-by-row or in micro-batches after filtering, mapping only business-relevant keys to columns. This avoids sparse matrix materialization.

Production-Ready Implementation

The following implementation is designed for containerized workers with strict memory limits (e.g., 4GB per pod). It uses batched iteration, explicit type mapping, and safe JSON parsing to guarantee stability under heavy load.

import pyarrow as pa
import pyarrow.dataset as ds
import pandas as pd
import json
import logging
from typing import Dict, Iterator, Optional

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s"
)
logger = logging.getLogger(__name__)

# Explicit CUR schema mapping to prevent inference drift
# Focuses on identity, temporal, and financial columns
CUR_SCHEMA = pa.schema([
    ("identity_line_item_id", pa.string()),
    ("line_item_usage_account_id", pa.string()),
    ("line_item_usage_start_date", pa.timestamp("us", tz="UTC")),
    ("line_item_usage_end_date", pa.timestamp("us", tz="UTC")),
    ("line_item_unblended_cost", pa.decimal128(18, 6)),
    ("line_item_blended_cost", pa.decimal128(18, 6)),
    ("line_item_resource_tags", pa.string()),
    ("line_item_product_code", pa.string()),
    ("line_item_usage_type", pa.string()),
])

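def parse_tags_safe(tag_json: Optional[str], allowed_prefixes: Optional[set] = None) -> Dict[str, str]:
    """
    Safely parse CUR JSON tags without exploding into sparse matrices.
    Filters to business-critical prefixes to control memory footprint.
    """
    # Nulls surface as None or pd.NA under the Arrow backend, and bool(pd.NA)
    # raises, so check the type before testing for emptiness.
    if not isinstance(tag_json, str) or tag_json in ("", "{}"):
        return {}
    try:
        raw = json.loads(tag_json)
    except json.JSONDecodeError as e:
        logger.warning("Invalid JSON in resource_tags: %s", e)
        return {}
    if not isinstance(raw, dict):
        # Valid JSON that is not an object (e.g. a list or a bare string)
        logger.warning("Unexpected JSON type in resource_tags: %s", type(raw).__name__)
        return {}
    # Keep scalar values only, then apply the optional prefix allowlist
    tags = {k: str(v) for k, v in raw.items() if isinstance(v, (str, int, float))}
    if allowed_prefixes:
        prefixes = tuple(allowed_prefixes)  # str.startswith accepts a tuple
        tags = {k: v for k, v in tags.items() if k.startswith(prefixes)}
    return tags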

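def stream_cur_partitions(
    cur_path: str,
    batch_size: int = 500_000,
    tag_prefixes: Optional[set] = None,
    partition_filter: Optional[ds.Expression] = None
) -> Iterator[pd.DataFrame]:
    """
    Partition-aware, chunked CUR ingestion with explicit schema enforcement.
    Yields pandas DataFrames with Arrow-backed types for zero-copy downstream processing.
    Assumes Hive-style year=/month= directories under cur_path.
    """
    logger.info("Initializing PyArrow dataset scanner: %s", cur_path)
    # Partition keys live in directory names, not in the data files, so they
    # must be declared in the dataset schema before a filter can reference them.
    partition_schema = pa.schema([("year", pa.int32()), ("month", pa.int32())])
    dataset = ds.dataset(
        cur_path,
        format="parquet",
        schema=pa.schema(list(CUR_SCHEMA) + list(partition_schema)),
        partitioning=ds.partitioning(partition_schema, flavor="hive"),
    )

    # Configure scanner with partition pruning and batch sizing
    scanner = dataset.scanner(
        batch_size=batch_size,
        use_threads=True,
        filter=partition_filter
    )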

    for batch_idx, batch in enumerate(scanner.to_batches()):
        logger.debug("Processing batch %d (%d rows)", batch_idx, batch.num_rows)

        # Convert to pandas with Arrow backend to preserve DECIMAL precision
        df = batch.to_pandas(types_mapper=pd.ArrowDtype)

        # Deferred tag expansion: apply row-wise to avoid memory spikes
        df["parsed_tags"] = df["line_item_resource_tags"].apply(
            lambda x: parse_tags_safe(x, tag_prefixes)
        )

        # Drop raw JSON column to free memory before yielding
        df.drop(columns=["line_item_resource_tags"], inplace=True)

        yield df

    logger.info("Dataset scan complete.")

Key Engineering Decisions

  1. types_mapper=pd.ArrowDtype: This makes pandas keep PyArrow’s native types instead of converting to NumPy equivalents. Financial columns stay in Arrow decimal128 storage, and individual values surface as decimal.Decimal objects, which avoids float64 rounding drift. For deeper implementation details, consult the official Pandas PyArrow Backend Documentation.
  2. Batched Iteration: Instead of loading the entire file into RAM, scanner.to_batches() streams data in configurable chunks. This enables processing multi-gigabyte CUR files on 2–4GB container instances without swapping.
  3. Prefix-Filtered Tag Parsing: The allowed_prefixes parameter restricts tag expansion to known business domains (e.g., cost_center:, env:, project:). This prevents uncontrolled column proliferation from ephemeral or developer-generated tags.
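
Once tags are parsed into dictionaries, they still need to become columns. A minimal sketch of one way to do that follows: project each dictionary onto a fixed allowlist so the column count never depends on the data. The helper name expand_tags and the tag keys are hypothetical; the input layout is a frame with a parsed_tags column of dicts, as described above.

```python
from typing import List

import pandas as pd

# Hypothetical allowlist; real keys depend on your tagging policy.
WANTED_TAG_KEYS = ["user:cost_center", "user:env"]

def expand_tags(df: pd.DataFrame, keys: List[str]) -> pd.DataFrame:
    """Replace the 'parsed_tags' dict column with one column per allowed key."""
    tag_cols = pd.DataFrame(
        [{k: tags.get(k) for k in keys} for tags in df["parsed_tags"]],
        index=df.index,
        columns=keys,
    )
    return pd.concat([df.drop(columns=["parsed_tags"]), tag_cols], axis=1)

# Made-up sample rows: one tagged resource, one untagged.
sample = pd.DataFrame({
    "line_item_usage_account_id": ["111111111111", "222222222222"],
    "parsed_tags": [{"user:cost_center": "cc-42", "user:tmp": "x"}, {}],
})
expanded = expand_tags(sample, WANTED_TAG_KEYS)
print(list(expanded.columns))
```

Keys outside the allowlist (user:tmp here) never become columns, so the output width is fixed at len(keys) regardless of how many distinct tags appear in the data.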

Operationalizing the Pipeline

Deploying this parser in production requires addressing cloud-specific constraints around incremental loads, schema drift, and validation.

Handling Partition Drift

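AWS CUR exports are updated at least once a day and partitioned by year and month. When new partitions arrive, the scanner should process only the deltas. Use PyArrow’s expression builder to filter scans; the filter can reference year and month only if the dataset is opened with Hive-style partitioning so that those directory keys are part of its schema: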

from pyarrow import compute as pc

# Filter to current month partition (assumes dataset partitioned by year/month)
filter_expr = (pc.field("year") == 2024) & (pc.field("month") == 10)
for df in stream_cur_partitions("s3://cur-bucket/exports/", partition_filter=filter_expr):
    # Write to Delta Lake, Iceberg, or BigQuery
    pass

Memory & CPU Tuning

  • Thread Control: use_threads=True leverages multi-core CPUs for Parquet decoding, but in memory-constrained pods, set use_threads=False to prevent thread pool overhead.
  • Batch Sizing: Start with batch_size=250_000 for 2GB pods. Monitor RSS memory via psutil or Kubernetes metrics and raise the batch size gradually, stopping once peak RSS approaches roughly 80% of the pod's memory limit.
  • Garbage Collection: Explicitly call gc.collect() between batch yields if downstream transformations retain references to previous chunks.
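
The batch-sizing and garbage-collection advice can be combined in a small wrapper around any batch iterator. This is a sketch under assumptions: it uses the standard-library resource module (Unix only) in place of psutil, and the names peak_rss_mib and with_memory_guard, along with the 0.8 threshold default, are illustrative.

```python
import gc
import logging
import resource  # Unix-only standard-library module
import sys
from typing import Iterable, Iterator, TypeVar

logger = logging.getLogger(__name__)
T = TypeVar("T")

def peak_rss_mib() -> float:
    """Peak resident set size of this process, in MiB."""
    peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    # ru_maxrss is reported in kilobytes on Linux and in bytes on macOS.
    return peak / (1024 * 1024) if sys.platform == "darwin" else peak / 1024

def with_memory_guard(
    batches: Iterable[T], limit_mib: float, threshold: float = 0.8
) -> Iterator[T]:
    """Yield batches unchanged, collecting garbage and checking RSS after each."""
    for idx, batch in enumerate(batches):
        yield batch
        # Control returns here once the consumer asks for the next batch.
        del batch
        gc.collect()
        rss = peak_rss_mib()
        if rss > threshold * limit_mib:
            logger.warning(
                "Batch %d: peak RSS %.0f MiB exceeds %.0f%% of the %.0f MiB limit",
                idx, rss, threshold * 100, limit_mib,
            )

# Usage with any iterable of batches; plain strings stand in for DataFrames.
processed = list(with_memory_guard(iter(["a", "b", "c"]), limit_mib=4096))
```

Because ru_maxrss is a high-water mark, it never decreases; it shows whether a batch size has ever pushed the process past the threshold, not the current footprint.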

Schema Validation & Drift Guardrails

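AWS occasionally adds columns to CUR exports (e.g., new sustainability metrics). The explicit CUR_SCHEMA does not reject them: the PyArrow scanner reads only the fields named in the schema, so new columns are ignored silently and columns absent from a file come back as nulls. Type changes are the real hazard, since a column whose stored type cannot be cast to the declared one fails at scan time. Compare each file's physical schema against CUR_SCHEMA before scanning, log any mismatch, and route affected files to a quarantine table rather than failing the entire pipeline. Check the AWS CUR User Guide for schema change announcements.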

Conclusion

Parsing AWS CUR Parquet files at scale demands a shift from pandas’ convenience-driven defaults to deterministic, schema-first ingestion. By combining PyArrow’s dataset API, explicit decimal preservation, and deferred tag normalization, FinOps teams can process multi-terabyte billing datasets within strict memory budgets. This architecture eliminates reconciliation drift, prevents OOM crashes, and provides a stable foundation for downstream cost allocation, showback, and anomaly detection engines.