Class: Wurk::Topology

Inherits:
Object
Defined in:
lib/wurk/topology.rb

Overview

Worker topology DSL (a Wurk extension on top of Ent’s flat swarm). It lets users declare specialized slots, e.g. 2 forks dedicated to the critical queue with low concurrency and 2 forks for the bulk and low queues with high concurrency. This gives stronger queue isolation than a flat swarm.

Each Slot describes a kind of fork; `count` is how many identical forks of that kind to spawn. Swarm consumes `assignments` (the flat list of forks to spawn, in order), so a slot with count=2 yields two assignment entries pointing at the same Slot.

See docs/idea/03-process-model.md §Worker topology.
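
As a rough illustration of the DSL described above, the following sketch declares the two slot kinds from the overview and prints the resulting fork list. `TopologySketch` is a hypothetical stand-in that mirrors the method bodies documented on this page (minus the validation step, whose source is not shown) so the example runs without the gem. The `Slot` shape, queue names, and concurrency values are assumptions.

```ruby
# Stand-in for Wurk::Topology so the sketch runs without the gem.
# It mirrors the method bodies shown on this page; validate_slot! is
# omitted because its source is not shown here.
class TopologySketch
  # Assumed shape of Slot: a value object with count, queues, concurrency.
  Slot = Struct.new(:count, :queues, :concurrency, keyword_init: true)

  def initialize
    @slots = []
  end

  # Declare one slot kind; returns self so calls chain.
  def slot(count:, queues:, concurrency:)
    @slots << Slot.new(count: count, queues: queues, concurrency: concurrency).freeze
    self
  end

  # One entry per child process, in declaration order.
  def assignments
    @slots.flat_map { |s| Array.new(s.count, s) }
  end

  def total_processes
    @slots.sum(&:count)
  end
end

# Two low-concurrency forks for "critical", two high-concurrency forks
# for "bulk" and "low". All values here are illustrative.
def build_example_topology
  TopologySketch.new
    .slot(count: 2, queues: %w[critical], concurrency: 2)
    .slot(count: 2, queues: %w[bulk low], concurrency: 20)
end

topology = build_example_topology
topology.assignments.each_with_index do |slot, i|
  puts "fork #{i}: queues=#{slot.queues.join(',')} concurrency=#{slot.concurrency}"
end
puts "total processes: #{topology.total_processes}"
```

With the real class, the same chain would presumably start from `Wurk::Topology.new` instead of the stand-in.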

Defined Under Namespace

Classes: Slot

Constructor Details

#initialize ⇒ Topology

Returns a new instance of Topology.



# File 'lib/wurk/topology.rb', line 24

def initialize
  @slots = []
end

Class Method Details

.flat(count:, queues:, concurrency:) ⇒ Object

Convenience: build a flat topology of `count` identical forks consuming `queues` with `concurrency` threads each. Used by the railtie when the host hasn’t declared a custom topology.



# File 'lib/wurk/topology.rb', line 56

def self.flat(count:, queues:, concurrency:)
  new.slot(count: count, queues: queues, concurrency: concurrency)
end
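
The sketch below shows what `.flat` amounts to: a topology holding a single slot kind whose `count` covers every fork. `FlatTopologySketch` is a hypothetical, trimmed-down stand-in (not the real class, and without validation), and the queue names and numbers are made up for illustration.

```ruby
# Minimal stand-in (not the real Wurk::Topology): only what .flat needs.
class FlatTopologySketch
  # Assumed shape of Slot; the real class is not shown on this page.
  Slot = Struct.new(:count, :queues, :concurrency, keyword_init: true)

  # Same body as the documented .flat: one slot call on a fresh instance.
  def self.flat(count:, queues:, concurrency:)
    new.slot(count: count, queues: queues, concurrency: concurrency)
  end

  def initialize
    @slots = []
  end

  def slot(count:, queues:, concurrency:)
    @slots << Slot.new(count: count, queues: queues, concurrency: concurrency).freeze
    self
  end

  def slots
    @slots.dup.freeze
  end
end

flat = FlatTopologySketch.flat(count: 4, queues: %w[default mailers], concurrency: 10)
puts flat.slots.size          # a single slot kind
puts flat.slots.first.count   # spawned four times
```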

Instance Method Details

#assignments ⇒ Object

Flat ordered list of slots to fork, one per child process. A slot with count=N contributes N entries.



# File 'lib/wurk/topology.rb', line 45

def assignments
  @slots.flat_map { |s| Array.new(s.count, s) }
end
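
To make the expansion concrete, here is a small sketch of the same `flat_map` step applied to plain value objects. `SketchSlot` and `expand` are hypothetical names introduced for this example; only the `flat_map` expression is taken from the method above.

```ruby
# Hypothetical stand-in for Slot; the real class lives under Wurk::Topology.
SketchSlot = Struct.new(:count, :queues, keyword_init: true)

# Same expansion as #assignments: each slot appears `count` times,
# in declaration order.
def expand(slots)
  slots.flat_map { |s| Array.new(s.count, s) }
end

critical = SketchSlot.new(count: 2, queues: %w[critical])
bulk     = SketchSlot.new(count: 1, queues: %w[bulk low])

entries = expand([critical, bulk])
puts entries.map { |s| s.queues.join("+") }.inspect
# prints ["critical", "critical", "bulk+low"]

# Repeated entries are the same object, not copies.
puts entries[0].equal?(entries[1]) # prints true
```

Because `Array.new(n, obj)` repeats one object reference, the entries for a slot share identity, which matches the note that assignment entries point at the same Slot.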

#empty? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/wurk/topology.rb', line 39

def empty?
  @slots.empty?
end

#slot(count:, queues:, concurrency:) ⇒ Object

Declare one slot kind. Returns self so calls chain.



# File 'lib/wurk/topology.rb', line 29

def slot(count:, queues:, concurrency:)
  queue_list = validate_slot!(count, queues, concurrency)
  @slots << Slot.new(count: count, queues: queue_list, concurrency: concurrency).freeze
  self
end

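#slots ⇒ Object

Returns a frozen copy of the declared slots, so callers cannot mutate the topology's internal list.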



# File 'lib/wurk/topology.rb', line 35

def slots
  @slots.dup.freeze
end
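
The effect of `dup.freeze` can be seen in a short sketch. `SlotListSketch` is a hypothetical holder written for this example, with symbols standing in for Slot objects; it only reproduces the accessor body shown above.

```ruby
# Hypothetical holder that exposes its list the way #slots does.
class SlotListSketch
  def initialize(items)
    @slots = items
  end

  # Hand out a frozen shallow copy rather than the internal array.
  def slots
    @slots.dup.freeze
  end
end

holder = SlotListSketch.new([:critical_slot, :bulk_slot])
view = holder.slots

begin
  view << :extra # the copy is frozen, so this raises FrozenError
rescue FrozenError => e
  puts "rejected: #{e.class}"
end

puts holder.slots.size # the internal list still has two entries
```

Each call returns a fresh copy, so holding on to an earlier result does not reflect slots declared later.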

#total_processes ⇒ Object

Total number of forks across all slots (the sum of each slot's `count`).



# File 'lib/wurk/topology.rb', line 49

def total_processes
  @slots.sum(&:count)
end