Class: Fluent::Plugin::AzureLogsIngestionOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_azure_logs_ingestion.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object

Raises:

  • (Fluent::ConfigError)


31
32
33
34
35
36
37
38
39
40
# File 'lib/fluent/plugin/out_azure_logs_ingestion.rb', line 31

# Validates the plugin configuration once the parent class has processed it.
#
# Runs validate_urls! and then, unless use_msi is set, requires tenant_id,
# client_id and client_secret to all be present and non-blank.
#
# @param conf the configuration object; forwarded unchanged to super
# @return [nil] when the configuration is acceptable
# @raise [Fluent::ConfigError] if use_msi is falsy and any of tenant_id,
#   client_id or client_secret is nil, empty or whitespace-only
def configure(conf)
  super

  validate_urls!

  return if @use_msi

  # An empty string is truthy in Ruby, so a bare truthiness test would accept
  # a value such as "#{ENV['UNSET_VAR']}" that expands to ''. Treat blank
  # values as missing so the misconfiguration is reported here.
  credentials = [@tenant_id, @client_id, @client_secret]
  return if credentials.none? { |value| value.to_s.strip.empty? }

  raise Fluent::ConfigError, 'tenant_id, client_id, and client_secret are required when use_msi is false'
end

#start ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/fluent/plugin/out_azure_logs_ingestion.rb', line 42

# Runs the parent start hook, then builds the two collaborators used by
# #write: an AzureLogsIngestion::Auth stored in @auth and an
# AzureLogsIngestion::Client stored in @client. Both receive the plugin
# logger and the relevant configured instance variables as keyword arguments.
#
# @return [Object] the newly created client (value of the last assignment)
def start
  super

  auth_options = {
    use_msi: @use_msi,
    tenant_id: @tenant_id,
    client_id: @client_id,
    client_secret: @client_secret,
    authority_host: @authority_host,
    logs_ingestion_scope: @logs_ingestion_scope,
    token_refresh_skew: @token_refresh_skew,
    logger: log
  }
  @auth = AzureLogsIngestion::Auth.new(**auth_options)

  client_options = {
    endpoint: @endpoint,
    dcr_immutable_id: @dcr_immutable_id,
    stream_name: @stream_name,
    logger: log
  }
  @client = AzureLogsIngestion::Client.new(**client_options)
end

#write(chunk) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/fluent/plugin/out_azure_logs_ingestion.rb', line 62

# Builds a payload from the given chunk, fetches a bearer token from @auth
# and hands both to @client.send_payload, emitting debug log lines before
# the build, after the build, and after the request.
#
# The payload, if one was built, is always released with close! -- including
# when token retrieval or the send raises. Exceptions are not rescued here.
#
# @param chunk an object responding to unique_id that
#   AzureLogsIngestion::PayloadBuilder#build accepts
# @return [Object] the result of the final log.debug call
def write(chunk)
  log.debug(
    'building logs ingestion payload',
    chunk_id: dump_unique_id_hex(chunk.unique_id),
    gzip: @gzip
  )
  builder = AzureLogsIngestion::PayloadBuilder.new(gzip: @gzip)
  body = builder.build(chunk)

  summary = {
    chunk_id: dump_unique_id_hex(chunk.unique_id),
    record_count: body.record_count,
    raw_size: body.raw_size,
    gzip_size: body.gzip_size,
    content_length: body.content_length
  }
  log.debug('built logs ingestion payload', **summary)

  @client.send_payload(payload: body, bearer_token: @auth.token)
  log.debug('logs ingestion request completed', chunk_id: dump_unique_id_hex(chunk.unique_id))
ensure
  # body is still nil when the build itself failed, hence the safe navigation.
  body&.close!
end