Module: Legion::Extensions::Identity::Entra::WorkloadIdentity::Runners::Token

Includes:
Helpers::Lex, Logging::Helper, Settings::Helper
Defined in:
lib/legion/extensions/identity/entra/workload_identity/runners/token.rb

Constant Summary collapse

DEFAULT_SCOPE =
'https://graph.microsoft.com/.default'

Instance Method Summary collapse

Instance Method Details

#acquire_federated_token(tenant_id:, client_id:, assertion:, scope: DEFAULT_SCOPE, assertion_file: nil) ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/legion/extensions/identity/entra/workload_identity/runners/token.rb', line 17

# Exchanges a client assertion for an access token via the client-credentials
# grant with a JWT-bearer client assertion.
#
# The assertion actually sent is whatever +resolve_assertion+ returns for the
# given +assertion+ / +assertion_file+ pair; when that is falsy no request is
# made. Any StandardError raised along the way is passed to +handle_exception+
# and converted into an error hash rather than propagated.
#
# @param tenant_id [String] tenant the token request is sent for
# @param client_id [String] client (application) id placed in the form
# @param assertion [String, nil] assertion handed to +resolve_assertion+
# @param scope [String] requested scope, defaults to DEFAULT_SCOPE
# @param assertion_file [String, nil] handed to +resolve_assertion+ together
#   with +assertion+ (presumably a path to read the assertion from — confirm
#   against +resolve_assertion+)
# @return [Hash] +{ result: <federation_post return value> }+ once the request
#   was made (the nested value may itself describe an error),
#   +{ error: 'missing_assertion', ... }+ when no assertion could be resolved,
#   or +{ error: 'request_failed', ... }+ when an exception was rescued
def acquire_federated_token(tenant_id:, client_id:, assertion:,
                            scope: DEFAULT_SCOPE,
                            assertion_file: nil, **)
  log.debug("WorkloadIdentity::Token.acquire_federated: tenant=#{tenant_id}")
  client_assertion = resolve_assertion(assertion, assertion_file)

  if client_assertion
    form = {
      grant_type:            'client_credentials',
      client_id:             client_id,
      client_assertion_type: 'urn:ietf:params:oauth:client-assertion-type:jwt-bearer',
      client_assertion:      client_assertion,
      scope:                 scope
    }
    token_response = federation_post(tenant_id, form)
    # Only announce success when the response actually carries a token.
    log.info('WorkloadIdentity::Token.acquire_federated: token acquired') if token_response[:access_token]
    { result: token_response }
  else
    log.warn('WorkloadIdentity::Token.acquire_federated: no assertion available')
    { error: 'missing_assertion', description: 'No SA token or assertion file available' }
  end
rescue StandardError => error
  handle_exception(error, level: :error, operation: 'workload_identity.token.acquire_federated')
  { error: 'request_failed', description: error.message }
end

#acquire_from_environment(scope: DEFAULT_SCOPE) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/legion/extensions/identity/entra/workload_identity/runners/token.rb', line 40

# Acquires a federated token using configuration taken from the process
# environment.
#
# Reads AZURE_TENANT_ID, AZURE_CLIENT_ID and AZURE_FEDERATED_TOKEN_FILE and
# delegates to #acquire_federated_token, passing the token file as
# +assertion_file+ and no inline assertion. A variable that is unset, empty,
# or whitespace-only is treated as missing, so a blank value fails fast here
# instead of producing a malformed request further down.
#
# @param scope [String] requested scope, defaults to DEFAULT_SCOPE
# @return [Hash] +{ error: 'missing_env', description: ... }+ when any of the
#   three variables is missing or blank; otherwise the return value of
#   #acquire_federated_token
def acquire_from_environment(scope: DEFAULT_SCOPE, **)
  log.debug('WorkloadIdentity::Token.acquire_from_environment: reading env vars')
  tenant_id, client_id, token_file =
    %w[AZURE_TENANT_ID AZURE_CLIENT_ID AZURE_FEDERATED_TOKEN_FILE].map do |name|
      value = ENV.fetch(name, nil)
      # An empty string is truthy in Ruby, so blank values are mapped to nil
      # explicitly; non-blank values are passed through unmodified.
      value unless value.to_s.strip.empty?
    end

  unless tenant_id && client_id && token_file
    log.warn('WorkloadIdentity::Token.acquire_from_environment: missing env vars')
    return { error:       'missing_env',
             description: 'AZURE_TENANT_ID, AZURE_CLIENT_ID, and AZURE_FEDERATED_TOKEN_FILE required' }
  end

  acquire_federated_token(tenant_id: tenant_id, client_id: client_id,
                          assertion: nil, assertion_file: token_file,
                          scope: scope)
end

#federation_post(tenant_id, form) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/legion/extensions/identity/entra/workload_identity/runners/token.rb', line 57

# Posts a form to the 'oauth2/v2.0/token' path on the connection returned by
# +federation_connection(tenant_id)+ and returns the decoded response body.
#
# The form keys are stringified and the form is sent URL-encoded. An empty
# response body decodes to an empty hash; otherwise the body is decoded with
# +json_load+ (presumably yielding a symbol-keyed Hash — confirm against the
# helper).
#
# For non-success responses the returned hash is guaranteed to carry +:error+
# and +:error_description+: values supplied in the body are kept, otherwise
# they default to "http_<status>" and the response's reason phrase. A
# non-success response whose body cannot be decoded, or does not decode to a
# Hash (e.g. an HTML error page from an intermediary), is reported with those
# defaults instead of surfacing a parse error that hides the HTTP status.
# Decode errors on a successful response are still raised.
#
# @param tenant_id [String] tenant passed to +federation_connection+
# @param form [Hash] form fields to send
# @return [Hash] decoded body, with error fields filled in on failure
def federation_post(tenant_id, form)
  log.debug("WorkloadIdentity::Token.federation_post: tenant=#{tenant_id}")
  response = federation_connection(tenant_id).post('oauth2/v2.0/token',
                                                   URI.encode_www_form(form.transform_keys(&:to_s)))
  body =
    begin
      response.body.to_s.empty? ? {} : json_load(response.body)
    rescue StandardError
      # A success response with an undecodable body is a real failure; only
      # error responses fall back to the status-derived defaults below.
      raise if response.success?

      {}
    end
  return body if response.success?

  # Error bodies that decode to something other than a Hash cannot carry the
  # error fields, so start from an empty hash.
  body = {} unless body.is_a?(Hash)
  body[:error] ||= "http_#{response.status}"
  body[:error_description] ||= response.reason_phrase
  log.debug("WorkloadIdentity::Token.federation_post: error=#{body[:error]} status=#{response.status}")
  body
end