Module: BusinessFlow::ClusterLock::ClassMethods

Defined in:
lib/business_flow/cluster_lock.rb

Overview

DSL Methods

Defined Under Namespace

Classes: LockFailure, LockInfo

Constant Summary collapse

RESULT_FINALIZE =
proc do |cluster_lock_info|
  @cluster_lock_info = cluster_lock_info
  self
end

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.acquire_lock(flow, lock_info, payload) ⇒ Object



169
170
171
172
173
174
175
176
# File 'lib/business_flow/cluster_lock.rb', line 169

def self.acquire_lock(flow, lock_info, payload)
  zk_connection = ZK::Client::Threaded.new(lock_info.zookeeper_servers)
  lock = flow.instance_variable_set(
    :@_business_flow_cluster_lock,
    ZK::Locker::ExclusiveLocker.new(zk_connection, lock_info.lock_name)
  )
  inner_acquire_lock(zk_connection, lock, payload)
end

.cleanup(lock, zk_connection) ⇒ Object



188
189
190
191
192
193
194
195
# File 'lib/business_flow/cluster_lock.rb', line 188

def self.cleanup(lock, zk_connection)
  begin
    lock&.unlock
  rescue ZK::Exceptions::OperationTimeOut
    # Just let the connection close handle this.
  end
  zk_connection&.close!
end

.exception_to_error_type(exc) ⇒ Object

:reek:ControlParameter I’m using a case statement instead of a hash in a constant to ensure that this doesn’t throw exceptions if this file is required before zookeeper is.



200
201
202
203
204
205
206
207
208
209
# File 'lib/business_flow/cluster_lock.rb', line 200

def self.exception_to_error_type(exc)
  case exc
  when ZK::Exceptions::LockAssertionFailedError
    :assert_failed
  when ZK::Exceptions::OperationTimeOut
    :zookeeper_timeout
  else
    :unknown_failure
  end
end

.inner_acquire_lock(zk_connection, lock, payload) ⇒ Object



178
179
180
181
182
183
184
185
186
# File 'lib/business_flow/cluster_lock.rb', line 178

def self.inner_acquire_lock(zk_connection, lock, payload)
  lock_held = lock.lock(wait: false)
  payload[:lock_acquired] = lock_held if payload
  unless lock_held
    zk_connection.close!
    raise LockFailure.new(:lock_unavailable, 'the lock was not available')
  end
  [zk_connection, lock]
end

.instrumented_acquire_lock(flow, lock_info) ⇒ Object



162
163
164
165
166
167
# File 'lib/business_flow/cluster_lock.rb', line 162

def self.instrumented_acquire_lock(flow, lock_info)
  flow.class.instrument(:cluster_lock_setup, flow) do |payload|
    payload[:lock_name] = lock_info.lock_name if payload
    acquire_lock(flow, lock_info, payload)
  end
end

.lock_name(flow) ⇒ Object

:reek:NilCheck



152
153
154
155
156
157
158
159
160
# File 'lib/business_flow/cluster_lock.rb', line 152

def self.lock_name(flow)
  lock_name =
    catch(:halt_step) { flow.class.with_cluster_lock.call(flow)&.merge_into(flow)&.to_s }
  if lock_name.nil? || lock_name.empty?
    raise LockFailure.new(:no_lock_name, 'no lock name provided')
  end

  lock_name
end

.with_lock(flow, lock_info, &_blk) ⇒ Object



211
212
213
214
215
216
217
218
219
220
221
222
223
# File 'lib/business_flow/cluster_lock.rb', line 211

def self.with_lock(flow, lock_info, &_blk)
  unless BusinessFlow::ClusterLock.disabled?
    zk_connection, lock =
      instrumented_acquire_lock(flow, lock_info)
  end
  yield lock_info
rescue ZK::Exceptions::LockAssertionFailedError, ZK::Exceptions::OperationTimeOut => e
  # This would occur if we asserted a cluster lock while executing the flow.
  # This will have set an error on the flow, so we can carry on.
  raise LockFailure.new(exception_to_error_type(e), e.message)
ensure
  cleanup(lock, zk_connection)
end

.zookeeper_server_list(flow) ⇒ Object

:reek:NilCheck



141
142
143
144
145
146
147
148
149
# File 'lib/business_flow/cluster_lock.rb', line 141

def self.zookeeper_server_list(flow)
  servers =
    catch(:halt_step) { flow.class.with_zookeeper_servers.call(flow)&.merge_into(flow)&.to_s }
  if servers.nil? || servers.empty?
    raise LockFailure.new(:no_servers, 'no zookeeper servers provided')
  end

  servers
end

Instance Method Details

#add_cluster_luck_info_to_result_classObject



130
131
132
133
134
135
136
137
138
# File 'lib/business_flow/cluster_lock.rb', line 130

def add_cluster_luck_info_to_result_class
  return if @cluster_lock_info_added

  result_class = const_get(:Result)
  DSL::PublicField.new(:cluster_lock_info).add_to(result_class)
  result_class.send(:define_method, :_business_flow_cluster_lock_finalize,
                    RESULT_FINALIZE)
  @cluster_lock_info_added = true
end

#build(parameter_object) ⇒ Object



108
109
110
111
# File 'lib/business_flow/cluster_lock.rb', line 108

def build(parameter_object)
  add_cluster_luck_info_to_result_class
  super(parameter_object)
end

#default_lock_nameObject



104
105
106
# File 'lib/business_flow/cluster_lock.rb', line 104

def default_lock_name
  proc { self.class.name }
end

#execute(flow) ⇒ Object



113
114
115
116
117
118
119
120
121
122
123
# File 'lib/business_flow/cluster_lock.rb', line 113

def execute(flow)
  lock_info = LockInfo.new(
    ClassMethods.lock_name(flow),
    ClassMethods.zookeeper_server_list(flow)
  )
  ClassMethods.with_lock(flow, lock_info) do
    super(flow)._business_flow_cluster_lock_finalize(lock_info)
  end
rescue LockFailure => e
  result_from(e.add_to(flow))._business_flow_cluster_lock_finalize(lock_info)
end

#with_cluster_lock(lock_name = nil, opts = {}, &blk) ⇒ Object



82
83
84
85
86
87
88
89
90
91
# File 'lib/business_flow/cluster_lock.rb', line 82

def with_cluster_lock(lock_name = nil, opts = {}, &blk)
  if lock_name.is_a?(String)
    @lock_name = Step.new(Callable.new(proc { lock_name }), {})
  elsif lock_name || blk
    @lock_name = Step.new(Callable.new(lock_name || blk),
                          { default_output: :lock_name }.merge(opts))
  else
    @lock_name ||= Step.new(Callable.new(default_lock_name), opts)
  end
end

#with_zookeeper_servers(servers = nil, opts = {}, &blk) ⇒ Object



93
94
95
96
97
98
99
100
101
102
# File 'lib/business_flow/cluster_lock.rb', line 93

def with_zookeeper_servers(servers = nil, opts = {}, &blk)
  if servers.is_a?(String)
    @zookeeper_servers = Step.new(Callable.new(proc { servers }), {})
  elsif servers || blk
    @zookeeper_servers = Step.new(Callable.new(servers || blk),
                                  { default_output: :zookeeper_servers }.merge(opts))
  else
    @zookeeper_servers || Step.new(BusinessFlow::ClusterLock.default_servers, opts)
  end
end