Reserved Instance Mapping Logic
Pipeline Architecture & Positioning
Reserved Instance Mapping Logic operates at the normalization and allocation layer of a cloud cost data pipeline. Raw billing exports deliver unattributed hourly usage records, while commitment inventories (Reserved Instances, Savings Plans, Committed Use Discounts, and Azure Reservations) exist as separate financial instruments. The mapping engine must deterministically correlate usage with active discount scopes, applying vendor-specific precedence rules before costs are distributed to business units. This stage executes after schema standardization and before the chargeback/showback distribution described in FinOps Architecture & Billing Fundamentals.
In production, mapping must handle multi-account hierarchies, cross-tenant sharing, and temporal misalignments between usage ingestion and commitment activation. The engine typically runs as a daily batch job, consuming normalized Parquet/CSV exports and outputting a coverage matrix with utilization metrics, effective rate adjustments, and unallocated on-demand deltas. Accurate mapping prevents cost leakage, ensures compliance with procurement contracts, and provides the foundational data required for capacity forecasting and rightsizing initiatives.
Core Mapping Principles
Vendor Precedence Hierarchies
Cloud providers apply discount instruments using strict, non-interchangeable consumption orders. Misapplying these rules results in phantom utilization or inflated on-demand spend.
- AWS: Reserved Instances are applied to eligible usage before Savings Plans, and among Savings Plans, EC2 Instance Savings Plans apply before Compute Savings Plans. Within the RI tier, this engine evaluates regional Reserved Instances ahead of zonal ones. Instance family matching requires exact alignment with instance_type prefixes (e.g., c5, m6g).
- GCP: Regional committed use discounts apply across all zones within a region before zonal commitments are evaluated. GCP also supports flexible family matching (e.g., N2 to N2D) depending on contract terms.
- Azure: Reservation utilization maps to specific VM series within the same subscription or resource group scope. Scope boundaries and Azure Hybrid Benefit Cost Tracking licensing overlays must be resolved prior to assignment.
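A consumption order like the ones above can be kept as data rather than hard-coded branches. The following is a minimal sketch under that assumption: the helper name `order_instruments`, the `kind` labels, and the order passed in the usage example are all hypothetical placeholders, and the actual ranks should be taken from the vendor rules for each provider.

```python
from typing import Dict, List, Sequence


def order_instruments(instruments: List[Dict[str, str]],
                      precedence: Sequence[str]) -> List[Dict[str, str]]:
    """Sort instruments by the position of their 'kind' in `precedence`.

    `precedence` lists instrument kinds from first-consumed to last-consumed.
    Unknown kinds raise ValueError so a misconfigured rule table fails loudly
    instead of silently mis-ranking a commitment.
    """
    rank = {kind: position for position, kind in enumerate(precedence)}
    unknown = {inst["kind"] for inst in instruments} - set(rank)
    if unknown:
        raise ValueError(f"no precedence rank for kinds: {sorted(unknown)}")
    # sorted() is stable, so instruments of the same kind keep their input order.
    return sorted(instruments, key=lambda inst: rank[inst["kind"]])


# Usage: the order below is a placeholder, not a statement of any vendor's rule.
candidates = [
    {"id": "ri-zonal-1", "kind": "zonal_ri"},
    {"id": "ri-regional-1", "kind": "regional_ri"},
    {"id": "ri-zonal-2", "kind": "zonal_ri"},
]
ordered = order_instruments(candidates, precedence=["regional_ri", "zonal_ri"])
```

Keeping the order in a table makes a precedence change a configuration edit, and the stable sort preserves any secondary ordering (such as start date) applied beforehand.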
Temporal Alignment & Boundary Resolution
Billing exports rarely align perfectly with commitment activation windows. The engine must:
- Convert all timestamps to UTC.
- Handle partial-hour boundaries by prorating usage to the nearest billing increment.
- Account for activation delays (typically 15–60 minutes post-purchase).
- Generate a time-indexed usage matrix keyed by account_id, instance_type, availability_zone, and hour_utc.
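The UTC conversion and partial-hour handling listed above might look like the sketch below. It assumes usage arrives as timezone-aware start and end timestamps and prorates by the second; the function name `split_into_utc_hours` is hypothetical, and a vendor that bills in coarser increments would need an extra rounding step.

```python
from datetime import datetime, timedelta, timezone
from decimal import Decimal
from typing import Iterator, Tuple


def split_into_utc_hours(start: datetime, end: datetime) -> Iterator[Tuple[datetime, Decimal]]:
    """Yield (hour_utc, fraction_of_hour) pairs covering the interval [start, end)."""
    if start.tzinfo is None or end.tzinfo is None:
        raise ValueError("timestamps must be timezone-aware")
    cursor = start.astimezone(timezone.utc)
    end = end.astimezone(timezone.utc)
    while cursor < end:
        # Bucket boundaries are the top of each UTC hour.
        hour_start = cursor.replace(minute=0, second=0, microsecond=0)
        segment_end = min(end, hour_start + timedelta(hours=1))
        delta = segment_end - cursor
        # Build the second count from integer parts to avoid float rounding.
        seconds = (Decimal(delta.days * 86400 + delta.seconds)
                   + Decimal(delta.microseconds) / Decimal(1_000_000))
        yield hour_start, seconds / Decimal(3600)
        cursor = segment_end


# Usage: 10:30 to 12:15 at UTC+01:00 is 09:30 to 11:15 UTC.
cet = timezone(timedelta(hours=1))
buckets = list(split_into_utc_hours(
    datetime(2024, 1, 15, 10, 30, tzinfo=cet),
    datetime(2024, 1, 15, 12, 15, tzinfo=cet),
))
```

Each yielded pair can then be combined with account_id, instance_type, and availability_zone to form one row of the usage matrix.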
Scope Resolution & Cross-Account Sharing
Organizations frequently share commitments across organizational units via consolidated billing or resource sharing. The mapping layer must respect sharing hierarchies: payer accounts apply commitments to linked accounts only when explicitly enabled, and zonal commitments cannot satisfy usage in different availability zones unless regional flexibility is purchased.
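These scope rules reduce to a small eligibility predicate. The sketch below is illustrative: the `CommitmentScope` fields, the `sharing_enabled` flag, and the `linked_accounts` set are assumed names for data a real pipeline would load from the payer account's sharing configuration.

```python
from dataclasses import dataclass
from typing import FrozenSet, Optional


@dataclass(frozen=True)
class CommitmentScope:
    owner_account: str
    scope: str                                # "regional" or "zonal"
    region: str
    availability_zone: Optional[str] = None   # only meaningful for zonal scope
    sharing_enabled: bool = False             # hypothetical flag for cross-account sharing


def is_in_scope(commitment: CommitmentScope, account_id: str, region: str,
                availability_zone: str, linked_accounts: FrozenSet[str]) -> bool:
    """Return True when the commitment may cover usage in the given account and zone."""
    # Usage in another account qualifies only if sharing is explicitly enabled
    # and that account is linked to the commitment's owner.
    if account_id != commitment.owner_account:
        if not (commitment.sharing_enabled and account_id in linked_accounts):
            return False
    if region != commitment.region:
        return False
    if commitment.scope == "zonal":
        # Zonal commitments never cover usage in a different availability zone.
        return availability_zone == commitment.availability_zone
    return commitment.scope == "regional"


# Usage
zonal = CommitmentScope("payer-01", "zonal", "us-east-1", "us-east-1a", sharing_enabled=True)
linked = frozenset({"acc-01"})
same_zone = is_in_scope(zonal, "acc-01", "us-east-1", "us-east-1a", linked)
other_zone = is_in_scope(zonal, "acc-01", "us-east-1", "us-east-1b", linked)
```

Running this check before precedence sorting keeps ineligible commitments out of the candidate list entirely, which makes assignment results easier to audit.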
Step-by-Step Implementation Workflow
- Commitment Inventory Sync: Query vendor billing APIs to retrieve active discount instruments. Normalize fields to a canonical schema: commitment_id, start_utc, end_utc, scope (regional/zonal/account), instance_family, os_license, tenancy, and purchased_vcpu. Pagination and rate-limit handling are mandatory at this stage.
- Usage Normalization: Ingest hourly usage exports and map vendor-specific SKUs to a unified family taxonomy. Filter to eligible instance types (compute, memory-optimized, GPU) and exclude non-eligible workloads (spot, free tier, data transfer, storage).
- Temporal Alignment: Convert all timestamps to UTC. Handle partial-hour boundaries, commitment activation delays, and overlapping scopes. Generate a time-indexed usage matrix keyed by account_id, instance_type, availability_zone, and hour_utc.
- Priority-Based Assignment: Apply vendor matching hierarchies. For AWS, Reserved Instances are applied before Savings Plans, and this engine evaluates regional RIs ahead of zonal RIs. For GCP, regional committed use discounts apply before zonal. For Azure, reservation utilization maps to specific VM series within the same resource group or subscription scope.
- Coverage & Utilization Calculation: Compute effective_coverage = discounted_hours / total_eligible_hours and utilization = consumed_hours / purchased_hours. Track unallocated on-demand hours and flag commitments with utilization below threshold (typically <70%).
- Idempotent State Persistence: Write mapping results to a data warehouse or object store with deterministic run IDs. Implement upsert logic to prevent duplicate allocation during pipeline retries.
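As a worked illustration of the coverage and utilization formulas in the workflow above, the sketch below computes both ratios with Decimal arithmetic. The function name, the returned keys, and the zero-denominator behavior are assumptions made for the example. The 70% threshold is the typical value mentioned above.

```python
from decimal import Decimal
from typing import Dict, Union


def coverage_and_utilization(discounted_hours: Decimal, total_eligible_hours: Decimal,
                             consumed_hours: Decimal, purchased_hours: Decimal,
                             threshold: Decimal = Decimal("0.70")) -> Dict[str, Union[Decimal, bool]]:
    """Compute effective coverage, utilization, and an underutilization flag."""
    zero = Decimal("0")
    # Guard the denominators: an empty window or a zero-hour commitment yields 0.
    coverage = discounted_hours / total_eligible_hours if total_eligible_hours > 0 else zero
    utilization = consumed_hours / purchased_hours if purchased_hours > 0 else zero
    return {
        "effective_coverage": coverage,
        "utilization": utilization,
        "unallocated_on_demand_hours": total_eligible_hours - discounted_hours,
        "underutilized": utilization < threshold,
    }


# Usage: 750 of 1000 eligible hours were discounted, and the commitments
# that covered them were purchased for 1000 hours in total.
metrics = coverage_and_utilization(
    discounted_hours=Decimal("750"),
    total_eligible_hours=Decimal("1000"),
    consumed_hours=Decimal("750"),
    purchased_hours=Decimal("1000"),
)
```

Coverage describes how much usage was discounted, while utilization describes how much of the purchase was used. The two can diverge sharply, which is why the workflow tracks both.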
Production-Grade Python Engine
The following implementation sketches a retry-aware mapping engine. It uses a paginated, generator-based API consumer and exponential backoff on transient HTTP errors. Credential rotation and chunked processing of large usage exports are left to the surrounding orchestration. Financial calculations use Python’s decimal module to prevent floating-point drift.
import os
import uuid
import logging
import requests
from decimal import Decimal, ROUND_HALF_UP
from datetime import datetime, timezone
from typing import Dict, List, Optional, Generator
from dataclasses import dataclass
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)


@dataclass
class Commitment:
    commitment_id: str
    start_utc: datetime
    end_utc: datetime
    scope: str  # regional | zonal | account
    instance_family: str
    os_license: str
    tenancy: str
    purchased_hours: Decimal
    availability_zone: Optional[str] = None  # set only for zonal commitments
    consumed_hours: Decimal = Decimal("0")
    utilization_pct: Decimal = Decimal("0")


@dataclass
class UsageRecord:
    account_id: str
    instance_type: str
    availability_zone: str
    hour_utc: datetime
    eligible_hours: Decimal
    on_demand_rate: Decimal


class CommitmentMapper:
    def __init__(self, api_base_url: str, auth_headers: Dict[str, str], chunk_size: int = 5000):
        self.api_base_url = api_base_url
        self.headers = auth_headers
        self.chunk_size = chunk_size
        self.session = self._build_retry_session()

    def _build_retry_session(self) -> requests.Session:
        """Session that retries throttling and transient server errors with backoff."""
        session = requests.Session()
        retry_strategy = Retry(
            total=5,
            backoff_factor=1.5,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET", "POST"]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        return session

    def fetch_commitments(self) -> Generator[Commitment, None, None]:
        """Paginated commitment inventory sync."""
        next_token = None
        while True:
            params = {"limit": self.chunk_size}
            if next_token:
                params["next_token"] = next_token
            resp = self.session.get(
                f"{self.api_base_url}/commitments",
                headers=self.headers,
                params=params,
                timeout=30
            )
            resp.raise_for_status()
            payload = resp.json()
            for item in payload.get("items", []):
                yield Commitment(
                    commitment_id=item["id"],
                    start_utc=datetime.fromisoformat(item["start_utc"].replace("Z", "+00:00")),
                    end_utc=datetime.fromisoformat(item["end_utc"].replace("Z", "+00:00")),
                    scope=item["scope"],
                    instance_family=item["family"],
                    os_license=item.get("os_license", "linux"),
                    tenancy=item.get("tenancy", "shared"),
                    purchased_hours=Decimal(str(item["purchased_hours"])),
                    # Assumed payload field; expected only on zonal commitments.
                    availability_zone=item.get("availability_zone")
                )
            next_token = payload.get("next_token")
            if not next_token:
                break

    def normalize_usage(self, raw_records: List[Dict]) -> List[UsageRecord]:
        """Filter out ineligible usage and map raw records to UsageRecord."""
        normalized = []
        for rec in raw_records:
            if rec.get("usage_type") in ("Spot", "FreeTier", "DataTransfer"):
                continue
            normalized.append(UsageRecord(
                account_id=rec["account_id"],
                instance_type=rec["instance_type"],
                availability_zone=rec["az"],
                hour_utc=datetime.fromisoformat(rec["hour_utc"].replace("Z", "+00:00")),
                eligible_hours=Decimal(str(rec["usage_hours"])),
                on_demand_rate=Decimal(str(rec["on_demand_rate"]))
            ))
        return normalized

    def assign_commitments(self, commitments: List[Commitment], usage: List[UsageRecord]) -> List[Dict]:
        """Greedy priority-based assignment: regional commitments first, then zonal."""
        results = []
        # Covered hours are priced at zero as a placeholder; a real engine would
        # substitute each commitment's amortized hourly rate.
        committed_rate = Decimal("0.00")
        usage.sort(key=lambda u: u.hour_utc)
        for u in usage:
            allocated_hours = Decimal("0")
            # Account-scoped commitments are not matched in this simplified example.
            applicable = [
                c for c in commitments
                if c.instance_family == u.instance_type.split(".")[0]
                and c.start_utc <= u.hour_utc < c.end_utc
                and (
                    c.scope == "regional"
                    or (c.scope == "zonal" and c.availability_zone == u.availability_zone)
                )
                and c.consumed_hours < c.purchased_hours
            ]
            # Regional before zonal, then oldest commitment first.
            applicable.sort(key=lambda c: (0 if c.scope == "regional" else 1, c.start_utc))
            for c in applicable:
                if allocated_hours >= u.eligible_hours:
                    break
                remaining_capacity = c.purchased_hours - c.consumed_hours
                consume = min(u.eligible_hours - allocated_hours, remaining_capacity)
                c.consumed_hours += consume
                allocated_hours += consume
            on_demand_hours = u.eligible_hours - allocated_hours
            if u.eligible_hours > 0:
                blended_cost = allocated_hours * committed_rate + on_demand_hours * u.on_demand_rate
                effective_rate = blended_cost / u.eligible_hours
            else:
                effective_rate = Decimal("0")
            results.append({
                "account_id": u.account_id,
                "instance_type": u.instance_type,
                "az": u.availability_zone,
                "hour_utc": u.hour_utc.isoformat(),
                "covered_hours": allocated_hours,
                "on_demand_hours": on_demand_hours,
                "effective_rate": effective_rate
            })
        return results

    def calculate_utilization(self, commitments: List[Commitment]) -> List[Dict]:
        """Compute per-commitment utilization and flag values below 70%."""
        metrics = []
        for c in commitments:
            if c.purchased_hours > 0:
                c.utilization_pct = (c.consumed_hours / c.purchased_hours * 100).quantize(
                    Decimal("0.01"), rounding=ROUND_HALF_UP
                )
            else:
                # Avoid division by zero for commitments with no purchased hours.
                c.utilization_pct = Decimal("0.00")
            metrics.append({
                "commitment_id": c.commitment_id,
                "purchased_hours": c.purchased_hours,
                "consumed_hours": c.consumed_hours,
                "utilization_pct": c.utilization_pct,
                "status": "underutilized" if c.utilization_pct < 70 else "optimal"
            })
        return metrics

    def persist_results(self, run_id: str, mapping_results: List[Dict], utilization_metrics: List[Dict]) -> None:
        """Idempotent state persistence with deterministic run IDs."""
        # In production, replace with Parquet write to S3/GCS or warehouse upsert
        logger.info(f"Persisting run_id={run_id} | records={len(mapping_results)} | metrics={len(utilization_metrics)}")
        # Example: df.to_parquet(f"s3://finops-data/mapping/run_id={run_id}/")


def run_mapping_pipeline():
    # Fail fast with KeyError if the token is missing instead of sending "Bearer None".
    auth_headers = {"Authorization": f"Bearer {os.environ['BILLING_API_TOKEN']}"}
    mapper = CommitmentMapper(
        api_base_url="https://billing-api.cloudprovider.internal/v1",
        auth_headers=auth_headers,
        chunk_size=10000
    )
    # Derive the run ID from the execution window (the UTC date) so that a retry
    # of the same window reuses the same ID; uuid4 would differ on every retry.
    window = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    run_id = str(uuid.uuid5(uuid.NAMESPACE_URL, f"ri-mapping/{window}"))
    logger.info(f"Starting mapping pipeline | run_id={run_id}")
    commitments = list(mapper.fetch_commitments())
    logger.info(f"Synced {len(commitments)} active commitments")
    # Simulate raw usage ingestion
    raw_usage = [
        {"account_id": "acc-01", "instance_type": "c5.xlarge", "az": "us-east-1a",
         "hour_utc": "2024-01-15T10:00:00Z", "usage_type": "OnDemand", "usage_hours": "1.0", "on_demand_rate": "0.170"}
    ]
    normalized = mapper.normalize_usage(raw_usage)
    mapping_results = mapper.assign_commitments(commitments, normalized)
    utilization = mapper.calculate_utilization(commitments)
    mapper.persist_results(run_id, mapping_results, utilization)
    logger.info("Pipeline execution complete")


if __name__ == "__main__":
    run_mapping_pipeline()
Operational Hardening
Memory & Throughput Optimization
Processing millions of hourly records requires streaming architectures. The engine above streams the commitment inventory through a generator but materializes and sorts usage records in memory, which is adequate only for modest volumes. For datasets exceeding 100M rows, offload temporal joins to a columnar engine (e.g., DuckDB, Apache Spark, or BigQuery) and use the Python layer strictly for orchestration and vendor-rule application.
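For volumes between those two extremes, usage can be fed to the mapper in bounded batches. The helper below is a generic sketch and not part of the engine above. The name `chunked` and the batch size are assumptions.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunked(records: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield successive lists of at most `size` items without loading all records."""
    if size <= 0:
        raise ValueError("size must be positive")
    iterator = iter(records)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


# Usage: only one batch of records is held in memory at a time.
batches = list(chunked(range(7), 3))
```

Because commitment capacity is consumed across batches, the same commitment objects must be passed to every batch, and batches should arrive in hour order so that earlier hours claim capacity first.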
Idempotency & Retry Safety
Cloud billing APIs occasionally return partial payloads or transient 5xx errors. The mapping pipeline must:
- Generate a deterministic run_id per execution window.
- Write intermediate state to a staging partition before committing to the production table.
- Implement INSERT ... ON CONFLICT DO UPDATE or equivalent upsert semantics keyed on (account_id, instance_type, availability_zone, hour_utc, run_id), so that rows for the same instance type in different zones do not overwrite each other.
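The run ID and upsert requirements can be demonstrated with the standard library's sqlite3 module standing in for the warehouse. This is a sketch under stated assumptions: the table name `mapping`, its columns, and the `ri-mapping/` namespace string are invented for illustration, and the key includes availability_zone so that rows for the same instance type in different zones stay distinct. SQLite's ON CONFLICT clause requires SQLite 3.24 or later.

```python
import sqlite3
import uuid
from typing import Dict, Iterable


def deterministic_run_id(window_start_utc: str) -> str:
    """Derive the same run ID for every retry of the same execution window."""
    return str(uuid.uuid5(uuid.NAMESPACE_URL, f"ri-mapping/{window_start_utc}"))


def upsert_mapping(conn: sqlite3.Connection, run_id: str, rows: Iterable[Dict[str, str]]) -> None:
    """Insert mapping rows, overwriting any row that shares the same key."""
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS mapping (
            account_id TEXT NOT NULL,
            instance_type TEXT NOT NULL,
            availability_zone TEXT NOT NULL,
            hour_utc TEXT NOT NULL,
            run_id TEXT NOT NULL,
            covered_hours TEXT NOT NULL,      -- decimals stored as text to keep precision
            on_demand_hours TEXT NOT NULL,
            PRIMARY KEY (account_id, instance_type, availability_zone, hour_utc, run_id)
        )
        """
    )
    conn.executemany(
        """
        INSERT INTO mapping (account_id, instance_type, availability_zone, hour_utc,
                             run_id, covered_hours, on_demand_hours)
        VALUES (:account_id, :instance_type, :availability_zone, :hour_utc,
                :run_id, :covered_hours, :on_demand_hours)
        ON CONFLICT (account_id, instance_type, availability_zone, hour_utc, run_id)
        DO UPDATE SET covered_hours = excluded.covered_hours,
                      on_demand_hours = excluded.on_demand_hours
        """,
        ({**row, "run_id": run_id} for row in rows),
    )
    conn.commit()


# Usage: a retry of the same window rewrites the row instead of duplicating it.
conn = sqlite3.connect(":memory:")
run_id = deterministic_run_id("2024-01-15T00:00:00Z")
row = {"account_id": "acc-01", "instance_type": "c5.xlarge", "availability_zone": "us-east-1a",
       "hour_utc": "2024-01-15T10:00:00+00:00", "covered_hours": "1.0", "on_demand_hours": "0.0"}
upsert_mapping(conn, run_id, [row])
upsert_mapping(conn, run_id, [{**row, "covered_hours": "0.5", "on_demand_hours": "0.5"}])
```

A name-based UUID (uuid5) is used because it is a pure function of its input. A random UUID would change on every retry and defeat the conflict key.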
Monitoring & Drift Detection
Deploy automated alerts when:
- Commitment utilization drops below 70% for three consecutive billing cycles.
- Unallocated on-demand spend exceeds 15% of total compute cost.
- Temporal gaps exceed 2 hours between usage ingestion and commitment activation.
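The first two alert conditions can be expressed as small predicates over the mapping output. The sketch below assumes utilization is reported as a percentage per billing cycle, oldest first. The function names are hypothetical, and the defaults mirror the thresholds listed above.

```python
from decimal import Decimal
from typing import Sequence


def sustained_underutilization(history_pct: Sequence[Decimal],
                               threshold_pct: Decimal = Decimal("70"),
                               cycles: int = 3) -> bool:
    """True when the most recent `cycles` billing cycles are all below the threshold."""
    if len(history_pct) < cycles:
        return False  # not enough history to call it sustained
    return all(value < threshold_pct for value in history_pct[-cycles:])


def on_demand_share_exceeded(on_demand_cost: Decimal, total_compute_cost: Decimal,
                             limit: Decimal = Decimal("0.15")) -> bool:
    """True when unallocated on-demand spend exceeds `limit` of total compute cost."""
    if total_compute_cost <= 0:
        return False
    return on_demand_cost / total_compute_cost > limit


# Usage: one healthy cycle followed by three low ones triggers the alert.
alert = sustained_underutilization(
    [Decimal("82"), Decimal("65"), Decimal("61"), Decimal("68")]
)
```

Requiring consecutive low cycles avoids paging on a single quiet month, at the cost of a slower signal when utilization drops abruptly.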
Integrate mapping outputs into dashboards alongside AWS Cost Explorer data (see AWS Cost Explorer Architecture) to validate vendor-reported coverage against internal calculations. Discrepancies often indicate misconfigured sharing scopes or untracked cross-account usage.
Financial Precision
Cloud billing requires exact decimal arithmetic. The implementation above uses Python’s decimal.Decimal to prevent IEEE-754 floating-point accumulation errors. Always quantize final rates to 4–6 decimal places before writing to financial reporting systems, and validate monthly against vendor-provided GCP billing exports (see GCP Billing Export Configuration) and AWS cost allocation reports.
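A short illustration of why this matters, and of quantizing before export, is sketched below. The helper name `quantize_rate` and the choice of ROUND_HALF_UP are assumptions; the reporting system's own rounding policy should take precedence.

```python
from decimal import Decimal, ROUND_HALF_UP


def quantize_rate(rate: Decimal, places: int = 6) -> Decimal:
    """Round a rate to a fixed number of decimal places using half-up rounding."""
    exponent = Decimal(1).scaleb(-places)  # e.g. places=6 gives 0.000001
    return rate.quantize(exponent, rounding=ROUND_HALF_UP)


# Binary floats accumulate error: ten additions of 0.1 do not equal 1.0.
float_total = sum([0.1] * 10)

# Decimal values built from strings add exactly.
decimal_total = sum(Decimal("0.1") for _ in range(10))

# Quantize once, at the point of export, not after every intermediate step.
export_rate = quantize_rate(Decimal("0.1234565"), places=6)
```

Constructing Decimal values from strings, as the engine does with Decimal(str(...)), avoids importing float error at the boundary.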
Related Pages
- AWS Savings Plans vs RI Coverage Analysis
- GCP Committed Use Discount Optimization