Class: RSMP::Proxy

Inherits:
Object
  • Object
show all
Includes:
Inspect, Logging, Notifier, Wait
Defined in:
lib/rsmp/proxy.rb

Direct Known Subclasses

SiteProxy, SupervisorProxy

Constant Summary collapse

WRAPPING_DELIMITER =
"\f"

Instance Attribute Summary collapse

Attributes included from Notifier

#listeners

Attributes included from Logging

#logger

Instance Method Summary collapse

Methods included from Inspect

#inspector

Methods included from Notifier

#add_listener, #clear_deferred_notify, #deferred_notify, #dequeue_notify, #distribute_error, #initialize_distributor, #notify, #notify_without_defer, #remove_listener

Methods included from Wait

#wait_for

Methods included from Logging

#initialize_logging

Constructor Details

#initialize(options) ⇒ Proxy

Returns a new instance of Proxy.



16
17
18
19
20
21
# File 'lib/rsmp/proxy.rb', line 16

# Creates a proxy.
#
# Initialises logging and the notification distributor, stores the
# connection options via #setup and resets per-connection state via #clear.
#
# @param options [Hash] passed on to #initialize_logging and #setup
def initialize(options)
  initialize_logging(options)
  initialize_distributor
  setup(options)
  clear
end

Instance Attribute Details

#archive ⇒ Object (readonly)

Returns the value of attribute archive.



14
15
16
# File 'lib/rsmp/proxy.rb', line 14

# Reader for the @archive instance variable.
#
# @return the current value of @archive
def archive
  @archive
end

#collector ⇒ Object (readonly)

Returns the value of attribute collector.



14
15
16
# File 'lib/rsmp/proxy.rb', line 14

# Reader for the @collector instance variable, which #setup assigns
# when the options include :collect.
#
# @return the current value of @collector
def collector
  @collector
end

#connection_info ⇒ Object (readonly)

Returns the value of attribute connection_info.



14
15
16
# File 'lib/rsmp/proxy.rb', line 14

# Reader for the @connection_info instance variable, which #setup
# assigns from options[:info].
#
# @return the current value of @connection_info
def connection_info
  @connection_info
end

#ip ⇒ Object (readonly)

Returns the value of attribute ip.



14
15
16
# File 'lib/rsmp/proxy.rb', line 14

# Reader for the @ip instance variable, which #setup assigns from options[:ip].
#
# @return the current value of @ip
def ip
  @ip
end

#port ⇒ Object (readonly)

Returns the value of attribute port.



14
15
16
# File 'lib/rsmp/proxy.rb', line 14

# Reader for the @port instance variable, which #setup assigns from options[:port].
#
# @return the current value of @port
def port
  @port
end

#state ⇒ Object (readonly)

Returns the value of attribute state.



14
15
16
# File 'lib/rsmp/proxy.rb', line 14

# Reader for the @state instance variable. Within this class the state is
# set to :stopped, :starting, :ready or :stopping.
#
# @return the current value of @state
def state
  @state
end

#sxl ⇒ Object (readonly)

Returns the value of attribute sxl.



14
15
16
# File 'lib/rsmp/proxy.rb', line 14

# Reader for the @sxl instance variable. #setup resets it to nil;
# #get_schemas uses it as a schema key when it is set.
#
# @return the current value of @sxl
def sxl
  @sxl
end

#task ⇒ Object (readonly)

Returns the value of attribute task.



14
15
16
# File 'lib/rsmp/proxy.rb', line 14

# Reader for the @task instance variable, which #setup assigns from
# options[:task] and which is used to spawn the reader and timer tasks.
#
# @return the current value of @task
def task
  @task
end

Instance Method Details

#acknowledge(original) ⇒ Object

Raises:

  • (InvalidArgument)


419
420
421
422
423
424
425
# File 'lib/rsmp/proxy.rb', line 419

# Acknowledges a received message.
#
# Builds a MessageAck from the original, attaches a clone of the original
# to it, sends it, and then records the acknowledgement via
# #check_ingoing_acknowledged.
#
# @param original the message to acknowledge
# @raise [InvalidArgument] if original is nil or false
def acknowledge(original)
  raise InvalidArgument unless original

  ack = MessageAck.build_from(original)
  ack.original = original.clone
  description = "for #{ack.original.type} #{original.m_id_short}"
  send_message(ack, description)
  check_ingoing_acknowledged(original)
end

#acknowledged_first_ingoing(message) ⇒ Object



497
498
# File 'lib/rsmp/proxy.rb', line 497

# Hook called by #check_ingoing_acknowledged the first time an incoming
# message of a given type has been acknowledged. Does nothing here;
# presumably meant to be overridden by subclasses.
#
# @param message the message that was acknowledged
def acknowledged_first_ingoing message
end

#acknowledged_first_outgoing(message) ⇒ Object



494
495
# File 'lib/rsmp/proxy.rb', line 494

# Hook called by #check_outgoing_acknowledged the first time an outgoing
# message of a given type has been acknowledged. Does nothing here;
# presumably meant to be overridden by subclasses.
#
# @param message the original message that was acknowledged
def acknowledged_first_outgoing message
end

#author ⇒ Object



571
572
573
# File 'lib/rsmp/proxy.rb', line 571

# Returns the site id of the node this proxy belongs to.
#
# @return whatever node.site_id returns
def author
  node.site_id
end

#buffer_message(message) ⇒ Object



283
284
285
286
# File 'lib/rsmp/proxy.rb', line 283

# Called by #send_message when writing fails with EOFError or IOError.
# Buffering is not implemented yet, so the message is silently dropped.
#
# @param message the message that could not be sent
def buffer_message message
  # TODO: implement buffering of unsent messages
  #log "Cannot send #{message.type} because the connection is closed.", message: message, level: :error
end

#check_ack_timeout(now) ⇒ Object



223
224
225
226
227
228
229
230
231
232
233
234
# File 'lib/rsmp/proxy.rb', line 223

# Checks whether any sent message has waited too long for an acknowledgement.
#
# For every overdue message an error is logged, the proxy is stopped and a
# MissingAcknowledgment error is reported.
#
# @param now the current time, compared against message.timestamp + timeout
def check_ack_timeout(now)
  timeout = @site_settings['timeouts']['acknowledgement']
  # a hash cannot be modified while iterating over it, so iterate over a copy
  @awaiting_acknowledgement.clone.each_pair do |_m_id, message|
    deadline = message.timestamp + timeout
    next unless now > deadline

    log "No acknowledgements for #{message.type} #{message.m_id_short} within #{timeout} seconds", level: :error
    stop
    notify_error MissingAcknowledgment.new('No ack')
  end
end

#check_ingoing_acknowledged(message) ⇒ Object



487
488
489
490
491
492
# File 'lib/rsmp/proxy.rb', line 487

# Records that an incoming message of this type has been acknowledged, and
# calls #acknowledged_first_ingoing the first time that happens for a type.
#
# @param message the incoming message that was acknowledged
def check_ingoing_acknowledged(message)
  type = message.type
  return if @ingoing_acknowledged[type]

  @ingoing_acknowledged[type] = true
  acknowledged_first_ingoing(message)
end

#check_outgoing_acknowledged(message) ⇒ Object

TODO this might be better handled by a proper event machine using e.g. the EventMachine gem



480
481
482
483
484
485
# File 'lib/rsmp/proxy.rb', line 480

# Records that an outgoing message of this type has been acknowledged, and
# calls #acknowledged_first_outgoing the first time that happens for a type.
#
# @param message the outgoing message that was acknowledged
def check_outgoing_acknowledged(message)
  type = message.type
  return if @outgoing_acknowledged[type]

  @outgoing_acknowledged[type] = true
  acknowledged_first_outgoing(message)
end

#check_rsmp_version(message) ⇒ Object



405
406
407
408
409
410
411
412
413
414
# File 'lib/rsmp/proxy.rb', line 405

# Picks the RSMP version to use for the connection.
#
# Intersects the versions offered in the message with the versions from
# #rsmp_versions and stores the highest common one in @rsmp_version.
# Versions are compared as Gem::Version, so '3.1.10' ranks above '3.1.9'.
#
# @param message a message responding to #versions with an array of version strings
# @return [String] the chosen version
# @raise [HandshakeError] if there is no version in common
def check_rsmp_version(message)
  versions = rsmp_versions
  # find versions that both we and the client support
  candidates = message.versions & versions
  if candidates.empty?
    raise HandshakeError, "RSMP versions [#{message.versions.join(',')}] requested, but only [#{versions.join(',')}] supported."
  end

  # pick the latest version
  @rsmp_version = candidates.max_by { |v| Gem::Version.new(v) }
end

#check_watchdog_timeout(now) ⇒ Object



236
237
238
239
240
241
242
243
244
# File 'lib/rsmp/proxy.rb', line 236

# Stops the proxy if no Watchdog has been received within the configured
# watchdog timeout.
#
# @param now the current time, compared against @latest_watchdog_received + timeout
def check_watchdog_timeout(now)
  timeout = @site_settings['timeouts']['watchdog']
  deadline = @latest_watchdog_received + timeout
  return unless (deadline - now) < 0

  log "No Watchdog within #{timeout} seconds", level: :error
  stop
end

#clear ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/rsmp/proxy.rb', line 86

# Resets all per-connection state: acknowledgement bookkeeping, handshake
# and watchdog flags, and the notifications used for waiting.
def clear
  # acknowledgement bookkeeping
  @awaiting_acknowledgement = {}
  @acknowledgements = {}
  @ingoing_acknowledged = {}
  @outgoing_acknowledged = {}

  # handshake and watchdog state
  @version_determined = false
  @watchdog_started = false
  @latest_watchdog_received = nil
  @latest_watchdog_send_at = nil

  # fresh notifications for state changes and acknowledgements
  @state_condition = Async::Notification.new
  @acknowledgement_condition = Async::Notification.new
end

#clock ⇒ Object



51
52
53
# File 'lib/rsmp/proxy.rb', line 51

# Returns the clock of the node this proxy belongs to.
#
# @return whatever node.clock returns
def clock
  node.clock
end

#close_socket ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
# File 'lib/rsmp/proxy.rb', line 100

# Closes the stream and the socket, if present, and forgets both.
#
# The socket is closed and both references are cleared even if closing the
# stream raises, so a failing stream cannot leak the socket. The error
# from closing is still propagated to the caller.
def close_socket
  @stream.close if @stream
ensure
  @stream = nil
  begin
    @socket.close if @socket
  ensure
    @socket = nil
  end
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


66
67
68
# File 'lib/rsmp/proxy.rb', line 66

# Tells whether the proxy is connected, i.e. its state is :starting or :ready.
#
# @return [Boolean]
def connected?
  %i[starting ready].include?(@state)
end

#connection_complete ⇒ Object



560
561
562
# File 'lib/rsmp/proxy.rb', line 560

# Marks the connection as established by switching the state to :ready.
def connection_complete
  set_state(:ready)
end

#dont_acknowledge(original, prefix = nil, reason = nil) ⇒ Object

Raises:

  • (InvalidArgument)


427
428
429
430
431
432
433
434
435
436
437
# File 'lib/rsmp/proxy.rb', line 427

# Rejects a received message by sending a MessageNotAck.
#
# When a reason is given, a warning made up of the prefix and the reason is
# logged first. The MessageNotAck carries the reason, or "Unknown reason"
# if none was given, and a clone of the original message.
#
# @param original the message being rejected
# @param prefix [String, nil] text put before the reason in the log entry
# @param reason [String, nil] why the message is rejected
# @raise [InvalidArgument] if original is nil or false
def dont_acknowledge(original, prefix = nil, reason = nil)
  raise InvalidArgument unless original

  # skip a missing prefix, so the log entry doesn't start with a stray space
  str = [prefix, reason].compact.join(' ')
  log str, message: original, level: :warning if reason
  message = MessageNotAck.new({
    "oMId" => original.m_id,
    "rea" => reason || "Unknown reason"
  })
  message.original = original.clone
  send_message message, "for #{original.type} #{original.m_id_short}"
end

#dont_expect_acknowledgement(message) ⇒ Object



391
392
393
# File 'lib/rsmp/proxy.rb', line 391

# Stops waiting for an acknowledgement of the message that the given
# MessageAck/MessageNotAck refers to through its "oMId" attribute.
#
# @param message the acknowledgement that was received
# @return the original message that was awaiting acknowledgement, or nil
def dont_expect_acknowledgement(message)
  original_id = message.attribute("oMId")
  @awaiting_acknowledgement.delete(original_id)
end

#expect_acknowledgement(message) ⇒ Object



385
386
387
388
389
# File 'lib/rsmp/proxy.rb', line 385

# Registers a sent message as awaiting acknowledgement, keyed by its m_id.
# MessageAck and MessageNotAck are not registered.
#
# @param message the message that was sent
def expect_acknowledgement(message)
  return if message.is_a?(MessageAck) || message.is_a?(MessageNotAck)

  @awaiting_acknowledgement[message.m_id] = message
end

#expect_version_message(message) ⇒ Object



554
555
556
557
558
# File 'lib/rsmp/proxy.rb', line 554

# Ensures that the message is allowed before the version has been
# determined: only Version, MessageAck and MessageNotAck are accepted.
#
# @param message the received message
# @raise [HandshakeError] for any other kind of message
def expect_version_message(message)
  return if [Version, MessageAck, MessageNotAck].any? { |klass| message.is_a?(klass) }

  raise HandshakeError, "Version must be received first"
end

#extraneous_version(message) ⇒ Object



395
396
397
# File 'lib/rsmp/proxy.rb', line 395

# Rejects a Version message that was not expected.
#
# @param message the extraneous Version message
def extraneous_version(message)
  dont_acknowledge(message, "Received", "extraneous Version message")
end

#find_original_for_message(message) ⇒ Object



475
476
477
# File 'lib/rsmp/proxy.rb', line 475

# Looks up the sent message that a MessageAck/MessageNotAck refers to
# through its "oMId" attribute.
#
# @param message the acknowledgement that was received
# @return the message awaiting acknowledgement, or nil if there is none
def find_original_for_message(message)
  original_id = message.attribute("oMId")
  @awaiting_acknowledgement[original_id]
end

#get_schemas ⇒ Object



255
256
257
258
259
260
261
262
263
# File 'lib/rsmp/proxy.rb', line 255

# Returns the schemas to validate messages against.
#
# The core schema is always included. The sxl schema is added once both
# sxl and sxl_version are known; while connecting they have not been
# established yet, so only the core schema is used.
#
# TODO: which schema should be used to validate the initial Version and
# MessageAck messages?
#
# @return [Hash] schema names mapped to version strings
def get_schemas
  core_only = { core: '3.1.5' }
  return core_only unless sxl && sxl_version

  core_only.merge(sxl => RSMP::Schemer.sanitize_version(sxl_version))
end

#inspect ⇒ Object



45
46
47
48
49
# File 'lib/rsmp/proxy.rb', line 45

def inspect
  "#<#{self.class.name}:#{self.object_id}, #{inspector(
    :@acknowledgements,:@settings,:@site_settings
    )}>"
end

#log(str, options = {}) ⇒ Object



251
252
253
# File 'lib/rsmp/proxy.rb', line 251

# Logs a string, adding the ip, port and site id of this connection to
# the options before handing over to the inherited #log.
#
# @param str [String] the text to log
# @param options [Hash] log options; :ip, :port and :site_id are overwritten
def log(str, options = {})
  connection = { ip: @ip, port: @port, site_id: @site_id }
  super(str, options.merge(connection))
end

#log_acknowledgement_for_original(message, original) ⇒ Object



533
534
535
536
537
538
539
540
541
542
# File 'lib/rsmp/proxy.rb', line 533

# Logs that an acknowledgement was received for a message we sent.
#
# A MessageNotAck is logged as a warning, including its "rea" attribute
# when present; anything else is logged at level :log.
#
# @param message the received MessageAck or MessageNotAck
# @param original the message it refers to
def log_acknowledgement_for_original(message, original)
  text = "Received #{message.type} for #{original.type} #{message.attribute("oMId")[0..3]}"
  return log(text, message: message, level: :log) unless message.type == 'MessageNotAck'

  reason = message.attributes["rea"]
  text = "#{text}: #{reason}" if reason
  log text, message: message, level: :warning
end

#log_acknowledgement_for_unknown(message) ⇒ Object



544
545
546
# File 'lib/rsmp/proxy.rb', line 544

# Logs a warning about an acknowledgement that refers to a message we are
# not waiting for.
#
# @param message the received MessageAck or MessageNotAck
def log_acknowledgement_for_unknown(message)
  short_id = message.attribute("oMId")[0..3]
  log "Received #{message.type} for unknown message #{short_id}", message: message, level: :warning
end

#log_send(message, reason = nil) ⇒ Object



288
289
290
291
292
293
294
295
296
297
298
299
300
# File 'lib/rsmp/proxy.rb', line 288

# Logs that a message was sent. A MessageNotAck is logged as a warning,
# all other messages at level :log.
#
# @param message the message that was sent
# @param reason [String, nil] optional text appended to the log entry
def log_send(message, reason = nil)
  text = reason ? "Sent #{message.type} #{reason}" : "Sent #{message.type}"
  level = message.type == "MessageNotAck" ? :warning : :log
  log text, message: message, level: level
end

#node ⇒ Object



567
568
569
# File 'lib/rsmp/proxy.rb', line 567

# Returns the node this proxy belongs to. Must be provided by subclasses;
# this implementation always raises.
#
# @raise [RuntimeError] always
def node
  raise RuntimeError, 'Must be overridden'
end

#notify_error(e, options = {}) ⇒ Object



152
153
154
# File 'lib/rsmp/proxy.rb', line 152

# Reports an error by passing it on to the node.
#
# @param e the error to report
# @param options [Hash] passed on unchanged to node.notify_error
def notify_error(e, options = {})
  node.notify_error(e, options)
end

#process_ack(message) ⇒ Object



500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
# File 'lib/rsmp/proxy.rb', line 500

# Handles a received MessageAck.
#
# If it refers to a message awaiting acknowledgement, that message is no
# longer awaited, the acknowledgement is logged and stored in
# @acknowledgements, and waiters are signalled. An acknowledged Version
# additionally triggers #version_acknowledged. Acknowledgements for unknown
# messages are only logged.
#
# @param message the received MessageAck
def process_ack(message)
  original = find_original_for_message(message)
  return log_acknowledgement_for_unknown(message) unless original

  dont_expect_acknowledgement(message)
  message.original = original
  log_acknowledgement_for_original(message, original)

  version_acknowledged if original.type == "Version"
  check_outgoing_acknowledged(original)

  @acknowledgements[original.m_id] = message
  @acknowledgement_condition.signal(message)
end

#process_deferred ⇒ Object



310
311
312
# File 'lib/rsmp/proxy.rb', line 310

# Asks the node to process its deferred work. Called by #process_packet
# after a message has been handled.
#
# @return whatever node.process_deferred returns
def process_deferred
  node.process_deferred
end

#process_message(message) ⇒ Object



364
365
366
367
368
369
370
371
372
373
374
375
376
377
# File 'lib/rsmp/proxy.rb', line 364

# Dispatches a received message to the handler for its class.
# Messages of any other class are rejected with a MessageNotAck.
#
# @param message the received message
def process_message(message)
  case message
  when MessageAck    then process_ack(message)
  when MessageNotAck then process_not_ack(message)
  when Version       then process_version(message)
  when Watchdog      then process_watchdog(message)
  else
    dont_acknowledge(message, "Received", "unknown message (#{message.type})")
  end
end

#process_not_ack(message) ⇒ Object



520
521
522
523
524
525
526
527
528
529
530
531
# File 'lib/rsmp/proxy.rb', line 520

# Handles a received MessageNotAck.
#
# If it refers to a message awaiting acknowledgement, that message is no
# longer awaited, the rejection is logged and stored in @acknowledgements,
# and waiters are signalled. Rejections of unknown messages are only logged.
#
# @param message the received MessageNotAck
def process_not_ack(message)
  original = find_original_for_message(message)
  return log_acknowledgement_for_unknown(message) unless original

  dont_expect_acknowledgement(message)
  message.original = original
  log_acknowledgement_for_original(message, original)
  @acknowledgements[original.m_id] = message
  @acknowledgement_condition.signal(message)
end

#process_packet(json) ⇒ Object



318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
# File 'lib/rsmp/proxy.rb', line 318

# Parses and handles one received packet.
#
# Builds a message from the JSON, validates it against the schemas from
# #get_schemas (unless #should_validate_ingoing_message? says otherwise),
# checks the message order with #verify_sequence, and then notifies
# listeners and dispatches the message inside a deferred_notify block.
#
# Errors raised along the way are reported via #notify_error rather than
# propagated. Where the message could be built, it is rejected with a
# MessageNotAck.
#
# @param json the raw packet as read from the connection
# @return the message that was built, or nil if the packet was invalid or malformed
def process_packet json
  attributes = Message.parse_attributes json
  message = Message.build attributes, json
  message.validate(get_schemas) if should_validate_ingoing_message?(message)
  verify_sequence message
  deferred_notify do
    notify message
    process_message message
  end
  process_deferred
  message
rescue InvalidPacket => e
  # no message could be built, so there is nothing to reject
  str = "Received invalid package, must be valid JSON but got #{json.size} bytes: #{e.message}"
  notify_error e.exception(str)
  log str, level: :warning
  nil
rescue MalformedMessage => e
  str = "Received malformed message, #{e.message}"
  notify_error e.exception(str)
  log str, message: Malformed.new(attributes), level: :warning
  # cannot send NotAcknowledged for a malformed message since we can't read it, just ignore it
  nil
rescue SchemaError, RSMP::Schemer::Error => e
  # validation runs after the message is built, so it can be rejected explicitly
  reason = "schema errors: #{e.message}"
  str = "Received invalid #{message.type}, #{reason}"
  log str, message: message, level: :warning
  notify_error e.exception(str), message: message
  dont_acknowledge message, str, reason
  message
rescue InvalidMessage => e
  reason = "#{e.message}"
  str = "Received invalid #{message.type},"
  notify_error e.exception("#{str} #{message.json}"), message: message
  dont_acknowledge message, str, reason
  message
rescue FatalError => e
  # a fatal error additionally stops the proxy after the message has been rejected
  reason = e.message
  str = "Rejected #{message.type},"
  notify_error e.exception(str), message: message
  dont_acknowledge message, str, reason
  stop
  message
ensure
  # runs whether or not the packet was handled successfully
  node.clear_deferred
end

#process_version(message) ⇒ Object



416
417
# File 'lib/rsmp/proxy.rb', line 416

# Hook called by #process_message when a Version message is received.
# Does nothing here; presumably meant to be overridden by subclasses.
#
# @param message the received Version message
def process_version message
end

#process_watchdog(message) ⇒ Object



548
549
550
551
552
# File 'lib/rsmp/proxy.rb', line 548

# Handles a received Watchdog: logs it, records the time it was received
# and acknowledges it.
#
# @param message the received Watchdog message
def process_watchdog(message)
  log("Received #{message.type}", message: message, level: :log)
  @latest_watchdog_received = Clock.now
  acknowledge(message)
end

#ready? ⇒ Boolean

Returns:

  • (Boolean)


62
63
64
# File 'lib/rsmp/proxy.rb', line 62

# Tells whether the proxy is in the :ready state, which is set by
# #connection_complete.
#
# @return [Boolean]
def ready?
  @state == :ready
end

#revive(options) ⇒ Object



23
24
25
# File 'lib/rsmp/proxy.rb', line 23

# Prepares the proxy for reuse by running #setup again with new options.
# Unlike #initialize, logging and the distributor are left untouched and
# #clear is not called.
#
# @param options [Hash] passed on to #setup
def revive(options)
  setup(options)
end

#rsmp_versions ⇒ Object



399
400
401
402
403
# File 'lib/rsmp/proxy.rb', line 399

# Returns the RSMP versions supported according to the site settings.
#
# The "rsmp_versions" setting may be 'latest', 'all', a single version
# string or a list of versions. The result is always an array, so that it
# can be intersected with the versions offered by the other end in
# #check_rsmp_version. A missing setting gives an empty array.
#
# @return [Array<String>]
def rsmp_versions
  configured = @site_settings["rsmp_versions"]
  case configured
  when 'latest' then ['3.1.5']
  when 'all' then ['3.1.1','3.1.2','3.1.3','3.1.4','3.1.5']
  else Array(configured)
  end
end

#run ⇒ Object



55
56
57
58
59
60
# File 'lib/rsmp/proxy.rb', line 55

# Starts the proxy and waits for the reader task, if there is one, to
# finish. The proxy is stopped afterwards unless it is already stopped or
# stopping, also when an error is raised.
def run
  start
  reader = @reader
  reader.wait if reader
ensure
  already_down = @state == :stopped || @state == :stopping
  stop unless already_down
end

#send_and_optionally_collect(message, options, &block) ⇒ Object



589
590
591
592
593
594
595
596
597
598
# File 'lib/rsmp/proxy.rb', line 589

# Sends a message, optionally collecting responses.
#
# When options[:collect] is set, the given block is run in a new task
# before the message is sent, and the result of that task is returned
# under :collector. Otherwise the message is just sent.
#
# @param message the message to send
# @param options [Hash] :collect enables collection, :validate is passed to #send_message
# @yield [task] the task in which collection runs
# @return [Hash] { sent: message } and, when collecting, :collector
def send_and_optionally_collect(message, options, &block)
  validate = options[:validate]
  unless options[:collect]
    send_message message, validate: validate
    return { sent: message }
  end

  collecting = @task.async { |subtask| yield subtask }
  send_message message, validate: validate
  { sent: message, collector: collecting.wait }
end

#send_message(message, reason = nil, validate: true) ⇒ Object



265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
# File 'lib/rsmp/proxy.rb', line 265

# Sends a message over the connection.
#
# The message is marked as outgoing, serialised and, unless validate is
# false, validated against the schemas from #get_schemas. After writing it,
# the message is registered as awaiting acknowledgement, listeners are
# notified and the send is logged.
#
# EOFError and IOError, including the IOError raised here when there is no
# protocol, are handed to #buffer_message. Schema errors are logged and
# reported via #notify_error instead of being raised.
#
# @param message the message to send
# @param reason [String, nil] text appended to the log entry
# @param validate [Boolean] pass false to skip schema validation
# @raise [NotReady] if the proxy is not connected
def send_message(message, reason = nil, validate: true)
  raise NotReady unless connected?
  raise IOError unless @protocol

  message.direction = :out
  message.generate_json
  message.validate(get_schemas) unless validate == false
  @protocol.write_lines(message.json)
  expect_acknowledgement(message)
  notify(message)
  log_send(message, reason)
rescue EOFError, IOError
  buffer_message(message)
rescue SchemaError, RSMP::Schemer::Error => e
  text = "Could not send #{message.type} because schema validation failed: #{e.message}"
  log text, message: message, level: :error
  notify_error e.exception("#{text} #{message.json}")
end

#send_version(site_id, rsmp_versions) ⇒ Object



455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
# File 'lib/rsmp/proxy.rb', line 455

# Builds and sends a Version message.
#
# @param site_id a site id, or a list of site ids
# @param rsmp_versions 'latest', 'all', a single version or a list of versions
def send_version(site_id, rsmp_versions)
  versions =
    case rsmp_versions
    when 'latest' then ['3.1.5']
    when 'all' then ['3.1.1','3.1.2','3.1.3','3.1.4','3.1.5']
    else [rsmp_versions].flatten
    end

  payload = {
    "RSMP" => versions.map { |version| { "vers" => version } },
    "siteId" => [site_id].flatten.map { |id| { "sId" => id } },
    "SXL" => sxl_version
  }
  send_message Version.new(payload)
end

#send_watchdog(now = Clock.now) ⇒ Object



217
218
219
220
221
# File 'lib/rsmp/proxy.rb', line 217

# Sends a Watchdog message stamped with the node clock, and records when
# it was sent.
#
# @param now the time to record as the latest send time
def send_watchdog(now = Clock.now)
  watchdog = Watchdog.new({ "wTs" => clock.to_s })
  send_message(watchdog)
  @latest_watchdog_send_at = now
end

#set_state(state) ⇒ Object



439
440
441
442
# File 'lib/rsmp/proxy.rb', line 439

# Changes the state and signals everyone waiting for a state change.
#
# @param state the new state
def set_state(state)
  @state = state
  @state_condition.signal(@state)
end

#setup(options) ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/rsmp/proxy.rb', line 27

# Stores the connection options and resets the proxy to the :stopped state.
# When options[:collect] is given, a collector is created and started.
#
# @param options [Hash] :settings, :task, :socket, :stream, :protocol, :ip,
#   :port and :info are stored; :collect holds options for the collector
def setup(options)
  @settings, @task, @socket, @stream, @protocol, @ip, @port, @connection_info =
    options.values_at(:settings, :task, :socket, :stream, :protocol, :ip, :port, :info)
  @sxl = nil
  @site_settings = nil  # can't pick until we know the site id
  @state = :stopped

  collect_options = options[:collect]
  return unless collect_options

  @collector = RSMP::Collector.new(self, collect_options)
  @collector.start
end

#should_validate_ingoing_message?(message) ⇒ Boolean

Returns:

  • (Boolean)


302
303
304
305
306
307
308
# File 'lib/rsmp/proxy.rb', line 302

# Tells whether a received message should be validated against the schemas.
#
# Everything is validated unless the site settings list the class name of
# the message (without namespace) under 'skip_validation'.
#
# @param message the received message
# @return [Boolean]
def should_validate_ingoing_message?(message)
  skipped = @site_settings && @site_settings.dig('skip_validation')
  return true unless skipped

  class_name = message.class.name.split('::').last
  !skipped.include?(class_name)
end

#start ⇒ Object



71
72
73
# File 'lib/rsmp/proxy.rb', line 71

# Starts the proxy by switching the state to :starting.
def start
  set_state(:starting)
end

#start_reader ⇒ Object



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/rsmp/proxy.rb', line 112

# Starts the reader task, which reads packets from the connection and
# processes them one at a time until reading ends.
#
# The task is stored in @reader. The stream and line protocol are created
# from the socket unless they already exist. For each packet the processing
# time is logged at level :statistics.
#
# Connection errors end the task with a warning in the log; any other
# StandardError is reported via #notify_error.
def start_reader  
  @reader = @task.async do |task|
    task.annotate "reader"
    @stream ||= Async::IO::Stream.new(@socket)
    @protocol ||= Async::IO::Protocol::Line.new(@stream,WRAPPING_DELIMITER) # rsmp messages are json terminated with a form-feed
    while json = @protocol.read_line
      # measure how long it takes to process the packet
      beginning = Time.now
      message = process_packet json
      duration = Time.now - beginning
      ms = (duration*1000).round(4)
      if duration > 0
        per_second = (1.0 / duration).round
      else
        # avoid dividing by zero when no time was measured
        per_second = Float::INFINITY
      end
      if message
        type = message.type
        m_id = Logger.shorten_message_id(message.m_id)
      else
        # process_packet returns nil when the packet could not be turned into a message
        type = 'Unknown'
        m_id = nil
      end
      # compact drops the missing message id
      str = [type,m_id,"processed in #{ms}ms, #{per_second}req/s"].compact.join(' ')
      log str, level: :statistics
    end
  rescue Async::Wrapper::Cancelled
    # ignore
  rescue EOFError
    # EOFError is a subclass of IOError, so it must be rescued first
    log "Connection closed", level: :warning
  rescue IOError => e
    log "IOError: #{e}", level: :warning
  rescue Errno::ECONNRESET
    log "Connection reset by peer", level: :warning
  rescue Errno::EPIPE
    log "Broken pipe", level: :warning
  rescue StandardError => e
    notify_error e, level: :internal
  end
end

#start_timer ⇒ Object



162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# File 'lib/rsmp/proxy.rb', line 162

# Starts the timer task, which calls #timer at a fixed interval.
#
# The interval is taken from the 'timer' entry of the interval settings and
# defaults to 1 second. The task is stored in @timer. The time of the
# latest received watchdog is reset, so that the watchdog timeout counts
# from now.
#
# Errors raised by #timer do not end the task: schema and connection errors
# are logged as warnings, any other StandardError is reported via
# #notify_error.
def start_timer
  name = "timer"
  interval = @site_settings['intervals']['timer'] || 1
  log "Starting #{name} with interval #{interval} seconds", level: :debug
  @latest_watchdog_received = Clock.now

  @timer = @task.async do |task|
    task.annotate "timer"
    next_time = Time.now.to_f
    loop do
      begin
        now = Clock.now
        timer(now)
      rescue RSMP::Schemer::Error => e
        # goes through the logger like the other branches, instead of writing to stdout
        log "Timer: Schema error: #{e}", level: :warning
      rescue EOFError => e
        log "Timer: Connection closed: #{e}", level: :warning
      rescue IOError
        log "Timer: IOError", level: :warning
      rescue Errno::ECONNRESET
        log "Timer: Connection reset by peer", level: :warning
      rescue Errno::EPIPE
        log "Timer: Broken pipe", level: :warning
      rescue StandardError => e
        notify_error e, level: :internal
      end
    ensure
      # schedule against the intended tick time rather than the current time,
      # so that the time spent in #timer does not accumulate as drift
      next_time += interval
      duration = next_time - Time.now.to_f
      task.sleep duration
    end
  end
end

#start_watchdog ⇒ Object



156
157
158
159
160
# File 'lib/rsmp/proxy.rb', line 156

# Sends the first Watchdog and enables the periodic sending done by
# #watchdog_send_timer.
def start_watchdog
  interval = @site_settings['intervals']['watchdog']
  log("Starting watchdog with interval #{interval} seconds", level: :debug)
  send_watchdog
  @watchdog_started = true
end

#stop ⇒ Object



75
76
77
78
79
80
81
82
83
84
# File 'lib/rsmp/proxy.rb', line 75

# Stops the proxy.
#
# Unless already stopped, the state changes to :stopping, the tasks are
# stopped and a DisconnectError is reported. The cleanup in the ensure
# clause runs in every case, also when the proxy was already stopped or an
# error is raised: the socket is closed, the connection state is cleared
# and the state is set to :stopped.
def stop
  unless @state == :stopped
    set_state :stopping
    stop_tasks
    notify_error DisconnectError.new("Connection was closed")
  end
ensure
  close_socket
  clear
  set_state :stopped
end

#stop_tasks ⇒ Object



246
247
248
249
# File 'lib/rsmp/proxy.rb', line 246

# Stops the timer task and the reader task, where they exist.
def stop_tasks
  timer_task = @timer
  timer_task.stop if timer_task

  reader_task = @reader
  reader_task.stop if reader_task
end

#timer(now) ⇒ Object



196
197
198
199
200
# File 'lib/rsmp/proxy.rb', line 196

# Performs the periodic work: sends a watchdog when due, then checks for
# acknowledgement and watchdog timeouts.
#
# @param now the current time
def timer(now)
  watchdog_send_timer(now)
  check_ack_timeout(now)
  check_watchdog_timeout(now)
end

#verify_sequence(message) ⇒ Object



314
315
316
# File 'lib/rsmp/proxy.rb', line 314

# Checks that the message is allowed at this point of the handshake: until
# the version has been determined, only the messages accepted by
# #expect_version_message may be received.
#
# @param message the received message
def verify_sequence(message)
  return if @version_determined

  expect_version_message(message)
end

#version_acknowledged ⇒ Object



564
565
# File 'lib/rsmp/proxy.rb', line 564

# Hook called by #process_ack when the Version message we sent has been
# acknowledged. Does nothing here; presumably meant to be overridden by
# subclasses.
def version_acknowledged
end

#wait_for_acknowledgement(parent_task, options = {}, m_id) ⇒ Object



575
576
577
578
579
580
581
582
583
584
585
586
587
# File 'lib/rsmp/proxy.rb', line 575

# Collects MessageAck/MessageNotAck messages until the message with the
# given id has been acknowledged.
#
# @param parent_task the task passed to the collector
# @param options [Hash] further options for the collector
# @param m_id the id of the message to wait for
# @raise [RSMP::MessageRejected] if the message is rejected with a MessageNotAck
def wait_for_acknowledgement(parent_task, options = {}, m_id)
  collector = Collector.new(self, options.merge(task: parent_task, type: ['MessageAck','MessageNotAck']))
  collector.collect do |message|
    case message
    when MessageNotAck
      if message.attribute('oMId') == m_id
        # NOTE(review): the text mentions aggregated status requests although
        # this method is not specific to them - confirm the intended wording
        m_id_short = RSMP::Message.shorten_m_id m_id, 8
        raise RSMP::MessageRejected.new "Aggregated status request #{m_id_short} was rejected with '#{message.attribute('rea')}'"
      end
    when MessageAck
      collector.complete if message.attribute('oMId') == m_id
    end
  end
end

#wait_for_state(state, timeout) ⇒ Object



444
445
446
447
448
449
450
451
452
453
# File 'lib/rsmp/proxy.rb', line 444

# Waits until the proxy reaches the given state, or one of the given states.
#
# @param state a state, or a list of states
# @param timeout how long to wait, passed on to #wait_for
# @return nil if the proxy is already in a wanted state, otherwise the
#   state after waiting
# @raise [RSMP::TimeoutError] if the state is not reached in time
def wait_for_state(state, timeout)
  wanted = [state].flatten
  unless wanted.include?(@state)
    wait_for(@state_condition, timeout) { wanted.include?(@state) }
    @state
  end
rescue Async::TimeoutError
  raise RSMP::TimeoutError.new "Did not reach state #{state} within #{timeout}s"
end

#watchdog_send_timer(now) ⇒ Object



202
203
204
205
206
207
208
209
210
211
212
213
214
215
# File 'lib/rsmp/proxy.rb', line 202

# Sends a Watchdog if one is due.
#
# Nothing is sent before the watchdog has been started, or when the
# watchdog interval is :never. The first watchdog is sent right away,
# later ones once the watchdog interval has passed.
#
# A missing timer interval is taken to be 1 second, matching the default
# used by #start_timer, instead of failing on the multiplication.
#
# @param now the current time
def watchdog_send_timer(now)
  return unless @watchdog_started

  intervals = @site_settings['intervals']
  watchdog_interval = intervals['watchdog']
  return if watchdog_interval == :never
  return send_watchdog(now) if @latest_watchdog_send_at.nil?

  # add half the timer interval, to pick the timer event closest to the
  # wanted watchdog interval
  timer_interval = intervals['timer'] || 1
  elapsed = now - @latest_watchdog_send_at
  send_watchdog(now) if (elapsed + 0.5 * timer_interval) >= watchdog_interval
end

#will_not_handle(message) ⇒ Object



379
380
381
382
383
# File 'lib/rsmp/proxy.rb', line 379

# Rejects a message that this kind of proxy does not handle: logs a
# warning and sends a MessageNotAck.
#
# @param message the received message
# @param reason [String, nil] why the message is ignored; defaults to a
#   text naming the class of this proxy
def will_not_handle(message, reason = nil)
  reason ||= "since we're a #{self.class.name.downcase}"
  log "Ignoring #{message.type}, #{reason}", message: message, level: :warning
  dont_acknowledge message, nil, reason
end