Class: Honker::Stream
- Inherits:
-
Object
- Object
- Honker::Stream
- Defined in:
- lib/honker/stream.rb
Overview
An append-only ordered log. Publish writes an offset-stamped event; consumers resume from a saved offset. Mirrors the typed API in the Rust binding.
s = db.stream("orders")
off = s.publish({ id: 1 })
events = s.read_since(0, 100)
s.save_offset("billing", events.last.offset)
Instance Attribute Summary collapse
-
#topic ⇒ Object
readonly
Returns the value of attribute topic.
Instance Method Summary collapse
-
#get_offset(consumer) ⇒ Object
Current saved offset for ‘consumer`, or 0 if never saved.
-
#initialize(db, topic) ⇒ Stream
constructor
A new instance of Stream.
-
#publish(payload) ⇒ Object
Publish an event.
-
#publish_tx(tx, payload) ⇒ Object
Publish inside an open transaction.
-
#publish_with_key(key, payload) ⇒ Object
Publish with a partition key — used by downstream consumers that want per-key ordering.
-
#read_from_consumer(consumer, limit) ⇒ Object
Read events newer than this consumer’s saved offset.
-
#read_since(offset, limit) ⇒ Object
Read events with offset > ‘offset`, up to `limit`.
-
#save_offset(consumer, offset) ⇒ Object
Save a consumer’s offset.
-
#save_offset_tx(tx, consumer, offset) ⇒ Object
Save offset inside an open transaction.
Constructor Details
#initialize(db, topic) ⇒ Stream
Returns a new instance of Stream.
17 18 19 20 |
# File 'lib/honker/stream.rb', line 17 def initialize(db, topic) @db = db @topic = topic end |
Instance Attribute Details
#topic ⇒ Object (readonly)
Returns the value of attribute topic.
15 16 17 |
# File 'lib/honker/stream.rb', line 15 def topic @topic end |
Instance Method Details
#get_offset(consumer) ⇒ Object
Current saved offset for ‘consumer`, or 0 if never saved.
79 80 81 82 83 84 |
# File 'lib/honker/stream.rb', line 79 def get_offset(consumer) @db.db.get_first_row( "SELECT honker_stream_get_offset(?, ?)", [consumer, @topic], )[0] end |
#publish(payload) ⇒ Object
Publish an event. Returns the assigned offset.
23 24 25 |
# File 'lib/honker/stream.rb', line 23 def publish(payload) publish_with_key_opt(nil, payload) end |
#publish_tx(tx, payload) ⇒ Object
Publish inside an open transaction. The event is visible to readers only after the transaction commits.
35 36 37 38 39 40 41 |
# File 'lib/honker/stream.rb', line 35 def publish_tx(tx, payload) row = tx.query_row( "SELECT honker_stream_publish(?, NULL, ?)", [@topic, JSON.dump(payload)], ) row[0] end |
#publish_with_key(key, payload) ⇒ Object
Publish with a partition key — used by downstream consumers that want per-key ordering.
29 30 31 |
# File 'lib/honker/stream.rb', line 29 def publish_with_key(key, payload) publish_with_key_opt(key, payload) end |
#read_from_consumer(consumer, limit) ⇒ Object
Read events newer than this consumer’s saved offset. Does NOT advance the offset — call ‘save_offset` after processing.
55 56 57 |
# File 'lib/honker/stream.rb', line 55 def read_from_consumer(consumer, limit) read_since(get_offset(consumer), limit) end |
#read_since(offset, limit) ⇒ Object
Read events with offset > ‘offset`, up to `limit`. Returns an array of StreamEvent.
45 46 47 48 49 50 51 |
# File 'lib/honker/stream.rb', line 45 def read_since(offset, limit) rows_json = @db.db.get_first_row( "SELECT honker_stream_read_since(?, ?, ?)", [@topic, offset, limit], )[0] JSON.parse(rows_json).map { |r| StreamEvent.from_row(r) } end |
#save_offset(consumer, offset) ⇒ Object
Save a consumer’s offset. Monotonic: saving a lower offset is ignored by the extension and returns false.
61 62 63 64 65 66 |
# File 'lib/honker/stream.rb', line 61 def save_offset(consumer, offset) @db.db.get_first_row( "SELECT honker_stream_save_offset(?, ?, ?)", [consumer, @topic, offset], )[0] == 1 end |
#save_offset_tx(tx, consumer, offset) ⇒ Object
Save offset inside an open transaction. Gives you exactly-once semantics relative to whatever else ran on the same tx.
70 71 72 73 74 75 76 |
# File 'lib/honker/stream.rb', line 70 def save_offset_tx(tx, consumer, offset) row = tx.query_row( "SELECT honker_stream_save_offset(?, ?, ?)", [consumer, @topic, offset], ) row[0] == 1 end |