Module: EzLogsAgent::Capturers::BulkDatabaseCapturer

Defined in:
lib/ez_logs_agent/capturers/bulk_database_capturer.rb

Overview

Captures bulk SQL operations that bypass ActiveRecord lifecycle callbacks: delete_all, update_all, insert_all, upsert_all.

## Why this exists

DatabaseCapturer (the per-row sibling) hooks after_create/_update/_destroy to capture rich per-record context (saved_changes, encrypted_attributes, display_name). That model breaks for bulk ops — Rails issues a single UPDATE/DELETE/INSERT statement against the database WITHOUT instantiating records, so the callbacks never fire. Customer code like

Order.where(status: "cart").delete_all
Library.where(closed: true).update_all(status: "active")
User.insert_all([{name: "a"}, {name: "b"}])

plus `dependent: :delete_all` cascades during a parent destroy are all invisible to the callback-based path. This capturer fills the gap.

## How it works

Subscribes to “sql.active_record”, the standard Rails instrumentation event that APM tools such as Datadog APM, AppSignal and Skylight also listen on. For every SQL statement the host app runs, we get a payload with the raw SQL, binds, name, and row_count. A cheap name-suffix check (name_eligible?) discards most notifications first. SQL shape detection (BulkSqlParser.detect_operation) then narrows capture to exactly four operations (delete_all / update_all / insert_all / upsert_all). Matching statements are parsed, sanitized, and pushed to the buffer for shipping.
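Here is a minimal plain-Ruby sketch of the two filter stages. It assumes payloads are plain Hashes. `detect_operation` below is a hypothetical stand-in for BulkSqlParser.detect_operation, whose actual rules are not shown in this document. It classifies by leading keyword and treats an INSERT carrying `ON CONFLICT` / `ON DUPLICATE KEY` as an upsert.

```ruby
# Same suffix check as name_eligible? (the cheap pre-pass).
def name_eligible?(name)
  name.end_with?(" All") || name.end_with?(" Insert") || name.end_with?(" Upsert")
end

# HYPOTHETICAL stand-in for BulkSqlParser.detect_operation: classifies by the
# leading keyword; an INSERT with ON CONFLICT / ON DUPLICATE KEY is an upsert.
def detect_operation(sql)
  case sql
  when /\ADELETE FROM/i then :delete_all
  when /\AUPDATE/i then :update_all
  when /\AINSERT INTO.*\bON (CONFLICT|DUPLICATE KEY)\b/im then :upsert_all
  when /\AINSERT INTO/i then :insert_all
  end
end

# Returns the bulk operation for a payload, or nil when it is ignored.
def classify(payload)
  return nil unless payload.is_a?(Hash)

  name = payload[:name]
  return nil unless name.is_a?(String) && name_eligible?(name) # cheap name check first
  detect_operation(payload[:sql].to_s)                          # SQL shape decides
end

classify({ name: "Order Delete All", sql: 'DELETE FROM "orders" WHERE "orders"."status" = $1' })
# => :delete_all
classify({ name: "User Update", sql: 'UPDATE "users" SET "name" = $1 WHERE "users"."id" = $2' })
# => nil (singular per-row name, rejected before the SQL is inspected)
```

The per-row UPDATE above has the same SQL shape as an update_all. In this sketch, only the name check keeps it out.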

## Dedup vs DatabaseCapturer

Per-row CRUD (`user.save`, `order.destroy`) fires `after_*` callbacks AND produces an `sql.active_record` notification with a singular name (“User Update”, “Order Destroy”). DatabaseCapturer captures these via callbacks. This capturer ignores them because their singular names fail the bulk name check (name_eligible?) before any SQL is inspected. The two paths are mutually exclusive, so nothing is captured twice.

Cascade case: `Company has_many :orders, dependent: :delete_all` issues a single DELETE for the children. Callbacks don’t fire on the children (delete_all bypasses them by design), but this capturer catches the bulk DELETE. The parent’s `after_destroy` is captured separately by DatabaseCapturer. Both events share the request’s correlation_id and land under the same Action shell. The reader therefore sees parent and cascade as sibling rows on the timeline, which is the right narrative.

## Wire shape (matches server EventIngest expectations)

{
  source_type: "bulk_database",
  source_data: {
    model_class: "Order",
    operation: "delete_all" | "update_all" | "insert_all" | "upsert_all",
    row_count: 50000,
    where_template: "\"orders\".\"status\" = $1",
    where_binds: [{column: "status", value: "cart"}],
    set: {"status" => "paid"},        # only update_all
    columns: ["name", "email"]        # only insert_all / upsert_all
  },
  correlation_id: ...,
  resource_ids: [{resource_type: "Order", resource_id: "bulk:50000"}],
  outcome: "success",
  duration_ms: <(finished - started) * 1000, truncated to Integer>
}

The “bulk:<count>” sentinel resource_id is required because the server’s ResourceAggregationStage drops entries with nil resource_id. When the count is unknown, the sentinel is “bulk:many” (see build_resource_ids). The display layer detects the sentinel and renders “Order (50,000 rows)” without a clickable entity link.
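This hypothetical display-side helper illustrates the sentinel contract. The real display layer lives on the server and is not part of this document. The helper renders a `bulk:<count>` entry as a row-count label and returns nil for ordinary ids, which would get an entity link instead.

```ruby
# HYPOTHETICAL display helper: turns a sentinel resource entry into a label.
def bulk_label(resource)
  id = resource[:resource_id].to_s
  return nil unless id.start_with?("bulk:") # ordinary entity id: link it instead

  count = id.delete_prefix("bulk:")
  # Non-numeric sentinel such as "bulk:many": render the word as-is.
  return "#{resource[:resource_type]} (#{count} rows)" unless count.match?(/\A\d+\z/)

  # Insert thousands separators: "50000" -> "50,000".
  grouped = count.reverse.scan(/\d{1,3}/).join(",").reverse
  noun = count == "1" ? "row" : "rows"
  "#{resource[:resource_type]} (#{grouped} #{noun})"
end

bulk_label({ resource_type: "Order", resource_id: "bulk:50000" }) # => "Order (50,000 rows)"
bulk_label({ resource_type: "Order", resource_id: "bulk:many" })  # => "Order (many rows)"
```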

Defined Under Namespace

Modules: RelationRowCountStash

Constant Summary

BULK_NAME_HINT =

AR’s `payload[:name]` convention for the four bulk operations (verified against Rails 7.0–8.0 + SQLite/PG/MySQL):

delete_all  → "<Model> Delete All"
update_all  → "<Model> Update All"
insert_all  → "<Model> Insert"   (or "<Model> Bulk Insert" on older PG)
upsert_all  → "<Model> Upsert"   (or "<Model> Bulk Upsert" on older PG)

Per-row CRUD uses singular operation verbs:

user.save (new)     → "<Model> Create"
user.update         → "<Model> Update"  (no " All")
user.destroy        → "<Model> Destroy"

So the four bulk shapes are uniquely identified by either:

- ending in " All" (covers Delete All / Update All), OR
- the words Insert / Upsert (which are NEVER used for per-row CRUD
  — per-row inserts are tagged "Create", per-row updates "Update").

SQL shape detection (BulkSqlParser.detect_operation) is the actual authority — this filter is only a sub-µs pre-pass to skip non-bulk notifications without parsing SQL.

/ All\z| (Bulk )?(Insert|Upsert)\z/.freeze
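A quick plain-Ruby check confirms that the constant separates the names listed above. The name lists are illustrative and taken from those tables. The check also shows that the `end_with?` pre-pass used on the hot path agrees with the regex.

```ruby
BULK_NAME_HINT = / All\z| (Bulk )?(Insert|Upsert)\z/.freeze

# Illustrative payload names from the tables above.
BULK_NAMES = ["Order Delete All", "Library Update All", "User Insert",
              "User Bulk Insert", "User Upsert", "User Bulk Upsert"].freeze
PER_ROW_NAMES = ["User Create", "User Update", "User Destroy"].freeze

BULK_NAMES.all? { |n| BULK_NAME_HINT.match?(n) }                      # => true
PER_ROW_NAMES.none? { |n| BULK_NAME_HINT.match?(n) }                  # => true
# String#end_with? accepts several suffixes; it agrees on the same names.
BULK_NAMES.all? { |n| n.end_with?(" All", " Insert", " Upsert") }     # => true
PER_ROW_NAMES.none? { |n| n.end_with?(" All", " Insert", " Upsert") } # => true
```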


Class Attribute Details

.subscriber ⇒ Object (readonly)

Returns the value of attribute subscriber.



# File 'lib/ez_logs_agent/capturers/bulk_database_capturer.rb', line 99

def subscriber
  @subscriber
end

Class Method Details

.backfill_row_count(real_count, model_class) ⇒ Object

Patches the row_count on the most recently buffered bulk_database event when its model matches `model_class`. Called from the RelationRowCountStash module immediately after `super` returns from delete_all / update_all. The buffer’s `peek_last` API is the lightweight read path. We mutate in place because the event hash hasn’t been serialized yet.



# File 'lib/ez_logs_agent/capturers/bulk_database_capturer.rb', line 164

def backfill_row_count(real_count, model_class)
  return if real_count.nil?

  tail = ::EzLogsAgent::Buffer.peek_last
  return unless tail.is_a?(Hash)
  return unless tail[:source_type] == "bulk_database"
  return unless tail.dig(:source_data, :model_class) == model_class.name

  tail[:source_data][:row_count] = real_count
  if (rids = tail[:resource_ids]).is_a?(Array) && rids.first.is_a?(Hash)
    rids.first[:resource_id] = "bulk:#{real_count}"
  end
rescue StandardError => e
  ::EzLogsAgent::Logger.debug("[BulkDatabaseCapturer] backfill_row_count failed: #{e.class}: #{e.message}")
end
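This runnable sketch shows the in-place backfill. It uses a hypothetical in-memory FakeBuffer in place of EzLogsAgent::Buffer and a Struct as the model class, since only `name` is called on it. The rescue/logging wrapper is omitted.

```ruby
# HYPOTHETICAL in-memory stand-in for EzLogsAgent::Buffer.
module FakeBuffer
  @events = []

  def self.push(event)
    @events << event
  end

  def self.peek_last
    @events.last
  end
end

# Same checks as backfill_row_count above, reading from FakeBuffer.
def backfill_row_count(real_count, model_class)
  return if real_count.nil?

  tail = FakeBuffer.peek_last
  return unless tail.is_a?(Hash)
  return unless tail[:source_type] == "bulk_database"
  return unless tail.dig(:source_data, :model_class) == model_class.name

  tail[:source_data][:row_count] = real_count # mutate in place: not serialized yet
  rids = tail[:resource_ids]
  rids.first[:resource_id] = "bulk:#{real_count}" if rids.is_a?(Array) && rids.first.is_a?(Hash)
end

Model = Struct.new(:name) # anything responding to #name works here

FakeBuffer.push({
  source_type: "bulk_database",
  source_data: { model_class: "Order", operation: "delete_all", row_count: nil },
  resource_ids: [{ resource_type: "Order", resource_id: "bulk:many" }]
})
backfill_row_count(50_000, Model.new("Order"))
FakeBuffer.peek_last[:resource_ids].first[:resource_id] # => "bulk:50000"
```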

.build_resource_ids(model_class, row_count) ⇒ Object

Builds the sentinel resource entry. row_count may be nil (Rails < 7 didn’t ship it; some adapters still don’t), and 0 is not informative either (PG’s update_all returns 0 in some paths). In both cases we fall back to “many”, so the display reads naturally and the server-side ResourceAggregationStage doesn’t drop the entry.



# File 'lib/ez_logs_agent/capturers/bulk_database_capturer.rb', line 470

def build_resource_ids(model_class, row_count)
  count_str = (row_count.is_a?(Integer) && row_count > 0) ? row_count.to_s : "many"
  [{ resource_type: model_class.name, resource_id: "bulk:#{count_str}" }]
end
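The nil / 0 fallback is easiest to see with a stand-in model, which can be any object responding to `name`. The method body below is copied from above.

```ruby
def build_resource_ids(model_class, row_count)
  count_str = (row_count.is_a?(Integer) && row_count > 0) ? row_count.to_s : "many"
  [{ resource_type: model_class.name, resource_id: "bulk:#{count_str}" }]
end

order = Struct.new(:name).new("Order") # stand-in for the Order model class

build_resource_ids(order, 50_000) # => [{ resource_type: "Order", resource_id: "bulk:50000" }]
build_resource_ids(order, nil)    # => [{ resource_type: "Order", resource_id: "bulk:many" }]
build_resource_ids(order, 0)      # => [{ resource_type: "Order", resource_id: "bulk:many" }]
```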

.build_source_data(operation:, model_class:, row_count:, parse_result:) ⇒ Object

Builds the source_data hash from the parser result, applying encrypted_attributes drop + sensitive-pattern masking on column-keyed values.



# File 'lib/ez_logs_agent/capturers/bulk_database_capturer.rb', line 386

def build_source_data(operation:, model_class:, row_count:, parse_result:)
  base = {
    model_class: model_class.name,
    operation: operation.to_s,
    row_count: row_count
  }

  return base if parse_result[:unparseable]

  if (set = parse_result[:set])
    base[:set] = mask_set_hash(set, model_class)
  end

  if (template = parse_result[:where_template])
    base[:where_template] = template
    base[:where_binds] = mask_where_binds(parse_result[:where_binds], model_class)
  end

  if (columns = parse_result[:columns])
    base[:columns] = filter_columns(columns, model_class)
  end

  base
end

.capture_enabled? ⇒ Boolean

Mirrors DatabaseCapturer’s same-named guard. capture_database = false disables both capturers in one switch.

Returns:

  • (Boolean)


# File 'lib/ez_logs_agent/capturers/bulk_database_capturer.rb', line 477

def capture_enabled?
  ::EzLogsAgent.configuration.capture_database
rescue StandardError
  false
end

.eligible_payload?(payload) ⇒ Boolean

Fast pre-filter — checks the name field WITHOUT touching SQL. Returns false for the vast majority of notifications (per-row CRUD, SCHEMA, TRANSACTION, internal lookups).

Parameters:

  • payload (Hash, nil)

Returns:

  • (Boolean)


# File 'lib/ez_logs_agent/capturers/bulk_database_capturer.rb', line 310

def eligible_payload?(payload)
  return false unless payload.is_a?(Hash)
  name = payload[:name]
  name.is_a?(String) && name_eligible?(name)
end

.extract_table_name(sql) ⇒ Object

Extracts the unquoted table name from the DELETE FROM / UPDATE / INSERT INTO clause. Handles double-quoted (PG/SQLite), backtick-quoted (MySQL), and bare identifiers. Returns nil on unparseable SQL.



# File 'lib/ez_logs_agent/capturers/bulk_database_capturer.rb', line 366

def extract_table_name(sql)
  # DELETE FROM "table"
  if (m = sql.match(/\ADELETE FROM\s+["`]?([^"`\s]+)["`]?/i))
    return m[1]
  end
  # UPDATE "table"
  if (m = sql.match(/\AUPDATE\s+["`]?([^"`\s]+)["`]?/i))
    return m[1]
  end
  # INSERT INTO "table"
  if (m = sql.match(/\AINSERT INTO\s+["`]?([^"`\s]+)["`]?/i))
    return m[1]
  end

  nil
end
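This plain-Ruby check runs the same three patterns against each adapter's quoting style. The loop form is an equivalent restructuring of the method above, not the shipped code.

```ruby
# Same three patterns as extract_table_name above, tried in order.
TABLE_PATTERNS = [
  /\ADELETE FROM\s+["`]?([^"`\s]+)["`]?/i,
  /\AUPDATE\s+["`]?([^"`\s]+)["`]?/i,
  /\AINSERT INTO\s+["`]?([^"`\s]+)["`]?/i
].freeze

def extract_table_name(sql)
  TABLE_PATTERNS.each do |pattern|
    m = sql.match(pattern)
    return m[1] if m # first matching clause wins
  end
  nil
end

extract_table_name('DELETE FROM "orders" WHERE "orders"."status" = $1') # => "orders" (PG / SQLite)
extract_table_name('UPDATE `libraries` SET `status` = ?')               # => "libraries" (MySQL)
extract_table_name('INSERT INTO users (name) VALUES (?)')               # => "users" (bare)
extract_table_name('SELECT 1')                                          # => nil
```

As written, a schema-qualified target such as `"public"."orders"` yields `"public"`, because the capture stops at the first closing quote.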

.filter_columns(columns, model_class) ⇒ Object

For insert_all / upsert_all, we ship column names ONLY (no values; a product decision). Sensitive column names still need masking, so that the column LIST itself doesn’t hint “this table has a `password` column”. Rather than dropping sensitive columns, we replace each one with the literal “[FILTERED]” marker so the column count stays accurate.



# File 'lib/ez_logs_agent/capturers/bulk_database_capturer.rb', line 453

def filter_columns(columns, model_class)
  columns.map do |col|
    if ::EzLogsAgent::EncryptedAttributes.attribute?(model_class, col)
      "[FILTERED]"
    elsif ::EzLogsAgent::SensitivePatterns.match?(col)
      "[FILTERED]"
    else
      col
    end
  end
end
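This runnable sketch shows the column-list masking. The encrypted-attribute map and sensitive-name regex are hypothetical stand-ins for EzLogsAgent::EncryptedAttributes and EzLogsAgent::SensitivePatterns, whose real rules are not shown here.

```ruby
# HYPOTHETICAL stand-ins for the real encrypted-attribute and sensitive-pattern checks.
ENCRYPTED_ATTRIBUTES = { "User" => %w[email] }.freeze
SENSITIVE_NAME = /password|token|secret/i

def filter_columns(columns, model_class)
  columns.map do |col|
    if ENCRYPTED_ATTRIBUTES.fetch(model_class.name, []).include?(col)
      "[FILTERED]" # encrypted attribute on this model
    elsif SENSITIVE_NAME.match?(col)
      "[FILTERED]" # sensitive-looking column name
    else
      col
    end
  end
end

user = Struct.new(:name).new("User") # stand-in for the User model class
filter_columns(%w[name email password_digest], user)
# => ["name", "[FILTERED]", "[FILTERED]"]
```

The output keeps one entry per input column, so the reader still sees how many columns were written.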

.format_value_for_json(value) ⇒ Object

Same formatter as DatabaseCapturer. Keeps Date / Time / BigDecimal from collapsing to “[Object]” when they reach Sanitizer / wire.



# File 'lib/ez_logs_agent/capturers/bulk_database_capturer.rb', line 551

def format_value_for_json(value)
  case value
  when ::Time, ::DateTime
    value.iso8601
  when ::Date
    value.to_s
  when ::BigDecimal
    value.to_f
  when ::Array
    value.map { |v| format_value_for_json(v) }
  else
    value
  end
end
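This exercises the formatter in plain Ruby (stdlib only). `DateTime` is a subclass of `Date`, which is why the Time/DateTime branch must come before the Date branch. `BigDecimal#to_f` trades exact decimal precision for a JSON-native number.

```ruby
require "bigdecimal"
require "date"
require "time"

def format_value_for_json(value)
  case value
  when ::Time, ::DateTime # must precede ::Date, since DateTime < Date
    value.iso8601
  when ::Date
    value.to_s
  when ::BigDecimal
    value.to_f
  when ::Array
    value.map { |v| format_value_for_json(v) }
  else
    value
  end
end

format_value_for_json(Time.utc(2024, 1, 2, 3, 4, 5)) # => "2024-01-02T03:04:05Z"
format_value_for_json(Date.new(2024, 1, 2))          # => "2024-01-02"
format_value_for_json([BigDecimal("1.5"), "cart"])   # => [1.5, "cart"]
```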

.framework_rewrite?(operation, parse_result) ⇒ Boolean

Detects Rails-generated update_all SQL that we can’t render meaningfully OR that is pure framework noise. We are deliberately narrow here — anything that COULD be a real change a customer cares about (even SET col = NULL on N rows) we keep.

Filtered shapes:

1. Empty SET hash — the parser couldn't extract any column →
   value pairs (e.g. unquoted column names in raw SQL). The
   captured event would render with no "Set X to Y" detail
   and no humanized filter, just "Updated 26 things" with an
   empty Operation block. That's misleading.

2. Counter cache / increment! / decrement! — any SET value is
   a `COALESCE(<col>, 0) + N` expression. Rails uses this
   shape for `increment_counter`, `increment!`,
   `update_counters`. The semantics are "bump a number by N",
   which is plumbing-grade noise: high volume (per-request
   counter bumps fire dozens of times per minute on a busy
   tenant) and zero business meaning to a non-technical
   reader. On the EZLogs server alone these would dominate
   the timeline.

SET <fk> = NULL on N rows IS captured — even though Rails sometimes generates it implicitly as cleanup before a destroy, it’s also the shape of a real customer-issued nullification (soft-orphaning, disassociating tags from an item, etc.), and from the SQL alone we can’t tell the two apart. Showing it honestly is the right call: the reader sees that N rows had their column X set to NULL.

Returns:

  • (Boolean)


# File 'lib/ez_logs_agent/capturers/bulk_database_capturer.rb', line 528

def framework_rewrite?(operation, parse_result)
  return false unless operation == :update_all
  return false unless parse_result.is_a?(Hash)

  set = parse_result[:set]

  # Empty SET hash: parser bailed. Drop — would render as an
  # empty Operation block.
  return true if set.is_a?(Hash) && set.empty? && !parse_result[:unparseable]

  return false unless set.is_a?(Hash) && set.any?

  # Counter-cache / increment! — any value contains COALESCE().
  # Distinct from a deliberate UPDATE because the SET expression
  # references the column on its own RHS, which no business
  # update_all ever does.
  return true if set.values.any? { |v| v.to_s.include?("COALESCE(") }

  false
end
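This is a standalone check of the filter rules. The parse_result hashes are assumptions about BulkSqlParser.parse output. The SET shape follows the wire format, and a counter bump is assumed to arrive with its raw SQL expression as the value.

```ruby
def framework_rewrite?(operation, parse_result)
  return false unless operation == :update_all
  return false unless parse_result.is_a?(Hash)

  set = parse_result[:set]
  # Empty SET that the parser could not fill: drop.
  return true if set.is_a?(Hash) && set.empty? && !parse_result[:unparseable]
  return false unless set.is_a?(Hash) && set.any?

  # Counter-cache / increment!: drop when any value is a COALESCE( expression.
  set.values.any? { |v| v.to_s.include?("COALESCE(") }
end

framework_rewrite?(:update_all, { set: {} })                                                   # => true
framework_rewrite?(:update_all, { set: { "views_count" => 'COALESCE("views_count", 0) + 1' } }) # => true
framework_rewrite?(:update_all, { set: { "company_id" => nil } })                              # => false (kept)
framework_rewrite?(:delete_all, { set: {} })                                                   # => false
```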

.handle_notification(_event_name, started, finished, payload) ⇒ Object

Per-notification entry point. Wraps everything in `rescue Exception` because an AS::N handler that raises pollutes the host’s subscriber chain and (depending on the chain order) can break OTHER observability tools listening on the same channel. Hard rule: bulk capture failures never propagate.



# File 'lib/ez_logs_agent/capturers/bulk_database_capturer.rb', line 225

def handle_notification(_event_name, started, finished, payload)
  # Hot path: this runs once per SQL statement in the host app,
  # often thousands of times per second on a busy tenant, so the
  # cheapest checks run first:
  #
  #   1. A single instance-variable read of the capture flag
  #      cached at install time (no allocation, no call into
  #      configuration).
  #   2. The name-suffix check in name_eligible? (String#end_with?,
  #      no regex), which rejects ~99% of notifications in <1 µs.
  #   3. Everything else (SQL shape detection, model lookup,
  #      parsing).
  return unless @capture_enabled
  return unless payload.is_a?(Hash)
  name = payload[:name]
  return unless name && name_eligible?(name)

  operation = ::EzLogsAgent::BulkSqlParser.detect_operation(payload[:sql])
  return unless operation

  model_class = resolve_model_class(payload[:sql])
  return if model_class.nil?
  return if table_excluded?(model_class)

  parse_result = ::EzLogsAgent::BulkSqlParser.parse(
    sql: payload[:sql],
    type_casted_binds: payload[:type_casted_binds]
  )

  # Drop Rails framework rewrites that aren't real business activity
  # (see framework_rewrite?):
  #
  #   * Empty SET: the parser couldn't extract any column/value
  #     pair, so the event would render as an empty Operation block.
  #
  #   * Counter-cache / increment!: `Model.increment_counter(:x)`
  #     and `record.increment!(:x)` compile to
  #     `UPDATE ... SET x = COALESCE(x, 0) + N WHERE id = ?`,
  #     which is Rails plumbing for a numeric counter bump, not
  #     a user-visible change.
  #
  # Counter bumps produce high-volume noise on the timeline (see
  # EZLogs's own dogfood where Company#increment_actions_count!
  # fires per ingest batch). Filtering at the capturer means they
  # don't ride the wire at all.
  #
  # Foreign-key nullification (SET fk = NULL) is deliberately NOT
  # filtered: from the SQL alone it is indistinguishable from a
  # customer-issued nullification.
  return if framework_rewrite?(operation, parse_result)

  source_data = build_source_data(
    operation: operation,
    model_class: model_class,
    row_count: payload[:row_count],
    parse_result: parse_result
  )

  duration_ms = ((finished - started) * 1000).to_i

  event = ::EzLogsAgent::EventBuilder.build(
    source_type: :bulk_database,
    source_data: source_data,
    outcome: :success,
    correlation_id: ::EzLogsAgent::Correlation.current,
    resource_ids: build_resource_ids(model_class, source_data[:row_count]),
    context: nil,
    duration_ms: duration_ms
  )

  ::EzLogsAgent::Buffer.push(event)
rescue Exception => e # rubocop:disable Lint/RescueException
  # See class comment: a raise from an AS::N handler hurts other
  # subscribers, so we swallow EVERYTHING (not just StandardError).
  # Logged at error level so a regression surfaces in customer
  # debug output, but never re-raised.
  safe_log_error("handle_notification", e)
end

.install ⇒ Object

Installs the AS::Notifications subscription. Idempotent — calling twice is a no-op (would otherwise produce double-events because AS::Notifications.subscribe is itself NOT idempotent).

Called from Railtie.install_database_capturer alongside the per-row DatabaseCapturer.install. Both are gated by the same `capture_database` configuration flag; there is no separate toggle.



# File 'lib/ez_logs_agent/capturers/bulk_database_capturer.rb', line 108

def install
  return if @installed

  # Cache configuration values that the hot path checks per
  # notification. Re-read on every install (specs reinstall
  # after toggling config). Runtime mutations to these settings
  # require uninstall! + install to take effect — acceptable
  # because nobody flips this at runtime in production.
  @capture_enabled =
    begin
      ::EzLogsAgent.configuration.capture_database
    rescue StandardError
      false
    end
  @excluded_tables =
    begin
      ::EzLogsAgent.configuration.all_excluded_tables.dup.freeze
    rescue StandardError
      [].freeze
    end

  install_row_count_capture!

  # 5-arity block bypasses the `*args` splat allocation per
  # notification — measurable on hot paths where we ignore
  # ~99% of events. Block accepts positional args matching the
  # AS::N convention: (name, start, finish, id, payload).
  @subscriber = ::ActiveSupport::Notifications.subscribe("sql.active_record") do |name, started, finished, _id, payload|
    handle_notification(name, started, finished, payload)
  end
  @installed = true
end
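ActiveSupport isn't needed to see why the `@installed` guard matters. The sketch below uses a hypothetical FakeNotifier that, like AS::Notifications.subscribe, adds a new listener on every call. The Capturer class is a simplified stand-in with the same install / uninstall! guard, not the real module.

```ruby
# HYPOTHETICAL stand-in for ActiveSupport::Notifications: every
# subscribe call adds another listener, so it is not idempotent.
class FakeNotifier
  def initialize
    @listeners = []
  end

  def subscribe(&block)
    @listeners << block
    block # the block doubles as the subscription handle
  end

  def unsubscribe(handle)
    @listeners.delete(handle)
  end

  def instrument(payload)
    @listeners.each { |listener| listener.call(payload) }
  end

  def listener_count
    @listeners.size
  end
end

# Simplified capturer with the same install / uninstall! guard pattern.
class Capturer
  attr_reader :events

  def initialize(notifier)
    @notifier = notifier
    @events = []
    @installed = false
  end

  def install
    return if @installed # a second install must not add a second listener

    @subscriber = @notifier.subscribe { |payload| @events << payload }
    @installed = true
  end

  def uninstall!
    @notifier.unsubscribe(@subscriber) if @subscriber
    @subscriber = nil
    @installed = false
  end
end

notifier = FakeNotifier.new
capturer = Capturer.new(notifier)
capturer.install
capturer.install # no-op
notifier.instrument({ name: "Order Delete All" })
capturer.events.size # => 1
```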

.install_row_count_capture! ⇒ Object

Patches ActiveRecord::Relation’s bulk methods to backfill the affected-row count on the most recently captured bulk_database event. See RelationRowCountStash for why this is necessary (Rails’ payload[:row_count] is unreliable for plain DELETE/UPDATE on PG).



# File 'lib/ez_logs_agent/capturers/bulk_database_capturer.rb', line 146

def install_row_count_capture!
  return if @relation_patched
  return unless defined?(::ActiveRecord::Relation)

  ::ActiveRecord::Relation.prepend(RelationRowCountStash)
  @relation_patched = true
rescue StandardError => e
  # Patching is best-effort — if AR's Relation isn't there or the
  # prepend raises, the capturer still works with payload[:row_count].
  ::EzLogsAgent::Logger.debug("[BulkDatabaseCapturer] could not patch Relation: #{e.class}: #{e.message}")
end
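RelationRowCountStash itself is documented separately. This is a minimal sketch of the prepend-and-super pattern it is described as using. A hypothetical FakeRelation stands in for ActiveRecord::Relation, and a BACKFILLS array stands in for backfill_row_count.

```ruby
# HYPOTHETICAL stand-ins: FakeRelation plays ActiveRecord::Relation and
# BACKFILLS records what backfill_row_count would receive.
BACKFILLS = []

class FakeRelation
  attr_reader :model_name

  def initialize(model_name, rows)
    @model_name = model_name
    @rows = rows
  end

  def delete_all
    @rows.size # like AR, returns the number of affected rows
  end
end

module RowCountStash
  def delete_all
    count = super                    # run the original method first
    BACKFILLS << [count, model_name] # then report the authoritative count
    count                            # and return it unchanged to the caller
  end
end

FakeRelation.prepend(RowCountStash)

FakeRelation.new("Order", [1, 2, 3]).delete_all # => 3
BACKFILLS.last                                  # => [3, "Order"]
```

`prepend` (rather than `include`) puts the module ahead of the class in the ancestor chain. The module's `delete_all` therefore runs first, and `super` reaches the original.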

.mask_set_hash(set, model_class) ⇒ Object

Walks `{ column => value }` from the update_all SET clause, masking values whose column is encrypted OR matches a sensitive pattern. Date / Time / BigDecimal values get JSON-formatted so they don’t collapse to “[Object]” downstream.



# File 'lib/ez_logs_agent/capturers/bulk_database_capturer.rb', line 415

def mask_set_hash(set, model_class)
  set.each_with_object({}) do |(col, value), acc|
    acc[col] =
      if ::EzLogsAgent::EncryptedAttributes.attribute?(model_class, col)
        "[FILTERED]"
      elsif ::EzLogsAgent::SensitivePatterns.match?(col)
        "[FILTERED]"
      else
        format_value_for_json(value)
      end
  end
end

.mask_where_binds(binds, model_class) ⇒ Object

Walks the array of `{ column:, value: }` bind entries from the WHERE parser, applying the same masking rules as mask_set_hash. Binds whose column is nil (the parser couldn’t attribute them) ride through with the formatted value, and the display falls back to template substitution.



# File 'lib/ez_logs_agent/capturers/bulk_database_capturer.rb', line 432

def mask_where_binds(binds, model_class)
  (binds || []).map do |bind|
    col = bind[:column]
    value = bind[:value]
    masked_value =
      if col && (::EzLogsAgent::EncryptedAttributes.attribute?(model_class, col) ||
                 ::EzLogsAgent::SensitivePatterns.match?(col))
        "[FILTERED]"
      else
        format_value_for_json(value)
      end

    { column: col, value: masked_value }
  end
end
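This simplified sketch of the bind masking uses a hypothetical sensitive-name regex in place of EncryptedAttributes / SensitivePatterns, and it skips the format_value_for_json step. It shows the two properties described above: masked values keep their column, and nil-column binds pass through.

```ruby
# HYPOTHETICAL stand-in for the encrypted / sensitive column checks.
SENSITIVE_COLUMN = /password|token|secret/i

def mask_where_binds(binds)
  (binds || []).map do |bind|
    col = bind[:column]
    sensitive = col && SENSITIVE_COLUMN.match?(col) # nil column: never masked
    { column: col, value: sensitive ? "[FILTERED]" : bind[:value] }
  end
end

mask_where_binds([
  { column: "status", value: "cart" },
  { column: "reset_token", value: "abc123" },
  { column: nil, value: 42 } # parser couldn't attribute this bind
])
# => [{ column: "status", value: "cart" },
#     { column: "reset_token", value: "[FILTERED]" },
#     { column: nil, value: 42 }]
```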

.name_eligible?(name) ⇒ Boolean

Cheapest possible bulk-name check: plain `String#end_with?` calls, no regex. Runs on the hot path. The four shapes we care about (per AR convention):

"<Model> Delete All", "<Model> Update All",
"<Model> Insert", "<Model> Upsert"

(Older Rails 7 also used “ Bulk Insert” / “ Bulk Upsert”; those still end with “ Insert” / “ Upsert”, so end_with? catches them.) Per-row CRUD names like “<Model> Create”, “<Model> Update”, “<Model> Destroy” fail all three suffix checks (“ All”, “ Insert”, “ Upsert”) and bail in <1 µs.

Returns:

  • (Boolean)


# File 'lib/ez_logs_agent/capturers/bulk_database_capturer.rb', line 325

def name_eligible?(name)
  name.end_with?(" All") || name.end_with?(" Insert") || name.end_with?(" Upsert")
end

.resolve_model_class(sql) ⇒ Object

Looks up the model class from the SQL’s table name. Returns nil for SQL we can’t attribute (raw multi-table queries, anonymous adapter SQL, schema introspection). Skipping these is correct: we’d have nothing to display anyway.



# File 'lib/ez_logs_agent/capturers/bulk_database_capturer.rb', line 333

def resolve_model_class(sql)
  return nil if sql.nil?

  table = extract_table_name(sql)
  return nil if table.nil?

  # Try the descendants list first — cheap and works in environments
  # where models are already eager-loaded (production, Sidekiq workers).
  loaded = ::ActiveRecord::Base.descendants.find do |klass|
    klass.respond_to?(:table_name) && klass.table_name == table && !klass.abstract_class?
  end
  return loaded if loaded

  # Fallback for development mode and any lazy-autoload path: derive
  # the model class name from the table name via Rails' inflector
  # and try to safe_constantize it. Once autoloaded, the class shows
  # up in ActiveRecord::Base.descendants, so subsequent bulk ops on
  # the same table hit the descendants path above.
  constant_name = table.to_s.classify
  klass = constant_name.safe_constantize
  return klass if klass.is_a?(Class) &&
                  klass < ::ActiveRecord::Base &&
                  klass.respond_to?(:table_name) &&
                  klass.table_name == table

  nil
rescue StandardError
  nil
end

.safe_log_error(stage, exception) ⇒ Object



# File 'lib/ez_logs_agent/capturers/bulk_database_capturer.rb', line 566

def safe_log_error(stage, exception)
  ::EzLogsAgent::Logger.error(
    "[BulkDatabaseCapturer] #{stage} failed: #{exception.class} - #{exception.message}"
  )
rescue StandardError
  # Even logging can fail in pathological boot states. We've done
  # everything reasonable; drop the event silently.
  nil
end

.table_excluded?(model_class) ⇒ Boolean

Uses DatabaseCapturer’s existing all_excluded_tables list: one config knob, and both capturers obey it. The list is memoized at install time (`@excluded_tables`), so we don’t pay a configuration lookup on every captured event. Customers who change config at runtime need to call uninstall! + install, the same constraint that already applies to `capture_database`.

Returns:

  • (Boolean)


# File 'lib/ez_logs_agent/capturers/bulk_database_capturer.rb', line 489

def table_excluded?(model_class)
  return false unless model_class.respond_to?(:table_name)

  @excluded_tables.include?(model_class.table_name)
rescue StandardError
  false
end

.uninstall! ⇒ Object

Removes the subscription. Specs use this between examples to avoid leaked subscribers; production never calls it.



# File 'lib/ez_logs_agent/capturers/bulk_database_capturer.rb', line 214

def uninstall!
  ::ActiveSupport::Notifications.unsubscribe(@subscriber) if @subscriber
  @subscriber = nil
  @installed = false
end