Module: Deimos::PhobosConfig

Extended by:
ActiveSupport::Concern
Included in:
FigTree::ConfigStruct
Defined in:
lib/deimos/config/phobos_config.rb

Overview

Module to handle phobos.yml as well as outputting the configuration to save to Phobos itself.

Instance Method Summary collapse

Instance Method Details

#phobos_configHash

Create a hash representing the config that Phobos expects.

Returns:

  • (Hash)


31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/deimos/config/phobos_config.rb', line 31

def phobos_config
  p_config = {
    logger: Logger.new(STDOUT),
    custom_logger: self.phobos_logger,
    custom_kafka_logger: self.kafka.logger,
    kafka: {
      client_id: self.kafka.client_id,
      connect_timeout: self.kafka.connect_timeout,
      socket_timeout: self.kafka.socket_timeout,
      ssl_verify_hostname: self.kafka.ssl.verify_hostname,
      ssl_ca_certs_from_system: self.kafka.ssl.ca_certs_from_system,
      seed_brokers: Array.wrap(self.kafka.seed_brokers)
    },
    producer: {
      ack_timeout: self.producers.ack_timeout,
      required_acks: self.producers.required_acks,
      max_retries: self.producers.max_retries,
      retry_backoff: self.producers.retry_backoff,
      max_buffer_size: self.producers.max_buffer_size,
      max_buffer_bytesize: self.producers.max_buffer_bytesize,
      compression_codec: self.producers.compression_codec,
      compression_threshold: self.producers.compression_threshold,
      max_queue_size: self.producers.max_queue_size,
      delivery_threshold: self.producers.delivery_threshold,
      delivery_interval: self.producers.delivery_interval
    },
    consumer: {
      session_timeout: self.consumers.session_timeout,
      offset_commit_interval: self.consumers.offset_commit_interval,
      offset_commit_threshold: self.consumers.offset_commit_threshold,
      heartbeat_interval: self.consumers.heartbeat_interval
    },
    backoff: _backoff(self.consumers.backoff.to_a)
  }

  p_config[:listeners] = self.consumer_objects.map do |consumer|
    next nil if consumer.disabled

    hash = consumer.to_h.reject do |k, _|
      %i(class_name schema namespace key_config backoff disabled).include?(k)
    end
    hash = hash.map { |k, v| [k, v.is_a?(Symbol) ? v.to_s : v] }.to_h
    hash[:handler] = consumer.class_name
    if consumer.backoff
      hash[:backoff] = _backoff(consumer.backoff.to_a)
    end
    hash
  end
  p_config[:listeners].compact!

  if self.kafka.ssl.enabled
    %w(ca_cert client_cert client_cert_key).each do |key|
      next if self.kafka.ssl.send(key).blank?

      p_config[:kafka]["ssl_#{key}".to_sym] = ssl_var_contents(self.kafka.ssl.send(key))
    end
  end

  if self.kafka.sasl.enabled
    p_config[:kafka][:sasl_over_ssl] = self.kafka.sasl.enforce_ssl
    %w(
      gssapi_principal
      gssapi_keytab
      plain_authzid
      plain_username
      plain_password
      scram_username
      scram_password
      scram_mechanism
      oauth_token_provider
    ).each do |key|
      value = self.kafka.sasl.send(key)
      next if value.blank?

      p_config[:kafka]["sasl_#{key}".to_sym] = value
    end
  end
  p_config
end

#reset!void

This method returns an undefined value.



24
25
26
27
# File 'lib/deimos/config/phobos_config.rb', line 24

def reset!
  super
  Phobos.configure(self.phobos_config)
end

#ssl_var_contents(key) ⇒ String

Parameters:

  • key (String)

Returns:

  • (String)


113
114
115
# File 'lib/deimos/config/phobos_config.rb', line 113

def ssl_var_contents(key)
  File.exist?(key) ? File.read(key) : key
end

#to_hHash

Returns:

  • (Hash)


12
13
14
15
16
17
18
19
20
21
# File 'lib/deimos/config/phobos_config.rb', line 12

def to_h
  (FIELDS + [:handler]).map { |f|
    val = self.send(f)
    if f == :backoff && val
      [:backoff, _backoff(val)]
    elsif val.present?
      [f, val]
    end
  }.to_h
end