Class: A2A::Store::Processor

Inherits:
  • Object
Defined in:
lib/a2a/store/processor.rb

Overview

Async background task processor, modeled after async-job’s Inline processor.

The async-job design it follows has these properties:

- Async::Idler schedules tasks when the event loop is idle (backpressure)
- Each job runs in its own fiber (not thread)
- The returned Async::Task can be waited on (task.wait) for completion notification
- Errors are caught and logged, not re-raised
- task.defer_stop protects critical sections from interruption

This processor enables A2A’s non-blocking mode (return_immediately: true). The handler can enqueue work that executes after the HTTP response is sent.
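The ordering that non-blocking mode relies on can be sketched without the async gem. Everything below is an assumption made for illustration: DeferredProcessor is a stand-in that only records blocks and runs them later, handle_send_message is a hypothetical handler, and the "COMPLETED" state name and response shape are invented. The real Processor schedules the block on the Async reactor instead.

```ruby
# Stand-in for A2A::Store::Processor (assumption): it records blocks and runs
# them only when run_pending is called, so the ordering is easy to observe.
class DeferredProcessor
  def initialize
    @pending = []
  end

  # Enqueue the block without running it.
  def call(&block)
    @pending << block
    nil
  end

  # Run everything that was enqueued, oldest first.
  def run_pending
    @pending.shift.call until @pending.empty?
  end
end

# Hypothetical handler: the name, arguments, and response shape are assumptions.
def handle_send_message(processor, log, return_immediately:)
  work = -> { log << :work_done }

  if return_immediately
    processor.call(&work) # enqueue only; the response does not wait for it
  else
    work.call             # blocking mode: finish the work before responding
  end

  log << :response_sent
  { state: return_immediately ? "WORKING" : "COMPLETED" }
end

log = []
processor = DeferredProcessor.new
response = handle_send_message(processor, log, return_immediately: true)
processor.run_pending # the enqueued work runs after the response was built
```

In this sketch the response is recorded before the work runs, which is the behavior return_immediately: true asks for.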

Usage:

processor = A2A::Store::Processor.new

# Fire and forget:
processor.call do
  store.update_state(task_id, "WORKING")
  result = do_work
  store.complete(task_id, result)
end

# Wait for completion:
task = processor.call { do_work }
task.wait

Constructor Details

#initialize(parent: nil) ⇒ Processor

Returns a new instance of Processor.



# File 'lib/a2a/store/processor.rb', line 34

def initialize(parent: nil)
  @parent         = parent
  @call_count     = 0
  @complete_count = 0
  @failed_count   = 0
end

Instance Attribute Details

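#call_count ⇒ Object (readonly)

Returns the number of calls currently in flight. Despite the name, this is not a running total: #call increments it and decrements it again when the block finishes.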



# File 'lib/a2a/store/processor.rb', line 32

def call_count
  @call_count
end

#complete_count ⇒ Object (readonly)

Returns the number of blocks that finished without raising.



# File 'lib/a2a/store/processor.rb', line 32

def complete_count
  @complete_count
end

#failed_count ⇒ Object (readonly)

Returns the number of blocks that raised an error.



# File 'lib/a2a/store/processor.rb', line 32

def failed_count
  @failed_count
end

Instance Method Details

#call { ... } ⇒ Async::Task

Execute a block asynchronously in a background fiber.

Returns the Async::Task so callers can optionally .wait on it.

Yields:

  • the work to perform

Returns:

  • (Async::Task)


# File 'lib/a2a/store/processor.rb', line 48

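def call(&block)
  # Despite its name, @call_count tracks in-flight work: it is incremented
  # here and decremented in the ensure clause below.
  @call_count += 1

  # NOTE: `parent` is not defined anywhere in this listing; presumably a
  # private helper that resolves @parent or the current Async task.
  parent.async do |task|
    # defer_stop postpones a stop request until the block has finished.
    task.defer_stop do
      yield
    end
    @complete_count += 1
  rescue => error
    # Failures are counted and logged, never re-raised to the caller.
    @failed_count += 1
    Console.error(self) { "Background task failed: #{error.message}" }
  ensure
    @call_count -= 1
  end
end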
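The counter bookkeeping in #call can be traced with a plain-Ruby stand-in. This is a sketch under stated assumptions, not the library's implementation: CountingRunner is a hypothetical class, it runs each block in a core Fiber that is resumed immediately rather than on an Async reactor, and it collects error messages in an array where the real method logs through Console.error.

```ruby
# Hypothetical stand-in that mirrors the begin/rescue/ensure shape of #call.
class CountingRunner
  attr_reader :call_count, :complete_count, :failed_count, :errors

  def initialize
    @call_count = 0
    @complete_count = 0
    @failed_count = 0
    @errors = []
  end

  def call(&block)
    @call_count += 1 # in-flight gauge goes up

    # Each block gets its own fiber; here it is resumed right away.
    Fiber.new do
      begin
        block.call
        @complete_count += 1
      rescue => error
        @failed_count += 1
        @errors << error.message # the real method logs instead
      ensure
        @call_count -= 1 # in-flight gauge goes back down
      end
    end.resume
    nil
  end
end

runner = CountingRunner.new
in_flight_seen = nil

runner.call { in_flight_seen = runner.call_count } # observes the gauge mid-run
runner.call { raise ArgumentError, "boom" }        # swallowed, not re-raised
```

Because errors are swallowed, a caller that needs to know whether the work succeeded has to look at the counters (or at state the block itself records) rather than rely on an exception.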

#start ⇒ Object



# File 'lib/a2a/store/processor.rb', line 64

def start
  # Ensure we have an async context available
end

#status ⇒ Object



# File 'lib/a2a/store/processor.rb', line 72

def status
  {
    in_flight: @call_count,
    completed: @complete_count,
    failed:    @failed_count,
  }
end
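A caller might condense the status hash into a few derived values, for example for a health check. The helper below is hypothetical and not part of A2A::Store::Processor; it assumes only the three keys shown in the listing above.

```ruby
# Hypothetical helper: summarizes a hash shaped like the one #status returns.
def summarize(status)
  finished = status[:completed] + status[:failed]

  {
    idle: status[:in_flight].zero?, # nothing is running right now
    finished: finished,             # blocks that ran to an outcome
    # Guard against dividing by zero before any block has finished.
    failure_rate: finished.zero? ? 0.0 : status[:failed].fdiv(finished),
  }
end

summary = summarize({ in_flight: 0, completed: 3, failed: 1 })
fresh   = summarize({ in_flight: 2, completed: 0, failed: 0 })
```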

#stop ⇒ Object



# File 'lib/a2a/store/processor.rb', line 68

def stop
  # Allow in-flight tasks to drain naturally
end