Class: Valkey

Inherits:
Object
  • Object
show all
Includes:
Commands, PubSubCallback, Utils
Defined in:
lib/valkey.rb,
lib/valkey/utils.rb,
lib/valkey/errors.rb,
lib/valkey/version.rb,
lib/valkey/bindings.rb,
lib/valkey/commands.rb,
lib/valkey/pipeline.rb,
lib/valkey/request_type.rb,
lib/valkey/opentelemetry.rb,
lib/valkey/response_type.rb,
lib/valkey/pubsub_callback.rb,
lib/valkey/request_error_type.rb,
lib/valkey/commands/geo_commands.rb,
lib/valkey/commands/set_commands.rb,
lib/valkey/commands/hash_commands.rb,
lib/valkey/commands/json_commands.rb,
lib/valkey/commands/list_commands.rb,
lib/valkey/commands/bitmap_commands.rb,
lib/valkey/commands/module_commands.rb,
lib/valkey/commands/pubsub_commands.rb,
lib/valkey/commands/server_commands.rb,
lib/valkey/commands/stream_commands.rb,
lib/valkey/commands/string_commands.rb,
lib/valkey/commands/cluster_commands.rb,
lib/valkey/commands/generic_commands.rb,
lib/valkey/commands/function_commands.rb,
lib/valkey/commands/scripting_commands.rb,
lib/valkey/commands/connection_commands.rb,
lib/valkey/commands/sorted_set_commands.rb,
lib/valkey/commands/transaction_commands.rb,
lib/valkey/commands/hyper_log_log_commands.rb,
lib/valkey/commands/vector_search_commands.rb

Defined Under Namespace

Modules: Bindings, Commands, OpenTelemetry, PubSubCallback, RequestErrorType, RequestType, ResponseType, Utils
Classes: BaseConnectionError, BaseError, CannotConnectError, CommandError, ConnectionError, InheritedError, InvalidClientOptionError, NoScriptError, OutOfMemoryError, PermissionError, Pipeline, ProtocolError, ReadOnlyError, SubscriptionError, TimeoutError, WrongTypeError

Constant Summary collapse

VERSION =
"0.0.2"

Constants included from Utils

Utils::Boolify, Utils::BoolifySet, Utils::Floatify, Utils::FloatifyPair, Utils::FloatifyPairs, Utils::Hashify, Utils::HashifyClusterNodeInfo, Utils::HashifyClusterNodes, Utils::HashifyClusterSlaves, Utils::HashifyClusterSlots, Utils::HashifyInfo, Utils::HashifyStreamAutoclaim, Utils::HashifyStreamAutoclaimJustId, Utils::HashifyStreamEntries, Utils::HashifyStreamPendingDetails, Utils::HashifyStreamPendings, Utils::HashifyStreams, Utils::Noop, Utils::Pairify

Instance Method Summary collapse

Methods included from PubSubCallback

#pubsub_callback

Methods included from Commands::HashCommands

#hdel, #hexists, #hexpire, #hexpireat, #hexpiretime, #hget, #hgetall, #hgetex, #hincrby, #hincrbyfloat, #hkeys, #hlen, #hmget, #hmset, #hpersist, #hpexpire, #hpexpireat, #hpexpiretime, #hpttl, #hrandfield, #hscan, #hscan_each, #hset, #hsetex, #hsetnx, #hstrlen, #httl, #hvals, #mapped_hmget, #mapped_hmset

Methods included from Commands::StreamCommands

#xack, #xadd, #xautoclaim, #xclaim, #xdel, #xgroup, #xgroup_create, #xgroup_createconsumer, #xgroup_delconsumer, #xgroup_destroy, #xgroup_setid, #xinfo, #xinfo_consumers, #xinfo_groups, #xinfo_stream, #xlen, #xpending, #xrange, #xread, #xreadgroup, #xrevrange, #xtrim

Methods included from Commands::VectorSearchCommands

#ft, #ft_aggregate, #ft_alias_add, #ft_alias_del, #ft_alias_list, #ft_alias_update, #ft_create, #ft_drop_index, #ft_explain, #ft_explain_cli, #ft_info, #ft_list, #ft_profile, #ft_search

Methods included from Commands::TransactionCommands

#discard, #exec, #multi, #unwatch, #watch

Methods included from Commands::ClusterCommands

#asking, #cluster_addslots, #cluster_addslotsrange, #cluster_bumpepoch, #cluster_count_failure_reports, #cluster_countkeysinslot, #cluster_delslots, #cluster_delslotsrange, #cluster_failover, #cluster_flushslots, #cluster_forget, #cluster_getkeysinslot, #cluster_info, #cluster_keyslot, #cluster_links, #cluster_meet, #cluster_myid, #cluster_myshardid, #cluster_nodes, #cluster_replicas, #cluster_replicate, #cluster_reset, #cluster_saveconfig, #cluster_set_config_epoch, #cluster_setslot, #cluster_shards, #cluster_slaves, #cluster_slots, #readonly, #readwrite

Methods included from Commands::JsonCommands

#json_arrappend, #json_arrindex, #json_arrinsert, #json_arrlen, #json_arrpop, #json_arrtrim, #json_clear, #json_debug, #json_del, #json_forget, #json_get, #json_mget, #json_numincrby, #json_nummultby, #json_objkeys, #json_objlen, #json_resp, #json_set, #json_strappend, #json_strlen, #json_toggle, #json_type

Methods included from Commands::PubSubCommands

#psubscribe, #publish, #pubsub, #pubsub_channels, #pubsub_numpat, #pubsub_numsub, #pubsub_shardchannels, #pubsub_shardnumsub, #punsubscribe, #spublish, #ssubscribe, #subscribe, #sunsubscribe, #unsubscribe

Methods included from Commands::ModuleCommands

#module, #module_list, #module_load, #module_loadex, #module_unload

Methods included from Commands::FunctionCommands

#fcall, #fcall_ro, #function, #function_delete, #function_dump, #function_flush, #function_kill, #function_list, #function_load, #function_restore, #function_stats

Methods included from Commands::ScriptingCommands

#eval, #eval_ro, #evalsha, #evalsha_ro, #invoke_script, #script, #script_debug, #script_exists, #script_flush, #script_kill, #script_load

Methods included from Commands::SetCommands

#sadd, #sadd?, #scard, #sdiff, #sdiffstore, #sinter, #sinterstore, #sismember, #smembers, #smismember, #smove, #spop, #srandmember, #srem, #srem?, #sscan, #sscan_each, #sunion, #sunionstore

Methods included from Commands::SortedSetCommands

#bzmpop, #bzpopmax, #bzpopmin, #zadd, #zcard, #zcount, #zdiff, #zdiffstore, #zincrby, #zinter, #zintercard, #zinterstore, #zlexcount, #zmpop, #zmscore, #zpopmax, #zpopmin, #zrandmember, #zrange, #zrangebylex, #zrangebyscore, #zrangestore, #zrank, #zrem, #zremrangebylex, #zremrangebyrank, #zremrangebyscore, #zrevrange, #zrevrangebylex, #zrevrangebyscore, #zrevrank, #zscan, #zscore, #zunion, #zunionstore

Methods included from Commands::HyperLogLogCommands

#pfadd, #pfcount, #pfmerge

Methods included from Commands::GeoCommands

#geoadd, #geodist, #geohash, #geopos, #geosearch, #geosearchstore

Methods included from Commands::ListCommands

#blmove, #blmpop, #blpop, #brpop, #brpoplpush, #lindex, #linsert, #llen, #lmove, #lmpop, #lpop, #lpush, #lpushx, #lrange, #lrem, #lset, #ltrim, #rpop, #rpoplpush, #rpush, #rpushx

Methods included from Commands::BitmapCommands

#bitcount, #bitfield, #bitfield_ro, #bitop, #bitpos, #getbit, #setbit

Methods included from Commands::GenericCommands

#_scan, #copy, #del, #dump, #exists, #exists?, #expire, #expireat, #expiretime, #keys, #migrate, #move, #object, #persist, #pexpire, #pexpireat, #pexpiretime, #pttl, #randomkey, #rename, #renamenx, #restore, #scan, #sort, #touch, #ttl, #type, #unlink, #wait, #waitaof

Methods included from Commands::ServerCommands

#acl, #acl_cat, #acl_deluser, #acl_dryrun, #acl_genpass, #acl_getuser, #acl_list, #acl_load, #acl_log, #acl_save, #acl_setuser, #acl_users, #acl_whoami, #bgrewriteaof, #bgsave, #command, #command_, #command_count, #command_docs, #command_get_keys, #command_get_keys_and_flags, #command_info, #command_list, #config, #config_get, #config_resetstat, #config_rewrite, #config_set, #dbsize, #debug, #failover, #flushall, #flushdb, #info, #lastsave, #latency_doctor, #latency_graph, #latency_histogram, #latency_history, #latency_latest, #latency_reset, #lolwut, #memory_doctor, #memory_malloc_stats, #memory_purge, #memory_stats, #memory_usage, #monitor, #psync, #replconf, #replicaof, #restore_asking, #role, #save, #shutdown, #slaveof, #slowlog, #swapdb, #sync, #time

Methods included from Commands::ConnectionCommands

#auth, #client, #client_caching, #client_get_name, #client_getredir, #client_id, #client_info, #client_kill, #client_kill_simple, #client_list, #client_no_evict, #client_no_touch, #client_pause, #client_reply, #client_set_info, #client_set_name, #client_tracking, #client_tracking_info, #client_unblock, #client_unpause, #echo, #hello, #ping, #quit, #reset, #select

Methods included from Commands::StringCommands

#append, #decr, #decrby, #get, #getdel, #getex, #getrange, #getset, #incr, #incrby, #incrbyfloat, #lcs, #mapped_mget, #mapped_mset, #mapped_msetnx, #mget, #mset, #msetnx, #psetex, #set, #setex, #setnx, #setrange, #strlen

Methods included from Utils

parse_redis_url

Constructor Details

#initialize(options = {}) ⇒ Valkey

Returns a new instance of Valkey.

Raises:

  • (ArgumentError)


323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
# File 'lib/valkey.rb', line 323

# Builds a client and opens a synchronous connection through the native bindings.
#
# The connection target is rendered as a redis:// or rediss:// URI; every other
# setting is serialised to JSON and handed to the bindings alongside the URI.
#
# @param options [Hash] client configuration (symbol keys)
# @option options [String] :url connection URL parsed by Utils.parse_redis_url;
#   explicitly passed options override values derived from the URL
# @option options [String] :host ("127.0.0.1")
# @option options [Integer] :port (6379)
# @option options [Integer] :db (0) database index; appended to the URI when positive
# @option options [Array<Hash>] :nodes hashes with :host and :port; only the first
#   entry is used when building the URI
# @option options [Boolean, String] :ssl true or "true" selects the rediss scheme
# @option options [String] :username
# @option options [String] :password
# @option options [Boolean] :cluster_mode
# @option options [Symbol, String, Integer] :protocol :resp3, "resp3" or 3 selects
#   RESP3; any other value selects RESP2
# @option options [Numeric] :timeout (5.0) request timeout; multiplied by 1000
#   before being passed on (presumably seconds to milliseconds)
# @option options [Numeric] :connect_timeout connection timeout, scaled the same way
# @option options [String] :client_name
# @option options [Hash] :ssl_params supports :ca_file, :cert, :key, :ca_path, :root_certs
# @option options [Integer] :reconnect_attempts
# @option options [Numeric] :reconnect_delay base retry delay, scaled by 1000
# @option options [Numeric] :reconnect_delay_max
# @raise [ArgumentError] when an option fails validation
# @raise [CannotConnectError] when the bindings return a null connection pointer
def initialize(options = {})
  # Parse URL if provided; explicit options take precedence over URL-derived ones.
  if options[:url]
    url_options = Utils.parse_redis_url(options[:url])
    options = url_options.merge(options.reject { |k, _v| k == :url })
  end

  # Extract connection parameters
  host = options[:host] || "127.0.0.1"
  port = options[:port] || 6379
  database_id = options[:db] || 0

  raise ArgumentError, "Database ID must be non-negative, got: #{database_id}" if database_id.negative?

  nodes = options[:nodes] || [{ host: host, port: port }]
  raise ArgumentError, "Nodes array cannot be empty" if nodes.empty?

  # The first node is used for standalone mode and for cluster discovery alike.
  first_node = nodes.first
  raise ArgumentError, "First node cannot be nil" if first_node.nil?

  uri_host = first_node[:host]
  uri_port = first_node[:port]

  raise ArgumentError, "Host cannot be nil" if uri_host.nil?
  raise ArgumentError, "Port cannot be nil" if uri_port.nil?
  raise ArgumentError, "Port must be a number" unless uri_port.is_a?(Integer)

  # Determine scheme based on TLS/SSL
  scheme = [true, "true"].include?(options[:ssl]) ? "rediss" : "redis"

  # CGI.escape emits form encoding, in which a space becomes "+". Inside URI
  # userinfo a "+" is a literal plus sign, so spaces have to be sent as "%20".
  # CGI.escape already rewrites a literal "+" to "%2B", which means every "+"
  # left in its output stands for a space and can be replaced safely.
  escape_userinfo = ->(value) { CGI.escape(value).gsub("+", "%20") }

  uri_parts = [scheme, "://"]

  # Add authentication to the URI (username is only used together with a password)
  if options[:username] && options[:password]
    uri_parts << escape_userinfo.call(options[:username]) << ":"
    uri_parts << escape_userinfo.call(options[:password]) << "@"
  elsif options[:password]
    uri_parts << ":" << escape_userinfo.call(options[:password]) << "@"
  end

  uri_parts << uri_host
  uri_parts << ":"
  uri_parts << uri_port.to_s

  # Add database ID to URI if specified
  uri_parts << "/" << database_id.to_s if database_id.positive?

  uri_str = uri_parts.join

  # Build JSON options for additional configuration
  json_options = {}

  json_options["cluster_mode_enabled"] = true if options[:cluster_mode]

  json_options["protocol"] = case options[:protocol]
                             when :resp3, "resp3", 3
                               "RESP3"
                             else
                               "RESP2"
                             end

  # Timeouts
  request_timeout = options[:timeout] || 5.0

  raise ArgumentError, "Timeout must be a number, got: #{request_timeout.class}" unless request_timeout.is_a?(Numeric)
  raise ArgumentError, "Timeout must be positive, got: #{request_timeout}" if request_timeout <= 0

  json_options["request_timeout"] = (request_timeout * 1000).to_i

  if options[:connect_timeout]
    connect_timeout = options[:connect_timeout]
    unless connect_timeout.is_a?(Numeric)
      raise ArgumentError, "Connect timeout must be a number, got: #{connect_timeout.class}"
    end
    raise ArgumentError, "Connect timeout must be positive, got: #{connect_timeout}" if connect_timeout <= 0

    json_options["connection_timeout"] = (connect_timeout * 1000).to_i
  end

  json_options["client_name"] = options[:client_name] if options[:client_name]

  # Loads a client certificate or key. A String is treated as a file path;
  # any other object is serialised with to_pem, to_der or to_s, in that order.
  read_credential = lambda do |source, label|
    if source.is_a?(String)
      raise ArgumentError, "#{label} file does not exist: #{source}" unless File.exist?(source)
      raise ArgumentError, "#{label} file is not readable: #{source}" unless File.readable?(source)

      File.binread(source)
    elsif source.respond_to?(:to_pem)
      source.to_pem
    elsif source.respond_to?(:to_der)
      source.to_der
    else
      source.to_s
    end
  end

  # TLS/SSL certificates
  # NOTE(review): the client cert and key are appended to the same "root_certs"
  # list as the CA material - confirm this is what the bindings expect.
  root_certs = []
  ssl_params = options[:ssl_params]
  if ssl_params.is_a?(Hash)
    # ca_file - CA certificate file, read as raw bytes
    if ssl_params[:ca_file]
      ca_file = ssl_params[:ca_file]
      raise ArgumentError, "CA file does not exist: #{ca_file}" unless File.exist?(ca_file)
      raise ArgumentError, "CA file is not readable: #{ca_file}" unless File.readable?(ca_file)

      root_certs << File.binread(ca_file)
    end

    root_certs << read_credential.call(ssl_params[:cert], "Cert") if ssl_params[:cert]
    root_certs << read_credential.call(ssl_params[:key], "Key") if ssl_params[:key]

    # Additional certificates from ca_path: all *.crt files first, then all *.pem files
    if ssl_params[:ca_path]
      ca_path = ssl_params[:ca_path]
      raise ArgumentError, "CA path does not exist: #{ca_path}" unless Dir.exist?(ca_path)

      %w[*.crt *.pem].each do |pattern|
        Dir.glob(File.join(ca_path, pattern)).each do |cert_file|
          root_certs << File.binread(cert_file) if File.readable?(cert_file)
        end
      end
    end

    # Direct root_certs array support
    root_certs.concat(ssl_params[:root_certs]) if ssl_params[:root_certs].is_a?(Array)
  end

  json_options["root_certs"] = root_certs unless root_certs.empty?

  # Connection retry strategy (only configured when a reconnect option is given)
  if options[:reconnect_attempts] || options[:reconnect_delay] || options[:reconnect_delay_max]
    number_of_retries = options[:reconnect_attempts] || 1
    base_delay = options[:reconnect_delay] || 0.5
    max_delay = options[:reconnect_delay_max]

    unless number_of_retries.is_a?(Integer)
      raise ArgumentError, "Reconnect attempts must be an integer, got: #{number_of_retries.class}"
    end

    if number_of_retries.negative?
      raise ArgumentError,
            "Reconnect attempts must be non-negative, got: #{number_of_retries}"
    end

    raise ArgumentError, "Reconnect delay must be a number, got: #{base_delay.class}" unless base_delay.is_a?(Numeric)
    raise ArgumentError, "Reconnect delay must be positive, got: #{base_delay}" unless base_delay.positive?

    if max_delay
      unless max_delay.is_a?(Numeric)
        raise ArgumentError, "Reconnect delay max must be a number, got: #{max_delay.class}"
      end
      raise ArgumentError, "Reconnect delay max must be positive, got: #{max_delay}" unless max_delay.positive?
    end

    exponent_base = 2

    # Pick the base so that base_delay * base**retries lands near max_delay,
    # never going below 2.
    if max_delay && base_delay.positive? && number_of_retries.positive?
      calculated_base = (max_delay / base_delay)**(1.0 / number_of_retries.to_f)
      exponent_base = [calculated_base.round, 2].max
    end

    factor_ms = (base_delay * 1000).to_i

    json_options["connection_retry_strategy"] = {
      "number_of_retries" => number_of_retries,
      "factor" => factor_ms,
      "exponent_base" => exponent_base,
      "jitter_percent" => 0
    }
  end

  # Convert JSON options to string (pass nil if empty)
  json_str = json_options.empty? ? nil : JSON.generate(json_options)

  # Create client using URI-based FFI function
  client_type = Bindings::ClientType.new
  client_type[:tag] = 1 # SyncClient

  response_ptr = Bindings.create_client_from_uri(
    uri_str,
    json_str,
    client_type,
    method(:pubsub_callback)
  )

  res = Bindings::ConnectionResponse.new(response_ptr)

  if res[:conn_ptr].null?
    # Read the message before the response struct is released.
    error_message = res[:connection_error_message]
    Bindings.free_connection_response(response_ptr)
    raise CannotConnectError, error_message
  end

  @connection = res[:conn_ptr]
  Bindings.free_connection_response(response_ptr)

  # Track transactional state for `MULTI` / `EXEC` / `DISCARD` helpers.
  # This avoids Ruby warnings about uninitialised instance variables and
  # gives us a single source of truth for whether we're inside a TX.
  @in_multi = false
  # Track queued commands during MULTI for transaction isolation support
  @queued_commands = []
  # Track if we're inside a multi block (multi { ... }) vs direct multi calls
  @in_multi_block = false
end

Instance Method Details

#build_command_args(command_args) ⇒ Object



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/valkey.rb', line 131

# Marshals command arguments into the native buffers passed to the FFI layer.
#
# Each argument is converted with +to_s+ and copied into its own buffer. The
# buffer's pointer and the argument's byte length are written to two parallel
# native arrays.
#
# @param command_args [Array<#to_s>] arguments to marshal
# @return [Array] a triple: the pointer array, the length array, and the list
#   of per-argument buffers. The caller must keep the third element referenced
#   until the native call returns, because the pointer array holds only raw
#   addresses. For an empty argument list both arrays are FFI::Pointer::NULL
#   and the buffer list is empty.
def build_command_args(command_args)
  # For empty arrays, pass NULL pointers as per the Rust FFI contract.
  return [FFI::Pointer::NULL, FFI::Pointer::NULL, []] if command_args.empty?

  arg_ptrs = FFI::MemoryPointer.new(:pointer, command_args.size)
  arg_lens = FFI::MemoryPointer.new(:ulong, command_args.size)

  buffers = command_args.each_with_index.map do |arg, i|
    text = arg.to_s
    buf = FFI::MemoryPointer.from_string(text)

    # Address each slot through the array itself, so the offset follows the
    # platform's element size and is never assumed to be 8 bytes.
    arg_ptrs[i].put_pointer(0, buf)
    arg_lens[i].put_ulong(0, text.bytesize)

    buf # collected so the buffer is not garbage-collected
  end

  [arg_ptrs, arg_lens, buffers]
end

#close ⇒ Object Also known as: disconnect!



565
566
567
568
569
570
# File 'lib/valkey.rb', line 565

# Closes the native client, if one is open, and forgets the handle.
#
# Does nothing when there is no connection or its pointer is null, so calling
# it repeatedly is harmless.
#
# @return [nil]
def close
  handle = @connection
  return if handle.nil? || handle.null?

  Bindings.close_client(handle)
  @connection = nil
end

#convert_response(res, &block) ⇒ Object



152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
# File 'lib/valkey.rb', line 152

# Turns a native command result into plain Ruby values.
#
# @param res the raw result handed back by the bindings; wrapped in
#   Bindings::CommandResult
# @yield [response] optional post-processing of the converted value
# @return [Object] the converted value, or the block's result when a block is given
# @raise [CommandError] for EXECABORT and UNSPECIFIED request errors
# @raise [TimeoutError] for TIMEOUT request errors
# @raise [ConnectionError] for DISCONNECT request errors
# @raise [RuntimeError] for an unknown request error type or an unsupported
#   response type
def convert_response(res, &block)
  command_result = Bindings::CommandResult.new(res)

  # A null response means the command failed; map the error type to an exception.
  if command_result[:response].null?
    failure = command_result[:command_error]

    error_class = case failure[:command_error_type]
                  when RequestErrorType::EXECABORT, RequestErrorType::UNSPECIFIED then CommandError
                  when RequestErrorType::TIMEOUT then TimeoutError
                  when RequestErrorType::DISCONNECT then ConnectionError
                  end
    raise "Unknown error type: #{failure[:command_error_type]}" if error_class.nil?

    raise error_class, failure[:command_error_message]
  end

  # Wraps the index-th CommandResponse of a contiguous native array.
  element_at = lambda do |base, index|
    Bindings::CommandResponse.new(base + (index * Bindings::CommandResponse.size))
  end

  # Recursive converter. `return` inside a lambda only leaves the lambda.
  # TODO: handle all types of responses
  decode = lambda do |item|
    case item[:response_type]
    when ResponseType::STRING
      item[:string_value].read_string(item[:string_value_len])
    when ResponseType::INT
      item[:int_value]
    when ResponseType::FLOAT
      item[:float_value]
    when ResponseType::BOOL
      item[:bool_value]
    when ResponseType::ARRAY
      base = item[:array_value]
      length = item[:array_value_len].to_i
      return [] if length.zero? || base.null?

      (0...length).map { |index| decode.call(element_at.call(base, index)) }
    when ResponseType::MAP
      base = item[:array_value]
      return nil if base.null?

      entries = Array.new(item[:array_value_len].to_i) do |index|
        entry = element_at.call(base, index)
        [
          decode.call(Bindings::CommandResponse.new(entry[:map_key])),
          decode.call(Bindings::CommandResponse.new(entry[:map_value]))
        ]
      end

      # Maps are returned as a flat [key, value, key, value, ...] array, not a
      # Hash; a repeated key keeps its first position and its last value.
      entries.to_h.to_a.flatten(1)
    when ResponseType::SETS
      base = item[:sets_value]

      Array.new(item[:sets_value_len].to_i) { |index| decode.call(element_at.call(base, index)) }
    when ResponseType::NULL
      nil
    when ResponseType::OK
      "OK"
    when ResponseType::ERROR
      # Errors nested in a reply (e.g. inside EXEC results) are returned as
      # CommandError objects, not raised.
      text = item[:string_value]
      message = text.null? ? "Unknown error" : text.read_string(item[:string_value_len])
      CommandError.new(message)
    else
      raise "Unsupported response type: #{item[:response_type]}"
    end
  end

  decoded = decode.call(command_result[:response])
  block ? block.call(decoded) : decoded
end

#pipelined(exception: true) {|pipeline| ... } ⇒ Object

Yields:

  • (pipeline)


24
25
26
27
28
29
30
31
32
# File 'lib/valkey.rb', line 24

# Collects commands issued inside the block and sends them as one batch.
#
# @param exception [Boolean] forwarded unchanged to #send_batch_commands
# @yield [pipeline] a fresh Pipeline on which the commands are queued
# @return [Array] the batch results, or an empty array when nothing was queued
def pipelined(exception: true)
  batch = Pipeline.new
  yield batch

  if batch.commands.empty?
    []
  else
    send_batch_commands(batch.commands, exception: exception)
  end
end

#send_batch_commands(commands, exception: true) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/valkey.rb', line 34

# Executes a list of queued commands and returns their results in order.
#
# @param commands [Array<Array>] triples of request type, argument list and an
#   optional callable applied to that command's result
# @param exception [Boolean] passed through to Bindings.batch; not consulted
#   on the sequential fallback path
# @return [Array] one entry per command, after the per-command callables ran
def send_batch_commands(commands, exception: true)
  # WORKAROUND: The underlying Glide FFI backend has stability issues when
  # batching transactional commands like MULTI / EXEC / DISCARD. To avoid
  # native crashes, any batch containing one of them is issued command by
  # command and never goes through `Bindings.batch`.
  transactional = [RequestType::MULTI, RequestType::EXEC, RequestType::DISCARD]

  if commands.any? { |(type, _args, _transform)| transactional.include?(type) }
    sequential = []
    commands.each do |type, args, transform|
      reply = send_command(type, args)
      sequential << (transform ? transform.call(reply) : reply)
    end
    return sequential
  end

  # One entry per command. Holding the native argument buffers in this local
  # keeps them referenced until the method returns.
  prepared = commands.map do |type, args, transform|
    ptrs, lens, bufs = build_command_args(args)

    info = Bindings::CmdInfo.new
    info[:request_type] = type
    info[:args] = ptrs
    info[:arg_count] = args.size
    info[:args_len] = lens

    { info: info, transform: transform, keepalive: [ptrs, lens, bufs] }
  end

  # Native array of pointers to the CmdInfo structs
  info_ptrs = FFI::MemoryPointer.new(:pointer, prepared.size)
  prepared.each_with_index do |entry, slot|
    info_ptrs[slot].put_pointer(0, entry[:info].to_ptr)
  end

  batch = Bindings::BatchInfo.new
  batch[:cmd_count] = prepared.size
  batch[:cmds] = info_ptrs
  batch[:is_atomic] = false

  opts = Bindings::BatchOptionsInfo.new
  opts[:retry_server_error] = true
  opts[:retry_connection_error] = true
  opts[:has_timeout] = false
  opts[:timeout] = 0 # No timeout
  opts[:route_info] = FFI::Pointer::NULL

  # Create an OpenTelemetry span for the batch if sampling is enabled.
  # TODO: add parent span propagation via create_batch_otel_span_with_parent
  # to support distributed tracing context (see Go client base_client.go for reference)
  span = 0
  if OpenTelemetry.should_sample?
    span = begin
      Bindings.create_batch_otel_span
    rescue StandardError => e
      # Tracing failures are reported but never abort the batch.
      warn "Failed to create OpenTelemetry batch span: #{e.message}"
      0
    end
  end

  begin
    raw = Bindings.batch(@connection, 0, batch, exception, opts.to_ptr, span)
    replies = convert_response(raw)
  ensure
    # Always drop the span if one was created
    unless span == 0
      begin
        Bindings.drop_otel_span(span)
      rescue StandardError => e
        warn "Failed to drop OpenTelemetry batch span: #{e.message}"
      end
    end
  end

  prepared.each_with_index do |entry, slot|
    transform = entry[:transform]
    replies[slot] = transform.call(replies[slot]) if transform
  end

  replies
end

#send_command(command_type, command_args = [], &block) ⇒ Object



245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
# File 'lib/valkey.rb', line 245

# Sends one command through the native bindings and converts its reply.
#
# @param command_type the request type constant identifying the command
# @param command_args [Array] command arguments (stringified when marshalled)
# @yield [response] optional post-processing, forwarded to #convert_response
# @return [Object] the converted reply
# @raise [RuntimeError] when the connection is nil, null, or has address 0
def send_command(command_type, command_args = [], &block)
  # Validate connection
  raise "Connection is nil" if @connection.nil?
  raise "Connection pointer is null" if @connection.null?
  raise "Connection address is 0" if @connection.address.zero?

  channel = 0
  route = ""
  route_buf = FFI::MemoryPointer.from_string(route)

  ptrs, lens, _keepalive =
    if command_args.empty?
      # With no arguments, hand over one-slot arrays (a 1-byte placeholder
      # buffer and a zero length); NULL pointers are not used on this path.
      # NOTE(review): the placeholder buffer is referenced only by address
      # once stored - confirm the native side never reads it when the
      # argument count is 0.
      slots = FFI::MemoryPointer.new(:pointer, 1)
      sizes = FFI::MemoryPointer.new(:ulong, 1)
      slots.put_pointer(0, FFI::MemoryPointer.new(1))
      sizes.put_ulong(0, 0)
      [slots, sizes, []]
    else
      build_command_args(command_args)
    end

  # Create an OpenTelemetry span if sampling is enabled.
  # TODO: add parent span propagation via create_otel_span_with_parent
  # to support distributed tracing context (see Go client base_client.go for reference)
  span = 0
  if OpenTelemetry.should_sample?
    span = begin
      Bindings.create_otel_span(command_type)
    rescue StandardError => e
      # Tracing is non-critical: report the failure and carry on without a span.
      warn "Failed to create OpenTelemetry span: #{e.message}"
      0
    end
  end

  begin
    raw = Bindings.command(
      @connection, channel, command_type, command_args.size,
      ptrs, lens, route_buf, route.bytesize, span
    )

    result = convert_response(raw, &block)
  ensure
    # Always drop the span if one was created, even if the command fails.
    unless span == 0
      begin
        Bindings.drop_otel_span(span)
      rescue StandardError => e
        # Span cleanup errors must not break command execution.
        warn "Failed to drop OpenTelemetry span: #{e.message}"
      end
    end
  end

  # While inside MULTI, remember every command the server acknowledged with
  # "QUEUED", except the transaction-control commands themselves.
  if @in_multi && !@queued_commands.nil? && result == "QUEUED"
    control = [
      RequestType::MULTI, RequestType::EXEC, RequestType::DISCARD,
      RequestType::WATCH, RequestType::UNWATCH
    ]
    @queued_commands << [command_type, command_args.dup] unless control.include?(command_type)
  end

  result
end

#statistics ⇒ Hash Also known as: get_statistics

Note:

Statistics are tracked globally and shared across all clients

Retrieves client statistics including connection and compression metrics.

This method returns detailed statistics about the client’s operations, tracked globally across all clients in the process.

Examples:

Get client statistics

client = Valkey.new(host: 'localhost', port: 6379)
stats = client.get_statistics
puts "Total connections: #{stats[:total_connections]}"
puts "Total clients: #{stats[:total_clients]}"
puts "Values compressed: #{stats[:total_values_compressed]}"

Returns:

  • (Hash)

    a hash containing statistics with the following keys:

    • `:total_connections` [Integer] total number of connections opened to Valkey

    • `:total_clients` [Integer] total number of GLIDE clients

    • `:total_values_compressed` [Integer] total number of values compressed

    • `:total_values_decompressed` [Integer] total number of values decompressed

    • `:total_original_bytes` [Integer] total original bytes before compression

    • `:total_bytes_compressed` [Integer] total bytes after compression

    • `:total_bytes_decompressed` [Integer] total bytes after decompression

    • `:compression_skipped_count` [Integer] number of times compression was skipped

  • (Hash)

    statistics hash with integer values



599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
# File 'lib/valkey.rb', line 599

# Reads the statistics struct from the bindings and copies it into a Hash.
#
# @return [Hash{Symbol => Object}] the eight counters listed below, keyed by
#   field name and in this order
def statistics
  # Call FFI function to get statistics (returns by value)
  native = Bindings.get_statistics

  fields = %i[
    total_connections
    total_clients
    total_values_compressed
    total_values_decompressed
    total_original_bytes
    total_bytes_compressed
    total_bytes_decompressed
    compression_skipped_count
  ]

  fields.each_with_object({}) { |field, snapshot| snapshot[field] = native[field] }
end