# MemMachine-Playground / gateway_client.py
# Author: Anirudh Esthuri
# Copied from Playground - app, gateway_client, llm, model_config,
# requirements, styles, assets, and config files (commit e91e2b4)
import os
from datetime import datetime
import requests
# Backend server URL - can be set via environment variable
# For Hugging Face Spaces: Set MEMORY_SERVER_URL in Space settings (Repository secrets)
# For local development: Set MEMORY_SERVER_URL in your .env file
# Default: http://3.232.95.65:8080 (MemMachine backend)
# Base URL of the MemMachine memory server (legacy name — it holds a full URL,
# not a port). The fallback matches the default documented above; without it,
# an unset MEMORY_SERVER_URL yielded None and request URLs like "None/memory".
EXAMPLE_SERVER_PORT = os.getenv("MEMORY_SERVER_URL", "http://3.232.95.65:8080")
def ingest_and_rewrite(user_id: str, query: str, model_type: str = "openai") -> str:
    """Store a raw user message in the memory server, then build a
    context-aware prompt from the memories retrieved for it.

    The message is first POSTed to ``{server}/memory`` as an episode, then
    ``{server}/memory/search`` is queried for related episodic and profile
    memory. The returned string prepends whatever context was found to the
    original query.

    Args:
        user_id: Identifier used as group id, producer, and session key.
        query: The raw user message to ingest and contextualize.
        model_type: Unused; kept for backward compatibility with callers.

    Returns:
        A prompt string containing profile and/or episodic context followed
        by the original query, or an "ingested, no context" message when the
        search returns nothing.

    Raises:
        requests.HTTPError: If either the store or the search request fails.
    """

    def _fmt(memory) -> str:
        # Search results may arrive as a list of items or as a single blob;
        # normalize both shapes to one newline-joined string.
        if isinstance(memory, list):
            return "\n".join(str(item) for item in memory)
        return str(memory)

    session_data = {
        "group_id": user_id,
        "agent_id": ["assistant"],
        "user_id": [user_id],
        "session_id": f"session_{user_id}",
    }
    episode_data = {
        "session": session_data,
        "producer": user_id,
        "produced_for": "assistant",
        "episode_content": query,
        "episode_type": "message",
        "metadata": {
            "speaker": user_id,
            "timestamp": datetime.now().isoformat(),
            "type": "message",
        },
    }

    # Store the episode first so the message itself becomes part of memory.
    store_resp = requests.post(
        f"{EXAMPLE_SERVER_PORT}/memory",
        json=episode_data,
        timeout=1000,
    )
    store_resp.raise_for_status()

    # Then search for context relevant to this query, scoped to this user.
    search_data = {
        "session": session_data,
        "query": query,
        "limit": 5,
        "filter": {"producer_id": user_id},
    }
    search_resp = requests.post(
        f"{EXAMPLE_SERVER_PORT}/memory/search",
        json=search_data,
        timeout=1000,
    )
    search_resp.raise_for_status()

    content = search_resp.json().get("content", {})
    episodic_memory = content.get("episodic_memory", [])
    profile_memory = content.get("profile_memory", [])

    # Assemble the response in the same shape as example_server.py:
    # Profile section, then Context section, then the original query.
    sections = []
    if profile_memory:
        sections.append(f"Profile: {_fmt(profile_memory)}")
    if episodic_memory:
        sections.append(f"Context: {_fmt(episodic_memory)}")
    if not sections:
        return f"Message ingested successfully. No relevant context found yet.\n\nQuery: {query}"
    sections.append(f"Query: {query}")
    return "\n\n".join(sections)
def add_session_message(user_id: str, msg: str) -> None:
    """Push one raw message into the memory server for *user_id*.

    Best-effort, fire-and-forget: the HTTP response is not inspected, and
    a short timeout keeps the caller from blocking on a slow backend.
    """
    session = {
        "group_id": user_id,
        "agent_id": ["assistant"],
        "user_id": [user_id],
        "session_id": f"session_{user_id}",
    }
    payload = {
        "session": session,
        "producer": user_id,
        "produced_for": "assistant",
        "episode_content": msg,
        "episode_type": "message",
        "metadata": {
            "speaker": user_id,
            "timestamp": datetime.now().isoformat(),
            "type": "message",
        },
    }
    requests.post(f"{EXAMPLE_SERVER_PORT}/memory", json=payload, timeout=5)
def create_persona_query(user_id: str, query: str) -> str:
    """Build a persona-aware prompt by pulling profile memory for *user_id*.

    Searches the memory server for this user's stored profile; when profile
    memory exists, it is prepended to the query, otherwise the query is
    returned with a plain "Query:" prefix.

    Raises:
        requests.HTTPError: If the search request fails.
    """
    payload = {
        "session": {
            "group_id": user_id,
            "agent_id": ["assistant"],
            "user_id": [user_id],
            "session_id": f"session_{user_id}",
        },
        "query": query,
        "limit": 5,
        "filter": {"producer_id": user_id},
    }
    response = requests.post(
        f"{EXAMPLE_SERVER_PORT}/memory/search",
        json=payload,
        timeout=1000,
    )
    response.raise_for_status()

    profile_memory = response.json().get("content", {}).get("profile_memory", [])
    if not profile_memory:
        return f"Query: {query}"

    # Profile memory may be a list of entries or a single blob.
    if isinstance(profile_memory, list):
        profile_str = "\n".join(str(entry) for entry in profile_memory)
    else:
        profile_str = str(profile_memory)
    return f"Based on your profile: {profile_str}\n\nQuery: {query}"
def add_new_session_message(user_id: str, msg: str) -> None:
    """Backward-compatible alias: delegates to :func:`add_session_message`."""
    return add_session_message(user_id, msg)
def delete_profile(user_id: str) -> bool:
    """Delete all stored memory for *user_id* via the CRM server.

    Currently a stub — the delete endpoint is not wired up, so this
    always reports failure.

    Returns:
        ``False`` unconditionally.
    """
    return False