Class: Hatchet::ConcurrencyExpression

Inherits:
Object
Defined in:
lib/hatchet/concurrency.rb

Overview

Defines a concurrency expression for workflow-level or task-level concurrency control.

Examples:

Workflow-level concurrency

Hatchet::ConcurrencyExpression.new(
  expression: "input.group_key",
  max_runs: 5,
  limit_strategy: :cancel_in_progress
)

Task-level concurrency with multiple keys

[
  Hatchet::ConcurrencyExpression.new(expression: "input.digit", max_runs: 8, limit_strategy: :group_round_robin),
  Hatchet::ConcurrencyExpression.new(expression: "input.name", max_runs: 3, limit_strategy: :group_round_robin)
]

Constant Summary

LIMIT_STRATEGY_MAP =

Maps each Ruby limit-strategy symbol to its v1 proto enum symbol.

{
  cancel_in_progress: :CANCEL_IN_PROGRESS,
  cancel_newest: :CANCEL_NEWEST,
  group_round_robin: :GROUP_ROUND_ROBIN,
  queue: :QUEUE_NEWEST,
  drop_newest: :DROP_NEWEST,
}.freeze

Constructor Details

#initialize(expression:, max_runs: 1, limit_strategy: :cancel_in_progress) ⇒ ConcurrencyExpression

Returns a new instance of ConcurrencyExpression.

Parameters:

  • expression (String)

    CEL expression evaluated against the workflow input

  • max_runs (Integer) (defaults to: 1)

    Maximum concurrent runs for this key

  • limit_strategy (Symbol) (defaults to: :cancel_in_progress)

    Strategy applied when the limit is reached



# File 'lib/hatchet/concurrency.rb', line 39

def initialize(expression:, max_runs: 1, limit_strategy: :cancel_in_progress)
  @expression = expression
  @max_runs = max_runs
  @limit_strategy = limit_strategy
end
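
A minimal sketch of the constructor defaults. To run without the gem installed, it defines a stand-in class that mirrors the constructor and readers listed on this page. The key "input.user_id" and the symbol :typo are illustrative assumptions, not values from the library.

```ruby
# Stand-in that mirrors the constructor and readers documented on this page,
# so the snippet runs without the hatchet gem installed.
module Hatchet
  class ConcurrencyExpression
    attr_reader :expression, :max_runs, :limit_strategy

    def initialize(expression:, max_runs: 1, limit_strategy: :cancel_in_progress)
      @expression = expression
      @max_runs = max_runs
      @limit_strategy = limit_strategy
    end
  end
end

# Only expression: is required; the other keywords fall back to their defaults.
defaults = Hatchet::ConcurrencyExpression.new(expression: "input.user_id")
defaults.max_runs        # => 1
defaults.limit_strategy  # => :cancel_in_progress

# The constructor as listed stores its arguments as-is, without validation,
# so an unrecognized strategy symbol (hypothetical :typo) is accepted here.
unchecked = Hatchet::ConcurrencyExpression.new(
  expression: "input.user_id",
  limit_strategy: :typo
)
unchecked.limit_strategy # => :typo
```

Because nothing is checked at construction time in the source shown, a misspelled strategy only surfaces later, when the object is serialized.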

Instance Attribute Details

#expression ⇒ String (readonly)

Returns the CEL expression evaluated against the workflow input.

Returns:

  • (String)

    CEL expression evaluated against the workflow input



# File 'lib/hatchet/concurrency.rb', line 28

def expression
  @expression
end

#limit_strategy ⇒ Symbol (readonly)

Returns the strategy applied when the limit is exceeded (:cancel_in_progress, :cancel_newest, :group_round_robin, :queue). LIMIT_STRATEGY_MAP additionally defines :drop_newest.

Returns:

  • (Symbol)

    Strategy applied when the limit is exceeded (:cancel_in_progress, :cancel_newest, :group_round_robin, :queue)



# File 'lib/hatchet/concurrency.rb', line 34

def limit_strategy
  @limit_strategy
end

#max_runs ⇒ Integer (readonly)

Returns the maximum number of concurrent runs for this key.

Returns:

  • (Integer)

    Maximum concurrent runs for this key



# File 'lib/hatchet/concurrency.rb', line 31

def max_runs
  @max_runs
end

Instance Method Details

#to_h ⇒ Hash

Converts the expression to a hash for API serialization.

Returns:

  • (Hash)


# File 'lib/hatchet/concurrency.rb', line 47

def to_h
  {
    expression: @expression,
    max_runs: @max_runs,
    limit_strategy: @limit_strategy.to_s.upcase,
  }
end
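
A short sketch of what #to_h produces, using a stand-in class copied from the listings on this page so it runs without the gem. Note that #to_h upcases the symbol name directly rather than consulting LIMIT_STRATEGY_MAP, so for :queue the hash carries "QUEUE" while the map yields :QUEUE_NEWEST. The source does not say whether this difference is intentional.

```ruby
# Stand-in copied from the listings on this page (constant, constructor, #to_h).
module Hatchet
  class ConcurrencyExpression
    LIMIT_STRATEGY_MAP = {
      cancel_in_progress: :CANCEL_IN_PROGRESS,
      cancel_newest: :CANCEL_NEWEST,
      group_round_robin: :GROUP_ROUND_ROBIN,
      queue: :QUEUE_NEWEST,
      drop_newest: :DROP_NEWEST,
    }.freeze

    def initialize(expression:, max_runs: 1, limit_strategy: :cancel_in_progress)
      @expression = expression
      @max_runs = max_runs
      @limit_strategy = limit_strategy
    end

    def to_h
      {
        expression: @expression,
        max_runs: @max_runs,
        limit_strategy: @limit_strategy.to_s.upcase,
      }
    end
  end
end

queued = Hatchet::ConcurrencyExpression.new(
  expression: "input.group_key",
  max_runs: 5,
  limit_strategy: :queue
)

hash = queued.to_h
hash[:limit_strategy]  # => "QUEUE" (the symbol name, upcased)

# The proto mapping uses a different name for the same strategy.
Hatchet::ConcurrencyExpression::LIMIT_STRATEGY_MAP[:queue]  # => :QUEUE_NEWEST
```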

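#to_proto ⇒ V1::Concurrency

Converts the expression to a V1::Concurrency protobuf message. A limit strategy that is not a key of LIMIT_STRATEGY_MAP falls back to :CANCEL_IN_PROGRESS.

Returns:

  • (V1::Concurrency)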


# File 'lib/hatchet/concurrency.rb', line 66

def to_proto
  proto_strategy = LIMIT_STRATEGY_MAP[@limit_strategy] || :CANCEL_IN_PROGRESS

  ::V1::Concurrency.new(
    expression: @expression,
    max_runs: @max_runs,
    limit_strategy: proto_strategy,
  )
end
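
The fallback in #to_proto can be illustrated with a sketch. The V1::Concurrency below is a hypothetical Struct stub standing in for the generated protobuf class, which is not reproduced here; the rest is copied from the listings on this page. The symbol :round_robin is an assumed example of a misspelled strategy.

```ruby
# Hypothetical stub for the generated protobuf message class. The real
# V1::Concurrency comes from the gem's generated code.
module V1
  Concurrency = Struct.new(:expression, :max_runs, :limit_strategy, keyword_init: true)
end

# Stand-in copied from the listings on this page (constant, constructor, #to_proto).
module Hatchet
  class ConcurrencyExpression
    LIMIT_STRATEGY_MAP = {
      cancel_in_progress: :CANCEL_IN_PROGRESS,
      cancel_newest: :CANCEL_NEWEST,
      group_round_robin: :GROUP_ROUND_ROBIN,
      queue: :QUEUE_NEWEST,
      drop_newest: :DROP_NEWEST,
    }.freeze

    def initialize(expression:, max_runs: 1, limit_strategy: :cancel_in_progress)
      @expression = expression
      @max_runs = max_runs
      @limit_strategy = limit_strategy
    end

    def to_proto
      # Unknown symbols map to nil, so the || supplies the default.
      proto_strategy = LIMIT_STRATEGY_MAP[@limit_strategy] || :CANCEL_IN_PROGRESS

      ::V1::Concurrency.new(
        expression: @expression,
        max_runs: @max_runs,
        limit_strategy: proto_strategy,
      )
    end
  end
end

known = Hatchet::ConcurrencyExpression.new(
  expression: "input.digit", max_runs: 8, limit_strategy: :group_round_robin
).to_proto
known.limit_strategy    # => :GROUP_ROUND_ROBIN

# A misspelled strategy does not raise; it silently becomes the default.
unknown = Hatchet::ConcurrencyExpression.new(
  expression: "input.digit", limit_strategy: :round_robin
).to_proto
unknown.limit_strategy  # => :CANCEL_IN_PROGRESS
```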