Module: Legion::Transport::Spool
- Extended by:
- Logging::Helper
- Defined in:
- lib/legion/transport/spool.rb
Class Attribute Summary collapse
-
.max_file_bytes ⇒ Object
readonly
Returns the value of attribute max_file_bytes.
Class Method Summary collapse
- .count ⇒ Object
- .drain ⇒ Object
- .evict_stale ⇒ Object
- .reset! ⇒ Object
- .setup(directory: nil, max_file_bytes: 10_485_760, max_total_bytes: 524_288_000, max_files: 100, max_age_seconds: 259_200) ⇒ Object
- .write(exchange:, routing_key:, payload:) ⇒ Object
Class Attribute Details
.max_file_bytes ⇒ Object (readonly)
Returns the value of attribute max_file_bytes.
105 106 107 |
# File 'lib/legion/transport/spool.rb', line 105 def max_file_bytes @max_file_bytes end |
Class Method Details
.count ⇒ Object
68 69 70 71 72 73 74 75 76 77 |
# File 'lib/legion/transport/spool.rb', line 68 def count setup unless @directory sorted_files.sum do |file| File.readlines(file).count { |l| !l.strip.empty? } rescue StandardError => e handle_exception(e, level: :debug, handled: true, operation: 'transport.spool.count', file: file) 0 end end |
.drain ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/legion/transport/spool.rb', line 51 def drain setup unless @directory sorted_files.each do |file| lines = File.readlines(file).map(&:strip).reject(&:empty?) log.info "Draining spool file=#{file} messages=#{lines.size}" lines.each do |line| msg = Legion::JSON.load(line) yield(msg) end File.delete(file) rescue StandardError => e handle_exception(e, level: :warn, handled: true, operation: 'transport.spool.drain', file: file) break end end |
.evict_stale ⇒ Object
79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/legion/transport/spool.rb', line 79 def evict_stale setup unless @directory cutoff = Time.now - @max_age_seconds sorted_files.each do |file| next unless File.mtime(file) < cutoff File.delete(file) log.info "Evicted stale spool file=#{file}" rescue StandardError => e handle_exception(e, level: :debug, handled: true, operation: 'transport.spool.evict_stale', file: file) nil end end |
.reset! ⇒ Object
94 95 96 97 98 99 100 101 102 103 |
# File 'lib/legion/transport/spool.rb', line 94 def reset! log.info 'Spool reset' @directory = nil @current_file = nil @mutex = nil @max_file_bytes = nil @max_total_bytes = nil @max_files = nil @max_age_seconds = nil end |
.setup(directory: nil, max_file_bytes: 10_485_760, max_total_bytes: 524_288_000, max_files: 100, max_age_seconds: 259_200) ⇒ Object
13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/legion/transport/spool.rb', line 13 def setup(directory: nil, max_file_bytes: 10_485_760, max_total_bytes: 524_288_000, max_files: 100, max_age_seconds: 259_200) @directory = directory || File.('~/.legionio/spool') @max_file_bytes = max_file_bytes @max_total_bytes = max_total_bytes @max_files = max_files @max_age_seconds = max_age_seconds @current_file = nil @mutex = Mutex.new FileUtils.mkdir_p(@directory) log.info "Spool initialized directory=#{@directory} max_files=#{@max_files} " \ "max_total_bytes=#{@max_total_bytes} max_file_bytes=#{@max_file_bytes}" end |
.write(exchange:, routing_key:, payload:) ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/legion/transport/spool.rb', line 28 def write(exchange:, routing_key:, payload:) setup unless @directory @mutex.synchronize do evict_oldest if over_limits? line = Legion::JSON.dump({ exchange: exchange, routing_key: routing_key, payload: payload, spooled_at: Time.now.iso8601 }) file = current_file File.open(file, 'a') { |f| f.puts(line) } rotate_if_needed end rescue StandardError => e handle_exception(e, level: :warn, handled: true, operation: 'transport.spool.write', directory: @directory, exchange: exchange, routing_key: routing_key) end |