Class: ZZQ::Persistence::PStore

Inherits:
Interface show all
Defined in:
lib/zzq/persistence/pstore.rb

Overview

PStore-backed durable persistence. Single-process deployments only — PStore uses file locking but doesn’t coordinate well across processes.

‘ultra_safe` is enabled by default: each commit does a full fsync of the data file AND its parent directory, surviving power loss (not just clean crashes). Fits a broker where “I told the publisher QoS 1 ack” must survive a kernel panic.

Messages are serialized via Message#to_wire, so the on-disk format is the PUBLISH wire encoding — self-describing and version-tagged.

Constant Summary collapse

DEFAULT_FILENAME =
"retained.pstore"

Instance Method Summary collapse

Constructor Details

#initialize(data_dir:, filename: DEFAULT_FILENAME, ultra_safe: true) ⇒ PStore

Returns a new instance of PStore.



27
28
29
30
31
32
# File 'lib/zzq/persistence/pstore.rb', line 27

def initialize(data_dir:, filename: DEFAULT_FILENAME, ultra_safe: true)
  super()
  FileUtils.mkdir_p(data_dir)
  @store = ::PStore.new(File.join(data_dir, filename))
  @store.ultra_safe = ultra_safe
end

Instance Method Details

#clear_retainedObject



61
62
63
# File 'lib/zzq/persistence/pstore.rb', line 61

def clear_retained
  @store.transaction { @store[:retained] = {} }
end

#delete_retained(topic) ⇒ Object



54
55
56
57
58
# File 'lib/zzq/persistence/pstore.rb', line 54

def delete_retained(topic)
  @store.transaction do
    (@store[:retained] || {}).delete(topic)
  end
end

#load_retainedObject



35
36
37
38
39
40
41
42
# File 'lib/zzq/persistence/pstore.rb', line 35

def load_retained
  return enum_for(:load_retained) unless block_given?
  @store.transaction(true) do
    (@store[:retained] || {}).each_value do |bytes|
      yield ZZQ::Message.from_wire(bytes)
    end
  end
end

#save_retained(message) ⇒ Object



45
46
47
48
49
50
51
# File 'lib/zzq/persistence/pstore.rb', line 45

def save_retained(message)
  bytes = message.to_wire
  @store.transaction do
    @store[:retained] ||= {}
    @store[:retained][message.topic] = bytes
  end
end