Module: PlanMyStuff::WebhookReplayer

Defined in:
lib/plan_my_stuff/webhook_replayer.rb

Overview

Dev helper: fetch recent GitHub webhook deliveries via the GitHub API and POST each one to a local endpoint, so a webhook flow can be reproduced against a running server without waiting for GitHub to re-deliver.

Intended for use via plan_my_stuff:webhooks:replay, not production code paths.

Class Method Details

.fetch_and_replay(endpoint_url:, webhook_url:, processed_file:, scope: :org, repo: nil, interactive: true) ⇒ void

This method returns an undefined value.

Fetch deliveries for a hook and replay unseen ones.

Parameters:

  • endpoint_url (String)

    local URL to POST replays to

  • webhook_url (String)

    remote config.url used to resolve hook id

  • scope (Symbol, String) (defaults to: :org)

    :org or :repo

  • repo (String, nil) (defaults to: nil)

    required when scope is :repo

  • processed_file (String)

    state file tracking replayed delivery ids

  • interactive (Boolean) (defaults to: true)

    prompt after each successful delivery



# File 'lib/plan_my_stuff/webhook_replayer.rb', line 62

def fetch_and_replay(endpoint_url:, webhook_url:, processed_file:, scope: :org, repo: nil, interactive: true)
  scope = scope.to_sym
  hook_id = resolve_hook_id(scope: scope, repo: repo, webhook_url: webhook_url)
  deliveries_path = "#{hooks_base_path(scope: scope, repo: repo)}/#{hook_id}/deliveries"
  $stdout.puts("Using #{scope} hook #{hook_id} -> #{webhook_url}")

  processed = load_processed(processed_file)
  $stdout.puts("Already processed: #{processed.size}")

  deliveries = gh_get(deliveries_path, per_page: 50).sort_by { |d| d['delivered_at'].to_s }
  to_process = deliveries.reject { |d| processed.include?(d['id'].to_s) }
  $stdout.puts("Fetched #{deliveries.size} deliveries, #{to_process.size} new")
  if to_process.empty?
    $stdout.puts('Nothing to do.')
    return
  end

  replay_each(to_process, deliveries_path, endpoint_url, processed_file, interactive: interactive)
end

.gh_get(path, **params) ⇒ Array, Hash

Calls octokit.get and returns the parsed JSON body (string-keyed Hash/Array) rather than Sawyer::Resource, so delivery payload and headers are easy to re-serialize.

Returns:

  • (Array, Hash)


# File 'lib/plan_my_stuff/webhook_replayer.rb', line 231

def gh_get(path, **params)
  client = PlanMyStuff.client.octokit
  client.get(path, params)
  JSON.parse(client.last_response.body)
end

.hooks_base_path(scope:, repo:) ⇒ String

Returns:

  • (String)


# File 'lib/plan_my_stuff/webhook_replayer.rb', line 202

def hooks_base_path(scope:, repo:)
  case scope
  when :org
    "/orgs/#{PlanMyStuff.configuration.organization}/hooks"
  when :repo
    raise(ArgumentError, 'repo is required for :repo scope') if repo.nil? || repo.to_s.empty?

    "/repos/#{repo}/hooks"
  else
    raise(ArgumentError, "Unknown scope #{scope.inspect}; expected :org or :repo")
  end
end

.listen(targets:, endpoint_url:, processed_file_for:, interval: 30) ⇒ void

This method returns an undefined value.

Polls one or more webhooks on an interval and auto-replays new deliveries as they arrive. Resolves hook ids once up front, then loops forever until Ctrl-C.

Parameters:

  • targets (Array<Hash>)

    one hash per hook, each with :scope (:org / :repo), :webhook_url, and :repo (for :repo scope)

  • endpoint_url (String)

    local URL to POST replays to

  • processed_file_for (#call)

    lambda taking a target hash, returns the state file path for that target

  • interval (Integer) (defaults to: 30)

    seconds between polls



# File 'lib/plan_my_stuff/webhook_replayer.rb', line 95

def listen(targets:, endpoint_url:, processed_file_for:, interval: 30)
  resolved = targets.map { |t| resolve_target(t, processed_file_for) }
  $stdout.puts("Listening on #{resolved.size} hook(s) every #{interval}s. Ctrl-C to stop.")
  resolved.each do |r|
    $stdout.puts("  #{r[:scope]} hook #{r[:hook_id]} -> #{r[:webhook_url]}")
  end

  loop do
    poll_all(resolved, endpoint_url)
    $stdout.puts('=============')
    $stdout.puts('  Sleeping...')
    $stdout.puts('=============')
    sleep(interval)
  end
rescue Interrupt
  $stdout.puts("\nStopped.")
end
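
A minimal sketch of how the targets and processed_file_for arguments could be assembled. The URLs, the repo name, the tmp/webhook_replays directory, and the state_file_for helper are all hypothetical; only the hash keys (:scope, :webhook_url, :repo) and the #call contract come from the parameter list above.

```ruby
# Hypothetical helper: derive one state file per target so each hook
# tracks its own replayed delivery ids.
def state_file_for(target)
  suffix = target[:scope].to_sym == :repo ? target[:repo].tr('/', '_') : 'org'
  File.join('tmp', 'webhook_replays', "#{suffix}.processed")
end

# Hypothetical targets: one org-level hook and one repo-level hook.
targets = [
  { scope: :org, webhook_url: 'https://example.com/webhooks/github' },
  { scope: :repo, repo: 'acme/widgets', webhook_url: 'https://example.com/webhooks/github' },
]

# Anything responding to #call works; a Method object is used here.
processed_file_for = method(:state_file_for)

targets.each { |t| puts processed_file_for.call(t) }

# With the gem loaded, the call would then look something like:
# PlanMyStuff::WebhookReplayer.listen(
#   targets: targets,
#   endpoint_url: 'http://localhost:3000/webhooks/github',
#   processed_file_for: processed_file_for,
#   interval: 30,
# )
```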

.load_processed(path) ⇒ Set<String>

Returns:

  • (Set<String>)


# File 'lib/plan_my_stuff/webhook_replayer.rb', line 238

def load_processed(path)
  return Set.new unless File.exist?(path)

  File.readlines(path, chomp: true).reject(&:empty?).to_set
end

.mark_processed(path, id) ⇒ void

This method returns an undefined value.



# File 'lib/plan_my_stuff/webhook_replayer.rb', line 245

def mark_processed(path, id)
  FileUtils.mkdir_p(File.dirname(path))
  File.open(path, 'a') { |f| f.puts(id) }
end
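
The state file is a plain list of delivery ids, one per line. The sketch below shows the round trip between mark_processed and load_processed using standalone copies of the two helpers, so it runs without the gem. The round_trip_demo wrapper and the temporary directory layout are assumptions made for illustration.

```ruby
require 'fileutils'
require 'set'
require 'tmpdir'

# Standalone copies of the two helpers shown above.
def load_processed(path)
  return Set.new unless File.exist?(path)

  File.readlines(path, chomp: true).reject(&:empty?).to_set
end

def mark_processed(path, id)
  FileUtils.mkdir_p(File.dirname(path))
  File.open(path, 'a') { |f| f.puts(id) }
end

# Hypothetical demo: write two ids into a throwaway state file and
# return the set seen before and after.
def round_trip_demo
  Dir.mktmpdir do |dir|
    path = File.join(dir, 'state', 'org.processed')
    before = load_processed(path)   # file does not exist yet
    mark_processed(path, 101)       # integer ids are written as text
    mark_processed(path, 102)
    [before, load_processed(path)]
  end
end

before, after = round_trip_demo
puts before.size
puts after.to_a.inspect
```

Ids come back as strings, which is why callers compare against summary['id'].to_s rather than the raw integer.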

.poll_all(resolved, endpoint_url) ⇒ void

This method returns an undefined value.

Collects unprocessed deliveries across every resolved target, merges them into a single list sorted by delivered_at, then replays them one-by-one in true chronological order. Transient API errors per target are logged and swallowed so the listen loop survives.

Parameters:

  • resolved (Array<Hash>)
  • endpoint_url (String)


# File 'lib/plan_my_stuff/webhook_replayer.rb', line 145

def poll_all(resolved, endpoint_url)
  pending = []
  resolved.each do |target|
    processed = load_processed(target[:processed_file])
    deliveries = gh_get(target[:deliveries_path], per_page: 50)
    deliveries.each do |summary|
      next if processed.include?(summary['id'].to_s)

      pending << { target: target, summary: summary }
    end
  rescue Octokit::Error => e
    warn("[#{target[:scope]}] poll failed: #{e.message}")
  end

  return if pending.empty?

  pending.sort_by! { |item| item[:summary]['delivered_at'].to_s }
  $stdout.puts("Replaying #{pending.size} delivery(ies) in delivered_at order")
  replay_pending(pending, endpoint_url)
end
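
The merge step can be sketched in isolation. The merge_pending helper and the sample deliveries below are hypothetical; the shape of each pending item ({ target:, summary: }) and the sort key follow poll_all. The ordering relies on delivered_at timestamps sharing one ISO 8601 format, so that string comparison matches chronological order.

```ruby
# Hypothetical helper: flatten per-target delivery lists into a single
# list ordered by delivered_at, tagging each delivery with its target.
def merge_pending(deliveries_by_target)
  pending = deliveries_by_target.flat_map do |target, summaries|
    summaries.map { |summary| { target: target, summary: summary } }
  end
  pending.sort_by { |item| item[:summary]['delivered_at'].to_s }
end

# Sample data, invented for illustration.
sample = [
  [{ scope: :org }, [
    { 'id' => 10, 'delivered_at' => '2024-05-01T10:00:00Z' },
    { 'id' => 12, 'delivered_at' => '2024-05-01T10:04:00Z' },
  ]],
  [{ scope: :repo }, [
    { 'id' => 11, 'delivered_at' => '2024-05-01T10:02:00Z' },
  ]],
]

merge_pending(sample).each do |item|
  puts "#{item[:target][:scope]} #{item[:summary]['id']}"
end
```

With this sample, the repo delivery lands between the two org deliveries instead of after them.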

.post(headers:, payload:, endpoint_url:) ⇒ Net::HTTPResponse

POST a single webhook delivery to a local endpoint.

Parameters:

  • headers (Hash)
  • payload (String, Hash, Array)
  • endpoint_url (String)

    full URL including path

Returns:

  • (Net::HTTPResponse)


# File 'lib/plan_my_stuff/webhook_replayer.rb', line 27

def post(headers:, payload:, endpoint_url:)
  uri = URI(endpoint_url)
  http = Net::HTTP.new(uri.host, uri.port)
  http.use_ssl = uri.scheme == 'https'
  http.open_timeout = 3_600
  http.read_timeout = 3_600

  body = payload.is_a?(String) ? JSON.parse(payload).to_json : JSON.generate(payload)

  request = Net::HTTP::Post.new(uri.request_uri)
  headers.each { |key, value| request[key.to_s] = value.to_s }
  request['Content-Type'] ||= 'application/json'
  request.body = body

  $stdout.puts("POST #{endpoint_url}")
  $stdout.puts("Headers: #{headers.keys.join(', ')}")
  $stdout.puts('---')

  response = http.request(request)
  $stdout.puts("HTTP #{response.code} #{response.message}")
  $stdout.puts(response.body) if response.body.present?
  response
end
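
The payload handling in post accepts either a JSON string or an already-parsed structure and produces compact JSON in both cases. A minimal sketch of that one step, with normalize_body as a hypothetical name for the expression used above:

```ruby
require 'json'

# Hypothetical helper wrapping the body expression from post: strings are
# parsed and re-serialized, anything else is serialized directly.
def normalize_body(payload)
  payload.is_a?(String) ? JSON.parse(payload).to_json : JSON.generate(payload)
end

puts normalize_body('{ "action": "opened",  "number": 1 }')
puts normalize_body({ 'action' => 'opened', 'number' => 1 })
```

Because the body is regenerated, its bytes can differ from what GitHub originally sent. If the local endpoint verifies a signature header over the raw body, that check may fail on replayed deliveries; this is an inference from the code, not something the source states.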

.replay_each(to_process, deliveries_path, endpoint_url, processed_file, interactive:) ⇒ void

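This method returns an undefined value.

Replays each delivery for a single hook in the order given, marking it processed only after a successful POST. Raises on the first non-success response; when interactive is true, prompts before moving on to the next delivery.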



# File 'lib/plan_my_stuff/webhook_replayer.rb', line 251

def replay_each(to_process, deliveries_path, endpoint_url, processed_file, interactive:)
  to_process.each.with_index(1) do |summary, index|
    id = summary['id']
    delivery = gh_get("#{deliveries_path}/#{id}")
    request_data = delivery.fetch('request')
    github_delivery_id = request_data.dig('headers', 'X-GitHub-Delivery') || id

    event = "#{summary['event']}.#{summary['action']}"
    $stdout.puts("[#{index}/#{to_process.size}] #{id} #{event} - #{github_delivery_id}")

    response = post(
      headers: request_data.fetch('headers') || {},
      payload: request_data.fetch('payload'),
      endpoint_url: endpoint_url,
    )

    unless response.is_a?(Net::HTTPSuccess)
      raise('  ! non-success; stopping so you can investigate')
    end

    mark_processed(processed_file, id)
    next unless interactive

    $stdout.puts("Processed #{github_delivery_id} - Continue? (y/n)")
    answer = $stdin.gets
    break unless answer&.downcase&.start_with?('y')
  end
end
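
The interactive prompt continues only when the answer starts with "y" (case-insensitive); anything else, including end of input, stops the loop. A small sketch of that check, where continue_replay? is a hypothetical name for the condition used above:

```ruby
# Hypothetical predicate extracted from the prompt handling in replay_each.
def continue_replay?(answer)
  # answer is nil when $stdin has reached end of input; safe navigation
  # turns that into a falsy result instead of raising NoMethodError.
  !!answer&.downcase&.start_with?('y')
end

['y', "Yes\n", 'n', '', nil].each do |answer|
  puts "#{answer.inspect} -> #{continue_replay?(answer)}"
end
```

Note that an answer with leading whitespace, such as " y", also stops the loop, since the input is not stripped before the check.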

.replay_pending(pending, endpoint_url) ⇒ void

This method returns an undefined value.

Replays a pre-sorted, multi-target pending list one delivery at a time. Stops on the first non-success so state can be investigated without skipping ahead.



# File 'lib/plan_my_stuff/webhook_replayer.rb', line 172

def replay_pending(pending, endpoint_url)
  pending.each.with_index(1) do |item, index|
    target = item[:target]
    summary = item[:summary]
    id = summary['id']
    delivery = gh_get("#{target[:deliveries_path]}/#{id}")
    request_data = delivery.fetch('request')
    github_delivery_id = request_data.dig('headers', 'X-GitHub-Delivery') || id

    event = "#{summary['event']}.#{summary['action']}"
    $stdout.puts(
      "[#{index}/#{pending.size}] #{target[:scope]} #{id} #{event} " \
        "@ #{summary['delivered_at']} - #{github_delivery_id}",
    )

    response = post(
      headers: request_data.fetch('headers') || {},
      payload: request_data.fetch('payload'),
      endpoint_url: endpoint_url,
    )

    unless response.is_a?(Net::HTTPSuccess)
      raise('  ! non-success; stopping so you can investigate')
    end

    mark_processed(target[:processed_file], id)
  end
end

.resolve_hook_id(scope:, repo:, webhook_url:) ⇒ Integer

Returns:

  • (Integer)

Raises:

  • (PlanMyStuff::Error)

    when no hook's config.url matches webhook_url


# File 'lib/plan_my_stuff/webhook_replayer.rb', line 216

def resolve_hook_id(scope:, repo:, webhook_url:)
  hooks = gh_get(hooks_base_path(scope: scope, repo: repo), per_page: 100)
  match = hooks.find { |h| h.dig('config', 'url') == webhook_url }
  return match['id'] if match.present?

  available = hooks.map { |h| "  id=#{h['id']} url=#{h.dig('config', 'url')}" }.join("\n")
  raise(PlanMyStuff::Error, "No #{scope} webhook with config.url == #{webhook_url}. Visible:\n#{available}")
end
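
Hook lookup is an exact string comparison against config.url. The sketch below reproduces only the matching step on invented sample data; find_hook_id is a hypothetical helper, and it returns nil where resolve_hook_id would raise PlanMyStuff::Error.

```ruby
# Hypothetical helper mirroring the find step in resolve_hook_id.
def find_hook_id(hooks, webhook_url)
  match = hooks.find { |h| h.dig('config', 'url') == webhook_url }
  match && match['id']
end

# Sample hook list, invented for illustration (string-keyed, as gh_get returns).
hooks = [
  { 'id' => 111, 'config' => { 'url' => 'https://example.com/webhooks/github' } },
  { 'id' => 222, 'config' => { 'url' => 'https://example.com/other' } },
]

puts find_hook_id(hooks, 'https://example.com/other').inspect
# A trailing slash is enough to miss, since the comparison is exact.
puts find_hook_id(hooks, 'https://example.com/other/').inspect
```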

.resolve_target(target, processed_file_for) ⇒ Hash

Resolves a listen target’s hook id and precomputes its deliveries path + processed-file location.

Returns:

  • (Hash)


# File 'lib/plan_my_stuff/webhook_replayer.rb', line 118

def resolve_target(target, processed_file_for)
  scope = target.fetch(:scope).to_sym
  repo = target[:repo]
  webhook_url = target.fetch(:webhook_url)
  hook_id = resolve_hook_id(scope: scope, repo: repo, webhook_url: webhook_url)

  {
    scope: scope,
    repo: repo,
    webhook_url: webhook_url,
    hook_id: hook_id,
    deliveries_path: "#{hooks_base_path(scope: scope, repo: repo)}/#{hook_id}/deliveries",
    processed_file: processed_file_for.call(target),
  }
end