1use crate::Result;
37use crate::models::{Namespace, SearchFilter};
38use crate::storage::traits::IndexBackend;
39use chrono::{TimeZone, Utc};
40use std::collections::HashMap;
41use std::sync::Arc;
42use std::time::{Duration, Instant};
43use tracing::{debug, info, info_span, instrument, warn};
44
/// Name of the environment variable that overrides the default retention period, in days.
pub const RETENTION_DAYS_ENV: &str = "SUBCOG_RETENTION_DAYS";

/// Retention period, in days, used when no override is configured.
pub const DEFAULT_RETENTION_DAYS: u32 = 365;
50
/// Converts a [`Duration`] into whole milliseconds as a `u64`.
///
/// `Duration::as_millis` yields a `u128`; a value too large for `u64`
/// saturates to `u64::MAX` rather than wrapping or panicking.
#[inline]
fn duration_to_millis(duration: Duration) -> u64 {
    match u64::try_from(duration.as_millis()) {
        Ok(millis) => millis,
        Err(_) => u64::MAX,
    }
}
56
/// Converts a `usize` to `f64` without an `as` cast.
///
/// The value goes through `u32`, so anything above `u32::MAX` is reported as
/// `u32::MAX`; smaller values convert exactly.
#[inline]
fn usize_to_f64(value: usize) -> f64 {
    u32::try_from(value).map_or_else(|_| f64::from(u32::MAX), f64::from)
}
63
/// Converts a `u64` to `f64` without an `as` cast.
///
/// The value goes through `u32`, so anything above `u32::MAX` is reported as
/// `u32::MAX`; smaller values convert exactly.
#[inline]
fn u64_to_f64(value: u64) -> f64 {
    match u32::try_from(value) {
        Ok(narrow) => f64::from(narrow),
        Err(_) => f64::from(u32::MAX),
    }
}
70
71#[must_use]
75pub fn retention_days() -> u32 {
76 std::env::var(RETENTION_DAYS_ENV)
77 .ok()
78 .and_then(|v| v.parse().ok())
79 .unwrap_or(DEFAULT_RETENTION_DAYS)
80}
81
/// Retention policy settings: how long memories are kept, with optional
/// per-namespace overrides.
#[derive(Debug, Clone)]
pub struct RetentionConfig {
    /// Retention period in days for namespaces without an entry in `namespace_days`.
    pub default_days: u32,

    /// Per-namespace retention periods in days; an entry here takes precedence
    /// over `default_days`.
    pub namespace_days: HashMap<Namespace, u32>,

    /// Lower bound in days that `effective_days` applies to whichever period is selected.
    pub minimum_days: u32,

    /// Limit passed to the index's `list_all` when the garbage collector lists a namespace.
    pub batch_limit: usize,
}
105
106impl Default for RetentionConfig {
107 fn default() -> Self {
108 Self {
109 default_days: DEFAULT_RETENTION_DAYS,
110 namespace_days: HashMap::new(),
111 minimum_days: 30, batch_limit: 10000, }
114 }
115}
116
117impl RetentionConfig {
118 #[must_use]
120 pub fn new() -> Self {
121 Self::default()
122 }
123
124 #[must_use]
132 pub fn from_env() -> Self {
133 let mut config = Self::default();
134
135 if let Some(d) = std::env::var(RETENTION_DAYS_ENV)
137 .ok()
138 .and_then(|days| days.parse::<u32>().ok())
139 {
140 config.default_days = d;
141 }
142
143 if let Some(d) = std::env::var("SUBCOG_RETENTION_MIN_DAYS")
145 .ok()
146 .and_then(|days| days.parse::<u32>().ok())
147 {
148 config.minimum_days = d;
149 }
150
151 if let Some(l) = std::env::var("SUBCOG_RETENTION_BATCH_LIMIT")
153 .ok()
154 .and_then(|limit| limit.parse::<usize>().ok())
155 {
156 config.batch_limit = l;
157 }
158
159 for ns in Namespace::all().iter().copied() {
161 let env_key = format!(
162 "SUBCOG_RETENTION_{}_DAYS",
163 ns.as_str().to_uppercase().replace('-', "_")
164 );
165 if let Some(d) = std::env::var(&env_key)
166 .ok()
167 .and_then(|days| days.parse::<u32>().ok())
168 {
169 config.namespace_days.insert(ns, d);
170 }
171 }
172
173 config
174 }
175
176 #[must_use]
178 pub const fn with_default_days(mut self, days: u32) -> Self {
179 self.default_days = days;
180 self
181 }
182
183 #[must_use]
185 pub const fn with_minimum_days(mut self, days: u32) -> Self {
186 self.minimum_days = days;
187 self
188 }
189
190 #[must_use]
192 pub const fn with_batch_limit(mut self, limit: usize) -> Self {
193 self.batch_limit = limit;
194 self
195 }
196
197 #[must_use]
199 pub fn with_namespace_days(mut self, namespace: Namespace, days: u32) -> Self {
200 self.namespace_days.insert(namespace, days);
201 self
202 }
203
204 #[must_use]
209 pub fn effective_days(&self, namespace: Namespace) -> u32 {
210 let days = self
211 .namespace_days
212 .get(&namespace)
213 .copied()
214 .unwrap_or(self.default_days);
215
216 days.max(self.minimum_days)
218 }
219
220 #[must_use]
224 pub fn cutoff_timestamp(&self, namespace: Namespace) -> u64 {
225 let days = self.effective_days(namespace);
226 let now = crate::current_timestamp();
227 let seconds_per_day: u64 = 86400;
228 now.saturating_sub(u64::from(days) * seconds_per_day)
229 }
230}
231
/// Outcome of a single retention garbage-collection run.
#[derive(Debug, Clone, Default)]
pub struct RetentionGcResult {
    /// Number of memory ids examined during the run.
    pub memories_checked: usize,

    /// Number of memories tombstoned, or that would have been tombstoned in a dry run.
    pub memories_tombstoned: usize,

    /// Tombstone counts keyed by namespace name.
    pub by_namespace: HashMap<String, usize>,

    /// Whether the run only reported what it would do, without writing anything.
    pub dry_run: bool,

    /// Duration of the run in milliseconds.
    pub duration_ms: u64,
}

impl RetentionGcResult {
    /// Returns `true` when at least one memory was (or would be) tombstoned.
    #[must_use]
    pub const fn has_expired_memories(&self) -> bool {
        self.memories_tombstoned > 0
    }

    /// Returns a one-line, human-readable summary of the run.
    ///
    /// The per-namespace breakdown is listed in ascending order of namespace
    /// name so the same result always produces the same string.
    #[must_use]
    pub fn summary(&self) -> String {
        if self.memories_tombstoned == 0 {
            return format!(
                "No expired memories found ({} memories checked in {}ms)",
                self.memories_checked, self.duration_ms
            );
        }

        let action = if self.dry_run {
            "would tombstone"
        } else {
            "tombstoned"
        };

        // `HashMap` iteration order is unspecified; sort so the output is stable.
        let mut entries: Vec<(&str, usize)> = self
            .by_namespace
            .iter()
            .map(|(ns, count)| (ns.as_str(), *count))
            .collect();
        entries.sort_unstable();

        let ns_breakdown: Vec<String> = entries
            .iter()
            .map(|(ns, count)| format!("{ns}: {count}"))
            .collect();

        format!(
            "{} {} expired memories ({}) - checked {} in {}ms",
            action,
            self.memories_tombstoned,
            ns_breakdown.join(", "),
            self.memories_checked,
            self.duration_ms
        )
    }
}
290
/// Garbage collector that tombstones memories older than the configured retention period.
pub struct RetentionGarbageCollector<I: IndexBackend> {
    /// Shared handle to the index backend used to list, fetch, and re-index memories.
    index: Arc<I>,

    /// Retention policy that determines each namespace's cutoff.
    config: RetentionConfig,
}
307
308impl<I: IndexBackend> RetentionGarbageCollector<I> {
309 #[must_use]
316 pub fn new(index: Arc<I>, config: RetentionConfig) -> Self {
317 let _ = Arc::strong_count(&index);
319 Self { index, config }
320 }
321
322 #[instrument(
342 name = "subcog.gc.retention",
343 skip(self),
344 fields(
345 request_id = tracing::field::Empty,
346 component = "gc",
347 operation = "retention",
348 dry_run = dry_run,
349 default_retention_days = self.config.default_days
350 )
351 )]
352 pub fn gc_expired_memories(&self, dry_run: bool) -> Result<RetentionGcResult> {
353 let start = Instant::now();
354 if let Some(request_id) = crate::observability::current_request_id() {
355 tracing::Span::current().record("request_id", request_id.as_str());
356 }
357 let mut result = RetentionGcResult {
358 dry_run,
359 ..Default::default()
360 };
361
362 let now = crate::current_timestamp();
363
364 for namespace in Namespace::user_namespaces().iter().copied() {
366 let cutoff = self.config.cutoff_timestamp(namespace);
367 let retention_days = self.config.effective_days(namespace);
368 let _span = info_span!(
369 "subcog.gc.retention.namespace",
370 namespace = %namespace.as_str(),
371 retention_days = retention_days
372 )
373 .entered();
374
375 debug!(
376 namespace = namespace.as_str(),
377 retention_days, cutoff, "Processing namespace for retention GC"
378 );
379
380 let count = self.process_namespace(namespace, cutoff, now, dry_run, &mut result)?;
381
382 if count > 0 {
383 result
384 .by_namespace
385 .insert(namespace.as_str().to_string(), count);
386 }
387 }
388
389 result.duration_ms = duration_to_millis(start.elapsed());
390
391 metrics::counter!(
393 "gc_retention_runs_total",
394 "dry_run" => dry_run.to_string()
395 )
396 .increment(1);
397 metrics::gauge!("gc_retention_tombstoned").set(usize_to_f64(result.memories_tombstoned));
398 metrics::histogram!("gc_retention_duration_ms").record(u64_to_f64(result.duration_ms));
399 metrics::histogram!(
400 "memory_lifecycle_duration_ms",
401 "component" => "gc",
402 "operation" => "retention"
403 )
404 .record(u64_to_f64(result.duration_ms));
405
406 info!(
407 memories_checked = result.memories_checked,
408 memories_tombstoned = result.memories_tombstoned,
409 duration_ms = result.duration_ms,
410 dry_run,
411 "Retention GC completed"
412 );
413
414 Ok(result)
415 }
416
417 fn process_namespace(
419 &self,
420 namespace: Namespace,
421 cutoff: u64,
422 now: u64,
423 dry_run: bool,
424 result: &mut RetentionGcResult,
425 ) -> Result<usize> {
426 let filter = SearchFilter::new()
427 .with_namespace(namespace)
428 .with_include_tombstoned(false);
429
430 let memories = self.index.list_all(&filter, self.config.batch_limit)?;
431 let mut tombstoned = 0;
432
433 for (id, _score) in memories {
434 result.memories_checked += 1;
435
436 let Some(memory) = self.index.get_memory(&id)? else {
438 continue;
439 };
440
441 if memory.created_at >= cutoff {
443 continue;
444 }
445
446 if dry_run {
448 tombstoned += 1;
449 continue;
450 }
451
452 let mut updated = memory.clone();
454 let now_i64 = i64::try_from(now).unwrap_or(i64::MAX);
455 let now_dt = Utc
456 .timestamp_opt(now_i64, 0)
457 .single()
458 .unwrap_or_else(Utc::now);
459 updated.tombstoned_at = Some(now_dt);
460
461 let Err(e) = self.index.index(&updated) else {
462 tombstoned += 1;
463 continue;
464 };
465
466 warn!(
467 memory_id = %id.as_str(),
468 error = %e,
469 "Failed to tombstone expired memory"
470 );
471 }
472
473 result.memories_tombstoned += tombstoned;
474 Ok(tombstoned)
475 }
476
477 #[must_use]
479 pub const fn config(&self) -> &RetentionConfig {
480 &self.config
481 }
482}
483
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{Domain, Memory, MemoryId, MemoryStatus};
    use crate::storage::index::SqliteBackend;

    /// Seconds in one day, used to build timestamps relative to "now".
    const DAY_SECS: u64 = 86400;

    /// Builds an active, untombstoned memory in `namespace` created at `created_at`.
    fn create_test_memory(id: &str, namespace: Namespace, created_at: u64) -> Memory {
        Memory {
            id: MemoryId::new(id),
            content: format!("Test memory {id}"),
            namespace,
            domain: Domain::new(),
            project_id: None,
            branch: None,
            file_path: None,
            status: MemoryStatus::Active,
            created_at,
            updated_at: created_at,
            tombstoned_at: None,
            expires_at: None,
            embedding: None,
            tags: vec!["test".to_string()],
            #[cfg(feature = "group-scope")]
            group_id: None,
            source: None,
            is_summary: false,
            source_memory_ids: None,
            consolidation_timestamp: None,
        }
    }

    /// Creates a fresh in-memory SQLite index backend.
    fn new_backend() -> Arc<SqliteBackend> {
        Arc::new(SqliteBackend::in_memory().expect("Failed to create backend"))
    }

    /// Indexes a test memory with the given id, namespace, and creation time.
    fn seed(backend: &SqliteBackend, id: &str, namespace: Namespace, created_at: u64) {
        let memory = create_test_memory(id, namespace, created_at);
        backend.index(&memory).expect("Failed to index memory");
    }

    /// Fetches a memory that is expected to exist.
    fn fetch(backend: &SqliteBackend, id: &str) -> Memory {
        backend
            .get_memory(&MemoryId::new(id))
            .expect("Failed to get memory")
            .expect("Memory should exist")
    }

    #[test]
    fn test_retention_config_default() {
        let defaults = RetentionConfig::default();

        assert_eq!(defaults.default_days, 365);
        assert_eq!(defaults.minimum_days, 30);
        assert_eq!(defaults.batch_limit, 10000);
    }

    #[test]
    fn test_retention_config_builders() {
        let built = RetentionConfig::new()
            .with_default_days(180)
            .with_minimum_days(7)
            .with_batch_limit(5000)
            .with_namespace_days(Namespace::Decisions, 730);

        assert_eq!(built.default_days, 180);
        assert_eq!(built.minimum_days, 7);
        assert_eq!(built.batch_limit, 5000);
        assert_eq!(built.namespace_days.get(&Namespace::Decisions), Some(&730));
    }

    #[test]
    fn test_effective_days_with_override() {
        let policy = RetentionConfig::new()
            .with_default_days(365)
            .with_namespace_days(Namespace::Decisions, 730);

        // The namespace with an override uses it.
        assert_eq!(policy.effective_days(Namespace::Decisions), 730);

        // Any other namespace falls back to the default.
        assert_eq!(policy.effective_days(Namespace::Learnings), 365);
    }

    #[test]
    fn test_effective_days_minimum_enforced() {
        // The default is below the minimum, so the minimum wins.
        let policy = RetentionConfig::new()
            .with_default_days(10)
            .with_minimum_days(30);

        assert_eq!(policy.effective_days(Namespace::Patterns), 30);
    }

    #[test]
    fn test_cutoff_timestamp() {
        let policy = RetentionConfig::new().with_default_days(30);

        let cutoff = policy.cutoff_timestamp(Namespace::Decisions);
        let now = crate::current_timestamp();
        let expected = now - (30 * DAY_SECS);

        // Allow one second of drift between the two clock reads.
        assert!(cutoff.abs_diff(expected) <= 1);
    }

    #[test]
    fn test_retention_gc_result_summary_no_expired() {
        let outcome = RetentionGcResult {
            memories_checked: 100,
            memories_tombstoned: 0,
            by_namespace: HashMap::new(),
            dry_run: false,
            duration_ms: 50,
        };

        assert!(!outcome.has_expired_memories());
        assert!(outcome.summary().contains("No expired memories"));
        assert!(outcome.summary().contains("100 memories checked"));
    }

    #[test]
    fn test_retention_gc_result_summary_with_expired() {
        let mut by_namespace = HashMap::new();
        by_namespace.insert("decisions".to_string(), 5);
        by_namespace.insert("learnings".to_string(), 3);

        let outcome = RetentionGcResult {
            memories_checked: 100,
            memories_tombstoned: 8,
            by_namespace,
            dry_run: false,
            duration_ms: 75,
        };

        assert!(outcome.has_expired_memories());
        let text = outcome.summary();
        assert!(text.contains("tombstoned 8 expired memories"));
    }

    #[test]
    fn test_retention_gc_result_summary_dry_run() {
        let mut by_namespace = HashMap::new();
        by_namespace.insert("decisions".to_string(), 5);

        let outcome = RetentionGcResult {
            memories_checked: 50,
            memories_tombstoned: 5,
            by_namespace,
            dry_run: true,
            duration_ms: 25,
        };

        let text = outcome.summary();
        assert!(text.contains("would tombstone"));
    }

    #[test]
    fn test_gc_no_expired_memories() {
        let backend = new_backend();

        // A memory created right now is well inside a 30-day window.
        let now = crate::current_timestamp();
        seed(&backend, "mem1", Namespace::Decisions, now);

        let policy = RetentionConfig::new().with_default_days(30);
        let gc = RetentionGarbageCollector::new(Arc::clone(&backend), policy);

        let outcome = gc.gc_expired_memories(false).expect("GC should succeed");

        assert!(!outcome.has_expired_memories());
        assert_eq!(outcome.memories_checked, 1);
        assert_eq!(outcome.memories_tombstoned, 0);
    }

    #[test]
    fn test_gc_expired_memory_dry_run() {
        let backend = new_backend();

        // 400 days old against a 365-day retention period.
        let now = crate::current_timestamp();
        let old_timestamp = now - (400 * DAY_SECS);
        seed(&backend, "mem1", Namespace::Decisions, old_timestamp);

        let policy = RetentionConfig::new().with_default_days(365);
        let gc = RetentionGarbageCollector::new(Arc::clone(&backend), policy);

        let outcome = gc.gc_expired_memories(true).expect("GC should succeed");

        assert!(outcome.has_expired_memories());
        assert_eq!(outcome.memories_tombstoned, 1);
        assert!(outcome.dry_run);

        // A dry run must leave the stored memory untouched.
        let stored = fetch(&backend, "mem1");
        assert!(stored.tombstoned_at.is_none());
    }

    #[test]
    fn test_gc_expired_memory_actual() {
        let backend = new_backend();

        // 400 days old against a 365-day retention period.
        let now = crate::current_timestamp();
        let old_timestamp = now - (400 * DAY_SECS);
        seed(&backend, "mem1", Namespace::Decisions, old_timestamp);

        let policy = RetentionConfig::new().with_default_days(365);
        let gc = RetentionGarbageCollector::new(Arc::clone(&backend), policy);

        let outcome = gc.gc_expired_memories(false).expect("GC should succeed");

        assert!(outcome.has_expired_memories());
        assert_eq!(outcome.memories_tombstoned, 1);
        assert!(!outcome.dry_run);

        // A real run writes the tombstone.
        let stored = fetch(&backend, "mem1");
        assert!(stored.tombstoned_at.is_some());
    }

    #[test]
    fn test_gc_per_namespace_retention() {
        let backend = new_backend();

        let now = crate::current_timestamp();
        let hundred_days_old = now - (100 * DAY_SECS);

        // Same age in both namespaces; only the retention period differs.
        seed(&backend, "decisions1", Namespace::Decisions, hundred_days_old);
        seed(&backend, "learnings1", Namespace::Learnings, hundred_days_old);

        // Decisions are kept for 730 days, everything else for 90.
        let policy = RetentionConfig::new()
            .with_default_days(90)
            .with_namespace_days(Namespace::Decisions, 730);

        let gc = RetentionGarbageCollector::new(Arc::clone(&backend), policy);

        let outcome = gc.gc_expired_memories(false).expect("GC should succeed");

        assert_eq!(outcome.memories_tombstoned, 1);

        let decisions = fetch(&backend, "decisions1");
        assert!(decisions.tombstoned_at.is_none());

        let learnings = fetch(&backend, "learnings1");
        assert!(learnings.tombstoned_at.is_some());
    }

    #[test]
    fn test_retention_days_from_env() {
        // Expects the environment variable to be unset in the test environment.
        let observed = retention_days();
        assert_eq!(observed, DEFAULT_RETENTION_DAYS);
    }
}