Module: Legion::DigitalWorker::RiskTier

Defined in:
lib/legion/digital_worker/risk_tier.rb

Constant Summary collapse

TIERS =
%w[low medium high critical].freeze
CONSTRAINTS =

Maps AIRB risk tiers to governance and consent constraints. These constraints are enforced when a worker attempts to execute a task.

{
  'low'      => { min_consent: 'inform',     governance_gate: false, council_required: false },
  'medium'   => { min_consent: 'consult',    governance_gate: false, council_required: false },
  'high'     => { min_consent: 'consult',    governance_gate: true,  council_required: true  },
  'critical' => { min_consent: 'supervised', governance_gate: true,  council_required: true  }
}.freeze

Class Method Summary collapse

Class Method Details

.assign!(worker, tier:, by:, reason: nil) ⇒ Object

Assign or change a worker’s risk tier. Lowering risk requires governance approval.

Raises:

  • (ArgumentError)


38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/legion/digital_worker/risk_tier.rb', line 38

def self.assign!(worker, tier:, by:, reason: nil)
  raise ArgumentError, "invalid tier: #{tier}" unless valid?(tier)

  old_tier = worker.risk_tier
  tier_lowered = old_tier && TIERS.index(tier) < TIERS.index(old_tier)

  if tier_lowered
    Legion::Logging.warn "[risk_tier] lowering risk from #{old_tier} to #{tier} requires governance approval"
    # In production: check governance approval here
  end

  worker.update(risk_tier: tier, updated_at: Time.now.utc)

  event = {
    event:     :risk_tier_changed,
    worker_id: worker.worker_id,
    from_tier: old_tier,
    to_tier:   tier,
    by:        by,
    reason:    reason,
    at:        Time.now.utc
  }

  Legion::Events.emit('worker.risk_tier_changed', **event) if defined?(Legion::Events)
  Legion::Logging.info "[risk_tier] worker=#{worker.worker_id} tier: #{old_tier || 'none'} -> #{tier} by=#{by}"

  { assigned: true }.merge(event)
end

Validate that a worker’s current consent tier meets the minimum for its risk tier

Returns:

  • (Boolean)


68
69
70
71
72
73
74
# File 'lib/legion/digital_worker/risk_tier.rb', line 68

def self.consent_compliant?(worker)
  return true unless worker.risk_tier

  min = min_consent(worker.risk_tier)
  hierarchy = Legion::DigitalWorker::Registry::CONSENT_HIERARCHY
  hierarchy.index(worker.consent_tier) >= hierarchy.index(min)
end

.constraints_for(tier) ⇒ Object



21
22
23
# File 'lib/legion/digital_worker/risk_tier.rb', line 21

def self.constraints_for(tier)
  CONSTRAINTS.fetch(tier) { raise ArgumentError, "unknown risk tier: #{tier}. Valid: #{TIERS.join(', ')}" }
end

.council_required?(tier) ⇒ Boolean

Returns:

  • (Boolean)


33
34
35
# File 'lib/legion/digital_worker/risk_tier.rb', line 33

def self.council_required?(tier)
  constraints_for(tier)[:council_required]
end

.governance_required?(tier) ⇒ Boolean

Returns:

  • (Boolean)


29
30
31
# File 'lib/legion/digital_worker/risk_tier.rb', line 29

def self.governance_required?(tier)
  constraints_for(tier)[:governance_gate]
end


25
26
27
# File 'lib/legion/digital_worker/risk_tier.rb', line 25

def self.min_consent(tier)
  constraints_for(tier)[:min_consent]
end

.valid?(tier) ⇒ Boolean

Returns:

  • (Boolean)


17
18
19
# File 'lib/legion/digital_worker/risk_tier.rb', line 17

def self.valid?(tier)
  TIERS.include?(tier)
end