Class: Axn::Async::EnqueueAllOrchestrator
- Inherits: Object
  - Object
  - Axn::Async::EnqueueAllOrchestrator
- Includes: Axn
- Defined in: lib/axn/async/enqueue_all_orchestrator.rb
Overview
Shared trigger action that runs batch enqueueing in the background. It is called by enqueue_all to iterate over the configured fields asynchronously.
Configure the async adapter via Axn.config.set_enqueue_all_async; if it is not set, the adapter falls back to the one configured via Axn.config.set_default_async.
Constant Summary
Constants included from Axn
Class Method Summary
- .enqueue_for(target, **static_args) ⇒ String
  Entry point for enqueue_all - validates upfront, then executes async.
- .execute_iteration(target, on_progress: nil, **static_args) ⇒ Object
  Execute the actual iteration (called from #call in background). Returns the count of jobs enqueued.
- .execute_iteration_without_logging(target, **static_args) ⇒ Object
  Execute iteration with per-job async logging suppressed (for foreground execution).
Instance Method Summary
Methods included from Axn
config, configure, extension_config, included
Class Method Details
.enqueue_for(target, **static_args) ⇒ String
Entry point for enqueue_all - validates upfront, then executes async
# File 'lib/axn/async/enqueue_all_orchestrator.rb', line 56

def enqueue_for(target, **static_args)
  validate_async_configured!(target)

  # Handle no-expects case: just call_async directly
  return target.call_async(**static_args) if target.internal_field_configs.empty?

  # Get configs and resolved static args
  # Kwargs are split: scalars → resolved_static, enumerables → configs
  configs, resolved_static = resolve_configs(target, static_args:)

  # Validate static args upfront (raises ArgumentError if missing)
  validate_static_args!(target, configs, resolved_static) if configs.any?

  # Check if any configs came from kwargs (these have lambdas that can't be serialized)
  # If so, we must execute iteration synchronously
  has_kwarg_iteration = configs.any? { |c| c.from.is_a?(Proc) && static_args.key?(c.field) }

  if has_kwarg_iteration
    # Execute iteration synchronously - kwargs with iterables can't be serialized
    kwarg_fields = configs.select { |c| c.from.is_a?(Proc) && static_args.key?(c.field) }.map(&:field)
    info "[enqueue_all] Running in foreground: kwargs #{kwarg_fields.join(', ')} cannot be serialized for background execution"
    execute_iteration_without_logging(target, **static_args)
  else
    # Serialize static_args for Sidekiq (convert GlobalID objects, stringify keys)
    serialized_static_args = Axn::Internal::GlobalIdSerialization.serialize(resolved_static)

    # Execute iteration in background via EnqueueAllOrchestrator
    call_async(target_class_name: target.name, static_args: serialized_static_args)
  end
end
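The foreground/background choice in enqueue_for depends on whether any iteration config is Proc-backed and was supplied as a kwarg. The following is a minimal, self-contained sketch of that check only. `FieldConfig` and `kwarg_iteration_fields` are hypothetical names introduced for illustration; they stand in for Axn's internal config objects and are not part of the library.

```ruby
# Hypothetical stand-in for Axn's internal field config. Only the two
# attributes that enqueue_for reads (`field` and `from`) are modelled.
FieldConfig = Struct.new(:field, :from)

# Returns the fields whose source is a Proc AND that were passed as kwargs.
# A Proc cannot be serialized into a background job payload, so any such
# field forces the iteration to run in the foreground.
def kwarg_iteration_fields(configs, static_args)
  configs
    .select { |c| c.from.is_a?(Proc) && static_args.key?(c.field) }
    .map(&:field)
end

configs = [
  FieldConfig.new(:user_id, -> { [1, 2, 3] }), # iterable supplied as a kwarg
  FieldConfig.new(:region, :all_regions)       # declared source, not a Proc
]

static_args = { user_id: [1, 2, 3], dry_run: true }

foreground_fields = kwarg_iteration_fields(configs, static_args)
mode = foreground_fields.empty? ? :background : :foreground
puts "#{mode}: #{foreground_fields.join(', ')}" # prints "foreground: user_id"
```

With no Proc-backed kwarg present, the helper returns an empty array and the sketch would pick the background path, which mirrors the `else` branch of enqueue_for.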
.execute_iteration(target, on_progress: nil, **static_args) ⇒ Object
Execute the actual iteration (called from #call in background). Returns the count of jobs enqueued.
# File 'lib/axn/async/enqueue_all_orchestrator.rb', line 93

def execute_iteration(target, on_progress: nil, **static_args)
  configs, resolved_static = resolve_configs(target, static_args:)
  count = { value: 0 }
  iterate(target:, configs:, index: 0, accumulated: {}, static_args: resolved_static, count:, on_progress:)
  count[:value]
end
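The `iterate` helper is not shown on this page, so the following is only a sketch of how a recursive, index-driven iteration with a shared counter could work. `iterate_sketch`, the `sources` layout, and the argument passed to `on_progress` are all assumptions for illustration. One thing it does show with certainty is why a Hash such as `{ value: 0 }` is convenient here: Ruby Integers are immutable, so a mutable container lets every recursion level update the same count.

```ruby
# Hypothetical nested iteration; Axn's real `iterate` may differ.
# `sources` is an array of [field, values] pairs (an assumed layout).
def iterate_sketch(sources:, index:, accumulated:, count:, on_progress: nil, &enqueue)
  if index >= sources.length
    # Every field has a value: enqueue one job for this combination.
    enqueue.call(accumulated)
    count[:value] += 1                 # mutates the shared counter in place
    on_progress&.call(count[:value])   # assumed callback argument
    return
  end

  field, values = sources[index]
  values.each do |value|
    iterate_sketch(
      sources: sources,
      index: index + 1,
      accumulated: accumulated.merge(field => value),
      count: count,
      on_progress: on_progress,
      &enqueue
    )
  end
end

sources = [[:user_id, [1, 2]], [:region, %w[eu us]]]
jobs = []
progress = []
count = { value: 0 }

iterate_sketch(sources: sources, index: 0, accumulated: {}, count: count,
               on_progress: ->(n) { progress << n }) { |args| jobs << args }

puts count[:value] # prints 4 (2 user ids x 2 regions)
```

Each leaf of the recursion corresponds to one enqueued job, so the final counter equals the number of field-value combinations.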
.execute_iteration_without_logging(target, **static_args) ⇒ Object
Execute iteration with per-job async logging suppressed (for foreground execution)
# File 'lib/axn/async/enqueue_all_orchestrator.rb', line 101

def execute_iteration_without_logging(target, **static_args)
  original_log_level = target.log_calls_level
  target.log_calls_level = nil
  execute_iteration(target, **static_args)
ensure
  target.log_calls_level = original_log_level
end
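The method above uses a save, override, and restore-in-`ensure` pattern. The sketch below isolates that pattern with a hypothetical `FakeTarget` class and a hypothetical `with_call_logging_suppressed` helper (neither is part of Axn). It illustrates two Ruby guarantees the method relies on: the `ensure` clause runs even when the body raises, and it does not replace the body's return value.

```ruby
# Hypothetical target exposing a class-level log_calls_level setting.
class FakeTarget
  class << self
    attr_accessor :log_calls_level
  end
  self.log_calls_level = :info
end

# Temporarily sets log_calls_level to nil while the block runs.
def with_call_logging_suppressed(target)
  original = target.log_calls_level
  target.log_calls_level = nil
  yield
ensure
  # Runs on normal return and on exceptions; the method still returns
  # the value of `yield`, not the value of this assignment.
  target.log_calls_level = original
end

# Normal path: the block sees nil, and the level is restored afterwards.
seen_inside = with_call_logging_suppressed(FakeTarget) { FakeTarget.log_calls_level.inspect }

# Failure path: the level is restored even though the block raises.
begin
  with_call_logging_suppressed(FakeTarget) { raise "boom" }
rescue RuntimeError
  restored_after_error = FakeTarget.log_calls_level
end

puts seen_inside          # prints "nil"
puts restored_after_error # prints "info"
```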
Instance Method Details
#call ⇒ Object
# File 'lib/axn/async/enqueue_all_orchestrator.rb', line 27

def call
  target = target_class_name.constantize

  # Deserialize static_args (convert GlobalID strings back to objects)
  deserialized_static_args = Axn::Internal::GlobalIdSerialization.deserialize(static_args)

  count = self.class.execute_iteration(
    target,
    **deserialized_static_args,
    on_progress: method(:set_execution_context),
  )

  message_parts = ["Batch enqueued #{count} jobs for #{target.name}"]
  message_parts << "with explicit args: #{static_args.inspect}" if static_args.any?

  Axn::Internal::CallLogger.log_at_level(
    self.class,
    level: :info,
    message_parts:,
    error_context: "logging batch enqueue completion",
  )
end
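The serialize step in enqueue_for and the deserialize step in #call form a round trip through a job payload. The sketch below illustrates the general idea only; it is not the implementation of Axn::Internal::GlobalIdSerialization. `Record`, `RECORDS`, the `gid://sketch/...` format, and both helper methods are hypothetical, and the choice to symbolize keys on the way back is an assumption.

```ruby
require "json"

# Hypothetical record type standing in for a GlobalID-capable model.
Record = Struct.new(:id) do
  def to_gid_string
    "gid://sketch/Record/#{id}"
  end
end

# Hypothetical lookup table standing in for a database.
RECORDS = { 7 => Record.new(7) }.freeze
GID_PATTERN = %r{\Agid://sketch/Record/(\d+)\z}

# Stringify keys and replace records with identifier strings so the
# payload survives JSON encoding.
def serialize_args(args)
  args.to_h { |key, value| [key.to_s, value.is_a?(Record) ? value.to_gid_string : value] }
end

# Reverse the mapping: look records up again and symbolize keys (assumed).
def deserialize_args(payload)
  payload.to_h do |key, value|
    match = value.is_a?(String) ? GID_PATTERN.match(value) : nil
    [key.to_sym, match ? RECORDS.fetch(match[1].to_i) : value]
  end
end

original = { account: RECORDS[7], dry_run: true }

# Simulate the trip through a JSON-based job queue.
payload = JSON.parse(JSON.generate(serialize_args(original)))
restored = deserialize_args(payload)

puts payload["account"]   # prints "gid://sketch/Record/7"
puts restored == original # prints "true"
```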