Class: Dynflow::Connectors::Database

Inherits:
Abstract
  • Object
show all
Defined in:
lib/dynflow/connectors/database.rb

Defined Under Namespace

Classes: Core, PostgresListerner

Instance Method Summary collapse

Methods inherited from Abstract

#receive, #terminate

Constructor Details

#initialize(world = nil, polling_interval = nil) ⇒ Database

Returns a new instance of Database.



147
148
149
150
151
152
153
154
155
156
157
# File 'lib/dynflow/connectors/database.rb', line 147

# Builds the connector: picks a polling interval, spawns the Core and,
# when a world is supplied, immediately starts listening for it.
#
# @param world [Object, nil] optional world; when present it is used both to
#   probe notify support and as the argument to #start_listening
# @param polling_interval [Numeric, nil] interval passed to the Core; when not
#   given it defaults to 30 if notify is supported for the world's database
#   and to 1 otherwise
def initialize(world = nil, polling_interval = nil)
  unless polling_interval
    # The support check runs only when the caller did not choose an interval.
    notify_available = world && PostgresListerner.notify_supported?(world.persistence.adapter.db)
    # With notify available there is no need to poll that often.
    polling_interval = notify_available ? 30 : 1
  end

  @core = Core.spawn('connector-database-core', self, polling_interval)
  return unless world

  start_listening(world)
end

Instance Method Details

#prune_undeliverable_envelopes(world) ⇒ Object



176
177
178
# File 'lib/dynflow/connectors/database.rb', line 176

# Delegates pruning of undeliverable envelopes to the world's persistence.
#
# @param world [#persistence] object exposing the persistence layer
# @return [Object] whatever the persistence's prune_undeliverable_envelopes returns
def prune_undeliverable_envelopes(world)
  persistence = world.persistence
  persistence.prune_undeliverable_envelopes
end

#send(envelope) ⇒ Object



171
172
173
174
# File 'lib/dynflow/connectors/database.rb', line 171

# Counts the envelope as outgoing in telemetry, then hands it to the core.
#
# @param envelope [#sender_id] envelope to dispatch; its sender_id is used as
#   the :world tag of the telemetry counter
# @return [Object] the result of asking the core to handle the envelope
def send(envelope)
  Telemetry.with_instance do |telemetry|
    telemetry.increment_counter(:dynflow_connector_envelopes, 1,
                                world: envelope.sender_id, direction: 'outgoing')
  end

  message = [:handle_envelope, envelope]
  @core.ask(message)
end

#start_listening(world) ⇒ Object



159
160
161
# File 'lib/dynflow/connectors/database.rb', line 159

# Asks the core to start listening on behalf of the given world.
#
# @param world [Object] world forwarded to the core in the request message
# @return [Object] the result of the core's ask call
def start_listening(world)
  request = [:start_listening, world]
  @core.ask(request)
end

#stop_listening(_, timeout = nil) ⇒ Object



167
168
169
# File 'lib/dynflow/connectors/database.rb', line 167

# Asks the core to stop listening, chains a terminate! request after it and
# waits on the chained result.
#
# NOTE(review): if `then` does not flatten the future returned inside the
# block, the wait may only cover issuing terminate!, not its completion -
# confirm against the future implementation in use.
#
# @param _ [Object] ignored
# @param timeout [Numeric, nil] passed straight to wait
# @return [Object] whatever wait returns
def stop_listening(_, timeout = nil)
  stopped = @core.ask(:stop_listening)
  terminated = stopped.then { @core.ask(:terminate!) }
  terminated.wait(timeout)
end

#stop_receiving_new_work(_, timeout = nil) ⇒ Object



163
164
165
# File 'lib/dynflow/connectors/database.rb', line 163

# Asks the core to stop receiving new work and waits on the reply.
#
# @param _ [Object] ignored
# @param timeout [Numeric, nil] passed straight to wait
# @return [Object] whatever wait returns
def stop_receiving_new_work(_, timeout = nil)
  pending = @core.ask(:stop_receiving_new_work)
  pending.wait(timeout)
end