Module: Legion::Transport::Spool

Extended by:
Logging::Helper
Defined in:
lib/legion/transport/spool.rb

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.max_file_bytes ⇒ Object (readonly)

Returns the value of attribute max_file_bytes.



109
110
111
# File 'lib/legion/transport/spool.rb', line 109

# Reader for the +@max_file_bytes+ setting.
#
# @return [Object, nil] the value currently stored in +@max_file_bytes+;
#   +nil+ when nothing has been assigned to it
def max_file_bytes = @max_file_bytes

Class Method Details

.count ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/legion/transport/spool.rb', line 70

# Counts the lines held across all spool files.
#
# Runs +setup+ with its defaults first when no directory has been configured.
# Each path returned by +sorted_files+ is read through +stream_lines+ and one
# is counted per yielded line. A file whose read raises +StandardError+ is
# reported via +handle_exception+ (debug level) and contributes 0 to the
# total, even if some of its lines had already been yielded.
#
# @return [Integer] total number of lines counted
def count
  setup unless @directory

  per_file = sorted_files.map do |path|
    tally = 0
    stream_lines(path) { tally += 1 }
    tally
  rescue StandardError => e
    handle_exception(e, level: :debug, handled: true, operation: 'transport.spool.count', file: path)
    0
  end

  per_file.sum
end

.drain ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/legion/transport/spool.rb', line 55

# Hands every spooled message to the given block, deleting each spool file
# once all of its messages have been yielded.
#
# Files are taken in the order +sorted_files+ returns them. For each file all
# lines are parsed with +Legion::JSON.load+ first, then yielded one by one,
# and only then is the file deleted. If parsing or the block raises
# +StandardError+, the error is reported via +handle_exception+ (warn level),
# the file is left on disk and no further files are processed. Because the
# file is kept, messages yielded before the failure will be yielded again on
# the next call.
#
# Calling without a block is a no-op: nothing is read or deleted. Without
# this guard, +messages.each(&)+ would merely return an Enumerator and the
# following +File.delete+ would discard every message undelivered.
#
# @yieldparam message [Object] one parsed spool entry
# @return [Object, nil] the value of +sorted_files+ when every file was
#   drained; +nil+ when no block was given or draining stopped on an error
def drain(&)
  unless block_given?
    log.warn 'Spool drain skipped: no block given'
    return
  end

  setup unless @directory

  sorted_files.each do |file|
    messages = []
    stream_lines(file) { |line| messages << Legion::JSON.load(line) }
    log.info "Draining spool file=#{file} messages=#{messages.size}"
    messages.each(&)
    # Delete only after every message in this file has been handed over.
    File.delete(file)
  rescue StandardError => e
    handle_exception(e, level: :warn, handled: true, operation: 'transport.spool.drain', file: file)
    # Stop at the first failure so later files are not replayed out of order.
    break
  end
end

.evict_stale ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/legion/transport/spool.rb', line 83

# Deletes spool files whose modification time is older than
# +@max_age_seconds+ relative to now.
#
# Runs +setup+ with its defaults first when no directory has been configured.
# A failure on one file (for example, it vanished between listing and
# +File.mtime+) is reported via +handle_exception+ at debug level and the
# remaining files are still examined.
#
# @return [Object] whatever +sorted_files+ returned (the receiver of +each+)
def evict_stale
  setup unless @directory

  threshold = Time.now - @max_age_seconds
  sorted_files.each do |path|
    if File.mtime(path) < threshold
      File.delete(path)
      log.info "Evicted stale spool file=#{path}"
    end
  rescue StandardError => e
    handle_exception(e, level: :debug, handled: true, operation: 'transport.spool.evict_stale', file: path)
    nil
  end
end

.reset! ⇒ Object



98
99
100
101
102
103
104
105
106
107
# File 'lib/legion/transport/spool.rb', line 98

# Clears all spool configuration and runtime state held in instance
# variables, after logging that a reset is happening.
#
# Files on disk are not touched. Because +@directory+ becomes +nil+, the
# methods guarded by +setup unless @directory+ will call +setup+ again with
# its defaults on next use.
#
# @return [nil]
def reset!
  log.info 'Spool reset'

  # Runtime state.
  @directory = @current_file = @mutex = nil
  # Configured limits.
  @max_file_bytes = @max_total_bytes = @max_files = @max_age_seconds = nil
end

.setup(directory: nil, max_file_bytes: 10_485_760, max_total_bytes: 524_288_000, max_files: 100, max_age_seconds: 259_200) ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/legion/transport/spool.rb', line 13

# Configures the spool and makes sure its directory exists.
#
# Stores the given limits, clears +@current_file+, installs a new +Mutex+,
# creates the directory (including parents) and logs the resulting settings.
# How the limits are enforced is not visible in this method.
#
# @param directory [String, nil] spool directory; falls back to the expanded
#   path of +~/.legionio/spool+ when +nil+
# @param max_file_bytes [Integer] stored in +@max_file_bytes+
# @param max_total_bytes [Integer] stored in +@max_total_bytes+
# @param max_files [Integer] stored in +@max_files+
# @param max_age_seconds [Integer] stored in +@max_age_seconds+; +evict_stale+
#   subtracts it from +Time.now+ to find files to delete
# @return [Object] whatever +log.info+ returns
def setup(directory: nil, max_file_bytes: 10_485_760, max_total_bytes: 524_288_000,
          max_files: 100, max_age_seconds: 259_200)
  # Fresh runtime state.
  @mutex = Mutex.new
  @current_file = nil

  # Location and limits.
  @directory = directory || File.expand_path('~/.legionio/spool')
  @max_files = max_files
  @max_file_bytes = max_file_bytes
  @max_total_bytes = max_total_bytes
  @max_age_seconds = max_age_seconds

  FileUtils.mkdir_p(@directory)

  limits = "max_total_bytes=#{@max_total_bytes} max_file_bytes=#{@max_file_bytes}"
  log.info "Spool initialized directory=#{@directory} max_files=#{@max_files} #{limits}"
end

.write(exchange:, routing_key:, payload:, **envelope_opts) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/legion/transport/spool.rb', line 28

# Appends one message to the spool as a single serialised line.
#
# Under +@mutex+: calls +evict_oldest+ when +over_limits?+ is true, builds an
# envelope from the required fields plus a timestamp, adds the recognised
# optional fields that are non-nil, serialises it with +Legion::JSON.dump+,
# appends it to the file named by +current_file+ and finally calls
# +rotate_if_needed+. Any +StandardError+ is reported via +handle_exception+
# (warn level) rather than raised.
#
# @param exchange [Object] stored under +:exchange+
# @param routing_key [Object] stored under +:routing_key+
# @param payload [Object] stored under +:payload+
# @param envelope_opts [Hash] only +:headers+, +:priority+, +:message_id+,
#   +:correlation_id+ and +:persistent+ are copied, and only when not +nil+
#   (so +persistent: false+ is kept); every other key is ignored
# @return [Object] the result of +rotate_if_needed+ on success, otherwise
#   whatever +handle_exception+ returns
def write(exchange:, routing_key:, payload:, **envelope_opts)
  setup unless @directory

  @mutex.synchronize do
    evict_oldest if over_limits?

    # Hash#slice keeps the order of the listed keys, so the optional fields
    # always follow the required ones in this fixed sequence.
    optional = envelope_opts.slice(:headers, :priority, :message_id, :correlation_id, :persistent).compact
    envelope = {
      exchange:    exchange,
      routing_key: routing_key,
      payload:     payload,
      spooled_at:  Time.now.iso8601
    }.merge(optional)

    # Serialise before resolving the target file, so a serialisation failure
    # happens before +current_file+ is consulted.
    serialized = Legion::JSON.dump(envelope)
    File.open(current_file, 'a') { |io| io.puts(serialized) }

    rotate_if_needed
  end
rescue StandardError => e
  handle_exception(e, level: :warn, handled: true, operation: 'transport.spool.write',
                   directory: @directory, exchange: exchange, routing_key: routing_key)
end