ConsolidationService

Struct ConsolidationService 

Source
pub struct ConsolidationService<P: PersistenceBackend> {
    persistence: P,
    access_counts: LruCache<String, u32>,
    last_access: LruCache<String, u64>,
    llm: Option<Arc<dyn LlmProvider + Send + Sync>>,
    index: Option<Arc<SqliteBackend>>,
}

Service for consolidating and managing memory lifecycle.

Fields§

§persistence: P

Persistence backend for memory storage.

§access_counts: LruCache<String, u32>

Access counts for memories (memory_id -> count), bounded LRU (HIGH-PERF-001).

§last_access: LruCache<String, u64>

Last access times (memory_id -> timestamp), bounded LRU (HIGH-PERF-001).

§llm: Option<Arc<dyn LlmProvider + Send + Sync>>

Optional LLM provider for intelligent consolidation.

§index: Option<Arc<SqliteBackend>>

Optional index backend for storing memory edges.

Implementations§

Source§

impl<P: PersistenceBackend> ConsolidationService<P>

Source

pub fn new(persistence: P) -> Self

Creates a new consolidation service.

Source

pub fn with_llm(self, llm: Arc<dyn LlmProvider + Send + Sync>) -> Self

Sets the LLM provider for intelligent consolidation.

Recommended: Wrap the LLM provider in ResilientLlmProvider to get automatic retries, a circuit breaker, and error budget tracking.

§Arguments
  • llm - The LLM provider to use for summarization and analysis.
§Examples
use subcog::services::ConsolidationService;
use subcog::llm::{AnthropicClient, ResilientLlmProvider, LlmResilienceConfig};
use subcog::storage::persistence::FilesystemBackend;
use std::sync::Arc;

let backend = FilesystemBackend::new("/tmp/memories");
let client = AnthropicClient::new();
let resilience_config = LlmResilienceConfig::default();
let llm = Arc::new(ResilientLlmProvider::new(client, resilience_config));
let service = ConsolidationService::new(backend).with_llm(llm);
§Without Resilience Wrapper
use subcog::services::ConsolidationService;
use subcog::llm::AnthropicClient;
use subcog::storage::persistence::FilesystemBackend;
use std::sync::Arc;

let backend = FilesystemBackend::new("/tmp/memories");
let llm = Arc::new(AnthropicClient::new());
let service = ConsolidationService::new(backend).with_llm(llm);
Source

pub fn with_index(self, index: Arc<SqliteBackend>) -> Self

Sets the index backend for storing memory edges.

The index backend is used to store relationships between memories and their summaries. If not set, edge relationships will not be persisted.

§Arguments
  • index - The SQLite index backend to use for edge storage.
§Examples
use subcog::services::ConsolidationService;
use subcog::storage::persistence::FilesystemBackend;
use subcog::storage::index::SqliteBackend;
use std::sync::Arc;

let persistence = FilesystemBackend::new("/tmp/memories");
let index = SqliteBackend::new("/tmp/index.db")?;
let service = ConsolidationService::new(persistence)
    .with_index(Arc::new(index));
Source

pub fn record_access(&mut self, memory_id: &str)

Records an access to a memory for retention scoring.

This updates the internal LRU caches that track access frequency and recency. get_suggested_tier uses these metrics to determine a memory's retention tier.

§Arguments
  • memory_id - The ID of the memory that was accessed
§Examples
use subcog::services::ConsolidationService;
use subcog::storage::persistence::FilesystemBackend;
use subcog::models::MemoryTier;

let backend = FilesystemBackend::new("/tmp/memories");
let mut service = ConsolidationService::new(backend);

// Record multiple accesses to a memory
service.record_access("mem_123");
service.record_access("mem_123");
service.record_access("mem_123");

// The tier will reflect high access frequency
let tier = service.get_suggested_tier("mem_123");
assert!(matches!(tier, MemoryTier::Hot | MemoryTier::Warm));
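
The bounded access tracking described above can be sketched with standard-library types only. AccessTracker, its capacity parameter, and the evict-oldest rule are hypothetical stand-ins for illustration; the service itself stores these metrics in two LruCache fields, whose eviction behavior may differ.

```rust
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};

/// Hypothetical stand-in for the service's two bounded caches.
struct AccessTracker {
    capacity: usize,
    /// memory_id -> (access count, last access time in Unix seconds)
    entries: HashMap<String, (u32, u64)>,
}

impl AccessTracker {
    fn new(capacity: usize) -> Self {
        // A capacity of at least 1 keeps the eviction logic simple.
        Self { capacity: capacity.max(1), entries: HashMap::new() }
    }

    /// Records an access using the current wall-clock time.
    fn record_access(&mut self, memory_id: &str) {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0);
        self.record_access_at(memory_id, now);
    }

    /// Records an access at an explicit timestamp (handy for tests).
    fn record_access_at(&mut self, memory_id: &str, now: u64) {
        let is_new = !self.entries.contains_key(memory_id);
        if is_new && self.entries.len() >= self.capacity {
            // Evict the entry with the oldest last-access time.
            let oldest = self
                .entries
                .iter()
                .min_by_key(|(_, (_, last))| *last)
                .map(|(id, _)| id.clone());
            if let Some(id) = oldest {
                self.entries.remove(&id);
            }
        }
        let entry = self.entries.entry(memory_id.to_string()).or_insert((0, now));
        entry.0 = entry.0.saturating_add(1);
        entry.1 = now;
    }

    /// Returns how many accesses are currently tracked for this memory.
    fn access_count(&self, memory_id: &str) -> u32 {
        self.entries.get(memory_id).map_or(0, |(count, _)| *count)
    }
}
```

Bounding the map means a rarely accessed memory can lose its counters once newer entries push it out, so its score is computed as if it had never been accessed.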
Source

pub fn consolidate_memories(&mut self, recall_service: &RecallService, config: &ConsolidationConfig) -> Result<ConsolidationStats>

Consolidates memories by finding related groups, summarizing them, and creating summary nodes.

This is the main orchestrator method for memory consolidation. It:

  1. Finds related memory groups using semantic similarity
  2. Summarizes each group using LLM
  3. Creates summary nodes and stores edge relationships
§Arguments
  • recall_service - The recall service for semantic search
  • config - Consolidation configuration (filters, thresholds, etc.)
§Returns

ConsolidationStats with counts of processed memories and summaries created.

§Errors

Returns an error if:

  • Finding related memories fails
  • LLM summarization fails (when LLM is configured)
  • Creating summary nodes fails
§Graceful Degradation

When LLM is unavailable:

  • Returns an error if summarization is required
  • Caller should handle by skipping consolidation or using fallback

Circuit Breaker: If using ResilientLlmProvider, LLM failures will:

  • Retry with exponential backoff (3 attempts by default)
  • Open circuit after consecutive failures (prevents cascading failures)
  • Return circuit breaker errors when circuit is open
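
The retry and circuit-breaker behavior listed above can be illustrated with a small standalone sketch. CircuitBreaker, CallError, and backoff_delay are hypothetical names, and counting one failure per exhausted call is an assumption; this is not ResilientLlmProvider's implementation, and it omits the half-open and reset-timeout states a production breaker would likely need.

```rust
use std::time::Duration;

/// Hypothetical breaker that opens after a number of consecutive failed calls.
struct CircuitBreaker {
    consecutive_failures: u32,
    failure_threshold: u32,
}

#[derive(Debug, PartialEq)]
enum CallError {
    /// The breaker is open, so the operation was not attempted.
    CircuitOpen,
    /// Every attempt failed; carries the last error message.
    Failed(String),
}

/// Delay before the retry that follows attempt number `attempt` (0-based).
fn backoff_delay(base: Duration, attempt: u32) -> Duration {
    base.saturating_mul(2u32.saturating_pow(attempt))
}

impl CircuitBreaker {
    fn new(failure_threshold: u32) -> Self {
        Self { consecutive_failures: 0, failure_threshold }
    }

    fn is_open(&self) -> bool {
        self.consecutive_failures >= self.failure_threshold
    }

    /// Runs `op` up to `max_attempts` times, sleeping base, 2*base, 4*base, ...
    /// between attempts.
    fn call<T>(
        &mut self,
        max_attempts: u32,
        base_delay: Duration,
        mut op: impl FnMut() -> Result<T, String>,
    ) -> Result<T, CallError> {
        if self.is_open() {
            return Err(CallError::CircuitOpen);
        }
        let mut last_error = String::from("no attempts made");
        for attempt in 0..max_attempts {
            match op() {
                Ok(value) => {
                    // Any success resets the failure streak.
                    self.consecutive_failures = 0;
                    return Ok(value);
                }
                Err(e) => last_error = e,
            }
            if attempt + 1 < max_attempts {
                std::thread::sleep(backoff_delay(base_delay, attempt));
            }
        }
        self.consecutive_failures += 1;
        Err(CallError::Failed(last_error))
    }
}
```

A caller of consolidate_memories would see the equivalent of CallError::CircuitOpen as an immediate error, which is the signal to skip consolidation for this run rather than retry.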
§Configuration

Respects the following configuration options:

  • namespace_filter: Only consolidate specific namespaces
  • time_window_days: Only consolidate recent memories
  • min_memories_to_consolidate: Minimum group size
  • similarity_threshold: Similarity threshold for grouping
§Examples
use subcog::services::{ConsolidationService, RecallService};
use subcog::config::ConsolidationConfig;
use subcog::storage::persistence::FilesystemBackend;
use std::sync::Arc;

let backend = FilesystemBackend::new("/tmp/memories");
let mut service = ConsolidationService::new(backend);
let recall = RecallService::new();
let mut config = ConsolidationConfig::new();
config.enabled = true;
config.similarity_threshold = 0.8;
config.time_window_days = Some(30);

let stats = service.consolidate_memories(&recall, &config)?;
println!("{}", stats.summary());
Source

pub fn consolidate(&mut self) -> Result<ConsolidationStats>

Runs lifecycle consolidation on all memories based on retention scoring.

This method performs the following operations:

  1. Calculates retention scores for all memories
  2. Archives memories with scores below the Archive tier threshold
  3. Detects contradictions within namespaces

Note: This is different from consolidate_memories, which groups related memories and creates LLM-powered summaries. This method focuses on lifecycle management and archival.

§Returns

ConsolidationStats with counts of processed, archived, and contradictory memories.

§Errors

Returns an error if:

  • Listing memory IDs fails
  • Retrieving or storing memories fails
  • Contradiction detection fails
§Examples
use subcog::services::ConsolidationService;
use subcog::storage::persistence::FilesystemBackend;

let backend = FilesystemBackend::new("/tmp/memories");
let mut service = ConsolidationService::new(backend);

// Record some accesses to influence retention scores
service.record_access("mem_old");

// Run consolidation to archive old, unused memories
let stats = service.consolidate()?;
println!("Archived {} memories", stats.archived);
println!("Detected {} contradictions", stats.contradictions);
Source

fn calculate_retention_score(&self, memory_id: &str, now: u64) -> RetentionScore

Calculates the retention score for a memory.

§Precision Notes

The u32 and u64 to f32 casts are acceptable here as exact precision is not required for retention score calculations (values are normalized 0.0-1.0).
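
To show why lossy casts are harmless here, the following sketch normalizes access frequency, recency, and importance into a score in the range 0.0 to 1.0. The weights (0.4, 0.4, 0.2), the saturation count, and the one-week half-life are all hypothetical values chosen for illustration; the crate's actual formula is not shown in these docs.

```rust
/// Hypothetical: access counts at or above this value count as "fully frequent".
const SATURATION_COUNT: f32 = 10.0;
/// Hypothetical: recency halves every 7 days (in seconds).
const HALF_LIFE_SECS: f32 = 7.0 * 24.0 * 3600.0;

/// Combines frequency, recency, and importance into a score in [0.0, 1.0].
fn retention_score(access_count: u32, last_access: Option<u64>, now: u64, importance: f32) -> f32 {
    // The u32 -> f32 cast may round very large counts, but the value is
    // clamped to 1.0 long before precision loss could matter.
    let frequency = (access_count as f32 / SATURATION_COUNT).min(1.0);

    // The u64 -> f32 cast of an age in seconds loses low-order bits for
    // large ages, which only nudges an already tiny recency value.
    let recency = match last_access {
        Some(ts) => {
            let age_secs = now.saturating_sub(ts) as f32;
            0.5f32.powf(age_secs / HALF_LIFE_SECS)
        }
        // Never accessed: no recency contribution.
        None => 0.0,
    };

    let importance = importance.clamp(0.0, 1.0);
    (0.4 * frequency + 0.4 * recency + 0.2 * importance).clamp(0.0, 1.0)
}
```

Because every component is clamped before weighting, a rounding error in the casts can shift the result by far less than the width of any tier boundary.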

Source

fn detect_contradictions(&self, memory_ids: &[MemoryId]) -> Result<usize>

Detects potential contradictions between memories.

Source

pub fn merge_memories(&mut self, source_id: &MemoryId, target_id: &MemoryId) -> Result<Memory>

Merges two memories into one, combining their content and metadata.

The merge operation:

  • Combines content with a separator (---)
  • Merges tags without duplicates
  • Keeps target’s namespace and domain
  • Preserves earliest creation timestamp
  • Sets status to Active
  • Marks source memory as Archived
  • Requires re-embedding (embedding set to None)
§Arguments
  • source_id - The memory to merge from (will be archived)
  • target_id - The memory to merge into (will be updated)
§Returns

The merged memory with combined content and metadata.

§Errors

Returns an error if:

  • Source or target memory not found
  • Storing the merged or archived memories fails
§Examples
use subcog::services::ConsolidationService;
use subcog::storage::persistence::FilesystemBackend;
use subcog::models::{Memory, MemoryId, Namespace, Domain, MemoryStatus};
use subcog::current_timestamp;

let backend = FilesystemBackend::new("/tmp/memories");
let mut service = ConsolidationService::new(backend);

// Create and store two memories (implementation detail omitted)

// Merge mem_1 into mem_2
let merged = service.merge_memories(&mem1.id, &mem2.id)?;
assert!(merged.content.contains("Use PostgreSQL"));
assert!(merged.content.contains("With connection pooling"));
assert_eq!(merged.tags.len(), 2); // Combined tags
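
The merge rules listed for merge_memories can be sketched on a simplified record. Mem, Status, and merge are hypothetical names, and two details are assumptions: the target's content is placed first, and the separator is surrounded by blank lines. The real Memory type has more fields than shown here.

```rust
#[derive(Debug, Clone, PartialEq)]
enum Status {
    Active,
    Archived,
}

/// Hypothetical, trimmed-down memory record.
#[derive(Debug, Clone)]
struct Mem {
    id: String,
    namespace: String,
    content: String,
    tags: Vec<String>,
    created_at: u64,
    status: Status,
    embedding: Option<Vec<f32>>,
}

/// Returns (merged target, archived source) without touching storage.
fn merge(source: &Mem, target: &Mem) -> (Mem, Mem) {
    // Merge tags without duplicates, keeping the target's order first.
    let mut tags = target.tags.clone();
    for tag in &source.tags {
        if !tags.contains(tag) {
            tags.push(tag.clone());
        }
    }

    let merged = Mem {
        id: target.id.clone(),
        // The target's namespace wins.
        namespace: target.namespace.clone(),
        content: format!("{}\n\n---\n\n{}", target.content, source.content),
        tags,
        // Preserve the earliest creation timestamp.
        created_at: target.created_at.min(source.created_at),
        status: Status::Active,
        // Content changed, so the old embedding no longer applies.
        embedding: None,
    };
    let archived = Mem { status: Status::Archived, ..source.clone() };
    (merged, archived)
}
```

Clearing the embedding forces a re-embed of the combined content; reusing the target's old vector would make later similarity searches ignore the text contributed by the source.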

link_memories: Links two memories with a relationship edge.

Note: This is currently a placeholder that always succeeds. Full edge relationship functionality is provided via the index backend when using with_index. Use create_summary_node for automatic edge creation during consolidation.

§Arguments
  • _from_id - Source memory ID (currently unused)
  • _to_id - Target memory ID (currently unused)
  • _edge_type - Type of relationship edge (currently unused)
§Returns

Always returns Ok(()) as this is a placeholder implementation.

§Errors

This placeholder implementation never returns an error.

§Examples
use subcog::services::ConsolidationService;
use subcog::storage::persistence::FilesystemBackend;
use subcog::models::{MemoryId, EdgeType};

let backend = FilesystemBackend::new("/tmp/memories");
let service = ConsolidationService::new(backend);

let from = MemoryId::new("mem_1");
let to = MemoryId::new("mem_2");

// Currently a no-op placeholder
service.link_memories(&from, &to, EdgeType::RelatedTo)?;
Source

pub fn get_suggested_tier(&self, memory_id: &str) -> MemoryTier

Gets the suggested retention tier for a memory based on access patterns.

Calculates a retention score from access frequency, recency, and importance, then maps it to a tier:

Tier     Score Range  Use Case
Hot      ≥ 0.7        Frequently accessed, keep in fast storage
Warm     0.4 - 0.7    Moderate access, default tier
Cold     0.2 - 0.4    Rarely accessed, candidate for compression
Archive  < 0.2        Unused, move to cold storage
§Arguments
  • memory_id - The ID of the memory to assess
§Returns

The suggested MemoryTier for the memory.

§Examples
use subcog::services::ConsolidationService;
use subcog::storage::persistence::FilesystemBackend;
use subcog::models::MemoryTier;

let backend = FilesystemBackend::new("/tmp/memories");
let mut service = ConsolidationService::new(backend);

// Record frequent accesses
for _ in 0..10 {
    service.record_access("mem_popular");
}

// Check tier - should be Hot or Warm due to high access frequency
let tier = service.get_suggested_tier("mem_popular");
assert!(matches!(tier, MemoryTier::Hot | MemoryTier::Warm));

// A memory that was never accessed has a low retention score, so it lands in the Cold or Archive tier
let tier_new = service.get_suggested_tier("mem_never_seen");
assert!(matches!(tier_new, MemoryTier::Cold | MemoryTier::Archive));
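
The score-to-tier mapping in the table above can be written as a short function. Tier and tier_for_score are hypothetical names, and treating each lower bound as inclusive (so exactly 0.4 is Warm and exactly 0.2 is Cold) is an assumption where the table leaves the boundary open.

```rust
/// Hypothetical mirror of the four retention tiers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Tier {
    Hot,
    Warm,
    Cold,
    Archive,
}

/// Maps a retention score in [0.0, 1.0] to a tier using the documented thresholds.
fn tier_for_score(score: f32) -> Tier {
    if score >= 0.7 {
        Tier::Hot
    } else if score >= 0.4 {
        Tier::Warm
    } else if score >= 0.2 {
        Tier::Cold
    } else {
        // Also reached for NaN, since every comparison with NaN is false.
        Tier::Archive
    }
}
```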

find_related_memories: Finds related memories grouped by namespace and semantic similarity.

Uses semantic search to find memories that are related to each other based on embeddings. Groups results by namespace and filters by similarity threshold.

§Arguments
  • recall_service - The recall service to use for semantic search
  • config - Consolidation configuration containing similarity threshold and filters
§Returns

A map of namespace to vectors of memory IDs that are related above the threshold.

§Errors

Returns an error if the semantic search fails.

§Graceful Degradation

If embeddings are not available:

  • Returns empty result without error
  • Logs a warning for visibility
§Examples
use subcog::services::{ConsolidationService, RecallService};
use subcog::config::ConsolidationConfig;
use subcog::storage::persistence::FilesystemBackend;

let backend = FilesystemBackend::new("/tmp/memories");
let service = ConsolidationService::new(backend);
let recall = RecallService::new();
let config = ConsolidationConfig::new().with_similarity_threshold(0.7);

let groups = service.find_related_memories(&recall, &config)?;
for (namespace, memory_ids) in groups {
    println!("{:?}: {} related memories", namespace, memory_ids.len());
}
Source

fn cluster_by_similarity(&self, memories: &[Memory], threshold: f32) -> Result<Vec<Vec<MemoryId>>>

Clusters memories by semantic similarity using embeddings.

Uses cosine similarity between embeddings to group related memories. Memories are grouped if their similarity is >= threshold.

Source

fn find_similar_memories(memories: &[Memory], current_idx: usize, current_embedding: &[f32], threshold: f32, assigned: &mut HashSet<String>, group: &mut Vec<MemoryId>)

Finds memories similar to a given memory and adds them to the group.

This is a helper function for cluster_by_similarity that reduces nesting depth.
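
The clustering approach described for cluster_by_similarity and its helper can be sketched as a greedy single pass over (id, embedding) pairs. The function names mirror the signatures above, but the simplified input type, the choice to drop groups of one, and comparing only against the group's first member are assumptions made for this illustration.

```rust
use std::collections::HashSet;

/// Cosine similarity of two vectors; 0.0 for mismatched, empty, or zero-length input.
fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
    if a.len() != b.len() || a.is_empty() {
        return 0.0;
    }
    let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
    let norm_a = a.iter().map(|x| x * x).sum::<f32>().sqrt();
    let norm_b = b.iter().map(|x| x * x).sum::<f32>().sqrt();
    if norm_a == 0.0 || norm_b == 0.0 {
        0.0
    } else {
        dot / (norm_a * norm_b)
    }
}

/// Adds every unassigned later item whose similarity to the current one is >= threshold.
fn find_similar(
    items: &[(String, Vec<f32>)],
    current_idx: usize,
    current_embedding: &[f32],
    threshold: f32,
    assigned: &mut HashSet<String>,
    group: &mut Vec<String>,
) {
    for (other_id, other_embedding) in items.iter().skip(current_idx + 1) {
        if assigned.contains(other_id) {
            continue;
        }
        if cosine_similarity(current_embedding, other_embedding) >= threshold {
            assigned.insert(other_id.clone());
            group.push(other_id.clone());
        }
    }
}

/// Greedy clustering: each unassigned item seeds a group and claims its neighbors.
fn cluster_by_similarity(items: &[(String, Vec<f32>)], threshold: f32) -> Vec<Vec<String>> {
    let mut assigned: HashSet<String> = HashSet::new();
    let mut groups = Vec::new();
    for (idx, (id, embedding)) in items.iter().enumerate() {
        if assigned.contains(id) {
            continue;
        }
        assigned.insert(id.clone());
        let mut group = vec![id.clone()];
        find_similar(items, idx, embedding, threshold, &mut assigned, &mut group);
        // Assumption: a group of one has nothing to consolidate, so it is dropped.
        if group.len() > 1 {
            groups.push(group);
        }
    }
    groups
}
```

Splitting the inner loop into a helper keeps the outer function flat, which matches the stated reason the helper exists. The greedy pass is order-dependent: an item joins the first seed it matches, even if a later seed would be closer.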

Source

pub fn summarize_group(&self, memories: &[Memory]) -> Result<String>

Summarizes a group of related memories using LLM.

Creates a concise summary from a group of related memories while preserving all key details. Uses the LLM provider to generate an intelligent summary that combines related information into a cohesive narrative.

§Arguments
  • memories - A slice of memories to summarize together.
§Returns

A summary string that preserves key details from all source memories.

§Errors

Returns an error if:

  • No LLM provider is configured (graceful degradation required)
  • The LLM call fails
  • The response cannot be processed
§Graceful Degradation

When LLM is unavailable:

  • Returns an error with a clear message
  • Caller should handle by either skipping summarization or using fallback logic
§Examples
use subcog::services::ConsolidationService;
use subcog::storage::persistence::FilesystemBackend;
use std::sync::Arc;
use subcog::llm::AnthropicClient;

let backend = FilesystemBackend::new("/tmp/memories");
let llm = Arc::new(AnthropicClient::new());
let service = ConsolidationService::new(backend).with_llm(llm);

let memories = vec![/* ... */];
let summary = service.summarize_group(&memories)?;
println!("Summary: {}", summary);
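
The graceful-degradation contract above puts the fallback decision on the caller. The sketch below shows one way to handle it, using a hypothetical Summarizer trait, Service struct, and FixedSummarizer stub in place of the crate's LlmProvider and ConsolidationService; the concatenation fallback is only one possible choice.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for an LLM provider.
trait Summarizer: Send + Sync {
    fn complete(&self, prompt: &str) -> Result<String, String>;
}

/// Stub provider that always returns the same text (for illustration and tests).
struct FixedSummarizer(String);

impl Summarizer for FixedSummarizer {
    fn complete(&self, _prompt: &str) -> Result<String, String> {
        Ok(self.0.clone())
    }
}

/// Hypothetical service holding an optional provider, like the `llm` field.
struct Service {
    llm: Option<Arc<dyn Summarizer>>,
}

impl Service {
    /// Fails with a clear message when no provider is configured.
    fn summarize_group(&self, contents: &[String]) -> Result<String, String> {
        let llm = self
            .llm
            .as_ref()
            .ok_or_else(|| "no LLM provider configured".to_string())?;
        let prompt = format!("Summarize these related memories:\n{}", contents.join("\n"));
        llm.complete(&prompt)
    }
}

/// Caller-side fallback: join the raw contents when summarization is unavailable.
fn summarize_or_fallback(service: &Service, contents: &[String]) -> String {
    match service.summarize_group(contents) {
        Ok(summary) => summary,
        Err(_) => contents.join("\n---\n"),
    }
}
```

Returning an error instead of silently concatenating keeps the service honest about what it produced, and lets each caller decide whether a degraded summary is acceptable.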
Source

pub fn create_summary_node(&mut self, summary_content: &str, source_memories: &[Memory]) -> Result<Memory>

Creates a summary memory node from a group of related memories.

Creates a new Memory marked as is_summary=true that consolidates multiple related memories. The original memories are preserved and linked via source_memory_ids. Tags are merged from all source memories without duplicates.

§Arguments
  • summary_content - The summary text (typically generated by LLM).
  • source_memories - A slice of memories that were consolidated into this summary.
§Returns

A new Memory with is_summary=true and source memory links.

§Errors

Returns an error if:

  • Source memories slice is empty
  • Storing the summary node fails
§Examples
use subcog::services::ConsolidationService;
use subcog::storage::persistence::FilesystemBackend;

let backend = FilesystemBackend::new("/tmp/memories");
let mut service = ConsolidationService::new(backend);

let memories = vec![/* ... */];
let summary_text = "Consolidated summary of related decisions";
let summary_node = service.create_summary_node(summary_text, &memories)?;
assert!(summary_node.is_summary);
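
How a summary node relates to its sources can be sketched without any storage. SourceMem, SummaryNode, and build_summary_node are hypothetical names with only the fields this page mentions (is_summary, source_memory_ids, tags); the real method also persists the node.

```rust
use std::collections::HashSet;

/// Hypothetical, trimmed-down source memory.
struct SourceMem {
    id: String,
    tags: Vec<String>,
}

/// Hypothetical summary node carrying links back to its sources.
#[derive(Debug)]
struct SummaryNode {
    content: String,
    is_summary: bool,
    source_memory_ids: Vec<String>,
    tags: Vec<String>,
}

/// Builds a summary node in memory; the sources themselves are left untouched.
fn build_summary_node(summary_content: &str, sources: &[SourceMem]) -> Result<SummaryNode, String> {
    if sources.is_empty() {
        return Err("cannot create a summary from an empty group".to_string());
    }

    // Merge tags in first-seen order, skipping duplicates.
    let mut seen: HashSet<&str> = HashSet::new();
    let mut tags = Vec::new();
    for memory in sources {
        for tag in &memory.tags {
            if seen.insert(tag.as_str()) {
                tags.push(tag.clone());
            }
        }
    }

    Ok(SummaryNode {
        content: summary_content.to_string(),
        is_summary: true,
        source_memory_ids: sources.iter().map(|m| m.id.clone()).collect(),
        tags,
    })
}
```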

create_related_edges: Creates RelatedTo edges between all memories in a group.

This method is used when LLM summarization is unavailable but we still want to preserve the relationships between semantically similar memories. Creates a mesh topology where each memory is linked to every other memory in the group with RelatedTo edges.

§Arguments
  • memories - A slice of related memories to link together.
  • index - The index backend to use for storing edges.
§Returns

Returns Ok(()) on success, or an error if edge storage fails.

§Errors

Returns an error if edge storage operations fail.

§Examples
use subcog::services::ConsolidationService;
use subcog::storage::persistence::FilesystemBackend;
use subcog::storage::index::SqliteBackend;
use std::sync::Arc;

let backend = FilesystemBackend::new("/tmp/memories");
// Keep a handle to the index so it can still be used after the service takes a clone
let index = Arc::new(SqliteBackend::new("/tmp/index.db")?);
let mut service = ConsolidationService::new(backend)
    .with_index(Arc::clone(&index));

let memories = vec![/* related memories */];
service.create_related_edges(&memories, &index)?;
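
The mesh topology mentioned above grows quadratically with group size, which the following sketch makes concrete. mesh_edges is a hypothetical helper that emits each unordered pair once; whether the real method stores one edge per pair or one per direction is not stated on this page.

```rust
/// Returns every unordered pair of IDs, i.e. the edges of a full mesh.
/// For n IDs this yields n * (n - 1) / 2 pairs.
fn mesh_edges(ids: &[String]) -> Vec<(String, String)> {
    let mut edges = Vec::new();
    for (i, from) in ids.iter().enumerate() {
        // Only pair with later items so (a, b) and (b, a) are not both emitted.
        for to in &ids[i + 1..] {
            edges.push((from.clone(), to.clone()));
        }
    }
    edges
}
```

A group of 4 memories needs 6 edges and a group of 20 needs 190, so large groups may be worth capping or summarizing instead of meshing.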

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self.
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value.
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

§

impl<T> FutureExt for T

§

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper.
§

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper.
§

impl<T> Instrument for T

§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.
§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.
§

impl<T> IntoRequest<T> for T

§

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
§

impl<L> LayerExt<L> for L

§

fn named_layer<S>(&self, service: S) -> Layered<<L as Layer<S>>::Service, S>
where L: Layer<S>,

Applies the layer to a service and wraps it in Layered.
§

impl<T> Pointable for T

§

const ALIGN: usize

The alignment of pointer.
§

type Init = T

The type for initializers.
§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer.
§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer.
§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer.
§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer.
§

impl<T> PolicyExt for T
where T: ?Sized,

§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow.
§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow.
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

§

fn vzip(self) -> V

§

impl<T> WithSubscriber for T

§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.
§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.