Class: Mongo::Protocol::Msg Private

Inherits:
Message
  • Object
show all
Includes:
Monitoring::Event::Secure
Defined in:
lib/mongo/protocol/msg.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

MongoDB Wire protocol Msg message (OP_MSG), a bi-directional wire protocol opcode.

Since:

  • 2.5.0

Defined Under Namespace

Classes: Section1

Constant Summary collapse

DATABASE_IDENTIFIER =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

The identifier for the database name to execute the command on.

Since:

  • 2.5.0

'$db'
INTERNAL_KEYS =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

Keys that the driver adds to commands. These are going to be moved to the end of the hash for better logging.

Since:

  • 2.5.0

Set.new(%w[$clusterTime $db lsid signature txnNumber]).freeze

Constants included from Monitoring::Event::Secure

Monitoring::Event::Secure::REDACTED_COMMANDS

Constants inherited from Message

Mongo::Protocol::Message::BATCH_SIZE, Mongo::Protocol::Message::COLLECTION, Mongo::Protocol::Message::LIMIT, Mongo::Protocol::Message::MAX_MESSAGE_SIZE, Mongo::Protocol::Message::ORDERED, Mongo::Protocol::Message::Q

Constants included from Serializers

Serializers::HEADER_PACK, Serializers::INT32_PACK, Serializers::INT64_PACK, Serializers::NULL, Serializers::ZERO

Instance Attribute Summary

Attributes inherited from Message

#request_id

Instance Method Summary collapse

Methods included from Monitoring::Event::Secure

#compression_allowed?, #redacted, #sensitive?

Methods inherited from Message

#==, deserialize, deserialize_array, deserialize_field, deserialize_header, field, fields, #hash, #maybe_inflate, #set_request_id

Methods included from Id

included

Constructor Details

#initialize(flags, options, main_document, *sequences) ⇒ Msg

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Creates a new OP_MSG protocol message

Examples:

Create a OP_MSG wire protocol message

Msg.new([:more_to_come], {}, { hello: 1 },
        { type: 1, payload: { identifier: 'documents', sequence: [..] } })

Parameters:

  • flags (Array<Symbol>)

    The flag bits. Currently supported values are :more_to_come and :checksum_present.

  • options (Hash)

    The options.

  • main_document (BSON::Document, Hash)

    The document that will become the payload type 0 section. Can contain global args as they are defined in the OP_MSG specification.

  • sequences (Protocol::Msg::Section1)

    Zero or more payload type 1 sections.

Options Hash (options):

  • validating_keys (true, false)

    Whether keys should be validated for being valid document keys (i.e. not begin with $ and not contain dots). This option is deprecated and will not be used. It will removed in version 3.0.

Since:

  • 2.5.0



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/mongo/protocol/msg.rb', line 62

def initialize(flags, options, main_document, *sequences)
  if flags
    flags.each do |flag|
      raise ArgumentError, "Unknown flag: #{flag.inspect}" unless KNOWN_FLAGS.key?(flag)
    end
  end
  @flags = flags || []
  @options = options
  unless main_document.is_a?(Hash)
    raise ArgumentError, "Main document must be a Hash, given: #{main_document.class}"
  end

  @main_document = main_document
  sequences.each_with_index do |section, index|
    unless section.is_a?(Section1)
      raise ArgumentError, "All sequences must be Section1 instances, got: #{section} at index #{index}"
    end
  end
  @sequences = sequences
  @sections = [
    { type: 0, payload: @main_document }
  ] + @sequences.map do |section|
    { type: 1, payload: {
      identifier: section.identifier,
      sequence: section.documents.map do |doc|
        CachingHash.new(doc)
      end,
    } }
  end
  @request_id = nil
  super
end

Instance Method Details

#bulk_write?Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Note:

This method was written to support client-side encryption functionality. It is not recommended that this method be used in service of any other feature or behavior.

Whether this message represents a bulk write. A bulk write is an insert, update, or delete operation that encompasses multiple operations of the same type.

Returns:

  • (Boolean)

    Whether this message represents a bulk write.

Since:

  • 2.5.0



264
265
266
267
268
269
270
271
272
273
274
# File 'lib/mongo/protocol/msg.rb', line 264

def bulk_write?
  inserts = @main_document['documents']
  updates = @main_document['updates']
  deletes = @main_document['deletes']

  num_inserts = (inserts && inserts.length) || 0
  num_updates = (updates && updates.length) || 0
  num_deletes = (deletes && deletes.length) || 0

  num_inserts > 1 || num_updates > 1 || num_deletes > 1
end

#documentsObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Since:

  • 2.5.0



192
193
194
# File 'lib/mongo/protocol/msg.rb', line 192

def documents
  [ @main_document ]
end

#fix_after_deserializationObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Reverse-populates the instance variables after deserialization sets the @sections instance variable to the list of documents.

TODO fix deserialization so that this method is not needed.

Raises:

  • (NotImplementedError)

Since:

  • 2.5.0



179
180
181
182
183
184
185
186
187
188
189
190
# File 'lib/mongo/protocol/msg.rb', line 179

def fix_after_deserialization
  raise NotImplementedError, 'After deserializations @sections should have been initialized' if @sections.nil?

  if @sections.length != 1
    raise NotImplementedError,
          "Deserialization must have produced exactly one section, but it produced #{sections.length} sections"
  end

  @main_document = @sections.first
  @sequences = []
  @sections = [ { type: 0, payload: @main_document } ]
end

#maybe_add_server_api(server_api) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Since:

  • 2.5.0



276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
# File 'lib/mongo/protocol/msg.rb', line 276

def maybe_add_server_api(server_api)
  conflicts = {}
  %i[apiVersion apiStrict apiDeprecationErrors].each do |key|
    conflicts[key] = @main_document[key] if @main_document.key?(key)
    conflicts[key] = @main_document[key.to_s] if @main_document.key?(key.to_s)
  end
  unless conflicts.empty?
    raise Error::ServerApiConflict,
          "The Client is configured with :server_api option but the operation provided the following conflicting parameters: #{conflicts.inspect}"
  end

  main_document = @main_document.merge(
    Utils.transform_server_api(server_api)
  )
  Msg.new(@flags, @options, main_document, *@sequences)
end

#maybe_compress(compressor, zlib_compression_level = nil) ⇒ Message

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Compress the message, if the command being sent permits compression. Otherwise returns self.

Parameters:

  • compressor (String, Symbol)

    The compressor to use.

  • zlib_compression_level (Integer) (defaults to: nil)

    The zlib compression level to use.

Returns:

  • (Message)

    A Protocol::Compressed message or self, depending on whether this message can be compressed.

Since:

  • 2.5.0



169
170
171
# File 'lib/mongo/protocol/msg.rb', line 169

def maybe_compress(compressor, zlib_compression_level = nil)
  compress_if_possible(command.keys.first, compressor, zlib_compression_level)
end

#maybe_decrypt(context) ⇒ Mongo::Protocol::Msg

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Possibly decrypt this message with libmongocrypt. Message will only be decrypted if the specified client exists, that client has been given auto-encryption options, and this message is eligible for decryption. A message is eligible for decryption if it represents one of the command types allow-listed by libmongocrypt and it contains data that is required to be encrypted by a local or remote json schema.

Parameters:

Returns:

  • (Mongo::Protocol::Msg)

    The decrypted message, or the original message if decryption was not possible or necessary.

Since:

  • 2.5.0



243
244
245
246
247
248
249
250
251
# File 'lib/mongo/protocol/msg.rb', line 243

def maybe_decrypt(context)
  if context.decrypt?
    cmd = merge_sections
    enc_cmd = context.decrypt(cmd)
    Msg.new(@flags, @options, enc_cmd)
  else
    self
  end
end

#maybe_encrypt(connection, context) ⇒ Mongo::Protocol::Msg

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Possibly encrypt this message with libmongocrypt. Message will only be encrypted if the specified client exists, that client has been given auto-encryption options, the client has not been instructed to bypass auto-encryption, and mongocryptd determines that this message is eligible for encryption. A message is eligible for encryption if it represents one of the command types allow-listed by libmongocrypt and it contains data that is required to be encrypted by a local or remote json schema.

Parameters:

Returns:

  • (Mongo::Protocol::Msg)

    The encrypted message, or the original message if encryption was not possible or necessary.

Since:

  • 2.5.0



210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# File 'lib/mongo/protocol/msg.rb', line 210

def maybe_encrypt(connection, context)
  if context.encrypt?
    if connection.description.max_wire_version < 8
      raise Error::CryptError.new(
        'Cannot perform encryption against a MongoDB server older than ' +
        '4.2 (wire version less than 8). Currently connected to server ' +
        "with max wire version #{connection.description.max_wire_version}} " +
        '(Auto-encryption requires a minimum MongoDB version of 4.2)'
      )
    end

    db_name = @main_document[DATABASE_IDENTIFIER]
    cmd = merge_sections
    enc_cmd = context.encrypt(db_name, cmd)
    enc_cmd['$db'] = cmd['$db'] if cmd.key?('$db') && !enc_cmd.key?('$db')

    Msg.new(@flags, @options, enc_cmd)
  else
    self
  end
end

#number_returnedInteger

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns the number of documents returned from the server.

The Msg instance must be for a server reply and the reply must return an active cursor (either a newly created one or one whose iteration is continuing via getMore).

Returns:

  • (Integer)

    Number of returned documents.

Raises:

  • (NotImplementedError)

Since:

  • 2.5.0



300
301
302
303
304
305
306
# File 'lib/mongo/protocol/msg.rb', line 300

def number_returned
  if (doc = documents.first) && (cursor = doc['cursor']) && (batch = cursor['firstBatch'] || cursor['nextBatch'])
    return batch.length
  end

  raise NotImplementedError, 'number_returned is only defined for cursor replies'
end

#payloadBSON::Document

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Return the event payload for monitoring.

Examples:

Return the event payload.

message.payload

Returns:

  • (BSON::Document)

    The event payload.

Since:

  • 2.5.0



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/mongo/protocol/msg.rb', line 115

def payload
  # Reorder keys in main_document for better logging - see
  # https://jira.mongodb.org/browse/RUBY-1591.
  # Note that even without the reordering, the payload is not an exact
  # match to what is sent over the wire because the command as used in
  # the published event combines keys from multiple sections of the
  # payload sent over the wire.
  ordered_command = {}
  skipped_command = {}
  command.each do |k, v|
    if INTERNAL_KEYS.member?(k.to_s)
      skipped_command[k] = v
    else
      ordered_command[k] = v
    end
  end
  ordered_command.update(skipped_command)

  BSON::Document.new(
    command_name: ordered_command.keys.first.to_s,
    database_name: @main_document[DATABASE_IDENTIFIER],
    command: ordered_command,
    request_id: request_id,
    reply: @main_document
  )
end

#replyable?true, false

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Whether the message expects a reply from the database.

Examples:

Does the message require a reply?

message.replyable?

Returns:

  • (true, false)

    If the message expects a reply.

Since:

  • 2.5.0



103
104
105
# File 'lib/mongo/protocol/msg.rb', line 103

def replyable?
  @replyable ||= !flags.include?(:more_to_come)
end

#serialize(buffer = BSON::ByteBuffer.new, max_bson_size = nil, bson_overhead = nil) ⇒ BSON::ByteBuffer

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Serializes message into bytes that can be sent on the wire.

Parameters:

  • buffer (BSON::ByteBuffer) (defaults to: BSON::ByteBuffer.new)

    where the message should be inserted.

  • max_bson_size (Integer) (defaults to: nil)

    The maximum bson object size.

Returns:

  • (BSON::ByteBuffer)

    buffer containing the serialized message.

Since:

  • 2.5.0



150
151
152
153
154
155
156
# File 'lib/mongo/protocol/msg.rb', line 150

def serialize(buffer = BSON::ByteBuffer.new, max_bson_size = nil, bson_overhead = nil)
  validate_document_size!(max_bson_size)

  super
  add_check_sum(buffer)
  buffer
end