Class: FluentPluginKinesisAggregation::OutputFilter

Inherits: Fluent::Plugin::Output
  Ancestry: FluentPluginKinesisAggregation::OutputFilter < Fluent::Plugin::Output < Object

Defined in: lib/fluent/plugin/out_kinesis-aggregation.rb
Constant Summary

DEFAULT_BUFFER_TYPE = "memory"

NAME = 'kinesis-aggregation'

PUT_RECORD_MAX_DATA_SIZE = 1024 * 1024

FLUENTD_MAX_BUFFER_SIZE = PUT_RECORD_MAX_DATA_SIZE - 200
  # 200 is an arbitrary number larger than the envelope overhead and big
  # enough to store the partition/hash key table in AggregatedRecords.
  # Note that you shouldn't really ever have the buffer this high, since
  # you're likely to fail the write if anyone else is writing to the
  # shard at the time.
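Given these limits, a pipeline using this plugin should cap its buffer chunks well below the 1MB shard limit (ideally below 1/3 of it, per the configure check). A hypothetical fluentd match section might look like the following; the tag pattern, stream name, and chunk size are illustrative, not from the source:

```
<match app.**>
  @type kinesis-aggregation
  stream_name my-stream          # hypothetical stream name
  <buffer>
    @type memory
    chunk_limit_size 300k        # comfortably under FLUENTD_MAX_BUFFER_SIZE (~1MB)
  </buffer>
</match>
```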
Instance Method Summary
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Instance Method Details
#configure(conf) ⇒ Object
# File 'lib/fluent/plugin/out_kinesis-aggregation.rb', line 88

def configure(conf)
  compat_parameters_convert(conf, :buffer, :inject)
  super

  if @buffer.chunk_limit_size > FLUENTD_MAX_BUFFER_SIZE
    raise Fluent::ConfigError,
          "Kinesis buffer_chunk_limit is set to more than the 1mb shard limit (i.e. you won't be able to write your chunks!)"
  end

  if @buffer.chunk_limit_size > FLUENTD_MAX_BUFFER_SIZE / 3
    log.warn 'Kinesis buffer_chunk_limit is set at more than 1/3 of the per second shard limit (1mb). This is not good if you have many producers.'
  end
end
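The two threshold checks above can be sketched in isolation. This is a minimal stand-in for the configure logic, using the same constants; the `check` helper and its symbolic return values are illustrative, not part of the plugin:

```ruby
# Constants copied from the plugin source.
PUT_RECORD_MAX_DATA_SIZE = 1024 * 1024
FLUENTD_MAX_BUFFER_SIZE = PUT_RECORD_MAX_DATA_SIZE - 200

# Hypothetical helper mirroring the configure-time checks:
# error above the shard limit, warn above 1/3 of it.
def check(chunk_limit_size)
  return :error if chunk_limit_size > FLUENTD_MAX_BUFFER_SIZE
  return :warn  if chunk_limit_size > FLUENTD_MAX_BUFFER_SIZE / 3
  :ok
end

p check(8 * 1024 * 1024)  # fluentd's default 8MB chunk limit → :error
p check(512 * 1024)       # → :warn
p check(256 * 1024)       # → :ok
```

Note that fluentd's default `chunk_limit_size` (8MB) fails the hard check, so the buffer size must always be configured explicitly for this plugin.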
#format(tag, time, record) ⇒ Object
# File 'lib/fluent/plugin/out_kinesis-aggregation.rb', line 106

def format(tag, time, record)
  record = inject_values_to_record(tag, time, record)

  return AggregatedRecord.encode(AggregatedRecord.new(
    records: [Record.new(
      partition_key_index: 1,
      data: Yajl.dump(record).b
    )]
  ))
end
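The `Yajl.dump(record).b` step serialises each event to JSON and forces ASCII-8BIT (binary) encoding, which is what protobuf `bytes` fields expect. A minimal sketch of the payload each Record carries, using the stdlib JSON module in place of Yajl:

```ruby
require 'json'

# Stand-in for Yajl.dump(record).b: serialise the event hash to JSON,
# then force a binary (ASCII-8BIT) encoding for the protobuf bytes field.
record = { 'message' => 'hello', 'level' => 'info' }
data = JSON.dump(record).b

p data           # "{\"message\":\"hello\",\"level\":\"info\"}"
p data.encoding  # #<Encoding:ASCII-8BIT>
```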
#start ⇒ Object
# File 'lib/fluent/plugin/out_kinesis-aggregation.rb', line 101

def start
  super
  load_client
end
#write(chunk) ⇒ Object
# File 'lib/fluent/plugin/out_kinesis-aggregation.rb', line 117

def write(chunk)
  records = chunk.read
  if records.length > FLUENTD_MAX_BUFFER_SIZE
    log.error "Can't emit aggregated #{@stream_name} stream record of length #{records.length} (more than #{FLUENTD_MAX_BUFFER_SIZE})"
    return # do not throw, since we can't retry
  end

  partition_key = @fixed_partition_key || SecureRandom.uuid

  # Confusing magic. Because of the format of protobuf records,
  # it's valid (in this case) to concatenate the AggregatedRecords
  # to form one AggregatedRecord, since we only have a repeated field
  # in records.
  #
  # ALSO, since we use Google's protobuf library (much better
  # memory usage due to the C extension), we're stuck on proto3.
  # Unfortunately, KPL uses the proto2 form, and partition_key_index
  # is a required field. If we set it to 0 in proto3, though,
  # it's helpfully omitted from the serialisation (the default!).
  # Therefore we have to pass a partition_key_index of 1,
  # and put two things in our partition_key_table.
  message = AggregatedRecord.encode(AggregatedRecord.new(
    partition_key_table: ['a', partition_key]
  )) + message

  @client.put_record(
    stream_name: @stream_name,
    data: kpl_aggregation_pack(message),
    partition_key: partition_key
  )
end
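The "confusing magic" comment rests on a protobuf wire-format property: a serialised message is just a sequence of (tag, value) pairs, and parsers append successive occurrences of a repeated field. So concatenating several one-record AggregatedRecords yields a valid AggregatedRecord containing all of them. The sketch below demonstrates this with a hand-rolled encoder instead of the real protobuf library; field number 3 for `records` matches the KPL AggregatedRecord schema, and the encoder assumes payloads shorter than 128 bytes so each length fits in a single varint byte:

```ruby
# Encode one element of the repeated `records` field (field 3,
# wire type 2 = length-delimited): tag byte (3 << 3) | 2 = 0x1A,
# then a one-byte length, then the payload.
def encode_record(payload)
  raise 'payload too long for this sketch' if payload.bytesize > 127
  [0x1A, payload.bytesize].pack('C2') + payload
end

# Parse a message consisting only of repeated field-3 entries,
# appending each occurrence -- exactly what a protobuf parser does.
def decode_records(bytes)
  records = []
  i = 0
  while i < bytes.bytesize
    tag, len = bytes.byteslice(i, 2).unpack('C2')
    raise 'unexpected field' unless tag == 0x1A
    records << bytes.byteslice(i + 2, len)
    i += 2 + len
  end
  records
end

# Two single-record messages, concatenated, parse as one
# two-record message.
merged = encode_record('{"a":1}') + encode_record('{"b":2}')
p decode_records(merged)  # ["{\"a\":1}", "{\"b\":2}"]
```

This only works because AggregatedRecord's other fields (like partition_key_table) appear once, in the envelope prepended by write; if a non-repeated field occurred in every chunk, later occurrences would overwrite earlier ones rather than accumulate.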