Class: ClaudeMemory::Infrastructure::OperationTracker
- Inherits: Object
- Defined in: lib/claude_memory/infrastructure/operation_tracker.rb
Overview
Tracks long-running operations with checkpoints so they can be resumed. It also detects stuck operations and provides mechanisms to recover from them.
Constant Summary
- STALE_THRESHOLD_SECONDS = 86400 (24 hours)
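The threshold is applied by comparing `started_at` against a cutoff string, and the boundary falls on different sides depending on the method: `get_checkpoint` keeps rows with `started_at >= cutoff`, while `stuck_operations` and `reset_stuck_operations` select `started_at < cutoff`. The sketch below assumes, as the listings suggest, that timestamps are stored as `Time#iso8601` text in UTC. Because that format is fixed-width, plain string comparison matches chronological order. The helper names `threshold_time`, `resumable?`, and `stuck?` are hypothetical, and the dates are made up.

```ruby
require "time"

# Mirrors the documented constant: 24 hours in seconds.
STALE_THRESHOLD_SECONDS = 24 * 60 * 60

# Cutoff in the same ISO 8601 text form the tracker writes.
# getutc returns a UTC copy without mutating the caller's Time.
def threshold_time(now)
  (now.getutc - STALE_THRESHOLD_SECONDS).iso8601
end

# get_checkpoint keeps rows started at or after the cutoff...
def resumable?(started_at, now)
  started_at >= threshold_time(now)
end

# ...while stuck_operations / reset_stuck_operations select rows before it.
def stuck?(started_at, now)
  started_at < threshold_time(now)
end

now = Time.utc(2024, 5, 2, 12, 0, 0)
puts threshold_time(now)                     # => 2024-05-01T12:00:00Z
puts stuck?("2024-05-01T11:59:59Z", now)     # => true
puts resumable?("2024-05-01T12:00:00Z", now) # => true
```

An operation that started exactly 24 hours ago is therefore still resumable and not yet stuck.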
Instance Method Summary
- #complete_operation(operation_id) ⇒ Object
  Mark operation as completed.
- #fail_operation(operation_id, error_message) ⇒ Object
  Mark operation as failed and record the error message in its checkpoint data.
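- #get_checkpoint(operation_type:, scope:) ⇒ Object
  Get checkpoint data for resuming an operation. Returns a hash with :operation_id, :checkpoint_data, :processed_items, :total_items, and :started_at, or nil. Only considers running operations that started within the last 24 hours (non-stale).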
- #initialize(store) ⇒ OperationTracker (constructor)
  A new instance of OperationTracker.
- #reset_stuck_operations(operation_type: nil, scope: nil) ⇒ Object
  Reset stuck operations to failed status. Returns the number of operations reset.
- #start_operation(operation_type:, scope:, total_items: nil, checkpoint_data: {}) ⇒ Object
  Start tracking a new operation. Returns the operation_id.
- #stuck_operations ⇒ Object
  Get all stuck operations (running for > 24h).
- #update_progress(operation_id, processed_items:, checkpoint_data: nil) ⇒ Object
  Update progress with new checkpoint data.
Constructor Details
#initialize(store) ⇒ OperationTracker
Returns a new instance of OperationTracker.
```ruby
# File 'lib/claude_memory/infrastructure/operation_tracker.rb', line 10
def initialize(store)
  @store = store
end
```
Instance Method Details
#complete_operation(operation_id) ⇒ Object
Mark operation as completed
```ruby
# File 'lib/claude_memory/infrastructure/operation_tracker.rb', line 42
def complete_operation(operation_id)
  now = Time.now.utc.iso8601
  @store.db[:operation_progress].where(id: operation_id).update(
    status: "completed",
    completed_at: now
  )
end
```
#fail_operation(operation_id, error_message) ⇒ Object
Mark operation as failed with error message
```ruby
# File 'lib/claude_memory/infrastructure/operation_tracker.rb', line 51
def fail_operation(operation_id, error_message)
  now = Time.now.utc.iso8601
  checkpoint_data = @store.db[:operation_progress].where(id: operation_id).get(:checkpoint_data)
  # Keep any existing checkpoint and attach the error message to it
  checkpoint = checkpoint_data ? JSON.parse(checkpoint_data) : {}
  checkpoint[:error] = error_message
  @store.db[:operation_progress].where(id: operation_id).update(
    status: "failed",
    completed_at: now,
    checkpoint_data: checkpoint.to_json
  )
end
```
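The merge step in `fail_operation` can be tried in isolation. This sketch copies only the JSON handling into a hypothetical `merge_error` helper (no database) with a made-up checkpoint. Note that `JSON.parse` without `symbolize_names` yields String keys while the error is stored under the Symbol `:error`; `to_json` writes both as plain JSON keys.

```ruby
require "json"

# Parse the stored checkpoint (or start empty), attach the error, re-serialize.
def merge_error(checkpoint_json, error_message)
  checkpoint = checkpoint_json ? JSON.parse(checkpoint_json) : {}
  checkpoint[:error] = error_message
  checkpoint.to_json
end

puts merge_error('{"last_file":"a.jsonl"}', "disk full")
# => {"last_file":"a.jsonl","error":"disk full"}
puts merge_error(nil, "boom")
# => {"error":"boom"}
```

Because the existing checkpoint is kept, a failed operation still records how far it got alongside the reason it stopped.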
#get_checkpoint(operation_type:, scope:) ⇒ Object
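Get checkpoint data for resuming an operation. Returns a hash with :operation_id, :checkpoint_data, :processed_items, :total_items, and :started_at, or nil. Only considers running operations that started within the last 24 hours (non-stale).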
```ruby
# File 'lib/claude_memory/infrastructure/operation_tracker.rb', line 67
def get_checkpoint(operation_type:, scope:)
  threshold_time = (Time.now.utc - STALE_THRESHOLD_SECONDS).iso8601

  op = @store.db[:operation_progress]
    .where(operation_type: operation_type, scope: scope, status: "running")
    .where { started_at >= threshold_time } # Exclude stale operations
    .order(Sequel.desc(:started_at))
    .first

  return nil unless op

  checkpoint_data = op[:checkpoint_data] ? JSON.parse(op[:checkpoint_data], symbolize_names: true) : {}

  {
    operation_id: op[:id],
    checkpoint_data: checkpoint_data,
    processed_items: op[:processed_items] || 0,
    total_items: op[:total_items],
    started_at: op[:started_at]
  }
end
```
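A caller can use the returned hash to skip work that was already done. This is a minimal sketch, assuming items are always processed in the same order so that `:processed_items` doubles as a position. The helper `remaining_items`, the file names, and the checkpoint contents are hypothetical; only the hash shape follows the listing.

```ruby
# nil means there is no resumable (running, non-stale) operation,
# so everything is processed from the start.
def remaining_items(items, checkpoint)
  return items if checkpoint.nil?

  items.drop(checkpoint[:processed_items])
end

items = %w[a.jsonl b.jsonl c.jsonl d.jsonl]
checkpoint = {
  operation_id: 7,
  checkpoint_data: {last_file: "b.jsonl"},
  processed_items: 2,
  total_items: 4,
  started_at: "2024-05-02T09:30:00Z"
}

puts remaining_items(items, checkpoint).join(", ") # => c.jsonl, d.jsonl
puts remaining_items(items, nil).size              # => 4
```

If the item order can change between runs, a value stored in `:checkpoint_data` (such as the last processed file) is a safer resume marker than a count.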
#reset_stuck_operations(operation_type: nil, scope: nil) ⇒ Object
Reset stuck operations to failed status
```ruby
# File 'lib/claude_memory/infrastructure/operation_tracker.rb', line 99
def reset_stuck_operations(operation_type: nil, scope: nil)
  dataset = @store.db[:operation_progress].where(status: "running")
  dataset = dataset.where(operation_type: operation_type) if operation_type
  dataset = dataset.where(scope: scope) if scope

  threshold_time = (Time.now.utc - STALE_THRESHOLD_SECONDS).iso8601
  stuck = dataset.where { started_at < threshold_time }

  count = stuck.count
  return 0 if count.zero?

  # fail_operations is not part of the public API documented on this page
  fail_operations(stuck, "Reset by recover command - operation exceeded 24h timeout")
  count
end
```
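The selection logic narrows by status first and then applies the optional filters only when they are given. The sketch below reproduces that over an in-memory array of row hashes instead of a Sequel dataset. The column names follow the listing; the helper `select_stuck`, the `"ingest"` operation type, and the rows are made up, and the actual reset step (the `fail_operations` call) is left out.

```ruby
require "time"

STALE_SECONDS = 86_400 # 24 hours, as in STALE_THRESHOLD_SECONDS

# Same predicates as the dataset chain: running, optional type/scope, older than cutoff.
def select_stuck(rows, now:, operation_type: nil, scope: nil)
  threshold_time = (now.getutc - STALE_SECONDS).iso8601
  rows.select do |row|
    row[:status] == "running" &&
      (operation_type.nil? || row[:operation_type] == operation_type) &&
      (scope.nil? || row[:scope] == scope) &&
      row[:started_at] < threshold_time
  end
end

now = Time.utc(2024, 5, 2, 12, 0, 0)
rows = [
  {id: 1, operation_type: "ingest", scope: "project", status: "running", started_at: "2024-04-30T08:00:00Z"},
  {id: 2, operation_type: "ingest", scope: "global", status: "running", started_at: "2024-04-30T08:00:00Z"},
  {id: 3, operation_type: "ingest", scope: "project", status: "completed", started_at: "2024-04-29T08:00:00Z"},
  {id: 4, operation_type: "ingest", scope: "project", status: "running", started_at: "2024-05-02T10:00:00Z"}
]

puts select_stuck(rows, now: now).map { |r| r[:id] }.inspect                   # => [1, 2]
puts select_stuck(rows, now: now, scope: "project").map { |r| r[:id] }.inspect # => [1]
```

Passing neither filter resets every stuck operation across all types and scopes, which is the broadest recovery.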
#start_operation(operation_type:, scope:, total_items: nil, checkpoint_data: {}) ⇒ Object
Start tracking a new operation. Any stale operations with the same operation_type and scope are marked as failed first. Returns the operation_id.
```ruby
# File 'lib/claude_memory/infrastructure/operation_tracker.rb', line 16
def start_operation(operation_type:, scope:, total_items: nil, checkpoint_data: {})
  now = Time.now.utc.iso8601

  # Mark any stale operations as failed before starting new one
  cleanup_stale_operations!(operation_type, scope)

  @store.db[:operation_progress].insert(
    operation_type: operation_type,
    scope: scope,
    status: "running",
    total_items: total_items,
    processed_items: 0,
    checkpoint_data: checkpoint_data.to_json,
    started_at: now,
    completed_at: nil
  )
end
```
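One plausible calling pattern: start an operation, record progress after each item, then complete it, or mark it failed and re-raise if the work raises. To stay self-contained, the sketch uses a hypothetical in-memory `FakeTracker` that exposes the same public method names as OperationTracker. It is not the real class: it skips the database, stale cleanup, and timestamps. The helper `run_tracked` and the `"ingest"`/`"project"` values are also made up.

```ruby
require "json"

# Hypothetical stand-in with OperationTracker's public method names.
class FakeTracker
  attr_reader :rows

  def initialize
    @rows = {}
    @next_id = 0
  end

  def start_operation(operation_type:, scope:, total_items: nil, checkpoint_data: {})
    @next_id += 1
    @rows[@next_id] = {operation_type: operation_type, scope: scope, status: "running",
                       total_items: total_items, processed_items: 0,
                       checkpoint_data: checkpoint_data.to_json}
    @next_id # return the new operation id, like start_operation
  end

  def update_progress(operation_id, processed_items:, checkpoint_data: nil)
    row = @rows.fetch(operation_id)
    row[:processed_items] = processed_items
    row[:checkpoint_data] = checkpoint_data.to_json if checkpoint_data
  end

  def complete_operation(operation_id)
    @rows.fetch(operation_id)[:status] = "completed"
  end

  def fail_operation(operation_id, error_message)
    row = @rows.fetch(operation_id)
    checkpoint = JSON.parse(row[:checkpoint_data])
    checkpoint["error"] = error_message
    row[:status] = "failed"
    row[:checkpoint_data] = checkpoint.to_json
  end
end

# Start, checkpoint after every item, then complete; on error, fail and re-raise.
def run_tracked(tracker, items, operation_type:, scope:)
  op_id = tracker.start_operation(operation_type: operation_type, scope: scope,
                                  total_items: items.size)
  items.each_with_index do |item, index|
    yield item
    tracker.update_progress(op_id, processed_items: index + 1,
                            checkpoint_data: {last_item: item})
  end
  tracker.complete_operation(op_id)
  op_id
rescue => e
  tracker.fail_operation(op_id, e.message) if op_id
  raise
end

tracker = FakeTracker.new
id = run_tracked(tracker, %w[a b c], operation_type: "ingest", scope: "project") { |item| item.upcase }
puts tracker.rows[id][:status] # => completed
```

Checkpointing after each item keeps the resume point precise; for large batches, updating every N items trades some repeated work after a crash for fewer writes.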
#stuck_operations ⇒ Object
Get all stuck operations (running for > 24h)
```ruby
# File 'lib/claude_memory/infrastructure/operation_tracker.rb', line 89
def stuck_operations
  threshold_time = (Time.now.utc - STALE_THRESHOLD_SECONDS).iso8601

  @store.db[:operation_progress]
    .where(status: "running")
    .where { started_at < threshold_time }
    .all
end
```
#update_progress(operation_id, processed_items:, checkpoint_data: nil) ⇒ Object
Update progress with new checkpoint data
```ruby
# File 'lib/claude_memory/infrastructure/operation_tracker.rb', line 35
def update_progress(operation_id, processed_items:, checkpoint_data: nil)
  updates = {processed_items: processed_items}
  updates[:checkpoint_data] = checkpoint_data.to_json if checkpoint_data
  @store.db[:operation_progress].where(id: operation_id).update(updates)
end
```
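Checkpoint data makes a JSON round trip: `update_progress` stores it with `to_json`, and `get_checkpoint` reads it back with `JSON.parse(..., symbolize_names: true)`. The sketch below shows what survives that trip, plus a hypothetical `progress_updates` helper that copies the listing's update-hash construction. The checkpoint contents are made up.

```ruby
require "json"

# Same construction as update_progress: checkpoint_data is only written when given.
def progress_updates(processed_items, checkpoint_data = nil)
  updates = {processed_items: processed_items}
  updates[:checkpoint_data] = checkpoint_data.to_json if checkpoint_data
  updates
end

checkpoint = {last_file: "b.jsonl", offset: 1024, mode: :incremental}
stored = checkpoint.to_json
restored = JSON.parse(stored, symbolize_names: true)

puts stored                   # => {"last_file":"b.jsonl","offset":1024,"mode":"incremental"}
puts restored[:mode].class    # => String (Symbol values come back as Strings)
puts progress_updates(5).keys.inspect # => [:processed_items]
```

Keys come back as Symbols, but Symbol values (and objects such as Time) come back as Strings. Only `nil` leaves the stored checkpoint untouched; an empty hash is truthy in Ruby and would overwrite it with `{}`.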