Class: A2A::TaskStore
- Inherits:
-
Object
- Object
- A2A::TaskStore
- Defined in:
- lib/a2a/task_store.rb
Overview
In-memory task registry with pub/sub for streaming and webhook delivery. Swap for a DB-backed implementation in production – if the server crashes, in-memory tasks vanish and the client never gets notified.
Constant Summary collapse
# State names that #terminal? reports as terminal.
- TERMINAL_STATES =
%w[ TASK_STATE_COMPLETED TASK_STATE_FAILED TASK_STATE_CANCELED TASK_STATE_REJECTED ].freeze
Instance Method Summary collapse
-
#add_artifact(id, artifact) ⇒ Object
── Artifacts ──────────────────────────────────────────────────────.
-
#add_message(id, msg) ⇒ Object
── History ────────────────────────────────────────────────────────.
- #cancel(id) ⇒ Object
- #complete(id, result) ⇒ Object
-
#create(id, context_id, push_config = nil) ⇒ Object
── Task CRUD ──────────────────────────────────────────────────────.
-
#create_push_config(task_id, config) ⇒ Object
── Push Notification Config CRUD ──────────────────────────────────.
- #delete_push_config(task_id, config_id) ⇒ Object
- #fail(id, msg) ⇒ Object
- #get(id) ⇒ Object
- #get_push_config(task_id, config_id) ⇒ Object
-
#initialize ⇒ TaskStore
constructor
A new instance of TaskStore.
-
#list(context_id: nil, state: nil) ⇒ Object
── Listing ────────────────────────────────────────────────────────.
- #list_push_configs(task_id) ⇒ Object
-
#subscribe(task_id) ⇒ Object
Subscribe to task updates.
- #terminal?(id) ⇒ Boolean
- #unsubscribe(task_id, queue) ⇒ Object
- #update_state(id, state, message: nil) ⇒ Object
Constructor Details
#initialize ⇒ TaskStore
Returns a new instance of TaskStore.
26 27 28 29 |
# File 'lib/a2a/task_store.rb', line 26
#
# Starts with an empty task table and the lock that the other methods
# take before reading or changing it.
def initialize
  @tasks = {}                 # task id => Task
  @mutex = Thread::Mutex.new  # serialises access to @tasks
end
Instance Method Details
#add_artifact(id, artifact) ⇒ Object
── Artifacts ──────────────────────────────────────────────────────
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
# File 'lib/a2a/task_store.rb', line 128
#
# Appends +artifact+ to the task's artifact list, then passes an artifact
# event to notify_subscribers and deliver_webhooks.
#
# @param id key the task is stored under
# @param artifact artifact payload; stored and forwarded unchanged
# @return [Task, nil] the updated task, or nil when +id+ is unknown
def add_artifact(id, artifact)
  task = @mutex.synchronize do
    found = @tasks[id]
    return nil unless found

    found.artifacts << artifact
    found.updated_at = Time.now.utc
    found
  end

  # Built and dispatched after the lock has been released.
  event = {
    "taskId" => task.id,
    "contextId" => task.context_id,
    "artifact" => artifact,
    "append" => false,
    "lastChunk" => true,
  }
  notify_subscribers(task, event, type: :artifact)
  deliver_webhooks(task, { "artifactUpdate" => event })
  task
end
#add_message(id, msg) ⇒ Object
── History ────────────────────────────────────────────────────────
152 153 154 155 156 157 158 159 160 |
# File 'lib/a2a/task_store.rb', line 152
#
# Appends +msg+ to the task's history and refreshes its updated_at stamp.
# Nothing is sent to subscribers or webhooks from here.
#
# @param id key the task is stored under
# @param msg message to record; stored as given
# @return [Task, nil] the updated task, or nil when +id+ is unknown
def add_message(id, msg)
  @mutex.synchronize do
    task = @tasks[id]
    return nil unless task

    task.history << msg
    task.updated_at = Time.now.utc
    task
  end
end
#cancel(id) ⇒ Object
114 115 116 117 118 119 |
# File 'lib/a2a/task_store.rb', line 114
#
# Moves the task to TASK_STATE_CANCELED via update_state, then looks it up
# again and hands it to close_subscribers.
#
# @param id key the task is stored under
# @return [Task, nil] the task as returned by #get, or nil when unknown
def cancel(id)
  update_state(id, "TASK_STATE_CANCELED")
  get(id).tap { |task| close_subscribers(task) if task }
end
#complete(id, result) ⇒ Object
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/a2a/task_store.rb', line 78
#
# Puts the task into TASK_STATE_COMPLETED with +result+ attached, sends
# the resulting status event to subscribers and webhooks, and then calls
# close_subscribers for the task.
#
# @param id key the task is stored under
# @param result value saved on the task's +result+ field
# @return [Task, nil] the updated task, or nil when +id+ is unknown
def complete(id, result)
  task = @mutex.synchronize do
    entry = @tasks[id]
    return nil unless entry

    entry.state = "TASK_STATE_COMPLETED"
    entry.result = result
    entry.updated_at = Time.now.utc
    entry
  end

  # Everything below runs outside the lock.
  event = build_status_event(task)
  notify_subscribers(task, event)
  deliver_webhooks(task, { "statusUpdate" => event })
  close_subscribers(task)
  task
end
#create(id, context_id, push_config = nil) ⇒ Object
── Task CRUD ──────────────────────────────────────────────────────
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/a2a/task_store.rb', line 33
#
# Registers a new task in TASK_STATE_SUBMITTED. Any task already stored
# under +id+ is replaced.
#
# When +push_config+ is given it becomes the task's first push config. The
# hash is modified in place: it receives an "id" (generated when absent)
# and a "taskId", the same two fields create_push_config stamps, so every
# entry returned by list_push_configs has the same shape.
#
# @param id key to store the task under
# @param context_id value saved on the task's +context_id+ field
# @param push_config [Hash, nil] optional initial push config
# @return [Task] the newly stored task
def create(id, context_id, push_config = nil)
  @mutex.synchronize do
    now = Time.now.utc
    configs = {}
    if push_config
      cfg_id = push_config["id"] || SecureRandom.uuid
      push_config["id"] = cfg_id
      push_config["taskId"] = id
      configs[cfg_id] = push_config
    end

    @tasks[id] = Task.new(
      id: id,
      context_id: context_id,
      state: "TASK_STATE_SUBMITTED",
      result: nil,
      artifacts: [],
      history: [],
      push_configs: configs,
      subscribers: [],
      created_at: now,
      updated_at: now,
    )
  end
end
#create_push_config(task_id, config) ⇒ Object
── Push Notification Config CRUD ──────────────────────────────────
176 177 178 179 180 181 182 183 184 185 186 187 |
# File 'lib/a2a/task_store.rb', line 176
#
# Attaches +config+ to a task. The hash is modified in place: "id" is
# filled in with a fresh UUID when missing and "taskId" is set to
# +task_id+. A config already stored under the same id is replaced.
#
# @param task_id key the task is stored under
# @param config [Hash] push config to store
# @return [Hash, nil] the same +config+ hash, or nil when the task is unknown
def create_push_config(task_id, config)
  @mutex.synchronize do
    task = @tasks[task_id]
    return nil unless task

    key = config["id"] || SecureRandom.uuid
    config.merge!("id" => key, "taskId" => task_id)
    task.push_configs[key] = config
    task.updated_at = Time.now.utc
    config
  end
end
#delete_push_config(task_id, config_id) ⇒ Object
205 206 207 208 209 210 211 212 |
# File 'lib/a2a/task_store.rb', line 205
#
# Removes the push config stored under +config_id+ and refreshes the
# task's updated_at stamp. The stamp is refreshed even when no config
# with that id existed.
#
# @param task_id key the task is stored under
# @param config_id id of the push config to drop
# @return [Time, nil] the new updated_at value (not the removed config),
#   or nil when the task is unknown
def delete_push_config(task_id, config_id)
  @mutex.synchronize do
    task = @tasks[task_id]
    next unless task

    task.push_configs.delete(config_id)
    task.updated_at = Time.now.utc
  end
end
#fail(id, msg) ⇒ Object
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/a2a/task_store.rb', line 96
#
# Puts the task into TASK_STATE_FAILED, saving +msg+ on its +result+
# field, sends the resulting status event to subscribers and webhooks,
# and then calls close_subscribers for the task.
#
# @param id key the task is stored under
# @param msg failure detail saved on the task's +result+ field
# @return [Task, nil] the updated task, or nil when +id+ is unknown
def fail(id, msg)
  task = @mutex.synchronize do
    failed = @tasks[id]
    return nil unless failed

    failed.state = "TASK_STATE_FAILED"
    failed.result = msg
    failed.updated_at = Time.now.utc
    failed
  end

  # Dispatch happens once the lock is no longer held.
  event = build_status_event(task)
  notify_subscribers(task, event)
  deliver_webhooks(task, { "statusUpdate" => event })
  close_subscribers(task)
  task
end
#get(id) ⇒ Object
58 59 60 |
# File 'lib/a2a/task_store.rb', line 58
#
# Looks a task up by id while holding the store lock.
#
# @param id key the task is stored under
# @return [Task, nil] the stored task, or nil when +id+ is unknown
def get(id)
  @mutex.synchronize do
    @tasks[id]
  end
end
#get_push_config(task_id, config_id) ⇒ Object
189 190 191 192 193 194 195 |
# File 'lib/a2a/task_store.rb', line 189
#
# Fetches one push config belonging to a task.
#
# @param task_id key the task is stored under
# @param config_id id the push config was stored under
# @return [Hash, nil] the stored config; nil when either the task or the
#   config is unknown
def get_push_config(task_id, config_id)
  @mutex.synchronize do
    task = @tasks[task_id]
    task ? task.push_configs[config_id] : nil
  end
end
#list(context_id: nil, state: nil) ⇒ Object
── Listing ────────────────────────────────────────────────────────
164 165 166 167 168 169 170 171 172 |
# File 'lib/a2a/task_store.rb', line 164
#
# Lists stored tasks, most recently updated first. Each filter is applied
# only when a non-nil value is given; with both given a task must match
# both.
#
# @param context_id keep only tasks whose +context_id+ equals this value
# @param state keep only tasks whose +state+ equals this value
# @return [Array<Task>] matching tasks ordered by +updated_at+, newest first
def list(context_id: nil, state: nil)
  @mutex.synchronize do
    matches = @tasks.each_value.select do |task|
      (context_id.nil? || task.context_id == context_id) &&
        (state.nil? || task.state == state)
    end
    matches.sort_by(&:updated_at).reverse
  end
end
#list_push_configs(task_id) ⇒ Object
197 198 199 200 201 202 203 |
# File 'lib/a2a/task_store.rb', line 197
#
# Returns every push config attached to a task.
#
# @param task_id key the task is stored under
# @return [Array<Hash>] the task's configs; an empty array when the task
#   is unknown
def list_push_configs(task_id)
  @mutex.synchronize do
    task = @tasks[task_id]
    task ? task.push_configs.values : []
  end
end
#subscribe(task_id) ⇒ Object
Subscribe to task updates. Returns a Queue that will receive events. Each event is a Hash: { type: :status|:artifact, data: Hash }. A nil sentinel signals the end of the stream.
219 220 221 222 223 224 225 226 227 |
# File 'lib/a2a/task_store.rb', line 219
#
# Subscribe to task updates. Returns a Queue that will receive events;
# a nil sentinel signals the end of the stream.
#
# A task whose state is already in TERMINAL_STATES will not produce
# further events, so the returned queue is not registered with the task.
# It is handed back with the nil sentinel already enqueued, letting a
# consumer blocked on +pop+ finish immediately instead of waiting forever.
# NOTE(review): assumes close_subscribers has already run for such tasks
# and will not run again — confirm against its implementation.
#
# @param task_id key the task is stored under
# @return [Thread::Queue, nil] the subscription queue, or nil when the
#   task is unknown
def subscribe(task_id)
  queue = Thread::Queue.new
  @mutex.synchronize do
    task = @tasks[task_id]
    return nil unless task

    if TERMINAL_STATES.include?(task.state)
      queue << nil
    else
      task.subscribers << queue
    end
  end
  queue
end
#terminal?(id) ⇒ Boolean
121 122 123 124 |
# File 'lib/a2a/task_store.rb', line 121
#
# Tells whether the task's current state is one of TERMINAL_STATES.
# Unknown ids give false rather than nil, so the result is always a real
# boolean.
#
# @param id key the task is stored under
# @return [Boolean] true only for a known task in a terminal state
def terminal?(id)
  task = get(id)
  return false unless task

  TERMINAL_STATES.include?(task.state)
end
#unsubscribe(task_id, queue) ⇒ Object
229 230 231 232 233 234 235 |
# File 'lib/a2a/task_store.rb', line 229
#
# Detaches a subscription queue from a task. Nothing is pushed onto the
# queue.
#
# @param task_id key the task is stored under
# @param queue [Thread::Queue] queue previously returned by #subscribe
# @return [Thread::Queue, nil] the removed queue; nil when it was not
#   registered or the task is unknown
def unsubscribe(task_id, queue)
  @mutex.synchronize do
    task = @tasks[task_id]
    task.subscribers.delete(queue) if task
  end
end
#update_state(id, state, message: nil) ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/a2a/task_store.rb', line 62
#
# Sets the task's state, refreshes its updated_at stamp, and sends the
# resulting status event to subscribers and webhooks. Unlike #complete and
# #fail it does not call close_subscribers.
#
# @param id key the task is stored under
# @param state new state value, stored as given
# @param message optional status message forwarded to build_status_event
# @return [Task, nil] the updated task, or nil when +id+ is unknown
def update_state(id, state, message: nil)
  task = @mutex.synchronize do
    entry = @tasks[id]
    return nil unless entry

    entry.state = state
    entry.updated_at = Time.now.utc
    entry
  end

  # Forward the caller's message; previously it was accepted but dropped.
  # NOTE(review): passed positionally — build_status_event is not visible
  # here, confirm its second parameter is positional.
  event = build_status_event(task, message)
  notify_subscribers(task, event)
  deliver_webhooks(task, { "statusUpdate" => event })
  task
end