Class: OllamaAgent::Runtime::SagaCoordinator

Inherits:
Object
  • Object
show all
Defined in:
lib/ollama_agent/runtime/saga_coordinator.rb

Overview

Durable saga FSM with intent reservation, transition log, and terminal sealing.

Callers that hold multiple sagas or locks should follow a consistent global lock/saga order; this class does not reorder resources for you. rubocop:disable Metrics/ClassLength – FSM + persistence in one unit

Defined Under Namespace

Classes: AbortTransaction

Constant Summary collapse

START_FROM =
"__start__"

Instance Method Summary collapse

Constructor Details

#initialize(db:, intent_reservation:, lock_manager:, atomic_mutator:, wal:, clock_epoch_provider:) ⇒ SagaCoordinator

rubocop:disable Metrics/ParameterLists – constructor mirrors kernel wiring (E7/E8)



19
20
21
22
23
24
25
26
# File 'lib/ollama_agent/runtime/saga_coordinator.rb', line 19

def initialize(db:, intent_reservation:, lock_manager:, atomic_mutator:, wal:, clock_epoch_provider:)
  @db = db
  @intent_reservation = intent_reservation
  @lock_manager = lock_manager
  @atomic_mutator = atomic_mutator
  @wal = wal
  @clock_epoch_provider = clock_epoch_provider
end

Instance Method Details

#advance(manifest_id:, to_state:, reason: nil) ⇒ :ok, ...

Returns:

  • (:ok, :illegal_transition, :sealed, :missing)


35
36
37
# File 'lib/ollama_agent/runtime/saga_coordinator.rb', line 35

def advance(manifest_id:, to_state:, reason: nil)
  run_tx(:ok) { resolve_advance(manifest_id, to_state, reason, next_epoch) }
end

#compensate(manifest_id:, reason:) ⇒ :ok, ...

Returns:

  • (:ok, :sealed, :missing)


40
41
42
# File 'lib/ollama_agent/runtime/saga_coordinator.rb', line 40

def compensate(manifest_id:, reason:)
  run_tx(:ok) { resolve_compensate(manifest_id, reason, next_epoch) }
end

#each_active {|row| ... } ⇒ Object

Yield Parameters:

  • row (Hash{:manifest_id=>String, :state=>String})


61
62
63
64
65
66
67
# File 'lib/ollama_agent/runtime/saga_coordinator.rb', line 61

def each_active
  return enum_for(:each_active) unless block_given?

  @db.execute("SELECT manifest_id, state FROM sagas WHERE terminal = 0 ORDER BY manifest_id") do |r|
    yield({ manifest_id: r["manifest_id"], state: r["state"] })
  end
end

#start(manifest_id:, intent_hash:, planned_scopes:, supervisor_lease: nil, metadata: {}) ⇒ :reserved, ...

Returns:

  • (:reserved, :duplicate, :conflict)


30
31
32
# File 'lib/ollama_agent/runtime/saga_coordinator.rb', line 30

def start(manifest_id:, intent_hash:, planned_scopes:, supervisor_lease: nil, metadata: {})
  run_tx(:reserved) { perform_start(manifest_id, intent_hash, planned_scopes, supervisor_lease, ) }
end

#state_of(manifest_id:) ⇒ Hash?

Snapshot fields for a saga row.

Returns:

  • (Hash, nil)

    keys include :state, :terminal, :intent_hash, :planned_scopes, :supervisor_lease



47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/ollama_agent/runtime/saga_coordinator.rb', line 47

def state_of(manifest_id:)
  row = @db.get_first_row("SELECT * FROM sagas WHERE manifest_id = ?", [manifest_id])
  return nil unless row

  {
    state: row["state"],
    terminal: row["terminal"].to_i == 1,
    intent_hash: row["intent_hash"],
    planned_scopes: JSON.parse(row["planned_scopes"]),
    supervisor_lease: row["supervisor_lease"]
  }
end