01 — Context
Source Types & Challenges
RAG systems need to ingest data from diverse sources. Each format has different challenges: PDFs lose structure, web pages contain boilerplate, databases are large, APIs are transient. A good ingestion pipeline abstracts these differences.
| Source | Extraction difficulty | Structure loss | Update frequency | Best for |
|---|---|---|---|---|
| PDF | High (tables, images) | Significant | Quarterly | Technical docs, papers, reports |
| HTML/Web | Medium (JS rendering) | Boilerplate noise | Daily | Help centers, blogs, documentation |
| DOCX/XLSX | Low (structured) | Minor | Monthly | Internal docs, spreadsheets |
| Database | Low (queryable) | None | Real-time | Customer data, transactions, logs |
| API/Email | Low (structured JSON) | None | Streaming | Live events, messages, feeds |
💡 Start with the highest-value source first, usually documentation or an internal knowledge base. Once that's working, add others incrementally.
02 — PDFs
PDF Extraction
PDF parsing is the most common ingestion task but also the hardest. Libraries range from simple text extraction (loses structure) to intelligent layout analysis (preserves structure).
Library Comparison
| Library | Text extraction | Tables | Layout | Speed |
|---|---|---|---|---|
| PyPDF2 | Basic | Difficult | Lost | Fast |
| pdfplumber | Good | Excellent | Preserved | Medium |
| Docling | Excellent | Excellent | Excellent | Slow |
| LlamaParse | Excellent | Excellent | Excellent | API-based |
Docling Example
Docling is the newest of these libraries and a strong default choice: it uses deep learning for layout analysis and handles complex PDFs (tables, images, equations) well.
```python
from docling.document_converter import DocumentConverter

# Initialize converter
converter = DocumentConverter()

# Convert PDF
source = "path/to/document.pdf"
result = converter.convert(source)

# Access structured content
document = result.document
for block in document.blocks:
    if block.is_heading:
        print(f"Heading: {block.text}")
    elif block.is_table:
        # Tables keep their row/cell structure
        for row in block.table.rows:
            cells = [c.text for c in row.cells]
            print(cells)
    elif block.is_paragraph:
        print(f"Para: {block.text}")

# Markdown export preserves structure
markdown_text = result.document.export_to_markdown()
print(markdown_text)
```
When to Use Each
- PyPDF2: Simple text extraction only. Fast for large batches.
- pdfplumber: Tables are important and PDFs are mostly clean.
- Docling: Production use. Complex PDFs. Layout is crucial. Best for knowledge graphs.
- LlamaParse: Hosted, commercial service for when you can't (or don't want to) process locally. Strong on complex academic papers and scanned PDFs.
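The rules of thumb above can be folded into a small dispatch helper. This is purely illustrative (the function and parameter names are made up, and it returns a library name rather than calling anything):

```python
def pick_pdf_library(complex_layout=False, needs_tables=False, local_only=True):
    """Map the rules of thumb onto a PDF library choice."""
    if complex_layout and not local_only:
        return "LlamaParse"   # hosted, strongest on scanned/academic PDFs
    if complex_layout:
        return "Docling"      # local deep-learning layout analysis
    if needs_tables:
        return "pdfplumber"   # good tables on mostly clean PDFs
    return "PyPDF2"           # fast plain-text extraction for big batches

print(pick_pdf_library(needs_tables=True))    # pdfplumber
print(pick_pdf_library(complex_layout=True))  # Docling
```

In a real pipeline the same decision might route each document to a different parser based on a cheap first-pass inspection (page count, presence of tables, scanned vs. digital).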
03 — Web
Web & HTML Ingestion
Extracting content from web pages requires dealing with JavaScript rendering, navigation menus, ads, and other boilerplate. Simple HTML parsing often pulls too much noise.
Extraction Strategies
| Tool | JS Rendering | Boilerplate removal | Use case |
|---|---|---|---|
| BeautifulSoup | No | Manual CSS selectors | Static HTML, known structure |
| Trafilatura | No | Automatic | Articles, blog posts |
| Jina Reader API | Yes | Automatic | Any URL, simple API |
| Selenium/Playwright | Yes | Manual | Complex SPAs, custom logic |
Using Trafilatura
```python
import trafilatura

# Fetch and parse
url = "https://example.com/article"
downloaded = trafilatura.fetch_url(url)
result = trafilatura.extract(
    downloaded,
    include_comments=False,
    favor_precision=True
)

# Get clean text
print(result)

# Or use LangChain
from langchain.document_loaders import UnstructuredURLLoader

loader = UnstructuredURLLoader(urls=[url])
docs = loader.load()
print(docs[0].page_content)
```
Jina Reader API
For maximum simplicity, Jina Reader converts any URL to markdown with a single API call:
```python
import httpx

response = httpx.get(
    "https://r.jina.ai/https://example.com/article"
)
markdown_content = response.text
print(markdown_content)
```
04 — Structured Sources
Database & API Sources
Databases and APIs are already structured. The challenge is scaling: how do you sync large datasets? How do you detect changes?
Database Connectors
```python
from sqlalchemy import create_engine, text
from langchain.document_loaders import SQLDatabaseLoader

# Create engine
engine = create_engine(
    "postgresql://user:pass@localhost/mydb"
)

# Load from table
loader = SQLDatabaseLoader(
    engine=engine,
    query="SELECT id, title, content FROM articles"
)
documents = loader.load()

# Each row becomes a document
for doc in documents:
    print(doc.page_content)
    print(doc.metadata)  # includes column values
```
REST API Ingestion
For APIs, handle pagination and rate limiting. Store the timestamp of the last sync so the next run can request only what changed:
```python
import httpx
from datetime import datetime

class APIIngester:
    def __init__(self, base_url, api_key):
        self.base_url = base_url
        self.api_key = api_key
        self.last_sync = None

    def fetch_paginated(self, endpoint, params=None):
        """Fetch all pages of results."""
        documents = []
        params = dict(params or {})
        page = 1
        while True:
            params['page'] = page
            response = httpx.get(
                f"{self.base_url}/{endpoint}",
                params=params,
                headers={'Authorization': f'Bearer {self.api_key}'}
            )
            data = response.json()
            if not data.get('results'):
                break
            documents.extend(data['results'])
            page += 1
        return documents

    def sync_incremental(self, endpoint):
        """Sync only documents changed since the last run."""
        params = {}
        if self.last_sync:
            params['updated_after'] = self.last_sync.isoformat()
        docs = self.fetch_paginated(endpoint, params)
        self.last_sync = datetime.now()
        return docs

ingester = APIIngester(
    "https://api.example.com",
    api_key="sk-..."
)
articles = ingester.sync_incremental("articles")
```
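The class above handles pagination but not rate limiting. A generic retry-with-exponential-backoff wrapper could sit around each request; this is a sketch (the exception class and function names are hypothetical, and `sleep` is injectable so the logic is testable):

```python
import time

class RateLimitedError(Exception):
    """Raised by the caller's request function on an HTTP 429."""

def with_backoff(fn, max_retries=5, base_delay=1.0, sleep=time.sleep):
    """Run fn(), retrying with doubling delays when rate-limited."""
    delay = base_delay
    for attempt in range(max_retries):
        try:
            return fn()
        except RateLimitedError:
            if attempt == max_retries - 1:
                raise               # give up after the final attempt
            sleep(delay)            # wait before retrying
            delay *= 2              # 1s, 2s, 4s, ...

# Simulated flaky endpoint: rate-limited twice, then succeeds
calls = {"n": 0}
def fake_request():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RateLimitedError()
    return {"results": ["doc"]}

delays = []
result = with_backoff(fake_request, sleep=delays.append)
print(result, delays)  # {'results': ['doc']} [1.0, 2.0]
```

In `fetch_paginated`, each `httpx.get` call would be wrapped as `with_backoff(lambda: ...)`, raising `RateLimitedError` when `response.status_code == 429`.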
05 — Cleaning
Document Normalisation
After extraction, normalize the content. Remove junk, fix encoding, standardize whitespace, detect sections and metadata.
Normalization Checklist
- Whitespace: Replace multiple spaces/newlines with singles.
- Encoding: Fix unicode errors, handle different charsets.
- HTML entities: Convert `&nbsp;` → space, `&lt;` → `<`, and so on.
- Sections: Detect headers, split into chunks, preserve hierarchy.
- Metadata: Extract title, date, author, source URL.
- Code blocks: Preserve indentation, syntax highlighting hints.
```python
import re
from html import unescape

def normalize_document(text):
    """Normalize extracted text."""
    # Decode HTML entities
    text = unescape(text)
    # Collapse excess whitespace
    text = re.sub(r'\n\s*\n+', '\n\n', text)
    text = re.sub(r'[ ]{2,}', ' ', text)
    # Drop characters that can't be encoded (e.g. lone surrogates)
    text = text.encode('utf-8', errors='ignore').decode('utf-8')
    return text.strip()

def extract_sections(text):
    """Split by headers, preserve structure."""
    sections = []
    current_section = {"title": "Introduction", "content": []}
    for line in text.split('\n'):
        # Detect headers (lines in ALL CAPS or starting with #)
        if line.startswith('#') or line.isupper():
            sections.append(current_section)
            current_section = {
                "title": line.strip('# '),
                "content": []
            }
        else:
            current_section["content"].append(line)
    sections.append(current_section)
    return sections

text = "Messy\n\n\nHTML text with&nbsp;stuff"
clean = normalize_document(text)
sections = extract_sections(clean)
```
06 — Scaling
Incremental & Scheduled Updates
Most RAG systems need fresh data. Reprocessing everything weekly is wasteful. Detect changes and only process deltas.
Change Detection Patterns
| Pattern | Mechanism | Latency | Cost |
|---|---|---|---|
| Timestamps | Track last_modified, check before processing | Minutes | Low |
| Fingerprinting | Hash content, compare to stored hash | Minutes | Low |
| Change Data Capture | Database triggers emit change events | Seconds | Medium |
| Webhooks | Source notifies on updates | Seconds | Low (source handles it) |
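The fingerprinting row needs nothing beyond the standard library. A sketch (the helper names are illustrative, and the in-memory dict stands in for a persistent hash store such as a database table):

```python
import hashlib

def fingerprint(text):
    """Stable content hash; whitespace-normalised so trivial edits don't count as changes."""
    canonical = " ".join(text.split())
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

def changed_docs(docs, seen):
    """Return ids whose content hash differs from the stored one.

    docs maps doc id -> text; seen is the persisted id -> hash store,
    updated in place as changes are detected.
    """
    changed = []
    for doc_id, text in docs.items():
        fp = fingerprint(text)
        if seen.get(doc_id) != fp:
            changed.append(doc_id)
            seen[doc_id] = fp
    return changed

seen = {}
print(changed_docs({"a": "hello world", "b": "unchanged"}, seen))   # ['a', 'b']
print(changed_docs({"a": "hello  world", "b": "now different"}, seen))  # ['b']
```

Only the documents returned by `changed_docs` need to be re-chunked, re-embedded, and re-indexed; everything else is skipped.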
Incremental Ingestion with Airflow
Use Airflow or Prefect to orchestrate scheduled ingestion with change detection:
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

# api, extract_text, and index_doc are placeholders for your own helpers.

def fetch_docs(**context):
    """Fetch documents modified since the last scheduled run."""
    last_run = context['data_interval_start']
    # Fetch only newer docs
    docs = api.fetch(updated_after=last_run)
    return [d['id'] for d in docs]  # pushed to XCom for downstream tasks

def process_docs(**context):
    """Process and index new docs."""
    doc_ids = context['task_instance'].xcom_pull(task_ids='fetch')
    for doc_id in doc_ids:
        doc = api.get(doc_id)
        content = extract_text(doc)
        index_doc(content)

default_args = {
    'owner': 'data',
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
}

with DAG('ingestion_pipeline', default_args=default_args,
         schedule_interval='@daily', catchup=False) as dag:
    fetch = PythonOperator(task_id='fetch', python_callable=fetch_docs)
    process = PythonOperator(task_id='process', python_callable=process_docs)
    fetch >> process
```
07 — Integration
Pipeline Orchestration
Most teams start with LangChain or LlamaIndex document loaders (simple, limited). As complexity grows, move to dedicated data pipelines (Airflow, Prefect, Dagster).
LangChain Document Loaders
Good for getting started. Abstracts different sources under a common interface:
```python
from langchain.document_loaders import (
    PyPDFLoader,
    UnstructuredURLLoader,
    CSVLoader,
    DirectoryLoader
)

# Single file
pdf_loader = PyPDFLoader("document.pdf")
docs = pdf_loader.load()

# Directory of files
dir_loader = DirectoryLoader(
    "./documents",
    glob="**/*.pdf",
    loader_cls=PyPDFLoader
)
docs = dir_loader.load()

# URLs
url_loader = UnstructuredURLLoader(
    urls=["https://example.com"]
)
docs = url_loader.load()

# Recursive directory load
loader = DirectoryLoader(
    "./docs",
    loader_cls=PyPDFLoader,
    recursive=True
)
documents = loader.load()
print(f"Loaded {len(documents)} documents")
```
LlamaIndex Readers
LlamaIndex offers a similar abstraction. Pick whichever fits your setup better.
⚠️ Start simple, scale later. Use LangChain loaders until you hit their limits (performance, custom sources), then move to Airflow or Prefect.