Building Async Batch Processors for Inventory Updates
Policy & Compliance Boundaries
University research laboratories operate under strict federal and institutional mandates. NIH and NSF grant reporting requires precise asset tracking, while OSHA Hazard Communication standards and EPA hazardous waste manifest regulations demand immutable chemical and equipment inventories. Any ingestion pipeline must guarantee deterministic synchronization, meaning identical payloads produce identical ledger states regardless of submission timing or network conditions.
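To make the determinism requirement concrete, the following is a minimal sketch that uses an in-memory dict as a stand-in ledger. The `apply_payload` helper and the ledger shape are illustrative assumptions, not the institutional ledger API; the point is only that a keyed upsert yields the same state whether a payload arrives once or is replayed after a retry.

```python
from typing import Any, Dict, List

Ledger = Dict[str, Dict[str, Any]]


def apply_payload(ledger: Ledger, payload: List[Dict[str, Any]]) -> Ledger:
    """Upsert each record by asset_id and return a new ledger (hypothetical helper)."""
    updated = {asset_id: dict(row) for asset_id, row in ledger.items()}
    for record in payload:
        # Keyed overwrite: replaying the same record leaves the state unchanged.
        updated[record["asset_id"]] = dict(record)
    return updated


payload = [
    {"asset_id": "A-1", "grant_code": "NIH-001", "quantity": 4},
    {"asset_id": "A-2", "grant_code": "NSF-042", "quantity": 1},
]

once = apply_payload({}, payload)
twice = apply_payload(once, payload)  # simulated resubmission after a network drop
```

Because the write is an overwrite keyed on `asset_id` rather than an increment, `once` and `twice` hold the same ledger state.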
Compliance officers must demonstrate that malformed rows never corrupt the central ledger. This requires strict separation between validation, execution, and quarantine routing. Policy dictates that every record carries an institutional grant prefix, non-negative quantities, and standardized location codes. Implementation must enforce these rules before any database transaction occurs, and troubleshooting protocols must guarantee that rejected records are captured, classified, and routed for manual review without blocking the primary synchronization stream.
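The separation between validation and quarantine routing can be sketched with the standard library alone. The grant prefixes below match the ones used later in this article; the location pattern is a hypothetical format chosen for illustration, since the institutional code scheme is not specified here.

```python
import re
from typing import Any, Dict, List, Tuple

GRANT_PREFIXES = ("NIH-", "NSF-", "DOE-", "DOD-", "EPA-")
# Hypothetical location format: building code, floor, room (e.g. "CHM-2-104").
LOCATION_PATTERN = re.compile(r"^[A-Z]{2,4}-\d{1,2}-\d{3}$")


def check_record(record: Dict[str, Any]) -> List[str]:
    """Return a list of policy violations; an empty list means the record passes."""
    problems: List[str] = []
    if not str(record.get("grant_code", "")).startswith(GRANT_PREFIXES):
        problems.append("grant_code lacks an institutional prefix")
    try:
        if int(record.get("quantity", "")) < 0:
            problems.append("quantity is negative")
    except (TypeError, ValueError):
        problems.append("quantity is not an integer")
    if not LOCATION_PATTERN.match(str(record.get("location", ""))):
        problems.append("location does not match the standard code format")
    return problems


def partition(
    records: List[Dict[str, Any]],
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Split records into (accepted, quarantined) before any write is attempted."""
    accepted, quarantined = [], []
    for record in records:
        problems = check_record(record)
        if problems:
            quarantined.append({"original": record, "errors": problems})
        else:
            accepted.append(record)
    return accepted, quarantined
```

Only the `accepted` list would ever reach a database transaction; rejected rows keep their original content and a list of reasons for manual review.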
Architecture & Operational Separation
The ingestion layer decouples file parsing from execution. Incoming CSV or Excel manifests are normalized, tokenized, and dispatched through a dedicated worker pool, preserving administrative portal availability during peak grant reporting windows. This design mirrors the principles established in the Automated Ingestion & Data Sync Workflows baseline, where payloads are validated against institutional schemas before entering the execution queue.
Once normalized, records flow into an asynchronous consumer that applies strict type coercion, validates against federal grant formats, and routes failures to isolated recovery channels. Backpressure management and chunked dispatch are handled through the Async Processing & Queue Management subsystem, ensuring that high-volume submission windows do not exhaust connection pools or violate institutional SLAs. The operational boundary is explicit: policy defines acceptable schemas, implementation enforces them via idempotent async workers, and troubleshooting protocols govern quarantine review and deterministic retry semantics.
flowchart TD
M["Inventory manifest (CSV)"] --> C["Split into chunks"]
C --> P["Process chunks concurrently"]
P --> K{"Idempotency key seen?"}
K -->|"yes"| SK["Skip duplicate"]
K -->|"no"| V{"Pydantic validation"}
V -->|"invalid"| Q["Quarantine with error detail"]
V -->|"valid"| U["Async commit to API"]
U -->|"5xx / network"| RB["Backoff + retry"]
RB --> U
U -->|"retries exhausted"| Q
U -->|"committed"| R["Aggregate batch result"]
Figure: chunks are validated and committed concurrently; duplicates are skipped and failures fall through to quarantine.
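The chunked dispatch and backpressure described above can be approximated with an `asyncio.Semaphore` that caps how many chunks are in flight. This is a sketch under stated assumptions: `dispatch_bounded`, `fake_commit`, and the `max_in_flight` parameter are hypothetical names, and the sleep stands in for the HTTP round trip.

```python
import asyncio
from typing import Awaitable, Callable, List, Sequence, Tuple, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def dispatch_bounded(
    chunks: Sequence[T],
    worker: Callable[[T], Awaitable[R]],
    max_in_flight: int = 4,
) -> List[R]:
    """Run worker over every chunk, never more than max_in_flight at once."""
    gate = asyncio.Semaphore(max_in_flight)

    async def guarded(chunk: T) -> R:
        async with gate:  # waits here while the limit is reached
            return await worker(chunk)

    # gather preserves input order in its result list.
    return await asyncio.gather(*(guarded(chunk) for chunk in chunks))


async def demo() -> Tuple[List[int], int]:
    active = 0
    peak = 0

    async def fake_commit(chunk: List[int]) -> int:
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)  # stand-in for the HTTP round trip
        active -= 1
        return len(chunk)

    rows = list(range(25))
    chunks = [rows[i:i + 5] for i in range(0, len(rows), 5)]
    sizes = await dispatch_bounded(chunks, fake_commit, max_in_flight=2)
    return sizes, peak
```

Running `asyncio.run(demo())` processes five chunks while the observed peak concurrency stays at or below the configured limit, which is the property that protects a connection pool.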
Implementation: Idempotent Async Batch Processor
The following reference implementation enforces schema validation, SHA-256 batch fingerprinting, and deterministic fallback routing. It uses pydantic for type coercion and validation, asyncio with httpx for non-blocking chunk submission, and explicit idempotency keys to prevent duplicate ledger entries during network retries.
import asyncio
import hashlib
import json
import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List

import httpx
import pandas as pd
from pydantic import BaseModel, ConfigDict, ValidationError, field_validator

# Write audit entries to both a file and the console.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
    handlers=[
        logging.FileHandler("inventory_audit.log"),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger("compliance.inventory_processor")


class InventoryItem(BaseModel):
    # Reject any column that is not declared below.
    model_config = ConfigDict(extra="forbid")

    asset_id: str
    grant_code: str
    category: str
    quantity: int
    location: str
    acquisition_date: str

    @field_validator("grant_code")
    @classmethod
    def validate_grant_prefix(cls, v: str) -> str:
        allowed = ("NIH-", "NSF-", "DOE-", "DOD-", "EPA-")
        if not v.startswith(allowed):
            raise ValueError(f"Grant code must begin with institutional prefix: {allowed}")
        return v.upper()

    @field_validator("quantity")
    @classmethod
    def validate_non_negative(cls, v: int) -> int:
        if v < 0:
            raise ValueError("Quantity must be non-negative per OSHA/EPA manifest standards")
        return v


@dataclass
class BatchResult:
    committed: List[Dict[str, Any]] = field(default_factory=list)
    quarantined: List[Dict[str, Any]] = field(default_factory=list)
    batch_hash: str = ""
    processed_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


class AsyncInventoryProcessor:
    def __init__(self, api_base_url: str, chunk_size: int = 500, max_retries: int = 3):
        self.api_base_url = api_base_url.rstrip("/")
        self.chunk_size = chunk_size
        self.max_retries = max_retries
        # In-memory only: keys are forgotten when the process exits.
        self._idempotency_store: set[str] = set()

    def _compute_batch_hash(self, records: List[Dict[str, Any]]) -> str:
        # sort_keys makes the hash independent of dict key order.
        payload = json.dumps(records, sort_keys=True).encode()
        return hashlib.sha256(payload).hexdigest()

    def _generate_idempotency_key(self, record: Dict[str, Any]) -> str:
        return f"{record.get('asset_id')}:{record.get('grant_code')}"

    async def _commit_chunk(self, chunk: List[Dict[str, Any]], client: httpx.AsyncClient) -> List[Dict[str, Any]]:
        # The chunk hash doubles as the Idempotency-Key, so a retry replays the same key.
        idem_key = self._compute_batch_hash(chunk)
        headers = {"Idempotency-Key": idem_key, "X-Compliance-Standard": "NIH-NSF-OSHA-EPA"}
        url = f"{self.api_base_url}/inventory/upsert"
        for attempt in range(1, self.max_retries + 1):
            try:
                resp = await client.post(url, json=chunk, headers=headers)
                if resp.status_code == 409:
                    # Per the recovery protocol, 409 Conflict means this key was already committed.
                    logger.info(f"Idempotency key {idem_key} already committed (409 Conflict)")
                    return chunk
                resp.raise_for_status()
                return chunk
            except httpx.HTTPStatusError as e:
                if not 500 <= e.response.status_code < 600:
                    raise  # only 5xx responses are treated as transient
                logger.warning(f"Transient server error (attempt {attempt}/{self.max_retries}): {e.response.status_code}")
            except httpx.RequestError:
                logger.warning(f"Network drop (attempt {attempt}/{self.max_retries}). Retrying...")
            if attempt < self.max_retries:
                # Exponential backoff; no point sleeping after the final attempt.
                await asyncio.sleep(2 ** attempt)
        raise RuntimeError("Max retries exceeded for chunk commit")

    async def _process_chunk(self, chunk: List[Dict[str, Any]], client: httpx.AsyncClient) -> BatchResult:
        result = BatchResult()
        valid_records: List[Dict[str, Any]] = []
        claimed_keys: List[str] = []
        for record in chunk:
            key = self._generate_idempotency_key(record)
            if key in self._idempotency_store:
                logger.debug(f"Skipping duplicate idempotency key: {key}")
                continue
            try:
                validated = InventoryItem(**record)
            except ValidationError as ve:
                result.quarantined.append({
                    "original": record,
                    "errors": ve.errors(),
                    "reason": "schema_violation",
                })
                continue
            valid_records.append(validated.model_dump())
            self._idempotency_store.add(key)
            claimed_keys.append(key)

        # Fingerprint the validated payload before execution and record it in the audit log.
        result.batch_hash = self._compute_batch_hash(valid_records)
        if valid_records:
            logger.info(f"Submitting chunk batch_hash={result.batch_hash} records={len(valid_records)}")
            try:
                committed = await self._commit_chunk(valid_records, client)
                result.committed.extend(committed)
            except Exception as e:
                logger.error(f"Chunk commit failed. Routing to deterministic fallback: {e}")
                # Release the keys so these records are not skipped when re-queued.
                self._idempotency_store.difference_update(claimed_keys)
                for rec in valid_records:
                    result.quarantined.append({
                        "original": rec,
                        "errors": [{"msg": "Network/commit failure during chunk submission"}],
                        "reason": "transient_failure",
                    })
        return result

    async def process_manifest(self, file_path: Path) -> List[BatchResult]:
        logger.info(f"Starting async batch processing for: {file_path}")
        # dtype=str defers all type coercion to the pydantic model.
        df = pd.read_csv(file_path, dtype=str)
        records = df.to_dict(orient="records")
        chunks = [records[i:i + self.chunk_size] for i in range(0, len(records), self.chunk_size)]
        async with httpx.AsyncClient(timeout=30.0) as client:
            tasks = [self._process_chunk(chunk, client) for chunk in chunks]
            results = await asyncio.gather(*tasks, return_exceptions=True)
        final_results: List[BatchResult] = []
        for r in results:
            if isinstance(r, Exception):
                logger.error(f"Chunk processing failed with exception: {r}")
                final_results.append(BatchResult(quarantined=[{"error": str(r)}]))
            else:
                final_results.append(r)
        total_committed = sum(len(r.committed) for r in final_results)
        total_quarantined = sum(len(r.quarantined) for r in final_results)
        logger.info(f"Completed processing. Committed: {total_committed} | Quarantined: {total_quarantined}")
        return final_results

Key Implementation Notes
- Idempotency Enforcement: The _generate_idempotency_key method creates a composite key from asset_id and grant_code, and the in-memory store skips any key this processor instance has already accepted. Retried submissions are protected separately by the Idempotency-Key header, which carries the SHA-256 hash of the chunk. For distributed deployments, replace self._idempotency_store with a Redis-backed set or a database constraint.
- Deterministic Fallback Routing: If a chunk commit fails, whether after exhausting retries or on a non-retryable response, valid records are not silently dropped. They are explicitly routed to the quarantined list with a transient_failure classification, enabling compliance officers to trigger manual reconciliation without data loss.
- Schema Drift Protection: pydantic's extra="forbid" configuration rejects unexpected columns, preventing silent data corruption when upstream systems modify manifest structures.
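As one way to picture the database-constraint option mentioned in the notes above, the sketch below backs the idempotency store with a SQLite primary key. SQLite is used here only because it ships with Python; the class name and its `claim`/`release` methods are hypothetical and would need adapting to the institution's actual database.

```python
import sqlite3


class SqliteIdempotencyStore:
    """Durable stand-in for the in-memory set (illustrative, not from the original listing)."""

    def __init__(self, path: str = ":memory:") -> None:
        self._conn = sqlite3.connect(path)
        # The PRIMARY KEY constraint is what enforces uniqueness.
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS seen_keys (key TEXT PRIMARY KEY)"
        )

    def claim(self, key: str) -> bool:
        """Return True if this call recorded the key, False if it already existed."""
        with self._conn:  # commits on success, rolls back on error
            cursor = self._conn.execute(
                "INSERT OR IGNORE INTO seen_keys (key) VALUES (?)", (key,)
            )
        return cursor.rowcount == 1

    def release(self, key: str) -> None:
        """Forget a key so a record that failed to commit can be re-queued."""
        with self._conn:
            self._conn.execute("DELETE FROM seen_keys WHERE key = ?", (key,))
```

Letting the constraint decide, instead of checking and then inserting, avoids a race between two workers that see the same key at the same moment.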
Troubleshooting & Recovery Protocols
Network volatility and schema drift are inevitable in multi-institutional research environments. The following operational boundaries define recovery semantics:
- Partial Network Drops: If an HTTP connection resets mid-chunk, the processor retries with exponential backoff. If the remote ledger returns 409 Conflict for a replayed idempotency key, the chunk is treated as already committed: the processor logs the conflict and proceeds without duplication. This assumes the ledger API reserves 409 for replayed keys.
- Schema Drift Events: When upstream systems add unexpected columns, drop required ones, or send non-integer quantities, pydantic validation fails predictably. Altered date formats are not caught by the model as listed, because acquisition_date is declared as a plain str; add a date validator if that check is required. Quarantined records include field-level error details. Lab managers should run a schema diff against the institutional baseline before resubmitting.
- Quarantine Review Workflow: Compliance officers must review quarantined payloads daily. Records marked schema_violation require manual correction or upstream system patching. Records marked transient_failure can be re-queued once network stability is verified.
- Memory & Concurrency Limits: The chunk size defaults to 500 to balance throughput and request size. process_manifest loads the whole CSV into memory and dispatches every chunk at once, so for manifests exceeding 50,000 rows, deploy the processor within a containerized worker pool with explicit memory limits (for example, docker run --memory=1g) to prevent OOM kills during peak grant cycles.
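The quarantine review step above can be supported by a small triage helper. This sketch assumes quarantine entries carry the "original", "errors", and "reason" fields used in the processor listing; the `triage` function itself is a hypothetical addition.

```python
from typing import Any, Dict, List, Tuple


def triage(
    quarantined: List[Dict[str, Any]],
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Split quarantine entries into (records to re-queue, entries for manual review)."""
    requeue: List[Dict[str, Any]] = []
    manual: List[Dict[str, Any]] = []
    for entry in quarantined:
        if entry.get("reason") == "transient_failure" and "original" in entry:
            # The record itself was valid; only the submission failed.
            requeue.append(entry["original"])
        else:
            # schema_violation and unclassified entries need a human decision.
            manual.append(entry)
    return requeue, manual


entries = [
    {"original": {"asset_id": "A-1"}, "errors": [], "reason": "transient_failure"},
    {"original": {"asset_id": "A-2"}, "errors": [{"msg": "bad"}], "reason": "schema_violation"},
    {"error": "unexpected exception"},
]
requeue, manual = triage(entries)
```

Anything without an explicit transient_failure reason defaults to manual review, which keeps unclassified failures from being resubmitted automatically.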
Audit & Cryptographic Traceability
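Every chunk's validated records are fingerprinted with SHA-256 over their normalized payload. The resulting batch_hash is carried on the BatchResult alongside the processed_at timestamp and should be written to the inventory_audit.log file before execution. During federal audits, compliance officers can recompute the hash from the retained records and compare it with the logged value to show that the payload was processed as validated, without post-hoc manipulation. Note that the hash covers the validated records in each chunk, not the raw manifest file.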
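A short sketch of how an auditor might check a logged fingerprint, using the same canonical JSON encoding as the processor's _compute_batch_hash. The function names and sample records here are illustrative assumptions.

```python
import hashlib
import json
from typing import Any, Dict, List


def batch_fingerprint(records: List[Dict[str, Any]]) -> str:
    """SHA-256 over canonical JSON: dict keys sorted, list order preserved."""
    payload = json.dumps(records, sort_keys=True).encode()
    return hashlib.sha256(payload).hexdigest()


def matches_audit_log(records: List[Dict[str, Any]], logged_hash: str) -> bool:
    """Recompute the fingerprint and compare it with the value from the log."""
    return batch_fingerprint(records) == logged_hash


submitted = [{"asset_id": "A-1", "grant_code": "NIH-001", "quantity": 4}]
logged = batch_fingerprint(submitted)

# Same fields in a different key order: the fingerprint is unchanged.
reordered = [{"quantity": 4, "grant_code": "NIH-001", "asset_id": "A-1"}]
# One altered value: the fingerprint no longer matches.
tampered = [{"asset_id": "A-1", "grant_code": "NIH-001", "quantity": 40}]
```

Because `sort_keys` only normalizes key order within each record, the order of records in the list still affects the hash, so retained batches need to preserve row order.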
The audit log can feed institutional SIEM platforms. The logging configuration in the listing emits pipe-delimited text, so structured JSON output requires attaching a JSON formatter to the handlers. Grant administrators can cross-reference batch_hash values with NIH/NSF reporting portals to verify asset synchronization. For chemical inventories, OSHA and EPA inspectors can trace hazardous material manifests through the quarantined routing logs, ensuring that rejected entries never bypass regulatory review.
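For SIEM ingestion, one option is a custom `logging.Formatter` that renders each record as a JSON object. The sketch below is an assumption about how that could look: the `JsonAuditFormatter` class, the demo logger name, and the choice of extra fields are all hypothetical.

```python
import io
import json
import logging
from datetime import datetime, timezone


class JsonAuditFormatter(logging.Formatter):
    """Render each log record as one JSON object per line (illustrative sketch)."""

    # Optional fields supplied via logger.info(..., extra={...}).
    EXTRA_FIELDS = ("batch_hash", "committed", "quarantined")

    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        for key in self.EXTRA_FIELDS:
            if hasattr(record, key):
                entry[key] = getattr(record, key)
        return json.dumps(entry, sort_keys=True)


# Demo: capture output in memory instead of writing to inventory_audit.log.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonAuditFormatter())
demo_logger = logging.getLogger("compliance.audit_demo")
demo_logger.setLevel(logging.INFO)
demo_logger.addHandler(handler)
demo_logger.propagate = False  # keep the demo out of the root logger's handlers

demo_logger.info("chunk committed", extra={"batch_hash": "abc123", "committed": 2})
line = stream.getvalue().strip()
```

Attaching the same formatter to the file handler would make every audit line machine-parseable while leaving the console output unchanged.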