Module: Cosmo::Stream

Defined in:
lib/cosmo/stream.rb,
lib/cosmo/stream/data.rb,
lib/cosmo/stream/message.rb,
lib/cosmo/stream/processor.rb,
lib/cosmo/stream/serializer.rb

Defined Under Namespace

Modules: ClassMethods, Serializer
Classes: Data, Message, Processor

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.included(base) ⇒ Object



10
11
12
# File 'lib/cosmo/stream.rb', line 10

# Hook invoked with the including class/module as +base+; extends +base+
# with ClassMethods so those methods become callable on +base+ itself.
#
# @param base the object to extend with ClassMethods
# @return [Object] +base+ (the result of +extend+)
def self.included(base) = base.extend(ClassMethods)

Instance Method Details

#logger ⇒ Object



64
65
66
# File 'lib/cosmo/stream.rb', line 64

# Logger accessor for stream instances.
#
# @return [Object] whatever +Logger.instance+ returns
def logger = Logger.instance

#message ⇒ Object



68
69
70
# File 'lib/cosmo/stream.rb', line 68

# Reads the value currently stored under +:cosmo_message+ in
# +Thread.current+.
#
# @return [Object, nil] the stored value, or nil when nothing is set
def message = Thread.current[:cosmo_message]

#process(messages) ⇒ Object Also known as: process_many, process_batch



49
50
51
52
53
54
55
56
# File 'lib/cosmo/stream.rb', line 49

# Runs +process_one+ once per element of +messages+. While an element is
# being handled it is stored under +:cosmo_message+ in +Thread.current+;
# the slot is reset to nil afterwards, even when +process_one+ raises.
#
# @param messages a collection responding to +each+
# @return [Object] the result of +messages.each+
def process(messages)
  messages.each do |current|
    begin
      Thread.current[:cosmo_message] = current
      process_one
    ensure
      # Always clear the slot so a failed item does not leak into later code.
      Thread.current[:cosmo_message] = nil
    end
  end
end

#process_one ⇒ Object



60
61
62
# File 'lib/cosmo/stream.rb', line 60

# Placeholder that always raises; the error message names the receiver's
# class to indicate where +process_one+ still has to be implemented.
#
# @raise [NotImplementedError] always
def process_one
  failure_text = "#{self.class}#process_one must be implemented"
  raise NotImplementedError, failure_text
end

#publish(data, subject, **options) ⇒ Object



72
73
74
# File 'lib/cosmo/stream.rb', line 72

# Instance-level convenience wrapper: forwards to the class-level +publish+,
# turning the positional +subject+ into the +subject:+ keyword.
#
# @param data the payload handed to the class-level publish
# @param subject passed on as the +subject:+ keyword
# @param options extra keyword arguments, forwarded untouched
# @return [Object] whatever the class-level publish returns
def publish(data, subject, **options)
  publisher = self.class
  publisher.publish(data, subject: subject, **options)
end