Skip to content
Kward Search API index

Class: Kward::Workers::QueueRunner

Inherits:
Object
  • Object
show all
Defined in:
lib/kward/workers/queue_runner.rb

Overview

Executes session-backed worker queue jobs one at a time.

Defined Under Namespace

Classes: DirtyWorkspaceError

Constant Summary collapse

CONTINUE_PROMPT =
<<~PROMPT.freeze
  Continue this session as an implementation worker.
  Make the smallest correct change, preserve existing style, and run focused verification when practical.
  Stop when the work is ready for human review.
PROMPT

Instance Method Summary collapse

Constructor Details

#initialize(queue_store:, session_store:, client_factory: -> { Client.new }, prompt: nil, workspace_root: Dir.pwd, provider: nil, model: nil, reasoning_effort: nil, git_guard: nil, write_lock: nil) ⇒ QueueRunner

Returns a new instance of QueueRunner.



19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/kward/workers/queue_runner.rb', line 19

def initialize(queue_store:, session_store:, client_factory: -> { Client.new }, prompt: nil, workspace_root: Dir.pwd, provider: nil, model: nil, reasoning_effort: nil, git_guard: nil, write_lock: nil)
  @queue_store = queue_store
  @session_store = session_store
  @client_factory = client_factory
  @prompt = prompt
  @workspace_root = ConfigFiles.canonical_workspace_root(workspace_root)
  @provider = provider
  @model = model
  @reasoning_effort = reasoning_effort
  @git_guard = git_guard || GitGuard.new(root: @workspace_root)
  @write_lock = write_lock
end

Instance Method Details

#resume(id) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/kward/workers/queue_runner.rb', line 61

def resume(id)
  record = find_job(id)
  raise ArgumentError, "Worker job #{id} is not suspended" unless record["status"] == "suspended"

  restore_worker_changes(record)
  queued = @queue_store.update_status(record.fetch("id"), "queued", stash_ref: "", error: "")
  run_job(queued, require_clean: false)
rescue DirtyWorkspaceError => e
  @queue_store.update_status(id, "blocked", error: e.message)
rescue StandardError => e
  @queue_store.update_status(id, "blocked", error: e.message)
end

#run_allObject



39
40
41
42
43
44
45
46
47
48
49
# File 'lib/kward/workers/queue_runner.rb', line 39

def run_all
  results = []
  loop do
    record = run_next
    break unless record

    results << record
    break unless record["status"] == "ready_for_review"
  end
  results
end

#run_nextObject



32
33
34
35
36
37
# File 'lib/kward/workers/queue_runner.rb', line 32

def run_next
  record = @queue_store.next_queued
  return nil unless record

  run_job(record)
end

#suspend(id) ⇒ Object



51
52
53
54
55
56
57
58
59
# File 'lib/kward/workers/queue_runner.rb', line 51

def suspend(id)
  record = find_job(id)
  raise ArgumentError, "Worker job #{id} is not running" unless record["status"] == "running"

  stash_ref = stash_worker_changes(record)
  @queue_store.update_status(record.fetch("id"), "suspended", stash_ref: stash_ref, error: "")
rescue StandardError => e
  @queue_store.update_status(id, "blocked", error: e.message)
end