Class: Pgbus::Streams::Filters

Inherits:
Object
Defined in:
lib/pgbus/streams/filters.rb

Overview

Process-wide registry of server-side audience filter predicates. Used by `Pgbus.stream(name).broadcast(html, visible_to: :label)` to restrict delivery to connections whose authorize-hook context matches the predicate.

Typical setup at boot time:

Pgbus::Streams.filters.register(:admin_only) { |user| user.admin? }
Pgbus::Streams.filters.register(:workspace_member) do |user, stream|
  user.workspaces.pluck(:id).include?(stream.split(":").last.to_i)
end

Broadcasts reference filters by label:

Pgbus.stream("workspace:42").broadcast(html, visible_to: :admin_only)

The Dispatcher looks up the filter in the registry, evaluates it against each connection’s context (populated from the StreamApp’s authorize hook return value), and only delivers to connections where the predicate returns true.
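
The delivery step described above can be sketched roughly as follows. This is an illustration only, not the Dispatcher's actual code: `Connection`, `deliverable`, and the `StubFilters` stand-in are hypothetical names, and the context is a plain Hash rather than a user model. Only the `visible?(label, context)` contract is taken from this page.

```ruby
# Hypothetical stand-in for a connection: `context` is whatever the
# authorize hook returned (here a plain Hash instead of a user model).
Connection = Struct.new(:id, :context)

# Minimal stand-in for Pgbus::Streams::Filters, reduced to the
# visible?(label, context) contract documented on this page.
class StubFilters
  def initialize
    @filters = {}
  end

  def register(label, &block)
    @filters[label] = block
  end

  def visible?(label, context)
    return true if label.nil?

    predicate = @filters[label]
    return false if predicate.nil?

    !!predicate.call(context)
  rescue StandardError
    false
  end
end

# Hypothetical helper: keep only the connections allowed to see a broadcast.
def deliverable(connections, filters, label)
  connections.select { |conn| filters.visible?(label, conn.context) }
end

filters = StubFilters.new
filters.register(:admin_only) { |user| user[:admin] }

connections = [
  Connection.new(1, { admin: true }),
  Connection.new(2, { admin: false })
]

admin_ids = deliverable(connections, filters, :admin_only).map(&:id) # => [1]
all_ids   = deliverable(connections, filters, nil).map(&:id)         # => [1, 2]
```

A broadcast without a label reaches every connection, while a labelled broadcast reaches only the connections whose context satisfies the predicate.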

Why a registry of labels instead of passing a Proc directly to broadcast: predicates can’t be serialized to JSON, so they can’t travel through PGMQ. The label is serialized; the predicate lives in-process on the subscriber side. This also means the predicate is evaluated on the same process that holds the SSE connection, so the user context (typically an ActiveRecord model or a session hash) is always available.
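
The serialization argument can be checked with Ruby's standard `json` library. The payload shape below (the `"html"` and `"visible_to"` keys) is an assumption made for illustration; the actual message format sent through PGMQ is not described on this page, and neither is the step that turns the decoded String back into a Symbol.

```ruby
require "json"

# A Symbol label survives a JSON round trip, arriving as a String.
payload = JSON.generate({ "html" => "<p>hi</p>", "visible_to" => :admin_only })
decoded = JSON.parse(payload)

# Assumption: the receiving side converts the label back to a Symbol
# before looking it up in the registry.
label = decoded["visible_to"].to_sym

# A Proc has no useful JSON form: by default the json library falls back
# to the object's #to_s, so only an inspect-style String comes out.
predicate = ->(user) { user[:admin] }
proc_json = JSON.generate({ "visible_to" => predicate })
revived   = JSON.parse(proc_json)["visible_to"]

label_ok     = (label == :admin_only)
still_a_proc = revived.respond_to?(:call)
```

After the round trip the label is usable again, while the Proc has degraded into a String that cannot be called.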


Constructor Details

#initialize(logger: nil) ⇒ Filters

Returns a new instance of Filters.



# File 'lib/pgbus/streams/filters.rb', line 34

def initialize(logger: nil)
  @mutex = Mutex.new
  @filters = {}
  @logger = logger
end

Instance Method Details

#lookup(label) ⇒ Object

Returns the predicate registered under label, or nil if no filter has been registered under it.



# File 'lib/pgbus/streams/filters.rb', line 49

def lookup(label)
  @mutex.synchronize { @filters[label] }
end

#register(label, callable = nil, &block) ⇒ Object

Registers a predicate under label, replacing any predicate previously registered under the same label. The predicate may be given as a callable or as a block; if both are given, the callable is used.

Raises:

  • (ArgumentError) if label is not a Symbol, or if neither a callable nor a block is given


# File 'lib/pgbus/streams/filters.rb', line 40

def register(label, callable = nil, &block)
  raise ArgumentError, "filter label must be a Symbol (got #{label.class})" unless label.is_a?(Symbol)

  predicate = callable || block
  raise ArgumentError, "filter must be given a block or callable" if predicate.nil?

  @mutex.synchronize { @filters[label] = predicate }
end
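
A short usage sketch of register and lookup. So that it runs without loading the gem, the hypothetical `MiniRegistry` class copies the two method bodies from the listings on this page; the Hash-based user contexts are also an assumption.

```ruby
# Stand-in that copies register/lookup from the source listings on this
# page, so the example runs without loading pgbus.
class MiniRegistry
  def initialize
    @mutex = Mutex.new
    @filters = {}
  end

  def register(label, callable = nil, &block)
    raise ArgumentError, "filter label must be a Symbol (got #{label.class})" unless label.is_a?(Symbol)

    predicate = callable || block
    raise ArgumentError, "filter must be given a block or callable" if predicate.nil?

    @mutex.synchronize { @filters[label] = predicate }
  end

  def lookup(label)
    @mutex.synchronize { @filters[label] }
  end
end

registry = MiniRegistry.new

# Block form.
registry.register(:admin_only) { |user| user[:admin] }

# Callable form, here a lambda.
is_staff = ->(user) { user[:role] == "staff" }
registry.register(:staff_only, is_staff)

# Registering the same label again replaces the earlier predicate.
registry.register(:admin_only) { |_user| false }

# A String label is rejected with ArgumentError.
error = begin
  registry.register("admin_only") { |_user| true }
  nil
rescue ArgumentError => e
  e.message
end
```

Because labels must be Symbols, a label that arrives as a String (for example after JSON decoding) has to be converted before it is passed to lookup.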

#visible?(label, context) ⇒ Boolean

Evaluates the named filter against a context. The context is whatever the StreamApp’s authorize hook returned when the connection was established — typically a user model.

Policy decisions:

- label=nil → visible (no filter attached to the broadcast)
- unknown label → NOT visible + warning log. Fail-closed so
  a typo or renamed filter doesn't turn a restricted
  broadcast into a public one. The whole point of audience
  filtering is data isolation; failing open on a typo
  defeats the feature. The warning log is loud enough that
  typos still get noticed in dev (check the log or wonder
  why no subscriber sees your broadcast).
- predicate raises → NOT visible (fail-closed on runtime
  error to avoid leaking data on an exception path).
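
The policy decisions above can be exercised with a small stand-in. `MiniFilters` is hypothetical and mirrors the logic of visible? from the source listing; the logging helpers are not shown on this page, so this sketch simply collects messages in an array.

```ruby
# Stand-in mirroring visible? from the source listing. The real log_warn
# and log_error helpers are not shown on this page; here messages are
# collected in an array instead (an assumption for illustration).
class MiniFilters
  attr_reader :messages

  def initialize
    @filters = {}
    @messages = []
  end

  def register(label, &block)
    @filters[label] = block
  end

  def visible?(label, context)
    return true if label.nil?

    predicate = @filters[label]
    if predicate.nil?
      @messages << "warn: unknown filter label #{label.inspect}"
      return false
    end

    begin
      !!predicate.call(context)
    rescue StandardError => e
      @messages << "error: filter #{label.inspect} raised #{e.class}"
      false
    end
  end
end

filters = MiniFilters.new
filters.register(:admin_only) { |user| user.fetch(:admin) }

admin = { admin: true }
guest = { admin: false }

no_label = filters.visible?(nil, guest)          # no filter attached: visible
allowed  = filters.visible?(:admin_only, admin)  # predicate returns true
denied   = filters.visible?(:admin_only, guest)  # predicate returns false
typo     = filters.visible?(:admin_onyl, admin)  # unknown label: fail-closed
raised   = filters.visible?(:admin_only, {})     # fetch raises KeyError: fail-closed
```

Only the first two calls return true. The misspelled label and the raising predicate both return false and each leaves one message behind.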

Returns:

  • (Boolean)


# File 'lib/pgbus/streams/filters.rb', line 68

def visible?(label, context)
  return true if label.nil?

  predicate = lookup(label)
  if predicate.nil?
    log_warn("unknown filter label #{label.inspect} — broadcast dropped (fail-closed)")
    return false
  end

  begin
    !!predicate.call(context)
  rescue StandardError => e
    log_error("filter #{label.inspect} raised #{e.class}: #{e.message} — dropping broadcast (fail-closed)")
    false
  end
end