aboutsummaryrefslogtreecommitdiff
path: root/packages/pipecat-sdk-python/src/supermemory_pipecat
diff options
context:
space:
mode:
authorPrasanna <[email protected]>2026-01-10 15:19:31 -0800
committerGitHub <[email protected]>2026-01-10 15:19:31 -0800
commitd015036b05133a0a836db51e1fd7157120947302 (patch)
treeda3cfdd2f1fe2d0a6e6bbd9be7bfd360f9889d1d /packages/pipecat-sdk-python/src/supermemory_pipecat
parentdocs: add S3 connector documentation (#657) (diff)
downloadsupermemory-d015036b05133a0a836db51e1fd7157120947302.tar.xz
supermemory-d015036b05133a0a836db51e1fd7157120947302.zip
pipecat-sdk (#663)
Diffstat (limited to 'packages/pipecat-sdk-python/src/supermemory_pipecat')
-rw-r--r--packages/pipecat-sdk-python/src/supermemory_pipecat/__init__.py59
-rw-r--r--packages/pipecat-sdk-python/src/supermemory_pipecat/exceptions.py58
-rw-r--r--packages/pipecat-sdk-python/src/supermemory_pipecat/service.py364
-rw-r--r--packages/pipecat-sdk-python/src/supermemory_pipecat/utils.py66
4 files changed, 547 insertions, 0 deletions
diff --git a/packages/pipecat-sdk-python/src/supermemory_pipecat/__init__.py b/packages/pipecat-sdk-python/src/supermemory_pipecat/__init__.py
new file mode 100644
index 00000000..35b888cc
--- /dev/null
+++ b/packages/pipecat-sdk-python/src/supermemory_pipecat/__init__.py
@@ -0,0 +1,59 @@
+"""Supermemory Pipecat SDK - Memory-enhanced conversational AI pipelines.
+
+This package provides seamless integration between Supermemory and Pipecat,
+enabling persistent memory and context enhancement for voice AI applications.
+
+Example:
+    ```python
+    import os
+
+    from supermemory_pipecat import SupermemoryPipecatService
+
+    # Create memory service
+    memory = SupermemoryPipecatService(
+        api_key=os.getenv("SUPERMEMORY_API_KEY"),
+        user_id="user-123",
+    )
+
+    # Add to Pipecat pipeline
+    pipeline = Pipeline([
+        transport.input(),
+        stt,
+        user_context,
+        memory,  # Automatically retrieves and injects memories
+        llm,
+        transport.output(),
+    ])
+    ```
+"""
+
+from .service import SupermemoryPipecatService
+from .exceptions import (
+ SupermemoryPipecatError,
+ ConfigurationError,
+ MemoryRetrievalError,
+ MemoryStorageError,
+ APIError,
+ NetworkError,
+)
+from .utils import (
+ get_last_user_message,
+ deduplicate_memories,
+ format_memories_to_text,
+)
+
+# Version string of this package.
+__version__ = "0.1.0"
+
+# Public API of the package: every name listed here is imported above from
+# the .service, .exceptions or .utils submodules.
+__all__ = [
+ # Main service
+ "SupermemoryPipecatService",
+ # Exceptions
+ "SupermemoryPipecatError",
+ "ConfigurationError",
+ "MemoryRetrievalError",
+ "MemoryStorageError",
+ "APIError",
+ "NetworkError",
+ # Utilities
+ "get_last_user_message",
+ "deduplicate_memories",
+ "format_memories_to_text",
+]
diff --git a/packages/pipecat-sdk-python/src/supermemory_pipecat/exceptions.py b/packages/pipecat-sdk-python/src/supermemory_pipecat/exceptions.py
new file mode 100644
index 00000000..1de8094f
--- /dev/null
+++ b/packages/pipecat-sdk-python/src/supermemory_pipecat/exceptions.py
@@ -0,0 +1,58 @@
+"""Custom exceptions for Supermemory Pipecat integration."""
+
+from typing import Optional
+
+
+class SupermemoryPipecatError(Exception):
+    """Root of the Supermemory Pipecat exception hierarchy.
+
+    Every instance keeps the human-readable ``message`` and, optionally, the
+    lower-level exception that triggered it in ``original_error``.
+    """
+
+    def __init__(self, message: str, original_error: Optional[Exception] = None):
+        """Store the message and the optional underlying exception.
+
+        Args:
+            message: Description of what went wrong.
+            original_error: The exception that caused this error, if any.
+        """
+        self.message = message
+        self.original_error = original_error
+        super().__init__(message)
+
+    def __str__(self) -> str:
+        """Return the message, suffixed with ": <original_error>" when one is set."""
+        cause = self.original_error
+        return f"{self.message}: {cause}" if cause else self.message
+
+
+class ConfigurationError(SupermemoryPipecatError):
+    """Signals invalid or incomplete configuration.
+
+    Typical causes are a missing API key or an invalid/empty parameter.
+    """
+
+
+class MemoryRetrievalError(SupermemoryPipecatError):
+    """Signals that fetching memories from Supermemory did not succeed."""
+
+
+class MemoryStorageError(SupermemoryPipecatError):
+    """Signals that writing memories to Supermemory did not succeed."""
+
+
+class APIError(SupermemoryPipecatError):
+    """Raised when a Supermemory API request fails.
+
+    In addition to the base-class ``message`` and ``original_error``, the
+    error carries the response status code and response text when known.
+    """
+
+    def __init__(
+        self,
+        message: str,
+        status_code: Optional[int] = None,
+        response_text: Optional[str] = None,
+        original_error: Optional[Exception] = None,
+    ):
+        """Store the failure details.
+
+        Args:
+            message: Description of what went wrong.
+            status_code: Status code of the failed response, if available.
+            response_text: Body text of the failed response, if available.
+            original_error: The exception that caused this error, if any.
+        """
+        super().__init__(message, original_error)
+        self.status_code = status_code
+        self.response_text = response_text
+
+    def __str__(self) -> str:
+        """Return the message plus every available detail, joined by " | "."""
+        parts = [self.message]
+        # Compare against None rather than relying on truthiness so that a
+        # present-but-falsy status code (0) is still reported.
+        if self.status_code is not None:
+            parts.append(f"Status: {self.status_code}")
+        # An empty response body carries no information, so it is skipped.
+        if self.response_text:
+            parts.append(f"Response: {self.response_text}")
+        if self.original_error is not None:
+            parts.append(f"Cause: {self.original_error}")
+        return " | ".join(parts)
+
+
+class NetworkError(SupermemoryPipecatError):
+    """Signals a failure at the network level."""
diff --git a/packages/pipecat-sdk-python/src/supermemory_pipecat/service.py b/packages/pipecat-sdk-python/src/supermemory_pipecat/service.py
new file mode 100644
index 00000000..ab07e672
--- /dev/null
+++ b/packages/pipecat-sdk-python/src/supermemory_pipecat/service.py
@@ -0,0 +1,364 @@
+"""Supermemory Pipecat service integration.
+
+This module provides a memory service that integrates with Supermemory to store
+and retrieve conversational memories, enhancing LLM context with relevant
+historical information.
+"""
+
+import asyncio
+import json
+import os
+from typing import Any, Dict, List, Literal, Optional
+
+from loguru import logger
+from pydantic import BaseModel, Field
+
+from pipecat.frames.frames import Frame, LLMContextFrame, LLMMessagesFrame
+from pipecat.processors.aggregators.llm_context import LLMContext
+from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame
+from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
+
+from .exceptions import (
+ ConfigurationError,
+ MemoryRetrievalError,
+)
+from .utils import (
+ deduplicate_memories,
+ format_memories_to_text,
+ get_last_user_message,
+)
+
+try:
+ import supermemory
+except ImportError:
+ supermemory = None # type: ignore
+
+
+class SupermemoryPipecatService(FrameProcessor):
+ """A memory service that integrates Supermemory with Pipecat pipelines.
+
+ This service intercepts LLM context and message frames in the pipeline,
+ retrieves relevant memories from Supermemory, injects them into the context,
+ and stores new user/assistant messages back to Supermemory.
+
+    Example:
+        ```python
+        import os
+
+        from supermemory_pipecat import SupermemoryPipecatService
+
+        memory = SupermemoryPipecatService(
+            api_key=os.getenv("SUPERMEMORY_API_KEY"),
+            user_id="user-123",
+        )
+
+        pipeline = Pipeline([
+            transport.input(),
+            stt,
+            user_context,
+            memory,  # Memory service enhances context here
+            llm,
+            transport.output(),
+        ])
+        ```
+ """
+
+    class InputParams(BaseModel):
+        """Tunable settings for memory retrieval and prompt injection.
+
+        Parameters:
+            search_limit: Upper bound on memories requested per query (at least 1).
+            search_threshold: Similarity threshold sent with search queries,
+                constrained to the range 0.0 to 1.0.
+            system_prompt: Text placed in front of the formatted memories.
+            mode: Which memories are used - "profile", "query", or "full".
+        """
+
+        # Constrained fields keep their validators via Field().
+        search_limit: int = Field(10, ge=1)
+        search_threshold: float = Field(0.1, ge=0.0, le=1.0)
+        # Unconstrained fields use plain defaults.
+        system_prompt: str = "Based on previous conversations, I recall:\n\n"
+        mode: Literal["profile", "query", "full"] = "full"
+
+    def __init__(
+        self,
+        *,
+        api_key: Optional[str] = None,
+        user_id: str,
+        session_id: Optional[str] = None,
+        params: Optional[InputParams] = None,
+        base_url: Optional[str] = None,
+    ):
+        """Set up credentials, memory scoping and the async Supermemory client.
+
+        Args:
+            api_key: Supermemory API key; when omitted, the SUPERMEMORY_API_KEY
+                environment variable is used instead.
+            user_id: Required user identifier; also used as the container tag
+                that scopes memories.
+            session_id: Optional session/conversation identifier.
+            params: Retrieval settings; defaults to ``InputParams()``.
+            base_url: Optional base URL forwarded to the Supermemory client.
+
+        Raises:
+            ConfigurationError: If no API key can be resolved or user_id is empty.
+        """
+        super().__init__()
+
+        # An explicit argument wins; otherwise fall back to the environment.
+        self.api_key = api_key if api_key else os.getenv("SUPERMEMORY_API_KEY")
+        if not self.api_key:
+            raise ConfigurationError(
+                "API key is required. Provide api_key parameter or set SUPERMEMORY_API_KEY environment variable."
+            )
+        if not user_id:
+            raise ConfigurationError("user_id is required")
+
+        # Memories are scoped per user: the container tag is the user ID itself.
+        self.user_id = self.container_tag = user_id
+        self.session_id = session_id
+
+        self.params = params if params else SupermemoryPipecatService.InputParams()
+
+        # The SDK import is optional at module level. Without it, or when the
+        # client constructor raises, the client simply stays None.
+        self._supermemory_client = None
+        if supermemory is not None:
+            client_options = {"api_key": self.api_key, "base_url": base_url}
+            try:
+                self._supermemory_client = supermemory.AsyncSupermemory(**client_options)
+            except Exception as e:
+                logger.warning(f"Failed to initialize Supermemory client: {e}")
+
+        # Number of user/assistant messages already handed to storage.
+        self._messages_sent_count: int = 0
+        # Most recent query for which memories were injected.
+        self._last_query: Optional[str] = None
+
+        logger.info(
+            f"Initialized SupermemoryPipecatService with user_id={user_id}, session_id={session_id}"
+        )
+
+    async def _retrieve_memories(self, query: str) -> Dict[str, Any]:
+        """Fetch the user's profile and, in search modes, query-relevant memories.
+
+        Args:
+            query: Text used to search for relevant memories. It is only sent
+                when the configured mode is not "profile" and the query is
+                non-empty.
+
+        Returns:
+            A dict of the form
+            ``{"profile": {"static": [...], "dynamic": [...]}, "search_results": [...]}``.
+
+        Raises:
+            MemoryRetrievalError: If the client is unavailable or the request
+                (or the handling of its response) fails. In the latter case
+                the underlying exception is attached as ``original_error``
+                and as ``__cause__``.
+        """
+        if self._supermemory_client is None:
+            raise MemoryRetrievalError(
+                "Supermemory client not initialized. Install with: pip install supermemory"
+            )
+
+        try:
+            logger.debug(f"Retrieving memories for query: {query[:100]}...")
+
+            # Build kwargs for profile request
+            kwargs: Dict[str, Any] = {
+                "container_tag": self.container_tag,
+            }
+
+            # Add query for search modes
+            if self.params.mode != "profile" and query:
+                kwargs["q"] = query
+                kwargs["threshold"] = self.params.search_threshold
+                # Pass limit via extra_body since SDK doesn't have direct param
+                kwargs["extra_body"] = {"limit": self.params.search_limit}
+
+            # Use SDK's profile method
+            response = await self._supermemory_client.profile(**kwargs)
+
+            # Extract memory strings from SDK response
+            search_results: List[Any] = []
+            if response.search_results and response.search_results.results:
+                search_results = [r["memory"] for r in response.search_results.results]
+
+            # Defensive: treat a missing profile or a None list as empty so the
+            # len() calls below and downstream consumers never see None.
+            profile = getattr(response, "profile", None)
+            data: Dict[str, Any] = {
+                "profile": {
+                    "static": list(getattr(profile, "static", None) or []),
+                    "dynamic": list(getattr(profile, "dynamic", None) or []),
+                },
+                "search_results": search_results,
+            }
+
+            logger.debug(
+                f"Retrieved memories - static: {len(data['profile']['static'])}, "
+                f"dynamic: {len(data['profile']['dynamic'])}, "
+                f"search: {len(data['search_results'])}"
+            )
+            return data
+
+        except Exception as e:
+            logger.error(f"Error retrieving memories: {e}")
+            # Chain explicitly so tracebacks show the real cause.
+            raise MemoryRetrievalError("Failed to retrieve memories", e) from e
+
+    async def _store_messages(self, messages: List[Dict[str, Any]]) -> None:
+        """Persist a batch of conversation messages to Supermemory.
+
+        The batch is serialized as a JSON array and added under this service's
+        container tag. Failures are logged and never raised, so a storage
+        problem cannot break the pipeline.
+
+        Args:
+            messages: Message dicts with 'role' and 'content' keys.
+        """
+        client = self._supermemory_client
+        if client is None:
+            logger.warning("Supermemory client not initialized, skipping memory storage")
+            return
+        if not messages:
+            return
+
+        try:
+            payload = json.dumps(messages)
+            batch_size = len(messages)
+            logger.debug(f"Storing {batch_size} messages to Supermemory")
+
+            request: Dict[str, Any] = dict(
+                content=payload,
+                container_tags=[self.container_tag],
+                metadata={"platform": "pipecat"},
+            )
+            # NOTE(review): every batch of a session reuses the same custom_id -
+            # confirm the API appends to, rather than replaces, earlier batches.
+            if self.session_id:
+                request["custom_id"] = f"{self.session_id}"
+
+            await client.memories.add(**request)
+            logger.debug(f"Successfully stored {batch_size} messages in Supermemory")
+        except Exception as e:
+            # Best effort by design: log and carry on.
+            logger.error(f"Error storing messages in Supermemory: {e}")
+
+    def _enhance_context_with_memories(
+        self,
+        context: LLMContext,
+        query: str,
+        memories_data: Dict[str, Any],
+    ) -> None:
+        """Append the relevant memories to the LLM context as one user message.
+
+        Nothing is added when the query equals the previously handled one or
+        when no memory remains after mode filtering and deduplication.
+
+        Args:
+            context: The LLM context to enhance (mutated in place).
+            query: The query used for retrieval.
+            memories_data: Dict with a "profile" entry (holding "static" and
+                "dynamic" lists) and a "search_results" list.
+        """
+        # Skip if same query (avoid duplicate processing)
+        if self._last_query == query:
+            return
+
+        self._last_query = query
+
+        # Decide which categories the configured mode exposes.
+        include_profile = self.params.mode in ("profile", "full")
+        include_search = self.params.mode in ("query", "full")
+
+        profile = memories_data["profile"]
+
+        # Deduplicate only across the categories that will actually be shown.
+        # Otherwise a search hit that duplicates a hidden profile entry (mode
+        # "query") would be removed and never reach the LLM at all.
+        deduplicated = deduplicate_memories(
+            static=(profile["static"] or []) if include_profile else [],
+            dynamic=(profile["dynamic"] or []) if include_profile else [],
+            search_results=(memories_data["search_results"] or []) if include_search else [],
+        )
+
+        # Count only what can be injected, so the log below is accurate.
+        total_memories = (
+            len(deduplicated["static"])
+            + len(deduplicated["dynamic"])
+            + len(deduplicated["search_results"])
+        )
+
+        if total_memories == 0:
+            logger.debug("No memories found to inject")
+            return
+
+        memory_text = format_memories_to_text(
+            deduplicated,
+            system_prompt=self.params.system_prompt,
+            include_static=include_profile,
+            include_dynamic=include_profile,
+            include_search=include_search,
+        )
+
+        if not memory_text:
+            return
+
+        # Inject memories into context as user message
+        context.add_message({"role": "user", "content": memory_text})
+
+        logger.debug(f"Enhanced context with {total_memories} memories")
+
+    async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
+        """Enrich LLM context frames with memories and forward every frame once.
+
+        For context/message frames the latest user message is used to retrieve
+        memories, which are injected into the context; user/assistant messages
+        not yet stored are sent to Supermemory in a background task. All other
+        frames pass through unchanged. Every frame keeps its original
+        ``direction``.
+
+        Args:
+            frame: The incoming frame to process.
+            direction: The direction of frame flow in the pipeline.
+        """
+        await super().process_frame(frame, direction)
+
+        context = None
+        is_messages_frame = False
+
+        # Handle different frame types
+        if isinstance(frame, (LLMContextFrame, OpenAILLMContextFrame)):
+            context = frame.context
+        elif isinstance(frame, LLMMessagesFrame):
+            is_messages_frame = True
+            context = LLMContext(frame.messages)
+
+        if not context:
+            # Non-context frames pass through unchanged
+            await self.push_frame(frame, direction)
+            return
+
+        # Bookkeeping is created lazily so this method is self-contained.
+        # Texts of memory messages this service injected into a context:
+        injected_texts = getattr(self, "_injected_memory_texts", None)
+        if injected_texts is None:
+            injected_texts = self._injected_memory_texts = set()
+        # Strong references to in-flight storage tasks. The event loop only
+        # keeps weak references, so an unreferenced task may be collected
+        # before it finishes.
+        pending_tasks = getattr(self, "_pending_store_tasks", None)
+        if pending_tasks is None:
+            pending_tasks = self._pending_store_tasks = set()
+
+        def is_injected(msg: Any) -> bool:
+            """Tell whether msg is a memory message added by this service."""
+            content = msg.get("content") if isinstance(msg, dict) else None
+            return isinstance(content, str) and content in injected_texts
+
+        outgoing: Frame = frame
+        try:
+            # Injected memory messages carry role "user"; drop them so they are
+            # neither used as the next query nor stored back as conversation.
+            context_messages = [m for m in context.get_messages() if not is_injected(m)]
+            latest_user_message = get_last_user_message(context_messages)
+
+            # A repeated query would not be injected again anyway, so skip the
+            # network round trip for it.
+            if latest_user_message and latest_user_message != self._last_query:
+                try:
+                    memories_data = await self._retrieve_memories(latest_user_message)
+                    count_before = len(context.get_messages())
+                    self._enhance_context_with_memories(
+                        context, latest_user_message, memories_data
+                    )
+                    # Assumes add_message appends at the end of the context:
+                    # a longer list means the last entry is the injected one.
+                    enhanced = context.get_messages()
+                    if len(enhanced) > count_before:
+                        added = enhanced[-1]
+                        added_text = added.get("content") if isinstance(added, dict) else None
+                        if isinstance(added_text, str):
+                            injected_texts.add(added_text)
+                except MemoryRetrievalError as e:
+                    logger.warning(f"Memory retrieval failed, continuing without memories: {e}")
+
+            # Store unsent messages (user and assistant only, skip system)
+            storable_messages = [
+                msg for msg in context_messages if msg["role"] in ("user", "assistant")
+            ]
+            unsent_messages = storable_messages[self._messages_sent_count :]
+
+            if unsent_messages:
+                task = asyncio.create_task(self._store_messages(unsent_messages))
+                pending_tasks.add(task)
+                task.add_done_callback(pending_tasks.discard)
+                self._messages_sent_count = len(storable_messages)
+
+            if is_messages_frame:
+                # For LLMMessagesFrame, create new frame with enhanced messages
+                outgoing = LLMMessagesFrame(context.get_messages())
+
+        except Exception as e:
+            # Memory handling is best effort: fall back to the original frame.
+            logger.error(f"Error processing frame with Supermemory: {e}")
+            outgoing = frame
+
+        # Push exactly once, outside the try block, so a failing push is never
+        # followed by a second push of the same frame.
+        await self.push_frame(outgoing, direction)
+
+    def get_messages_sent_count(self) -> int:
+        """Report how many messages have been handed to memory storage so far.
+
+        Returns:
+            The current value of the internal sent-message counter.
+        """
+        sent_so_far = self._messages_sent_count
+        return sent_so_far
+
+    def reset_memory_tracking(self) -> None:
+        """Clear the sent-message counter and the last-query marker.
+
+        Intended to be called when a new conversation starts.
+        """
+        self._messages_sent_count, self._last_query = 0, None
diff --git a/packages/pipecat-sdk-python/src/supermemory_pipecat/utils.py b/packages/pipecat-sdk-python/src/supermemory_pipecat/utils.py
new file mode 100644
index 00000000..d0f0e461
--- /dev/null
+++ b/packages/pipecat-sdk-python/src/supermemory_pipecat/utils.py
@@ -0,0 +1,66 @@
+"""Utility functions for Supermemory Pipecat integration."""
+
+from typing import Dict, List
+
+
+def get_last_user_message(messages: List[Dict[str, str]]) -> str | None:
+    """Return the text of the most recent user message, or None.
+
+    Entries that are not dicts, or that lack a "role" key, are skipped.
+    String content is returned unchanged. Multi-part content (a list of
+    parts) is reduced to its ``{"type": "text", "text": ...}`` parts, joined
+    with a single space.
+
+    Args:
+        messages: Conversation messages, oldest first.
+
+    Returns:
+        The latest user message's text, or None when there is no user message
+        or it carries no text.
+    """
+    for msg in reversed(messages):
+        if not isinstance(msg, dict) or msg.get("role") != "user":
+            continue
+
+        content = msg.get("content")
+        if isinstance(content, str):
+            return content
+        if isinstance(content, list):
+            # Keep only textual parts; images or audio cannot serve as a query.
+            text = " ".join(
+                part["text"]
+                for part in content
+                if isinstance(part, dict)
+                and part.get("type") == "text"
+                and isinstance(part.get("text"), str)
+            )
+            return text or None
+        # The latest user message exists but has no usable text.
+        return None
+    return None
+
+
+def deduplicate_memories(
+    static: List[str],
+    dynamic: List[str],
+    search_results: List[str],
+) -> Dict[str, List[str]]:
+    """Drop repeated memories while keeping first occurrences in order.
+
+    Categories are processed as static, then dynamic, then search results, so
+    a memory found in several categories survives only in the earliest one.
+
+    Returns:
+        A dict with the keys "static", "dynamic" and "search_results".
+    """
+    already_kept = set()
+    result: Dict[str, List[str]] = {}
+
+    buckets = (
+        ("static", static),
+        ("dynamic", dynamic),
+        ("search_results", search_results),
+    )
+    for label, bucket in buckets:
+        kept = []
+        for memory in bucket:
+            if memory in already_kept:
+                continue
+            already_kept.add(memory)
+            kept.append(memory)
+        result[label] = kept
+
+    return result
+
+
+def format_memories_to_text(
+    memories: Dict[str, List[str]],
+    system_prompt: str = "Based on previous conversations, I recall:\n\n",
+    include_static: bool = True,
+    include_dynamic: bool = True,
+    include_search: bool = True,
+) -> str:
+    """Render memories as headed bullet lists ready for prompt injection.
+
+    Each enabled, non-empty category contributes a heading followed by one
+    "- item" line per memory. The result is ``system_prompt``, a newline, and
+    the pieces separated by blank lines. An empty string is returned when no
+    category contributes anything.
+
+    Args:
+        memories: Dict with the keys "static", "dynamic" and "search_results".
+        system_prompt: Text placed before the formatted sections.
+        include_static: Whether to render the "static" memories.
+        include_dynamic: Whether to render the "dynamic" memories.
+        include_search: Whether to render the "search_results" memories.
+    """
+    # All three keys are read up front, whether or not they are rendered.
+    layout = (
+        (include_static, "## User Profile (Persistent)", memories["static"]),
+        (include_dynamic, "## Recent Context", memories["dynamic"]),
+        (include_search, "## Relevant Memories", memories["search_results"]),
+    )
+
+    pieces: List[str] = []
+    for enabled, heading, items in layout:
+        if enabled and items:
+            bullets = "\n".join(f"- {item}" for item in items)
+            pieces.extend((heading, bullets))
+
+    if not pieces:
+        return ""
+
+    body = "\n\n".join(pieces)
+    return f"{system_prompt}\n{body}"