Class: Pcrd::Connection::Client

Inherits:
Object
Defined in:
lib/pcrd/connection/client.rb

Constant Summary

DEFAULT_SESSION_SETTINGS =

Conservative per-session defaults applied to every connection.

application_name                     — identifies pcrd in pg_stat_activity
lock_timeout=5s                      — fail fast instead of blocking
                                       production behind a lock (DDL etc.)
idle_in_transaction_session_timeout  — release locks if a transaction is
  =60s                                 left open idle (e.g. a stalled tool)
statement_timeout=0                  — DISABLED on purpose: backfill COPY
                                       and large batches run for a long
                                       time and must not be killed

Override per pool via `settings:`; values are GUC strings (units allowed).

{
  "application_name"                    => "pcrd",
  "lock_timeout"                        => "5s",
  "idle_in_transaction_session_timeout" => "60s",
  "statement_timeout"                   => "0"
}.freeze
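
A minimal sketch of how a `settings:` override combines with these defaults, assuming the plain Hash#merge call shown in the constructor. The constant is copied so the snippet runs on its own, and the override value is purely illustrative.

```ruby
# Copy of the documented defaults so the snippet is self-contained.
DEFAULT_SESSION_SETTINGS = {
  "application_name"                    => "pcrd",
  "lock_timeout"                        => "5s",
  "idle_in_transaction_session_timeout" => "60s",
  "statement_timeout"                   => "0"
}.freeze

# Hypothetical per-pool override: tolerate a longer lock wait.
overrides = { "lock_timeout" => "30s" }

# Hash#merge returns a new, unfrozen Hash; keys present in `overrides` win.
effective = DEFAULT_SESSION_SETTINGS.merge(overrides)

puts effective["lock_timeout"]       # the overridden value
puts effective["statement_timeout"]  # the untouched default
```

The keys are Strings, so an override written with Symbol keys (for example `lock_timeout: "30s"`) would be added as a separate entry rather than replacing the default.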

Constructor Details

#initialize(config, settings: {}) ⇒ Client

Returns a new instance of Client.



# File 'lib/pcrd/connection/client.rb', line 29

def initialize(config, settings: {})
  @config           = config
  @session_settings = DEFAULT_SESSION_SETTINGS.merge(settings)
  @conn             = nil
end

Instance Attribute Details

#session_settings ⇒ Object (readonly)

Returns the value of attribute session_settings.



# File 'lib/pcrd/connection/client.rb', line 27

def session_settings
  @session_settings
end

Instance Method Details

#close ⇒ Object



# File 'lib/pcrd/connection/client.rb', line 80

def close
  @conn&.close
  @conn = nil
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/pcrd/connection/client.rb', line 85

def connected?
  @conn && !@conn.finished?
end

#copy_data(sql) ⇒ Object

For COPY … FROM STDIN. Yields the raw PG::Connection so the caller can call conn.put_copy_data(line) inside the block.



# File 'lib/pcrd/connection/client.rb', line 53

def copy_data(sql)
  connection.copy_data(sql) { yield connection }
rescue PG::Error => e
  # A failed COPY can leave the connection mid-COPY or in an aborted
  # transaction; reset it like exec/exec_sql so the next call is usable.
  reset_connection!
  raise Error, e.message
end
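
A rough sketch of the calling pattern, using hypothetical stand-ins (`FakeConnection`, `SketchClient`) so it runs without a database. The fake mimics only the two PG::Connection methods involved, the sketch client omits the error handling and reconnect logic shown above, and the `events` table is made up for illustration.

```ruby
# Hypothetical fake that mimics the two PG::Connection methods used here.
class FakeConnection
  attr_reader :rows, :sql

  def initialize
    @rows = []
  end

  # The real method puts the connection into COPY mode around the block.
  def copy_data(sql)
    @sql = sql
    yield
  end

  # The real method sends one chunk of COPY input to the server.
  def put_copy_data(line)
    @rows << line
    true
  end
end

# Trimmed stand-in for the client: same yield shape, no error handling.
class SketchClient
  def initialize(conn)
    @conn = conn
  end

  def copy_data(sql)
    @conn.copy_data(sql) { yield @conn }
  end
end

fake   = FakeConnection.new
client = SketchClient.new(fake)

# The block receives the connection and streams one CSV line per row.
client.copy_data("COPY events (id, name) FROM STDIN WITH (FORMAT csv)") do |conn|
  [[1, "signup"], [2, "login"]].each do |id, name|
    conn.put_copy_data("#{id},#{name}\n")
  end
end

p fake.rows
```

Against a real connection the lines would be sent to the server rather than collected in an array. A PG::Error raised inside the block would reach the rescue clause shown above.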

#escape_literal(val) ⇒ Object



# File 'lib/pcrd/connection/client.rb', line 66

def escape_literal(val)
  connection.escape_literal(val.to_s)
end

#exec(sql, params = []) ⇒ Object

For parameterized queries (SELECT, INSERT with $1 placeholders).



# File 'lib/pcrd/connection/client.rb', line 36

def exec(sql, params = [])
  connection.exec_params(sql, params)
rescue PG::Error => e
  reset_connection!
  raise Error, e.message
end

#exec_sql(sql) ⇒ Object

For DDL and multi-statement SQL where no parameter substitution is needed.



# File 'lib/pcrd/connection/client.rb', line 44

def exec_sql(sql)
  connection.exec(sql)
rescue PG::Error => e
  reset_connection!
  raise Error, e.message
end

#quote_ident(name) ⇒ Object



# File 'lib/pcrd/connection/client.rb', line 62

def quote_ident(name)
  connection.quote_ident(name)
end

#session_options ⇒ Object

Returns the libpq options string that applies the session settings at connect time (-c key=value), so they are in force for the very first statement. application_name is excluded here: it is passed as the dedicated connect parameter because a -c value is overridden by libpq’s fallback_application_name.



# File 'lib/pcrd/connection/client.rb', line 94

def session_options
  @session_settings
    .reject { |key, _| key == "application_name" }
    .map    { |key, value| "-c #{key}=#{value}" }
    .join(" ")
end
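
To make the resulting string concrete, here is the same reject/map/join chain applied to a local copy of the default settings. The variable names are illustrative. How the string is handed to libpq is not shown on this page; presumably it goes in as the `options` connection parameter.

```ruby
# Local copy of the documented defaults (illustrative variable name).
session_settings = {
  "application_name"                    => "pcrd",
  "lock_timeout"                        => "5s",
  "idle_in_transaction_session_timeout" => "60s",
  "statement_timeout"                   => "0"
}

# Same transformation as #session_options: drop application_name,
# then render every remaining pair as a "-c key=value" switch.
options = session_settings
  .reject { |key, _| key == "application_name" }
  .map    { |key, value| "-c #{key}=#{value}" }
  .join(" ")

puts options
# -c lock_timeout=5s -c idle_in_transaction_session_timeout=60s -c statement_timeout=0
```

Ruby hashes preserve insertion order, so the switches appear in the order the settings were defined. Values are interpolated verbatim, so a value containing a space would split into separate arguments unless it were escaped.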

#transaction ⇒ Object



# File 'lib/pcrd/connection/client.rb', line 70

def transaction
  exec("BEGIN")
  result = yield
  exec("COMMIT")
  result
rescue StandardError
  exec("ROLLBACK") rescue nil
  raise
end
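
The control flow can be traced without a database. The sketch below uses a hypothetical `RecordingClient` whose `exec` only logs the SQL it receives; its `transaction` body is copied from the method above.

```ruby
# Hypothetical stand-in that records statements instead of sending them.
class RecordingClient
  attr_reader :log

  def initialize
    @log = []
  end

  def exec(sql, _params = [])
    @log << sql
  end

  # Same body as the documented #transaction.
  def transaction
    exec("BEGIN")
    result = yield
    exec("COMMIT")
    result
  rescue StandardError
    exec("ROLLBACK") rescue nil
    raise
  end
end

# Success path: the block's value is returned after COMMIT.
ok    = RecordingClient.new
value = ok.transaction do
  ok.exec("SELECT 1")
  :done
end

# Failure path: ROLLBACK is issued, then the original error is re-raised.
failing = RecordingClient.new
begin
  failing.transaction { raise ArgumentError, "boom" }
rescue ArgumentError
  # the caller still sees the original exception
end

p value        # :done
p ok.log       # ["BEGIN", "SELECT 1", "COMMIT"]
p failing.log  # ["BEGIN", "ROLLBACK"]
```

Because the rescue clause catches StandardError, an error raised by the BEGIN or COMMIT statement itself also triggers the ROLLBACK attempt, and the trailing `rescue nil` keeps a failed ROLLBACK from masking the original exception.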