Class: Tep::Parallel

Inherits:
Object
Defined in:
lib/tep/parallel.rb

Constructor Details

#initialize(worker) ⇒ Parallel

Returns a new instance of Parallel.

# File 'lib/tep/parallel.rb', line 81

def initialize(worker)
  @worker = worker
end

Instance Attribute Details

#worker ⇒ Object

Returns the value of attribute worker.

# File 'lib/tep/parallel.rb', line 79

def worker
  @worker
end

Class Method Details

.scratch_dir ⇒ Object

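Per-invocation scratch directory. The name combines the pid with Time.now.to_i (wall-clock time in whole seconds, not a monotonic clock), so concurrent map_processes calls in different workers don't trample each other. Two calls from the same process within the same second resolve to the same path.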

# File 'lib/tep/parallel.rb', line 164

def self.scratch_dir
  "/tmp/tep_par_" + Sock.sphttp_getpid.to_s + "_" + Time.now.to_i.to_s
end
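The naming scheme can be checked in plain Ruby. A minimal sketch, assuming CRuby's Process.pid in place of Sock.sphttp_getpid and passing the epoch seconds in explicitly; scratch_dir_name is a hypothetical helper, not part of Tep:

```ruby
# Hypothetical helper: same string layout as Parallel.scratch_dir, with the
# pid and epoch-seconds value passed in so the naming can be inspected.
def scratch_dir_name(pid, epoch_seconds)
  "/tmp/tep_par_" + pid.to_s + "_" + epoch_seconds.to_s
end

# Same pid, same second: identical names.
same_a = scratch_dir_name(Process.pid, 1_700_000_000)
same_b = scratch_dir_name(Process.pid, 1_700_000_000)

# Different pid (another worker), same second: distinct names.
other = scratch_dir_name(Process.pid + 1, 1_700_000_000)

scratch_dir_name(4242, 1_700_000_000) # => "/tmp/tep_par_4242_1700000000"
```

Because the timestamp has one-second resolution, separation between workers comes from the pid component.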

Instance Method Details

#each_process(items) ⇒ Object

Fire-and-forget counterpart of map_processes: forks one child per item, discards the worker's results, and returns 0 once every child has exited.

# File 'lib/tep/parallel.rb', line 146

def each_process(items)
  n = items.length
  i = 0
  while i < n
    spawn_one(items[i], 0, "")
    i += 1
  end
  reaped = 0
  while reaped < n
    Sock.sphttp_wait_any
    reaped += 1
  end
  0
end
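The fire-and-forget shape (fork everything, then reap exactly one child per item) can be sketched in CRuby. This assumes Process.fork and Process.wait (with no argument, which waits for any child) stand in for Sock.sphttp_fork and Sock.sphttp_wait_any; LabelWorker and each_process_sketch are hypothetical names:

```ruby
# Hypothetical worker: any object with #run(item) fits this shape.
class LabelWorker
  def run(item)
    "done " + item.to_s
  end
end

# Sketch of the each_process shape in CRuby (assumption: Process.fork and
# Process.wait stand in for Sock.sphttp_fork and Sock.sphttp_wait_any).
def each_process_sketch(worker, items)
  items.each do |item|
    Process.fork do
      worker.run(item) # result is discarded: fire-and-forget
      exit!(0)         # end the child at once, skipping at_exit handlers
    end
  end
  # Reap one child per item; Process.wait with no argument returns as
  # soon as any child exits.
  items.length.times { Process.wait }
  0
end

status = each_process_sketch(LabelWorker.new, [1, 2, 3])
# status == 0
```

Because the parent waits for exactly items.length children, the sketch assumes the caller has no other outstanding children that a bare Process.wait could reap instead.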

#map_processes(items) ⇒ Object

Result-collecting fan-out: forks one child per item and returns an Array of the workers' String results, in input order. See the module doc for the constraints (results must be Strings; there is no fixed-size pool).

# File 'lib/tep/parallel.rb', line 88

def map_processes(items)
  job_dir = Parallel.scratch_dir
  Tep::Shell.run("mkdir -p " + job_dir)

  n = items.length
  i = 0
  while i < n
    # Pull each fork into its own stack frame -- spinel's
    # codegen for the in-line fork-and-exec pattern was
    # observed to share locals across the parent loop and
    # the child body, so all children ended up processing
    # the same (last) item. Method-call boundary gives each
    # child a clean local snapshot.
    spawn_one(items[i], i, job_dir)
    i += 1
  end

  reaped = 0
  while reaped < n
    Sock.sphttp_wait_any
    reaped += 1
  end

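  # Seed with one String and drop it: this appears to give spinel an empty
  # Array whose element type is known to be String.
  out = [""]
  out.delete_at(0)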
  k = 0
  while k < n
    out.push(Tep::Shell.read(job_dir + "/" + k.to_s))
    k += 1
  end
  Tep::Shell.run("rm -rf " + job_dir)
  out
end
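The scratch-file protocol (each child writes job_dir/<index>, the parent reaps all children and then reads the files back by index) can be sketched in CRuby. This is an illustration under stated assumptions, not Tep's implementation: Process.fork/Process.wait stand in for the Sock calls, Dir.mktmpdir is swapped in for Parallel.scratch_dir plus "mkdir -p", and File.read/FileUtils.rm_rf replace Tep::Shell.read and "rm -rf". UpcaseWorker and map_processes_sketch are hypothetical:

```ruby
require "tmpdir"
require "fileutils"

# Hypothetical worker returning a String, as map_processes requires.
class UpcaseWorker
  def run(item)
    item.to_s.upcase
  end
end

# One fork per item; each child writes its String result to job_dir/<idx>.
def map_processes_sketch(worker, items)
  job_dir = Dir.mktmpdir("tep_par_")
  items.each_with_index do |item, idx|
    Process.fork do
      File.write(File.join(job_dir, idx.to_s), worker.run(item))
      exit!(0) # skip at_exit handlers and ensure clauses in the child
    end
  end
  items.length.times { Process.wait }
  # Index order, not completion order, determines the output order.
  (0...items.length).map { |k| File.read(File.join(job_dir, k.to_s)) }
ensure
  FileUtils.rm_rf(job_dir) if job_dir
end

results = map_processes_sketch(UpcaseWorker.new, ["a", "b", "c"])
# results == ["A", "B", "C"]
```

In CRuby each block invocation gets fresh item and idx bindings, so the shared-locals problem described in the spinel comment does not arise in this sketch; under spinel, that problem is why the fork is routed through spawn_one.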

#spawn_one(item, idx, job_dir) ⇒ Object

Fork one child to process `item`. When `job_dir` is non-empty, the child writes the worker's String result to `job_dir/idx` (consumed by map_processes); otherwise the result is discarded (the fire-and-forget shape used by each_process). Returns the child pid in the parent; the child never returns (it exits when done).

The method-call boundary is load-bearing: under spinel's codegen, an inline fork-and-exec loop body shared locals across iterations, so every child processed the same (last) item. A separate def gives each fork a clean local frame.

# File 'lib/tep/parallel.rb', line 132

def spawn_one(item, idx, job_dir)
  pid = Sock.sphttp_fork
  if pid == 0
    result = @worker.run(item)
    if job_dir.length > 0
      path = job_dir + "/" + idx.to_s
      File.write(path, result)
    end
    Sock.sphttp_exit(0)
  end
  pid
end
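A CRuby sketch of the same method shape highlights one convention difference: this code checks the fork result against 0 (the C convention), whereas CRuby's Process.fork returns nil in the child. It assumes Process.fork stands in for Sock.sphttp_fork and exit! for Sock.sphttp_exit; DoubleWorker and spawn_one_sketch are hypothetical:

```ruby
require "tmpdir"

# Hypothetical worker returning a String.
class DoubleWorker
  def run(item)
    (item * 2).to_s
  end
end

def spawn_one_sketch(worker, item, idx, job_dir)
  pid = Process.fork # nil in the child, the child's pid in the parent
  if pid.nil?
    result = worker.run(item)
    # Persist only when a job_dir was given (map-style call).
    File.write(File.join(job_dir, idx.to_s), result) unless job_dir.empty?
    # exit! ends the child immediately, so the child never returns here and
    # never runs ensure clauses copied from the parent's stack (such as
    # Dir.mktmpdir's cleanup below).
    exit!(0)
  end
  pid
end

value = Dir.mktmpdir do |dir|
  pid = spawn_one_sketch(DoubleWorker.new, 21, 0, dir)
  Process.wait(pid)
  File.read(File.join(dir, "0"))
end
# value == "42"
```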