Class: Textus::Hooks::EventBus

Inherits:
Object
  • Object
show all
Defined in:
lib/textus/hooks/event_bus.rb

Defined Under Namespace

Classes: HookTimeout

Constant Summary collapse

# NOTE(review): presumably the per-hook time budget, in seconds, applied when a
# subscriber is invoked — the code that uses it is not visible here; confirm.
HOOK_TIMEOUT_SECONDS =
2
# Pub/sub events accepted by #register, keyed by event name. Each value is the
# list that #register passes to shape_check! as `required` alongside the hook
# block — presumably the keyword names a hook for that event must accept
# (TODO confirm against shape_check!).
EVENTS =
{
  entry_put: %i[ctx key envelope],
  entry_deleted: %i[ctx key],
  entry_refreshed: %i[ctx key envelope change],
  entry_renamed: %i[ctx key from_key to_key envelope],
  build_completed: %i[ctx key envelope sources],
  proposal_accepted: %i[ctx key target_key],
  proposal_rejected: %i[ctx key target_key],
  file_published: %i[ctx key envelope source target],
  store_loaded: %i[ctx],
  refresh_started: %i[ctx key mode],
  refresh_failed: %i[ctx key error_class error_message],
  refresh_backgrounded: %i[ctx key started_at budget_ms],
}.freeze
# Event names that #register rejects with a UsageError directing the caller to
# RpcRegistry instead of this bus.
RPC_EVENTS =
%i[resolve_intake transform_rows validate].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(error_log: ErrorLog.new) ⇒ EventBus

Returns a new instance of EventBus.



27
28
29
30
31
# File 'lib/textus/hooks/event_bus.rb', line 27

# Builds a bus with no subscribers and no error handlers.
#
# @param error_log [Object] stored as-is and exposed through #error_log;
#   defaults to a fresh ErrorLog
def initialize(error_log: ErrorLog.new)
  @error_log = error_log
  @error_handlers = []
  # Each event gets its own subscriber list, created on first access.
  @pubsub = Hash.new { |registry, event| registry[event] = [] }
end

Instance Attribute Details

#error_log ⇒ Object (readonly)

Returns the value of attribute error_log.



33
34
35
# File 'lib/textus/hooks/event_bus.rb', line 33

# @return [Object] the error log supplied to (or defaulted by) the constructor
def error_log = @error_log

Instance Method Details

#listeners(event, key:) ⇒ Object



51
# File 'lib/textus/hooks/event_bus.rb', line 51

# Lists the subscriptions registered for +event+ whose key filter accepts +key+.
#
# @param event [#to_sym] event name
# @param key [Object] key handed to match? together with each subscription's :keys
# @return [Array<Hash>] matching subscription hashes, in registration order
def listeners(event, key:)
  subscriptions = @pubsub[event.to_sym]
  subscriptions.select { |subscription| match?(subscription[:keys], key) }
end

#on(event, name, keys: nil) ⇒ Object



35
# File 'lib/textus/hooks/event_bus.rb', line 35

# Convenience alias for #register: forwards every argument and the block unchanged.
#
# @param event [#to_sym] event name
# @param name [#to_sym] hook name
# @param keys [Object, nil] key filter stored with the subscription
# @return [Object] whatever #register returns
def on(event, name, keys: nil, &block)
  register(event, name, keys: keys, &block)
end

#on_error(&block) ⇒ Object



49
# File 'lib/textus/hooks/event_bus.rb', line 49

# Appends +block+ to the list of error handlers.
#
# @yield handler stored for later use; how and when it is called is not
#   visible in this section
# @return [Array] the full list of error handlers after the append
def on_error(&block)
  @error_handlers.push(block)
end

#publish(event, strict: false, **kwargs) ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/textus/hooks/event_bus.rb', line 55

# Delivers +event+ to every subscriber whose key filter matches and summarises
# the outcomes.
#
# Each matching subscription is passed to +invoke+, which yields an
# [outcome, error] pair; the subscription name is filed under the outcome
# (:ok, :errored or :timed_out). Any other outcome is not recorded.
#
# @param event [#to_sym] event name
# @param strict [Boolean] when true, the first error reported by +invoke+ is
#   raised, but only after all matching subscribers have been invoked
# @param kwargs [Hash] event payload; +:key+ selects subscribers and falls
#   back to "-" when absent or nil
# @return [FireReport] names grouped as fired / errored / timed_out
# @raise [Exception] the first subscriber error, in strict mode only
def publish(event, strict: false, **kwargs)
  routing_key = kwargs[:key] || "-"
  names_by_outcome = { ok: [], errored: [], timed_out: [] }
  first_error = nil

  @pubsub[event.to_sym].each do |subscription|
    next unless match?(subscription[:keys], routing_key)

    outcome, error = invoke(event, subscription, routing_key, kwargs)
    # Unknown outcomes have no bucket and are silently skipped.
    names_by_outcome[outcome]&.push(subscription[:name])
    first_error ||= error if strict && error
  end

  # first_error is only ever assigned in strict mode.
  raise first_error if first_error

  FireReport.new(
    fired: names_by_outcome[:ok],
    errored: names_by_outcome[:errored],
    timed_out: names_by_outcome[:timed_out]
  )
end

#pubsub_handlers(event) ⇒ Object



53
# File 'lib/textus/hooks/event_bus.rb', line 53

# Returns the subscriber list stored for +event+ (the internal array itself,
# not a copy).
#
# @param event [#to_sym] event name
# @return [Array<Hash>] subscription hashes; empty when nothing is registered
def pubsub_handlers(event)
  event_name = event.to_sym
  @pubsub[event_name]
end

#register(event, name, keys: nil, &blk) ⇒ Object

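Raises:

  • (UsageError) — if the event is an RPC event, the event is unknown, or a hook with the same name is already registered for that event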



37
38
39
40
41
42
43
44
45
46
47
# File 'lib/textus/hooks/event_bus.rb', line 37

# Subscribes +blk+ to a pub/sub event under a unique hook name.
#
# @param event [#to_sym] event name; must be a key of EVENTS
# @param name [#to_sym] hook name, unique per event
# @param keys [Object, nil] key filter stored with the subscription
# @yield the hook body; it is checked by shape_check! against the EVENTS
#   entry for the event before being stored
# @return [Array<Hash>] the event's subscriber list after the append
# @raise [UsageError] if the event is an RPC event, is unknown, or already
#   has a hook with this name
def register(event, name, keys: nil, &blk)
  event_sym = event.to_sym
  if RPC_EVENTS.include?(event_sym)
    raise UsageError.new("#{event_sym} is an RPC event; register on RpcRegistry")
  end

  required = EVENTS[event_sym]
  raise UsageError.new("unknown event: #{event}") unless required

  shape_check!(event_sym, required, blk)

  hook_name = name.to_sym
  subscribers = @pubsub[event_sym]
  if subscribers.any? { |existing| existing[:name] == hook_name }
    raise UsageError.new("#{event_sym} hook '#{hook_name}' already registered")
  end

  subscribers << { name: hook_name, callable: blk, keys: keys }
end