Module: Cosmo::Stream::ClassMethods
- Defined in:
- lib/cosmo/stream.rb
Instance Method Summary collapse
- #default_options ⇒ Object
-
#options(stream: nil, consumer_name: nil, batch_size: nil, fetch_timeout: nil, start_position: nil, consumer: nil, publisher: nil) ⇒ Object
rubocop:disable Metrics/ParameterLists.
- #publish(data, subject: nil, **options) ⇒ Object
-
#register ⇒ Object
rubocop:disable Metrics/AbcSize.
Instance Method Details
#default_options ⇒ Object
26 27 28 |
# File 'lib/cosmo/stream.rb', line 26 def @default_options ||= Utils::Hash.dup(superclass.respond_to?(:default_options) ? superclass. : Data::DEFAULTS) end |
#options(stream: nil, consumer_name: nil, batch_size: nil, fetch_timeout: nil, start_position: nil, consumer: nil, publisher: nil) ⇒ Object
rubocop:disable Metrics/ParameterLists
15 16 17 18 |
# File 'lib/cosmo/stream.rb', line 15 def (stream: nil, consumer_name: nil, batch_size: nil, fetch_timeout: nil, start_position: nil, consumer: nil, publisher: nil) # rubocop:disable Metrics/ParameterLists register .merge!({ stream:, consumer_name:, batch_size:, fetch_timeout:, start_position:, consumer:, publisher: }.compact) end |
#publish(data, subject: nil, **options) ⇒ Object
20 21 22 23 24 |
# File 'lib/cosmo/stream.rb', line 20 def publish(data, subject: nil, **) stream = [:stream] subject ||= .dig(:publisher, :subject) Publisher.publish(subject, data, stream: stream, serializer: .dig(:publisher, :serializer), **) end |
#register ⇒ Object
rubocop:disable Metrics/AbcSize
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/cosmo/stream.rb', line 30 def register # rubocop:disable Metrics/AbcSize Config.system[:streams] ||= [] Config.system[:streams] << self # settings are inherited, don't try to modify them return if != Data::DEFAULTS class_name = Utils::String.underscore(name) .merge!(stream: class_name, consumer_name: "consumer-#{class_name}", publisher: { subject: "#{class_name}.default" }) subjects = .dig(:consumer, :subjects) subjects&.map! { format(_1, name: class_name) } subject = [:publisher][:subject] [:publisher][:subject] = format(subject, name: class_name) end |