Building a Substack Articles Search Engine: Lesson 1
Setup, configuration and articles ingestion in Supabase Postgres database.
You’re drowning in AI newsletters. Between
’s deep dives on AI systems, ’s explorations of agentic architectures, and ’s practical small LLM insights, the knowledge is invaluable—but scattered across multiple Substack publications sitting in your inbox.Manual searching fails the moment you need to find that article about RAG evaluation metrics or compare different perspectives on fine-tuning strategies. Basic keyword search misses semantic connections—searching for neural networks won’t surface articles about transformers even though they’re deeply related. And let’s be honest: your email client wasn’t designed for knowledge management.
Here’s what we’re building instead: a production-grade Retrieval-Augmented Generation (RAG) system that automatically ingests, indexes, and enables intelligent search across your favorite Substack newsletters. The system offers two search modes—direct article title filtering and LLM-powered semantic search that actually understands what you’re asking about.
The complete code lives on GitHub, and here’s the best part—the entire system runs on free tiers of Supabase, Qdrant, Prefect Cloud, and Google Cloud Run. You can experiment with it or run it on a small scale in production without spending a cent.
Course Outline
This course has been developed together with Miguel Otero Pedrido, the founder of The Neural Maze, who after releasing the last lesson, will show in a video an overview of the application and showcase the entire pipeline.
It is structured in 5 lessons:
Setup, Configuration and Articles Ingestion - Today’s lesson
Vector Embeddings and Hybrid Search Infrastructure (21.10.2025)
FastAPI Backend, Streaming and Multi-Provider LLM Support (23.10.2025)
Cloud Run Deployment and Gradio UI (28.10.2025)
Video Application Overview (29.10.2025)
What You End Up Building
Once the core pipeline is running, you can extend it in interesting ways. The repository includes a Gradio app for quick local testing, but I also built a full React production application (find it at substack.martindatasol.com) that demonstrates how to create a polished user interface. You can try building something similar once you’ve finished this course.
Here’s the Gradio version for local development that can be also deployed in production:
And here’s the production React frontend:
Ready to dive in? Let’s get started!
System Architecture: Division of Responsibilities
Before we dive into code, let’s understand how the components work together. The architecture follows a clear separation of concerns—each piece handles one specific job, making the system easier to understand, debug, and extend.
RSS Feeds (30+ newsletters)
↓
Prefect Orchestration
↓
┌────────────────┐
│ Supabase │ ← Structured metadata
│ PostgreSQL │ (articles, authors, URLs)
└────────────────┘
↓
Text Chunking
↓
┌────────────────┐
│ Qdrant │ ← Vector embeddings
│ Vector Database│ (hybrid search)
└────────────────┘
↓
┌────────────────┐
│ FastAPI │ ← Query interface
│ Backend API │ (multi-provider LLM)
└────────────────┘
↓
┌────────────────┐
│ Gradio UI / │ ← User interface
│ Frontend │ (Filtered Search)
└────────────────┘Let’s walk through what each component does and why it matters.
Supabase PostgreSQL stores the structured metadata—article titles, URLs, publication dates, author information. This becomes your source of truth for tracking what’s been ingested.
Qdrant Vector Database handles the semantic search magic through hybrid dense and sparse embeddings.
Prefect orchestrates the entire pipeline. Fetched RSS articles, manages parallel processing across multiple feeds, and handles incremental updates intelligently.
FastAPI provides the query interface with support for multiple LLM providers (OpenRouter, OpenAI, HuggingFace). It handles both streaming and non-streaming responses.
Gradio UI offers rapid prototyping for testing queries.
Google Cloud Run hosts the FastAPI backend, providing automatic scaling and deployment.
This separation makes the codebase maintainable—adding a new data source means creating new tasks in the pipelines directory without touching infrastructure or API code. The pattern scales well as your system grows.
Project Structure: Navigating the Codebase
The repository organizes code by functional responsibility, making it straightforward to locate specific features and understand dependencies:
substack-newsletters-search-pipeline/
├── .env.example # Environment variables template
├── .github/ # GitHub configuration and CI/CD workflows
├── Dockerfile # Dockerfile for FastAPI app
├── Makefile # Automation commands
├── README.md # Project Readme
├── cloudbuild_fastapi.yaml # Google Cloud Build config for FastAPI
├── deploy_fastapi.sh # Script to deploy FastAPI to Cloud Run
├── prefect-cloud.yaml # Prefect Cloud deployment
├── prefect-local.yaml # Prefect local deployment
├── pyproject.toml # Python dependencies
├── requirements.txt # Prefect deployment deps
├── frontend/ # Gradio UI
├── src/
│ ├── api/ # FastAPI application
│ │ ├── exceptions/ # Error handlers
│ │ ├── middleware/ # Logging middleware
│ │ ├── models/ # API schemas
│ │ ├── routes/ # Endpoint definitions
│ │ ├── services/ # Business logic
│ │ └── main.py # App entry point
│ ├── config.py # Centralized settings
│ ├── configs/ # Newsletter sources
│ ├── models/ # Pydantic/SQLAlchemy models
│ ├── infrastructure/ # Infrastructure integrations
│ │ ├── supabase/ # Database setup
│ │ └── qdrant/ # Vector store setup
│ ├── pipelines/ # Prefect flows and tasks
│ │ ├── flows/ # Prefect workflows
│ │ └── tasks/ # Prefect tasks
│ └── utils/ # Logging and text splitter utils
└── tests/ # Tests
├── conftest.py # Pytest configuration
├── integration/ # Integration tests (DB, pipeline)
└── unit/ # Unit testsThis organization keeps infrastructure concerns (database connections, vector store setup) separate from business logic (RSS parsing, embedding generation) and API endpoints (query handling, response formatting). When you need to add a new feature, the structure tells you exactly where it belongs.
Automation with Makefile
Before we get into implementation details, let’s talk about the project’s automation layer. The Makefile serves as a command center for all common operations—database setup, data ingestion, deployment, testing, and code quality checks.
Rather than remembering complex command sequences with multiple flags and environment variables, you run simple commands like make supabase-create or make ingest-embeddings-flow.
Here are the key command categories you’ll use throughout development:
# Supabase
supabase-create # Create Supabase database
supabase-delete # Delete Supabase database
# Qdrant
qdrant-create-collection # Create Qdrant collection
qdrant-delete-collection # Delete Qdrant collection
qdrant-create-index # Create Qdrant index
qdrant-ingest-from-sql # Ingest data from SQL to Qdrant
# Prefect flows
ingest-rss-articles-flow # Ingest RSS articles flow
ingest-embeddings-flow # Ingest embeddings flow
# Prefect deployment
deploy-cloud-flows # Deploy Prefect flows to Prefect Cloud
deploy-local-flows # Deploy Prefect flows to Prefect Local Server
# Run services
run-api # Run FastAPI application
run-gradio # Run Gradio application
# Quality checks
all-check # Run all: linting, formatting and type checking
all-fix # Run all fix: auto-formatting and linting fixes
clean # Clean up cached generated filesThroughout the course, I’ll reference these commands rather than showing raw Python invocations—it’s more practical when working with the actual codebase.
If you’re enjoying this content, consider joining the AI Echoes community. Your support helps this newsletter expand and reach a wider audience.
Configuration Management
For the development of the project, I used Pydantic Settings for type-safe configuration with environment variable support. This approach provides compile-time validation and clear documentation of required settings, catching configuration errors before they cause runtime failures.
NOTE: If you’re new to Pydantic, I wrote a three-part series covering how it works in detail: Part 1, Part 2, and Part 3. For now, let’s focus on how we’re using it here.
Here’s the core configuration structure:
# src/config.py
class Settings(BaseSettings):
supabase_db: SupabaseDBSettings = Field(default_factory=SupabaseDBSettings)
qdrant: QdrantSettings = Field(default_factory=QdrantSettings)
rss: RSSSettings = Field(default_factory=RSSSettings)
rss_config_yaml_path: str = “src/configs/feeds_rss.yaml”
# Pydantic v2 model config
model_config: ClassVar[SettingsConfigDict] = SettingsConfigDict(
env_file=[”.env”],
env_file_encoding=”utf-8”,
extra=”ignore”,
env_nested_delimiter=”__”,
case_sensitive=False,
frozen=True,
)The BaseSettings class from Pydantic automatically validates types and loads values from environment variables. But there’s a clever trick here that makes environment variable management more intuitive—the double underscore delimiter for nested configurations.
Instead of creating dozens of flat variable names like SUPABASE_HOST, SUPABASE_PASSWORD, QDRANT_API_KEY, QDRANT_URL, you can express hierarchical settings naturally:
# .env
SUPABASE_DB__HOST=db.your-project.supabase.co
SUPABASE_DB__PASSWORD=your-secure-password
QDRANT__API_KEY=your-qdrant-key
QDRANT__URL=https://your-cluster.qdrant.ioWhen the Settings class loads these variables (via env_nested_delimiter=”__”), it automatically maps SUPABASE_DB__HOST to settings.supabase_db.host. You get clean namespace separation without verbose variable names, and the structure matches your Python code organization.
Newsletter sources live in a YAML configuration file, which is easier to edit and version control than hardcoding them in Python:
# src/configs/feeds_rss.yaml
feeds:
- name: “AI Echoes”
author: “Benito Martin”
url: “https://aiechoes.substack.com/feed”
- name: “Decoding ML”
author: “Paul Iusztin”
url: “https://decodingml.substack.com/feed”
- name: “The Neural Maze”
author: “Miguel Otero”
url: “https://theneuralmaze.substack.com/feed”The Settings class loads these feeds automatically during initialization through a Pydantic validator, making them accessible throughout the codebase via settings.rss.feeds. This pattern keeps your data sources separate from code—you can modify the feed list without touching Python files, and version control diffs clearly show which feeds were added or removed.
Supabase: Structured Metadata Storage
With the database schema in place and ready to store articles, we can turn our attention to the most critical part of the pipeline—actually getting content into it. This is where theory meets messy reality.
The Supabase schema needs to balance two competing concerns: normalization principles that reduce data redundancy, and query performance that keeps searches fast.
# src/models/sql_models.py
class SubstackArticle(Base):
__tablename__ = settings.supabase_db.table_name
# Primary internal ID
id: Mapped[int] = mapped_column(BigInteger, primary_key=True, index=True)
# External unique identifier
uuid: Mapped[UUID] = mapped_column(
PG_UUID(as_uuid=True),
default=uuid.uuid4,
unique=True,
nullable=False,
index=True,
)
# Article metadata
feed_name: Mapped[str] = mapped_column(String, nullable=False)
feed_author: Mapped[str] = mapped_column(String, nullable=False)
article_authors: Mapped[list[str]] = mapped_column(ARRAY(String)
title: Mapped[str] = mapped_column(String, nullable=False)
url: Mapped[str] = mapped_column(String, unique=True, nullable=False)
content: Mapped[str] = mapped_column(Text, nullable=False)
published_at: Mapped[str] = mapped_column(TIMESTAMP, nullable=False)
created_at: Mapped[str] = mapped_column(
TIMESTAMP, server_default=func.now(), nullable=False
)Let me walk you through some design choices that might not be immediately obvious.
The dual identifier system—auto-incrementing id plus uuid—serves different purposes. The integer ID provides efficient ordering for internal operations like batch processing, while the UUID enables external references and distributed systems. More importantly, this makes the ingestion pipeline idempotent—you can safely retry operations because UUIDs remain stable across attempts.
The article_authors field uses PostgreSQL’s native array support to handle newsletters with multiple contributors. This avoids creating a separate authors table and article_authors junction table, which would add complexity for what’s essentially a simple many-to-one relationship in our use case.
Notice we’re tracking two different timestamps. The published_at timestamp comes directly from the RSS feed and represents when the author originally published the content. The created_at timestamp records when your system ingested the article. This distinction helps you identify ingestion delays (articles published days ago but only recently ingested) or gaps in your pipeline (missing articles from a specific date range). When debugging, knowing both timestamps is invaluable.
The URL uniqueness constraint serves as our primary deduplication mechanism. RSS feeds sometimes contain the same article multiple times due to updates, corrections, or republishing. The database enforces uniqueness at this level, causing insertion attempts for duplicate URLs to fail gracefully rather than creating duplicate records.
With this schema defined, creating the database is straightforward:
make supabase-createThis command runs the create_db.py script, which checks for existing tables before creating new ones:
# src/infrastructure/supabase/create_db.py
def create_table() -> None:
engine = init_engine()
try:
inspector = inspect(engine)
existing_tables = inspector.get_table_names()
table_name = SubstackArticle.__tablename__
if table_name in existing_tables:
logger.info(f”Table ‘{table_name}’ already exists. Skipping.”)
else:
Base.metadata.create_all(bind=engine)
logger.info(f”Table ‘{table_name}’ created successfully.”)
finally:
engine.dispose()RSS Parsing: Handling Real-World Messiness
Once we have successfully created our database it is time to define the Prefect tasks that will orchestrate in one flow the ingestion of the newsletter articles into the database. We will be using two task: RSS parsing and batch ingestion.
RSS feeds from Substack contain XML content that needs conversion to clean Markdown for better chunking and embedding quality. But the real challenge isn’t the happy path—it’s the edge cases that break naive parsers. Paywalled content, missing fields, and malformed XML all lurk in production RSS feeds.
One issue I encountered during development was paywalled content. I wanted to focus on free articles, but some newsletters include only article excerpts in their RSS feeds, adding a Read more link pointing back to the full article on Substack. If you ingest these blindly, you end up with chunks full of This post is for paying subscribers only... messages that pollute your search results and waste embedding tokens.
Here’s how we catch these before they poison the database:
# src/pipelines/tasks/fetch_rss.py
@task(task_run_name=”fetch_rss_entries-{feed.name}”)
def fetch_rss_entries(
feed: FeedItem,
engine: Engine,
article_model: type[SubstackArticle] = SubstackArticle,
) -> list[ArticleItem]:
# ... fetching and parsing logic ...
for _, item in enumerate(rss_items):
try:
link = item.find(”link”).get_text(strip=True) if item.find(”link”) else “”
if not link or session.query(article_model).filter_by(url=link).first():
logger.info(f”Skipping already stored or empty-link from {feed.name}”)
continue
# 🚨 Skip articles with self-referencing “Read more” links
if raw_html:
try:
html_soup = BeautifulSoup(raw_html, “html.parser”)
for a in html_soup.find_all(”a”, href=True):
if (
a[”href”].strip() == link
and “read more” in a.get_text(strip=True).lower()
):
raise StopIteration # skip this item
except StopIteration:
continueThis check looks for anchor tags where the href matches the article’s own URL, and the link text contains read more. When both conditions match, the article is likely a paywall excerpt rather than full content. This simple heuristic filtered out 15-20% of articles during testing—a significant improvement in content quality.
Once you’ve confirmed the content is genuine, converting HTML to Markdown requires careful configuration of the markdownify library:
content_md = md(
raw_html,
strip=[”script”, “style”], # Remove code and CSS
heading_style=”ATX”, # Use # syntax for headers
bullets=”*”, # Consistent bullet points
autolinks=True, # Convert URLs to links
)
# Clean up extra whitespace
content_md = “\n”.join(
line.strip() for line in content_md.splitlines() if line.strip()
)The strip parameter removes JavaScript and CSS that sometimes leaks into RSS feed content. Without this, you’d see chunks of code or style definitions mixed into your article text—not great for semantic search. The heading_style=”ATX” option ensures consistent header formatting using # symbols rather than underlines, making the Markdown easier to chunk cleanly. The whitespace cleanup pass removes blank lines and excessive indentation that waste tokens during embedding.
Not all RSS feeds are well-formed, so defensive programming prevents database constraint violations:
# Handle missing authors gracefully
author_elem = item.find(”creator”) or item.find(”dc:creator”)
author = (
author_elem.get_text(strip=True)
if author_elem else feed.author
)
# Parse publication date with fallback
pub_date_elem = item.find(”pubDate”)
pub_date_str = (
pub_date_elem.get_text(strip=True)
if pub_date_elem else None
)
article_item = ArticleItem(
feed_name=feed.name,
feed_author=feed.author,
title=title,
url=link,
content=content_md,
article_authors=[author] if author else [],
published_at=pub_date_str,
)
items.append(article_item)These fallback strategies keep the pipeline running even when individual articles have problems. If an article lacks an explicit author, you use the feed author as a reasonable default. If the publication date is missing, you log a warning but still process the article. If the content is empty after Markdown conversion, you skip that article entirely rather than inserting an invalid record.
logger.info(f”Fetched {len(items)} new articles for ‘{feed.name}’”)
return items
except Exception as e:
logger.error(f”Error fetching from ‘{feed.name}’: {e}”)
raise
finally:
session.close()This pattern means one misbehaving feed doesn’t crash the entire ingestion process. Prefect will retry the failed feed automatically while the others complete successfully.
Note on rate limiting: I didn’t implement explicit rate limiting for RSS fetches because the batch sizes and number of feeds (30+) didn’t generate enough requests to trigger rate limits on Substack’s servers (although this might change in the future). If you’re scaling beyond 50-100 feeds or running ingestion more frequently, you’ll want to add throttling.
Batch Ingestion: Optimizing Database Writes
Now that we’ve successfully parsed RSS feeds and filtered out paywalled content, we need an efficient strategy for writing these articles to the database. Writing them one at a time would work, but it’s slow and puts unnecessary load on Supabase. Let’s look at how batching solves this.
We’ve built all the individual pieces—database schema, RSS parsing with paywall detection, and batch insertion for efficiency. But how do these components actually work together? That’s where Prefect tasks come in, orchestrating everything into a cohesive, production-ready pipeline.
The ingestion pipeline processes articles in batches to optimize database writes while providing clear progress tracking to balance transaction overhead:
# src/pipelines/tasks/ingest_rss.py
from prefect import task
from prefect.cache_policies import NO_CACHE
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session
from src.config import settings
from src.infrastructure.supabase.init_session import init_session
from src.models.article_models import ArticleItem, FeedItem
from src.models.sql_models import SubstackArticle
from src.utils.logger_util import setup_logging
@task(
task_run_name=”batch_ingest-{feed.name}”,
description=”Ingest already parsed RSS articles in batches.”,
retries=2,
retry_delay_seconds=120,
cache_policy=NO_CACHE,
)
def ingest_from_rss(
fetched_articles: list[ArticleItem],
feed: FeedItem,
article_model: type[SubstackArticle],
engine: Engine,
) -> None:
logger = setup_logging()
rss = settings.rss
errors = []
batch: list[ArticleItem] = []
session: Session = init_session(engine)
try:
for i, article in enumerate(fetched_articles, start=1):
batch.append(article)
if len(batch) >= rss.batch_size:
batch_num = i // rss.batch_size
try:
_persist_batch(session, batch, article_model)
session.commit()
except Exception as e:
session.rollback()
errors.append(f”Batch {batch_num}”)
else:
logger.info(
f”🔁 Ingested batch {batch_num} with {len(batch)}“
f”for feed ‘{feed.name}’”
)
batch = []
# Handle leftover articles
if batch:
try:
_persist_batch(session, batch, article_model)
session.commit()
except Exception as e:
session.rollback()
logger.error(f”Failed to ingest feed ‘{feed.name}’: {e}”)
errors.append(”Final batch”)
else:
logger.info(
f”👉 Ingested batch of {len(batch)} articles for ‘{feed.name}’”
if errors:
raise RuntimeError(f”Ingestion completed with errors: {errors}”)
except Exception as e:
logger.error(f”Error in ingest_from_rss for ‘{feed.name}’: {e}”)
raise
finally:
session.close()
logger.info(f”Database session closed for feed ‘{feed.name}’”)The batch size configuration (default 30 articles) represents a sweet spot I found through testing. Smaller batches like 5-10 articles, provide more frequent progress updates but increase database load from repeated commits. Larger batches like 50-100 improve throughput but reduce granularity when errors occur—if batch 3 fails, you want to know which specific articles caused the problem without debugging 100 candidates.
Each batch commits independently. If one batch fails due to a constraint violation or network issue, the other batches still succeed. This prevents cascading failures where one problematic article blocks processing for an entire feed. When debugging, the batch number in error logs tells you exactly where to look in the article list.
Session management happens at the task level, with each feed getting its own database session created from a shared engine. This avoids connection pool exhaustion when Prefect executes 30 fetch tasks concurrently. The finally block ensures sessions close properly even if exceptions occur, preventing connection leaks that would eventually exhaust the pool and cause deadlocks.
The actual insertion uses SQLAlchemy’s bulk operations for efficiency:
def _persist_batch(
session: Session,
batch: list[ArticleItem],
article_model: type[SubstackArticle],
) -> None:
“”“Helper to bulk insert a batch of ArticleItems.”“”
rows = [
article_model(
feed_name=article.feed_name,
feed_author=article.feed_author,
title=article.title,
url=article.url,
content=article.content,
article_authors=article.article_authors,
published_at=article.published_at,
)
for article in batch
]
session.bulk_save_objects(rows)The bulk_save_objects method generates a single INSERT statement with multiple rows, significantly faster than executing individual inserts in a loop. For a batch of 30 articles, this reduces database round-trips from 30 to 1, cutting insertion time by roughly 80%. When you’re processing hundreds of articles weekly, this efficiency matters.
Orchestrating the Pipeline with Prefect Flows
At this time we’ve built the individual components—database schema, RSS parsing, batch ingestion. Now we need to orchestrate them into a cohesive pipeline. This is where Prefect flows come in, coordinating the execution of tasks while handling errors, retries, and parallel processing.
The RSS Ingestion Flow
The rss_ingest_flow coordinates the entire process of fetching articles from all configured feeds and storing them in Supabase:
# src/pipelines/flows/rss_ingestion_flow.py
@flow(
name=”rss_ingest_flow”,
flow_run_name=”rss_ingest_flow_run”,
description=”Fetch and ingest articles from RSS feeds.”,
retries=2,
retry_delay_seconds=120,
)
def rss_ingest_flow(article_model: type[SubstackArticle] = SubstackArticle) -> None:
logger = setup_logging()
engine = init_engine()
errors = []
# Tracking counters
per_feed_counts: dict[str, int] = {}
total_ingested = 0
try:
if not settings.rss.feeds:
logger.warning(”No feeds found in configuration.”)
return
feeds = [FeedItem(name=f.name, author=f.author, url=f.url) for f in settings.rss.feeds]
logger.info(f”🚀 Processing {len(feeds)} feeds concurrently...”)
# 1. Fetch articles concurrently
fetched_articles_futures = fetch_rss_entries.map(
feeds,
engine=unmapped(engine),
article_model=unmapped(article_model),
)The key pattern here is Prefect’s .map() function, which executes the fetch_rss_entries task in parallel across all feeds. The unmapped() wrapper tells Prefect that the engine and article_model parameters should be the same for all parallel executions—we don’t want 30 different database engines, just one shared across all tasks.
This parallel execution is where the pipeline’s efficiency comes from. Instead of processing 30 feeds sequentially (which could take 10-15 minutes), we process them concurrently and finish in 2-3 minutes. Each feed gets its own task with independent error handling, so one slow or failing feed doesn’t block the others.
The flow continues by processing the fetched articles:
# 2. Ingest concurrently per feed
results = []
for feed, fetched_future in zip(feeds, fetched_articles_futures, strict=False):
try:
fetched_articles = fetched_future.result()
except Exception as e:
logger.error(f”❌ Error fetching articles for feed ‘{feed.name}’: {e}”)
errors.append(f”Fetch error: {feed.name}”)
continue
if not fetched_articles:
logger.info(f”📭 No new articles for feed ‘{feed.name}’”)
per_feed_counts[feed.name] = 0
continue
try:
count = len(fetched_articles)
per_feed_counts[feed.name] = count
total_ingested += count
logger.info(f”✅ Feed ‘{feed.name}’: {count} articles ready for ingestion”)
task_result = ingest_from_rss.submit(
fetched_articles,
feed,
article_model=article_model,
engine=engine,
)
results.append(task_result)
except Exception as e:
logger.error(f”❌ Error submitting ingest_from_rss for feed ‘{feed.name}’: {e}”)
errors.append(f”Ingest error: {feed.name}”)Notice the error handling strategy—we catch exceptions from individual feeds but continue processing the rest. This resilience is crucial for production systems. If one newsletter’s RSS feed is temporarily down or malformed, you still want to ingest the other 29 feeds successfully.
The flow tracks statistics for each feed and provides a comprehensive summary at the end:
# 3. Wait for all ingestion tasks
for r in results:
try:
r.result()
except Exception as e:
logger.error(f”❌ Error in ingest_from_rss task: {e}”)
errors.append(”Task failure”)
# Summary logging
logger.info(”📊 Ingestion Summary per feed:”)
for feed_name, count in per_feed_counts.items():
logger.info(f” • {feed_name}: {count} article(s) ingested”)
logger.info(f”📝 Total ingested across all feeds: {total_ingested}”)
if errors:
raise RuntimeError(f”Flow completed with errors: {errors}”)
except Exception as e:
logger.error(f”💥 Unexpected error in rss_ingest_flow: {e}”)
raise
finally:
engine.dispose()
logger.info(”🔒 Database engine disposed.”)The summary logging gives you immediate visibility into which feeds are active and which might have issues. When you see 0 articles ingested for a feed that usually has content, you know to investigate.
Running the Flow
You can trigger the RSS ingestion flow using the Makefile command:
make ingest-rss-articles-flowThis command executes the flow locally, showing real-time progress as Prefect processes each feed. You’ll see output like:
🚀 Processing 32 feeds concurrently...
✅ Feed ‘AI Echoes’: 3 articles ready for ingestion
✅ Feed ‘Decoding ML’: 5 articles ready for ingestion
📭 No new articles for feed ‘The Neural Maze’
🔁 Ingested batch 1 with 30 articles for feed ‘AI Echoes’
...
📊 Ingestion Summary per feed:
• AI Echoes: 3 article(s) ingested
• Decoding ML: 5 article(s) ingested
• The Neural Maze: 0 article(s) ingested
📝 Total ingested across all feeds: 127
🔒 Database engine disposed.The flow automatically retries failed tasks twice with a 120-second delay between attempts (configured in the @flow decorator). This handles transient network issues or temporary API limits without manual intervention.
NOTE: in the INSTRUCTIONS.md file you will find the way to run the flow either in the cloud or locally. I recommend to start with local flows as the Prefect free cloud tier provides 8 hours per month and you might use it all if you experiment a lot at the beginning.
Conclusion and What’s Next
We’ve built the foundation of a production RAG system—type-safe configuration management with Pydantic Settings, a PostgreSQL schema that balances normalization with performance, and an RSS ingestion pipeline that handles real-world data quality issues.
The patterns we covered make this system production-ready:
Defensive RSS parsing catches paywalled content before it pollutes your search results—this alone filtered out 15-20% of articles in testing.
Batch ingestion with error isolation ensures one failing article doesn’t block an entire feed, critical when processing dozens of sources.
Idempotent database operations allow safe retries through URL uniqueness constraints and existence checks—re-running ingestion only processes new content rather than failing on duplicates.
Type-safe configuration through Pydantic Settings validates environment variables at startup, catching configuration errors before they cause runtime failures deep in the pipeline.
In the next lesson, we’ll add semantic search capabilities on top of this foundation. You’ll learn how to configure Qdrant for hybrid vector search with payload indexing for efficient filtering, generate embeddings using Fastembed (or Jina/Hugging Face as alternative), optimize bulk uploads through strategic index management, and orchestrate everything with another Prefect flow.
The complete code remains available here. Clone it, experiment with different RSS feeds, and adapt the parsing logic for your own content sources.
I don’t write just for myself—every post is meant to give you tools, ideas, and insights you can use right away.
🤝 Got feedback or topics you’d like me to cover? I’d love to hear from you. Your input shapes what comes next!







Wow, RAG for newsletters. Curious about free tier implimentation!
This is cool, bookmarked !