Class: Aikido::Zen::DetachedAgent::Agent

Inherits:
Object
  • Object
show all
Defined in:
lib/aikido/zen/detached_agent/agent.rb

Overview

Agent that runs in forked processes. It communicates with the parent process to dRB calls. It’s in charge of schedule and send heartbeats to the *parent process*, to be later pushed.

heartbeat & polling interval are configured to 10s , because they are connecting with parent process. We want to have the freshest data.

It’s possible to use ‘extend Forwardable` here for one-line forward calls to the created at runtime by `DRbObject`, which leads to an ugly warning about private methods after the delegator is bound.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config: Aikido::Zen.config, worker: Aikido::Zen::Worker.new(config: config), heartbeat_interval: 10, polling_interval: 10, collector: Aikido::Zen.collector) ⇒ Agent

Returns a new instance of Agent.



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/aikido/zen/detached_agent/agent.rb', line 23

def initialize(
  config: Aikido::Zen.config,
  worker: Aikido::Zen::Worker.new(config: config),
  heartbeat_interval: 10,
  polling_interval: 10,
  collector: Aikido::Zen.collector
)
  @config = config
  @worker = worker
  @heartbeat_interval = heartbeat_interval
  @polling_interval = polling_interval

  @collector = collector

  @front_object = DRbObject.new_with_uri(config.expanded_detached_agent_socket_uri)

  @has_forked = false
  schedule_tasks
end

Instance Attribute Details

#workerObject (readonly)

Returns the value of attribute worker.



21
22
23
# File 'lib/aikido/zen/detached_agent/agent.rb', line 21

def worker
  @worker
end

Instance Method Details

#calculate_rate_limits(request) ⇒ Object



48
49
50
# File 'lib/aikido/zen/detached_agent/agent.rb', line 48

def calculate_rate_limits(request)
  @front_object.calculate_rate_limits(request.route.as_json, request.client_ip, request.actor.as_json)
end

#handle_forkObject

Every time a fork occurs (a new child process is created), we need to start a DRb service in a background thread within the child process. This service will manage the connection and handle resource cleanup.



55
56
57
58
59
60
61
62
# File 'lib/aikido/zen/detached_agent/agent.rb', line 55

def handle_fork
  @has_forked = true
  DRb.start_service
  # we need to ensure that there are not more jobs in the queue, but
  # we reuse the same object
  @worker.restart
  schedule_tasks
end

#send_collector_eventsObject



43
44
45
46
# File 'lib/aikido/zen/detached_agent/agent.rb', line 43

def send_collector_events
  events_data = @collector.flush_events.map(&:as_json)
  @front_object.send_collector_events(events_data)
end