Class: Kommando::ScheduledCommandAdapters::Sequel
- Inherits:
-
Object
- Object
- Kommando::ScheduledCommandAdapters::Sequel
- Defined in:
- lib/kommando/scheduled_command_adapters/sequel.rb
Class Method Summary collapse
Instance Method Summary collapse
Class Method Details
.fetch!(&block) ⇒ Object
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/kommando/scheduled_command_adapters/sequel.rb', line 22 def self.fetch!(&block) db.transaction do record = lock_style("FOR UPDATE OF #{table_name} SKIP LOCKED"). where(::Sequel.lit('TIMEZONE(\'UTC\', NOW()) >= handle_at')). exclude(::Sequel.lit("wait_for_command_ids && (SELECT array_agg(id) FROM #{table_name})")). order(::Sequel.asc(:handle_at)). limit(1). first if record result = block.call(record.name, record.parameters) if result.success? record.destroy else failures = record.failures.append(result.error).map do |failure| JSON.generate(failure) end record.update({ failures: ::Sequel.pg_array(failures, :json), handle_at: (record.handle_at + 5 * 60).getutc, }) end end end end |
.metrics ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/kommando/scheduled_command_adapters/sequel.rb', line 50 def self.metrics executable = where(::Sequel.lit('TIMEZONE(\'UTC\', NOW()) >= handle_at')). exclude(::Sequel.lit("wait_for_command_ids && (SELECT array_agg(id) FROM #{table_name})")). count scheduled = count with_failures = where(::Sequel.lit('array_length(failures, 1) > 0')).count { kommando_executable_commands: executable, kommando_scheduled_commands: scheduled, kommando_scheduled_commands_with_failures: with_failures, } end |
.schedule!(command, parameters, handle_at) ⇒ Object
11 12 13 14 15 16 17 18 19 20 |
# File 'lib/kommando/scheduled_command_adapters/sequel.rb', line 11 def self.schedule!(command, parameters, handle_at) create({ id: parameters.fetch(:command_id), name: command, parameters: JSON.generate(parameters), handle_at: handle_at, failures: ::Sequel.pg_array([], :json), wait_for_command_ids: ::Sequel.pg_array(parameters.fetch(:wait_for_command_ids, []), :uuid), }) end |
Instance Method Details
#failures ⇒ Object
70 71 72 |
# File 'lib/kommando/scheduled_command_adapters/sequel.rb', line 70 def failures @failures ||= deep_symbolize(super) end |
#handle_at ⇒ Object
74 75 76 77 |
# File 'lib/kommando/scheduled_command_adapters/sequel.rb', line 74 def handle_at offset = super.gmt_offset super.dup.gmtime + offset end |
#parameters ⇒ Object
66 67 68 |
# File 'lib/kommando/scheduled_command_adapters/sequel.rb', line 66 def parameters @parameters ||= deep_symbolize(super) end |