Module: Valkey::Commands::StreamCommands
- Included in:
- Valkey::Commands
- Defined in:
- lib/valkey/commands/stream_commands.rb
Overview
This module contains commands related to Redis Streams.
Instance Method Summary collapse
-
#xack(key, group, *ids) ⇒ Integer
Acknowledge one or more messages in a consumer group.
-
#xadd(key, entry, approximate: nil, maxlen: nil, minid: nil, nomkstream: nil, id: "*") ⇒ String
Append a new entry to a stream.
-
#xautoclaim(key, group, consumer, min_idle_time, start, **options) ⇒ Hash
Automatically claim pending messages that have been idle for a specified time.
-
#xclaim(key, group, consumer, min_idle_time, ids, **options) ⇒ Array
Claim ownership of pending messages in a consumer group.
-
#xdel(key, *ids) ⇒ Integer
Remove one or more entries from a stream.
-
#xgroup(subcommand, key, group, *args, **options) ⇒ String, Integer
Manage consumer groups (dispatcher method).
-
#xgroup_create(key, group, id, **options) ⇒ String
Create a consumer group for a stream.
-
#xgroup_createconsumer(key, group, consumer) ⇒ Integer
Create a consumer in a consumer group.
-
#xgroup_delconsumer(key, group, consumer) ⇒ Integer
Remove a consumer from a consumer group.
-
#xgroup_destroy(key, group) ⇒ Integer
Destroy a consumer group.
-
#xgroup_setid(key, group, id) ⇒ String
Set the last-delivered ID for a consumer group.
-
#xinfo(subcommand, key, group = nil, **options) ⇒ Hash, Array
Get information about streams, groups, and consumers (dispatcher method).
-
#xinfo_consumers(key, group) ⇒ Array
Get information about consumers in a consumer group.
-
#xinfo_groups(key) ⇒ Array
Get information about consumer groups of a stream.
-
#xinfo_stream(key, **options) ⇒ Array
Get information about a stream.
-
#xlen(key) ⇒ Integer
Get the length of a stream.
-
#xpending(key, group, *args, idle: nil) ⇒ Hash, Array
Get information about pending messages in a consumer group.
-
#xrange(key, start, end_id, **options) ⇒ Array
Get entries from a stream within a range of IDs.
-
#xread(keys, ids, count: nil, block: nil) ⇒ Hash
Read entries from one or more streams.
-
#xreadgroup(group, consumer, keys, ids, count: nil, block: nil, noack: false) ⇒ Hash
Read entries from streams using a consumer group.
-
#xrevrange(key, end_id = "+", start = "-", count: nil) ⇒ Array
Get entries from a stream within a range of IDs in reverse order.
-
#xtrim(key, maxlen, strategy: "MAXLEN", approximate: true) ⇒ Integer
Trim a stream to a maximum length.
Instance Method Details
#xack(key, group, *ids) ⇒ Integer
Acknowledge one or more messages in a consumer group.
403 404 405 406 |
# File 'lib/valkey/commands/stream_commands.rb', line 403 def xack(key, group, *ids) args = [key, group] + Array(ids).flatten send_command(RequestType::X_ACK, args) end |
#xadd(key, entry, approximate: nil, maxlen: nil, minid: nil, nomkstream: nil, id: "*") ⇒ String
Append a new entry to a stream.
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/valkey/commands/stream_commands.rb', line 32 def xadd(key, entry, approximate: nil, maxlen: nil, minid: nil, nomkstream: nil, id: "*") args = [key] # Handle maxlen/minid trimming if maxlen raise ArgumentError, "can't supply both maxlen and minid" if minid args << "MAXLEN" args << "~" if approximate args << maxlen.to_s elsif minid args << "MINID" args << "~" if approximate args << minid end args << "NOMKSTREAM" if nomkstream args << id # Add field-value pairs if entry.is_a?(Hash) entry.each { |k, v| args << k.to_s << v.to_s } else args.concat(Array(entry).flatten) end send_command(RequestType::X_ADD, args) end |
#xautoclaim(key, group, consumer, min_idle_time, start, **options) ⇒ Hash
Automatically claim pending messages that have been idle for a specified time.
501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 |
# File 'lib/valkey/commands/stream_commands.rb', line 501 def xautoclaim(key, group, consumer, min_idle_time, start, **) args = [key, group, consumer, min_idle_time.to_s, start] args << "COUNT" << [:count].to_s if [:count] args << "IDLE" << [:idle].to_s if [:idle] args << "TIME" << [:time].to_s if [:time] args << "RETRYCOUNT" << [:retrycount].to_s if [:retrycount] args << "JUSTID" if [:justid] send_command(RequestType::X_AUTO_CLAIM, args) do |reply| return { 'next' => '0-0', 'entries' => [] } if reply.nil? || !reply.is_a?(Array) if [:justid] Utils::HashifyStreamAutoclaimJustId.call(reply) else Utils::HashifyStreamAutoclaim.call(reply) end end end |
#xclaim(key, group, consumer, min_idle_time, ids, **options) ⇒ Array
Claim ownership of pending messages in a consumer group.
462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 |
# File 'lib/valkey/commands/stream_commands.rb', line 462 def xclaim(key, group, consumer, min_idle_time, ids, **) args = [key, group, consumer, min_idle_time.to_s] args.concat(Array(ids).flatten) args << "IDLE" << [:idle].to_s if [:idle] args << "TIME" << [:time].to_s if [:time] args << "RETRYCOUNT" << [:retrycount].to_s if [:retrycount] args << "FORCE" if [:force] args << "JUSTID" if [:justid] send_command(RequestType::X_CLAIM, args) do |reply| if [:justid] reply else Utils::HashifyStreamEntries.call(reply) end end end |
#xdel(key, *ids) ⇒ Integer
Remove one or more entries from a stream.
75 76 77 78 |
# File 'lib/valkey/commands/stream_commands.rb', line 75 def xdel(key, *ids) args = [key] + Array(ids).flatten send_command(RequestType::X_DEL, args) end |
#xgroup(subcommand, key, group, *args, **options) ⇒ String, Integer
Manage consumer groups (dispatcher method).
261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 |
# File 'lib/valkey/commands/stream_commands.rb', line 261 def xgroup(subcommand, key, group, *args, **) subcommand = subcommand.to_s.downcase case subcommand when "create" xgroup_create(key, group, args[0], **) when "setid" xgroup_setid(key, group, args[0]) when "destroy" xgroup_destroy(key, group) when "createconsumer" xgroup_createconsumer(key, group, args[0]) when "delconsumer" xgroup_delconsumer(key, group, args[0]) else raise ArgumentError, "Unknown XGROUP subcommand: #{subcommand}" end end |
#xgroup_create(key, group, id, **options) ⇒ String
Create a consumer group for a stream.
304 305 306 |
# File 'lib/valkey/commands/stream_commands.rb', line 304 def xgroup_create(key, group, id, **) xgroup_create_impl(key, group, id, **) end |
#xgroup_createconsumer(key, group, consumer) ⇒ Integer
Create a consumer in a consumer group.
320 321 322 323 324 325 326 327 328 329 330 331 |
# File 'lib/valkey/commands/stream_commands.rb', line 320 def xgroup_createconsumer(key, group, consumer) send_command(RequestType::X_GROUP_CREATE_CONSUMER, [key, group, consumer]) do |reply| # Convert boolean to integer if needed (backend may return boolean) if reply.is_a?(TrueClass) 1 elsif reply.is_a?(FalseClass) 0 else reply end end end |
#xgroup_delconsumer(key, group, consumer) ⇒ Integer
Remove a consumer from a consumer group.
384 385 386 |
# File 'lib/valkey/commands/stream_commands.rb', line 384 def xgroup_delconsumer(key, group, consumer) send_command(RequestType::X_GROUP_DEL_CONSUMER, [key, group, consumer]) end |
#xgroup_destroy(key, group) ⇒ Integer
Destroy a consumer group.
359 360 361 362 363 364 365 366 367 368 369 370 |
# File 'lib/valkey/commands/stream_commands.rb', line 359 def xgroup_destroy(key, group) send_command(RequestType::X_GROUP_DESTROY, [key, group]) do |reply| # Convert boolean to integer if needed (backend may return boolean) if reply.is_a?(TrueClass) 1 elsif reply.is_a?(FalseClass) 0 else reply end end end |
#xgroup_setid(key, group, id) ⇒ String
Set the last-delivered ID for a consumer group.
344 345 346 |
# File 'lib/valkey/commands/stream_commands.rb', line 344 def xgroup_setid(key, group, id) send_command(RequestType::X_GROUP_SET_ID, [key, group, id]) end |
#xinfo(subcommand, key, group = nil, **options) ⇒ Hash, Array
Get information about streams, groups, and consumers (dispatcher method).
541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 |
# File 'lib/valkey/commands/stream_commands.rb', line 541 def xinfo(subcommand, key, group = nil, **) subcommand = subcommand.to_s.downcase case subcommand when "stream" args = [key] if [:full] args << "FULL" args << "COUNT" << [:count].to_s if [:count] end send_command(RequestType::X_INFO_STREAM, args) when "groups" send_command(RequestType::X_INFO_GROUPS, [key]) when "consumers" raise ArgumentError, "Group name required for XINFO CONSUMERS" unless group send_command(RequestType::X_INFO_CONSUMERS, [key, group]) else raise ArgumentError, "Unknown XINFO subcommand: #{subcommand}" end end |
#xinfo_consumers(key, group) ⇒ Array
Get information about consumers in a consumer group.
606 607 608 |
# File 'lib/valkey/commands/stream_commands.rb', line 606 def xinfo_consumers(key, group) xinfo(:consumers, key, group) end |
#xinfo_groups(key) ⇒ Array
Get information about consumer groups of a stream.
591 592 593 |
# File 'lib/valkey/commands/stream_commands.rb', line 591 def xinfo_groups(key) xinfo(:groups, key) end |
#xinfo_stream(key, **options) ⇒ Array
Get information about a stream.
577 578 579 |
# File 'lib/valkey/commands/stream_commands.rb', line 577 def xinfo_stream(key, **) xinfo(:stream, key, **) end |
#xlen(key) ⇒ Integer
Get the length of a stream.
90 91 92 |
# File 'lib/valkey/commands/stream_commands.rb', line 90 def xlen(key) send_command(RequestType::X_LEN, [key]) end |
#xpending(key, group, *args, idle: nil) ⇒ Hash, Array
Get information about pending messages in a consumer group.
427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 |
# File 'lib/valkey/commands/stream_commands.rb', line 427 def xpending(key, group, *args, idle: nil) cmd_args = [key, group] cmd_args.concat(args) cmd_args << "IDLE" << idle.to_s if idle send_command(RequestType::X_PENDING, cmd_args) do |reply| # If args provided (start, end, count), return detailed format # Otherwise return summary format if args.length >= 2 Utils::HashifyStreamPendingDetails.call(reply) else Utils::HashifyStreamPendings.call(reply) end end end |
#xrange(key, start, end_id, **options) ⇒ Array
Get entries from a stream within a range of IDs.
188 189 190 191 192 193 194 |
# File 'lib/valkey/commands/stream_commands.rb', line 188 def xrange(key, start, end_id, **) args = [key, start, end_id] args << "COUNT" << [:count].to_s if [:count] send_command(RequestType::X_RANGE, args) do |reply| Utils::HashifyStreamEntries.call(reply) end end |
#xread(keys, ids, count: nil, block: nil) ⇒ Hash
Read entries from one or more streams.
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
# File 'lib/valkey/commands/stream_commands.rb', line 110 def xread(keys, ids, count: nil, block: nil) args = [] args << "COUNT" << count.to_s if count args << "BLOCK" << block.to_s if block args << "STREAMS" args.concat(Array(keys)) args.concat(Array(ids)) send_command(RequestType::X_READ, args) do |reply| # Backend returns Array format: [stream_name, entries, stream_name2, entries2, ...] # Convert to Hash format first if reply.nil? {} elsif reply.is_a?(Array) && !reply.empty? stream_hash = reply.each_slice(2).to_h Utils::HashifyStreams.call(stream_hash) else Utils::HashifyStreams.call(reply) end end end |
#xreadgroup(group, consumer, keys, ids, count: nil, block: nil, noack: false) ⇒ Hash
Read entries from streams using a consumer group.
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 |
# File 'lib/valkey/commands/stream_commands.rb', line 149 def xreadgroup(group, consumer, keys, ids, count: nil, block: nil, noack: false) args = ["GROUP", group, consumer] args << "COUNT" << count.to_s if count args << "BLOCK" << block.to_s if block args << "NOACK" if noack args << "STREAMS" args.concat(Array(keys)) args.concat(Array(ids)) send_command(RequestType::X_READ_GROUP, args) do |reply| # Backend returns Array format: [stream_name, entries, stream_name2, entries2, ...] # Convert to Hash format first if reply.nil? {} elsif reply.is_a?(Array) && !reply.empty? stream_hash = reply.each_slice(2).to_h Utils::HashifyStreams.call(stream_hash) else Utils::HashifyStreams.call(reply) end end end |
#xrevrange(key, end_id = "+", start = "-", count: nil) ⇒ Array
Get entries from a stream within a range of IDs in reverse order.
209 210 211 212 213 214 215 |
# File 'lib/valkey/commands/stream_commands.rb', line 209 def xrevrange(key, end_id = "+", start = "-", count: nil) args = [key, end_id, start] args << "COUNT" << count.to_s if count send_command(RequestType::X_REV_RANGE, args) do |reply| Utils::HashifyStreamEntries.call(reply) end end |
#xtrim(key, maxlen, strategy: "MAXLEN", approximate: true) ⇒ Integer
Trim a stream to a maximum length.
232 233 234 235 236 237 |
# File 'lib/valkey/commands/stream_commands.rb', line 232 def xtrim(key, maxlen, strategy: "MAXLEN", approximate: true) args = [key, strategy] args << "~" if approximate args << maxlen.to_s send_command(RequestType::X_TRIM, args) end |