Class: Fluent::Plugin::GCSOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_gcs.rb

Constant Summary collapse

MAX_HEX_RANDOM_LENGTH =
32

Instance Method Summary collapse

Constructor Details

#initialize ⇒ GCSOutput

Returns a new instance of GCSOutput.



15
16
17
18
19
# File 'lib/fluent/plugin/out_gcs.rb', line 15

# Creates the plugin instance, loads the Google Cloud Storage client library
# and points the Google API logger at this plugin's logger.
def initialize
  super
  # The library is required here, inside the constructor, rather than at the
  # top of the file (presumably so it is only loaded once the plugin is
  # actually instantiated -- confirm).
  require "google/cloud/storage"
  # Send Google API client log output to the plugin's own `log`.
  Google::Apis.logger = log
end

Instance Method Details

#configure(conf) ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/fluent/plugin/out_gcs.rb', line 75

# Validates the configuration and derives the state used by the rest of the
# plugin: encryption options, object metadata, formatter, object creator,
# time-slice formatter and credentials.
#
# @param conf the plugin configuration, forwarded unchanged to `super`
# @raise [Fluent::ConfigError] when hex_random_length exceeds
#   MAX_HEX_RANDOM_LENGTH
def configure(conf)
  super

  too_long = @hex_random_length > MAX_HEX_RANDOM_LENGTH
  raise Fluent::ConfigError, "hex_random_length parameter should be set to #{MAX_HEX_RANDOM_LENGTH} characters or less." if too_long

  # The customer-supplied, AES-256 encryption key that will be used to encrypt the file.
  @encryption_opts = { encryption_key: @encryption_key }

  # Flatten the metadata entries into a plain key => value hash; a later
  # entry with the same key replaces an earlier one.
  @object_metadata_hash = @object_metadata.each_with_object({}) { |entry, acc| acc[entry.key] = entry.value } if @object_metadata

  @formatter = formatter_create

  # gzip_command_parameter is a deprecated alias of command_parameter; the
  # explicit command_parameter wins when both are set.
  creator_command = @command_parameter || @gzip_command_parameter
  creator_options = {
    transcoding: @transcoding,
    command_parameter: creator_command,
    log: log
  }
  @object_creator = Fluent::GCS.discovered_object_creator(@store_as, **creator_options)

  slice_format = timekey_to_timeformat(@buffer_config['timekey'])
  @time_slice_with_tz = Fluent::Timezone.formatter(@timekey_zone, slice_format)

  # Inline JSON credentials take precedence over the keyfile setting.
  @credentials = @credentials_json || keyfile
end

#format(tag, time, record) ⇒ Object



125
126
127
128
# File 'lib/fluent/plugin/out_gcs.rb', line 125

# Serializes a single event for buffering.
#
# The record is first passed through inject_values_to_record, and the result
# is handed to the formatter created in #configure.
#
# @param tag the event tag
# @param time the event time
# @param record the event record
# @return whatever @formatter.format returns for the injected record
def format(tag, time, record)
  @formatter.format(tag, time, inject_values_to_record(tag, time, record))
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


130
131
132
# File 'lib/fluent/plugin/out_gcs.rb', line 130

# Reports whether this output may be used with multiple workers.
#
# @return [Boolean] unconditionally true
def multi_workers_ready?
  true
end

#start ⇒ Object



112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/fluent/plugin/out_gcs.rb', line 112

# Creates the Google Cloud Storage client and bucket handle, then starts the
# plugin.
#
# The client is built from the configured project, the credentials chosen in
# #configure, and the retry/timeout settings. ensure_bucket runs after the
# bucket handle has been assigned and before `super` is called.
#
# @return the value returned by `super`
def start
  client_options = {
    project: @project,
    keyfile: @credentials,
    retries: @client_retries,
    timeout: @client_timeout
  }
  @gcs = Google::Cloud::Storage.new(**client_options)
  @gcs_bucket = @gcs.bucket(@bucket)

  ensure_bucket
  super
end

#write(chunk) ⇒ Object



134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/fluent/plugin/out_gcs.rb', line 134

# Uploads one buffer chunk to the configured bucket.
#
# The destination path comes from generate_path. @object_creator.create
# yields the object to upload; it is sent with the configured metadata, ACL,
# storage class, the creator's content type/encoding, and the encryption
# options prepared in #configure.
#
# @param chunk the buffer chunk to upload; its #key is included in the debug
#   log line
# @return whatever @object_creator.create returns
def write(chunk)
  path = generate_path(chunk)

  @object_creator.create(chunk) do |obj|
    opts = {
      metadata: @object_metadata_hash,
      acl: @acl,
      storage_class: @storage_class,
      content_type: @object_creator.content_type,
      content_encoding: @object_creator.content_encoding,
    }
    opts.merge!(@encryption_opts)

    log.debug do
      # The customer-supplied encryption key is a secret: never write it to
      # the log. A placeholder still shows that encryption is in use.
      loggable_opts = opts[:encryption_key] ? opts.merge(encryption_key: "[REDACTED]") : opts
      "out_gcs: upload chunk:#{chunk.key} to gcs://#{@bucket}/#{path} options: #{loggable_opts}"
    end
    # The upload itself receives the real, unredacted options.
    @gcs_bucket.upload_file(obj.path, path, **opts)
  end
end