Class: Ductwork::Pipeline

Inherits:
Record
  • Object
show all
Defined in:
app/models/ductwork/pipeline.rb

Overview

rubocop:todo Metrics/ClassLength

Defined Under Namespace

Classes: DefinitionError

Class Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Record

table_name_prefix

Class Attribute Details

.pipeline_definitionObject (readonly)

Returns the value of attribute pipeline_definition.



32
33
34
# File 'app/models/ductwork/pipeline.rb', line 32

def pipeline_definition
  @pipeline_definition
end

Class Method Details

.define(&block) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'app/models/ductwork/pipeline.rb', line 34

def define(&block)
  if !block_given?
    raise DefinitionError, "Definition block must be given"
  end

  if pipeline_definition
    raise DefinitionError, "Pipeline has already been defined"
  end

  builder = Ductwork::DSL::DefinitionBuilder.new

  block.call(builder)

  @pipeline_definition = builder.complete

  Ductwork.defined_pipelines << name.to_s
end

.inherited(subclass) ⇒ Object



21
22
23
24
25
26
27
# File 'app/models/ductwork/pipeline.rb', line 21

def self.inherited(subclass)
  super

  subclass.class_eval do
    default_scope { where(klass: name.to_s) }
  end
end

.trigger(args) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'app/models/ductwork/pipeline.rb', line 52

def trigger(args)
  if pipeline_definition.nil?
    raise DefinitionError, "Pipeline must be defined before triggering"
  end

  step_klass = pipeline_definition.dig(:nodes, 0)
  definition = JSON.dump(pipeline_definition)

  pipeline = Record.transaction do
    p = create!(
      klass: name.to_s,
      status: :in_progress,
      definition: definition,
      definition_sha1: Digest::SHA1.hexdigest(definition),
      triggered_at: Time.current,
      last_advanced_at: Time.current
    )
    step = p.steps.create!(
      klass: step_klass,
      status: :in_progress,
      step_type: :start,
      started_at: Time.current
    )
    Ductwork::Job.enqueue(step, args)

    p
  end

  Ductwork.logger.info(
    msg: "Pipeline triggered",
    pipeline_id: pipeline.id,
    role: :application
  )

  pipeline
end

Instance Method Details

#advance!Object



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'app/models/ductwork/pipeline.rb', line 90

def advance!
  # NOTE: if we've expanded the pipeline there could be a lot of
  # advancing records which may cause memory issues. something to
  # watch out for here and maybe add in config to use AR relation
  # at certain counts or even memory limits.
  advancing_steps = steps.advancing.pluck(:id, :klass)
  advancing_ids = advancing_steps.map(&:first)
  edges = find_edges(advancing_steps)

  Ductwork::Record.transaction do
    if edges.nil? || edges.values.all?(&:empty?)
      conditionally_complete_pipeline(advancing_ids)
    else
      advance_to_next_steps_by_type(edges, advancing_ids)
    end
  end
end

#parsed_definitionObject



108
109
110
# File 'app/models/ductwork/pipeline.rb', line 108

def parsed_definition
  @parsed_definition ||= JSON.parse(definition).with_indifferent_access
end