Class: Fluent::Plugin::Buffer::Chunk
- Inherits:
-
Object
- Object
- Fluent::Plugin::Buffer::Chunk
show all
- Includes:
- UniqueId::Mixin, MonitorMixin
- Defined in:
- lib/fluent/plugin/buffer/chunk.rb
Defined Under Namespace
Modules: GzipDecompressable, ZstdDecompressable
Instance Attribute Summary collapse
Instance Method Summary
collapse
#dump_unique_id_hex, #generate_unique_id
Constructor Details
#initialize(metadata, compress: :text, decompression_size_limit: Compressable::DEFAULT_DECOMPRESSION_SIZE_LIMIT) ⇒ Chunk
TODO: CompressedPackedMessage of forward protocol?
Instance Attribute Details
Returns the value of attribute metadata.
70
71
72
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 70
def metadata
@metadata
end
|
#state ⇒ Object
Returns the value of attribute state.
70
71
72
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 70
def state
@state
end
|
#unique_id ⇒ Object
Returns the value of attribute unique_id.
70
71
72
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 70
def unique_id
@unique_id
end
|
Instance Method Details
#append(data, **kwargs) ⇒ Object
data is array of formatted record string
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 91
def append(data, **kwargs)
raise ArgumentError, "`compress: #{kwargs[:compress]}` can be used for Compressable module" if kwargs[:compress] == :gzip || kwargs[:compress] == :zstd
begin
adding = data.join.force_encoding(Encoding::ASCII_8BIT)
rescue
adding = ''.force_encoding(Encoding::ASCII_8BIT)
data.each do |d|
adding << d.b
end
end
concat(adding, data.size)
end
|
#bytesize ⇒ Object
120
121
122
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 120
def bytesize
raise NotImplementedError, "Implement this method in child class"
end
|
#close ⇒ Object
168
169
170
171
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 168
def close
@state = :closed
self
end
|
#closed? ⇒ Boolean
149
150
151
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 149
def closed?
@state == :closed
end
|
#commit ⇒ Object
112
113
114
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 112
def commit
raise NotImplementedError, "Implement this method in child class"
end
|
#concat(bulk, records) ⇒ Object
for event streams which is packed or zipped (and we want not to unpack/uncompress)
108
109
110
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 108
def concat(bulk, records)
raise NotImplementedError, "Implement this method in child class"
end
|
#created_at ⇒ Object
81
82
83
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 81
def created_at
@created_at_object ||= Time.at(@created_at)
end
|
#empty? ⇒ Boolean
129
130
131
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 129
def empty?
size == 0
end
|
#enqueued! ⇒ Object
163
164
165
166
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 163
def enqueued!
@state = :queued
self
end
|
#modified_at ⇒ Object
86
87
88
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 86
def modified_at
@modified_at_object ||= Time.at(@modified_at)
end
|
#open(**kwargs, &block) ⇒ Object
183
184
185
186
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 183
def open(**kwargs, &block)
raise ArgumentError, "`compressed: #{kwargs[:compressed]}` can be used for Compressable module" if kwargs[:compressed] == :gzip || kwargs[:compressed] == :zstd
raise NotImplementedError, "Implement this method in child class"
end
|
#purge ⇒ Object
173
174
175
176
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 173
def purge
@state = :closed
self
end
|
#queued? ⇒ Boolean
145
146
147
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 145
def queued?
@state == :queued
end
|
#raw_create_at ⇒ Object
72
73
74
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 72
def raw_create_at
@created_at
end
|
#raw_modified_at ⇒ Object
76
77
78
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 76
def raw_modified_at
@modified_at
end
|
#read(**kwargs) ⇒ Object
178
179
180
181
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 178
def read(**kwargs)
raise ArgumentError, "`compressed: #{kwargs[:compressed]}` can be used for Compressable module" if kwargs[:compressed] == :gzip || kwargs[:compressed] == :zstd
raise NotImplementedError, "Implement this method in child class"
end
|
#rollback ⇒ Object
116
117
118
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 116
def rollback
raise NotImplementedError, "Implement this method in child class"
end
|
#size ⇒ Object
Also known as:
length
124
125
126
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 124
def size
raise NotImplementedError, "Implement this method in child class"
end
|
#staged! ⇒ Object
153
154
155
156
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 153
def staged!
@state = :staged
self
end
|
#staged? ⇒ Boolean
141
142
143
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 141
def staged?
@state == :staged
end
|
#unstaged! ⇒ Object
158
159
160
161
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 158
def unstaged!
@state = :unstaged
self
end
|
#unstaged? ⇒ Boolean
137
138
139
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 137
def unstaged?
@state == :unstaged
end
|
#writable? ⇒ Boolean
133
134
135
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 133
def writable?
@state == :staged || @state == :unstaged
end
|
#write_to(io, **kwargs) ⇒ Object
188
189
190
191
192
193
|
# File 'lib/fluent/plugin/buffer/chunk.rb', line 188
def write_to(io, **kwargs)
raise ArgumentError, "`compressed: #{kwargs[:compressed]}` can be used for Compressable module" if kwargs[:compressed] == :gzip || kwargs[:compressed] == :zstd
open do |i|
IO.copy_stream(i, io)
end
end
|