Module: PlanMyStuff::WebhookReplayer

Defined in:
lib/plan_my_stuff/webhook_replayer.rb

Overview

Development helper that fetches recent GitHub webhook deliveries through the GitHub API and POSTs each one to a local endpoint, so a webhook flow can be reproduced against a running server without waiting for GitHub to redeliver.

Intended for use via plan_my_stuff:webhooks:replay, not production code paths.

Class Method Details

.fetch_and_replay(endpoint_url:, webhook_url:, processed_file:, scope: :org, repo: nil, interactive: true) ⇒ void

This method returns an undefined value.

Fetch deliveries for a hook and replay unseen ones.

Parameters:

  • endpoint_url (String)

    local URL to POST replays to

  • webhook_url (String)

    remote config.url used to resolve hook id

  • scope (Symbol, String) (defaults to: :org)

    :org or :repo

  • repo (String, nil) (defaults to: nil)

    required when scope is :repo

  • processed_file (String)

    state file tracking replayed delivery ids

  • interactive (Boolean) (defaults to: true)

    prompt after each successful delivery



# File 'lib/plan_my_stuff/webhook_replayer.rb', line 62

def fetch_and_replay(endpoint_url:, webhook_url:, processed_file:, scope: :org, repo: nil, interactive: true)
  scope = scope.to_sym
  hook_id = resolve_hook_id(scope: scope, repo: repo, webhook_url: webhook_url)
  deliveries_path = "#{hooks_base_path(scope: scope, repo: repo)}/#{hook_id}/deliveries"
  $stdout.puts("Using #{scope} hook #{hook_id} -> #{webhook_url}")

  processed = load_processed(processed_file)
  $stdout.puts("Already processed: #{processed.size}")

  deliveries = gh_get(deliveries_path, per_page: 50).sort_by { |d| d['delivered_at'].to_s }
  to_process = deliveries.reject { |d| processed.include?(d['id'].to_s) }
  $stdout.puts("Fetched #{deliveries.size} deliveries, #{to_process.size} new")
  if to_process.empty?
    $stdout.puts('Nothing to do.')
    return
  end

  replay_each(to_process, deliveries_path, endpoint_url, processed_file, interactive: interactive)
end
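
The selection step above (sort by delivered_at, then drop ids already in the state file) can be sketched on in-memory data. This is a minimal illustration, not the library's code: unseen_deliveries is a hypothetical helper name, and the sample summaries are made up. It assumes the timestamps share one ISO 8601 format, so sorting them as strings matches chronological order.

```ruby
require 'set'

# Hypothetical helper mirroring the sort-then-reject step in fetch_and_replay.
def unseen_deliveries(deliveries, processed)
  deliveries
    .sort_by { |d| d['delivered_at'].to_s }           # oldest first
    .reject { |d| processed.include?(d['id'].to_s) }  # ids are stored as strings
end

# Made-up delivery summaries with only the fields the step reads.
deliveries = [
  { 'id' => 3, 'delivered_at' => '2024-05-01T10:02:00Z' },
  { 'id' => 1, 'delivered_at' => '2024-05-01T10:00:00Z' },
  { 'id' => 2, 'delivered_at' => '2024-05-01T10:01:00Z' },
]
processed = Set['1']

unseen_deliveries(deliveries, processed).each do |d|
  puts "#{d['id']} #{d['delivered_at']}"
end
```

The ids are compared via to_s because the state file holds text lines while the summaries carry whatever type the API returned.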

.gh_get(path, **params) ⇒ Array, Hash

Calls octokit.get and returns the parsed JSON body (string-keyed Hash/Array) rather than Sawyer::Resource, so delivery payload and headers are easy to re-serialize.

Returns:

  • (Array, Hash)


# File 'lib/plan_my_stuff/webhook_replayer.rb', line 240

def gh_get(path, **params)
  client = PlanMyStuff.client.octokit
  client.get(path, params)
  JSON.parse(client.last_response.body)
end

.hooks_base_path(scope:, repo:) ⇒ String

Returns:

  • (String)

    API path under which the hooks for the given scope are listed

Raises:

  • (ArgumentError)

    when scope is :repo but repo is nil or empty

  • (ArgumentError)

    when scope is neither :org nor :repo



# File 'lib/plan_my_stuff/webhook_replayer.rb', line 208

def hooks_base_path(scope:, repo:)
  case scope
  when :org
    "/orgs/#{PlanMyStuff.configuration.organization}/hooks"
  when :repo
    raise(ArgumentError, 'repo is required for :repo scope') if repo.nil? || repo.to_s.empty?

    "/repos/#{repo}/hooks"
  else
    raise(ArgumentError, "Unknown scope #{scope.inspect}; expected :org or :repo")
  end
end

.listen(targets:, endpoint_url:, processed_file_for:, interval: 30) ⇒ void

This method returns an undefined value.

Polls one or more webhooks on an interval and replays new deliveries automatically as they arrive. Hook ids are resolved once up front; the loop then runs until interrupted with Ctrl-C.

Parameters:

  • targets (Array<Hash>)

    one hash per hook, each with :scope (:org / :repo), :webhook_url, and :repo (for :repo scope)

  • endpoint_url (String)

    local URL to POST replays to

  • processed_file_for (#call)

    lambda taking a target hash, returns the state file path for that target

  • interval (Integer) (defaults to: 30)

    seconds between polls



# File 'lib/plan_my_stuff/webhook_replayer.rb', line 95

def listen(targets:, endpoint_url:, processed_file_for:, interval: 30)
  resolved = targets.map { |t| resolve_target(t, processed_file_for) }
  $stdout.puts("Listening on #{resolved.size} hook(s) every #{interval}s. Ctrl-C to stop.")
  resolved.each do |r|
    $stdout.puts("  #{r[:scope]} hook #{r[:hook_id]} -> #{r[:webhook_url]}")
  end

  loop do
    poll_all(resolved, endpoint_url)
    $stdout.puts('=============')
    $stdout.puts('  Sleeping...')
    $stdout.puts('=============')
    sleep(interval)
  end
rescue Interrupt
  $stdout.puts("\nStopped.")
end
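
A sketch of how the targets array and the processed_file_for callable might be put together. Everything concrete here is an assumption for illustration: the URLs, the repo name acme/widgets, the state_file_for helper, and the tmp/webhook_replay directory layout are all hypothetical. The call to listen itself is left commented out because it needs the library and GitHub credentials.

```ruby
# Hypothetical helper: one state file per target, so processed ids from
# different hooks never mix.
def state_file_for(target)
  suffix = target[:scope].to_s == 'repo' ? target[:repo].tr('/', '-') : 'org'
  File.join('tmp', 'webhook_replay', "#{suffix}.processed")
end

# Placeholder targets; :repo is only needed for the :repo scope.
targets = [
  { scope: :org, webhook_url: 'https://example.com/webhooks/github' },
  { scope: :repo, repo: 'acme/widgets', webhook_url: 'https://example.com/webhooks/github' },
]

# Anything that responds to #call works; a lambda is the simplest choice.
processed_file_for = ->(target) { state_file_for(target) }

targets.each { |t| puts processed_file_for.call(t) }

# With the library loaded, the call might then look like this:
# PlanMyStuff::WebhookReplayer.listen(
#   targets: targets,
#   endpoint_url: 'http://localhost:3000/webhooks/github',
#   processed_file_for: processed_file_for,
#   interval: 30,
# )
```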

.load_processed(path) ⇒ Set<String>

Returns:

  • (Set<String>)

    delivery ids already replayed; empty when the state file does not exist


# File 'lib/plan_my_stuff/webhook_replayer.rb', line 247

def load_processed(path)
  return Set.new unless File.exist?(path)

  File.readlines(path, chomp: true).reject(&:empty?).to_set
end

.mark_processed(path, id) ⇒ void

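This method returns an undefined value.

Appends id as a new line to the state file at path, creating the parent directory first if it does not exist.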



# File 'lib/plan_my_stuff/webhook_replayer.rb', line 254

def mark_processed(path, id)
  FileUtils.mkdir_p(File.dirname(path))
  File.open(path, 'a') { |f| f.puts(id) }
end
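
The state file is a plain list of ids, one per line. The round trip below copies the two helpers shown above into a standalone script so it runs without the library; the temporary directory and the file name org.processed are arbitrary choices for the demonstration.

```ruby
require 'fileutils'
require 'set'
require 'tmpdir'

# Standalone copies of the two helpers, so the sketch runs without the library.
def load_processed(path)
  return Set.new unless File.exist?(path)

  File.readlines(path, chomp: true).reject(&:empty?).to_set
end

def mark_processed(path, id)
  FileUtils.mkdir_p(File.dirname(path))
  File.open(path, 'a') { |f| f.puts(id) }
end

Dir.mktmpdir do |dir|
  path = File.join(dir, 'state', 'org.processed')

  puts load_processed(path).size             # a missing file reads as an empty set
  mark_processed(path, 101)                  # integer ids are written as text
  mark_processed(path, 102)
  puts load_processed(path).include?('101')  # so lookups must use strings
end
```

Because ids come back as strings, callers compare with to_s, as the replay methods do.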

.poll_all(resolved, endpoint_url) ⇒ void

This method returns an undefined value.

Collects unprocessed deliveries across every resolved target, merges them into a single list sorted by delivered_at, then replays them one-by-one in true chronological order. Transient API errors per target are logged and swallowed so the listen loop survives.

Parameters:

  • resolved (Array<Hash>)
  • endpoint_url (String)


# File 'lib/plan_my_stuff/webhook_replayer.rb', line 145

def poll_all(resolved, endpoint_url)
  pending = []
  resolved.each do |target|
    processed = load_processed(target[:processed_file])
    deliveries = gh_get(target[:deliveries_path], per_page: 50)
    deliveries.each do |summary|
      next if processed.include?(summary['id'].to_s)

      pending << { target: target, summary: summary }
    end
  rescue Octokit::Error => e
    warn("[#{target[:scope]}] poll failed: #{e.message}")
  end

  return if pending.empty?

  pending.sort_by! { |item| item[:summary]['delivered_at'].to_s }
  $stdout.puts("Replaying #{pending.size} delivery(ies) in delivered_at order")
  replay_pending(pending, endpoint_url)
end
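
The cross-target merge can be illustrated without any API calls. In this sketch merge_pending is a hypothetical helper and the summaries are made up; each pending item keeps a reference to its target so the replay step knows which state file to update. As in the method above, ordering relies on the timestamps sharing one ISO 8601 format.

```ruby
# Hypothetical helper: flattens per-target summaries into one list ordered
# by delivered_at, keeping track of which target each summary came from.
def merge_pending(summaries_by_scope)
  pending = summaries_by_scope.flat_map do |scope, summaries|
    summaries.map { |summary| { target: { scope: scope }, summary: summary } }
  end
  pending.sort_by { |item| item[:summary]['delivered_at'].to_s }
end

# Made-up summaries for two hooks whose deliveries interleave in time.
summaries_by_scope = {
  org: [{ 'id' => 10, 'delivered_at' => '2024-05-01T10:00:05Z' }],
  repo: [
    { 'id' => 20, 'delivered_at' => '2024-05-01T10:00:01Z' },
    { 'id' => 21, 'delivered_at' => '2024-05-01T10:00:09Z' },
  ],
}

merge_pending(summaries_by_scope).each do |item|
  puts "#{item[:target][:scope]} #{item[:summary]['id']} @ #{item[:summary]['delivered_at']}"
end
```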

.post(headers:, payload:, endpoint_url:) ⇒ Net::HTTPResponse

POST a single webhook delivery to a local endpoint.

Parameters:

  • headers (Hash)
  • payload (String, Hash, Array)
  • endpoint_url (String)

    full URL including path

Returns:

  • (Net::HTTPResponse)


# File 'lib/plan_my_stuff/webhook_replayer.rb', line 27

def post(headers:, payload:, endpoint_url:)
  uri = URI(endpoint_url)
  http = Net::HTTP.new(uri.host, uri.port)
  http.use_ssl = uri.scheme == 'https'
  http.open_timeout = 3_600
  http.read_timeout = 3_600

  body = payload.is_a?(String) ? JSON.parse(payload).to_json : JSON.generate(payload)

  request = Net::HTTP::Post.new(uri.request_uri)
  headers.each { |key, value| request[key.to_s] = value.to_s }
  request['Content-Type'] ||= 'application/json'
  request.body = body

  $stdout.puts("POST #{endpoint_url}")
  $stdout.puts("Headers: #{headers.keys.join(', ')}")
  $stdout.puts('---')

  response = http.request(request)
  $stdout.puts("HTTP #{response.code} #{response.message}")
  $stdout.puts(response.body) if response.body.present?
  response
end
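
To see what the outgoing request looks like without contacting a server, the request-building half of the method can be isolated. This is a sketch under stated assumptions: build_replay_request is a hypothetical helper, and the header values and localhost URL are placeholders.

```ruby
require 'json'
require 'net/http'
require 'uri'

# Hypothetical helper: builds the request the way post does, without sending it.
def build_replay_request(headers:, payload:, endpoint_url:)
  uri = URI(endpoint_url)
  request = Net::HTTP::Post.new(uri.request_uri)
  headers.each { |key, value| request[key.to_s] = value.to_s }
  # Only applied when the replayed headers did not carry a Content-Type.
  request['Content-Type'] ||= 'application/json'
  # String payloads are parsed and re-serialized; anything else is generated.
  request.body = payload.is_a?(String) ? JSON.parse(payload).to_json : JSON.generate(payload)
  request
end

request = build_replay_request(
  headers: { 'X-GitHub-Event' => 'issues', 'X-GitHub-Delivery' => 'placeholder-guid' },
  payload: { 'action' => 'opened' },
  endpoint_url: 'http://localhost:3000/webhooks/github',
)

puts request['X-GitHub-Event']
puts request['Content-Type']
puts request.body
```

Parsing a String payload before sending means malformed JSON fails locally with JSON::ParserError instead of reaching the endpoint.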

.replay_each(to_process, deliveries_path, endpoint_url, processed_file, interactive:) ⇒ void

This method returns an undefined value.

Raises:

  • (PlanMyStuff::Error)

    when the local endpoint returns a non-success response

# File 'lib/plan_my_stuff/webhook_replayer.rb', line 263

def replay_each(to_process, deliveries_path, endpoint_url, processed_file, interactive:)
  to_process.each.with_index(1) do |summary, index|
    id = summary['id']
    delivery = gh_get("#{deliveries_path}/#{id}")
    request_data = delivery.fetch('request')
    github_delivery_id = request_data.dig('headers', 'X-GitHub-Delivery') || id

    event = "#{summary['event']}.#{summary['action']}"
    $stdout.puts("[#{index}/#{to_process.size}] #{id} #{event} - #{github_delivery_id}")

    response = post(
      headers: request_data.fetch('headers') || {},
      payload: request_data.fetch('payload'),
      endpoint_url: endpoint_url,
    )

    unless response.is_a?(Net::HTTPSuccess)
      raise(PlanMyStuff::Error, '  ! non-success; stopping so you can investigate')
    end

    mark_processed(processed_file, id)
    next unless interactive

    $stdout.puts("Processed #{github_delivery_id} - Continue? (y/n)")
    answer = $stdin.gets
    break unless answer&.downcase&.start_with?('y')
  end
end

.replay_pending(pending, endpoint_url) ⇒ void

This method returns an undefined value.

Replays a pre-sorted, multi-target pending list one delivery at a time. Stops on the first non-success so state can be investigated without skipping ahead.

Raises:

  • (PlanMyStuff::Error)

    when the local endpoint returns a non-success response

# File 'lib/plan_my_stuff/webhook_replayer.rb', line 174

def replay_pending(pending, endpoint_url)
  pending.each.with_index(1) do |item, index|
    target = item[:target]
    summary = item[:summary]
    id = summary['id']
    delivery = gh_get("#{target[:deliveries_path]}/#{id}")
    request_data = delivery.fetch('request')
    github_delivery_id = request_data.dig('headers', 'X-GitHub-Delivery') || id

    event = "#{summary['event']}.#{summary['action']}"
    $stdout.puts(
      "[#{index}/#{pending.size}] #{target[:scope]} #{id} #{event} " \
        "@ #{summary['delivered_at']} - #{github_delivery_id}",
    )

    response = post(
      headers: request_data.fetch('headers') || {},
      payload: request_data.fetch('payload'),
      endpoint_url: endpoint_url,
    )

    unless response.is_a?(Net::HTTPSuccess)
      raise(PlanMyStuff::Error, '  ! non-success; stopping so you can investigate')
    end

    mark_processed(target[:processed_file], id)
  end
end
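
The stop-on-first-failure behavior shared by replay_each and replay_pending can be demonstrated with canned responses. In this sketch replay_until_failure is a hypothetical stand-in for the loop, RuntimeError stands in for PlanMyStuff::Error, and the responses hash plays the role of the local endpoint.

```ruby
require 'net/http'

# Hypothetical stand-in for the replay loop. An id is appended to `marked`
# only after its response is a success, mirroring mark_processed.
def replay_until_failure(ids, responses, marked)
  ids.each do |id|
    response = responses.fetch(id)
    raise(RuntimeError, "non-success for delivery #{id}") unless response.is_a?(Net::HTTPSuccess)

    marked << id
  end
end

# Canned responses; no network traffic is involved.
ok = Net::HTTPOK.new('1.1', '200', 'OK')
failed = Net::HTTPInternalServerError.new('1.1', '500', 'Internal Server Error')
responses = { 1 => ok, 2 => failed, 3 => ok }

marked = []
begin
  replay_until_failure([1, 2, 3], responses, marked)
rescue RuntimeError => e
  puts "Stopped: #{e.message}"
end
puts "Marked: #{marked.inspect}"
```

Because the failing delivery and everything after it stay unmarked, the next run picks up again at the delivery that failed.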

.resolve_hook_id(scope:, repo:, webhook_url:) ⇒ Integer

Returns:

  • (Integer)

    id of the hook whose config.url equals webhook_url

Raises:

  • (PlanMyStuff::Error)

    when no visible hook has a config.url equal to webhook_url

# File 'lib/plan_my_stuff/webhook_replayer.rb', line 225

def resolve_hook_id(scope:, repo:, webhook_url:)
  hooks = gh_get(hooks_base_path(scope: scope, repo: repo), per_page: 100)
  match = hooks.find { |h| h.dig('config', 'url') == webhook_url }
  return match['id'] if match.present?

  available = hooks.map { |h| "  id=#{h['id']} url=#{h.dig('config', 'url')}" }.join("\n")
  raise(PlanMyStuff::Error, "No #{scope} webhook with config.url == #{webhook_url}. Visible:\n#{available}")
end
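
The lookup is an exact string match on config.url, which the following sketch reproduces on in-memory data. find_hook_id is a hypothetical helper, the hook list is made up, and ArgumentError stands in for PlanMyStuff::Error.

```ruby
# Hypothetical helper: the same lookup as resolve_hook_id, on in-memory data.
def find_hook_id(hooks, webhook_url)
  match = hooks.find { |h| h.dig('config', 'url') == webhook_url }
  return match['id'] if match

  # List what was visible so a typo in the URL is easy to spot.
  available = hooks.map { |h| "  id=#{h['id']} url=#{h.dig('config', 'url')}" }.join("\n")
  raise(ArgumentError, "No webhook with config.url == #{webhook_url}. Visible:\n#{available}")
end

# Made-up hooks with only the fields the lookup reads.
hooks = [
  { 'id' => 1, 'config' => { 'url' => 'https://example.com/a' } },
  { 'id' => 2, 'config' => { 'url' => 'https://example.com/b' } },
]

puts find_hook_id(hooks, 'https://example.com/b')

begin
  find_hook_id(hooks, 'https://example.com/b/') # trailing slash: no match
rescue ArgumentError => e
  puts e.message
end
```

Since the comparison is exact, differences such as a trailing slash or http versus https are enough to miss the hook.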

.resolve_target(target, processed_file_for) ⇒ Hash

Resolves a listen target’s hook id and precomputes its deliveries path and processed-file location.

Returns:

  • (Hash)


# File 'lib/plan_my_stuff/webhook_replayer.rb', line 118

def resolve_target(target, processed_file_for)
  scope = target.fetch(:scope).to_sym
  repo = target[:repo]
  webhook_url = target.fetch(:webhook_url)
  hook_id = resolve_hook_id(scope: scope, repo: repo, webhook_url: webhook_url)

  {
    scope: scope,
    repo: repo,
    webhook_url: webhook_url,
    hook_id: hook_id,
    deliveries_path: "#{hooks_base_path(scope: scope, repo: repo)}/#{hook_id}/deliveries",
    processed_file: processed_file_for.call(target),
  }
end