Module: Familia::Connection::PipelineCore

Defined in:
lib/familia/connection/pipelined_core.rb

Overview

Pipeline execution with configurable fallback behavior

Handles two pipeline scenarios based on connection handler capabilities:

  1. Normal pipeline when handler supports pipelines
  2. Individual command execution with configurable error/warn/silent modes

Class Method Summary collapse

Class Method Details

.execute_normal_pipeline(dbclient_proc) ⇒ MultiResult

Executes a normal Redis pipeline

Handles proper Fiber-local state management and cleanup in ensure blocks. Manages nested pipeline contexts by checking for existing pipeline state.

Parameters:

  • dbclient_proc (Proc)

    Lambda that returns the Redis connection

  • block (Proc)

    Block containing Redis commands to execute

Returns:

  • (MultiResult)

    Result object with pipeline command results



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/familia/connection/pipelined_core.rb', line 76

def self.execute_normal_pipeline(dbclient_proc)
  # Check for existing pipeline context
  return yield(Fiber[:familia_pipeline]) if Fiber[:familia_pipeline]

  command_return_values = dbclient_proc.call.pipelined do |conn|
    Fiber[:familia_pipeline] = conn
    begin
      yield(conn)
    ensure
      Fiber[:familia_pipeline] = nil
    end
  end

  # Return same MultiResult format as other methods
  MultiResult.new(command_return_values)
end

.execute_pipeline(dbclient_proc) {|Redis| ... } ⇒ MultiResult

Executes a pipeline with configurable fallback behavior

Handles pipeline execution based on connection handler capabilities. When handler doesn't support pipelines, fallback behavior is controlled by Familia.pipelined_mode setting.

Examples:

Basic usage

result = PipelineCore.execute_pipeline(-> { dbclient }) do |conn|
  conn.set('key1', 'value1')
  conn.incr('counter')
end
result.successful?  # => true/false
result.results     # => ["OK", 1]

With fallback modes

Familia.configure { |c| c.pipelined_mode = :permissive }
result = PipelineCore.execute_pipeline(-> { cached_conn }) do |conn|
  conn.set('key', 'value')  # Executes individually, no error
end

Parameters:

  • dbclient_proc (Proc)

    Lambda that returns the Redis connection

  • block (Proc)

    Block containing Redis commands to execute

Yields:

  • (Redis)

    Redis connection or proxy for command execution

Returns:

  • (MultiResult)

    Result object with success status and command results



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/familia/connection/pipelined_core.rb', line 42

def self.execute_pipeline(dbclient_proc, &)
  # Prevent mixing pipeline and transaction contexts
  if Fiber[:familia_transaction]
    Familia.trace :CONFLICTING_CONTEXT, nil,
                 'Attempted to start pipeline inside active transaction'
    raise Familia::ConflictingContextError,
      'Cannot start pipeline inside transaction. ' \
      'Restructure to use one or the other.'
  end

  # First, get the connection to populate the handler class
  dbclient_proc.call
  handler_class = Fiber[:familia_connection_handler_class]

  # Check pipeline capability
  pipeline_capability = handler_class&.allows_pipelined

  if pipeline_capability == false
    OperationCore.handle_fallback(:pipeline, dbclient_proc, handler_class, &)
  else
    # Normal pipeline flow (includes nil, true, and other values)
    execute_normal_pipeline(dbclient_proc, &)
  end
end