Class: EventMeter::Stores::Rollup::Postgres

Inherits:
Object
  • Object
show all
Includes:
CleanupHelpers, Namespace
Defined in:
lib/event_meter/stores/rollup/postgres.rb

Direct Known Subclasses

ActiveRecordPostgres

Constant Summary collapse

MAX_QUERY_PARAMS =
1_000
SCOPED_QUERY_PARAMS =
3
MAX_ENTRY_ID_QUERY_PARAMS =
MAX_QUERY_PARAMS - SCOPED_QUERY_PARAMS
KEY_VALUE_PARAM_COUNT =
2
MAX_KEY_VALUE_ROWS =
MAX_QUERY_PARAMS / KEY_VALUE_PARAM_COUNT
LOCK_ID_MASK =
0x7fff_ffff_ffff_ffff
LOCK_REFRESH_RATIO =
2.0
BIGINT_MAX =
"9223372036854775807"
BIGINT_MIN_ABS =
"9223372036854775808"
BIGINT_DIGITS =
19

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection:, namespace:, table_prefix: "event_meter", report_name: nil, version: nil, lock_scope: nil, connection_lock: nil, lock_connection: nil, lock_connection_lock: nil) ⇒ Postgres

Returns a new instance of Postgres.



74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/event_meter/stores/rollup/postgres.rb', line 74

def initialize(connection:, namespace:, table_prefix: "event_meter", report_name: nil, version: nil,
  lock_scope: nil, connection_lock: nil, lock_connection: nil, lock_connection_lock: nil)
  self.class.validate_identifier!(table_prefix)

  @connection = connection
  @connection_lock = connection_lock || Monitor.new
  @lock_connection = lock_connection
  @lock_connection_lock = lock_connection_lock
  @namespace = normalize_namespace(namespace)
  @table_prefix = table_prefix
  @report_name = report_name&.to_s
  @version = version&.to_i
  @lock_scope = lock_scope
end

Instance Attribute Details

#connectionObject (readonly)

Returns the value of attribute connection.



31
32
33
# File 'lib/event_meter/stores/rollup/postgres.rb', line 31

def connection
  @connection
end

#connection_lockObject (readonly)

Returns the value of attribute connection_lock.



31
32
33
# File 'lib/event_meter/stores/rollup/postgres.rb', line 31

def connection_lock
  @connection_lock
end

#lock_scopeObject (readonly)

Returns the value of attribute lock_scope.



31
32
33
# File 'lib/event_meter/stores/rollup/postgres.rb', line 31

def lock_scope
  @lock_scope
end

#namespaceObject (readonly)

Returns the value of attribute namespace.



31
32
33
# File 'lib/event_meter/stores/rollup/postgres.rb', line 31

def namespace
  @namespace
end

#report_nameObject (readonly)

Returns the value of attribute report_name.



31
32
33
# File 'lib/event_meter/stores/rollup/postgres.rb', line 31

def report_name
  @report_name
end

#table_prefixObject (readonly)

Returns the value of attribute table_prefix.



31
32
33
# File 'lib/event_meter/stores/rollup/postgres.rb', line 31

def table_prefix
  @table_prefix
end

#versionObject (readonly)

Returns the value of attribute version.



31
32
33
# File 'lib/event_meter/stores/rollup/postgres.rb', line 31

def version
  @version
end

Class Method Details

.install!(connection:, table_prefix: "event_meter") ⇒ Object



70
71
72
# File 'lib/event_meter/stores/rollup/postgres.rb', line 70

def self.install!(connection:, table_prefix: "event_meter")
  connection.exec(schema_sql(table_prefix: table_prefix))
end

.schema_sql(table_prefix: "event_meter") ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/event_meter/stores/rollup/postgres.rb', line 34

def self.schema_sql(table_prefix: "event_meter")
  validate_identifier!(table_prefix)

  <<~SQL
    CREATE TABLE IF NOT EXISTS #{table_prefix}_rollups (
      key text PRIMARY KEY,
      fields jsonb NOT NULL DEFAULT '{}'::jsonb,
      updated_at timestamptz NOT NULL DEFAULT now()
    );

    CREATE TABLE IF NOT EXISTS #{table_prefix}_strings (
      key text PRIMARY KEY,
      value text NOT NULL,
      updated_at timestamptz NOT NULL DEFAULT now()
    );

    CREATE TABLE IF NOT EXISTS #{table_prefix}_processed_entries (
      namespace text NOT NULL,
      event_name text NOT NULL,
      version integer NOT NULL,
      entry_id text NOT NULL,
      created_at timestamptz NOT NULL DEFAULT now(),
      PRIMARY KEY (namespace, event_name, version, entry_id)
    );

    CREATE INDEX IF NOT EXISTS #{table_prefix}_processed_created_at_idx
    ON #{table_prefix}_processed_entries (created_at);

    CREATE INDEX IF NOT EXISTS #{table_prefix}_rollups_key_prefix_idx
    ON #{table_prefix}_rollups (key text_pattern_ops);

    CREATE INDEX IF NOT EXISTS #{table_prefix}_strings_key_prefix_idx
    ON #{table_prefix}_strings (key text_pattern_ops);
  SQL
end

.validate_identifier!(value) ⇒ Object

Raises:

  • (ArgumentError)


229
230
231
232
233
# File 'lib/event_meter/stores/rollup/postgres.rb', line 229

def self.validate_identifier!(value)
  return if value.to_s.match?(/\A[a-zA-Z_][a-zA-Z0-9_]*\z/)

  raise ArgumentError, "invalid PostgreSQL identifier: #{value.inspect}"
end

Instance Method Details

#apply(batch) ⇒ Object



156
157
158
159
160
161
162
163
164
# File 'lib/event_meter/stores/rollup/postgres.rb', line 156

def apply(batch)
  ensure_scoped!

  transaction do
    merge_rollups(batch.rollups)
    upsert_max_strings(batch.state_updates)
    mark_processed_entries(batch.entry_ids)
  end
end

#cleanup_history(before:, events:, interval_state:) ⇒ Object



218
219
220
221
222
223
224
225
226
227
# File 'lib/event_meter/stores/rollup/postgres.rb', line 218

def cleanup_history(before:, events:, interval_state:)
  transaction do
    filter = event_filter(events)
    {
      rollup_keys_deleted: cleanup_rollups(before, filter),
      interval_state_keys_deleted: interval_state ? cleanup_interval_state(before, filter) : 0,
      processed_entries_deleted: cleanup_processed_entries(before, filter)
    }
  end
end

#cleanup_watermark(key) ⇒ Object



195
196
197
# File 'lib/event_meter/stores/rollup/postgres.rb', line 195

def cleanup_watermark(key)
  get(key)
end

#ensure_definition(definition) ⇒ Object



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/event_meter/stores/rollup/postgres.rb', line 106

def ensure_definition(definition)
  key = definition_key(definition.name, definition.version)
  payload = JSON.generate(definition.to_h)

  transaction do
    row = exec_params("SELECT value FROM #{strings_table} WHERE key = $1 FOR UPDATE", [key]).first
    if row
      ensure_same_definition!(row.fetch("value"), definition)
    else
      insert_string_once(key, payload)
      stored = exec_params("SELECT value FROM #{strings_table} WHERE key = $1 FOR UPDATE", [key]).first
      raise DefinitionChangedError, "#{definition.name} v#{definition.version} definition was not stored" unless stored

      ensure_same_definition!(stored.fetch("value"), definition)
    end
  end
end

#for_report(name:, version:) ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/event_meter/stores/rollup/postgres.rb', line 89

def for_report(name:, version:)
  name = name.to_s
  version = version.to_i

  self.class.new(
    connection: connection,
    connection_lock: connection_lock,
    lock_connection: @lock_connection,
    lock_connection_lock: @lock_connection_lock,
    namespace: namespace,
    table_prefix: table_prefix,
    report_name: name,
    version: version,
    lock_scope: "#{Keys.event_name(name)}:#{Keys.version_key(version)}"
  )
end

#forget_processed_ids(ids) ⇒ Object



151
152
153
154
# File 'lib/event_meter/stores/rollup/postgres.rb', line 151

def forget_processed_ids(ids)
  ensure_scoped!
  delete_processed_entries(ids)
end

#get(key) ⇒ Object



190
191
192
193
# File 'lib/event_meter/stores/rollup/postgres.rb', line 190

def get(key)
  row = exec_params("SELECT value FROM #{strings_table} WHERE key = $1", [key]).first
  row&.fetch("value")
end

#hgetall_many(keys) ⇒ Object



166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/event_meter/stores/rollup/postgres.rb', line 166

def hgetall_many(keys)
  return [] if keys.empty?

  rows = keys.each_slice(MAX_QUERY_PARAMS).flat_map do |slice|
    exec_params(
      "SELECT key, fields::text AS fields FROM #{rollups_table} WHERE key IN (#{placeholders(slice)})",
      slice
    )
  end
  by_key = rows.to_h { |row| [row.fetch("key"), parse_hash(row.fetch("fields"))] }
  keys.map { |key| by_key.fetch(key, {}) }
end

#keys_matching(pattern, limit: nil) ⇒ Object



179
180
181
182
183
184
185
186
187
188
# File 'lib/event_meter/stores/rollup/postgres.rb', line 179

def keys_matching(pattern, limit: nil)
  limit = positive_integer(limit, "limit") if limit
  rows = exec_params(
    "SELECT key FROM #{rollups_table} WHERE key LIKE $1 ESCAPE '\\' ORDER BY key",
    [like_prefix_for_pattern(pattern)]
  )

  keys = rows.map { |row| row.fetch("key") }.select { |key| key_matches?(key, pattern) }
  limit ? keys.first(limit) : keys
end

#processed_ids(ids) ⇒ Object



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/event_meter/stores/rollup/postgres.rb', line 131

def processed_ids(ids)
  ensure_scoped!
  return [] if ids.empty?

  rows = ids.each_slice(MAX_ENTRY_ID_QUERY_PARAMS).flat_map do |slice|
    exec_params(
      <<~SQL,
        SELECT entry_id
        FROM #{processed_table}
        WHERE namespace = $1
          AND event_name = $2
          AND version = $3
          AND entry_id IN (#{placeholders(slice, start: 4)})
      SQL
      scoped_params + slice
    )
  end
  rows.map { |row| row.fetch("entry_id") }
end

#report_definition(name:, version:) ⇒ Object



124
125
126
127
128
129
# File 'lib/event_meter/stores/rollup/postgres.rb', line 124

def report_definition(name:, version:)
  row = exec_params("SELECT value FROM #{strings_table} WHERE key = $1", [definition_key(name, version)]).first
  row && JSON.parse(row.fetch("value"))
rescue JSON::ParserError, TypeError
  nil
end

#with_lock(ttl:) ⇒ Object



203
204
205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/event_meter/stores/rollup/postgres.rb', line 203

def with_lock(ttl:)
  ttl = positive_integer(ttl, "lock ttl")
  ensure_independent_lock_connection!
  token = SecureRandom.hex(16)
  locked = acquire_lock_lease(token, ttl)
  return false unless locked

  refresher = start_lock_refresher(token, ttl)
  yield
  true
ensure
  stop_lock_refresher(refresher)
  release_lock_lease(token) if locked
end

#write_cleanup_watermark(key, value) ⇒ Object



199
200
201
# File 'lib/event_meter/stores/rollup/postgres.rb', line 199

def write_cleanup_watermark(key, value)
  upsert_string(key, value)
end