Class: Pgque::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/pgque/consumer.rb

Constant Summary collapse

DEFAULT_MAX_MESSAGES =
2_147_483_647
WAIT_SLICE_SECONDS =
0.5

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(dsn, queue:, name:, poll_interval: 30, max_messages: DEFAULT_MAX_MESSAGES, retry_after: 60, unknown_handler_policy: "nack", subconsumer: nil, dead_interval: nil, logger: nil) ⇒ Consumer

Returns a new instance of Consumer.



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/pgque/consumer.rb', line 17

def initialize(dsn, queue:, name:, poll_interval: 30,
               max_messages: DEFAULT_MAX_MESSAGES, retry_after: 60,
               unknown_handler_policy: "nack", subconsumer: nil,
               dead_interval: nil, logger: nil)
  @dsn = dsn
  @queue = queue
  @name = name
  @poll_interval = poll_interval
  @max_messages = max_messages
  @retry_after = retry_after

  unless ["nack", "ack"].include?(unknown_handler_policy.to_s)
    raise ArgumentError,
          "unknown_handler_policy must be 'nack' or 'ack', " \
          "got #{unknown_handler_policy.inspect}"
  end
  @unknown_handler_policy = unknown_handler_policy.to_s

  if dead_interval && subconsumer.nil?
    raise ArgumentError,
          "dead_interval is only valid in cooperative mode " \
          "(set subconsumer:)"
  end
  @subconsumer = subconsumer
  @dead_interval = dead_interval

  @handlers = {}
  @default_handler = nil
  # @running is a plain boolean. Ruby integer/boolean assignment
  # is atomic, and the only cross-thread interactions are the
  # signal trap and Consumer#stop flipping it false while the
  # main loop polls running? -- no ordering dependencies, so a
  # mutex would be overkill (and unsafe to enter from a signal
  # trap, which raises ThreadError on Mutex#synchronize).
  @running = false
  @stop_signum = nil
  @logger = logger || default_logger
end

Instance Attribute Details

#dead_intervalObject (readonly)

Returns the value of attribute dead_interval.



12
13
14
# File 'lib/pgque/consumer.rb', line 12

def dead_interval
  @dead_interval
end

#dsnObject (readonly)

Returns the value of attribute dsn.



12
13
14
# File 'lib/pgque/consumer.rb', line 12

def dsn
  @dsn
end

#loggerObject

Returns the value of attribute logger.



15
16
17
# File 'lib/pgque/consumer.rb', line 15

def logger
  @logger
end

#max_messagesObject (readonly)

Returns the value of attribute max_messages.



12
13
14
# File 'lib/pgque/consumer.rb', line 12

def max_messages
  @max_messages
end

#nameObject (readonly)

Returns the value of attribute name.



12
13
14
# File 'lib/pgque/consumer.rb', line 12

def name
  @name
end

#poll_intervalObject (readonly)

Returns the value of attribute poll_interval.



12
13
14
# File 'lib/pgque/consumer.rb', line 12

def poll_interval
  @poll_interval
end

#queueObject (readonly)

Returns the value of attribute queue.



12
13
14
# File 'lib/pgque/consumer.rb', line 12

def queue
  @queue
end

#retry_afterObject (readonly)

Returns the value of attribute retry_after.



12
13
14
# File 'lib/pgque/consumer.rb', line 12

def retry_after
  @retry_after
end

#subconsumerObject (readonly)

Returns the value of attribute subconsumer.



12
13
14
# File 'lib/pgque/consumer.rb', line 12

def subconsumer
  @subconsumer
end

Instance Method Details

#on(event_type, &block) ⇒ Object

Raises:

  • (ArgumentError)


56
57
58
59
60
61
62
63
64
65
# File 'lib/pgque/consumer.rb', line 56

def on(event_type, &block)
  raise ArgumentError, "block required for Consumer#on" unless block

  if event_type == "*"
    @default_handler = block
  else
    @handlers[event_type] = block
  end
  block
end

#poll_once(conn) ⇒ Object

Public for testability; not part of the stable API.



134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/pgque/consumer.rb', line 134

def poll_once(conn)
  conn.transaction do
    client = Client.new(conn)
    msgs =
      if @subconsumer
        client.receive_coop(
          @queue, @name, @subconsumer,
          max_messages: @max_messages,
          dead_interval: @dead_interval,
        )
      else
        client.receive(@queue, @name, @max_messages)
      end

    next if msgs.empty?

    batch_id = msgs[0].batch_id
    @logger.debug("batch #{batch_id}: #{msgs.size} message(s)")

    nack_failed = dispatch_batch(client, batch_id, msgs)

    next if nack_failed

    rowcount = client.ack(batch_id)
    if rowcount == 0
      @logger.warn(
        "pgque: ack batch #{batch_id} returned 0 -- stale or " \
        "double ack (batch already finished or not found)",
      )
    end
  end
end

#running?Boolean

Returns:

  • (Boolean)


129
130
131
# File 'lib/pgque/consumer.rb', line 129

def running?
  @running
end

#startObject



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/pgque/consumer.rb', line 67

def start
  @running = true
  @stop_signum = nil

  in_main_thread = (Thread.current == Thread.main)
  original_handlers = {}

  # Signal traps run in a restricted context: Mutex#synchronize,
  # Logger#info, and most blocking code raise ThreadError. Keep
  # this proc to plain instance-variable writes; the main loop
  # logs the signal number after waking up.
  stop_proc = ->(signum) {
    @stop_signum = signum
    @running = false
  }

  if in_main_thread
    ["TERM", "INT"].each do |sig|
      original_handlers[sig] = Signal.trap(sig) { stop_proc.call(sig) }
    end
  end

  begin
    conn = PG.connect(@dsn)
    begin
      channel = "pgque_#{@queue}"
      conn.exec("LISTEN #{conn.escape_identifier(channel)}")
      @logger.info(
        "consumer #{@name} listening on #{@queue} (poll=#{@poll_interval}s)"
      )

      while running?
        poll_once(conn)
        break unless running?
        wait_for_notify_or_stop(conn)
      end

      if @stop_signum
        @logger.info("received signal #{@stop_signum}, shutting down")
      end
    ensure
      conn.close unless conn.finished?
    end
  ensure
    # Clear running? before logging so callers observing the flag
    # see "stopped" by the time the log line is written -- and so
    # an exception during PG.connect, LISTEN, or the poll loop
    # leaves the consumer in a consistent state instead of a
    # ghost "running" with no live worker. Plain instance-var
    # write -- not the trap-context-unsafe pattern.
    @running = false
    if in_main_thread
      original_handlers.each { |sig, h| Signal.trap(sig, h || "DEFAULT") }
    end
    @logger.info("consumer #{@name} stopped")
  end
end

#stopObject



125
126
127
# File 'lib/pgque/consumer.rb', line 125

def stop
  @running = false
end