Module: Legion::Extensions::Transformer::Runners::Transform
- Extended by:
- Helpers::Task
- Includes:
- Helpers::Lex
- Defined in:
- lib/legion/extensions/transformer/runners/transform.rb
Instance Method Summary collapse
- #dispatch_multiplied(payload) ⇒ Object
- #dispatch_transformed(payload) ⇒ Object
- #render_transformation(transformation, payload, engine: nil, engine_options: {}) ⇒ Object
- #send_task(**opts) ⇒ Object
- #transform(transformation:, engine: nil, schema: nil, engine_options: {}, **payload) ⇒ Object
- #transform_chain(steps:, **payload) ⇒ Object
Instance Method Details
#dispatch_multiplied(payload) ⇒ Object
67 68 69 70 71 72 73 74 75 76 |
# File 'lib/legion/extensions/transformer/runners/transform.rb', line 67 def dispatch_multiplied(payload) payload[:args].each do |thing| new_payload = payload.dup task = Legion::Runner::Status.generate_task_id(function_args: thing, status: 'task.queued', args: thing, **new_payload) new_payload[:task_id] = task[:task_id] new_payload[:args] = thing send_task(**new_payload) end task_update(payload[:task_id], 'task.multiplied', function_args: payload[:args]) unless payload[:task_id].nil? end |
#dispatch_transformed(payload) ⇒ Object
56 57 58 59 60 61 62 63 64 65 |
# File 'lib/legion/extensions/transformer/runners/transform.rb', line 56 def dispatch_transformed(payload) case payload[:args] when Hash task_update(payload[:task_id], 'transformer.succeeded', function_args: payload[:args]) unless payload[:task_id].nil? send_task(**payload) task_update(payload[:task_id], 'task.queued', use_database: false) unless payload[:task_id].nil? when Array dispatch_multiplied(payload) end end |
#render_transformation(transformation, payload, engine: nil, engine_options: {}) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/legion/extensions/transformer/runners/transform.rb', line 43 def render_transformation(transformation, payload, engine: nil, engine_options: {}) eng = if engine Engines::Registry.fetch(engine) else Engines::Registry.detect(transformation) end rendered = eng.render(transformation, payload, **) return rendered if rendered.is_a?(Hash) && rendered[:success] == false rendered.is_a?(String) ? from_json(rendered) : rendered end |
#send_task(**opts) ⇒ Object
78 79 80 81 82 83 84 85 |
# File 'lib/legion/extensions/transformer/runners/transform.rb', line 78 def send_task(**opts) payload = {} %i[task_id relationship_id trigger_function_id runner_class function_id function chain_id debug engine args].each do |thing| payload[thing] = opts[thing] if opts.key? thing end Legion::Extensions::Transformer::Transport::Messages::Message.new(**payload).publish end |
#transform(transformation:, engine: nil, schema: nil, engine_options: {}, **payload) ⇒ Object
11 12 13 14 15 16 17 18 19 20 21 22 23 |
# File 'lib/legion/extensions/transformer/runners/transform.rb', line 11 def transform(transformation:, engine: nil, schema: nil, engine_options: {}, **payload) payload[:args] = render_transformation(transformation, payload, engine: engine, engine_options: ) return payload[:args] if payload[:args].is_a?(Hash) && payload[:args][:success] == false if schema validation = Helpers::SchemaValidator.validate(schema: schema, data: payload[:args]) return { success: false, status: 'transformer.validation_failed', errors: validation[:errors] } unless validation[:valid] end dispatch_transformed(payload) generate_task_log(task_id: payload[:task_id], function: 'transform', values: payload) if payload[:debug] && payload.key?(:task_id) { success: true, **payload } end |
#transform_chain(steps:, **payload) ⇒ Object
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/legion/extensions/transformer/runners/transform.rb', line 25 def transform_chain(steps:, **payload) result = payload steps.each do |step| step_opts = step[:engine_options] || {} rendered = render_transformation(step[:transformation], result, engine: step[:engine], engine_options: step_opts) rendered = from_json(rendered) if rendered.is_a?(String) return rendered if rendered.is_a?(Hash) && rendered[:success] == false if step[:schema] validation = Helpers::SchemaValidator.validate(schema: step[:schema], data: rendered) return { success: false, status: 'transformer.validation_failed', errors: validation[:errors] } unless validation[:valid] end result = result.merge({ args: rendered }.merge(rendered)) end { success: true, **result } end |