Class: Aikido::Zen::DetachedAgent::Agent

Inherits:
Object
  • Object
show all
Defined in:
lib/aikido/zen/detached_agent/agent.rb

Overview

Agent that runs in forked processes. It communicates with the parent process to dRB calls. It's in charge of schedule and send heartbeats to the parent process, to be later pushed.

heartbeat & polling interval are configured to 10s , because they are connecting with parent process. We want to have the freshest data.

It's possible to use extend Forwardable here for one-line forward calls to the created at runtime by DRbObject, which leads to an ugly warning about private methods after the delegator is bound.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config: Aikido::Zen.config, worker: Aikido::Zen::Worker.new(config: config), heartbeat_interval: 10, polling_interval: 10, collector: Aikido::Zen.collector) ⇒ Agent

Returns a new instance of Agent.



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/aikido/zen/detached_agent/agent.rb', line 23

def initialize(
  config: Aikido::Zen.config,
  worker: Aikido::Zen::Worker.new(config: config),
  heartbeat_interval: 10,
  polling_interval: 10,
  collector: Aikido::Zen.collector
)
  @config = config
  @worker = worker
  @heartbeat_interval = heartbeat_interval
  @polling_interval = polling_interval

  @collector = collector

  @front_object = DRbObject.new_with_uri(config.expanded_detached_agent_socket_uri)

  @has_forked = false
  schedule_tasks
end

Instance Attribute Details

#workerObject (readonly)

Returns the value of attribute worker.



21
22
23
# File 'lib/aikido/zen/detached_agent/agent.rb', line 21

def worker
  @worker
end

Instance Method Details

#calculate_rate_limits(request) ⇒ Object



48
49
50
# File 'lib/aikido/zen/detached_agent/agent.rb', line 48

def calculate_rate_limits(request)
  @front_object.calculate_rate_limits(request.route.as_json, request.client_ip, request.actor.as_json)
end

#handle_forkObject

Every time a fork occurs (a new child process is created), we need to start a DRb service in a background thread within the child process. This service will manage the connection and handle resource cleanup.



55
56
57
58
59
60
61
62
# File 'lib/aikido/zen/detached_agent/agent.rb', line 55

def handle_fork
  @has_forked = true
  DRb.start_service
  # we need to ensure that there are not more jobs in the queue, but
  # we reuse the same object
  @worker.restart
  schedule_tasks
end

#send_collector_eventsObject



43
44
45
46
# File 'lib/aikido/zen/detached_agent/agent.rb', line 43

def send_collector_events
  events_data = @collector.flush_events.map(&:as_json)
  @front_object.send_collector_events(events_data)
end