Data Engineering

Data Lineage

Tracking the full provenance of data from source to model — where it came from, what transformations it underwent, and which model versions it influenced — for debugging and compliance.

Value: root-cause analysis and compliance
Granularity: column-level or table-level
Tools: OpenLineage, Marquez, dbt lineage


SECTION 01

What Is Data Lineage?

Data lineage is the graph that shows: for every piece of data, where did it originate, what transformations were applied, who consumed it, and which downstream models or reports depend on it. When model performance drops, lineage lets you answer: 'Did a source table schema change? Was a filter added upstream? Did ingestion fail for 3 hours?'
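As a minimal sketch, this graph can be represented as an adjacency map from each dataset to the datasets derived from it; the table names below are hypothetical:

```python
# Hypothetical table-level lineage graph: dataset -> datasets derived from it
lineage = {
    "raw.events": ["staging.events_clean"],
    "staging.events_clean": ["features.user_activity"],
    "features.user_activity": ["model.churn_v3"],
}

def upstream_sources(target, graph):
    """Walk the graph backwards to find every dataset the target depends on."""
    # Invert the edges: derived dataset -> its direct inputs
    parents = {}
    for src, outs in graph.items():
        for out in outs:
            parents.setdefault(out, []).append(src)
    # Depth-first walk from the target back toward the raw sources
    found, stack = set(), [target]
    while stack:
        node = stack.pop()
        for p in parents.get(node, []):
            if p not in found:
                found.add(p)
                stack.append(p)
    return found
```

With this graph, asking "why did model.churn_v3 degrade?" starts with `upstream_sources("model.churn_v3", lineage)`, which returns every dataset whose change could be the root cause.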

SECTION 02

Column-Level vs Table-Level

Table-level lineage: dataset A feeds dataset B feeds model C. Sufficient for impact analysis ('if I change table X, what breaks?'). Column-level lineage: field user_id in table A → field customer_id in table B → feature X in model C. Required for GDPR right-to-erasure ('which models use this user's data?') and for debugging feature-level data quality issues.

SECTION 03

OpenLineage Standard

OpenLineage is an open standard for lineage event emission, hosted by the LF AI & Data Foundation. Pipelines emit START and COMPLETE events with input/output dataset metadata. It is supported by Apache Airflow, Apache Spark, dbt, Flink, and many others. Events are collected by a backend (Marquez, Atlan, DataHub) that builds the lineage graph.

SECTION 04

Capturing Lineage

Emit lineage events at each pipeline step using the OpenLineage client.

from openlineage.client import OpenLineageClient
from openlineage.client.run import RunEvent, RunState, Run, Job
import datetime
import uuid

client = OpenLineageClient(url="http://localhost:5000")

def emit_lineage(job_name: str, input_datasets: list, output_datasets: list):
    """Emit a COMPLETE event describing one pipeline step's inputs and outputs."""
    event = RunEvent(
        eventType=RunState.COMPLETE,
        eventTime=datetime.datetime.utcnow().isoformat() + "Z",
        run=Run(runId=str(uuid.uuid4())),
        job=Job(namespace="my-pipeline", name=job_name),
        inputs=input_datasets,
        outputs=output_datasets,
        producer="my-etl-pipeline/1.0",
    )
    client.emit(event)

SECTION 05

Impact Analysis

With lineage, answer 'what breaks if I change X?' before making the change. Downstream impact graph: start from the changed dataset, traverse all edges forward, list all affected downstream datasets and models. Alert owners of affected downstream systems before the change is deployed. This prevents surprise model performance degradations from schema changes.

SECTION 06

Compliance Use Cases

GDPR right-to-erasure: given user_id, find all datasets and models that contain that user's data using column-level lineage, then coordinate deletion. AI Act compliance: for a model under audit, trace all training data back to its source to verify data provenance and consent records. Data residency: verify that data originating in the EU never flows to non-EU processing.
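A sketch of the right-to-erasure query, assuming column-level edges are stored as (table, column) pairs; the tables, columns, and graph shape below are hypothetical:

```python
# Hypothetical column-level lineage: (table, column) -> downstream (table, column) pairs
column_lineage = {
    ("users", "user_id"): [("orders", "customer_id"), ("sessions", "uid")],
    ("orders", "customer_id"): [("features", "customer_ltv")],
    ("sessions", "uid"): [],
    ("features", "customer_ltv"): [("churn_model_training", "ltv")],
}

def tables_holding_user_data(root, graph):
    """Follow column-level edges forward; every table reached may hold the user's data."""
    visited, queue, tables = {root}, [root], set()
    while queue:
        node = queue.pop(0)
        tables.add(node[0])  # record the table this column lives in
        for child in graph.get(node, []):
            if child not in visited:
                visited.add(child)
                queue.append(child)
    return tables
```

Starting from the identifying column `("users", "user_id")`, the traversal lists every table a deletion job must scrub before erasure can be confirmed.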

SECTION 07

Lineage Tracking in Data Pipelines

Data lineage answers: where did this data come from? Who transformed it? What other datasets depend on it? Tracking lineage requires instrumenting data pipelines to log source tables, transformations (SQL, Spark, Python), and destination tables. Tools like dbt, Databricks Unity Catalog, and Airflow (through its OpenLineage integration) can capture much of this lineage automatically.

# Manual lineage tracking with metadata
import hashlib
import json
import logging
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def etl_pipeline(source_table, transform_sql, dest_table):
    # Log the lineage entry before running the transformation
    lineage_event = {
        "timestamp": datetime.utcnow().isoformat(),
        "source": source_table,
        "transformation": "SQL",
        "destination": dest_table,
        # Stable content hash (built-in hash() is randomized per process)
        "query_hash": hashlib.sha256(transform_sql.encode()).hexdigest(),
    }
    logger.info(json.dumps(lineage_event))

    # Execute the transformation (assumes an active SparkSession named `spark`)
    result = spark.sql(transform_sql)
    result.write.mode("overwrite").saveAsTable(dest_table)

    return result

Downstream Impact Analysis

Once lineage is tracked, you can answer "if I fix this bug in the source data, what datasets are affected?" With lineage, the impact is instant; without it, you're flying blind and might miss critical dependencies.

# Impact analysis: find all dependent datasets
def find_downstream_datasets(table_name, lineage_graph):
    """Breadth-first search: find all tables that depend (directly or indirectly) on table_name."""
    affected = set()
    queue = [table_name]

    while queue:
        current = queue.pop(0)
        # Find tables that use 'current' as input
        # (assumes lineage_graph exposes a get_dependents() lookup)
        dependents = lineage_graph.get_dependents(current)
        for dep in dependents:
            if dep not in affected:
                affected.add(dep)
                queue.append(dep)  # continue the traversal from this dependent

    return affected

# Usage: if user_master has a data quality issue, what downstream is affected?
impacted = find_downstream_datasets("user_master", lineage_graph)
print(f"Fixing user_master will affect: {impacted}")

SECTION 08

Lineage Visualization & Governance

Visualizing data lineage as a DAG makes governance easier. Teams can see data provenance, ownership, and sensitivity levels at a glance. Automated governance rules can be enforced: "PII data from user_events can only flow to analytics_prod if anonymized first."
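One way to check a rule like this, assuming lineage edges are annotated with the transformation step that produced them; the edge records and step names below are hypothetical:

```python
# Hypothetical lineage edges annotated with the transformation step that produced them
edges = [
    {"src": "user_events", "dst": "user_events_anon", "step": "anonymize_pii"},
    {"src": "user_events_anon", "dst": "analytics_prod", "step": "daily_rollup"},
    {"src": "user_events", "dst": "debug_dump", "step": "raw_copy"},
]

def violates_pii_rule(edges, pii_source, allowed_dst, anonymize_step):
    """Flag any path reaching allowed_dst from pii_source without passing anonymize_step."""
    # Propagate 'still-identifiable PII' taint to a fixed point
    tainted = {pii_source}
    changed = True
    while changed:
        changed = False
        for e in edges:
            # The anonymization step cleans the taint; any other step carries it forward
            if e["src"] in tainted and e["step"] != anonymize_step and e["dst"] not in tainted:
                tainted.add(e["dst"])
                changed = True
    return allowed_dst in tainted
```

In this sketch the anonymization step breaks taint propagation, so `analytics_prod` only trips the rule if raw PII reaches it on some path that skips `anonymize_pii`.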

Lineage Element  | Info to Track                 | Tools                   | Governance Use
Source Table     | Owner, SLA, PII flag          | Data Catalog, Alation   | Approve/deny data use
Transformation   | SQL/code, timestamp, author   | Airflow, dbt, Spark     | Audit trail, rollback ability
Destination      | Consumer team, SLA, freshness | Data Catalog, BigQuery  | Monitor contract fulfillment
Dataflow Graph   | Full DAG, dependency depth    | Airflow UI, OpenLineage | Impact analysis, risk assessment

Lineage at Scale: Large enterprises with 10k+ tables face lineage explosion. Prioritize tracking critical paths: source data, transformations affecting financial/legal/PII data, and key business metrics. Use sampling for non-critical lineage to keep overhead reasonable. Store lineage in a dedicated metadata warehouse (separate from operational DW) to isolate governance queries from business analytics.

Implement lineage retention policies: keep detailed lineage for 90 days, summary lineage for 1 year. This balances governance needs with storage costs. Use OpenLineage standard for portability across tools.
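The retention policy above can be sketched roughly as follows; the event shape is hypothetical, and real lineage backends store richer metadata:

```python
from datetime import datetime, timedelta, timezone

DETAIL_DAYS, SUMMARY_DAYS = 90, 365  # thresholds from the policy above

def apply_retention(events, now=None):
    """Keep full events for 90 days, collapse older ones to a summary, drop after 1 year."""
    now = now or datetime.now(timezone.utc)
    detailed, summaries = [], []
    for e in events:  # each event: {"time": datetime, "job": ..., "inputs": ..., "outputs": ...}
        age = now - e["time"]
        if age <= timedelta(days=DETAIL_DAYS):
            detailed.append(e)
        elif age <= timedelta(days=SUMMARY_DAYS):
            # Summary keeps only the lineage edge, not the run-level detail
            summaries.append({"job": e["job"], "inputs": e["inputs"], "outputs": e["outputs"]})
        # Older than a year: dropped entirely
    return detailed, summaries
```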

Lineage-Driven Data Quality Monitoring: Lineage graphs enable automatic data quality propagation. If source table has a data quality issue (15% nulls in critical column), automatically flag all downstream tables as "quality at risk." Use this to prevent bad data from spreading. Set up automated quality gates on high-risk tables: if column X becomes >10% null, pause dependent pipelines and alert the owner. Lineage-driven monitoring catches bugs earlier than reactive dashboards that monitor end-metrics only after damage is done.
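A rough sketch of this propagation, assuming a forward adjacency map and per-table null rates; the table names and threshold wiring are illustrative:

```python
# Hypothetical forward lineage graph and per-table null rates
lineage = {
    "user_master": ["user_features", "billing_join"],
    "user_features": ["churn_model_input"],
    "billing_join": [],
    "churn_model_input": [],
}

NULL_RATE_THRESHOLD = 0.10  # the >10% null quality gate from the text above

def quality_at_risk(null_rates, graph, threshold=NULL_RATE_THRESHOLD):
    """Mark every table downstream of a failing table as 'quality at risk'."""
    at_risk = set()
    for table, rate in null_rates.items():
        if rate > threshold:
            # BFS forward from the failing table to tag all its descendants
            queue = [table]
            while queue:
                current = queue.pop(0)
                for child in graph.get(current, []):
                    if child not in at_risk:
                        at_risk.add(child)
                        queue.append(child)
    return at_risk
```

A monitoring job could run this after each quality check and pause the pipelines that feed any table in the returned set.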

For regulatory audits, lineage proves data provenance. GDPR data subject access requests (DSARs) require knowing everywhere a customer's data appears. Lineage queries answer this instantly: "find all tables containing customer_id 12345." Implement GDPR-aligned lineage: track data from collection point through all transformations and deletions. Document deletion compliance: when a customer requests deletion (right to be forgotten), lineage tells you exactly which tables need scrubbing.

Monitoring and observability are essential for production systems. Set up comprehensive logging at every layer: API requests, model predictions, database queries, cache hits/misses. Use structured logging (JSON) to enable filtering and aggregation across thousands of servers. For production deployments, track not just errors but also latency percentiles (p50, p95, p99); if p99 latency suddenly doubles, something is wrong even if error rates are normal. Set up alerting based on SLO violations: if a service is supposed to have 99.9% availability and it drops to 99.5%, alert immediately. Use distributed tracing (Jaeger, Lightstep) to track requests across multiple services; a slow end-to-end latency might be hidden in one deep service call, invisible in aggregate metrics.
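A minimal sketch of the percentile and SLO check described above; the nearest-rank percentile and the thresholds are illustrative, not a production monitoring stack:

```python
def percentile(samples, p):
    """Nearest-rank percentile, p in [0, 100]."""
    ordered = sorted(samples)
    k = max(0, min(len(ordered) - 1, round(p / 100 * len(ordered)) - 1))
    return ordered[k]

def slo_breached(latencies_ms, p99_budget_ms=250, baseline_p99_ms=None):
    """Alert if p99 exceeds its budget, or doubles versus a baseline even within budget."""
    p99 = percentile(latencies_ms, 99)
    if p99 > p99_budget_ms:
        return True
    # Catch the 'p99 doubled but error rate looks fine' case from the text
    if baseline_p99_ms is not None and p99 > 2 * baseline_p99_ms:
        return True
    return False
```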

For long-running ML jobs (training, batch inference), implement checkpoint recovery and graceful degradation. If a training job crashes after 2 weeks, you want to resume from the last checkpoint, not restart from scratch. Implement job orchestration with Kubernetes or Airflow to handle retries, resource allocation, and dependency management. Use feature flags for safe deployment: deploy new model versions behind a flag that's off by default, gradually roll out to 1% of users, 10%, then 100%, monitoring metrics at each step. If something goes wrong, flip the flag back instantly. This approach reduces risk and enables fast rollback.
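The gradual-rollout flag can be sketched with deterministic hash bucketing; the function and flag names here are hypothetical:

```python
import hashlib

def in_rollout(user_id: str, flag_name: str, rollout_pct: float) -> bool:
    """Deterministically bucket a user into [0, 100); stable as rollout_pct grows."""
    digest = hashlib.sha256(f"{flag_name}:{user_id}".encode()).hexdigest()
    bucket = int(digest[:8], 16) % 10000 / 100.0  # 0.00 .. 99.99
    return bucket < rollout_pct
```

Because the bucket is a pure function of the user and flag, widening the rollout from 1% to 10% never kicks out a user who was already in, which keeps the experiment population stable between steps.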

Finally, build a culture of incident response and post-mortems. When something breaks (and it will), document the incident: timeline, root cause, mitigation steps, and preventive measures. Use incidents as learning opportunities; blameless post-mortems focus on systems, not people. Share findings across teams to prevent repeat incidents. A well-documented incident history is an organization's institutional knowledge about system failures and how to avoid them.

The rapid evolution of AI infrastructure requires continuous learning and adaptation. Teams should establish regular tech talks and knowledge-sharing sessions where engineers present lessons learned from production deployments, performance optimization work, and incident postmortems. Create internal wiki pages documenting best practices specific to your organization: how to debug common failure modes, performance tuning guides for your hardware, and checklists for safe deployments. This prevents repeating mistakes and accelerates onboarding of new team members.

Build relationships with vendors and open-source communities. If you encounter bugs in frameworks (PyTorch, JAX), file detailed reports. If you have questions, ask on forums; community members often have encountered similar issues. For mission-critical infrastructure, consider purchasing support contracts with vendors (PyTorch, HuggingFace, cloud providers). Support gives you direct access to engineers who understand your system and can prioritize fixes. This is insurance against production outages caused by third-party software bugs.

Finally, remember that optimization is a journey, not a destination. Today's cutting-edge technique becomes tomorrow's baseline. Allocate 10-15% of engineering time to exploration and experimentation. Some experiments will fail, but successful ones compound into significant efficiency gains. Foster a culture of continuous improvement: measure, analyze, iterate, and share results. The teams that stay ahead are those that invest in understanding their systems deeply and adapting proactively to new technologies and changing demands.

Key Takeaway: Success in GenAI infrastructure depends on mastering fundamentals: understand your hardware constraints, profile your workloads, measure everything, and iterate. The most sophisticated techniques (dynamic batching, mixed precision, distributed training) build on solid foundations of clear thinking and empirical validation. Avoid cargo-cult engineering: if you don't understand why a technique helps your specific use case, it probably won't. Invest time in understanding root causes, not just applying trendy solutions. Over time, this rigor will compound into significant competitive advantage.