Class: Computable::Variable
- Inherits:
-
Object
- Object
- Computable::Variable
- Defined in:
- lib/computable.rb
Instance Attribute Summary collapse
-
#calc_method ⇒ Object
Returns the value of attribute calc_method.
-
#count ⇒ Object
Returns the value of attribute count.
-
#expired_from ⇒ Object
Returns the value of attribute expired_from.
-
#in_process ⇒ Object
Returns the value of attribute in_process.
-
#name ⇒ Object
Returns the value of attribute name.
-
#recalc_error ⇒ Object
Returns the value of attribute recalc_error.
-
#used_for ⇒ Object
Returns the value of attribute used_for.
-
#value ⇒ Object
Returns the value of attribute value.
-
#value_calced ⇒ Object
Returns the value of attribute value_calced.
Instance Method Summary collapse
- #assign_value(value) ⇒ Object
- #calc! ⇒ Object
- #expire_value ⇒ Object
- #find_recalcable ⇒ Object
-
#initialize(name, calc_method, comp, mutex) ⇒ Variable
constructor
A new instance of Variable.
- #inspect ⇒ Object
- #master_loop(max_threads, workers, from_workers, to_workers) ⇒ Object
- #new_worker(from_workers, to_workers) ⇒ Object
- #process_recalced_value(recalced_value, err) ⇒ Object
- #query_value(kaller) ⇒ Object
- #recalc_parallel(max_threads) ⇒ Object
- #recalc_value ⇒ Object
- #revoke_expire ⇒ Object
Constructor Details
#initialize(name, calc_method, comp, mutex) ⇒ Variable
Returns a new instance of Variable.
21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/computable.rb', line 21 def initialize(name, calc_method, comp, mutex) @name = name @calc_method = calc_method @comp = comp @mutex = mutex @used_for = {} @expired_from = {} @count = 0 @value = Unknown @in_process = false @recalc_error = nil end |
Instance Attribute Details
#calc_method ⇒ Object
Returns the value of attribute calc_method.
19 20 21 |
# File 'lib/computable.rb', line 19 def calc_method @calc_method end |
#count ⇒ Object
Returns the value of attribute count.
19 20 21 |
# File 'lib/computable.rb', line 19 def count @count end |
#expired_from ⇒ Object
Returns the value of attribute expired_from.
19 20 21 |
# File 'lib/computable.rb', line 19 def expired_from @expired_from end |
#in_process ⇒ Object
Returns the value of attribute in_process.
19 20 21 |
# File 'lib/computable.rb', line 19 def in_process @in_process end |
#name ⇒ Object
Returns the value of attribute name.
19 20 21 |
# File 'lib/computable.rb', line 19 def name @name end |
#recalc_error ⇒ Object
Returns the value of attribute recalc_error.
19 20 21 |
# File 'lib/computable.rb', line 19 def recalc_error @recalc_error end |
#used_for ⇒ Object
Returns the value of attribute used_for.
19 20 21 |
# File 'lib/computable.rb', line 19 def used_for @used_for end |
#value ⇒ Object
Returns the value of attribute value.
19 20 21 |
# File 'lib/computable.rb', line 19 def value @value end |
#value_calced ⇒ Object
Returns the value of attribute value_calced.
19 20 21 |
# File 'lib/computable.rb', line 19 def value_calced @value_calced end |
Instance Method Details
#assign_value(value) ⇒ Object
185 186 187 188 189 190 191 192 193 |
# File 'lib/computable.rb', line 185 def assign_value(value) unless self.value == value expire_value expired_from.clear used_for.clear self.value = value end self.value_calced = false end |
#calc! ⇒ Object
39 40 41 42 43 44 45 46 47 48 |
# File 'lib/computable.rb', line 39 def calc! self.count += 1 self.value_calced = true @mutex.unlock begin calc_method.call(self) ensure @mutex.lock end end |
#expire_value ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/computable.rb', line 50 def expire_value return if used_for.empty? puts "expire #{inspect}" if @comp.computable_debug used_for.each do |name2, v2| if v2.value_calced && !v2.expired_from[name] v2.expire_value v2.expired_from[name] = self end end end |
#find_recalcable ⇒ Object
172 173 174 175 176 177 178 179 180 181 182 183 |
# File 'lib/computable.rb', line 172 def find_recalcable if !value_calced || expired_from.empty? || in_process nil elsif expired_from.all?{ |_, v2| !v2.value_calced || v2.expired_from.empty? } self else expired_from.each do |_, v2| node = v2.find_recalcable and return node end nil end end |
#inspect ⇒ Object
34 35 36 37 |
# File 'lib/computable.rb', line 34 def inspect has = @recalc_error ? "error!" : "value:#{Unknown!=value}" "<Variable #{name} used_for:#{used_for.keys} expired_from:#{expired_from.keys} has #{has} value_calced:#{value_calced.inspect}>" end |
#master_loop(max_threads, workers, from_workers, to_workers) ⇒ Object
130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 |
# File 'lib/computable.rb', line 130 def master_loop(max_threads, workers, from_workers, to_workers) num_working = 0 loop do if num_working == max_threads || !(node = find_recalcable) # # maxed out or no nodes available -- wait for results # return if num_working == 0 puts "recalc join" if @comp.computable_debug @mutex.unlock begin node, recalced_value, err = from_workers.pop ensure @mutex.lock end node.in_process = false num_working -= 1 if err # Add the backtrace of the caller to the small in-thread backtrace for better debugging err.set_backtrace(err.backtrace + caller) end node.process_recalced_value(recalced_value, err) else # # not maxed out and found a node -- compute it # if (max_threads && workers.size < max_threads) || (!max_threads && num_working == workers.size) workers << new_worker(from_workers, to_workers) end node.in_process = true node.count += 1 node.value_calced = true num_working += 1 to_workers.push(node) end end end |
#new_worker(from_workers, to_workers) ⇒ Object
105 106 107 108 109 110 111 112 113 114 115 116 117 |
# File 'lib/computable.rb', line 105 def new_worker(from_workers, to_workers) Thread.new do while v = to_workers.pop puts "recalc parallel #{v.inspect}" if @comp.computable_debug err = nil begin recalced_value = v.calc_method.call(v) rescue Exception => err end from_workers.push([v, recalced_value, err]) end end end |
#process_recalced_value(recalced_value, err) ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/computable.rb', line 73 def process_recalced_value(recalced_value, err) if err self.recalc_error = err self.value = Unknown used_for.clear elsif self.value == recalced_value revoke_expire else self.recalc_error = nil self.value = recalced_value used_for.clear end expired_from.clear end |
#query_value(kaller) ⇒ Object
195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 |
# File 'lib/computable.rb', line 195 def query_value(kaller) if kaller v2 = used_for[kaller.name] if v2 if Unknown==value && Unknown==v2.value && value_calced && v2.value_calced raise RecursionDetected, "#{v2.name} depends on #{name}, but #{name} could not be computed without #{v2.name}" end else used_for[kaller.name] = kaller end end max_threads = @comp.computable_max_threads if !max_threads || max_threads > 0 recalc_parallel(max_threads) else recalc_value end raise recalc_error if recalc_error self.value = calc! if Unknown==value self.value end |
#recalc_parallel(max_threads) ⇒ Object
119 120 121 122 123 124 125 126 127 128 |
# File 'lib/computable.rb', line 119 def recalc_parallel(max_threads) workers = [] from_workers = Queue.new to_workers = Queue.new master_loop(max_threads, workers, from_workers, to_workers) to_workers.close workers.each { |t| t.join } end |
#recalc_value ⇒ Object
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/computable.rb', line 88 def recalc_value return if !value_calced || expired_from.empty? puts "recalc #{inspect}" if @comp.computable_debug expired_from.each do |name2, v2| v2.recalc_value # Adjust improve_backtrace when moving this line within method recalc_value end unless expired_from.empty? begin recalced_value = self.calc! rescue Exception => err end process_recalced_value(recalced_value, err) end end |
#revoke_expire ⇒ Object
62 63 64 65 66 67 68 69 70 71 |
# File 'lib/computable.rb', line 62 def revoke_expire return if used_for.empty? puts "revoke expire #{inspect}" if @comp.computable_debug used_for.each do |name2, v2| if v2.value_calced && v2.expired_from.delete(name) && v2.expired_from.empty? v2.revoke_expire end end end |