Class: Rubino::Jobs::WebhookDelivery

Inherits:
Object
Defined in:
lib/rubino/jobs/webhook_delivery.rb

Overview

POSTs cron-job results to a configured webhook URL with idempotency and persistence guarantees.

Every #deliver call with a configured URL is recorded as a row in webhook_deliveries before the HTTP request fires (with no URL, #deliver returns false without persisting anything). The row’s request_id doubles as the X-Rubino-Delivery-Id header, which receivers MUST treat as the dedup key. The body is signed with HMAC-SHA256 under RUBINO_WEBHOOK_SECRET (or a per-job secret passed via secret:) and the signature is sent as X-Rubino-Signature. When no secret is configured the header is omitted, and the receiver has no way to verify where the request came from.
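The header contract above can be sketched from both ends. This is a minimal illustration, not the library's code: the source does not state how the signature is encoded, so the lowercase hex digest used here is an assumption, and `delivery_headers` and `WebhookReceiver` are hypothetical names.

```ruby
require "openssl"
require "set"

# Sender side: build the two headers described above.
# ASSUMPTION: the signature is the lowercase hex HMAC-SHA256 of the raw body.
def delivery_headers(body:, request_id:, secret: nil)
  headers = { "X-Rubino-Delivery-Id" => request_id }
  # No secret configured: the signature header is omitted entirely.
  unless secret.nil? || secret.empty?
    headers["X-Rubino-Signature"] = OpenSSL::HMAC.hexdigest("SHA256", secret, body)
  end
  headers
end

# Receiver side (hypothetical): verify the signature, then deduplicate
# on the delivery id so a retried POST is processed only once.
class WebhookReceiver
  def initialize(secret)
    @secret = secret
    @seen = Set.new # stand-in for a durable store such as a database table
  end

  # Returns :rejected, :duplicate, or :accepted.
  def handle(headers, body)
    expected = OpenSSL::HMAC.hexdigest("SHA256", @secret, body)
    given = headers.fetch("X-Rubino-Signature", "")
    # Constant-time comparison avoids leaking how many characters matched.
    return :rejected unless OpenSSL.secure_compare(given, expected)
    # Set#add? returns nil when the id was already recorded.
    return :duplicate unless @seen.add?(headers.fetch("X-Rubino-Delivery-Id"))

    :accepted
  end
end
```

The signature is checked before the dedup lookup so that an unauthenticated request cannot mark a delivery id as already seen.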

Failed deliveries are retried, up to 3 attempts in total, with exponential backoff (5s, 30s, 5min) using lightweight Thread.new sleeps rather than a job queue. The trade-off is that an agent crash mid-backoff loses the in-flight retry timer; the persisted row stays pending, however, and #resume_pending! picks it up at boot. A real job queue is overkill for the expected webhook volume; revisit if the backlog grows.
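A retry loop with an injectable sleeper might look like the sketch below. It is one possible reading, not the actual attempt_with_retries: the `RetrySketch` module and `with_retries` method are hypothetical, and the indexing (delay `BACKOFF_SCHEDULE[n - 1]` after failed attempt `n`) is an assumption. Under that reading only the first two delays are ever slept, because no wait follows the final attempt; the real implementation may use the 300s entry differently.

```ruby
module RetrySketch
  BACKOFF_SCHEDULE = [5, 30, 300].freeze # seconds, values from the source
  MAX_ATTEMPTS = 3

  # Yields the 1-based attempt number up to MAX_ATTEMPTS times.
  # The block returns true on success. `sleeper` is injectable so tests
  # can record the delays instead of burning wall time.
  def self.with_retries(sleeper: ->(seconds) { sleep(seconds) })
    MAX_ATTEMPTS.times do |index|
      attempt = index + 1
      return true if yield(attempt)
      break if attempt == MAX_ATTEMPTS # no wait after the last attempt

      # ASSUMPTION: the delay after failed attempt N is BACKOFF_SCHEDULE[N - 1].
      sleeper.call(BACKOFF_SCHEDULE.fetch(index))
    end
    false
  end
end

# Usage: record delays rather than sleeping; succeed on the third attempt.
delays = []
delivered = RetrySketch.with_retries(sleeper: ->(s) { delays << s }) do |attempt|
  attempt == 3
end
# delivered is true and delays is [5, 30]
```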

URL resolution: constructor arg > RUBINO_WEBHOOK_URL env. There is no per-job override yet (alpha).

Constant Summary

DEFAULT_TIMEOUT =
10

BACKOFF_SCHEDULE =

Delays, in seconds, applied BEFORE attempt N+1. Once N attempts have been made (attempt_count is N), the delay before the next attempt is looked up in BACKOFF_SCHEDULE. Once the 3 entries are exhausted, delivery gives up.

[5, 30, 300].freeze

MAX_ATTEMPTS =
3

RESUME_SCAN_LIMIT =
1000

Instance Method Summary

Constructor Details

#initialize(url: nil, logger: nil, timeout: DEFAULT_TIMEOUT, conn: nil, db: nil, secret: nil, clock: nil, sleeper: nil) ⇒ WebhookDelivery

Returns a new instance of WebhookDelivery.



# File 'lib/rubino/jobs/webhook_delivery.rb', line 42

def initialize(url: nil, logger: nil, timeout: DEFAULT_TIMEOUT, conn: nil, db: nil, secret: nil, clock: nil,
               sleeper: nil)
  @url = url || ENV.fetch("RUBINO_WEBHOOK_URL", nil)
  @logger = logger || Rubino.logger
  @conn = conn || build_conn(timeout)
  @db = db
  @secret = secret || ENV.fetch("RUBINO_WEBHOOK_SECRET", nil)
  @clock = clock || -> { Time.now.utc }
  # Tests inject a synchronous sleeper so backoff doesn't burn wall time.
  @sleeper = sleeper || ->(s) { sleep(s) }
end

Instance Method Details

#deliver(payload, job_id: nil, run_id: nil) ⇒ Boolean

Returns true if delivered on this call, false otherwise.

Parameters:

  • payload (Hash)

    JSON-serialisable body POSTed as-is.

  • job_id (String, nil) (defaults to: nil)

    persisted on the delivery row.

  • run_id (String, nil) (defaults to: nil)

    persisted on the delivery row.

Returns:

  • (Boolean)

    true if delivered on this call, false otherwise.



# File 'lib/rubino/jobs/webhook_delivery.rb', line 58

def deliver(payload, job_id: nil, run_id: nil)
  return false if @url.nil? || @url.empty?

  body = JSON.generate(payload)
  row_id = persist_pending(body: body, job_id: job_id, run_id: run_id)
  attempt_with_retries(row_id: row_id, body: body)
end

#resume_pending! ⇒ Object

Resume hook called at agent boot. Scans up to RESUME_SCAN_LIMIT pending rows whose scheduled_at has passed, oldest first, and replays each one in its own background thread. Returns the number of rows picked up (0 when no database is configured). The cap exists to avoid replay storms after a long outage; rows beyond the cap stay pending in the table for ops to inspect.



# File 'lib/rubino/jobs/webhook_delivery.rb', line 70

def resume_pending!
  return 0 unless db

  now = @clock.call.iso8601
  rows = db[:webhook_deliveries]
         .where(status: "pending")
         .where { scheduled_at <= now }
         .order(:scheduled_at)
         .limit(RESUME_SCAN_LIMIT)
         .all
  rows.each do |row|
    Thread.new { attempt_with_retries(row_id: row[:id], body: row[:payload_json]) }
  end
  rows.size
end
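The selection step of the scan can be illustrated in memory, without a database. The sketch below mirrors the query's filter, ordering, and limit on an array of hashes; `ResumeSketch` and `due_rows` are hypothetical names, and the "delivered" status is an assumed value used only to show a non-pending row. It also assumes scheduled_at is stored as a UTC ISO 8601 string in one fixed format, which is what makes plain string comparison chronological; mixed offsets or fractional seconds would break that.

```ruby
require "time" # provides Time#iso8601

module ResumeSketch
  RESUME_SCAN_LIMIT = 1000 # value from the source

  # rows: array of hashes with :id, :status, and :scheduled_at
  # (ASSUMPTION: UTC ISO 8601 strings such as "2024-01-01T12:00:00Z").
  def self.due_rows(rows, now: Time.now.utc, limit: RESUME_SCAN_LIMIT)
    cutoff = now.iso8601
    rows
      .select { |row| row[:status] == "pending" && row[:scheduled_at] <= cutoff }
      .sort_by { |row| row[:scheduled_at] } # oldest first
      .first(limit)                         # rows past the cap are left untouched
  end
end

rows = [
  { id: 1, status: "pending",   scheduled_at: "2024-01-01T11:00:00Z" },
  { id: 2, status: "pending",   scheduled_at: "2024-01-01T13:00:00Z" }, # not due yet
  { id: 3, status: "delivered", scheduled_at: "2024-01-01T10:00:00Z" }, # hypothetical status
  { id: 4, status: "pending",   scheduled_at: "2024-01-01T09:00:00Z" }
]
due = ResumeSketch.due_rows(rows, now: Time.utc(2024, 1, 1, 12, 0, 0))
# due.map { |row| row[:id] } is [4, 1]
```

Because the ordering is ascending, a cap smaller than the backlog replays the oldest rows and leaves the more recent ones pending.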