Class: Fluent::Plugin::Buffer::Chunk

Inherits:
Object
  • Object
show all
Includes:
UniqueId::Mixin, MonitorMixin
Defined in:
lib/fluent/plugin/buffer/chunk.rb

Direct Known Subclasses

FileChunk, FileSingleChunk, MemoryChunk

Defined Under Namespace

Modules: GzipDecompressable, ZstdDecompressable

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from UniqueId::Mixin

#dump_unique_id_hex, #generate_unique_id

Constructor Details

#initialize(metadata, compress: :text, decompression_size_limit: Compressable::DEFAULT_DECOMPRESSION_SIZE_LIMIT) ⇒ Chunk

TODO: CompressedPackedMessage of forward protocol?



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/fluent/plugin/buffer/chunk.rb', line 51

# Builds a new chunk that starts in the :unstaged state with a size of 0.
#
# @param metadata [Object] stored as-is in @metadata
# @param compress [Symbol] :gzip or :zstd extends this instance with the
#   matching *Decompressable module; any other value (default :text) adds nothing
# @param decompression_size_limit [Object] stored as-is in @decompression_size_limit
def initialize(metadata, compress: :text, decompression_size_limit: Compressable::DEFAULT_DECOMPRESSION_SIZE_LIMIT)
  # Explicit empty parentheses: forward no arguments up the ancestor chain.
  super()
  @unique_id = generate_unique_id
  @metadata = metadata

  # state: unstaged/staged/queued/closed
  @state = :unstaged

  @size = 0
  @created_at = Fluent::Clock.real_now
  @modified_at = Fluent::Clock.real_now
  # Only this instance (not the class) gains the decompression behaviour.
  if compress == :gzip
    extend GzipDecompressable
  elsif compress == :zstd
    extend ZstdDecompressable
  end
  @decompression_size_limit = decompression_size_limit
end

Instance Attribute Details

#metadata ⇒ Object (readonly)

Returns the value of attribute metadata.



70
71
72
# File 'lib/fluent/plugin/buffer/chunk.rb', line 70

# Read-only accessor.
#
# @return [Object] the value held in @metadata
def metadata
  @metadata
end

#state ⇒ Object (readonly)

Returns the value of attribute state.



70
71
72
# File 'lib/fluent/plugin/buffer/chunk.rb', line 70

# Read-only accessor.
#
# @return [Symbol] the value held in @state; the methods in this class assign
#   :unstaged, :staged, :queued or :closed
def state
  @state
end

#unique_id ⇒ Object (readonly)

Returns the value of attribute unique_id.



70
71
72
# File 'lib/fluent/plugin/buffer/chunk.rb', line 70

# Read-only accessor.
#
# @return [Object] the value held in @unique_id
def unique_id
  @unique_id
end

Instance Method Details

#append(data, **kwargs) ⇒ Object

`data` is an array of formatted record strings.

Raises:

  • (ArgumentError)


91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/fluent/plugin/buffer/chunk.rb', line 91

# Joins the formatted record strings in +data+ into one binary string and
# hands it to #concat together with the number of elements in +data+.
#
# @param data [Array<String>] formatted record strings
# @param kwargs [Hash] only :compress is inspected
# @return [Object] whatever #concat returns
# @raise [ArgumentError] when compress: is :gzip or :zstd
def append(data, **kwargs)
  compress = kwargs[:compress]
  if compress == :gzip || compress == :zstd
    raise ArgumentError, "`compress: #{compress}` can be used for Compressable module"
  end

  begin
    adding = data.join.force_encoding(Encoding::ASCII_8BIT)
  rescue
    # Fallback
    # Array#join throws an exception if data contains strings with a different encoding.
    # Although such cases may be rare, it should be considered as a safety precaution.
    # Allocate a fresh mutable buffer instead of mutating a string literal, so this
    # path also works when string literals are frozen.
    adding = String.new(encoding: Encoding::ASCII_8BIT)
    data.each { |d| adding << d.b }
  end
  concat(adding, data.size)
end

#bytesize ⇒ Object

Raises:

  • (NotImplementedError)


120
121
122
# File 'lib/fluent/plugin/buffer/chunk.rb', line 120

# Abstract method: this base implementation only raises, so child classes
# must override it.
#
# @raise [NotImplementedError] always
def bytesize
  raise NotImplementedError, "Implement this method in child class"
end

#close ⇒ Object



168
169
170
171
# File 'lib/fluent/plugin/buffer/chunk.rb', line 168

# Moves the chunk to the :closed state.
#
# @return [self] the receiver
def close
  tap { @state = :closed }
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


149
150
151
# File 'lib/fluent/plugin/buffer/chunk.rb', line 149

# @return [Boolean] true when @state is :closed
def closed?
  @state == :closed
end

#commit ⇒ Object

Raises:

  • (NotImplementedError)


112
113
114
# File 'lib/fluent/plugin/buffer/chunk.rb', line 112

# Abstract method: this base implementation only raises, so child classes
# must override it.
#
# @raise [NotImplementedError] always
def commit
  raise NotImplementedError, "Implement this method in child class"
end

#concat(bulk, records) ⇒ Object

For event streams that are already packed or compressed (and that we do not want to unpack or decompress).

Raises:

  • (NotImplementedError)


108
109
110
# File 'lib/fluent/plugin/buffer/chunk.rb', line 108

# Abstract method: this base implementation only raises, so child classes
# must override it. #append calls it with the joined binary string and the
# number of elements that were joined.
#
# @param bulk [String] data to add (unused here)
# @param records [Integer] element count for +bulk+ (unused here)
# @raise [NotImplementedError] always
def concat(bulk, records)
  raise NotImplementedError, "Implement this method in child class"
end

#created_at ⇒ Object

for compatibility



81
82
83
# File 'lib/fluent/plugin/buffer/chunk.rb', line 81

# Compatibility accessor: converts @created_at with Time.at on first use and
# caches the result in @created_at_object for later calls.
#
# @return [Time]
def created_at
  return @created_at_object if @created_at_object

  @created_at_object = Time.at(@created_at)
end

#empty? ⇒ Boolean

Returns:

  • (Boolean)


129
130
131
# File 'lib/fluent/plugin/buffer/chunk.rb', line 129

# @return [Boolean] true when #size reports 0
def empty?
  current_size = size
  current_size == 0
end

#enqueued! ⇒ Object



163
164
165
166
# File 'lib/fluent/plugin/buffer/chunk.rb', line 163

# Moves the chunk to the :queued state.
#
# @return [self] the receiver
def enqueued!
  tap { @state = :queued }
end

#modified_at ⇒ Object

for compatibility



86
87
88
# File 'lib/fluent/plugin/buffer/chunk.rb', line 86

# Compatibility accessor: converts @modified_at with Time.at on first use and
# caches the result in @modified_at_object. The cached Time keeps being
# returned until @modified_at_object is reset to nil or false.
#
# @return [Time]
def modified_at
  return @modified_at_object if @modified_at_object

  @modified_at_object = Time.at(@modified_at)
end

#open(**kwargs, &block) ⇒ Object

Raises:

  • (ArgumentError)


183
184
185
186
# File 'lib/fluent/plugin/buffer/chunk.rb', line 183

# Abstract method: after rejecting compressed: :gzip / :zstd, this base
# implementation only raises, so child classes must override it.
#
# @param kwargs [Hash] only :compressed is inspected
# @raise [ArgumentError] when compressed: is :gzip or :zstd
# @raise [NotImplementedError] in every other case
def open(**kwargs, &block)
  requested = kwargs[:compressed]
  if requested == :gzip || requested == :zstd
    raise ArgumentError, "`compressed: #{requested}` can be used for Compressable module"
  end

  raise NotImplementedError, "Implement this method in child class"
end

#purge ⇒ Object



173
174
175
176
# File 'lib/fluent/plugin/buffer/chunk.rb', line 173

# Moves the chunk to the :closed state (the same state #close sets).
#
# @return [self] the receiver
def purge
  tap { @state = :closed }
end

#queued? ⇒ Boolean

Returns:

  • (Boolean)


145
146
147
# File 'lib/fluent/plugin/buffer/chunk.rb', line 145

# @return [Boolean] true when @state is :queued
def queued?
  @state == :queued
end

#raw_create_at ⇒ Object



72
73
74
# File 'lib/fluent/plugin/buffer/chunk.rb', line 72

# Returns the stored @created_at value as-is, without the Time.at conversion
# that #created_at applies.
def raw_create_at
  @created_at
end

#raw_modified_at ⇒ Object



76
77
78
# File 'lib/fluent/plugin/buffer/chunk.rb', line 76

# Returns the stored @modified_at value as-is, without the Time.at conversion
# that #modified_at applies.
def raw_modified_at
  @modified_at
end

#read(**kwargs) ⇒ Object

Raises:

  • (ArgumentError)


178
179
180
181
# File 'lib/fluent/plugin/buffer/chunk.rb', line 178

# Abstract method: after rejecting compressed: :gzip / :zstd, this base
# implementation only raises, so child classes must override it.
#
# @param kwargs [Hash] only :compressed is inspected
# @raise [ArgumentError] when compressed: is :gzip or :zstd
# @raise [NotImplementedError] in every other case
def read(**kwargs)
  requested = kwargs[:compressed]
  if requested == :gzip || requested == :zstd
    raise ArgumentError, "`compressed: #{requested}` can be used for Compressable module"
  end

  raise NotImplementedError, "Implement this method in child class"
end

#rollback ⇒ Object

Raises:

  • (NotImplementedError)


116
117
118
# File 'lib/fluent/plugin/buffer/chunk.rb', line 116

# Abstract method: this base implementation only raises, so child classes
# must override it.
#
# @raise [NotImplementedError] always
def rollback
  raise NotImplementedError, "Implement this method in child class"
end

#size ⇒ Object Also known as: length

Raises:

  • (NotImplementedError)


124
125
126
# File 'lib/fluent/plugin/buffer/chunk.rb', line 124

# Abstract method: this base implementation only raises, so child classes
# must override it. #empty? compares the overriding method's result with 0.
#
# @raise [NotImplementedError] always
def size
  raise NotImplementedError, "Implement this method in child class"
end

#staged! ⇒ Object



153
154
155
156
# File 'lib/fluent/plugin/buffer/chunk.rb', line 153

# Moves the chunk to the :staged state.
#
# @return [self] the receiver
def staged!
  tap { @state = :staged }
end

#staged? ⇒ Boolean

Returns:

  • (Boolean)


141
142
143
# File 'lib/fluent/plugin/buffer/chunk.rb', line 141

# @return [Boolean] true when @state is :staged
def staged?
  @state == :staged
end

#unstaged! ⇒ Object



158
159
160
161
# File 'lib/fluent/plugin/buffer/chunk.rb', line 158

# Moves the chunk to the :unstaged state.
#
# @return [self] the receiver
def unstaged!
  tap { @state = :unstaged }
end

#unstaged? ⇒ Boolean

Returns:

  • (Boolean)


137
138
139
# File 'lib/fluent/plugin/buffer/chunk.rb', line 137

# @return [Boolean] true when @state is :unstaged
def unstaged?
  @state == :unstaged
end

#writable? ⇒ Boolean

Returns:

  • (Boolean)


133
134
135
# File 'lib/fluent/plugin/buffer/chunk.rb', line 133

# @return [Boolean] true when @state is :staged or :unstaged, false otherwise
def writable?
  case @state
  when :staged, :unstaged then true
  else false
  end
end

#write_to(io, **kwargs) ⇒ Object

Raises:

  • (ArgumentError)


188
189
190
191
192
193
# File 'lib/fluent/plugin/buffer/chunk.rb', line 188

# Copies the chunk content into +io+: calls #open (without forwarding any
# keyword arguments) and streams what it yields into +io+ with IO.copy_stream.
#
# @param io [IO] destination handed to IO.copy_stream
# @param kwargs [Hash] only :compressed is inspected
# @return [Object] whatever #open returns
# @raise [ArgumentError] when compressed: is :gzip or :zstd
def write_to(io, **kwargs)
  requested = kwargs[:compressed]
  if requested == :gzip || requested == :zstd
    raise ArgumentError, "`compressed: #{requested}` can be used for Compressable module"
  end

  open { |source| IO.copy_stream(source, io) }
end