Class: Pgque::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/pgque/client.rb

Overview

Thin wrapper over the pgque SQL functions.

Note: Pgque::Client#send mirrors the SQL ‘pgque.send(queue, payload)` primitive and the Python/TS client surface. That name shadows Ruby’s Object#send, so use #__send__ or #public_send when you need to invoke a method on a Pgque::Client instance reflectively.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(conn, owns_conn: false) ⇒ Client

Returns a new instance of Client.



22
23
24
25
# File 'lib/pgque/client.rb', line 22

def initialize(conn, owns_conn: false)
  @conn = conn
  @owns_conn = owns_conn
end

Instance Attribute Details

#connObject (readonly)

Returns the value of attribute conn.



13
14
15
# File 'lib/pgque/client.rb', line 13

def conn
  @conn
end

Class Method Details

.connect(dsn) ⇒ Object



15
16
17
18
19
20
# File 'lib/pgque/client.rb', line 15

def self.connect(dsn)
  conn = PG.connect(dsn)
  new(conn, owns_conn: true)
rescue PG::ConnectionBad => e
  raise ConnectionError, e.message
end

Instance Method Details

#ack(batch_id) ⇒ Object



78
79
80
81
82
83
# File 'lib/pgque/client.rb', line 78

def ack(batch_id)
  result = @conn.exec_params("select pgque.ack($1)", [batch_id])
  integer_scalar(result)
rescue PG::Error => e
  raise_wrapped_sql_error(e)
end

#closeObject



27
28
29
30
31
# File 'lib/pgque/client.rb', line 27

def close
  return unless @owns_conn
  return if @conn.finished?
  @conn.close
end

#force_next_tick(queue) ⇒ Object



103
104
105
106
107
108
109
# File 'lib/pgque/client.rb', line 103

def force_next_tick(queue)
  result = @conn.exec_params("select pgque.force_next_tick($1)", [queue])
  v = scalar(result)
  v.nil? || v.empty? ? nil : v.to_i
rescue PG::Error => e
  raise_wrapped_sql_error(e)
end

#nack(batch_id, msg, retry_after: 60, reason: nil) ⇒ Object



169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/pgque/client.rb', line 169

def nack(batch_id, msg, retry_after: 60, reason: nil)
  payload_str = case msg.payload
                when Hash, Array then JSON.dump(msg.payload)
                when nil         then "null"
                else                  msg.payload.to_s
                end
  created_at_str = format_created_at(msg.created_at)

  @conn.exec_params(
    "select pgque.nack($1, " \
    "ROW($2, $3, $4, $5::jsonb, $6, $7, $8, $9, $10, $11)::pgque.message, " \
    "$12::interval, $13)",
    [
      batch_id, msg.msg_id, msg.batch_id, msg.type, payload_str,
      msg.retry_count, created_at_str,
      msg.extra1, msg.extra2, msg.extra3, msg.extra4,
      "#{retry_after} seconds", reason,
    ],
  )
  nil
rescue PG::Error => e
  raise_wrapped_sql_error(e)
end

#receive(queue, consumer, max_messages = 100) ⇒ Object



68
69
70
71
72
73
74
75
76
# File 'lib/pgque/client.rb', line 68

def receive(queue, consumer, max_messages = 100)
  result = @conn.exec_params(
    "select * from pgque.receive($1, $2, $3)",
    [queue, consumer, max_messages],
  )
  result.each_row.map { |row| row_to_message(row) }
rescue PG::Error => e
  raise_wrapped_sql_error(e)
end

#receive_coop(queue, consumer, subconsumer, max_messages: 100, dead_interval: nil) ⇒ Object



148
149
150
151
152
153
154
155
156
157
# File 'lib/pgque/client.rb', line 148

def receive_coop(queue, consumer, subconsumer, max_messages: 100,
                 dead_interval: nil)
  result = @conn.exec_params(
    "select * from pgque.receive_coop($1, $2, $3, $4, $5::interval)",
    [queue, consumer, subconsumer, max_messages, dead_interval],
  )
  result.each_row.map { |row| row_to_message(row) }
rescue PG::Error => e
  raise_wrapped_sql_error(e)
end

#send(queue, payload, type: "default") ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/pgque/client.rb', line 33

def send(queue, payload, type: "default")
  if payload.is_a?(Event)
    type = payload.type
    payload = payload.payload
  end
  encoded = encode_payload(payload)
  result =
    if custom_type?(type)
      @conn.exec_params(
        "select pgque.send($1, $2, $3::jsonb)",
        [queue, type, encoded],
      )
    else
      @conn.exec_params(
        "select pgque.send($1, $2::jsonb)",
        [queue, encoded],
      )
    end
  integer_scalar(result)
rescue PG::Error => e
  raise_wrapped_sql_error(e)
end

#send_batch(queue, type, payloads) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
# File 'lib/pgque/client.rb', line 56

def send_batch(queue, type, payloads)
  encoded = payloads.map { |p| encode_payload(p) }
  array_literal = pg_text_array(encoded)
  result = @conn.exec_params(
    "select unnest(pgque.send_batch($1, $2, $3::jsonb[]))",
    [queue, type, array_literal],
  )
  integer_column(result)
rescue PG::Error => e
  raise_wrapped_sql_error(e)
end

#subscribe(queue, consumer) ⇒ Object



85
86
87
88
89
90
91
92
# File 'lib/pgque/client.rb', line 85

def subscribe(queue, consumer)
  result = @conn.exec_params(
    "select pgque.subscribe($1, $2)", [queue, consumer]
  )
  integer_scalar(result)
rescue PG::Error => e
  raise_wrapped_sql_error(e)
end

#subscribe_subconsumer(queue, consumer, subconsumer) ⇒ Object

Experimental: function names, edge-case behavior, and signatures may change before the cooperative API is marked stable.



128
129
130
131
132
133
134
135
136
# File 'lib/pgque/client.rb', line 128

def subscribe_subconsumer(queue, consumer, subconsumer)
  result = @conn.exec_params(
    "select pgque.subscribe_subconsumer($1, $2, $3)",
    [queue, consumer, subconsumer],
  )
  integer_scalar(result)
rescue PG::Error => e
  raise_wrapped_sql_error(e)
end

#ticker(queue) ⇒ Object



111
112
113
114
115
116
117
# File 'lib/pgque/client.rb', line 111

def ticker(queue)
  result = @conn.exec_params("select pgque.ticker($1)", [queue])
  v = scalar(result)
  v.nil? || v.empty? ? nil : v.to_i
rescue PG::Error => e
  raise_wrapped_sql_error(e)
end

#ticker_allObject



119
120
121
122
123
124
# File 'lib/pgque/client.rb', line 119

def ticker_all
  result = @conn.exec_params("select pgque.ticker()", [])
  integer_scalar(result)
rescue PG::Error => e
  raise_wrapped_sql_error(e)
end

#touch_subconsumer(queue, consumer, subconsumer) ⇒ Object



159
160
161
162
163
164
165
166
167
# File 'lib/pgque/client.rb', line 159

def touch_subconsumer(queue, consumer, subconsumer)
  result = @conn.exec_params(
    "select pgque.touch_subconsumer($1, $2, $3)",
    [queue, consumer, subconsumer],
  )
  integer_scalar(result)
rescue PG::Error => e
  raise_wrapped_sql_error(e)
end

#unsubscribe(queue, consumer) ⇒ Object



94
95
96
97
98
99
100
101
# File 'lib/pgque/client.rb', line 94

def unsubscribe(queue, consumer)
  result = @conn.exec_params(
    "select pgque.unsubscribe($1, $2)", [queue, consumer]
  )
  integer_scalar(result)
rescue PG::Error => e
  raise_wrapped_sql_error(e)
end

#unsubscribe_subconsumer(queue, consumer, subconsumer, batch_handling: 0) ⇒ Object



138
139
140
141
142
143
144
145
146
# File 'lib/pgque/client.rb', line 138

def unsubscribe_subconsumer(queue, consumer, subconsumer, batch_handling: 0)
  result = @conn.exec_params(
    "select pgque.unsubscribe_subconsumer($1, $2, $3, $4)",
    [queue, consumer, subconsumer, batch_handling],
  )
  integer_scalar(result)
rescue PG::Error => e
  raise_wrapped_sql_error(e)
end