aboutsummaryrefslogtreecommitdiff
path: root/packages/pipecat-sdk-python/src
diff options
context:
space:
mode:
authorPrasanna721 <[email protected]>2026-01-09 18:08:59 -0800
committerPrasanna721 <[email protected]>2026-01-09 18:08:59 -0800
commitc816fa05bc245723f3c09d87142591ef2ffd48f1 (patch)
treeff59ea9198136ceccfa6c428340c02a829eda642 /packages/pipecat-sdk-python/src
parentpipecat-sdk (diff)
downloadsupermemory-c816fa05bc245723f3c09d87142591ef2ffd48f1.tar.xz
supermemory-c816fa05bc245723f3c09d87142591ef2ffd48f1.zip
removed await and testing code
Diffstat (limited to 'packages/pipecat-sdk-python/src')
-rw-r--r--packages/pipecat-sdk-python/src/supermemory_pipecat/service.py111
1 files changed, 42 insertions, 69 deletions
diff --git a/packages/pipecat-sdk-python/src/supermemory_pipecat/service.py b/packages/pipecat-sdk-python/src/supermemory_pipecat/service.py
index 4fc08b15..4d1dfc30 100644
--- a/packages/pipecat-sdk-python/src/supermemory_pipecat/service.py
+++ b/packages/pipecat-sdk-python/src/supermemory_pipecat/service.py
@@ -6,6 +6,7 @@ historical information.
"""
import asyncio
+import json
import os
from typing import Any, Dict, List, Optional
@@ -14,17 +15,12 @@ from pydantic import BaseModel, Field
from pipecat.frames.frames import Frame, LLMContextFrame, LLMMessagesFrame
from pipecat.processors.aggregators.llm_context import LLMContext
-from pipecat.processors.aggregators.openai_llm_context import (
- OpenAILLMContext,
- OpenAILLMContextFrame,
-)
+from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from .exceptions import (
- APIError,
ConfigurationError,
MemoryRetrievalError,
- MemoryStorageError,
)
from .utils import (
deduplicate_memories,
@@ -33,11 +29,6 @@ from .utils import (
)
try:
- import aiohttp
-except ImportError:
- aiohttp = None # type: ignore
-
-try:
import supermemory
except ImportError:
supermemory = None # type: ignore
@@ -80,7 +71,6 @@ class SupermemoryPipecatService(FrameProcessor):
add_as_system_message: Whether to add memories as system messages.
position: Position to insert memory messages in context.
mode: Memory retrieval mode - "profile", "query", or "full".
- add_memory: When to store memories - "always" or "never".
"""
search_limit: int = Field(default=10, ge=1)
@@ -89,7 +79,6 @@ class SupermemoryPipecatService(FrameProcessor):
add_as_system_message: bool = Field(default=True)
position: int = Field(default=1)
mode: str = Field(default="full") # "profile", "query", "full"
- add_memory: str = Field(default="always") # "always", "never"
def __init__(
self,
@@ -131,13 +120,15 @@ class SupermemoryPipecatService(FrameProcessor):
# Configuration
self.params = params or SupermemoryPipecatService.InputParams()
- self.base_url = base_url or "https://api.supermemory.ai"
- # Initialize Supermemory client for storage operations
+ # Initialize async Supermemory client
self._supermemory_client = None
if supermemory is not None:
try:
- self._supermemory_client = supermemory.Supermemory(api_key=self.api_key)
+ self._supermemory_client = supermemory.AsyncSupermemory(
+ api_key=self.api_key,
+ base_url=base_url,
+ )
except Exception as e:
logger.warning(f"Failed to initialize Supermemory client: {e}")
@@ -161,54 +152,47 @@ class SupermemoryPipecatService(FrameProcessor):
Returns:
Dictionary containing profile and search results.
"""
+ if self._supermemory_client is None:
+ raise MemoryRetrievalError(
+ "Supermemory client not initialized. Install with: pip install supermemory"
+ )
+
try:
logger.debug(f"Retrieving memories for query: {query[:100]}...")
- payload: Dict[str, Any] = {
- "containerTag": self.container_tag,
+ # Build kwargs for profile request
+ kwargs: Dict[str, Any] = {
+ "container_tag": self.container_tag,
}
# Add query for search modes
if self.params.mode != "profile" and query:
- payload["q"] = query
- payload["limit"] = self.params.search_limit
- payload["threshold"] = self.params.search_threshold
-
- if aiohttp is None:
- raise MemoryRetrievalError(
- "aiohttp is required for memory retrieval. Install with: pip install aiohttp"
- )
+ kwargs["q"] = query
+ kwargs["threshold"] = self.params.search_threshold
+ # Pass limit via extra_body since SDK doesn't have direct param
+ kwargs["extra_body"] = {"limit": self.params.search_limit}
+
+ # Use SDK's profile method
+ response = await self._supermemory_client.profile(**kwargs)
+
+ # Convert SDK response to dict format expected by rest of code
+ data: Dict[str, Any] = {
+ "profile": {
+ "static": response.profile.static,
+ "dynamic": response.profile.dynamic,
+ },
+ "searchResults": {
+ "results": response.search_results.results if response.search_results else [],
+ },
+ }
- async with aiohttp.ClientSession() as session:
- async with session.post(
- f"{self.base_url}/v4/profile",
- headers={
- "Content-Type": "application/json",
- "Authorization": f"Bearer {self.api_key}",
- },
- json=payload,
- ) as response:
- if not response.ok:
- error_text = await response.text()
- raise APIError(
- "Supermemory profile search failed",
- status_code=response.status,
- response_text=error_text,
- )
+ logger.debug(
+ f"Retrieved memories - static: {len(data['profile']['static'])}, "
+ f"dynamic: {len(data['profile']['dynamic'])}, "
+ f"search: {len(data['searchResults']['results'])}"
+ )
+ return data
- data = await response.json()
- logger.debug(
- f"Retrieved memories - static: {len(data.get('profile', {}).get('static', []))}, "
- f"dynamic: {len(data.get('profile', {}).get('dynamic', []))}, "
- f"search: {len(data.get('searchResults', {}).get('results', []))}"
- )
- return data
-
- except aiohttp.ClientError as e:
- logger.error(f"Network error retrieving memories: {e}")
- raise MemoryRetrievalError("Network error during memory retrieval", e)
- except APIError:
- raise
except Exception as e:
logger.error(f"Error retrieving memories: {e}")
raise MemoryRetrievalError("Failed to retrieve memories", e)
@@ -219,9 +203,6 @@ class SupermemoryPipecatService(FrameProcessor):
Args:
message: Message dict with 'role' and 'content' keys.
"""
- if self.params.add_memory != "always":
- return
-
if self._supermemory_client is None:
logger.warning("Supermemory client not initialized, skipping memory storage")
return
@@ -231,9 +212,7 @@ class SupermemoryPipecatService(FrameProcessor):
if not content or not isinstance(content, str):
return
- # Format: "User: message content" or "Assistant: message content"
- role = message.get("role", "user").capitalize()
- formatted_content = f"{role}: {content}"
+ formatted_content = json.dumps(message)
logger.debug(f"Storing message to Supermemory: {formatted_content[:100]}...")
@@ -246,13 +225,7 @@ class SupermemoryPipecatService(FrameProcessor):
if self.session_id:
add_params["custom_id"] = self.session_id
- # Store asynchronously
- try:
- await self._supermemory_client.memories.add(**add_params)
- except TypeError:
- # Sync client fallback
- self._supermemory_client.memories.add(**add_params)
-
+ self._supermemory_client.memories.add(**add_params)
logger.debug("Successfully stored message in Supermemory")
except Exception as e:
@@ -368,7 +341,7 @@ class SupermemoryPipecatService(FrameProcessor):
self._enhance_context_with_memories(
context, latest_user_message, memories_data
)
- except (MemoryRetrievalError, APIError) as e:
+ except MemoryRetrievalError as e:
# Log but don't fail the pipeline
logger.warning(f"Memory retrieval failed, continuing without memories: {e}")