Class: Axn::Async::EnqueueAllOrchestrator

Inherits:
Object
  • Object
show all
Includes:
Axn
Defined in:
lib/axn/async/enqueue_all_orchestrator.rb

Overview

Shared trigger action for executing batch enqueueing in the background. Called by enqueue_all to iterate over configured fields asynchronously.

Configure the async adapter via Axn.config.set_enqueue_all_async, or it defaults to Axn.config.set_default_async.

Examples:

Configure a specific queue for all enqueue_all jobs

Axn.configure do |c|
  c.set_enqueue_all_async(:sidekiq, queue: :batch)
end

Constant Summary

Constants included from Axn

VERSION

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Axn

config, configure, extension_config, included

Class Method Details

.enqueue_for(target, **static_args) ⇒ String

Entry point for enqueue_all - validates upfront, then executes async

Parameters:

  • target (Class)

    The action class to batch enqueue

  • static_args (Hash)

    Static arguments passed to each job

Returns:

  • (String)

    Job ID from the async adapter



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/axn/async/enqueue_all_orchestrator.rb', line 56

def enqueue_for(target, **static_args)
  validate_async_configured!(target)

  # Handle no-expects case: just call_async directly
  return target.call_async(**static_args) if target.internal_field_configs.empty?

  # Get configs and resolved static args
  # Kwargs are split: scalars → resolved_static, enumerables → configs
  configs, resolved_static = resolve_configs(target, static_args:)

  # Validate static args upfront (raises ArgumentError if missing)
  validate_static_args!(target, configs, resolved_static) if configs.any?

  # Check if any configs came from kwargs (these have lambdas that can't be serialized)
  # If so, we must execute iteration synchronously
  has_kwarg_iteration = configs.any? { |c| c.from.is_a?(Proc) && static_args.key?(c.field) }

  if has_kwarg_iteration
    # Execute iteration synchronously - kwargs with iterables can't be serialized
    kwarg_fields = configs.select { |c| c.from.is_a?(Proc) && static_args.key?(c.field) }.map(&:field)
    info "[enqueue_all] Running in foreground: kwargs #{kwarg_fields.join(', ')} cannot be serialized for background execution"
    execute_iteration_without_logging(target, **static_args)
  else
    # Serialize static_args for Sidekiq (convert GlobalID objects, stringify keys)
    serialized_static_args = Axn::Internal::GlobalIdSerialization.serialize(resolved_static)

    # Execute iteration in background via EnqueueAllOrchestrator
    call_async(target_class_name: target.name, static_args: serialized_static_args)
  end
end

.execute_iteration(target, on_progress: nil, **static_args) ⇒ Object

Execute the actual iteration (called from #call in background) Returns the count of jobs enqueued

Parameters:

  • target (Class)

    The action class to enqueue jobs for

  • on_progress (Proc, nil) (defaults to: nil)

    Callback to track iteration progress for logging context

  • static_args (Hash)

    Static arguments to pass to each job



93
94
95
96
97
98
# File 'lib/axn/async/enqueue_all_orchestrator.rb', line 93

def execute_iteration(target, on_progress: nil, **static_args)
  configs, resolved_static = resolve_configs(target, static_args:)
  count = { value: 0 }
  iterate(target:, configs:, index: 0, accumulated: {}, static_args: resolved_static, count:, on_progress:)
  count[:value]
end

.execute_iteration_without_logging(target, **static_args) ⇒ Object

Execute iteration with per-job async logging suppressed (for foreground execution)



101
102
103
104
105
106
107
# File 'lib/axn/async/enqueue_all_orchestrator.rb', line 101

def execute_iteration_without_logging(target, **static_args)
  original_log_level = target.log_calls_level
  target.log_calls_level = nil
  execute_iteration(target, **static_args)
ensure
  target.log_calls_level = original_log_level
end

Instance Method Details

#callObject



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/axn/async/enqueue_all_orchestrator.rb', line 27

def call
  target = target_class_name.constantize

  # Deserialize static_args (convert GlobalID strings back to objects)
  deserialized_static_args = Axn::Internal::GlobalIdSerialization.deserialize(static_args)

  count = self.class.execute_iteration(
    target,
    **deserialized_static_args,
    on_progress: method(:set_execution_context),
  )

  message_parts = ["Batch enqueued #{count} jobs for #{target.name}"]
  message_parts << "with explicit args: #{static_args.inspect}" if static_args.any?

  Axn::Internal::CallLogger.log_at_level(
    self.class,
    level: :info,
    message_parts:,
    error_context: "logging batch enqueue completion",
  )
end