Class: CipherStash::Collection
- Inherits: Object
  - Object
  - CipherStash::Collection
- Defined in: lib/cipherstash/collection.rb, lib/cipherstash/collection/query.rb, lib/cipherstash/collection/query_result.rb
Overview
A group of similar records stored in the CipherStash searchable encrypted data store.
This class is not ordinarily instantiated directly. Instead, instances of this class are created by calling CipherStash::Client#collection or CipherStash::Client#collections.
Defined Under Namespace
Classes: QueryResult
Instance Method Summary
- #delete(id) ⇒ void
  Delete a record from the collection.
- #drop ⇒ Object
  Remove the collection from the data-service.
- #get(id) ⇒ CipherStash::Record+
  Retrieve one or more records from the collection.
- #insert(record, store_record: true) ⇒ String
  Store a new record in the collection.
- #migrate_records ⇒ TrueClass
  Re-index out-of-date records to the collection's current indexes.
- #name ⇒ String
  The plaintext name of the collection.
- #query(opts = {}, &blk) ⇒ CipherStash::Collection::QueryResult
  Search for records in the collection which match the constraints specified.
- #reload ⇒ TrueClass
  Reload the collection's metadata and indexes from the data-service.
- #streaming_upsert(records) ⇒ Integer
  Bulk insert-or-update of many records into the collection.
- #upsert(id, record, store_record: true) ⇒ TrueClass
  Update-or-insert a record in the collection.
Instance Method Details
#delete(id) ⇒ void
This method returns an undefined value.
Delete a record from the collection.
    # File 'lib/cipherstash/collection.rb', line 213

    def delete(id)
      @metrics.measure_client_call("delete") do
        @rpc.delete(self, id)
      end
    rescue ::GRPC::Core::StatusCodes => ex
      @logger.error("CipherStash::Collection#delete") { "Unhandled GRPC error! Please report this as a bug! #{ex.message} (#{ex.class})" }
      raise
    end
#drop ⇒ Object
Remove the collection from the data-service.
As you can imagine, this is a fairly drastic operation. Don't call it on a whim.
    # File 'lib/cipherstash/collection.rb', line 227

    def drop
      @metrics.measure_client_call("drop") do
        @rpc.delete_collection(self)
      end
    rescue ::GRPC::Core::StatusCodes => ex
      @logger.error("CipherStash::Collection#drop") { "Unhandled GRPC error! Please report this as a bug! #{ex.message} (#{ex.class})" }
      raise
    end
#get(id) ⇒ CipherStash::Record+
Retrieve one or more records from the collection.
    # File 'lib/cipherstash/collection.rb', line 190

    def get(id)
      @metrics.measure_client_call("get") do
        if id.is_a?(Array)
          @rpc.get_all(self, id)
        else
          @rpc.get(self, id)
        end
      end
    rescue ::GRPC::Core::StatusCodes => ex
      @logger.error("CipherStash::Collection#get") { "Unhandled GRPC error! Please report this as a bug! #{ex.message} (#{ex.class})" }
      raise
    end
#insert(record, store_record: true) ⇒ String
Store a new record in the collection.
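A minimal sketch of a round trip, assuming `collection` is an object obtained from CipherStash::Client#collection and that records are plain hashes. `store_and_fetch` is a hypothetical helper name, not part of the library.

```ruby
# Hypothetical helper: store a record, then read it back by its generated ID.
# Assumes `collection` responds to #insert and #get as documented on this page.
def store_and_fetch(collection, record)
  id = collection.insert(record) # returns the newly generated ID as a String
  [id, collection.get(id)]
end
```

Keeping the returned ID matters: it is generated inside #insert, so it is the only handle for a later #get, #upsert, or #delete of the same record.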
    # File 'lib/cipherstash/collection.rb', line 56

    def insert(record, store_record: true)
      @metrics.measure_client_call("insert") do
        unless store_record
          @logger.debug("CipherStash::Collection#insert") { "DEPRECATION NOTICE: 'store_record: false' is no longer supported; please stop using it" }
        end

        uuid = SecureRandom.uuid

        vectors = @indexes.map { |idx| idx.analyze(uuid, record) }.compact

        @rpc.put(self, uuid, record, vectors)

        uuid
      end
    rescue ::GRPC::Core::StatusCodes => ex
      @logger.error("CipherStash::Collection#insert") { "Unhandled GRPC error! Please report this as a bug! #{ex.message} (#{ex.class})" }
      raise
    end
#migrate_records ⇒ TrueClass
Re-index out-of-date records to the collection's current indexes.

    # File 'lib/cipherstash/collection.rb', line 353

    def migrate_records
      @metrics.measure_client_call("migrate_records") do
        @rpc.migrate_records(self) do |uuid, data|
          indexes.map { |idx| idx.analyze(uuid, data) }.compact
        end

        true
      end
    end
#name ⇒ String
The plaintext name of the collection.
    # File 'lib/cipherstash/collection.rb', line 37

    def name
      @metadata["name"]
    end
#query(opts = {}, &blk) ⇒ CipherStash::Collection::QueryResult
Search for records in the collection which match the constraints specified.
The constraints, aggregations, and other ephemera are all defined in a block which is passed to the query. This block receives an object which has methods for each of the indexes defined on the collection, which in turn can be used to specify constraints. The object passed to the block also has special methods for defining aggregations, sorting, and limits.
Defining constraints
A constraint consists of an index, an operator, and a value. We'll define all this in detail, but you can probably just get away with looking at the examples, below, to get the gist of the thing.
The index is any index defined on the collection.
To specify the index to use for a constraint, call the method with the same name as the index on the object passed to the query block.
The operator is specified by calling a method named for the operator on the object returned from the call to the index name.
Which operator(s) can be used depends on the type of the index you're using for the constraint.
For example, the eq (equality) operator can be used on "exact" and "range" indexes, while the lt (less-than) operator is only valid on "range" indexes.
If you specify an index that doesn't exist, or an operator that is not valid for the given index type, an exception will be raised.
The value is passed as an argument to the operator method call.
All operators currently take a single argument.
Ensure that the value you pass is of the same type as the data in the index.
In particular, bear in mind that 42 (an integer) is not the same, from an indexing perspective, as 42.0 (a float), even though the two compare equally in Ruby.
There are currently no runtime checks to ensure type correctness.
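The Integer/Float distinction can be seen in plain Ruby. The sketch below also shows one way to normalise a value before passing it to an operator; `coerce_for_index` is a hypothetical helper, and knowing which class the index was populated with is left to the caller.

```ruby
# 42 and 42.0 compare equal with ==, but they are different classes,
# and eql? treats them as different values.
puts 42 == 42.0     # true
puts 42.eql?(42.0)  # false

# Hypothetical helper: coerce a query value to the numeric class the index
# was populated with, so the constraint value matches the indexed data.
# Note that Integer() truncates a Float toward zero.
def coerce_for_index(value, indexed_class)
  if indexed_class == Integer
    Integer(value)
  elsif indexed_class == Float
    Float(value)
  else
    raise ArgumentError, "unsupported indexed class: #{indexed_class}"
  end
end
```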
To require multiple constraints to match in order for a record to be returned (an AND query), specify each constraint in the query block.
They will be automatically combined.
There is not (currently) any way to perform an OR operation in a query; you must perform multiple queries and combine the results yourself.
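As a rough illustration of the rules above, the following sketch shows an AND query and a hand-rolled OR. The index names `title` (an "exact" index) and `year` (a "range" index) are hypothetical, as are the helper names. The OR helper additionally assumes that the query result responds to #records and that each record responds to #id.

```ruby
# AND: every constraint named in the block must match.
def films_titled_before(collection, title, year)
  collection.query do |q|
    q.title.eq(title) # equality on the hypothetical "title" index
    q.year.lt(year)   # less-than on the hypothetical "year" range index
  end
end

# OR: run one query per alternative and merge the results by record ID,
# so a record matching several alternatives is only returned once.
def films_titled_any(collection, titles)
  titles
    .flat_map { |t| collection.query { |q| q.title.eq(t) }.records }
    .uniq(&:id)
end
```

Each alternative in the OR helper costs one round trip to the data-service, so long lists of alternatives will be proportionally slow.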
Dynamic Constraints
While the literate DSL described above is useful for "literal" queries, if you're generating queries programmatically it is somewhat cumbersome to use (a lot of calls to #__send__).
So, you can instead use the #add_constraint method in the block to define your constraints.
It uses the same inputs -- an index name, an operator, and arguments -- but in a single method call.
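A small sketch of building a query from data, assuming #add_constraint takes the index name, the operator, and the value in that order, as the description above suggests. `query_with` is a hypothetical helper, and passing names as symbols rather than strings is an assumption.

```ruby
# Build a query from a list of [index_name, operator, value] triples
# instead of literal method calls in the block.
def query_with(collection, constraints)
  collection.query do |q|
    constraints.each do |index_name, operator, value|
      q.add_constraint(index_name, operator, value)
    end
  end
end
```

A call might look like `query_with(collection, [[:title, :eq, "Jaws"], [:year, :lt, 2000]])`, where `title` and `year` are hypothetical index names. As with the block form, all constraints are combined as an AND.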
Aggregating Results
TBA
Sorting
To specify a sorting strategy, use the order_by method on the object passed to the query block.
It takes one required argument, the name of an index, and an optional second argument, either :ASC or :DESC, which specifies the sort direction.
The index you specify must be of a type that supports ordering (only the "range" index type does at present).
You can call order_by more than once; ordering strategies are applied in the order they are specified.
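A sketch of a two-level sort, assuming `year` and `runtime` are hypothetical "range" indexes on the collection and that index names may be passed as symbols. `newest_first` is a hypothetical helper name.

```ruby
# Sort newest first, then shortest first among records from the same year.
# order_by calls are applied in the order they are written.
def newest_first(collection)
  collection.query do |q|
    q.order_by(:year, :DESC)   # primary ordering
    q.order_by(:runtime, :ASC) # secondary ordering
  end
end
```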
Limiting the Number of Results
Limits and offsets can be handled by passing :limit and :offset to the query method.
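Paging through results can be sketched on top of :limit and :offset. `fetch_page` is a hypothetical helper, pages are numbered from zero, and the caller supplies the usual query block.

```ruby
# Fetch one page of results. The block is passed straight through to #query,
# so it can define constraints and ordering as usual.
def fetch_page(collection, page, per_page: 25, &constraints)
  collection.query(limit: per_page, offset: page * per_page, &constraints)
end
```

For pages to be stable from one call to the next, the block should probably also specify an ordering with order_by; that is an assumption about the service's behaviour rather than something stated here.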
    # File 'lib/cipherstash/collection.rb', line 339

    def query(opts = {}, &blk)
      @metrics.measure_client_call("query") do
        q = Query.new(self, opts)
        @rpc.query(self, q.parse(&blk))
      end
    rescue ::GRPC::Core::StatusCodes => ex
      @logger.error("CipherStash::Collection#query") { "Unhandled GRPC error! Please report this as a bug! #{ex.message} (#{ex.class})" }
      raise
    end
#reload ⇒ TrueClass
Reload the collection's metadata and indexes from the data-service.
After a schema migration or re-indexing -- either by this client or another client running at the same time -- the indexes and metadata cached in this Collection object can be out-of-date with regard to the server. Calling this method requests the current information about the collection from the server, and updates this object's information to match.
    # File 'lib/cipherstash/collection.rb', line 370

    def reload
      @metrics.measure_client_call("reload") do
        new_collection = @rpc.collection_info(name)
        @metadata = new_collection.metadata
        @indexes = new_collection.indexes
        @schema_versions = new_collection.schema_versions

        true
      end
    end
#streaming_upsert(records) ⇒ Integer
Bulk insert-or-update of many records into the collection.
When you have a lot of records that need to be upserted into the collection, doing them one-by-one with #upsert can take a long time, because each upsert needs to complete before you can start the next one. By using this "streaming" upsert instead, you can just mass-spam records into the collection, which greatly reduces round-trips (and hence round-trip wait time).
To stream records, you provide any object that responds to #each (in the manner of an enumerable) and yields a series of { id: <uuid>, record: <hash> } objects.
For small record sets this enumerable can be an array, but for larger data sets you could stream the inputs from, say, a Postgres database using cursors.
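One way to produce such a stream lazily is with an Enumerator. In this sketch, `rows` is any enumerable of hashes that already carry their UUID under a hypothetical "uuid" key, with every other key treated as record data. `upsert_stream` is a hypothetical helper name.

```ruby
# Lazily turn source rows into the { id:, record: } hashes that
# streaming_upsert expects. Nothing is read from `rows` until the
# enumerator is consumed, so the source can be arbitrarily large.
def upsert_stream(rows)
  Enumerator.new do |out|
    rows.each do |row|
      out << {
        id: row.fetch("uuid"), # raises KeyError if a row has no UUID
        record: row.reject { |key, _| key == "uuid" }
      }
    end
  end
end
```

The result would then be passed as `collection.streaming_upsert(upsert_stream(rows))`. An Enumerator responds to both #each and #lazy, so it satisfies the description above.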
    # File 'lib/cipherstash/collection.rb', line 157

    def streaming_upsert(records)
      @metrics.measure_client_call("streaming_upsert") do
        records = records.lazy.map do |r|
          @metrics.measure_rpc_call("putStream", :excluded) do
            unless r.is_a?(Hash) && r.key?(:id) && r.key?(:record)
              raise ArgumentError, "Malformed record passed to streaming_upsert: #{r.inspect}"
            end

            vectors = @indexes.map { |idx| idx.analyze(r[:id], r[:record]) }.compact
            [r[:id], r[:record], vectors]
          end
        end

        @rpc.put_stream(self, records)
      end
    rescue ::GRPC::Core::StatusCodes => ex
      @logger.error("CipherStash::Collection#streaming_upsert") { "Unhandled GRPC error! Please report this as a bug! #{ex.message} (#{ex.class})" }
      raise
    end
#upsert(id, record, store_record: true) ⇒ TrueClass
Update-or-insert a record in the collection.
If a record with the given ID already exists in the collection, its contents (and indexes) will be updated. Otherwise, a new record will be inserted, with the ID specified.
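A short sketch of the update path, assuming records are plain hashes. `insert_then_revise` is a hypothetical helper that stores a record and then rewrites it under the same ID.

```ruby
# Insert a record, then update it in place by upserting under the same ID.
# The ID passed to #upsert must be a String; #insert already returns one.
def insert_then_revise(collection, record, changes)
  id = collection.insert(record)               # new record, generated ID
  collection.upsert(id, record.merge(changes)) # same ID, so this is an update
  id
end
```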
    # File 'lib/cipherstash/collection.rb', line 94

    def upsert(id, record, store_record: true)
      @metrics.measure_client_call("upsert") do
        unless store_record
          @logger.debug("CipherStash::Collection#upsert") { "DEPRECATION NOTICE: 'store_record: false' is no longer supported; please stop using it" }
        end

        unless id.is_a?(String)
          raise ArgumentError, "Must provide a string ID"
        end

        vectors = @indexes.map { |idx| idx.analyze(id, record) }.compact

        @rpc.put(self, id, store_record ? record : nil, vectors)

        true
      end
    rescue ::GRPC::Core::StatusCodes => ex
      @logger.error("CipherStash::Collection#upsert") { "Unhandled GRPC error! Please report this as a bug! #{ex.message} (#{ex.class})" }
      raise
    end