diff options
| author | Prasanna721 <[email protected]> | 2026-01-09 18:08:59 -0800 |
|---|---|---|
| committer | Prasanna721 <[email protected]> | 2026-01-09 18:08:59 -0800 |
| commit | c816fa05bc245723f3c09d87142591ef2ffd48f1 (patch) | |
| tree | ff59ea9198136ceccfa6c428340c02a829eda642 /packages/pipecat-sdk-python/src | |
| parent | pipecat-sdk (diff) | |
| download | supermemory-c816fa05bc245723f3c09d87142591ef2ffd48f1.tar.xz supermemory-c816fa05bc245723f3c09d87142591ef2ffd48f1.zip | |
removed await and testing code
Diffstat (limited to 'packages/pipecat-sdk-python/src')
| -rw-r--r-- | packages/pipecat-sdk-python/src/supermemory_pipecat/service.py | 111 |
1 files changed, 42 insertions, 69 deletions
diff --git a/packages/pipecat-sdk-python/src/supermemory_pipecat/service.py b/packages/pipecat-sdk-python/src/supermemory_pipecat/service.py index 4fc08b15..4d1dfc30 100644 --- a/packages/pipecat-sdk-python/src/supermemory_pipecat/service.py +++ b/packages/pipecat-sdk-python/src/supermemory_pipecat/service.py @@ -6,6 +6,7 @@ historical information. """ import asyncio +import json import os from typing import Any, Dict, List, Optional @@ -14,17 +15,12 @@ from pydantic import BaseModel, Field from pipecat.frames.frames import Frame, LLMContextFrame, LLMMessagesFrame from pipecat.processors.aggregators.llm_context import LLMContext -from pipecat.processors.aggregators.openai_llm_context import ( - OpenAILLMContext, - OpenAILLMContextFrame, -) +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from .exceptions import ( - APIError, ConfigurationError, MemoryRetrievalError, - MemoryStorageError, ) from .utils import ( deduplicate_memories, @@ -33,11 +29,6 @@ from .utils import ( ) try: - import aiohttp -except ImportError: - aiohttp = None # type: ignore - -try: import supermemory except ImportError: supermemory = None # type: ignore @@ -80,7 +71,6 @@ class SupermemoryPipecatService(FrameProcessor): add_as_system_message: Whether to add memories as system messages. position: Position to insert memory messages in context. mode: Memory retrieval mode - "profile", "query", or "full". - add_memory: When to store memories - "always" or "never". """ search_limit: int = Field(default=10, ge=1) @@ -89,7 +79,6 @@ class SupermemoryPipecatService(FrameProcessor): add_as_system_message: bool = Field(default=True) position: int = Field(default=1) mode: str = Field(default="full") # "profile", "query", "full" - add_memory: str = Field(default="always") # "always", "never" def __init__( self, @@ -131,13 +120,15 @@ class SupermemoryPipecatService(FrameProcessor): # Configuration self.params = params or SupermemoryPipecatService.InputParams() - self.base_url = base_url or "https://api.supermemory.ai" - # Initialize Supermemory client for storage operations + # Initialize async Supermemory client self._supermemory_client = None if supermemory is not None: try: - self._supermemory_client = supermemory.Supermemory(api_key=self.api_key) + self._supermemory_client = supermemory.AsyncSupermemory( + api_key=self.api_key, + base_url=base_url, + ) except Exception as e: logger.warning(f"Failed to initialize Supermemory client: {e}") @@ -161,54 +152,47 @@ class SupermemoryPipecatService(FrameProcessor): Returns: Dictionary containing profile and search results. """ + if self._supermemory_client is None: + raise MemoryRetrievalError( + "Supermemory client not initialized. Install with: pip install supermemory" + ) + try: logger.debug(f"Retrieving memories for query: {query[:100]}...") - payload: Dict[str, Any] = { - "containerTag": self.container_tag, + # Build kwargs for profile request + kwargs: Dict[str, Any] = { + "container_tag": self.container_tag, } # Add query for search modes if self.params.mode != "profile" and query: - payload["q"] = query - payload["limit"] = self.params.search_limit - payload["threshold"] = self.params.search_threshold - - if aiohttp is None: - raise MemoryRetrievalError( - "aiohttp is required for memory retrieval. Install with: pip install aiohttp" - ) + kwargs["q"] = query + kwargs["threshold"] = self.params.search_threshold + # Pass limit via extra_body since SDK doesn't have direct param + kwargs["extra_body"] = {"limit": self.params.search_limit} + + # Use SDK's profile method + response = await self._supermemory_client.profile(**kwargs) + + # Convert SDK response to dict format expected by rest of code + data: Dict[str, Any] = { + "profile": { + "static": response.profile.static, + "dynamic": response.profile.dynamic, + }, + "searchResults": { + "results": response.search_results.results if response.search_results else [], + }, + } - async with aiohttp.ClientSession() as session: - async with session.post( - f"{self.base_url}/v4/profile", - headers={ - "Content-Type": "application/json", - "Authorization": f"Bearer {self.api_key}", - }, - json=payload, - ) as response: - if not response.ok: - error_text = await response.text() - raise APIError( - "Supermemory profile search failed", - status_code=response.status, - response_text=error_text, - ) + logger.debug( + f"Retrieved memories - static: {len(data['profile']['static'])}, " + f"dynamic: {len(data['profile']['dynamic'])}, " + f"search: {len(data['searchResults']['results'])}" + ) + return data - data = await response.json() - logger.debug( - f"Retrieved memories - static: {len(data.get('profile', {}).get('static', []))}, " - f"dynamic: {len(data.get('profile', {}).get('dynamic', []))}, " - f"search: {len(data.get('searchResults', {}).get('results', []))}" - ) - return data - - except aiohttp.ClientError as e: - logger.error(f"Network error retrieving memories: {e}") - raise MemoryRetrievalError("Network error during memory retrieval", e) - except APIError: - raise except Exception as e: logger.error(f"Error retrieving memories: {e}") raise MemoryRetrievalError("Failed to retrieve memories", e) @@ -219,9 +203,6 @@ class SupermemoryPipecatService(FrameProcessor): Args: message: Message dict with 'role' and 'content' keys. """ - if self.params.add_memory != "always": - return - if self._supermemory_client is None: logger.warning("Supermemory client not initialized, skipping memory storage") return @@ -231,9 +212,7 @@ class SupermemoryPipecatService(FrameProcessor): if not content or not isinstance(content, str): return - # Format: "User: message content" or "Assistant: message content" - role = message.get("role", "user").capitalize() - formatted_content = f"{role}: {content}" + formatted_content = json.dumps(message) logger.debug(f"Storing message to Supermemory: {formatted_content[:100]}...") @@ -246,13 +225,7 @@ class SupermemoryPipecatService(FrameProcessor): if self.session_id: add_params["custom_id"] = self.session_id - # Store asynchronously - try: - await self._supermemory_client.memories.add(**add_params) - except TypeError: - # Sync client fallback - self._supermemory_client.memories.add(**add_params) - + self._supermemory_client.memories.add(**add_params) logger.debug("Successfully stored message in Supermemory") except Exception as e: @@ -368,7 +341,7 @@ class SupermemoryPipecatService(FrameProcessor): self._enhance_context_with_memories( context, latest_user_message, memories_data ) - except (MemoryRetrievalError, APIError) as e: + except MemoryRetrievalError as e: # Log but don't fail the pipeline logger.warning(f"Memory retrieval failed, continuing without memories: {e}") |