Module: Protocol::ZMTP::Codec::Subscription

Defined in:
lib/protocol/zmtp/codec/subscription.rb

Overview

ZMTP subscription encoding.

Two wire formats exist and both are in active use:

  • **Message form (ZMTP 3.0 legacy, RFC 23).** A regular data frame whose body is `\x01` + prefix (subscribe) or `\x00` + prefix (cancel). libzmq, JeroMQ, pyzmq, CZMQ, and NetMQ all send subscriptions in this form by default, and all accept it.

  • **Command form (ZMTP 3.1, RFC 37).** A COMMAND-flagged frame whose body is a Command named “SUBSCRIBE” or “CANCEL” with the prefix as the command data.

Interop requires sending the message form (understood by every ZMTP 3.0+ peer) and accepting both forms on the receiving side.
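To make the two layouts concrete, here is a minimal sketch of the frame bodies for each form. The helper names `message_form` and `command_form` are hypothetical and not part of this module. The command layout assumes the standard ZMTP command encoding: a 1-byte name length, the name, then the command data.

```ruby
# Hypothetical helpers for illustration only; not part of Protocol::ZMTP.

# Message form: one flag byte followed by the raw prefix bytes.
def message_form(prefix, cancel: false)
  flag = cancel ? "\x00".b : "\x01".b
  flag + prefix.b
end

# Command form (assumed layout): 1-byte name length, the command name,
# then the prefix as command data.
def command_form(prefix, cancel: false)
  name = cancel ? "CANCEL" : "SUBSCRIBE"
  [name.bytesize].pack("C") + name.b + prefix.b
end

# Hex dumps make the difference between the two bodies easy to compare.
puts message_form("weather").unpack1("H*")
puts command_form("weather").unpack1("H*")
```

The message form costs one byte of overhead per subscription, while the command form carries the command name in every frame and must be sent with the COMMAND flag set on the frame itself.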

Constant Summary

FLAG_SUBSCRIBE =
"\x01".b.freeze
FLAG_CANCEL =
"\x00".b.freeze

Class Method Summary

Class Method Details

.body(prefix, cancel: false) ⇒ String

Builds the body of a subscription message in the legacy message form.

Parameters:

  • prefix (String)

    topic prefix

  • cancel (Boolean) (defaults to: false)

    true to build an unsubscribe

Returns:

  • (String)

    binary frame body



# File 'lib/protocol/zmtp/codec/subscription.rb', line 35

def body(prefix, cancel: false)
  flag = cancel ? FLAG_CANCEL : FLAG_SUBSCRIBE
  (flag + prefix.b).b
end
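A short usage sketch follows. So that it runs on its own, it copies the method and constants shown above into a stand-in module named `SubscriptionSketch` (a hypothetical name). With the real library loaded, the same calls would presumably go to `Protocol::ZMTP::Codec::Subscription.body`.

```ruby
# Stand-in that mirrors the documented method; the module name is hypothetical.
module SubscriptionSketch
  FLAG_SUBSCRIBE = "\x01".b.freeze
  FLAG_CANCEL    = "\x00".b.freeze

  def self.body(prefix, cancel: false)
    flag = cancel ? FLAG_CANCEL : FLAG_SUBSCRIBE
    (flag + prefix.b).b
  end
end

sub   = SubscriptionSketch.body("weather.")               # subscribe to a prefix
unsub = SubscriptionSketch.body("weather.", cancel: true) # cancel the same prefix
all   = SubscriptionSketch.body("")                       # empty prefix: flag byte only

# The prefix is copied byte for byte, so multi-byte UTF-8 topics keep their size.
accented = SubscriptionSketch.body("caf\u00e9")

puts sub.getbyte(0)      # first byte is the subscribe flag
puts unsub.getbyte(0)    # first byte is the cancel flag
puts all.bytesize
puts accented.bytesize
```

Because the method calls `prefix.b`, the caller's string is neither mutated nor re-encoded; the result is always a new binary string.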

.parse(frame) ⇒ Array(Symbol, String)?

Attempts to parse a frame as a subscription. Accepts both the legacy message form and the ZMTP 3.1 command form.

Parameters:

  • frame

    frame to inspect; the method calls `command?` and `body` on it

Returns:

  • (Array(Symbol, String), nil)

    `[:subscribe, prefix]`, `[:cancel, prefix]`, or `nil` if the frame is not a subscription



# File 'lib/protocol/zmtp/codec/subscription.rb', line 48

def parse(frame)
  if frame.command?
    cmd = Command.from_body(frame.body)
    case cmd.name
    when "SUBSCRIBE" then [:subscribe, cmd.data]
    when "CANCEL"    then [:cancel, cmd.data]
    end
  else
    body = frame.body
    return nil if body.empty?

    prefix = body.byteslice(1..) || "".b
    case body.getbyte(0)
    when 0x01 then [:subscribe, prefix]
    when 0x00 then [:cancel, prefix]
    end
  end
end
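The sketch below exercises both branches of the parser. The real frame and `Command` classes are not shown in this section, so `FrameSketch` and `CommandSketch` are hypothetical stand-ins, and `CommandSketch.from_body` assumes the standard ZMTP command layout (1-byte name length, name, data). `parse_sketch` repeats the logic of `parse` against those stand-ins.

```ruby
# Hypothetical stand-in for a frame: a body plus a COMMAND flag.
FrameSketch = Struct.new(:body, :command) do
  def command?
    command
  end
end

# Hypothetical stand-in for Command; the body layout is an assumption.
CommandSketch = Struct.new(:name, :data) do
  def self.from_body(body)
    len = body.getbyte(0)
    new(body.byteslice(1, len), body.byteslice((1 + len)..) || "".b)
  end
end

# Same logic as the documented parse method, wired to the stand-ins.
def parse_sketch(frame)
  if frame.command?
    cmd = CommandSketch.from_body(frame.body)
    case cmd.name
    when "SUBSCRIBE" then [:subscribe, cmd.data]
    when "CANCEL"    then [:cancel, cmd.data]
    end
  else
    body = frame.body
    return nil if body.empty?

    prefix = body.byteslice(1..) || "".b
    case body.getbyte(0)
    when 0x01 then [:subscribe, prefix]
    when 0x00 then [:cancel, prefix]
    end
  end
end

p parse_sketch(FrameSketch.new("\x01weather".b, false))      # message form, subscribe
p parse_sketch(FrameSketch.new("\x06CANCELweather".b, true)) # command form, cancel
p parse_sketch(FrameSketch.new("\x04PING".b, true))          # other command: nil
p parse_sketch(FrameSketch.new("hello".b, false))            # ordinary data: nil
```

In the message form, any data frame whose first byte is `0x00` or `0x01` parses as a subscription, so this kind of parser is presumably only meaningful on frames received where subscriptions are expected.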