The data layer that makes or breaks AI systems: ingestion pipelines, metadata design, labeling, synthetic data, versioning, and data contracts.
Every machine learning system is only as good as its underlying data. In production AI systems, data quality issues account for more failures than model defects. Yet data engineering is systematically overlooked in tutorials and courses that focus on architectures and training techniques.
A mediocre model on good data beats a perfect model on bad data. The inverse is not true: good data lets you recover from a mediocre model through retraining, but bad data cannot be recovered from; its errors compound through the system.
Data engineering for AI breaks down into a handful of core stages, covered in turn below.
Data ingestion is the critical first stage. Real-world data is messy: documents are unstructured, databases have schema drift, APIs have rate limits, and logs contain mixed formats. A robust ingestion layer normalizes everything into standardized, versioned datasets.
| Source | Challenge | Typical tools | Frequency |
|---|---|---|---|
| Documents (PDF, Word, HTML) | Format variance, layout complexity | Docling, LangChain loaders, Trafilatura | Batch or incremental |
| Databases | Schema evolution, sync at scale | SQLAlchemy, Fivetran, Airbyte | Incremental, CDC |
| REST/GraphQL APIs | Rate limits, pagination, auth | httpx, requests, LangChain connectors | Batch or streaming |
| Event streams | Volume, ordering, late arrivals | Kafka, Kinesis, Pub/Sub | Real-time streaming |
| Unstructured text/images | Metadata extraction, preprocessing | Custom parsers, embedding APIs | Batch or incremental |
A production ingestion system requires error handling, retry logic, idempotency, and observability. The `IngestionPipeline` example at the end of this article shows one pattern for normalizing raw sources into clean, deduplicated datasets.
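The retry logic mentioned above can be sketched as a small backoff helper. `with_retries`, its parameters, and the delay schedule are illustrative choices, not part of any particular library:

```python
import random
import time

def with_retries(fn, max_attempts=4, base_delay=0.5):
    """Call fn(), retrying on exception with exponential backoff and jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            # Exponential backoff: 0.5s, 1s, 2s, ... plus random jitter
            time.sleep(base_delay * 2 ** (attempt - 1) + random.uniform(0, 0.1))

# Usage sketch: wrap any flaky call, e.g. an API fetch
# data = with_retries(lambda: fetch_page(url))
```

Pair this with idempotent writes (keyed on a content hash, as in the pipeline example) so that a retried batch never duplicates records.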
High-quality labels are the foundation of supervised learning. Labeling is labor-intensive, error-prone, and bottlenecked by human annotators. Managing labeling workflows requires infrastructure for task distribution, quality control, and disagreement resolution.
| Strategy | Cost | Quality | Speed | Use case |
|---|---|---|---|---|
| Crowdsourcing | Low | Variable | Fast | Large datasets, simple tasks |
| Expert annotation | High | Excellent | Slow | Complex tasks, medical/legal domains |
| Weak supervision | Very low | Moderate | Instant | Programmatic rules, distant supervision |
| Active learning | Medium | High | Iterative | When labeling budget is tight |
| Semi-supervised | Low | Good | Medium | Bootstrap training with unlabeled data |
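Weak supervision from the table above can be as simple as a handful of labeling functions that vote on each example or abstain. The rules and the spam/ham task below are toy illustrations:

```python
from collections import Counter

ABSTAIN = None

# Labeling functions: cheap heuristics that vote on a label or abstain.
def lf_contains_free(text):
    return "spam" if "free" in text.lower() else ABSTAIN

def lf_has_exclamations(text):
    return "spam" if text.count("!") >= 3 else ABSTAIN

def lf_short_greeting(text):
    return "ham" if text.lower().startswith(("hi", "hello")) else ABSTAIN

def weak_label(text, lfs):
    """Majority vote over the labeling functions that did not abstain."""
    votes = [v for v in (lf(text) for lf in lfs) if v is not ABSTAIN]
    if not votes:
        return ABSTAIN
    return Counter(votes).most_common(1)[0][0]

lfs = [lf_contains_free, lf_has_exclamations, lf_short_greeting]
print(weak_label("FREE money!!! Act now!!!", lfs))             # spam
print(weak_label("Hello, are we still on for lunch?", lfs))    # ham
```

Frameworks like Snorkel add a learned label model that weighs functions by estimated accuracy, but majority vote is a reasonable starting point.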
Always measure labeling quality. Inter-annotator agreement metrics such as Cohen's kappa quantify how well annotators agree beyond chance. Disagreement often reveals label ambiguity: fix the label definition, not the annotators.
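Cohen's kappa for two annotators is straightforward to compute from their label sequences; a minimal sketch of the standard chance-corrected formula:

```python
from collections import Counter

def cohens_kappa(a, b):
    """Cohen's kappa for two annotators' label sequences of equal length."""
    assert len(a) == len(b)
    n = len(a)
    # Observed agreement: fraction of items both annotators labeled the same
    observed = sum(x == y for x, y in zip(a, b)) / n
    # Expected agreement under chance, from each annotator's marginal frequencies
    ca, cb = Counter(a), Counter(b)
    expected = sum((ca[l] / n) * (cb[l] / n) for l in set(a) | set(b))
    return (observed - expected) / (1 - expected)

ann1 = ["pos", "pos", "neg", "neg", "pos", "neg"]
ann2 = ["pos", "neg", "neg", "neg", "pos", "pos"]
print(round(cohens_kappa(ann1, ann2), 3))  # 0.333
```

A kappa near 0 means agreement is no better than chance; values above roughly 0.8 are commonly treated as strong agreement (the cutoffs are conventions, not hard rules).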
Use specialized labeling platforms to manage annotators, distribute work, and track quality. Common options include Label Studio (self-hosted), Argilla (self-hosted, strong for NLP), and commercial services like Labelbox and Scale AI.
Datasets change over time: new records arrive, labels are corrected, schemas evolve. Without versioning, you cannot reproduce model training or debug failures. Data governance establishes ownership and lineage.
Git is designed for source code, not gigabyte-scale datasets. Use a specialized data versioning system such as DVC, LakeFS, or Git LFS.
A data contract is a formal agreement between data producers and consumers about format, schema, and quality guarantees. Contracts prevent silent failures when upstream data changes.
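A contract can start as nothing more than an explicit schema plus a validation gate at the producer/consumer boundary. The schema format and field names below are illustrative, not a standard:

```python
# Illustrative contract: field name -> required Python type
CONTRACT = {
    "user_id": int,
    "email": str,
    "signup_ts": str,  # ISO 8601 date expected
}

def check_contract(record: dict, contract: dict) -> list[str]:
    """Return a list of contract violations for one record (empty = valid)."""
    errors = []
    for field, expected_type in contract.items():
        if field not in record:
            errors.append(f"missing field: {field}")
        elif not isinstance(record[field], expected_type):
            errors.append(
                f"{field}: expected {expected_type.__name__}, "
                f"got {type(record[field]).__name__}"
            )
    return errors

ok = {"user_id": 1, "email": "a@b.co", "signup_ts": "2024-01-01"}
print(check_contract(ok, CONTRACT))
# []
print(check_contract({"user_id": "1", "email": "a@b.co"}, CONTRACT))
# ['user_id: expected int, got str', 'missing field: signup_ts']
```

In practice the same idea is usually expressed with schema tooling (Pydantic models, JSON Schema, Avro/Protobuf schemas in a registry); the key point is that violations fail loudly at the boundary instead of silently corrupting downstream data.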
Training and test sets can share information, inflating apparent model performance. Classic case: a nominally time-based split in which some test records predate the training data, so the model effectively trains on the future. Solution: define splits before any preprocessing.
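A minimal sketch of a leakage-safe temporal split, done on raw records before any preprocessing (field names and the cutoff date are illustrative):

```python
# Raw records with an ISO 8601 timestamp; ISO strings compare correctly
# as plain strings, so no date parsing is needed for the split itself.
records = [
    {"ts": "2024-01-05", "text": "a"},
    {"ts": "2024-03-12", "text": "b"},
    {"ts": "2024-06-20", "text": "c"},
    {"ts": "2024-08-01", "text": "d"},
]
CUTOFF = "2024-06-01"  # everything at/after this date is test-only

train = [r for r in records if r["ts"] < CUTOFF]
test = [r for r in records if r["ts"] >= CUTOFF]

# Only now fit preprocessing (normalization stats, vocabulary, etc.) on
# `train`, and apply the fitted transforms unchanged to `test`.
print(len(train), len(test))  # 2 2
```

The same discipline applies to non-temporal leakage: deduplicate and group related records (same user, same document) before splitting, so near-copies never straddle the boundary.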
Incorrect labels harm more than missing labels. A model can tolerate missing examples, but incorrect labels actively teach it wrong patterns. Measure label quality; if noise is high, collect fewer high-quality labels instead of more noisy ones.
Training and production data differ in subtle ways: season, geography, user cohort, etc. This causes performance degradation. Solution: monitor production data drift and regularly retrain.
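One common drift signal is the Population Stability Index (PSI) between the training and production distributions of a feature or score. A small self-contained sketch; the thresholds in the docstring are the usual industry rule of thumb, not a formal test:

```python
import math

def psi(expected, actual, bins=10):
    """Population Stability Index between two numeric samples.

    Rule of thumb: < 0.1 stable, 0.1-0.25 moderate drift, > 0.25 major drift.
    """
    lo, hi = min(expected), max(expected)
    # Equal-width bin edges from the reference (training) sample
    edges = [lo + (hi - lo) * i / bins for i in range(1, bins)]

    def bin_fractions(sample):
        counts = [0] * bins
        for x in sample:
            counts[sum(x >= e for e in edges)] += 1  # index of x's bin
        # Smooth zero counts so the log below is always defined
        return [(c + 0.5) / (len(sample) + 0.5 * bins) for c in counts]

    p, q = bin_fractions(expected), bin_fractions(actual)
    return sum((pi - qi) * math.log(pi / qi) for pi, qi in zip(p, q))

train_scores = [0.1 * i for i in range(100)]       # training distribution
prod_scores = [0.1 * i + 3.0 for i in range(100)]  # shifted production data
print(psi(train_scores, train_scores) < 0.1)   # True: identical, no drift
print(psi(train_scores, prod_scores) > 0.25)   # True: clear drift
```

Run this per feature on a schedule; when PSI crosses the alert threshold, investigate and consider retraining on recent data.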
Ingestion outputs CSV, labeling pipeline expects Parquet, training code expects TFRecord. Format conversions introduce bugs. Define a standard interchange format (JSON-L or Arrow) and stick to it.
You test code, but not data. Create held-out test sets before data engineering work begins. Run data quality checks on test sets before every training run.
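Data quality checks can be a plain function run as a hard gate before every training job. The checks, thresholds, and field names below are illustrative:

```python
def quality_report(rows):
    """Return {check_name: passed} for a list of {"text", "label"} rows."""
    n = len(rows)
    texts = [r.get("text", "") for r in rows]
    labels = [r.get("label") for r in rows]
    return {
        "non_empty": all(t.strip() for t in texts),
        "no_missing_labels": all(l is not None for l in labels),
        "no_duplicates": len(set(texts)) == n,
        # No single class should dominate (here: less than 90% of rows)
        "label_balance": max(labels.count(l) for l in set(labels)) / n < 0.9,
    }

rows = [
    {"text": "good product", "label": "pos"},
    {"text": "terrible", "label": "neg"},
    {"text": "fine I guess", "label": "pos"},
]
report = quality_report(rows)
assert all(report.values()), f"data quality gate failed: {report}"
```

Libraries like Great Expectations or pandera formalize the same idea with declarative expectations and reporting, but even a hand-rolled gate like this catches the most common regressions.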
```python
from dataclasses import dataclass, field
from typing import Iterator
import hashlib
import logging

@dataclass
class Document:
    source: str
    content: str
    metadata: dict = field(default_factory=dict)
    content_hash: str = ""

    def __post_init__(self):
        # Stable fingerprint of the content, used for deduplication
        self.content_hash = hashlib.sha256(
            self.content.encode()
        ).hexdigest()[:16]

class IngestionPipeline:
    def __init__(self, validators=None, transformers=None):
        self.validators = validators or []
        self.transformers = transformers or []
        self.seen_hashes: set = set()
        self.stats = {"processed": 0, "dropped": 0, "duplicates": 0, "errors": 0}

    def run(self, documents: Iterator[dict]) -> Iterator[Document]:
        for raw in documents:
            try:
                doc = Document(**raw)
                # Deduplicate by content hash
                if doc.content_hash in self.seen_hashes:
                    self.stats["duplicates"] += 1
                    continue
                self.seen_hashes.add(doc.content_hash)
                # Validate (e.g. min length, language check)
                if not all(v(doc) for v in self.validators):
                    self.stats["dropped"] += 1
                    continue
                # Transform (normalize whitespace, strip HTML, etc.)
                for t in self.transformers:
                    doc = t(doc)
                self.stats["processed"] += 1
                yield doc
            except Exception as e:
                logging.warning(f"Ingestion error on {raw.get('source', '?')}: {e}")
                self.stats["errors"] += 1

# Usage
pipeline = IngestionPipeline(
    validators=[lambda d: len(d.content) > 50],
    transformers=[lambda d: Document(d.source, d.content.strip(), d.metadata)]
)
docs = list(pipeline.run(raw_records))
print(pipeline.stats)
# e.g. {'processed': 842, 'dropped': 12, 'duplicates': 38, 'errors': 2}
```
```python
import json
import subprocess
from pathlib import Path

# One-time repo setup:
#   $ dvc init && git commit -m "init dvc"
#   $ dvc remote add -d s3remote s3://my-bucket/dvc-cache

def snapshot_dataset(dataset_path: str, message: str) -> dict:
    """Track a dataset file with DVC and record lineage metadata."""
    # Add the file to DVC (creates a .dvc sidecar file)
    subprocess.run(["dvc", "add", dataset_path], check=True)

    # Read the generated .dvc file for the content hash
    dvc_file = dataset_path + ".dvc"
    md5 = ""
    for line in Path(dvc_file).read_text().splitlines():
        if "md5:" in line:
            md5 = line.split("md5:")[1].strip()

    with open(dataset_path) as f:
        row_count = sum(1 for _ in f)
    meta = {
        "path": dataset_path,
        "message": message,
        "md5": md5,
        "row_count": row_count,
    }

    # Append to the lineage log
    with Path("data/lineage.jsonl").open("a") as log:
        log.write(json.dumps(meta) + "\n")

    # Stage the .dvc file, commit, and upload the data to the remote
    subprocess.run(["git", "add", dvc_file, ".gitignore"], check=True)
    subprocess.run(["git", "commit", "-m", f"data: {message}"], check=True)
    subprocess.run(["dvc", "push"], check=True)
    return meta

def restore_version(git_commit: str, dataset_path: str):
    """Restore a specific dataset version by git commit hash."""
    subprocess.run(["git", "checkout", git_commit, "--",
                    dataset_path + ".dvc"], check=True)
    subprocess.run(["dvc", "checkout", dataset_path], check=True)
    print(f"Restored {dataset_path} to commit {git_commit[:8]}")

# Example
meta = snapshot_dataset("data/train.jsonl", "v2 — added 5k synthetic examples")
print(meta)
```
Data engineering breaks down into further specialized topics; each deserves deeper study before building production systems.