Class: Pulsar::Internal::FrameCodec

Inherits:
Object
  • Object
show all
Defined in:
lib/pulsar/internal/frame_codec.rb

Overview

Encodes and decodes Pulsar binary protocol frames.

Defined Under Namespace

Classes: DecodedFrame, DecodedMessageData

Class Method Summary collapse

Class Method Details

.decode_frame(frame) ⇒ Object

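Raises:

  • (ProtocolError) — if the frame size prefix or command size prefix is truncated, the frame is shorter than its declared size, or the command size exceeds the frame size
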
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/pulsar/internal/frame_codec.rb', line 25

# Splits one length-prefixed frame into its command and the bytes that follow it.
#
# Layout read here: a 4-byte big-endian total size, a 4-byte big-endian
# command size, the serialized command, then whatever remains of the frame.
# Bytes beyond the declared total size are not looked at.
#
# @param frame [#to_s] raw frame bytes; coerced with String() and treated as binary
# @return [DecodedFrame] built with `command:` (result of Proto::BaseCommand.decode)
#   and `headers_and_payload:` (binary string, possibly empty)
# @raise [ProtocolError] when a size prefix is truncated, the frame is shorter
#   than it declares, or the command size does not fit inside the frame
def self.decode_frame(frame)
  bytes = String(frame).b
  available = bytes.bytesize
  raise ProtocolError, 'frame size prefix is incomplete' unless available >= 4

  frame_length = bytes.byteslice(0, 4).unpack1('N')
  raise ProtocolError, 'frame is incomplete' unless available >= frame_length + 4
  raise ProtocolError, 'command size prefix is incomplete' unless frame_length >= 4

  command_length = bytes.byteslice(4, 4).unpack1('N')
  # What is left of the frame once the command-size prefix and command are consumed.
  remainder_length = frame_length - 4 - command_length
  raise ProtocolError, 'command exceeds frame size' if remainder_length.negative?

  serialized_command = bytes.byteslice(8, command_length)
  trailing = bytes.byteslice(8 + command_length, remainder_length) || +''

  DecodedFrame.new(
    command: Proto::BaseCommand.decode(serialized_command),
    headers_and_payload: trailing.b
  )
end

.decode_message_data(headers_and_payload) ⇒ Object

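Raises:

  • (ProtocolError) — if the metadata size prefix is truncated or the declared metadata size exceeds the remaining message data
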
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/pulsar/internal/frame_codec.rb', line 45

# Splits the headers-and-payload section of a frame into message metadata
# and the raw payload.
#
# Layout read here: a 4-byte big-endian metadata size, that many bytes of
# serialized metadata, then the payload (everything that remains).
#
# @param headers_and_payload [#to_s] raw bytes; coerced with String() and treated as binary
# @return [DecodedMessageData] built with `metadata:` (result of
#   Proto::MessageMetadata.decode) and `payload:` (binary string, possibly empty)
# @raise [ProtocolError] when the size prefix is truncated or the declared
#   metadata size is larger than the bytes that follow the prefix
def self.decode_message_data(headers_and_payload)
  headers_and_payload = String(headers_and_payload).b
  raise ProtocolError, 'metadata size prefix is incomplete' if headers_and_payload.bytesize < 4

  metadata_size = headers_and_payload.byteslice(0, 4).unpack1('N')
  raise ProtocolError, 'metadata exceeds message data size' if metadata_size > headers_and_payload.bytesize - 4

  metadata_bytes = headers_and_payload.byteslice(4, metadata_size)
  # Everything after the metadata is payload; may legitimately be empty.
  payload = headers_and_payload.byteslice(4 + metadata_size, headers_and_payload.bytesize - 4 - metadata_size) || +''

  DecodedMessageData.new(
    metadata: Proto::MessageMetadata.decode(metadata_bytes),
    payload: payload.b
  )
end

.encode_command(command) ⇒ Object



10
11
12
13
# File 'lib/pulsar/internal/frame_codec.rb', line 10

# Serializes a command into a payload-less frame.
#
# Output layout: 4-byte big-endian total size (command size + 4), 4-byte
# big-endian command size, then the serialized command.
#
# @param command the object handed to Proto::BaseCommand.encode
# @return [String] the framed bytes
def self.encode_command(command)
  serialized = Proto::BaseCommand.encode(command)
  command_length = serialized.bytesize
  # The total size counts the command-size prefix (4 bytes) plus the command itself.
  size_prefixes = [command_length + 4, command_length].pack('NN')
  size_prefixes + serialized
end

.encode_message(command, metadata, payload) ⇒ Object



15
16
17
18
19
20
21
22
23
# File 'lib/pulsar/internal/frame_codec.rb', line 15

# Serializes a command together with message metadata and a payload into
# a single frame.
#
# Output layout (all sizes 4-byte big-endian):
#   total size | command size | command | metadata size | metadata | payload
# The total size counts every byte that follows the total-size field itself.
#
# @param command the object handed to Proto::BaseCommand.encode
# @param metadata the object handed to Proto::MessageMetadata.encode
# @param payload [#to_s] message body; coerced with String() and treated as binary
# @return [String] the framed bytes
def self.encode_message(command, metadata, payload)
  encoded_command = Proto::BaseCommand.encode(command)
  encoded_metadata = Proto::MessageMetadata.encode(metadata)
  payload = String(payload).b
  # Two 4-byte size prefixes (command, metadata) plus the three variable-length parts.
  total_size = 4 + encoded_command.bytesize + 4 + encoded_metadata.bytesize + payload.bytesize

  [total_size, encoded_command.bytesize].pack('NN') +
    encoded_command + [encoded_metadata.bytesize].pack('N') + encoded_metadata + payload
end