Class: Pgque::Consumer
- Inherits:
-
Object
- Object
- Pgque::Consumer
- Defined in:
- lib/pgque/consumer.rb
Constant Summary collapse
- DEFAULT_MAX_MESSAGES =
2_147_483_647
- WAIT_SLICE_SECONDS =
0.5
Instance Attribute Summary collapse
-
#dead_interval ⇒ Object
readonly
Returns the value of attribute dead_interval.
-
#dsn ⇒ Object
readonly
Returns the value of attribute dsn.
-
#logger ⇒ Object
Returns the value of attribute logger.
-
#max_messages ⇒ Object
readonly
Returns the value of attribute max_messages.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#poll_interval ⇒ Object
readonly
Returns the value of attribute poll_interval.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#retry_after ⇒ Object
readonly
Returns the value of attribute retry_after.
-
#subconsumer ⇒ Object
readonly
Returns the value of attribute subconsumer.
Instance Method Summary collapse
-
#initialize(dsn, queue:, name:, poll_interval: 30, max_messages: DEFAULT_MAX_MESSAGES, retry_after: 60, unknown_handler_policy: "nack", subconsumer: nil, dead_interval: nil, logger: nil) ⇒ Consumer
constructor
A new instance of Consumer.
- #on(event_type, &block) ⇒ Object
-
#poll_once(conn) ⇒ Object
Public for testability; not part of the stable API.
- #running? ⇒ Boolean
- #start ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(dsn, queue:, name:, poll_interval: 30, max_messages: DEFAULT_MAX_MESSAGES, retry_after: 60, unknown_handler_policy: "nack", subconsumer: nil, dead_interval: nil, logger: nil) ⇒ Consumer
Returns a new instance of Consumer.
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/pgque/consumer.rb', line 17
#
# Builds a consumer bound to one queue. Only stores configuration and
# validates options; no database connection is opened here.
#
# @param dsn [Object] connection target, stored as-is
# @param queue [Object] queue name, stored as-is
# @param name [Object] consumer name, stored as-is
# @param poll_interval [Numeric] stored as-is (default 30)
# @param max_messages [Integer] per-poll message limit, stored as-is
#   (default DEFAULT_MAX_MESSAGES)
# @param retry_after [Numeric] stored as-is (default 60)
# @param unknown_handler_policy [String, Symbol] "nack" or "ack"; compared
#   and stored via #to_s
# @param subconsumer [Object, nil] stored as-is; must be non-nil whenever
#   dead_interval is given
# @param dead_interval [Object, nil] only accepted together with subconsumer
# @param logger [Object, nil] falls back to default_logger when nil/false
# @raise [ArgumentError] if unknown_handler_policy is not "nack"/"ack", or
#   if dead_interval is given without subconsumer
def initialize(dsn, queue:, name:, poll_interval: 30,
               max_messages: DEFAULT_MAX_MESSAGES, retry_after: 60,
               unknown_handler_policy: "nack", subconsumer: nil,
               dead_interval: nil, logger: nil)
  @dsn = dsn
  @queue = queue
  @name = name
  @poll_interval = poll_interval
  # Each option gets its own assignment: a chained
  # `@max_messages = @retry_after = retry_after` would silently store
  # retry_after as the message limit and ignore max_messages.
  @max_messages = max_messages
  @retry_after = retry_after

  unless ["nack", "ack"].include?(unknown_handler_policy.to_s)
    raise ArgumentError,
          "unknown_handler_policy must be 'nack' or 'ack', " \
          "got #{unknown_handler_policy.inspect}"
  end
  @unknown_handler_policy = unknown_handler_policy.to_s

  if dead_interval && subconsumer.nil?
    raise ArgumentError,
          "dead_interval is only valid in cooperative mode " \
          "(set subconsumer:)"
  end
  @subconsumer = subconsumer
  @dead_interval = dead_interval

  @handlers = {}
  @default_handler = nil
  # @running is a plain boolean. Ruby integer/boolean assignment
  # is atomic, and the only cross-thread interactions are the
  # signal trap and Consumer#stop flipping it false while the
  # main loop polls running? -- no ordering dependencies, so a
  # mutex would be overkill (and unsafe to enter from a signal
  # trap, which raises ThreadError on Mutex#synchronize).
  @running = false
  @stop_signum = nil
  @logger = logger || default_logger
end
Instance Attribute Details
#dead_interval ⇒ Object (readonly)
Returns the value of attribute dead_interval.
12 13 14 |
# File 'lib/pgque/consumer.rb', line 12 def dead_interval @dead_interval end |
#dsn ⇒ Object (readonly)
Returns the value of attribute dsn.
12 13 14 |
# File 'lib/pgque/consumer.rb', line 12 def dsn @dsn end |
#logger ⇒ Object
Returns the value of attribute logger.
15 16 17 |
# File 'lib/pgque/consumer.rb', line 15 def logger @logger end |
#max_messages ⇒ Object (readonly)
Returns the value of attribute max_messages.
12 13 14 |
# File 'lib/pgque/consumer.rb', line 12 def max_messages @max_messages end |
#name ⇒ Object (readonly)
Returns the value of attribute name.
12 13 14 |
# File 'lib/pgque/consumer.rb', line 12 def name @name end |
#poll_interval ⇒ Object (readonly)
Returns the value of attribute poll_interval.
12 13 14 |
# File 'lib/pgque/consumer.rb', line 12 def poll_interval @poll_interval end |
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
12 13 14 |
# File 'lib/pgque/consumer.rb', line 12 def queue @queue end |
#retry_after ⇒ Object (readonly)
Returns the value of attribute retry_after.
12 13 14 |
# File 'lib/pgque/consumer.rb', line 12 def retry_after @retry_after end |
#subconsumer ⇒ Object (readonly)
Returns the value of attribute subconsumer.
12 13 14 |
# File 'lib/pgque/consumer.rb', line 12 def subconsumer @subconsumer end |
Instance Method Details
#on(event_type, &block) ⇒ Object
56 57 58 59 60 61 62 63 64 65 |
# File 'lib/pgque/consumer.rb', line 56
#
# Registers a handler block for an event type.
#
# The event type "*" installs the block as the default handler; any other
# value is used verbatim as the key in the handler table, replacing a
# previously registered block for the same key.
#
# @param event_type [Object] handler key, or "*" for the default handler
# @yield the handler to store
# @return [Proc] the block that was registered
# @raise [ArgumentError] if no block is given
def on(event_type, &block)
  raise ArgumentError, "block required for Consumer#on" if block.nil?

  # Wildcard registrations never touch the per-type table.
  return @default_handler = block if event_type == "*"

  @handlers[event_type] = block
end
#poll_once(conn) ⇒ Object
Public for testability; not part of the stable API.
134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
# File 'lib/pgque/consumer.rb', line 134
#
# Public for testability; not part of the stable API.
#
# Runs one receive/dispatch/ack cycle inside conn.transaction:
# 1. receives messages through a Client built on +conn+ (receive_coop when
#    @subconsumer is set, receive otherwise);
# 2. does nothing further when the receive returns no messages;
# 3. hands the batch to dispatch_batch and, unless that reports a failed
#    nack, acks the batch, warning when the ack reports 0.
#
# @param conn [Object] connection responding to #transaction; also passed
#   to Client.new
def poll_once(conn)
  conn.transaction do
    client = Client.new(conn)
    received =
      if @subconsumer
        client.receive_coop(
          @queue, @name, @subconsumer,
          max_messages: @max_messages,
          dead_interval: @dead_interval
        )
      else
        client.receive(@queue, @name, @max_messages)
      end

    unless received.empty?
      batch_id = received[0].batch_id
      @logger.debug("batch #{batch_id}: #{received.size} message(s)")

      # A truthy result from dispatch_batch means a nack failed; the batch
      # is then left un-acked.
      unless dispatch_batch(client, batch_id, received)
        acked = client.ack(batch_id)
        if acked == 0
          @logger.warn(
            "pgque: ack batch #{batch_id} returned 0 -- stale or " \
            "double ack (batch already finished or not found)"
          )
        end
      end
    end
  end
end
#running? ⇒ Boolean
129 130 131 |
# File 'lib/pgque/consumer.rb', line 129
#
# Reports the current value of the run flag (@running): set to true at the
# top of #start, and reset to false by #stop, by the signal handler, and
# when #start exits.
#
# @return [Boolean] the raw @running value
def running?
  @running
end
#start ⇒ Object
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
# File 'lib/pgque/consumer.rb', line 67
#
# Runs the consume loop on the calling thread until the run flag is cleared.
#
# Connects with PG.connect(@dsn), issues LISTEN on the channel
# "pgque_<queue>", then alternates poll_once and wait_for_notify_or_stop
# while running? is true. When called on the main thread, TERM and INT
# traps are installed for the duration of the call and the previous
# handlers are restored on exit. The connection is closed and @running is
# reset to false on every exit path, including exceptions.
def start
  @running = true
  @stop_signum = nil
  on_main = Thread.current == Thread.main
  saved_traps = {}

  # Signal traps run in a restricted context: Mutex#synchronize,
  # Logger#info, and most blocking code raise ThreadError. Keep
  # this proc to plain instance-variable writes; the main loop
  # logs the signal number after waking up.
  request_stop = lambda do |signum|
    @stop_signum = signum
    @running = false
  end

  if on_main
    %w[TERM INT].each do |sig|
      saved_traps[sig] = Signal.trap(sig) { request_stop.call(sig) }
    end
  end

  begin
    conn = PG.connect(@dsn)
    begin
      notify_channel = "pgque_#{@queue}"
      conn.exec("LISTEN #{conn.escape_identifier(notify_channel)}")
      @logger.info(
        "consumer #{@name} listening on #{@queue} (poll=#{@poll_interval}s)"
      )

      while running?
        poll_once(conn)
        # Skip the wait when a stop was requested during the poll.
        break unless running?
        wait_for_notify_or_stop(conn)
      end

      @logger.info("received signal #{@stop_signum}, shutting down") if @stop_signum
    ensure
      conn.close unless conn.finished?
    end
  ensure
    # Clear running? before logging so callers observing the flag
    # see "stopped" by the time the log line is written -- and so
    # an exception during PG.connect, LISTEN, or the poll loop
    # leaves the consumer in a consistent state instead of a
    # ghost "running" with no live worker. Plain instance-var
    # write -- not the trap-context-unsafe pattern.
    @running = false
    if on_main
      saved_traps.each { |sig, previous| Signal.trap(sig, previous || "DEFAULT") }
    end
    @logger.info("consumer #{@name} stopped")
  end
end
#stop ⇒ Object
125 126 127 |
# File 'lib/pgque/consumer.rb', line 125
#
# Requests shutdown by clearing the run flag. Does nothing else: the loop
# in #start observes the change through running? and exits on its own.
#
# @return [false]
def stop
  @running = false
end