Module: Legion::Transport::Spool
- Extended by:
- Logging::Helper
- Defined in:
- lib/legion/transport/spool.rb
Class Attribute Summary collapse
- .max_file_bytes ⇒ Object (readonly)
  Returns the value of attribute max_file_bytes.
Class Method Summary collapse
- .count ⇒ Object
- .drain ⇒ Object
- .evict_stale ⇒ Object
- .reset! ⇒ Object
- .setup(directory: nil, max_file_bytes: 10_485_760, max_total_bytes: 524_288_000, max_files: 100, max_age_seconds: 259_200) ⇒ Object
- .write(exchange:, routing_key:, payload:, **envelope_opts) ⇒ Object
Class Attribute Details
.max_file_bytes ⇒ Object (readonly)
Returns the value of attribute max_file_bytes.
109 110 111 |
# File 'lib/legion/transport/spool.rb', line 109

# Reader for the per-file size limit held in +@max_file_bytes+.
#
# @return [Object, nil] whatever was last assigned to +@max_file_bytes+
#   (+setup+ stores its +max_file_bytes:+ argument there; +reset!+ sets it
#   back to +nil+)
def max_file_bytes
  @max_file_bytes
end
Class Method Details
.count ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/legion/transport/spool.rb', line 70

# Counts the lines currently held across all spool files.
#
# Calls +setup+ with its defaults first when no directory has been configured.
# Each file returned by +sorted_files+ is walked with +stream_lines+ and one is
# added per yielded line. A file that raises while being read is reported via
# +handle_exception+ (debug level, marked handled) and contributes 0, so a
# single unreadable file does not abort the total.
#
# @return [Integer] total number of lines yielded across all files
def count
  setup unless @directory

  sorted_files.sum do |file|
    lines = 0
    stream_lines(file) { lines += 1 }
    lines
  rescue StandardError => e
    handle_exception(e, level: :debug, handled: true, operation: 'transport.spool.count', file: file)
    0 # an unreadable file counts as empty
  end
end
.drain ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/legion/transport/spool.rb', line 55

# Hands every spooled message to the given block and removes each spool file
# once all of its messages have been yielded.
#
# Calls +setup+ with its defaults first when no directory has been configured.
# Files are processed in the order +sorted_files+ returns them. A file is
# parsed completely (every line through +Legion::JSON.load+) before any of its
# messages are yielded, so a malformed line stops the drain before that file is
# partially delivered.
#
# On any StandardError (parse failure, the block raising, delete failing) the
# error is reported via +handle_exception+ at warn level and the drain stops;
# the current file and all later files are left on disk. If the block raised
# part-way through a file, the messages already yielded from that file will be
# yielded again on the next drain, because the file was not deleted.
#
# NOTE(review): when called without a block nothing is yielded, yet each file
# is still deleted — confirm every caller passes a block.
#
# @yieldparam message [Object] one decoded spool entry
# @return [Object, nil] the value of +sorted_files.each+, or +nil+ when the
#   drain stopped early
def drain(&block)
  setup unless @directory

  sorted_files.each do |file|
    messages = []
    stream_lines(file) { |line| messages << Legion::JSON.load(line) }
    log.info "Draining spool file=#{file} messages=#{messages.size}"
    # Explicit &block rather than anonymous `&`: forwarding an anonymous block
    # from inside another block is not accepted by every Ruby release.
    messages.each(&block)
    File.delete(file)
  rescue StandardError => e
    handle_exception(e, level: :warn, handled: true, operation: 'transport.spool.drain', file: file)
    break # keep ordering: never skip past a file that failed
  end
end
.evict_stale ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/legion/transport/spool.rb', line 83

# Deletes spool files whose modification time is older than
# +@max_age_seconds+ before now.
#
# Calls +setup+ with its defaults first when no directory has been configured.
# Every file from +sorted_files+ is checked independently: an error on one
# file (for example it disappeared between listing and stat) is reported via
# +handle_exception+ at debug level and the loop moves on to the next file.
#
# @return [Object] the value of +sorted_files.each+
def evict_stale
  setup unless @directory

  cutoff = Time.now - @max_age_seconds
  sorted_files.each do |file|
    next unless File.mtime(file) < cutoff

    File.delete(file)
    log.info "Evicted stale spool file=#{file}"
  rescue StandardError => e
    handle_exception(e, level: :debug, handled: true, operation: 'transport.spool.evict_stale', file: file)
    nil
  end
end
.reset! ⇒ Object
98 99 100 101 102 103 104 105 106 107 |
# File 'lib/legion/transport/spool.rb', line 98

# Clears all in-memory spool configuration and state.
#
# Logs 'Spool reset' and sets the directory, current file, mutex and every
# limit back to +nil+. Nothing on disk is touched. Because +@directory+ is
# cleared, the next +count+/+drain+/+evict_stale+/+write+ call runs +setup+
# again with its defaults.
#
# @return [nil]
def reset!
  log.info 'Spool reset'

  @directory = nil
  @current_file = nil
  @mutex = nil
  @max_file_bytes = nil
  @max_total_bytes = nil
  @max_files = nil
  @max_age_seconds = nil
end
.setup(directory: nil, max_file_bytes: 10_485_760, max_total_bytes: 524_288_000, max_files: 100, max_age_seconds: 259_200) ⇒ Object
13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/legion/transport/spool.rb', line 13

# Configures the spool and makes sure its directory exists.
#
# Stores every argument in the matching instance variable, clears the current
# file, creates a fresh Mutex, creates the directory (including parents) and
# logs the resulting configuration. Calling it again replaces all of that
# state, including the mutex.
#
# @param directory [String, nil] spool directory; when +nil+ the expanded
#   path of '~/.legionio/spool' is used
# @param max_file_bytes [Integer] stored in +@max_file_bytes+
#   (default 10_485_760 = 10 MiB)
# @param max_total_bytes [Integer] stored in +@max_total_bytes+
#   (default 524_288_000 = 500 MiB)
# @param max_files [Integer] stored in +@max_files+ (default 100)
# @param max_age_seconds [Integer] stored in +@max_age_seconds+ and used by
#   +evict_stale+ as the age cutoff (default 259_200 = 3 days)
# @return [Object] whatever +log.info+ returns
def setup(directory: nil, max_file_bytes: 10_485_760, max_total_bytes: 524_288_000,
          max_files: 100, max_age_seconds: 259_200)
  # expand_path resolves the leading '~' to the user's home directory.
  @directory = directory || File.expand_path('~/.legionio/spool')
  @max_file_bytes = max_file_bytes
  @max_total_bytes = max_total_bytes
  @max_files = max_files
  @max_age_seconds = max_age_seconds
  @current_file = nil
  @mutex = Mutex.new

  FileUtils.mkdir_p(@directory)
  log.info "Spool initialized directory=#{@directory} max_files=#{@max_files} " \
           "max_total_bytes=#{@max_total_bytes} max_file_bytes=#{@max_file_bytes}"
end
.write(exchange:, routing_key:, payload:, **envelope_opts) ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/legion/transport/spool.rb', line 28

# Appends one message to the current spool file as a single JSON line.
#
# Calls +setup+ with its defaults first when no directory has been configured.
# Inside the mutex it runs +evict_oldest+ when +over_limits?+ is true, builds
# the envelope, appends it to +current_file+ and then calls
# +rotate_if_needed+.
#
# The envelope always carries +exchange+, +routing_key+, +payload+ and
# +spooled_at+ (ISO 8601 timestamp of the write). Of the extra options only
# +headers+, +priority+, +message_id+, +correlation_id+ and +persistent+ are
# copied, and only when their value is not +nil+; any other key is ignored.
#
# Failures never propagate: any StandardError is reported via
# +handle_exception+ at warn level.
#
# @param exchange [Object] stored under +:exchange+
# @param routing_key [Object] stored under +:routing_key+
# @param payload [Object] stored under +:payload+
# @param envelope_opts [Hash] optional envelope fields (see above)
# @return [Object] the result of +rotate_if_needed+ on success, or of
#   +handle_exception+ when the write failed
def write(exchange:, routing_key:, payload:, **envelope_opts)
  setup unless @directory

  @mutex.synchronize do
    evict_oldest if over_limits?

    envelope = {
      exchange: exchange,
      routing_key: routing_key,
      payload: payload,
      spooled_at: Time.now.iso8601
    }
    # Whitelist the optional fields and drop the ones that were not supplied.
    optional = envelope_opts.slice(:headers, :priority, :message_id, :correlation_id, :persistent).compact
    envelope.merge!(optional)

    # Serialize before resolving/opening the file so a dump failure leaves
    # the spool untouched.
    line = Legion::JSON.dump(envelope)
    file = current_file
    File.open(file, 'a') { |f| f.puts(line) }
    rotate_if_needed
  end
rescue StandardError => e
  handle_exception(e, level: :warn, handled: true, operation: 'transport.spool.write',
                      directory: @directory, exchange: exchange, routing_key: routing_key)
end