Class: ZZQ::Persistence::PStore
- Defined in:
- lib/zzq/persistence/pstore.rb
Overview
PStore-backed durable persistence. Single-process deployments only — PStore uses file locking but doesn’t coordinate well across processes.
‘ultra_safe` is enabled by default: each commit does a full fsync of the data file AND its parent directory, surviving power loss (not just clean crashes). Fits a broker where “I told the publisher QoS 1 ack” must survive a kernel panic.
Messages are serialized via Message#to_wire, so the on-disk format is the PUBLISH wire encoding — self-describing and version-tagged.
Constant Summary collapse
- DEFAULT_FILENAME =
"retained.pstore"
Instance Method Summary collapse
- #clear_retained ⇒ Object
- #delete_retained(topic) ⇒ Object
-
#initialize(data_dir:, filename: DEFAULT_FILENAME, ultra_safe: true) ⇒ PStore
constructor
A new instance of PStore.
- #load_retained ⇒ Object
- #save_retained(message) ⇒ Object
Constructor Details
#initialize(data_dir:, filename: DEFAULT_FILENAME, ultra_safe: true) ⇒ PStore
Returns a new instance of PStore.
27 28 29 30 31 32 |
# File 'lib/zzq/persistence/pstore.rb', line 27 def initialize(data_dir:, filename: DEFAULT_FILENAME, ultra_safe: true) super() FileUtils.mkdir_p(data_dir) @store = ::PStore.new(File.join(data_dir, filename)) @store.ultra_safe = ultra_safe end |
Instance Method Details
#clear_retained ⇒ Object
61 62 63 |
# File 'lib/zzq/persistence/pstore.rb', line 61 def clear_retained @store.transaction { @store[:retained] = {} } end |
#delete_retained(topic) ⇒ Object
54 55 56 57 58 |
# File 'lib/zzq/persistence/pstore.rb', line 54 def delete_retained(topic) @store.transaction do (@store[:retained] || {}).delete(topic) end end |
#load_retained ⇒ Object
35 36 37 38 39 40 41 42 |
# File 'lib/zzq/persistence/pstore.rb', line 35 def load_retained return enum_for(:load_retained) unless block_given? @store.transaction(true) do (@store[:retained] || {}).each_value do |bytes| yield ZZQ::Message.from_wire(bytes) end end end |
#save_retained(message) ⇒ Object
45 46 47 48 49 50 51 |
# File 'lib/zzq/persistence/pstore.rb', line 45 def save_retained() bytes = .to_wire @store.transaction do @store[:retained] ||= {} @store[:retained][.topic] = bytes end end |