Class: Rubino::Jobs::WebhookDelivery
- Inherits: Object
  - Object
  - Rubino::Jobs::WebhookDelivery
- Defined in: lib/rubino/jobs/webhook_delivery.rb
Overview
POSTs cron-job results to a configured webhook URL with idempotency and persistence guarantees.
Every #deliver call is recorded as a row in webhook_deliveries before the HTTP request fires. The row's request_id doubles as the X-Rubino-Delivery-Id header, which receivers MUST treat as the deduplication key. The body is signed with HMAC-SHA256 under RUBINO_WEBHOOK_SECRET (or a per-job secret passed via secret:), and the signature is sent as X-Rubino-Signature. When no secret is configured the header is omitted, and the receiver has no way to authenticate the request.
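The receiver side of this contract can be sketched as follows. This is a minimal illustration, not part of Rubino: the WebhookReceiver class and its in-memory set of seen ids are hypothetical, and it assumes the X-Rubino-Signature header carries the bare hex-encoded HMAC-SHA256 of the raw request body, which this page does not specify.

```ruby
require "openssl"
require "set"

# Hypothetical receiver-side helper; not part of Rubino.
class WebhookReceiver
  def initialize(secret:)
    @secret = secret
    @seen_ids = Set.new # in-memory only; a real receiver would persist this
  end

  # headers: Hash keyed by the header names described in the overview.
  # Returns :rejected, :duplicate, or :accepted.
  def handle(headers, raw_body)
    return :rejected unless valid_signature?(raw_body, headers["X-Rubino-Signature"])

    delivery_id = headers["X-Rubino-Delivery-Id"]
    return :rejected if delivery_id.nil? || delivery_id.empty?
    # Set#add? returns nil when the id was already present.
    return :duplicate unless @seen_ids.add?(delivery_id)

    :accepted
  end

  private

  # ASSUMPTION: the header holds the hex digest with no prefix.
  def valid_signature?(raw_body, given)
    return false if given.nil?

    expected = OpenSSL::HMAC.hexdigest("SHA256", @secret, raw_body)
    secure_equal?(expected, given)
  end

  # Constant-time comparison so timing does not leak how many bytes matched.
  def secure_equal?(a, b)
    return false unless a.bytesize == b.bytesize

    diff = 0
    a.bytes.zip(b.bytes) { |x, y| diff |= x ^ y }
    diff.zero?
  end
end
```

This sketch rejects unsigned requests outright. A receiver paired with a sender that has no secret configured would have to skip the signature check and rely on the delivery id alone.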
Failed deliveries are retried, up to 3 attempts in total, with exponential backoff (5s, 30s, 5min) implemented as lightweight Thread.new sleeps. The trade-off is that an agent crash mid-backoff loses the in-flight retry timer; the persisted row stays pending, however, and #resume_pending! picks it up at boot. A real job queue is overkill for the expected webhook volume; revisit if the backlog grows.
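A retry loop of this shape might look like the sketch below. The private attempt_with_retries method is not shown on this page, so attempt_with_backoff and the SKETCH_ constants are hypothetical stand-ins, and the way the real code maps attempts onto the schedule may differ. The sleeper is injectable, mirroring the constructor's sleeper: argument.

```ruby
# Hypothetical stand-ins for BACKOFF_SCHEDULE and MAX_ATTEMPTS.
SKETCH_BACKOFF_SCHEDULE = [5, 30, 300].freeze
SKETCH_MAX_ATTEMPTS = 3

# Yields the 1-based attempt number; the block returns true on success.
# Sleeps schedule[i] seconds between attempt i+1 and attempt i+2.
def attempt_with_backoff(schedule: SKETCH_BACKOFF_SCHEDULE,
                         max_attempts: SKETCH_MAX_ATTEMPTS,
                         sleeper: ->(s) { sleep(s) })
  max_attempts.times do |i|
    return true if yield(i + 1)
    break if i + 1 >= max_attempts # no sleep after the final attempt

    sleeper.call(schedule.fetch(i))
  end
  false
end
```

Under this reading, three failed attempts sleep for 5 and then 30 seconds, and the 300-second entry is only reached if the attempt limit is raised. Passing a recording lambda as sleeper: lets a test assert the delays without waiting.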
URL resolution: the url: constructor argument takes precedence over the RUBINO_WEBHOOK_URL environment variable. There is no per-job override yet (alpha).
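The precedence rule can be illustrated in isolation. The resolve_webhook_url helper below is hypothetical; it mirrors the expression used in the constructor, with the environment passed in so the behaviour can be checked without touching ENV.

```ruby
# Hypothetical helper mirroring `url || ENV.fetch("RUBINO_WEBHOOK_URL", nil)`.
def resolve_webhook_url(url = nil, env: ENV)
  url || env.fetch("RUBINO_WEBHOOK_URL", nil)
end

env = { "RUBINO_WEBHOOK_URL" => "https://env.example/hook" }
resolve_webhook_url("https://arg.example/hook", env: env) # explicit argument wins
resolve_webhook_url(nil, env: env)                        # falls back to the env value
resolve_webhook_url(nil, env: {})                         # nil: no URL configured
```

Note that an empty string is truthy in Ruby, so passing url: "" does not fall back to the environment; #deliver then returns false because the URL is empty.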
Constant Summary
- DEFAULT_TIMEOUT =
  10
- BACKOFF_SCHEDULE =
  Backoff schedule (seconds) applied BEFORE attempt N+1. After a retry has been scheduled, attempt_count is N; it indexes into BACKOFF_SCHEDULE to give the delay before the next attempt. Once the 3 entries are exhausted, delivery gives up.
  [5, 30, 300].freeze
- MAX_ATTEMPTS =
  3
- RESUME_SCAN_LIMIT =
  1000
Instance Method Summary
- #deliver(payload, job_id: nil, run_id: nil) ⇒ Boolean
  True if delivered on this call, false otherwise.
- #initialize(url: nil, logger: nil, timeout: DEFAULT_TIMEOUT, conn: nil, db: nil, secret: nil, clock: nil, sleeper: nil) ⇒ WebhookDelivery (constructor)
  A new instance of WebhookDelivery.
- #resume_pending! ⇒ Object
  Resume hook called at agent boot.
Constructor Details
#initialize(url: nil, logger: nil, timeout: DEFAULT_TIMEOUT, conn: nil, db: nil, secret: nil, clock: nil, sleeper: nil) ⇒ WebhookDelivery
Returns a new instance of WebhookDelivery.
    # File 'lib/rubino/jobs/webhook_delivery.rb', line 42

    def initialize(url: nil, logger: nil, timeout: DEFAULT_TIMEOUT, conn: nil, db: nil, secret: nil, clock: nil, sleeper: nil)
      @url = url || ENV.fetch("RUBINO_WEBHOOK_URL", nil)
      @logger = logger || Rubino.logger
      @conn = conn || build_conn(timeout)
      @db = db
      @secret = secret || ENV.fetch("RUBINO_WEBHOOK_SECRET", nil)
      @clock = clock || -> { Time.now.utc }
      # Tests inject a synchronous sleeper so backoff doesn't burn wall time.
      @sleeper = sleeper || ->(s) { sleep(s) }
    end
Instance Method Details
#deliver(payload, job_id: nil, run_id: nil) ⇒ Boolean
Returns true if delivered on this call, false otherwise.
    # File 'lib/rubino/jobs/webhook_delivery.rb', line 58

    def deliver(payload, job_id: nil, run_id: nil)
      return false if @url.nil? || @url.empty?

      body = JSON.generate(payload)
      row_id = persist_pending(body: body, job_id: job_id, run_id: run_id)
      attempt_with_retries(row_id: row_id, body: body)
    end
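The persist-then-send ordering used by #deliver can be sketched with an in-memory stand-in for the webhook_deliveries table. Everything here is hypothetical: InMemoryDeliveryLog, deliver_with_log, the "delivered" status name, and the use of a UUID as the request id are assumptions for illustration. Only the "pending" status and the ordering itself come from this page.

```ruby
require "json"
require "securerandom"

# Hypothetical in-memory stand-in for the webhook_deliveries table.
class InMemoryDeliveryLog
  attr_reader :rows

  def initialize
    @rows = {}
  end

  # Records the delivery as pending and returns its request id.
  def persist_pending(body:, job_id: nil, run_id: nil)
    request_id = SecureRandom.uuid # ASSUMPTION: id format is not documented
    @rows[request_id] = { status: "pending", payload_json: body,
                          job_id: job_id, run_id: run_id }
    request_id
  end

  def mark(request_id, status)
    @rows.fetch(request_id)[:status] = status
  end
end

# The row is written BEFORE the sender block runs, so a crash during the
# send leaves a pending row behind for a later resume pass.
def deliver_with_log(log, payload)
  body = JSON.generate(payload)
  request_id = log.persist_pending(body: body)
  ok = yield(request_id, body)
  log.mark(request_id, "delivered") if ok # "delivered" is an assumed status name
  ok
end
```

A failed send leaves the row as pending, which is the state a boot-time resume scan looks for.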
#resume_pending! ⇒ Object
Resume hook called at agent boot. Scans up to RESUME_SCAN_LIMIT pending rows whose scheduled_at has passed, oldest first, and replays each one in its own background thread. Returns the number of rows picked up, or 0 when no database is configured. The cap exists to avoid replay storms after a long outage; rows beyond the cap stay in the table for ops to inspect.
    # File 'lib/rubino/jobs/webhook_delivery.rb', line 70

    def resume_pending!
      return 0 unless db

      now = @clock.call.iso8601
      rows = db[:webhook_deliveries]
             .where(status: "pending")
             .where { scheduled_at <= now }
             .order(:scheduled_at)
             .limit(RESUME_SCAN_LIMIT)
             .all
      rows.each do |row|
        Thread.new { attempt_with_retries(row_id: row[:id], body: row[:payload_json]) }
      end
      rows.size
    end
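The selection logic of the scan can be reproduced on plain Ruby hashes. The due_rows helper below is hypothetical. It assumes scheduled_at is stored as an ISO 8601 UTC string in the same format the clock produces, which is what makes the string comparison in the query above order rows chronologically.

```ruby
require "time"

# Hypothetical pure-Ruby equivalent of the pending-row scan.
# ASSUMPTION: scheduled_at values are ISO 8601 UTC strings of uniform format,
# so lexicographic order matches chronological order.
def due_rows(rows, now:, limit:)
  cutoff = now.utc.iso8601
  rows.select { |r| r[:status] == "pending" && r[:scheduled_at] <= cutoff }
      .sort_by { |r| r[:scheduled_at] }
      .first(limit)
end

rows = [
  { id: 1, status: "pending",   scheduled_at: "2024-01-01T11:00:00Z" },
  { id: 2, status: "pending",   scheduled_at: "2024-01-01T13:00:00Z" },
  { id: 3, status: "delivered", scheduled_at: "2024-01-01T10:00:00Z" },
  { id: 4, status: "pending",   scheduled_at: "2024-01-01T09:00:00Z" }
]
due = due_rows(rows, now: Time.utc(2024, 1, 1, 12, 0, 0), limit: 1000)
due.map { |r| r[:id] } # ids 4 and 1, oldest first
```

Because the oldest rows are taken first, a lower limit drops the most recently scheduled rows from the batch, not the oldest ones.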