A practical guide to wiring Claude into Odoo ERP with production-grade Python
There is a lot of noise right now about AI agents and automation. Most of it stops at the demo layer. You get a video of Claude answering a question about a sales report and the article calls it "autonomous business intelligence." That is not what this post is about.
This is about actually shipping something. We are going to build a Model Context Protocol server that connects directly to an Odoo ERP instance, expose real business operations as tools, and run Claude against them in a way that you would not be embarrassed to put in front of a client. By the end you will have a working MCP server that can read sales orders, create purchase requests, check inventory levels, and summarize overdue invoices — all through natural language, all with proper error handling and environment-based configuration.
What MCP actually is
Before writing a line of code it is worth spending two minutes on what the protocol is doing and why it matters for ERP automation specifically.
Model Context Protocol is an open standard that defines how an LLM client (Claude Desktop, Claude Code, or your own application) connects to external tools and data sources. Think of it like a USB standard for AI tools. Instead of every developer writing a custom integration layer between Claude and their database, MCP gives you a common protocol so that connection just works.
An MCP server exposes three types of things:
Resources are data the model can read. They are roughly equivalent to GET endpoints. You use them to load context into the conversation without triggering a side effect.
Tools are functions the model can call. They are roughly equivalent to POST endpoints. They execute code and can cause real changes in your system.
Prompts are reusable templates that guide how the model should interact with your tools.
For an ERP system like Odoo, this mapping is natural. Your sales orders, inventory reports and customer lists are resources. Creating a purchase order, confirming a delivery or sending an invoice are tools.
The official Python SDK lives at github.com/modelcontextprotocol/python-sdk and as of early 2026 it is at version 1.26.0. It uses FastMCP as the primary high-level interface which gets rid of a lot of boilerplate while still giving you access to the full protocol when you need it.
What we are building
Our MCP server will give Claude the ability to do the following things inside Odoo:
- Read open sales orders and filter by customer or date range
- Check product stock levels across warehouses
- Read overdue customer invoices with aging
- Create an internal purchase requisition
- Confirm or cancel a draft sales order
- Read recent vendor bills and flag anomalies
We will connect to Odoo through its XML-RPC API which has been stable across versions for years and does not require any custom Odoo module installation. Everything runs from the outside.
Project setup
Start with a fresh Python project using uv which is what the MCP SDK documentation recommends.
uv init odoo-mcp-server
cd odoo-mcp-server
uv add "mcp[cli]" python-dotenv pydantic httpx
Your project structure should look like this:
odoo-mcp-server/
├── .env
├── pyproject.toml
├── src/
│   ├── odoo_client.py
│   ├── server.py
│   └── tools/
│       ├── __init__.py
│       ├── sales.py
│       ├── inventory.py
│       ├── invoicing.py
│       └── purchasing.py
Environment configuration
Create your .env file. Never hardcode credentials anywhere in the codebase.
ODOO_URL=https://your-odoo-instance.com
ODOO_DB=your_database_name
ODOO_USERNAME=your_api_user@company.com
ODOO_PASSWORD=your_api_key_or_password
MCP_SERVER_HOST=0.0.0.0
MCP_SERVER_PORT=8000
LOG_LEVEL=INFO
In production you would pull these from a secrets manager. For local development the .env file is fine, as long as it is listed in your .gitignore.
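Before the server starts it helps to know exactly which setting is missing rather than getting a generic failure. The following is a minimal sketch, not part of the project files: the helper names `missing_settings` and `check_settings` are hypothetical, and only the four ODOO_* variable names come from the .env file above.

```python
import os
from collections.abc import Mapping

# The four variables the Odoo client cannot work without.
REQUIRED_VARS = ("ODOO_URL", "ODOO_DB", "ODOO_USERNAME", "ODOO_PASSWORD")


def missing_settings(env: Mapping[str, str] = os.environ) -> list[str]:
    """Return the required variable names that are unset or blank."""
    return [name for name in REQUIRED_VARS if not env.get(name, "").strip()]


def check_settings(env: Mapping[str, str] = os.environ) -> None:
    """Raise with the exact list of missing names, so the fix is obvious."""
    missing = missing_settings(env)
    if missing:
        raise RuntimeError("Missing required settings: " + ", ".join(missing))
```

Passing the environment in as a mapping keeps the check easy to unit test without touching the real process environment.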
The Odoo XML-RPC client
Odoo exposes a two-endpoint XML-RPC API. The first endpoint (/xmlrpc/2/common) handles authentication and the second (/xmlrpc/2/object) handles the actual model operations. Let us wrap this in an async client that reuses its proxies, runs the blocking calls off the event loop, and raises typed errors.
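To see what actually travels to the object endpoint, here is a small offline sketch that only marshals an execute_kw call with the standard library and decodes it again. Nothing is sent over the network. The database name, user ID and key are made-up placeholders, and `build_execute_kw_payload` is a hypothetical helper used for illustration only.

```python
import xmlrpc.client
from typing import Any


def build_execute_kw_payload(
    db: str,
    uid: int,
    api_key: str,
    model: str,
    method: str,
    args: list[Any],
    kwargs: dict[str, Any],
) -> str:
    """Marshal one execute_kw request into its XML-RPC body."""
    params = (db, uid, api_key, model, method, args, kwargs)
    return xmlrpc.client.dumps(params, methodname="execute_kw", allow_none=True)


# Placeholder credentials, for illustration only.
payload = build_execute_kw_payload(
    "demo_db",
    7,
    "not-a-real-key",
    "sale.order",
    "search_read",
    [[["state", "=", "sale"]]],  # positional args: a single domain
    {"fields": ["name", "amount_total"], "limit": 5},  # keyword args
)

# Decoding the body again shows the seven positional parameters Odoo receives.
decoded_params, method_name = xmlrpc.client.loads(payload)
```

The positional arguments are a list whose first element is the domain, which is itself a list of conditions. That is why a domain ends up three brackets deep.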
src/odoo_client.py
"""
Production-grade async Odoo XML-RPC client.
Handles authentication, proxy reuse, and typed errors.
All methods are async; blocking RPC calls run in a worker thread.
"""
import asyncio
import logging
import os
import xmlrpc.client
from dataclasses import dataclass
from typing import Any

from dotenv import load_dotenv

load_dotenv()
logger = logging.getLogger(__name__)


@dataclass
class OdooConfig:
    url: str
    db: str
    username: str
    password: str

    @classmethod
    def from_env(cls) -> "OdooConfig":
        url = os.getenv("ODOO_URL")
        db = os.getenv("ODOO_DB")
        username = os.getenv("ODOO_USERNAME")
        password = os.getenv("ODOO_PASSWORD")
        if not all([url, db, username, password]):
            raise ValueError(
                "Missing required environment variables. "
                "Check ODOO_URL, ODOO_DB, ODOO_USERNAME, ODOO_PASSWORD."
            )
        # Strip a trailing slash so the endpoint URLs are built cleanly.
        return cls(url=url.rstrip("/"), db=db, username=username, password=password)


class OdooConnectionError(Exception):
    """Raised when the Odoo server cannot be reached or auth fails."""


class OdooOperationError(Exception):
    """Raised when an Odoo RPC operation returns an error."""


class OdooClient:
    """
    Async wrapper around Odoo's XML-RPC API.

    Usage:
        client = OdooClient(OdooConfig.from_env())
        await client.authenticate()
        orders = await client.search_read(
            "sale.order",
            domain=[["state", "=", "sale"]],
            fields=["name", "partner_id", "amount_total"],
        )
    """

    def __init__(self, config: OdooConfig):
        self.config = config
        self._uid: int | None = None
        self._common_proxy: xmlrpc.client.ServerProxy | None = None
        self._object_proxy: xmlrpc.client.ServerProxy | None = None
        self._lock = asyncio.Lock()
        # ServerProxy is not thread-safe, so calls on the shared object
        # proxy are serialized with a second lock.
        self._rpc_lock = asyncio.Lock()

    def _get_common_proxy(self) -> xmlrpc.client.ServerProxy:
        if self._common_proxy is None:
            self._common_proxy = xmlrpc.client.ServerProxy(
                f"{self.config.url}/xmlrpc/2/common",
                allow_none=True,
            )
        return self._common_proxy

    def _get_object_proxy(self) -> xmlrpc.client.ServerProxy:
        if self._object_proxy is None:
            self._object_proxy = xmlrpc.client.ServerProxy(
                f"{self.config.url}/xmlrpc/2/object",
                allow_none=True,
            )
        return self._object_proxy

    async def authenticate(self) -> int:
        """
        Authenticate against Odoo and cache the user ID.
        Safe to call multiple times; the cached uid is returned after
        the first successful login.
        """
        async with self._lock:
            if self._uid is not None:
                return self._uid
            loop = asyncio.get_running_loop()
            try:
                uid = await loop.run_in_executor(
                    None,
                    lambda: self._get_common_proxy().authenticate(
                        self.config.db,
                        self.config.username,
                        self.config.password,
                        {},
                    ),
                )
            except Exception as exc:
                raise OdooConnectionError(
                    f"Could not connect to Odoo at {self.config.url}. "
                    f"Check your URL and credentials. Original error: {exc}"
                ) from exc
            if not uid:
                raise OdooConnectionError(
                    f"Authentication failed for user '{self.config.username}' "
                    f"on database '{self.config.db}'. Check your credentials."
                )
            self._uid = uid
            logger.info("Authenticated as uid=%d on %s", uid, self.config.url)
            return uid

    async def execute(
        self,
        model: str,
        method: str,
        *args: Any,
        **kwargs: Any,
    ) -> Any:
        """
        Low-level execute_kw call. All higher-level methods call this.
        """
        uid = await self.authenticate()
        loop = asyncio.get_running_loop()
        try:
            async with self._rpc_lock:
                result = await loop.run_in_executor(
                    None,
                    lambda: self._get_object_proxy().execute_kw(
                        self.config.db,
                        uid,
                        self.config.password,
                        model,
                        method,
                        list(args),
                        kwargs,
                    ),
                )
        except xmlrpc.client.Fault as fault:
            raise OdooOperationError(
                f"Odoo returned an error calling {model}.{method}: "
                f"{fault.faultString}"
            ) from fault
        except Exception as exc:
            raise OdooConnectionError(
                f"Network error calling {model}.{method}: {exc}"
            ) from exc
        return result

    async def search_read(
        self,
        model: str,
        domain: list | None = None,
        fields: list[str] | None = None,
        limit: int = 100,
        offset: int = 0,
        order: str | None = None,
    ) -> list[dict[str, Any]]:
        """
        Search and read records in one call.

        Args:
            model: Odoo model name e.g. "sale.order"
            domain: Odoo domain filter e.g. [["state", "=", "sale"]]
            fields: List of field names to return. None returns all fields.
            limit: Maximum records to return (capped at 500 for safety)
            offset: Pagination offset
            order: Sort string e.g. "date_order desc"

        Returns:
            List of record dicts
        """
        kwargs: dict[str, Any] = {
            "fields": fields or [],
            "limit": min(limit, 500),
            "offset": offset,
        }
        if order:
            kwargs["order"] = order
        return await self.execute(
            model,
            "search_read",
            domain or [],
            **kwargs,
        )

    async def search_count(
        self,
        model: str,
        domain: list | None = None,
    ) -> int:
        """Return the count of records matching a domain."""
        return await self.execute(model, "search_count", domain or [])

    async def read(
        self,
        model: str,
        ids: list[int],
        fields: list[str] | None = None,
    ) -> list[dict[str, Any]]:
        """Read specific records by ID."""
        return await self.execute(model, "read", ids, fields=fields or [])

    async def create(self, model: str, values: dict[str, Any]) -> int:
        """Create a record and return its new ID."""
        return await self.execute(model, "create", values)

    async def write(
        self,
        model: str,
        ids: list[int],
        values: dict[str, Any],
    ) -> bool:
        """Update records by ID."""
        return await self.execute(model, "write", ids, values)

    async def unlink(self, model: str, ids: list[int]) -> bool:
        """Delete records by ID. Use carefully."""
        return await self.execute(model, "unlink", ids)

    async def call_method(
        self,
        model: str,
        method: str,
        ids: list[int],
        **kwargs: Any,
    ) -> Any:
        """
        Call a business method on specific records.
        Example: await client.call_method("sale.order", "action_confirm", [42])
        """
        return await self.execute(model, method, ids, **kwargs)

    async def get_fields(self, model: str) -> dict[str, Any]:
        """Return field definitions for a model. Useful for debugging."""
        return await self.execute(
            model, "fields_get", attributes=["string", "type", "required"]
        )
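The client above does not retry failed calls on its own. If you want that behaviour, one possible approach is a small wrapper with exponential backoff around individual calls. This is a sketch under assumptions: `with_retries` and its parameters are hypothetical names, not part of the MCP SDK or of Odoo, and the default exception types are stand-ins that you would replace with OdooConnectionError.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import TypeVar

T = TypeVar("T")


async def with_retries(
    call: Callable[[], Awaitable[T]],
    attempts: int = 3,
    base_delay: float = 0.5,
    retry_on: tuple[type[BaseException], ...] = (ConnectionError, TimeoutError),
) -> T:
    """Await call(), retrying on the given errors with exponential backoff."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return await call()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error unchanged
            # Wait base_delay, then 2x, then 4x, and so on.
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")
```

A call site could look like `await with_retries(lambda: client.search_read("sale.order"), retry_on=(OdooConnectionError,))`. It is safer to retry only read operations, because retrying a create after a timeout can produce duplicate records.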
Sales order tools
src/tools/sales.py
"""
MCP tools for Odoo sales operations.
Covers reading and managing sale.order records.
"""
from __future__ import annotations

import logging
from typing import Any

from pydantic import BaseModel

# server.py is run from src/, so odoo_client is a top-level module.
# A relative import ("from ..odoo_client") would fail here.
from odoo_client import OdooClient, OdooOperationError

logger = logging.getLogger(__name__)


class SalesOrderSummary(BaseModel):
    id: int
    name: str
    customer: str
    customer_id: int
    date_order: str
    state: str
    state_label: str
    amount_untaxed: float
    amount_tax: float
    amount_total: float
    currency: str
    line_count: int
    salesperson: str


class SalesOrderLine(BaseModel):
    id: int
    product: str
    description: str
    quantity: float
    unit: str
    unit_price: float
    subtotal: float


STATE_LABELS = {
    "draft": "Quotation",
    "sent": "Quotation Sent",
    "sale": "Sales Order",
    "done": "Locked",
    "cancel": "Cancelled",
}


async def get_open_sales_orders(
    client: OdooClient,
    customer_name: str | None = None,
    from_date: str | None = None,
    to_date: str | None = None,
    limit: int = 50,
) -> list[SalesOrderSummary]:
    """
    Retrieve confirmed sales orders with optional filters.

    Args:
        customer_name: Partial match on customer name (case-insensitive)
        from_date: ISO date string "YYYY-MM-DD" for order date start
        to_date: ISO date string "YYYY-MM-DD" for order date end
        limit: Max records to return

    Returns:
        List of sales order summaries
    """
    domain: list[Any] = [["state", "in", ["sale", "done"]]]
    if customer_name:
        domain.append(["partner_id.name", "ilike", customer_name])
    if from_date:
        domain.append(["date_order", ">=", f"{from_date} 00:00:00"])
    if to_date:
        domain.append(["date_order", "<=", f"{to_date} 23:59:59"])
    records = await client.search_read(
        "sale.order",
        domain=domain,
        fields=[
            "name",
            "partner_id",
            "date_order",
            "state",
            "amount_untaxed",
            "amount_tax",
            "amount_total",
            "currency_id",
            "order_line",
            "user_id",
        ],
        limit=limit,
        order="date_order desc",
    )
    result = []
    for r in records:
        result.append(
            SalesOrderSummary(
                id=r["id"],
                name=r["name"],
                customer=r["partner_id"][1] if r["partner_id"] else "Unknown",
                customer_id=r["partner_id"][0] if r["partner_id"] else 0,
                date_order=r["date_order"],
                state=r["state"],
                state_label=STATE_LABELS.get(r["state"], r["state"]),
                amount_untaxed=r["amount_untaxed"],
                amount_tax=r["amount_tax"],
                amount_total=r["amount_total"],
                currency=r["currency_id"][1] if r["currency_id"] else "USD",
                line_count=len(r.get("order_line", [])),
                salesperson=r["user_id"][1] if r["user_id"] else "Unassigned",
            )
        )
    return result


async def get_sales_order_detail(
    client: OdooClient,
    order_id: int,
) -> dict[str, Any]:
    """
    Get full detail for a single sales order including all line items.

    Args:
        order_id: The Odoo ID of the sales order

    Returns:
        Dict with order header + list of line items
    """
    orders = await client.read(
        "sale.order",
        ids=[order_id],
        fields=[
            "name",
            "partner_id",
            "date_order",
            "state",
            "amount_untaxed",
            "amount_tax",
            "amount_total",
            "currency_id",
            "user_id",
            "note",
            "order_line",
        ],
    )
    if not orders:
        raise OdooOperationError(f"Sales order with id={order_id} not found.")
    order = orders[0]
    lines_raw = await client.search_read(
        "sale.order.line",
        domain=[["order_id", "=", order_id]],
        fields=[
            "product_id",
            "name",
            "product_uom_qty",
            "product_uom",
            "price_unit",
            "price_subtotal",
        ],
    )
    lines = [
        SalesOrderLine(
            id=line["id"],
            product=line["product_id"][1] if line["product_id"] else "N/A",
            # Odoo returns False for empty fields, so fall back to "".
            description=line.get("name") or "",
            quantity=line["product_uom_qty"],
            unit=line["product_uom"][1] if line["product_uom"] else "Units",
            unit_price=line["price_unit"],
            subtotal=line["price_subtotal"],
        )
        for line in lines_raw
    ]
    return {
        "id": order["id"],
        "name": order["name"],
        "customer": order["partner_id"][1] if order["partner_id"] else "Unknown",
        "date": order["date_order"],
        "state": STATE_LABELS.get(order["state"], order["state"]),
        "salesperson": order["user_id"][1] if order["user_id"] else "Unassigned",
        "note": order.get("note") or "",
        "amount_untaxed": order["amount_untaxed"],
        "amount_tax": order["amount_tax"],
        "amount_total": order["amount_total"],
        "currency": order["currency_id"][1] if order["currency_id"] else "USD",
        "lines": [line.model_dump() for line in lines],
    }


async def confirm_sales_order(
    client: OdooClient,
    order_id: int,
) -> dict[str, Any]:
    """
    Confirm a draft or sent quotation to a sales order.

    Args:
        order_id: The Odoo ID of the draft/sent quotation

    Returns:
        Dict with result status and updated order state
    """
    orders = await client.read(
        "sale.order",
        ids=[order_id],
        fields=["name", "state"],
    )
    if not orders:
        raise OdooOperationError(f"Sales order id={order_id} not found.")
    order = orders[0]
    if order["state"] not in ("draft", "sent"):
        raise OdooOperationError(
            f"Order {order['name']} is in state '{STATE_LABELS.get(order['state'])}' "
            f"and cannot be confirmed. Only Draft or Sent quotations can be confirmed."
        )
    await client.call_method("sale.order", "action_confirm", [order_id])
    updated = await client.read("sale.order", ids=[order_id], fields=["name", "state"])
    new_state = updated[0]["state"] if updated else "unknown"
    logger.info("Confirmed sales order %s (id=%d)", order["name"], order_id)
    return {
        "success": True,
        "order_id": order_id,
        "order_name": order["name"],
        "previous_state": STATE_LABELS.get(order["state"], order["state"]),
        "new_state": STATE_LABELS.get(new_state, new_state),
        "message": f"Order {order['name']} has been confirmed successfully.",
    }
Inventory tools
src/tools/inventory.py
"""
MCP tools for Odoo inventory and stock operations.
"""
from __future__ import annotations

import logging
from typing import Any

from pydantic import BaseModel

# server.py is run from src/, so odoo_client is a top-level module.
from odoo_client import OdooClient

logger = logging.getLogger(__name__)


class StockLevel(BaseModel):
    product_id: int
    product_name: str
    internal_ref: str
    qty_on_hand: float
    qty_available: float
    qty_incoming: float
    qty_outgoing: float
    unit: str
    warehouse: str
    location: str
    reorder_min: float | None = None
    below_reorder: bool = False


async def get_stock_levels(
    client: OdooClient,
    product_name: str | None = None,
    warehouse_name: str | None = None,
    below_reorder_only: bool = False,
    limit: int = 100,
) -> list[StockLevel]:
    """
    Get current stock levels across warehouses.

    Args:
        product_name: Partial match on product name
        warehouse_name: Filter by warehouse name
        below_reorder_only: Return only products below reorder point
        limit: Max records

    Returns:
        List of StockLevel objects, one per product and location
    """
    domain: list[Any] = [
        ["location_id.usage", "=", "internal"],
        ["location_id.active", "=", True],
    ]
    if product_name:
        domain.append(["product_id.name", "ilike", product_name])
    if warehouse_name:
        domain.append(["location_id.warehouse_id.name", "ilike", warehouse_name])
    quants = await client.search_read(
        "stock.quant",
        domain=domain,
        fields=[
            "product_id",
            "location_id",
            "quantity",
            "reserved_quantity",
            "product_uom_id",
        ],
        limit=limit,
        order="product_id asc",
    )
    product_ids = list({q["product_id"][0] for q in quants if q["product_id"]})
    reorder_map: dict[int, float] = {}
    product_info: dict[int, dict[str, Any]] = {}
    if product_ids:
        reorder_rules = await client.search_read(
            "stock.warehouse.orderpoint",
            domain=[["product_id", "in", product_ids]],
            fields=["product_id", "product_min_qty"],
        )
        for rule in reorder_rules:
            if rule["product_id"]:
                reorder_map[rule["product_id"][0]] = rule["product_min_qty"]
        # incoming_qty and default_code are fields of product.product,
        # not stock.quant, so they are read separately. incoming_qty is
        # a product-wide figure, not a per-location one.
        products = await client.read(
            "product.product",
            ids=product_ids,
            fields=["default_code", "incoming_qty"],
        )
        product_info = {p["id"]: p for p in products}
    results = []
    for q in quants:
        if not q["product_id"]:
            continue
        product_id = q["product_id"][0]
        product_name_val = q["product_id"][1]
        info = product_info.get(product_id, {})
        qty_on_hand = q["quantity"]
        qty_reserved = q["reserved_quantity"]
        qty_available = qty_on_hand - qty_reserved
        reorder_min = reorder_map.get(product_id)
        below_reorder = reorder_min is not None and qty_available < reorder_min
        if below_reorder_only and not below_reorder:
            continue
        location_name = q["location_id"][1] if q["location_id"] else "Unknown"
        warehouse = location_name.split("/")[0].strip() if "/" in location_name else location_name
        results.append(
            StockLevel(
                product_id=product_id,
                product_name=product_name_val,
                internal_ref=info.get("default_code") or "",
                qty_on_hand=qty_on_hand,
                qty_available=qty_available,
                qty_incoming=info.get("incoming_qty") or 0.0,
                qty_outgoing=qty_reserved,
                unit=q["product_uom_id"][1] if q["product_uom_id"] else "Units",
                warehouse=warehouse,
                location=location_name,
                reorder_min=reorder_min,
                below_reorder=below_reorder,
            )
        )
    return results


async def get_low_stock_alert(
    client: OdooClient,
) -> dict[str, Any]:
    """
    Get a summary of all products currently below their reorder minimum.

    Returns:
        Dict with count, list of products needing reorder, and a summary line
    """
    low_items = await get_stock_levels(client, below_reorder_only=True, limit=200)
    return {
        "total_products_below_reorder": len(low_items),
        "items": [
            {
                "product": item.product_name,
                "warehouse": item.warehouse,
                "available": item.qty_available,
                "minimum": item.reorder_min,
                "shortfall": (item.reorder_min or 0) - item.qty_available,
                "unit": item.unit,
            }
            for item in low_items
        ],
        "summary": (
            f"{len(low_items)} products are below their reorder minimum. "
            f"Immediate purchasing action may be needed."
            if low_items
            else "All products are above their reorder minimums. No action needed."
        ),
    }
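One thing to keep in mind with the listing above: stock.quant returns one row per product and location, so the reorder check compares each location's availability against the product minimum. If you would rather compare the total across locations, the rows can be summed first. The sketch below works on plain dicts shaped like the search_read results; `available_by_product` and `below_minimum` are hypothetical helper names.

```python
from collections import defaultdict
from typing import Any


def available_by_product(quants: list[dict[str, Any]]) -> dict[int, float]:
    """Sum (quantity - reserved_quantity) over all locations per product."""
    totals: dict[int, float] = defaultdict(float)
    for quant in quants:
        product = quant.get("product_id")
        if not product:  # Odoo returns False for an empty many2one
            continue
        totals[product[0]] += quant["quantity"] - quant["reserved_quantity"]
    return dict(totals)


def below_minimum(totals: dict[int, float], minimums: dict[int, float]) -> list[int]:
    """Return the IDs of products whose total availability is under the minimum."""
    return sorted(
        product_id
        for product_id, qty in totals.items()
        if product_id in minimums and qty < minimums[product_id]
    )
```

Products without a reorder rule are simply ignored, which matches how the listing treats a missing entry in reorder_map.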
Invoicing tools
src/tools/invoicing.py
"""
MCP tools for Odoo invoicing and accounts receivable.
"""
from __future__ import annotations

import logging
from datetime import date
from typing import Any

from pydantic import BaseModel

# server.py is run from src/, so odoo_client is a top-level module.
from odoo_client import OdooClient

logger = logging.getLogger(__name__)


class OverdueInvoice(BaseModel):
    id: int
    name: str
    customer: str
    customer_id: int
    invoice_date: str
    due_date: str
    days_overdue: int
    amount_total: float
    amount_residual: float
    currency: str
    aging_bucket: str


def _aging_bucket(days: int) -> str:
    if days <= 30:
        return "1 to 30 days"
    if days <= 60:
        return "31 to 60 days"
    if days <= 90:
        return "61 to 90 days"
    return "Over 90 days"


async def get_overdue_invoices(
    client: OdooClient,
    customer_name: str | None = None,
    min_amount: float | None = None,
    limit: int = 100,
) -> dict[str, Any]:
    """
    Get all overdue customer invoices with aging analysis.

    Args:
        customer_name: Filter by partial customer name
        min_amount: Only return invoices at or above this outstanding amount
        limit: Max records

    Returns:
        Dict with aging summary and invoice list
    """
    # Take the date once so the filter and the aging use the same day.
    today_dt = date.today()
    domain: list[Any] = [
        ["move_type", "=", "out_invoice"],
        ["state", "=", "posted"],
        ["payment_state", "in", ["not_paid", "partial"]],
        ["invoice_date_due", "<", today_dt.isoformat()],
    ]
    if customer_name:
        domain.append(["partner_id.name", "ilike", customer_name])
    if min_amount is not None:
        domain.append(["amount_residual", ">=", min_amount])
    records = await client.search_read(
        "account.move",
        domain=domain,
        fields=[
            "name",
            "partner_id",
            "invoice_date",
            "invoice_date_due",
            "amount_total",
            "amount_residual",
            "currency_id",
        ],
        limit=limit,
        order="invoice_date_due asc",
    )
    invoices = []
    total_outstanding = 0.0
    bucket_totals: dict[str, float] = {
        "1 to 30 days": 0.0,
        "31 to 60 days": 0.0,
        "61 to 90 days": 0.0,
        "Over 90 days": 0.0,
    }
    for r in records:
        due = date.fromisoformat(r["invoice_date_due"][:10]) if r["invoice_date_due"] else today_dt
        days_overdue = (today_dt - due).days
        bucket = _aging_bucket(days_overdue)
        residual = r["amount_residual"]
        total_outstanding += residual
        bucket_totals[bucket] = bucket_totals.get(bucket, 0.0) + residual
        invoices.append(
            OverdueInvoice(
                id=r["id"],
                name=r["name"],
                customer=r["partner_id"][1] if r["partner_id"] else "Unknown",
                customer_id=r["partner_id"][0] if r["partner_id"] else 0,
                invoice_date=r["invoice_date"] or "",
                due_date=r["invoice_date_due"] or "",
                days_overdue=days_overdue,
                amount_total=r["amount_total"],
                amount_residual=residual,
                currency=r["currency_id"][1] if r["currency_id"] else "USD",
                aging_bucket=bucket,
            )
        )
    return {
        "total_overdue_invoices": len(invoices),
        "total_outstanding_amount": round(total_outstanding, 2),
        "aging_buckets": {k: round(v, 2) for k, v in bucket_totals.items()},
        "invoices": [inv.model_dump() for inv in invoices],
        "oldest_invoice_days": max((inv.days_overdue for inv in invoices), default=0),
        "summary": (
            f"{len(invoices)} overdue invoices totalling "
            f"{round(total_outstanding, 2)} across all currencies."
            if invoices
            else "No overdue invoices found."
        ),
    }
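The aging arithmetic is easiest to check with fixed dates. The following worked example repeats the bucket boundaries from the listing as standalone functions and takes "today" as a parameter so the result does not depend on the clock. The names `days_overdue` and `aging_bucket` are illustrative copies, not imports from the project.

```python
from datetime import date


def days_overdue(due_date: str, today: date) -> int:
    """Whole days between the due date and today (positive when overdue)."""
    return (today - date.fromisoformat(due_date[:10])).days


def aging_bucket(days: int) -> str:
    """Map a day count to the same four buckets used in the listing."""
    if days <= 30:
        return "1 to 30 days"
    if days <= 60:
        return "31 to 60 days"
    if days <= 90:
        return "61 to 90 days"
    return "Over 90 days"


# Fixed reference date so the example is reproducible.
reference = date(2024, 3, 31)
examples = {
    due: aging_bucket(days_overdue(due, reference))
    for due in ("2024-03-01", "2024-02-29", "2023-12-01")
}
```

With the reference date of 2024-03-31, an invoice due on 2024-03-01 is 30 days overdue and lands in the first bucket, while one due a day earlier on 2024-02-29 is 31 days overdue and moves to the second.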
Purchase requisition tools
src/tools/purchasing.py
"""
MCP tools for Odoo purchasing operations.
Creates purchase requisitions (internal RFQs) that the procurement
team can then convert to purchase orders.
"""
from __future__ import annotations

import logging
from datetime import date, timedelta
from typing import Any

from pydantic import BaseModel, Field, field_validator

# server.py is run from src/, so odoo_client is a top-level module.
from odoo_client import OdooClient, OdooOperationError

logger = logging.getLogger(__name__)


class PurchaseRequisitionLine(BaseModel):
    product_name: str = Field(description="Exact product name as it appears in Odoo")
    quantity: float = Field(gt=0, description="Quantity to request")
    notes: str = Field(default="", description="Optional line notes")


class PurchaseRequisitionRequest(BaseModel):
    title: str = Field(description="Short description of the requisition")
    lines: list[PurchaseRequisitionLine] = Field(min_length=1)
    required_by_date: str | None = Field(
        default=None,
        description="ISO date string YYYY-MM-DD. Defaults to 30 days from today.",
    )
    notes: str = Field(default="", description="Overall requisition notes")

    @field_validator("required_by_date")
    @classmethod
    def validate_date(cls, v: str | None) -> str | None:
        if v is None:
            return v
        try:
            date.fromisoformat(v)
        except ValueError as exc:
            raise ValueError(f"required_by_date must be in YYYY-MM-DD format. Got: {v}") from exc
        return v


async def _resolve_product_id(client: OdooClient, product_name: str) -> int:
    """Find a product by exact or close name match. Raises if not found."""
    products = await client.search_read(
        "product.product",
        domain=[["name", "ilike", product_name], ["active", "=", True]],
        fields=["name", "id"],
        limit=5,
    )
    if not products:
        raise OdooOperationError(
            f"No product found matching '{product_name}'. "
            f"Check the product name or create it in Odoo first."
        )
    if len(products) == 1:
        return products[0]["id"]
    exact = [p for p in products if p["name"].lower() == product_name.lower()]
    if exact:
        return exact[0]["id"]
    names = ", ".join(p["name"] for p in products)
    raise OdooOperationError(
        f"Multiple products match '{product_name}': {names}. "
        f"Please provide a more specific product name."
    )


async def create_purchase_requisition(
    client: OdooClient,
    request: PurchaseRequisitionRequest,
) -> dict[str, Any]:
    """
    Create an internal purchase requisition in Odoo.
    This creates a Purchase Agreement of type 'blanket_order' in draft state.
    The procurement team can then review and convert it to purchase orders.

    Args:
        request: Validated purchase requisition data

    Returns:
        Dict with the created requisition ID and details
    """
    required_by = (
        date.fromisoformat(request.required_by_date)
        if request.required_by_date
        else date.today() + timedelta(days=30)
    )
    line_values = []
    resolved_lines = []
    for line in request.lines:
        product_id = await _resolve_product_id(client, line.product_name)
        line_values.append({
            "product_id": product_id,
            "product_qty": line.quantity,
        })
        resolved_lines.append({
            "product_name": line.product_name,
            "product_id": product_id,
            "quantity": line.quantity,
        })
    requisition_values = {
        "name": request.title,
        # Hardcoded agreement type ID. Verify that ID 2 is the blanket
        # order type in your database before relying on it.
        "type_id": 2,
        "date_end": required_by.isoformat(),
        "description": request.notes,
        # (0, 0, values) tells Odoo to create a new line record.
        "line_ids": [(0, 0, line) for line in line_values],
    }
    try:
        requisition_id = await client.create(
            "purchase.requisition",
            requisition_values,
        )
    except OdooOperationError as exc:
        if "purchase.requisition" in str(exc) and "does not exist" in str(exc):
            raise OdooOperationError(
                "The Purchase Agreements module is not installed in this Odoo instance. "
                "Ask your Odoo administrator to install 'Purchase Requisitions'."
            ) from exc
        raise
    logger.info(
        "Created purchase requisition id=%d title='%s' with %d lines",
        requisition_id,
        request.title,
        len(line_values),
    )
    return {
        "success": True,
        "requisition_id": requisition_id,
        "title": request.title,
        "required_by": required_by.isoformat(),
        "lines": resolved_lines,
        "message": (
            f"Purchase requisition '{request.title}' created with "
            f"{len(line_values)} line(s). Required by {required_by.isoformat()}. "
            f"It is in draft state for the procurement team to review."
        ),
    }
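The line_ids value in the listing uses Odoo's one2many command format, where a (0, 0, values) triple means "create a new related record with these values". The sketch below builds those commands and round-trips them through the standard library's XML-RPC marshaller to show what the server receives. `to_create_commands` is a hypothetical helper name, and the product ID is a made-up example.

```python
import xmlrpc.client
from typing import Any


def to_create_commands(lines: list[dict[str, Any]]) -> list[tuple[int, int, dict[str, Any]]]:
    """Wrap each line's values in a (0, 0, values) create command."""
    return [(0, 0, dict(values)) for values in lines]


# Made-up product ID, for illustration only.
commands = to_create_commands([{"product_id": 5, "product_qty": 2.0}])

# XML-RPC has no tuple type: tuples are sent as arrays and come back as lists.
wire_body = xmlrpc.client.dumps((commands,))
(received,), _ = xmlrpc.client.loads(wire_body)
```

Because tuples and lists are marshalled identically, writing the command as `[0, 0, values]` works just as well over XML-RPC.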
The MCP server
This is where everything comes together. We register all our tools with FastMCP, use the lifespan pattern to manage the Odoo client connection, and expose a clean set of tools that Claude can discover and call.
src/server.py
"""
Odoo MCP Server
Exposes Odoo ERP operations as MCP tools and resources.
Run with: uv run python src/server.py
Or in development: uv run mcp dev src/server.py
"""
from __future__ import annotations

import logging
import os
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import Any

from dotenv import load_dotenv
from mcp.server.fastmcp import Context, FastMCP
from mcp.server.session import ServerSession

from odoo_client import OdooClient, OdooConfig, OdooConnectionError, OdooOperationError
from tools.invoicing import get_overdue_invoices
from tools.inventory import get_stock_levels, get_low_stock_alert
from tools.purchasing import PurchaseRequisitionRequest, create_purchase_requisition
from tools.sales import (
    get_open_sales_orders,
    get_sales_order_detail,
    confirm_sales_order,
)

load_dotenv()
logging.basicConfig(
    level=os.getenv("LOG_LEVEL", "INFO"),
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)
logger = logging.getLogger(__name__)


@dataclass
class AppContext:
    odoo: OdooClient


@asynccontextmanager
async def lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
    """
    Initialize and clean up the Odoo client on server start and stop.
    Using the lifespan pattern means we authenticate once and reuse
    the connection for all requests during the server lifetime.
    """
    config = OdooConfig.from_env()
    client = OdooClient(config)
    logger.info("Connecting to Odoo at %s ...", config.url)
    await client.authenticate()
    logger.info("Odoo connection established.")
    try:
        yield AppContext(odoo=client)
    finally:
        logger.info("Odoo MCP server shutting down.")


mcp = FastMCP(
    name="Odoo ERP Assistant",
    instructions=(
        "You have access to a live Odoo ERP instance. "
        "You can read sales orders, check inventory, review overdue invoices "
        "and create purchase requisitions. "
        "Always confirm with the user before taking any action that modifies data "
        "such as confirming orders or creating requisitions."
    ),
    lifespan=lifespan,
    stateless_http=True,
    json_response=True,
)


def _get_odoo(ctx: Context[ServerSession, AppContext]) -> OdooClient:
    """Helper to extract the Odoo client from the request context."""
    return ctx.request_context.lifespan_context.odoo


def _handle_odoo_error(exc: Exception) -> dict[str, Any]:
    """Convert Odoo exceptions to user-friendly error dicts."""
    if isinstance(exc, OdooConnectionError):
        return {"error": True, "type": "connection_error", "message": str(exc)}
    if isinstance(exc, OdooOperationError):
        return {"error": True, "type": "operation_error", "message": str(exc)}
    logger.exception("Unexpected error in MCP tool")
    return {"error": True, "type": "unexpected_error", "message": f"An unexpected error occurred: {exc}"}


# ═══════════════════════════════════════════════════════════════
# SALES TOOLS
# ═══════════════════════════════════════════════════════════════
@mcp.tool()
async def list_sales_orders(
    ctx: Context[ServerSession, AppContext],
    customer_name: str | None = None,
    from_date: str | None = None,
    to_date: str | None = None,
    limit: int = 30,
) -> dict[str, Any]:
    """
    List confirmed sales orders from Odoo.
    Optionally filter by customer name (partial match), date range, or both.
    Returns order headers without line items for quick overview.
    Use get_order_detail to see line items for a specific order.

    Args:
        customer_name: Partial customer name to filter by (case-insensitive)
        from_date: Start date in YYYY-MM-DD format
        to_date: End date in YYYY-MM-DD format
        limit: Maximum number of orders to return (max 100)
    """
    try:
        orders = await get_open_sales_orders(
            _get_odoo(ctx),
            customer_name=customer_name,
            from_date=from_date,
            to_date=to_date,
            limit=min(limit, 100),
        )
        return {
            "count": len(orders),
            "orders": [o.model_dump() for o in orders],
        }
    except Exception as exc:
        return _handle_odoo_error(exc)


@mcp.tool()
async def get_order_detail(
    ctx: Context[ServerSession, AppContext],
    order_id: int,
) -> dict[str, Any]:
    """
    Get full details for a specific sales order including all line items.

    Args:
        order_id: The numeric Odoo ID of the sales order
    """
    try:
        return await get_sales_order_detail(_get_odoo(ctx), order_id=order_id)
    except Exception as exc:
        return _handle_odoo_error(exc)


@mcp.tool()
async def confirm_order(
    ctx: Context[ServerSession, AppContext],
    order_id: int,
) -> dict[str, Any]:
    """
    Confirm a draft or sent quotation as a confirmed sales order.
    This action cannot be undone without cancelling the order.
    Only call this after explicitly confirming the intent with the user.

    Args:
        order_id: The Odoo ID of the quotation to confirm
    """
    try:
        return await confirm_sales_order(_get_odoo(ctx), order_id=order_id)
    except Exception as exc:
        return _handle_odoo_error(exc)


# ═══════════════════════════════════════════════════════════════
# INVENTORY TOOLS
# ═══════════════════════════════════════════════════════════════
@mcp.tool()
async def check_stock_levels(
    ctx: Context[ServerSession, AppContext],
    product_name: str | None = None,
    warehouse_name: str | None = None,
    below_reorder_only: bool = False,
    limit: int = 50,
) -> dict[str, Any]:
    """
    Check current stock levels in Odoo warehouses.

    Args:
        product_name: Filter by partial product name
        warehouse_name: Filter by warehouse name
        below_reorder_only: Return only items below minimum reorder quantity
        limit: Maximum records to return
    """
    try:
        items = await get_stock_levels(
            _get_odoo(ctx),
            product_name=product_name,
            warehouse_name=warehouse_name,
            below_reorder_only=below_reorder_only,
            limit=limit,
        )
        return {
            "count": len(items),
            "items": [item.model_dump() for item in items],
        }
    except Exception as exc:
        return _handle_odoo_error(exc)


@mcp.tool()
async def low_stock_report(
    ctx: Context[ServerSession, AppContext],
) -> dict[str, Any]:
    """
    Get an immediate report of all products below their minimum reorder quantity.
    No arguments needed. Returns all low-stock items across all warehouses.
    """
    try:
        return await get_low_stock_alert(_get_odoo(ctx))
    except Exception as exc:
        return _handle_odoo_error(exc)

# ═══════════════════════════════════════════════════════════════
# INVOICING TOOLS
# ═══════════════════════════════════════════════════════════════
@mcp.tool()
async def overdue_invoices(
    ctx: Context[ServerSession, AppContext],
    customer_name: str | None = None,
    min_outstanding_amount: float | None = None,
    limit: int = 100,
) -> dict[str, Any]:
    """
    Get all overdue customer invoices with aging analysis.
    Returns invoices bucketed into 1-30 days, 31-60 days, 61-90 days
    and over 90 days overdue.

    Args:
        customer_name: Filter by partial customer name
        min_outstanding_amount: Only include invoices above this outstanding balance
        limit: Max records to return
    """
    try:
        return await get_overdue_invoices(
            _get_odoo(ctx),
            customer_name=customer_name,
            min_amount=min_outstanding_amount,
            limit=limit,
        )
    except Exception as exc:
        return _handle_odoo_error(exc)

# ═══════════════════════════════════════════════════════════════
# PURCHASING TOOLS
# ═══════════════════════════════════════════════════════════════
@mcp.tool()
async def create_purchase_request(
    ctx: Context[ServerSession, AppContext],
    title: str,
    lines: list[dict[str, Any]],
    required_by_date: str | None = None,
    notes: str = "",
) -> dict[str, Any]:
    """
    Create an internal purchase requisition in Odoo.
    The requisition goes into draft state for the procurement team to review.
    Products must exist in Odoo before they can be added to a requisition.

    Args:
        title: Short description of what is being requested
        lines: List of dicts with keys: product_name (str), quantity (float), notes (str optional)
            Example: [{"product_name": "Laptop", "quantity": 5}]
        required_by_date: Date needed by in YYYY-MM-DD format (defaults to 30 days from today)
        notes: Overall notes for the procurement team
    """
    try:
        request = PurchaseRequisitionRequest(
            title=title,
            lines=lines,
            required_by_date=required_by_date,
            notes=notes,
        )
        return await create_purchase_requisition(_get_odoo(ctx), request=request)
    except ValueError as exc:
        # Validation problems are reported separately from Odoo failures.
        return {"error": True, "type": "validation_error", "message": str(exc)}
    except Exception as exc:
        return _handle_odoo_error(exc)

# ═══════════════════════════════════════════════════════════════
# RESOURCES
# ═══════════════════════════════════════════════════════════════
@mcp.resource("odoo://sales/pipeline/summary")
async def sales_pipeline_summary() -> str:
    """
    Static orientation text about the sales pipeline.
    The MCP client decides when to attach this resource as context;
    live figures come from the list_sales_orders tool.
    """
    return (
        "This resource provides a summary of the current Odoo sales pipeline. "
        "Use the list_sales_orders tool to get live data. "
        "The ERP system tracks quotations, confirmed orders, locked orders and cancellations."
    )


@mcp.resource("odoo://inventory/locations")
async def warehouse_locations() -> str:
    """
    A description of the warehouse structure for context.
    """
    return (
        "Inventory is tracked in Odoo using locations and warehouses. "
        "Use check_stock_levels to get live quantities. "
        "Reorder rules define minimum stock levels per product per location."
    )

# ═══════════════════════════════════════════════════════════════
# ENTRY POINT
# ═══════════════════════════════════════════════════════════════
if __name__ == "__main__":
    port = int(os.getenv("MCP_SERVER_PORT", "8000"))
    host = os.getenv("MCP_SERVER_HOST", "0.0.0.0")
    # FastMCP takes its bind address from its settings, not from run(),
    # so apply the configured host and port before starting.
    mcp.settings.host = host
    mcp.settings.port = port
    logger.info("Starting Odoo MCP server on %s:%d", host, port)
    mcp.run(transport="streamable-http")
Running the server
For local development with the MCP Inspector:
uv run mcp dev src/server.py
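Then open the Inspector at the URL it prints on startup (http://localhost:5173 in older Inspector releases). Note that mcp dev drives the server over stdio. To exercise the HTTP endpoint instead, start the server with uv run python src/server.py and connect the Inspector to http://localhost:8000/mcp using the Streamable HTTP transport. Either way, you can call each tool directly from the Inspector UI before wiring it into Claude.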
For production you run it directly:
uv run python src/server.py
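To add it to Claude Desktop, create or edit ~/Library/Application Support/Claude/claude_desktop_config.json on macOS. Claude Desktop starts the server as a subprocess and talks to it over stdio, so for this configuration the entry point must call mcp.run(transport="stdio") rather than start the HTTP listener: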
{
  "mcpServers": {
    "odoo-erp": {
      "command": "uv",
      "args": [
        "run",
        "--project",
        "/path/to/odoo-mcp-server",
        "python",
        "src/server.py"
      ],
      "env": {
        "ODOO_URL": "https://your-odoo.com",
        "ODOO_DB": "your_db",
        "ODOO_USERNAME": "api@yourcompany.com",
        "ODOO_PASSWORD": "your_api_key"
      }
    }
  }
}
On Windows the config file lives at %APPDATA%\Claude\claude_desktop_config.json.
What this looks like in practice
Once the server is running and connected to Claude Desktop, you can have real conversations like this:
You: "Which customers have invoices more than 60 days overdue and what is the total outstanding?"
Claude calls overdue_invoices(), filters the result to the 61-90 days and over-90-days buckets, and gives you a breakdown by customer with totals. No SQL. No Odoo module navigation. No report generation.
You: "Are we running low on any stock items?"
Claude calls low_stock_report() and gets back every product below its reorder minimum across all warehouses. It presents this as a prioritised table, sorted by shortfall severity.
You: "Create a purchase requisition for 10 laptops and 5 docking stations needed by March 15th."
Claude calls create_purchase_request() with the right parameters, Odoo creates the draft requisition, and Claude confirms the requisition ID and tells you the procurement team can review it.
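The aging filter in the first exchange comes down to simple date arithmetic. Here is a small sketch of that logic, assuming each invoice is a dict with customer, due_date and amount_residual keys. Those key names are assumptions for illustration; the actual shape of the overdue_invoices result is defined by get_overdue_invoices.

```python
from collections import defaultdict
from datetime import date


def aging_bucket(days_overdue: int) -> str:
    """Map days past due to the buckets named in the overdue_invoices docstring."""
    if days_overdue <= 0:
        return "not_due"
    if days_overdue <= 30:
        return "1-30"
    if days_overdue <= 60:
        return "31-60"
    if days_overdue <= 90:
        return "61-90"
    return "over_90"


def outstanding_over_60_days(invoices: list[dict], today: date) -> dict[str, float]:
    """Total outstanding per customer for invoices more than 60 days overdue."""
    totals: dict[str, float] = defaultdict(float)
    for inv in invoices:
        days = (today - inv["due_date"]).days
        if aging_bucket(days) in ("61-90", "over_90"):
            totals[inv["customer"]] += inv["amount_residual"]
    return dict(totals)
```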
Production considerations
Authentication and access control. The Odoo XML-RPC user you configure should have only the permissions it needs. Create a dedicated API user in Odoo with read access to sales, inventory, and invoicing, and write access only to the models the tools modify: purchase requisitions and, if you expose confirm_order, sales orders. Do not use an admin account.
Rate limiting. The client has no built-in rate limiter. For production deployments where multiple users share the same MCP server, add a limiter around the execute method. An asyncio.Semaphore caps the number of concurrent Odoo calls with very little code; if you also need to cap calls per second, put a token bucket in front of it.
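The two limiters solve different problems: a semaphore bounds how many calls are in flight, while a token bucket bounds how many start per second. A rough sketch of both follows. TokenBucket and LimitedExecutor are hypothetical helper names, and the wrapped callable stands in for the client's execute method, whose real signature is defined earlier in the project.

```python
import asyncio
import time


class TokenBucket:
    """Allow `rate` calls per second on average, with bursts up to `capacity`."""

    def __init__(self, rate: float, capacity: int) -> None:
        self.rate = rate
        self.capacity = capacity
        self._tokens = float(capacity)
        self._updated = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        async with self._lock:
            while True:
                now = time.monotonic()
                # Refill in proportion to elapsed time, never above capacity.
                refill = (now - self._updated) * self.rate
                self._tokens = min(self.capacity, self._tokens + refill)
                self._updated = now
                if self._tokens >= 1:
                    self._tokens -= 1
                    return
                # Sleep just long enough for one token to accumulate.
                await asyncio.sleep((1 - self._tokens) / self.rate)


class LimitedExecutor:
    """Wrap an async callable with a rate cap and a concurrency cap."""

    def __init__(self, call, max_concurrent: int = 5, rate: float = 10.0, burst: int = 10):
        self._call = call
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._bucket = TokenBucket(rate, burst)

    async def execute(self, *args, **kwargs):
        await self._bucket.acquire()      # limits calls per second
        async with self._semaphore:       # limits calls in flight
            return await self._call(*args, **kwargs)
```

The default limits here are placeholders; sensible values depend on how much load your Odoo instance tolerates.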
Secrets management. Move away from .env files before going to production. Use AWS Secrets Manager, HashiCorp Vault, or your cloud provider's secret store. The OdooConfig.from_env() class method makes this easy to change since you only need to replace the environment variable injection point.
Logging and observability. Every tool in this code logs at INFO on success and hands any exception to the _handle_odoo_error helper, which returns an error payload instead of a stack trace. In production, ship these logs to your observability stack. Read the request ID from ctx.request_id in each tool call and include it in every log line so that a single request can be traced end to end.
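One lightweight way to get the request ID into every line is the standard library's LoggerAdapter. This is a sketch only; the logger name odoo_mcp and the logger_for helper are illustrative and not taken from the server code.

```python
import logging

logger = logging.getLogger("odoo_mcp")


class RequestLogger(logging.LoggerAdapter):
    """Prefix every message with the request ID held in `extra`."""

    def process(self, msg, kwargs):
        return f"[req={self.extra['request_id']}] {msg}", kwargs


def logger_for(request_id: str) -> logging.LoggerAdapter:
    """Build a per-request logger; call once at the top of each tool."""
    return RequestLogger(logger, {"request_id": request_id})


# Inside a tool this would look like:
#     log = logger_for(str(ctx.request_id))
#     log.info("check_stock_levels returned %d items", len(items))
```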
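Caching. Stock levels and customer lists do change, but most questions tolerate data that is slightly stale. Add a simple TTL cache on search_read calls for read-only tools to reduce Odoo database load, and leave write paths such as confirm_order and create_purchase_request uncached. The cachetools library works well for this.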
Health check endpoint. If you are running this as an HTTP service, add a /health endpoint that calls search_count("res.users", []) and returns 200 if Odoo is reachable. Mount this alongside the MCP server on the Starlette app.
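The core of such a check is independent of the web framework. The sketch below assumes the client exposes an async search_count(model, domain) method, as the call above suggests. The health_status helper and its timeout default are illustrative.

```python
import asyncio


async def health_status(odoo, timeout_seconds: float = 5.0) -> tuple[int, dict[str, str]]:
    """Return (HTTP status, JSON body) based on whether Odoo answers a cheap query."""
    try:
        await asyncio.wait_for(
            odoo.search_count("res.users", []),
            timeout=timeout_seconds,
        )
    except asyncio.TimeoutError:
        return 503, {"status": "unavailable", "reason": "timeout"}
    except Exception as exc:
        # Report only the exception type; details belong in the logs.
        return 503, {"status": "unavailable", "reason": type(exc).__name__}
    return 200, {"status": "ok"}


# A Starlette route handler would then return
#     JSONResponse(body, status_code=status)
# using the (status, body) pair from health_status(...).
```

The timeout matters: a health check that hangs when Odoo hangs tells a load balancer nothing.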
What you have built
At the end of this you have:
- A typed, async Odoo client that handles authentication, connection errors, and XML-RPC quirks in one place.
- Seven production-ready MCP tools covering the four most common ERP automation workflows.
- A FastMCP server with proper lifespan management so the Odoo connection is initialised once and reused.
- Clean error handling that gives the model useful information to pass back to the user instead of raw stack traces.
More importantly, you have a pattern. Adding a new Odoo workflow means writing one function in the appropriate tools file and one @mcp.tool() decorated wrapper in server.py. The protocol handling, the connection management, and the transport layer are already done.
The same pattern works for any ERP with a REST or RPC API. Swap the OdooClient for a SAP client or a NetSuite client and the MCP server structure stays identical.
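As an illustration of the "one function plus one wrapper" pattern, here is what a new workflow for open deliveries might look like. Everything in it is a sketch: list_open_deliveries and FakeOdoo are hypothetical, and the search_read signature (model, domain, fields, limit) is an assumption about the client defined earlier.

```python
import asyncio
from typing import Any


async def list_open_deliveries(odoo: Any, limit: int = 20) -> dict[str, Any]:
    """Tools-file function: one Odoo query, returned as JSON-friendly data."""
    rows = await odoo.search_read(
        "stock.picking",
        [("picking_type_code", "=", "outgoing"), ("state", "in", ["confirmed", "assigned"])],
        ["name", "partner_id", "scheduled_date", "state"],
        limit=limit,
    )
    return {"count": len(rows), "deliveries": rows}


# The server.py wrapper would follow the existing tools:
#
# @mcp.tool()
# async def open_deliveries(ctx: Context[ServerSession, AppContext], limit: int = 20) -> dict[str, Any]:
#     """List outgoing deliveries that are waiting or ready to ship."""
#     try:
#         return await list_open_deliveries(_get_odoo(ctx), limit=limit)
#     except Exception as exc:
#         return _handle_odoo_error(exc)


class FakeOdoo:
    """Stand-in client for trying the function without an Odoo instance."""

    async def search_read(self, model, domain, fields, limit=None):
        rows = [
            {"name": "WH/OUT/00001", "partner_id": [7, "Acme"], "scheduled_date": "2025-03-01", "state": "assigned"},
            {"name": "WH/OUT/00002", "partner_id": [9, "Beta"], "scheduled_date": "2025-03-02", "state": "confirmed"},
        ]
        return rows[:limit]


if __name__ == "__main__":
    print(asyncio.run(list_open_deliveries(FakeOdoo(), limit=1)))
```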
References
- MCP Python SDK: github.com/modelcontextprotocol/python-sdk
- MCP Protocol Specification: modelcontextprotocol.io/specification/latest
- Odoo External API Documentation: odoo.com/documentation/17.0/developer/reference/external_api.html
- FastMCP API Reference: modelcontextprotocol.github.io/python-sdk/api