Class: A2A::TaskStore

Inherits:
Object
  • Object
show all
Defined in:
lib/a2a/task_store.rb

Overview

In-memory task registry with pub/sub for streaming and webhook delivery. Swap for a DB-backed implementation in production – if the server crashes, in-memory tasks vanish and the client never gets notified.

Constant Summary collapse

# States that #terminal? treats as final for a task.
TERMINAL_STATES = [
  "TASK_STATE_COMPLETED",
  "TASK_STATE_FAILED",
  "TASK_STATE_CANCELED",
  "TASK_STATE_REJECTED",
].freeze

Instance Method Summary collapse

Constructor Details

#initialize ⇒ TaskStore

Returns a new instance of TaskStore.



26
27
28
29
# File 'lib/a2a/task_store.rb', line 26

# Builds an empty store: a Hash of tasks keyed by id and the Mutex that
# every other method takes before touching that Hash.
def initialize
  @mutex = Mutex.new
  @tasks = {}
end

Instance Method Details

#add_artifact(id, artifact) ⇒ Object

── Artifacts ──────────────────────────────────────────────────────



128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/a2a/task_store.rb', line 128

# Appends an artifact to a stored task and announces it.
#
# The task is mutated while holding the store mutex; the subscriber
# notification and the webhook delivery are issued after the lock is released.
#
# @param id key the task is stored under
# @param artifact value appended to the task's artifact list and embedded in
#   the emitted event
# @return the updated task, or nil when no task is stored under +id+
def add_artifact(id, artifact)
  found = @mutex.synchronize do
    entry = @tasks[id]
    if entry
      entry.artifacts << artifact
      entry.updated_at = Time.now.utc
    end
    entry
  end
  return nil unless found

  payload = {
    "taskId"    => found.id,
    "contextId" => found.context_id,
    "artifact"  => artifact,
    "append"    => false,
    "lastChunk" => true,
  }
  notify_subscribers(found, payload, type: :artifact)
  deliver_webhooks(found, { "artifactUpdate" => payload })
  found
end

#add_message(id, msg) ⇒ Object

── History ────────────────────────────────────────────────────────



152
153
154
155
156
157
158
159
160
# File 'lib/a2a/task_store.rb', line 152

# Appends a message to a stored task's history and refreshes its timestamp,
# all while holding the store mutex. No event is emitted from here.
#
# @param id key the task is stored under
# @param msg value pushed onto the task's history
# @return the updated task, or nil when no task is stored under +id+
def add_message(id, msg)
  @mutex.synchronize do
    entry = @tasks[id]
    if entry
      entry.history << msg
      entry.updated_at = Time.now.utc
      entry
    end
  end
end

#cancel(id) ⇒ Object



114
115
116
117
118
119
# File 'lib/a2a/task_store.rb', line 114

# Moves a task to TASK_STATE_CANCELED and closes its subscribers.
#
# update_state already returns the task it modified (or nil when the id is
# unknown), so that return value is used directly instead of looking the task
# up a second time under the lock.
#
# @param id key the task is stored under
# @return the canceled task, or nil when no task is stored under +id+
def cancel(id)
  task = update_state(id, "TASK_STATE_CANCELED")
  close_subscribers(task) if task
  task
end

#complete(id, result) ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/a2a/task_store.rb', line 78

# Marks a task TASK_STATE_COMPLETED, stores its result and announces it.
#
# The state, result and timestamp are written under the store mutex. After the
# lock is released, a status event is built and handed to the subscribers and
# the webhooks, and the task's subscribers are then closed.
#
# NOTE(review): the task's current state is not checked first, so a task that
# is already in another terminal state is overwritten — confirm this is
# intended.
#
# @param id key the task is stored under
# @param result value stored as the task's result
# @return the updated task, or nil when no task is stored under +id+
def complete(id, result)
  finished = @mutex.synchronize do
    entry = @tasks[id]
    if entry
      entry.state      = "TASK_STATE_COMPLETED"
      entry.result     = result
      entry.updated_at = Time.now.utc
    end
    entry
  end
  return nil unless finished

  status = build_status_event(finished)
  notify_subscribers(finished, status)
  deliver_webhooks(finished, { "statusUpdate" => status })
  close_subscribers(finished)
  finished
end

#create(id, context_id, push_config = nil) ⇒ Object

── Task CRUD ──────────────────────────────────────────────────────



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/a2a/task_store.rb', line 33

# Registers a new task in TASK_STATE_SUBMITTED, replacing any task already
# stored under the same id.
#
# When a push config is supplied it is stored under its "id" (a UUID is
# generated if it has none) and stamped with "taskId", the same shape
# create_push_config produces, so configs look alike however they were
# registered. The caller's hash is mutated in place.
#
# @param id key to store the task under
# @param context_id context identifier recorded on the task
# @param push_config [Hash, nil] optional initial push-notification config
# @return the newly created Task
def create(id, context_id, push_config = nil)
  @mutex.synchronize do
    now = Time.now.utc
    configs = {}
    if push_config
      push_config["id"] ||= SecureRandom.uuid
      # Keep parity with create_push_config, which always records the owner.
      push_config["taskId"] = id
      configs[push_config["id"]] = push_config
    end

    @tasks[id] = Task.new(
      id:           id,
      context_id:   context_id,
      state:        "TASK_STATE_SUBMITTED",
      result:       nil,
      artifacts:    [],
      history:      [],
      push_configs: configs,
      subscribers:  [],
      created_at:   now,
      updated_at:   now,
    )
  end
end

#create_push_config(task_id, config) ⇒ Object

── Push Notification Config CRUD ──────────────────────────────────



176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/a2a/task_store.rb', line 176

# Attaches a push-notification config to a stored task.
#
# The config hash is mutated in place: it keeps its "id" or receives a freshly
# generated UUID, and is stamped with "taskId". It is stored on the task under
# that id, replacing any config already registered with the same id.
#
# @param task_id key the task is stored under
# @param config [Hash] config to register
# @return [Hash, nil] the same config hash, or nil when the task is unknown
def create_push_config(task_id, config)
  @mutex.synchronize do
    owner = @tasks[task_id]
    next nil unless owner

    key = config["id"] || SecureRandom.uuid
    config["id"]     = key
    config["taskId"] = task_id
    owner.push_configs[key] = config
    owner.updated_at = Time.now.utc
    config
  end
end

#delete_push_config(task_id, config_id) ⇒ Object



205
206
207
208
209
210
211
212
# File 'lib/a2a/task_store.rb', line 205

# Removes a push-notification config from a stored task.
#
# The task's timestamp is refreshed whether or not a config with that id
# existed.
#
# @param task_id key the task is stored under
# @param config_id id of the config to remove
# @return [Time, nil] the new updated_at timestamp (the value of the final
#   assignment), or nil when the task is unknown
def delete_push_config(task_id, config_id)
  @mutex.synchronize do
    owner = @tasks[task_id]
    if owner
      owner.push_configs.delete(config_id)
      owner.updated_at = Time.now.utc
    end
  end
end

#fail(id, msg) ⇒ Object



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/a2a/task_store.rb', line 96

# Marks a task TASK_STATE_FAILED, stores the failure message as its result and
# announces it.
#
# The state, result and timestamp are written under the store mutex. After the
# lock is released, a status event is built and handed to the subscribers and
# the webhooks, and the task's subscribers are then closed.
#
# @param id key the task is stored under
# @param msg value stored as the task's result
# @return the updated task, or nil when no task is stored under +id+
def fail(id, msg)
  failed = @mutex.synchronize do
    entry = @tasks[id]
    if entry
      entry.state      = "TASK_STATE_FAILED"
      entry.result     = msg
      entry.updated_at = Time.now.utc
    end
    entry
  end
  return nil unless failed

  status = build_status_event(failed)
  notify_subscribers(failed, status)
  deliver_webhooks(failed, { "statusUpdate" => status })
  close_subscribers(failed)
  failed
end

#get(id) ⇒ Object



58
59
60
# File 'lib/a2a/task_store.rb', line 58

# Looks a task up by id while holding the store mutex.
#
# @param id key the task is stored under
# @return the stored task, or nil when there is none
def get(id)
  @mutex.synchronize do
    @tasks[id]
  end
end

#get_push_config(task_id, config_id) ⇒ Object



189
190
191
192
193
194
195
# File 'lib/a2a/task_store.rb', line 189

# Fetches one push-notification config of a stored task.
#
# @param task_id key the task is stored under
# @param config_id id the config was registered under
# @return the config, or nil when either the task or the config is unknown
def get_push_config(task_id, config_id)
  @mutex.synchronize do
    owner = @tasks[task_id]
    owner ? owner.push_configs[config_id] : nil
  end
end

#list(context_id: nil, state: nil) ⇒ Object

── Listing ────────────────────────────────────────────────────────



164
165
166
167
168
169
170
171
172
# File 'lib/a2a/task_store.rb', line 164

# Lists stored tasks, most recently updated first.
#
# Both filters are optional; a nil filter matches every task. The snapshot is
# taken while holding the store mutex.
#
# @param context_id keep only tasks whose context_id equals this value
# @param state keep only tasks whose state equals this value
# @return [Array] matching tasks ordered by updated_at, newest first
def list(context_id: nil, state: nil)
  @mutex.synchronize do
    matching = @tasks.each_value.select do |task|
      (context_id.nil? || task.context_id == context_id) &&
        (state.nil? || task.state == state)
    end
    matching.sort_by(&:updated_at).reverse
  end
end

#list_push_configs(task_id) ⇒ Object



197
198
199
200
201
202
203
# File 'lib/a2a/task_store.rb', line 197

# Lists every push-notification config registered on a stored task.
#
# @param task_id key the task is stored under
# @return [Array] the task's configs, or an empty array when the task is
#   unknown
def list_push_configs(task_id)
  @mutex.synchronize do
    owner = @tasks[task_id]
    owner ? owner.push_configs.values : []
  end
end

#subscribe(task_id) ⇒ Object

Subscribe to task updates. Returns a Queue that will receive events. Each event is a Hash: { type: :status|:artifact, data: Hash }. A nil sentinel signals stream end.



219
220
221
222
223
224
225
226
227
# File 'lib/a2a/task_store.rb', line 219

# Registers a new subscriber queue on a stored task.
#
# The queue is appended to the task's subscriber list while holding the store
# mutex and is then handed back to the caller.
#
# @param task_id key the task is stored under
# @return [Thread::Queue, nil] the new queue, or nil when the task is unknown
def subscribe(task_id)
  @mutex.synchronize do
    target = @tasks[task_id]
    next nil unless target

    Thread::Queue.new.tap { |inbox| target.subscribers << inbox }
  end
end

#terminal?(id) ⇒ Boolean

Returns:

  • (Boolean)


121
122
123
124
# File 'lib/a2a/task_store.rb', line 121

# Tells whether a stored task has reached one of TERMINAL_STATES.
#
# Always answers with a strict boolean: an unknown id yields false rather than
# nil, which matches the documented Boolean return.
#
# @param id key the task is stored under
# @return [Boolean] true when the task exists and its state is terminal
def terminal?(id)
  task = get(id)
  task ? TERMINAL_STATES.include?(task.state) : false
end

#unsubscribe(task_id, queue) ⇒ Object



229
230
231
232
233
234
235
# File 'lib/a2a/task_store.rb', line 229

# Removes a subscriber queue from a stored task's subscriber list.
#
# @param task_id key the task is stored under
# @param queue the queue previously returned by #subscribe
# @return the removed queue, or nil when the task is unknown or the queue was
#   not registered on it
def unsubscribe(task_id, queue)
  @mutex.synchronize do
    target = @tasks[task_id]
    target.subscribers.delete(queue) if target
  end
end

#update_state(id, state, message: nil) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/a2a/task_store.rb', line 62

# Sets a stored task's state and announces the change.
#
# The state and timestamp are written under the store mutex. After the lock is
# released, a status event is built (carrying the optional message) and handed
# to the subscribers and the webhooks. Subscribers are not closed here.
#
# @param id key the task is stored under
# @param state new state value to record on the task
# @param message optional value forwarded to build_status_event
# @return the updated task, or nil when no task is stored under +id+
def update_state(id, state, message: nil)
  changed = @mutex.synchronize do
    entry = @tasks[id]
    if entry
      entry.state      = state
      entry.updated_at = Time.now.utc
    end
    entry
  end
  return nil unless changed

  status = build_status_event(changed, message)
  notify_subscribers(changed, status)
  deliver_webhooks(changed, { "statusUpdate" => status })
  changed
end