Class: Julewire::Ractor::Destination
- Inherits: Object
  - Object
    - Julewire::Ractor::Destination
- Defined in:
- lib/julewire/ractor/destination.rb
Overview
rubocop:disable Metrics/ClassLength – Owns parent queue, worker lifecycle, and health.
Constant Summary collapse
- DEFAULT_MAX_QUEUE = 1024
- DEFAULT_REQUEST_TIMEOUT = 1
- TIMEOUT_THREAD_NAME = "julewire-ractor-destination-timeout"
Instance Attribute Summary collapse
-
#name ⇒ Object
readonly
Returns the value of attribute name.
Instance Method Summary collapse
- #after_fork! ⇒ Object
- #close(timeout: nil) ⇒ Object
- #emit(record) ⇒ Object
- #flush(timeout: nil) ⇒ Object
- #health ⇒ Object
-
#initialize(output:, name: :ractor, formatter: Julewire::RecordFormatter.new, encoder: Julewire::JsonEncoder.new, max_record_bytes: Core::DEFAULT_MAX_RECORD_BYTES, max_queue: DEFAULT_MAX_QUEUE, close_output: false, request_timeout: DEFAULT_REQUEST_TIMEOUT, on_drop: nil, on_failure: nil) ⇒ Destination
constructor
rubocop:disable Metrics/ParameterLists – Destination setup mirrors core destination knobs.
- #resource_identity ⇒ Object
Constructor Details
#initialize(output:, name: :ractor, formatter: Julewire::RecordFormatter.new, encoder: Julewire::JsonEncoder.new, max_record_bytes: Core::DEFAULT_MAX_RECORD_BYTES, max_queue: DEFAULT_MAX_QUEUE, close_output: false, request_timeout: DEFAULT_REQUEST_TIMEOUT, on_drop: nil, on_failure: nil) ⇒ Destination
rubocop:disable Metrics/ParameterLists – Destination setup mirrors core destination knobs.
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/julewire/ractor/destination.rb', line 25
#
# Validates every option, stores the configuration, builds the timeout
# scheduler, then calls initialize_tracking and start_worker.
#
# @param output [Object] sink, checked with Core::Destinations::Sink.validate_writeable!
# @param name [Object] destination name, passed through Core::Destinations.normalize_name
# @param formatter [#call] checked with validate_callable
# @param encoder [#call] checked with validate_callable
# @param max_record_bytes [Object] checked with Core::Validation.validate_byte_limit!
# @param max_queue [Integer] checked with Core::Validation.validate_non_negative_integer!
# @param close_output [Boolean] stored as given (not validated here)
# @param request_timeout [Object] checked with Core::Validation.validate_timeout!;
#   used as the default timeout by #flush, #close and #health
# @param on_drop [#call, nil] optional callable
# @param on_failure [#call, nil] optional callable
def initialize( # rubocop:disable Metrics/ParameterLists -- Destination setup mirrors core destination knobs.
  output:,
  name: :ractor,
  formatter: Julewire::RecordFormatter.new,
  encoder: Julewire::JsonEncoder.new,
  max_record_bytes: Core::DEFAULT_MAX_RECORD_BYTES,
  max_queue: DEFAULT_MAX_QUEUE,
  close_output: false,
  request_timeout: DEFAULT_REQUEST_TIMEOUT,
  on_drop: nil,
  on_failure: nil
)
  # Options that are normalized or validated as they are stored.
  @name = Core::Destinations.normalize_name(name)
  @formatter = validate_callable(formatter, name: :formatter)
  @encoder = validate_callable(encoder, name: :encoder)

  # Pure validations; the order is kept so the first failing option wins.
  Core::Destinations::Sink.validate_writeable!(output)
  checks = Core::Validation
  checks.validate_byte_limit!(max_record_bytes, name: :max_record_bytes)
  checks.validate_non_negative_integer!(max_queue, name: :max_queue)
  checks.validate_timeout!(request_timeout, name: :request_timeout)
  checks.validate_callable!(on_drop, name: :on_drop, allow_nil: true)
  checks.validate_callable!(on_failure, name: :on_failure, allow_nil: true)

  # Nothing is stored from this group until all validations have passed.
  @output = output
  @max_record_bytes = max_record_bytes
  @max_queue = max_queue
  @close_output = close_output
  @request_timeout = request_timeout
  @on_drop = on_drop
  @on_failure = on_failure

  @scheduler = Core::Scheduling::DeadlineScheduler.new(
    thread_name: TIMEOUT_THREAD_NAME,
    idle: :exit
  )
  initialize_tracking
  start_worker
end
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
23 24 25 |
# File 'lib/julewire/ractor/destination.rb', line 23
#
# Read-only accessor.
#
# @return [Object] the value of the name attribute
def name = @name
Instance Method Details
#after_fork! ⇒ Object
85 86 87 88 89 90 91 92 93 94 |
# File 'lib/julewire/ractor/destination.rb', line 85
#
# Re-initializes the destination: closes the current ports, calls
# after_fork! on the scheduler, resets tracking and starts a worker.
# A StandardError raised by any of those steps is passed to record_failure
# with phase :after_fork instead of propagating.
#
# @return [self] in both the success and the failure case
def after_fork!
  begin
    close_ports
    @scheduler.after_fork!
    initialize_tracking
    start_worker
  rescue StandardError => e
    record_failure(e, phase: :after_fork)
  end
  self
end
#close(timeout: nil) ⇒ Object
77 78 79 80 81 82 83 |
# File 'lib/julewire/ractor/destination.rb', line 77
#
# Marks the destination closed, sends a :close request and closes the ports.
#
# @param timeout [Object, nil] timeout for the :close request; nil means
#   use the configured request_timeout
# @return [Object] whatever request(:close, ...) returns
def close(timeout: nil)
  timeout = @request_timeout if timeout.nil?
  @closed.set(true)
  begin
    request(:close, timeout: timeout)
  ensure
    # Release the ports even when the close request raises; otherwise an
    # instance already flagged as closed would keep them open.
    close_ports
  end
end
#emit(record) ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/julewire/ractor/destination.rb', line 58
#
# Counts the record as received, then either drops it (destination closed or
# queue full) or sends it to the port as an :emit command and counts it as
# in flight and queued. Any StandardError is passed to record_failure with
# phase :ractor_send and the record is dropped as :send_error.
#
# @param record [Object] the record to hand to the port
# @return [nil, Object] nil when the record was sent; otherwise the result of drop
def emit(record)
  increment(:received)
  if closed?
    drop(:closed_dropped, record)
  elsif queue_full?
    drop(:queue_full_dropped, record)
  else
    # The braces are deliberate: the message is one positional Hash, not keywords.
    @port.send({ command: :emit, record: record })
    @in_flight.increment
    increment(:queued)
    nil
  end
rescue StandardError => e
  record_failure(e, phase: :ractor_send)
  drop(:send_error, record)
end
#flush(timeout: nil) ⇒ Object
72 73 74 75 |
# File 'lib/julewire/ractor/destination.rb', line 72
#
# Sends a :flush request.
#
# @param timeout [Object, nil] timeout for the request; nil means use the
#   configured request_timeout
# @return [Object] whatever request(:flush, ...) returns
def flush(timeout: nil)
  # Only nil selects the default; any other value (including false) is passed on.
  effective_timeout = timeout.nil? ? @request_timeout : timeout
  request(:flush, timeout: effective_timeout)
end
#health ⇒ Object
98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/julewire/ractor/destination.rb', line 98
#
# Requests :health using the configured request_timeout. A Hash reply is
# used as the worker state; any other reply falls back to the value held in
# @worker_health. A Hash worker state is written back to @worker_health.
#
# @return [Object] the result of @health.snapshot, called with in_flight,
#   max_queue, status and worker
def health
  reply = request(:health, timeout: @request_timeout)
  worker = reply.is_a?(Hash) ? reply : @worker_health.get
  @worker_health.set(worker) if worker.is_a?(Hash)

  pending = @in_flight.value
  status = status_for(worker)
  @health.snapshot(in_flight: pending, max_queue: @max_queue, status: status, worker: worker)
end
#resource_identity ⇒ Object
96 |
# File 'lib/julewire/ractor/destination.rb', line 96
#
# @return [self] the receiver itself
def resource_identity
  self
end