
Data Engineering

The data layer that makes or breaks AI systems: ingestion pipelines, metadata design, labeling, synthetic data, versioning, and data contracts.

Five pipeline stages
Quality beats models
Govern your data
Contents
  1. Why data engineering matters
  2. 5-stage pipeline
  3. Ingestion & cleaning
  4. Labeling & annotation
  5. Versioning & governance
  6. Common pitfalls
  7. Ecosystem tools
  8. References
01 — Foundation

Why Data Engineering Matters

Every machine learning system is only as good as its underlying data. In production AI systems, data quality issues account for more failures than model defects. Yet data engineering is systematically overlooked in tutorials and courses that focus on architectures and training techniques.

The Data Quality Principle

A mediocre model on good data beats a perfect model on bad data. The inverse is not true: good data can compensate for a mediocre model through retraining, but bad data cannot be recovered from: it compounds through the system.

The Five-Stage Pipeline

Data engineering for AI breaks down into five core stages:

1. Ingestion: Collect raw data from sources (APIs, databases, documents, sensors, web). Handle heterogeneous formats, incomplete records, and inconsistent schemas.
2. Cleaning & normalization: Remove duplicates, standardize formats, handle missing values, filter outliers. Establish quality thresholds.
3. Labeling & annotation: Assign ground truth labels for supervised learning. Manage labeling workflows, track annotator disagreement, measure label quality.
4. Versioning: Track dataset snapshots, record schema changes, maintain audit trails. Link datasets to model versions.
5. Governance & contracts: Define data ownership, lineage, and quality guarantees. Establish data contracts between teams.
02 — Collection

Ingestion & Cleaning Pipelines

Data ingestion is the critical first stage. Real-world data is messy: documents are unstructured, databases have schema drift, APIs have rate limits, and logs contain mixed formats. A robust ingestion layer normalizes everything into standardized, versioned datasets.

Source Types

| Source | Challenge | Typical tools | Frequency |
| --- | --- | --- | --- |
| Documents (PDF, Word, HTML) | Format variance, layout complexity | Docling, LangChain loaders, Trafilatura | Batch or incremental |
| Databases | Schema evolution, sync at scale | SQLAlchemy, Fivetran, Airbyte | Incremental, CDC |
| REST/GraphQL APIs | Rate limits, pagination, auth | httpx, requests, LangChain connectors | Batch or streaming |
| Event streams | Volume, ordering, late arrivals | Kafka, Kinesis, Pub/Sub | Real-time streaming |
| Unstructured text/images | Metadata extraction, preprocessing | Custom parsers, embedding APIs | Batch or incremental |
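The API row above hides most of the operational pain: rate limits, pagination, and transient failures. A minimal sketch of a retrying, paginated fetch loop; `fake_fetch` is a stand-in for a real httpx/requests call, and the page shape (`items`/`next`) is a hypothetical API response format:

```python
import time
from typing import Callable, Iterator

def paginate(fetch: Callable[[int], dict], max_retries: int = 3) -> Iterator[dict]:
    """Walk a paginated API: fetch(page) returns {"items": [...], "next": bool}.
    Each page is retried with exponential backoff before the run fails."""
    page = 0
    while True:
        for attempt in range(max_retries):
            try:
                resp = fetch(page)
                break
            except ConnectionError:
                time.sleep(2 ** attempt * 0.01)  # backoff; tune for real rate limits
        else:
            raise RuntimeError(f"page {page} failed after {max_retries} retries")
        yield from resp["items"]
        if not resp.get("next"):
            return
        page += 1

# Simulated flaky source: page 0 fails once, then succeeds
calls = {"n": 0}
def fake_fetch(page: int) -> dict:
    calls["n"] += 1
    if page == 0 and calls["n"] == 1:
        raise ConnectionError("rate limited")
    return {"items": [page * 2, page * 2 + 1], "next": page < 1}

records = list(paginate(fake_fetch))
print(records)  # [0, 1, 2, 3]
```

Separating the retry/pagination policy from the transport call also makes the loop testable without hitting a live endpoint.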

Building a Robust Ingestion Layer

A production ingestion system requires error handling, retry logic, idempotency, and observability. Here's a pattern for building versioned datasets from raw sources:

from pathlib import Path
import json

from datasets import Dataset, DatasetDict

def build_training_dataset(raw_dir: str, output_path: str) -> DatasetDict:
    """Build a versioned, split dataset from raw files."""
    records = []
    for p in Path(raw_dir).glob("*.jsonl"):
        with open(p) as f:
            records.extend(json.loads(line) for line in f)
    ds = Dataset.from_list(records)
    ds = ds.filter(lambda x: len(x["text"]) > 50)  # quality filter
    splits = ds.train_test_split(test_size=0.1, seed=42)
    splits.save_to_disk(output_path)
    return splits


03 — Annotation

Labeling & Quality Management

High-quality labels are the foundation of supervised learning. Labeling is labor-intensive, error-prone, and bottlenecked by human annotators. Managing labeling workflows requires infrastructure for task distribution, quality control, and disagreement resolution.

Labeling Strategies

| Strategy | Cost | Quality | Speed | Use case |
| --- | --- | --- | --- | --- |
| Crowdsourcing | Low | Variable | Fast | Large datasets, simple tasks |
| Expert annotation | High | Excellent | Slow | Complex tasks, medical/legal domains |
| Weak supervision | Very low | Moderate | Instant | Programmatic rules, distant supervision |
| Active learning | Medium | High | Iterative | When labeling budget is tight |
| Semi-supervised | Low | Good | Medium | Bootstrap training with unlabeled data |
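The active-learning row can be made concrete with least-confidence sampling: spend the labeling budget on the pool items the current model is least sure about. A minimal sketch (the probability values are made up for illustration):

```python
def uncertainty_sample(probs: list[list[float]], k: int) -> list[int]:
    """Return indices of the k pool items whose top-class probability
    is lowest, i.e. where the model is least confident."""
    ranked = sorted(range(len(probs)), key=lambda i: max(probs[i]))
    return ranked[:k]

# Predicted class probabilities for an unlabeled pool
pool = [
    [0.98, 0.02],  # confident -> skip
    [0.55, 0.45],  # ambiguous -> label next
    [0.90, 0.10],
    [0.51, 0.49],  # most ambiguous
]
print(uncertainty_sample(pool, 2))  # [3, 1]
```

The selected items go to annotators, the model is retrained, and the loop repeats until the budget runs out or quality plateaus.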

Quality Control Metrics

Always measure labeling quality. Inter-annotator agreement (e.g., Cohen's kappa) quantifies how consistently annotators agree beyond chance. High disagreement often reveals label ambiguity: fix the label definition, not the annotators.
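For two annotators, Cohen's kappa is simple enough to compute directly from observed agreement and the chance agreement implied by each annotator's label marginals. A minimal sketch (the example labels are made up):

```python
from collections import Counter

def cohens_kappa(a: list, b: list) -> float:
    """Cohen's kappa for two annotators labeling the same items."""
    n = len(a)
    p_o = sum(x == y for x, y in zip(a, b)) / n           # observed agreement
    ca, cb = Counter(a), Counter(b)
    labels = set(a) | set(b)
    p_e = sum((ca[l] / n) * (cb[l] / n) for l in labels)  # chance agreement
    return (p_o - p_e) / (1 - p_e)

ann1 = ["pos", "pos", "neg", "neg", "pos", "neg"]
ann2 = ["pos", "neg", "neg", "neg", "pos", "pos"]
print(round(cohens_kappa(ann1, ann2), 3))  # 0.333
```

A kappa near 0 means agreement is no better than chance; values above roughly 0.8 are usually read as strong agreement, though thresholds are conventions, not guarantees.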

Tools for Labeling Workflows

Use specialized labeling platforms to manage annotators, distribute work, and track quality. Common options include Label Studio (self-hosted), Argilla (self-hosted, strong for NLP), and commercial services like Labelbox and Scale AI.

04 — Versioning

Data Versioning & Governance

Datasets change over time: new records arrive, labels are corrected, schemas evolve. Without versioning, you cannot reproduce model training or debug failures. Data governance establishes ownership and lineage.

Version Control for Data

Git is designed for source code, not gigabyte-scale datasets. Use a specialized data versioning system such as DVC or Pachyderm (see Ecosystem tools below).

Data Contracts

A data contract is a formal agreement between data producers and consumers about format, schema, and quality guarantees. Contracts prevent silent failures when upstream data changes.
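A contract can start as something very lightweight: a declared set of required fields, types, and quality thresholds checked at the pipeline boundary. A minimal sketch (the field names, labels, and thresholds are hypothetical):

```python
# A lightweight data contract: required fields, types, quality thresholds.
CONTRACT = {
    "fields": {"id": str, "text": str, "label": str},
    "allowed_labels": {"pos", "neg", "neutral"},
    "min_text_len": 10,
}

def validate_record(rec: dict, contract: dict = CONTRACT) -> list[str]:
    """Return a list of contract violations (empty list = record passes)."""
    errors = []
    for name, typ in contract["fields"].items():
        if name not in rec:
            errors.append(f"missing field: {name}")
        elif not isinstance(rec[name], typ):
            errors.append(f"{name}: expected {typ.__name__}")
    if not errors:
        if rec["label"] not in contract["allowed_labels"]:
            errors.append(f"unknown label: {rec['label']}")
        if len(rec["text"]) < contract["min_text_len"]:
            errors.append("text below minimum length")
    return errors

good = {"id": "a1", "text": "a perfectly fine record", "label": "pos"}
bad  = {"id": "a2", "text": "short", "label": "maybe"}
print(validate_record(good))  # []
print(validate_record(bad))   # ['unknown label: maybe', 'text below minimum length']
```

Running this check on the producer side (before publishing) and the consumer side (before training) is what turns a silent upstream change into a loud, attributable failure.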

Governance Best Practices

Assign an explicit owner to every dataset, record lineage from raw source to training snapshot, and enforce contracts at team boundaries. Link every model version to the exact dataset version it was trained on.

05 — Pitfalls

Common Data Engineering Anti-Patterns

Data Leakage

Training and test sets share information, inflating apparent model performance. Classic case: splitting on time but test data arrives before training data. Solution: define splits before any preprocessing.
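The fix is mechanical: pick a cutoff timestamp and split before any preprocessing or feature computation touches the data. A minimal sketch (the record fields are hypothetical):

```python
from datetime import date

def chronological_split(records: list[dict], cutoff: date) -> tuple[list, list]:
    """Split on timestamp BEFORE any preprocessing: everything up to the
    cutoff trains, everything after is held out. No shuffling across time."""
    train = [r for r in records if r["date"] <= cutoff]
    test = [r for r in records if r["date"] > cutoff]
    return train, test

records = [
    {"date": date(2024, 1, 5), "x": 1},
    {"date": date(2024, 3, 2), "x": 2},
    {"date": date(2024, 6, 9), "x": 3},
]
train, test = chronological_split(records, date(2024, 4, 1))
print(len(train), len(test))  # 2 1
```

Any statistic used for preprocessing (normalization constants, vocabulary, imputation values) must then be computed on `train` only and applied to `test`, never fit on the combined data.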

Label Noise

Incorrect labels harm more than missing labels. A model can ignore missing features, but incorrect labels teach wrong patterns. Measure label quality. If noise is high, collect fewer high-quality labels instead of more noisy ones.

Distribution Mismatch

Training and production data differ in subtle ways: season, geography, user cohort, etc. This causes performance degradation. Solution: monitor production data drift and regularly retrain.
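One common drift monitor is the Population Stability Index (PSI), which compares a production feature's distribution against the training baseline, bin by bin. A minimal sketch; the 0.1/0.25 thresholds are a widely used rule of thumb, not a universal standard:

```python
import math

def psi(expected: list[float], actual: list[float], bins: int = 5) -> float:
    """Population Stability Index between a baseline and a production sample.
    Rule of thumb: <0.1 stable, 0.1-0.25 moderate drift, >0.25 significant."""
    lo, hi = min(expected), max(expected)
    edges = [lo + (hi - lo) * i / bins for i in range(bins + 1)]
    edges[0] = float("-inf")   # catch out-of-range production values
    edges[-1] = float("inf")

    def frac(sample):
        counts = [0] * bins
        for v in sample:
            for i in range(bins):
                if edges[i] < v <= edges[i + 1]:
                    counts[i] += 1
                    break
        # small epsilon avoids log(0) on empty bins
        return [(c + 1e-6) / (len(sample) + bins * 1e-6) for c in counts]

    p, q = frac(expected), frac(actual)
    return sum((pi - qi) * math.log(pi / qi) for pi, qi in zip(p, q))

baseline = [0.1 * i for i in range(100)]        # training distribution
shifted = [0.1 * i + 4.0 for i in range(100)]   # production drifted upward
print(round(psi(baseline, baseline), 4))  # 0.0
print(psi(baseline, shifted) > 0.25)      # True
```

Computing PSI per feature on a schedule, and alerting when it crosses the threshold, gives an early retraining signal long before accuracy metrics catch up.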

Format Mismatch Between Stages

Ingestion outputs CSV, labeling pipeline expects Parquet, training code expects TFRecord. Format conversions introduce bugs. Define a standard interchange format (JSONL or Arrow) and stick to it.

No Test Sets for Data

You test code, but not data. Create held-out test sets before data engineering work begins. Run data quality checks on test sets before every training run.
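Data checks can start as a handful of assertions run before every training job. A minimal sketch (the field names and check set are hypothetical; tools like Great Expectations generalize this pattern):

```python
def run_data_checks(records: list[dict]) -> list[str]:
    """Minimal data test suite: run before every training job, fail loudly."""
    failures = []
    if not records:
        failures.append("dataset is empty")
        return failures
    ids = [r.get("id") for r in records]
    if len(ids) != len(set(ids)):
        failures.append("duplicate ids")
    if any(not r.get("text") for r in records):
        failures.append("empty text fields")
    if len({r.get("label") for r in records}) < 2:
        failures.append("fewer than 2 classes present")
    return failures

ds = [
    {"id": 1, "text": "hello", "label": "pos"},
    {"id": 2, "text": "world", "label": "neg"},
]
assert run_data_checks(ds) == [], run_data_checks(ds)
print("data checks passed")
```

Wiring this into the training entry point (fail the job on any non-empty result) makes data regressions as visible as failing unit tests.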

06 — Ecosystem

Data Engineering Tools

DVC
Data Versioning
Git for datasets and ML pipelines. Version control, reproducibility, experiment tracking.
Great Expectations
Data Quality
Test data like code. Validate schemas, stats, distributions. Catch quality regressions early.
Label Studio
Annotation Platform
Open-source labeling UI. Supports NLP, vision, audio. Self-hosted, extensible.
Argilla
Data Annotation
Collaborative annotation for NLP and LLMs. Strong tooling for active learning.
Hugging Face Datasets
Dataset Hub
1000+ ready-to-use datasets. Version control, streaming, cross-dataset operations.
Pachyderm
Data Pipelines
Versioned, containerized data pipelines. Reproducible ML workflows.
Apache Airflow
Workflow Orchestration
DAG-based pipeline orchestration. Mature, widely adopted in data teams.
Prefect
Workflow Orchestration
Modern alternative to Airflow. Better UX, easier debugging, native async support.
Airbyte
Data Integration
ELT pipelines. 300+ connectors. Syncs from databases, APIs, SaaS to data warehouses.
dbt
Data Transformation
Transform raw data in warehouse using SQL. Version control, testing, documentation.
Polars
Data Processing
Fast, expressive dataframe library. Faster than Pandas, cleaner API.
Ray Data
Distributed Processing
Distributed data processing. Map, shuffle, groupby at scale across clusters.
Python · Ingestion pipeline with schema validation and deduplication
from dataclasses import dataclass, field
from typing import Iterator
import hashlib, json, logging

@dataclass
class Document:
    source: str
    content: str
    metadata: dict = field(default_factory=dict)
    content_hash: str = ""

    def __post_init__(self):
        self.content_hash = hashlib.sha256(
            self.content.encode()
        ).hexdigest()[:16]

class IngestionPipeline:
    def __init__(self, validators=None, transformers=None):
        self.validators = validators or []
        self.transformers = transformers or []
        self.seen_hashes: set = set()
        self.stats = {"processed": 0, "dropped": 0, "duplicates": 0, "errors": 0}

    def run(self, documents: Iterator[dict]) -> Iterator[Document]:
        for raw in documents:
            try:
                doc = Document(**raw)
                # Deduplicate by content hash
                if doc.content_hash in self.seen_hashes:
                    self.stats["duplicates"] += 1
                    continue
                self.seen_hashes.add(doc.content_hash)
                # Validate (e.g. min length, language check)
                if not all(v(doc) for v in self.validators):
                    self.stats["dropped"] += 1
                    continue
                # Transform (normalize whitespace, strip HTML, etc.)
                for t in self.transformers:
                    doc = t(doc)
                self.stats["processed"] += 1
                yield doc
            except Exception as e:
                logging.warning(f"Ingestion error on {raw.get('source','?')}: {e}")
                self.stats["errors"] += 1

# Usage
pipeline = IngestionPipeline(
    validators=[lambda d: len(d.content) > 50],
    transformers=[lambda d: Document(d.source, d.content.strip(), d.metadata)]
)
docs = list(pipeline.run(raw_records))
print(pipeline.stats)
# {"processed": 842, "dropped": 12, "duplicates": 38, "errors": 2}
Python · DVC data versioning and dataset lineage tracking
import subprocess, json
from pathlib import Path

# One-time repo setup:
# $ dvc init && git commit -m "init dvc"
# $ dvc remote add -d s3remote s3://my-bucket/dvc-cache

def snapshot_dataset(dataset_path: str, message: str) -> dict:
    """Track a dataset file with DVC and record lineage metadata."""
    # Add file to DVC (creates .dvc sidecar)
    subprocess.run(["dvc", "add", dataset_path], check=True)

    # Read the generated .dvc file for the content hash
    dvc_file = dataset_path + ".dvc"
    dvc_content = Path(dvc_file).read_text()
    md5 = ""
    for line in dvc_content.splitlines():
        if "md5:" in line:
            md5 = line.split("md5:")[1].strip()

    meta = {
        "path": dataset_path,
        "message": message,
        "md5": md5,
        "row_count": sum(1 for _ in open(dataset_path))
    }
    # Append to lineage log
    with Path("data/lineage.jsonl").open("a") as log:
        log.write(json.dumps(meta) + "\n")

    # Stage .dvc file and commit
    subprocess.run(["git", "add", dvc_file, ".gitignore"], check=True)
    subprocess.run(["git", "commit", "-m", f"data: {message}"], check=True)
    subprocess.run(["dvc", "push"], check=True)  # upload to remote
    return meta

def restore_version(git_commit: str, dataset_path: str):
    """Restore a specific dataset version by git commit hash."""
    subprocess.run(["git", "checkout", git_commit, "--",
                    dataset_path + ".dvc"], check=True)
    subprocess.run(["dvc", "checkout", dataset_path], check=True)
    print(f"Restored {dataset_path} to commit {git_commit[:8]}")

# Example
meta = snapshot_dataset("data/train.jsonl", "v2 — added 5k synthetic examples")
print(meta)
07 — Related Topics

Deep Dive into Subclusters

Data engineering breaks down into specialized topics (ingestion, labeling, versioning, governance), each worth a deeper dive when building production systems.

08 — Further Reading

References
