Class: Fluent::Plugin::GCSOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_gcs.rb

Constant Summary collapse

# Formatter type name. Presumably the default used when no formatter is
# configured — its point of use is not visible here; confirm.
DEFAULT_FORMAT_TYPE = "out_file"
# Upper bound enforced on the hex_random_length parameter in #configure.
MAX_HEX_RANDOM_LENGTH = 32

Instance Method Summary collapse

Constructor Details

#initialize ⇒ GCSOutput

Returns a new instance of GCSOutput.



15
16
17
18
19
# File 'lib/fluent/plugin/out_gcs.rb', line 15

# Builds the plugin instance.
#
# Runs the parent initializer, then requires "google/cloud/storage" and
# assigns the value returned by `log` as the logger of Google::Apis.
def initialize
  super
  # Required here rather than at file load time — presumably so the library
  # is only loaded when the plugin is actually instantiated; confirm intent.
  require "google/cloud/storage"
  Google::Apis.logger = log
end

Instance Method Details

#configure(conf) ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/fluent/plugin/out_gcs.rb', line 77

# Validates the configuration and prepares the state used by the other
# methods of this plugin.
#
# After converting compat parameters and delegating to the parent class, it
# sets up: the encryption options, the optional object metadata hash, the
# formatter, the object creator, the time-slice formatter and the credentials.
#
# @param conf the plugin configuration; read here via conf['time_slice_format']
# @raise [Fluent::ConfigError] when hex_random_length is greater than
#   MAX_HEX_RANDOM_LENGTH
def configure(conf)
  compat_parameters_convert(conf, :buffer, :formatter, :inject)
  super

  raise Fluent::ConfigError,
        "hex_random_length parameter should be set to #{MAX_HEX_RANDOM_LENGTH} characters or less." if @hex_random_length > MAX_HEX_RANDOM_LENGTH

  # The customer-supplied, AES-256 encryption key that will be used to encrypt the file.
  @encryption_opts = { encryption_key: @encryption_key }

  # Flatten the configured metadata entries into a plain key => value hash.
  if @object_metadata
    @object_metadata_hash = @object_metadata.each_with_object({}) do |entry, acc|
      acc[entry.key] = entry.value
    end
  end

  @formatter = formatter_create

  creator_options = {
    transcoding: @transcoding,
    command_parameter: @gzip_command_parameter,
    log: log
  }
  @object_creator = Fluent::GCS.discovered_object_creator(@store_as, **creator_options)

  # For backward compatibility
  # TODO: Remove time_slice_format when end of support compat_parameters
  @configured_time_slice_format = conf['time_slice_format']
  # An explicitly configured time_slice_format wins over the one derived from the buffer timekey.
  slice_format = @configured_time_slice_format || timekey_to_timeformat(@buffer_config['timekey'])
  @time_slice_with_tz = Fluent::Timezone.formatter(@timekey_zone, slice_format)

  # Inline JSON credentials take precedence; otherwise fall back to the keyfile.
  @credentials = @credentials_json || keyfile
end

#format(tag, time, record) ⇒ Object



127
128
129
130
# File 'lib/fluent/plugin/out_gcs.rb', line 127

# Formats one event.
#
# The record is first passed through inject_values_to_record, and the result
# is handed to the configured formatter together with the tag and time.
#
# @param tag the event tag
# @param time the event time
# @param record the event record
# @return whatever @formatter.format returns for the injected record
def format(tag, time, record)
  @formatter.format(tag, time, inject_values_to_record(tag, time, record))
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


132
133
134
# File 'lib/fluent/plugin/out_gcs.rb', line 132

# Unconditionally reports true.
#
# NOTE(review): presumably the hook Fluentd consults to decide whether this
# plugin may run under multiple workers — confirm against the Fluentd plugin API.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end

#start ⇒ Object



114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/fluent/plugin/out_gcs.rb', line 114

# Creates the storage client and bucket handle, then continues the parent
# start sequence.
#
# The client is built from @project, @credentials, @client_retries and
# @client_timeout; the bucket handle is looked up by @bucket. ensure_bucket
# runs before control is passed to the parent implementation.
#
# @return whatever the parent implementation of start returns
def start
  client_options = {
    project: @project,
    keyfile: @credentials,
    retries: @client_retries,
    timeout: @client_timeout
  }
  @gcs = Google::Cloud::Storage.new(**client_options)
  @gcs_bucket = @gcs.bucket(@bucket)

  ensure_bucket
  super
end

#write(chunk) ⇒ Object



136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/fluent/plugin/out_gcs.rb', line 136

# Uploads one buffer chunk to the bucket.
#
# The destination path comes from generate_path. The object creator yields
# an object whose #path is the local file to upload; that file is sent with
# the configured metadata, ACL, storage class, content type/encoding and
# encryption options.
#
# @param chunk the buffer chunk to upload (its #key appears in the debug log)
# @return whatever @object_creator.create returns
def write(chunk)
  object_path = generate_path(chunk)

  @object_creator.create(chunk) do |local_file|
    upload_opts = {
      metadata: @object_metadata_hash,
      acl: @acl,
      storage_class: @storage_class,
      content_type: @object_creator.content_type,
      content_encoding: @object_creator.content_encoding,
    }.merge(@encryption_opts)

    log.debug { "out_gcs: upload chunk:#{chunk.key} to gcs://#{@bucket}/#{object_path} options: #{upload_opts}" }
    @gcs_bucket.upload_file(local_file.path, object_path, **upload_opts)
  end
end