Class: Tep::Parallel
- Inherits: Object
  - Object
  - Tep::Parallel
- Defined in: lib/tep/parallel.rb
Instance Attribute Summary
-
#worker ⇒ Object
Returns the value of attribute worker.
Class Method Summary
-
.scratch_dir ⇒ Object
Per-invocation scratch directory.
Instance Method Summary
-
#each_process(items) ⇒ Object
Fire-and-forget version.
-
#initialize(worker) ⇒ Parallel
constructor
A new instance of Parallel.
-
#map_processes(items) ⇒ Object
Result-collecting fan-out.
-
#spawn_one(item, idx, job_dir) ⇒ Object
Fork one child to process `item`.
Constructor Details
#initialize(worker) ⇒ Parallel
Returns a new instance of Parallel.
# File 'lib/tep/parallel.rb', line 81
def initialize(worker)
  @worker = worker
end
Instance Attribute Details
#worker ⇒ Object
Returns the value of attribute worker.
# File 'lib/tep/parallel.rb', line 79
def worker
  @worker
end
Class Method Details
.scratch_dir ⇒ Object
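Per-invocation scratch directory. The path combines the process id with the current time (`Time.now.to_i`, whole epoch seconds rather than a monotonic clock reading) so concurrent map_processes calls in different workers don’t trample each other. Two calls from the same process within the same second produce the same path.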
# File 'lib/tep/parallel.rb', line 164
def self.scratch_dir
  "/tmp/tep_par_" + Sock.sphttp_getpid.to_s + "_" + Time.now.to_i.to_s
end
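To make the naming scheme concrete, here is a minimal sketch in stock Ruby rather than Tep's own code. `scratch_name` is a hypothetical helper that mirrors the string concatenation above, and `unique_scratch_name` is an assumed variant, not part of Tep, that substitutes a monotonic nanosecond clock reading for the whole-second timestamp.

```ruby
# Hypothetical helper mirroring the concatenation in .scratch_dir.
def scratch_name(pid, stamp)
  "/tmp/tep_par_" + pid.to_s + "_" + stamp.to_s
end

# Assumed variant (not part of Tep): use a monotonic nanosecond reading
# instead of epoch seconds, so back-to-back calls are unlikely to collide.
def unique_scratch_name(pid = Process.pid)
  ticks = Process.clock_gettime(Process::CLOCK_MONOTONIC, :nanosecond)
  scratch_name(pid, ticks)
end

a = scratch_name(4242, 1_700_000_000)
b = scratch_name(4242, 1_700_000_000)
puts a        # /tmp/tep_par_4242_1700000000
puts a == b   # true: same pid and same second give the same path
```

With second-level resolution, uniqueness rests on the pid alone whenever two calls land in the same second, which is why the variant reaches for a finer-grained clock.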
Instance Method Details
#each_process(items) ⇒ Object
Fire-and-forget version of map_processes. Forks one child per item, discards each worker result, and returns 0 once every child has exited.
# File 'lib/tep/parallel.rb', line 146
def each_process(items)
  n = items.length
  i = 0
  while i < n
    spawn_one(items[i], 0, "")
    i += 1
  end
  reaped = 0
  while reaped < n
    Sock.sphttp_wait_any
    reaped += 1
  end
  0
end
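The fork-then-reap shape can be tried outside Tep with the standard library. The sketch below is an approximation under stated assumptions: `Process.fork` and `Process.wait` stand in for `Sock.sphttp_fork` and `Sock.sphttp_wait_any`, a block replaces the worker object, and `each_process_sketch` is a hypothetical name.

```ruby
require "tmpdir"

# Stock-Ruby stand-in for the fire-and-forget shape (not Tep's code).
def each_process_sketch(items)
  items.each do |item|
    Process.fork do
      yield item         # runs only in the child; return value is dropped
      Process.exit!(0)   # leave without running inherited at_exit handlers
    end
  end
  # One wait per forked child; Process.wait with no argument reaps any child.
  items.length.times { Process.wait }
  0
end

Dir.mktmpdir do |dir|
  # Side effects (here, files) are the only way results become visible.
  status = each_process_sketch(%w[a b c]) do |name|
    File.write(File.join(dir, name), name)
  end
  puts status                          # 0
  puts Dir.children(dir).sort.inspect  # ["a", "b", "c"]
end
```

Counting reaped children up to the number of forks, as the original loop does, means the call returns only after all children have exited, even though their results are ignored.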
#map_processes(items) ⇒ Object
Result-collecting fan-out. Returns an Array of Strings in input order; one fork per item. See module doc for the constraints (Strings only, no fixed pool).
# File 'lib/tep/parallel.rb', line 88
def map_processes(items)
  job_dir = Parallel.scratch_dir
  Tep::Shell.run("mkdir -p " + job_dir)
  n = items.length
  i = 0
  while i < n
    # Pull each fork into its own stack frame -- spinel's
    # codegen for the in-line fork-and-exec pattern was
    # observed to share locals across the parent loop and
    # the child body, so all children ended up processing
    # the same (last) item. Method-call boundary gives each
    # child a clean local snapshot.
    spawn_one(items[i], i, job_dir)
    i += 1
  end
  reaped = 0
  while reaped < n
    Sock.sphttp_wait_any
    reaped += 1
  end
  out = [""]
  out.delete_at(0)
  k = 0
  while k < n
    out.push(Tep::Shell.read(job_dir + "/" + k.to_s))
    k += 1
  end
  Tep::Shell.run("rm -rf " + job_dir)
  out
end
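Why the results come back in input order, even when children finish out of order, may be easier to see in a stock-Ruby sketch. This is an illustration, not Tep's implementation: `Process.fork`, `Process.wait`, and `Dir.mktmpdir` are substituted for the `Sock` and `Tep::Shell` calls, a block replaces the worker object, and `map_processes_sketch` is a hypothetical name.

```ruby
require "tmpdir"

# Each child writes its String result to <dir>/<index>; the parent reads
# the files back in index order, so exit order does not matter.
def map_processes_sketch(items)
  Dir.mktmpdir("par_sketch_") do |dir|
    items.each_with_index do |item, idx|
      Process.fork do
        File.write(File.join(dir, idx.to_s), yield(item).to_s)
        Process.exit!(0)   # child leaves here; parent cleanup is not run
      end
    end
    items.length.times { Process.wait }   # reap one child per item
    items.each_index.map { |idx| File.read(File.join(dir, idx.to_s)) }
  end   # mktmpdir removes the directory once the block returns
end

# The first item sleeps longest, so children exit in reverse order.
delays = { "a" => 0.15, "b" => 0.10, "c" => 0.05 }
out = map_processes_sketch(%w[a b c]) do |s|
  sleep(delays[s])
  s.upcase
end
p out   # ["A", "B", "C"]
```

Because results travel through files, only Strings survive the trip, which matches the "Strings only" constraint mentioned above.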
#spawn_one(item, idx, job_dir) ⇒ Object
Fork one child to process `item`. When `job_dir` is non-empty, the child writes the worker’s String result to `job_dir/idx` (consumed by map_processes); otherwise the result is discarded (fire-and-forget shape used by each_process). Returns the child pid in the parent; the child never returns (exits when done).
The method-call boundary is load-bearing: an inline fork-and-exec loop body shared locals across iterations under spinel’s codegen, so every child processed the same (last) item. A separate def gives each fork a clean local frame.
# File 'lib/tep/parallel.rb', line 132
def spawn_one(item, idx, job_dir)
  pid = Sock.sphttp_fork
  if pid == 0
    result = @worker.run(item)
    if job_dir.length > 0
      path = job_dir + "/" + idx.to_s
      File.write(path, result)
    end
    Sock.sphttp_exit(0)
  end
  pid
end
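The parent/child contract described above can be approximated in stock Ruby. In this sketch `Process.fork` stands in for `Sock.sphttp_fork` (an assumption about equivalence; Ruby's fork returns nil in the child where the code above checks for 0), a lambda replaces the worker object, and `spawn_one_sketch` is a hypothetical name.

```ruby
require "tmpdir"

# Stock-Ruby sketch of the spawn_one contract (not Tep's code).
def spawn_one_sketch(worker, item, idx, job_dir)
  pid = Process.fork
  if pid.nil?
    # Child: run the worker, optionally persist the result, never return.
    begin
      result = worker.call(item)
      File.write(File.join(job_dir, idx.to_s), result) unless job_dir.empty?
      Process.exit!(0)
    rescue StandardError
      Process.exit!(1)   # still never return into the parent's code path
    end
  end
  pid   # parent: pid of the child just forked
end

Dir.mktmpdir do |dir|
  shout = ->(s) { s.upcase }
  kept      = spawn_one_sketch(shout, "hi", 0, dir)  # result lands in dir/0
  discarded = spawn_one_sketch(shout, "yo", 1, "")   # result is dropped
  [kept, discarded].each { |pid| Process.wait(pid) }
  p Dir.children(dir)               # ["0"]
  p File.read(File.join(dir, "0"))  # "HI"
end
```

The empty-string `job_dir` acts as the switch between the two callers: map_processes passes a real directory, each_process passes "".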