Class: Pulsar::Internal::FrameCodec
- Inherits:
-
Object
- Object
- Pulsar::Internal::FrameCodec
show all
- Defined in:
- lib/pulsar/internal/frame_codec.rb
Overview
Encodes and decodes Pulsar binary protocol frames.
Defined Under Namespace
Classes: DecodedFrame, DecodedMessageData
Class Method Summary
collapse
Class Method Details
.decode_frame(frame) ⇒ Object
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
# File 'lib/pulsar/internal/frame_codec.rb', line 25
# Splits one wire frame into its decoded command and the bytes that follow it.
#
# Layout read here: a 4-byte big-endian total size, a 4-byte big-endian
# command size, the command bytes, then the remainder of the frame. Bytes
# located after the declared total size are ignored.
#
# @param frame [String, #to_s] raw frame bytes; coerced with String() and read as binary
# @return [DecodedFrame] built from the decoded command and the remaining frame bytes
# @raise [ProtocolError] if a size prefix is missing, the frame is shorter than
#   its declared size, or the command size does not fit inside the frame
def self.decode_frame(frame)
  bytes = String(frame).b
  raise ProtocolError, 'frame size prefix is incomplete' if bytes.bytesize < 4

  total_size = bytes.unpack1('N')
  raise ProtocolError, 'frame is incomplete' if bytes.bytesize < 4 + total_size
  raise ProtocolError, 'command size prefix is incomplete' if total_size < 4

  command_size = bytes[4, 4].unpack1('N')
  # Everything after the command size prefix: command plus trailing bytes.
  body_size = total_size - 4
  raise ProtocolError, 'command exceeds frame size' if command_size > body_size

  command_bytes = bytes.byteslice(8, command_size)
  # The guards above keep this slice inside the string, so it is never nil,
  # and slicing a binary string already yields a binary string.
  trailing_bytes = bytes.byteslice(8 + command_size, body_size - command_size)

  DecodedFrame.new(
    command: Proto::BaseCommand.decode(command_bytes),
    headers_and_payload: trailing_bytes
  )
end
|
.decode_message_data(headers_and_payload) ⇒ Object
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
# File 'lib/pulsar/internal/frame_codec.rb', line 45
# Splits the bytes that follow a command into message metadata and payload.
#
# Layout read here: an optional checksum header (2-byte magic number 0x0e01
# followed by a 4-byte checksum), a 4-byte big-endian metadata size, the
# metadata bytes, then the payload. Input without the magic number is read
# as starting directly with the metadata size.
#
# NOTE: when a checksum header is present it is skipped, not verified.
#
# @param headers_and_payload [String, #to_s] raw bytes; coerced with String() and read as binary
# @return [DecodedMessageData] built from the decoded metadata and the binary payload
# @raise [ProtocolError] if the metadata size prefix is missing or the declared
#   metadata size is larger than the remaining bytes
def self.decode_message_data(headers_and_payload)
  data = String(headers_and_payload).b

  # A leading 0x0e01 marks the optional checksum header: 2 magic bytes plus a
  # 4-byte checksum. A real metadata size cannot plausibly begin with these
  # two bytes (it would exceed 235 MB). unpack1 returns nil for fewer than
  # two bytes, which simply fails the comparison.
  offset = data.unpack1('n') == 0x0e01 ? 6 : 0
  raise ProtocolError, 'metadata size prefix is incomplete' if data.bytesize < offset + 4

  metadata_size = data.byteslice(offset, 4).unpack1('N')
  metadata_start = offset + 4
  remaining = data.bytesize - metadata_start
  raise ProtocolError, 'metadata exceeds message data size' if metadata_size > remaining

  metadata_bytes = data.byteslice(metadata_start, metadata_size)
  # The guards above keep this slice inside the string, so it is never nil.
  payload = data.byteslice(metadata_start + metadata_size, remaining - metadata_size)

  DecodedMessageData.new(
    metadata: Proto::MessageMetadata.decode(metadata_bytes),
    payload: payload
  )
end
|
.encode_command(command) ⇒ Object
10
11
12
13
|
# File 'lib/pulsar/internal/frame_codec.rb', line 10
# Serializes a command into a frame that carries no metadata or payload.
#
# The frame is a 4-byte big-endian total size (the command size plus the 4
# bytes of the command size prefix), a 4-byte big-endian command size, and
# the encoded command bytes.
#
# @param command [Object] value handed to Proto::BaseCommand.encode
# @return [String] the size header followed by the encoded command
def self.encode_command(command)
  command_bytes = Proto::BaseCommand.encode(command)
  command_size = command_bytes.bytesize
  size_header = [command_size + 4, command_size].pack('NN')
  size_header + command_bytes
end
|
.encode_message(command, metadata, payload) ⇒ Object
15
16
17
18
19
20
21
22
23
|
# File 'lib/pulsar/internal/frame_codec.rb', line 15
# Serializes a command together with message metadata and a payload.
#
# The frame is a 4-byte big-endian total size, a 4-byte big-endian command
# size, the encoded command, a 4-byte big-endian metadata size, the encoded
# metadata, and finally the payload. The total size counts everything after
# the total size field itself.
#
# @param command [Object] value handed to Proto::BaseCommand.encode
# @param metadata [Object] value handed to Proto::MessageMetadata.encode
# @param payload [String, #to_s, nil] payload bytes; coerced with String() and read as binary
# @return [String] the complete frame
def self.encode_message(command, metadata, payload)
  command_bytes = Proto::BaseCommand.encode(command)
  metadata_bytes = Proto::MessageMetadata.encode(metadata)
  body = String(payload).b

  # Two 4-byte size prefixes (command and metadata) plus the three variable parts.
  total_size = 4 + command_bytes.bytesize + 4 + metadata_bytes.bytesize + body.bytesize
  frame_header = [total_size, command_bytes.bytesize].pack('NN')
  metadata_prefix = [metadata_bytes.bytesize].pack('N')

  frame_header + command_bytes + metadata_prefix + metadata_bytes + body
end
|