A lot of RAG systems are built the same way: one big function that sequentially runs query parsing, vector search, reranking, and generation, with a tangle of if-else branches in between. This works fine when the system is simple, but as you add more steps — HyDE, Multi-Query, CRAG, Self-Reflection — that big function turns into an unmaintainable ball of mud.
The Modular Pipeline design breaks each RAG step into an independent module and uses a Pipeline Engine to orchestrate them. The core idea draws from the concept of a Directed Acyclic Graph (DAG): steps are nodes, dependencies are edges, and the engine executes them in order while passing state along.
Step Structure
Each pipeline step is an object implementing a fixed interface:
interface PipelineStep {
  name: string;
  skipWhen?: (ctx: PipelineContext) => boolean;
  timeout?: number; // milliseconds, per-step timeout
  execute: (ctx: PipelineContext, env: Env) => Promise<void>;
}
A concrete step looks like this:
const hydeStep: PipelineStep = {
  name: "hyde",
  skipWhen: (ctx) => ctx.queryType !== "complex",
  timeout: 3000,
  execute: async (ctx, env) => {
    const hypoDoc = await generateHypotheticalDoc(ctx.query, env);
    ctx.hydeEmbedding = await embed(hypoDoc, env);
    ctx.trace.hyde = { generated: hypoDoc, durationMs: /* ... */ };
  },
};
skipWhen is the key to this design: each step decides for itself whether to run, so you don’t need branching logic scattered throughout the engine. The result of Query Classification (ctx.queryType) lets each step automatically take the right path.
Step Registry
All steps register with the Registry at startup, and the Engine retrieves an ordered list of steps from it:
const registry = new PipelineStepRegistry();
registry.register(semanticCacheStep); // 1
registry.register(quotaCheckStep); // 2
registry.register(toolSelectionStep); // 3
registry.register(textToSqlStep); // 4
registry.register(hydeStep); // 5
registry.register(multiQueryStep); // 6
registry.register(filterBuildStep); // 7
registry.register(embeddingStep); // 8
registry.register(hybridSearchStep); // 9
registry.register(crossEncoderStep); // 10
registry.register(mmrStep); // 11
registry.register(popularityRerankStep); // 12
registry.register(llmGenerationStep); // 13
registry.register(judgeStep); // 14
registry.register(selfReflectionStep); // 15
registry.register(guardrailsOutputStep); // 16
registry.register(memoryExtractionStep); // 17
Adding a new step only requires writing a new step object and adding one line to the Registry — no changes to the Engine itself.
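The Registry's internals aren't shown here. As a minimal sketch, assuming it only needs to preserve registration order and reject duplicate names (both are assumptions, and the PipelineContext and Env types below are placeholders), it could look like this:

```typescript
// Placeholder types so the sketch is self-contained; the real ones are richer.
type PipelineContext = { queryType: string };
type Env = Record<string, unknown>;

interface PipelineStep {
  name: string;
  skipWhen?: (ctx: PipelineContext) => boolean;
  timeout?: number;
  execute: (ctx: PipelineContext, env: Env) => Promise<void>;
}

// Hypothetical implementation: keeps steps in registration order.
class PipelineStepRegistry {
  private readonly steps: PipelineStep[] = [];

  register(step: PipelineStep): void {
    // Assumption: names must be unique because config is keyed by step name.
    if (this.steps.some((s) => s.name === step.name)) {
      throw new Error(`Step "${step.name}" is already registered`);
    }
    this.steps.push(step);
  }

  // Return a copy so callers cannot reorder the registry by accident.
  getSteps(): readonly PipelineStep[] {
    return [...this.steps];
  }
}
```

With this shape, execution order is simply the order of the register() calls, which is why the numbered comments in the list above matter.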
Pipeline Engine
The Engine’s core logic is remarkably simple:
class PipelineEngine {
  // The registry is injected, so the engine never hard-codes its steps
  constructor(private readonly registry: PipelineStepRegistry) {}
  async run(ctx: PipelineContext, env: Env): Promise<void> {
    const steps = this.registry.getSteps();
    for (const step of steps) {
      // Dynamic config: admins can disable a step from the dashboard
      const isEnabled = ctx.config.steps[step.name]?.enabled ?? true;
      if (!isEnabled) continue;
      // skipWhen: the step decides for itself whether to run
      if (step.skipWhen?.(ctx)) continue;
      // Execute with timeout
      await withTimeout(
        step.execute(ctx, env),
        step.timeout ?? ctx.config.defaultStepTimeout
      );
    }
  }
}
In practice you’d also handle timeout errors, isolate failures (a failing step shouldn’t bring down the whole pipeline), record traces, and so on, but the core logic is just this loop.
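The loop calls withTimeout without defining it. One way it could look is a sketch built on Promise.race, paired with a hypothetical runIsolated wrapper that records a failure instead of rethrowing it. The names StepOutcome and runIsolated are illustrative, not part of the original design:

```typescript
// Sketch of a timeout helper; the article's actual withTimeout is not shown.
function withTimeout<T>(work: Promise<T>, ms: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`Timed out after ${ms} ms`)), ms);
  });
  // Whichever settles first wins; always clear the timer afterwards.
  return Promise.race([work, timeout]).finally(() => {
    if (timer !== undefined) clearTimeout(timer);
  });
}

// Hypothetical error isolation: report the failure and let the loop continue.
type StepOutcome = { step: string; ok: boolean; error?: string };

async function runIsolated(
  name: string,
  work: () => Promise<void>,
  timeoutMs: number,
): Promise<StepOutcome> {
  try {
    await withTimeout(work(), timeoutMs);
    return { step: name, ok: true };
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    return { step: name, ok: false, error: message };
  }
}
```

Note that Promise.race only stops waiting; it does not cancel the underlying work, so a timed-out LLM call keeps running unless the step also wires up its own cancellation.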
PipelineContext: A Single State Object
All steps share one PipelineContext, passing information between them by mutating the context:
interface PipelineContext {
  // Input
  query: string;
  userId?: string;
  config: AIConfig;
  // Query analysis results
  queryType: "simple" | "complex" | "sql" | "hybrid" | "general-knowledge" | "clarification-needed";
  sqlTemplateId?: string;
  // Vector search
  queryEmbedding?: number[];
  hydeEmbedding?: number[];
  expandedQueries?: string[];
  vectorFilter?: VectorFilter;
  // Search results
  candidateMatches: SearchResult[];
  // Generation
  context: string; // assembled context string
  messages: Message[]; // LLM messages array
  response?: AIResponse;
  // Quality evaluation
  judgeResult?: JudgeResult;
  // Observability
  trace: PipelineTrace;
  tokenBreakdown: TokenBreakdown;
  // Quota
  quotaDeducted: boolean;
  // Retry state
  cragRetryCount: number;
}
A single state object makes debugging intuitive: when something goes wrong, printing the entire context shows exactly what each step did. It also makes testing easier: you can set up a context at a specific state and test a step’s behavior in isolation.
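To make the testing claim concrete, here is a sketch that builds a context in a chosen state and exercises the HyDE step on its own. The trimmed TestContext type, the makeContext helper, and the FakeEnv that injects the model calls are all hypothetical; the real step calls generateHypotheticalDoc and embed directly.

```typescript
// Trimmed-down context for the sketch; the real PipelineContext has more fields.
interface TestContext {
  query: string;
  queryType: "simple" | "complex";
  hydeEmbedding?: number[];
  trace: Record<string, unknown>;
}

// Hypothetical stand-ins for the model calls, injected so no network is needed.
interface FakeEnv {
  generateHypotheticalDoc: (query: string) => Promise<string>;
  embed: (text: string) => Promise<number[]>;
}

const hydeStep = {
  name: "hyde",
  skipWhen: (ctx: TestContext): boolean => ctx.queryType !== "complex",
  execute: async (ctx: TestContext, env: FakeEnv): Promise<void> => {
    const hypoDoc = await env.generateHypotheticalDoc(ctx.query);
    ctx.hydeEmbedding = await env.embed(hypoDoc);
    ctx.trace["hyde"] = { generated: hypoDoc };
  },
};

// Build a context at a specific state, overriding only what the test cares about.
function makeContext(overrides: Partial<TestContext> = {}): TestContext {
  return { query: "how do refunds work?", queryType: "complex", trace: {}, ...overrides };
}

// Deterministic fakes: the "embedding" is just the text length.
const fakeEnv: FakeEnv = {
  generateHypotheticalDoc: async (q) => `Answer to: ${q}`,
  embed: async (text) => [text.length],
};

async function runHydeInIsolation(): Promise<TestContext> {
  const ctx = makeContext();
  await hydeStep.execute(ctx, fakeEnv);
  return ctx;
}
```

Because the step only reads and writes the context, the test needs no engine, no registry, and no other steps.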
Dynamic Configuration
Step enable/disable states and various thresholds are stored in the ai_config database table and can be adjusted in real time through the Admin UI:
{
  "steps": {
    "hyde": { "enabled": true },
    "multi_query": { "enabled": true },
    "cross_encoder": { "enabled": true },
    "self_reflection": { "enabled": false } // temporarily disabled
  },
  "reranker_relevance_threshold": 0.5,
  "mmr_lambda": 0.7,
  "rag_strategy": "plan-execute"
}
This lets you change system behavior without redeployment:
- Disable a broken step as a live hotfix
- Tune threshold parameters for A/B testing
- Switch RAG strategies (baseline / plan-execute / agentic)
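As a rough sketch of how such a config might be consumed, the helper below merges a stored (possibly partial) config over defaults and applies the same "missing means enabled" rule as the engine loop. The AIConfig shape is inferred from the JSON sample, and DEFAULT_CONFIG, resolveConfig, and isStepEnabled are hypothetical names with assumed default values:

```typescript
// Assumed shape, inferred from the JSON sample; the real AIConfig may differ.
interface AIConfig {
  steps: Record<string, { enabled: boolean } | undefined>;
  reranker_relevance_threshold: number;
  mmr_lambda: number;
  rag_strategy: "baseline" | "plan-execute" | "agentic";
}

// Hypothetical defaults used when the database row omits a field.
const DEFAULT_CONFIG: AIConfig = {
  steps: {},
  reranker_relevance_threshold: 0.5,
  mmr_lambda: 0.7,
  rag_strategy: "baseline",
};

// Merge a stored (possibly partial) config over the defaults.
function resolveConfig(stored: Partial<AIConfig>): AIConfig {
  return {
    ...DEFAULT_CONFIG,
    ...stored,
    steps: { ...DEFAULT_CONFIG.steps, ...stored.steps },
  };
}

// Same rule as the engine loop: a step with no entry is enabled.
function isStepEnabled(config: AIConfig, stepName: string): boolean {
  return config.steps[stepName]?.enabled ?? true;
}
```

Defaulting to enabled means a newly registered step runs without a config change, while a hotfix only needs a single `"enabled": false` entry.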
Cloudflare Workers Constraints
Running the Pipeline on Cloudflare Workers comes with a few things to keep in mind:
CPU time limits: Workers have a CPU time cap (not wall-clock time). Waiting on I/O (LLM API calls) doesn’t consume CPU, but be mindful of embedding computation and heavy string processing.
Parallelism done right: Independent I/O calls should run concurrently with Promise.all() rather than as a sequential await chain. With the engine loop shown earlier, which awaits one step at a time, that means grouping such calls inside a single step:
// Correct: run in parallel
const [queryEmbedding, hydeDoc] = await Promise.all([
  embed(query, env),
  generateHyDE(query, env),
]);
// Wrong: sequential, wastes time
const queryEmbedding = await embed(query, env);
const hydeDoc = await generateHyDE(query, env);
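Promise.all() rejects as soon as either call fails. If one of the parallel calls is optional, Promise.allSettled() is a possible variant. The sketch below treats the HyDE document as optional, which is an assumption, and uses hypothetical injected functions in place of the real model calls:

```typescript
// Hypothetical stand-ins for the model calls used in the snippet above.
type Embed = (text: string) => Promise<number[]>;
type GenerateHyDE = (query: string) => Promise<string>;

interface ParallelContext {
  query: string;
  queryEmbedding?: number[];
  hydeDoc?: string;
}

// One step body that issues both independent calls at once.
async function embedAndHyde(
  ctx: ParallelContext,
  embed: Embed,
  generateHyDE: GenerateHyDE,
): Promise<void> {
  const [embedding, hyde] = await Promise.allSettled([
    embed(ctx.query),
    generateHyDE(ctx.query),
  ]);
  // The query embedding is required, so surface its failure.
  if (embedding.status === "rejected") throw embedding.reason;
  ctx.queryEmbedding = embedding.value;
  // HyDE is treated as optional here (an assumption): skip it on failure.
  if (hyde.status === "fulfilled") ctx.hydeDoc = hyde.value;
}
```

Both calls still overlap in time; the only difference from Promise.all() is that a HyDE failure no longer discards a successful embedding.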
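The purpose of ctx.waitUntil(): Non-critical work (memory extraction, Contextual Retrieval updates) can be handed to waitUntil() so it keeps running after the response is returned, without blocking the user. Note that ctx here is the Workers ExecutionContext passed to the fetch handler, not the PipelineContext used elsewhere in this article.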
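A minimal sketch of that pattern follows. ExecutionContextLike is a local stand-in for the Workers ExecutionContext so the example type-checks outside the runtime, and extractMemory and handleQuery are hypothetical names:

```typescript
// Stand-in for the Workers ExecutionContext; in a real Worker the runtime provides it.
interface ExecutionContextLike {
  waitUntil(promise: Promise<unknown>): void;
}

// Hypothetical background task; memory extraction is one of the examples named above.
async function extractMemory(userId: string, answer: string, log: string[]): Promise<void> {
  log.push(`memory stored for ${userId}: ${answer.length} chars`);
}

async function handleQuery(
  userId: string,
  answer: string,
  execCtx: ExecutionContextLike,
  log: string[],
): Promise<string> {
  // Schedule the non-critical work without awaiting it.
  execCtx.waitUntil(
    extractMemory(userId, answer, log).catch((err) => {
      // Swallow failures here so they never affect the response.
      log.push(`memory extraction failed: ${String(err)}`);
    }),
  );
  // The answer goes back to the user right away.
  return answer;
}
```

Catching inside the promise passed to waitUntil() keeps a background failure from surfacing as an unhandled rejection.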
In Summary
The Modular Pipeline solves the maintainability problem in RAG systems. As you add more steps, a modular design is far easier to maintain than a single monolithic function; dynamic configuration is more flexible than tweaking parameters through redeployment; skipWhen is cleaner than screens full of if-else.
The core trade-off in this architecture: you add a layer of abstraction (Pipeline Engine, Step Registry), and in exchange you get testability, configurability, and observability. For a RAG system that keeps evolving, that trade-off is worth it.