aboutsummaryrefslogtreecommitdiff
path: root/packages/pipecat-sdk-python
diff options
context:
space:
mode:
authorPrasanna721 <[email protected]>2026-01-09 18:08:59 -0800
committerPrasanna721 <[email protected]>2026-01-09 18:08:59 -0800
commitc816fa05bc245723f3c09d87142591ef2ffd48f1 (patch)
treeff59ea9198136ceccfa6c428340c02a829eda642 /packages/pipecat-sdk-python
parentpipecat-sdk (diff)
downloadarchived-supermemory-c816fa05bc245723f3c09d87142591ef2ffd48f1.tar.xz
archived-supermemory-c816fa05bc245723f3c09d87142591ef2ffd48f1.zip
removed await and testing code
Diffstat (limited to 'packages/pipecat-sdk-python')
-rw-r--r--packages/pipecat-sdk-python/Agents.md52
-rw-r--r--packages/pipecat-sdk-python/pyproject.toml1
-rw-r--r--packages/pipecat-sdk-python/src/supermemory_pipecat/service.py111
3 files changed, 63 insertions, 101 deletions
diff --git a/packages/pipecat-sdk-python/Agents.md b/packages/pipecat-sdk-python/Agents.md
index 0da9f607..e6d4244d 100644
--- a/packages/pipecat-sdk-python/Agents.md
+++ b/packages/pipecat-sdk-python/Agents.md
@@ -65,44 +65,34 @@ InputParams(
1. **Intercept**: Catches `LLMContextFrame`, `OpenAILLMContextFrame`, `LLMMessagesFrame`
2. **Extract**: Gets last user message from context
3. **Track**: Stores message in `_conversation_history` (clean, no injections)
-4. **Retrieve**: Calls Supermemory `/v4/profile` API
+4. **Retrieve**: Calls `client.profile()` via Supermemory SDK
5. **Inject**: Adds formatted memories to context as system message
6. **Store**: Sends last user message to Supermemory (background, non-blocking)
7. **Push**: Forwards enhanced frame downstream
-### Supermemory API Integration
+### Supermemory SDK Integration
-**Retrieval** - `POST /v4/profile`:
-```json
-{
- "containerTag": "user-123",
- "q": "What's the weather?",
- "limit": 10,
- "threshold": 0.1
-}
-```
-
-**Response**:
-```json
-{
- "profile": {
- "static": ["User lives in SF", "Prefers Celsius"],
- "dynamic": ["Recently asked about weather"]
- },
- "searchResults": {
- "results": [{"memory": "User likes sunny weather"}]
- }
-}
+**Retrieval** - via `supermemory.AsyncSupermemory.profile()`:
+```python
+response = await client.profile(
+ container_tag="user-123",
+ q="What's the weather?",
+ threshold=0.1,
+ extra_body={"limit": 10},
+)
+# response.profile.static: List[str]
+# response.profile.dynamic: List[str]
+# response.search_results.results: List[object]
```
-**Storage** - via `supermemory.memories.add()`:
+**Storage** - via `supermemory.AsyncSupermemory.memories.add()`:
```python
-{
- "content": "User: What's the weather?",
- "container_tags": ["user-123"],
- "custom_id": "session-456",
- "metadata": {"platform": "pipecat"}
-}
+await client.memories.add(
+ content="User: What's the weather?",
+ container_tags=["user-123"],
+ custom_id="session-456",
+ metadata={"platform": "pipecat"},
+)
```
## Memory Modes
@@ -250,7 +240,7 @@ if __name__ == "__main__":
| Aspect | Mem0 | Supermemory |
|--------|------|-------------|
| Identity | `user_id`, `agent_id`, `run_id` | `user_id` only (= container_tag) |
-| Retrieval | `memory.search()` | `/v4/profile` (static + dynamic + search) |
+| Retrieval | `memory.search()` | `client.profile()` (static + dynamic + search) |
| Storage | Full conversation | Last user message only |
| Metadata | `{"platform": "pipecat"}` | `{"platform": "pipecat"}` |
| Session | N/A | `session_id` → `custom_id` |
diff --git a/packages/pipecat-sdk-python/pyproject.toml b/packages/pipecat-sdk-python/pyproject.toml
index 72baa642..c8a76027 100644
--- a/packages/pipecat-sdk-python/pyproject.toml
+++ b/packages/pipecat-sdk-python/pyproject.toml
@@ -36,7 +36,6 @@ dependencies = [
"pipecat-ai>=0.0.98",
"supermemory>=3.16.0",
"pydantic>=2.10.0",
- "aiohttp>=3.11.0",
"loguru>=0.7.3",
]
diff --git a/packages/pipecat-sdk-python/src/supermemory_pipecat/service.py b/packages/pipecat-sdk-python/src/supermemory_pipecat/service.py
index 4fc08b15..4d1dfc30 100644
--- a/packages/pipecat-sdk-python/src/supermemory_pipecat/service.py
+++ b/packages/pipecat-sdk-python/src/supermemory_pipecat/service.py
@@ -6,6 +6,7 @@ historical information.
"""
import asyncio
+import json
import os
from typing import Any, Dict, List, Optional
@@ -14,17 +15,12 @@ from pydantic import BaseModel, Field
from pipecat.frames.frames import Frame, LLMContextFrame, LLMMessagesFrame
from pipecat.processors.aggregators.llm_context import LLMContext
-from pipecat.processors.aggregators.openai_llm_context import (
- OpenAILLMContext,
- OpenAILLMContextFrame,
-)
+from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from .exceptions import (
- APIError,
ConfigurationError,
MemoryRetrievalError,
- MemoryStorageError,
)
from .utils import (
deduplicate_memories,
@@ -33,11 +29,6 @@ from .utils import (
)
try:
- import aiohttp
-except ImportError:
- aiohttp = None # type: ignore
-
-try:
import supermemory
except ImportError:
supermemory = None # type: ignore
@@ -80,7 +71,6 @@ class SupermemoryPipecatService(FrameProcessor):
add_as_system_message: Whether to add memories as system messages.
position: Position to insert memory messages in context.
mode: Memory retrieval mode - "profile", "query", or "full".
- add_memory: When to store memories - "always" or "never".
"""
search_limit: int = Field(default=10, ge=1)
@@ -89,7 +79,6 @@ class SupermemoryPipecatService(FrameProcessor):
add_as_system_message: bool = Field(default=True)
position: int = Field(default=1)
mode: str = Field(default="full") # "profile", "query", "full"
- add_memory: str = Field(default="always") # "always", "never"
def __init__(
self,
@@ -131,13 +120,15 @@ class SupermemoryPipecatService(FrameProcessor):
# Configuration
self.params = params or SupermemoryPipecatService.InputParams()
- self.base_url = base_url or "https://api.supermemory.ai"
- # Initialize Supermemory client for storage operations
+ # Initialize async Supermemory client
self._supermemory_client = None
if supermemory is not None:
try:
- self._supermemory_client = supermemory.Supermemory(api_key=self.api_key)
+ self._supermemory_client = supermemory.AsyncSupermemory(
+ api_key=self.api_key,
+ base_url=base_url,
+ )
except Exception as e:
logger.warning(f"Failed to initialize Supermemory client: {e}")
@@ -161,54 +152,47 @@ class SupermemoryPipecatService(FrameProcessor):
Returns:
Dictionary containing profile and search results.
"""
+ if self._supermemory_client is None:
+ raise MemoryRetrievalError(
+ "Supermemory client not initialized. Install with: pip install supermemory"
+ )
+
try:
logger.debug(f"Retrieving memories for query: {query[:100]}...")
- payload: Dict[str, Any] = {
- "containerTag": self.container_tag,
+ # Build kwargs for profile request
+ kwargs: Dict[str, Any] = {
+ "container_tag": self.container_tag,
}
# Add query for search modes
if self.params.mode != "profile" and query:
- payload["q"] = query
- payload["limit"] = self.params.search_limit
- payload["threshold"] = self.params.search_threshold
-
- if aiohttp is None:
- raise MemoryRetrievalError(
- "aiohttp is required for memory retrieval. Install with: pip install aiohttp"
- )
+ kwargs["q"] = query
+ kwargs["threshold"] = self.params.search_threshold
+ # Pass limit via extra_body since SDK doesn't have direct param
+ kwargs["extra_body"] = {"limit": self.params.search_limit}
+
+ # Use SDK's profile method
+ response = await self._supermemory_client.profile(**kwargs)
+
+ # Convert SDK response to dict format expected by rest of code
+ data: Dict[str, Any] = {
+ "profile": {
+ "static": response.profile.static,
+ "dynamic": response.profile.dynamic,
+ },
+ "searchResults": {
+ "results": response.search_results.results if response.search_results else [],
+ },
+ }
- async with aiohttp.ClientSession() as session:
- async with session.post(
- f"{self.base_url}/v4/profile",
- headers={
- "Content-Type": "application/json",
- "Authorization": f"Bearer {self.api_key}",
- },
- json=payload,
- ) as response:
- if not response.ok:
- error_text = await response.text()
- raise APIError(
- "Supermemory profile search failed",
- status_code=response.status,
- response_text=error_text,
- )
+ logger.debug(
+ f"Retrieved memories - static: {len(data['profile']['static'])}, "
+ f"dynamic: {len(data['profile']['dynamic'])}, "
+ f"search: {len(data['searchResults']['results'])}"
+ )
+ return data
- data = await response.json()
- logger.debug(
- f"Retrieved memories - static: {len(data.get('profile', {}).get('static', []))}, "
- f"dynamic: {len(data.get('profile', {}).get('dynamic', []))}, "
- f"search: {len(data.get('searchResults', {}).get('results', []))}"
- )
- return data
-
- except aiohttp.ClientError as e:
- logger.error(f"Network error retrieving memories: {e}")
- raise MemoryRetrievalError("Network error during memory retrieval", e)
- except APIError:
- raise
except Exception as e:
logger.error(f"Error retrieving memories: {e}")
raise MemoryRetrievalError("Failed to retrieve memories", e)
@@ -219,9 +203,6 @@ class SupermemoryPipecatService(FrameProcessor):
Args:
message: Message dict with 'role' and 'content' keys.
"""
- if self.params.add_memory != "always":
- return
-
if self._supermemory_client is None:
logger.warning("Supermemory client not initialized, skipping memory storage")
return
@@ -231,9 +212,7 @@ class SupermemoryPipecatService(FrameProcessor):
if not content or not isinstance(content, str):
return
- # Format: "User: message content" or "Assistant: message content"
- role = message.get("role", "user").capitalize()
- formatted_content = f"{role}: {content}"
+ formatted_content = json.dumps(message)
logger.debug(f"Storing message to Supermemory: {formatted_content[:100]}...")
@@ -246,13 +225,7 @@ class SupermemoryPipecatService(FrameProcessor):
if self.session_id:
add_params["custom_id"] = self.session_id
- # Store asynchronously
- try:
- await self._supermemory_client.memories.add(**add_params)
- except TypeError:
- # Sync client fallback
- self._supermemory_client.memories.add(**add_params)
-
+ self._supermemory_client.memories.add(**add_params)
logger.debug("Successfully stored message in Supermemory")
except Exception as e:
@@ -368,7 +341,7 @@ class SupermemoryPipecatService(FrameProcessor):
self._enhance_context_with_memories(
context, latest_user_message, memories_data
)
- except (MemoryRetrievalError, APIError) as e:
+ except MemoryRetrievalError as e:
# Log but don't fail the pipeline
logger.warning(f"Memory retrieval failed, continuing without memories: {e}")