Class: ActiveEventStore::SubscriberJob

Inherits:
ActiveJob::Base
  • Object
show all
Defined in:
lib/active_event_store/subscriber_job.rb

Overview

Base job for async subscribers

Class Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Class Attribute Details

.subscriber ⇒ Object

Returns the value of attribute subscriber.



7
8
9
# File 'lib/active_event_store/subscriber_job.rb', line 7

# Reader for the subscriber attribute.
#
# @return [Object] the current value of @subscriber
def subscriber
  @subscriber
end

Class Method Details

.for(callable) ⇒ Object

Raises:

  • (ArgumentError)


31
32
33
34
35
36
37
# File 'lib/active_event_store/subscriber_job.rb', line 31

# Looks up the SubscriberJob constant defined directly on the given
# module/class.
#
# @param callable [Module] the module or class to inspect
# @return [Object, nil] the value of the SubscriberJob constant, or nil when
#   the constant is not defined on +callable+ itself (ancestors are not searched)
# @raise [ArgumentError] if +callable+ is not a Module
def for(callable)
  unless callable.is_a?(Module)
    raise ArgumentError, "Async subscriber must be a module/class"
  end

  # `false` restricts the lookup to constants owned by +callable+ itself.
  return unless callable.const_defined?(:SubscriberJob, false)

  callable.const_get(:SubscriberJob, false)
end

.from(callable) ⇒ Object

Raises:

  • (ArgumentError)


9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/active_event_store/subscriber_job.rb', line 9

# Returns the job class for an async subscriber, creating it on first use.
#
# The job class is stored as the SubscriberJob constant directly on
# +callable+, so later calls with the same subscriber return the same class.
#
# @param callable [Module] a named module or class acting as the subscriber
# @return [Object] the SubscriberJob constant of +callable+; when created
#   here it is a subclass of the receiver
# @raise [ArgumentError] if +callable+ is a Proc, an anonymous module/class,
#   or not a module/class at all
def from(callable)
  # Only modules are asked for #name: arbitrary instances need not respond to
  # it, and must reach the "not instance" ArgumentError below rather than
  # fail with NoMethodError.
  if callable.is_a?(Proc) || (callable.is_a?(Module) && callable.name.nil?)
    raise ArgumentError, "Anonymous subscribers (blocks/procs/lambdas or anonymous modules) " \
                          "could not be asynchronous (use sync: true)"
  end

  raise ArgumentError, "Async subscriber must be a module/class, not instance" unless callable.is_a?(Module)

  # `false` restricts the lookup to constants owned by +callable+ itself.
  return callable.const_get(:SubscriberJob, false) if callable.const_defined?(:SubscriberJob, false)

  callable.const_set(
    :SubscriberJob,
    Class.new(self).tap do |job|
      # Explicit receiver: inside this block `self` is still the parent class,
      # so a bare `queue_as` would reconfigure the parent instead of the new job.
      job.queue_as ActiveEventStore.config.job_queue_name
      job.subscriber = callable
    end
  )
end

Instance Method Details

#perform(payload) ⇒ Object



40
41
42
43
44
45
46
# File 'lib/active_event_store/subscriber_job.rb', line 40

# Rebuilds the event from its serialized payload and passes it to the
# subscriber.
#
# @param payload [Hash] serialized event attributes; splatted as keyword
#   arguments into +event_store.deserialize+ together with the configured
#   serializer
# @return [Object] whatever +event_store.with_metadata+ returns
def perform(payload)
  event = event_store.deserialize(**payload, serializer: ActiveEventStore.config.serializer)

  # Invoke the subscriber inside a block scoped to the event's own metadata.
  # NOTE(review): presumably this makes events published by the subscriber
  # inherit that metadata — confirm against the event store client.
  event_store.with_metadata(**event.metadata.to_h) do
    subscriber.call(event)
  end
end