Class: Julewire::Ractor::Destination

Inherits:
Object
  • Object
show all
Defined in:
lib/julewire/ractor/destination.rb

Overview

Owns the parent queue, the worker lifecycle, and health. (This sentence is the justification attached to the class's `rubocop:disable Metrics/ClassLength` directive.)

Constant Summary collapse

DEFAULT_MAX_QUEUE =
1024
DEFAULT_REQUEST_TIMEOUT =
1
TIMEOUT_THREAD_NAME =
"julewire-ractor-destination-timeout"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(output:, name: :ractor, formatter: Julewire::RecordFormatter.new, encoder: Julewire::JsonEncoder.new, max_record_bytes: Core::DEFAULT_MAX_RECORD_BYTES, max_queue: DEFAULT_MAX_QUEUE, close_output: false, request_timeout: DEFAULT_REQUEST_TIMEOUT, on_drop: nil, on_failure: nil) ⇒ Destination

Destination setup mirrors core destination knobs. (This sentence is the justification attached to the method's `rubocop:disable Metrics/ParameterLists` directive.)



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/julewire/ractor/destination.rb', line 25

# Validates the configuration, stores it, and starts the worker.
#
# Checks run in the order written below, so the first invalid argument is the
# one that raises. The scheduler and worker are only created after every
# check has passed.
#
# @param output [Object] sink; must pass Core::Destinations::Sink.validate_writeable!
# @param name [Symbol, Object] destination name, normalized via Core::Destinations.normalize_name
# @param formatter [Object] checked by validate_callable (presumably responds to #call)
# @param encoder [Object] checked by validate_callable (presumably responds to #call)
# @param max_record_bytes [Object] must pass Core::Validation.validate_byte_limit!
# @param max_queue [Object] must pass Core::Validation.validate_non_negative_integer!
# @param close_output [Object] stored as given; not validated here
# @param request_timeout [Object] must pass Core::Validation.validate_timeout!;
#   used as the default timeout by #flush, #close and #health
# @param on_drop [Object, nil] optional; must pass Core::Validation.validate_callable!
# @param on_failure [Object, nil] optional; must pass Core::Validation.validate_callable!
def initialize( # rubocop:disable Metrics/ParameterLists -- Destination setup mirrors core destination knobs.
  output:,
  name: :ractor,
  formatter: Julewire::RecordFormatter.new,
  encoder: Julewire::JsonEncoder.new,
  max_record_bytes: Core::DEFAULT_MAX_RECORD_BYTES,
  max_queue: DEFAULT_MAX_QUEUE,
  close_output: false,
  request_timeout: DEFAULT_REQUEST_TIMEOUT,
  on_drop: nil,
  on_failure: nil
)
  # Identity and the two pipeline stages come first, exactly as before.
  @name = Core::Destinations.normalize_name(name)
  @formatter = validate_callable(formatter, name: :formatter)
  @encoder = validate_callable(encoder, name: :encoder)

  # Each remaining option is stored right after the check that guards it.
  Core::Destinations::Sink.validate_writeable!(output)
  @output = output
  @close_output = close_output

  checks = Core::Validation
  checks.validate_byte_limit!(max_record_bytes, name: :max_record_bytes)
  @max_record_bytes = max_record_bytes

  checks.validate_non_negative_integer!(max_queue, name: :max_queue)
  @max_queue = max_queue

  checks.validate_timeout!(request_timeout, name: :request_timeout)
  @request_timeout = request_timeout

  checks.validate_callable!(on_drop, name: :on_drop, allow_nil: true)
  @on_drop = on_drop

  checks.validate_callable!(on_failure, name: :on_failure, allow_nil: true)
  @on_failure = on_failure

  # Runtime machinery is built last, once the configuration is known-good.
  @scheduler = Core::Scheduling::DeadlineScheduler.new(thread_name: TIMEOUT_THREAD_NAME, idle: :exit)
  initialize_tracking
  start_worker
end

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



23
24
25
# File 'lib/julewire/ractor/destination.rb', line 23

# Reader for the destination name stored in @name by the constructor.
#
# @return [Object] the value of @name
def name = @name

Instance Method Details

#after_fork! ⇒ Object



85
86
87
88
89
90
91
92
93
94
# File 'lib/julewire/ractor/destination.rb', line 85

# Rebuilds this destination's runtime state (intended, per its name, to be
# called after a fork).
#
# Steps, in order: close the ports, notify the scheduler via its own
# #after_fork!, reset tracking, and start a worker. A StandardError raised by
# any step is handed to record_failure with phase :after_fork instead of
# propagating; the remaining steps are skipped in that case.
#
# @return [self] always, whether or not a step failed
def after_fork!
  begin
    close_ports
    @scheduler.after_fork!
    initialize_tracking
    start_worker
  rescue StandardError => error
    record_failure(error, phase: :after_fork)
  end

  self
end

#close(timeout: nil) ⇒ Object



77
78
79
80
81
82
83
# File 'lib/julewire/ractor/destination.rb', line 77

# Marks the destination closed, sends a :close request, and closes the ports.
#
# The ports are closed even when the :close request raises, so a failed
# shutdown handshake does not leave them open on a destination that is
# already flagged as closed.
#
# @param timeout [Object, nil] timeout forwarded to the request; nil means
#   use @request_timeout
# @return [Object] whatever request(:close, ...) returns
# @raise [StandardError] anything request raises, re-raised after the ports
#   have been closed
def close(timeout: nil)
  timeout = @request_timeout if timeout.nil?
  @closed.set(true)

  begin
    request(:close, timeout: timeout)
  ensure
    # Runs on both the normal and the exceptional path; does not alter the
    # value returned by the request above.
    close_ports
  end
end

#emit(record) ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/julewire/ractor/destination.rb', line 58

# Hands one record to the worker port, or drops it.
#
# Every call counts as :received. The record is dropped (via drop) when the
# destination is closed, when the queue is full, or when anything in this
# method raises a StandardError; in the last case the error is first passed
# to record_failure with phase :ractor_send.
#
# @param record [Object] the record to enqueue
# @return [nil] when the record was sent to the port
# @return [Object] the result of drop when the record was not sent
def emit(record)
  increment(:received)

  if closed?
    drop(:closed_dropped, record)
  elsif queue_full?
    drop(:queue_full_dropped, record)
  else
    message = { command: :emit, record: record }
    @port.send(message)
    # Accounting happens only after the send did not raise.
    @in_flight.increment
    increment(:queued)
    nil
  end
rescue StandardError => failure
  record_failure(failure, phase: :ractor_send)
  drop(:send_error, record)
end

#flush(timeout: nil) ⇒ Object



72
73
74
75
# File 'lib/julewire/ractor/destination.rb', line 72

# Sends a :flush request and returns its result.
#
# @param timeout [Object, nil] timeout forwarded to the request; only nil is
#   replaced by @request_timeout (other falsy values are passed through)
# @return [Object] whatever request(:flush, ...) returns
def flush(timeout: nil)
  effective_timeout = timeout.nil? ? @request_timeout : timeout
  request(:flush, timeout: effective_timeout)
end

#health ⇒ Object



98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/julewire/ractor/destination.rb', line 98

# Builds a health snapshot for this destination.
#
# Sends a :health request with @request_timeout. If the reply is not a Hash,
# the value last stored in @worker_health is used instead. Whichever value is
# chosen is written back to @worker_health when it is a Hash.
#
# @return [Object] the result of @health.snapshot, called with the in-flight
#   count, the configured max_queue, the status derived by status_for, and
#   the worker health value (which may be a non-Hash if nothing was cached)
def health
  reply = request(:health, timeout: @request_timeout)
  worker_state = reply.is_a?(Hash) ? reply : @worker_health.get
  @worker_health.set(worker_state) if worker_state.is_a?(Hash)

  @health.snapshot(
    in_flight: @in_flight.value,
    max_queue: @max_queue,
    status: status_for(worker_state),
    worker: worker_state
  )
end

#resource_identity ⇒ Object



96
# File 'lib/julewire/ractor/destination.rb', line 96

# Returns this destination itself as its resource identity.
#
# @return [self]
def resource_identity
  self
end