Rivulet

A small stream with a bit of state flowing through it.

Rivulet gives Ruby a vocabulary for sliding window operations — the grow-shrink-emit pattern that Enumerable never named. It sits between each_cons (fixed windows, no state) and hand-rolled pointer loops (flexible but noisy).
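
To make the contrast concrete, here is a minimal plain-Ruby sketch of the two ends that Rivulet sits between. It does not use Rivulet; the method names `fixed_window_sums` and `longest_run_within` are made up for illustration, and the pointer loop assumes non-negative items.

```ruby
# Fixed-size windows: Enumerable#each_cons already covers this case,
# but every window is a fresh Array and no state carries over.
def fixed_window_sums(items, size)
  items.each_cons(size).map(&:sum)
end

# Variable-size window: the hand-rolled grow-shrink-emit loop.
# Returns the length of the longest contiguous run whose sum stays
# within the budget. Assumes non-negative items.
def longest_run_within(items, budget)
  left = 0
  sum = 0
  best = 0
  items.each_with_index do |item, right|
    sum += item                          # grow: admit the item on the right
    while sum > budget && left <= right
      sum -= items[left]                 # shrink: evict from the left
      left += 1
    end
    size = right - left + 1
    best = size if size > best           # emit: record the current window
  end
  best
end
```

The second method is the kind of bookkeeping (two indices, a running total, a best-so-far) that the library aims to hide behind named operations.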

Near-zero allocations on the hot path. 200k items, a few dozen objects at most (13 to 39 in the Performance numbers).

Installation

gem "rivulet"

Usage

Moving average

Rivulet.sum(latencies).windows(3) { |w| w.average }
# => [20.0, 30.0, 40.0]

Batch records under a byte budget

Rivulet.sum(records, &:bytesize)
  .max_size_while { |w| w.sum <= 1024 }
# => 12  (max records that fit)

Minimum subarray meeting a target

Rivulet.sum(nums).min_size_where { |w| w.sum >= target }
# => 2  (smallest window with sum >= target)

Longest non-repeating run

Rivulet.count(events)
  .max_window { |w| !w.repeats? }
  .max_by { |w| w.size }
# => 5

Sliding window maximum (O(n) via monotonic deque)

Rivulet.minmax(prices).windows(20) { |w| w.max }
# => [103, 105, 105, ...]

Top-k windows

Rivulet.sum(data).windows(5).max_by(3) { |w| w.average }
# => [98.2, 95.1, 93.7]

Minimum window containing all target characters

target = Hash.new(0)
t.each_char { |c| target[c] += 1 }

Rivulet.count(s.chars)
  .min_window { |w| w.covers?(target) }
  .min_by { |w| w.size }
# => 4  (smallest window covering all of t)

Early termination

Rivulet.sum(data)
  .max_window { |w| w.sum <= budget }
  .first { |w| w.size }
# => 1  (first valid window's size)

Window Types

Entry point                          Tracks                           Window methods
Rivulet.sum(source)                  Running total                    sum, average, size
Rivulet.sum(source) { |item| ... }   Running total of mapped values   sum, average, size
Rivulet.count(source)                Item frequencies                 distinct, repeats?, max_count, covers?, sum, counts, size
Rivulet.minmax(source)               Rolling min/max                  min, max, range, size
Rivulet.stats(source)                All of the above                 sum, average, min, max, range, distinct, repeats?, max_count, covers?, counts, size

API

Building a window stream

stream = Rivulet.sum(data)        # or .count, .minmax, .stats
stream = Rivulet.sum(data) { |item| item.bytesize }  # with mapper

Fixed-size windows

stream.windows(n)                 # => WindowBuilder
stream.windows(n) { |w| ... }    # => Array (filter_map semantics)
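
For readers unfamiliar with the term, "filter_map semantics" refers to Ruby's own `Enumerable#filter_map`: the block's result is collected unless it is nil or false. The sketch below shows that behavior with plain `each_cons`, not Rivulet; `averages_above` is a hypothetical helper, and whether Rivulet treats `false` the same way as `nil` is not stated here.

```ruby
# Collect the average of each fixed-size window, skipping windows whose
# average does not exceed the threshold. Plain Ruby, for illustration only.
def averages_above(items, size, threshold)
  items.each_cons(size).filter_map do |window|
    average = window.sum.to_f / size
    average if average > threshold   # nil here means "skip this window"
  end
end
```

With `[10, 20, 30, 40, 50]`, a size of 3, and a threshold of 25, the window averages are 20.0, 30.0, and 40.0, and only the last two are kept.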

Variable-size windows (grow/shrink)

stream.max_window { |w| rule }    # => WindowBuilder (maximize: shrink when invalid)
stream.min_window { |w| goal }    # => WindowBuilder (minimize: shrink while still valid)

max_window grows the window and evicts from the front when the rule fails — use it to find the largest window under a constraint.

min_window grows the window and shrinks from the front while the goal holds — use it to find the smallest window meeting a goal.
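
The minimizing policy can be sketched in plain Ruby as follows. This is an illustration of the shrink-while-valid idea under the assumption of non-negative items, not Rivulet's implementation; `smallest_run_reaching` is a hypothetical name.

```ruby
# Returns the length of the shortest contiguous run whose sum reaches the
# target, or nil if no run does. Assumes non-negative items.
def smallest_run_reaching(items, target)
  left = 0
  sum = 0
  best = nil
  items.each_with_index do |item, right|
    sum += item                          # grow on the right
    # While the goal holds, record the window, then shrink from the front
    # to see whether a smaller window still meets the goal.
    while sum >= target && left <= right
      size = right - left + 1
      best = size if best.nil? || size < best
      sum -= items[left]
      left += 1
    end
  end
  best
end
```

The maximizing policy differs only in the loop condition and in when the window is recorded: it evicts while the rule fails and records the window once the rule holds again.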

Terminal methods on WindowBuilder

All terminals yield the live window — no snapshots allocated.
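
A practical consequence of yielding a live object, assuming the same window instance is reused between yields, is that values should be read inside the block rather than the window being stored for later. The sketch below demonstrates the pitfall with a hypothetical `LiveWindow` class; it is not Rivulet's window type.

```ruby
# Hypothetical stand-in for a window object that is mutated in place.
class LiveWindow
  attr_reader :sum, :size

  def initialize
    @sum = 0
    @size = 0
  end

  def push(value)
    @sum += value
    @size += 1
    self
  end
end

# Yields the same LiveWindow instance after each item is added.
def each_prefix(items)
  window = LiveWindow.new
  items.each { |item| yield window.push(item) }
end

# Contrasts keeping the yielded object with copying a value out of it.
def retained_vs_copied(items)
  retained = []
  copied = []
  each_prefix(items) do |w|
    retained << w       # the same object every time; it keeps changing
    copied << w.sum     # a plain Integer captured at this step
  end
  [retained.map(&:sum), copied]
end
```

For `[1, 2, 3]`, the retained references all report the final sum of 6, while the copied values are 1, 3, and 6.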

builder.each_window { |w| ... }   # filter_map: collect non-nil block results
builder.max_by { |w| ... }        # single best score
builder.max_by(k) { |w| ... }    # top-k scores (descending)
builder.min_by { |w| ... }        # single smallest score
builder.min_by(k) { |w| ... }    # bottom-k scores (ascending)
builder.first { |w| ... }         # first non-nil result
builder.first(k) { |w| ... }     # first k non-nil results
builder.take(k) { |w| ... }      # alias for first(k)
builder.count                     # number of valid windows

Reducers (single-value shortcuts)

stream.max_size_while { |w| rule }     # largest window size under constraint
stream.max_sum_while { |w| rule }      # largest sum under constraint (sum only)
stream.min_size_where { |w| goal }     # smallest window size meeting goal (sum only)
stream.max_distinct_while { |w| rule } # most distinct items (count only)
stream.max_range_while { |w| rule }    # largest range (minmax only)

Performance

Rivulet's builder path allocates a small, effectively constant number of objects regardless of input size:

200k items, budget 500:
  max_window.max_by { size }       39 objects
  max_size_while (reducer)          13 objects
  windows(5) { average }            18 objects

The minmax window uses monotonic deques for O(1) amortized min/max per step. All window types maintain O(1) incremental state updates.
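
For reference, the monotonic deque technique for a sliding maximum can be sketched in plain Ruby as below. This is a generic textbook version using an Array as the deque, not Rivulet's internal code, and `sliding_max` is a hypothetical name.

```ruby
# Returns the maximum of every contiguous window of the given size.
# Each index is pushed once and removed at most once, so the total work
# is O(n) and each step is O(1) amortized.
def sliding_max(items, size)
  deque = []    # indices whose values decrease from front to back
  result = []
  items.each_with_index do |item, i|
    # Values at the back that are <= the new item can never be a maximum
    # again while the new item is in the window, so drop them.
    deque.pop while !deque.empty? && items[deque.last] <= item
    deque.push(i)
    # Drop the front index once it has slid out of the window.
    deque.shift if deque.first <= i - size
    # The front of the deque is the maximum of the current window.
    result << items[deque.first] if i >= size - 1
  end
  result
end
```

A rolling minimum uses the same structure with the comparison reversed.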

Requirements

  • Ruby >= 3.2

License

MIT