Module: Familia::Connection::Operations
- Included in:
- Familia::Connection
- Defined in:
- lib/familia/connection/operations.rb
Instance Method Summary collapse
-
#atomic_write(*instances, update_expiration: true, watch_keys: nil, pre_check: nil) { ... } ⇒ Boolean
Persists multiple Horreum instances in a SINGLE MULTI/EXEC transaction.
-
#pipelined {|Redis| ... } ⇒ Array, MultiResult
(also: #pipeline)
Executes Database commands in a pipeline for improved performance.
-
#transaction {|Redis| ... } ⇒ Array, MultiResult
(also: #multi)
Executes Database commands atomically within a transaction (MULTI/EXEC).
-
#with_dbclient {|Redis| ... } ⇒ Object
Provides explicit access to a Database connection.
-
#with_isolated_dbclient(uri = nil) {|Redis| ... } ⇒ Object
Provides explicit access to an isolated Database connection for temporary operations.
Instance Method Details
#atomic_write(*instances, update_expiration: true, watch_keys: nil, pre_check: nil) { ... } ⇒ Boolean
On Redis Cluster, a cross-model MULTI also requires the watched and written keys to share a hash slot (CROSSSLOT error otherwise). Co-locate related models with hash tags, e.g. +customer:acct42:object+.
When +watch_keys+ is set and a WATCH abort triggers a retry, the user block AND each +persist_to_storage+ are re-executed on every attempt. The aborted MULTI discards all queued Redis commands, so Redis state is clean on retry, but side effects outside Redis (logging, counters, external API calls) in the user block fire again -- design retry-safe blocks when using +watch_keys+.
Persists multiple Horreum instances in a SINGLE MULTI/EXEC transaction.
This is the cross-model / multi-instance counterpart to Horreum::AtomicWrite#atomic_write. Where the instance method composes one object's scalar HMSET and collection mutations into one transaction, this module-level method folds the persistence of several (possibly different-class) instances into one atomic MULTI/EXEC.
Why this works
Once a MULTI opens, every instance's +dbclient+ resolves to the same +Fiber[:familia_transaction]+ connection. So the HMSET, EXPIRE, index HSET, and instances ZADD for each instance all queue on one socket and commit together. The transaction is anchored on +instances.first.dbclient+; because #guard_cross_model_database! enforces that all roots share ONE logical database, every instance routes to that same connection -- there is no special routing to engineer, the same-logical-DB requirement IS the constraint.
Read/write split
The read-validate-write split is the key constraint. +prepare_for_save+ (timestamps + unique-index reads) runs OUTSIDE the transaction because the reads it performs would return uninspectable +Redis::Future+ objects inside MULTI/EXEC. Only +persist_to_storage+ (HMSET/EXPIRE/index HSET/instances ZADD -- write-only) runs INSIDE.
Optimistic locking / create-only
Pass +watch_keys:+ to wrap the MULTI in a WATCH block, and +pre_check:+ to run a guard between WATCH and MULTI (the only window where reads return real values while the watched keys are guarded). A concurrent modification of any watched key aborts EXEC and retries (the committed primitive owns abort detection + retry). This enables a race-safe create-only pattern -- see the example below.
195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 |
# File 'lib/familia/connection/operations.rb', line 195 def atomic_write(*instances, update_expiration: true, watch_keys: nil, pre_check: nil, &user_block) raise ArgumentError, 'atomic_write requires at least one instance' if instances.empty? raise ArgumentError, 'pre_check requires watch_keys' if pre_check && !watch_keys&.any? if Fiber[:familia_transaction] raise Familia::OperationModeError, 'Cannot call Familia.atomic_write within a transaction. It opens its own MULTI/EXEC and cannot be nested.' end guard_cross_model_database!(instances) # all roots + their related fields share ONE logical db # Activate atomic_write mode on every instance BEFORE prepare_for_save so # that collection mutations in the user block do not trip dirty-write # warnings -- or, under :strict / raise_on_unsaved_parent_write, raises -- # against the just-dirtied scalars. Those scalars are persisted by this # same MULTI, so the writes are legitimate. Mirrors the instance-level # atomic_write (which acquires ownership before prepare_for_save). Only the # instances actually acquired are released, in the ensure below. acquired = [] begin instances.each do |i| i.send(:acquire_atomic_write_ownership!) acquired << i end instances.each { |i| i.send(:prepare_for_save) } # READS — outside the txn persist_all = lambda do user_block&.call instances.each { |i| i.send(:persist_to_storage, update_expiration) } end result = if watch_keys&.any? Familia::Connection::TransactionCore.execute_watched_transaction( -> { instances.first.dbclient }, watch_keys: watch_keys ) do |conn| pre_check&.call Familia::Connection::TransactionCore.execute_normal_transaction(-> { conn }) { persist_all.call } end else # Route the non-watched path through the instance #transaction so it # inherits execute_transaction's handler-compatibility gate: a # connection whose handler disallows transactions falls back per # Familia.transaction_mode (raise/warn/individual) instead of # issuing a raw MULTI on an unsupported connection. (The watched # branch above must call execute_normal_transaction directly to # reuse the WATCH-resolved connection; this branch has no such # constraint.) Anchored on instances.first -- the guard ensures all # roots share one logical database, so it routes every instance. instances.first.transaction { persist_all.call } end success = result.is_a?(Familia::MultiResult) ? result.successful? : !result.nil? instances.each { |i| i.send(:clear_dirty!) } if success success ensure acquired.each { |i| i.send(:release_atomic_write_ownership!) } end end |
#pipelined {|Redis| ... } ⇒ Array, MultiResult Also known as: pipeline
Pipeline vs Transaction Differences:
- Pipeline: Commands executed independently, some may succeed while others fail
- Transaction: All-or-nothing execution, commands are atomic as a group
- Pipeline: Better performance for independent operations
- Transaction: Better consistency for related operations
Connection Handler Compatibility:
- FiberPipelineHandler: Supports reentrant pipelines
- ProviderConnectionHandler: Full pipeline support
- CreateConnectionHandler: Full pipeline support
- FiberTransactionHandler: Blocked (raises OperationModeError)
- FiberConnectionHandler: Blocked (raises OperationModeError)
- DefaultConnectionHandler: Blocked (raises OperationModeError)
Thread Safety: Uses Fiber-local storage to maintain pipeline context across nested calls and ensure proper cleanup even when exceptions occur.
Executes Database commands in a pipeline for improved performance.
Pipelines send multiple commands without waiting for individual responses, reducing network round-trips. Commands execute independently and can succeed or fail without affecting other commands in the pipeline.
Executes Redis commands in a pipeline for improved performance.
Batches multiple Redis commands together and sends them in a single network round-trip, improving performance for multiple independent operations. Returns a MultiResult object containing both success status and command results.
373 374 375 |
# File 'lib/familia/connection/operations.rb', line 373 def pipelined(&) PipelineCore.execute_pipeline(-> { dbclient }, &) end |
#transaction {|Redis| ... } ⇒ Array, MultiResult Also known as: multi
Comparison of Database batch operations:
| Feature | Multi/Exec | Pipeline |
|---|---|---|
| Atomicity | Yes | No |
| Performance | Good | Better |
| Error handling | All-or-nothing | Per-command |
| Use case | Data consistency | Bulk operations |
Connection Handler Compatibility:
- FiberTransactionHandler: Supports reentrant transactions
- ProviderConnectionHandler: Full transaction support
- CreateConnectionHandler: Full transaction support
- FiberConnectionHandler: Blocked (raises OperationModeError)
- DefaultConnectionHandler: Blocked (raises OperationModeError)
Thread Safety: Uses Fiber-local storage to maintain transaction context across nested calls and ensure proper cleanup even when exceptions occur.
Executes Database commands atomically within a transaction (MULTI/EXEC).
Database transactions queue commands and execute them atomically as a single unit. All commands succeed together or all fail together, ensuring data consistency.
Executes a Redis transaction (MULTI/EXEC) with proper connection handling.
Provides atomic execution of multiple Redis commands with automatic connection management and operation mode enforcement. Returns a MultiResult object containing both success status and command results.
99 100 101 |
# File 'lib/familia/connection/operations.rb', line 99 def transaction(&) Familia::Connection::TransactionCore.execute_transaction(-> { dbclient }, &) end |
#with_dbclient {|Redis| ... } ⇒ Object
Provides explicit access to a Database connection.
This method is useful when you need direct access to a connection for operations not covered by other methods. The connection is properly managed and returned to the pool (if using connection_provider).
393 394 395 |
# File 'lib/familia/connection/operations.rb', line 393 def with_dbclient(&) yield dbclient end |
#with_isolated_dbclient(uri = nil) {|Redis| ... } ⇒ Object
Provides explicit access to an isolated Database connection for temporary operations.
This method creates a new connection that won't interfere with the cached connection pool, executes the given block with that connection, and ensures the connection is properly closed afterward.
Perfect for database scanning, inspection, or migration operations where you need to access different databases without affecting your models' normal connections.
421 422 423 424 425 426 427 428 |
# File 'lib/familia/connection/operations.rb', line 421 def with_isolated_dbclient(uri = nil, &) client = isolated_dbclient(uri) begin yield client ensure client&.close end end |