Class: FluentPluginKinesisAggregation::OutputFilter

Inherits: Fluent::Plugin::Output < Object

Defined in: lib/fluent/plugin/out_kinesis-aggregation.rb

Constant Summary

DEFAULT_BUFFER_TYPE = "memory"

NAME = 'kinesis-aggregation'

PUT_RECORD_MAX_DATA_SIZE = 1024 * 1024

FLUENTD_MAX_BUFFER_SIZE = PUT_RECORD_MAX_DATA_SIZE - 200

The 200-byte margin is somewhat arbitrary: it is more than the envelope overhead, and big enough to store the partition/hash key table in the AggregatedRecord. Note that you shouldn't really run the buffer this close to the limit anyway, since the write is likely to fail if anyone else is writing to the shard at the same time.
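Concretely, the headroom works out as follows (a quick check in irb; the fully-qualified names assume this class is loaded):

FluentPluginKinesisAggregation::OutputFilter::PUT_RECORD_MAX_DATA_SIZE
# => 1048576  (the 1MB Kinesis PutRecord data limit)
FluentPluginKinesisAggregation::OutputFilter::FLUENTD_MAX_BUFFER_SIZE
# => 1048376  (1MB minus 200 bytes of envelope headroom)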

Instance Method Summary

#configure(conf) ⇒ Object
#format(tag, time, record) ⇒ Object
#start ⇒ Object
#write(chunk) ⇒ Object

Instance Method Details

#configure(conf) ⇒ Object



# File 'lib/fluent/plugin/out_kinesis-aggregation.rb', line 88

def configure(conf)
  compat_parameters_convert(conf, :buffer, :inject)
  super

  if @buffer.chunk_limit_size > FLUENTD_MAX_BUFFER_SIZE
    raise Fluent::ConfigError, "Kinesis buffer_chunk_limit is set to more than the 1mb shard limit (i.e. you won't be able to write your chunks!)"
  end

  if @buffer.chunk_limit_size > FLUENTD_MAX_BUFFER_SIZE / 3
    log.warn 'Kinesis buffer_chunk_limit is set at more than 1/3 of the per second shard limit (1mb). This is not good if you have many producers.'
  end
end
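To satisfy both checks, keep the chunk size well under the limit. A minimal configuration sketch (stream_name and fixed_partition_key are this plugin's parameters; the tag and sizes are illustrative, and the usual AWS credential/region settings are assumed to be configured as well):

<match kinesis.**>
  @type kinesis-aggregation
  stream_name my-stream
  # fixed_partition_key my-key  # optional; a random UUID per chunk is used when unset
  <buffer>
    @type memory
    chunk_limit_size 300k       # under FLUENTD_MAX_BUFFER_SIZE / 3, so no warning
  </buffer>
</match>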

#format(tag, time, record) ⇒ Object



# File 'lib/fluent/plugin/out_kinesis-aggregation.rb', line 106

def format(tag, time, record)
  record = inject_values_to_record(tag, time, record)

  return AggregatedRecord.encode(AggregatedRecord.new(
    records: [Record.new(
      partition_key_index: 1,
      data: Yajl.dump(record).b
    )]
  ))
end
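Each formatted entry is thus a complete, single-record AggregatedRecord, which is what makes the concatenation trick in #write legal. A sketch of inspecting one entry, assuming output is a configured instance of this class and the plugin's generated protobuf classes (AggregatedRecord, Record) are loaded:

entry = output.format('my.tag', Fluent::EventTime.now, { 'message' => 'hello' })
decoded = AggregatedRecord.decode(entry)
decoded.records.length                  # => 1
decoded.records[0].partition_key_index  # => 1 (see the proto3 note in #write)
Yajl.load(decoded.records[0].data)      # => {"message"=>"hello"}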

#start ⇒ Object



# File 'lib/fluent/plugin/out_kinesis-aggregation.rb', line 101

def start
  super
  load_client
end

#write(chunk) ⇒ Object



# File 'lib/fluent/plugin/out_kinesis-aggregation.rb', line 117

def write(chunk)
  records = chunk.read
  if records.length > FLUENTD_MAX_BUFFER_SIZE
    log.error "Can't emit aggregated #{@stream_name} stream record of length #{records.length} (more than #{FLUENTD_MAX_BUFFER_SIZE})"
    return # do not throw, since we can't retry
  end

  partition_key = @fixed_partition_key || SecureRandom.uuid

  # confusing magic. Because of the format of protobuf records,
  # it's valid (in this case) to concatenate the AggregatedRecords
  # to form one AggregatedRecord, since we only have a repeated field
  # in records.
  #
  # ALSO, since we use google's protobuf stuff (much better
  # memory usage due to C extension), we're stuck on proto3.
  # Unfortunately, KPL uses proto2 form, and partition_key_index
  # is a required field. If we set it to 0 in proto3, though,
  # it's helpfully ignored in the serialisation (default!).
  # Therefore we have to pass a partition_key_index of 1,
  # and put two things in our partition_key_table.
  message = AggregatedRecord.encode(AggregatedRecord.new(
    partition_key_table: ['a', partition_key]
  )) + records

  @client.put_record(
    stream_name: @stream_name,
    data: kpl_aggregation_pack(message),
    partition_key: partition_key
  )
end
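The concatenation relies on a general protobuf wire-format property: concatenating two serialized messages of the same type is equivalent to merging them, and repeated fields merge by concatenation. A self-contained sketch using the same message types:

one = AggregatedRecord.encode(AggregatedRecord.new(
  records: [Record.new(partition_key_index: 1, data: 'a'.b)]
))
two = AggregatedRecord.encode(AggregatedRecord.new(
  records: [Record.new(partition_key_index: 1, data: 'b'.b)]
))
header = AggregatedRecord.encode(AggregatedRecord.new(
  partition_key_table: ['a', 'some-uuid']
))

merged = AggregatedRecord.decode(header + one + two)
merged.records.length       # => 2
merged.partition_key_table  # => ["a", "some-uuid"]

kpl_aggregation_pack itself is provided elsewhere in the plugin; based on the documented KPL aggregated-record format, the envelope is the 4-byte magic prefix, the protobuf payload, then an MD5 digest of the payload, so the KCL can recognise and de-aggregate the record. A hypothetical sketch of that helper:

require 'digest/md5'

KPL_MAGIC = [0xF3, 0x89, 0x9A, 0xC2].pack('C*')  # KPL aggregation magic number

def kpl_aggregation_pack(message)
  KPL_MAGIC + message + Digest::MD5.digest(message)
end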