Class: Exwiw::Adapter::MongodbAdapter

Inherits:
Base
  • Object
show all
Defined in:
lib/exwiw/adapter/mongodb_adapter.rb

Defined Under Namespace

Classes: EmbeddedMask, MaskPlan, StreamingResult

Constant Summary collapse

EXPLAIN_PLACEHOLDER_OID_HEX =

A recognizably-fake ObjectId substituted for captured parent ids when building scope filters in explain placeholder mode (see #explain_scope_with_placeholders! and #parent_state_for).

"ffffffffffffffffffffffff"
DEFAULT_EXPLAIN_VERBOSITY =

Default explain verbosity. queryPlanner asks the server to PLAN the query without executing it, so it is safe to run against a production source — no documents are scanned or returned. executionStats and allPlansExecution actually run the query to collect runtime stats (the latter also runs the rejected candidate plans), so they carry the cost of the real extraction query.

"queryPlanner"
DEFAULT_BULK_INSERT_CHUNK_SIZE =

Bound how many documents are serialized at once when a collection config carries no explicit bulk_insert_chunk_size. A MongoDB dump is one JSONL line per document and, without chunking, the Runner would materialize the entire collection's output as a single giant string while the full in-memory result set is still alive — doubling peak memory on large or embed-heavy collections. Chunking lets the Runner stream each slice to the file and release its serialized string (and the transient extended-JSON trees) before building the next.

1_000
INDEX_OPTION_ALLOWLIST =

Index options copied through to the emitted createIndex call. Anything else (v, ns, server-internal fields) is dropped — they would either be rejected by createIndex or are not portable across mongod versions.

%w[
  unique sparse hidden expireAfterSeconds collation
  partialFilterExpression wildcardProjection
].freeze
PLACEHOLDER_PATTERN =
/\{([^{}]+)\}/

Instance Attribute Summary collapse

Attributes inherited from Base

#connection_config

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Base

#commented_sql, #post_insert_sql, #pre_insert_sql, #query_comment_text, #sql_query_comment, #to_copy_from_stdin, #write_inserts

Constructor Details

#initialize(connection_config, logger) ⇒ MongodbAdapter

Returns a new instance of MongodbAdapter.



68
69
70
71
72
# File 'lib/exwiw/adapter/mongodb_adapter.rb', line 68

# Builds the adapter on top of the Base constructor and resets the
# MongoDB-specific bookkeeping.
#
# @param connection_config [Object] forwarded unchanged to the superclass
# @param logger [Object] forwarded unchanged to the superclass
def initialize(connection_config, logger)
  super(connection_config, logger)
  # Placeholder-scope mode stays off until #explain_scope_with_placeholders!
  # switches it on.
  @explain_placeholder = false
  # Starts empty; #execute hands this same hash to StreamingResult.
  @state = {}
end

Instance Attribute Details

#stateObject

Accessor for the propagation @state, used ONLY by MongodbParallelDumper to seed a forked worker with the slice of parent ids its collections reference and to harvest the ids downstream collections will $in-match against (handed between processes as Marshal sidecars). The serial Runner never touches these — it relies on the in-process capture during #execute.



100
101
102
# File 'lib/exwiw/adapter/mongodb_adapter.rb', line 100

# Exposes the adapter's propagation state.
#
# @return [Object] the @state instance variable itself (not a copy); the
#   constructor initializes it to an empty Hash
def state
  @state
end

Class Method Details

.table_config_classObject



13
14
15
# File 'lib/exwiw/adapter/mongodb_adapter.rb', line 13

# Reports the table-config class associated with this adapter.
#
# @return [Class] Exwiw::MongodbCollectionConfig
def self.table_config_class
  Exwiw::MongodbCollectionConfig
end

Instance Method Details

#build_query(config, dump_target, config_by_name) ⇒ Object



126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/exwiw/adapter/mongodb_adapter.rb', line 126

# Builds the Exwiw::MongoQuery::Find describing which documents of
# +config+'s collection belong in the dump.
#
# As a side effect this stashes @embedded_children_by_parent and
# @propagation_keys for the later #to_bulk_insert / #execute calls on the
# same config.
#
# @param config [Object] collection config; must not be embedded
# @param dump_target [Object] provides table_name, ids and ids_field
# @param config_by_name [Object] passed to the embedded-children index,
#   propagation-key and related-collection helpers
# @return [Exwiw::MongoQuery::Find]
# @raise [NotImplementedError] when +config+ is embedded
def build_query(config, dump_target, config_by_name)
  if config.embedded?
    message = "MongodbAdapter#build_query was called with embedded config '#{config.name}'. " \
              "Embedded configs are masked through the parent collection."
    raise NotImplementedError, message
  end

  reject_filter!(config)

  # to_bulk_insert is not handed config_by_name by the Adapter contract (SQL
  # adapters have no use for it), so the embedded-children index is parked
  # here. This leans on the Runner invariant that build_query always runs
  # before to_bulk_insert for the same config.
  @embedded_children_by_parent = index_embedded_children(config_by_name)

  # The fields of this collection that downstream children will `$in`-match
  # against (primary_key is always among them). Parked for the matching
  # #execute call under the same build_query-first invariant.
  @propagation_keys = propagation_keys_for(config, config_by_name)

  filter =
    if config.name != dump_target.table_name
      # Not the dump target: scope through the related-collection filter.
      related_collection_filter(config, config_by_name, dump_target)
    elsif dump_target.ids_field
      # `--ids-field` overrides the field --ids is matched against. The stored
      # type of a custom field is unknown, so the textual ids are passed
      # through as Strings instead of being guessed at; the caller supplies
      # values of the field's actual type. This only alters the WHERE filter
      # on the target collection — foreign-key propagation still keys off each
      # child belongs_to's `references` field (default: the parent
      # primary_key); see #execute.
      { dump_target.ids_field => { "$in" => dump_target.ids } }
    else
      # Primary key (`_id`): its stored type is known (Mongoid's default
      # ObjectId), so this is the one place ids are coerced.
      { config.primary_key => { "$in" => coerce_ids(dump_target.ids) } }
    end

  Exwiw::MongoQuery::Find.new(
    collection: config.name,
    primary_key: config.primary_key,
    filter: filter,
    projection: build_projection(config, @propagation_keys),
  )
end

#default_bulk_insert_chunk_sizeObject



264
265
266
# File 'lib/exwiw/adapter/mongodb_adapter.rb', line 264

# Chunk size applied when a collection config carries no explicit
# bulk_insert_chunk_size.
#
# @return [Integer] DEFAULT_BULK_INSERT_CHUNK_SIZE
def default_bulk_insert_chunk_size
  DEFAULT_BULK_INSERT_CHUNK_SIZE
end

#describe_query(query) ⇒ Object



244
245
246
247
248
# File 'lib/exwiw/adapter/mongodb_adapter.rb', line 244

# Renders a one-line, human-readable summary of a find query.
#
# @param query [#collection, #filter, #projection]
# @return [String] "find collection=… filter=… projection=…", or an
#   "<unavailable: …>" marker naming the error if building the summary raises
def describe_query(query)
  parts = [
    "find",
    "collection=#{query.collection}",
    "filter=#{query.filter.inspect}",
    "projection=#{query.projection.inspect}",
  ]
  parts.join(" ")
rescue StandardError => e
  # Describing a query is best-effort: report the failure instead of raising.
  "<unavailable: #{e.class}: #{e.message}>"
end

#dump_schema(ordered_tables, output_path) ⇒ Object



280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
# File 'lib/exwiw/adapter/mongodb_adapter.rb', line 280

# Writes a mongosh script to +output_path+ that recreates the collections
# and their secondary indexes.
#
# Every non-embedded config gets a createCollection statement. Index
# statements are emitted only for collections that actually exist in the
# source database.
#
# @param ordered_tables [Enumerable] collection configs; embedded ones are skipped
# @param output_path [String] file to (over)write
# @return [Object] whatever the final @logger.info call returns
def dump_schema(ordered_tables, output_path)
  require 'json'

  top_level = ordered_tables.reject(&:embedded?)

  # Listing indexes on a collection that does not exist makes MongoDB raise
  # NamespaceNotFound (code 26). The schema may name collections missing from
  # this database (schema/DB drift, or a sparse dev DB), so work out up front
  # which ones are really there. createCollection is still emitted for every
  # config, so the target schema is created in full either way.
  present = db.database.collection_names.to_set

  File.open(output_path, 'w') do |out|
    out.puts("// Auto-generated by exwiw. Apply with: mongosh \"$MONGODB_URI\" #{File.basename(output_path)}")
    out.puts

    top_level.each do |config|
      # Error code 48 is tolerated so re-applying the script does not abort.
      out.puts(%(try { db.createCollection(#{JSON.generate(config.name)}); } catch (e) { if (e.code !== 48) throw e; }))
    end
    out.puts

    top_level.each do |config|
      collection_name = config.name

      if present.include?(collection_name)
        db[collection_name].indexes.to_a.each do |index|
          # The '_id_' index is never emitted.
          next if index['name'] == '_id_'

          # Only allow-listed options are carried over; the name is kept too.
          options = index.slice(*INDEX_OPTION_ALLOWLIST)
          options['name'] = index['name'] if index['name']
          out.puts(%(db.getCollection(#{JSON.generate(collection_name)}).createIndex(#{JSON.generate(index['key'])}, #{JSON.generate(options)});))
        end
      else
        @logger.debug("  Collection '#{collection_name}' is not present in the source database; emitting no indexes.")
      end
    end
  end

  @logger.info("  Wrote schema for #{top_level.size} collection(s) to #{output_path}.")
end

#dumpable?(config) ⇒ Boolean

Returns:

  • (Boolean)


114
115
116
# File 'lib/exwiw/adapter/mongodb_adapter.rb', line 114

# Tells whether +config+ is dumpable, which here means "not embedded".
#
# @param config [#embedded?]
# @return [Boolean] the negation of config.embedded?
def dumpable?(config)
  !config.embedded?
end

#estimated_count(collection_name) ⇒ Object

Cheap, metadata-only document-count estimate for collection_name, used by the parallel dumper to weight collections for LPT bin-packing. This only influences which worker processes a collection (never the output bytes), so an imprecise estimate is harmless. Reads collection metadata rather than running a COLLSCAN; returns 0 on any error (e.g. a collection absent from this database just sorts to the lowest weight).



108
109
110
111
112
# File 'lib/exwiw/adapter/mongodb_adapter.rb', line 108

# Best-effort document-count estimate for +collection_name+.
#
# @param collection_name [String]
# @return [Object] the collection's estimated_document_count, or 0 when
#   anything raises a StandardError along the way
def estimated_count(collection_name)
  begin
    collection = db[collection_name]
    collection.estimated_document_count
  rescue StandardError
    # Deliberately swallowed: a failed estimate is reported as zero.
    0
  end
end

#execute(query) ⇒ Object



178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# File 'lib/exwiw/adapter/mongodb_adapter.rb', line 178

# Prepares the find for +query+ and wraps it in a StreamingResult.
#
# @param query [#collection, #filter, #projection, #primary_key]
# @return [StreamingResult] wraps the prepared view together with the
#   propagation keys and the adapter's @state
def execute(query)
  @logger.debug("  Executing Mongo find on '#{query.collection}': filter=#{query.filter.inspect} projection=#{query.projection.inspect}")

  # Assemble the view one step at a time: filter, projection, then comment.
  matched = db[query.collection].find(query.filter)
  projected = matched.projection(query.projection)
  view = projected.comment(query_comment_text("collection=#{query.collection}"))

  # The fields whose values children will `$in`-match against.
  # @propagation_keys comes from the build_query call for this same
  # collection; when execute is driven without one (e.g. in isolation from a
  # test) only the primary key is captured.
  captured_keys = @propagation_keys || [query.primary_key]

  # Hand back a streaming wrapper instead of `.to_a`-ing the whole result
  # set. The Runner pulls documents through `each_slice`, so only one chunk
  # is resident at a time even for large / embed-heavy collections. The
  # propagation-key values are captured as the cursor streams and published
  # into @state once the pass completes (see StreamingResult).
  StreamingResult.new(view: view, collection: query.collection, keys: captured_keys, state: @state)
end

#explain(query, verbosity: nil) ⇒ Object

Server-side explain for the find this dump would issue, returned as pretty-printed relaxed extended JSON (same value semantics as the dumped documents). verbosity is one of queryPlanner / executionStats / allPlansExecution; see DEFAULT_EXPLAIN_VERBOSITY for the safety implications. A String verbosity is passed through to the driver as-is.



231
232
233
234
235
236
237
238
239
240
241
242
# File 'lib/exwiw/adapter/mongodb_adapter.rb', line 231

# Asks the server to explain the find this dump would issue for +query+.
#
# @param query [#collection, #filter, #projection]
# @param verbosity [String, nil] passed through to the driver as-is;
#   DEFAULT_EXPLAIN_VERBOSITY is used when nil
# @return [String] the explain result, converted to relaxed extended JSON and
#   pretty-printed
def explain(query, verbosity: nil)
  level = verbosity || DEFAULT_EXPLAIN_VERBOSITY
  @logger.debug("  Running explain (verbosity=#{level}) on '#{query.collection}': filter=#{query.filter.inspect}")

  # Same filter / projection / comment chain as #execute, ending in an
  # explain call.
  view = db[query.collection].find(query.filter)
  view = view.projection(query.projection)
  view = view.comment(query_comment_text("collection=#{query.collection}"))
  plan = view.explain(verbosity: level)

  JSON.pretty_generate(plan.as_extended_json(mode: :relaxed))
end

#explain_scope_with_placeholders!Object

Switch #build_query into placeholder-scope mode for explain. The mongodb scope of a non-target collection is its parents' captured ids, which the serial dump harvests at runtime in #execute. explain never executes a query, so that @state stays empty and every scoped child would otherwise fall back to the match-nothing {_id: {$in: []}} filter — hiding which field (e.g. a foreign key) the real dump would actually filter on, and thus whether it is indexed. In this mode #parent_state_for returns a placeholder id for each dumped parent instead of reading @state, so build_query emits the real foreign-key filter shape with a dummy value. queryPlanner picks an index by the queried FIELD, not the value, so index selection (IXSCAN vs COLLSCAN) is reported correctly even though the bound value is fake.



91
92
93
# File 'lib/exwiw/adapter/mongodb_adapter.rb', line 91

# Turns on explain placeholder mode by setting @explain_placeholder.
# Nothing in this method turns it back off.
#
# @return [true]
def explain_scope_with_placeholders!
  @explain_placeholder = true
end

#output_extensionObject



250
251
252
# File 'lib/exwiw/adapter/mongodb_adapter.rb', line 250

# Extension reported for this adapter's output files.
#
# @return [String] 'jsonl'
def output_extension
  'jsonl'
end

#schema_output_extensionObject



268
269
270
# File 'lib/exwiw/adapter/mongodb_adapter.rb', line 268

# Extension reported for this adapter's schema output (#dump_schema emits
# mongosh JavaScript statements).
#
# @return [String] 'js'
def schema_output_extension
  'js'
end

#supports_bulk_delete?Boolean

Returns:

  • (Boolean)


322
323
324
# File 'lib/exwiw/adapter/mongodb_adapter.rb', line 322

# Reports that this adapter has no bulk-delete support; #to_bulk_delete
# raises NotImplementedError accordingly.
#
# @return [Boolean] always false
def supports_bulk_delete?
  false
end

#to_bulk_delete(_query, _config) ⇒ Object

Raises:

  • (NotImplementedError)


214
215
216
# File 'lib/exwiw/adapter/mongodb_adapter.rb', line 214

# Bulk delete is not implemented for this adapter; both arguments are ignored.
#
# @param _query [Object] unused
# @param _config [Object] unused
# @raise [NotImplementedError] always
def to_bulk_delete(_query, _config)
  raise NotImplementedError, "MongodbAdapter does not support bulk delete"
end

#to_bulk_insert(rows, config) ⇒ Object

NOTE: relies on @embedded_children_by_parent set by a prior build_query call for the same config. This implicit ordering exists because the Adapter contract intentionally does not thread config_by_name through to_bulk_insert (SQL adapters don't need it). Safe in Runner, fragile in tests — call build_query first.



206
207
208
209
210
211
212
# File 'lib/exwiw/adapter/mongodb_adapter.rb', line 206

# Serializes +rows+ as newline-separated Exwiw::ExtJson lines after masking
# each document.
#
# Each document is masked in place (via apply_mask_plan!) before it is
# encoded, so the caller's row objects are mutated.
#
# @param rows [Enumerable] documents to emit
# @param config [Object] collection config the mask plan is built from
# @return [String] one encoded document per line, joined with "\n" and
#   without a trailing newline; "" when +rows+ is empty
def to_bulk_insert(rows, config)
  # The plan is built once per call and reused for every row.
  plan = mask_plan(config)

  encoded = rows.each_with_object([]) do |doc, lines|
    apply_mask_plan!(doc, plan)
    lines << Exwiw::ExtJson.encode(doc)
  end

  encoded.join("\n")
end

#validate_as_dump_target!(config) ⇒ Object

Raises:

  • (NotImplementedError)


118
119
120
121
122
123
124
# File 'lib/exwiw/adapter/mongodb_adapter.rb', line 118

# Rejects embedded configs as dump targets.
#
# @param config [#embedded?, #name]
# @return [nil] when +config+ is not embedded
# @raise [NotImplementedError] when +config+ is embedded
def validate_as_dump_target!(config)
  if config.embedded?
    raise NotImplementedError,
          "dump_target '#{config.name}' is an embedded MongodbCollectionConfig; " \
          "specify a top-level collection instead."
  end
end