From d015036b05133a0a836db51e1fd7157120947302 Mon Sep 17 00:00:00 2001 From: Prasanna <106952318+Prasanna721@users.noreply.github.com> Date: Sat, 10 Jan 2026 15:19:31 -0800 Subject: pipecat-sdk (#663) --- packages/pipecat-sdk-python/Agents.md | 72 ++++ packages/pipecat-sdk-python/README.md | 159 +++++++++ packages/pipecat-sdk-python/pyproject.toml | 77 +++++ .../src/supermemory_pipecat/__init__.py | 59 ++++ .../src/supermemory_pipecat/exceptions.py | 58 ++++ .../src/supermemory_pipecat/service.py | 364 +++++++++++++++++++++ .../src/supermemory_pipecat/utils.py | 66 ++++ packages/pipecat-sdk-python/tests/__init__.py | 1 + 8 files changed, 856 insertions(+) create mode 100644 packages/pipecat-sdk-python/Agents.md create mode 100644 packages/pipecat-sdk-python/README.md create mode 100644 packages/pipecat-sdk-python/pyproject.toml create mode 100644 packages/pipecat-sdk-python/src/supermemory_pipecat/__init__.py create mode 100644 packages/pipecat-sdk-python/src/supermemory_pipecat/exceptions.py create mode 100644 packages/pipecat-sdk-python/src/supermemory_pipecat/service.py create mode 100644 packages/pipecat-sdk-python/src/supermemory_pipecat/utils.py create mode 100644 packages/pipecat-sdk-python/tests/__init__.py (limited to 'packages') diff --git a/packages/pipecat-sdk-python/Agents.md b/packages/pipecat-sdk-python/Agents.md new file mode 100644 index 00000000..0d4f687c --- /dev/null +++ b/packages/pipecat-sdk-python/Agents.md @@ -0,0 +1,72 @@ +# AGENTS.md + +## Overview + +This package adds persistent memory to Pipecat voice AI pipelines using Supermemory. + +**Tech Stack:** Python >=3.10, Pipecat, Supermemory SDK + +## Commands + +```bash +pip install supermemory-pipecat +``` + +## Integration Pattern + +Place `SupermemoryPipecatService` between context aggregator and LLM in the pipeline: + +```python +from supermemory_pipecat import SupermemoryPipecatService + +memory = SupermemoryPipecatService( + user_id="user-123", # Required: identifies the user + session_id="session-456", # Optional: groups conversations +) + +pipeline = Pipeline([ + transport.input(), + stt, + context_aggregator.user(), + memory, # <- Memory service here + llm, + tts, + transport.output(), + context_aggregator.assistant(), +]) +``` + +## Configuration + +```python +memory = SupermemoryPipecatService( + api_key="...", # Or use SUPERMEMORY_API_KEY env var + user_id="user-123", + session_id="session-456", + params=SupermemoryPipecatService.InputParams( + search_limit=10, # Max memories to retrieve + search_threshold=0.1, # Similarity threshold 0.0-1.0 + mode="full", # "profile" | "query" | "full" + system_prompt="Based on previous conversations:\n\n", + ), +) +``` + +## Memory Modes + +| Mode | Retrieves | Use When | +|------|-----------|----------| +| `"profile"` | User profile only | Personalization without search | +| `"query"` | Search results only | Finding relevant past context | +| `"full"` | Profile + search | Complete memory (default) | + +## Environment Variables + +- `SUPERMEMORY_API_KEY` - Supermemory API key +- `OPENAI_API_KEY` - For OpenAI services (STT/LLM/TTS) + +## Boundaries + +- Always place memory service after `context_aggregator.user()` and before `llm` +- Always provide `user_id` - it's required +- Never hardcode API keys in code - use environment variables diff --git a/packages/pipecat-sdk-python/README.md b/packages/pipecat-sdk-python/README.md new file mode 100644 index 00000000..5f6e8478 --- /dev/null +++ b/packages/pipecat-sdk-python/README.md @@ -0,0 +1,159 @@ +# Supermemory Pipecat SDK + +Memory-enhanced conversational AI pipelines with [Supermemory](https://supermemory.ai) and [Pipecat](https://github.com/pipecat-ai/pipecat). + +## Installation + +```bash +pip install supermemory-pipecat +``` + +## Quick Start + +```python +import os +from pipecat.pipeline.pipeline import Pipeline +from pipecat.services.openai import OpenAILLMService, OpenAIUserContextAggregator +from supermemory_pipecat import SupermemoryPipecatService + +# Create memory service +memory = SupermemoryPipecatService( + api_key=os.getenv("SUPERMEMORY_API_KEY"), + user_id="user-123", # Required: used as container_tag + session_id="conversation-456", # Optional: groups memories by session +) + +# Create pipeline with memory +pipeline = Pipeline([ + transport.input(), + stt, + user_context, + memory, # Automatically retrieves and injects relevant memories + llm, + transport.output(), +]) +``` + +## Configuration + +### Parameters + +| Parameter | Type | Required | Description | +| ------------ | ----------- | -------- | ---------------------------------------------------------- | +| `user_id` | str | **Yes** | User identifier - used as container_tag for memory scoping | +| `session_id` | str | No | Session/conversation ID for grouping memories | +| `api_key` | str | No | Supermemory API key (or set `SUPERMEMORY_API_KEY` env var) | +| `params` | InputParams | No | Advanced configuration | +| `base_url` | str | No | Custom API endpoint | + +### Advanced Configuration + +```python +from supermemory_pipecat import SupermemoryPipecatService + +memory = SupermemoryPipecatService( + user_id="user-123", + session_id="conv-456", + params=SupermemoryPipecatService.InputParams( + search_limit=10, # Max memories to retrieve + search_threshold=0.1, # Similarity threshold + mode="full", # "profile", "query", or "full" + system_prompt="Based on previous conversations, I recall:\n\n", + ), +) +``` + +### Memory Modes + +| Mode | Static Profile | Dynamic Profile | Search Results | +| ----------- | -------------- | --------------- | -------------- | +| `"profile"` | Yes | Yes | No | +| `"query"` | No | No | Yes | +| `"full"` | Yes | Yes | Yes | + +## How It Works + +1. **Intercepts context frames** - Listens for `LLMContextFrame` in the pipeline +2. **Tracks conversation** - Maintains clean conversation history (no injected memories) +3. **Retrieves memories** - Queries `/v4/profile` API with user's message +4. **Injects memories** - Formats and adds to LLM context as system message +5. **Stores messages** - Sends last user message to Supermemory (background, non-blocking) + +### What Gets Stored + +Only the last user message is sent to Supermemory: + +``` +User: What's the weather like today? +``` + +Stored as: + +```json +{ + "content": "User: What's the weather like today?", + "container_tags": ["user-123"], + "custom_id": "conversation-456", + "metadata": { "platform": "pipecat" } +} +``` + +## Full Example + +```python +import asyncio +import os +from fastapi import FastAPI, WebSocket +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.task import PipelineTask +from pipecat.pipeline.runner import PipelineRunner +from pipecat.services.openai import ( + OpenAILLMService, + OpenAIUserContextAggregator, +) +from pipecat.transports.network.fastapi_websocket import ( + FastAPIWebsocketTransport, + FastAPIWebsocketParams, +) +from supermemory_pipecat import SupermemoryPipecatService + +app = FastAPI() + +@app.websocket("/chat") +async def websocket_endpoint(websocket: WebSocket): + await websocket.accept() + + transport = FastAPIWebsocketTransport( + websocket=websocket, + params=FastAPIWebsocketParams(audio_out_enabled=True), + ) + + user_context = OpenAIUserContextAggregator() + + # Supermemory memory service + memory = SupermemoryPipecatService( + user_id="alice", + session_id="session-123", + ) + + llm = OpenAILLMService( + api_key=os.getenv("OPENAI_API_KEY"), + model="gpt-4", + ) + + pipeline = Pipeline([ + transport.input(), + user_context, + memory, + llm, + transport.output(), + ]) + + runner = PipelineRunner() + task = PipelineTask(pipeline) + await runner.run(task) +``` + +## License + +MIT diff --git a/packages/pipecat-sdk-python/pyproject.toml b/packages/pipecat-sdk-python/pyproject.toml new file mode 100644 index 00000000..21ce0cdf --- /dev/null +++ b/packages/pipecat-sdk-python/pyproject.toml @@ -0,0 +1,77 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "supermemory-pipecat" +version = "0.1.0" +description = "Supermemory integration for Pipecat - memory-enhanced conversational AI pipelines" +readme = "README.md" +license = "MIT" +requires-python = ">=3.10" +authors = [ + { name = "Supermemory", email = "support@supermemory.ai" } +] +keywords = [ + "supermemory", + "pipecat", + "memory", + "conversational-ai", + "llm", + "voice-ai", +] +classifiers = [ + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Topic :: Scientific/Engineering :: Artificial Intelligence", +] +dependencies = [ + "pipecat-ai>=0.0.98", + "supermemory>=3.16.0", + "pydantic>=2.10.0", + "loguru>=0.7.3", +] + +[project.optional-dependencies] +dev = [ + "pytest>=8.3.5", + "pytest-asyncio>=0.24.0", + "mypy>=1.14.1", + "black>=24.8.0", + "isort>=5.13.2", +] + +[project.urls] +Homepage = "https://supermemory.ai" +Documentation = "https://docs.supermemory.ai" +Repository = "https://github.com/supermemoryai/supermemory" + +[tool.hatch.build.targets.wheel] +packages = ["src/supermemory_pipecat"] + +[tool.hatch.build.targets.sdist] +include = [ + "/src", + "/tests", + "/README.md", + "/LICENSE", +] + +[tool.black] +line-length = 100 +target-version = ["py310"] + +[tool.isort] +profile = "black" +line_length = 100 + +[tool.mypy] +python_version = "3.10" +warn_return_any = true +warn_unused_configs = true +disallow_untyped_defs = true diff --git a/packages/pipecat-sdk-python/src/supermemory_pipecat/__init__.py b/packages/pipecat-sdk-python/src/supermemory_pipecat/__init__.py new file mode 100644 index 00000000..35b888cc --- /dev/null +++ b/packages/pipecat-sdk-python/src/supermemory_pipecat/__init__.py @@ -0,0 +1,59 @@ +"""Supermemory Pipecat SDK - Memory-enhanced conversational AI pipelines. + +This package provides seamless integration between Supermemory and Pipecat, +enabling persistent memory and context enhancement for voice AI applications. + +Example: + ```python + from supermemory_pipecat import SupermemoryPipecatService + + # Create memory service + memory = SupermemoryPipecatService( + api_key=os.getenv("SUPERMEMORY_API_KEY"), + user_id="user-123", + ) + + # Add to Pipecat pipeline + pipeline = Pipeline([ + transport.input(), + stt, + user_context, + memory, # Automatically retrieves and injects memories + llm, + transport.output(), + ]) + ``` +""" + +from .service import SupermemoryPipecatService +from .exceptions import ( + SupermemoryPipecatError, + ConfigurationError, + MemoryRetrievalError, + MemoryStorageError, + APIError, + NetworkError, +) +from .utils import ( + get_last_user_message, + deduplicate_memories, + format_memories_to_text, +) + +__version__ = "0.1.0" + +__all__ = [ + # Main service + "SupermemoryPipecatService", + # Exceptions + "SupermemoryPipecatError", + "ConfigurationError", + "MemoryRetrievalError", + "MemoryStorageError", + "APIError", + "NetworkError", + # Utilities + "get_last_user_message", + "deduplicate_memories", + "format_memories_to_text", +] diff --git a/packages/pipecat-sdk-python/src/supermemory_pipecat/exceptions.py b/packages/pipecat-sdk-python/src/supermemory_pipecat/exceptions.py new file mode 100644 index 00000000..1de8094f --- /dev/null +++ b/packages/pipecat-sdk-python/src/supermemory_pipecat/exceptions.py @@ -0,0 +1,58 @@ +"""Custom exceptions for Supermemory Pipecat integration.""" + +from typing import Optional + + +class SupermemoryPipecatError(Exception): + """Base exception for all Supermemory Pipecat errors.""" + + def __init__(self, message: str, original_error: Optional[Exception] = None): + super().__init__(message) + self.message = message + self.original_error = original_error + + def __str__(self) -> str: + if self.original_error: + return f"{self.message}: {self.original_error}" + return self.message + + +class ConfigurationError(SupermemoryPipecatError): + """Raised when there are configuration issues (e.g., missing API key, invalid params).""" + + +class MemoryRetrievalError(SupermemoryPipecatError): + """Raised when memory retrieval operations fail.""" + + +class MemoryStorageError(SupermemoryPipecatError): + """Raised when memory storage operations fail.""" + + +class APIError(SupermemoryPipecatError): + """Raised when Supermemory API requests fail.""" + + def __init__( + self, + message: str, + status_code: Optional[int] = None, + response_text: Optional[str] = None, + original_error: Optional[Exception] = None, + ): + super().__init__(message, original_error) + self.status_code = status_code + self.response_text = response_text + + def __str__(self) -> str: + parts = [self.message] + if self.status_code: + parts.append(f"Status: {self.status_code}") + if self.response_text: + parts.append(f"Response: {self.response_text}") + if self.original_error: + parts.append(f"Cause: {self.original_error}") + return " | ".join(parts) + + +class NetworkError(SupermemoryPipecatError): + """Raised when network operations fail.""" diff --git a/packages/pipecat-sdk-python/src/supermemory_pipecat/service.py b/packages/pipecat-sdk-python/src/supermemory_pipecat/service.py new file mode 100644 index 00000000..ab07e672 --- /dev/null +++ b/packages/pipecat-sdk-python/src/supermemory_pipecat/service.py @@ -0,0 +1,364 @@ +"""Supermemory Pipecat service integration. + +This module provides a memory service that integrates with Supermemory to store +and retrieve conversational memories, enhancing LLM context with relevant +historical information. +""" + +import asyncio +import json +import os +from typing import Any, Dict, List, Literal, Optional + +from loguru import logger +from pydantic import BaseModel, Field + +from pipecat.frames.frames import Frame, LLMContextFrame, LLMMessagesFrame +from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor + +from .exceptions import ( + ConfigurationError, + MemoryRetrievalError, +) +from .utils import ( + deduplicate_memories, + format_memories_to_text, + get_last_user_message, +) + +try: + import supermemory +except ImportError: + supermemory = None # type: ignore + + +class SupermemoryPipecatService(FrameProcessor): + """A memory service that integrates Supermemory with Pipecat pipelines. + + This service intercepts message frames in the pipeline, retrieves relevant + memories from Supermemory, enhances the context, and optionally stores + new conversations. + + Example: + ```python + from supermemory_pipecat import SupermemoryPipecatService + + memory = SupermemoryPipecatService( + api_key=os.getenv("SUPERMEMORY_API_KEY"), + user_id="user-123", + ) + + pipeline = Pipeline([ + transport.input(), + stt, + user_context, + memory, # Memory service enhances context here + llm, + transport.output(), + ]) + ``` + """ + + class InputParams(BaseModel): + """Configuration parameters for Supermemory Pipecat service. + + Parameters: + search_limit: Maximum number of memories to retrieve per query. + search_threshold: Minimum similarity threshold for memory retrieval. + system_prompt: Prefix text for memory context messages. + mode: Memory retrieval mode - "profile", "query", or "full". + """ + + search_limit: int = Field(default=10, ge=1) + search_threshold: float = Field(default=0.1, ge=0.0, le=1.0) + system_prompt: str = Field(default="Based on previous conversations, I recall:\n\n") + mode: Literal["profile", "query", "full"] = Field(default="full") + + def __init__( + self, + *, + api_key: Optional[str] = None, + user_id: str, + session_id: Optional[str] = None, + params: Optional[InputParams] = None, + base_url: Optional[str] = None, + ): + """Initialize the Supermemory Pipecat service. + + Args: + api_key: The API key for Supermemory. Falls back to SUPERMEMORY_API_KEY env var. + user_id: The user ID - used as container_tag for memory scoping (REQUIRED). + session_id: Session/conversation ID for grouping memories (optional). + params: Configuration parameters for memory retrieval and storage. + base_url: Optional custom base URL for Supermemory API. + + Raises: + ConfigurationError: If API key is missing or user_id not provided. + """ + super().__init__() + + # Get API key + self.api_key = api_key or os.getenv("SUPERMEMORY_API_KEY") + if not self.api_key: + raise ConfigurationError( + "API key is required. Provide api_key parameter or set SUPERMEMORY_API_KEY environment variable." + ) + + # user_id is required and used directly as container_tag + if not user_id: + raise ConfigurationError("user_id is required") + + self.user_id = user_id + self.container_tag = user_id # container_tag = user_id directly + self.session_id = session_id # optional session/conversation ID + + # Configuration + self.params = params or SupermemoryPipecatService.InputParams() + + # Initialize async Supermemory client + self._supermemory_client = None + if supermemory is not None: + try: + self._supermemory_client = supermemory.AsyncSupermemory( + api_key=self.api_key, + base_url=base_url, + ) + except Exception as e: + logger.warning(f"Failed to initialize Supermemory client: {e}") + + # Track how many messages we've already sent to memory + self._messages_sent_count: int = 0 + + # Track last query to avoid duplicate processing + self._last_query: Optional[str] = None + + logger.info( + f"Initialized SupermemoryPipecatService with " + f"user_id={user_id}, session_id={session_id}" + ) + + async def _retrieve_memories(self, query: str) -> Dict[str, Any]: + """Retrieve relevant memories from Supermemory. + + Args: + query: The query to search for relevant memories. + + Returns: + Dictionary containing profile and search results. + """ + if self._supermemory_client is None: + raise MemoryRetrievalError( + "Supermemory client not initialized. Install with: pip install supermemory" + ) + + try: + logger.debug(f"Retrieving memories for query: {query[:100]}...") + + # Build kwargs for profile request + kwargs: Dict[str, Any] = { + "container_tag": self.container_tag, + } + + # Add query for search modes + if self.params.mode != "profile" and query: + kwargs["q"] = query + kwargs["threshold"] = self.params.search_threshold + # Pass limit via extra_body since SDK doesn't have direct param + kwargs["extra_body"] = {"limit": self.params.search_limit} + + # Use SDK's profile method + response = await self._supermemory_client.profile(**kwargs) + + # Extract memory strings from SDK response + search_results = [] + if response.search_results and response.search_results.results: + search_results = [r["memory"] for r in response.search_results.results] + + data: Dict[str, Any] = { + "profile": { + "static": response.profile.static, + "dynamic": response.profile.dynamic, + }, + "search_results": search_results, + } + + logger.debug( + f"Retrieved memories - static: {len(data['profile']['static'])}, " + f"dynamic: {len(data['profile']['dynamic'])}, " + f"search: {len(data['search_results'])}" + ) + return data + + except Exception as e: + logger.error(f"Error retrieving memories: {e}") + raise MemoryRetrievalError("Failed to retrieve memories", e) + + async def _store_messages(self, messages: List[Dict[str, Any]]) -> None: + """Store messages in Supermemory. + + Args: + messages: List of message dicts with 'role' and 'content' keys. + """ + if self._supermemory_client is None: + logger.warning("Supermemory client not initialized, skipping memory storage") + return + + if not messages: + return + + try: + # Format messages as JSON array + formatted_content = json.dumps(messages) + + logger.debug(f"Storing {len(messages)} messages to Supermemory") + + # Build storage params + add_params: Dict[str, Any] = { + "content": formatted_content, + "container_tags": [self.container_tag], + "metadata": {"platform": "pipecat"}, + } + if self.session_id: + add_params["custom_id"] = f"{self.session_id}" + + await self._supermemory_client.memories.add(**add_params) + logger.debug(f"Successfully stored {len(messages)} messages in Supermemory") + + except Exception as e: + # Don't fail the pipeline on storage errors + logger.error(f"Error storing messages in Supermemory: {e}") + + def _enhance_context_with_memories( + self, + context: LLMContext, + query: str, + memories_data: Dict[str, Any], + ) -> None: + """Enhance the LLM context with relevant memories. + + Args: + context: The LLM context to enhance. + query: The query used for retrieval. + memories_data: Raw memory data from Supermemory API. + """ + # Skip if same query (avoid duplicate processing) + if self._last_query == query: + return + + self._last_query = query + + # Extract and deduplicate memories + profile = memories_data["profile"] + + deduplicated = deduplicate_memories( + static=profile["static"], + dynamic=profile["dynamic"], + search_results=memories_data["search_results"], + ) + + # Check if we have any memories + total_memories = ( + len(deduplicated["static"]) + + len(deduplicated["dynamic"]) + + len(deduplicated["search_results"]) + ) + + if total_memories == 0: + logger.debug("No memories found to inject") + return + + # Format memories based on mode + include_profile = self.params.mode in ("profile", "full") + include_search = self.params.mode in ("query", "full") + + memory_text = format_memories_to_text( + deduplicated, + system_prompt=self.params.system_prompt, + include_static=include_profile, + include_dynamic=include_profile, + include_search=include_search, + ) + + if not memory_text: + return + + # Inject memories into context as user message + context.add_message({"role": "user", "content": memory_text}) + + logger.debug(f"Enhanced context with {total_memories} memories") + + async def process_frame(self, frame: Frame, direction: FrameDirection) -> None: + """Process incoming frames, intercept context frames for memory integration. + + Args: + frame: The incoming frame to process. + direction: The direction of frame flow in the pipeline. + """ + await super().process_frame(frame, direction) + + context = None + messages = None + + # Handle different frame types + if isinstance(frame, (LLMContextFrame, OpenAILLMContextFrame)): + context = frame.context + elif isinstance(frame, LLMMessagesFrame): + messages = frame.messages + context = LLMContext(messages) + + if context: + try: + # Get messages from context + context_messages = context.get_messages() + latest_user_message = get_last_user_message(context_messages) + + if latest_user_message: + # Retrieve memories from Supermemory + try: + memories_data = await self._retrieve_memories(latest_user_message) + self._enhance_context_with_memories( + context, latest_user_message, memories_data + ) + except MemoryRetrievalError as e: + logger.warning(f"Memory retrieval failed, continuing without memories: {e}") + + # Store unsent messages (user and assistant only, skip system) + storable_messages = [ + msg for msg in context_messages if msg["role"] in ("user", "assistant") + ] + unsent_messages = storable_messages[self._messages_sent_count :] + + if unsent_messages: + asyncio.create_task(self._store_messages(unsent_messages)) + self._messages_sent_count = len(storable_messages) + + # Pass the frame downstream + if messages is not None: + # For LLMMessagesFrame, create new frame with enhanced messages + await self.push_frame(LLMMessagesFrame(context.get_messages())) + else: + # For context frames, pass the enhanced frame + await self.push_frame(frame) + + except Exception as e: + logger.error(f"Error processing frame with Supermemory: {e}") + # Still pass the original frame through on error + await self.push_frame(frame) + else: + # Non-context frames pass through unchanged + await self.push_frame(frame, direction) + + def get_messages_sent_count(self) -> int: + """Get the count of messages sent to memory. + + Returns: + Number of messages already sent to Supermemory. + """ + return self._messages_sent_count + + def reset_memory_tracking(self) -> None: + """Reset memory tracking for a new conversation.""" + self._messages_sent_count = 0 + self._last_query = None diff --git a/packages/pipecat-sdk-python/src/supermemory_pipecat/utils.py b/packages/pipecat-sdk-python/src/supermemory_pipecat/utils.py new file mode 100644 index 00000000..d0f0e461 --- /dev/null +++ b/packages/pipecat-sdk-python/src/supermemory_pipecat/utils.py @@ -0,0 +1,66 @@ +"""Utility functions for Supermemory Pipecat integration.""" + +from typing import Dict, List + + +def get_last_user_message(messages: List[Dict[str, str]]) -> str | None: + """Extract the last user message content from a list of messages.""" + for msg in reversed(messages): + if msg["role"] == "user": + return msg["content"] + return None + + +def deduplicate_memories( + static: List[str], + dynamic: List[str], + search_results: List[str], +) -> Dict[str, List[str]]: + """Deduplicate memories. Priority: static > dynamic > search.""" + seen = set() + + def unique(memories): + out = [] + for m in memories: + if m not in seen: + seen.add(m) + out.append(m) + return out + + return { + "static": unique(static), + "dynamic": unique(dynamic), + "search_results": unique(search_results), + } + + +def format_memories_to_text( + memories: Dict[str, List[str]], + system_prompt: str = "Based on previous conversations, I recall:\n\n", + include_static: bool = True, + include_dynamic: bool = True, + include_search: bool = True, +) -> str: + """Format deduplicated memories into a text string for injection.""" + sections = [] + + static = memories["static"] + dynamic = memories["dynamic"] + search_results = memories["search_results"] + + if include_static and static: + sections.append("## User Profile (Persistent)") + sections.append("\n".join(f"- {item}" for item in static)) + + if include_dynamic and dynamic: + sections.append("## Recent Context") + sections.append("\n".join(f"- {item}" for item in dynamic)) + + if include_search and search_results: + sections.append("## Relevant Memories") + sections.append("\n".join(f"- {item}" for item in search_results)) + + if not sections: + return "" + + return f"{system_prompt}\n" + "\n\n".join(sections) diff --git a/packages/pipecat-sdk-python/tests/__init__.py b/packages/pipecat-sdk-python/tests/__init__.py new file mode 100644 index 00000000..e20d656a --- /dev/null +++ b/packages/pipecat-sdk-python/tests/__init__.py @@ -0,0 +1 @@ +"""Tests for Supermemory Pipecat SDK.""" -- cgit v1.2.3