Class: A2A::Store::SQLite

Inherits:
Object
  • Object
show all
Defined in:
lib/a2a/store/sqlite.rb

Overview

SQLite-backed task store with async pub/sub and webhook delivery.

This is the enlightened replacement for the in-memory TaskStore. Follows the gospel:

- async-job's schema patterns (indexed by state, updated_at DESC)
- async-job's duck-typed delegate protocol (#call, #start, #stop)
- Async::Queue-based pub/sub (fiber-safe, no threads)
- Async::HTTP::Internet for webhook delivery (no Net::HTTP)

The store composes three concerns:

1. SQLite — persistent CRUD for tasks, push configs
2. PubSub — in-process streaming subscriptions
3. Webhooks — push notification delivery

SQLite is safe for fiber-based concurrency within a single process because Ruby fibers are cooperatively scheduled — only one fiber runs at a time, so no concurrent writes can collide.

Constant Summary collapse

TERMINAL_STATES =
%w[
  TASK_STATE_COMPLETED TASK_STATE_FAILED
  TASK_STATE_CANCELED TASK_STATE_REJECTED
].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(path: ":memory:") ⇒ SQLite

Returns a new instance of SQLite.

Parameters:

  • path (String) (defaults to: ":memory:")

    path to SQLite database file (“:memory:” for in-memory)



41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/a2a/store/sqlite.rb', line 41

def initialize(path: ":memory:")
  @db = ::SQLite3::Database.new(path)
  @db.results_as_hash = true
  @db.execute("PRAGMA journal_mode = WAL")
  @db.execute("PRAGMA synchronous = NORMAL")
  @db.execute("PRAGMA foreign_keys = ON")

  @pub_sub  = PubSub.new
  @webhooks = Webhooks.new

  create_tables
end

Instance Attribute Details

#pub_subObject (readonly)

Returns the value of attribute pub_sub.



37
38
39
# File 'lib/a2a/store/sqlite.rb', line 37

def pub_sub
  @pub_sub
end

#webhooksObject (readonly)

Returns the value of attribute webhooks.



37
38
39
# File 'lib/a2a/store/sqlite.rb', line 37

def webhooks
  @webhooks
end

Instance Method Details

#add_artifact(id, artifact) ⇒ Object

── Artifacts ──────────────────────────────────────────────────────



146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/a2a/store/sqlite.rb', line 146

def add_artifact(id, artifact)
  task = get(id)
  return nil unless task

  artifacts = task[:artifacts]
  artifacts << artifact
  now = now_ts

  @db.execute(
    "UPDATE tasks SET artifacts = ?, updated_at = ? WHERE id = ?",
    [JSON.generate(artifacts), now, id]
  )

  event = {
    "taskId"    => id,
    "contextId" => task[:context_id],
    "artifact"  => artifact,
    "append"    => false,
    "lastChunk" => true,
  }
  @pub_sub.notify(id, { type: :artifact, data: event })
  @webhooks.deliver(list_push_configs(id), { "artifactUpdate" => event })

  get(id)
end

#add_message(id, msg) ⇒ Object

── History ────────────────────────────────────────────────────────



174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/a2a/store/sqlite.rb', line 174

def add_message(id, msg)
  task = get(id)
  return nil unless task

  history = task[:history]
  history << msg

  @db.execute(
    "UPDATE tasks SET history = ?, updated_at = ? WHERE id = ?",
    [JSON.generate(history), now_ts, id]
  )

  get(id)
end

#cancel(id) ⇒ Object



135
136
137
# File 'lib/a2a/store/sqlite.rb', line 135

def cancel(id)
  update_state(id, "TASK_STATE_CANCELED")
end

#complete(id, result) ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/a2a/store/sqlite.rb', line 97

def complete(id, result)
  now = now_ts
  result_json = result ? JSON.generate(result) : nil
  @db.execute(
    "UPDATE tasks SET state = ?, result = ?, updated_at = ? WHERE id = ?",
    ["TASK_STATE_COMPLETED", result_json, now, id]
  )

  task = get(id)
  return nil unless task

  event = build_status_event(task)
  @pub_sub.notify(id, { type: :status, data: event })
  @webhooks.deliver(list_push_configs(id), { "statusUpdate" => event })
  @pub_sub.close(id)

  task
end

#create(id, context_id, push_config = nil) ⇒ Object

── Task CRUD ──────────────────────────────────────────────────────



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/a2a/store/sqlite.rb', line 56

def create(id, context_id, push_config = nil)
  now = now_ts

  @db.execute(<<~SQL, [id, context_id, "TASK_STATE_SUBMITTED", "[]", "[]", now, now])
    INSERT INTO tasks (id, context_id, state, artifacts, history, created_at, updated_at)
    VALUES (?, ?, ?, ?, ?, ?, ?)
  SQL

  if push_config
    cfg_id = push_config["id"] || SecureRandom.uuid
    push_config = push_config.merge("id" => cfg_id, "taskId" => id)
    insert_push_config(id, push_config)
  end

  get(id)
end

#create_push_config(task_id, config) ⇒ Object

── Push Notification Config CRUD ──────────────────────────────────



213
214
215
216
217
218
219
220
221
# File 'lib/a2a/store/sqlite.rb', line 213

def create_push_config(task_id, config)
  task = get(task_id)
  return nil unless task

  cfg_id = config["id"] || SecureRandom.uuid
  config = config.merge("id" => cfg_id, "taskId" => task_id)
  insert_push_config(task_id, config)
  config
end

#delete_push_config(task_id, config_id) ⇒ Object



239
240
241
242
243
244
# File 'lib/a2a/store/sqlite.rb', line 239

def delete_push_config(task_id, config_id)
  @db.execute(
    "DELETE FROM push_configs WHERE task_id = ? AND id = ?",
    [task_id, config_id]
  )
end

#fail(id, msg) ⇒ Object



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/a2a/store/sqlite.rb', line 116

def fail(id, msg)
  now = now_ts
  result_json = msg ? JSON.generate(msg) : nil
  @db.execute(
    "UPDATE tasks SET state = ?, result = ?, updated_at = ? WHERE id = ?",
    ["TASK_STATE_FAILED", result_json, now, id]
  )

  task = get(id)
  return nil unless task

  event = build_status_event(task)
  @pub_sub.notify(id, { type: :status, data: event })
  @webhooks.deliver(list_push_configs(id), { "statusUpdate" => event })
  @pub_sub.close(id)

  task
end

#get(id) ⇒ Object



73
74
75
76
77
# File 'lib/a2a/store/sqlite.rb', line 73

def get(id)
  row = @db.get_first_row("SELECT * FROM tasks WHERE id = ?", [id])
  return nil unless row
  row_to_task(row)
end

#get_push_config(task_id, config_id) ⇒ Object



223
224
225
226
227
228
229
230
# File 'lib/a2a/store/sqlite.rb', line 223

def get_push_config(task_id, config_id)
  row = @db.get_first_row(
    "SELECT * FROM push_configs WHERE task_id = ? AND id = ?",
    [task_id, config_id]
  )
  return nil unless row
  row_to_push_config(row)
end

#list(context_id: nil, state: nil) ⇒ Object

── Listing ────────────────────────────────────────────────────────



191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/a2a/store/sqlite.rb', line 191

def list(context_id: nil, state: nil)
  conditions = []
  params = []

  if context_id
    conditions << "context_id = ?"
    params << context_id
  end

  if state
    conditions << "state = ?"
    params << state
  end

  where = conditions.empty? ? "" : "WHERE #{conditions.join(" AND ")}"
  sql = "SELECT * FROM tasks #{where} ORDER BY updated_at DESC"

  @db.execute(sql, params).map { |row| row_to_task(row) }
end

#list_push_configs(task_id) ⇒ Object



232
233
234
235
236
237
# File 'lib/a2a/store/sqlite.rb', line 232

def list_push_configs(task_id)
  @db.execute(
    "SELECT * FROM push_configs WHERE task_id = ? ORDER BY created_at",
    [task_id]
  ).map { |row| row_to_push_config(row) }
end

#startObject

── Lifecycle (async-job duck-type) ────────────────────────────────



260
261
262
# File 'lib/a2a/store/sqlite.rb', line 260

def start
  # No-op for now. Could start background cleanup tasks.
end

#stopObject



264
265
266
# File 'lib/a2a/store/sqlite.rb', line 264

def stop
  @db.close if @db
end

#subscribe(task_id) ⇒ Object

── Streaming / Pub-Sub ────────────────────────────────────────────



248
249
250
251
252
# File 'lib/a2a/store/sqlite.rb', line 248

def subscribe(task_id)
  task = get(task_id)
  return nil unless task
  @pub_sub.subscribe(task_id)
end

#terminal?(id) ⇒ Boolean

Returns:

  • (Boolean)


139
140
141
142
# File 'lib/a2a/store/sqlite.rb', line 139

def terminal?(id)
  task = get(id)
  task && TERMINAL_STATES.include?(task[:state])
end

#unsubscribe(task_id, queue) ⇒ Object



254
255
256
# File 'lib/a2a/store/sqlite.rb', line 254

def unsubscribe(task_id, queue)
  @pub_sub.unsubscribe(task_id, queue)
end

#update_state(id, state, message: nil) ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/a2a/store/sqlite.rb', line 79

def update_state(id, state, message: nil)
  now = now_ts
  @db.execute("UPDATE tasks SET state = ?, updated_at = ? WHERE id = ?", [state, now, id])

  task = get(id)
  return nil unless task

  event = build_status_event(task, message)
  @pub_sub.notify(id, { type: :status, data: event })
  @webhooks.deliver(list_push_configs(id), { "statusUpdate" => event })

  if TERMINAL_STATES.include?(state)
    @pub_sub.close(id)
  end

  task
end