Module: Legion::Transport::Spool
- Defined in:
- lib/legion/transport/spool.rb
Class Attribute Summary collapse
-
.max_file_bytes ⇒ Object
readonly
Returns the value of attribute max_file_bytes.
Class Method Summary collapse
- .count ⇒ Object
- .drain ⇒ Object
- .evict_stale ⇒ Object
- .reset! ⇒ Object
- .setup(directory: nil, max_file_bytes: 10_485_760, max_total_bytes: 524_288_000, max_files: 100, max_age_seconds: 259_200) ⇒ Object
- .write(exchange:, routing_key:, payload:) ⇒ Object
Class Attribute Details
.max_file_bytes ⇒ Object (readonly)
Returns the value of attribute max_file_bytes.
94 95 96 |
# File 'lib/legion/transport/spool.rb', line 94 def max_file_bytes @max_file_bytes end |
Class Method Details
.count ⇒ Object
61 62 63 64 65 66 67 68 69 70 |
# File 'lib/legion/transport/spool.rb', line 61 def count setup unless @directory sorted_files.sum do |file| File.readlines(file).count { |l| !l.strip.empty? } rescue StandardError => e Legion::Logging.debug("Spool#count file read failed: #{e.}") if defined?(Legion::Logging) 0 end end |
.drain ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/legion/transport/spool.rb', line 45 def drain setup unless @directory sorted_files.each do |file| lines = File.readlines(file).map(&:strip).reject(&:empty?) lines.each do |line| msg = Legion::JSON.load(line) yield(msg) end File.delete(file) rescue StandardError => e Legion::Logging.warn { "Spool drain error on #{file}: #{e.}" } if defined?(Legion::Logging) break end end |
.evict_stale ⇒ Object
72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/legion/transport/spool.rb', line 72 def evict_stale setup unless @directory cutoff = Time.now - @max_age_seconds sorted_files.each do |file| File.delete(file) if File.mtime(file) < cutoff rescue StandardError => e Legion::Logging.debug("Spool#evict_stale file delete failed: #{e.}") if defined?(Legion::Logging) nil end end |
.reset! ⇒ Object
84 85 86 87 88 89 90 91 92 |
# File 'lib/legion/transport/spool.rb', line 84 def reset! @directory = nil @current_file = nil @mutex = nil @max_file_bytes = nil @max_total_bytes = nil @max_files = nil @max_age_seconds = nil end |
.setup(directory: nil, max_file_bytes: 10_485_760, max_total_bytes: 524_288_000, max_files: 100, max_age_seconds: 259_200) ⇒ Object
10 11 12 13 14 15 16 17 18 19 20 21 |
# File 'lib/legion/transport/spool.rb', line 10 def setup(directory: nil, max_file_bytes: 10_485_760, max_total_bytes: 524_288_000, max_files: 100, max_age_seconds: 259_200) @directory = directory || File.('~/.legionio/spool') @max_file_bytes = max_file_bytes @max_total_bytes = max_total_bytes @max_files = max_files @max_age_seconds = max_age_seconds @current_file = nil @mutex = Mutex.new FileUtils.mkdir_p(@directory) end |
.write(exchange:, routing_key:, payload:) ⇒ Object
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/legion/transport/spool.rb', line 23 def write(exchange:, routing_key:, payload:) setup unless @directory @mutex.synchronize do evict_oldest if over_limits? line = Legion::JSON.dump({ exchange: exchange, routing_key: routing_key, payload: payload, spooled_at: Time.now.iso8601 }) file = current_file File.open(file, 'a') { |f| f.puts(line) } rotate_if_needed end rescue StandardError => e Legion::Logging.warn { "Spool write failed: #{e.}" } if defined?(Legion::Logging) end |