Module: Fosm::TransitionBuffer
- Defined in:
- lib/fosm/transition_buffer.rb
Overview
In-memory buffer for high-throughput transition log writes. Used when config.transition_log_strategy = :buffered.
Entries accumulate in a thread-safe Queue and are bulk-INSERTed every FLUSH_INTERVAL seconds by a background thread started at boot.
Trade-offs vs :async:
Pro: Sub-millisecond fire! latency (no job enqueue overhead)
Pro: Fewer DB round-trips (bulk INSERT vs N individual INSERTs)
Con: Up to FLUSH_INTERVAL seconds of log delay
Con: Unflushed entries are lost if the process crashes
To activate, set config.transition_log_strategy = :buffered in fosm.rb. The flusher thread is started automatically by the engine initializer.
Constant Summary collapse
- BUFFER =
Queue.new
- FLUSH_INTERVAL =
seconds
1
Class Method Summary collapse
-
.flush ⇒ Object
Drain the buffer and bulk-INSERT all pending entries.
- .pending_count ⇒ Object
- .push(entry) ⇒ Object
-
.start_flusher! ⇒ Object
Starts the background flusher thread.
Class Method Details
.flush ⇒ Object
Drain the buffer and bulk-INSERT all pending entries. Safe to call manually (e.g. in tests or before process exit).
39 40 41 42 43 44 45 46 47 |
# File 'lib/fosm/transition_buffer.rb', line 39 def self.flush entries = [] entries << BUFFER.pop(true) while !BUFFER.empty? rescue nil return if entries.empty? now = Time.current rows = entries.map { |e| e.merge("created_at" => now) } Fosm::TransitionLog.insert_all(rows) end |
.pending_count ⇒ Object
49 50 51 |
# File 'lib/fosm/transition_buffer.rb', line 49 def self.pending_count BUFFER.size end |
.push(entry) ⇒ Object
20 21 22 |
# File 'lib/fosm/transition_buffer.rb', line 20 def self.push(entry) BUFFER << entry end |
.start_flusher! ⇒ Object
Starts the background flusher thread. Called once by the engine after Rails initializes (if strategy is :buffered).
26 27 28 29 30 31 32 33 34 35 |
# File 'lib/fosm/transition_buffer.rb', line 26 def self.start_flusher! Thread.new do loop do sleep FLUSH_INTERVAL flush rescue => e ::Rails.logger.error("[FOSM] TransitionBuffer flush error: #{e.}") end end end |