Class: CipherStash::Collection

Inherits:
Object
Defined in:
lib/cipherstash/collection.rb,
lib/cipherstash/collection/query.rb,
lib/cipherstash/collection/query_result.rb

Overview

A group of similar records stored in the CipherStash searchable encrypted data store.

This class is not ordinarily instantiated directly. Instead, instances of this class are created by calling CipherStash::Client#collection or CipherStash::Client#collections.

Defined Under Namespace

Classes: QueryResult


Instance Method Details

#delete(id) ⇒ void

This method returns an undefined value.

Delete a record from the collection.

Parameters:

  • id (String)

    the ID of the record, as a human-readable UUID.

Raises:



# File 'lib/cipherstash/collection.rb', line 213

def delete(id)
  @metrics.measure_client_call("delete") do
    @rpc.delete(self, id)
  end
rescue ::GRPC::Core::StatusCodes => ex
  @logger.error("CipherStash::Collection#delete") { "Unhandled GRPC error!  Please report this as a bug!  #{ex.message} (#{ex.class})" }
  raise
end

#drop ⇒ Object

Remove the collection from the data-service.

As you can imagine, this is a fairly drastic operation. Don't call it on a whim.



# File 'lib/cipherstash/collection.rb', line 227

def drop
  @metrics.measure_client_call("drop") do
    @rpc.delete_collection(self)
  end
rescue ::GRPC::Core::StatusCodes => ex
  @logger.error("CipherStash::Collection#drop") { "Unhandled GRPC error!  Please report this as a bug!  #{ex.message} (#{ex.class})" }
  raise
end

#get(id) ⇒ CipherStash::Record+

Retrieve one or more records from the collection.

Parameters:

  • id (String, Array<String>)

    the ID(s) of the record(s) to retrieve. Each should be a human-readable UUID of a record in the collection.

Returns:

  • (CipherStash::Record, Array<CipherStash::Record>)

    the record(s) corresponding to the ID(s) provided. If a single ID is given, a single record is returned. If an array of IDs is given (even an array with a single element), an array of records is returned.

Raises:



# File 'lib/cipherstash/collection.rb', line 190

def get(id)
  @metrics.measure_client_call("get") do
    if id.is_a?(Array)
      @rpc.get_all(self, id)
    else
      @rpc.get(self, id)
    end
  end
rescue ::GRPC::Core::StatusCodes => ex
  @logger.error("CipherStash::Collection#get") { "Unhandled GRPC error!  Please report this as a bug!  #{ex.message} (#{ex.class})" }
  raise
end
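The shape of the return value depends on whether you pass a String or an Array. Code that may receive either can therefore normalise by always passing an array. The sketch below uses `fetch_records`, a hypothetical helper that is not part of CipherStash. The `FakeCollection` stub is also hypothetical: it mimics only the dispatch shown in `#get` above, so the example runs without a server.

```ruby
# Hypothetical helper: always pass an Array so #get always returns an Array,
# even when the caller supplies a single ID.
def fetch_records(collection, ids)
  collection.get(Array(ids))
end

# Hypothetical stand-in that mimics only the documented return-shape rule of #get.
FakeCollection = Struct.new(:store) do
  def get(id)
    id.is_a?(Array) ? id.map { |i| store.fetch(i) } : store.fetch(id)
  end
end

fake = FakeCollection.new({ "a" => { title: "A" }, "b" => { title: "B" } })
fetch_records(fake, "a")        # → [{ title: "A" }]
fetch_records(fake, ["a", "b"]) # → [{ title: "A" }, { title: "B" }]
```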

#insert(record, store_record: true) ⇒ String

Store a new record in the collection.

Parameters:

  • record (Hash)

    the complete record to store in the database.

  • store_record (Boolean) (defaults to: true)

    DEPRECATED. Setting this to false was meant to stop the record data itself from being stored in the data store. It is no longer supported: passing false only logs a deprecation notice, and the record is stored regardless.

Returns:

  • (String)

    the UUID of the newly-created record.

Raises:



# File 'lib/cipherstash/collection.rb', line 56

def insert(record, store_record: true)
  @metrics.measure_client_call("insert") do
    unless store_record
      @logger.debug("CipherStash::Collection#insert") { "DEPRECATION NOTICE: 'store_record: false' is no longer supported; please stop using it" }
    end

    uuid = SecureRandom.uuid

    vectors = @indexes.map { |idx| idx.analyze(uuid, record) }.compact
    @rpc.put(self, uuid, record, vectors)

    uuid
  end
rescue ::GRPC::Core::StatusCodes => ex
  @logger.error("CipherStash::Collection#insert") { "Unhandled GRPC error!  Please report this as a bug!  #{ex.message} (#{ex.class})" }
  raise
end
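Judging by the method bodies on this page, `#insert` behaves much like `#upsert` called with a freshly generated UUID (metrics and logging aside). The sketch below makes that relationship explicit. Both `insert_like` and the `RecordingCollection` stub are hypothetical illustrations, not part of the library.

```ruby
require "securerandom"

# Hypothetical sketch: roughly what #insert does, expressed via #upsert.
def insert_like(collection, record)
  id = SecureRandom.uuid # a new random UUID on every call
  collection.upsert(id, record)
  id
end

# Hypothetical stand-in that records upserts, so the sketch runs offline.
class RecordingCollection
  attr_reader :upserts

  def initialize
    @upserts = {}
  end

  def upsert(id, record)
    raise ArgumentError, "Must provide a string ID" unless id.is_a?(String)
    @upserts[id] = record
    true
  end
end

coll = RecordingCollection.new
id = insert_like(coll, { title: "Star Trek: The Motion Picture" })
```

Because every call generates a new ID, inserting the same data twice creates two records. If re-running an import should update existing records instead, keep your own IDs and use `#upsert`.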

#migrate_records ⇒ TrueClass

Re-index out-of-date records to the collection's current indexes.

Returns:

  • (TrueClass)


# File 'lib/cipherstash/collection.rb', line 353

def migrate_records
  @metrics.measure_client_call("migrate_records") do
    @rpc.migrate_records(self) do |uuid, data|
      indexes.map { |idx| idx.analyze(uuid, data) }.compact
    end

    true
  end
end

#name ⇒ String

The plaintext name of the collection.

Returns:

  • (String)


# File 'lib/cipherstash/collection.rb', line 37

def name
  @metadata["name"]
end

#query(opts = {}, &blk) ⇒ CipherStash::Collection::QueryResult

Search for records in the collection which match the constraints specified.

The constraints, aggregations, and other ephemera are all defined in a block which is passed to the query. This block receives an object which has methods for each of the indexes defined on the collection, which in turn can be used to specify constraints. The object passed to the block also has special methods for defining aggregations, sorting, and limits.

Defining constraints

A constraint consists of an index, an operator, and a value. We'll define all this in detail, but you can probably just get away with looking at the examples, below, to get the gist of the thing.

The index is any index defined on the collection. To specify which index a constraint applies to, call a method with the same name as the index on the object passed to the query block.

The operator is specified by calling a method named for the operator on the object returned from the call to the index name. Which operator(s) can be used depends on the type of the index you're using for the constraint. For example, the eq (equality) operator can be used on "exact" and "range" indexes, while the lt (less-than) operator is only valid on "range" indexes.

If you specify an index that doesn't exist, or an operator that is not valid for the index's type, an exception will be raised.
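The mechanics described above work like this: an index-name method returns an object, and calling an operator on that object records a constraint. The simplified model below is a hypothetical `ConstraintBuilder`, not the library's `Query` class. Its operator lists are an illustrative subset assumed from the prose: `eq` on "exact" and "range" indexes, and `lt`/`gt` on "range" only.

```ruby
# Hypothetical, simplified model of the query DSL (not CipherStash's Query class).
class ConstraintBuilder
  # Illustrative subset of operators per index type (an assumption).
  OPERATORS = { "exact" => %w[eq], "range" => %w[eq lt gt] }.freeze

  attr_reader :constraints

  def initialize(index_types)
    @index_types = index_types # e.g. { "year" => "range" }
    @constraints = []
  end

  # Single-call form: index name, operator, value.
  def add_constraint(index, op, value)
    type = @index_types.fetch(index.to_s) { raise ArgumentError, "No such index: #{index}" }
    unless OPERATORS.fetch(type).include?(op.to_s)
      raise ArgumentError, "Operator #{op} is not valid for #{type} index #{index}"
    end
    @constraints << [index.to_s, op.to_s, value]
  end

  # movies.year returns an object whose operator methods call add_constraint.
  # Unknown index names fall through to super, which raises NoMethodError.
  def method_missing(name, *args)
    return super unless args.empty? && @index_types.key?(name.to_s)

    builder = self
    Object.new.tap do |index_proxy|
      OPERATORS.fetch(@index_types[name.to_s]).each do |op|
        index_proxy.define_singleton_method(op) { |value| builder.add_constraint(name, op, value) }
      end
    end
  end

  def respond_to_missing?(name, include_private = false)
    @index_types.key?(name.to_s) || super
  end
end

movies = ConstraintBuilder.new({ "runningTime" => "range", "year" => "range", "exactTitle" => "exact" })
movies.runningTime.lt(60.0)
movies.year.gt(2015.0)
movies.constraints # → [["runningTime", "lt", 60.0], ["year", "gt", 2015.0]]
```

In this model, an operator that is invalid for the index type surfaces as `NoMethodError` through the DSL and as `ArgumentError` through `add_constraint`. The real library's exception classes may differ.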

The value is passed as an argument to the operator method call. All operators currently take a single argument. Ensure that the value you pass is of the same type as the data in the index. In particular, bear in mind that 42 (an integer) is not the same, from an indexing perspective, as 42.0 (a float), even though the two compare as equal in Ruby. There are currently no runtime checks to ensure type correctness.
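Ruby's own `Hash` draws a similar line between `42` and `42.0`, which makes a handy mental model. It does not show how CipherStash actually encodes index values. The snippet also includes `float_index_value`, a hypothetical helper that coerces Integers to Floats, so that stored values and query arguments for a float-valued field always share a type.

```ruby
puts 42 == 42.0      # prints "true": equal by value
puts 42.eql?(42.0)   # prints "false": different types
lookup = { 42 => :found }
p lookup[42.0]       # prints "nil": the Float key misses the Integer key

# Hypothetical helper: coerce Integers to Floats for fields indexed as floats.
def float_index_value(value)
  value.is_a?(Integer) ? value.to_f : value
end

p float_index_value(2015) # prints "2015.0"
```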

To require multiple constraints to match in order for a record to be returned (an AND query), specify each constraint in the query block. They will be automatically combined. There is not (currently) any way to perform an OR operation in a query; you must perform multiple queries and combine the results yourself.
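Since OR has to be done client-side, one approach is to run one query per branch and merge the results, dropping records that matched more than one branch. The `union_by_id` helper below is hypothetical. It assumes each result set can be converted to an Array of records that respond to `#id`; how to get such an array from a `QueryResult` is not documented on this page.

```ruby
# Hypothetical helper: emulate "A OR B" by merging per-branch results,
# keeping the first occurrence of each record ID.
def union_by_id(*result_sets)
  result_sets.flat_map(&:to_a).uniq(&:id)
end

# Stand-in records; in real use each set would come from something like
#   collection.query { |movies| movies.runningTime.lt(60.0) }
Movie = Struct.new(:id, :title)
short  = [Movie.new("a", "Short"), Movie.new("b", "Short and recent")]
recent = [Movie.new("b", "Short and recent"), Movie.new("c", "Recent")]

union_by_id(short, recent).map(&:id) # → ["a", "b", "c"]
```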

Dynamic Constraints

While the literate DSL described above is useful for "literal" queries, if you're generating queries programmatically it is somewhat cumbersome to use (a lot of calls to #__send__). So, you can instead use the #add_constraint method in the block to define your constraints. It uses the same inputs -- an index name, an operator, and arguments -- but in a single method call.
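A typical use of `#add_constraint` is applying a list of filters built at runtime. In the sketch below, `FILTERS`, `apply_filters`, and the `RecordingQuery` stand-in are all hypothetical. The stand-in simply records the calls, so the example runs without a server.

```ruby
# Hypothetical filter specification, e.g. built from user input:
# each entry is [index name, operator, value].
FILTERS = [
  ["runningTime", "lt", 60.0],
  ["year", "gt", 2015.0],
].freeze

# Apply every filter to the object passed to the query block.
# Real use would look like:
#   collection.query { |movies| apply_filters(movies, FILTERS) }
def apply_filters(query, filters)
  filters.each { |index, op, value| query.add_constraint(index, op, value) }
  query
end

# Offline stand-in that just records add_constraint calls.
class RecordingQuery
  attr_reader :calls

  def initialize
    @calls = []
  end

  def add_constraint(index, op, value)
    @calls << [index, op, value]
  end
end

recorded = apply_filters(RecordingQuery.new, FILTERS)
recorded.calls # → [["runningTime", "lt", 60.0], ["year", "gt", 2015.0]]
```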

Aggregating Results

TBA

Sorting

To specify a sorting strategy, use the order_by method on the object passed to the query block. It takes one required argument, the name of an index, and an optional second argument, either :ASC or :DESC, which specifies the sort direction. The index you specify must be of a type that supports ordering (at present, only the "range" index type does).

You can call order_by more than once; ordering strategies are applied in the order they are specified.
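To make "applied in the order they are specified" concrete, consider two calls: `order_by("year", :DESC)` followed by `order_by("runningTime")`. Together they should sort newest first, breaking ties by shortest running time. The local sketch below only illustrates those semantics on hypothetical data; the server does the real sorting.

```ruby
# Hypothetical data, with float-valued fields as in the query examples.
movies = [
  { title: "A", year: 1982.0, runningTime: 113.0 },
  { title: "B", year: 1979.0, runningTime: 132.0 },
  { title: "C", year: 1982.0, runningTime: 100.0 },
]

# First key: year descending; second key (tie-breaker): runningTime ascending.
# In a real query block this might read:
#   movies.order_by("year", :DESC)
#   movies.order_by("runningTime")
sorted = movies.sort_by { |m| [-m[:year], m[:runningTime]] }
sorted.map { |m| m[:title] } # → ["C", "A", "B"]
```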

Limiting the Number of Results

Limits and offsets can be handled by passing :limit and :offset to the query method.
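To walk through all matching records a page at a time, you can advance `:offset` in steps of `:limit` until a short page comes back. `fetch_all_pages` is a hypothetical helper. The `#records` accessor in the usage comment is an assumption, since this page does not document how to read records out of a `QueryResult`.

```ruby
# Hypothetical helper: yields { limit:, offset: } options and collects pages
# until a page shorter than page_size (possibly empty) is returned.
def fetch_all_pages(page_size)
  results = []
  offset = 0
  loop do
    page = yield({ limit: page_size, offset: offset })
    results.concat(page)
    break if page.size < page_size
    offset += page_size
  end
  results
end

# In real use (QueryResult#records is an assumption):
#   fetch_all_pages(50) do |opts|
#     collection.query(opts) { |movies| movies.year.gt(1990.0) }.records
#   end
```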

Examples:

Search for movies with an exact title

collection.query do |movies|
  movies.exactTitle.eq("Star Trek: The Motion Picture")
end

Search for movies made after 2015

collection.query do |movies|
  movies.year.gt(2015.0)
end

Search for movies less than an hour long

collection.query do |movies|
  movies.runningTime.lt(60.0)
end

Search for movies less than an hour long and made after 2015

collection.query do |movies|
  movies.runningTime.lt(60.0)
  movies.year.gt(2015.0)
end

Using #add_constraint to find movies less than an hour long and made after 2015

collection.query do |movies|
  movies.add_constraint("runningTime", "lt", 60.0)
  movies.add_constraint("year", "gt", 2015.0)
end

Limit to the first 5 results

collection.query(limit: 5) do |movies|
  movies.year.gt(1990.0)
end

Return 5 results offset by 20

collection.query(limit: 5, offset: 20) do |movies|
  movies.year.gt(1990.0)
end

Returns:

  • (CipherStash::Collection::QueryResult)

Raises:



# File 'lib/cipherstash/collection.rb', line 339

def query(opts = {}, &blk)
  @metrics.measure_client_call("query") do
    q = Query.new(self, opts)
    @rpc.query(self, q.parse(&blk))
  end
rescue ::GRPC::Core::StatusCodes => ex
  @logger.error("CipherStash::Collection#query") { "Unhandled GRPC error!  Please report this as a bug!  #{ex.message} (#{ex.class})" }
  raise
end

#reload ⇒ TrueClass

Reload the collection's metadata and indexes from the data-service.

After a schema migration or re-indexing -- either by this client or another client running at the same time -- the indexes and metadata cached in this Collection object can be out-of-date with regard to the server. Calling this method requests the current information about the collection from the server, and updates this object's information to match.

Returns:

  • (TrueClass)


# File 'lib/cipherstash/collection.rb', line 370

def reload
  @metrics.measure_client_call("reload") do
    new_collection = @rpc.collection_info(name)

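    # Assumption: the collection returned by collection_info exposes its metadata via #metadata
    @metadata = new_collection.metadata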
    @indexes = new_collection.indexes
    @schema_versions = new_collection.schema_versions

    true
  end
end

#streaming_upsert(records) ⇒ Integer

Bulk insert-or-update of many records into the collection.

When you have a lot of records that need to be upserted into the collection, doing them one-by-one with #upsert can take a long time, because each upsert needs to complete before you can start the next one. By using this "streaming" upsert instead, you can just mass-spam records into the collection, which greatly reduces round-trips (and hence round-trip wait time).

To stream records, you provide any object that responds to #each (in the manner of an enumerable) and yields a series of { id: <uuid>, record: <hash> } objects. For small record sets this enumerable can be an array, but for larger data sets you could stream the inputs from, say, a Postgres database using cursors.
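As a sketch of building that enumerable lazily, the example below wraps a row source in an `Enumerator`, so nothing is materialised up front. The `rows` array is a hypothetical stand-in for, say, rows fetched through a database cursor. Each row already carries a stable UUID, so re-running the import updates the existing records instead of adding new ones. Numeric columns are converted to `Float` to match float-valued indexes.

```ruby
# Hypothetical row source: [id, title, running time, year] as strings.
rows = [
  ["9a08f6c9-faf3-4bcf-a5eb-fcaf066a9b3f", "Star Trek: The Motion Picture", "132", "1979"],
  ["f7f6443b-99c0-4579-9721-d77052769f44", "Star Trek II: The Wrath of Khan", "113", "1982"],
]

# Yields one { id:, record: } hash per row, only when iterated.
records = Enumerator.new do |out|
  rows.each do |id, title, running_time, year|
    out << {
      id: id,
      record: { title: title, runningTime: Float(running_time), year: Float(year) },
    }
  end
end

# count = collection.streaming_upsert(records)
```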

Examples:

Stream an array of records

records = [
  {
    id: "9a08f6c9-faf3-4bcf-a5eb-fcaf066a9b3f",
    record: {
      title: "Star Trek: The Motion Picture",
      # Floats, to match the value types used in the query examples
      runningTime: 132.0,
      year: 1979.0,
    },
  },
  {
    id: "f7f6443b-99c0-4579-9721-d77052769f44",
    record: {
      title: "Star Trek II: The Wrath of Khan",
      runningTime: 113.0,
      year: 1982.0,
    },
  },
  # etc etc etc
]
count = collection.streaming_upsert(records)
puts "Upserted #{count} records"

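Stream lots (possibly billions) of records from an ActiveRecord model, loading them in batches

# Hypothetical: assumes User has a `uuid` column holding each record's ID
records = User.find_each.lazy.map { |u| { id: u.uuid, record: u.attributes } }
count = collection.streaming_upsert(records)
puts "Upserted #{count} records"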

Returns:

  • (Integer)

    the number of records upserted into the database, including exact duplicates.

Raises:

  • (CipherStash::Client::Error::StreamingPutFailure)

    if the record could not be inserted for some reason.

  • (CipherStash::Client::Error::EncryptionFailure)

    if there was a problem encrypting the record.

  • (CipherStash::Client::Error::RPCFailure)

    if a low-level communication problem with the server caused the insert to fail.



# File 'lib/cipherstash/collection.rb', line 157

def streaming_upsert(records)
  @metrics.measure_client_call("streaming_upsert") do
    records = records.lazy.map do |r|
      @metrics.measure_rpc_call("putStream", :excluded) do
        unless r.is_a?(Hash) && r.key?(:id) && r.key?(:record)
          raise ArgumentError, "Malformed record passed to streaming_upsert: #{r.inspect}"
        end
        vectors = @indexes.map { |idx| idx.analyze(r[:id], r[:record]) }.compact
        [r[:id], r[:record], vectors]
      end
    end

    @rpc.put_stream(self, records)
  end
rescue ::GRPC::Core::StatusCodes => ex
  @logger.error("CipherStash::Collection#streaming_upsert") { "Unhandled GRPC error!  Please report this as a bug!  #{ex.message} (#{ex.class})" }
  raise
end

#upsert(id, record, store_record: true) ⇒ TrueClass

Update-or-insert a record in the collection.

If a record with the given ID already exists in the collection, its contents (and indexes) will be updated. Otherwise, a new record will be inserted, with the ID specified.
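`#upsert` itself only checks that the ID is a String. Callers supplying their own IDs may therefore want a stricter pre-flight check. The `valid_record_id?` helper below is hypothetical. It assumes "human-readable UUID" means the canonical hyphenated 8-4-4-4-12 hex layout, which is what `SecureRandom.uuid` (used by `#insert`) produces.

```ruby
require "securerandom"

# Hypothetical check for the canonical 8-4-4-4-12 hex UUID layout.
UUID_FORMAT = /\A\h{8}-\h{4}-\h{4}-\h{4}-\h{12}\z/

def valid_record_id?(id)
  id.is_a?(String) && UUID_FORMAT.match?(id)
end

valid_record_id?(SecureRandom.uuid)                      # → true
valid_record_id?("9a08f6c9-faf3-4bcf-a5eb-fcaf066a9b3f") # → true
valid_record_id?(42)                                     # → false
valid_record_id?("not-a-uuid")                           # → false
```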

Parameters:

  • id (String)

    the human-readable UUID of the record.

  • record (Hash)

    the complete record to store in the database.

  • store_record (Boolean) (defaults to: true)

    DEPRECATED. If set to false, the record data itself is not sent to the data store (only its index vectors are), and a deprecation notice is logged.

Returns:

  • (TrueClass)

Raises:



# File 'lib/cipherstash/collection.rb', line 94

def upsert(id, record, store_record: true)
  @metrics.measure_client_call("upsert") do
    unless store_record
      @logger.debug("CipherStash::Collection#upsert") { "DEPRECATION NOTICE: 'store_record: false' is no longer supported; please stop using it" }
    end

    unless id.is_a?(String)
      raise ArgumentError, "Must provide a string ID"
    end

    vectors = @indexes.map { |idx| idx.analyze(id, record) }.compact
    @rpc.put(self, id, store_record ? record : nil, vectors)

    true
  end
rescue ::GRPC::Core::StatusCodes => ex
  @logger.error("CipherStash::Collection#upsert") { "Unhandled GRPC error!  Please report this as a bug!  #{ex.message} (#{ex.class})" }
  raise
end