planningmcp
π 17 tools
planningmcp
Planning and execution engine for AI agents with MCP support.
Part of the MCP AI suite: ragmcp (knowledge) Β· memorymcp (memory) Β· planningmcp (reasoning)
planningmcp gives AI agents the ability to decompose goals into executable plans, track execution with dependency resolution, re-plan on failure, estimate costs, and learn from past plans.
Inspired by Tree-of-Thought, ReAct, Reflexion (Shinn 2023), CoALA (Sumers 2023).
Quick Start
from planningmcp import PlanningFactory
pipeline = PlanningFactory.default()
# Decompose a goal into steps
plan = await pipeline.create_plan("migrate database to new schema")
# Estimate cost before execution
estimate = await pipeline.estimate_cost(plan.id)
print(f"Estimated: {estimate.estimated_tokens} tokens, ${estimate.estimated_cost_usd:.4f}")
# Execute the plan
result = await pipeline.execute_plan(plan.id)
print(f"Status: {result.status}, Progress: {result.progress:.0%}")
MCP Server
planningmcp serve
{
"mcpServers": {
"planningmcp": {
"command": "planningmcp",
"args": ["serve"]
}
}
}
Features
- Goal decomposition β LLM-powered or pattern-based breakdown of goals into steps
- Plan templates β 8 built-in templates (deploy, migrate, audit, analyze, setup, refactor, onboard, incident)
- Plan validation β Cycle detection, missing dependency checks, duplicate detection
- Plan optimization β Auto-parallelize steps, bottleneck detection, critical path analysis
- Dependency resolution β Topological sort with parallel execution groups
- Plan execution β Step-by-step with retries, timeouts, and progress tracking
- Dynamic re-planning β Retry, skip, or LLM-generate alternative steps on failure
- Approval gates β Human-in-the-loop checkpoints with auto-detection of dangerous operations
- Conditional branching β If step X succeeds/fails, run step Y
- Cost estimation β Token and monetary cost per step before execution
- Budget management β Per-plan and per-namespace spending limits
- Plan history β Learn from past plans to inform future ones
- Mermaid diagrams β Export plans as Mermaid flowcharts
- Event streaming β 16 event types via EventBus for real-time plan monitoring
- Webhooks β HTTP POST notifications on plan events
- OpenTelemetry tracing β Per-plan and per-step spans for observability
- Plan scheduling β Cron-based recurring plan execution
- LTP engine β Lean Task Protocol: compile goals into deterministic execution plans with @PARALLEL, ON_FAIL, FOREACH, RE-PLAN directives. LTP support is provided by the separate
ltpmcppackage (a dependency), and planningmcp re-exports its LTP types (LTPPlan,LTPCompiler,LTPRuntime, etc.). - MCP server β 17 tools for Claude Desktop, Cursor, or any MCP client
- CLI β 16 commands: serve, plan, execute, execute-step, cancel, replan, status, list, estimate, history, config, validate, optimize, diagram, templates, ltp
- Persistent stores β InMemory (default), SQLite, Redis
- Integration β Works with memorymcp (plan memory) and ragmcp (context retrieval)
Installation
pip install mcpaisuite-planningmcp
# With LLM support
pip install "mcpaisuite-planningmcp[litellm]"
# Full suite integration
pip install "mcpaisuite-planningmcp[all]"
MCP Tools (17)
| Tool | Description |
|---|---|
create_plan | Decompose a goal into an executable plan |
execute_plan | Run all steps of a plan |
execute_step | Run a single step |
get_plan | Get plan details and status |
list_plans | List plans for a namespace |
cancel_plan | Abort a running plan |
estimate_cost | Estimate token/cost before execution |
replan | Generate a new plan after failure |
plan_history | Get past plans and outcomes |
plan_config | View pipeline configuration |
validate_plan | Check plan for cycles, missing deps, and duplicates |
optimize_plan | Auto-parallelize steps and detect bottlenecks |
plan_diagram | Export plan as a Mermaid flowchart |
list_templates | List available plan templates |
compile_ltp | Compile a goal into a Lean Task Protocol (LTP) plan |
run_ltp | Compile and execute a goal as an LTP plan |
validate_ltp | Compile an LTP plan and validate it without executing (dry run) |
CLI
planningmcp serve # Start MCP server
planningmcp plan "migrate database schema" # Create a plan
planningmcp execute <plan_id> # Run a plan
planningmcp status <plan_id> # Check plan status
planningmcp list --namespace default # List plans
planningmcp estimate <plan_id> # Cost estimate
planningmcp history --namespace default # Past plans
planningmcp config # Show config
planningmcp validate <plan_id> # Validate plan structure
planningmcp optimize <plan_id> # Optimize plan execution
planningmcp diagram <plan_id> # Export Mermaid diagram
planningmcp templates # List plan templates
Plan Templates
8 built-in templates for common workflows:
| Template | Description |
|---|---|
deploy | Application deployment with rollback steps |
migrate | Data or schema migration with validation |
audit | Security or compliance audit checklist |
analyze | Data analysis pipeline |
setup | Environment or project setup |
refactor | Code refactoring with test verification |
onboard | New team member or service onboarding |
incident | Incident response and resolution |
plan = await pipeline.create_plan("deploy web app", template="deploy")
Event Streaming
16 event types via EventBus for real-time monitoring of plan execution.
The pipeline emits PlanEvent objects to a shared event_bus. Consume them with
event_bus.stream(plan_id) (an async iterator) or event_bus.subscribe(plan_id)
(an asyncio.Queue). Each PlanEvent has .type, .plan_id, .step_id,
.message, and a .data dict for extra fields.
import asyncio
from planningmcp import PlanningFactory, event_bus, EventType
pipeline = PlanningFactory.default()
plan = await pipeline.create_plan("migrate database to new schema")
async def monitor(plan_id: str):
# stream() yields events until the plan completes/fails/cancels
async for event in event_bus.stream(plan_id):
if event.type == EventType.step_completed:
print(f"Step {event.step_id} completed: {event.message}")
elif event.type == EventType.plan_failed:
print(f"Plan {event.plan_id} failed: {event.message}")
# Run the monitor and the execution concurrently
await asyncio.gather(
monitor(plan.id),
pipeline.execute_plan(plan.id),
)
Event types include: plan.started, plan.completed, plan.failed, plan.cancelled, step.started, step.completed, step.failed, step.skipped, step.retrying, approval.requested, approval.granted, approval.denied, replan.started, replan.completed, budget.warning, budget.exceeded.
Approval Gates
Human-in-the-loop checkpoints that pause execution until approved. Dangerous operations (e.g., destructive migrations, production deploys) are auto-detected.
from planningmcp.core.models import Step
# Mark specific steps as requiring approval via requires_approval flag
step = Step(name="Deploy to production", requires_approval=True)
# During execution, steps with requires_approval=True will emit
# an approval.requested event and pause until approved
result = await pipeline.execute_plan(plan.id)
Conditional Branching
Route plan execution based on step outcomes using condition and condition_step_id.
from planningmcp.core.models import Step
# Step runs only if the referenced step meets the condition
validate = Step(name="Validate schema", tool_name="validate_schema")
apply = Step(name="Apply migration", condition="success", condition_step_id=validate.id, depends_on=[validate.id])
rollback = Step(name="Rollback and alert", condition="failure", condition_step_id=validate.id, depends_on=[validate.id])
LTP compilation & validation
LTP (Lean Task Protocol) compiles a goal into a deterministic, single-LLM-call execution plan with directives like @PARALLEL, ON_FAIL, FOREACH, and RE-PLAN. The LTP types are provided by the separate ltpmcp package (a dependency); planningmcp re-exports them (LTPPlan, LTPStep, LTPCondition, LTPParser, LTPRuntime, LTPCompiler, validate_plan).
All three LTP methods require a completion_fn β pass one to PlanningFactory.create(completion_fn=...). If none is configured, the methods raise a ValueError.
from planningmcp import PlanningFactory
pipeline = PlanningFactory.create(completion_fn=my_llm_fn)
# Compile only β returns an LTPPlan (one LLM call)
plan = await pipeline.compile_ltp("scrape the docs and summarize each page")
# Dry-run validation β compiles and validates without executing
report = await pipeline.validate_ltp("scrape the docs and summarize each page")
# report -> {"valid": bool, "errors": list[str], "step_count": int,
# "steps": [{"id", "tool", "output_var"}, ...], "raw": str}
# Compile + execute end-to-end β returns the runtime result dict
result = await pipeline.run_ltp("scrape the docs and summarize each page")
| Method | Signature | Returns |
|---|---|---|
compile_ltp | compile_ltp(goal, namespace="default") | LTPPlan (validate_plan warnings logged, not raised) |
validate_ltp | validate_ltp(goal) | dict with valid, errors, step_count, steps, raw |
run_ltp | run_ltp(goal, tool_executor=None, namespace="default") | dict from LTPRuntime.execute |
These map directly to the MCP tools compile_ltp, run_ltp, and validate_ltp (see the MCP Tools table) and the planningmcp ltp CLI command.
Validation (validate_plan, re-exported from ltpmcp.analyzer.validate) returns a list[str] of errors and catches issues such as duplicate step IDs, undefined output variables, references to missing steps, and plans with no terminating/respond step. For run_ltp, you may supply a custom tool_executor β an async (tool_name, args, namespace) -> {"success": bool, "output": str} callable; otherwise the pipelineβs configured step executor is used.
Webhooks
WebhookManager sends HTTP POST notifications when plan events fire. It subscribes to the shared event_bus and forwards matching events to registered URLs. Each request sends the PlanEvent as a JSON body (event.model_dump(mode="json")).
The simplest setup is via the factory β pass webhook_urls (and optionally webhook_events) to PlanningFactory.create(). Each URL is registered and the manager is started automatically:
from planningmcp import PlanningFactory
pipeline = PlanningFactory.create(
webhook_urls=["https://example.com/hook", "https://ops.example.com/notify"],
webhook_events=["plan.completed", "plan.failed"], # default if omitted
)
You can also set the PLANNINGMCP_WEBHOOK_URLS environment variable (comma-separated) and build the pipeline with PlanningFactory.from_env():
export PLANNINGMCP_WEBHOOK_URLS="https://example.com/hook,https://ops.example.com/notify"
For full control, use WebhookManager directly. register() accepts optional headers and a secret for HMAC-SHA256 request signing (sent as an X-Signature-256: sha256=... header):
from planningmcp import WebhookManager
manager = WebhookManager()
manager.register(
"https://example.com/hook",
events=["plan.completed", "plan.failed"], # defaults to these two
headers={"Authorization": "Bearer token"},
secret="my-signing-secret", # enables HMAC-SHA256 signing
)
manager.start() # begins forwarding events from the event_bus
# ...
manager.stop() # cancels the background listener
Webhook delivery uses httpx (10s timeout). If httpx is not installed, a warning is logged and delivery is skipped. manager.registered_hooks returns the list of registered {url, events} entries; manager.unregister(url) removes one.
Plan scheduling
PlanScheduler runs recurring (or one-time) plan execution on a cron-like schedule. Construct it with a pipeline, add schedules, then start() the background loop (which checks every 30 seconds):
from datetime import datetime, timezone
from planningmcp import PlanningFactory, PlanScheduler, CronTrigger, OnceTrigger
pipeline = PlanningFactory.default()
scheduler = PlanScheduler(pipeline)
# Recurring: every 5 minutes
scheduler.add("run nightly health audit", trigger=CronTrigger("*/5 * * * *"))
# One-time: at a specific datetime
scheduler.add(
"send weekly report",
trigger=OnceTrigger(datetime(2026, 6, 2, 9, 0, tzinfo=timezone.utc)),
)
# A plain cron string also works (positional, for convenience)
scheduler.add("rotate logs", "0 0 * * *", namespace="ops")
scheduler.start() # background loop checks schedules every 30s
# ...
scheduler.stop()
Each scheduler.add(...) returns a ScheduledPlan (with an id, last_run, next_run, run_count, and enabled flag). When a schedule fires, the scheduler calls pipeline.create_plan(...) then pipeline.execute_plan(...).
| Trigger | Constructor | Meaning |
|---|---|---|
CronTrigger | CronTrigger(cron, timezone="UTC") | Recurring minute hour day month weekday cron expression |
OnceTrigger | OnceTrigger(at) | Fires once at the given datetime |
The cron matcher supports *, exact values, and */N step syntax for each of the five fields. Manage schedules with scheduler.list(), scheduler.remove(id), scheduler.enable(id), and scheduler.disable(id).
Factory
from planningmcp import PlanningFactory
# Zero config β in-memory
pipeline = PlanningFactory.default()
# From environment variables
pipeline = PlanningFactory.from_env()
# Full configuration
pipeline = PlanningFactory.create(
decomposer="llm",
completion_fn=my_llm_fn,
plan_store="sqlite",
sqlite_path="plans.db",
replan_strategy="llm",
model="gpt-4o",
max_cost_per_plan=1.0,
)
# Template-based decomposition
pipeline = PlanningFactory.create(
decomposer="template",
templates=["deploy", "migrate", "audit"],
)
Integration with memorymcp + ragmcp
from planningmcp import PlanningFactory
from memorymcp import MemoryFactory
from ragmcp import RAGFactory
planning = PlanningFactory.create(
memory_pipeline=MemoryFactory.default(),
rag_pipeline=RAGFactory.default(),
decomposer="llm",
completion_fn=my_fn,
)
plan = await planning.create_plan("migrate HubSpot data to new schema")
License
AGPL-3.0 β see LICENSE.
For commercial licensing (closed-source usage), contact the author.