diff options
Diffstat (limited to 'packages')
| -rw-r--r-- | packages/pipecat-sdk-python/README.md | 27 | ||||
| -rw-r--r-- | packages/pipecat-sdk-python/src/supermemory_pipecat/service.py | 232 | ||||
| -rw-r--r-- | packages/pipecat-sdk-python/src/supermemory_pipecat/utils.py | 90 |
3 files changed, 187 insertions, 162 deletions
diff --git a/packages/pipecat-sdk-python/README.md b/packages/pipecat-sdk-python/README.md index 5f6e8478..bb8e26e1 100644 --- a/packages/pipecat-sdk-python/README.md +++ b/packages/pipecat-sdk-python/README.md @@ -107,11 +107,9 @@ from fastapi import FastAPI, WebSocket from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.task import PipelineTask from pipecat.pipeline.runner import PipelineRunner -from pipecat.services.openai import ( - OpenAILLMService, - OpenAIUserContextAggregator, -) -from pipecat.transports.network.fastapi_websocket import ( +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.services.google.gemini_live.llm import GeminiLiveLLMService +from pipecat.transports.websocket.fastapi import ( FastAPIWebsocketTransport, FastAPIWebsocketParams, ) @@ -125,10 +123,17 @@ async def websocket_endpoint(websocket: WebSocket): transport = FastAPIWebsocketTransport( websocket=websocket, - params=FastAPIWebsocketParams(audio_out_enabled=True), + params=FastAPIWebsocketParams(audio_in_enabled=True, audio_out_enabled=True), + ) + + # Gemini Live for speech-to-speech + llm = GeminiLiveLLMService( + api_key=os.getenv("GEMINI_API_KEY"), + model="models/gemini-2.5-flash-native-audio-preview-12-2025", ) - user_context = OpenAIUserContextAggregator() + context = OpenAILLMContext([{"role": "system", "content": "You are a helpful assistant."}]) + context_aggregator = llm.create_context_aggregator(context) # Supermemory memory service memory = SupermemoryPipecatService( @@ -136,17 +141,13 @@ async def websocket_endpoint(websocket: WebSocket): session_id="session-123", ) - llm = OpenAILLMService( - api_key=os.getenv("OPENAI_API_KEY"), - model="gpt-4", - ) - pipeline = Pipeline([ transport.input(), - user_context, + context_aggregator.user(), memory, llm, transport.output(), + context_aggregator.assistant(), ]) runner = PipelineRunner() diff --git a/packages/pipecat-sdk-python/src/supermemory_pipecat/service.py b/packages/pipecat-sdk-python/src/supermemory_pipecat/service.py index 01bc03df..2aef866b 100644 --- a/packages/pipecat-sdk-python/src/supermemory_pipecat/service.py +++ b/packages/pipecat-sdk-python/src/supermemory_pipecat/service.py @@ -8,37 +8,37 @@ historical information. import asyncio import json import os +import re from typing import Any, Dict, List, Literal, Optional from loguru import logger -from pipecat.frames.frames import Frame, LLMContextFrame, LLMMessagesFrame +from pydantic import BaseModel, Field + +from pipecat.frames.frames import Frame, InputAudioRawFrame, LLMContextFrame, LLMMessagesFrame from pipecat.processors.aggregators.llm_context import LLMContext from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pydantic import BaseModel, Field -from .exceptions import ( - ConfigurationError, - MemoryRetrievalError, -) -from .utils import ( - deduplicate_memories, - format_memories_to_text, - get_last_user_message, -) +from .exceptions import ConfigurationError, MemoryRetrievalError +from .utils import deduplicate_memories, format_memories_to_text, get_last_user_message try: import supermemory except ImportError: supermemory = None # type: ignore +# XML tags for memory injection (replacement instead of accumulation) +MEMORY_TAG_START = "<user_memories>" +MEMORY_TAG_END = "</user_memories>" +MEMORY_TAG_PATTERN = re.compile(r"<user_memories>.*?</user_memories>", re.DOTALL) + class SupermemoryPipecatService(FrameProcessor): - """A memory service that integrates Supermemory with Pipecat pipelines. + """Memory service that integrates Supermemory with Pipecat pipelines. This service intercepts message frames in the pipeline, retrieves relevant - memories from Supermemory, enhances the context, and optionally stores - new conversations. + memories from Supermemory, and enhances the context before passing downstream. Example: ```python @@ -48,34 +48,25 @@ class SupermemoryPipecatService(FrameProcessor): api_key=os.getenv("SUPERMEMORY_API_KEY"), user_id="user-123", ) - - pipeline = Pipeline([ - transport.input(), - stt, - user_context, - memory, # Memory service enhances context here - llm, - transport.output(), - ]) ``` """ class InputParams(BaseModel): - """Configuration parameters for Supermemory Pipecat service. + """Configuration parameters for memory retrieval and injection. - Parameters: + Attributes: search_limit: Maximum number of memories to retrieve per query. - search_threshold: Minimum similarity threshold for memory retrieval. - system_prompt: Prefix text for memory context messages. + search_threshold: Minimum similarity threshold (0.0-1.0). + system_prompt: Prefix text for memory context. mode: Memory retrieval mode - "profile", "query", or "full". + inject_mode: How to inject memories - "auto", "system", or "user". """ search_limit: int = Field(default=10, ge=1) search_threshold: float = Field(default=0.1, ge=0.0, le=1.0) - system_prompt: str = Field( - default="Based on previous conversations, I recall:\n\n" - ) + system_prompt: str = Field(default="Based on previous conversations, I recall:\n\n") mode: Literal["profile", "query", "full"] = Field(default="full") + inject_mode: Literal["auto", "system", "user"] = Field(default="auto") def __init__( self, @@ -89,10 +80,10 @@ class SupermemoryPipecatService(FrameProcessor): """Initialize the Supermemory Pipecat service. Args: - api_key: The API key for Supermemory. Falls back to SUPERMEMORY_API_KEY env var. - user_id: The user ID - used as container_tag for memory scoping (REQUIRED). - session_id: Session/conversation ID for grouping memories (optional). - params: Configuration parameters for memory retrieval and storage. + api_key: Supermemory API key. Falls back to SUPERMEMORY_API_KEY env var. + user_id: The user ID - used as container_tag for memory scoping. + session_id: Session/conversation ID for grouping memories. + params: Configuration parameters for memory retrieval. base_url: Optional custom base URL for Supermemory API. Raises: @@ -100,25 +91,20 @@ class SupermemoryPipecatService(FrameProcessor): """ super().__init__() - # Get API key self.api_key = api_key or os.getenv("SUPERMEMORY_API_KEY") if not self.api_key: raise ConfigurationError( "API key is required. Provide api_key parameter or set SUPERMEMORY_API_KEY environment variable." ) - # user_id is required and used directly as container_tag if not user_id: raise ConfigurationError("user_id is required") self.user_id = user_id - self.container_tag = user_id # container_tag = user_id directly - self.session_id = session_id # optional session/conversation ID - - # Configuration + self.container_tag = user_id + self.session_id = session_id self.params = params or SupermemoryPipecatService.InputParams() - # Initialize async Supermemory client self._supermemory_client = None if supermemory is not None: try: @@ -129,25 +115,21 @@ class SupermemoryPipecatService(FrameProcessor): except Exception as e: logger.warning(f"Failed to initialize Supermemory client: {e}") - # Track how many messages we've already sent to memory self._messages_sent_count: int = 0 - - # Track last query to avoid duplicate processing self._last_query: Optional[str] = None - - logger.info( - f"Initialized SupermemoryPipecatService with " - f"user_id={user_id}, session_id={session_id}" - ) + self._audio_frames_detected: bool = False async def _retrieve_memories(self, query: str) -> Dict[str, Any]: """Retrieve relevant memories from Supermemory. Args: - query: The query to search for relevant memories. + query: The search query for memory retrieval. Returns: - Dictionary containing profile and search results. + Dictionary containing profile (static/dynamic) and search results. + + Raises: + MemoryRetrievalError: If retrieval fails. """ if self._supermemory_client is None: raise MemoryRetrievalError( @@ -155,29 +137,20 @@ class SupermemoryPipecatService(FrameProcessor): ) try: - logger.debug(f"Retrieving memories for query: {query[:100]}...") + kwargs: Dict[str, Any] = {"container_tag": self.container_tag} - # Build kwargs for profile request - kwargs: Dict[str, Any] = { - "container_tag": self.container_tag, - } - - # Add query for search modes if self.params.mode != "profile" and query: kwargs["q"] = query kwargs["threshold"] = self.params.search_threshold - # Pass limit via extra_body since SDK doesn't have direct param kwargs["extra_body"] = {"limit": self.params.search_limit} - # Use SDK's profile method response = await self._supermemory_client.profile(**kwargs) - # Extract memory strings from SDK response search_results = [] if response.search_results and response.search_results.results: - search_results = [r["memory"] for r in response.search_results.results] + search_results = response.search_results.results - data: Dict[str, Any] = { + return { "profile": { "static": response.profile.static, "dynamic": response.profile.dynamic, @@ -185,53 +158,28 @@ class SupermemoryPipecatService(FrameProcessor): "search_results": search_results, } - logger.debug( - f"Retrieved memories - static: {len(data['profile']['static'])}, " - f"dynamic: {len(data['profile']['dynamic'])}, " - f"search: {len(data['search_results'])}" - ) - return data - except Exception as e: logger.error(f"Error retrieving memories: {e}") raise MemoryRetrievalError("Failed to retrieve memories", e) async def _store_messages(self, messages: List[Dict[str, Any]]) -> None: - """Store messages in Supermemory. - - Args: - messages: List of message dicts with 'role' and 'content' keys. - """ - if self._supermemory_client is None: - logger.warning( - "Supermemory client not initialized, skipping memory storage" - ) - return - - if not messages: + """Store messages in Supermemory (non-blocking, fire-and-forget).""" + if self._supermemory_client is None or not messages: return try: - # Format messages as JSON array - formatted_content = json.dumps(messages) - - logger.debug(f"Storing {len(messages)} messages to Supermemory") - - # Build storage params add_params: Dict[str, Any] = { - "content": formatted_content, + "content": json.dumps(messages), "container_tags": [self.container_tag], "metadata": {"platform": "pipecat"}, } if self.session_id: - add_params["custom_id"] = f"{self.session_id}" + add_params["custom_id"] = self.session_id - await self._supermemory_client.add(**add_params) - logger.debug(f"Successfully stored {len(messages)} messages in Supermemory") + await self._supermemory_client.memories.add(**add_params) except Exception as e: - # Don't fail the pipeline on storage errors - logger.error(f"Error storing messages in Supermemory: {e}") + logger.error(f"Error storing messages: {e}") def _enhance_context_with_memories( self, @@ -239,29 +187,28 @@ class SupermemoryPipecatService(FrameProcessor): query: str, memories_data: Dict[str, Any], ) -> None: - """Enhance the LLM context with relevant memories. + """Enhance LLM context with retrieved memories. + + Uses XML tags <user_memories>...</user_memories> to wrap memories, + allowing replacement on each turn instead of accumulation. Args: context: The LLM context to enhance. query: The query used for retrieval. - memories_data: Raw memory data from Supermemory API. + memories_data: Memory data from Supermemory API. """ - # Skip if same query (avoid duplicate processing) if self._last_query == query: return self._last_query = query - # Extract and deduplicate memories profile = memories_data["profile"] - deduplicated = deduplicate_memories( static=profile["static"], dynamic=profile["dynamic"], search_results=memories_data["search_results"], ) - # Check if we have any memories total_memories = ( len(deduplicated["static"]) + len(deduplicated["dynamic"]) @@ -269,10 +216,8 @@ class SupermemoryPipecatService(FrameProcessor): ) if total_memories == 0: - logger.debug("No memories found to inject") return - # Format memories based on mode include_profile = self.params.mode in ("profile", "full") include_search = self.params.mode in ("query", "full") @@ -287,24 +232,55 @@ class SupermemoryPipecatService(FrameProcessor): if not memory_text: return - # Inject memories into context as user message - context.add_message({"role": "user", "content": memory_text}) + tagged_memory = f"{MEMORY_TAG_START}\n{memory_text}\n{MEMORY_TAG_END}" - logger.debug(f"Enhanced context with {total_memories} memories") + inject_to_system = self.params.inject_mode == "system" or ( + self.params.inject_mode == "auto" and self._audio_frames_detected + ) - async def process_frame(self, frame: Frame, direction: FrameDirection) -> None: - """Process incoming frames, intercept context frames for memory integration. + messages = context.get_messages() + + if inject_to_system: + system_idx = None + for i, msg in enumerate(messages): + if msg.get("role") == "system": + system_idx = i + break + + if system_idx is not None: + existing_content = messages[system_idx].get("content", "") + if MEMORY_TAG_PATTERN.search(existing_content): + messages[system_idx]["content"] = MEMORY_TAG_PATTERN.sub( + tagged_memory, existing_content + ) + else: + messages[system_idx]["content"] = f"{existing_content}\n\n{tagged_memory}" + else: + messages.insert(0, {"role": "system", "content": tagged_memory}) + else: + # Remove previous memory message if exists + for i in range(len(messages) - 1, -1, -1): + msg = messages[i] + if msg.get("role") == "user" and MEMORY_TAG_START in msg.get("content", ""): + messages.pop(i) + break - Args: - frame: The incoming frame to process. - direction: The direction of frame flow in the pipeline. - """ + context.add_message({"role": "user", "content": tagged_memory}) + + async def process_frame(self, frame: Frame, direction: FrameDirection) -> None: + """Process frames, intercept context frames for memory integration.""" await super().process_frame(frame, direction) + # Auto-detect speech-to-speech mode via audio frames + if isinstance(frame, InputAudioRawFrame): + if not self._audio_frames_detected: + self._audio_frames_detected = True + await self.push_frame(frame, direction) + return + context = None messages = None - # Handle different frame types if isinstance(frame, (LLMContextFrame, OpenAILLMContextFrame)): context = frame.context elif isinstance(frame, LLMMessagesFrame): @@ -313,29 +289,21 @@ class SupermemoryPipecatService(FrameProcessor): if context: try: - # Get messages from context context_messages = context.get_messages() latest_user_message = get_last_user_message(context_messages) if latest_user_message: - # Retrieve memories from Supermemory try: - memories_data = await self._retrieve_memories( - latest_user_message - ) + memories_data = await self._retrieve_memories(latest_user_message) self._enhance_context_with_memories( context, latest_user_message, memories_data ) except MemoryRetrievalError as e: - logger.warning( - f"Memory retrieval failed, continuing without memories: {e}" - ) + logger.warning(f"Memory retrieval failed: {e}") - # Store unsent messages (user and assistant only, skip system) + # Store unsent messages (user and assistant only) storable_messages = [ - msg - for msg in context_messages - if msg["role"] in ("user", "assistant") + msg for msg in context_messages if msg["role"] in ("user", "assistant") ] unsent_messages = storable_messages[self._messages_sent_count :] @@ -343,31 +311,19 @@ class SupermemoryPipecatService(FrameProcessor): asyncio.create_task(self._store_messages(unsent_messages)) self._messages_sent_count = len(storable_messages) - # Pass the frame downstream if messages is not None: - # For LLMMessagesFrame, create new frame with enhanced messages await self.push_frame(LLMMessagesFrame(context.get_messages())) else: - # For context frames, pass the enhanced frame await self.push_frame(frame) except Exception as e: - logger.error(f"Error processing frame with Supermemory: {e}") - # Still pass the original frame through on error + logger.error(f"Error processing frame: {e}") await self.push_frame(frame) else: - # Non-context frames pass through unchanged await self.push_frame(frame, direction) - def get_messages_sent_count(self) -> int: - """Get the count of messages sent to memory. - - Returns: - Number of messages already sent to Supermemory. - """ - return self._messages_sent_count - def reset_memory_tracking(self) -> None: - """Reset memory tracking for a new conversation.""" + """Reset memory tracking state for a new conversation.""" self._messages_sent_count = 0 self._last_query = None + self._audio_frames_detected = False diff --git a/packages/pipecat-sdk-python/src/supermemory_pipecat/utils.py b/packages/pipecat-sdk-python/src/supermemory_pipecat/utils.py index d0f0e461..a27da256 100644 --- a/packages/pipecat-sdk-python/src/supermemory_pipecat/utils.py +++ b/packages/pipecat-sdk-python/src/supermemory_pipecat/utils.py @@ -1,6 +1,7 @@ """Utility functions for Supermemory Pipecat integration.""" -from typing import Dict, List +from datetime import datetime, timezone +from typing import Any, Dict, List, Union def get_last_user_message(messages: List[Dict[str, str]]) -> str | None: @@ -11,15 +12,58 @@ def get_last_user_message(messages: List[Dict[str, str]]) -> str | None: return None +def format_relative_time(iso_timestamp: str) -> str: + """Convert ISO timestamp to relative time string. + + Format rules: + - [just now] - within 30 minutes + - [Xmins ago] - 30-60 minutes + - [X hrs ago] - less than 1 day + - [Xd ago] - less than 1 week + - [X Jul] - more than 1 week, same year + - [X Jul, 2023] - different year + """ + try: + dt = datetime.fromisoformat(iso_timestamp.replace("Z", "+00:00")) + now = datetime.now(timezone.utc) + diff = now - dt + + seconds = diff.total_seconds() + minutes = seconds / 60 + hours = seconds / 3600 + days = seconds / 86400 + + if minutes < 30: + return "just now" + elif minutes < 60: + return f"{int(minutes)}mins ago" + elif hours < 24: + return f"{int(hours)} hrs ago" + elif days < 7: + return f"{int(days)}d ago" + elif dt.year == now.year: + return f"{dt.day} {dt.strftime('%b')}" + else: + return f"{dt.day} {dt.strftime('%b')}, {dt.year}" + except Exception: + return "" + + def deduplicate_memories( static: List[str], dynamic: List[str], - search_results: List[str], -) -> Dict[str, List[str]]: - """Deduplicate memories. Priority: static > dynamic > search.""" + search_results: List[Dict[str, Any]], +) -> Dict[str, Union[List[str], List[Dict[str, Any]]]]: + """Deduplicate memories. Priority: static > dynamic > search. + + Args: + static: List of static memory strings. + dynamic: List of dynamic memory strings. + search_results: List of search result dicts with 'memory' and 'updatedAt'. + """ seen = set() - def unique(memories): + def unique_strings(memories: List[str]) -> List[str]: out = [] for m in memories: if m not in seen: @@ -27,21 +71,33 @@ def deduplicate_memories( out.append(m) return out + def unique_search(results: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + out = [] + for r in results: + memory = r.get("memory", "") + if memory and memory not in seen: + seen.add(memory) + out.append(r) + return out + return { - "static": unique(static), - "dynamic": unique(dynamic), - "search_results": unique(search_results), + "static": unique_strings(static), + "dynamic": unique_strings(dynamic), + "search_results": unique_search(search_results), } def format_memories_to_text( - memories: Dict[str, List[str]], + memories: Dict[str, Union[List[str], List[Dict[str, Any]]]], system_prompt: str = "Based on previous conversations, I recall:\n\n", include_static: bool = True, include_dynamic: bool = True, include_search: bool = True, ) -> str: - """Format deduplicated memories into a text string for injection.""" + """Format deduplicated memories into a text string for injection. + + Search results include temporal context (e.g., '3d ago') from updatedAt. + """ sections = [] static = memories["static"] @@ -58,7 +114,19 @@ def format_memories_to_text( if include_search and search_results: sections.append("## Relevant Memories") - sections.append("\n".join(f"- {item}" for item in search_results)) + lines = [] + for item in search_results: + if isinstance(item, dict): + memory = item.get("memory", "") + updated_at = item.get("updatedAt", "") + time_str = format_relative_time(updated_at) if updated_at else "" + if time_str: + lines.append(f"- [{time_str}] {memory}") + else: + lines.append(f"- {memory}") + else: + lines.append(f"- {item}") + sections.append("\n".join(lines)) if not sections: return "" |