Module: Deimos::PhobosConfig

Extended by:
ActiveSupport::Concern
Included in:
FigTree::ConfigStruct
Defined in:
lib/deimos/config/phobos_config.rb

Overview

Module to handle phobos.yml as well as outputting the configuration to save to Phobos itself.

Instance Method Summary collapse

Instance Method Details

#phobos_config ⇒ Hash

Create a hash representing the config that Phobos expects.

Returns:

  • (Hash)


31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/deimos/config/phobos_config.rb', line 31

# Build the configuration hash that is handed to Phobos.
#
# The hash contains the logger entries, the +kafka+, +producer+ and +consumer+
# sections, the default +backoff+, and one +listeners+ entry for every consumer
# that is not disabled. SSL and SASL keys are added to the +kafka+ section only
# when the corresponding section is enabled, and only for non-blank settings.
#
# @return [Hash]
def phobos_config
  kafka_settings = self.kafka
  ssl_settings = kafka_settings.ssl
  sasl_settings = kafka_settings.sasl
  producer_settings = self.producers
  consumer_settings = self.consumers

  kafka_section = {
    client_id: kafka_settings.client_id,
    connect_timeout: kafka_settings.connect_timeout,
    socket_timeout: kafka_settings.socket_timeout,
    ssl_verify_hostname: ssl_settings.verify_hostname,
    ssl_ca_certs_from_system: ssl_settings.ca_certs_from_system,
    seed_brokers: Array.wrap(kafka_settings.seed_brokers)
  }

  if ssl_settings.enabled
    %w(ca_cert client_cert client_cert_key).each do |name|
      setting = ssl_settings.send(name)
      # Each SSL setting goes through ssl_var_contents before being stored.
      kafka_section[:"ssl_#{name}"] = ssl_var_contents(setting) unless setting.blank?
    end
  end

  if sasl_settings.enabled
    kafka_section[:sasl_over_ssl] = sasl_settings.enforce_ssl
    sasl_names = %w(
      gssapi_principal
      gssapi_keytab
      plain_authzid
      plain_username
      plain_password
      scram_username
      scram_password
      scram_mechanism
      oauth_token_provider
    )
    sasl_names.each do |name|
      setting = sasl_settings.send(name)
      kafka_section[:"sasl_#{name}"] = setting unless setting.blank?
    end
  end

  producer_names = %i(
    ack_timeout required_acks max_retries retry_backoff max_buffer_size
    max_buffer_bytesize compression_codec compression_threshold
    max_queue_size delivery_threshold delivery_interval
  )
  consumer_names = %i(
    session_timeout offset_commit_interval offset_commit_threshold heartbeat_interval
  )

  # Consumer attributes that are not copied verbatim into a listener entry;
  # the handler and backoff entries are written explicitly below instead.
  omitted = %i(class_name schema namespace key_config backoff disabled)
  listeners = self.consumer_objects.each_with_object([]) do |consumer, collected|
    next if consumer.disabled

    listener = consumer.to_h.each_with_object({}) do |(name, value), entry|
      next if omitted.include?(name)

      # Symbol values are converted to strings; everything else is kept as is.
      entry[name] = value.is_a?(Symbol) ? value.to_s : value
    end
    listener[:handler] = consumer.class_name
    listener[:backoff] = _backoff(consumer.backoff.to_a) if consumer.backoff
    collected << listener
  end

  {
    logger: Logger.new(STDOUT),
    custom_logger: self.phobos_logger,
    custom_kafka_logger: kafka_settings.logger,
    kafka: kafka_section,
    producer: producer_names.map { |name| [name, producer_settings.send(name)] }.to_h,
    consumer: consumer_names.map { |name| [name, consumer_settings.send(name)] }.to_h,
    backoff: _backoff(consumer_settings.backoff.to_a),
    listeners: listeners
  }
end

#phobos_config_file=(file) ⇒ Object

Legacy method to parse Phobos config file



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/deimos/config/phobos_config.rb', line 118

# Legacy setter that loads a Phobos YAML file and copies its values onto this
# configuration. The file is rendered through ERB before being parsed, and a
# deprecation warning is logged when a logger is available.
#
# NOTE(review): the file is parsed with YAML.load, so it is presumably expected
# to come from a trusted location - confirm before pointing this at
# externally supplied files.
#
# @param file [String] path to the Phobos YAML file
def phobos_config_file=(file)
  rendered = ERB.new(File.read(File.expand_path(file))).result
  settings = YAML.load(rendered).with_indifferent_access # rubocop:disable Security/YAMLLoad
  self.logger&.warn('phobos.yml is deprecated - use direct configuration instead.')

  # Kafka keys prefixed with ssl/sasl are routed to the nested ssl/sasl
  # settings with the prefix removed; every other key is set on kafka itself.
  settings[:kafka].each do |name, value|
    if name.starts_with?('ssl')
      self.kafka.ssl.send("#{name.sub('ssl_', '')}=", value)
    elsif name.starts_with?('sasl')
      attribute = name == 'sasl_over_ssl' ? 'enforce_ssl' : name.sub('sasl_', '')
      self.kafka.sasl.send("#{attribute}=", value)
    else
      self.kafka.send("#{name}=", value)
    end
  end

  settings[:producer].each { |name, value| self.producers.send("#{name}=", value) }
  settings[:consumer].each { |name, value| self.consumers.send("#{name}=", value) }

  backoff_settings = settings[:backoff]
  self.consumers.backoff = backoff_settings[:min_ms]..backoff_settings[:max_ms]

  settings[:listeners].each do |listener|
    self.consumer do
      listener.each do |name, value|
        # The file's "handler" key is passed on under the name class_name.
        send(name == 'handler' ? 'class_name' : name, value)
      end
    end
  end
end

#reset! ⇒ Object

:nodoc:



24
25
26
27
# File 'lib/deimos/config/phobos_config.rb', line 24

# Run the inherited reset, then hand the freshly built Phobos configuration
# to Phobos.configure.
#
# @return [Object] whatever Phobos.configure returns
def reset!
  super
  refreshed_config = self.phobos_config
  Phobos.configure(refreshed_config)
end

#ssl_var_contents(key) ⇒ String

Parameters:

  • key (String)

Returns:

  • (String)


113
114
115
# File 'lib/deimos/config/phobos_config.rb', line 113

# Resolve an SSL setting that may be given either as a path or as a literal value.
#
# @param key [String] a filesystem path, or the value itself
# @return [String] the contents of the file when +key+ is an existing path,
#   otherwise +key+ unchanged
def ssl_var_contents(key)
  return key unless File.exist?(key)

  File.read(key)
end

#to_h ⇒ Hash

Returns:

  • (Hash)


12
13
14
15
16
17
18
19
20
21
# File 'lib/deimos/config/phobos_config.rb', line 12

# Build a hash from every entry in FIELDS plus +:handler+.
#
# A truthy +:backoff+ value is converted through +_backoff+. Any other field is
# included only when its value is present; fields without a value are left out.
# (Previously such a field made the block yield +nil+, which caused
# +Array#to_h+ to raise a TypeError.)
#
# @return [Hash]
def to_h
  (FIELDS + [:handler]).each_with_object({}) do |field, result|
    value = self.send(field)
    if field == :backoff && value
      result[:backoff] = _backoff(value)
    elsif value.present?
      result[field] = value
    end
  end
end