Class: AMQ::Protocol::Basic

Inherits:
Class
  • Object
show all
Defined in:
lib/amq/protocol/client.rb

Defined Under Namespace

Classes: Ack, Cancel, CancelOk, Consume, ConsumeOk, Deliver, Get, GetEmpty, GetOk, Nack, Publish, Qos, QosOk, Recover, RecoverAsync, RecoverOk, Reject, Return

Constant Summary collapse

PROPERTIES =
[
  :content_type, # shortstr
  :content_encoding, # shortstr
  :headers, # table
  :delivery_mode, # octet
  :priority, # octet
  :correlation_id, # shortstr
  :reply_to, # shortstr
  :expiration, # shortstr
  :message_id, # shortstr
  :timestamp, # timestamp
  :type, # shortstr
  :user_id, # shortstr
  :app_id, # shortstr
  :cluster_id, # shortstr
]
DECODE_PROPERTIES =

THIS DECODES ONLY FLAGS

{
  0x8000 => :content_type,
  0x4000 => :content_encoding,
  0x2000 => :headers,
  0x1000 => :delivery_mode,
  0x0800 => :priority,
  0x0400 => :correlation_id,
  0x0200 => :reply_to,
  0x0100 => :expiration,
  0x0080 => :message_id,
  0x0040 => :timestamp,
  0x0020 => :type,
  0x0010 => :user_id,
  0x0008 => :app_id,
  0x0004 => :cluster_id,
}
DECODE_PROPERTIES_TYPE =
{
  0x8000 => :shortstr,
  0x4000 => :shortstr,
  0x2000 => :table,
  0x1000 => :octet,
  0x0800 => :octet,
  0x0400 => :shortstr,
  0x0200 => :shortstr,
  0x0100 => :shortstr,
  0x0080 => :shortstr,
  0x0040 => :timestamp,
  0x0020 => :shortstr,
  0x0010 => :shortstr,
  0x0008 => :shortstr,
  0x0004 => :shortstr,
}
DECODE_PROPERTIES_KEYS =

Hash doesn’t give any guarantees on keys order, we will do it in a straightforward way

[
  0x8000,
  0x4000,
  0x2000,
  0x1000,
  0x0800,
  0x0400,
  0x0200,
  0x0100,
  0x0080,
  0x0040,
  0x0020,
  0x0010,
  0x0008,
  0x0004,
]

Class Method Summary collapse

Methods inherited from Class

classes, inherited, method_id, name

Class Method Details

.decode_properties(data) ⇒ Object

Optimized decode_properties using getbyte and unpack1



1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
# File 'lib/amq/protocol/client.rb', line 1571

def self.decode_properties(data)
  offset, data_length, properties = 0, data.bytesize, {}

  compressed_index = data.byteslice(offset, 2).unpack1(PACK_UINT16)
  offset += 2
  while data_length > offset
    DECODE_PROPERTIES_KEYS.each do |key|
      next unless compressed_index >= key
      compressed_index -= key
      name = DECODE_PROPERTIES[key] || raise(RuntimeError.new("No property found for index #{key.inspect}!"))
      case DECODE_PROPERTIES_TYPE[key]
      when :shortstr
        size = data.getbyte(offset)
        offset += 1
        result = data.byteslice(offset, size)
      when :octet
        size = 1
        result = data.getbyte(offset)
      when :timestamp
        size = 8
        result = Time.at(data.byteslice(offset, 8).unpack1(PACK_UINT64_BE))
      when :table
        size = 4 + data.byteslice(offset, 4).unpack1(PACK_UINT32)
        result = Table.decode(data.byteslice(offset, size))
      end
      properties[name] = result
      offset += size
    end
  end

  properties
end

.encode_app_id(value) ⇒ Object

1 << 3



1482
1483
1484
1485
1486
1487
# File 'lib/amq/protocol/client.rb', line 1482

def self.encode_app_id(value)
  buffer = +''
  buffer << value.to_s.bytesize.chr
  buffer << value.to_s
  [12, 0x0008, buffer]
end

.encode_cluster_id(value) ⇒ Object

1 << 2



1490
1491
1492
1493
1494
1495
# File 'lib/amq/protocol/client.rb', line 1490

def self.encode_cluster_id(value)
  buffer = +''
  buffer << value.to_s.bytesize.chr
  buffer << value.to_s
  [13, 0x0004, buffer]
end

.encode_content_encoding(value) ⇒ Object

1 << 14



1398
1399
1400
1401
1402
1403
# File 'lib/amq/protocol/client.rb', line 1398

def self.encode_content_encoding(value)
  buffer = +''
  buffer << value.to_s.bytesize.chr
  buffer << value.to_s
  [1, 0x4000, buffer]
end

.encode_content_type(value) ⇒ Object

1 << 15



1390
1391
1392
1393
1394
1395
# File 'lib/amq/protocol/client.rb', line 1390

def self.encode_content_type(value)
  buffer = +''
  buffer << value.to_s.bytesize.chr
  buffer << value.to_s
  [0, 0x8000, buffer]
end

.encode_correlation_id(value) ⇒ Object

1 << 10



1427
1428
1429
1430
1431
1432
# File 'lib/amq/protocol/client.rb', line 1427

def self.encode_correlation_id(value)
  buffer = +''
  buffer << value.to_s.bytesize.chr
  buffer << value.to_s
  [5, 0x0400, buffer]
end

.encode_delivery_mode(value) ⇒ Object

1 << 12



1413
1414
1415
1416
1417
# File 'lib/amq/protocol/client.rb', line 1413

def self.encode_delivery_mode(value)
  buffer = +''
  buffer << [value].pack(PACK_CHAR)
  [3, 0x1000, buffer]
end

.encode_expiration(value) ⇒ Object

1 << 8



1443
1444
1445
1446
1447
1448
# File 'lib/amq/protocol/client.rb', line 1443

def self.encode_expiration(value)
  buffer = +''
  buffer << value.to_s.bytesize.chr
  buffer << value.to_s
  [7, 0x0100, buffer]
end

.encode_headers(value) ⇒ Object

1 << 13



1406
1407
1408
1409
1410
# File 'lib/amq/protocol/client.rb', line 1406

def self.encode_headers(value)
  buffer = +''
  buffer << AMQ::Protocol::Table.encode(value)
  [2, 0x2000, buffer]
end

.encode_message_id(value) ⇒ Object

1 << 7



1451
1452
1453
1454
1455
1456
# File 'lib/amq/protocol/client.rb', line 1451

def self.encode_message_id(value)
  buffer = +''
  buffer << value.to_s.bytesize.chr
  buffer << value.to_s
  [8, 0x0080, buffer]
end

.encode_priority(value) ⇒ Object

1 << 11



1420
1421
1422
1423
1424
# File 'lib/amq/protocol/client.rb', line 1420

def self.encode_priority(value)
  buffer = +''
  buffer << [value].pack(PACK_CHAR)
  [4, 0x0800, buffer]
end

.encode_properties(body_size, properties) ⇒ Object



1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
# File 'lib/amq/protocol/client.rb', line 1499

def self.encode_properties(body_size, properties)
  pieces, flags = [], 0

  properties.reject {|key, value| value.nil?}.each do |key, value|
    i, f, result = self.__send__(:"encode_#{key}", value)
    flags |= f
    pieces[i] = result
  end

  # result = [60, 0, body_size, flags].pack('n2Qn')
  result = [60, 0].pack(PACK_UINT16_X2)
  result += AMQ::Pack.pack_uint64_big_endian(body_size)
  result += [flags].pack(PACK_UINT16)
  pieces_joined = pieces.join(EMPTY_STRING)
  result.force_encoding(pieces_joined.encoding) + pieces_joined
end

.encode_reply_to(value) ⇒ Object

1 << 9



1435
1436
1437
1438
1439
1440
# File 'lib/amq/protocol/client.rb', line 1435

def self.encode_reply_to(value)
  buffer = +''
  buffer << value.to_s.bytesize.chr
  buffer << value.to_s
  [6, 0x0200, buffer]
end

.encode_timestamp(value) ⇒ Object

1 << 6



1459
1460
1461
1462
1463
# File 'lib/amq/protocol/client.rb', line 1459

def self.encode_timestamp(value)
  buffer = +''
  buffer << AMQ::Pack.pack_uint64_big_endian(value)
  [9, 0x0040, buffer]
end

.encode_type(value) ⇒ Object

1 << 5



1466
1467
1468
1469
1470
1471
# File 'lib/amq/protocol/client.rb', line 1466

def self.encode_type(value)
  buffer = +''
  buffer << value.to_s.bytesize.chr
  buffer << value.to_s
  [10, 0x0020, buffer]
end

.encode_user_id(value) ⇒ Object

1 << 4



1474
1475
1476
1477
1478
1479
# File 'lib/amq/protocol/client.rb', line 1474

def self.encode_user_id(value)
  buffer = +''
  buffer << value.to_s.bytesize.chr
  buffer << value.to_s
  [11, 0x0010, buffer]
end