Module: Familia::Connection::PipelineCore

Defined in:
lib/familia/connection/pipelined_core.rb

Overview

Pipeline execution with configurable fallback behavior

Handles two scenarios, depending on what the connection handler supports:

  1. Normal pipeline when the handler supports pipelines
  2. Individual command execution, with configurable error/warn/silent modes, when it does not

Class Method Details

.execute_normal_pipeline(dbclient_proc, &block) ⇒ MultiResult

Executes a normal Redis pipeline

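Stores the pipeline connection in Fiber-local state (Fiber[:familia_pipeline]) while the block runs and clears it in an ensure block, so the state is reset even if the block raises. If a pipeline is already active on the current Fiber, the block is yielded that existing pipeline instead of opening a new one; in that nested case the block's own return value is returned rather than a new MultiResult.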

Parameters:

  • dbclient_proc (Proc)

    Lambda that returns the Redis connection

  • block (Proc)

    Block containing Redis commands to execute

Returns:

  • (MultiResult)

    Result object with pipeline command results



# File 'lib/familia/connection/pipelined_core.rb', line 69

def self.execute_normal_pipeline(dbclient_proc, &block)
  # Check for existing pipeline context
  return yield(Fiber[:familia_pipeline]) if Fiber[:familia_pipeline]

  command_return_values = dbclient_proc.call.pipelined do |conn|
    Fiber[:familia_pipeline] = conn
    begin
      yield(conn)
    ensure
      Fiber[:familia_pipeline] = nil
    end
  end

  # Return same MultiResult format as other methods
  MultiResult.new(command_return_values)
end
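
The nesting behavior in the listing above can be tried without a Redis server. The following is a minimal sketch, not Familia's implementation: FakePipeline, FakeClient, and run_pipeline are hypothetical stand-ins invented for illustration, the MultiResult wrapper is left out, and Fiber[] storage is assumed to be available (Ruby 3.2 or later).

```ruby
# Hypothetical stand-ins: FakePipeline and FakeClient are not part of Familia.
class FakePipeline
  attr_reader :queued

  def initialize
    @queued = []
  end

  # Queue a command instead of sending it right away.
  def set(key, value)
    @queued << [:set, key, value]
    nil
  end
end

class FakeClient
  attr_reader :pipelines_opened

  def initialize
    @pipelines_opened = 0
  end

  # Mimics the shape of a pipelined call: yield a pipeline object,
  # then return one reply per queued command.
  def pipelined
    @pipelines_opened += 1
    pipeline = FakePipeline.new
    yield pipeline
    pipeline.queued.map { "OK" }
  end
end

# Same control flow as execute_normal_pipeline, minus the MultiResult wrapper.
def run_pipeline(client_proc)
  # Nested call: reuse the pipeline already stored for this fiber.
  return yield(Fiber[:familia_pipeline]) if Fiber[:familia_pipeline]

  client_proc.call.pipelined do |conn|
    Fiber[:familia_pipeline] = conn
    begin
      yield(conn)
    ensure
      # Always clear the fiber-local state, even if the block raises.
      Fiber[:familia_pipeline] = nil
    end
  end
end

client = FakeClient.new
replies = run_pipeline(-> { client }) do |outer|
  outer.set("a", "1")
  # The inner call receives the same pipeline object as the outer block.
  run_pipeline(-> { client }) { |inner| inner.set("b", "2") }
end
```

In this sketch both commands land in a single pipeline: the client opens one pipeline, the inner call queues onto it, and the fiber-local slot is nil again once the outer call returns.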

.execute_pipeline(dbclient_proc, &block) {|Redis| ... } ⇒ MultiResult

Executes a pipeline with configurable fallback behavior

Handles pipeline execution based on connection handler capabilities. When the handler doesn't support pipelines, fallback behavior is controlled by the Familia.pipelined_mode setting.

Examples:

Basic usage

result = PipelineCore.execute_pipeline(-> { dbclient }) do |conn|
  conn.set('key1', 'value1')
  conn.incr('counter')
end
result.successful?  # => true/false
result.results     # => ["OK", 1]

With fallback modes

Familia.configure { |c| c.pipelined_mode = :permissive }
result = PipelineCore.execute_pipeline(-> { cached_conn }) do |conn|
  conn.set('key', 'value')  # Executes individually, no error
end

Parameters:

  • dbclient_proc (Proc)

    Lambda that returns the Redis connection

  • block (Proc)

    Block containing Redis commands to execute

Yields:

  • (Redis)

    Redis connection or proxy for command execution

Returns:

  • (MultiResult)

    Result object with success status and command results



# File 'lib/familia/connection/pipelined_core.rb', line 42

def self.execute_pipeline(dbclient_proc, &block)
  # First, get the connection to populate the handler class
  connection = dbclient_proc.call
  handler_class = Fiber[:familia_connection_handler_class]

  # Check pipeline capability
  pipeline_capability = handler_class&.allows_pipelined

  if pipeline_capability == false
    OperationCore.handle_fallback(:pipeline, dbclient_proc, handler_class, &block)
  else
    # Normal pipeline flow (includes nil, true, and other values)
    execute_normal_pipeline(dbclient_proc, &block)
  end
end
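
The capability check in the listing above only diverts to the fallback when allows_pipelined is exactly false; a missing handler class (nil) or any other value takes the normal pipeline path. A small sketch of that decision follows. PipelineCapableHandler, NoPipelineHandler, and pipeline_route are hypothetical names used for illustration only; just the allows_pipelined reader mirrors the source.

```ruby
# Hypothetical handler classes; only allows_pipelined mirrors the source.
class PipelineCapableHandler
  def self.allows_pipelined
    true
  end
end

class NoPipelineHandler
  def self.allows_pipelined
    false
  end
end

# Reports which branch execute_pipeline would take for a handler class.
def pipeline_route(handler_class)
  # Safe navigation: a nil handler class yields a nil capability.
  capability = handler_class&.allows_pipelined
  # Only an explicit false selects the fallback path.
  capability == false ? :fallback : :normal_pipeline
end

capable_route = pipeline_route(PipelineCapableHandler)
fallback_route = pipeline_route(NoPipelineHandler)
unknown_route = pipeline_route(nil)
```

Comparing against false, rather than testing truthiness, means an unset handler class defaults to pipelining instead of falling back.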