Class: Julewire::Core::Runtime
- Inherits: Object
- Object
- Julewire::Core::Runtime
- Defined in:
- lib/julewire/core/runtime.rb
Overview
rubocop:disable Metrics/ClassLength
Defined Under Namespace
Classes: CloseTransition, PipelineReplacement, ResetTransition
Constant Summary collapse
- CONFIGURE_GUARD_KEY =
:__julewire_core_configure_guard__
- RUNTIME_COUNTER_KEYS =
%i[ close_attempts configure_attempts flush_attempts lifecycle_warnings post_close_emits_total reset_attempts runtime_callback_failures runtime_failures ].freeze
Instance Method Summary collapse
- #after_fork! ⇒ Object
- #attributes ⇒ Object
- #carry ⇒ Object
- #close(timeout: Core::UNSET) ⇒ Object
- #config ⇒ Object
- #configure ⇒ Object
- #context ⇒ Object
- #current_execution ⇒ Object
- #current_execution? ⇒ Boolean
- #emit(record = Core::UNSET, **fields) ⇒ Object
- #emit_envelope(input:, context:, scope:, carry: {}, attributes: {}, neutral: {}, enforce_level: true) ⇒ Object
- #emit_integration(record, enforce_level: true) ⇒ Object
- #emit_summary_record(scope) ⇒ Object
- #emit_without_level(record = Core::UNSET, **fields) ⇒ Object
- #flush(timeout: Core::UNSET) ⇒ Object
- #health ⇒ Object
- #initialize ⇒ Runtime (constructor)
A new instance of Runtime.
- #labels ⇒ Object
- #record_integration_failure(integration, error, **metadata) ⇒ Object
- #record_integration_success(integration) ⇒ Object
- #reset! ⇒ Object
- #reset_after_fork_runtime! ⇒ Object
- #start_execution ⇒ Object
- #summary ⇒ Object
- #with_execution ⇒ Object
Constructor Details
#initialize ⇒ Runtime
Returns a new instance of Runtime.
30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/julewire/core/runtime.rb', line 30
# Sets up a new runtime: two mutexes, two atomic counters starting at zero,
# the health trackers, and an atomic reference holding the default
# RuntimeState.
def initialize
  @configure_mutex = Mutex.new
  @configure_generation = Concurrent::AtomicFixnum.new(0)
  @state_mutex = Mutex.new
  @post_close_emit_count = Concurrent::AtomicFixnum.new(0)
  @runtime_health = build_runtime_health
  @integration_health = Diagnostics::IntegrationHealthStore.new
  @invalid_severity_reporter = Diagnostics::InvalidSeverityReporter.counter

  # The default state shares this runtime's invalid-severity reporter.
  initial_state = RuntimeState.default(invalid_severity_reporter: @invalid_severity_reporter)
  @state_ref = Concurrent::AtomicReference.new(initial_state)

  @execution_boundary = build_execution_boundary
end
Instance Method Details
#after_fork! ⇒ Object
164 165 166 167 |
# File 'lib/julewire/core/runtime.rb', line 164 def after_fork! reject_runtime_call_during_configure!(:after_fork!) RuntimeRegistry.reset_after_fork(primary: self) end |
#attributes ⇒ Object
48 |
# File 'lib/julewire/core/runtime.rb', line 48
# Returns the attributes proxy of the current context store.
def attributes
  ContextStore.current.attributes_proxy
end
#carry ⇒ Object
50 |
# File 'lib/julewire/core/runtime.rb', line 50
# Returns the carry proxy of the current context store.
def carry
  ContextStore.current.carry_proxy
end
#close(timeout: Core::UNSET) ⇒ Object
142 143 144 |
# File 'lib/julewire/core/runtime.rb', line 142
# Closes the runtime in two steps: close_state(timeout) produces a result,
# which is then handed to close_state_resources.
#
# @param timeout close timeout; defaults to the Core::UNSET sentinel
# @return the result of close_state_resources
def close(timeout: Core::UNSET)
  close_result = close_state(timeout)
  close_state_resources(close_result)
end
#config ⇒ Object
44 |
# File 'lib/julewire/core/runtime.rb', line 44
# Returns the configuration held by the current runtime state.
def config
  runtime_state.configuration
end
#configure ⇒ Object
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/julewire/core/runtime.rb', line 120
# Builds a configured pipeline from the given block and, when the replacement
# requests it, hands the old pipeline to report_pipeline_close_result.
#
# @yield configuration block, forwarded to build_configured_pipeline
# @raise [ArgumentError] when called without a block
# @return the value of #config after the replacement has been built
def configure(&)
  raise ArgumentError, "Julewire.configure requires a block" unless block_given?

  increment_runtime_count(:configure_attempts)
  replacement = build_configured_pipeline(&)

  # The deadline is created before the close_pipeline check, so the time
  # budget is fixed as soon as the replacement exists.
  deadline = Scheduling::Deadline.for(replacement.close_timeout)
  return config unless replacement.close_pipeline

  report_pipeline_close_result(
    replacement.old_pipeline,
    timeout: Scheduling::Deadline.remaining(deadline),
    on_failure: replacement.old_on_failure,
    operation: :configure,
    skip_resource_identities: replacement.retained_resources
  )
  config
end
#context ⇒ Object
52 |
# File 'lib/julewire/core/runtime.rb', line 52
# Returns the context proxy of the current context store.
def context
  ContextStore.current.context_proxy
end
#current_execution ⇒ Object
56 57 58 59 |
# File 'lib/julewire/core/runtime.rb', line 56
# Wraps the current scope of the context store in an Execution::View.
#
# @return [Execution::View] a view over the current scope, or the falsy
#   scope value itself when there is no current scope
def current_execution
  active_scope = ContextStore.current.current_scope
  return active_scope unless active_scope

  Execution::View.new(active_scope)
end
#current_execution? ⇒ Boolean
61 62 63 |
# File 'lib/julewire/core/runtime.rb', line 61
# Delegates to the current context store's current_scope? predicate.
def current_execution? = ContextStore.current.current_scope?
#emit(record = Core::UNSET, **fields) ⇒ Object
69 70 71 |
# File 'lib/julewire/core/runtime.rb', line 69
# Emits a record through emit_with_level_check with the check flag set to true.
#
# @param record optional record; defaults to the Core::UNSET sentinel
# @param fields keyword fields, passed on as a single hash
# @return the result of emit_with_level_check
def emit(record = Core::UNSET, **fields, &)
  # NOTE(review): the flag presumably means "enforce the level"; confirm
  # against emit_with_level_check.
  check_level = true
  emit_with_level_check(record, check_level, fields, &)
end
#emit_envelope(input:, context:, scope:, carry: {}, attributes: {}, neutral: {}, enforce_level: true) ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/julewire/core/runtime.rb', line 83
# Builds a record draft from the envelope parts and emits it on the current
# pipeline.
#
# When the state reports a closed pipeline, the call is routed to
# record_post_close_emit instead. A StandardError raised while building or
# emitting is passed to notify_failure and the method returns nil.
#
# @param input the record input handed to Records::Draft.build
# @param context passed through envelope_hash
# @param scope forwarded unchanged to the draft
# @param carry passed through envelope_hash
# @param attributes passed through envelope_hash
# @param neutral passed through envelope_hash
# @param enforce_level forwarded to the pipeline's emit_record
# @return the result of emit_record, the result of record_post_close_emit,
#   or nil after a rescued failure
def emit_envelope(input:, context:, scope:, carry: {}, attributes: {}, neutral: {}, enforce_level: true)
  reject_runtime_call_during_configure!(:emit_envelope)
  state = runtime_state
  return record_post_close_emit(state) if state.pipeline_closed

  begin
    draft = Records::Draft.build(
      input,
      context: envelope_hash(context),
      attributes: envelope_hash(attributes),
      neutral: envelope_hash(neutral),
      carry: envelope_hash(carry),
      scope: scope,
      error_backtrace_lines: state.configuration.error_backtrace_lines,
      invalid_severity_reporter: @invalid_severity_reporter
    )
    # Materialise the record before touching the pipeline, as before.
    record = draft.to_record
    state.pipeline.emit_record(record, enforce_level: enforce_level)
  rescue StandardError => error
    notify_failure(error, state, action: :emit_envelope)
    nil
  end
end
#emit_integration(record, enforce_level: true) ⇒ Object
77 78 79 80 81 |
# File 'lib/julewire/core/runtime.rb', line 77
# Emits an integration record on the pipeline of the state yielded by
# with_emit_guard.
#
# @param record the integration record to emit
# @param enforce_level forwarded to the pipeline's emit_integration
# @return the result of with_emit_guard
def emit_integration(record, enforce_level: true)
  with_emit_guard(:emit_integration) do |guarded_state|
    pipeline = guarded_state.pipeline
    pipeline.emit_integration(record, enforce_level: enforce_level)
  end
end
#emit_summary_record(scope) ⇒ Object
106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/julewire/core/runtime.rb', line 106
# Emits the scope's owned summary record input via the pipeline's
# emit_isolated_input.
#
# When the state reports a closed pipeline, the call is routed to
# record_post_close_emit instead. A StandardError raised while reading the
# input or emitting is passed to notify_failure and the method returns nil.
#
# @param scope object responding to owned_summary_record_input
# @return the result of emit_isolated_input, the result of
#   record_post_close_emit, or nil after a rescued failure
def emit_summary_record(scope)
  reject_runtime_call_during_configure!(:emit_summary_record)
  state = runtime_state

  if state.pipeline_closed
    record_post_close_emit(state)
  else
    begin
      summary_input = scope.owned_summary_record_input
      state.pipeline.emit_isolated_input(summary_input)
    rescue StandardError => error
      notify_failure(error, state, action: :emit_summary_record)
      nil
    end
  end
end
#emit_without_level(record = Core::UNSET, **fields) ⇒ Object
73 74 75 |
# File 'lib/julewire/core/runtime.rb', line 73
# Emits a record through emit_with_level_check with the check flag set to
# false.
#
# @param record optional record; defaults to the Core::UNSET sentinel
# @param fields keyword fields, passed on as a single hash
# @return the result of emit_with_level_check
def emit_without_level(record = Core::UNSET, **fields, &)
  # NOTE(review): the flag presumably means "enforce the level"; confirm
  # against emit_with_level_check.
  check_level = false
  emit_with_level_check(record, check_level, fields, &)
end
#flush(timeout: Core::UNSET) ⇒ Object
138 139 140 |
# File 'lib/julewire/core/runtime.rb', line 138
# Runs the :flush lifecycle operation through call_validated_lifecycle.
#
# @param timeout flush timeout; defaults to the Core::UNSET sentinel
# @return the result of call_validated_lifecycle
def flush(timeout: Core::UNSET) = call_validated_lifecycle(:flush, timeout)
#health ⇒ Object
183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 |
# File 'lib/julewire/core/runtime.rb', line 183
# Assembles a health snapshot from the current runtime state, the pipeline,
# the integration health store, and process-level integration health.
#
# @return [Hash] with keys :closed, :counts, :generation, :integrations,
#   :last_callback_failure, :last_failure, :pipeline, :process_integrations,
#   and :status
def health
  current = runtime_state
  pipeline_report = current.pipeline.health
  integration_report = @integration_health.health
  process_report = Diagnostics::ProcessIntegrationHealth.health

  {
    closed: current.pipeline_closed,
    counts: runtime_counts_snapshot,
    generation: current.pipeline_generation,
    integrations: integration_report,
    last_callback_failure: @runtime_health.last_callback_failure,
    last_failure: @runtime_health.last_failure,
    pipeline: pipeline_report,
    process_integrations: process_report,
    # The status is derived from the same reports that are returned above.
    status: runtime_status(current, pipeline_report, integration_report, process_report)
  }
end
#labels ⇒ Object
46 |
# File 'lib/julewire/core/runtime.rb', line 46
# Returns the labels of the current configuration.
def labels
  config.labels
end
#record_integration_failure(integration, error, **metadata) ⇒ Object
175 176 177 |
# File 'lib/julewire/core/runtime.rb', line 175 def record_integration_failure(integration, error, **) @integration_health.record_failure(integration, error, **) end |
#record_integration_success(integration) ⇒ Object
179 180 181 |
# File 'lib/julewire/core/runtime.rb', line 179
# Records a success for an integration in the integration health store.
#
# @param integration the integration that succeeded
# @return the result of the store's record_success
def record_integration_success(integration) = @integration_health.record_success(integration)
#reset! ⇒ Object
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
# File 'lib/julewire/core/runtime.rb', line 146 def reset! increment_runtime_count(:reset_attempts) reset_result = reject_runtime_call_during_configure!(:reset!) do @configure_mutex.synchronize do @state_mutex.synchronize { reset_under_lock } end end deadline = Scheduling::Deadline.for(reset_result.close_timeout) return unless reset_result.close_pipeline report_pipeline_close_result( reset_result.old_pipeline, timeout: Scheduling::Deadline.remaining(deadline), on_failure: reset_result.old_on_failure, operation: :reset ) end |
#reset_after_fork_runtime! ⇒ Object
169 170 171 172 173 |
# File 'lib/julewire/core/runtime.rb', line 169 def reset_after_fork_runtime! reset_after_fork_state! runtime_state.pipeline.after_fork! self end |
#start_execution ⇒ Object
67 |
# File 'lib/julewire/core/runtime.rb', line 67
# Forwards all arguments and any block to the execution boundary's
# start_execution.
def start_execution(...)
  @execution_boundary.start_execution(...)
end
#summary ⇒ Object
54 |
# File 'lib/julewire/core/runtime.rb', line 54
# Returns the summary proxy of the current context store.
def summary
  ContextStore.current.summary_proxy
end
#with_execution ⇒ Object
65 |
# File 'lib/julewire/core/runtime.rb', line 65
# Forwards all arguments and any block to the execution boundary's
# with_execution.
def with_execution(...)
  @execution_boundary.with_execution(...)
end