Class: Julewire::Ractor::Destination

Inherits:
Object
  • Object
show all
Defined in:
lib/julewire/ractor/destination.rb

Overview

Owns the parent queue, worker lifecycle, and health. (The source disables RuboCop's Metrics/ClassLength for this class for that reason.)

Constant Summary collapse

DEFAULT_MAX_QUEUE =
1024
DEFAULT_REQUEST_TIMEOUT =
1
TIMEOUT_THREAD_NAME =
"julewire-ractor-destination-timeout"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(output:, name: :ractor, formatter: Julewire::RecordFormatter.new, encoder: Julewire::JsonEncoder.new, max_record_bytes: Core::DEFAULT_MAX_RECORD_BYTES, max_queue: DEFAULT_MAX_QUEUE, close_output: false, request_timeout: DEFAULT_REQUEST_TIMEOUT, on_drop: nil, on_failure: nil) ⇒ Destination

The source disables RuboCop's Metrics/ParameterLists for this constructor because destination setup mirrors the core destination knobs.



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/julewire/ractor/destination.rb', line 26

# Validates every option, stores it, builds the timeout scheduler, and then
# runs `initialize_tracking` and `start_worker`.
#
# Validations run in a fixed order (name, formatter, encoder, output,
# max_record_bytes, max_queue, request_timeout, on_drop, on_failure), so the
# first invalid option in that order is the one that gets reported.
#
# @param output [Object] checked with Core::Destinations::Sink.validate_writeable!
# @param name [Object] passed through Core::Destinations.normalize_name
# @param formatter [Object] checked with validate_callable
# @param encoder [Object] checked with validate_callable
# @param max_record_bytes [Object] checked with Core::Validation.validate_byte_limit!
# @param max_queue [Object] checked with Core::Validation.validate_non_negative_integer!
# @param close_output [Object] stored as-is, without validation
# @param request_timeout [Object] checked with Core::Validation.validate_timeout!;
#   used as the default timeout by #flush, #close and #health
# @param on_drop [Object, nil] checked with Core::Validation.validate_callable! (nil allowed)
# @param on_failure [Object, nil] checked with Core::Validation.validate_callable! (nil allowed)
def initialize( # rubocop:disable Metrics/ParameterLists -- Destination setup mirrors core destination knobs.
  output:,
  name: :ractor,
  formatter: Julewire::RecordFormatter.new,
  encoder: Julewire::JsonEncoder.new,
  max_record_bytes: Core::DEFAULT_MAX_RECORD_BYTES,
  max_queue: DEFAULT_MAX_QUEUE,
  close_output: false,
  request_timeout: DEFAULT_REQUEST_TIMEOUT,
  on_drop: nil,
  on_failure: nil
)
  @name = Core::Destinations.normalize_name(name)
  @formatter = validate_callable(formatter, name: :formatter)
  @encoder = validate_callable(encoder, name: :encoder)

  # Each remaining option is stored right after its own check passes.
  Core::Destinations::Sink.validate_writeable!(output)
  @output = output
  @close_output = close_output

  Core::Validation.validate_byte_limit!(max_record_bytes, name: :max_record_bytes)
  @max_record_bytes = max_record_bytes

  Core::Validation.validate_non_negative_integer!(max_queue, name: :max_queue)
  @max_queue = max_queue

  Core::Validation.validate_timeout!(request_timeout, name: :request_timeout)
  @request_timeout = request_timeout

  Core::Validation.validate_callable!(on_drop, name: :on_drop, allow_nil: true)
  @on_drop = on_drop

  Core::Validation.validate_callable!(on_failure, name: :on_failure, allow_nil: true)
  @on_failure = on_failure

  @scheduler = Core::Scheduling::DeadlineScheduler.new(
    thread_name: TIMEOUT_THREAD_NAME,
    idle: :exit
  )

  initialize_tracking
  start_worker
end

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



24
25
26
# File 'lib/julewire/ractor/destination.rb', line 24

# @return [Object] the value currently stored in @name
def name = @name

Instance Method Details

#after_fork! ⇒ Object



88
89
90
91
92
93
94
95
96
97
# File 'lib/julewire/ractor/destination.rb', line 88

# Rebuilds this destination's runtime state: closes the existing ports,
# calls `after_fork!` on the scheduler, re-runs `initialize_tracking`, and
# starts a worker again.
#
# Any StandardError raised by those steps is handed to `record_failure`
# with phase `:after_fork` instead of propagating.
#
# @return [self] always, whether or not a step failed
def after_fork!
  begin
    close_ports
    @scheduler.after_fork!
    initialize_tracking
    start_worker
  rescue StandardError => error
    record_failure(error, phase: :after_fork)
  end

  self
end

#close(timeout: nil) ⇒ Object



80
81
82
83
84
85
86
# File 'lib/julewire/ractor/destination.rb', line 80

# Marks the destination closed, issues a `:close` request, and closes the
# ports.
#
# The ports are closed in an `ensure` so they are released even when the
# `:close` request raises; otherwise a failed request would leave them open
# on a destination that is already flagged closed.
#
# @param timeout [Object, nil] timeout passed to the request; falls back to
#   @request_timeout when nil
# @return [Object] whatever the `:close` request returns
def close(timeout: nil)
  timeout = @request_timeout if timeout.nil?
  @closed.set(true)

  begin
    request(:close, timeout: timeout)
  ensure
    close_ports
  end
end

#emit(record) ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/julewire/ractor/destination.rb', line 59

# Counts the record as received and tries to hand it to the port.
#
# The record is dropped (via `drop`) when the destination is closed or when
# no queue slot can be reserved. If sending raises a StandardError, the
# reserved slot is released, the failure is recorded with phase
# `:ractor_send`, and the record is dropped with reason `:send_error`.
#
# @param record [Object] the record to forward
# @return [nil, Object] nil once a send was attempted (successful or not);
#   the result of `drop` for the closed and queue-full cases
def emit(record)
  increment(:received)

  if closed?
    drop(:closed_dropped, record)
  elsif reserve_slot
    begin
      message = { command: :emit, record: record }
      @port.send(message)
      increment(:queued)
    rescue StandardError => error
      # Give the slot back before reporting, since nothing was queued.
      release_slot
      record_failure(error, phase: :ractor_send)
      drop(:send_error, record)
    end
    nil
  else
    drop(:queue_full_dropped, record)
  end
end

#flush(timeout: nil) ⇒ Object



75
76
77
78
# File 'lib/julewire/ractor/destination.rb', line 75

# Issues a `:flush` request.
#
# @param timeout [Object, nil] timeout passed to the request; only nil is
#   replaced by @request_timeout (false is passed through unchanged)
# @return [Object] whatever the `:flush` request returns
def flush(timeout: nil)
  effective_timeout = timeout.nil? ? @request_timeout : timeout
  request(:flush, timeout: effective_timeout)
end

#health ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/julewire/ractor/destination.rb', line 101

# Builds a health snapshot for this destination.
#
# Issues a `:health` request; when the reply is not a Hash, the value held
# in @worker_health is used instead. Whichever worker value ends up being a
# Hash is written back to @worker_health.
#
# @return [Object] the result of `@health.snapshot`, called with the
#   in-flight count, max queue size, derived status, and worker value
def health
  reply = request(:health, timeout: @request_timeout)
  worker = reply.is_a?(Hash) ? reply : @worker_health.get
  @worker_health.set(worker) if worker.is_a?(Hash)

  in_flight = @in_flight.value
  status = status_for(worker)

  @health.snapshot(in_flight: in_flight, max_queue: @max_queue, status: status, worker: worker)
end

#resource_identity ⇒ Object



99
# File 'lib/julewire/ractor/destination.rb', line 99

# @return [self] the receiver itself, used as its own resource identity
def resource_identity
  self
end