pub struct ConsolidationService<P: PersistenceBackend> {
persistence: P,
access_counts: LruCache<String, u32>,
last_access: LruCache<String, u64>,
llm: Option<Arc<dyn LlmProvider + Send + Sync>>,
index: Option<Arc<SqliteBackend>>,
}
Service for consolidating and managing memory lifecycle.
Fields§
§persistence: P
Persistence backend for memory storage.
§access_counts: LruCache<String, u32>
Access counts for memories (memory_id -> count), bounded LRU (HIGH-PERF-001).
§last_access: LruCache<String, u64>
Last access times (memory_id -> timestamp), bounded LRU (HIGH-PERF-001).
§llm: Option<Arc<dyn LlmProvider + Send + Sync>>
Optional LLM provider for intelligent consolidation.
§index: Option<Arc<SqliteBackend>>
Optional index backend for storing memory edges.
Implementations§
impl<P: PersistenceBackend> ConsolidationService<P>
pub fn with_llm(self, llm: Arc<dyn LlmProvider + Send + Sync>) -> Self
Sets the LLM provider for intelligent consolidation.
Recommended: Wrap the LLM provider with ResilientLlmProvider for automatic
retries, circuit breaker pattern, and error budget tracking.
§Arguments
- llm - The LLM provider to use for summarization and analysis.
§Examples
§With Resilience Wrapper (Recommended)
use subcog::services::ConsolidationService;
use subcog::llm::{AnthropicClient, ResilientLlmProvider, LlmResilienceConfig};
use subcog::storage::persistence::FilesystemBackend;
use std::sync::Arc;
let backend = FilesystemBackend::new("/tmp/memories");
let client = AnthropicClient::new();
let resilience_config = LlmResilienceConfig::default();
let llm = Arc::new(ResilientLlmProvider::new(client, resilience_config));
let service = ConsolidationService::new(backend).with_llm(llm);
§Without Resilience Wrapper
use subcog::services::ConsolidationService;
use subcog::llm::AnthropicClient;
use subcog::storage::persistence::FilesystemBackend;
use std::sync::Arc;
let backend = FilesystemBackend::new("/tmp/memories");
let llm = Arc::new(AnthropicClient::new());
let service = ConsolidationService::new(backend).with_llm(llm);
pub fn with_index(self, index: Arc<SqliteBackend>) -> Self
Sets the index backend for storing memory edges.
The index backend is used to store relationships between memories and their summaries. If not set, edge relationships will not be persisted.
§Arguments
- index - The SQLite index backend to use for edge storage.
§Examples
use subcog::services::ConsolidationService;
use subcog::storage::persistence::FilesystemBackend;
use subcog::storage::index::SqliteBackend;
use std::sync::Arc;
let persistence = FilesystemBackend::new("/tmp/memories");
let index = SqliteBackend::new("/tmp/index.db")?;
let service = ConsolidationService::new(persistence)
    .with_index(Arc::new(index));
pub fn record_access(&mut self, memory_id: &str)
Records an access to a memory for retention scoring.
This updates the internal LRU caches tracking access frequency and recency.
These metrics are used by get_suggested_tier to
determine memory retention tier.
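The bookkeeping described above can be sketched with the standard library alone. This is a minimal illustration, not the crate's implementation: AccessTracker, its capacity parameter, and the explicit now argument are hypothetical names, and eviction uses a linear scan where a real LRU cache would evict in constant time.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the service's two bounded caches.
struct AccessTracker {
    capacity: usize,
    counts: HashMap<String, u32>,
    last_access: HashMap<String, u64>,
}

impl AccessTracker {
    fn new(capacity: usize) -> Self {
        Self {
            capacity,
            counts: HashMap::new(),
            last_access: HashMap::new(),
        }
    }

    /// Records one access at time `now`. When the tracker is full and the ID
    /// is new, the least recently accessed entry is evicted first.
    fn record_access(&mut self, memory_id: &str, now: u64) {
        if !self.counts.contains_key(memory_id) && self.counts.len() >= self.capacity {
            // O(n) scan keeps the sketch short.
            let oldest = self
                .last_access
                .iter()
                .min_by_key(|(_, ts)| **ts)
                .map(|(id, _)| id.clone());
            if let Some(id) = oldest {
                self.counts.remove(&id);
                self.last_access.remove(&id);
            }
        }
        *self.counts.entry(memory_id.to_string()).or_insert(0) += 1;
        self.last_access.insert(memory_id.to_string(), now);
    }

    /// Returns the recorded access count, or 0 for unknown or evicted IDs.
    fn count(&self, memory_id: &str) -> u32 {
        self.counts.get(memory_id).copied().unwrap_or(0)
    }
}
```

Bounding both maps means an evicted memory looks the same as one that was never accessed, which is why a never-seen ID can only score low on frequency and recency.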
§Arguments
- memory_id - The ID of the memory that was accessed.
§Examples
use subcog::services::ConsolidationService;
use subcog::storage::persistence::FilesystemBackend;
use subcog::models::MemoryTier;
let backend = FilesystemBackend::new("/tmp/memories");
let mut service = ConsolidationService::new(backend);
// Record multiple accesses to a memory
service.record_access("mem_123");
service.record_access("mem_123");
service.record_access("mem_123");
// The tier will reflect high access frequency
let tier = service.get_suggested_tier("mem_123");
assert!(matches!(tier, MemoryTier::Hot | MemoryTier::Warm));
pub fn consolidate_memories(
    &mut self,
    recall_service: &RecallService,
    config: &ConsolidationConfig,
) -> Result<ConsolidationStats>
Consolidates memories by finding related groups, summarizing them, and creating summary nodes.
This is the main orchestrator method for memory consolidation. It:
- Finds related memory groups using semantic similarity
- Summarizes each group using LLM
- Creates summary nodes and stores edge relationships
§Arguments
- recall_service - The recall service for semantic search.
- config - Consolidation configuration (filters, thresholds, etc.).
§Returns
ConsolidationStats with counts of processed memories and summaries created.
§Errors
Returns an error if:
- Finding related memories fails
- LLM summarization fails (when LLM is configured)
- Creating summary nodes fails
§Graceful Degradation
When the LLM is unavailable:
- Returns an error if summarization is required
- The caller should handle the error by skipping consolidation or using a fallback
Circuit Breaker: If using ResilientLlmProvider, LLM failures will:
- Retry with exponential backoff (3 attempts by default)
- Open circuit after consecutive failures (prevents cascading failures)
- Return circuit breaker errors when circuit is open
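The retry and circuit-breaker behaviour listed above can be illustrated with a small, self-contained sketch. It is not the ResilientLlmProvider implementation: CircuitBreaker, CallError, the 100 ms base delay, and counting one failure per exhausted call are all assumptions made for illustration. The sleep function is passed in so the backoff schedule can be observed without waiting.

```rust
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum CallError {
    /// The circuit is open; the operation was not attempted.
    CircuitOpen,
    /// Every attempt failed; carries the last error message.
    Failed(String),
}

/// Hypothetical breaker: retries each call, then opens after repeated failed calls.
struct CircuitBreaker {
    max_attempts: u32,
    failure_threshold: u32,
    base_delay: Duration,
    consecutive_failures: u32,
}

impl CircuitBreaker {
    fn new(max_attempts: u32, failure_threshold: u32) -> Self {
        Self {
            max_attempts,
            failure_threshold,
            base_delay: Duration::from_millis(100),
            consecutive_failures: 0,
        }
    }

    fn is_open(&self) -> bool {
        self.consecutive_failures >= self.failure_threshold
    }

    /// Runs `op` up to `max_attempts` times, calling `sleep` with an
    /// exponentially growing delay between attempts.
    fn call<T>(
        &mut self,
        mut op: impl FnMut() -> Result<T, String>,
        mut sleep: impl FnMut(Duration),
    ) -> Result<T, CallError> {
        if self.is_open() {
            return Err(CallError::CircuitOpen);
        }
        let mut last_error = String::new();
        for attempt in 0..self.max_attempts {
            match op() {
                Ok(value) => {
                    self.consecutive_failures = 0;
                    return Ok(value);
                }
                Err(e) => last_error = e,
            }
            // Back off before the next attempt: base * 2^attempt.
            if attempt + 1 < self.max_attempts {
                sleep(self.base_delay * 2u32.pow(attempt));
            }
        }
        self.consecutive_failures += 1;
        Err(CallError::Failed(last_error))
    }
}
```

In production code the second argument would typically be std::thread::sleep; a caller that receives CallError::CircuitOpen can skip consolidation for this run instead of retrying.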
§Configuration
Respects the following configuration options:
- namespace_filter: Only consolidate specific namespaces
- time_window_days: Only consolidate recent memories
- min_memories_to_consolidate: Minimum group size
- similarity_threshold: Similarity threshold for grouping
§Examples
use subcog::services::{ConsolidationService, RecallService};
use subcog::config::ConsolidationConfig;
use subcog::storage::persistence::FilesystemBackend;
use std::sync::Arc;
let backend = FilesystemBackend::new("/tmp/memories");
let mut service = ConsolidationService::new(backend);
let recall = RecallService::new();
let mut config = ConsolidationConfig::new();
config.enabled = true;
config.similarity_threshold = 0.8;
config.time_window_days = Some(30);
let stats = service.consolidate_memories(&recall, &config)?;
println!("{}", stats.summary());
pub fn consolidate(&mut self) -> Result<ConsolidationStats>
Runs lifecycle consolidation on all memories based on retention scoring.
This method performs the following operations:
- Calculates retention scores for all memories
- Archives memories with scores below the Archive tier threshold
- Detects contradictions within namespaces
Note: This is different from consolidate_memories,
which groups related memories and creates LLM-powered summaries. This method focuses
on lifecycle management and archival.
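As a rough illustration of the archival step, the sketch below walks a list of precomputed retention scores and selects the memories that fall below the Archive threshold. LifecycleStats and lifecycle_pass are hypothetical names, and the 0.2 cutoff is taken from the tier table in the get_suggested_tier documentation on this page; the real method also loads and stores memories and detects contradictions.

```rust
/// Hypothetical counters, loosely mirroring the `archived` field of the returned stats.
#[derive(Debug, Default, PartialEq)]
struct LifecycleStats {
    processed: usize,
    archived: usize,
}

/// Scores strictly below this value map to the Archive tier.
const ARCHIVE_BELOW: f32 = 0.2;

/// Counts every memory and returns the IDs that should be archived.
fn lifecycle_pass(scores: &[(&str, f32)]) -> (LifecycleStats, Vec<String>) {
    let mut stats = LifecycleStats::default();
    let mut to_archive = Vec::new();
    for (id, score) in scores {
        stats.processed += 1;
        if *score < ARCHIVE_BELOW {
            stats.archived += 1;
            to_archive.push((*id).to_string());
        }
    }
    (stats, to_archive)
}
```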
§Returns
ConsolidationStats with counts of processed, archived, and contradictory memories.
§Errors
Returns an error if:
- Listing memory IDs fails
- Retrieving or storing memories fails
- Contradiction detection fails
§Examples
use subcog::services::ConsolidationService;
use subcog::storage::persistence::FilesystemBackend;
let backend = FilesystemBackend::new("/tmp/memories");
let mut service = ConsolidationService::new(backend);
// Record some accesses to influence retention scores
service.record_access("mem_old");
// Run consolidation to archive old, unused memories
let stats = service.consolidate()?;
println!("Archived {} memories", stats.archived);
println!("Detected {} contradictions", stats.contradictions);
fn calculate_retention_score(&self, memory_id: &str, now: u64) -> RetentionScore
Calculates the retention score for a memory.
§Precision Notes
The u32 and u64 to f32 casts are acceptable here as exact precision is not required for retention score calculations (values are normalized 0.0-1.0).
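To make the precision note concrete, here is a minimal sketch of a normalized score built from frequency, recency, and importance. The weights, the saturation count, and the one-week half-life are assumptions chosen for illustration; the crate's actual formula is not shown on this page.

```rust
/// Access count at which the frequency component saturates (assumed).
const MAX_COUNT: f32 = 10.0;
/// Age in seconds at which the recency component halves (assumed: one week).
const HALF_LIFE_SECS: f32 = 7.0 * 24.0 * 3600.0;

/// Returns a score in 0.0..=1.0. The `as f32` casts may round large values,
/// which is harmless because every component is clamped to the unit range.
fn retention_score(access_count: u32, last_access: Option<u64>, now: u64, importance: f32) -> f32 {
    let frequency = (access_count as f32 / MAX_COUNT).min(1.0);
    let recency = match last_access {
        Some(ts) => {
            let age_secs = now.saturating_sub(ts) as f32;
            0.5_f32.powf(age_secs / HALF_LIFE_SECS)
        }
        // Never accessed: no recency contribution.
        None => 0.0,
    };
    0.4 * frequency + 0.4 * recency + 0.2 * importance.clamp(0.0, 1.0)
}
```

A rounding error of a few seconds in a timestamp changes the recency term by far less than the gaps between tier thresholds, which is why the lossy casts are acceptable here.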
fn detect_contradictions(&self, memory_ids: &[MemoryId]) -> Result<usize>
Detects potential contradictions between memories.
pub fn merge_memories(
    &mut self,
    source_id: &MemoryId,
    target_id: &MemoryId,
) -> Result<Memory>
Merges two memories into one, combining their content and metadata.
The merge operation:
- Combines content with a separator (---)
- Merges tags without duplicates
- Keeps target’s namespace and domain
- Preserves earliest creation timestamp
- Sets status to Active
- Marks source memory as Archived
- Requires re-embedding (embedding set to None)
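The content, tag, and timestamp rules above can be sketched on a simplified record. Note is a hypothetical stand-in for Memory, and both the order of the combined content (target first) and the whitespace around the separator are assumptions.

```rust
/// Hypothetical, simplified stand-in for the crate's Memory type.
#[derive(Debug, Clone)]
struct Note {
    content: String,
    tags: Vec<String>,
    created_at: u64,
}

/// Merges `source` into `target` following the rules listed above.
fn merge_notes(source: &Note, target: &Note) -> Note {
    // Merge tags without duplicates, keeping the target's tags first.
    let mut tags = target.tags.clone();
    for tag in &source.tags {
        if !tags.contains(tag) {
            tags.push(tag.clone());
        }
    }
    Note {
        // Combine content with a separator.
        content: format!("{}\n\n---\n\n{}", target.content, source.content),
        tags,
        // Preserve the earliest creation timestamp.
        created_at: source.created_at.min(target.created_at),
    }
}
```

The real method additionally archives the source, sets the merged status to Active, and clears the embedding so the combined text is re-embedded.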
§Arguments
- source_id - The memory to merge from (will be archived).
- target_id - The memory to merge into (will be updated).
§Returns
The merged memory with combined content and metadata.
§Errors
Returns an error if:
- Source or target memory not found
- Storing the merged or archived memories fails
§Examples
use subcog::services::ConsolidationService;
use subcog::storage::persistence::FilesystemBackend;
use subcog::models::{Memory, MemoryId, Namespace, Domain, MemoryStatus};
use subcog::current_timestamp;
let backend = FilesystemBackend::new("/tmp/memories");
let mut service = ConsolidationService::new(backend);
// Create and store two memories (implementation detail omitted)
// Merge mem_1 into mem_2
let merged = service.merge_memories(&mem1.id, &mem2.id)?;
assert!(merged.content.contains("Use PostgreSQL"));
assert!(merged.content.contains("With connection pooling"));
assert_eq!(merged.tags.len(), 2); // Combined tags
pub const fn link_memories(
    &self,
    _from_id: &MemoryId,
    _to_id: &MemoryId,
    _edge_type: EdgeType,
) -> Result<()>
Links two memories with a relationship edge.
Note: This is currently a placeholder that always succeeds. Full
edge relationship functionality is provided via the index backend when
using with_index. Use create_summary_node
for automatic edge creation during consolidation.
§Arguments
- _from_id - Source memory ID (currently unused).
- _to_id - Target memory ID (currently unused).
- _edge_type - Type of relationship edge (currently unused).
§Returns
Always returns Ok(()) as this is a placeholder implementation.
§Errors
This placeholder implementation never returns an error.
§Examples
use subcog::services::ConsolidationService;
use subcog::storage::persistence::FilesystemBackend;
use subcog::models::{MemoryId, EdgeType};
let backend = FilesystemBackend::new("/tmp/memories");
let service = ConsolidationService::new(backend);
let from = MemoryId::new("mem_1");
let to = MemoryId::new("mem_2");
// Currently a no-op placeholder
service.link_memories(&from, &to, EdgeType::RelatedTo)?;
pub fn get_suggested_tier(&self, memory_id: &str) -> MemoryTier
Gets the suggested retention tier for a memory based on access patterns.
Calculates a retention score from access frequency, recency, and importance, then maps it to a tier:
| Tier | Score Range | Use Case |
|---|---|---|
| Hot | ≥ 0.7 | Frequently accessed, keep in fast storage |
| Warm | 0.4 - 0.7 | Moderate access, default tier |
| Cold | 0.2 - 0.4 | Rarely accessed, candidate for compression |
| Archive | < 0.2 | Unused, move to cold storage |
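The table translates directly into a threshold check. In this sketch Tier is a local stand-in for MemoryTier, and treating each lower bound as inclusive (so 0.4 is Warm and 0.2 is Cold) is an assumption; the table only states the ≥ 0.7 and < 0.2 boundaries explicitly.

```rust
/// Local stand-in for the crate's MemoryTier enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Tier {
    Hot,
    Warm,
    Cold,
    Archive,
}

/// Maps a retention score in 0.0..=1.0 to a tier using the table's thresholds.
fn tier_for_score(score: f32) -> Tier {
    if score >= 0.7 {
        Tier::Hot
    } else if score >= 0.4 {
        Tier::Warm
    } else if score >= 0.2 {
        Tier::Cold
    } else {
        Tier::Archive
    }
}
```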
§Arguments
- memory_id - The ID of the memory to assess.
§Returns
The suggested MemoryTier for the memory.
§Examples
use subcog::services::ConsolidationService;
use subcog::storage::persistence::FilesystemBackend;
use subcog::models::MemoryTier;
let backend = FilesystemBackend::new("/tmp/memories");
let mut service = ConsolidationService::new(backend);
// Record frequent accesses
for _ in 0..10 {
service.record_access("mem_popular");
}
// Check tier - should be Hot or Warm due to high access frequency
let tier = service.get_suggested_tier("mem_popular");
assert!(matches!(tier, MemoryTier::Hot | MemoryTier::Warm));
// Never accessed memory has low retention score, so Cold or Archive tier
let tier_new = service.get_suggested_tier("mem_never_seen");
assert!(matches!(tier_new, MemoryTier::Cold | MemoryTier::Archive));
Finds related memories grouped by namespace and semantic similarity.
Uses semantic search to find memories that are related to each other based on embeddings. Groups results by namespace and filters by similarity threshold.
§Arguments
- recall_service - The recall service to use for semantic search.
- config - Consolidation configuration containing similarity threshold and filters.
§Returns
A map of namespace to vectors of memory IDs that are related above the threshold.
§Errors
Returns an error if the semantic search fails.
§Graceful Degradation
If embeddings are not available:
- Returns empty result without error
- Logs a warning for visibility
§Examples
use subcog::services::{ConsolidationService, RecallService};
use subcog::config::ConsolidationConfig;
use subcog::storage::persistence::FilesystemBackend;
let backend = FilesystemBackend::new("/tmp/memories");
let service = ConsolidationService::new(backend);
let recall = RecallService::new();
let config = ConsolidationConfig::new().with_similarity_threshold(0.7);
let groups = service.find_related_memories(&recall, &config)?;
for (namespace, memory_ids) in groups {
    println!("{:?}: {} related memories", namespace, memory_ids.len());
}
fn cluster_by_similarity(
    &self,
    memories: &[Memory],
    threshold: f32,
) -> Result<Vec<Vec<MemoryId>>>
Clusters memories by semantic similarity using embeddings.
Uses cosine similarity between embeddings to group related memories. Memories are grouped if their similarity is >= threshold.
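A minimal sketch of threshold-based grouping with cosine similarity follows. It assumes a greedy, seed-based strategy in which each unassigned item starts a group and pulls in later unassigned items that are similar to the seed; whether the crate compares against the seed or against every group member is not stated here. The function names and the use of indices instead of memory IDs are illustrative.

```rust
/// Cosine similarity of two vectors; returns 0.0 for mismatched, empty,
/// or zero-length inputs.
fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
    if a.len() != b.len() || a.is_empty() {
        return 0.0;
    }
    let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
    let norm_a = a.iter().map(|x| x * x).sum::<f32>().sqrt();
    let norm_b = b.iter().map(|x| x * x).sum::<f32>().sqrt();
    if norm_a == 0.0 || norm_b == 0.0 {
        0.0
    } else {
        dot / (norm_a * norm_b)
    }
}

/// Greedy grouping: returns groups of indices into `embeddings`.
fn cluster_by_threshold(embeddings: &[Vec<f32>], threshold: f32) -> Vec<Vec<usize>> {
    let mut assigned = vec![false; embeddings.len()];
    let mut groups = Vec::new();
    for i in 0..embeddings.len() {
        if assigned[i] {
            continue;
        }
        assigned[i] = true;
        let mut group = vec![i];
        for j in (i + 1)..embeddings.len() {
            // Join the seed's group when similarity is >= threshold.
            if !assigned[j] && cosine_similarity(&embeddings[i], &embeddings[j]) >= threshold {
                assigned[j] = true;
                group.push(j);
            }
        }
        groups.push(group);
    }
    groups
}
```

Singleton groups are kept in the output here; a caller that enforces a minimum group size would filter them out afterwards.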
fn find_similar_memories(
    memories: &[Memory],
    current_idx: usize,
    current_embedding: &[f32],
    threshold: f32,
    assigned: &mut HashSet<String>,
    group: &mut Vec<MemoryId>,
)
Finds memories similar to a given memory and adds them to the group.
This is a helper function for cluster_by_similarity that reduces nesting depth.
pub fn summarize_group(&self, memories: &[Memory]) -> Result<String>
Summarizes a group of related memories using LLM.
Creates a concise summary from a group of related memories while preserving all key details. Uses the LLM provider to generate an intelligent summary that combines related information into a cohesive narrative.
§Arguments
- memories - A slice of memories to summarize together.
§Returns
A summary string that preserves key details from all source memories.
§Errors
Returns an error if:
- No LLM provider is configured (graceful degradation required)
- The LLM call fails
- The response cannot be processed
§Graceful Degradation
When LLM is unavailable:
- Returns an error with a clear message
- Caller should handle by either skipping summarization or using fallback logic
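One way a caller might apply the fallback advice above is sketched below. The summarizer is modelled as a closure returning Result<String, String>, and the fallback of joining the source texts verbatim is purely an illustrative choice; summarize_or_fallback is a hypothetical helper, not part of the crate.

```rust
/// Tries the provided summarizer and falls back to a plain concatenation
/// of the source texts when it returns an error.
fn summarize_or_fallback<F>(contents: &[&str], summarize: F) -> String
where
    F: Fn(&[&str]) -> Result<String, String>,
{
    match summarize(contents) {
        Ok(summary) => summary,
        // Fallback (an assumption of this sketch): keep all source text.
        Err(_) => contents.join("\n---\n"),
    }
}
```

Skipping summarization entirely is the other option mentioned above; in that case the caller would return early on Err instead of building a fallback string.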
§Examples
use subcog::services::ConsolidationService;
use subcog::storage::persistence::FilesystemBackend;
use std::sync::Arc;
use subcog::llm::AnthropicClient;
let backend = FilesystemBackend::new("/tmp/memories");
let llm = Arc::new(AnthropicClient::new());
let service = ConsolidationService::new(backend).with_llm(llm);
let memories = vec![/* ... */];
let summary = service.summarize_group(&memories)?;
println!("Summary: {}", summary);
pub fn create_summary_node(
    &mut self,
    summary_content: &str,
    source_memories: &[Memory],
) -> Result<Memory>
Creates a summary memory node from a group of related memories.
Creates a new Memory marked as is_summary=true that consolidates multiple
related memories. The original memories are preserved and linked via
source_memory_ids. Tags are merged from all source memories without duplicates.
§Arguments
- summary_content - The summary text (typically generated by LLM).
- source_memories - A slice of memories that were consolidated into this summary.
§Returns
A new Memory with is_summary=true and source memory links.
§Errors
Returns an error if:
- Source memories slice is empty
- Storing the summary node fails
§Examples
use subcog::services::ConsolidationService;
use subcog::storage::persistence::FilesystemBackend;
let backend = FilesystemBackend::new("/tmp/memories");
let mut service = ConsolidationService::new(backend);
let memories = vec![/* ... */];
let summary_text = "Consolidated summary of related decisions";
let summary_node = service.create_summary_node(summary_text, &memories)?;
assert!(summary_node.is_summary);
Creates RelatedTo edges between all memories in a group.
This method is used when LLM summarization is unavailable but we still want
to preserve the relationships between semantically similar memories. Creates
a mesh topology where each memory is linked to every other memory in the group
with RelatedTo edges.
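The mesh topology can be illustrated by enumerating every unordered pair in a group. This sketch assumes one edge per pair, which gives n(n-1)/2 edges for n memories; whether the crate also stores the reverse direction is not stated here, and mesh_pairs is a hypothetical helper.

```rust
/// Returns one (from, to) pair for every unordered pair of IDs in the group.
fn mesh_pairs(ids: &[&str]) -> Vec<(String, String)> {
    let mut edges = Vec::new();
    for (i, from) in ids.iter().enumerate() {
        // Only pair with later IDs so each pair appears exactly once.
        for to in &ids[i + 1..] {
            edges.push((from.to_string(), to.to_string()));
        }
    }
    edges
}
```

Because the edge count grows quadratically with group size, a full mesh is practical only for small groups.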
§Arguments
- memories - A slice of related memories to link together.
- index - The index backend to use for storing edges.
§Returns
Returns Ok(()) on success, or an error if edge storage fails.
§Errors
Returns an error if edge storage operations fail.
§Examples
use subcog::services::ConsolidationService;
use subcog::storage::persistence::FilesystemBackend;
use subcog::storage::index::SqliteBackend;
use std::sync::Arc;
let backend = FilesystemBackend::new("/tmp/memories");
// Keep a shared handle: with_index takes ownership of the Arc it is given.
let index = Arc::new(SqliteBackend::new("/tmp/index.db")?);
let mut service = ConsolidationService::new(backend)
    .with_index(Arc::clone(&index));
let memories = vec![/* related memories */];
service.create_related_edges(&memories, &index)?;
Auto Trait Implementations§
impl<P> Freeze for ConsolidationService<P>
where
    P: Freeze,
impl<P> !RefUnwindSafe for ConsolidationService<P>
impl<P> Send for ConsolidationService<P>
impl<P> Sync for ConsolidationService<P>
impl<P> Unpin for ConsolidationService<P>
where
    P: Unpin,
impl<P> !UnwindSafe for ConsolidationService<P>
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> FutureExt for T
fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wraps the input message T in a tonic::Request.
impl<L> LayerExt<L> for L
fn named_layer<S>(&self, service: S) -> Layered<<L as Layer<S>>::Service, S>
where
    L: Layer<S>,
Applies the layer to a service and wraps it in Layered.