aboutsummaryrefslogtreecommitdiff
path: root/packages
diff options
context:
space:
mode:
authorPrasanna <[email protected]>2026-01-10 15:19:31 -0800
committerGitHub <[email protected]>2026-01-10 15:19:31 -0800
commitd015036b05133a0a836db51e1fd7157120947302 (patch)
treeda3cfdd2f1fe2d0a6e6bbd9be7bfd360f9889d1d /packages
parentdocs: add S3 connector documentation (#657) (diff)
downloadsupermemory-d015036b05133a0a836db51e1fd7157120947302.tar.xz
supermemory-d015036b05133a0a836db51e1fd7157120947302.zip
pipecat-sdk (#663)
Diffstat (limited to 'packages')
-rw-r--r--packages/pipecat-sdk-python/Agents.md72
-rw-r--r--packages/pipecat-sdk-python/README.md159
-rw-r--r--packages/pipecat-sdk-python/pyproject.toml77
-rw-r--r--packages/pipecat-sdk-python/src/supermemory_pipecat/__init__.py59
-rw-r--r--packages/pipecat-sdk-python/src/supermemory_pipecat/exceptions.py58
-rw-r--r--packages/pipecat-sdk-python/src/supermemory_pipecat/service.py364
-rw-r--r--packages/pipecat-sdk-python/src/supermemory_pipecat/utils.py66
-rw-r--r--packages/pipecat-sdk-python/tests/__init__.py1
8 files changed, 856 insertions, 0 deletions
diff --git a/packages/pipecat-sdk-python/Agents.md b/packages/pipecat-sdk-python/Agents.md
new file mode 100644
index 00000000..0d4f687c
--- /dev/null
+++ b/packages/pipecat-sdk-python/Agents.md
@@ -0,0 +1,72 @@
+# AGENTS.md
+
+## Overview
+
+This package adds persistent memory to Pipecat voice AI pipelines using Supermemory.
+
+**Tech Stack:** Python >=3.10, Pipecat, Supermemory SDK
+
+## Commands
+
+```bash
+pip install supermemory-pipecat
+```
+
+## Integration Pattern
+
+Place `SupermemoryPipecatService` between context aggregator and LLM in the pipeline:
+
+```python
+from supermemory_pipecat import SupermemoryPipecatService
+
+memory = SupermemoryPipecatService(
+ user_id="user-123", # Required: identifies the user
+ session_id="session-456", # Optional: groups conversations
+)
+
+pipeline = Pipeline([
+ transport.input(),
+ stt,
+ context_aggregator.user(),
+ memory, # <- Memory service here
+ llm,
+ tts,
+ transport.output(),
+ context_aggregator.assistant(),
+])
+```
+
+## Configuration
+
+```python
+memory = SupermemoryPipecatService(
+ api_key="...", # Or use SUPERMEMORY_API_KEY env var
+ user_id="user-123",
+ session_id="session-456",
+ params=SupermemoryPipecatService.InputParams(
+ search_limit=10, # Max memories to retrieve
+ search_threshold=0.1, # Similarity threshold 0.0-1.0
+ mode="full", # "profile" | "query" | "full"
+ system_prompt="Based on previous conversations:\n\n",
+ ),
+)
+```
+
+## Memory Modes
+
+| Mode | Retrieves | Use When |
+|------|-----------|----------|
+| `"profile"` | User profile only | Personalization without search |
+| `"query"` | Search results only | Finding relevant past context |
+| `"full"` | Profile + search | Complete memory (default) |
+
+## Environment Variables
+
+- `SUPERMEMORY_API_KEY` - Supermemory API key
+- `OPENAI_API_KEY` - For OpenAI services (STT/LLM/TTS)
+
+## Boundaries
+
+- Always place memory service after `context_aggregator.user()` and before `llm`
+- Always provide `user_id` - it's required
+- Never hardcode API keys in code - use environment variables
diff --git a/packages/pipecat-sdk-python/README.md b/packages/pipecat-sdk-python/README.md
new file mode 100644
index 00000000..5f6e8478
--- /dev/null
+++ b/packages/pipecat-sdk-python/README.md
@@ -0,0 +1,159 @@
+# Supermemory Pipecat SDK
+
+Memory-enhanced conversational AI pipelines with [Supermemory](https://supermemory.ai) and [Pipecat](https://github.com/pipecat-ai/pipecat).
+
+## Installation
+
+```bash
+pip install supermemory-pipecat
+```
+
+## Quick Start
+
+```python
+import os
+from pipecat.pipeline.pipeline import Pipeline
+from pipecat.services.openai import OpenAILLMService, OpenAIUserContextAggregator
+from supermemory_pipecat import SupermemoryPipecatService
+
+# Create memory service
+memory = SupermemoryPipecatService(
+ api_key=os.getenv("SUPERMEMORY_API_KEY"),
+ user_id="user-123", # Required: used as container_tag
+ session_id="conversation-456", # Optional: groups memories by session
+)
+
+# Create pipeline with memory
+pipeline = Pipeline([
+ transport.input(),
+ stt,
+ user_context,
+ memory, # Automatically retrieves and injects relevant memories
+ llm,
+ transport.output(),
+])
+```
+
+## Configuration
+
+### Parameters
+
+| Parameter | Type | Required | Description |
+| ------------ | ----------- | -------- | ---------------------------------------------------------- |
+| `user_id` | str | **Yes** | User identifier - used as container_tag for memory scoping |
+| `session_id` | str | No | Session/conversation ID for grouping memories |
+| `api_key` | str | No | Supermemory API key (or set `SUPERMEMORY_API_KEY` env var) |
+| `params` | InputParams | No | Advanced configuration |
+| `base_url` | str | No | Custom API endpoint |
+
+### Advanced Configuration
+
+```python
+from supermemory_pipecat import SupermemoryPipecatService
+
+memory = SupermemoryPipecatService(
+ user_id="user-123",
+ session_id="conv-456",
+ params=SupermemoryPipecatService.InputParams(
+ search_limit=10, # Max memories to retrieve
+ search_threshold=0.1, # Similarity threshold
+ mode="full", # "profile", "query", or "full"
+ system_prompt="Based on previous conversations, I recall:\n\n",
+ ),
+)
+```
+
+### Memory Modes
+
+| Mode | Static Profile | Dynamic Profile | Search Results |
+| ----------- | -------------- | --------------- | -------------- |
+| `"profile"` | Yes | Yes | No |
+| `"query"` | No | No | Yes |
+| `"full"` | Yes | Yes | Yes |
+
+## How It Works
+
+1. **Intercepts context frames** - Listens for `LLMContextFrame` in the pipeline
+2. **Tracks conversation** - Maintains clean conversation history (no injected memories)
+3. **Retrieves memories** - Queries `/v4/profile` API with user's message
+4. **Injects memories** - Formats and adds to LLM context as a user message
+5. **Stores messages** - Sends unsent user/assistant messages to Supermemory (background, non-blocking)
+
+### What Gets Stored
+
+Unsent user and assistant messages are sent to Supermemory as a JSON array, e.g.:
+
+```
+User: What's the weather like today?
+```
+
+Stored as:
+
+```json
+{
+  "content": "[{\"role\": \"user\", \"content\": \"What's the weather like today?\"}]",
+ "container_tags": ["user-123"],
+ "custom_id": "conversation-456",
+ "metadata": { "platform": "pipecat" }
+}
+```
+
+## Full Example
+
+```python
+import asyncio
+import os
+from fastapi import FastAPI, WebSocket
+from pipecat.pipeline.pipeline import Pipeline
+from pipecat.pipeline.task import PipelineTask
+from pipecat.pipeline.runner import PipelineRunner
+from pipecat.services.openai import (
+ OpenAILLMService,
+ OpenAIUserContextAggregator,
+)
+from pipecat.transports.network.fastapi_websocket import (
+ FastAPIWebsocketTransport,
+ FastAPIWebsocketParams,
+)
+from supermemory_pipecat import SupermemoryPipecatService
+
+app = FastAPI()
+
+async def websocket_endpoint(websocket: WebSocket):
+ await websocket.accept()
+
+ transport = FastAPIWebsocketTransport(
+ websocket=websocket,
+ params=FastAPIWebsocketParams(audio_out_enabled=True),
+ )
+
+ user_context = OpenAIUserContextAggregator()
+
+ # Supermemory memory service
+ memory = SupermemoryPipecatService(
+ user_id="alice",
+ session_id="session-123",
+ )
+
+ llm = OpenAILLMService(
+ api_key=os.getenv("OPENAI_API_KEY"),
+ model="gpt-4",
+ )
+
+ pipeline = Pipeline([
+ transport.input(),
+ user_context,
+ memory,
+ llm,
+ transport.output(),
+ ])
+
+ runner = PipelineRunner()
+ task = PipelineTask(pipeline)
+ await runner.run(task)
+```
+
+## License
+
+MIT
diff --git a/packages/pipecat-sdk-python/pyproject.toml b/packages/pipecat-sdk-python/pyproject.toml
new file mode 100644
index 00000000..21ce0cdf
--- /dev/null
+++ b/packages/pipecat-sdk-python/pyproject.toml
@@ -0,0 +1,77 @@
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[project]
+name = "supermemory-pipecat"
+version = "0.1.0"
+description = "Supermemory integration for Pipecat - memory-enhanced conversational AI pipelines"
+readme = "README.md"
+license = "MIT"
+requires-python = ">=3.10"
+authors = [
+ { name = "Supermemory", email = "[email protected]" }
+]
+keywords = [
+ "supermemory",
+ "pipecat",
+ "memory",
+ "conversational-ai",
+ "llm",
+ "voice-ai",
+]
+classifiers = [
+ "Development Status :: 4 - Beta",
+ "Intended Audience :: Developers",
+ "License :: OSI Approved :: MIT License",
+ "Programming Language :: Python :: 3",
+ "Programming Language :: Python :: 3.10",
+ "Programming Language :: Python :: 3.11",
+ "Programming Language :: Python :: 3.12",
+ "Topic :: Scientific/Engineering :: Artificial Intelligence",
+]
+dependencies = [
+ "pipecat-ai>=0.0.98",
+ "supermemory>=3.16.0",
+ "pydantic>=2.10.0",
+ "loguru>=0.7.3",
+]
+
+[project.optional-dependencies]
+dev = [
+ "pytest>=8.3.5",
+ "pytest-asyncio>=0.24.0",
+ "mypy>=1.14.1",
+ "black>=24.8.0",
+ "isort>=5.13.2",
+]
+
+[project.urls]
+Homepage = "https://supermemory.ai"
+Documentation = "https://docs.supermemory.ai"
+Repository = "https://github.com/supermemoryai/supermemory"
+
+[tool.hatch.build.targets.wheel]
+packages = ["src/supermemory_pipecat"]
+
+[tool.hatch.build.targets.sdist]
+include = [
+ "/src",
+ "/tests",
+ "/README.md",
+ "/LICENSE",
+]
+
+[tool.black]
+line-length = 100
+target-version = ["py310"]
+
+[tool.isort]
+profile = "black"
+line_length = 100
+
+[tool.mypy]
+python_version = "3.10"
+warn_return_any = true
+warn_unused_configs = true
+disallow_untyped_defs = true
diff --git a/packages/pipecat-sdk-python/src/supermemory_pipecat/__init__.py b/packages/pipecat-sdk-python/src/supermemory_pipecat/__init__.py
new file mode 100644
index 00000000..35b888cc
--- /dev/null
+++ b/packages/pipecat-sdk-python/src/supermemory_pipecat/__init__.py
@@ -0,0 +1,59 @@
+"""Supermemory Pipecat SDK - Memory-enhanced conversational AI pipelines.
+
+This package provides seamless integration between Supermemory and Pipecat,
+enabling persistent memory and context enhancement for voice AI applications.
+
+Example:
+ ```python
+ from supermemory_pipecat import SupermemoryPipecatService
+
+ # Create memory service
+ memory = SupermemoryPipecatService(
+ api_key=os.getenv("SUPERMEMORY_API_KEY"),
+ user_id="user-123",
+ )
+
+ # Add to Pipecat pipeline
+ pipeline = Pipeline([
+ transport.input(),
+ stt,
+ user_context,
+ memory, # Automatically retrieves and injects memories
+ llm,
+ transport.output(),
+ ])
+ ```
+"""
+
+from .service import SupermemoryPipecatService
+from .exceptions import (
+ SupermemoryPipecatError,
+ ConfigurationError,
+ MemoryRetrievalError,
+ MemoryStorageError,
+ APIError,
+ NetworkError,
+)
+from .utils import (
+ get_last_user_message,
+ deduplicate_memories,
+ format_memories_to_text,
+)
+
+__version__ = "0.1.0"
+
+__all__ = [
+ # Main service
+ "SupermemoryPipecatService",
+ # Exceptions
+ "SupermemoryPipecatError",
+ "ConfigurationError",
+ "MemoryRetrievalError",
+ "MemoryStorageError",
+ "APIError",
+ "NetworkError",
+ # Utilities
+ "get_last_user_message",
+ "deduplicate_memories",
+ "format_memories_to_text",
+]
diff --git a/packages/pipecat-sdk-python/src/supermemory_pipecat/exceptions.py b/packages/pipecat-sdk-python/src/supermemory_pipecat/exceptions.py
new file mode 100644
index 00000000..1de8094f
--- /dev/null
+++ b/packages/pipecat-sdk-python/src/supermemory_pipecat/exceptions.py
@@ -0,0 +1,58 @@
+"""Custom exceptions for Supermemory Pipecat integration."""
+
+from typing import Optional
+
+
+class SupermemoryPipecatError(Exception):
+ """Base exception for all Supermemory Pipecat errors."""
+
+ def __init__(self, message: str, original_error: Optional[Exception] = None):
+ super().__init__(message)
+ self.message = message
+ self.original_error = original_error
+
+ def __str__(self) -> str:
+ if self.original_error:
+ return f"{self.message}: {self.original_error}"
+ return self.message
+
+
+class ConfigurationError(SupermemoryPipecatError):
+ """Raised when there are configuration issues (e.g., missing API key, invalid params)."""
+
+
+class MemoryRetrievalError(SupermemoryPipecatError):
+ """Raised when memory retrieval operations fail."""
+
+
+class MemoryStorageError(SupermemoryPipecatError):
+ """Raised when memory storage operations fail."""
+
+
+class APIError(SupermemoryPipecatError):
+ """Raised when Supermemory API requests fail."""
+
+ def __init__(
+ self,
+ message: str,
+ status_code: Optional[int] = None,
+ response_text: Optional[str] = None,
+ original_error: Optional[Exception] = None,
+ ):
+ super().__init__(message, original_error)
+ self.status_code = status_code
+ self.response_text = response_text
+
+ def __str__(self) -> str:
+ parts = [self.message]
+ if self.status_code:
+ parts.append(f"Status: {self.status_code}")
+ if self.response_text:
+ parts.append(f"Response: {self.response_text}")
+ if self.original_error:
+ parts.append(f"Cause: {self.original_error}")
+ return " | ".join(parts)
+
+
+class NetworkError(SupermemoryPipecatError):
+ """Raised when network operations fail."""
diff --git a/packages/pipecat-sdk-python/src/supermemory_pipecat/service.py b/packages/pipecat-sdk-python/src/supermemory_pipecat/service.py
new file mode 100644
index 00000000..ab07e672
--- /dev/null
+++ b/packages/pipecat-sdk-python/src/supermemory_pipecat/service.py
@@ -0,0 +1,364 @@
+"""Supermemory Pipecat service integration.
+
+This module provides a memory service that integrates with Supermemory to store
+and retrieve conversational memories, enhancing LLM context with relevant
+historical information.
+"""
+
+import asyncio
+import json
+import os
+from typing import Any, Dict, List, Literal, Optional
+
+from loguru import logger
+from pydantic import BaseModel, Field
+
+from pipecat.frames.frames import Frame, LLMContextFrame, LLMMessagesFrame
+from pipecat.processors.aggregators.llm_context import LLMContext
+from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame
+from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
+
+from .exceptions import (
+ ConfigurationError,
+ MemoryRetrievalError,
+)
+from .utils import (
+ deduplicate_memories,
+ format_memories_to_text,
+ get_last_user_message,
+)
+
+try:
+ import supermemory
+except ImportError:
+ supermemory = None # type: ignore
+
+
+class SupermemoryPipecatService(FrameProcessor):
+ """A memory service that integrates Supermemory with Pipecat pipelines.
+
+ This service intercepts message frames in the pipeline, retrieves relevant
+ memories from Supermemory, enhances the context, and optionally stores
+ new conversations.
+
+ Example:
+ ```python
+ from supermemory_pipecat import SupermemoryPipecatService
+
+ memory = SupermemoryPipecatService(
+ api_key=os.getenv("SUPERMEMORY_API_KEY"),
+ user_id="user-123",
+ )
+
+ pipeline = Pipeline([
+ transport.input(),
+ stt,
+ user_context,
+ memory, # Memory service enhances context here
+ llm,
+ transport.output(),
+ ])
+ ```
+ """
+
+ class InputParams(BaseModel):
+ """Configuration parameters for Supermemory Pipecat service.
+
+ Parameters:
+ search_limit: Maximum number of memories to retrieve per query.
+ search_threshold: Minimum similarity threshold for memory retrieval.
+ system_prompt: Prefix text for memory context messages.
+ mode: Memory retrieval mode - "profile", "query", or "full".
+ """
+
+ search_limit: int = Field(default=10, ge=1)
+ search_threshold: float = Field(default=0.1, ge=0.0, le=1.0)
+ system_prompt: str = Field(default="Based on previous conversations, I recall:\n\n")
+ mode: Literal["profile", "query", "full"] = Field(default="full")
+
+ def __init__(
+ self,
+ *,
+ api_key: Optional[str] = None,
+ user_id: str,
+ session_id: Optional[str] = None,
+ params: Optional[InputParams] = None,
+ base_url: Optional[str] = None,
+ ):
+ """Initialize the Supermemory Pipecat service.
+
+ Args:
+ api_key: The API key for Supermemory. Falls back to SUPERMEMORY_API_KEY env var.
+ user_id: The user ID - used as container_tag for memory scoping (REQUIRED).
+ session_id: Session/conversation ID for grouping memories (optional).
+ params: Configuration parameters for memory retrieval and storage.
+ base_url: Optional custom base URL for Supermemory API.
+
+ Raises:
+ ConfigurationError: If API key is missing or user_id not provided.
+ """
+ super().__init__()
+
+ # Get API key
+ self.api_key = api_key or os.getenv("SUPERMEMORY_API_KEY")
+ if not self.api_key:
+ raise ConfigurationError(
+ "API key is required. Provide api_key parameter or set SUPERMEMORY_API_KEY environment variable."
+ )
+
+ # user_id is required and used directly as container_tag
+ if not user_id:
+ raise ConfigurationError("user_id is required")
+
+ self.user_id = user_id
+ self.container_tag = user_id # container_tag = user_id directly
+ self.session_id = session_id # optional session/conversation ID
+
+ # Configuration
+ self.params = params or SupermemoryPipecatService.InputParams()
+
+ # Initialize async Supermemory client
+ self._supermemory_client = None
+ if supermemory is not None:
+ try:
+ self._supermemory_client = supermemory.AsyncSupermemory(
+ api_key=self.api_key,
+ base_url=base_url,
+ )
+ except Exception as e:
+ logger.warning(f"Failed to initialize Supermemory client: {e}")
+
+ # Track how many messages we've already sent to memory
+ self._messages_sent_count: int = 0
+
+ # Track last query to avoid duplicate processing
+ self._last_query: Optional[str] = None
+
+ logger.info(
+ f"Initialized SupermemoryPipecatService with "
+ f"user_id={user_id}, session_id={session_id}"
+ )
+
+ async def _retrieve_memories(self, query: str) -> Dict[str, Any]:
+ """Retrieve relevant memories from Supermemory.
+
+ Args:
+ query: The query to search for relevant memories.
+
+ Returns:
+ Dictionary containing profile and search results.
+ """
+ if self._supermemory_client is None:
+ raise MemoryRetrievalError(
+ "Supermemory client not initialized. Install with: pip install supermemory"
+ )
+
+ try:
+ logger.debug(f"Retrieving memories for query: {query[:100]}...")
+
+ # Build kwargs for profile request
+ kwargs: Dict[str, Any] = {
+ "container_tag": self.container_tag,
+ }
+
+ # Add query for search modes
+ if self.params.mode != "profile" and query:
+ kwargs["q"] = query
+ kwargs["threshold"] = self.params.search_threshold
+ # Pass limit via extra_body since SDK doesn't have direct param
+ kwargs["extra_body"] = {"limit": self.params.search_limit}
+
+ # Use SDK's profile method
+ response = await self._supermemory_client.profile(**kwargs)
+
+ # Extract memory strings from SDK response
+ search_results = []
+ if response.search_results and response.search_results.results:
+ search_results = [r["memory"] for r in response.search_results.results]
+
+ data: Dict[str, Any] = {
+ "profile": {
+ "static": response.profile.static,
+ "dynamic": response.profile.dynamic,
+ },
+ "search_results": search_results,
+ }
+
+ logger.debug(
+ f"Retrieved memories - static: {len(data['profile']['static'])}, "
+ f"dynamic: {len(data['profile']['dynamic'])}, "
+ f"search: {len(data['search_results'])}"
+ )
+ return data
+
+ except Exception as e:
+ logger.error(f"Error retrieving memories: {e}")
+ raise MemoryRetrievalError("Failed to retrieve memories", e)
+
+ async def _store_messages(self, messages: List[Dict[str, Any]]) -> None:
+ """Store messages in Supermemory.
+
+ Args:
+ messages: List of message dicts with 'role' and 'content' keys.
+ """
+ if self._supermemory_client is None:
+ logger.warning("Supermemory client not initialized, skipping memory storage")
+ return
+
+ if not messages:
+ return
+
+ try:
+ # Format messages as JSON array
+ formatted_content = json.dumps(messages)
+
+ logger.debug(f"Storing {len(messages)} messages to Supermemory")
+
+ # Build storage params
+ add_params: Dict[str, Any] = {
+ "content": formatted_content,
+ "container_tags": [self.container_tag],
+ "metadata": {"platform": "pipecat"},
+ }
+ if self.session_id:
+ add_params["custom_id"] = f"{self.session_id}"
+
+ await self._supermemory_client.memories.add(**add_params)
+ logger.debug(f"Successfully stored {len(messages)} messages in Supermemory")
+
+ except Exception as e:
+ # Don't fail the pipeline on storage errors
+ logger.error(f"Error storing messages in Supermemory: {e}")
+
+ def _enhance_context_with_memories(
+ self,
+ context: LLMContext,
+ query: str,
+ memories_data: Dict[str, Any],
+ ) -> None:
+ """Enhance the LLM context with relevant memories.
+
+ Args:
+ context: The LLM context to enhance.
+ query: The query used for retrieval.
+ memories_data: Raw memory data from Supermemory API.
+ """
+ # Skip if same query (avoid duplicate processing)
+ if self._last_query == query:
+ return
+
+ self._last_query = query
+
+ # Extract and deduplicate memories
+ profile = memories_data["profile"]
+
+ deduplicated = deduplicate_memories(
+ static=profile["static"],
+ dynamic=profile["dynamic"],
+ search_results=memories_data["search_results"],
+ )
+
+ # Check if we have any memories
+ total_memories = (
+ len(deduplicated["static"])
+ + len(deduplicated["dynamic"])
+ + len(deduplicated["search_results"])
+ )
+
+ if total_memories == 0:
+ logger.debug("No memories found to inject")
+ return
+
+ # Format memories based on mode
+ include_profile = self.params.mode in ("profile", "full")
+ include_search = self.params.mode in ("query", "full")
+
+ memory_text = format_memories_to_text(
+ deduplicated,
+ system_prompt=self.params.system_prompt,
+ include_static=include_profile,
+ include_dynamic=include_profile,
+ include_search=include_search,
+ )
+
+ if not memory_text:
+ return
+
+ # Inject memories into context as user message
+ context.add_message({"role": "user", "content": memory_text})
+
+ logger.debug(f"Enhanced context with {total_memories} memories")
+
+ async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
+ """Process incoming frames, intercept context frames for memory integration.
+
+ Args:
+ frame: The incoming frame to process.
+ direction: The direction of frame flow in the pipeline.
+ """
+ await super().process_frame(frame, direction)
+
+ context = None
+ messages = None
+
+ # Handle different frame types
+ if isinstance(frame, (LLMContextFrame, OpenAILLMContextFrame)):
+ context = frame.context
+ elif isinstance(frame, LLMMessagesFrame):
+ messages = frame.messages
+ context = LLMContext(messages)
+
+ if context:
+ try:
+ # Get messages from context
+ context_messages = context.get_messages()
+ latest_user_message = get_last_user_message(context_messages)
+
+ if latest_user_message:
+ # Retrieve memories from Supermemory
+ try:
+ memories_data = await self._retrieve_memories(latest_user_message)
+ self._enhance_context_with_memories(
+ context, latest_user_message, memories_data
+ )
+ except MemoryRetrievalError as e:
+ logger.warning(f"Memory retrieval failed, continuing without memories: {e}")
+
+ # Store unsent messages (user and assistant only, skip system)
+ storable_messages = [
+ msg for msg in context_messages if msg["role"] in ("user", "assistant")
+ ]
+ unsent_messages = storable_messages[self._messages_sent_count :]
+
+ if unsent_messages:
+ asyncio.create_task(self._store_messages(unsent_messages))
+ self._messages_sent_count = len(storable_messages)
+
+ # Pass the frame downstream
+ if messages is not None:
+ # For LLMMessagesFrame, create new frame with enhanced messages
+ await self.push_frame(LLMMessagesFrame(context.get_messages()))
+ else:
+ # For context frames, pass the enhanced frame
+ await self.push_frame(frame)
+
+ except Exception as e:
+ logger.error(f"Error processing frame with Supermemory: {e}")
+ # Still pass the original frame through on error
+ await self.push_frame(frame)
+ else:
+ # Non-context frames pass through unchanged
+ await self.push_frame(frame, direction)
+
+ def get_messages_sent_count(self) -> int:
+ """Get the count of messages sent to memory.
+
+ Returns:
+ Number of messages already sent to Supermemory.
+ """
+ return self._messages_sent_count
+
+ def reset_memory_tracking(self) -> None:
+ """Reset memory tracking for a new conversation."""
+ self._messages_sent_count = 0
+ self._last_query = None
diff --git a/packages/pipecat-sdk-python/src/supermemory_pipecat/utils.py b/packages/pipecat-sdk-python/src/supermemory_pipecat/utils.py
new file mode 100644
index 00000000..d0f0e461
--- /dev/null
+++ b/packages/pipecat-sdk-python/src/supermemory_pipecat/utils.py
@@ -0,0 +1,66 @@
+"""Utility functions for Supermemory Pipecat integration."""
+
+from typing import Dict, List
+
+
+def get_last_user_message(messages: List[Dict[str, str]]) -> str | None:
+ """Extract the last user message content from a list of messages."""
+ for msg in reversed(messages):
+ if msg["role"] == "user":
+ return msg["content"]
+ return None
+
+
+def deduplicate_memories(
+ static: List[str],
+ dynamic: List[str],
+ search_results: List[str],
+) -> Dict[str, List[str]]:
+ """Deduplicate memories. Priority: static > dynamic > search."""
+ seen = set()
+
+ def unique(memories):
+ out = []
+ for m in memories:
+ if m not in seen:
+ seen.add(m)
+ out.append(m)
+ return out
+
+ return {
+ "static": unique(static),
+ "dynamic": unique(dynamic),
+ "search_results": unique(search_results),
+ }
+
+
+def format_memories_to_text(
+ memories: Dict[str, List[str]],
+ system_prompt: str = "Based on previous conversations, I recall:\n\n",
+ include_static: bool = True,
+ include_dynamic: bool = True,
+ include_search: bool = True,
+) -> str:
+ """Format deduplicated memories into a text string for injection."""
+ sections = []
+
+ static = memories["static"]
+ dynamic = memories["dynamic"]
+ search_results = memories["search_results"]
+
+ if include_static and static:
+ sections.append("## User Profile (Persistent)")
+ sections.append("\n".join(f"- {item}" for item in static))
+
+ if include_dynamic and dynamic:
+ sections.append("## Recent Context")
+ sections.append("\n".join(f"- {item}" for item in dynamic))
+
+ if include_search and search_results:
+ sections.append("## Relevant Memories")
+ sections.append("\n".join(f"- {item}" for item in search_results))
+
+ if not sections:
+ return ""
+
+ return f"{system_prompt}\n" + "\n\n".join(sections)
diff --git a/packages/pipecat-sdk-python/tests/__init__.py b/packages/pipecat-sdk-python/tests/__init__.py
new file mode 100644
index 00000000..e20d656a
--- /dev/null
+++ b/packages/pipecat-sdk-python/tests/__init__.py
@@ -0,0 +1 @@
+"""Tests for Supermemory Pipecat SDK."""