Module: Deimos::PhobosConfig

Extended by:
ActiveSupport::Concern
Included in:
FigTree::ConfigStruct
Defined in:
lib/deimos/config/phobos_config.rb

Overview

Module to handle phobos.yml as well as outputting the configuration to save to Phobos itself.

Instance Method Summary collapse

Instance Method Details

#phobos_configHash

Create a hash representing the config that Phobos expects.

Returns:

  • (Hash)


31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/deimos/config/phobos_config.rb', line 31

def phobos_config
  p_config = {
    logger: Logger.new(STDOUT),
    custom_logger: self.phobos_logger,
    custom_kafka_logger: self.kafka.logger,
    kafka: {
      client_id: self.kafka.client_id,
      connect_timeout: self.kafka.connect_timeout,
      socket_timeout: self.kafka.socket_timeout,
      ssl_verify_hostname: self.kafka.ssl.verify_hostname,
      seed_brokers: Array.wrap(self.kafka.seed_brokers)
    },
    producer: {
      ack_timeout: self.producers.ack_timeout,
      required_acks: self.producers.required_acks,
      max_retries: self.producers.max_retries,
      retry_backoff: self.producers.retry_backoff,
      max_buffer_size: self.producers.max_buffer_size,
      max_buffer_bytesize: self.producers.max_buffer_bytesize,
      compression_codec: self.producers.compression_codec,
      compression_threshold: self.producers.compression_threshold,
      max_queue_size: self.producers.max_queue_size,
      delivery_threshold: self.producers.delivery_threshold,
      delivery_interval: self.producers.delivery_interval
    },
    consumer: {
      session_timeout: self.consumers.session_timeout,
      offset_commit_interval: self.consumers.offset_commit_interval,
      offset_commit_threshold: self.consumers.offset_commit_threshold,
      heartbeat_interval: self.consumers.heartbeat_interval
    },
    backoff: _backoff(self.consumers.backoff.to_a)
  }

  p_config[:listeners] = self.consumer_objects.map do |consumer|
    next nil if consumer.disabled

    hash = consumer.to_h.reject do |k, _|
      %i(class_name schema namespace key_config backoff disabled).include?(k)
    end
    hash = hash.map { |k, v| [k, v.is_a?(Symbol) ? v.to_s : v] }.to_h
    hash[:handler] = consumer.class_name
    if consumer.backoff
      hash[:backoff] = _backoff(consumer.backoff.to_a)
    end
    hash
  end
  p_config[:listeners].compact!

  if self.kafka.ssl.enabled
    %w(ca_cert client_cert client_cert_key).each do |key|
      next if self.kafka.ssl.send(key).blank?

      p_config[:kafka]["ssl_#{key}".to_sym] = ssl_var_contents(self.kafka.ssl.send(key))
    end
  end
  p_config
end

#phobos_config_file=(file) ⇒ Object

Legacy method to parse Phobos config file



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/deimos/config/phobos_config.rb', line 97

def phobos_config_file=(file)
  pconfig = YAML.load(ERB.new(File.read(File.expand_path(file))).result). # rubocop:disable Security/YAMLLoad
    with_indifferent_access
  self.logger&.warn('phobos.yml is deprecated - use direct configuration instead.')
  pconfig[:kafka].each do |k, v|
    if k.starts_with?('ssl')
      k = k.sub('ssl_', '')
      self.kafka.ssl.send("#{k}=", v)
    else
      self.kafka.send("#{k}=", v)
    end
  end
  pconfig[:producer].each do |k, v|
    self.producers.send("#{k}=", v)
  end
  pconfig[:consumer].each do |k, v|
    self.consumers.send("#{k}=", v)
  end
  self.consumers.backoff = pconfig[:backoff][:min_ms]..pconfig[:backoff][:max_ms]
  pconfig[:listeners].each do |listener_hash|
    self.consumer do
      listener_hash.each do |k, v|
        k = 'class_name' if k == 'handler'
        send(k, v)
      end
    end
  end
end

#reset!Object

:nodoc:



24
25
26
27
# File 'lib/deimos/config/phobos_config.rb', line 24

def reset!
  super
  Phobos.configure(self.phobos_config)
end

#ssl_var_contents(key) ⇒ String

Parameters:

  • key (String)

Returns:

  • (String)


92
93
94
# File 'lib/deimos/config/phobos_config.rb', line 92

def ssl_var_contents(key)
  File.exist?(key) ? File.read(key) : key
end

#to_hHash

Returns:

  • (Hash)


12
13
14
15
16
17
18
19
20
21
# File 'lib/deimos/config/phobos_config.rb', line 12

def to_h
  (FIELDS + [:handler]).map { |f|
    val = self.send(f)
    if f == :backoff && val
      [:backoff, _backoff(val)]
    elsif val.present?
      [f, val]
    end
  }.to_h
end