Module: BusinessFlow::ClusterLock::ClassMethods
- Defined in:
- lib/business_flow/cluster_lock.rb
Overview
Defined Under Namespace
Classes: LockFailure, LockInfo
Constant Summary
collapse
- RESULT_FINALIZE =
# Body installed on the flow's Result class as the
# `_business_flow_cluster_lock_finalize` instance method (via define_method in
# add_cluster_luck_info_to_result_class), so `self` below is the result object.
proc do |cluster_lock_info|
# Store the lock info on the result in @cluster_lock_info.
@cluster_lock_info = cluster_lock_info
# Return the result itself so the call can be chained.
self
end
Class Method Summary
collapse
Instance Method Summary
collapse
Class Method Details
.acquire_lock(flow, lock_info, payload) ⇒ Object
169
170
171
172
173
174
175
176
|
# File 'lib/business_flow/cluster_lock.rb', line 169
# Opens a ZooKeeper connection, builds an exclusive locker for the configured
# lock name, remembers that locker on the flow, and tries to take the lock.
#
# @param flow [Object] flow instance; the locker is stored on it in
#   @_business_flow_cluster_lock
# @param lock_info [#zookeeper_servers, #lock_name] connection string and lock name
# @param payload [Hash, nil] instrumentation payload passed through to
#   inner_acquire_lock
# @return [Object] whatever inner_acquire_lock returns
def self.acquire_lock(flow, lock_info, payload)
  client = ZK::Client::Threaded.new(lock_info.zookeeper_servers)
  locker = ZK::Locker::ExclusiveLocker.new(client, lock_info.lock_name)
  flow.instance_variable_set(:@_business_flow_cluster_lock, locker)
  inner_acquire_lock(client, locker, payload)
end
|
.cleanup(lock, zk_connection) ⇒ Object
188
189
190
191
192
193
194
195
|
# File 'lib/business_flow/cluster_lock.rb', line 188
# Releases the lock (best effort) and closes the ZooKeeper connection.
#
# The connection is closed in an `ensure` so it is released even when
# `unlock` raises something other than a ZooKeeper timeout; previously such an
# error skipped `close!` and leaked the connection.
#
# @param lock [#unlock, nil] the locker to release; nil is ignored
# @param zk_connection [#close!, nil] the connection to close; nil is ignored
# @return [Object, nil] the result of `zk_connection.close!`, or nil when there
#   is no connection
def self.cleanup(lock, zk_connection)
  close_result = nil
  begin
    lock&.unlock
  rescue ZK::Exceptions::OperationTimeOut
    # Deliberately swallowed: a timeout while unlocking must not prevent the
    # connection from being closed below.
    nil
  ensure
    close_result = zk_connection&.close!
  end
  close_result
end
|
.exception_to_error_type(exc) ⇒ Object
:reek:ControlParameter I’m using a case statement instead of a hash in a constant to ensure that this doesn’t throw exceptions if this file is required before zookeeper is.
200
201
202
203
204
205
206
207
208
209
|
# File 'lib/business_flow/cluster_lock.rb', line 200
# Maps a ZooKeeper exception to the symbolic error type used by LockFailure.
#
# The ZK constants are referenced inside the method body (rather than in a
# constant lookup table) so that they are only resolved when the method runs,
# which keeps this file loadable before zookeeper is required.
#
# @param exc [Object] the exception to classify
# @return [Symbol] :assert_failed, :zookeeper_timeout or :unknown_failure
def self.exception_to_error_type(exc)
  return :assert_failed if exc.is_a?(ZK::Exceptions::LockAssertionFailedError)
  return :zookeeper_timeout if exc.is_a?(ZK::Exceptions::OperationTimeOut)

  :unknown_failure
end
|
.inner_acquire_lock(zk_connection, lock, payload) ⇒ Object
178
179
180
181
182
183
184
185
186
|
# File 'lib/business_flow/cluster_lock.rb', line 178
# Attempts a non-blocking acquisition of the lock and records the outcome in
# the instrumentation payload.
#
# The connection is closed on every exit path that does not hand it back to
# the caller: both when the lock is unavailable and when `lock.lock` itself
# raises. Without this, an exception raised while locking would leave the
# connection open, because the caller never receives a reference to it.
#
# @param zk_connection [#close!] the open ZooKeeper connection
# @param lock [#lock] the locker to acquire
# @param payload [Hash, nil] instrumentation payload; :lock_acquired is set
#   when a payload is given
# @return [Array] `[zk_connection, lock]` when the lock was acquired
# @raise [LockFailure] with :lock_unavailable when the lock is held elsewhere
def self.inner_acquire_lock(zk_connection, lock, payload)
  handed_over = false
  lock_held = lock.lock(wait: false)
  payload[:lock_acquired] = lock_held if payload
  raise LockFailure.new(:lock_unavailable, 'the lock was not available') unless lock_held

  handed_over = true
  [zk_connection, lock]
ensure
  # Only keep the connection open when it is being returned to the caller.
  zk_connection.close! unless handed_over
end
|
.instrumented_acquire_lock(flow, lock_info) ⇒ Object
162
163
164
165
166
167
|
# File 'lib/business_flow/cluster_lock.rb', line 162
# Runs acquire_lock inside the flow class's :cluster_lock_setup
# instrumentation block, tagging the payload with the lock name.
#
# @param flow [Object] flow instance whose class responds to `instrument`
# @param lock_info [#lock_name] lock configuration
# @return [Object] whatever `flow.class.instrument` returns for the block
def self.instrumented_acquire_lock(flow, lock_info)
  instrumenter = flow.class
  instrumenter.instrument(:cluster_lock_setup, flow) do |event_payload|
    # The payload may be absent, so only tag it when one is yielded.
    if event_payload
      event_payload[:lock_name] = lock_info.lock_name
    end
    acquire_lock(flow, lock_info, event_payload)
  end
end
|
.lock_name(flow) ⇒ Object
152
153
154
155
156
157
158
159
160
|
# File 'lib/business_flow/cluster_lock.rb', line 152
# Resolves the lock name for a flow by running the class's cluster-lock step.
#
# The step is run inside `catch(:halt_step)`, so a step that throws
# :halt_step yields whatever value was thrown (nil when none).
#
# @param flow [Object] flow instance whose class responds to with_cluster_lock
# @return [String] the non-empty lock name
# @raise [LockFailure] with :no_lock_name when the name is nil or empty
def self.lock_name(flow)
  resolved = catch(:halt_step) do
    step_result = flow.class.with_cluster_lock.call(flow)
    step_result&.merge_into(flow)&.to_s
  end
  return resolved unless resolved.nil? || resolved.empty?

  raise LockFailure.new(:no_lock_name, 'no lock name provided')
end
|
.with_lock(flow, lock_info, &_blk) ⇒ Object
211
212
213
214
215
216
217
218
219
220
221
222
223
|
# File 'lib/business_flow/cluster_lock.rb', line 211
# Yields `lock_info` while holding the cluster lock, then always cleans up.
#
# When cluster locking is disabled no lock is taken and the block is simply
# yielded to. ZooKeeper assertion/timeout errors raised while acquiring the
# lock or while running the block are translated into LockFailure.
#
# @param flow [Object] the flow being executed
# @param lock_info [Object] lock configuration, yielded to the block
# @yieldparam lock_info [Object] the same lock_info that was passed in
# @return [Object] the block's return value
# @raise [LockFailure] for lock assertion failures and ZooKeeper timeouts
def self.with_lock(flow, lock_info, &_blk)
  zk_connection = lock = nil
  begin
    locking_enabled = !BusinessFlow::ClusterLock.disabled?
    zk_connection, lock = instrumented_acquire_lock(flow, lock_info) if locking_enabled
    yield lock_info
  rescue ZK::Exceptions::LockAssertionFailedError, ZK::Exceptions::OperationTimeOut => zk_error
    raise LockFailure.new(exception_to_error_type(zk_error), zk_error.message)
  ensure
    # Both values are nil when locking is disabled or acquisition failed.
    cleanup(lock, zk_connection)
  end
end
|
.zookeeper_server_list(flow) ⇒ Object
141
142
143
144
145
146
147
148
149
|
# File 'lib/business_flow/cluster_lock.rb', line 141
# Resolves the ZooKeeper server list for a flow by running the class's
# zookeeper-servers step.
#
# The step is run inside `catch(:halt_step)`, so a step that throws
# :halt_step yields whatever value was thrown (nil when none).
#
# @param flow [Object] flow instance whose class responds to
#   with_zookeeper_servers
# @return [String] the non-empty server list
# @raise [LockFailure] with :no_servers when the list is nil or empty
def self.zookeeper_server_list(flow)
  resolved = catch(:halt_step) do
    step_result = flow.class.with_zookeeper_servers.call(flow)
    step_result&.merge_into(flow)&.to_s
  end
  return resolved unless resolved.nil? || resolved.empty?

  raise LockFailure.new(:no_servers, 'no zookeeper servers provided')
end
|
Instance Method Details
#add_cluster_luck_info_to_result_class ⇒ Object
130
131
132
133
134
135
136
137
138
|
# File 'lib/business_flow/cluster_lock.rb', line 130
# Adds a public `cluster_lock_info` field and the
# `_business_flow_cluster_lock_finalize` method to this class's Result
# constant. Does the work at most once per class, guarded by
# @cluster_lock_info_added.
#
# @return [true, nil] true on the call that performs the setup, nil afterwards
def add_cluster_luck_info_to_result_class
  unless @cluster_lock_info_added
    target = const_get(:Result)
    DSL::PublicField.new(:cluster_lock_info).add_to(target)
    target.send(:define_method, :_business_flow_cluster_lock_finalize, RESULT_FINALIZE)
    @cluster_lock_info_added = true
  end
end
|
#build(parameter_object) ⇒ Object
108
109
110
111
|
# File 'lib/business_flow/cluster_lock.rb', line 108
# Makes sure the Result class carries the cluster-lock additions, then
# delegates to the inherited `build`.
#
# @param parameter_object [Object] forwarded unchanged to `super`
# @return [Object] whatever the inherited `build` returns
def build(parameter_object)
  add_cluster_luck_info_to_result_class
  # Bare `super` forwards parameter_object (and any block) as-is.
  super
end
|
#default_lock_name ⇒ Object
104
105
106
|
# File 'lib/business_flow/cluster_lock.rb', line 104
# Builds the fallback lock-name callable.
#
# @return [Proc] a proc that evaluates to the class name of whatever `self`
#   is when the proc runs
def default_lock_name
  proc do
    self.class.name
  end
end
|
#with_cluster_lock(lock_name = nil, opts = {}, &blk) ⇒ Object
82
83
84
85
86
87
88
89
90
91
|
# File 'lib/business_flow/cluster_lock.rb', line 82
# Configures, or with no arguments returns, the step that produces the lock name.
#
# * String: the step always yields that string; `opts` is ignored.
# * Any other truthy value or a block: it becomes the step's callable, with
#   `default_output: :lock_name` unless `opts` overrides it.
# * Nothing: returns the configured step, creating and memoising one built
#   from default_lock_name on first use.
#
# @param lock_name [String, Object, nil] fixed name or callable source
# @param opts [Hash] step options
# @param blk [Proc] callable source used when lock_name is not given
# @return [Step] the configured step
def with_cluster_lock(lock_name = nil, opts = {}, &blk)
  if lock_name.is_a?(String)
    fixed_name = Callable.new(proc { lock_name })
    @lock_name = Step.new(fixed_name, {})
  else
    source = lock_name || blk
    if source
      callable = Callable.new(source)
      @lock_name = Step.new(callable, { default_output: :lock_name }.merge(opts))
    else
      @lock_name ||= Step.new(Callable.new(default_lock_name), opts)
    end
  end
end
|
#with_zookeeper_servers(servers = nil, opts = {}, &blk) ⇒ Object
93
94
95
96
97
98
99
100
101
102
|
# File 'lib/business_flow/cluster_lock.rb', line 93
# Configures, or with no arguments returns, the step that produces the
# ZooKeeper server list.
#
# * String: the step always yields that string; `opts` is ignored.
# * Any other truthy value or a block: it becomes the step's callable, with
#   `default_output: :zookeeper_servers` unless `opts` overrides it.
# * Nothing: returns the configured step, or a fresh step built from
#   BusinessFlow::ClusterLock.default_servers when none is configured.
#
# @param servers [String, Object, nil] fixed server list or callable source
# @param opts [Hash] step options
# @param blk [Proc] callable source used when servers is not given
# @return [Step] the configured (or fallback) step
def with_zookeeper_servers(servers = nil, opts = {}, &blk)
  if servers.is_a?(String)
    fixed_servers = Callable.new(proc { servers })
    @zookeeper_servers = Step.new(fixed_servers, {})
  else
    source = servers || blk
    if source
      callable = Callable.new(source)
      @zookeeper_servers = Step.new(callable, { default_output: :zookeeper_servers }.merge(opts))
    else
      # NOTE(review): unlike with_cluster_lock, the fallback is neither
      # memoised nor wrapped in Callable.new - presumably so later changes to
      # the global default are picked up; confirm against default_servers.
      @zookeeper_servers || Step.new(BusinessFlow::ClusterLock.default_servers, opts)
    end
  end
end
|