Module: Valkey::Commands::StreamCommands

Included in:
Valkey::Commands
Defined in:
lib/valkey/commands/stream_commands.rb

Overview

This module contains commands related to Redis Streams.

Instance Method Summary collapse

Instance Method Details

#xack(key, group, *ids) ⇒ Integer

Acknowledge one or more messages in a consumer group.

Examples:

Acknowledge a single message

valkey.xack("mystream", "mygroup", "1234567890-0")
  # => 1

Acknowledge multiple messages

valkey.xack("mystream", "mygroup", ["1234567890-0", "1234567890-1"])
  # => 2

Parameters:

  • key (String)

    stream key

  • group (String)

    consumer group name

  • ids (String, Array<String>)

    entry ID(s) to acknowledge

Returns:

  • (Integer)

    number of messages acknowledged

See Also:



403
404
405
406
# File 'lib/valkey/commands/stream_commands.rb', line 403

def xack(key, group, *ids)
  args = [key, group] + Array(ids).flatten
  send_command(RequestType::X_ACK, args)
end

#xadd(key, entry, approximate: nil, maxlen: nil, minid: nil, nomkstream: nil, id: "*") ⇒ String

Append a new entry to a stream.

Examples:

Add entry with auto-generated ID

valkey.xadd("mystream", { "field1" => "value1", "field2" => "value2" })
  # => "1234567890-0"

Add entry with specific ID

valkey.xadd("mystream", { "field1" => "value1" }, id: "1234567890-1")
  # => "1234567890-1"

Add entry with maxlen trimming

valkey.xadd("mystream", { "field1" => "value1" }, maxlen: 1000, approximate: true)

Parameters:

  • key (String)

    stream key

  • entry (Hash, Array)

    field-value pairs

  • options (Hash)

    optional parameters

    • ‘:id => String`: entry ID (default: “*” for auto-generated)

    • ‘:maxlen => Integer`: maximum length of the stream

    • ‘:minid => String`: minimum ID to keep

    • ‘:approximate => true`: use approximate trimming

    • ‘:nomkstream => true`: do not create stream if it doesn’t exist

Returns:

  • (String)

    entry ID

See Also:



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/valkey/commands/stream_commands.rb', line 32

def xadd(key, entry, approximate: nil, maxlen: nil, minid: nil, nomkstream: nil, id: "*")
  args = [key]

  # Handle maxlen/minid trimming
  if maxlen
    raise ArgumentError, "can't supply both maxlen and minid" if minid

    args << "MAXLEN"
    args << "~" if approximate
    args << maxlen.to_s
  elsif minid
    args << "MINID"
    args << "~" if approximate
    args << minid
  end

  args << "NOMKSTREAM" if nomkstream
  args << id

  # Add field-value pairs
  if entry.is_a?(Hash)
    entry.each { |k, v| args << k.to_s << v.to_s }
  else
    args.concat(Array(entry).flatten)
  end

  send_command(RequestType::X_ADD, args)
end

#xautoclaim(key, group, consumer, min_idle_time, start, **options) ⇒ Hash

Automatically claim pending messages that have been idle for a specified time.

Examples:

Auto-claim pending messages

valkey.xautoclaim("mystream", "mygroup", "consumer2", 3600000, "0-0")
  # => { 'next' => "1234567890-5", 'entries' => [["1234567890-0", ["field1", "value1"]]] }

Parameters:

  • key (String)

    stream key

  • group (String)

    consumer group name

  • consumer (String)

    consumer name

  • min_idle_time (Integer)

    minimum idle time in milliseconds

  • start (String)

    start ID for scanning

  • options (Hash)

    optional parameters

    • ‘:count => Integer`: maximum number of entries to claim

    • ‘:idle => Integer`: set idle time in milliseconds

    • ‘:time => Integer`: set time in milliseconds (Unix timestamp)

    • ‘:retrycount => Integer`: set retry count

    • ‘:justid => true`: return only IDs

Returns:

  • (Hash)

    hash with ‘next’ key for next cursor ID and ‘entries’ key for array of claimed entries

See Also:



501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
# File 'lib/valkey/commands/stream_commands.rb', line 501

def xautoclaim(key, group, consumer, min_idle_time, start, **options)
  args = [key, group, consumer, min_idle_time.to_s, start]

  args << "COUNT" << options[:count].to_s if options[:count]
  args << "IDLE" << options[:idle].to_s if options[:idle]
  args << "TIME" << options[:time].to_s if options[:time]
  args << "RETRYCOUNT" << options[:retrycount].to_s if options[:retrycount]
  args << "JUSTID" if options[:justid]

  send_command(RequestType::X_AUTO_CLAIM, args) do |reply|
    return { 'next' => '0-0', 'entries' => [] } if reply.nil? || !reply.is_a?(Array)

    if options[:justid]
      Utils::HashifyStreamAutoclaimJustId.call(reply)
    else
      Utils::HashifyStreamAutoclaim.call(reply)
    end
  end
end

#xclaim(key, group, consumer, min_idle_time, ids, **options) ⇒ Array

Claim ownership of pending messages in a consumer group.

Examples:

Claim pending messages

valkey.xclaim("mystream", "mygroup", "consumer2", 3600000, ["1234567890-0"])

Parameters:

  • key (String)

    stream key

  • group (String)

    consumer group name

  • consumer (String)

    consumer name

  • min_idle_time (Integer)

    minimum idle time in milliseconds

  • ids (String, Array<String>)

    entry ID(s) to claim

  • options (Hash)

    optional parameters

    • ‘:idle => Integer`: set idle time in milliseconds

    • ‘:time => Integer`: set time in milliseconds (Unix timestamp)

    • ‘:retrycount => Integer`: set retry count

    • ‘:force => true`: claim even if already assigned

    • ‘:justid => true`: return only IDs

Returns:

  • (Array)

    array of claimed entries or IDs

See Also:



462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
# File 'lib/valkey/commands/stream_commands.rb', line 462

def xclaim(key, group, consumer, min_idle_time, ids, **options)
  args = [key, group, consumer, min_idle_time.to_s]
  args.concat(Array(ids).flatten)

  args << "IDLE" << options[:idle].to_s if options[:idle]
  args << "TIME" << options[:time].to_s if options[:time]
  args << "RETRYCOUNT" << options[:retrycount].to_s if options[:retrycount]
  args << "FORCE" if options[:force]
  args << "JUSTID" if options[:justid]

  send_command(RequestType::X_CLAIM, args) do |reply|
    if options[:justid]
      reply
    else
      Utils::HashifyStreamEntries.call(reply)
    end
  end
end

#xdel(key, *ids) ⇒ Integer

Remove one or more entries from a stream.

Examples:

Delete a single entry

valkey.xdel("mystream", "1234567890-0")
  # => 1

Delete multiple entries

valkey.xdel("mystream", ["1234567890-0", "1234567890-1"])
  # => 2

Parameters:

  • key (String)

    stream key

  • ids (String, Array<String>)

    entry ID(s) to delete

Returns:

  • (Integer)

    number of entries deleted

See Also:



75
76
77
78
# File 'lib/valkey/commands/stream_commands.rb', line 75

def xdel(key, *ids)
  args = [key] + Array(ids).flatten
  send_command(RequestType::X_DEL, args)
end

#xgroup(subcommand, key, group, *args, **options) ⇒ String, Integer

Manage consumer groups (dispatcher method).

Examples:

Create group

valkey.xgroup(:create, "mystream", "mygroup", "0")

Create group with mkstream

valkey.xgroup(:create, "mystream", "mygroup", "0", mkstream: true)

Set group ID

valkey.xgroup(:setid, "mystream", "mygroup", "1234567890-0")

Destroy group

valkey.xgroup(:destroy, "mystream", "mygroup")

Create consumer

valkey.xgroup(:createconsumer, "mystream", "mygroup", "consumer1")

Delete consumer

valkey.xgroup(:delconsumer, "mystream", "mygroup", "consumer1")

Parameters:

  • subcommand (Symbol, String)

    subcommand (:create, :setid, :destroy, :createconsumer, :delconsumer)

  • key (String)

    stream key

  • group (String)

    consumer group name

  • args (Array)

    additional arguments depending on subcommand

Returns:

  • (String, Integer)

    depends on subcommand

See Also:



261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
# File 'lib/valkey/commands/stream_commands.rb', line 261

def xgroup(subcommand, key, group, *args, **options)
  subcommand = subcommand.to_s.downcase
  case subcommand
  when "create"
    xgroup_create(key, group, args[0], **options)
  when "setid"
    xgroup_setid(key, group, args[0])
  when "destroy"
    xgroup_destroy(key, group)
  when "createconsumer"
    xgroup_createconsumer(key, group, args[0])
  when "delconsumer"
    xgroup_delconsumer(key, group, args[0])
  else
    raise ArgumentError, "Unknown XGROUP subcommand: #{subcommand}"
  end
end

#xgroup_create(key, group, id, **options) ⇒ String

Create a consumer group for a stream.

Examples:

Create group from beginning

valkey.xgroup_create("mystream", "mygroup", "0")

Create group from end and create stream if needed

valkey.xgroup_create("mystream", "mygroup", "$", mkstream: true)

Parameters:

  • key (String)

    stream key

  • group (String)

    consumer group name

  • id (String)

    starting ID (“0” for beginning, “$” for end)

  • options (Hash)

    optional parameters

    • ‘:mkstream => true`: create stream if it doesn’t exist

Returns:

  • (String)

    “OK”

See Also:



304
305
306
# File 'lib/valkey/commands/stream_commands.rb', line 304

def xgroup_create(key, group, id, **options)
  xgroup_create_impl(key, group, id, **options)
end

#xgroup_createconsumer(key, group, consumer) ⇒ Integer

Create a consumer in a consumer group.

Examples:

valkey.xgroup_createconsumer("mystream", "mygroup", "consumer1")
  # => 0

Parameters:

  • key (String)

    stream key

  • group (String)

    consumer group name

  • consumer (String)

    consumer name

Returns:

  • (Integer)

    number of pending messages for the consumer (0 for new consumer)

See Also:



320
321
322
323
324
325
326
327
328
329
330
331
# File 'lib/valkey/commands/stream_commands.rb', line 320

def xgroup_createconsumer(key, group, consumer)
  send_command(RequestType::X_GROUP_CREATE_CONSUMER, [key, group, consumer]) do |reply|
    # Convert boolean to integer if needed (backend may return boolean)
    if reply.is_a?(TrueClass)
      1
    elsif reply.is_a?(FalseClass)
      0
    else
      reply
    end
  end
end

#xgroup_delconsumer(key, group, consumer) ⇒ Integer

Remove a consumer from a consumer group.

Examples:

valkey.xgroup_delconsumer("mystream", "mygroup", "consumer1")
  # => 5

Parameters:

  • key (String)

    stream key

  • group (String)

    consumer group name

  • consumer (String)

    consumer name

Returns:

  • (Integer)

    number of pending messages for the consumer

See Also:



384
385
386
# File 'lib/valkey/commands/stream_commands.rb', line 384

def xgroup_delconsumer(key, group, consumer)
  send_command(RequestType::X_GROUP_DEL_CONSUMER, [key, group, consumer])
end

#xgroup_destroy(key, group) ⇒ Integer

Destroy a consumer group.

Examples:

valkey.xgroup_destroy("mystream", "mygroup")
  # => 0

Parameters:

  • key (String)

    stream key

  • group (String)

    consumer group name

Returns:

  • (Integer)

    number of pending messages (if any)

See Also:



359
360
361
362
363
364
365
366
367
368
369
370
# File 'lib/valkey/commands/stream_commands.rb', line 359

def xgroup_destroy(key, group)
  send_command(RequestType::X_GROUP_DESTROY, [key, group]) do |reply|
    # Convert boolean to integer if needed (backend may return boolean)
    if reply.is_a?(TrueClass)
      1
    elsif reply.is_a?(FalseClass)
      0
    else
      reply
    end
  end
end

#xgroup_setid(key, group, id) ⇒ String

Set the last-delivered ID for a consumer group.

Examples:

valkey.xgroup_setid("mystream", "mygroup", "1234567890-0")

Parameters:

  • key (String)

    stream key

  • group (String)

    consumer group name

  • id (String)

    entry ID

Returns:

  • (String)

    “OK”

See Also:



344
345
346
# File 'lib/valkey/commands/stream_commands.rb', line 344

def xgroup_setid(key, group, id)
  send_command(RequestType::X_GROUP_SET_ID, [key, group, id])
end

#xinfo(subcommand, key, group = nil, **options) ⇒ Hash, Array

Get information about streams, groups, and consumers (dispatcher method).

Examples:

Get stream info

valkey.xinfo(:stream, "mystream")

Get stream info with full details

valkey.xinfo(:stream, "mystream", full: true, count: 10)

Get groups info

valkey.xinfo(:groups, "mystream")

Get consumers info

valkey.xinfo(:consumers, "mystream", "mygroup")

Parameters:

  • subcommand (Symbol, String)

    subcommand (:stream, :groups, :consumers)

  • key (String)

    stream key

  • group (String) (defaults to: nil)

    optional consumer group name (required for :consumers)

  • options (Hash)

    optional parameters (for :stream)

    • ‘:full => true`: return full information including entries

    • ‘:count => Integer`: limit number of entries (requires :full)

Returns:

  • (Hash, Array)

    depends on subcommand

See Also:



541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
# File 'lib/valkey/commands/stream_commands.rb', line 541

def xinfo(subcommand, key, group = nil, **options)
  subcommand = subcommand.to_s.downcase
  case subcommand
  when "stream"
    args = [key]
    if options[:full]
      args << "FULL"
      args << "COUNT" << options[:count].to_s if options[:count]
    end
    send_command(RequestType::X_INFO_STREAM, args)
  when "groups"
    send_command(RequestType::X_INFO_GROUPS, [key])
  when "consumers"
    raise ArgumentError, "Group name required for XINFO CONSUMERS" unless group

    send_command(RequestType::X_INFO_CONSUMERS, [key, group])
  else
    raise ArgumentError, "Unknown XINFO subcommand: #{subcommand}"
  end
end

#xinfo_consumers(key, group) ⇒ Array

Get information about consumers in a consumer group.

Examples:

valkey.xinfo_consumers("mystream", "mygroup")
  # => [{"name" => "consumer1", "pending" => 3, "idle" => 12345, ...}]

Parameters:

  • key (String)

    stream key

  • group (String)

    consumer group name

Returns:

  • (Array)

    array of consumer information hashes

See Also:



606
607
608
# File 'lib/valkey/commands/stream_commands.rb', line 606

def xinfo_consumers(key, group)
  xinfo(:consumers, key, group)
end

#xinfo_groups(key) ⇒ Array

Get information about consumer groups of a stream.

Examples:

valkey.xinfo_groups("mystream")
  # => [{"name" => "mygroup", "consumers" => 2, "pending" => 5, ...}]

Parameters:

  • key (String)

    stream key

Returns:

  • (Array)

    array of consumer group information hashes

See Also:



591
592
593
# File 'lib/valkey/commands/stream_commands.rb', line 591

def xinfo_groups(key)
  xinfo(:groups, key)
end

#xinfo_stream(key, **options) ⇒ Array

Get information about a stream.

Examples:

Get basic stream info

valkey.xinfo_stream("mystream")
  # => ["length", 42, "radix-tree-keys", 1, ...]

Get full info with entries

valkey.xinfo_stream("mystream", full: true, count: 10)

Parameters:

  • key (String)

    stream key

  • options (Hash)

    optional parameters

    • ‘:full => true`: return full information including entries

    • ‘:count => Integer`: limit number of entries (requires :full)

Returns:

  • (Array)

    stream information as flat array of key-value pairs

See Also:



577
578
579
# File 'lib/valkey/commands/stream_commands.rb', line 577

def xinfo_stream(key, **options)
  xinfo(:stream, key, **options)
end

#xlen(key) ⇒ Integer

Get the length of a stream.

Examples:

valkey.xlen("mystream")
  # => 42

Parameters:

  • key (String)

    stream key

Returns:

  • (Integer)

    number of entries in the stream

See Also:



90
91
92
# File 'lib/valkey/commands/stream_commands.rb', line 90

def xlen(key)
  send_command(RequestType::X_LEN, [key])
end

#xpending(key, group, *args, idle: nil) ⇒ Hash, Array

Get information about pending messages in a consumer group.

Examples:

Get summary

valkey.xpending("mystream", "mygroup")
  # => {"size" => 5, "min_entry_id" => "1234567890-0",
  #     "max_entry_id" => "1234567890-4", "consumers" => {"consumer1" => 3, "consumer2" => 2}}

Get detailed pending entries

valkey.xpending("mystream", "mygroup", "-", "+", 10)

Parameters:

  • key (String)

    stream key

  • group (String)

    consumer group name

  • args (Array)

    optional arguments (start, end, count, consumer)

  • options (Hash)

    optional parameters

    • ‘:idle => Integer`: filter by minimum idle time in milliseconds

Returns:

  • (Hash, Array)

    pending information

    • Without args: summary hash with keys ‘size’, ‘min_entry_id’, ‘max_entry_id’, ‘consumers’

    • With start/end/count: array of Hashes with keys ‘entry_id’, ‘consumer’, ‘elapsed’, and ‘count’

See Also:



427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
# File 'lib/valkey/commands/stream_commands.rb', line 427

def xpending(key, group, *args, idle: nil)
  cmd_args = [key, group]
  cmd_args.concat(args)
  cmd_args << "IDLE" << idle.to_s if idle

  send_command(RequestType::X_PENDING, cmd_args) do |reply|
    # If args provided (start, end, count), return detailed format
    # Otherwise return summary format
    if args.length >= 2
      Utils::HashifyStreamPendingDetails.call(reply)
    else
      Utils::HashifyStreamPendings.call(reply)
    end
  end
end

#xrange(key, start, end_id, **options) ⇒ Array

Get entries from a stream within a range of IDs.

Examples:

Get all entries

valkey.xrange("mystream", "-", "+")

Get entries with count limit

valkey.xrange("mystream", "-", "+", count: 10)

Parameters:

  • key (String)

    stream key

  • start (String)

    start ID (“-” for beginning, “+” for end)

  • end_id (String)

    end ID (“-” for beginning, “+” for end)

  • options (Hash)

    optional parameters

    • ‘:count => Integer`: maximum number of entries to return

Returns:

  • (Array)

    array of [id, [field, value, …]] entries

See Also:



188
189
190
191
192
193
194
# File 'lib/valkey/commands/stream_commands.rb', line 188

def xrange(key, start, end_id, **options)
  args = [key, start, end_id]
  args << "COUNT" << options[:count].to_s if options[:count]
  send_command(RequestType::X_RANGE, args) do |reply|
    Utils::HashifyStreamEntries.call(reply)
  end
end

#xread(keys, ids, count: nil, block: nil) ⇒ Hash

Read entries from one or more streams.

Examples:

Read from a single stream

valkey.xread(["mystream"], ["0"])
  # => { "mystream" => [["1234567890-0", ["field1", "value1"]]] }

Read with count and block

valkey.xread(["mystream"], ["0"], count: 10, block: 1000)

Parameters:

  • keys (Array<String>)

    stream keys

  • ids (Array<String>)

    last read IDs for each stream

  • options (Hash)

    optional parameters

    • ‘:count => Integer`: maximum number of entries per stream

    • ‘:block => Integer`: block for specified milliseconds (0 = no timeout)

Returns:

  • (Hash)

    hash of stream keys to arrays of entries (empty hash on timeout or no data)

See Also:



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/valkey/commands/stream_commands.rb', line 110

def xread(keys, ids, count: nil, block: nil)
  args = []

  args << "COUNT" << count.to_s if count
  args << "BLOCK" << block.to_s if block
  args << "STREAMS"
  args.concat(Array(keys))
  args.concat(Array(ids))

  send_command(RequestType::X_READ, args) do |reply|
    # Backend returns Array format: [stream_name, entries, stream_name2, entries2, ...]
    # Convert to Hash format first
    if reply.nil?
      {}
    elsif reply.is_a?(Array) && !reply.empty?
      stream_hash = reply.each_slice(2).to_h
      Utils::HashifyStreams.call(stream_hash)
    else
      Utils::HashifyStreams.call(reply)
    end
  end
end

#xreadgroup(group, consumer, keys, ids, count: nil, block: nil, noack: false) ⇒ Hash

Read entries from streams using a consumer group.

Examples:

Read from consumer group

valkey.xreadgroup("mygroup", "consumer1", ["mystream"], [">"])

Parameters:

  • group (String)

    consumer group name

  • consumer (String)

    consumer name

  • keys (Array<String>)

    stream keys

  • ids (Array<String>)

    last read IDs for each stream

  • options (Hash)

    optional parameters

    • ‘:count => Integer`: maximum number of entries per stream

    • ‘:block => Integer`: block for specified milliseconds (0 = no timeout)

    • ‘:noack => true`: do not add messages to pending list

Returns:

  • (Hash)

    hash of stream keys to arrays of entries (empty hash on timeout or no data)

See Also:



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/valkey/commands/stream_commands.rb', line 149

def xreadgroup(group, consumer, keys, ids, count: nil, block: nil, noack: false)
  args = ["GROUP", group, consumer]

  args << "COUNT" << count.to_s if count
  args << "BLOCK" << block.to_s if block
  args << "NOACK" if noack
  args << "STREAMS"
  args.concat(Array(keys))
  args.concat(Array(ids))

  send_command(RequestType::X_READ_GROUP, args) do |reply|
    # Backend returns Array format: [stream_name, entries, stream_name2, entries2, ...]
    # Convert to Hash format first
    if reply.nil?
      {}
    elsif reply.is_a?(Array) && !reply.empty?
      stream_hash = reply.each_slice(2).to_h
      Utils::HashifyStreams.call(stream_hash)
    else
      Utils::HashifyStreams.call(reply)
    end
  end
end

#xrevrange(key, end_id = "+", start = "-", count: nil) ⇒ Array

Get entries from a stream within a range of IDs in reverse order.

Examples:

Get last 10 entries

valkey.xrevrange("mystream", "+", "-", count: 10)

Parameters:

  • key (String)

    stream key

  • end_id (String) (defaults to: "+")

    end ID (“+” for end, “-” for beginning) - higher bound

  • start (String) (defaults to: "-")

    start ID (“-” for beginning, “+” for end) - lower bound

  • options (Hash)

    optional parameters

    • ‘:count => Integer`: maximum number of entries to return

Returns:

  • (Array)

    array of [id, [field, value, …]] entries in reverse order

See Also:



209
210
211
212
213
214
215
# File 'lib/valkey/commands/stream_commands.rb', line 209

def xrevrange(key, end_id = "+", start = "-", count: nil)
  args = [key, end_id, start]
  args << "COUNT" << count.to_s if count
  send_command(RequestType::X_REV_RANGE, args) do |reply|
    Utils::HashifyStreamEntries.call(reply)
  end
end

#xtrim(key, maxlen, strategy: "MAXLEN", approximate: true) ⇒ Integer

Trim a stream to a maximum length.

Examples:

Trim to maximum length

valkey.xtrim("mystream", 1000)

Trim with exact count

valkey.xtrim("mystream", 1000, approximate: false)

Parameters:

  • key (String)

    stream key

  • maxlen (Integer)

    maximum length of the stream

  • options (Hash)

    trimming options

    • ‘:strategy => String`: trimming strategy (default: “MAXLEN”)

    • ‘:approximate => true`: use approximate trimming (default: true)

Returns:

  • (Integer)

    number of entries removed

See Also:



232
233
234
235
236
237
# File 'lib/valkey/commands/stream_commands.rb', line 232

def xtrim(key, maxlen, strategy: "MAXLEN", approximate: true)
  args = [key, strategy]
  args << "~" if approximate
  args << maxlen.to_s
  send_command(RequestType::X_TRIM, args)
end