Module: Legion::Extensions::Transformer::Runners::Transform

Extended by:
Helpers::Task
Includes:
Helpers::Lex
Defined in:
lib/legion/extensions/transformer/runners/transform.rb

Instance Method Summary

Instance Method Details

#dispatch_multiplied(payload) ⇒ Object



67
68
69
70
71
72
73
74
75
76
# File 'lib/legion/extensions/transformer/runners/transform.rb', line 67

# Fans a collection result out into one published task per element.
#
# For every element of payload[:args] the payload is shallow-copied, a task id
# is requested from Legion::Runner::Status.generate_task_id, and the copy is
# published through send_task with :task_id set to the new id and :args
# narrowed to that single element. Once all elements are sent, the originating
# task is marked 'task.multiplied' when payload[:task_id] is present.
#
# @param payload [Hash] symbol-keyed task payload; :args must respond to #each
# @return [Object, nil] whatever task_update returns, or nil when payload has
#   no :task_id
def dispatch_multiplied(payload)
  payload[:args].each do |thing|
    new_payload = payload.dup
    # The splat must come first: with duplicate keys the later one wins, so a
    # trailing **new_payload replaced `args: thing` with the whole collection
    # (and let a payload-supplied :status / :function_args override ours).
    task = Legion::Runner::Status.generate_task_id(**new_payload, function_args: thing, status: 'task.queued', args: thing)
    new_payload[:task_id] = task[:task_id]
    new_payload[:args] = thing
    send_task(**new_payload)
  end
  task_update(payload[:task_id], 'task.multiplied', function_args: payload[:args]) unless payload[:task_id].nil?
end

#dispatch_transformed(payload) ⇒ Object



56
57
58
59
60
61
62
63
64
65
# File 'lib/legion/extensions/transformer/runners/transform.rb', line 56

# Routes a rendered payload according to the class of payload[:args].
#
# * Array - handed to dispatch_multiplied.
# * Hash  - published once through send_task; when payload[:task_id] is set
#           the task is marked 'transformer.succeeded' before the send and
#           'task.queued' (with use_database: false) after it.
# * anything else - nothing is dispatched and nil is returned.
#
# @param payload [Hash] symbol-keyed task payload carrying the rendered :args
# @return [Object, nil] result of the last call made, or nil
def dispatch_transformed(payload)
  args = payload[:args]
  return dispatch_multiplied(payload) if args.is_a?(Array)
  return unless args.is_a?(Hash)

  task_id = payload[:task_id]
  tracked = !task_id.nil?

  task_update(task_id, 'transformer.succeeded', function_args: args) if tracked
  send_task(**payload)
  task_update(task_id, 'task.queued', use_database: false) if tracked
end

#render_transformation(transformation, payload, engine: nil, engine_options: {}) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/legion/extensions/transformer/runners/transform.rb', line 43

# Renders a transformation against a payload with a template engine.
#
# The engine is fetched from Engines::Registry by name when `engine` is
# given; otherwise the registry is asked to detect one from the
# transformation itself.
#
# @param transformation [Object] template/definition handed to the engine
# @param payload [Hash] data made available to the engine
# @param engine [Object, nil] registry key of the engine to use
# @param engine_options [Hash] extra keyword options forwarded to #render
# @return [Object] the engine output; String output is passed through
#   from_json first, anything else (including a Hash with success: false)
#   is returned untouched
def render_transformation(transformation, payload, engine: nil, engine_options: {})
  registry = Engines::Registry
  renderer = engine ? registry.fetch(engine) : registry.detect(transformation)
  output = renderer.render(transformation, payload, **engine_options)

  failure = output.is_a?(Hash) && output[:success] == false
  return output if failure
  return output unless output.is_a?(String)

  from_json(output)
end

#send_task(**opts) ⇒ Object



78
79
80
81
82
83
84
85
# File 'lib/legion/extensions/transformer/runners/transform.rb', line 78

# Publishes a transformer message built from the recognised task options.
#
# Only the whitelisted keys below are forwarded (a key is forwarded whenever
# it is present, even with a nil value); every other option is dropped.
#
# @param opts [Hash] task options
# @return [Object] whatever Message#publish returns
def send_task(**opts)
  forwarded = %i[task_id relationship_id trigger_function_id runner_class function_id function chain_id debug engine args]
  message_args = opts.slice(*forwarded)

  Legion::Extensions::Transformer::Transport::Messages::Message.new(**message_args).publish
end

#transform(transformation:, engine: nil, schema: nil, engine_options: {}, **payload) ⇒ Object



11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/legion/extensions/transformer/runners/transform.rb', line 11

# Renders a transformation, optionally validates the result, and dispatches it.
#
# The rendered value is stored in payload[:args]. A rendered Hash with
# success: false is returned as-is and nothing is dispatched. When `schema`
# is given, the rendered value is checked with Helpers::SchemaValidator and a
# 'transformer.validation_failed' result is returned on failure. Otherwise
# the payload goes to dispatch_transformed, and a task log is written when
# payload[:debug] is truthy and a :task_id key is present.
#
# @param transformation [Object] template/definition to render
# @param engine [Object, nil] engine name forwarded to render_transformation
# @param schema [Object, nil] schema to validate the rendered value against
# @param engine_options [Hash] options forwarded to render_transformation
# @param payload [Hash] remaining task options
# @return [Hash] { success: true } merged with the payload, or a failure hash
def transform(transformation:, engine: nil, schema: nil, engine_options: {}, **payload)
  rendered = render_transformation(transformation, payload, engine: engine, engine_options: engine_options)
  payload[:args] = rendered
  return rendered if rendered.is_a?(Hash) && rendered[:success] == false

  if schema
    verdict = Helpers::SchemaValidator.validate(schema: schema, data: rendered)
    unless verdict[:valid]
      return { success: false, status: 'transformer.validation_failed', errors: verdict[:errors] }
    end
  end

  dispatch_transformed(payload)

  wants_log = payload[:debug] && payload.key?(:task_id)
  generate_task_log(task_id: payload[:task_id], function: 'transform', values: payload) if wants_log

  { success: true }.merge(payload)
end

#transform_chain(steps:, **payload) ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/legion/extensions/transformer/runners/transform.rb', line 25

# Runs a sequence of transformation steps, feeding each step the accumulated
# result of the previous ones. Nothing is dispatched here; the accumulated
# hash is only returned.
#
# Each step is a Hash read with the keys :transformation, :engine,
# :engine_options and :schema. After a step renders, its value is stored
# under :args and, when it is a Hash, its keys are also merged into the
# accumulated result (rendered keys win over existing ones, including :args).
#
# The chain stops early and returns the failure when a step renders a Hash
# with success: false, or when a step's :schema validation fails.
#
# @param steps [Array<Hash>] ordered step definitions
# @param payload [Hash] initial data for the first step
# @return [Hash] { success: true, **accumulated } or a failure hash
def transform_chain(steps:, **payload)
  result = payload
  steps.each do |step|
    step_opts = step[:engine_options] || {}
    # render_transformation already passes String output through from_json, so
    # the value is used as returned; decoding it a second time would try to
    # parse an already-decoded String.
    rendered = render_transformation(step[:transformation], result, engine: step[:engine], engine_options: step_opts)

    return rendered if rendered.is_a?(Hash) && rendered[:success] == false

    if step[:schema]
      validation = Helpers::SchemaValidator.validate(schema: step[:schema], data: rendered)
      return { success: false, status: 'transformer.validation_failed', errors: validation[:errors] } unless validation[:valid]
    end

    result = result.merge(args: rendered)
    # Only a Hash can be merged key-by-key; Array, nil or scalar results used
    # to raise TypeError here and are now carried in :args alone.
    result = result.merge(rendered) if rendered.is_a?(Hash)
  end
  { success: true, **result }
end