Module: EzLogsAgent::Capturers::BulkDatabaseCapturer
- Defined in:
- lib/ez_logs_agent/capturers/bulk_database_capturer.rb
Overview
Captures bulk SQL operations that bypass ActiveRecord lifecycle callbacks: delete_all, update_all, insert_all, upsert_all.
## Why this exists
DatabaseCapturer (the per-row sibling) hooks after_create/_update/_destroy to capture rich per-record context (saved_changes, encrypted_attributes, display_name). That model breaks for bulk ops — Rails issues a single UPDATE/DELETE/INSERT statement against the database WITHOUT instantiating records, so the callbacks never fire. Customer code like
Order.where(status: "cart").delete_all
Library.where(closed: true).update_all(status: "active")
User.insert_all([{name: "a"}, {name: "b"}])
plus `dependent: :delete_all` cascades during a parent destroy, are all invisible to the callback-based path. This capturer fills the gap.
## How it works
Subscribes to “sql.active_record” — the standard Rails instrumentation API every observability tool uses (Datadog APM, AppSignal, Skylight). On every SQL statement the host app runs, we get a payload with the raw SQL, binds, name, and row_count. We filter aggressively to ONLY four operations (delete_all / update_all / insert_all / upsert_all) by SQL shape detection (BulkSqlParser.detect_operation), then parse + sanitize + ship.
## Dedup vs DatabaseCapturer
Per-row CRUD (`user.save`, `order.destroy`) fires `after_*` callbacks AND produces an `sql.active_record` notification with a singular name (“User Update”, “Order Destroy”). DatabaseCapturer captures these via callbacks; this capturer ignores them because their SQL shape is NOT one of the four bulk operations. The two paths are mutually exclusive, so there is no double-capture.
Cascade case: `Company has_many :orders, dependent: :delete_all` issues a single DELETE for the children. Callbacks don’t fire on the children (delete_all bypasses them by design), but this capturer catches the bulk DELETE. The parent’s `after_destroy` is captured separately by DatabaseCapturer. Both events share the request’s correlation_id and land under the same Action shell. The reader sees parent + cascade as sibling rows on the timeline, which is the right narrative.
## Wire shape (matches server EventIngest expectations)
{
  source_type: "bulk_database",
  source_data: {
    model_class: "Order",
    operation: "delete_all" | "update_all" | "insert_all" | "upsert_all",
    row_count: 50000,
    where_template: "\"orders\".\"status\" = $1",
    where_binds: [{column: "status", value: "cart"}],
    set: {"status" => "paid"},   # only update_all
    columns: ["name", "email"]   # only insert_all / upsert_all
  },
  correlation_id: ...,
  resource_ids: [{resource_type: "Order", resource_id: "bulk:50000"}],
  outcome: "success",
  duration_ms: <finish - start>
}
The “bulk:<count>” sentinel resource_id is required because the server’s ResourceAggregationStage drops entries with nil resource_id. The display layer detects the sentinel and renders “Order (50,000 rows)” without a clickable entity link.
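A minimal sketch of how a display layer might detect the sentinel. The helper name `render_resource_label` and the constant `BULK_SENTINEL` are hypothetical (the real display code is not part of this page). The sketch also accepts the `bulk:unknown` form that build_resource_ids emits when row_count is not an Integer.

```ruby
# Hypothetical display-side helper; names are illustrative only.
BULK_SENTINEL = /\Abulk:(\d+|unknown)\z/

def render_resource_label(resource_type, resource_id)
  match = BULK_SENTINEL.match(resource_id.to_s)
  # Not a sentinel: render as an ordinary entity reference.
  return "#{resource_type} ##{resource_id}" unless match

  count = match[1]
  return "#{resource_type} (bulk)" if count == "unknown"

  # Insert thousands separators: "50000" -> "50,000".
  with_commas = count.reverse.scan(/\d{1,3}/).join(",").reverse
  "#{resource_type} (#{with_commas} rows)"
end

puts render_resource_label("Order", "bulk:50000")
puts render_resource_label("Order", "bulk:unknown")
```

Anchoring the pattern with `\A` and `\z` keeps an ordinary id that merely contains the text "bulk:" from being treated as a sentinel.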
Constant Summary
- BULK_NAME_HINT =
AR’s `payload[:name]` convention for the four bulk operations (verified against Rails 7.0–8.0 + SQLite/PG/MySQL):
  delete_all → "<Model> Delete All"
  update_all → "<Model> Update All"
  insert_all → "<Model> Insert" (or "<Model> Bulk Insert" on older PG)
  upsert_all → "<Model> Upsert" (or "<Model> Bulk Upsert" on older PG)
Per-row CRUD uses singular operation verbs:
  user.save (new) → "<Model> Create"
  user.update → "<Model> Update" (no " All")
  user.destroy → "<Model> Destroy"
So the four bulk shapes are uniquely identified by either:
  - ending in " All" (covers Delete All / Update All), OR
  - the words Insert / Upsert (which are NEVER used for per-row CRUD; per-row inserts are tagged "Create", per-row updates "Update").
SQL shape detection (BulkSqlParser.detect_operation) is the actual authority; this filter is only a sub-µs pre-pass to skip non-bulk notifications without parsing SQL.
/ All\z| (Bulk )?(Insert|Upsert)\z/.freeze
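To illustrate the pre-filter, the sketch below runs the same pattern against the notification names listed above. The sample names are illustrative inputs, not captured output from a real application.

```ruby
# Same pattern as the BULK_NAME_HINT constant documented above.
BULK_NAME_HINT = / All\z| (Bulk )?(Insert|Upsert)\z/.freeze

sample_names = [
  "Order Delete All",   # delete_all
  "Library Update All", # update_all
  "User Insert",        # insert_all
  "User Bulk Upsert",   # upsert_all on older PG
  "User Create",        # per-row create
  "User Update",        # per-row update (no " All")
  "Order Destroy",      # per-row destroy
  "SCHEMA",
  "TRANSACTION"
]

sample_names.each do |name|
  puts format("%-20s %s", name, BULK_NAME_HINT.match?(name))
end
```

The first four names match and the remaining five do not, which is why the name check can discard most notifications before any SQL parsing happens.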
Class Attribute Summary
- .subscriber ⇒ Object (readonly)
  Returns the value of attribute subscriber.
Class Method Summary
- .build_resource_ids(model_class, row_count) ⇒ Object
  Builds the sentinel resource entry.
- .build_source_data(operation:, model_class:, row_count:, parse_result:) ⇒ Object
  Builds the source_data hash from the parser result, applying encrypted_attributes drop + sensitive-pattern masking on column-keyed values.
- .capture_enabled? ⇒ Boolean
  Mirrors DatabaseCapturer’s same-named guard.
- .eligible_payload?(payload) ⇒ Boolean
  Fast pre-filter: checks the name field WITHOUT touching SQL.
- .extract_table_name(sql) ⇒ Object
  Extracts the unquoted table name from the FROM / INTO / UPDATE clause.
- .filter_columns(columns, model_class) ⇒ Object
  For insert_all / upsert_all, we ship column names ONLY (no values; product decision).
- .format_value_for_json(value) ⇒ Object
  Same formatter as DatabaseCapturer.
- .handle_notification(_event_name, started, finished, payload) ⇒ Object
  Per-notification entry point.
- .install ⇒ Object
  Installs the AS::Notifications subscription.
- .mask_set_hash(set, model_class) ⇒ Object
  Walks `{ column => value }` from update_all SET, masking values whose column is encrypted OR matches a sensitive pattern.
- .mask_where_binds(binds, model_class) ⇒ Object
  Walks the array of `{column:, value:}` bind entries from the WHERE parser, same masking rules as mask_set_hash.
- .resolve_model_class(sql) ⇒ Object
  Looks up the model class from the SQL’s table name.
- .safe_log_error(stage, exception) ⇒ Object
- .table_excluded?(model_class) ⇒ Boolean
  Uses DatabaseCapturer’s existing all_excluded_tables list: one config knob, both capturers obey it.
- .uninstall! ⇒ Object
  Removes the subscription.
Class Attribute Details
.subscriber ⇒ Object (readonly)
Returns the value of attribute subscriber.
# File 'lib/ez_logs_agent/capturers/bulk_database_capturer.rb', line 99
def subscriber
  @subscriber
end
Class Method Details
.build_resource_ids(model_class, row_count) ⇒ Object
Builds the sentinel resource entry. row_count may be nil (Rails < 7 didn’t ship it; some adapters still don’t). In that case the id falls back to “bulk:unknown” so the entry is non-nil and the server-side ResourceAggregationStage doesn’t drop it.
# File 'lib/ez_logs_agent/capturers/bulk_database_capturer.rb', line 316
def build_resource_ids(model_class, row_count)
  count_str = row_count.is_a?(Integer) ? row_count.to_s : "unknown"
  [{ resource_type: model_class.name, resource_id: "bulk:#{count_str}" }]
end
.build_source_data(operation:, model_class:, row_count:, parse_result:) ⇒ Object
Builds the source_data hash from the parser result, applying encrypted_attributes drop + sensitive-pattern masking on column-keyed values.
# File 'lib/ez_logs_agent/capturers/bulk_database_capturer.rb', line 233
def build_source_data(operation:, model_class:, row_count:, parse_result:)
  base = {
    model_class: model_class.name,
    operation: operation.to_s,
    row_count: row_count
  }

  return base if parse_result[:unparseable]

  if (set = parse_result[:set])
    base[:set] = mask_set_hash(set, model_class)
  end

  if (template = parse_result[:where_template])
    base[:where_template] = template
    base[:where_binds] = mask_where_binds(parse_result[:where_binds], model_class)
  end

  if (columns = parse_result[:columns])
    base[:columns] = filter_columns(columns, model_class)
  end

  base
end
.capture_enabled? ⇒ Boolean
Mirrors DatabaseCapturer’s same-named guard. capture_database = false disables both capturers in one switch.
# File 'lib/ez_logs_agent/capturers/bulk_database_capturer.rb', line 323
def capture_enabled?
  ::EzLogsAgent.configuration.capture_database
rescue StandardError
  false
end
.eligible_payload?(payload) ⇒ Boolean
Fast pre-filter — checks the name field WITHOUT touching SQL. Returns false for the vast majority of notifications (per-row CRUD, SCHEMA, TRANSACTION, internal lookups).
# File 'lib/ez_logs_agent/capturers/bulk_database_capturer.rb', line 184
def eligible_payload?(payload)
  return false unless payload.is_a?(Hash)

  name = payload[:name].to_s
  return false if name.empty?

  BULK_NAME_HINT.match?(name)
end
.extract_table_name(sql) ⇒ Object
Extracts the unquoted table name from the FROM / INTO / UPDATE clause. Handles all three identifier-quote styles (PG/SQLite/MySQL). Returns nil on unparseable SQL.
# File 'lib/ez_logs_agent/capturers/bulk_database_capturer.rb', line 213
def extract_table_name(sql)
  # DELETE FROM "table"
  if (m = sql.match(/\ADELETE FROM\s+["`]?([^"`\s]+)["`]?/i))
    return m[1]
  end
  # UPDATE "table"
  if (m = sql.match(/\AUPDATE\s+["`]?([^"`\s]+)["`]?/i))
    return m[1]
  end
  # INSERT INTO "table"
  if (m = sql.match(/\AINSERT INTO\s+["`]?([^"`\s]+)["`]?/i))
    return m[1]
  end

  nil
end
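The sketch below restates the three patterns as a single alternation so the behavior can be tried outside Rails. The constant name `TABLE_RE` and the sample SQL strings are assumptions made for illustration; the gem itself uses the three separate matches shown above.

```ruby
# Condensed restatement of the three regexes as one alternation.
TABLE_RE = /\A(?:DELETE FROM|UPDATE|INSERT INTO)\s+["`]?([^"`\s]+)["`]?/i

def extract_table_name(sql)
  match = TABLE_RE.match(sql)
  match && match[1] # nil when the statement is not DELETE/UPDATE/INSERT
end

samples = [
  'DELETE FROM "orders" WHERE "orders"."status" = $1', # PG / SQLite quoting
  "UPDATE `libraries` SET `status` = 'active'",        # MySQL quoting
  'INSERT INTO users (name) VALUES (?)',               # unquoted
  'SELECT 1'                                           # not a write
]

samples.each { |sql| p extract_table_name(sql) }
```

One consequence of the pattern, as written, is that a schema-qualified identifier such as `"public"."orders"` yields `public` rather than `orders`, so such statements would not resolve to a model.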
.filter_columns(columns, model_class) ⇒ Object
For insert_all / upsert_all, we ship column names ONLY (no values; product decision). Sensitive column names still need masking so the column LIST itself doesn’t hint “this table has a `password` column”. Sensitive columns are not removed from the displayed list; each is replaced with the literal marker “[FILTERED]” so the count remains true.
# File 'lib/ez_logs_agent/capturers/bulk_database_capturer.rb', line 300
def filter_columns(columns, model_class)
  columns.map do |col|
    if ::EzLogsAgent::EncryptedAttributes.attribute?(model_class, col)
      "[FILTERED]"
    elsif ::EzLogsAgent::SensitivePatterns.match?(col)
      "[FILTERED]"
    else
      col
    end
  end
end
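A self-contained sketch of the same replacement rule. `ENCRYPTED_COLUMNS`, `SENSITIVE_PATTERN` and `sensitive_column?` are hypothetical stand-ins for EzLogsAgent::EncryptedAttributes and EzLogsAgent::SensitivePatterns, whose real rules are not shown on this page.

```ruby
# Hypothetical stand-ins for the gem's encrypted-attribute and
# sensitive-pattern checks; the real rules may differ.
ENCRYPTED_COLUMNS = { "User" => %w[ssn] }.freeze
SENSITIVE_PATTERN = /password|token|secret/i

def sensitive_column?(model_name, col)
  ENCRYPTED_COLUMNS.fetch(model_name, []).include?(col) ||
    SENSITIVE_PATTERN.match?(col)
end

# Replaces (rather than removes) sensitive names so the list length
# still equals the number of columns in the statement.
def filter_columns(columns, model_name)
  columns.map { |col| sensitive_column?(model_name, col) ? "[FILTERED]" : col }
end

p filter_columns(%w[name email password_digest ssn], "User")
```

Because the marker takes the place of each sensitive name, a reader can still tell that four columns were written without learning which sensitive columns exist.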
.format_value_for_json(value) ⇒ Object
Same formatter as DatabaseCapturer. Keeps Date / Time / BigDecimal from collapsing to “[Object]” when they reach Sanitizer / wire.
# File 'lib/ez_logs_agent/capturers/bulk_database_capturer.rb', line 341
def format_value_for_json(value)
  case value
  when ::Time, ::DateTime
    value.iso8601
  when ::Date
    value.to_s
  when ::BigDecimal
    value.to_f
  when ::Array
    value.map { |v| format_value_for_json(v) }
  else
    value
  end
end
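The formatter can be exercised outside the gem. The sketch below copies the method body as documented and adds the standard-library requires it needs when run standalone; the sample values are illustrative.

```ruby
require "time"       # adds Time#iso8601
require "date"       # Date and DateTime
require "bigdecimal"

def format_value_for_json(value)
  case value
  when ::Time, ::DateTime then value.iso8601
  when ::Date             then value.to_s
  when ::BigDecimal       then value.to_f
  when ::Array            then value.map { |v| format_value_for_json(v) }
  else value
  end
end

p format_value_for_json(Time.utc(2024, 1, 15, 12, 0, 0))
p format_value_for_json(Date.new(2024, 1, 15))
p format_value_for_json([BigDecimal("19.5"), "paid"])
```

Note that `to_f` trades BigDecimal precision for a JSON-native number, and that hashes are passed through unchanged because only arrays are walked recursively.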
.handle_notification(_event_name, started, finished, payload) ⇒ Object
Per-notification entry point. Wraps everything in `rescue Exception` because an AS::N handler that raises pollutes the host’s subscriber chain and (depending on the chain order) can break OTHER observability tools listening on the same channel. Hard rule: bulk capture failures never propagate.
# File 'lib/ez_logs_agent/capturers/bulk_database_capturer.rb', line 134
def handle_notification(_event_name, started, finished, payload)
  return unless capture_enabled?
  return unless eligible_payload?(payload)

  operation = ::EzLogsAgent::BulkSqlParser.detect_operation(payload[:sql])
  return unless operation

  model_class = resolve_model_class(payload[:sql])
  return if model_class.nil?
  return if table_excluded?(model_class)

  parse_result = ::EzLogsAgent::BulkSqlParser.parse(
    sql: payload[:sql],
    type_casted_binds: payload[:type_casted_binds]
  )

  source_data = build_source_data(
    operation: operation,
    model_class: model_class,
    row_count: payload[:row_count],
    parse_result: parse_result
  )

  duration_ms = ((finished - started) * 1000).to_i

  event = ::EzLogsAgent::EventBuilder.build(
    source_type: :bulk_database,
    source_data: source_data,
    outcome: :success,
    correlation_id: ::EzLogsAgent::Correlation.current,
    resource_ids: build_resource_ids(model_class, source_data[:row_count]),
    context: nil,
    duration_ms: duration_ms
  )

  ::EzLogsAgent::Buffer.push(event)
rescue Exception => e # rubocop:disable Lint/RescueException
  # See class comment: a raise from an AS::N handler hurts other
  # subscribers, so we swallow EVERYTHING (not just StandardError).
  # Logged at error level so a regression surfaces in customer
  # debug output, but never re-raised.
  safe_log_error("handle_notification", e)
end
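The difference between rescuing Exception and rescuing StandardError can be seen in a small standalone sketch. The two wrapper methods are illustrative and not part of the gem; the last lines repeat the duration arithmetic with made-up timestamps, assuming `started` and `finished` arrive as Time objects.

```ruby
# Illustrative wrappers: each returns the class of the exception it
# swallowed, or nil when the block completed normally.
def swallow_everything
  yield
  nil
rescue Exception => e # rubocop:disable Lint/RescueException
  e.class
end

def swallow_standard_only
  yield
  nil
rescue StandardError => e
  e.class
end

# NotImplementedError descends from ScriptError, not StandardError,
# so only the broad rescue contains it.
p swallow_everything { raise NotImplementedError }

escaped = begin
  swallow_standard_only { raise NotImplementedError }
rescue ScriptError => e
  "escaped: #{e.class}"
end
p escaped

# Subtracting two Time objects yields seconds as a Float.
started = Time.at(100.0)
finished = Time.at(100.25)
duration_ms = ((finished - started) * 1000).to_i
p duration_ms
```

The broad rescue is a trade-off: it also catches Interrupt and SystemExit if they are raised while the handler runs, which is usually discouraged but matches the stated rule that capture failures never propagate.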
.install ⇒ Object
Installs the AS::Notifications subscription. Idempotent — calling twice is a no-op (would otherwise produce double-events because AS::Notifications.subscribe is itself NOT idempotent).
Called from Railtie.install_database_capturer alongside the per-row DatabaseCapturer.install. Both are gated by the same `capture_database` configuration flag, so there is no new toggle.
# File 'lib/ez_logs_agent/capturers/bulk_database_capturer.rb', line 108
def install
  return if @installed

  @subscriber = ::ActiveSupport::Notifications.subscribe("sql.active_record") do |*args|
    payload = args.last
    event_name = args.first
    started = args[1]
    finished = args[2]
    handle_notification(event_name, started, finished, payload)
  end
  @installed = true
end
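Why the `@installed` guard matters can be shown without Rails. In the sketch below, `FakeBus` and `Capturer` are hypothetical classes; `FakeBus` models only the one property relied on here, namely that every subscribe call registers another listener.

```ruby
# Hypothetical stand-in for a notification bus: each subscribe call
# adds a listener, so subscribing twice delivers every event twice.
class FakeBus
  attr_reader :listeners

  def initialize
    @listeners = []
  end

  def subscribe(&block)
    @listeners << block
    block
  end

  def publish(payload)
    @listeners.each { |listener| listener.call(payload) }
  end
end

class Capturer
  attr_reader :events

  def initialize(bus)
    @bus = bus
    @events = []
    @installed = false
  end

  def install
    return if @installed # guard: a second call must not subscribe again

    @subscriber = @bus.subscribe { |payload| @events << payload }
    @installed = true
  end
end

bus = FakeBus.new
capturer = Capturer.new(bus)
2.times { capturer.install }
bus.publish({ name: "Order Delete All" })
p capturer.events.size
```

Without the guard the second install would add a second listener and the single published event would be recorded twice.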
.mask_set_hash(set, model_class) ⇒ Object
Walks `{ column => value }` from update_all SET, masking values whose column is encrypted OR matches a sensitive pattern. Date / Time / BigDecimal values get JSON-formatted so they don’t collapse to “[Object]” downstream.
# File 'lib/ez_logs_agent/capturers/bulk_database_capturer.rb', line 262
def mask_set_hash(set, model_class)
  set.each_with_object({}) do |(col, value), acc|
    acc[col] =
      if ::EzLogsAgent::EncryptedAttributes.attribute?(model_class, col)
        "[FILTERED]"
      elsif ::EzLogsAgent::SensitivePatterns.match?(col)
        "[FILTERED]"
      else
        format_value_for_json(value)
      end
  end
end
.mask_where_binds(binds, model_class) ⇒ Object
Walks the array of `{column:, value:}` bind entries from the WHERE parser, same masking rules as mask_set_hash. Binds whose column is nil (the parser couldn’t attribute them) ride through with the formatted value; display falls back to template substitution.
# File 'lib/ez_logs_agent/capturers/bulk_database_capturer.rb', line 279
def mask_where_binds(binds, model_class)
  (binds || []).map do |bind|
    col = bind[:column]
    value = bind[:value]
    masked_value =
      if col && (::EzLogsAgent::EncryptedAttributes.attribute?(model_class, col) ||
                 ::EzLogsAgent::SensitivePatterns.match?(col))
        "[FILTERED]"
      else
        format_value_for_json(value)
      end
    { column: col, value: masked_value }
  end
end
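A simplified sketch of the bind-masking rule, including the nil-column case. `SENSITIVE` is a hypothetical stand-in for both the encrypted-attribute and sensitive-pattern checks, and value formatting is omitted to keep the example short.

```ruby
# Hypothetical stand-in for the gem's sensitivity checks.
SENSITIVE = /password|token|secret/i

def mask_where_binds(binds)
  (binds || []).map do |bind|
    col = bind[:column]
    # A nil column cannot be classified, so its value passes through.
    masked = (col && SENSITIVE.match?(col)) ? "[FILTERED]" : bind[:value]
    { column: col, value: masked }
  end
end

binds = [
  { column: "status", value: "cart" },
  { column: "api_token", value: "abc123" },
  { column: nil, value: 7 } # parser could not attribute this bind
]

p mask_where_binds(binds)
p mask_where_binds(nil)
```

Passing unattributed binds through keeps the where_template renderable, at the cost that a sensitive value whose column could not be identified is not masked by this step.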
.resolve_model_class(sql) ⇒ Object
Looks up the model class from the SQL’s table name. Returns nil for SQL we can’t attribute (raw multi-table queries, anonymous adapter SQL, schema introspection). Skipping these is correct: we’d have nothing to display anyway.
# File 'lib/ez_logs_agent/capturers/bulk_database_capturer.rb', line 197
def resolve_model_class(sql)
  return nil if sql.nil?

  table = extract_table_name(sql)
  return nil if table.nil?

  ::ActiveRecord::Base.descendants.find do |klass|
    klass.respond_to?(:table_name) && klass.table_name == table && !klass.abstract_class?
  end
rescue StandardError
  nil
end
.safe_log_error(stage, exception) ⇒ Object
# File 'lib/ez_logs_agent/capturers/bulk_database_capturer.rb', line 356
def safe_log_error(stage, exception)
  ::EzLogsAgent::Logger.error(
    "[BulkDatabaseCapturer] #{stage} failed: #{exception.class} - #{exception.message}"
  )
rescue StandardError
  # Even logging can fail in pathological boot states. We've done
  # everything reasonable; drop the event silently.
  nil
end
.table_excluded?(model_class) ⇒ Boolean
Uses DatabaseCapturer’s existing all_excluded_tables list — one config knob, both capturers obey it.
# File 'lib/ez_logs_agent/capturers/bulk_database_capturer.rb', line 331
def table_excluded?(model_class)
  return false unless model_class.respond_to?(:table_name)

  ::EzLogsAgent.configuration.all_excluded_tables.include?(model_class.table_name)
rescue StandardError
  false
end
.uninstall! ⇒ Object
Removes the subscription. Specs use this between examples to avoid leaked subscribers; production never calls it.
# File 'lib/ez_logs_agent/capturers/bulk_database_capturer.rb', line 123
def uninstall!
  ::ActiveSupport::Notifications.unsubscribe(@subscriber) if @subscriber
  @subscriber = nil
  @installed = false
end