Class: Yes::Core::Subscriptions

Inherits:
Object
Includes:
OpenTelemetry::Trackable
Defined in:
lib/yes/core/subscriptions.rb

Overview

Manages PgEventstore subscriptions with optional heartbeat and OpenTelemetry tracing.

Examples:

subscriptions = Yes::Core::Subscriptions.new
subscriptions.subscribe_to_all(handler, filter_opts)
subscriptions.start

Constant Summary

SUBSCRIPTIONS_START_TIMEOUT = 20

Returns the timeout in seconds for subscriptions to start.

Returns:

  • (Integer)

    timeout in seconds for subscriptions to start


Constructor Details

#initialize(config_name: :default) ⇒ Subscriptions

Initializes subscriptions with the given PgEventstore config.

Parameters:

  • config_name (Symbol) (defaults to: :default)

    the PgEventstore configuration name



# File 'lib/yes/core/subscriptions.rb', line 28

def initialize(config_name: :default)
  @subscriptions_manager = PgEventstore.subscriptions_manager(
    config_name,
    subscription_set: Rails.application.class.name.split('::').first
  )
  @client = PgEventstore.client(config_name)
end
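
The `subscription_set` value above is derived from the top-level namespace of the Rails application class. A minimal sketch of that expression in plain Ruby, assuming a hypothetical `MyShop::Application` class standing in for `Rails.application.class`:

```ruby
# Hypothetical stand-in for a Rails application class; in a real app
# Rails.application.class would return a class shaped like this one.
module MyShop
  class Application; end
end

# Same expression as in #initialize, applied to the stand-in class:
# take the fully qualified class name and keep the outermost namespace.
subscription_set = MyShop::Application.name.split('::').first

puts subscription_set
```

Under this assumption, every `Subscriptions` instance created in the same application would share one subscription set name.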

Instance Attribute Details

#client ⇒ PgEventstore::Client (readonly)

Returns the event store client.

Returns:

  • (PgEventstore::Client)

    the event store client



# File 'lib/yes/core/subscriptions.rb', line 23

def client
  @client
end

#subscriptions_manager ⇒ PgEventstore::SubscriptionsManager (readonly)

Returns the subscriptions manager.

Returns:

  • (PgEventstore::SubscriptionsManager)

    the subscriptions manager



# File 'lib/yes/core/subscriptions.rb', line 20

def subscriptions_manager
  @subscriptions_manager
end

Instance Method Details

#start ⇒ void

This method returns an undefined value.

Starts the subscriptions manager and optional heartbeat.



# File 'lib/yes/core/subscriptions.rb', line 54

def start
  start_heartbeat if Yes::Core.configuration.subscriptions_heartbeat_url.present?
  subscriptions_manager.start
end
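
The heartbeat is only started when a heartbeat URL is configured, and the manager is started either way. The sketch below illustrates that gating with stand-in objects. `FakeConfig`, `FakeManager` and `SketchSubscriptions` are hypothetical names, not part of Yes::Core, and the blank check is a plain-Ruby approximation of ActiveSupport's `present?`:

```ruby
# Hypothetical stand-ins; none of these classes are part of Yes::Core.
FakeConfig = Struct.new(:subscriptions_heartbeat_url)

class FakeManager
  attr_reader :started

  def start
    @started = true
  end
end

class SketchSubscriptions
  attr_reader :heartbeat_started, :manager

  def initialize(config, manager)
    @config = config
    @manager = manager
    @heartbeat_started = false
  end

  def start
    url = @config.subscriptions_heartbeat_url
    # Approximates `url.present?`: nil and whitespace-only strings count as absent.
    start_heartbeat unless url.nil? || url.strip.empty?
    @manager.start
  end

  private

  # The real heartbeat implementation is not shown in the source;
  # this stub only records that it would have been started.
  def start_heartbeat
    @heartbeat_started = true
  end
end

with_url = SketchSubscriptions.new(FakeConfig.new('https://example.test/ping'), FakeManager.new)
with_url.start

without_url = SketchSubscriptions.new(FakeConfig.new(nil), FakeManager.new)
without_url.start
```

In both cases the manager is started; only `with_url` records a heartbeat start.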

#subscribe_to_all(handler, filter_opts, **subscription_opts) ⇒ void

This method returns an undefined value.

Subscribes a handler to all events matching the given filter.

Parameters:

  • handler (#call)

    the event handler

  • filter_opts (Hash)

    PgEventstore filter options

  • subscription_opts (Hash)

    additional subscription options



# File 'lib/yes/core/subscriptions.rb', line 42

def subscribe_to_all(handler, filter_opts, **subscription_opts)
  subscriptions_manager.subscribe(
    handler.class.to_s,
    handler: self.class.otl_tracer ? otl_trackable_handler(handler) : handler,
    options: { filter: filter_opts, resolve_link_tos: true },
    **subscription_opts
  )
end
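
Because the subscription name is taken from `handler.class.to_s`, the class of the handler object determines how the subscription is named. The sketch below shows the arguments the manager would receive, using a hypothetical `RecordingManager` that only mimics the keyword shape of the call above. `OrderPlacedHandler`, the `event_types` filter key and the `pull_interval` option are illustrative assumptions, and tracing is left out:

```ruby
# Hypothetical recording manager; it is not the PgEventstore manager and
# only captures the arguments passed to #subscribe.
class RecordingManager
  attr_reader :calls

  def initialize
    @calls = []
  end

  def subscribe(name, handler:, options:, **rest)
    @calls << { name: name, handler: handler, options: options, rest: rest }
  end
end

# Hypothetical handler; any object responding to #call fits the (#call) contract.
class OrderPlacedHandler
  def call(event)
    "handled #{event}"
  end
end

handler = OrderPlacedHandler.new
manager = RecordingManager.new
filter_opts = { event_types: ['OrderPlaced'] } # assumed filter shape

# Mirrors the body of #subscribe_to_all, without the tracing wrapper.
manager.subscribe(
  handler.class.to_s,
  handler: handler,
  options: { filter: filter_opts, resolve_link_tos: true },
  pull_interval: 2 # stands in for any extra subscription option
)

recorded = manager.calls.first
puts recorded[:name]

# A lambda also responds to #call, but its class name is always "Proc".
lambda_name = ->(event) { event }.class.to_s
puts lambda_name
```

One consequence worth noting: every lambda or proc handler would be named "Proc", so a dedicated handler class per subscription gives each subscription a distinct name.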