Class: Honker::Stream

Inherits:
Object
Defined in:
lib/honker/stream.rb

Overview

An append-only ordered log. Publish writes an offset-stamped event; consumers resume from a saved offset. Mirrors the typed API in the Rust binding.

s = db.stream("orders")
off = s.publish({ id: 1 })
events = s.read_since(0, 100)
s.save_offset("billing", events.last.offset)

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(db, topic) ⇒ Stream

Returns a new instance of Stream.



# File 'lib/honker/stream.rb', line 17

def initialize(db, topic)
  @db = db
  @topic = topic
end

Instance Attribute Details

#topic ⇒ Object (readonly)

Returns the value of attribute topic.



# File 'lib/honker/stream.rb', line 15

def topic
  @topic
end

Instance Method Details

#get_offset(consumer) ⇒ Object

Current saved offset for `consumer`, or 0 if never saved.



# File 'lib/honker/stream.rb', line 79

def get_offset(consumer)
  @db.db.get_first_row(
    "SELECT honker_stream_get_offset(?, ?)",
    [consumer, @topic],
  )[0]
end

#publish(payload) ⇒ Object

Publish an event. Returns the assigned offset.



# File 'lib/honker/stream.rb', line 23

def publish(payload)
  publish_with_key_opt(nil, payload)
end

#publish_tx(tx, payload) ⇒ Object

Publish inside an open transaction. The event is visible to readers only after the transaction commits.



# File 'lib/honker/stream.rb', line 35

def publish_tx(tx, payload)
  row = tx.query_row(
    "SELECT honker_stream_publish(?, NULL, ?)",
    [@topic, JSON.dump(payload)],
  )
  row[0]
end

#publish_with_key(key, payload) ⇒ Object

Publish with a partition key — used by downstream consumers that want per-key ordering.



# File 'lib/honker/stream.rb', line 29

def publish_with_key(key, payload)
  publish_with_key_opt(key, payload)
end

#read_from_consumer(consumer, limit) ⇒ Object

Read events newer than this consumer’s saved offset. Does NOT advance the offset — call `save_offset` after processing.



# File 'lib/honker/stream.rb', line 55

def read_from_consumer(consumer, limit)
  read_since(get_offset(consumer), limit)
end
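As the implementation shows, this is just `read_since` starting at the consumer's saved offset (defaulting to 0). A minimal sketch of that composition, using hypothetical in-memory stand-ins for the log and offset store rather than the real SQLite extension:

```ruby
# Hypothetical in-memory stand-ins; offsets are 1-based and dense.
log = (1..8).to_a                  # event offsets in the topic
saved = { "billing" => 4 }         # per-consumer saved offsets

# read_since: events with offset > the cursor, up to `limit`.
read_since = ->(off, limit) { log.select { |o| o > off }.first(limit) }

# read_from_consumer: resume from the saved offset, 0 if never saved.
read_from_consumer = lambda do |consumer, limit|
  read_since.call(saved.fetch(consumer, 0), limit)
end

read_from_consumer.call("billing", 10)  # => [5, 6, 7, 8]
read_from_consumer.call("search", 10)   # => [1, 2, 3, 4, 5, 6, 7, 8]
```

A consumer that has never saved an offset starts from the beginning of the topic.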

#read_since(offset, limit) ⇒ Object

Read events with offset > `offset`, up to `limit`. Returns an array of StreamEvent.



# File 'lib/honker/stream.rb', line 45

def read_since(offset, limit)
  rows_json = @db.db.get_first_row(
    "SELECT honker_stream_read_since(?, ?, ?)",
    [@topic, offset, limit],
  )[0]
  JSON.parse(rows_json).map { |r| StreamEvent.from_row(r) }
end
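A common pattern is paging through a topic in batches, advancing a local cursor to the last offset seen. Sketched here against a hypothetical in-memory log (the Struct and lambda stand in for the real StreamEvent and method):

```ruby
Event = Struct.new(:offset, :payload)

# Hypothetical in-memory log with 1-based offsets.
log = (1..10).map { |i| Event.new(i, { id: i }) }
read_since = ->(off, limit) { log.select { |e| e.offset > off }.first(limit) }

# Page through everything in batches of 3.
cursor = 0
batches = []
loop do
  batch = read_since.call(cursor, 3)
  break if batch.empty?
  batches << batch.map(&:offset)
  cursor = batch.last.offset   # advance past what we've seen
end
batches  # => [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10]]
```

Because the comparison is strictly `offset > cursor`, feeding the last batch's final offset back in never re-reads an event.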

#save_offset(consumer, offset) ⇒ Object

Save a consumer’s offset. Monotonic: saving a lower offset is ignored by the extension and returns false.



# File 'lib/honker/stream.rb', line 61

def save_offset(consumer, offset)
  @db.db.get_first_row(
    "SELECT honker_stream_save_offset(?, ?, ?)",
    [consumer, @topic, offset],
  )[0] == 1
end
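The monotonic rule can be modeled with a few lines of plain Ruby — a hypothetical in-memory stand-in for the extension's offset store, not its actual implementation:

```ruby
# Stand-in offset store: a lower offset than the one already saved
# is ignored and reported as false, mirroring the documented rule.
offsets = Hash.new(0)
save_offset = lambda do |consumer, offset|
  return false if offset < offsets[consumer]
  offsets[consumer] = offset
  true
end

save_offset.call("billing", 5)  # => true
save_offset.call("billing", 3)  # => false; 5 is kept
offsets["billing"]              # => 5
```

This is what makes retried or out-of-order saves safe: a stale worker re-saving an old offset cannot rewind a consumer.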

#save_offset_tx(tx, consumer, offset) ⇒ Object

Save offset inside an open transaction. Gives you exactly-once semantics relative to whatever else ran on the same tx.



# File 'lib/honker/stream.rb', line 70

def save_offset_tx(tx, consumer, offset)
  row = tx.query_row(
    "SELECT honker_stream_save_offset(?, ?, ?)",
    [consumer, @topic, offset],
  )
  row[0] == 1
end