Module: Legion::Extensions::Github::Absorbers::Issues

Extended by:
Issues, Helpers
Included in:
Issues
Defined in:
lib/legion/extensions/github/absorbers/issues.rb

Overview

Absorbs GitHub issue events and normalizes them into fleet work items. Subscribes to the lex.github.absorbers.issues queue.

Filters out bot-generated events, already-claimed issues (detected via fleet labels), and ignored actions (closed, transferred, etc.).

Publishes normalized work items to the assessor queue via a task chain.
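
The three filter predicates called by #absorb (bot_generated?, has_fleet_label?, ignored?) are not shown on this page. A minimal sketch of what they might look like, assuming GitHub's standard webhook payload shape; the fleet: label prefix and the exact ignored-action set are assumptions, not confirmed by the source:

```ruby
# Assumed label convention and action set -- illustrative only.
FLEET_LABEL_PREFIX = 'fleet:'
IGNORED_ACTIONS = %w[closed transferred deleted locked].freeze

# GitHub marks bot senders with type "Bot" in the webhook payload.
def bot_generated?(payload)
  payload.dig('sender', 'type') == 'Bot'
end

# An issue already carrying a fleet label has been claimed by another worker.
def has_fleet_label?(payload)
  labels = payload.dig('issue', 'labels') || []
  labels.any? { |label| label['name'].to_s.start_with?(FLEET_LABEL_PREFIX) }
end

# Actions the absorber does not act on.
def ignored?(payload)
  IGNORED_ACTIONS.include?(payload['action'])
end
```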

Constant Summary

CACHE_TTL =

86_400 # 24 hours

Instance Method Summary

Instance Method Details

#absorb(payload:) ⇒ Hash

Main entry point. Called by the subscription actor when a GitHub webhook event for issues arrives.

Parameters:

  • payload (Hash)

    Raw GitHub webhook payload (string keys from JSON)

Returns:

  • (Hash)

    { absorbed: true/false, … }



# File 'lib/legion/extensions/github/absorbers/issues.rb', line 34

def absorb(payload:, **)
  return { absorbed: false, reason: :bot_generated } if bot_generated?(payload)
  return { absorbed: false, reason: :already_claimed } if has_fleet_label?(payload)
  return { absorbed: false, reason: :ignored } if ignored?(payload)

  work_item = normalize(payload)

  # NOTE: Absorber does NOT call set_nx — the assessor is the single dedup authority.
  # Source-specific dedup only: label checks, bot filter, action filter.

  # Store large raw payload in Redis, not inline in AMQP message
  cache_key = "fleet:payload:#{work_item[:work_item_id]}"
  cache_set(cache_key, json_dump(payload), ttl: CACHE_TTL)
  work_item[:raw_payload_ref] = cache_key

  # Publish to assessor via transport
  publish_result = publish_to_assessor(work_item)

  # Propagate publish failures — do not swallow
  return publish_result if publish_result.is_a?(Hash) && publish_result[:absorbed] == false

  { absorbed: true, work_item_id: work_item[:work_item_id] }
end
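
The publish_to_assessor helper is referenced but its body is not shown here. #absorb relies on it returning a { absorbed: false, … } Hash on failure rather than raising, so failures propagate to the caller. A sketch of that contract, with a local stub standing in for the real Legion transport call (both the stub and the queue name are assumptions):

```ruby
# Local stand-in for the real transport publish; raises to simulate a broker outage.
def transport_publish(queue, message)
  raise IOError, 'broker unreachable' if message.nil?
  true
end

# Hypothetical sketch of the failure contract #absorb depends on:
# success returns { absorbed: true }, any transport error is converted
# into a { absorbed: false, ... } Hash instead of propagating the exception.
def publish_to_assessor(work_item)
  transport_publish('fleet.assessor', work_item)
  { absorbed: true }
rescue StandardError => e
  { absorbed: false, reason: :publish_failed, error: e.message }
end
```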

#description_max_bytes ⇒ Object



# File 'lib/legion/extensions/github/absorbers/issues.rb', line 23

def description_max_bytes
  Legion::Settings.dig(:fleet, :work_item, :description_max_bytes) || 32_768
rescue StandardError => _e
  32_768
end
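
The lookup falls back to 32_768 both when the setting is absent and when Legion::Settings raises. The same nil-coalescing pattern can be illustrated with a plain Hash:

```ruby
# Hash#dig returns nil for a missing leaf, so || supplies the default.
empty      = { fleet: { work_item: {} } }
configured = { fleet: { work_item: { description_max_bytes: 16_384 } } }

default_max    = empty.dig(:fleet, :work_item, :description_max_bytes) || 32_768
configured_max = configured.dig(:fleet, :work_item, :description_max_bytes) || 32_768
```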

#normalize(payload) ⇒ Hash

Normalize a raw GitHub webhook payload to the standard fleet work item format (design spec section 3).

Parameters:

  • payload (Hash)

    Raw GitHub webhook payload (string keys)

Returns:

  • (Hash)

    Normalized work item (symbol keys)



# File 'lib/legion/extensions/github/absorbers/issues.rb', line 63

def normalize(payload)
  issue = payload['issue'] || {}
  repo = payload['repository'] || {}
  action = payload['action'] || 'opened'
  owner = repo.dig('owner', 'login') || ''
  repo_name = repo['name'] || ''
  number = issue['number']
  body = issue['body'] || ''
  max_bytes = description_max_bytes

  {
    work_item_id:    generate_work_item_id,
    source:          'github',
    source_ref:      "#{owner}/#{repo_name}##{number}",
    source_event:    "issues.#{action}",

    title:           issue['title'] || '',
    description:     body.bytesize > max_bytes ? body.byteslice(0, max_bytes).scrub('') : body,
    raw_payload_ref: nil, # set after cache write in absorb

    repo:            {
      owner:          owner,
      name:           repo_name,
      default_branch: repo['default_branch'] || 'main',
      language:       repo['language'] || 'unknown'
    },

    instructions:    [],
    context:         [],

    config:          default_config,

    pipeline:        {
      stage:            'intake',
      trace:            [],
      attempt:          0,
      feedback_history: [],
      plan:             nil,
      changes:          nil,
      review_result:    nil,
      pr_number:        nil,
      branch_name:      nil,
      context_ref:      nil
    }
  }
end
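
The description field is truncated with byteslice followed by scrub('') because a byte-level cut can split a multibyte character; scrub drops the resulting invalid trailing bytes so the stored string stays valid UTF-8. For example:

```ruby
body = 'héllo' * 10  # 'é' is two bytes in UTF-8, so each copy is 6 bytes
max_bytes = 8

# Same expression as in #normalize: byteslice(0, 8) cuts mid-'é' on the
# second copy, leaving a dangling lead byte that scrub('') removes.
truncated = body.bytesize > max_bytes ? body.byteslice(0, max_bytes).scrub('') : body
```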