Class: ClaudeMemory::Infrastructure::OperationTracker

Inherits:
Object
  • Object
show all
Defined in:
lib/claude_memory/infrastructure/operation_tracker.rb

Overview

Tracks long-running operations with checkpoints for resumability. Enables detection of stuck operations and provides recovery mechanisms.

Constant Summary collapse

STALE_THRESHOLD_SECONDS = 86400  (24 hours)

Instance Method Summary collapse

Constructor Details

#initialize(store) ⇒ OperationTracker

Returns a new instance of OperationTracker.



10
11
12
# File 'lib/claude_memory/infrastructure/operation_tracker.rb', line 10

# Builds a tracker that reads and writes operation progress through +store+.
#
# @param store object whose #db is indexed with [:operation_progress] to reach
#   the progress table (presumably a Sequel database, given the Sequel.desc
#   usage in this class -- confirm)
def initialize(store)
  @store = store
end

Instance Method Details

#complete_operation(operation_id) ⇒ Object

Mark operation as completed



42
43
44
45
46
47
48
# File 'lib/claude_memory/infrastructure/operation_tracker.rb', line 42

# Marks an operation as successfully finished.
#
# Sets status to "completed" and stamps completed_at with the current UTC
# time as an ISO8601 string.
#
# @param operation_id id of the operation_progress row to close
# @return whatever the dataset's #update returns
def complete_operation(operation_id)
  finished_at = Time.now.utc.iso8601
  target = @store.db[:operation_progress].where(id: operation_id)
  target.update(status: "completed", completed_at: finished_at)
end

#fail_operation(operation_id, error_message) ⇒ Object

Mark operation as failed with error message



51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/claude_memory/infrastructure/operation_tracker.rb', line 51

# Marks an operation as failed and records why.
#
# The stored checkpoint JSON is read back and the message is written into it
# under the "error" key. The merged checkpoint is then saved together with
# status "failed" and a completed_at timestamp, so resume state already
# saved is kept next to the failure reason.
#
# The checkpoint is parsed with symbol keys (matching get_checkpoint), so
# assigning :error replaces an existing "error" entry. With string keys the
# hash would hold both "error" and :error and serialize a duplicate JSON key.
#
# A missing, non-object (e.g. "null", arrays) or unparseable checkpoint is
# replaced by a fresh hash. Otherwise the :error assignment or the parse
# would raise before the update runs, and the row would stay "running".
#
# @param operation_id id of the operation_progress row to fail
# @param error_message description of the failure, stored under "error"
# @return whatever the dataset's #update returns
def fail_operation(operation_id, error_message)
  now = Time.now.utc.iso8601
  operation = @store.db[:operation_progress].where(id: operation_id)

  stored = operation.get(:checkpoint_data)
  checkpoint =
    begin
      stored ? JSON.parse(stored, symbolize_names: true) : {}
    rescue JSON::ParserError
      {}
    end
  checkpoint = {} unless checkpoint.is_a?(Hash)
  checkpoint[:error] = error_message

  operation.update(
    status: "failed",
    completed_at: now,
    checkpoint_data: checkpoint.to_json
  )
end

#get_checkpoint(operation_type:, scope:) ⇒ Object

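Get checkpoint data for resuming an operation. Returns a hash with operation_id, checkpoint_data, processed_items, total_items and started_at, or nil if there is no matching running operation. Only returns non-stale operations (< 24h old).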



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/claude_memory/infrastructure/operation_tracker.rb', line 67

# Finds the newest resumable run of an operation and returns its checkpoint.
#
# Only rows with status "running" whose started_at is within the last
# STALE_THRESHOLD_SECONDS are considered. Anything older is treated as
# stale and ignored here.
#
# started_at is written as a second-resolution ISO8601 string, so two runs
# started in the same second tie on it. id is used as a tie-breaker so the
# newest row is picked deterministically. This assumes ids increase with
# insertion order -- TODO confirm against the operation_progress schema.
#
# @param operation_type value matched against the operation_type column
# @param scope value matched against the scope column
# @return [Hash, nil] {operation_id:, checkpoint_data:, processed_items:,
#   total_items:, started_at:}, or nil when no non-stale running row exists.
#   checkpoint_data is a symbol-keyed Hash when the stored JSON is an object,
#   and {} when it is absent or JSON null.
def get_checkpoint(operation_type:, scope:)
  threshold_time = (Time.now.utc - STALE_THRESHOLD_SECONDS).iso8601

  op = @store.db[:operation_progress]
    .where(operation_type: operation_type, scope: scope, status: "running")
    .where { started_at >= threshold_time } # Exclude stale operations
    .order(Sequel.desc(:started_at), Sequel.desc(:id))
    .first

  return nil unless op

  raw_checkpoint = op[:checkpoint_data]
  # A nil checkpoint is persisted as the JSON text "null" (nil.to_json), which
  # parses back to nil. Normalize it to {} like a missing checkpoint.
  checkpoint_data = (raw_checkpoint && JSON.parse(raw_checkpoint, symbolize_names: true)) || {}
  {
    operation_id: op[:id],
    checkpoint_data: checkpoint_data,
    processed_items: op[:processed_items] || 0,
    total_items: op[:total_items],
    started_at: op[:started_at]
  }
end

#reset_stuck_operations(operation_type: nil, scope: nil) ⇒ Object

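Reset stuck operations (running for > 24h) to failed status, optionally filtered by operation type and scope. Returns the number of operations reset.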



99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/claude_memory/infrastructure/operation_tracker.rb', line 99

# Fails every operation that has stayed "running" longer than
# STALE_THRESHOLD_SECONDS. The set can be narrowed to one operation_type
# and/or scope.
#
# @param operation_type only reset operations of this type (nil = any type)
# @param scope only reset operations in this scope (nil = any scope)
# @return [Integer] how many operations were handed to fail_operations (0 if none)
def reset_stuck_operations(operation_type: nil, scope: nil)
  criteria = {status: "running"}
  criteria[:operation_type] = operation_type if operation_type
  criteria[:scope] = scope if scope

  cutoff = (Time.now.utc - STALE_THRESHOLD_SECONDS).iso8601
  stuck = @store.db[:operation_progress].where(criteria).where { started_at < cutoff }

  stuck_count = stuck.count
  unless stuck_count.zero?
    fail_operations(stuck, "Reset by recover command - operation exceeded 24h timeout")
  end
  stuck_count
end

#start_operation(operation_type:, scope:, total_items: nil, checkpoint_data: {}) ⇒ Object

Start tracking a new operation. Returns the operation_id.



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/claude_memory/infrastructure/operation_tracker.rb', line 16

# Begins tracking a new operation and returns whatever the dataset's #insert
# returns (the new row's operation_id).
#
# Stale operations are first passed to cleanup_stale_operations! with this
# operation_type and scope. The new row is then inserted as "running", with
# processed_items 0, no completed_at, and the given checkpoint serialized
# to JSON.
#
# @param operation_type value stored in the operation_type column
# @param scope value stored in the scope column
# @param total_items expected number of items, if known (nil otherwise)
# @param checkpoint_data initial resume state; serialized with #to_json
# @return the result of inserting into operation_progress
def start_operation(operation_type:, scope:, total_items: nil, checkpoint_data: {})
  started_at = Time.now.utc.iso8601

  # Mark any stale operations as failed before starting new one
  cleanup_stale_operations!(operation_type, scope)

  row = {
    operation_type: operation_type,
    scope: scope,
    status: "running",
    total_items: total_items,
    processed_items: 0,
    checkpoint_data: checkpoint_data.to_json,
    started_at: started_at,
    completed_at: nil
  }
  @store.db[:operation_progress].insert(row)
end

#stuck_operations ⇒ Object

Get all stuck operations (running for > 24h)



89
90
91
92
93
94
95
96
# File 'lib/claude_memory/infrastructure/operation_tracker.rb', line 89

# Lists operations that appear stuck.
#
# An operation counts as stuck when its status is still "running" and its
# started_at is older than STALE_THRESHOLD_SECONDS. The cutoff is compared
# as an ISO8601 string. That orders correctly provided started_at was written
# in the same Time#utc.iso8601 format (as start_operation does).
#
# @return the matching operation_progress rows, as returned by the dataset's #all
def stuck_operations
  cutoff = Time.now.utc - STALE_THRESHOLD_SECONDS
  cutoff_iso = cutoff.iso8601
  running = @store.db[:operation_progress].where(status: "running")
  running.where { started_at < cutoff_iso }.all
end

#update_progress(operation_id, processed_items:, checkpoint_data: nil) ⇒ Object

Update progress with new checkpoint data



35
36
37
38
39
# File 'lib/claude_memory/infrastructure/operation_tracker.rb', line 35

# Records how far an operation has progressed.
#
# processed_items is always written. checkpoint_data is serialized to JSON
# and written only when a truthy value is given, so omitting it leaves the
# stored checkpoint untouched.
#
# @param operation_id id of the operation_progress row to update
# @param processed_items new processed-item count
# @param checkpoint_data resume state to persist, or nil to keep the current one
# @return whatever the dataset's #update returns
def update_progress(operation_id, processed_items:, checkpoint_data: nil)
  serialized_checkpoint = checkpoint_data && checkpoint_data.to_json
  columns = {processed_items: processed_items}
  columns[:checkpoint_data] = serialized_checkpoint if serialized_checkpoint
  target = @store.db[:operation_progress].where(id: operation_id)
  target.update(columns)
end