Class: Ration::Backends::Postgres

Inherits:
Base
  • Object
show all
Defined in:
lib/ration/backends/postgres.rb

Constant Summary collapse

DEFAULT_CHANNEL =
'ration'
INITIAL_BACKOFF_SECONDS =
1
MAX_BACKOFF_SECONDS =
30
DEFAULT_POLL_INTERVAL =
1.0

Constants inherited from Base

Base::DEFAULT_MAX_PAYLOAD_BYTES

Instance Method Summary collapse

Methods inherited from Base

#on_event

Constructor Details

#initialize(url:, channel: DEFAULT_CHANNEL, max_payload_bytes: DEFAULT_MAX_PAYLOAD_BYTES, poll_interval: DEFAULT_POLL_INTERVAL, publish_with: nil, logger: nil) ⇒ Postgres

Returns a new instance of Postgres.



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/ration/backends/postgres.rb', line 13

# Stores the backend configuration and resets the listener state. Nothing is
# connected and no thread is spawned here.
#
# @param url [String] connection string handed to PG.connect when publishing
# @param channel [String] name passed as the first argument of pg_notify
# @param max_payload_bytes [Integer] limit given to check_payload_size! on publish
# @param poll_interval [Numeric] stored as-is; presumably the listener's wait
#   timeout in seconds — TODO confirm against the listen loop
# @param publish_with [#call, nil] optional callable invoked with
#   (channel, payload) in place of opening a connection per publish
# @param logger [Logger, nil] falls back to a Logger writing to $stderr
def initialize(url:, channel: DEFAULT_CHANNEL, max_payload_bytes: DEFAULT_MAX_PAYLOAD_BYTES,
               poll_interval: DEFAULT_POLL_INTERVAL, publish_with: nil, logger: nil)
  super()

  # Listener state: idle until #start is called.
  @stop   = false
  @thread = nil

  # Connection and publishing configuration.
  @url = url
  @channel = channel
  @publish_with = publish_with
  @max_payload_bytes = max_payload_bytes
  @poll_interval = poll_interval

  @logger = logger
  @logger ||= Logger.new($stderr)
end

Instance Method Details

#publish(event) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/ration/backends/postgres.rb', line 32

# Serializes +event+ to JSON, checks its size, and sends it on the channel.
#
# When a +publish_with+ callable was supplied it receives (channel, payload)
# and its result is returned. Otherwise a connection is opened for this one
# call, pg_notify is executed, and the connection is closed again.
#
# @param event [#to_json] the event to broadcast
# @return [Object] result of the callable, or of the pg_notify query
def publish(event)
  json = event.to_json
  check_payload_size!(json, @max_payload_bytes)

  return @publish_with.call(@channel, json) if @publish_with

  connection = PG.connect(@url)
  begin
    connection.exec_params('SELECT pg_notify($1, $2)', [@channel, json])
  ensure
    # Always release the one-off connection, even if the query raised.
    connection.close
  end
end

#start ⇒ Object



48
49
50
51
52
53
54
# File 'lib/ration/backends/postgres.rb', line 48

# Starts the background listener unless one is already registered.
#
# The initial connection is established on the calling thread, so a failure
# in connect_and_listen raises here and no thread is created.
#
# @return [Thread, nil] the new listener thread, or nil if one already exists
def start
  unless @thread
    @stop = false
    # The argument is evaluated by the caller before the thread is spawned.
    @thread = Thread.new(connect_and_listen) { |conn| run_loop(conn) }
  end
end

#stop ⇒ Object



56
57
58
59
60
# File 'lib/ration/backends/postgres.rb', line 56

# Signals the listener to stop and waits for its thread to finish.
#
# Thread#join re-raises an exception that terminated the thread. The thread
# handle is cleared in +ensure+ so that such a failure still propagates to
# the caller but does not leave a dead thread registered, which would make
# every later #start return early.
#
# @return [nil]
# @raise [Exception] whatever exception terminated the listener thread
def stop
  @stop = true
  @thread&.join
  nil
ensure
  @thread = nil
end