Loading source
Pulling the file list, source metadata, and syntax-aware rendering for this listing.
Source from repo
A comprehensive collection of Agent Skills for context engineering, multi-agent architectures, and production agent systems.
Files
Skill
Size
Entrypoint
Format
Open file
Syntax-highlighted preview of this file as included in the skill package.
skills/memory-systems/references/implementation.md
# Memory Systems: Technical Reference

This document provides implementation details for memory system components.

## Vector Store Implementation

### Basic Vector Store

```python
import numpy as np
from typing import List, Dict, Any
import json


def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Compute cosine similarity between two vectors."""
    norm_a = np.linalg.norm(a)
    norm_b = np.linalg.norm(b)
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return float(np.dot(a, b) / (norm_a * norm_b))


class VectorStore:
    def __init__(self, dimension=768):
        self.dimension = dimension
        self.vectors = []
        self.metadata = []
        self.texts = []

    def add(self, text: str, metadata: Dict[str, Any] = None):
        """Add document to store. Returns the index of the new entry."""
        embedding = self._embed(text)
        self.vectors.append(embedding)
        self.metadata.append(metadata or {})
        self.texts.append(text)
        return len(self.vectors) - 1

    def search(self, query: str, limit: int = 5,
               filters: Dict[str, Any] = None) -> List[Dict]:
        """Search for similar documents."""
        query_embedding = self._embed(query)

        scores = []
        for i, vec in enumerate(self.vectors):
            score = cosine_similarity(query_embedding, vec)

            # Apply filters
            if filters and not self._matches_filters(self.metadata[i], filters):
                score = -1  # Exclude

            scores.append((i, score))

        # Sort by score
        scores.sort(key=lambda x: x[1], reverse=True)

        # Return top k
        results = []
        for idx, score in scores[:limit]:
            if score > 0:  # Only include positive matches
                results.append({
                    "index": idx,
                    "score": score,
                    "text": self._get_text(idx),
                    "metadata": self.metadata[idx]
                })

        return results

    def _embed(self, text: str) -> np.ndarray:
        """Generate deterministic pseudo-embedding for demonstration.
        In production, replace with actual embedding model.

        NOTE: str hash() is salted per process, so these embeddings are
        stable only within a single run — fine for a demo, not for storage.
        """
        np.random.seed(hash(text) % (2**32))
        vec = np.random.randn(self.dimension)
        return vec / (np.linalg.norm(vec) + 1e-8)

    def _matches_filters(self, metadata: Dict, filters: Dict) -> bool:
        """Check if metadata matches filters."""
        for key, value in filters.items():
            if key not in metadata:
                return False
            if isinstance(value, list):
                if metadata[key] not in value:
                    return False
            elif metadata[key] != value:
                return False
        return True

    def _get_text(self, index: int) -> str:
        """Retrieve original text for index."""
        return self.texts[index] if index < len(self.texts) else ""
```

### Metadata-Enhanced Vector Store

```python
class MetadataVectorStore(VectorStore):
    def __init__(self, dimension=768):
        super().__init__(dimension)
        self.entity_index = {}  # entity -> [indices]
        self.time_index = {}  # time_range -> [indices]

    def add(self, text: str, metadata: Dict[str, Any] = None):
        """Add with enhanced indexing."""
        metadata = metadata or {}
        index = super().add(text, metadata)

        # Index by entity
        if "entity" in metadata:
            entity = metadata["entity"]
            if entity not in self.entity_index:
                self.entity_index[entity] = []
            self.entity_index[entity].append(index)

        # Index by time
        if "valid_from" in metadata:
            time_key = self._time_range_key(
                metadata.get("valid_from"),
                metadata.get("valid_until")
            )
            if time_key not in self.time_index:
                self.time_index[time_key] = []
            self.time_index[time_key].append(index)

        return index

    def _time_range_key(self, start, end) -> str:
        """Build the key used for the time-range index.
        (Was referenced by add() but previously undefined.)"""
        return f"{start}::{end if end is not None else 'infinity'}"

    def search_by_entity(self, query: str, entity: str, limit: int = 5) -> List[Dict]:
        """Search within specific entity."""
        indices = self.entity_index.get(entity, [])
        filtered = [self.metadata[i] for i in indices]

        # Score and rank
        query_embedding = self._embed(query)
        scored = []
        for i, meta in zip(indices, filtered):
            vec = self.vectors[i]
            score = cosine_similarity(query_embedding, vec)
            scored.append((i, score, meta))

        scored.sort(key=lambda x: x[1], reverse=True)

        # Include "text" for consistency with VectorStore.search results
        return [{
            "index": idx,
            "score": score,
            "text": self._get_text(idx),
            "metadata": meta
        } for idx, score, meta in scored[:limit]]
```

## Knowledge Graph Implementation

### Property Graph Storage

```python
from typing import Dict, List, Optional
import re
import uuid

class PropertyGraph:
    def __init__(self):
        self.nodes = {}  # id -> properties
        self.edges = []  # list of edge dicts
        self.entity_registry = {}  # name -> node_id (maintains identity)
        self.indexes = {
            "node_label": {},  # label -> [node_ids]
            "edge_type": {}  # type -> [edge_ids]
        }

    def get_or_create_node(self, name: str, label: str, properties: Dict = None) -> str:
        """Get existing node by name, or create a new one.
        Uses entity_registry to ensure identity across interactions."""
        if name in self.entity_registry:
            return self.entity_registry[name]
        node_id = self.create_node(label, {**(properties or {}), "name": name})
        self.entity_registry[name] = node_id
        return node_id

    def create_node(self, label: str, properties: Dict = None) -> str:
        """Create node with label and properties."""
        node_id = str(uuid.uuid4())
        self.nodes[node_id] = {
            "label": label,
            "properties": properties or {}
        }

        # Index by label
        if label not in self.indexes["node_label"]:
            self.indexes["node_label"][label] = []
        self.indexes["node_label"][label].append(node_id)

        return node_id

    def create_relationship(self, source_id: str, rel_type: str,
                            target_id: str, properties: Dict = None) -> str:
        """Create directed relationship between nodes."""
        edge_id = str(uuid.uuid4())
        self.edges.append({
            "id": edge_id,
            "source": source_id,
            "target": target_id,
            "type": rel_type,
            "properties": properties or {}
        })

        # Index by type
        if rel_type not in self.indexes["edge_type"]:
            self.indexes["edge_type"][rel_type] = []
        self.indexes["edge_type"][rel_type].append(edge_id)

        return edge_id

    def query(self, cypher_like: str, params: Dict = None) -> List[Dict]:
        """
        Simple query matching.

        Supports patterns like:
        MATCH (e)-[r]->(o) WHERE e.id = $id RETURN r
        """
        # In production, use actual graph database
        # This is a simplified pattern matcher
        results = []

        if cypher_like.startswith("MATCH"):
            # Parse basic pattern
            pattern = self._parse_pattern(cypher_like)
            results = self._match_pattern(pattern, params or {})

        return results

    # Matches "(alias:SrcLabel)-[alias:REL_TYPE]->(alias:TgtLabel)";
    # labels / types / aliases are all optional.
    _PATTERN_RE = re.compile(
        r"\(\s*\w*\s*(?::\s*(?P<src>\w+))?\s*\)"
        r"\s*-\[\s*\w*\s*(?::\s*(?P<rel>\w+))?\s*\]->"
        r"\s*\(\s*\w*\s*(?::\s*(?P<tgt>\w+))?\s*\)"
    )

    def _parse_pattern(self, query: str) -> Dict:
        """Parse simplified MATCH pattern."""
        # Simplified parser for demonstration
        return {
            "source_label": self._extract_label(query, "source"),
            "rel_type": self._extract_type(query),
            "target_label": self._extract_label(query, "target"),
            "where": self._extract_where(query)
        }

    def _extract_label(self, query: str, role: str) -> Optional[str]:
        """Extract the source/target node label from the MATCH pattern, if any.
        (These _extract_* helpers were referenced but previously undefined.)"""
        m = self._PATTERN_RE.search(query)
        if not m:
            return None
        return m.group("src") if role == "source" else m.group("tgt")

    def _extract_type(self, query: str) -> Optional[str]:
        """Extract the relationship type from the MATCH pattern, if any."""
        m = self._PATTERN_RE.search(query)
        return m.group("rel") if m else None

    def _extract_where(self, query: str) -> Optional[str]:
        """Extract the raw WHERE clause text, if present."""
        m = re.search(r"\bWHERE\b\s+(.*?)(?:\s+\bRETURN\b|$)", query)
        return m.group(1).strip() if m else None

    def _match_where(self, where: str, edge: Dict, source: Dict, target: Dict,
                     params: Dict) -> bool:
        """Evaluate a simplified WHERE clause.

        Only `<alias>.id = $param` is supported; the parameter value is
        compared against both endpoints of the edge. Unrecognized clauses
        match permissively (demo behavior).
        """
        m = re.match(r"^\s*(\w+)\.id\s*=\s*\$(\w+)\s*$", where)
        if not m:
            return True
        wanted = params.get(m.group(2))
        return wanted in (edge["source"], edge["target"])

    def _get_edge(self, edge_id: str) -> Optional[Dict]:
        """Look up an edge dict by id.
        (Referenced by TemporalKnowledgeGraph but previously undefined.)"""
        for edge in self.edges:
            if edge["id"] == edge_id:
                return edge
        return None

    def _match_pattern(self, pattern: Dict, params: Dict) -> List[Dict]:
        """Match pattern against graph."""
        results = []

        for edge in self.edges:
            # Match relationship type
            if pattern["rel_type"] and edge["type"] != pattern["rel_type"]:
                continue

            source = self.nodes.get(edge["source"], {})
            target = self.nodes.get(edge["target"], {})

            # Match labels
            if pattern["source_label"] and source.get("label") != pattern["source_label"]:
                continue
            if pattern["target_label"] and target.get("label") != pattern["target_label"]:
                continue

            # Match where clause (the clause text is passed explicitly)
            if pattern["where"] and not self._match_where(
                pattern["where"], edge, source, target, params
            ):
                continue

            results.append({
                "source": source,
                "relationship": edge,
                "target": target
            })

        return results
```

## Temporal Knowledge Graph

```python
from datetime import datetime
from typing import Optional

class TemporalKnowledgeGraph(PropertyGraph):
    def __init__(self):
        super().__init__()
        self.temporal_index = {}  # time_range -> [edge_ids]

    def create_temporal_relationship(
        self,
        source_id: str,
        rel_type: str,
        target_id: str,
        valid_from: datetime,
        valid_until: Optional[datetime] = None,
        properties: Dict = None
    ) -> str:
        """Create relationship with temporal validity."""
        edge_id = super().create_relationship(
            source_id, rel_type, target_id, properties
        )

        # Index temporally
        time_key = self._time_range_key(valid_from, valid_until)
        if time_key not in self.temporal_index:
            self.temporal_index[time_key] = []
        self.temporal_index[time_key].append(edge_id)

        # Store validity on edge
        edge = self._get_edge(edge_id)
        edge["valid_from"] = valid_from.isoformat()
        edge["valid_until"] = valid_until.isoformat() if valid_until else None

        return edge_id

    def query_at_time(self, query: str, query_time: datetime) -> List[Dict]:
        """Query graph state at specific time."""
        # Find edges valid at query time
        valid_edges = []
        for edge in self.edges:
            valid_from = datetime.fromisoformat(edge.get("valid_from", "1970-01-01"))
            valid_until = edge.get("valid_until")

            if valid_from <= query_time:
                if valid_until is None or datetime.fromisoformat(valid_until) > query_time:
                    valid_edges.append(edge)

        # Match against pattern
        pattern = self._parse_pattern(query)
        results = []

        for edge in valid_edges:
            if pattern["rel_type"] and edge["type"] != pattern["rel_type"]:
                continue

            source = self.nodes.get(edge["source"], {})
            target = self.nodes.get(edge["target"], {})

            results.append({
                "source": source,
                "relationship": edge,
                "target": target
            })

        return results

    def _time_range_key(self, start: datetime, end: Optional[datetime]) -> str:
        """Create time range key for indexing."""
        start_str = start.isoformat()
        end_str = end.isoformat() if end else "infinity"
        return f"{start_str}::{end_str}"
```

## Memory Consolidation

```python
class MemoryConsolidator:
    def __init__(self, graph: PropertyGraph, vector_store: VectorStore):
        self.graph = graph
        self.vector_store = vector_store
        self.consolidation_threshold = 1000  # memories before consolidation

    def should_consolidate(self) -> bool:
        """Check if consolidation should trigger."""
        total_memories = len(self.graph.nodes) + len(self.graph.edges)
        return total_memories > self.consolidation_threshold

    def consolidate(self):
        """Run consolidation process."""
consolidation process."""361# Step 1: Identify duplicate or merged facts362duplicates = self.find_duplicates()363364# Step 2: Merge related facts365for group in duplicates:366self.merge_fact_group(group)367368# Step 3: Update validity periods369self.update_validity_periods()370371# Step 4: Rebuild indexes372self.rebuild_indexes()373374def find_duplicates(self) -> List[List]:375"""Find groups of potentially duplicate facts."""376# Group by subject and predicate377groups = {}378379for edge in self.graph.edges:380key = (edge["source"], edge["type"])381if key not in groups:382groups[key] = []383groups[key].append(edge)384385# Return groups with multiple edges386return [edges for edges in groups.values() if len(edges) > 1]387388def merge_fact_group(self, edges: List[Dict]):389"""Merge group of duplicate edges."""390if len(edges) == 1:391return392393# Keep most recent/relevant394keeper = max(edges, key=lambda e: e.get("properties", {}).get("confidence", 0))395396# Merge metadata397for edge in edges:398if edge["id"] != keeper["id"]:399self.merge_properties(keeper, edge)400self.graph.edges.remove(edge)401402def merge_properties(self, target: Dict, source: Dict):403"""Merge properties from source into target."""404for key, value in source.get("properties", {}).items():405if key not in target["properties"]:406target["properties"][key] = value407elif isinstance(value, list):408target["properties"][key].extend(value)409```410411## Memory-Context Integration412413```python414class MemoryContextIntegrator:415def __init__(self, memory_system, context_limit=100000):416self.memory_system = memory_system417self.context_limit = context_limit418419def build_context(self, task: str, current_context: str = "") -> str:420"""Build context including relevant memories."""421# Extract entities from task422entities = self._extract_entities(task)423424# Retrieve memories for each entity425memories = []426for entity in entities:427entity_memories = 
self.memory_system.retrieve_entity(entity)428memories.extend(entity_memories)429430# Format memories for context431memory_section = self._format_memories(memories)432433# Combine with current context434combined = current_context + "\n\n" + memory_section435436# Check limit and truncate if needed437if self._token_count(combined) > self.context_limit:438combined = self._truncate_context(combined, self.context_limit)439440return combined441442def _extract_entities(self, task: str) -> List[str]:443"""Extract entity mentions from task."""444# In production, use NER or entity extraction445import re446pattern = r"\[([^\]]+)\]" # [[entity_name]] convention447return re.findall(pattern, task)448449def _format_memories(self, memories: List[Dict]) -> str:450"""Format memories for context injection."""451sections = ["## Relevant Memories"]452453for memory in memories:454formatted = f"- {memory.get('content', '')}"455if "source" in memory:456formatted += f" (Source: {memory['source']})"457if "timestamp" in memory:458formatted += f" [Time: {memory['timestamp']}]"459sections.append(formatted)460461return "\n".join(sections)462463def _token_count(self, text: str) -> int:464"""Estimate token count."""465return len(text) // 4 # Rough approximation466467def _truncate_context(self, context: str, limit: int) -> str:468"""Truncate context to fit limit."""469tokens = context.split()470truncated = []471count = 0472473for token in tokens:474if count + 1 > limit:475break476truncated.append(token)477count += 1478479return " ".join(truncated)480```481482## Framework Integration Examples483484### Mem0 Quick Start485486```python487from mem0 import Memory488489# Initialize with default config (uses local storage)490m = Memory()491492# Store memories with user scoping493m.add("Prefers Python 3.12 with type hints", user_id="dev-alice")494m.add("Working on microservices migration", user_id="dev-alice")495496# Search with natural language497results = m.search("What language does the user prefer?", 
user_id="dev-alice")498499# Batch operations500m.add([501"Sprint goal: complete auth service",502"Blocked on database schema review"503], user_id="dev-alice")504```505506### Graphiti (Zep's Open-Source Temporal KG Engine)507508```python509from graphiti_core import Graphiti510from graphiti_core.nodes import EpisodeType511512# Initialize with Neo4j backend513graphiti = Graphiti("bolt://localhost:7687", "neo4j", "password")514515# Add episodes (conversations, events)516await graphiti.add_episode(517name="user_conversation_42",518episode_body="Alice mentioned she moved to Berlin in January.",519source=EpisodeType.message,520source_description="Chat with Alice"521)522523# Search combines semantic, keyword, and graph traversal524results = await graphiti.search("Where does Alice live?")525```526527### Cognee (Open-Source Knowledge Engine for AI Memory)528529```python530import cognee531from cognee.modules.search.types import SearchType532533# ECL pipeline: add → cognify → memify → search534await cognee.add("./docs/")535await cognee.add("any-data")536await cognee.cognify()537await cognee.memify()538539# Graph-aware retrieval (default: GRAPH_COMPLETION)540results = await cognee.search(541query_text="any query to search in memory",542query_type=SearchType.GRAPH_COMPLETION,543)544545# Raw chunks when agent reasons over text itself546chunks = await cognee.search(547query_text="any query to search in memory",548query_type=SearchType.CHUNKS,549)550```551552