ragmcp

Plug-and-play RAG with native MCP support. LLM-agnostic, swappable at every level.

License: AGPL-3.0

ragmcp is a modular Python RAG library that exposes your document pipeline as an MCP server — ready to be used with Claude Desktop, Cursor, or any MCP-compatible client.

  • Zero lock-in — swap embedders, vector stores, rerankers, caches, and LLMs without rewriting your pipeline
  • MCP-native — one line to turn your pipeline into a tool callable by Claude Desktop or Cursor
  • Production-ready modules — Graph RAG, multimodal search (CLIP + ColPali), streaming ingestion, feedback loop, observability
  • RAGFactory — start in minutes; scale by changing env vars

Architecture

Interfaces:    CLI · REST API · MCP Server · Assistant
Orchestration: RAGPipeline
Strategies:    Retriever · Reranker · Compressor · Router
Components:    Chunker · Embedder · VectorStore · GraphStore
Support:       Loader · Cache · Audit · Feedback · Streaming
Foundations:   Core Models & Abstractions

| Layer | Role |
| --- | --- |
| Interfaces | Entry points: CLI commands, REST API, MCP stdio/SSE transport, pydantic-ai/Langfuse agents |
| Orchestration | RAGPipeline wires every component together and owns the ingest/search/chat lifecycle |
| Strategies | Pluggable algorithms: swap Retriever, Reranker, Compressor, or Router independently |
| Components | Stateful building blocks: Chunker splits text, Embedder produces vectors, VectorStore/GraphStore persists them |
| Support | Cross-cutting concerns: Loader reads sources, Cache accelerates hot paths, Audit/Feedback record activity, Streaming handles live data |
| Foundations | Pydantic models, abstract base classes, and shared utilities that every layer depends on |

All components are injectable — swap any layer without modifying the others.


Installation

# Minimal (MCP server + in-memory store)
pip install mcpaisuite-ragmcp

# With local embedder + ChromaDB
pip install "mcpaisuite-ragmcp[local]"

# With PDF, DOCX, HTML loaders
pip install "mcpaisuite-ragmcp[pdf,docx,html]"

# Full stack (all backends)
pip install "mcpaisuite-ragmcp[all]"

Optional extras:

| Extra | What it enables |
| --- | --- |
| fastembed | FastEmbed local ONNX embedder (CPU, no API key, no PyTorch) |
| litellm | LiteLLM embedder (OpenAI, Cohere, Mistral, …) |
| local | sentence-transformers + ChromaDB + Ollama |
| pdf | PDF loader via pypdf |
| docx | Word document loader |
| html | HTML/web page loader |
| ocr | OCR for scanned PDFs and images (pytesseract) |
| audio | Whisper transcription for audio/video files |
| qdrant | Qdrant vector store |
| milvus | Milvus vector store |
| pgvector | PostgreSQL + pgvector (includes psycopg2-binary, pgvector, numpy) |
| rerank | CrossEncoderReranker (sentence-transformers) |
| cohere | CohereReranker |
| cache | Redis cache + memory LRU cache |
| graph | GraphRAG with NetworkX or Neo4j |
| clip | CLIP image search |
| colpali | ColPali MaxSim document image search |
| splade | SPLADE learned sparse index (transformers + torch) |
| sources | SQL, REST API, GitHub connectors (sqlalchemy, aiohttp, asyncpg) |
| email | IMAP email source (stdlib only, no extra packages) |
| slack | Slack source (channels, threads) |
| jira | Jira source (Cloud + Server/DC, JQL) |
| notion | Notion loader |
| confluence | Confluence loader |
| streaming | Kafka streaming ingestion (aiokafka) |
| url | URL/web page loader (aiohttp + beautifulsoup4) |
| s3 | S3 loader |
| gcs | Google Cloud Storage loader |
| tiktoken | Accurate token counting for OpenAI models (used by ContextAssembler) |
| metrics | Prometheus metrics |
| tracing | OpenTelemetry tracing |
| eval | Recall@k + RAGAS evaluation |
| api | FastAPI REST server |
| langfuse | Langfuse tracing integration |
| pydantic-ai | pydantic-ai agent bridge |
| all | Everything |

Quickstart

5-minute start

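import asyncio
from ragmcp import RAGFactory

async def main():
    # Default pipeline (fastembed + persistent ChromaDB), no API key required
    pipeline = RAGFactory.create_default()

    await pipeline.ingest("docs/manual.pdf")
    results = await pipeline.search("How do I reset my password?")

    for chunk in results:
        print(chunk.content)

# `await` needs a running event loop; snippets that use a bare `await` assume an async context like main()
asyncio.run(main())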

With OpenAI + ChromaDB

import os
from ragmcp import RAGFactory

# OpenAI embeddings + local ChromaDB
pipeline = RAGFactory.create_openai(api_key=os.environ["OPENAI_API_KEY"])

await pipeline.ingest_folder("./docs")
results = await pipeline.search("RAG architectures", top_k=5)

Production stack

import os
from ragmcp import RAGFactory

# pgvector + Cohere reranker
pipeline = RAGFactory.create_production(
    db_url="postgresql://user:pass@localhost/ragmcp",
    api_key=os.environ["OPENAI_API_KEY"],
)

From environment variables

export RAGMCP_EMBEDDER=litellm
export RAGMCP_EMBEDDER_MODEL=text-embedding-3-small
export RAGMCP_EMBEDDER_API_KEY=sk-...
export RAGMCP_VECTORSTORE=qdrant
export RAGMCP_VECTORSTORE_URL=http://localhost:6333
export RAGMCP_CACHE=redis
export RAGMCP_CACHE_URL=redis://localhost:6379

from ragmcp import RAGFactory

pipeline = RAGFactory.from_env()

MCP Server

ragmcp exposes your pipeline as an MCP server so Claude Desktop, Cursor, or any MCP-compatible LLM client can call it as a native tool. The server registers 8 tools by default: search_documents, ingest_document (or ingest_content in upload mode), list_sources, delete_source, run_eval, chat, plus the multimodal visual_ingest / visual_search (ColPali). A 9th, submit_feedback, is added when a feedback store is configured. search_documents also accepts a retrieval_mode (dense / bm25 / splade) to switch retriever per call.

Two transports are supported:

| Transport | When to use |
| --- | --- |
| stdio (default) | Local use: Claude Desktop or Cursor launches ragmcp as a subprocess |
| sse | HTTP server: share a running server across clients, Docker, or remote deployments |

Python

from ragmcp import RAGFactory
from ragmcp.mcp_server import RAGMCPServer

pipeline = RAGFactory.from_env()
server = RAGMCPServer(pipeline=pipeline)

# stdio — Claude Desktop / Cursor subprocess (default)
server.run()

# SSE — HTTP server on port 8080
# server.run(transport="sse", port=8080)

Claude Desktop — stdio (claude_desktop_config.json)

ragmcp is launched as a subprocess. Documents are ingested either through the ingest_document tool or by running ragmcp ingest before the server starts.

{
  "mcpServers": {
    "ragmcp": {
      "command": "ragmcp",
      "args": ["serve"],
      "cwd": "/path/to/your/project"
    }
  }
}

To use a custom config file:

{
  "mcpServers": {
    "ragmcp": {
      "command": "ragmcp",
      "args": ["serve", "--config", "ragmcp.yaml"],
      "cwd": "/path/to/your/project"
    }
  }
}

Config file location:

  • macOS: ~/Library/Application Support/Claude/claude_desktop_config.json
  • Windows: %APPDATA%\Claude\claude_desktop_config.json

Claude Desktop — SSE

Start the server first, then point Claude Desktop at it. Useful when you want a persistent server with documents already indexed.

ragmcp serve --transport sse --port 8080

{
  "mcpServers": {
    "ragmcp": {
      "type": "sse",
      "url": "http://localhost:8080/sse"
    }
  }
}

Cursor

# Cursor settings → MCP Servers → Add:
ragmcp:
  command: ragmcp
  args: ["serve"]
  cwd: /path/to/your/project

CLI

# stdio (default)
ragmcp serve

# With config file
ragmcp serve --config ragmcp.yaml

# SSE transport
ragmcp serve --transport sse --port 8080

Core Pipeline

RAGPipeline is the central object. All high-level operations go through it.

from ragmcp.pipeline import RAGPipeline
from ragmcp.embedders import FastEmbedEmbedder
from ragmcp.vectorstores import ChromaStore
from ragmcp.retrievers import HybridRetriever
from ragmcp.rerankers import CrossEncoderReranker

pipeline = RAGPipeline(
    embedder=FastEmbedEmbedder(),
    vectorstore=ChromaStore(path=".chroma"),
    retriever=HybridRetriever(),
    reranker=CrossEncoderReranker(),
)

# Ingest a single file
await pipeline.ingest("report.pdf")

# Ingest a whole folder (recursive by default)
await pipeline.ingest_folder("./docs")

# Ingest from a streaming data source (S3, GitHub, SQL, …)
from ragmcp.loaders.s3_loader import S3Loader  # via AutoLoader / BaseDataSource
# await pipeline.ingest_source(my_source)

# Search
chunks = await pipeline.search("What is the refund policy?", top_k=10)

# Stream chunks one by one
async for chunk in pipeline.search_stream("Summarize the report"):
    print(chunk.content)

Loaders & Chunkers

Loaders

ragmcp auto-detects file type via AutoLoader.

| Loader | Extensions | Extra |
| --- | --- | --- |
| TextLoader | .txt, .md | |
| PDFLoader | .pdf | pdf |
| PDFOCRLoader | scanned .pdf | ocr |
| DocxLoader | .docx | docx |
| HTMLLoader | .html, URLs | html |
| CSVLoader | .csv | none (stdlib) |
| JSONLoader | .json, .jsonl | none (stdlib) |
| ImageLoader | .png, .jpg, .webp | ocr or litellm |
| AudioLoader | .mp3, .wav, .m4a | audio |
| VideoLoader | .mp4, .webm, .mov, .mkv | audio |
| S3Loader | s3:// | s3 |
| GCSLoader | gs:// | gcs |

Cloud Storage Loaders

from ragmcp.loaders.s3_loader import S3Loader
from ragmcp.loaders.gcs_loader import GCSLoader

# AWS S3 (production)
loader = S3Loader(bucket="my-bucket", prefix="docs/")

# AWS S3 with a local emulator (MinIO, LocalStack, …)
loader = S3Loader(
    bucket="ragmcp-test",
    endpoint_url="http://localhost:9100",   # or set AWS_ENDPOINT_URL env var
    aws_access_key_id="minioadmin",
    aws_secret_access_key="minioadmin",
)

# Google Cloud Storage (production)
loader = GCSLoader(bucket="my-bucket", prefix="docs/")

# GCS with a local emulator (fake-gcs-server)
# Set STORAGE_EMULATOR_HOST before instantiating — no credentials required
import os
os.environ["STORAGE_EMULATOR_HOST"] = "http://localhost:4443"
loader = GCSLoader(bucket="ragmcp-test")

Chunkers

from ragmcp.chunkers import RecursiveChunker, SentenceChunker, LateChunker, SemanticChunker

# Token-based recursive splitting (default)
chunker = RecursiveChunker(chunk_size=512, overlap=64)

# Sentence-aware splitting
chunker = SentenceChunker(max_sentences=5, overlap_sentences=1)

# Contextual: neighbor text is included in each chunk's embedding window
chunker = LateChunker(chunk_size=512, overlap=64, context_before=200, context_after=200)

# Semantic: cuts at topic transitions detected via cosine similarity
# Requires an embedder — use achunk() inside async code
chunker = SemanticChunker(
    embedder=embedder,
    breakpoint_threshold=0.5,  # lower = more chunks
    buffer_size=1,             # sentences of context for each embedding window
    min_chunk_size=100,        # merge chunks shorter than this
    max_chunk_size=2000,       # split chunks larger than this with RecursiveChunker
)

# Inside an async context:
chunks = await chunker.achunk(documents)

# Or synchronously (wraps asyncio.run — avoid inside existing event loop):
chunks = chunker.chunk(documents)
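
To make the breakpoint_threshold comment concrete, here is a minimal, library-independent sketch of threshold-based semantic splitting. It assumes a cut is placed wherever the cosine distance between adjacent sentence embeddings exceeds the threshold, which matches "lower = more chunks" but is not confirmed as SemanticChunker's exact rule. The helper split_on_topic_shift and the toy vectors are hypothetical.

```python
import math


def cosine_distance(a, b):
    """Return 1 - cosine similarity of two equal-length, non-zero vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / norm


def split_on_topic_shift(sentences, vectors, breakpoint_threshold=0.5):
    """Group sentences, opening a new chunk when adjacent embeddings diverge.

    Hypothetical helper: a cut goes between sentence i-1 and i whenever
    their cosine distance exceeds breakpoint_threshold.
    """
    if not sentences:
        return []
    chunks = [[sentences[0]]]
    for i in range(1, len(sentences)):
        if cosine_distance(vectors[i - 1], vectors[i]) > breakpoint_threshold:
            chunks.append([])          # topic transition: open a new chunk
        chunks[-1].append(sentences[i])
    return [" ".join(chunk) for chunk in chunks]


sentences = ["Cats purr.", "Cats nap.", "Rust is fast.", "Rust is safe."]
vectors = [[1.0, 0.0], [0.9, 0.1], [0.0, 1.0], [0.1, 0.9]]  # toy embeddings
chunks = split_on_topic_shift(sentences, vectors, breakpoint_threshold=0.5)
```

With these toy vectors the only large jump is between the second and third sentence, so the text splits into one chunk per topic. Lowering the threshold makes smaller jumps count as breakpoints.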

Embedders

| Class | Backend | Extra |
| --- | --- | --- |
| FastEmbedEmbedder | fastembed (local, CPU) | fastembed |
| LiteLLMEmbedder | OpenAI, Cohere, Mistral, … | litellm |
| OllamaEmbedder | Ollama (local GPU) | local |
| CachedEmbedder | Wraps any embedder with cache | cache |

from ragmcp.embedders import LiteLLMEmbedder
from ragmcp.cache import CachedEmbedder, MemoryLRUCache

embedder = CachedEmbedder(
    embedder=LiteLLMEmbedder(model="text-embedding-3-small", api_key="sk-..."),
    cache=MemoryLRUCache(max_size=10_000),
)

Vector Stores

| Class | Backend | Extra |
| --- | --- | --- |
| InMemoryVectorStore | NumPy (no deps) | |
| ChromaStore | ChromaDB | local |
| QdrantStore | Qdrant | qdrant |
| MilvusStore | Milvus | milvus |
| PgVectorStore | PostgreSQL + pgvector | pgvector |

from ragmcp.vectorstores import QdrantStore

store = QdrantStore(url="http://localhost:6333", collection="my_docs")

Retrievers

| Class | Strategy |
| --- | --- |
| DenseRetriever | Pure vector similarity |
| BM25SparseIndex | Keyword-based BM25 |
| HybridRetriever | Dense + BM25 with RRF fusion |
| CachedRetriever | Wraps any retriever with query-level cache |
| GraphRAGRetriever | Graph traversal + augmented dense retrieval |
| MultiQueryRetriever | Generates N query variants via LLM + RRF fusion |
| HyDERetriever | Hypothetical Document Embeddings (Gao et al. 2022) |

Choosing the search strategy

The retrieval strategy is selected by the retriever you inject into the pipeline (DenseRetriever, BM25SparseIndex, HybridRetriever, …). When no retriever is set, the pipeline falls back to direct dense vector search via the vector store.

from ragmcp.pipeline import RAGPipeline
from ragmcp.retrievers import HybridRetriever

pipeline = RAGPipeline(..., retriever=HybridRetriever(embedder=embedder, vectorstore=store))
results = await pipeline.search(query, top_k=5)   # dense + BM25 with RRF fusion

pipeline.search() signature: search(query, top_k=None, filters=None, user_id=None, session_id=None).
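
For readers unfamiliar with the "RRF fusion" that HybridRetriever is described as using, the following is a minimal sketch of reciprocal rank fusion over two ranked lists. It is a generic illustration, not ragmcp's implementation: the function rrf_fuse, the chunk IDs, and the constant k=60 (a commonly used default) are assumptions.

```python
from collections import defaultdict


def rrf_fuse(rankings, k=60, top_k=5):
    """Fuse ranked ID lists: score(d) = sum over lists of 1 / (k + rank)."""
    scores = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    # Highest fused score first; ties broken by ID for determinism
    ordered = sorted(scores, key=lambda d: (-scores[d], d))
    return ordered[:top_k]


dense_hits = ["c3", "c1", "c7"]   # hypothetical chunk IDs from vector search
bm25_hits = ["c1", "c9", "c3"]    # hypothetical chunk IDs from BM25
fused = rrf_fuse([dense_hits, bm25_hits])
```

Because RRF only looks at ranks, it can combine dense similarity scores and BM25 scores without normalizing them onto a common scale. Chunks that appear in both lists accumulate two contributions and rise to the top.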

Sparse Indexes

| Class | Backend | Notes |
| --- | --- | --- |
| BM25SparseIndex | BM25 (keyword) | Default, no deps |
| SPLADESparseIndex | SPLADE (learned sparse) | pip install "mcpaisuite-ragmcp[splade]" |

from ragmcp.retrievers import SPLADESparseIndex, HybridRetriever

# Drop-in replacement for BM25SparseIndex
splade = SPLADESparseIndex(
    model_name="naver/splade-cocondenser-ensembledistil",
    device="cpu",   # or "cuda" / "mps"
)
retriever = HybridRetriever(embedder=embedder, vectorstore=store, sparse_index=splade)

SPLADE learns to expand queries with synonyms and related terms, which typically helps over BM25 on specialized vocabulary (medical, legal, technical). The model is downloaded on first use (~500 MB).

from ragmcp.retrievers import HybridRetriever, CachedRetriever, MultiQueryRetriever, HyDERetriever
from ragmcp.cache import RedisCache

# Cache query results
retriever = CachedRetriever(
    retriever=HybridRetriever(alpha=0.7),
    cache=RedisCache(url="redis://localhost:6379"),
)

# Multi-query: generates 3 variants via LLM, fuses with RRF (+15-25% recall)
retriever = MultiQueryRetriever(
    retriever=HybridRetriever(...),
    llm_model="gpt-4o-mini",
    num_queries=3,
)

# HyDE: generates a hypothetical passage, embeds that instead of the raw query
retriever = HyDERetriever(
    embedder=embedder,
    vectorstore=vectorstore,
    llm_model="gpt-4o-mini",
)

Rerankers

| Class | Backend | Extra |
| --- | --- | --- |
| CrossEncoderReranker | sentence-transformers (local) | rerank |
| CohereReranker | Cohere Rerank API | cohere |
| FeedbackReranker | User feedback signals (SQL store) | |

from ragmcp.rerankers import CrossEncoderReranker, FeedbackReranker
from ragmcp.audit.sql_feedback_store import SQLFeedbackStore

# Local reranker
reranker = CrossEncoderReranker(model="cross-encoder/ms-marco-MiniLM-L-6-v2")

# Feedback-driven reranker — boosts chunks with positive votes
feedback_store = SQLFeedbackStore(db_path="feedback.db")
reranker = FeedbackReranker(store=feedback_store, feedback_weight=0.3)

Context Compression

Reduce tokens sent to the LLM by compressing retrieved chunks before generation.

from ragmcp.compression import ExtractiveSentenceCompressor, LLMContextCompressor

# Extractive: rank sentences by relevance, drop low-scoring ones (no LLM required)
compressor = ExtractiveSentenceCompressor(min_sentence_score=0.0)

# LLM-based: ask the LLM to extract relevant passages
compressor = LLMContextCompressor(model="gpt-4o-mini")

# Compress retrieved chunks down to a token budget before generation
chunks = await pipeline.search("refund policy", top_k=10)
compressed = await compressor.compress("refund policy", chunks, target_tokens=1500)

Cache

from ragmcp.cache import MemoryLRUCache, RedisCache

# In-process LRU cache
cache = MemoryLRUCache(max_size=5_000, default_ttl_s=3600)

# Redis cache (survives restarts, shared across workers)
cache = RedisCache(url="redis://localhost:6379", ttl=86400)

Both can be passed to CachedEmbedder (embedding-level) or CachedRetriever (query-level).
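
The interplay of max_size and a TTL can be illustrated with a small stand-alone cache. This is a sketch of the general LRU-with-expiry technique, not MemoryLRUCache itself: the class TinyLRUCache, its get/set methods, and the injectable clock are hypothetical.

```python
import time
from collections import OrderedDict


class TinyLRUCache:
    """Illustrative LRU cache with per-entry expiry (not ragmcp's class)."""

    def __init__(self, max_size=2, default_ttl_s=3600, clock=time.monotonic):
        self.max_size = max_size
        self.default_ttl_s = default_ttl_s
        self._clock = clock
        self._data = OrderedDict()   # key -> (expires_at, value)

    def get(self, key):
        entry = self._data.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._data[key]      # expired: drop and report a miss
            return None
        self._data.move_to_end(key)  # mark as most recently used
        return value

    def set(self, key, value):
        self._data[key] = (self._clock() + self.default_ttl_s, value)
        self._data.move_to_end(key)
        if len(self._data) > self.max_size:
            self._data.popitem(last=False)   # evict least recently used


now = [0.0]                          # fake clock so the example is deterministic
cache = TinyLRUCache(max_size=2, default_ttl_s=10, clock=lambda: now[0])
cache.set("q1", "vec1")
cache.set("q2", "vec2")
cache.get("q1")                      # q1 becomes most recently used
cache.set("q3", "vec3")              # evicts q2, the least recently used
```

Size-based eviction bounds memory, while the TTL bounds staleness. An in-process cache like this is lost on restart, which is the gap a Redis-backed cache fills.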


Graph RAG

Build a knowledge graph from your documents, then augment retrieval with entity-aware traversal.

from ragmcp.graph import GraphRAGRetriever, NetworkXGraphStore, LLMEntityExtractor
from ragmcp.retrievers import HybridRetriever

graph_store = NetworkXGraphStore()   # or Neo4jGraphStore(uri=..., user=..., password=...)
extractor = LLMEntityExtractor(model="gpt-4o-mini")

retriever = GraphRAGRetriever(
    graph_store=graph_store,
    entity_extractor=extractor,
    dense_retriever=HybridRetriever(),
    max_hops=2,
)

# Extracts entities → traverses graph → augments query → dense retrieval
results = await retriever.retrieve("What did Alice say about the merger?", top_k=5)

Neo4j is supported for production graph stores:

from ragmcp.graph import Neo4jGraphStore

graph_store = Neo4jGraphStore(uri="bolt://localhost:7687", user="neo4j", password="...")

Multimodal Search

CLIP image search

from ragmcp.multimodal import CLIPImageEmbedder
from ragmcp.vectorstores import InMemoryVectorStore

embedder = CLIPImageEmbedder()
store = InMemoryVectorStore()

# Embed and index an image
# `chunk` is the chunk record describing the image; its construction is not shown here
img_vec = await embedder.embed_image("product_photo.jpg")
await store.upsert([chunk], [img_vec])

# Cross-modal search: text query → similar images
text_vec = await embedder.embed(["red running shoes"])
results = await store.search(text_vec[0], top_k=5)

ColPali document image search

from ragmcp.multimodal import ColPaliRetriever

retriever = ColPaliRetriever()
await retriever.index_image("invoice_page1.png")
results = await retriever.search("total amount due", top_k=3)
# Uses MaxSim scoring — works on dense document images without OCR
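
MaxSim is the late-interaction scoring rule popularized by ColBERT and used by ColPali: each query-token embedding is matched to its best page-patch embedding, and the maxima are summed. The sketch below shows that arithmetic on toy 2-dimensional vectors. The function maxsim_score and the data are illustrative and say nothing about how ColPaliRetriever stores or batches embeddings.

```python
def maxsim_score(query_vectors, page_vectors):
    """Late-interaction score: for each query vector, take its best-matching
    page vector (max dot product), then sum those maxima."""
    def dot(a, b):
        return sum(x * y for x, y in zip(a, b))

    return sum(max(dot(q, p) for p in page_vectors) for q in query_vectors)


query = [[1.0, 0.0], [0.0, 1.0]]                 # toy query-token embeddings
page_a = [[0.9, 0.1], [0.2, 0.8], [0.5, 0.5]]    # toy patch embeddings
page_b = [[0.6, 0.4], [0.5, 0.5]]
scores = {
    "page_a": maxsim_score(query, page_a),
    "page_b": maxsim_score(query, page_b),
}
best = max(scores, key=scores.get)
```

Here page_a wins because each query token finds a strongly matching patch, even though no single patch matches both tokens.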

Vision LLM (Image Description)

from ragmcp.multimodal import LiteLLMVisionDescriber

describer = LiteLLMVisionDescriber(model="gpt-4o", api_key="sk-...")
description = await describer.describe("chart.png", prompt="What trend does this chart show?")

Audio & Video Transcription

from ragmcp.loaders.audio_loader import AudioLoader
from ragmcp.loaders.video_loader import VideoLoader
from ragmcp.multimodal import WhisperTranscriber

transcriber = WhisperTranscriber(mode="local", model="base")   # or mode="api"

loader = AudioLoader(transcriber=transcriber)
chunks = await loader.load("interview.mp3")

video_loader = VideoLoader(transcriber=transcriber)
chunks = await video_loader.load("meeting_recording.mp4")

Streaming Ingestion

Kafka — quickstart

from ragmcp.streaming import StreamingIngestionService
from ragmcp.streaming import KafkaStreamSource   # pip install mcpaisuite-ragmcp[streaming]

kafka = KafkaStreamSource(
    bootstrap_servers="localhost:9092",
    topic="documents",
    group_id="ragmcp-ingest",
)
service = StreamingIngestionService(pipeline=pipeline)
await service.run(kafka)  # infinite loop, Ctrl+C to stop

Kafka — multi-tenant

Each tenant gets its own Kafka consumer and dedicated topic. Topic naming convention:

  • ragmcp-docs → default tenant
  • ragmcp-docs-legal → tenant legal
  • ragmcp-docs-support → tenant support

from ragmcp.streaming import KafkaStreamSource, StreamingIngestionService

# One consumer per tenant — subscribe to the tenant-specific topic
source = KafkaStreamSource(
    bootstrap_servers=["localhost:9092"],
    topic="ragmcp-docs-legal",     # tenant-specific topic
    group_id="ragmcp-consumer-legal",
)

service = StreamingIngestionService(pipeline=pipeline)
await service.run(source)   # streams docs into the pipeline continuously
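
The topic naming convention above can be captured in a small helper so that producers and consumers derive the same topic name. This is a hypothetical utility, not part of ragmcp. Treating None, an empty string, or "default" as the default tenant is an assumption.

```python
def topic_for_tenant(tenant_id=None, base="ragmcp-docs"):
    """Map a tenant ID to its Kafka topic; the default tenant uses the base topic."""
    if tenant_id in (None, "", "default"):
        return base
    return f"{base}-{tenant_id}"


topics = [topic_for_tenant(t) for t in (None, "legal", "support")]
```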

Publish a message (for testing):

# Start Kafka
docker compose --profile kafka up -d kafka

# Publish to the tenant topic
docker exec -it demo-kafka-1 kafka-console-producer \
    --bootstrap-server localhost:9092 \
    --topic ragmcp-docs-legal

Note: the UNKNOWN_TOPIC_OR_PARTITION warning on first publish is expected — Kafka auto-creates the topic and retries. Start the consumer before publishing for real-time ingestion; messages are retained by Kafka if the consumer starts later.

Webhook (HMAC-secured)

from ragmcp.streaming import HMACWebhookHandler

handler = HMACWebhookHandler(secret="my-webhook-secret", pipeline=pipeline)
# Mount on your FastAPI app: app.include_router(handler.router)
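
On the sending side, an HMAC-secured webhook generally means signing the raw request body with the shared secret and having the receiver recompute and compare the signature. The sketch below uses only the standard library and assumes HMAC-SHA256 with a hex digest. The hash, encoding, and header name that HMACWebhookHandler actually expects are not stated here, so check them before relying on this.

```python
import hashlib
import hmac


def sign_payload(secret: str, body: bytes) -> str:
    """Return the hex HMAC-SHA256 digest of the raw request body."""
    return hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()


def is_valid_signature(secret: str, body: bytes, received_signature: str) -> bool:
    """Compare signatures in constant time to avoid timing leaks."""
    expected = sign_payload(secret, body)
    return hmac.compare_digest(expected, received_signature)


body = b'{"content": "New policy document"}'
signature = sign_payload("my-webhook-secret", body)   # what a sender would attach
```

The signature must be computed over the exact bytes that are sent. Re-serializing the JSON on either side changes the bytes and invalidates the signature.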

Source Connectors

Pull documents from external systems on demand or on a schedule.

| Source | Class | Install | Notes |
| --- | --- | --- | --- |
| GitHub repo | GitHubSource | mcpaisuite-ragmcp[sources] | files, issues, wikis |
| SQL database | SQLDataSource | mcpaisuite-ragmcp[sources] | PostgreSQL, MySQL, SQLite |
| REST API | RESTAPISource | mcpaisuite-ragmcp[sources] | paginated, JSONPath |
| IMAP email | EmailSource | stdlib only | Gmail, any IMAP server |
| Slack | SlackSource | mcpaisuite-ragmcp[slack] | channels, threads |
| Jira | JiraSource | mcpaisuite-ragmcp[jira] | Cloud + Server/DC, JQL, comments |
| Notion | NotionLoader | mcpaisuite-ragmcp[notion] | pages, databases |
| Confluence | ConfluenceLoader | mcpaisuite-ragmcp[confluence] | spaces, pages |
| Kafka | KafkaStreamSource | mcpaisuite-ragmcp[streaming] | real-time streaming ingestion |
| AWS S3 | S3Loader | mcpaisuite-ragmcp[s3] | any bucket, MinIO-compatible |
| Google Cloud Storage | GCSLoader | mcpaisuite-ragmcp[gcs] | any bucket, emulator-compatible |

from ragmcp.sources import GitHubSource, SQLDataSource, RESTAPISource
from ragmcp.sources import EmailSource, SlackSource, JiraSource

# GitHub repository
source = GitHubSource(repo="owner/repo", token="ghp_...", path_prefix="docs/")
await pipeline.ingest_source(source)

# SQL database
source = SQLDataSource(
    url="postgresql+asyncpg://user:pass@localhost/db",
    query="SELECT title, body FROM articles",
    content_column="body",
)
await pipeline.ingest_source(source)

# IMAP email — stdlib only, no extra deps
source = EmailSource(
    host="imap.gmail.com",
    username="me@example.com",
    password="app-password",
    mailbox="INBOX",
    since_days=30,
    max_emails=500,
)
await pipeline.ingest_source(source)

# Slack — pip install mcpaisuite-ragmcp[slack]
source = SlackSource(
    token="xoxb-...",
    channels=["general", "engineering"],
    since_days=30,
    window_size=10,       # messages grouped into windows of 10
    include_threads=True,
)
await pipeline.ingest_source(source)

# Jira — pip install mcpaisuite-ragmcp[jira]
source = JiraSource(
    base_url="https://myteam.atlassian.net",
    email="me@example.com",
    api_token="ATATT...",
    jql="project = ENG AND updated >= -30d ORDER BY updated DESC",
    max_issues=1000,
    include_comments=True,
)
await pipeline.ingest_source(source)

# Notion — pip install mcpaisuite-ragmcp[notion]
from ragmcp.loaders.notion_loader import NotionLoader
from ragmcp import RAGFactory

# Use NotionLoader as the pipeline's loader, then ingest by page/database ID
pipeline = RAGFactory.create_default(loader=NotionLoader(api_key="secret_...", max_pages=50))
await pipeline.ingest("notion://8fbf270c287742f6a0671b68a4f8541d")   # page or database UUID

# Kafka streaming — pip install mcpaisuite-ragmcp[streaming]
from ragmcp.streaming import KafkaStreamSource, StreamingIngestionService

source = KafkaStreamSource(
    bootstrap_servers="localhost:9092",
    topic="ragmcp-docs",
    group_id="ragmcp-ingest",
)
service = StreamingIngestionService(pipeline=pipeline)
await service.run(source)   # streams continuously until cancelled

All sources implement the BaseDataSource async streaming interface, so documents are yielded incrementally instead of being loaded into memory all at once.


Multi-tenancy

Each tenant gets an isolated vector store, BM25 index, and pipeline instance. The tenant_id is set when the pipeline is constructed, so every ingest and search on that pipeline is confined to the tenant's namespace.

from ragmcp.pipeline import RAGPipeline

acme = RAGPipeline(..., tenant_id="acme")
globex = RAGPipeline(..., tenant_id="globex")

await acme.ingest("acme_docs.pdf")
await globex.ingest("globex_docs.pdf")

results = await acme.search("refund policy")
# Only returns chunks from acme's documents

User Profiles & Personalization

Personalization works by persisting a user profile and passing user_id to search() — the pipeline fetches the profile and reranks results accordingly.

Persistent profile store

from ragmcp.graph import SQLUserProfileStore, PersonalizationStrategy
from ragmcp.core import BaseUserProfile as UserProfile

profile_store = SQLUserProfileStore("profiles.db")
await profile_store.save(UserProfile(
    user_id="alice",
    preferences={"topics": ["machine learning", "NLP"]},
    search_history=[],
))

# Personalized search: chunks matching the profile are boosted
results = await pipeline.search(
    "What is RAG?",
    top_k=5,
    user_id="alice",   # pipeline fetches the profile and reranks accordingly
)

Evaluation

Recall@k

from ragmcp.eval import EvalSample, evaluate

samples = [
    EvalSample(question="What is the refund window?", expected_source="refunds.md"),
    EvalSample(question="How to cancel?",             expected_source="cancellation.pdf"),
]
result = await evaluate(pipeline, samples)   # matches expected_source against result metadata
print(f"Recall@k    : {result.recall_at_k:.2%}")
print(f"Mean latency: {result.mean_latency_ms:.0f}ms")
print(f"p95 latency : {result.p95_latency_ms:.0f}ms")
print(f"Failed      : {result.failed_samples}/{result.total_samples}")

EvalSample takes (question, expected_source). evaluate checks whether expected_source appears (substring match) in the metadata source of any returned chunk. EvalResult fields: recall_at_k, mean_latency_ms, p50_latency_ms, p95_latency_ms, total_samples, failed_samples.
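
The substring-based hit rule and the latency percentiles can be reproduced in a few lines, which helps when sanity-checking reported numbers. This is a stand-alone sketch, not the library's code: recall_at_k and percentile are hypothetical helpers, and the nearest-rank percentile definition is an assumption (other definitions interpolate).

```python
import math


def recall_at_k(expected_sources, retrieved_sources_per_query):
    """Fraction of queries where expected_source is a substring of any
    retrieved chunk's source."""
    hits = sum(
        any(expected in source for source in retrieved)
        for expected, retrieved in zip(expected_sources, retrieved_sources_per_query)
    )
    return hits / len(expected_sources)


def percentile(values, pct):
    """Nearest-rank percentile (one of several common definitions)."""
    ordered = sorted(values)
    rank = max(1, math.ceil(pct / 100 * len(ordered)))
    return ordered[rank - 1]


expected = ["refunds.md", "cancellation.pdf"]
retrieved = [
    ["docs/refunds.md", "docs/faq.md"],   # hit: substring match on the path
    ["docs/billing.md"],                  # miss
]
recall = recall_at_k(expected, retrieved)
p95 = percentile([12.0, 15.0, 11.0, 40.0], 95)
```

Because the match is a substring test, an expected_source of "refunds.md" also matches "docs/old/refunds.md", so use distinctive file names in evaluation sets.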

RAGAS (5 metrics)

from ragmcp.eval.ragas_eval import RAGASEvaluator, RAGASSample

evaluator = RAGASEvaluator(embedder=pipeline.embedder)

samples = [
    RAGASSample(
        query="What is the refund window?",
        answer="Refunds are accepted within 30 days.",
        chunks=retrieved_chunks,
        ground_truth="30 days",   # optional — enables answer_correctness metric
    )
]
result = await evaluator.evaluate(samples)
print(f"Context Relevancy : {result.context_relevancy:.2f}")
print(f"Context Precision : {result.context_precision:.2f}")  # fraction of useful chunks
print(f"Answer Relevancy  : {result.answer_relevancy:.2f}")
print(f"Faithfulness      : {result.faithfulness:.2f}")
print(f"Answer Correctness: {result.answer_correctness:.2f}")  # vs ground truth
print(f"Overall (harmonic): {result.overall:.2f}")
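
The overall score is labeled as a harmonic mean, which behaves differently from a plain average: one weak metric pulls the result down sharply. The sketch below shows that effect with Python's statistics module on made-up metric values. Which metrics feed into result.overall, and whether any weighting is applied, is not specified here.

```python
from statistics import harmonic_mean

# Made-up metric values for illustration only
balanced = harmonic_mean([0.8, 0.8, 0.8, 0.8])
one_weak = harmonic_mean([0.95, 0.95, 0.95, 0.35])
arithmetic = sum([0.95, 0.95, 0.95, 0.35]) / 4
```

Both sets of values have the same arithmetic mean, but the harmonic mean of the second set is noticeably lower, so a pipeline cannot hide poor faithfulness behind strong relevancy.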

Agentic RAG (ReAct)

For complex multi-step questions, the ReAct agent runs an iterative Thought/Action/Observation loop — the LLM decides how many searches to perform and when it has enough context.

from ragmcp.agent import ReActRAGAgent

agent = ReActRAGAgent(
    pipeline=pipeline,
    llm_fn=my_llm_function,   # async (messages: list[dict]) -> str
    top_k=5,
    max_steps=5,
)

result = await agent.run(
    "Compare the refund policy for enterprise vs standard plans, "
    "and list any exceptions that apply after 2023"
)

print(result.final_answer)
print(f"Iterations: {result.iteration_count}")
for step in result.steps:
    print(f"  → searched: {step.action_input}")
    print(f"    thought:  {step.thought}")

Both ReActRAGAgent and SelfRAGPipeline require an llm_fn — an async callable you provide (ReActRAGAgent receives messages: list[dict], SelfRAGPipeline receives a prompt: str). There is no built-in model shorthand; wire your own LLM client.
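
Since the two classes expect different llm_fn signatures, a small adapter lets one underlying client serve both. The sketch below uses a stub in place of a real LLM call. The names my_prompt_llm and as_messages_fn are hypothetical, and the message shape (dicts with "role" and "content" keys) is an assumption based on the common chat format.

```python
import asyncio


async def my_prompt_llm(prompt: str) -> str:
    """Stand-in for a real LLM call; replace the body with your client."""
    return f"(stub answer to: {prompt[:40]})"


def as_messages_fn(prompt_fn):
    """Adapt a prompt-style callable to the messages-style signature."""
    async def messages_fn(messages: list[dict]) -> str:
        # Flatten chat messages into one prompt, one "role: content" per line
        prompt = "\n".join(f"{m['role']}: {m['content']}" for m in messages)
        return await prompt_fn(prompt)
    return messages_fn


my_messages_llm = as_messages_fn(my_prompt_llm)
reply = asyncio.run(my_messages_llm([{"role": "user", "content": "What is RAG?"}]))
```

Under these assumptions, my_prompt_llm would be passed to SelfRAGPipeline and my_messages_llm to ReActRAGAgent.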

from ragmcp.agent import ReActRAGAgent

agent = ReActRAGAgent(pipeline=pipeline, llm_fn=my_llm_fn, max_steps=5)
result = await agent.run("Compare dense and hybrid retrieval")
# result.final_answer, result.steps (list of ReActStep: thought/action/action_input/observation)

Difference from Self-RAG: Self-RAG performs one retrieval plus one optional re-retrieval, gated by a fixed critique step. ReAct has no fixed structure; the LLM drives every decision through natural-language reasoning.


Self-RAG

Generate an answer, critique it, and optionally re-retrieve if the answer is insufficiently supported or incomplete.

from ragmcp.agent import SelfRAGPipeline

self_rag = SelfRAGPipeline(
    pipeline=pipeline,
    llm_fn=my_llm_function,       # async (prompt: str) -> str
    support_threshold=6.0,        # re-retrieve if support score < 6/10
    completeness_threshold=6.0,   # re-retrieve if completeness score < 6/10
    max_iterations=2,
)

result = await self_rag.run("What are the refund conditions for enterprise plans?")

print(result.final_answer)
print(f"Support      : {result.support_score:.1f}/10")
print(f"Completeness : {result.completeness_score:.1f}/10")
print(f"Iterations   : {result.iteration_count}")   # 1 = no re-retrieval needed
if result.refined_query:
    print(f"Refined query: {result.refined_query}")

The LLM is called three times per iteration: to generate the answer, to critique its support and completeness, and to generate a refined query. If any step fails, the pipeline falls back gracefully.
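
The control flow described above (generate, critique, refine, retry) can be sketched independently of any LLM. The loop below is an illustration under stated assumptions, not SelfRAGPipeline's code: self_rag_loop and the four stub callables are hypothetical, and scores are assumed to be on a 0 to 10 scale to match the thresholds shown earlier.

```python
import asyncio


async def self_rag_loop(query, retrieve, generate, critique, refine,
                        support_threshold=6.0, completeness_threshold=6.0,
                        max_iterations=2):
    """Generate, critique, and re-retrieve with a refined query if needed."""
    search_query = query
    for iteration in range(1, max_iterations + 1):
        chunks = await retrieve(search_query)
        answer = await generate(query, chunks)
        support, completeness = await critique(query, answer, chunks)
        good_enough = (support >= support_threshold
                       and completeness >= completeness_threshold)
        if good_enough or iteration == max_iterations:
            return {"final_answer": answer, "iteration_count": iteration,
                    "support_score": support, "completeness_score": completeness}
        search_query = await refine(query, answer)   # try a sharper query


# Stub callables standing in for the pipeline and the LLM
async def retrieve(q):
    return ["enterprise refunds: 60 days"] if "enterprise" in q else ["refunds: 30 days"]

async def generate(q, chunks):
    return f"Based on: {chunks[0]}"

async def critique(q, answer, chunks):
    return (8.0, 8.0) if "enterprise" in answer else (4.0, 5.0)

async def refine(q, answer):
    return q + " enterprise"

outcome = asyncio.run(
    self_rag_loop("refund conditions", retrieve, generate, critique, refine)
)
```

In this run the first answer scores below both thresholds, so the query is refined and the second iteration returns a better-supported answer with an iteration count of 2.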


Observability

Prometheus metrics

pip install "mcpaisuite-ragmcp[metrics]"

import prometheus_client
from ragmcp.observability import RAGMetrics

metrics = RAGMetrics()
pipeline = RAGPipeline(..., metrics=metrics)

# Start a dedicated HTTP server that exposes /metrics for Prometheus to scrape.
# Use a port that does not conflict with Prometheus UI (9090) or Milvus (9091).
prometheus_client.start_http_server(9095)

Metrics exposed:

| Metric | Type | Labels |
| --- | --- | --- |
| ragmcp_searches_total | Counter | tenant_id |
| ragmcp_search_duration_seconds | Histogram | tenant_id |
| ragmcp_ingestions_total | Counter | tenant_id, status |
| ragmcp_chunks_ingested_total | Counter | tenant_id |
| ragmcp_embed_duration_seconds | Histogram | model |

Prometheus scrape_configs (use host.docker.internal if Prometheus runs in Docker):

scrape_configs:
  - job_name: "ragmcp"
    static_configs:
      - targets: ["host.docker.internal:9095"]   # or localhost:9095 if running natively

⚠️ Port conflicts to avoid: 9090 = Prometheus UI, 9091 = Milvus metrics.

OpenTelemetry tracing

pip install "mcpaisuite-ragmcp[tracing]"

from ragmcp.observability import RAGTracer

tracer = RAGTracer(
    service_name="my-rag-service",
    otlp_endpoint="http://localhost:4317",
)
pipeline = RAGPipeline(..., tracer=tracer)

Audit Logging

Every search and ingest operation is logged with user, tenant, query, latency, and result IDs.

from ragmcp.audit import SQLAuditLogger, FileAuditLogger

# SQLite / PostgreSQL
audit = SQLAuditLogger(db_url="sqlite:///audit.db")

# Append-only JSONL file
audit = FileAuditLogger(path="audit.jsonl")

pipeline = RAGPipeline(..., audit_logger=audit)

REST API

ragmcp ships a production-ready FastAPI server wrapping any RAGPipeline.

pip install "mcpaisuite-ragmcp[api]"

Embedding the server

import uvicorn
from ragmcp import RAGFactory
from ragmcp.api import create_app
from ragmcp.audit.sql_feedback_store import SQLFeedbackStore

pipeline = RAGFactory.create_default()
app = create_app(
    pipeline,
    api_keys={"your-secret-key"},    # None = no auth (dev only)
    feedback_store=SQLFeedbackStore(db_path="feedback.db"),  # enables POST /feedback
    chat_fn=my_llm_fn,               # enables POST /chat
)
uvicorn.run(app, host="0.0.0.0", port=8000)

Endpoints

| Method | Path | Description |
| --- | --- | --- |
| GET | /health | Liveness probe, always public |
| POST | /search | Semantic search, returns top-k chunks |
| POST | /stream | Same as /search but streamed as NDJSON |
| POST | /chat | Search + LLM answer (opt-in via chat_fn) |
| POST | /ingest | Ingest a single file path |
| POST | /ingest/folder | Ingest all files in a directory |
| POST | /ingest/upload | Upload and ingest a file (multipart) |
| GET | /sources | List all indexed source IDs |
| DELETE | /sources/{id} | Remove a source from the index |
| POST | /feedback | Submit relevance feedback (opt-in via feedback_store) |
| GET | /metrics | Prometheus metrics (opt-in via metrics=) |

Authentication uses the X-API-Key header. Pass api_keys=None to disable it (development only).

Example requests

# Search
curl -X POST http://localhost:8000/search \
  -H "X-API-Key: your-secret-key" \
  -H "Content-Type: application/json" \
  -d '{"query": "refund policy", "top_k": 5}'

# Stream results as NDJSON
curl -X POST http://localhost:8000/stream \
  -H "X-API-Key: your-secret-key" \
  -H "Content-Type: application/json" \
  -d '{"query": "billing", "top_k": 3}'

# Upload and ingest a file
curl -X POST http://localhost:8000/ingest/upload \
  -H "X-API-Key: your-secret-key" \
  -F "file=@manual.pdf"

# Chat (requires chat_fn)
curl -X POST http://localhost:8000/chat \
  -H "X-API-Key: your-secret-key" \
  -H "Content-Type: application/json" \
  -d '{"query": "What is the return window?", "top_k": 5}'

# List indexed sources
curl http://localhost:8000/sources -H "X-API-Key: your-secret-key"

# Delete a source
curl -X DELETE http://localhost:8000/sources/doc-abc123 \
  -H "X-API-Key: your-secret-key"
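
Because /stream returns NDJSON, a client has to decode one JSON object per line rather than parse the body as a single document. The helper below sketches that parsing step with the standard library. The function parse_ndjson is hypothetical, and the field names in the sample body ("content", "score") are assumptions about the response shape.

```python
import json


def parse_ndjson(lines):
    """Yield one decoded JSON object per non-empty line of an NDJSON stream."""
    for line in lines:
        line = line.strip()
        if line:
            yield json.loads(line)


# Hypothetical response body; the real field names may differ
raw = (
    '{"content": "Refunds within 30 days", "score": 0.91}\n'
    '{"content": "Contact billing", "score": 0.74}\n'
)
chunks = list(parse_ndjson(raw.splitlines()))
```

With an HTTP client that exposes the response as an iterator of lines, the same generator can process each chunk as soon as it arrives.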

CLI

# Start MCP server (stdio transport)
ragmcp serve

# Start REST API server
ragmcp api --host 0.0.0.0 --port 8000

# Ingest documents (embedder/vectorstore are set in ragmcp.yaml)
ragmcp ingest ./docs

# Search
ragmcp search "What is the refund policy?" --top-k 5

# Evaluate (JSON file of questions + expected sources)
ragmcp eval --samples queries.json

pydantic-ai Integration

ragmcp ships a first-class bridge for pydantic-ai — giving any pydantic-ai Agent semantic search, source listing, and live ingestion with zero boilerplate.

Installation

pip install "mcpaisuite-ragmcp[pydantic-ai]"

Quickstart — create_rag_agent

from ragmcp import RAGFactory
from ragmcp.integrations.pydantic_ai import create_rag_agent

pipeline = RAGFactory.create_default()
await pipeline.ingest_folder("./docs")

agent = create_rag_agent(
    "openai:gpt-4o",
    pipeline,
    system_prompt="You are a support agent for Acme Corp.",
    top_k=8,
)

result = await agent.run("What are the refund conditions?")
print(result.output)  # named `result.data` in older pydantic-ai releases

Two tools are registered automatically; a third is available on request:

| Tool | Description |
| --- | --- |
| search_knowledge_base(query) | Semantic search (always called first) |
| list_indexed_sources() | Lists available documents |
| ingest_url(url) | Live ingestion (opt-in via enable_ingest=True) |

Manual tool registration

from pydantic_ai import Agent
from ragmcp.integrations.pydantic_ai import ragmcp_tools

agent = Agent("anthropic:claude-opus-4-6", system_prompt="You are a helpful assistant.")
for tool in ragmcp_tools(pipeline, top_k=5, enable_ingest=True):
    agent.tool_plain(tool)

result = await agent.run("Summarise the onboarding section.")

RunContext / dependency injection

from pydantic_ai import Agent, RunContext
from ragmcp.integrations.pydantic_ai import RAGMCPDeps

agent = Agent("openai:gpt-4o", deps_type=RAGMCPDeps)

@agent.tool
async def search(ctx: RunContext[RAGMCPDeps], query: str) -> str:
    return await ctx.deps.search(query)

@agent.tool
async def ingest(ctx: RunContext[RAGMCPDeps], url: str) -> str:
    return await ctx.deps.ingest(url)

result = await agent.run(
    "What changed in v2?",
    deps=RAGMCPDeps(pipeline=pipeline, top_k=10, enable_ingest=True),
)

Together, the three libraries cover the full stack:

ragmcp           → ingestion, retrieval, MCP
+ pydantic-ai    → tool calling, structured outputs, agent loop
+ langfuse       → tracing, evaluation, A/B testing in production

Langfuse Integration

ragmcp ships a transparent tracing wrapper that sends every search and ingest call to Langfuse — no code changes required on the pipeline side.

Installation

pip install "mcpaisuite-ragmcp[langfuse]"

Quickstart

import os
from ragmcp import RAGFactory
from ragmcp.integrations.langfuse import LangfuseRAGPipeline

os.environ["LANGFUSE_PUBLIC_KEY"] = "pk-lf-..."
os.environ["LANGFUSE_SECRET_KEY"] = "sk-lf-..."

pipeline = RAGFactory.create_default()
traced = LangfuseRAGPipeline(pipeline)  # wraps transparently

await traced.ingest("./docs/guide.pdf")
chunks = await traced.search("How does billing work?")

One-liner factory

from ragmcp.integrations.langfuse import trace_pipeline

traced = trace_pipeline(RAGFactory.create_default())

What gets traced

| Operation | Langfuse span | Output fields |
| --- | --- | --- |
| search(query) | search | result_count, sources |
| ingest(source) | ingest | success, skipped, failed |

Errors are recorded with level="ERROR" and the exception message.
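
The wrapping pattern behind this table can be sketched without Langfuse itself. In the example below, RecordingTracer, TracedPipeline, and FakePipeline are hypothetical stand-ins, not ragmcp or Langfuse classes; only the span name, the output fields, and the level="ERROR" convention are taken from the text above.

```python
import asyncio


class RecordingTracer:
    """Stand-in for a tracing client: stores spans in a list instead of sending them."""

    def __init__(self):
        self.spans = []

    def record(self, name, level="DEFAULT", **fields):
        self.spans.append({"name": name, "level": level, **fields})


class TracedPipeline:
    """Wraps any object with an async `search` method and records one span per call."""

    def __init__(self, pipeline, tracer):
        self._pipeline = pipeline
        self._tracer = tracer

    async def search(self, query):
        try:
            chunks = await self._pipeline.search(query)
        except Exception as exc:
            # Failures are recorded with level="ERROR" and the exception message, then re-raised.
            self._tracer.record("search", level="ERROR", message=str(exc))
            raise
        self._tracer.record(
            "search",
            result_count=len(chunks),
            sources=sorted({chunk["source"] for chunk in chunks}),
        )
        return chunks


class FakePipeline:
    """Toy pipeline used only to drive the wrapper."""

    async def search(self, query):
        if not query:
            raise ValueError("empty query")
        return [{"source": "guide.pdf"}, {"source": "faq.md"}]


async def demo():
    tracer = RecordingTracer()
    traced = TracedPipeline(FakePipeline(), tracer)
    await traced.search("How does billing work?")
    try:
        await traced.search("")
    except ValueError:
        pass
    return tracer.spans


spans = asyncio.run(demo())
```

Because the wrapper exposes the same method as the object it wraps, callers can swap one for the other without changes.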

Graceful degradation

If langfuse is not installed or keys are missing, the wrapper silently falls back to the unwrapped pipeline with no exception raised. Use enabled=False to skip tracing entirely in tests.

wrapped = LangfuseRAGPipeline(pipeline, enabled=False)
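
One way such a fallback decision could be made is sketched below. The function name tracing_available is hypothetical and ragmcp's actual checks may differ; the environment variable names are the ones used in the quickstart.

```python
import importlib.util
import os


def tracing_available(env=None, enabled=True):
    """Return True only if tracing is requested, langfuse is importable, and both keys are set."""
    env = os.environ if env is None else env
    if not enabled:
        return False
    if importlib.util.find_spec("langfuse") is None:
        return False  # package not installed: fall back silently
    return bool(env.get("LANGFUSE_PUBLIC_KEY")) and bool(env.get("LANGFUSE_SECRET_KEY"))


no_keys = tracing_available(env={})
disabled = tracing_available(
    env={"LANGFUSE_PUBLIC_KEY": "pk-lf-...", "LANGFUSE_SECRET_KEY": "sk-lf-..."},
    enabled=False,
)
```

Checking with find_spec avoids importing the package just to learn whether it is installed.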

Flush before exit

traced.flush()  # sends any buffered events to Langfuse

Semantic Router

Route any query to the best-matching pipeline, agent, or handler based on semantic similarity — no regex, no keyword lists.

Quickstart

from ragmcp.routing import SemanticRouter, Route

router = SemanticRouter(embedder=embedder, routes=[
    Route(name="technical", examples=["how to install", "configuration", "API reference"]),
    Route(name="billing",   examples=["pricing", "invoice", "subscription"]),
])
route, score = await router.route("How do I configure the chunker?")
# → Route(name="technical") and its similarity score

Full example

from ragmcp.routing import Route, SemanticRouter
from ragmcp.embedders import FastEmbedEmbedder

router = SemanticRouter(
    embedder=FastEmbedEmbedder(),
    routes=[
        Route("billing",   ["How do I pay?", "Invoice not received", "refund"]),
        Route("technical", ["API returns 500", "SDK crash", "rate limit error"]),
        Route("general",   ["Hello", "What can you do?"]),
    ],
    threshold=0.45,   # minimum cosine similarity to declare a match
    default="general", # fallback when no route exceeds the threshold
)

await router.build()   # embed examples once (reuse across requests)

route, score = await router.route("My payment failed")
# → Route(name="billing"), 0.87
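
To make the threshold and default parameters concrete, here is a minimal sketch of similarity-based routing on toy 2-D vectors. It is not SemanticRouter's implementation: scoring a route by its best-matching example is an assumption (averaging over examples would be another reasonable choice), and the vectors are hand-written rather than produced by an embedder.

```python
import math


def cosine(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def route(query_vec, route_examples, threshold=0.45, default="general"):
    """Pick the route whose best example is most similar to the query.

    `route_examples` maps a route name to a list of example vectors.
    Returns (name, score); falls back to `default` when no score reaches `threshold`.
    """
    best_name, best_score = default, 0.0
    for name, examples in route_examples.items():
        score = max(cosine(query_vec, example) for example in examples)
        if score > best_score:
            best_name, best_score = name, score
    if best_score < threshold:
        return default, best_score
    return best_name, best_score


# Toy "embeddings": billing examples point along x, technical along y.
routes = {
    "billing": [[1.0, 0.0], [0.9, 0.1]],
    "technical": [[0.0, 1.0]],
}
match = route([1.0, 0.1], routes)        # close to the billing examples
fallback = route([-1.0, -1.0], routes)   # similar to nothing, so the default wins
```

Raising the threshold sends more queries to the default route; lowering it makes the router more willing to commit to a weak match.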

Handler dispatch

billing_pipeline = RAGFactory.create_default()
await billing_pipeline.ingest_folder("./docs/billing")
technical_pipeline = RAGFactory.create_default()
await technical_pipeline.ingest_folder("./docs/technical")

Route("billing", [...], handler=lambda q: billing_pipeline.search(q))
Route("technical", [...], handler=lambda q: technical_pipeline.search(q))

result = await router.route_and_handle("My invoice is missing")
# → calls billing handler automatically

REST API

The demo server exposes a POST /router/route endpoint:

curl -X POST http://localhost:8000/router/route \
  -H "Content-Type: application/json" \
  -d '{
    "query": "My payment failed",
    "routes": [
      {"name": "billing", "examples": ["How to pay", "refund"]},
      {"name": "technical", "examples": ["API error", "crash"]}
    ],
    "threshold": 0.45
  }'

The demo UI also includes a Router Playground page with live score visualization.


Configuration

YAML config

# ragmcp.yaml
embedder:
  type: litellm
  model: text-embedding-3-small
  api_key: "${OPENAI_API_KEY}"

vectorstore:
  type: qdrant
  url: http://localhost:6333
  collection: my_docs

retriever:
  type: hybrid
  alpha: 0.7

reranker:
  type: cross_encoder
  model: cross-encoder/ms-marco-MiniLM-L-6-v2

cache:
  type: redis
  url: redis://localhost:6379
  ttl: 86400

chunker:
  type: recursive       # recursive | sentence | late | semantic
  chunk_size: 512
  chunk_overlap: 64

# Semantic chunker — splits at topic boundaries (requires embedder)
# chunker:
#   type: semantic
#   breakpoint_threshold: 0.5   # lower = more splits
#   buffer_size: 1
#   min_chunk_size: 100

audit:
  type: sql
  db_url: sqlite:///audit.db

# Langfuse tracing (optional)
langfuse:
  enabled: true
  public_key: "${LANGFUSE_PUBLIC_KEY}"
  secret_key: "${LANGFUSE_SECRET_KEY}"
  host: "https://cloud.langfuse.com"   # or self-hosted URL

Load the file with RAGFactory.from_config:

from ragmcp import RAGFactory

pipeline = RAGFactory.from_config("ragmcp.yaml")
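
The ${OPENAI_API_KEY}-style placeholders in the YAML refer to environment variables. How ragmcp resolves them is not described here; the sketch below shows one plausible approach on an already-parsed config dictionary. The function name expand_env and the choice to raise on an unset variable are assumptions.

```python
import re

_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def expand_env(value, env):
    """Recursively replace ${VAR} in strings inside dicts and lists; raise if VAR is unset."""
    if isinstance(value, dict):
        return {key: expand_env(item, env) for key, item in value.items()}
    if isinstance(value, list):
        return [expand_env(item, env) for item in value]
    if isinstance(value, str):
        def substitute(match):
            name = match.group(1)
            if name not in env:
                raise KeyError(f"environment variable {name} is not set")
            return env[name]
        return _PLACEHOLDER.sub(substitute, value)
    return value  # numbers, booleans, None pass through unchanged


config = {
    "embedder": {"type": "litellm", "api_key": "${OPENAI_API_KEY}"},
    "cache": {"type": "redis", "ttl": 86400},
}
resolved = expand_env(config, env={"OPENAI_API_KEY": "sk-test"})
```

Failing loudly on a missing variable surfaces a misconfigured deployment at startup instead of at the first API call.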

Backends Reference

Loaders

| Format | Class | Notes |
| --- | --- | --- |
| TXT / MD | TextLoader | Always available |
| PDF | PDFLoader | pip install "mcpaisuite-ragmcp[pdf]" |
| PDF (scanned) | PDFOCRLoader | pip install "mcpaisuite-ragmcp[ocr]" |
| DOCX | DocxLoader | pip install "mcpaisuite-ragmcp[docx]" |
| HTML / URL | HTMLLoader | pip install "mcpaisuite-ragmcp[html]" |
| CSV | CSVLoader | Always available |
| JSON / JSONL | JSONLoader | Always available |
| Image | ImageLoader | OCR or vision LLM |
| Audio | AudioLoader | Whisper |
| Video | VideoLoader | Whisper (extracts audio) |
| S3 | S3Loader | pip install "mcpaisuite-ragmcp[s3]" |
| GCS | GCSLoader | pip install "mcpaisuite-ragmcp[gcs]" |

Retrievers

| Strategy | Class | Notes |
| --- | --- | --- |
| Dense | DenseRetriever | Pure vector similarity |
| BM25 | BM25SparseIndex | Keyword-based |
| Hybrid | HybridRetriever | Dense + BM25 + RRF |
| Cached | CachedRetriever | Query-level cache wrapper |
| Multi-Query | MultiQueryRetriever | LLM query variants + RRF fusion |
| HyDE | HyDERetriever | Hypothetical Document Embeddings |
| GraphRAG | GraphRAGRetriever | Graph traversal + dense |
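
RRF in the table stands for Reciprocal Rank Fusion, a standard way to merge ranked lists without comparing raw scores. The sketch below shows the textbook formula, not HybridRetriever's code; the constant k=60 is the value commonly used in the literature, and how ragmcp combines it with the alpha setting is not covered here.

```python
from collections import defaultdict


def reciprocal_rank_fusion(rankings, k=60):
    """Fuse several ranked lists of document IDs.

    Each document scores sum(1 / (k + rank)) over the lists it appears in,
    with rank starting at 1. Ties are broken alphabetically for determinism.
    """
    scores = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    return sorted(scores, key=lambda doc_id: (-scores[doc_id], doc_id))


dense = ["doc-a", "doc-b", "doc-c"]   # order returned by vector similarity
bm25 = ["doc-b", "doc-d", "doc-a"]    # order returned by keyword search
fused = reciprocal_rank_fusion([dense, bm25])
# → ["doc-b", "doc-a", "doc-d", "doc-c"]
```

Documents that rank well in both lists rise to the top, while a document found by only one retriever still gets a chance to appear.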

Vector Stores

| Store | Class | Notes |
| --- | --- | --- |
| In-memory | InMemoryVectorStore | Default, no persistence |
| ChromaDB | ChromaStore | Local persistence |
| Qdrant | QdrantStore | Local or cloud |
| Milvus | MilvusStore | Self-hosted |
| PostgreSQL | PgVectorStore | pgvector extension required |

Chunkers

| Strategy | Class | Notes |
| --- | --- | --- |
| Recursive (token-based) | RecursiveChunker | Default, no deps |
| Sentence-aware | SentenceChunker | Splits at sentence boundaries |
| Contextual | LateChunker | Enriches chunk embeddings with neighbor context |
| Semantic (topic-based) | SemanticChunker | Cuts at topic transitions via cosine similarity |
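
As an illustration of cutting at topic transitions, the sketch below splits a sentence sequence wherever neighbouring sentence vectors drift apart. It is a simplified stand-in, not SemanticChunker itself: it treats breakpoint_threshold as a cosine distance (an assumption, chosen because it matches the "lower = more splits" comment in the YAML config), ignores buffer_size and min_chunk_size, and uses hand-written toy vectors.

```python
import math


def cosine_distance(a, b):
    """1 - cosine similarity; 0 means same direction, 1 means orthogonal."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return 1.0 - (dot / norm if norm else 0.0)


def split_on_topic_shift(sentences, vectors, breakpoint_threshold=0.5):
    """Group consecutive sentences; start a new chunk when the distance
    between neighbouring sentence vectors exceeds the threshold."""
    if not sentences:
        return []
    chunks, current = [], [sentences[0]]
    for i in range(1, len(sentences)):
        if cosine_distance(vectors[i - 1], vectors[i]) > breakpoint_threshold:
            chunks.append(" ".join(current))
            current = []
        current.append(sentences[i])
    chunks.append(" ".join(current))
    return chunks


sentences = [
    "Invoices are sent monthly.",
    "Invoices list every charge.",
    "The SDK supports retries.",
]
vectors = [[1.0, 0.0], [0.9, 0.2], [0.0, 1.0]]  # toy embeddings, one per sentence
chunks = split_on_topic_shift(sentences, vectors)
```

The first two sentences stay together because their vectors nearly coincide; the third starts a new chunk because its vector points in a different direction.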

Embedders

| Model | Class | Notes |
| --- | --- | --- |
| fastembed | FastEmbedEmbedder | Default, CPU, no API key |
| OpenAI / Cohere / Mistral | LiteLLMEmbedder | Any LiteLLM-supported model |
| Ollama | OllamaEmbedder | Local GPU |

Rerankers

| Model | Class | Notes |
| --- | --- | --- |
| Cross-encoder (local) | CrossEncoderReranker | No API key |
| Cohere Rerank | CohereReranker | Cohere API key required |
| Feedback-driven | FeedbackReranker | Uses thumbs up/down signals from users |

Security

Authentication

Protect the REST API with API keys:

app = create_app(pipeline, api_keys={"key-prod-xxx", "key-dev-yyy"})

All requests must include the X-API-Key header. Pass api_keys=None to disable authentication (development only).
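
For readers who want to see what such a check involves, here is a minimal, framework-free sketch. The function is_authorized is hypothetical and is not how create_app is implemented; it only mirrors the behaviour described above, including the api_keys=None escape hatch.

```python
import hmac


def is_authorized(headers, api_keys):
    """Return True if the request may proceed.

    `api_keys=None` disables the check entirely (development only).
    """
    if api_keys is None:
        return True
    presented = headers.get("X-API-Key", "")
    # compare_digest avoids leaking key contents through timing differences.
    return any(
        hmac.compare_digest(presented.encode("utf-8"), key.encode("utf-8"))
        for key in api_keys
    )


keys = {"key-prod-xxx", "key-dev-yyy"}
ok = is_authorized({"X-API-Key": "key-dev-yyy"}, keys)
rejected = is_authorized({"X-API-Key": "wrong"}, keys)
missing = is_authorized({}, keys)
open_mode = is_authorized({}, None)
```

Using a set of keys makes rotation straightforward: add the new key, migrate clients, then remove the old one.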

Multi-tenancy isolation

Each tenant is isolated in its own vectorstore namespace. Data from one tenant is never accessible from another tenant — the tenant_id is enforced at every query and ingest call.

SQL Injection

PgVectorStore uses parameterized queries (%s placeholders) for all dynamic values. Filter keys are validated against a whitelist of scalar types.
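
The same two defences (bound parameters for values, an allow-list for keys) can be demonstrated with the standard library. This sketch uses sqlite3, whose placeholder is ? rather than the %s used by PostgreSQL drivers; the table layout, the ALLOWED_FILTER_KEYS set, and build_filter_query are hypothetical and do not reflect PgVectorStore's schema.

```python
import sqlite3

ALLOWED_FILTER_KEYS = {"source", "tenant_id"}  # hypothetical allow-list


def build_filter_query(filters):
    """Return (sql, params): values are bound as parameters, keys checked against the allow-list."""
    clauses, params = [], []
    for key, value in filters.items():
        if key not in ALLOWED_FILTER_KEYS:
            raise ValueError(f"unsupported filter key: {key!r}")
        if not isinstance(value, (str, int, float, bool)):
            raise TypeError(f"filter value for {key!r} must be a scalar")
        clauses.append(f"{key} = ?")  # safe to interpolate: key came from the allow-list
        params.append(value)
    where = " AND ".join(clauses) if clauses else "1=1"
    return f"SELECT id FROM chunks WHERE {where} ORDER BY id", params


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE chunks (id INTEGER PRIMARY KEY, source TEXT, tenant_id TEXT)")
conn.executemany(
    "INSERT INTO chunks (source, tenant_id) VALUES (?, ?)",
    [("guide.pdf", "acme"), ("faq.md", "acme"), ("guide.pdf", "globex")],
)

sql, params = build_filter_query({"source": "guide.pdf", "tenant_id": "acme"})
rows = conn.execute(sql, params).fetchall()

# A classic injection string is treated as plain data and matches nothing.
sql_injected, params_injected = build_filter_query({"source": "x' OR '1'='1"})
injected_rows = conn.execute(sql_injected, params_injected).fetchall()
```

Values never reach the SQL text, and keys can only be names the application already knows, so neither side offers an injection path.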


Deployment

Docker

FROM python:3.11-slim
RUN pip install "mcpaisuite-ragmcp[litellm,pgvector,qdrant,api]"
COPY . /app
WORKDIR /app
CMD ["ragmcp", "api", "--host", "0.0.0.0", "--port", "8000"]

Docker Compose (ragmcp + PostgreSQL/pgvector)

# docker-compose.yml
services:
  ragmcp:
    build: .
    ports: ["8000:8000"]
    environment:
      RAGMCP_EMBEDDER: litellm
      RAGMCP_EMBEDDER_MODEL: text-embedding-3-small
      RAGMCP_EMBEDDER_API_KEY: ${OPENAI_API_KEY}
      RAGMCP_VECTORSTORE: pgvector
      RAGMCP_VECTORSTORE_URL: postgresql://user:pass@postgres/ragmcp
  postgres:
    image: pgvector/pgvector:pg16
    environment:
      POSTGRES_USER: user        # must match the user in RAGMCP_VECTORSTORE_URL
      POSTGRES_PASSWORD: pass
      POSTGRES_DB: ragmcp

Troubleshooting

SemanticChunker in an async context

# Incorrect — raises RuntimeError inside an existing event loop
chunks = chunker.chunk(docs)

# Correct
chunks = await chunker.achunk(docs)
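
The error comes from a general asyncio rule rather than anything specific to ragmcp: a synchronous helper that starts its own event loop cannot run inside a loop that is already running. The sketch below reproduces the failure with stand-in functions; chunk_sync, chunk_async, and _embed are hypothetical names, and it is an assumption that the synchronous chunk() method works this way internally.

```python
import asyncio


async def _embed(texts):
    """Stand-in for an async embedding call."""
    await asyncio.sleep(0)
    return [len(text) for text in texts]


def chunk_sync(texts):
    """Sync wrapper that starts its own event loop; fails if one is already running."""
    coro = _embed(texts)
    try:
        return asyncio.run(coro)
    except RuntimeError:
        coro.close()  # avoid a "coroutine was never awaited" warning
        raise


async def chunk_async(texts):
    """Async variant: simply awaits on the caller's loop."""
    return await _embed(texts)


async def main():
    try:
        chunk_sync(["a"])
        error = None
    except RuntimeError as exc:
        error = str(exc)
    result = await chunk_async(["a", "bb"])
    return error, result


error, result = asyncio.run(main())
```

Outside a running loop (a plain script or a synchronous test), the sync wrapper works as expected; inside async code, always prefer the awaitable variant.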

Dimension mismatch (Milvus / Qdrant)

If you change the embedding model, the new vectors may have a different dimension than the existing collection, which then rejects them. Re-index with the new embedder:

await pipeline.reindex(folder, new_embedder=new_embedder)

No results returned

  • Check that documents were ingested: sources = await pipeline.vectorstore.list_sources()
  • The tenant_id must match between ingestion and search (set on the pipeline).
  • Try a keyword-only search with BM25SparseIndex: if it returns results, the problem is on the embedding side.

MCP Server — stdout corruption

The stdio transport uses stdout as the JSON-RPC channel. Never write to stdout in an MCP configuration. ragmcp automatically redirects logs to stderr in stdio mode.

Slow first call

The first embed() call downloads the ONNX model (~45–560 MB depending on the model). Subsequent calls use the local cache. Use RAGMCPServer with warmup=True to pre-load at startup:

server = RAGMCPServer(pipeline=pipeline, warmup=True)
server.run()

Contributing

git clone https://github.com/ragmcp/ragmcp
cd ragmcp
pip install -e ".[all]"
pip install pytest pytest-asyncio
pytest tests/

Integration tests

Integration tests live in tests/integration/ and require real or emulated backends.

  1. Copy the credentials template and fill in values:

    cp tests/integration/.env.example tests/integration/.env
    # edit tests/integration/.env
  2. Start the local emulators (Qdrant, ChromaDB, PostgreSQL/pgvector, Milvus, Redis, MinIO, fake-gcs, Neo4j):

    docker compose -f docker-compose.test.yml up -d
  3. Run the integration suite:

    pytest tests/integration/ -v

    Any credential left blank in .env causes that backend’s tests to be skipped automatically — you don’t need every service running.

PRs welcome. Please open an issue first for large changes.


LangChain Loader Bridge — 95+ Document Formats

ragmcp natively supports 15 document loaders. With the LangChain Loader Bridge, you get instant access to 80+ additional loaders — for a total of 95+ supported formats.

from langchain_community.document_loaders import (
    PyPDFLoader, NotionDBLoader, S3FileLoader,
    GitLoader, ConfluenceLoader, WikipediaLoader,
)
from ragmcp import RAGFactory
from ragmcp.loaders.langchain_bridge import LangChainLoaderBridge

rag = RAGFactory.create_default()
bridge = LangChainLoaderBridge(rag)

# Ingest from any source — LangChain handles the loading, ragmcp handles the rest
await bridge.ingest_loader(PyPDFLoader("quarterly_report.pdf"))
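# NotionDBLoader needs both an integration token and a database ID (placeholder values shown)
await bridge.ingest_loader(NotionDBLoader(integration_token="secret_xxx", database_id="your-database-id"))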
await bridge.ingest_loader(GitLoader(repo_path="./codebase", branch="main"))
await bridge.ingest_loader(ConfluenceLoader(url="https://company.atlassian.net"))

# Search across all ingested documents
results = await rag.search("Q3 revenue highlights")

How it works

  1. LangChain loader produces Document objects
  2. Bridge converts them to ragmcp Document format (preserving metadata)
  3. ragmcp chunker splits into chunks
  4. ragmcp embedder generates vectors
  5. Chunks stored in your configured vectorstore (Chroma, Qdrant, Milvus, PgVector)
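
Step 2 above can be sketched as follows. LangChain's Document exposes page_content and metadata; everything else here is an assumption: LangChainLikeDocument is a stand-in so the example runs without LangChain installed, and PipelineDocument is a hypothetical target shape, since ragmcp's real Document model is not shown in this section.

```python
from dataclasses import dataclass, field


@dataclass
class LangChainLikeDocument:
    """Stand-in with the two attributes LangChain's Document exposes."""
    page_content: str
    metadata: dict = field(default_factory=dict)


@dataclass
class PipelineDocument:
    """Hypothetical target shape; ragmcp's real Document model may differ."""
    text: str
    source: str
    metadata: dict = field(default_factory=dict)


def convert(docs, default_source="unknown"):
    """Map loader output to pipeline documents, keeping all metadata."""
    converted = []
    for doc in docs:
        metadata = dict(doc.metadata)  # copy so the loader's dict is not mutated
        source = str(metadata.get("source", default_source))
        converted.append(
            PipelineDocument(text=doc.page_content, source=source, metadata=metadata)
        )
    return converted


loaded = [
    LangChainLikeDocument("Page one text", {"source": "quarterly_report.pdf", "page": 0}),
    LangChainLikeDocument("No metadata here"),
]
documents = convert(loaded)
```

Keeping the original metadata intact means loader-specific fields such as page numbers remain available for filtering and citation after ingestion.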

Supported formats

| Category | Formats |
| --- | --- |
| Documents | PDF, Word, Excel, PowerPoint, CSV, JSON, HTML, Markdown, EPub, Email |
| Cloud | Notion, Confluence, Google Drive, S3, GCS, Slack, Jira, GitHub |
| Media | YouTube transcripts, Audio (Whisper), Video (FFmpeg) |
| Data | SQL databases, HuggingFace, Arxiv, PubMed, Wikipedia |

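Requires: pip install langchain-core langchain-community (the example above imports its loaders from langchain_community), plus the packages each loader depends on (e.g., pip install pypdf for PDF).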


License

AGPL-3.0 — see LICENSE.

For commercial licensing (closed-source usage), contact the author.