3.4: Add per-agent memory scoping

MemorySystem now accepts optional scope parameter. When set:
- Memory files go to memory/{scope}/ subdirectory
- Fallback search covers both scoped and shared directories

Unscoped agents (scope="") use the shared memory/ root directory.
This enables agents to have private memory while still searching
shared knowledge.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
cora-start
PeninsulaInd 2026-02-17 10:09:31 -06:00
parent 86511d5a0f
commit 883fee36a3
1 changed files with 35 additions and 14 deletions

View File

@ -23,10 +23,21 @@ log = logging.getLogger(__name__)
class MemorySystem: class MemorySystem:
def __init__(self, config: Config, db: Database): def __init__(self, config: Config, db: Database, scope: str = ""):
self.config = config self.config = config
self.db = db self.db = db
self.scope = scope
# Scoped agents get their own subdirectory; shared memory stays in root
if scope:
self.memory_dir = config.memory_dir / scope
self.memory_dir.mkdir(parents=True, exist_ok=True)
else:
self.memory_dir = config.memory_dir self.memory_dir = config.memory_dir
# Shared memory dir (for cross-scope search)
self._shared_memory_dir = config.memory_dir
self._embedder = None self._embedder = None
self._embed_lock = threading.Lock() self._embed_lock = threading.Lock()
self._embed_db_path = self.memory_dir / "embeddings.db" self._embed_db_path = self.memory_dir / "embeddings.db"
@ -235,10 +246,20 @@ class MemorySystem:
self._embed_conn.commit() self._embed_conn.commit()
def _fallback_search(self, query: str, top_k: int) -> list[dict]: def _fallback_search(self, query: str, top_k: int) -> list[dict]:
"""Simple keyword search when embeddings are unavailable.""" """Simple keyword search when embeddings are unavailable.
Searches both scoped and shared memory directories.
"""
results = [] results = []
query_lower = query.lower() query_lower = query.lower()
for path in self.memory_dir.glob("*.md"):
# Collect directories to search (avoid duplicates)
search_dirs = [self.memory_dir]
if self.scope and self._shared_memory_dir != self.memory_dir:
search_dirs.append(self._shared_memory_dir)
for search_dir in search_dirs:
for path in search_dir.glob("*.md"):
try: try:
content = path.read_text(encoding="utf-8") content = path.read_text(encoding="utf-8")
except Exception: except Exception: