Class: Julewire::Core::Runtime

Inherits:
Object
  • Object
show all
Defined in:
lib/julewire/core/runtime.rb

Overview

rubocop:disable Metrics/ClassLength

Defined Under Namespace

Classes: CloseTransition, PipelineReplacement, ResetTransition

Constant Summary collapse

# Symbol constant. NOTE(review): judging by the name it keys the configure
# guard; where the key is stored is not visible here — confirm at the use site.
CONFIGURE_GUARD_KEY =
:__julewire_core_configure_guard__
# Frozen array of symbols naming the runtime counters.
# NOTE(review): presumably the keys of the counts reported by #health — confirm.
RUNTIME_COUNTER_KEYS =
%i[
  close_attempts
  configure_attempts
  flush_attempts
  lifecycle_warnings
  post_close_emits_total
  reset_attempts
  runtime_callback_failures
  runtime_failures
].freeze

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Runtime

Returns a new instance of Runtime.



30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/julewire/core/runtime.rb', line 30

# Builds a runtime with fresh mutexes, atomic counters, health stores, a
# default runtime state, and an execution boundary.
def initialize
  # Two separate mutexes: one for configure, one for state.
  @configure_mutex = Mutex.new
  # Atomic integer counter starting at 0.
  @configure_generation = Concurrent::AtomicFixnum.new(0)
  @state_mutex = Mutex.new
  # Atomic integer counter starting at 0.
  @post_close_emit_count = Concurrent::AtomicFixnum.new(0)
  @runtime_health = build_runtime_health
  @integration_health = Diagnostics::IntegrationHealthStore.new
  @invalid_severity_reporter = Diagnostics::InvalidSeverityReporter.counter
  # The state is held in an atomic reference; the default state receives the
  # same invalid-severity reporter instance stored on this runtime.
  @state_ref = Concurrent::AtomicReference.new(
    RuntimeState.default(invalid_severity_reporter: @invalid_severity_reporter)
  )
  # NOTE(review): built last; build_execution_boundary may rely on the ivars
  # assigned above — confirm before reordering.
  @execution_boundary = build_execution_boundary
end

Instance Method Details

#after_fork! ⇒ Object



164
165
166
167
# File 'lib/julewire/core/runtime.rb', line 164

# Asks RuntimeRegistry to reset after a fork, passing this runtime as `primary`.
#
# @return [Object] whatever RuntimeRegistry.reset_after_fork returns
def after_fork!
  # Guard call made first; presumably rejects the call when issued from inside
  # a configure block — confirm against the helper's definition.
  reject_runtime_call_during_configure!(:after_fork!)
  RuntimeRegistry.reset_after_fork(primary: self)
end

#attributes ⇒ Object



48
# File 'lib/julewire/core/runtime.rb', line 48

# Returns the attributes proxy of the current context store.
def attributes
  ContextStore.current.attributes_proxy
end

#carry ⇒ Object



50
# File 'lib/julewire/core/runtime.rb', line 50

# Returns the carry proxy of the current context store.
def carry
  ContextStore.current.carry_proxy
end

#close(timeout: Core::UNSET) ⇒ Object



142
143
144
# File 'lib/julewire/core/runtime.rb', line 142

# Closes the runtime: obtains the result of close_state for the given timeout
# and hands it to close_state_resources.
#
# @param timeout [Object] forwarded to close_state; defaults to Core::UNSET
# @return [Object] whatever close_state_resources returns
def close(timeout: Core::UNSET)
  closed_state = close_state(timeout)
  close_state_resources(closed_state)
end

#config ⇒ Object



44
# File 'lib/julewire/core/runtime.rb', line 44

# Returns the configuration held by the current runtime state.
def config
  runtime_state.configuration
end

#configure ⇒ Object

Raises:

  • (ArgumentError)


120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/julewire/core/runtime.rb', line 120

# Builds a configured pipeline from the given block and, when the replacement
# asks for it, reports the close result of the old pipeline.
#
# @raise [ArgumentError] when no block is given
# @return [Object] the current configuration (#config)
def configure(&)
  raise ArgumentError, "Julewire.configure requires a block" unless block_given?

  increment_runtime_count(:configure_attempts)
  swap = build_configured_pipeline(&)
  # The deadline is started before checking whether a close is needed.
  close_deadline = Scheduling::Deadline.for(swap.close_timeout)
  return config unless swap.close_pipeline

  report_pipeline_close_result(
    swap.old_pipeline,
    timeout: Scheduling::Deadline.remaining(close_deadline),
    on_failure: swap.old_on_failure,
    operation: :configure,
    skip_resource_identities: swap.retained_resources
  )
  config
end

#context ⇒ Object



52
# File 'lib/julewire/core/runtime.rb', line 52

# Returns the context proxy of the current context store.
def context
  ContextStore.current.context_proxy
end

#current_execution ⇒ Object



56
57
58
59
# File 'lib/julewire/core/runtime.rb', line 56

# Wraps the current scope of the context store in an Execution::View.
#
# @return [Execution::View, nil, false] the view, or the falsy scope value
#   itself when there is no current scope
def current_execution
  active_scope = ContextStore.current.current_scope
  return active_scope unless active_scope

  Execution::View.new(active_scope)
end

#current_execution? ⇒ Boolean

Returns:

  • (Boolean)


61
62
63
# File 'lib/julewire/core/runtime.rb', line 61

# Reports whether the current context store has a current scope.
#
# @return [Boolean] the result of the store's current_scope? predicate
def current_execution? = ContextStore.current.current_scope?

#emit(record = Core::UNSET, **fields) ⇒ Object



69
70
71
# File 'lib/julewire/core/runtime.rb', line 69

# Emits a record with level checking enabled (the second argument passed to
# emit_with_level_check is true). Keyword fields are handed over as a hash and
# any block is forwarded.
def emit(record = Core::UNSET, **fields, &) = emit_with_level_check(record, true, fields, &)

#emit_envelope(input:, context:, scope:, carry: {}, attributes: {}, neutral: {}, enforce_level: true) ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/julewire/core/runtime.rb', line 83

# Builds a record from an envelope (input plus context/attributes/neutral/carry
# hashes and a scope) and emits it through the current pipeline.
#
# When the pipeline is already closed, the call is handed to
# record_post_close_emit instead. A StandardError raised while building or
# emitting is passed to notify_failure and the method returns nil.
#
# @return [Object, nil] the result of the pipeline's emit_record, the result
#   of record_post_close_emit, or nil after a reported failure
def emit_envelope(input:, context:, scope:, carry: {}, attributes: {}, neutral: {}, enforce_level: true)
  reject_runtime_call_during_configure!(:emit_envelope)
  current = runtime_state
  return record_post_close_emit(current) if current.pipeline_closed

  begin
    draft = Records::Draft.build(
      input,
      context: envelope_hash(context),
      attributes: envelope_hash(attributes),
      neutral: envelope_hash(neutral),
      carry: envelope_hash(carry),
      scope: scope,
      error_backtrace_lines: current.configuration.error_backtrace_lines,
      invalid_severity_reporter: @invalid_severity_reporter
    )
    built = draft.to_record
    current.pipeline.emit_record(built, enforce_level: enforce_level)
  rescue StandardError => error
    notify_failure(error, current, action: :emit_envelope)
    nil
  end
end

#emit_integration(record, enforce_level: true) ⇒ Object



77
78
79
80
81
# File 'lib/julewire/core/runtime.rb', line 77

# Emits an integration record through the pipeline of the state yielded by
# with_emit_guard.
#
# @param record [Object] forwarded to the pipeline's emit_integration
# @param enforce_level [Boolean] forwarded as the enforce_level keyword
# @return [Object] whatever with_emit_guard returns
def emit_integration(record, enforce_level: true)
  with_emit_guard(:emit_integration) { |guarded| guarded.pipeline.emit_integration(record, enforce_level: enforce_level) }
end

#emit_summary_record(scope) ⇒ Object



106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/julewire/core/runtime.rb', line 106

# Emits the summary record input owned by the given scope through the current
# pipeline's emit_isolated_input.
#
# When the pipeline is already closed, the call is handed to
# record_post_close_emit instead. A StandardError raised while obtaining or
# emitting the input is passed to notify_failure and the method returns nil.
#
# @param scope [Object] must respond to owned_summary_record_input
def emit_summary_record(scope)
  reject_runtime_call_during_configure!(:emit_summary_record)
  current = runtime_state
  return record_post_close_emit(current) if current.pipeline_closed

  begin
    summary_input = scope.owned_summary_record_input
    current.pipeline.emit_isolated_input(summary_input)
  rescue StandardError => error
    notify_failure(error, current, action: :emit_summary_record)
    nil
  end
end

#emit_without_level(record = Core::UNSET, **fields) ⇒ Object



73
74
75
# File 'lib/julewire/core/runtime.rb', line 73

# Emits a record with level checking disabled (the second argument passed to
# emit_with_level_check is false). Keyword fields are handed over as a hash and
# any block is forwarded.
def emit_without_level(record = Core::UNSET, **fields, &) = emit_with_level_check(record, false, fields, &)

#flush(timeout: Core::UNSET) ⇒ Object



138
139
140
# File 'lib/julewire/core/runtime.rb', line 138

# Runs the :flush lifecycle operation via call_validated_lifecycle.
#
# @param timeout [Object] forwarded as-is; defaults to Core::UNSET
def flush(timeout: Core::UNSET) = call_validated_lifecycle(:flush, timeout)

#health ⇒ Object



183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# File 'lib/julewire/core/runtime.rb', line 183

# Assembles a health report hash for the runtime.
#
# The report combines the current state's closed flag and generation, the
# runtime counters snapshot, the last recorded failures, and the pipeline,
# integration and process-integration health, plus an overall status derived
# from them by runtime_status.
#
# @return [Hash] keys: :closed, :counts, :generation, :integrations,
#   :last_callback_failure, :last_failure, :pipeline, :process_integrations,
#   :status
def health
  snapshot = runtime_state
  pipeline_report = snapshot.pipeline.health
  integration_report = @integration_health.health
  process_report = Diagnostics::ProcessIntegrationHealth.health
  {
    closed: snapshot.pipeline_closed,
    counts: runtime_counts_snapshot,
    generation: snapshot.pipeline_generation,
    integrations: integration_report,
    last_callback_failure: @runtime_health.last_callback_failure,
    last_failure: @runtime_health.last_failure,
    pipeline: pipeline_report,
    process_integrations: process_report,
    status: runtime_status(snapshot, pipeline_report, integration_report, process_report)
  }
end

#labels ⇒ Object



46
# File 'lib/julewire/core/runtime.rb', line 46

# Returns the labels of the current configuration.
def labels
  config.labels
end

#record_integration_failure(integration, error, **metadata) ⇒ Object



175
176
177
# File 'lib/julewire/core/runtime.rb', line 175

# Records a failure for the given integration in the integration health store,
# forwarding any extra keyword metadata unchanged.
def record_integration_failure(integration, error, **) = @integration_health.record_failure(integration, error, **)

#record_integration_success(integration) ⇒ Object



179
180
181
# File 'lib/julewire/core/runtime.rb', line 179

# Records a success for the given integration in the integration health store.
def record_integration_success(integration) = @integration_health.record_success(integration)

#reset! ⇒ Object



146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/julewire/core/runtime.rb', line 146

# Resets the runtime state while holding the configure mutex and then the
# state mutex, and afterwards reports the close result of the old pipeline
# when the reset result asks for it.
#
# @return [Object, nil] the result of report_pipeline_close_result, or nil
#   when no pipeline close is requested
def reset!
  increment_runtime_count(:reset_attempts)
  transition = reject_runtime_call_during_configure!(:reset!) do
    # Lock order: configure mutex first, state mutex second.
    @configure_mutex.synchronize { @state_mutex.synchronize { reset_under_lock } }
  end
  # The deadline is started before checking whether a close is needed.
  close_deadline = Scheduling::Deadline.for(transition.close_timeout)
  if transition.close_pipeline
    report_pipeline_close_result(
      transition.old_pipeline,
      timeout: Scheduling::Deadline.remaining(close_deadline),
      on_failure: transition.old_on_failure,
      operation: :reset
    )
  end
end

#reset_after_fork_runtime! ⇒ Object



169
170
171
172
173
# File 'lib/julewire/core/runtime.rb', line 169

# Resets this runtime's state after a fork, then calls after_fork! on the
# pipeline of the resulting runtime state.
#
# @return [self] the runtime itself
def reset_after_fork_runtime!
  reset_after_fork_state!
  # runtime_state is read after the reset above, so this is the pipeline of
  # whatever state the reset left in place.
  runtime_state.pipeline.after_fork!
  self
end

#start_execution ⇒ Object



67
# File 'lib/julewire/core/runtime.rb', line 67

# Forwards all arguments (and any block) to the execution boundary's
# start_execution and returns its result.
def start_execution(...)
  @execution_boundary.start_execution(...)
end

#summary ⇒ Object



54
# File 'lib/julewire/core/runtime.rb', line 54

# Returns the summary proxy of the current context store.
def summary
  ContextStore.current.summary_proxy
end

#with_execution ⇒ Object



65
# File 'lib/julewire/core/runtime.rb', line 65

# Forwards all arguments (and any block) to the execution boundary's
# with_execution and returns its result.
def with_execution(...)
  @execution_boundary.with_execution(...)
end