Class: Rabbit::Publishing::Message

Inherits:
Object
  • Object
show all
Defined in:
lib/rabbit/publishing/message.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(routing_key: nil, event: nil, data: {}, exchange_name: [], confirm_select: true, realtime: false, headers: {}, message_id: nil) ⇒ Message

Returns a new instance of Message.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/rabbit/publishing/message.rb', line 12

def initialize(
  routing_key: nil,
  event: nil,
  data: {},
  exchange_name: [],
  confirm_select: true,
  realtime: false,
  headers: {},
  message_id: nil
)
  self.routing_key = routing_key
  self.event = event&.to_s
  self.data = data
  self.exchange_name = Array(exchange_name)
  self.confirm_select = confirm_select
  self.realtime = realtime
  self.headers = headers
  self.message_id = message_id
  self.compress = headers.with_indifferent_access.fetch(:compress, false)
end

Instance Attribute Details

#compressObject

Returns the value of attribute compress.



5
6
7
# File 'lib/rabbit/publishing/message.rb', line 5

def compress
  @compress
end

#confirm_selectObject Also known as: confirm_select?

Returns the value of attribute confirm_select.



5
6
7
# File 'lib/rabbit/publishing/message.rb', line 5

def confirm_select
  @confirm_select
end

#dataObject

Returns the value of attribute data.



5
6
7
# File 'lib/rabbit/publishing/message.rb', line 5

def data
  @data
end

#eventObject

Returns the value of attribute event.



5
6
7
# File 'lib/rabbit/publishing/message.rb', line 5

def event
  @event
end

#exchange_nameObject

Returns the value of attribute exchange_name.



7
8
9
# File 'lib/rabbit/publishing/message.rb', line 7

def exchange_name
  @exchange_name
end

#headersObject

Returns the value of attribute headers.



5
6
7
# File 'lib/rabbit/publishing/message.rb', line 5

def headers
  @headers
end

#message_idObject

Returns the value of attribute message_id.



5
6
7
# File 'lib/rabbit/publishing/message.rb', line 5

def message_id
  @message_id
end

#realtimeObject Also known as: realtime?

Returns the value of attribute realtime.



5
6
7
# File 'lib/rabbit/publishing/message.rb', line 5

def realtime
  @realtime
end

#routing_keyObject

Returns the value of attribute routing_key.



5
6
7
# File 'lib/rabbit/publishing/message.rb', line 5

def routing_key
  @routing_key
end

Instance Method Details

#basic_publish_argsObject



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/rabbit/publishing/message.rb', line 46

def basic_publish_args
  Rabbit.config.validate!

  raise "Routing key not specified" unless routing_key
  raise "Event name not specified" unless event

  options = {
    mandatory: confirm_select?,
    persistent: true,
    type: event,
    content_type: "application/json",
    app_id: Rabbit.config.app_name,
    headers: headers,
    message_id: message_id,
  }.tap do |ops|
    ops[:content_encoding] = "gzip" if compress
  end

  [dumped_data, real_exchange_name, routing_key.to_s, options]
end

#dumped_dataObject



75
76
77
78
79
80
81
82
# File 'lib/rabbit/publishing/message.rb', line 75

def dumped_data
  return JSON.dump(data) unless compress
  # NOTE: when compress true and realtime false it means data from job
  # already has been compressed and encoded in base64
  return Rabbit::Compressor.dump(data) if realtime

  Rabbit::Compressor.decode64(data)
end

#real_exchange_nameObject



71
72
73
# File 'lib/rabbit/publishing/message.rb', line 71

def real_exchange_name
  [Rabbit.config.group_id, Rabbit.config.project_id, *exchange_name].join(".")
end

#to_hashObject



33
34
35
36
37
38
39
40
# File 'lib/rabbit/publishing/message.rb', line 33

def to_hash
  instance_variables.each_with_object({}) do |var, hash|
    key = var.to_s.delete("@").to_sym
    next if key == :compress
    value = instance_variable_get(var)
    hash[key] = value
  end.merge(data: data_for_hash)
end

#to_sObject



42
43
44
# File 'lib/rabbit/publishing/message.rb', line 42

def to_s
  "#{real_exchange_name} -> #{routing_key} -> #{event}"
end