Class: Computable::Variable

Inherits:
Object
  • Object
show all
Defined in:
lib/computable.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, calc_method, comp, mutex) ⇒ Variable

Returns a new instance of Variable.



21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/computable.rb', line 21

def initialize(name, calc_method, comp, mutex)
  @name = name
  @calc_method = calc_method
  @comp = comp
  @mutex = mutex
  @used_for = {}
  @expired_from = {}
  @count = 0
  @value = Unknown
  @in_process = false
  @recalc_error = nil
end

Instance Attribute Details

#calc_methodObject

Returns the value of attribute calc_method.



19
20
21
# File 'lib/computable.rb', line 19

def calc_method
  @calc_method
end

#countObject

Returns the value of attribute count.



19
20
21
# File 'lib/computable.rb', line 19

def count
  @count
end

#expired_fromObject

Returns the value of attribute expired_from.



19
20
21
# File 'lib/computable.rb', line 19

def expired_from
  @expired_from
end

#in_processObject

Returns the value of attribute in_process.



19
20
21
# File 'lib/computable.rb', line 19

def in_process
  @in_process
end

#nameObject

Returns the value of attribute name.



19
20
21
# File 'lib/computable.rb', line 19

def name
  @name
end

#recalc_errorObject

Returns the value of attribute recalc_error.



19
20
21
# File 'lib/computable.rb', line 19

def recalc_error
  @recalc_error
end

#used_forObject

Returns the value of attribute used_for.



19
20
21
# File 'lib/computable.rb', line 19

def used_for
  @used_for
end

#valueObject

Returns the value of attribute value.



19
20
21
# File 'lib/computable.rb', line 19

def value
  @value
end

#value_calcedObject

Returns the value of attribute value_calced.



19
20
21
# File 'lib/computable.rb', line 19

def value_calced
  @value_calced
end

Instance Method Details

#assign_value(value) ⇒ Object



185
186
187
188
189
190
191
192
193
# File 'lib/computable.rb', line 185

def assign_value(value)
  unless self.value == value
    expire_value
    expired_from.clear
    used_for.clear
    self.value = value
  end
  self.value_calced = false
end

#calc!Object



39
40
41
42
43
44
45
46
47
48
# File 'lib/computable.rb', line 39

def calc!
  self.count += 1
  self.value_calced = true
  @mutex.unlock
  begin
    calc_method.call(self)
  ensure
    @mutex.lock
  end
end

#expire_valueObject



50
51
52
53
54
55
56
57
58
59
60
# File 'lib/computable.rb', line 50

def expire_value
  return if used_for.empty?

  puts "expire #{inspect}" if @comp.computable_debug
  used_for.each do |name2, v2|
    if v2.value_calced && !v2.expired_from[name]
      v2.expire_value
      v2.expired_from[name] = self
    end
  end
end

#find_recalcableObject



172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/computable.rb', line 172

def find_recalcable
  if !value_calced || expired_from.empty? || in_process
    nil
  elsif expired_from.all?{ |_, v2| !v2.value_calced || v2.expired_from.empty? }
    self
  else
    expired_from.each do |_, v2|
      node = v2.find_recalcable and return node
    end
    nil
  end
end

#inspectObject



34
35
36
37
# File 'lib/computable.rb', line 34

def inspect
  has = @recalc_error ? "error!" : "value:#{Unknown!=value}"
  "<Variable #{name} used_for:#{used_for.keys} expired_from:#{expired_from.keys} has #{has} value_calced:#{value_calced.inspect}>"
end

#master_loop(max_threads, workers, from_workers, to_workers) ⇒ Object



130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/computable.rb', line 130

def master_loop(max_threads, workers, from_workers, to_workers)
  num_working = 0
  loop do
    if num_working == max_threads || !(node = find_recalcable)
      #
      # maxed out or no nodes available -- wait for results
      #
      return if num_working == 0

      puts "recalc join" if @comp.computable_debug
      @mutex.unlock
      begin
        node, recalced_value, err = from_workers.pop
      ensure
        @mutex.lock
      end
      node.in_process = false
      num_working -= 1

      if err
        # Add the backtrace of the caller to the small in-thread backtrace for better debugging
        err.set_backtrace(err.backtrace + caller)
      end

      node.process_recalced_value(recalced_value, err)
    else
      #
      # not maxed out and found a node -- compute it
      #
      if (max_threads && workers.size < max_threads) ||
         (!max_threads && num_working == workers.size)
        workers << new_worker(from_workers, to_workers)
      end
      node.in_process = true
      node.count += 1
      node.value_calced = true
      num_working += 1
      to_workers.push(node)
    end
  end
end

#new_worker(from_workers, to_workers) ⇒ Object



105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/computable.rb', line 105

def new_worker(from_workers, to_workers)
  Thread.new do
    while v = to_workers.pop
      puts "recalc parallel #{v.inspect}" if @comp.computable_debug
      err = nil
      begin
        recalced_value = v.calc_method.call(v)
      rescue Exception => err
      end
      from_workers.push([v, recalced_value, err])
    end
  end
end

#process_recalced_value(recalced_value, err) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/computable.rb', line 73

def process_recalced_value(recalced_value, err)
  if err
    self.recalc_error = err
    self.value = Unknown
    used_for.clear
  elsif self.value == recalced_value
    revoke_expire
  else
    self.recalc_error = nil
    self.value = recalced_value
    used_for.clear
  end
  expired_from.clear
end

#query_value(kaller) ⇒ Object



195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/computable.rb', line 195

def query_value(kaller)
  if kaller
    v2 = used_for[kaller.name]
    if v2
      if Unknown==value && Unknown==v2.value && value_calced && v2.value_calced
        raise RecursionDetected, "#{v2.name} depends on #{name}, but #{name} could not be computed without #{v2.name}"
      end
    else
      used_for[kaller.name] = kaller
    end
  end

  max_threads = @comp.computable_max_threads
  if !max_threads || max_threads > 0
    recalc_parallel(max_threads)
  else
    recalc_value
  end

  raise recalc_error if recalc_error
  self.value = calc! if Unknown==value
  self.value
end

#recalc_parallel(max_threads) ⇒ Object



119
120
121
122
123
124
125
126
127
128
# File 'lib/computable.rb', line 119

def recalc_parallel(max_threads)
  workers = []
  from_workers = Queue.new
  to_workers = Queue.new

  master_loop(max_threads, workers, from_workers, to_workers)

  to_workers.close
  workers.each { |t| t.join }
end

#recalc_valueObject



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/computable.rb', line 88

def recalc_value
  return if !value_calced || expired_from.empty?

  puts "recalc #{inspect}" if @comp.computable_debug
  expired_from.each do |name2, v2|
    v2.recalc_value  # Adjust improve_backtrace when moving this line within method recalc_value
  end

  unless expired_from.empty?
    begin
      recalced_value = self.calc!
    rescue Exception => err
    end
    process_recalced_value(recalced_value, err)
  end
end

#revoke_expireObject



62
63
64
65
66
67
68
69
70
71
# File 'lib/computable.rb', line 62

def revoke_expire
  return if used_for.empty?

  puts "revoke expire #{inspect}" if @comp.computable_debug
  used_for.each do |name2, v2|
    if v2.value_calced && v2.expired_from.delete(name) && v2.expired_from.empty?
      v2.revoke_expire
    end
  end
end