planningmcp

Planning and execution engine for AI agents with MCP support.

PyPI · Python 3.11+ · License: AGPL-3.0 · Tests: 222

Part of the MCP AI suite: ragmcp (knowledge) · memorymcp (memory) · planningmcp (reasoning)

planningmcp gives AI agents the ability to decompose goals into executable plans, track execution with dependency resolution, re-plan on failure, estimate costs, and learn from past plans.

Inspired by Tree-of-Thought, ReAct, Reflexion (Shinn 2023), and CoALA (Sumers 2023).


Quick Start

import asyncio

from planningmcp import PlanningFactory


async def main() -> None:
    pipeline = PlanningFactory.default()

    # Decompose a goal into steps
    plan = await pipeline.create_plan("migrate database to new schema")

    # Estimate cost before execution
    estimate = await pipeline.estimate_cost(plan.id)
    print(f"Estimated: {estimate.estimated_tokens} tokens, ${estimate.estimated_cost_usd:.4f}")

    # Execute the plan
    result = await pipeline.execute_plan(plan.id)
    print(f"Status: {result.status}, Progress: {result.progress:.0%}")


# The pipeline methods are coroutines, so they must run inside an event loop
asyncio.run(main())

MCP Server

Start the server from the command line:

planningmcp serve

Or register it in an MCP client's configuration file:

{
  "mcpServers": {
    "planningmcp": {
      "command": "planningmcp",
      "args": ["serve"]
    }
  }
}

Features

  • Goal decomposition: LLM-powered or pattern-based breakdown of goals into steps
  • Plan templates: 8 built-in templates (deploy, migrate, audit, analyze, setup, refactor, onboard, incident)
  • Plan validation: Cycle detection, missing dependency checks, duplicate detection
  • Plan optimization: Auto-parallelize steps, bottleneck detection, critical path analysis
  • Dependency resolution: Topological sort with parallel execution groups
  • Plan execution: Step-by-step with retries, timeouts, and progress tracking
  • Dynamic re-planning: Retry, skip, or LLM-generate alternative steps on failure
  • Approval gates: Human-in-the-loop checkpoints with auto-detection of dangerous operations
  • Conditional branching: If step X succeeds/fails, run step Y
  • Cost estimation: Token and monetary cost per step before execution
  • Budget management: Per-plan and per-namespace spending limits
  • Plan history: Learn from past plans to inform future ones
  • Mermaid diagrams: Export plans as Mermaid flowcharts
  • Event streaming: 16 event types via EventBus for real-time plan monitoring
  • Webhooks: HTTP POST notifications on plan events
  • OpenTelemetry tracing: Per-plan and per-step spans for observability
  • Plan scheduling: Cron-based recurring plan execution
  • LTP engine: Lean Task Protocol (LTP) compiles goals into deterministic execution plans with @PARALLEL, ON_FAIL, FOREACH, and RE-PLAN directives. LTP support is provided by the separate ltpmcp package (a dependency), and planningmcp re-exports its LTP types (LTPPlan, LTPCompiler, LTPRuntime, etc.).
  • MCP server: 17 tools for Claude Desktop, Cursor, or any MCP client
  • CLI: 16 commands: serve, plan, execute, execute-step, cancel, replan, status, list, estimate, history, config, validate, optimize, diagram, templates, ltp
  • Persistent stores: InMemory (default), SQLite, Redis
  • Integration: Works with memorymcp (plan memory) and ragmcp (context retrieval)
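
To make the "topological sort with parallel execution groups" feature concrete, here is a minimal, self-contained sketch of one common way to compute such groups (a layered variant of Kahn's algorithm). It is not planningmcp's implementation; the function name execution_groups and the plain dict input are assumptions made for illustration.

```python
from collections import defaultdict


def execution_groups(depends_on: dict[str, list[str]]) -> list[list[str]]:
    """Group step ids into layers; steps in one layer can run in parallel.

    `depends_on` maps each step id to the ids it must wait for.
    Raises ValueError on an unknown dependency or a dependency cycle.
    """
    # Number of unfinished dependencies per step (duplicates counted once).
    remaining = {step: len(set(deps)) for step, deps in depends_on.items()}
    dependents: dict[str, list[str]] = defaultdict(list)
    for step, deps in depends_on.items():
        for dep in set(deps):
            if dep not in depends_on:
                raise ValueError(f"step {step!r} depends on unknown step {dep!r}")
            dependents[dep].append(step)

    groups: list[list[str]] = []
    ready = sorted(step for step, count in remaining.items() if count == 0)
    while ready:
        groups.append(ready)
        next_ready = []
        for step in ready:
            for child in dependents[step]:
                remaining[child] -= 1
                if remaining[child] == 0:
                    next_ready.append(child)
        ready = sorted(next_ready)

    # Steps that never became ready are part of (or blocked by) a cycle.
    if sum(len(group) for group in groups) != len(depends_on):
        raise ValueError("dependency cycle detected")
    return groups


# Hypothetical migration plan used only for this example.
plan = {
    "backup": [],
    "validate": [],
    "migrate": ["backup", "validate"],
    "verify": ["migrate"],
    "notify": ["migrate"],
}
groups = execution_groups(plan)
print(groups)
```

Each inner list is a group whose steps have no unfinished dependencies, so an executor could run them concurrently before moving on to the next group.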

Installation

pip install mcpaisuite-planningmcp

# With LLM support
pip install "mcpaisuite-planningmcp[litellm]"

# Full suite integration
pip install "mcpaisuite-planningmcp[all]"

MCP Tools (17)

Tool            Description
create_plan     Decompose a goal into an executable plan
execute_plan    Run all steps of a plan
execute_step    Run a single step
get_plan        Get plan details and status
list_plans      List plans for a namespace
cancel_plan     Abort a running plan
estimate_cost   Estimate token/cost before execution
replan          Generate a new plan after failure
plan_history    Get past plans and outcomes
plan_config     View pipeline configuration
validate_plan   Check plan for cycles, missing deps, and duplicates
optimize_plan   Auto-parallelize steps and detect bottlenecks
plan_diagram    Export plan as a Mermaid flowchart
list_templates  List available plan templates
compile_ltp     Compile a goal into a Lean Task Protocol (LTP) plan
run_ltp         Compile and execute a goal as an LTP plan
validate_ltp    Compile an LTP plan and validate it without executing (dry run)
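
As an illustration of the three checks that validate_plan is described as performing (cycles, missing dependencies, duplicates), the following sketch runs them over plain dictionaries. The step shape ({"id", "depends_on"}) and the function name validate_steps are assumptions for this example, not the library's data model.

```python
def validate_steps(steps: list[dict]) -> list[str]:
    """Return a list of human-readable problems; an empty list means valid."""
    errors: list[str] = []

    # 1. Duplicate step ids.
    seen: set[str] = set()
    for step in steps:
        if step["id"] in seen:
            errors.append(f"duplicate step id: {step['id']}")
        seen.add(step["id"])

    # For duplicated ids, the last definition wins in this lookup table.
    graph = {step["id"]: step.get("depends_on", []) for step in steps}

    # 2. Dependencies that point at steps that do not exist.
    for step_id, deps in graph.items():
        for dep in deps:
            if dep not in graph:
                errors.append(f"{step_id} depends on missing step: {dep}")

    # 3. Cycle detection: depth-first search with three colours.
    WHITE, GREY, BLACK = 0, 1, 2
    colour = dict.fromkeys(graph, WHITE)

    def has_cycle(node: str) -> bool:
        colour[node] = GREY
        for dep in graph[node]:
            if dep not in graph:
                continue  # already reported as missing
            if colour[dep] == GREY:
                return True  # back edge: dep is on the current path
            if colour[dep] == WHITE and has_cycle(dep):
                return True
        colour[node] = BLACK
        return False

    for node in graph:
        if colour[node] == WHITE and has_cycle(node):
            errors.append(f"dependency cycle reachable from step: {node}")
            break  # one cycle report is enough for this sketch
    return errors


good = [{"id": "a"}, {"id": "b", "depends_on": ["a"]}]
bad = [
    {"id": "a", "depends_on": ["b"]},
    {"id": "b", "depends_on": ["a", "x"]},
    {"id": "b", "depends_on": ["a", "x"]},
]
good_errors = validate_steps(good)
bad_errors = validate_steps(bad)
print(good_errors)
print(bad_errors)
```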

CLI

planningmcp serve                           # Start MCP server
planningmcp plan "migrate database schema"  # Create a plan
planningmcp execute <plan_id>               # Run a plan
planningmcp status <plan_id>                # Check plan status
planningmcp list --namespace default        # List plans
planningmcp estimate <plan_id>              # Cost estimate
planningmcp history --namespace default     # Past plans
planningmcp config                          # Show config
planningmcp validate <plan_id>              # Validate plan structure
planningmcp optimize <plan_id>              # Optimize plan execution
planningmcp diagram <plan_id>               # Export Mermaid diagram
planningmcp templates                       # List plan templates
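
For readers unfamiliar with Mermaid, the sketch below shows roughly what a diagram export involves: one node per step and one arrow per dependency. It is an illustration only; the step dictionaries and the to_mermaid name are hypothetical, and the output of planningmcp diagram may be formatted differently.

```python
def to_mermaid(steps: list[dict]) -> str:
    """Render steps as Mermaid flowchart text (top-down layout).

    Assumes step ids are plain alphanumeric strings, which Mermaid
    accepts as node ids without quoting.
    """
    lines = ["flowchart TD"]
    for step in steps:
        # Double quotes would end the label early, so swap them out.
        label = step["name"].replace('"', "'")
        lines.append(f'    {step["id"]}["{label}"]')
    for step in steps:
        for dep in step.get("depends_on", []):
            # Arrow points from the prerequisite to the dependent step.
            lines.append(f"    {dep} --> {step['id']}")
    return "\n".join(lines)


diagram = to_mermaid(
    [
        {"id": "s1", "name": "Back up database"},
        {"id": "s2", "name": "Apply migration", "depends_on": ["s1"]},
        {"id": "s3", "name": "Verify schema", "depends_on": ["s2"]},
    ]
)
print(diagram)
```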

Plan Templates

8 built-in templates for common workflows:

Template  Description
deploy    Application deployment with rollback steps
migrate   Data or schema migration with validation
audit     Security or compliance audit checklist
analyze   Data analysis pipeline
setup     Environment or project setup
refactor  Code refactoring with test verification
onboard   New team member or service onboarding
incident  Incident response and resolution

plan = await pipeline.create_plan("deploy web app", template="deploy")

Event Streaming

16 event types via EventBus for real-time monitoring of plan execution.

The pipeline emits PlanEvent objects to a shared event_bus. Consume them with event_bus.stream(plan_id) (an async iterator) or event_bus.subscribe(plan_id) (an asyncio.Queue). Each PlanEvent has .type, .plan_id, .step_id, .message, and a .data dict for extra fields.

import asyncio
from planningmcp import PlanningFactory, event_bus, EventType

pipeline = PlanningFactory.default()
plan = await pipeline.create_plan("migrate database to new schema")

async def monitor(plan_id: str):
    # stream() yields events until the plan completes/fails/cancels
    async for event in event_bus.stream(plan_id):
        if event.type == EventType.step_completed:
            print(f"Step {event.step_id} completed: {event.message}")
        elif event.type == EventType.plan_failed:
            print(f"Plan {event.plan_id} failed: {event.message}")

# Run the monitor and the execution concurrently
await asyncio.gather(
    monitor(plan.id),
    pipeline.execute_plan(plan.id),
)

Event types include: plan.started, plan.completed, plan.failed, plan.cancelled, step.started, step.completed, step.failed, step.skipped, step.retrying, approval.requested, approval.granted, approval.denied, replan.started, replan.completed, budget.warning, budget.exceeded.
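
The stream/subscribe pattern described above can be hard to picture without seeing the plumbing. The sketch below is a toy, self-contained event bus built on asyncio.Queue; it is not planningmcp's EventBus, and the names MiniEventBus, Event, and emit are assumptions made for this example. It shows how a stream can end on its own once a terminal plan event arrives.

```python
import asyncio
from collections import defaultdict
from dataclasses import dataclass, field

# Event types after which a plan emits nothing further.
TERMINAL = {"plan.completed", "plan.failed", "plan.cancelled"}


@dataclass
class Event:
    type: str
    plan_id: str
    message: str = ""
    data: dict = field(default_factory=dict)


class MiniEventBus:
    def __init__(self) -> None:
        self._queues: dict[str, list[asyncio.Queue]] = defaultdict(list)

    def subscribe(self, plan_id: str) -> asyncio.Queue:
        """Return a queue that receives every event for plan_id."""
        queue: asyncio.Queue = asyncio.Queue()
        self._queues[plan_id].append(queue)
        return queue

    def emit(self, event: Event) -> None:
        for queue in self._queues[event.plan_id]:
            queue.put_nowait(event)

    async def stream(self, plan_id: str):
        """Yield events for plan_id, stopping after a terminal event."""
        queue = self.subscribe(plan_id)
        try:
            while True:
                event = await queue.get()
                yield event
                if event.type in TERMINAL:
                    return
        finally:
            self._queues[plan_id].remove(queue)


async def demo() -> list[str]:
    bus = MiniEventBus()
    seen: list[str] = []

    async def monitor() -> None:
        async for event in bus.stream("p1"):
            seen.append(event.type)

    task = asyncio.create_task(monitor())
    await asyncio.sleep(0)  # let the monitor subscribe before events are emitted
    for kind in ("plan.started", "step.completed", "plan.completed", "step.started"):
        bus.emit(Event(type=kind, plan_id="p1"))
    await task
    return seen


seen = asyncio.run(demo())
print(seen)
```

The last emitted event is never observed because the stream has already closed, which is why a monitor started after a plan finishes would see nothing in this toy design.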


Approval Gates

Human-in-the-loop checkpoints that pause execution until approved. Dangerous operations (e.g., destructive migrations, production deploys) are auto-detected.

from planningmcp.core.models import Step

# Mark specific steps as requiring approval via requires_approval flag
step = Step(name="Deploy to production", requires_approval=True)

# During execution, steps with requires_approval=True will emit
# an approval.requested event and pause until approved
result = await pipeline.execute_plan(plan.id)
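
This page does not describe how dangerous operations are auto-detected. Purely as an illustration of the idea, a simple heuristic could match step text against a list of risky patterns. Everything below (the pattern list, the needs_approval name) is hypothetical and is not taken from planningmcp.

```python
import re

# Hypothetical patterns; a real deployment would tune this list.
DANGEROUS_PATTERNS = [
    r"\bdrop\s+(table|database)\b",
    r"\bdelete\b",
    r"\btruncate\b",
    r"\brm\s+-rf\b",
    r"\bprod(uction)?\b",
    r"\bforce[- ]push\b",
]
_DANGEROUS = re.compile("|".join(DANGEROUS_PATTERNS), re.IGNORECASE)


def needs_approval(step_name: str, description: str = "") -> bool:
    """Return True when the step text matches any risky pattern."""
    return bool(_DANGEROUS.search(f"{step_name} {description}"))


checks = {
    "Deploy to production": needs_approval("Deploy to production"),
    "Run unit tests": needs_approval("Run unit tests"),
    "Clean up": needs_approval("Clean up", "DROP TABLE users"),
    "Update product page": needs_approval("Update product page"),
}
print(checks)
```

A keyword heuristic like this errs toward false positives, which is usually the safer direction for an approval gate; explicit requires_approval=True remains the reliable way to force a checkpoint.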

Conditional Branching

Route plan execution based on step outcomes using condition and condition_step_id.

from planningmcp.core.models import Step

# Step runs only if the referenced step meets the condition
validate = Step(name="Validate schema", tool_name="validate_schema")
apply = Step(name="Apply migration", condition="success", condition_step_id=validate.id, depends_on=[validate.id])
rollback = Step(name="Rollback and alert", condition="failure", condition_step_id=validate.id, depends_on=[validate.id])
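
The branching rule above can be read as a small predicate evaluated before each step runs. The sketch below expresses it over plain dictionaries; the should_run name, the dict-based steps, and the "success"/"failure" outcome strings recorded per step are assumptions for illustration rather than the library's internals.

```python
def should_run(step: dict, outcomes: dict[str, str]) -> bool:
    """Decide whether a step runs, given outcomes of finished steps.

    `outcomes` maps a step id to "success" or "failure".
    """
    condition = step.get("condition")
    if condition is None:
        return True  # unconditional step
    outcome = outcomes.get(step["condition_step_id"])
    if outcome is None:
        return False  # the referenced step has not finished yet
    return outcome == condition


apply_step = {"name": "Apply migration", "condition": "success", "condition_step_id": "validate"}
rollback_step = {"name": "Rollback and alert", "condition": "failure", "condition_step_id": "validate"}

after_success = {"validate": "success"}
after_failure = {"validate": "failure"}

print(should_run(apply_step, after_success), should_run(rollback_step, after_success))
print(should_run(apply_step, after_failure), should_run(rollback_step, after_failure))
```

Exactly one of the two branches runs for any given outcome of the validation step, which is the behaviour the apply/rollback pair in the example above relies on.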

LTP Compilation & Validation

LTP (Lean Task Protocol) compiles a goal into a deterministic, single-LLM-call execution plan with directives like @PARALLEL, ON_FAIL, FOREACH, and RE-PLAN. The LTP types are provided by the separate ltpmcp package (a dependency); planningmcp re-exports them (LTPPlan, LTPStep, LTPCondition, LTPParser, LTPRuntime, LTPCompiler, validate_plan).

All three LTP methods require a completion_fn, which you pass to PlanningFactory.create(completion_fn=...). If none is configured, the methods raise a ValueError.

from planningmcp import PlanningFactory

pipeline = PlanningFactory.create(completion_fn=my_llm_fn)

# Compile only: returns an LTPPlan (one LLM call)
plan = await pipeline.compile_ltp("scrape the docs and summarize each page")

# Dry-run validation: compiles and validates without executing
report = await pipeline.validate_ltp("scrape the docs and summarize each page")
# report -> {"valid": bool, "errors": list[str], "step_count": int,
#            "steps": [{"id", "tool", "output_var"}, ...], "raw": str}

# Compile + execute end-to-end: returns the runtime result dict
result = await pipeline.run_ltp("scrape the docs and summarize each page")

Method        Signature                                               Returns
compile_ltp   compile_ltp(goal, namespace="default")                  LTPPlan (validate_plan warnings logged, not raised)
validate_ltp  validate_ltp(goal)                                      dict with valid, errors, step_count, steps, raw
run_ltp       run_ltp(goal, tool_executor=None, namespace="default")  dict from LTPRuntime.execute

These map directly to the MCP tools compile_ltp, run_ltp, and validate_ltp (see the MCP Tools table) and the planningmcp ltp CLI command.

Validation (validate_plan, re-exported from ltpmcp.analyzer.validate) returns a list[str] of errors and catches issues such as duplicate step IDs, undefined output variables, references to missing steps, and plans with no terminating/respond step. For run_ltp, you may supply a custom tool_executor, an async callable of the form (tool_name, args, namespace) -> {"success": bool, "output": str}; otherwise the pipeline's configured step executor is used.
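
A custom tool_executor only has to honour the callable shape given above. The sketch below is one possible way to write it, using a small dispatch table; the TOOLS registry and the echo_tool handler are hypothetical names invented for this example.

```python
import asyncio
import json


async def echo_tool(args: dict, namespace: str) -> str:
    """Hypothetical tool: returns its arguments as a JSON string."""
    return json.dumps({"namespace": namespace, "echo": args})


# Hypothetical registry mapping tool names to async handlers.
TOOLS = {"echo": echo_tool}


async def tool_executor(tool_name: str, args: dict, namespace: str) -> dict:
    """Run a tool and report the result as {"success": bool, "output": str}."""
    handler = TOOLS.get(tool_name)
    if handler is None:
        return {"success": False, "output": f"unknown tool: {tool_name}"}
    try:
        return {"success": True, "output": await handler(args, namespace)}
    except Exception as exc:
        # Report failures as data so the caller can react to them.
        return {"success": False, "output": f"{type(exc).__name__}: {exc}"}


ok = asyncio.run(tool_executor("echo", {"text": "hi"}, "default"))
missing = asyncio.run(tool_executor("nope", {}, "default"))
print(ok)
print(missing)
```

Given the run_ltp signature listed above, such a function would be passed as pipeline.run_ltp(goal, tool_executor=tool_executor).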


Webhooks

WebhookManager sends HTTP POST notifications when plan events fire. It subscribes to the shared event_bus and forwards matching events to registered URLs. Each request sends the PlanEvent as a JSON body (event.model_dump(mode="json")).

The simplest setup is via the factory: pass webhook_urls (and optionally webhook_events) to PlanningFactory.create(). Each URL is registered and the manager is started automatically:

from planningmcp import PlanningFactory

pipeline = PlanningFactory.create(
    webhook_urls=["https://example.com/hook", "https://ops.example.com/notify"],
    webhook_events=["plan.completed", "plan.failed"],  # default if omitted
)

You can also set the PLANNINGMCP_WEBHOOK_URLS environment variable (comma-separated) and build the pipeline with PlanningFactory.from_env():

export PLANNINGMCP_WEBHOOK_URLS="https://example.com/hook,https://ops.example.com/notify"

For full control, use WebhookManager directly. register() accepts optional headers and a secret for HMAC-SHA256 request signing (sent as an X-Signature-256: sha256=... header):

from planningmcp import WebhookManager

manager = WebhookManager()
manager.register(
    "https://example.com/hook",
    events=["plan.completed", "plan.failed"],  # defaults to these two
    headers={"Authorization": "Bearer token"},
    secret="my-signing-secret",               # enables HMAC-SHA256 signing
)
manager.start()   # begins forwarding events from the event_bus
# ...
manager.stop()    # cancels the background listener

Webhook delivery uses httpx (10s timeout). If httpx is not installed, a warning is logged and delivery is skipped. manager.registered_hooks returns the list of registered {url, events} entries; manager.unregister(url) removes one.
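
On the receiving side, a signed webhook is only useful if the signature is checked. The sketch below shows a typical verification routine using Python's standard library. It assumes the signature is a hex-encoded HMAC-SHA256 of the raw request body, which matches the sha256=... header format described above but is not confirmed here; the sign and verify helper names are also assumptions.

```python
import hashlib
import hmac


def sign(body: bytes, secret: str) -> str:
    """Compute the expected header value for a request body.

    Assumption: hex-encoded HMAC-SHA256 over the raw body bytes.
    """
    digest = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    return f"sha256={digest}"


def verify(body: bytes, secret: str, header_value: str) -> bool:
    """Check an X-Signature-256 header using a constant-time comparison."""
    expected = sign(body, secret).encode()
    return hmac.compare_digest(expected, header_value.encode())


body = b'{"type": "plan.completed", "plan_id": "p1"}'
header = sign(body, "my-signing-secret")

print(verify(body, "my-signing-secret", header))
print(verify(body + b" ", "my-signing-secret", header))
print(verify(body, "wrong-secret", header))
```

Verification should run against the exact bytes received, before any JSON parsing, because re-serializing the payload can change whitespace or key order and invalidate the signature.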


Plan Scheduling

PlanScheduler runs recurring (or one-time) plan execution on a cron-like schedule. Construct it with a pipeline, add schedules, then start() the background loop (which checks every 30 seconds):

from datetime import datetime, timezone
from planningmcp import PlanningFactory, PlanScheduler, CronTrigger, OnceTrigger

pipeline = PlanningFactory.default()
scheduler = PlanScheduler(pipeline)

# Recurring: every 5 minutes
scheduler.add("run health audit", trigger=CronTrigger("*/5 * * * *"))

# One-time: at a specific datetime
scheduler.add(
    "send weekly report",
    trigger=OnceTrigger(datetime(2026, 6, 2, 9, 0, tzinfo=timezone.utc)),
)

# A plain cron string also works (positional, for convenience)
scheduler.add("rotate logs", "0 0 * * *", namespace="ops")

scheduler.start()   # background loop checks schedules every 30s
# ...
scheduler.stop()

Each scheduler.add(...) returns a ScheduledPlan (with an id, last_run, next_run, run_count, and enabled flag). When a schedule fires, the scheduler calls pipeline.create_plan(...) then pipeline.execute_plan(...).

Trigger      Constructor                        Meaning
CronTrigger  CronTrigger(cron, timezone="UTC")  Recurring schedule from a five-field cron expression (minute hour day month weekday)
OnceTrigger  OnceTrigger(at)                    Fires once at the given datetime

The cron matcher supports *, exact values, and */N step syntax for each of the five fields. Manage schedules with scheduler.list(), scheduler.remove(id), scheduler.enable(id), and scheduler.disable(id).
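
To show what matching the three supported forms (*, exact values, and */N) involves, here is a small self-contained matcher. It is a sketch, not the scheduler's code. Two assumptions are worth noting: weekdays use the conventional cron numbering (0 = Sunday), and */N counts from each field's lowest valid value, so */2 in the day field matches days 1, 3, 5, and so on.

```python
from datetime import datetime


def field_matches(expr: str, value: int, low: int = 0) -> bool:
    """Match one cron field: '*', '*/N', or an exact integer."""
    if expr == "*":
        return True
    if expr.startswith("*/"):
        # Steps are counted from the lowest valid value of the field.
        return (value - low) % int(expr[2:]) == 0
    return value == int(expr)


def cron_matches(cron: str, when: datetime) -> bool:
    """Return True if `when` satisfies a five-field cron expression."""
    minute, hour, day, month, weekday = cron.split()
    # Python: Monday=0 ... Sunday=6. Cron (assumed): Sunday=0 ... Saturday=6.
    cron_weekday = (when.weekday() + 1) % 7
    return (
        field_matches(minute, when.minute)
        and field_matches(hour, when.hour)
        and field_matches(day, when.day, low=1)
        and field_matches(month, when.month, low=1)
        and field_matches(weekday, cron_weekday)
    )


print(cron_matches("*/5 * * * *", datetime(2026, 6, 2, 9, 0)))
print(cron_matches("*/5 * * * *", datetime(2026, 6, 2, 9, 3)))
print(cron_matches("0 0 * * *", datetime(2026, 6, 2, 0, 0)))
```

A scheduler loop that wakes every 30 seconds would also need to remember the last minute it fired for, otherwise a matching minute could trigger the same schedule twice.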


Factory

from planningmcp import PlanningFactory

# Zero config: in-memory
pipeline = PlanningFactory.default()

# From environment variables
pipeline = PlanningFactory.from_env()

# Full configuration
pipeline = PlanningFactory.create(
    decomposer="llm",
    completion_fn=my_llm_fn,
    plan_store="sqlite",
    sqlite_path="plans.db",
    replan_strategy="llm",
    model="gpt-4o",
    max_cost_per_plan=1.0,
)

# Template-based decomposition
pipeline = PlanningFactory.create(
    decomposer="template",
    templates=["deploy", "migrate", "audit"],
)

Integration with memorymcp + ragmcp

from planningmcp import PlanningFactory
from memorymcp import MemoryFactory
from ragmcp import RAGFactory

planning = PlanningFactory.create(
    memory_pipeline=MemoryFactory.default(),
    rag_pipeline=RAGFactory.default(),
    decomposer="llm",
    completion_fn=my_llm_fn,
)

plan = await planning.create_plan("migrate HubSpot data to new schema")

License

AGPL-3.0 (see LICENSE).

For commercial licensing (closed-source usage), contact the author.