Extract-transform-load pipelines adapted for AI data: ingesting raw documents and converting them to clean, chunked, embedded, and indexed data ready for LLM retrieval.
Traditional ETL moves structured data between databases. AI ETL handles: unstructured documents (PDFs, HTML, code), extraction of text + structure from arbitrary formats, semantic chunking and embedding, and vector index population. The key difference: every transformation step can degrade quality — a poor PDF parser produces garbled text that no LLM can recover from.
Parse raw documents into clean text and structure. Use specialised parsers: Unstructured.io for PDFs/Office/HTML, BeautifulSoup for web pages, tree-sitter for code. Extract tables as structured data, images as captions (via a vision model), and preserve the heading hierarchy for context.
```python
from unstructured.partition.auto import partition

def extract_document(file_path: str) -> list[dict]:
    elements = partition(filename=file_path, strategy="hi_res")
    chunks = []
    current_section = ""
    for elem in elements:
        if elem.category == "Title":
            current_section = str(elem)
        elif elem.category in ("NarrativeText", "ListItem", "Table"):
            chunks.append({
                "text": str(elem),
                "section": current_section,
                "type": elem.category,
                "metadata": elem.metadata.to_dict(),
            })
    return chunks
```
Clean: remove boilerplate headers/footers, fix encoding issues, normalise whitespace. Chunk: split into retrieval-sized pieces (200–800 tokens) with overlap (10–20%). Enrich: add metadata (doc title, section, source URL). Embed: convert text to vectors (OpenAI text-embedding-3-small or local model). Each step should be idempotent and independently testable.
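The chunking step above can be sketched in plain Python. This version approximates tokens with whitespace-delimited words (a real pipeline would count tokens with a tokenizer such as tiktoken); the function name and the size/overlap defaults are illustrative assumptions, not a library API:

```python
def chunk_text(text: str, size: int = 400, overlap: int = 60) -> list[str]:
    """Split text into ~size-"token" chunks, each sharing `overlap` words
    of context with its predecessor (words stand in for tokens here)."""
    words = text.split()
    if not words:
        return []
    step = size - overlap  # advance by size minus overlap each chunk
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + size]))
        if start + size >= len(words):
            break  # last chunk reached the end of the document
    return chunks
```

The overlap means a sentence split across a chunk boundary is still retrievable whole from at least one chunk; 10–20% overlap is the usual trade-off between recall and index size.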
Batch-upsert to your vector store with document_id + chunk_index as the unique key. Upsert (not insert) so re-runs don't create duplicates. Update the metadata catalogue (Postgres/DynamoDB) with the ingestion timestamp.
```python
import uuid

from openai import OpenAI
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct

oai = OpenAI()
qdrant = QdrantClient("localhost", port=6333)

def embed_and_load(chunks: list[dict], collection: str, document_id: str):
    texts = [c["text"] for c in chunks]
    embeddings = oai.embeddings.create(
        model="text-embedding-3-small", input=texts
    ).data
    points = [
        PointStruct(
            # Deterministic ID derived from document_id + chunk_index, so
            # re-running the pipeline overwrites existing points instead of
            # duplicating them (a random uuid4 would break upsert semantics).
            id=str(uuid.uuid5(uuid.NAMESPACE_URL, f"{document_id}:{i}")),
            vector=emb.embedding,
            payload=chunks[i] | {"document_id": document_id, "chunk_index": i},
        )
        for i, emb in enumerate(embeddings)
    ]
    qdrant.upsert(collection_name=collection, points=points)
    return len(points)
```
Use a workflow orchestrator (Airflow, Prefect, Dagster) to schedule and monitor ETL runs. Structure as a DAG: extract → validate → chunk → embed → load → verify. Checkpoint each stage so failed runs resume from the last successful step. Monitor: documents processed/hour, embedding cost, index size, and error rate.
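Outside a full orchestrator, the checkpoint-and-resume idea can be sketched in plain Python; `run_pipeline`, the JSON state file, and the stage names are illustrative assumptions, not any Airflow/Prefect/Dagster API:

```python
import json
from pathlib import Path

STAGES = ["extract", "validate", "chunk", "embed", "load", "verify"]

def run_pipeline(run_id: str, stage_fns: dict, state_dir: str = ".etl_state") -> list[str]:
    """Run stages in order, checkpointing after each success so a rerun
    of the same run_id resumes from the first stage that has not completed."""
    state_path = Path(state_dir) / f"{run_id}.json"
    state_path.parent.mkdir(parents=True, exist_ok=True)
    done = json.loads(state_path.read_text()) if state_path.exists() else []
    executed = []
    for stage in STAGES:
        if stage in done:
            continue  # completed in a previous run; skip on resume
        stage_fns[stage]()  # raises on failure; no checkpoint is written
        done.append(stage)
        executed.append(stage)
        state_path.write_text(json.dumps(done))  # checkpoint after success
    return executed
```

A failed `embed` stage leaves `extract`/`validate`/`chunk` checkpointed; the next run with the same `run_id` starts directly at `embed`.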
Full re-ingestion is expensive. For incremental updates: hash each document on ingest, store in a manifest table. On re-run, skip documents whose hash hasn't changed. Delete + re-embed only changed or new documents. For deleted source documents, implement a soft-delete: mark as inactive in the manifest, purge from the index on next cleanup run.
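The hash-manifest logic above can be sketched as a pure function over document bytes; `plan_incremental` and its return shape are illustrative assumptions:

```python
import hashlib

def plan_incremental(docs: dict[str, bytes], manifest: dict[str, str]) -> dict:
    """Compare content hashes against the manifest to decide what to (re)process.
    Mutates `manifest` in place with the latest hashes."""
    plan = {"new": [], "changed": [], "unchanged": [], "deleted": []}
    for doc_id, content in docs.items():
        digest = hashlib.sha256(content).hexdigest()
        if doc_id not in manifest:
            plan["new"].append(doc_id)
        elif manifest[doc_id] != digest:
            plan["changed"].append(doc_id)
        else:
            plan["unchanged"].append(doc_id)  # skip: hash unchanged since last run
        manifest[doc_id] = digest
    # Documents in the manifest but missing from the source: soft-delete
    # candidates, purged from the index on the next cleanup run.
    plan["deleted"] = [d for d in manifest if d not in docs]
    return plan
```

Only `new` and `changed` documents go through the expensive parse/embed path; `deleted` entries stay in the manifest as inactive until cleanup.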
ETL (Extract, Transform, Load) pipelines are the backbone of data systems. Architecture decisions define your ability to scale: batch (nightly runs, high latency but simple) vs. streaming (continuous, sub-second latency but complex). Orchestration tools (Airflow, Prefect, Dagster) manage dependencies, retries, and monitoring. A robust pipeline: (1) Logs every step with timestamps, (2) Validates data at each stage (row counts, schema, statistics), (3) Implements backpressure (slows down if downstream is congested), (4) Has rollback capability (revert bad transforms). Testing ETL code is non-trivial: mock data sources, test edge cases (empty files, schema changes), and verify aggregations.
| Tool | Data Scale | Latency | Monitoring |
|---|---|---|---|
| Apache Airflow | Petabyte | Minutes | Native DAG dashboard |
| dbt | Multi-TB | Minutes–Hours | Test + Slack alerts |
| Apache Spark | Petabyte | Seconds–Minutes | Spark UI + custom metrics |
| Streaming (Kafka) | Unbounded | Sub-second | Consumer lag, throughput |
```python
import pandas as pd

def etl_pipeline(source_path: str, dest_path: str) -> dict:
    """Minimal ETL: extract, validate, transform, load."""
    # Extract
    df = pd.read_parquet(source_path)
    # Validate
    assert df.isnull().sum().sum() < len(df) * 0.05, "Too many nulls"
    # Transform
    df['created_date'] = pd.to_datetime(df['created_date'])
    df = df[df['created_date'] > '2025-01-01']
    df['year'] = df['created_date'].dt.year
    # Load
    df.to_parquet(dest_path, compression='snappy')
    return {'rows': len(df), 'cols': len(df.columns)}
```

Monitoring ETL is critical because silent failures are the worst kind: bad data flows downstream without warning. Monitor: (1) Pipeline latency (how long did each step take?), (2) Data volume (are we processing the expected row counts?), (3) Quality metrics (nulls, outliers, schema compliance), (4) Resource usage (memory, CPU, network). Set alerts for anomalies: if tonight's run is 3x slower than usual, investigate. Implement idempotency: running the same transformation twice produces the same result, which enables safe retries and recovery. Data observability platforms (Monte Carlo, Great Expectations) provide automated drift detection across your pipelines.
Data pipeline reliability is as important as model reliability. If your pipeline fails silently (processes bad data without raising alarms), models trained on that data become unreliable. Implement comprehensive error handling: (1) Schema validation (check every incoming file/record), (2) Row-level assertions (data should satisfy business rules), (3) Downstream reconciliation (counts, checksums, statistics should match expectations). Tools like dbt provide built-in testing; custom pipelines need manual instrumentation. A common pattern: at each stage, log row counts, nullity statistics, and sample values. Compare to historical baselines; large deviations trigger investigation. In production, "silent failures" (data flows through but is bad) are worse than "loud failures" (pipeline crashes, triggering alerts). Design for safety: fail explicitly when something's wrong.
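The fail-explicitly principle can be sketched as a schema check that raises instead of letting bad rows through; `validate_records` and its type-map schema are illustrative assumptions, not dbt or Great Expectations APIs:

```python
def validate_records(records: list[dict], schema: dict[str, type]) -> list[dict]:
    """Fail loudly: raise on any schema violation rather than passing
    bad rows downstream silently."""
    errors = []
    for i, row in enumerate(records):
        for field, expected in schema.items():
            if field not in row:
                errors.append(f"row {i}: missing field '{field}'")
            elif not isinstance(row[field], expected):
                errors.append(
                    f"row {i}: '{field}' is {type(row[field]).__name__}, "
                    f"expected {expected.__name__}"
                )
    if errors:
        # Loud failure: the pipeline crashes and alerts fire, instead of
        # bad data flowing through unnoticed.
        raise ValueError("schema validation failed:\n" + "\n".join(errors))
    return records
```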
Backpressure and flow control prevent cascading failures. If downstream processing is slow (the ML training job is saturated), the upstream ETL should slow down, not queue infinite data. Implement producer-consumer patterns: upstream processes data and publishes to a queue (Kafka, RabbitMQ); downstream consumes at its own pace. If the queue fills up, upstream blocks. This prevents memory overflow and gives you time to diagnose bottlenecks. For batch pipelines, implement checkpointing: if a job fails halfway through processing 1 million rows, resume from the checkpoint rather than restarting. This saves compute and enables recovery. Idempotency is key: if you rerun a transform on the same input, the output should be identical. This enables safe recovery.
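The producer-consumer pattern with backpressure can be sketched using the standard library's bounded `queue.Queue` (standing in for Kafka/RabbitMQ); the function and the doubling "work" are illustrative assumptions:

```python
import queue
import threading

def produce_consume(items, maxsize: int = 10) -> list:
    """Producer blocks on q.put() whenever the bounded queue is full:
    natural backpressure instead of unbounded memory growth."""
    q = queue.Queue(maxsize=maxsize)
    results = []

    def consumer():
        while True:
            item = q.get()
            if item is None:  # sentinel: producer is finished
                break
            results.append(item * 2)  # stand-in for slow downstream work
            q.task_done()

    t = threading.Thread(target=consumer)
    t.start()
    for item in items:
        q.put(item)  # blocks if the consumer falls more than maxsize behind
    q.put(None)
    t.join()
    return results
```

With `maxsize=10`, at most ten items are ever buffered; a slow consumer throttles the producer rather than crashing the process.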
Observability platforms provide visibility into pipeline health. Metrics to track: latency (total time, per-stage breakdown), throughput (rows/sec), resource usage (CPU, memory, disk I/O), data quality (nulls, duplicates, schema violations), and freshness (how old is the most recent data?). Dashboards should answer: "Is my pipeline healthy?" (quick glance), "Where is the bottleneck?" (dive deep), "What changed?" (compare to yesterday). Set alert thresholds: if latency increases by 50%, if quality metrics degrade, if data freshness lags. Modern platforms like Datadog and New Relic provide end-to-end observability; open-source alternatives (Prometheus + Grafana) work for smaller deployments. Invest in observability early; it's invaluable for troubleshooting and capacity planning.
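Per-stage latency, the "where is the bottleneck?" metric, can be captured with a small timing helper; `StageTimer` is an illustrative sketch, not any vendor SDK:

```python
import time
from contextlib import contextmanager

class StageTimer:
    """Collect per-stage wall-clock latency so a dashboard can show
    which stage is the bottleneck."""
    def __init__(self):
        self.timings: dict[str, float] = {}

    @contextmanager
    def stage(self, name: str):
        start = time.perf_counter()
        try:
            yield
        finally:
            # Record even if the stage raised, so failed runs are observable too.
            self.timings[name] = time.perf_counter() - start

    def bottleneck(self) -> str:
        return max(self.timings, key=self.timings.get)
```

In a real pipeline the `timings` dict would be flushed to Prometheus/Datadog at the end of each run rather than kept in memory.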
Data warehousing is foundational for AI/ML. A data warehouse centralizes data from multiple sources (databases, APIs, log files) into a unified repository. Tools like Snowflake, BigQuery, Redshift provide scalable SQL engines. Data lakes (unstructured or semi-structured storage) complement warehouses for raw data. A modern data stack: raw data in a lake, curated data in a warehouse, features in a feature store (a specialized system for ML features). Each layer adds structure and enables different queries. Warehouse design for ML differs from traditional transactional (OLTP) design: instead of supporting real-time transactions, it supports batch feature extraction and historical analysis. Star schemas (fact tables with dimensions) work well; often you need nested data (JSON columns) for complex attributes. Partitioning and clustering tables by common query patterns (e.g., partition by date, cluster by user) improves query performance. A 1 billion row table that takes 5 minutes to scan is useless; partition and cluster to make scans sub-second.
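The payoff of date partitioning is partition pruning: a date-filtered query reads one bucket instead of scanning the whole table. An in-memory sketch (the function names and row shape are illustrative assumptions; a warehouse does this at the storage layer):

```python
from collections import defaultdict

def partition_by_date(rows: list[dict]) -> dict[str, list[dict]]:
    """Bucket rows by day, mimicking a date-partitioned table."""
    partitions: dict[str, list[dict]] = defaultdict(list)
    for row in rows:
        # "YYYY-MM-DD" prefix of an ISO timestamp as the partition key.
        partitions[row["created_date"][:10]].append(row)
    return dict(partitions)

def scan(partitions: dict[str, list[dict]], date: str) -> list[dict]:
    """Partition pruning: touch only the bucket that can match the predicate."""
    return partitions.get(date, [])
```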
Data freshness and latency are critical trade-offs. Real-time pipelines (Kafka, Flink) provide sub-second latency but are complex. Batch pipelines (daily or hourly runs) are simpler but introduce lag. For most ML systems, hourly or daily batches are sufficient. But for recommendation systems, fraud detection, or real-time personalization, lower latency is necessary. Hybrid approaches: batch process most data (fast, cheap), stream process high-value subsets (expensive, low-latency). Incremental processing (only process new/changed data since the last run) reduces compute vs. reprocessing everything. Logging and debugging ETL requires extensive instrumentation: log every error, every schema issue, every unusual pattern. Tools like Apache Spark provide execution plans that reveal bottlenecks. For production systems, 99.9% uptime (9 hours of downtime per year) is a reasonable SLA; 99.99% (1 hour downtime per year) requires serious infrastructure.
Data lineage tools (Apache Atlas, Collibra, custom solutions) track data provenance: where did this data come from, what transformations did it undergo, where is it used? This is essential for compliance (GDPR right to access), debugging (why is this model making wrong predictions? trace back to source data), and understanding data flow. Visual lineage graphs show dependencies: if source system A is down, which downstream processes are affected? Lineage enables impact analysis: before making a breaking change (removing a column), see what depends on it. Lineage also supports certification: mark data as "certified" (verified, high-quality) and track its flow to identify which models benefit from certified data. Building lineage is overhead initially (requires instrumentation, tooling) but pays dividends in operations, debugging, and governance. For production systems handling petabyte-scale data, lineage is essential; for small-scale projects, simpler documentation suffices.
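The impact-analysis use of lineage can be sketched as a transitive walk over (upstream, transform, downstream) edges; the `Lineage` class is an illustrative sketch, not an Atlas or Collibra API:

```python
class Lineage:
    """Record which inputs and transforms produced each output,
    enabling 'what breaks if I change this?' queries."""
    def __init__(self):
        self.edges: list[tuple[str, str, str]] = []  # (upstream, transform, downstream)

    def record(self, upstream: str, transform: str, downstream: str) -> None:
        self.edges.append((upstream, transform, downstream))

    def downstream_of(self, node: str) -> set[str]:
        """Impact analysis: everything transitively derived from `node`."""
        affected: set[str] = set()
        frontier = {node}
        while frontier:
            frontier = {d for (u, _t, d) in self.edges
                        if u in frontier and d not in affected}
            affected |= frontier
        return affected
```

Before removing a column from `raw_events`, `downstream_of("raw_events")` lists every derived table and model input that the change could break.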