Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .sampo/changesets/mcp-analytics-python-sdk.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'pypi/posthog': minor
---

Add `posthog.mcp`, a Python SDK for PostHog MCP analytics (install with `pip install posthog[mcp]`). `instrument(server, posthog_client)` wraps a `FastMCP` or low-level `mcp.server.Server` so every tool call, agent intent, tools/list, initialize, and failure is captured to PostHog as a `$mcp_*` event. Also adds `PostHogMCP`, a `Client` subclass for custom dispatchers, plus opt-in `context` intent capture, `identify`, `report_missing` (`get_more_tools`), and `conversation_id`. Alpha.
97 changes: 97 additions & 0 deletions examples/mcp_analytics_demo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
"""Dogfood demo for the PostHog MCP analytics SDK.

Instruments a small FastMCP server and sends real ``$mcp_*`` events to PostHog so
you can watch them land in the MCP analytics dashboard.

Usage::

POSTHOG_PROJECT_API_KEY=phc_xxx python examples/mcp_analytics_demo.py
# optional: POSTHOG_HOST=https://us.i.posthog.com (default)

This drives the instrumented server's seams directly (tools/list + tool calls)
rather than spinning up a transport + client, so it's a self-contained way to
generate events.
"""

import asyncio
import os

import mcp.types as mcp_types
from mcp.server.fastmcp import FastMCP

from posthog import Posthog
from posthog.mcp import instrument
from posthog.mcp.types import MCPAnalyticsOptions, UserIdentity

API_KEY = os.environ.get("POSTHOG_PROJECT_API_KEY")
HOST = os.environ.get("POSTHOG_HOST", "https://us.i.posthog.com")
SERVER_NAME = "posthog-python-mcp-demo"


def build_server() -> FastMCP:
server = FastMCP(SERVER_NAME)

@server.tool()
def add(a: int, b: int) -> int:
"""Add two numbers."""
return a + b

@server.tool()
def divide(a: int, b: int) -> float:
"""Divide a by b."""
return a / b

return server


async def main() -> None:
if not API_KEY:
raise SystemExit(
"Set POSTHOG_PROJECT_API_KEY (a phc_ project key) to run the demo."
)

posthog = Posthog(API_KEY, host=HOST)
server = build_server()
analytics = instrument(
server,
posthog,
MCPAnalyticsOptions(
identify=lambda request, extra: UserIdentity(
distinct_id="python-sdk-dogfood",
properties={"source": "posthog-python mcp demo"},
),
),
)

# tools/list -> $mcp_tools_list (+ context injection)
list_handler = server._mcp_server.request_handlers[mcp_types.ListToolsRequest]
await list_handler(mcp_types.ListToolsRequest(method="tools/list"))

# tool calls -> $mcp_initialize (lazy, once), $identify, $mcp_tool_call x3, $exception
await server._tool_manager.call_tool(
"add",
{"a": 2, "b": 3, "context": "adding two numbers to demo the python mcp sdk"},
)
await server._tool_manager.call_tool(
"divide",
{"a": 10, "b": 2, "context": "dividing values to show a successful tool call"},
)
try:
await server._tool_manager.call_tool(
"divide",
{"a": 1, "b": 0, "context": "dividing by zero to exercise error capture"},
)
except Exception:
pass

# custom event via the handle
await analytics.capture("demo_feedback", {"rating": 5})

await analytics.flush() # await in-flight auto-capture tasks (no racy sleep)
posthog.flush()
posthog.shutdown()
print(f"Sent MCP analytics events for server '{SERVER_NAME}' to {HOST}")


if __name__ == "__main__":
asyncio.run(main())
191 changes: 191 additions & 0 deletions posthog/mcp/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
# Portions of this package are derived from MCPCat/mcpcat-typescript-sdk
# Copyright (c) 2025 MCPcat
# Licensed under the MIT License: https://github.com/MCPCat/mcpcat-typescript-sdk/blob/main/LICENSE

"""PostHog MCP analytics SDK — product analytics for Model Context Protocol servers.

Wrap a Python MCP server (``FastMCP`` or low-level ``mcp.server.Server``) so every
tool call, agent intent, and failure is captured to PostHog as a ``$mcp_*`` event::

from posthog import Posthog
from posthog.mcp import instrument
from mcp.server.fastmcp import FastMCP

posthog = Posthog("phc_...", host="https://us.i.posthog.com")
server = FastMCP("my-server")
analytics = instrument(server, posthog)

Requires the optional ``mcp`` dependency: ``pip install posthog[mcp]``.
"""

from __future__ import annotations

from datetime import datetime, timezone
from typing import Any, Optional

try:
import mcp # noqa: F401
except ImportError:
raise ModuleNotFoundError(
"Please install the MCP SDK to use PostHog MCP analytics: 'pip install posthog[mcp]'"
)

from posthog.client import Client

from .capture import capture_event
from .compatibility import is_fastmcp, is_fastmcp_v2, is_low_level_server
from .constants import (
POSTHOG_MCP_ANALYTICS_SOURCE,
PostHogMCPAnalyticsEvent,
PostHogMCPAnalyticsProperty,
)
from .event_types import MCPAnalyticsEventType
from .instrument_fastmcp import instrument_fastmcp
from .instrument_lowlevel import instrument_fastmcp_v2, instrument_low_level
from .instrumentation import drain_pending
from .internal import (
MCPAnalyticsData,
get_server_tracking_data,
set_server_tracking_data,
)
from .logger import log, set_logger
from .posthog_mcp import PostHogMCP
from .session import derive_session_id_from_mcp_session, new_session_id
from .sink import McpEventSink
from .tools import get_more_tools_result
from .types import (
CaptureEventData,
MCPAnalyticsContextOptions,
MCPAnalyticsOptions,
PreparedToolCall,
UserIdentity,
)
from .version import __version__

__all__ = [
"instrument",
"McpAnalytics",
"PostHogMCP",
"MCPAnalyticsOptions",
"MCPAnalyticsContextOptions",
"UserIdentity",
"CaptureEventData",
"PreparedToolCall",
"get_more_tools_result",
"derive_session_id_from_mcp_session",
"set_logger",
"POSTHOG_MCP_ANALYTICS_SOURCE",
"PostHogMCPAnalyticsEvent",
"PostHogMCPAnalyticsProperty",
"__version__",
]


class McpAnalytics:
"""Handle returned by :func:`instrument`. Use it to capture custom events for
the instrumented server without passing the server object around."""

def __init__(self, key: Any) -> None:
self._key = key

async def capture(self, event: str, properties: Optional[dict] = None) -> None:
"""Capture a custom event for this server. ``event`` is sent verbatim (a
customer-defined event, so it is not ``$``-prefixed)."""
if not isinstance(event, str) or not event:
raise ValueError(
'capture() requires an event name, e.g. await analytics.capture("feedback_submitted")'
)
data = get_server_tracking_data(self._key)
if data is None:
return
coro = capture_event(
data,
{
"session_id": data.session_id,
"event_type": MCPAnalyticsEventType.CUSTOM,
"event_name": event,
"timestamp": datetime.now(timezone.utc),
"properties": properties,
},
)
if coro is not None:
await coro

async def flush(self) -> None:
"""Await in-flight auto-captured events scheduled on the current event loop.
Call this before ``posthog.shutdown()`` on exit so trailing tool-call events
aren't dropped. (Then call ``posthog.flush()``/``shutdown()`` to send them.)"""
await drain_pending()


class _NoopAnalytics(McpAnalytics):
def __init__(self) -> None: # noqa: D401 - graceful degradation handle
super().__init__(None)

async def capture(self, event: str, properties: Optional[dict] = None) -> None:
return None


def _resolve_client(posthog_client: Optional[Client]) -> Optional[Client]:
if posthog_client is not None:
return posthog_client
try:
from posthog import setup

return setup()
except Exception: # noqa: BLE001
return None


def instrument(
server: Any,
posthog_client: Optional[Client] = None,
options: Optional[MCPAnalyticsOptions] = None,
) -> McpAnalytics:
"""Instrument an MCP server so PostHog auto-captures tool calls, tool listings,
initialize, identity, and exceptions. Returns a handle whose ``capture()``
records custom events.

Idempotent per server instance — a second call reuses the existing tracking
state instead of double-wrapping. Degrades to a no-op handle on any failure so
the host application keeps working.

:param server: A ``FastMCP`` server (official ``mcp.server.fastmcp`` or jlowin's
``fastmcp`` 2.0) or a low-level ``mcp.server.Server``.
:param posthog_client: A posthog ``Client`` you construct and own (call
``shutdown()`` on exit to flush). Falls back to the global client.
:param options: Optional :class:`MCPAnalyticsOptions`.
"""
opts = options or MCPAnalyticsOptions()
try:
if opts.logger:
set_logger(opts.logger)

client = _resolve_client(posthog_client)
if client is None:
log("Warning: no PostHog client available; MCP events will not be sent.")

if get_server_tracking_data(server) is not None:
log("instrument() - server already instrumented, skipping initialization")
return McpAnalytics(server)

sink = McpEventSink(client) if client is not None else None
data = MCPAnalyticsData(options=opts, sink=sink, session_id=new_session_id())
set_server_tracking_data(server, data)

if is_fastmcp(server):
instrument_fastmcp(server, data)
elif is_fastmcp_v2(server):
instrument_fastmcp_v2(server, data)
elif is_low_level_server(server):
instrument_low_level(server, data)
else:
raise TypeError(
f"Unsupported server type: {type(server)!r}. Pass a FastMCP (official or jlowin's "
"fastmcp 2.0) or a low-level mcp.server.Server."
)

return McpAnalytics(server)
except Exception as error: # noqa: BLE001
log(f"Warning: failed to instrument server - {error}")
return _NoopAnalytics()
77 changes: 77 additions & 0 deletions posthog/mcp/capture.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# Portions of this package are derived from MCPCat/mcpcat-typescript-sdk
# Copyright (c) 2025 MCPcat
# Licensed under the MIT License: https://github.com/MCPCat/mcpcat-typescript-sdk/blob/main/LICENSE

"""Materialize an ``McpEvent`` against per-server tracking data + resolved
identity, then hand it to the ``McpEventSink`` for the
sanitize/truncate/before_send/capture pipeline."""

from __future__ import annotations

from datetime import datetime, timezone
from typing import Any, Coroutine, Dict, Optional

from .event_types import MCPAnalyticsEventType
from .internal import MCPAnalyticsData
from .logger import log
from .sink import McpCaptureOptions
from .version import __version__


def capture_event(
data: MCPAnalyticsData, event_input: Dict[str, Any]
) -> Optional[Coroutine[Any, Any, None]]:
"""Enrich an event with session/identity/server/sdk metadata and return the
sink's capture coroutine (so the custom-event handle can await it). Auto-capture
callers schedule it and ignore the result. Returns ``None`` if no sink is attached."""
sink = data.sink
if sink is None:
return None

session_id = event_input.get("session_id") or data.session_id
actor = data.identified_sessions.get(session_id)

timestamp = event_input.get("timestamp") or datetime.now(timezone.utc)
duration = event_input.get("duration")
if duration is None and event_input.get("timestamp"):
duration = (datetime.now(timezone.utc) - timestamp).total_seconds() * 1000

full_event: Dict[str, Any] = {
"id": event_input.get("id") or "",
"session_id": session_id,
"event_type": event_input.get("event_type") or MCPAnalyticsEventType.CUSTOM,
"event_name": event_input.get("event_name"),
"timestamp": timestamp,
"duration": duration,
"sdk_language": "Python",
"sdk_version": __version__,
"server_name": data.server_name,
"server_version": data.server_version,
"client_name": event_input.get("client_name"),
"client_version": event_input.get("client_version"),
"identify_actor_given_id": actor.distinct_id if actor else None,
"identify_actor_data": (actor.properties or {}) if actor else {},
"groups": actor.groups if actor else None,
"resource_name": event_input.get("resource_name"),
"tool_category": event_input.get("tool_category"),
"tool_description": event_input.get("tool_description"),
"listed_tool_names": event_input.get("listed_tool_names"),
"parameters": event_input.get("parameters"),
"response": event_input.get("response"),
"user_intent": event_input.get("user_intent"),
"user_intent_source": event_input.get("user_intent_source"),
"is_error": event_input.get("is_error"),
"error": event_input.get("error"),
"conversation_id": event_input.get("conversation_id"),
"properties": event_input.get("properties"),
}

options = McpCaptureOptions(
enable_exception_autocapture=data.options.enable_exception_autocapture,
before_send=data.options.before_send,
)
return sink.capture(full_event, options)


def log_capture_skipped() -> None:
log("Warning: Server tracking data not found. Event will not be published.")
Loading
Loading