Class: Pgque::Client
- Inherits:
-
Object
- Object
- Pgque::Client
- Defined in:
- lib/pgque/client.rb
Overview
Thin wrapper over the pgque SQL functions.
Note: Pgque::Client#send mirrors the SQL `pgque.send(queue, payload)` primitive and the Python/TS client surface. That name shadows Ruby’s Object#send, so use #__send__ or #public_send when you need to invoke a method on a Pgque::Client instance reflectively.
Instance Attribute Summary collapse
-
#conn ⇒ Object
readonly
Returns the value of attribute conn.
Class Method Summary collapse
- .connect(dsn) ⇒ Object
Instance Method Summary collapse
- #ack(batch_id) ⇒ Object
- #close ⇒ Object
- #force_next_tick(queue) ⇒ Object
-
#initialize(conn, owns_conn: false) ⇒ Client
constructor
A new instance of Client.
- #nack(batch_id, msg, retry_after: 60, reason: nil) ⇒ Object
- #receive(queue, consumer, max_messages = 100) ⇒ Object
- #receive_coop(queue, consumer, subconsumer, max_messages: 100, dead_interval: nil) ⇒ Object
- #send(queue, payload, type: "default") ⇒ Object
- #send_batch(queue, type, payloads) ⇒ Object
- #subscribe(queue, consumer) ⇒ Object
-
#subscribe_subconsumer(queue, consumer, subconsumer) ⇒ Object
Experimental: function names, edge-case behavior, and signatures may change before the cooperative API is marked stable.
- #ticker(queue) ⇒ Object
- #ticker_all ⇒ Object
- #touch_subconsumer(queue, consumer, subconsumer) ⇒ Object
- #unsubscribe(queue, consumer) ⇒ Object
- #unsubscribe_subconsumer(queue, consumer, subconsumer, batch_handling: 0) ⇒ Object
Constructor Details
#initialize(conn, owns_conn: false) ⇒ Client
Returns a new instance of Client.
22 23 24 25 |
# File 'lib/pgque/client.rb', line 22
#
# Wraps an already-established connection object in a client.
#
# @param conn the connection object; every query method calls
#   `exec_params` on it
# @param owns_conn [Boolean] stored as given; #close only closes the
#   connection when this is truthy
def initialize(conn, owns_conn: false)
  @conn = conn
  @owns_conn = owns_conn
end
Instance Attribute Details
#conn ⇒ Object (readonly)
Returns the value of attribute conn.
13 14 15 |
# File 'lib/pgque/client.rb', line 13
#
# Read-only accessor for the wrapped connection.
#
# @return the object that was passed to the constructor as `conn`
def conn
  @conn
end
Class Method Details
.connect(dsn) ⇒ Object
15 16 17 18 19 20 |
# File 'lib/pgque/client.rb', line 15
#
# Opens a connection with PG.connect and wraps it in a client that owns it
# (owns_conn: true), so that #close will actually close the connection.
#
# @param dsn passed unchanged to PG.connect
# @return [Client] a new client built around the freshly opened connection
# @raise [ConnectionError] when PG::ConnectionBad is raised; the message of
#   the underlying error is carried over
def self.connect(dsn)
  conn = PG.connect(dsn)
  new(conn, owns_conn: true)
rescue PG::ConnectionBad => e
  # The listing was truncated to `e.` here; the message is what gets forwarded.
  raise ConnectionError, e.message
end
Instance Method Details
#ack(batch_id) ⇒ Object
78 79 80 81 82 83 |
# File 'lib/pgque/client.rb', line 78
#
# Runs `select pgque.ack($1)` for the given batch.
#
# @param batch_id bound to $1
# @return whatever integer_scalar extracts from the query result
#   (presumably an Integer; the helper is defined outside this listing)
# @raise a PG::Error is handed to raise_wrapped_sql_error
def ack(batch_id)
  integer_scalar(@conn.exec_params("select pgque.ack($1)", [batch_id]))
rescue PG::Error => error
  raise_wrapped_sql_error(error)
end
#close ⇒ Object
27 28 29 30 31 |
# File 'lib/pgque/client.rb', line 27
#
# Closes the wrapped connection, but only when this client owns it and the
# connection does not already report itself as finished. Otherwise a no-op.
def close
  @conn.close if @owns_conn && !@conn.finished?
end
#force_next_tick(queue) ⇒ Object
103 104 105 106 107 108 109 |
# File 'lib/pgque/client.rb', line 103
#
# Runs `select pgque.force_next_tick($1)` for the queue.
#
# @param queue bound to $1
# @return [Integer, nil] nil when the scalar is nil or empty, otherwise the
#   scalar converted with #to_i
# @raise a PG::Error is handed to raise_wrapped_sql_error
def force_next_tick(queue)
  raw = scalar(@conn.exec_params("select pgque.force_next_tick($1)", [queue]))
  return nil if raw.nil? || raw.empty?

  raw.to_i
rescue PG::Error => error
  raise_wrapped_sql_error(error)
end
#nack(batch_id, msg, retry_after: 60, reason: nil) ⇒ Object
169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 |
# File 'lib/pgque/client.rb', line 169
#
# Calls `pgque.nack` for one message of a batch. The message is rebuilt on
# the SQL side as a `pgque.message` row from the fields of `msg`.
#
# @param batch_id bound to $1
# @param msg object responding to msg_id, batch_id, type, payload,
#   retry_count, created_at and extra1..extra4
# @param retry_after interpolated as "<retry_after> seconds" and cast to
#   interval
# @param reason bound to $13
# @return [nil]
# @raise a PG::Error is handed to raise_wrapped_sql_error
def nack(batch_id, msg, retry_after: 60, reason: nil)
  body = msg.payload
  # Hash/Array payloads are serialised to JSON, nil becomes JSON null,
  # anything else is sent as its string form.
  payload_text =
    case body
    when nil then "null"
    when Hash, Array then JSON.dump(body)
    else body.to_s
    end
  created_at_text = format_created_at(msg.created_at)

  # Values for $2..$11, in the column order of the ROW(...) constructor.
  row_fields = [
    msg.msg_id, msg.batch_id, msg.type, payload_text, msg.retry_count,
    created_at_text, msg.extra1, msg.extra2, msg.extra3, msg.extra4,
  ]
  sql = "select pgque.nack($1, " \
        "ROW($2, $3, $4, $5::jsonb, $6, $7, $8, $9, $10, $11)::pgque.message, " \
        "$12::interval, $13)"

  @conn.exec_params(sql, [batch_id, *row_fields, "#{retry_after} seconds", reason])
  nil
rescue PG::Error => error
  raise_wrapped_sql_error(error)
end
#receive(queue, consumer, max_messages = 100) ⇒ Object
68 69 70 71 72 73 74 75 76 |
# File 'lib/pgque/client.rb', line 68
#
# Runs `select * from pgque.receive($1, $2, $3)` and maps over the rows.
#
# @param queue bound to $1
# @param consumer bound to $2
# @param max_messages bound to $3 (defaults to 100)
# @return [Array] one entry per result row
# @raise a PG::Error is handed to raise_wrapped_sql_error
def receive(queue, consumer, max_messages = 100)
  result = @conn.exec_params(
    "select * from pgque.receive($1, $2, $3)",
    [queue, consumer, max_messages],
  )
  # NOTE(review): the listing shows only `(row)` here -- the name of the
  # row-to-message helper was lost in extraction. Restore it from
  # lib/pgque/client.rb; as written this passes each row through unchanged.
  result.each_row.map { |row| (row) }
rescue PG::Error => e
  raise_wrapped_sql_error(e)
end
#receive_coop(queue, consumer, subconsumer, max_messages: 100, dead_interval: nil) ⇒ Object
148 149 150 151 152 153 154 155 156 157 |
# File 'lib/pgque/client.rb', line 148
#
# Runs `select * from pgque.receive_coop($1, $2, $3, $4, $5::interval)` and
# maps over the rows.
#
# @param queue bound to $1
# @param consumer bound to $2
# @param subconsumer bound to $3
# @param max_messages bound to $4 (defaults to 100)
# @param dead_interval bound to $5 and cast to interval; nil by default
# @return [Array] one entry per result row
# @raise a PG::Error is handed to raise_wrapped_sql_error
def receive_coop(queue, consumer, subconsumer, max_messages: 100, dead_interval: nil)
  result = @conn.exec_params(
    "select * from pgque.receive_coop($1, $2, $3, $4, $5::interval)",
    [queue, consumer, subconsumer, max_messages, dead_interval],
  )
  # NOTE(review): the listing shows only `(row)` here -- the name of the
  # row-to-message helper was lost in extraction. Restore it from
  # lib/pgque/client.rb; as written this passes each row through unchanged.
  result.each_row.map { |row| (row) }
rescue PG::Error => e
  raise_wrapped_sql_error(e)
end
#send(queue, payload, type: "default") ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/pgque/client.rb', line 33
#
# Enqueues one payload through `pgque.send`. This method shadows
# Object#send on client instances.
#
# When `payload` is an Event, its own type and payload replace the
# arguments. The three-argument SQL form (queue, type, payload) is used when
# custom_type?(type) is truthy, the two-argument form otherwise.
#
# @param queue bound to $1
# @param payload value passed through encode_payload, or an Event
# @param type [String] defaults to "default"
# @return whatever integer_scalar extracts from the query result
# @raise a PG::Error is handed to raise_wrapped_sql_error
def send(queue, payload, type: "default")
  if payload.is_a?(Event)
    event = payload
    type = event.type
    payload = event.payload
  end

  json = encode_payload(payload)
  sql, params =
    if custom_type?(type)
      ["select pgque.send($1, $2, $3::jsonb)", [queue, type, json]]
    else
      ["select pgque.send($1, $2::jsonb)", [queue, json]]
    end

  integer_scalar(@conn.exec_params(sql, params))
rescue PG::Error => error
  raise_wrapped_sql_error(error)
end
#send_batch(queue, type, payloads) ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/pgque/client.rb', line 56
#
# Enqueues several payloads of one type with a single `pgque.send_batch`
# call and unnests the returned array into rows.
#
# @param queue bound to $1
# @param type bound to $2
# @param payloads enumerable; each element goes through encode_payload and
#   the list is rendered by pg_text_array before being cast to jsonb[]
# @return whatever integer_column extracts from the result
# @raise a PG::Error is handed to raise_wrapped_sql_error
def send_batch(queue, type, payloads)
  jsonb_array = pg_text_array(payloads.map { |item| encode_payload(item) })
  rows = @conn.exec_params(
    "select unnest(pgque.send_batch($1, $2, $3::jsonb[]))",
    [queue, type, jsonb_array],
  )
  integer_column(rows)
rescue PG::Error => error
  raise_wrapped_sql_error(error)
end
#subscribe(queue, consumer) ⇒ Object
85 86 87 88 89 90 91 92 |
# File 'lib/pgque/client.rb', line 85
#
# Runs `select pgque.subscribe($1, $2)`.
#
# @param queue bound to $1
# @param consumer bound to $2
# @return whatever integer_scalar extracts from the query result
# @raise a PG::Error is handed to raise_wrapped_sql_error
def subscribe(queue, consumer)
  sql = "select pgque.subscribe($1, $2)"
  integer_scalar(@conn.exec_params(sql, [queue, consumer]))
rescue PG::Error => error
  raise_wrapped_sql_error(error)
end
#subscribe_subconsumer(queue, consumer, subconsumer) ⇒ Object
Experimental: function names, edge-case behavior, and signatures may change before the cooperative API is marked stable.
128 129 130 131 132 133 134 135 136 |
# File 'lib/pgque/client.rb', line 128
#
# Runs `select pgque.subscribe_subconsumer($1, $2, $3)`.
#
# @param queue bound to $1
# @param consumer bound to $2
# @param subconsumer bound to $3
# @return whatever integer_scalar extracts from the query result
# @raise a PG::Error is handed to raise_wrapped_sql_error
def subscribe_subconsumer(queue, consumer, subconsumer)
  sql = "select pgque.subscribe_subconsumer($1, $2, $3)"
  integer_scalar(@conn.exec_params(sql, [queue, consumer, subconsumer]))
rescue PG::Error => error
  raise_wrapped_sql_error(error)
end
#ticker(queue) ⇒ Object
111 112 113 114 115 116 117 |
# File 'lib/pgque/client.rb', line 111
#
# Runs `select pgque.ticker($1)` for a single queue.
#
# @param queue bound to $1
# @return [Integer, nil] nil when the scalar is nil or empty, otherwise the
#   scalar converted with #to_i
# @raise a PG::Error is handed to raise_wrapped_sql_error
def ticker(queue)
  raw = scalar(@conn.exec_params("select pgque.ticker($1)", [queue]))
  return nil if raw.nil? || raw.empty?

  raw.to_i
rescue PG::Error => error
  raise_wrapped_sql_error(error)
end
#ticker_all ⇒ Object
119 120 121 122 123 124 |
# File 'lib/pgque/client.rb', line 119
#
# Runs the argument-less `select pgque.ticker()`.
#
# @return whatever integer_scalar extracts from the query result
# @raise a PG::Error is handed to raise_wrapped_sql_error
def ticker_all
  integer_scalar(@conn.exec_params("select pgque.ticker()", []))
rescue PG::Error => error
  raise_wrapped_sql_error(error)
end
#touch_subconsumer(queue, consumer, subconsumer) ⇒ Object
159 160 161 162 163 164 165 166 167 |
# File 'lib/pgque/client.rb', line 159
#
# Runs `select pgque.touch_subconsumer($1, $2, $3)`.
#
# @param queue bound to $1
# @param consumer bound to $2
# @param subconsumer bound to $3
# @return whatever integer_scalar extracts from the query result
# @raise a PG::Error is handed to raise_wrapped_sql_error
def touch_subconsumer(queue, consumer, subconsumer)
  sql = "select pgque.touch_subconsumer($1, $2, $3)"
  integer_scalar(@conn.exec_params(sql, [queue, consumer, subconsumer]))
rescue PG::Error => error
  raise_wrapped_sql_error(error)
end
#unsubscribe(queue, consumer) ⇒ Object
94 95 96 97 98 99 100 101 |
# File 'lib/pgque/client.rb', line 94
#
# Runs `select pgque.unsubscribe($1, $2)`.
#
# @param queue bound to $1
# @param consumer bound to $2
# @return whatever integer_scalar extracts from the query result
# @raise a PG::Error is handed to raise_wrapped_sql_error
def unsubscribe(queue, consumer)
  sql = "select pgque.unsubscribe($1, $2)"
  integer_scalar(@conn.exec_params(sql, [queue, consumer]))
rescue PG::Error => error
  raise_wrapped_sql_error(error)
end
#unsubscribe_subconsumer(queue, consumer, subconsumer, batch_handling: 0) ⇒ Object
138 139 140 141 142 143 144 145 146 |
# File 'lib/pgque/client.rb', line 138
#
# Runs `select pgque.unsubscribe_subconsumer($1, $2, $3, $4)`.
#
# @param queue bound to $1
# @param consumer bound to $2
# @param subconsumer bound to $3
# @param batch_handling bound to $4; defaults to 0 (meaning of the values is
#   defined by the SQL function, not visible here)
# @return whatever integer_scalar extracts from the query result
# @raise a PG::Error is handed to raise_wrapped_sql_error
def unsubscribe_subconsumer(queue, consumer, subconsumer, batch_handling: 0)
  sql = "select pgque.unsubscribe_subconsumer($1, $2, $3, $4)"
  params = [queue, consumer, subconsumer, batch_handling]
  integer_scalar(@conn.exec_params(sql, params))
rescue PG::Error => error
  raise_wrapped_sql_error(error)
end