Implementing Model Context Protocol for Business Outcome Optimization

A practical guide to wiring Claude into Odoo ERP with production-grade Python

There is a lot of noise right now about AI agents and automation. Most of it stops at the demo layer. You get a video of Claude answering a question about a sales report and the article calls it "autonomous business intelligence." That is not what this post is about.

This is about actually shipping something. We are going to build a Model Context Protocol server that connects directly to an Odoo ERP instance, expose real business operations as tools, and run Claude against them in a way that you would not be embarrassed to put in front of a client. By the end you will have a working MCP server that can read sales orders, create purchase requests, check inventory levels, and summarize overdue invoices — all through natural language, all with proper error handling and environment-based configuration.

What MCP actually is

Before writing a line of code it is worth spending two minutes on what the protocol is doing and why it matters for ERP automation specifically.

Model Context Protocol is an open standard that defines how an LLM client (Claude Desktop, Claude Code, or your own application) connects to external tools and data sources. Think of it like a USB standard for AI tools. Instead of every developer writing a custom integration layer between Claude and their database, MCP gives you a common protocol so that connection just works.

An MCP server exposes three types of things:

Resources are data the model can read. They are roughly equivalent to GET endpoints. You use them to load context into the conversation without triggering a side effect.

Tools are functions the model can call. They are roughly equivalent to POST endpoints. They execute code and can cause real changes in your system.

Prompts are reusable templates that guide how the model should interact with your tools.

For an ERP system like Odoo, this mapping is natural. Your sales orders, inventory reports and customer lists are resources. Creating a purchase order, confirming a delivery or sending an invoice are tools.

The official Python SDK lives at github.com/modelcontextprotocol/python-sdk and as of early 2026 it is at version 1.26.0. It uses FastMCP as the primary high-level interface which gets rid of a lot of boilerplate while still giving you access to the full protocol when you need it.

What we are building

Our MCP server will give Claude the ability to do the following things inside Odoo:

  • Read open sales orders and filter by customer or date range
  • Check product stock levels across warehouses
  • Read overdue customer invoices with aging
  • Create an internal purchase requisition
  • Confirm or cancel a draft sales order
  • Read recent vendor bills and flag anomalies

We will connect to Odoo through its XML-RPC API, which has been stable across versions for years and does not require installing any custom Odoo module. Everything runs from the outside.

Project setup

Start with a fresh Python project using uv, which is what the MCP SDK documentation recommends.

uv init odoo-mcp-server
cd odoo-mcp-server
uv add "mcp[cli]" python-dotenv pydantic httpx

Your project structure should look like this:

odoo-mcp-server/
├── .env
├── pyproject.toml
├── src/
│   ├── odoo_client.py
│   ├── server.py
│   └── tools/
│       ├── __init__.py
│       ├── sales.py
│       ├── inventory.py
│       ├── invoicing.py
│       └── purchasing.py

Environment configuration

Create your .env file. Never hardcode credentials anywhere in the codebase.

ODOO_URL=https://your-odoo-instance.com
ODOO_DB=your_database_name
ODOO_USERNAME=your_api_user@company.com
ODOO_PASSWORD=your_api_key_or_password
MCP_SERVER_HOST=0.0.0.0
MCP_SERVER_PORT=8000
LOG_LEVEL=INFO

In production you would pull these from a secrets manager. For local development the .env file is fine, as long as it is listed in your .gitignore.
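
Whichever source the values come from, it helps to fail fast and say exactly which variable is missing. The following is a small sketch of that idea, assuming the four ODOO_* names from the .env file above; the Settings class and load_settings function are hypothetical names, not part of the server code.

```python
from collections.abc import Mapping
from dataclasses import dataclass

REQUIRED_VARS = ("ODOO_URL", "ODOO_DB", "ODOO_USERNAME", "ODOO_PASSWORD")


@dataclass(frozen=True)
class Settings:
    url: str
    db: str
    username: str
    password: str


def load_settings(env: Mapping[str, str]) -> Settings:
    """Build Settings from a mapping such as os.environ, naming every missing key."""
    missing = [name for name in REQUIRED_VARS if not env.get(name, "").strip()]
    if missing:
        raise ValueError("Missing environment variables: " + ", ".join(missing))
    return Settings(
        # Drop a trailing slash so "/xmlrpc/2/..." can be appended safely.
        url=env["ODOO_URL"].rstrip("/"),
        db=env["ODOO_DB"],
        username=env["ODOO_USERNAME"],
        password=env["ODOO_PASSWORD"],
    )
```

Calling load_settings(os.environ) at startup works the same whether the variables were set by a .env loader or injected by a secrets manager, and taking a plain mapping keeps the function easy to test.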

The Odoo XML-RPC client

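Odoo exposes a two-endpoint XML-RPC API. The first endpoint (/xmlrpc/2/common) handles authentication and the second (/xmlrpc/2/object) handles actual model operations. Let us wrap this in an async client with environment-based configuration, explicit error types and typed return values. The standard library's xmlrpc.client is blocking, so each call runs in a worker thread to keep the event loop free.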
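
Before the full client, it can help to see what a model operation looks like on the wire. This sketch uses only the standard library to build and decode the body of an execute_kw request without contacting any server. The database name, uid and password are placeholders.

```python
import xmlrpc.client

# Placeholder values; a real call would use your own database, uid and API key.
params = (
    "your_database_name",        # database
    7,                           # uid returned by authenticate()
    "your_api_key_or_password",  # password or API key
    "sale.order",                # model
    "search_read",               # method
    [[["state", "=", "sale"]]],  # positional args: a single domain
    {"fields": ["name", "amount_total"], "limit": 5},  # keyword args
)

# dumps() builds the XML body that ServerProxy would POST to /xmlrpc/2/object.
body = xmlrpc.client.dumps(params, methodname="execute_kw", allow_none=True)

# loads() reverses it, which makes the structure easy to inspect.
decoded_params, method_name = xmlrpc.client.loads(body)
print(method_name)
print(decoded_params[3], decoded_params[4], decoded_params[5])
```

The positional list and the keyword dict are the last two parameters, which is the shape the client's execute method reproduces with list(args) and kwargs.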

src/odoo_client.py

"""
Async Odoo XML-RPC client.

Handles authentication and typed model access. Blocking XML-RPC calls
run in a worker thread so they do not stall the event loop. The two
ServerProxy objects are shared, and xmlrpc.client does not document them
as thread-safe, so avoid issuing many calls in parallel on one client.
"""

import asyncio
import logging
import os
import xmlrpc.client
from dataclasses import dataclass
from typing import Any

from dotenv import load_dotenv

load_dotenv()

logger = logging.getLogger(__name__)


@dataclass
class OdooConfig:
    url: str
    db: str
    username: str
    password: str

    @classmethod
    def from_env(cls) -> "OdooConfig":
        url = os.getenv("ODOO_URL")
        db = os.getenv("ODOO_DB")
        username = os.getenv("ODOO_USERNAME")
        password = os.getenv("ODOO_PASSWORD")

        if not all([url, db, username, password]):
            raise ValueError(
                "Missing required environment variables. "
                "Check ODOO_URL, ODOO_DB, ODOO_USERNAME, ODOO_PASSWORD."
            )

        return cls(url=url, db=db, username=username, password=password)


class OdooConnectionError(Exception):
    """Raised when the Odoo server cannot be reached or auth fails."""
    pass


class OdooOperationError(Exception):
    """Raised when an Odoo RPC operation returns an error."""
    pass


class OdooClient:
    """
    Async wrapper around Odoo's XML-RPC API.

    Usage:
        client = OdooClient(OdooConfig.from_env())
        await client.authenticate()
        orders = await client.search_read(
            "sale.order",
            domain=[["state", "=", "sale"]],
            fields=["name", "partner_id", "amount_total"],
        )
    """

    def __init__(self, config: OdooConfig):
        self.config = config
        self._uid: int | None = None
        self._common_proxy: xmlrpc.client.ServerProxy | None = None
        self._object_proxy: xmlrpc.client.ServerProxy | None = None
        self._lock = asyncio.Lock()

    def _get_common_proxy(self) -> xmlrpc.client.ServerProxy:
        if self._common_proxy is None:
            self._common_proxy = xmlrpc.client.ServerProxy(
                f"{self.config.url}/xmlrpc/2/common",
                allow_none=True,
            )
        return self._common_proxy

    def _get_object_proxy(self) -> xmlrpc.client.ServerProxy:
        if self._object_proxy is None:
            self._object_proxy = xmlrpc.client.ServerProxy(
                f"{self.config.url}/xmlrpc/2/object",
                allow_none=True,
            )
        return self._object_proxy

    async def authenticate(self) -> int:
        """
        Authenticate against Odoo and cache the user ID.
        Safe to call multiple times — re-authenticates only if needed.
        """
        async with self._lock:
            if self._uid is not None:
                return self._uid

            try:
                uid = await asyncio.get_running_loop().run_in_executor(
                    None,
                    lambda: self._get_common_proxy().authenticate(
                        self.config.db,
                        self.config.username,
                        self.config.password,
                        {},
                    ),
                )
            except Exception as exc:
                raise OdooConnectionError(
                    f"Could not connect to Odoo at {self.config.url}. "
                    f"Check your URL and credentials. Original error: {exc}"
                ) from exc

            if not uid:
                raise OdooConnectionError(
                    f"Authentication failed for user '{self.config.username}' "
                    f"on database '{self.config.db}'. Check your credentials."
                )

            self._uid = uid
            logger.info("Authenticated as uid=%d on %s", uid, self.config.url)
            return uid

    async def execute(
        self,
        model: str,
        method: str,
        *args: Any,
        **kwargs: Any,
    ) -> Any:
        """
        Low-level execute_kw call. All higher-level methods call this.
        """
        uid = await self.authenticate()

        try:
            result = await asyncio.get_running_loop().run_in_executor(
                None,
                lambda: self._get_object_proxy().execute_kw(
                    self.config.db,
                    uid,
                    self.config.password,
                    model,
                    method,
                    list(args),
                    kwargs,
                ),
            )
        except xmlrpc.client.Fault as fault:
            raise OdooOperationError(
                f"Odoo returned an error calling {model}.{method}: "
                f"{fault.faultString}"
            ) from fault
        except Exception as exc:
            raise OdooConnectionError(
                f"Network error calling {model}.{method}: {exc}"
            ) from exc

        return result

    async def search_read(
        self,
        model: str,
        domain: list | None = None,
        fields: list[str] | None = None,
        limit: int = 100,
        offset: int = 0,
        order: str | None = None,
    ) -> list[dict[str, Any]]:
        """
        Search and read records in one call.

        Args:
            model: Odoo model name e.g. "sale.order"
            domain: Odoo domain filter e.g. [["state", "=", "sale"]]
            fields: List of field names to return. None returns all fields.
            limit: Maximum records to return (capped at 500 for safety)
            offset: Pagination offset
            order: Sort string e.g. "date_order desc"

        Returns:
            List of record dicts
        """
        kwargs: dict[str, Any] = {
            "fields": fields or [],
            "limit": min(limit, 500),
            "offset": offset,
        }
        if order:
            kwargs["order"] = order

        return await self.execute(
            model,
            "search_read",
            domain or [],
            **kwargs,
        )

    async def search_count(
        self,
        model: str,
        domain: list | None = None,
    ) -> int:
        """Return the count of records matching a domain."""
        return await self.execute(model, "search_count", domain or [])

    async def read(
        self,
        model: str,
        ids: list[int],
        fields: list[str] | None = None,
    ) -> list[dict[str, Any]]:
        """Read specific records by ID."""
        return await self.execute(model, "read", ids, fields=fields or [])

    async def create(self, model: str, values: dict[str, Any]) -> int:
        """Create a record and return its new ID."""
        return await self.execute(model, "create", values)

    async def write(
        self,
        model: str,
        ids: list[int],
        values: dict[str, Any],
    ) -> bool:
        """Update records by ID."""
        return await self.execute(model, "write", ids, values)

    async def unlink(self, model: str, ids: list[int]) -> bool:
        """Delete records by ID. Use carefully."""
        return await self.execute(model, "unlink", ids)

    async def call_method(
        self,
        model: str,
        method: str,
        ids: list[int],
        **kwargs: Any,
    ) -> Any:
        """
        Call a business method on specific records.

        Example: await client.call_method("sale.order", "action_confirm", [42])
        """
        return await self.execute(model, method, ids, **kwargs)

    async def get_fields(self, model: str) -> dict[str, Any]:
        """Return field definitions for a model. Useful for debugging."""
        return await self.execute(model, "fields_get", attributes=["string", "type", "required"])
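
One thing the client above leaves out is retrying transient failures. A minimal sketch of a retry helper with exponential backoff follows. The name with_retries and its parameters are hypothetical, and the defaults are arbitrary starting points rather than tuned values.

```python
import asyncio
import random
from collections.abc import Awaitable, Callable
from typing import TypeVar

T = TypeVar("T")


async def with_retries(
    call: Callable[[], Awaitable[T]],
    *,
    retry_on: tuple[type[BaseException], ...],
    attempts: int = 3,
    base_delay: float = 0.5,
) -> T:
    """Run `call`, retrying on the given exception types with exponential backoff."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return await call()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error unchanged
            # Delay doubles per attempt (0.5s, 1s, 2s, ...) plus a little jitter.
            delay = base_delay * 2 ** (attempt - 1)
            await asyncio.sleep(delay + random.uniform(0, delay / 10))
    raise AssertionError("unreachable")
```

With the client above this could be used as await with_retries(lambda: client.search_read("sale.order"), retry_on=(OdooConnectionError,)). It is safest to retry only reads and only connection errors, since retrying a create that actually succeeded on the server would produce a duplicate record.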

Sales order tools

src/tools/sales.py

"""
MCP tools for Odoo sales operations.

Covers reading and managing sale.order records.
"""

from __future__ import annotations

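import logging
from typing import Any

from pydantic import BaseModel

# Absolute import: server.py is run as a script from src/, so "tools" is a
# top-level package and a relative "..odoo_client" import would fail.
from odoo_client import OdooClient, OdooOperationError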

logger = logging.getLogger(__name__)


class SalesOrderSummary(BaseModel):
    id: int
    name: str
    customer: str
    customer_id: int
    date_order: str
    state: str
    state_label: str
    amount_untaxed: float
    amount_tax: float
    amount_total: float
    currency: str
    line_count: int
    salesperson: str


class SalesOrderLine(BaseModel):
    id: int
    product: str
    description: str
    quantity: float
    unit: str
    unit_price: float
    subtotal: float


STATE_LABELS = {
    "draft": "Quotation",
    "sent": "Quotation Sent",
    "sale": "Sales Order",
    "done": "Locked",
    "cancel": "Cancelled",
}


async def get_open_sales_orders(
    client: OdooClient,
    customer_name: str | None = None,
    from_date: str | None = None,
    to_date: str | None = None,
    limit: int = 50,
) -> list[SalesOrderSummary]:
    """
    Retrieve confirmed sales orders with optional filters.

    Args:
        customer_name: Partial match on customer name (case-insensitive)
        from_date: ISO date string "YYYY-MM-DD" for order date start
        to_date: ISO date string "YYYY-MM-DD" for order date end
        limit: Max records to return

    Returns:
        List of sales order summaries
    """
    domain: list[Any] = [["state", "in", ["sale", "done"]]]

    if customer_name:
        domain.append(["partner_id.name", "ilike", customer_name])

    if from_date:
        domain.append(["date_order", ">=", f"{from_date} 00:00:00"])

    if to_date:
        domain.append(["date_order", "<=", f"{to_date} 23:59:59"])

    records = await client.search_read(
        "sale.order",
        domain=domain,
        fields=[
            "name",
            "partner_id",
            "date_order",
            "state",
            "amount_untaxed",
            "amount_tax",
            "amount_total",
            "currency_id",
            "order_line",
            "user_id",
        ],
        limit=limit,
        order="date_order desc",
    )

    result = []
    for r in records:
        result.append(
            SalesOrderSummary(
                id=r["id"],
                name=r["name"],
                customer=r["partner_id"][1] if r["partner_id"] else "Unknown",
                customer_id=r["partner_id"][0] if r["partner_id"] else 0,
                date_order=r["date_order"],
                state=r["state"],
                state_label=STATE_LABELS.get(r["state"], r["state"]),
                amount_untaxed=r["amount_untaxed"],
                amount_tax=r["amount_tax"],
                amount_total=r["amount_total"],
                currency=r["currency_id"][1] if r["currency_id"] else "USD",
                line_count=len(r.get("order_line", [])),
                salesperson=r["user_id"][1] if r["user_id"] else "Unassigned",
            )
        )

    return result


async def get_sales_order_detail(
    client: OdooClient,
    order_id: int,
) -> dict[str, Any]:
    """
    Get full detail for a single sales order including all line items.

    Args:
        order_id: The Odoo ID of the sales order

    Returns:
        Dict with order header + list of line items
    """
    orders = await client.read(
        "sale.order",
        ids=[order_id],
        fields=[
            "name",
            "partner_id",
            "date_order",
            "state",
            "amount_untaxed",
            "amount_tax",
            "amount_total",
            "currency_id",
            "user_id",
            "note",
            "order_line",
        ],
    )

    if not orders:
        raise OdooOperationError(f"Sales order with id={order_id} not found.")

    order = orders[0]

    lines_raw = await client.search_read(
        "sale.order.line",
        domain=[["order_id", "=", order_id]],
        fields=[
            "product_id",
            "name",
            "product_uom_qty",
            "product_uom",
            "price_unit",
            "price_subtotal",
        ],
    )

    lines = [
        SalesOrderLine(
            id=line["id"],
            product=line["product_id"][1] if line["product_id"] else "N/A",
            description=line.get("name") or "",  # Odoo returns False for empty text
            quantity=line["product_uom_qty"],
            unit=line["product_uom"][1] if line["product_uom"] else "Units",
            unit_price=line["price_unit"],
            subtotal=line["price_subtotal"],
        )
        for line in lines_raw
    ]

    return {
        "id": order["id"],
        "name": order["name"],
        "customer": order["partner_id"][1] if order["partner_id"] else "Unknown",
        "date": order["date_order"],
        "state": STATE_LABELS.get(order["state"], order["state"]),
        "salesperson": order["user_id"][1] if order["user_id"] else "Unassigned",
        # Odoo returns False for empty text fields, so normalize to "".
        "note": order.get("note") or "",
        "amount_untaxed": order["amount_untaxed"],
        "amount_tax": order["amount_tax"],
        "amount_total": order["amount_total"],
        "currency": order["currency_id"][1] if order["currency_id"] else "USD",
        "lines": [line.model_dump() for line in lines],
    }


async def confirm_sales_order(
    client: OdooClient,
    order_id: int,
) -> dict[str, Any]:
    """
    Confirm a draft or sent quotation to a sales order.

    Args:
        order_id: The Odoo ID of the draft/sent quotation

    Returns:
        Dict with result status and updated order state
    """
    orders = await client.read(
        "sale.order",
        ids=[order_id],
        fields=["name", "state"],
    )

    if not orders:
        raise OdooOperationError(f"Sales order id={order_id} not found.")

    order = orders[0]

    if order["state"] not in ("draft", "sent"):
        raise OdooOperationError(
            f"Order {order['name']} is in state '{STATE_LABELS.get(order['state'])}' "
            f"and cannot be confirmed. Only Draft or Sent quotations can be confirmed."
        )

    await client.call_method("sale.order", "action_confirm", [order_id])

    updated = await client.read("sale.order", ids=[order_id], fields=["name", "state"])
    new_state = updated[0]["state"] if updated else "unknown"

    logger.info("Confirmed sales order %s (id=%d)", order["name"], order_id)

    return {
        "success": True,
        "order_id": order_id,
        "order_name": order["name"],
        "previous_state": STATE_LABELS.get(order["state"], order["state"]),
        "new_state": STATE_LABELS.get(new_state, new_state),
        "message": f"Order {order['name']} has been confirmed successfully.",
    }

Inventory tools

src/tools/inventory.py

"""
MCP tools for Odoo inventory and stock operations.
"""

from __future__ import annotations

import logging
from typing import Any

from pydantic import BaseModel

from odoo_client import OdooClient

logger = logging.getLogger(__name__)


class StockLevel(BaseModel):
    product_id: int
    product_name: str
    internal_ref: str
    qty_on_hand: float
    qty_available: float
    qty_incoming: float
    qty_outgoing: float
    unit: str
    warehouse: str
    location: str
    reorder_min: float | None = None
    below_reorder: bool = False


async def get_stock_levels(
    client: OdooClient,
    product_name: str | None = None,
    warehouse_name: str | None = None,
    below_reorder_only: bool = False,
    limit: int = 100,
) -> list[StockLevel]:
    """
    Get current stock levels across warehouses.

    Args:
        product_name: Partial match on product name
        warehouse_name: Filter by warehouse name
        below_reorder_only: Return only products below reorder point
        limit: Max records

    Returns:
        List of StockLevel objects
    """
    domain: list[Any] = [
        ["location_id.usage", "=", "internal"],
        ["location_id.active", "=", True],
    ]

    if product_name:
        domain.append(["product_id.name", "ilike", product_name])

    if warehouse_name:
        domain.append(["location_id.warehouse_id.name", "ilike", warehouse_name])

    quants = await client.search_read(
        "stock.quant",
        domain=domain,
        fields=[
            "product_id",
            "location_id",
            "quantity",
            "reserved_quantity",
            "product_uom_id",
        ],
        limit=limit,
        order="product_id asc",
    )

    product_ids = list({q["product_id"][0] for q in quants if q["product_id"]})
    reorder_map: dict[int, float] = {}
    product_map: dict[int, dict[str, Any]] = {}

    if product_ids:
        reorder_rules = await client.search_read(
            "stock.warehouse.orderpoint",
            domain=[["product_id", "in", product_ids]],
            fields=["product_id", "product_min_qty"],
        )
        for rule in reorder_rules:
            if rule["product_id"]:
                reorder_map[rule["product_id"][0]] = rule["product_min_qty"]

        # incoming_qty, outgoing_qty and default_code are fields of
        # product.product, not stock.quant, so fetch them in a second call.
        products = await client.read(
            "product.product",
            ids=product_ids,
            fields=["default_code", "incoming_qty", "outgoing_qty"],
        )
        product_map = {p["id"]: p for p in products}

    results = []
    for q in quants:
        if not q["product_id"]:
            continue

        product_id = q["product_id"][0]
        product_name_val = q["product_id"][1]
        info = product_map.get(product_id, {})
        qty_on_hand = q["quantity"]
        qty_reserved = q["reserved_quantity"]
        qty_available = qty_on_hand - qty_reserved
        reorder_min = reorder_map.get(product_id)
        below_reorder = reorder_min is not None and qty_available < reorder_min

        if below_reorder_only and not below_reorder:
            continue

        location_name = q["location_id"][1] if q["location_id"] else "Unknown"
        warehouse = location_name.split("/")[0].strip() if "/" in location_name else location_name

        results.append(
            StockLevel(
                product_id=product_id,
                product_name=product_name_val,
                internal_ref=info.get("default_code") or "",
                qty_on_hand=qty_on_hand,
                qty_available=qty_available,
                # Product-level totals across locations, not per quant.
                qty_incoming=info.get("incoming_qty") or 0.0,
                qty_outgoing=info.get("outgoing_qty") or 0.0,
                unit=q["product_uom_id"][1] if q["product_uom_id"] else "Units",
                warehouse=warehouse,
                location=location_name,
                reorder_min=reorder_min,
                below_reorder=below_reorder,
            )
        )

    return results


async def get_low_stock_alert(
    client: OdooClient,
) -> dict[str, Any]:
    """
    Get a summary of all products currently below their reorder minimum.

    Returns:
        Dict with count, list of products needing reorder, and a text summary
    """
    low_items = await get_stock_levels(client, below_reorder_only=True, limit=200)

    return {
        "total_products_below_reorder": len(low_items),
        "items": [
            {
                "product": item.product_name,
                "warehouse": item.warehouse,
                "available": item.qty_available,
                "minimum": item.reorder_min,
                "shortfall": (item.reorder_min or 0) - item.qty_available,
                "unit": item.unit,
            }
            for item in low_items
        ],
        "summary": (
            f"{len(low_items)} products are below their reorder minimum. "
            f"Immediate purchasing action may be needed."
            if low_items
            else "All products are above their reorder minimums. No action needed."
        ),
    }
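
One caveat with the reorder check above: stock.quant holds one row per product per location, so comparing each row against the product's minimum can flag a product whose total stock is fine. A possible refinement is to aggregate before comparing. The sketch below shows this on invented sample rows; the function names are hypothetical, and since real reorder rules are defined per warehouse or location you would likely group by that as well.

```python
from collections import defaultdict
from typing import Any


def available_by_product(quants: list[dict[str, Any]]) -> dict[int, float]:
    """Sum (quantity - reserved_quantity) per product across all quant rows."""
    totals: dict[int, float] = defaultdict(float)
    for q in quants:
        if not q.get("product_id"):  # many2one fields come back as False when empty
            continue
        product_id = q["product_id"][0]
        totals[product_id] += q["quantity"] - q["reserved_quantity"]
    return dict(totals)


def below_reorder(totals: dict[int, float], minimums: dict[int, float]) -> list[int]:
    """Return product IDs whose total available quantity is under their minimum."""
    return sorted(
        pid for pid, qty in totals.items() if pid in minimums and qty < minimums[pid]
    )


# Invented sample rows shaped like stock.quant search_read results.
sample = [
    {"product_id": [10, "Widget"], "quantity": 4.0, "reserved_quantity": 1.0},
    {"product_id": [10, "Widget"], "quantity": 6.0, "reserved_quantity": 0.0},
    {"product_id": [11, "Gadget"], "quantity": 2.0, "reserved_quantity": 2.0},
]
totals = available_by_product(sample)
low = below_reorder(totals, {10: 5.0, 11: 1.0})
```

In this sample the first Widget row has only 3 units available, which a per-row check against a minimum of 5 would flag, while the product as a whole has 9 available and is not low.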

Invoicing tools

src/tools/invoicing.py

"""
MCP tools for Odoo invoicing and accounts receivable.
"""

from __future__ import annotations

import logging
from datetime import date
from typing import Any

from pydantic import BaseModel

from odoo_client import OdooClient

logger = logging.getLogger(__name__)


class OverdueInvoice(BaseModel):
    id: int
    name: str
    customer: str
    customer_id: int
    invoice_date: str
    due_date: str
    days_overdue: int
    amount_total: float
    amount_residual: float
    currency: str
    aging_bucket: str


def _aging_bucket(days: int) -> str:
    if days <= 30:
        return "1 to 30 days"
    if days <= 60:
        return "31 to 60 days"
    if days <= 90:
        return "61 to 90 days"
    return "Over 90 days"


async def get_overdue_invoices(
    client: OdooClient,
    customer_name: str | None = None,
    min_amount: float | None = None,
    limit: int = 100,
) -> dict[str, Any]:
    """
    Get all overdue customer invoices with aging analysis.

    Args:
        customer_name: Filter by partial customer name
        min_amount: Only return invoices above this outstanding amount
        limit: Max records

    Returns:
        Dict with aging summary and invoice list
    """
    today = date.today().isoformat()

    domain: list[Any] = [
        ["move_type", "=", "out_invoice"],
        ["state", "=", "posted"],
        ["payment_state", "in", ["not_paid", "partial"]],
        ["invoice_date_due", "<", today],
    ]

    if customer_name:
        domain.append(["partner_id.name", "ilike", customer_name])

    if min_amount is not None:
        domain.append(["amount_residual", ">=", min_amount])

    records = await client.search_read(
        "account.move",
        domain=domain,
        fields=[
            "name",
            "partner_id",
            "invoice_date",
            "invoice_date_due",
            "amount_total",
            "amount_residual",
            "currency_id",
        ],
        limit=limit,
        order="invoice_date_due asc",
    )

    today_dt = date.today()
    invoices = []
    total_outstanding = 0.0
    bucket_totals: dict[str, float] = {
        "1 to 30 days": 0.0,
        "31 to 60 days": 0.0,
        "61 to 90 days": 0.0,
        "Over 90 days": 0.0,
    }

    for r in records:
        due = date.fromisoformat(r["invoice_date_due"][:10]) if r["invoice_date_due"] else today_dt
        days_overdue = (today_dt - due).days
        bucket = _aging_bucket(days_overdue)
        residual = r["amount_residual"]
        total_outstanding += residual
        bucket_totals[bucket] = bucket_totals.get(bucket, 0.0) + residual

        invoices.append(
            OverdueInvoice(
                id=r["id"],
                name=r["name"],
                customer=r["partner_id"][1] if r["partner_id"] else "Unknown",
                customer_id=r["partner_id"][0] if r["partner_id"] else 0,
                invoice_date=r["invoice_date"] or "",
                due_date=r["invoice_date_due"] or "",
                days_overdue=days_overdue,
                amount_total=r["amount_total"],
                amount_residual=residual,
                currency=r["currency_id"][1] if r["currency_id"] else "USD",
                aging_bucket=bucket,
            )
        )

    return {
        "total_overdue_invoices": len(invoices),
        "total_outstanding_amount": round(total_outstanding, 2),
        "aging_buckets": {k: round(v, 2) for k, v in bucket_totals.items()},
        "invoices": [inv.model_dump() for inv in invoices],
        "oldest_invoice_days": max((inv.days_overdue for inv in invoices), default=0),
        "summary": (
            f"{len(invoices)} overdue invoices totalling "
            f"{round(total_outstanding, 2)} (raw sum, not converted between currencies)."
            if invoices
            else "No overdue invoices found."
        ),
    }
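
The totals above add up residual amounts regardless of currency, which is only meaningful when every invoice uses the same one. A minimal sketch of grouping by currency first is shown below. The sample invoices are invented, aging_by_currency is a hypothetical name, and today is passed in as a parameter so the result is reproducible.

```python
from collections import defaultdict
from datetime import date


def aging_bucket(days_overdue: int) -> str:
    """Same bucket boundaries as _aging_bucket above."""
    if days_overdue <= 30:
        return "1 to 30 days"
    if days_overdue <= 60:
        return "31 to 60 days"
    if days_overdue <= 90:
        return "61 to 90 days"
    return "Over 90 days"


def aging_by_currency(invoices: list[dict], today: date) -> dict[str, dict[str, float]]:
    """Total outstanding amounts per currency, then per aging bucket."""
    totals: dict[str, dict[str, float]] = defaultdict(lambda: defaultdict(float))
    for inv in invoices:
        days = (today - date.fromisoformat(inv["due_date"])).days
        totals[inv["currency"]][aging_bucket(days)] += inv["amount_residual"]
    return {
        cur: {bucket: round(value, 2) for bucket, value in buckets.items()}
        for cur, buckets in totals.items()
    }


# Invented sample data shaped like the OverdueInvoice fields used above.
sample = [
    {"currency": "USD", "due_date": "2026-01-01", "amount_residual": 100.0},
    {"currency": "USD", "due_date": "2025-10-01", "amount_residual": 250.0},
    {"currency": "EUR", "due_date": "2025-12-20", "amount_residual": 80.0},
]
report = aging_by_currency(sample, today=date(2026, 1, 31))
```

Taking today as an argument instead of calling date.today() inside the function also makes the aging logic straightforward to unit test.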

Purchase requisition tools

src/tools/purchasing.py

"""
MCP tools for Odoo purchasing operations.

Creates purchase requisitions (internal RFQs) that the procurement
team can then convert to purchase orders.
"""

from __future__ import annotations

import logging
from datetime import date, timedelta
from typing import Any

from pydantic import BaseModel, Field, field_validator

from odoo_client import OdooClient, OdooOperationError

logger = logging.getLogger(__name__)


class PurchaseRequisitionLine(BaseModel):
    product_name: str = Field(description="Exact product name as it appears in Odoo")
    quantity: float = Field(gt=0, description="Quantity to request")
    notes: str = Field(default="", description="Optional line notes")


class PurchaseRequisitionRequest(BaseModel):
    title: str = Field(description="Short description of the requisition")
    lines: list[PurchaseRequisitionLine] = Field(min_length=1)
    required_by_date: str | None = Field(
        default=None,
        description="ISO date string YYYY-MM-DD. Defaults to 30 days from today."
    )
    notes: str = Field(default="", description="Overall requisition notes")

    @field_validator("required_by_date")
    @classmethod
    def validate_date(cls, v: str | None) -> str | None:
        if v is None:
            return v
        try:
            date.fromisoformat(v)
        except ValueError as exc:
            raise ValueError(f"required_by_date must be in YYYY-MM-DD format. Got: {v}") from exc
        return v


async def _resolve_product_id(client: OdooClient, product_name: str) -> int:
    """Find a product by exact or close name match. Raises if not found."""
    products = await client.search_read(
        "product.product",
        domain=[["name", "ilike", product_name], ["active", "=", True]],
        fields=["name", "id"],
        limit=5,
    )

    if not products:
        raise OdooOperationError(
            f"No product found matching '{product_name}'. "
            f"Check the product name or create it in Odoo first."
        )

    if len(products) == 1:
        return products[0]["id"]

    exact = [p for p in products if p["name"].lower() == product_name.lower()]
    if exact:
        return exact[0]["id"]

    names = ", ".join(p["name"] for p in products)
    raise OdooOperationError(
        f"Multiple products match '{product_name}': {names}. "
        f"Please provide a more specific product name."
    )


async def create_purchase_requisition(
    client: OdooClient,
    request: PurchaseRequisitionRequest,
) -> dict[str, Any]:
    """
    Create an internal purchase requisition in Odoo.

    This creates a Purchase Agreement of type 'blanket_order' in draft state.
    The procurement team can then review and convert it to purchase orders.

    Args:
        request: Validated purchase requisition data

    Returns:
        Dict with the created requisition ID and details
    """
    required_by = (
        date.fromisoformat(request.required_by_date)
        if request.required_by_date
        else date.today() + timedelta(days=30)
    )

    line_values = []
    resolved_lines = []

    for line in request.lines:
        product_id = await _resolve_product_id(client, line.product_name)
        line_values.append({
            "product_id": product_id,
            "product_qty": line.quantity,
        })
        resolved_lines.append({
            "product_name": line.product_name,
            "product_id": product_id,
            "quantity": line.quantity,
        })

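    requisition_values = {
        "name": request.title,
        # Assumption: 2 is the ID of the blanket order agreement type in this
        # database. Record IDs differ between instances, so verify it first.
        "type_id": 2,
        "date_end": required_by.isoformat(),
        "description": request.notes,
        "line_ids": [(0, 0, line) for line in line_values],
    }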

    try:
        requisition_id = await client.create(
            "purchase.requisition",
            requisition_values,
        )
    except OdooOperationError as exc:
        if "purchase.requisition" in str(exc) and "does not exist" in str(exc):
            raise OdooOperationError(
                "The Purchase Agreements module is not installed in this Odoo instance. "
                "Ask your Odoo administrator to install 'Purchase Requisitions'."
            ) from exc
        raise

    logger.info(
        "Created purchase requisition id=%d title='%s' with %d lines",
        requisition_id,
        request.title,
        len(line_values),
    )

    return {
        "success": True,
        "requisition_id": requisition_id,
        "title": request.title,
        "required_by": required_by.isoformat(),
        "lines": resolved_lines,
        "message": (
            f"Purchase requisition '{request.title}' created with "
            f"{len(line_values)} line(s). Required by {required_by.isoformat()}. "
            f"It is in draft, ready for the procurement team to review."
        ),
    }

The MCP server

This is where everything comes together. We register all our tools with FastMCP, use the lifespan pattern to manage the Odoo client connection, and expose a clean set of tools that Claude can discover and call.

src/server.py

"""
Odoo MCP Server

Exposes Odoo ERP operations as MCP tools and resources.
Run with: uv run python src/server.py
Or in development: uv run mcp dev src/server.py
"""

from __future__ import annotations

import logging
import os
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import Any

from dotenv import load_dotenv
from mcp.server.fastmcp import Context, FastMCP
from mcp.server.session import ServerSession

from odoo_client import OdooClient, OdooConfig, OdooConnectionError, OdooOperationError
from tools.invoicing import get_overdue_invoices
from tools.inventory import get_stock_levels, get_low_stock_alert
from tools.purchasing import PurchaseRequisitionRequest, create_purchase_requisition
from tools.sales import (
    get_open_sales_orders,
    get_sales_order_detail,
    confirm_sales_order,
)

load_dotenv()

logging.basicConfig(
    level=os.getenv("LOG_LEVEL", "INFO"),
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)
logger = logging.getLogger(__name__)


@dataclass
class AppContext:
    odoo: OdooClient


@asynccontextmanager
async def lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
    """
    Initialize and clean up the Odoo client on server start and stop.

    Using the lifespan pattern means we authenticate once and reuse
    the connection for all requests during the server lifetime.
    """
    config = OdooConfig.from_env()
    client = OdooClient(config)

    logger.info("Connecting to Odoo at %s ...", config.url)
    await client.authenticate()
    logger.info("Odoo connection established.")

    try:
        yield AppContext(odoo=client)
    finally:
        logger.info("Odoo MCP server shutting down.")


mcp = FastMCP(
    name="Odoo ERP Assistant",
    instructions=(
        "You have access to a live Odoo ERP instance. "
        "You can read sales orders, check inventory, review overdue invoices "
        "and create purchase requisitions. "
        "Always confirm with the user before taking any action that modifies data "
        "such as confirming orders or creating requisitions."
    ),
    lifespan=lifespan,
    stateless_http=True,
    json_response=True,
)


def _get_odoo(ctx: Context[ServerSession, AppContext]) -> OdooClient:
    """Helper to extract the Odoo client from the request context."""
    return ctx.request_context.lifespan_context.odoo


def _handle_odoo_error(exc: Exception) -> dict[str, Any]:
    """Convert Odoo exceptions to user-friendly error dicts."""
    if isinstance(exc, OdooConnectionError):
        return {"error": True, "type": "connection_error", "message": str(exc)}
    if isinstance(exc, OdooOperationError):
        return {"error": True, "type": "operation_error", "message": str(exc)}
    logger.exception("Unexpected error in MCP tool")
    return {"error": True, "type": "unexpected_error", "message": f"An unexpected error occurred: {exc}"}


# ═══════════════════════════════════════════════════════════════
# SALES TOOLS
# ═══════════════════════════════════════════════════════════════


@mcp.tool()
async def list_sales_orders(
    ctx: Context[ServerSession, AppContext],
    customer_name: str | None = None,
    from_date: str | None = None,
    to_date: str | None = None,
    limit: int = 30,
) -> dict[str, Any]:
    """
    List confirmed sales orders from Odoo.

    Optionally filter by customer name (partial match), date range, or both.
    Returns order headers without line items for quick overview.
    Use get_sales_order_detail to see line items for a specific order.

    Args:
        customer_name: Partial customer name to filter by (case-insensitive)
        from_date: Start date in YYYY-MM-DD format
        to_date: End date in YYYY-MM-DD format
        limit: Maximum number of orders to return (max 100)
    """
    try:
        orders = await get_open_sales_orders(
            _get_odoo(ctx),
            customer_name=customer_name,
            from_date=from_date,
            to_date=to_date,
            limit=min(limit, 100),
        )
        return {
            "count": len(orders),
            "orders": [o.model_dump() for o in orders],
        }
    except Exception as exc:
        return _handle_odoo_error(exc)


@mcp.tool()
async def get_order_detail(
    ctx: Context[ServerSession, AppContext],
    order_id: int,
) -> dict[str, Any]:
    """
    Get full details for a specific sales order including all line items.

    Args:
        order_id: The numeric Odoo ID of the sales order
    """
    try:
        return await get_sales_order_detail(_get_odoo(ctx), order_id=order_id)
    except Exception as exc:
        return _handle_odoo_error(exc)


@mcp.tool()
async def confirm_order(
    ctx: Context[ServerSession, AppContext],
    order_id: int,
) -> dict[str, Any]:
    """
    Confirm a draft or sent quotation as a confirmed sales order.

    This action cannot be undone without cancelling the order.
    Only call this after explicitly confirming the intent with the user.

    Args:
        order_id: The Odoo ID of the quotation to confirm
    """
    try:
        return await confirm_sales_order(_get_odoo(ctx), order_id=order_id)
    except Exception as exc:
        return _handle_odoo_error(exc)


# ═══════════════════════════════════════════════════════════════
# INVENTORY TOOLS
# ═══════════════════════════════════════════════════════════════


@mcp.tool()
async def check_stock_levels(
    ctx: Context[ServerSession, AppContext],
    product_name: str | None = None,
    warehouse_name: str | None = None,
    below_reorder_only: bool = False,
    limit: int = 50,
) -> dict[str, Any]:
    """
    Check current stock levels in Odoo warehouses.

    Args:
        product_name: Filter by partial product name
        warehouse_name: Filter by warehouse name
        below_reorder_only: Return only items below minimum reorder quantity
        limit: Maximum records to return
    """
    try:
        items = await get_stock_levels(
            _get_odoo(ctx),
            product_name=product_name,
            warehouse_name=warehouse_name,
            below_reorder_only=below_reorder_only,
            limit=limit,
        )
        return {
            "count": len(items),
            "items": [item.model_dump() for item in items],
        }
    except Exception as exc:
        return _handle_odoo_error(exc)


@mcp.tool()
async def low_stock_report(
    ctx: Context[ServerSession, AppContext],
) -> dict[str, Any]:
    """
    Get an immediate report of all products below their minimum reorder quantity.

    No arguments needed. Returns all low-stock items across all warehouses.
    """
    try:
        return await get_low_stock_alert(_get_odoo(ctx))
    except Exception as exc:
        return _handle_odoo_error(exc)


# ═══════════════════════════════════════════════════════════════
# INVOICING TOOLS
# ═══════════════════════════════════════════════════════════════


@mcp.tool()
async def overdue_invoices(
    ctx: Context[ServerSession, AppContext],
    customer_name: str | None = None,
    min_outstanding_amount: float | None = None,
    limit: int = 100,
) -> dict[str, Any]:
    """
    Get all overdue customer invoices with aging analysis.

    Returns invoices bucketed into 1-30 days, 31-60 days, 61-90 days
    and over 90 days overdue.

    Args:
        customer_name: Filter by partial customer name
        min_outstanding_amount: Only include invoices above this outstanding balance
        limit: Max records to return
    """
    try:
        return await get_overdue_invoices(
            _get_odoo(ctx),
            customer_name=customer_name,
            min_amount=min_outstanding_amount,
            limit=limit,
        )
    except Exception as exc:
        return _handle_odoo_error(exc)


# ═══════════════════════════════════════════════════════════════
# PURCHASING TOOLS
# ═══════════════════════════════════════════════════════════════


@mcp.tool()
async def create_purchase_request(
    ctx: Context[ServerSession, AppContext],
    title: str,
    lines: list[dict[str, Any]],
    required_by_date: str | None = None,
    notes: str = "",
) -> dict[str, Any]:
    """
    Create an internal purchase requisition in Odoo.

    The requisition goes into draft state for the procurement team to review.
    Products must exist in Odoo before they can be added to a requisition.

    Args:
        title: Short description of what is being requested
        lines: List of dicts with keys: product_name (str), quantity (float), notes (str optional)
               Example: [{"product_name": "Laptop", "quantity": 5}]
        required_by_date: Date needed by in YYYY-MM-DD format (defaults to 30 days from today)
        notes: Overall notes for the procurement team
    """
    try:
        request = PurchaseRequisitionRequest(
            title=title,
            lines=lines,
            required_by_date=required_by_date,
            notes=notes,
        )
        return await create_purchase_requisition(_get_odoo(ctx), request=request)
    except ValueError as exc:
        return {"error": True, "type": "validation_error", "message": str(exc)}
    except Exception as exc:
        return _handle_odoo_error(exc)


# ═══════════════════════════════════════════════════════════════
# RESOURCES
# ═══════════════════════════════════════════════════════════════


@mcp.resource("odoo://sales/pipeline/summary")
async def sales_pipeline_summary() -> str:
    """
    A text summary of the current sales pipeline.
    Loaded automatically as context when discussing sales.
    """
    return (
        "This resource provides a summary of the current Odoo sales pipeline. "
        "Use the list_sales_orders tool to get live data. "
        "The ERP system tracks quotations, confirmed orders, locked orders and cancellations."
    )


@mcp.resource("odoo://inventory/locations")
async def warehouse_locations() -> str:
    """
    A description of the warehouse structure for context.
    """
    return (
        "Inventory is tracked in Odoo using locations and warehouses. "
        "Use check_stock_levels to get live quantities. "
        "Reorder rules define minimum stock levels per product per location."
    )


# ═══════════════════════════════════════════════════════════════
# ENTRY POINT
# ═══════════════════════════════════════════════════════════════


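if __name__ == "__main__":
    port = int(os.getenv("MCP_SERVER_PORT", "8000"))
    host = os.getenv("MCP_SERVER_HOST", "0.0.0.0")

    # mcp.run() takes no host or port argument, so apply them through the
    # FastMCP settings object (they can also be passed to the FastMCP constructor).
    mcp.settings.host = host
    mcp.settings.port = port

    logger.info("Starting Odoo MCP server on %s:%d", host, port)
    mcp.run(transport="streamable-http")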

Running the server

For local development with the MCP Inspector:

uv run mcp dev src/server.py

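Then open the Inspector at the URL it prints in the terminal (http://localhost:5173 on older Inspector versions). Note that mcp dev launches the server over stdio. To test the HTTP transport instead, start the server with uv run python src/server.py in a second terminal and connect the Inspector to http://localhost:8000/mcp. Either way, you can call each tool directly from the Inspector UI before wiring it into Claude.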

For production you run it directly:

uv run python src/server.py

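To add it to Claude Desktop, create or edit ~/Library/Application Support/Claude/claude_desktop_config.json on macOS. Claude Desktop launches the configured command as a subprocess and talks to it over stdio, so for this configuration the entry point has to call mcp.run(transport="stdio") rather than the streamable-http transport used above: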

{
  "mcpServers": {
    "odoo-erp": {
      "command": "uv",
      "args": [
        "run",
        "--project",
        "/path/to/odoo-mcp-server",
        "python",
        "src/server.py"
      ],
      "env": {
        "ODOO_URL": "https://your-odoo.com",
        "ODOO_DB": "your_db",
        "ODOO_USERNAME": "api@yourcompany.com",
        "ODOO_PASSWORD": "your_api_key"
      }
    }
  }
}

On Windows the config file lives at %APPDATA%\Claude\claude_desktop_config.json.

What this looks like in practice

Once the server is running and connected to Claude Desktop, you can have real conversations like this:

You: "Which customers have invoices more than 60 days overdue and what is the total outstanding?"

Claude calls overdue_invoices(), filters the result to the 61-90 days and over-90-days buckets, and gives you a breakdown by customer with totals. No SQL. No Odoo module navigation. No report generation.

You: "Are we running low on any stock items?"

Claude calls low_stock_report() and gets back every product below its reorder minimum across all warehouses. It presents this as a prioritised table, sorted by shortfall severity.

You: "Create a purchase requisition for 10 laptops and 5 docking stations needed by March 15th."

Claude calls create_purchase_request() with the right parameters, Odoo creates the draft requisition, and Claude confirms the requisition ID and tells you the procurement team can review it.

Production considerations

Authentication and access control. The Odoo XML-RPC user you configure should have only the permissions it needs. Create a dedicated API user in Odoo with read access to sales, inventory, and invoicing, and write access only to the models the tools actually modify: purchase requisitions and, if you expose confirm_order, sales orders. Do not use an admin account.

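Rate limiting. The client has no built-in rate limiter. For production deployments where multiple users share the same MCP server, add a limiter around the execute method. A token bucket caps how many requests per second reach Odoo. If you only need to cap how many requests are in flight at once, a simple asyncio.Semaphore does that with less code. The two solve different problems and can be combined.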
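As a rough illustration of the rate limiting idea, here is a minimal sketch that combines a token bucket with a semaphore. TokenBucket and RateLimitedClient are hypothetical names, not part of the client shown earlier, and the wrapper only assumes that the inner client has an async execute method.

```python
import asyncio
import time


class TokenBucket:
    """Allows bursts up to `capacity` and refills at `rate` tokens per second."""

    def __init__(self, rate: float, capacity: int) -> None:
        self.rate = rate
        self.capacity = capacity
        self._tokens = float(capacity)
        self._updated = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        """Wait until one token is available, then consume it."""
        async with self._lock:
            while True:
                now = time.monotonic()
                elapsed = now - self._updated
                self._tokens = min(self.capacity, self._tokens + elapsed * self.rate)
                self._updated = now
                if self._tokens >= 1:
                    self._tokens -= 1
                    return
                # Sleep roughly until the missing fraction of a token has refilled.
                await asyncio.sleep((1 - self._tokens) / self.rate)


class RateLimitedClient:
    """Hypothetical wrapper: throttles execute() and bounds its concurrency."""

    def __init__(self, inner, rate: float = 5.0, burst: int = 10, max_concurrent: int = 4) -> None:
        self._inner = inner
        self._bucket = TokenBucket(rate, burst)
        self._semaphore = asyncio.Semaphore(max_concurrent)

    async def execute(self, *args, **kwargs):
        await self._bucket.acquire()      # requests per second
        async with self._semaphore:       # requests in flight
            return await self._inner.execute(*args, **kwargs)
```

The defaults (5 requests per second, bursts of 10, 4 concurrent calls) are placeholders. Sensible values depend on how much load your Odoo instance tolerates.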

Secrets management. Move away from .env files before going to production. Use AWS Secrets Manager, HashiCorp Vault, or your cloud provider's secret store. The OdooConfig.from_env() class method makes this easy to change since you only need to replace the environment variable injection point.
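One way to picture that injection point is a loader that prefers a secret store and falls back to the environment. This is a sketch under assumptions: load_odoo_settings and fetch_secret are hypothetical, the secret is assumed to be a single JSON object, and the four key names are taken from the Claude Desktop config above. How the result reaches OdooConfig depends on its constructor, which is not shown here.

```python
import json
import os
from typing import Callable, Mapping, Optional

REQUIRED_KEYS = ("ODOO_URL", "ODOO_DB", "ODOO_USERNAME", "ODOO_PASSWORD")


def load_odoo_settings(
    fetch_secret: Optional[Callable[[], str]] = None,
    environ: Optional[Mapping[str, str]] = None,
) -> dict:
    """Return the four Odoo settings from a secret store if given, else from the environment."""
    if fetch_secret is not None:
        # fetch_secret is whatever call your secret store provides; it is
        # assumed to return one JSON object holding all four keys.
        source = json.loads(fetch_secret())
    else:
        source = os.environ if environ is None else environ

    missing = [key for key in REQUIRED_KEYS if not source.get(key)]
    if missing:
        raise ValueError(f"Missing Odoo settings: {', '.join(missing)}")
    return {key: str(source[key]) for key in REQUIRED_KEYS}
```

Failing fast on a missing key at startup is deliberate: a server that boots with half its credentials only fails later, inside a tool call.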

Logging and observability. Every tool in this code logs at INFO when successful and lets exceptions propagate to the _handle_odoo_error wrapper. In production, ship these logs to your observability stack. Add a request ID to each tool call using ctx.request_id and include it in every log line.
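A minimal sketch of the request ID idea, using only the standard library: a context variable holds the current ID and a logging filter stamps it onto every record. The names request_id_var, RequestIdFilter and build_handler are illustrative, not part of the server code above.

```python
import contextvars
import logging

# Holds the ID of the MCP request currently being handled; "-" outside a request.
request_id_var = contextvars.ContextVar("request_id", default="-")


class RequestIdFilter(logging.Filter):
    """Copy the current request ID onto every log record."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.request_id = request_id_var.get()
        return True


def build_handler(stream=None) -> logging.Handler:
    """Create a stream handler whose format includes the request ID."""
    handler = logging.StreamHandler(stream)
    handler.addFilter(RequestIdFilter())
    handler.setFormatter(
        logging.Formatter("%(name)s %(levelname)s [req=%(request_id)s] %(message)s")
    )
    return handler
```

In a tool wrapper you would call request_id_var.set(ctx.request_id) as the first statement. Because asyncio tasks each run in their own copy of the context, concurrent tool calls should not see each other's IDs.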

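Caching. Stock levels and customer lists rarely change from one second to the next, so slightly stale reads are usually acceptable. Add a simple TTL cache on search_read calls for read-only tools to reduce Odoo database load. The cachetools library works well for this. Because the client is async, cache the awaited result rather than the coroutine object; the cachetools decorators are written for synchronous functions.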
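To make the caching idea concrete, here is a small standard-library stand-in for a TTL cache that stores awaited results. AsyncTTLCache is a hypothetical helper, not the cachetools API, and the cache key is up to you (for example a tuple of model name, domain and fields).

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Hashable


class AsyncTTLCache:
    """Cache awaited results for `ttl_seconds`; expired entries are reloaded on access."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic) -> None:
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: dict[Hashable, tuple[float, Any]] = {}

    async def get_or_load(self, key: Hashable, loader: Callable[[], Awaitable[Any]]) -> Any:
        now = self._clock()
        hit = self._entries.get(key)
        if hit is not None and hit[0] > now:
            return hit[1]                      # still fresh
        value = await loader()                 # miss or expired: hit Odoo
        self._entries[key] = (now + self._ttl, value)
        return value
```

This sketch never evicts old keys and does not deduplicate concurrent misses for the same key, so treat it as a starting point. Only use it for read-only tools; anything that writes should bypass the cache.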

Health check endpoint. If you are running this as an HTTP service, add a /health endpoint that calls search_count("res.users", []) and returns 200 if Odoo is reachable. Mount this alongside the MCP server on the Starlette app.
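The probe behind such an endpoint can be sketched independently of the web framework. check_odoo_health is a hypothetical helper; it assumes the client exposes an async search_count(model, domain) as referenced above, and wiring it into an actual route is left out.

```python
import asyncio
from typing import Any


async def check_odoo_health(client: Any, timeout_seconds: float = 5.0) -> tuple[int, dict[str, Any]]:
    """Return an HTTP status code and JSON-serializable body describing Odoo reachability."""
    try:
        # A cheap query that still exercises authentication and the database.
        user_count = await asyncio.wait_for(
            client.search_count("res.users", []),
            timeout=timeout_seconds,
        )
    except Exception as exc:
        # Report only the exception type so internal details stay out of the response.
        return 503, {"status": "unhealthy", "detail": type(exc).__name__}
    return 200, {"status": "ok", "users": user_count}
```

The timeout matters as much as the query: a health check that hangs when Odoo hangs tells your load balancer nothing.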

What you have built

At the end of this you have:

A typed, async Odoo client that handles authentication, connection errors, and XML-RPC quirks in one place.

Seven production-ready MCP tools covering the four most common ERP automation workflows.

A FastMCP server with proper lifespan management so the Odoo connection is initialised once and reused.

Clean error handling that gives the model useful information to pass back to the user instead of raw stack traces.

More importantly, you have a pattern. Adding a new Odoo workflow means writing one function in the appropriate tools file and one @mcp.tool() decorated wrapper in server.py. The protocol handling, the connection management, and the transport layer are already done.

The same pattern works for any ERP with a REST or RPC API. Swap the OdooClient for a SAP client or a NetSuite client and the MCP server structure stays identical.

Bithost February 24, 2026