Module: Legion::Extensions::Github::Absorbers::Issues
- Included in:
- Issues
- Defined in:
- lib/legion/extensions/github/absorbers/issues.rb
Overview
Absorbs GitHub issue events and normalizes them to fleet work items. Subscribes to lex.github.absorbers.issues queue.
Filters: bot events, already-claimed issues (fleet labels), ignored actions (closed, transferred, etc.).
Publishes normalized work items to the assessor queue via task chain.
Constant Summary collapse
- CACHE_TTL =
24 hours
86_400
Instance Method Summary collapse
-
#absorb(payload:) ⇒ Hash
Main entry point.
- #description_max_bytes ⇒ Object
-
#normalize(payload) ⇒ Hash
Normalize a raw GitHub webhook payload to the standard fleet work item format (design spec section 3).
Instance Method Details
#absorb(payload:) ⇒ Hash
Main entry point. Called by the subscription actor when a GitHub webhook event for issues arrives.
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/legion/extensions/github/absorbers/issues.rb', line 34 def absorb(payload:, **) return { absorbed: false, reason: :bot_generated } if bot_generated?(payload) return { absorbed: false, reason: :already_claimed } if has_fleet_label?(payload) return { absorbed: false, reason: :ignored } if ignored?(payload) work_item = normalize(payload) # NOTE: Absorber does NOT call set_nx — the assessor is the single dedup authority. # Source-specific dedup only: label checks, bot filter, action filter. # Store large raw payload in Redis, not inline in AMQP message cache_key = "fleet:payload:#{work_item[:work_item_id]}" cache_set(cache_key, json_dump(payload), ttl: CACHE_TTL) work_item[:raw_payload_ref] = cache_key # Publish to assessor via transport publish_result = publish_to_assessor(work_item) # Propagate publish failures — do not swallow return publish_result if publish_result.is_a?(Hash) && publish_result[:absorbed] == false { absorbed: true, work_item_id: work_item[:work_item_id] } end |
#description_max_bytes ⇒ Object
23 24 25 26 27 |
# File 'lib/legion/extensions/github/absorbers/issues.rb', line 23 def description_max_bytes Legion::Settings.dig(:fleet, :work_item, :description_max_bytes) || 32_768 rescue StandardError => _e 32_768 end |
#normalize(payload) ⇒ Hash
Normalize a raw GitHub webhook payload to the standard fleet work item format (design spec section 3).
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/legion/extensions/github/absorbers/issues.rb', line 63 def normalize(payload) issue = payload['issue'] || {} repo = payload['repository'] || {} action = payload['action'] || 'opened' owner = repo.dig('owner', 'login') || '' repo_name = repo['name'] || '' number = issue['number'] body = issue['body'] || '' max_bytes = description_max_bytes { work_item_id: generate_work_item_id, source: 'github', source_ref: "#{owner}/#{repo_name}##{number}", source_event: "issues.#{action}", title: issue['title'] || '', description: body.bytesize > max_bytes ? body.byteslice(0, max_bytes).scrub('') : body, raw_payload_ref: nil, # set after cache write in absorb repo: { owner: owner, name: repo_name, default_branch: repo['default_branch'] || 'main', language: repo['language'] || 'unknown' }, instructions: [], context: [], config: default_config, pipeline: { stage: 'intake', trace: [], attempt: 0, feedback_history: [], plan: nil, changes: nil, review_result: nil, pr_number: nil, branch_name: nil, context_ref: nil } } end |