Class: Julewire::Ractor::Destination
- Inherits:
-
Object
- Object
- Julewire::Ractor::Destination
- Defined in:
- lib/julewire/ractor/destination.rb
Overview
Owns the parent queue, the worker lifecycle, and health. (The class definition carries a `rubocop:disable Metrics/ClassLength` directive for this reason.)
Constant Summary collapse
- DEFAULT_MAX_QUEUE =
1024
- DEFAULT_REQUEST_TIMEOUT =
1
- TIMEOUT_THREAD_NAME =
"julewire-ractor-destination-timeout"
Instance Attribute Summary collapse
-
#name ⇒ Object
readonly
Returns the value of attribute name.
Instance Method Summary collapse
- #after_fork! ⇒ Object
- #close(timeout: nil) ⇒ Object
- #emit(record) ⇒ Object
- #flush(timeout: nil) ⇒ Object
- #health ⇒ Object
-
#initialize(output:, name: :ractor, formatter: Julewire::RecordFormatter.new, encoder: Julewire::JsonEncoder.new, max_record_bytes: Core::DEFAULT_MAX_RECORD_BYTES, max_queue: DEFAULT_MAX_QUEUE, close_output: false, request_timeout: DEFAULT_REQUEST_TIMEOUT, on_drop: nil, on_failure: nil) ⇒ Destination
constructor
Destination setup mirrors the core destination knobs (hence the `rubocop:disable Metrics/ParameterLists` directive on the definition).
- #resource_identity ⇒ Object
Constructor Details
#initialize(output:, name: :ractor, formatter: Julewire::RecordFormatter.new, encoder: Julewire::JsonEncoder.new, max_record_bytes: Core::DEFAULT_MAX_RECORD_BYTES, max_queue: DEFAULT_MAX_QUEUE, close_output: false, request_timeout: DEFAULT_REQUEST_TIMEOUT, on_drop: nil, on_failure: nil) ⇒ Destination
Destination setup mirrors the core destination knobs (hence the `rubocop:disable Metrics/ParameterLists` directive on the definition).
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/julewire/ractor/destination.rb', line 26
#
# Validates the configuration, stores it, creates the timeout scheduler,
# initializes tracking state, and starts the worker.
#
# The name, formatter, and encoder are normalized/validated and stored
# first; every remaining validated option is checked before it is stored.
#
# @param output [Object] checked with Core::Destinations::Sink.validate_writeable!
# @param name [Object] normalized with Core::Destinations.normalize_name
# @param formatter [Object] passed through validate_callable
# @param encoder [Object] passed through validate_callable
# @param max_record_bytes [Object] checked with Core::Validation.validate_byte_limit!
# @param max_queue [Object] checked with Core::Validation.validate_non_negative_integer!
# @param close_output [Object] stored as given; not validated here
# @param request_timeout [Object] checked with Core::Validation.validate_timeout!;
#   used as the default timeout by #flush, #close, and #health
# @param on_drop [Object, nil] callable or nil
# @param on_failure [Object, nil] callable or nil
def initialize( # rubocop:disable Metrics/ParameterLists -- Destination setup mirrors core destination knobs.
  output:,
  name: :ractor,
  formatter: Julewire::RecordFormatter.new,
  encoder: Julewire::JsonEncoder.new,
  max_record_bytes: Core::DEFAULT_MAX_RECORD_BYTES,
  max_queue: DEFAULT_MAX_QUEUE,
  close_output: false,
  request_timeout: DEFAULT_REQUEST_TIMEOUT,
  on_drop: nil,
  on_failure: nil
)
  @name = Core::Destinations.normalize_name(name)
  @formatter = validate_callable(formatter, name: :formatter)
  @encoder = validate_callable(encoder, name: :encoder)

  Core::Destinations::Sink.validate_writeable!(output)
  Core::Validation.validate_byte_limit!(max_record_bytes, name: :max_record_bytes)
  Core::Validation.validate_non_negative_integer!(max_queue, name: :max_queue)
  Core::Validation.validate_timeout!(request_timeout, name: :request_timeout)
  # Optional callbacks share one rule: callable or nil. Hash order keeps
  # on_drop validated before on_failure.
  { on_drop: on_drop, on_failure: on_failure }.each do |option, callback|
    Core::Validation.validate_callable!(callback, name: option, allow_nil: true)
  end

  @output = output
  @max_record_bytes = max_record_bytes
  @max_queue = max_queue
  @close_output = close_output
  @request_timeout = request_timeout
  @on_drop = on_drop
  @on_failure = on_failure

  @scheduler = Core::Scheduling::DeadlineScheduler.new(thread_name: TIMEOUT_THREAD_NAME, idle: :exit)
  initialize_tracking
  start_worker
end
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
24 25 26 |
# File 'lib/julewire/ractor/destination.rb', line 24
#
# @return [Object] the value stored in @name
def name = @name
Instance Method Details
#after_fork! ⇒ Object
88 89 90 91 92 93 94 95 96 97 |
# File 'lib/julewire/ractor/destination.rb', line 88
#
# Rebuilds the destination: closes the existing ports, notifies the
# scheduler via its own after_fork!, re-initializes tracking state, and
# starts a new worker.
#
# Any StandardError raised along the way is handed to record_failure with
# phase :after_fork instead of propagating.
#
# @return [self] whether or not the rebuild succeeded
def after_fork!
  begin
    close_ports
    @scheduler.after_fork!
    initialize_tracking
    start_worker
  rescue StandardError => error
    record_failure(error, phase: :after_fork)
  end

  self
end
#close(timeout: nil) ⇒ Object
80 81 82 83 84 85 86 |
# File 'lib/julewire/ractor/destination.rb', line 80
#
# Marks the destination closed, sends a :close request, and then closes the
# ports.
#
# @param timeout [Object, nil] request timeout; nil means @request_timeout
# @return [Object] whatever the :close request returned
def close(timeout: nil)
  # Only nil selects the default; any other value is passed through as-is.
  effective_timeout = timeout.nil? ? @request_timeout : timeout

  # Flag first, then make the request.
  @closed.set(true)

  # Ports are closed only after the request has returned.
  request(:close, timeout: effective_timeout).tap { close_ports }
end
#emit(record) ⇒ Object
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/julewire/ractor/destination.rb', line 59
#
# Sends one record to the worker port.
#
# Every call increments :received. The record is dropped when the
# destination is closed (:closed_dropped) or no slot can be reserved
# (:queue_full_dropped). If the send raises a StandardError, the reserved
# slot is released, the failure is recorded with phase :ractor_send, and the
# record is dropped as :send_error.
#
# @param record [Object] the record to send
# @return [Object, nil] the result of drop on the two early-exit paths,
#   otherwise nil
def emit(record)
  increment(:received)

  if closed?
    return drop(:closed_dropped, record)
  elsif !reserve_slot
    return drop(:queue_full_dropped, record)
  end

  message = { command: :emit, record: record }
  begin
    @port.send(message)
    increment(:queued)
  rescue StandardError => error
    # Give the reserved slot back before reporting the failure.
    release_slot
    record_failure(error, phase: :ractor_send)
    drop(:send_error, record)
  end

  nil
end
#flush(timeout: nil) ⇒ Object
75 76 77 78 |
# File 'lib/julewire/ractor/destination.rb', line 75
#
# Sends a :flush request.
#
# @param timeout [Object, nil] request timeout; nil means @request_timeout
# @return [Object] whatever the :flush request returned
def flush(timeout: nil)
  # Only nil selects the default; any other value is passed through as-is.
  effective_timeout = timeout.nil? ? @request_timeout : timeout
  request(:flush, timeout: effective_timeout)
end
#health ⇒ Object
101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/julewire/ractor/destination.rb', line 101
#
# Builds a health snapshot from a :health request and local state.
#
# When the request does not return a Hash, the value held in @worker_health
# is used instead. A Hash result (fresh or cached) is written back to
# @worker_health.
#
# @return [Object] the result of @health.snapshot
def health
  reply = request(:health, timeout: @request_timeout)
  worker = reply.is_a?(Hash) ? reply : @worker_health.get

  # NOTE(review): on the fallback path this re-stores the value just read
  # from @worker_health; kept to match existing behavior.
  @worker_health.set(worker) if worker.is_a?(Hash)

  @health.snapshot(
    in_flight: @in_flight.value,
    max_queue: @max_queue,
    status: status_for(worker),
    worker: worker
  )
end
#resource_identity ⇒ Object
99 |
# File 'lib/julewire/ractor/destination.rb', line 99
#
# @return [self] this object, returned as its own resource identity
def resource_identity
  self
end