Data Engineering · RAG

Data Ingestion & Pipelines

PDF, HTML, database, and API ingestion — building reliable document processing pipelines at scale.

5 Source types
Docling PDF parsing
Incremental Updates
Contents
  1. Source types & challenges
  2. PDF ingestion
  3. Web & HTML
  4. Database & API
  5. Normalization
  6. Incremental updates
  7. Pipeline orchestration
  8. Tools & references
01 — Context

Source Types & Challenges

RAG systems need to ingest data from diverse sources, and each format fails differently: PDFs lose structure, web pages bury content in boilerplate, databases are too large to resync wholesale, and APIs deliver transient streams. A good ingestion pipeline abstracts these differences behind one interface.

| Source | Extraction difficulty | Structure loss | Update frequency | Best for |
|---|---|---|---|---|
| PDF | High (tables, images) | Significant | Quarterly | Technical docs, papers, reports |
| HTML/Web | Medium (JS rendering) | Boilerplate noise | Daily | Help centers, blogs, documentation |
| DOCX/XLSX | Low (structured) | Minor | Monthly | Internal docs, spreadsheets |
| Database | Low (queryable) | None | Real-time | Customer data, transactions, logs |
| API/Email | Low (structured JSON) | None | Streaming | Live events, messages, feeds |
💡 Start with the highest-value source first. Usually documentation or internal knowledge bases. Once that's working, add others incrementally.
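One way to abstract these differences is to have every connector emit the same record type. A minimal sketch, assuming nothing beyond the standard library (the `IngestedDocument` fields and `from_db_row` helper are illustrative, not from any particular framework):

```python
from dataclasses import dataclass, field

@dataclass
class IngestedDocument:
    """Common record emitted by every source connector."""
    source: str   # "pdf", "html", "db", "api"
    uri: str      # file path, URL, or table/row identifier
    text: str     # extracted plain text
    metadata: dict = field(default_factory=dict)

def from_db_row(row: dict) -> IngestedDocument:
    # A database row maps directly: content column -> text, rest -> metadata
    return IngestedDocument(
        source="db",
        uri=f"articles/{row['id']}",
        text=row["content"],
        metadata={k: v for k, v in row.items() if k != "content"},
    )

doc = from_db_row({"id": 7, "title": "Intro", "content": "Hello"})
```

Downstream chunking and indexing then only ever sees `IngestedDocument`, regardless of which connector produced it.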
02 — PDFs

PDF Extraction

PDF parsing is the most common ingestion task but also the hardest. Libraries range from simple text extraction (loses structure) to intelligent layout analysis (preserves structure).

Library Comparison

| Library | Text extraction | Tables | Layout | Speed |
|---|---|---|---|---|
| PyPDF2 | Basic | Difficult | Lost | Fast |
| pdfplumber | Good | Excellent | Preserved | Medium |
| Docling | Excellent | Excellent | Excellent | Slow |
| LlamaParse | Excellent | Excellent | Excellent | API-based |

Docling Example

Docling is one of the strongest open-source options. It uses deep learning for layout analysis and handles complex PDFs (tables, images, equations) well, at the cost of speed.

from docling.document_converter import DocumentConverter

# Initialize converter
converter = DocumentConverter()

# Convert PDF
source = "path/to/document.pdf"
result = converter.convert(source)

# Access structured content
document = result.document
for block in document.blocks:
    if block.is_heading:
        print(f"Heading: {block.text}")
    elif block.is_table:
        # Tables are structured
        for row in block.table.rows:
            cells = [c.text for c in row.cells]
            print(cells)
    elif block.is_paragraph:
        print(f"Para: {block.text}")

# Markdown export preserves structure
markdown_text = result.document.export_to_markdown()
print(markdown_text)

When to Use Each

Use PyPDF2 when you only need raw text quickly, pdfplumber when tables matter and you want a lightweight local dependency, Docling when layout fidelity justifies the extra compute, and LlamaParse when you prefer a managed API (it also handles scanned PDFs well).

03 — Web

Web & HTML Ingestion

Extracting content from web pages requires dealing with JavaScript rendering, navigation menus, ads, and other boilerplate. Simple HTML parsing often pulls too much noise.

Extraction Strategies

| Tool | JS rendering | Boilerplate removal | Use case |
|---|---|---|---|
| BeautifulSoup | No | Manual (CSS selectors) | Static HTML, known structure |
| Trafilatura | No | Automatic | Articles, blog posts |
| Jina Reader API | Yes | Automatic | Any URL, simple API |
| Selenium/Playwright | Yes | Manual | Complex SPAs, custom logic |
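With BeautifulSoup, boilerplate removal is manual: you select the content container with a CSS selector and ignore navigation, footers, and ads. A minimal sketch (the `article#content` selector is hypothetical and site-specific):

```python
from bs4 import BeautifulSoup

html = """
<html><body>
  <nav>Home | About</nav>
  <article id="content"><h1>Title</h1><p>Body text.</p></article>
  <footer>Copyright 2024</footer>
</body></html>
"""

soup = BeautifulSoup(html, "html.parser")
# Select only the article node; nav and footer boilerplate are skipped
article = soup.select_one("article#content")
text = article.get_text(" ", strip=True)
```

This works well when you control or know the page structure; for arbitrary pages, Trafilatura's automatic extraction is usually the better trade.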

Using Trafilatura

import trafilatura

# Fetch and parse
url = "https://example.com/article"
downloaded = trafilatura.fetch_url(url)
result = trafilatura.extract(
    downloaded,
    include_comments=False,
    favor_precision=True
)

# Get clean text
print(result)

# Or use LangChain
from langchain.document_loaders import UnstructuredURLLoader

loader = UnstructuredURLLoader(urls=[url])
docs = loader.load()
print(docs[0].page_content)

Jina Reader API

For maximum simplicity, Jina Reader converts any URL to markdown with a single API call:

import httpx

response = httpx.get(
    "https://r.jina.ai/https://example.com/article"
)
markdown_content = response.text
print(markdown_content)
04 — Structured Sources

Database & API Sources

Databases and APIs are already structured. The challenge is scaling: how do you sync large datasets? How do you detect changes?

Database Connectors

from sqlalchemy import create_engine
from langchain.document_loaders import SQLDatabaseLoader

# Create engine
engine = create_engine(
    "postgresql://user:pass@localhost/mydb"
)

# Load from table
loader = SQLDatabaseLoader(
    engine=engine,
    query="SELECT id, title, content FROM articles"
)
documents = loader.load()

# Each row becomes a document
for doc in documents:
    print(doc.page_content)
    print(doc.metadata)  # includes column values

REST API Ingestion

For APIs, handle pagination and rate limiting. Store the last_updated timestamp to detect changes:

import httpx
from datetime import datetime

class APIIngester:
    def __init__(self, base_url, api_key):
        self.base_url = base_url
        self.api_key = api_key
        self.last_sync = None

    def fetch_paginated(self, endpoint, params=None):
        """Fetch all pages of results."""
        documents = []
        params = dict(params or {})  # copy so the caller's dict is untouched
        page = 1
        while True:
            params['page'] = page
            response = httpx.get(
                f"{self.base_url}/{endpoint}",
                params=params,
                headers={'Authorization': f'Bearer {self.api_key}'}
            )
            data = response.json()
            if not data.get('results'):
                break
            documents.extend(data['results'])
            page += 1
        return documents

    def sync_incremental(self, endpoint):
        """Sync only documents changed since the last run."""
        params = {}
        if self.last_sync:
            params['updated_after'] = self.last_sync.isoformat()
        docs = self.fetch_paginated(endpoint, params)
        self.last_sync = datetime.now()
        return docs

ingester = APIIngester(
    "https://api.example.com",
    api_key="sk-..."
)
articles = ingester.sync_incremental("articles")
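The fetch loop above does not yet back off when the API rate-limits a request. A minimal retry wrapper with exponential backoff, assuming your HTTP client surfaces HTTP 429 as an exception (`RateLimitError` here is a stand-in for that exception, not a real httpx class):

```python
import time

class RateLimitError(Exception):
    """Stand-in for whatever your HTTP client raises on HTTP 429."""

def with_retries(call, max_attempts=5, base_delay=1.0):
    """Retry a callable, backing off exponentially on rate-limit errors."""
    for attempt in range(max_attempts):
        try:
            return call()
        except RateLimitError:
            if attempt == max_attempts - 1:
                raise  # out of attempts, let the caller handle it
            # Sleep 1x, 2x, 4x, ... the base delay between attempts
            time.sleep(base_delay * 2 ** attempt)
```

Each page fetch would then be wrapped as `with_retries(lambda: fetch_page(page))` so a burst of 429s slows the sync down instead of failing it.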
05 — Cleaning

Document Normalization

After extraction, normalize the content. Remove junk, fix encoding, standardize whitespace, detect sections and metadata.

Normalization Checklist

import re
import unicodedata
from html import unescape

def normalize_document(text):
    """Normalize extracted text."""
    # Decode HTML entities
    text = unescape(text)
    # Collapse excess whitespace
    text = re.sub(r'\n\s*\n+', '\n\n', text)
    text = re.sub(r'[ ]{2,}', ' ', text)
    # Normalize unicode (full-width characters, ligatures, etc.)
    text = unicodedata.normalize('NFKC', text)
    return text.strip()

def extract_sections(text):
    """Split by headers, preserve structure."""
    sections = []
    current_section = {"title": "Introduction", "content": []}
    for line in text.split('\n'):
        # Detect headers (lines in ALL CAPS or starting with #)
        if line.startswith('#') or line.isupper():
            sections.append(current_section)
            current_section = {
                "title": line.strip('# '),
                "content": []
            }
        else:
            current_section["content"].append(line)
    sections.append(current_section)
    return sections

text = "Messy\n\n\nHTML text   with &nbsp; stuff"
clean = normalize_document(text)
sections = extract_sections(clean)
06 — Scaling

Incremental & Scheduled Updates

Most RAG systems need fresh data. Reprocessing everything weekly is wasteful. Detect changes and only process deltas.

Change Detection Patterns

| Pattern | Mechanism | Latency | Cost |
|---|---|---|---|
| Timestamps | Track last_modified, check before processing | Minutes | Low |
| Fingerprinting | Hash content, compare to stored hash | Minutes | Low |
| Change Data Capture | Database triggers emit change events | Seconds | Medium |
| Webhooks | Source notifies on updates | Seconds | Low (source handles it) |
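The fingerprinting pattern needs nothing beyond `hashlib`: hash the normalized content and reindex only when the hash changes. A minimal sketch (the in-memory `seen` dict stands in for a real metadata store):

```python
import hashlib

def fingerprint(text: str) -> str:
    """Stable content hash; whitespace is normalized so trivial edits don't trigger."""
    canonical = " ".join(text.split())
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

# Maps document id -> fingerprint at last indexing time
seen = {}

def needs_reindex(doc_id: str, text: str) -> bool:
    """Return True (and record the new fingerprint) only if content changed."""
    fp = fingerprint(text)
    if seen.get(doc_id) == fp:
        return False
    seen[doc_id] = fp
    return True
```

In production the fingerprints would live next to the vector index's document metadata, so the check and the update stay atomic.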

Incremental Ingestion with Airflow

Use Airflow or Prefect to orchestrate scheduled ingestion with change detection:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def fetch_docs(**context):
    """Fetch documents modified since the last successful run."""
    # 'last_update' is written to XCom by a bookkeeping step (not shown)
    last_run = context['task_instance'].xcom_pull(
        task_ids='mark_success', key='last_update'
    )
    # api is your source client; fetch only newer docs
    docs = api.fetch(updated_after=last_run)
    return [d['id'] for d in docs]

def process_docs(**context):
    """Process and index the docs fetched upstream."""
    # The fetch task's return value is available via XCom
    doc_ids = context['task_instance'].xcom_pull(task_ids='fetch')
    for doc_id in doc_ids:
        doc = api.get(doc_id)
        content = extract_text(doc)
        index_doc(content)

default_args = {
    'owner': 'data',
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
}

with DAG('ingestion_pipeline',
         default_args=default_args,
         schedule_interval='@daily',
         catchup=False) as dag:
    fetch = PythonOperator(task_id='fetch', python_callable=fetch_docs)
    process = PythonOperator(task_id='process', python_callable=process_docs)
    fetch >> process
07 — Integration

Pipeline Orchestration

Most teams start with LangChain or LlamaIndex document loaders (simple, limited). As complexity grows, move to dedicated data pipelines (Airflow, Prefect, Dagster).

LangChain Document Loaders

Good for getting started. Abstracts different sources under a common interface:

from langchain.document_loaders import (
    PyPDFLoader,
    UnstructuredURLLoader,
    CSVLoader,
    DirectoryLoader
)

# Single file
pdf_loader = PyPDFLoader("document.pdf")
docs = pdf_loader.load()

# Directory of files
dir_loader = DirectoryLoader(
    "./documents",
    glob="**/*.pdf",
    loader_cls=PyPDFLoader
)
docs = dir_loader.load()

# URLs
url_loader = UnstructuredURLLoader(
    urls=["https://example.com"]
)
docs = url_loader.load()

# Recursive directory load
loader = DirectoryLoader(
    "./docs",
    loader_cls=PyPDFLoader,
    recursive=True
)
documents = loader.load()
print(f"Loaded {len(documents)} documents")

LlamaIndex Readers

LlamaIndex offers a similar abstraction. Pick whichever fits your setup better.

⚠️ Start simple, scale later. Use LangChain loaders until you hit limits (performance, custom sources). Then move to Airflow/Prefect.
08 — Ecosystem

Ingestion Tools

Docling
PDF Parsing
State-of-the-art PDF extraction with layout analysis. Handles tables, images, complex layouts.
Unstructured.io
Document Processing
Unified API for PDFs, images, documents. Self-hosted and cloud options.
LlamaParse
PDF Parsing API
Commercial PDF parsing. Excellent for scanned PDFs and complex layouts.
Trafilatura
Web Extraction
Extract article text from HTML. Automatic boilerplate removal.
Apache Airflow
Workflow Orchestration
Industry-standard DAG-based pipeline orchestration. Mature, scalable.
Prefect
Workflow Orchestration
Modern alternative to Airflow. Easier syntax, better debugging.
pdfplumber
PDF Extraction
Python library for PDF tables and text. Lightweight, good for most PDFs.
LangChain Loaders
Document Loading
Unified interface for 100+ document types. Great for getting started.