Class: RSMP::Proxy
- Inherits:
-
Object
- Object
- RSMP::Proxy
- Defined in:
- lib/rsmp/proxy.rb
Direct Known Subclasses
Constant Summary collapse
- WRAPPING_DELIMITER =
"\f"
Instance Attribute Summary collapse
-
#archive ⇒ Object
readonly
Returns the value of attribute archive.
-
#collector ⇒ Object
readonly
Returns the value of attribute collector.
-
#connection_info ⇒ Object
readonly
Returns the value of attribute connection_info.
-
#ip ⇒ Object
readonly
Returns the value of attribute ip.
-
#port ⇒ Object
readonly
Returns the value of attribute port.
-
#state ⇒ Object
readonly
Returns the value of attribute state.
-
#sxl ⇒ Object
readonly
Returns the value of attribute sxl.
-
#task ⇒ Object
readonly
Returns the value of attribute task.
Attributes included from Notifier
Attributes included from Logging
Instance Method Summary collapse
- #acknowledge(original) ⇒ Object
- #acknowledged_first_ingoing(message) ⇒ Object
- #acknowledged_first_outgoing(message) ⇒ Object
- #author ⇒ Object
- #buffer_message(message) ⇒ Object
- #check_ack_timeout(now) ⇒ Object
- #check_ingoing_acknowledged(message) ⇒ Object
-
#check_outgoing_acknowledged(message) ⇒ Object
TODO this might be better handled by a proper event machine using e.g.
- #check_rsmp_version(message) ⇒ Object
- #check_watchdog_timeout(now) ⇒ Object
- #clear ⇒ Object
- #clock ⇒ Object
- #close_socket ⇒ Object
- #collect(task, options, &block) ⇒ Object
- #connected? ⇒ Boolean
- #connection_complete ⇒ Object
- #dont_acknowledge(original, prefix = nil, reason = nil) ⇒ Object
- #dont_expect_acknowledgement(message) ⇒ Object
- #expect_acknowledgement(message) ⇒ Object
- #expect_version_message(message) ⇒ Object
- #extraneous_version(message) ⇒ Object
- #find_original_for_message(message) ⇒ Object
- #get_schemas ⇒ Object
-
#initialize(options) ⇒ Proxy
constructor
A new instance of Proxy.
- #inspect ⇒ Object
- #log(str, options = {}) ⇒ Object
- #log_acknowledgement_for_original(message, original) ⇒ Object
- #log_acknowledgement_for_unknown(message) ⇒ Object
- #log_send(message, reason = nil) ⇒ Object
- #node ⇒ Object
- #notify_error(e, options = {}) ⇒ Object
- #process_ack(message) ⇒ Object
- #process_deferred ⇒ Object
- #process_message(message) ⇒ Object
- #process_not_ack(message) ⇒ Object
- #process_packet(json) ⇒ Object
- #process_version(message) ⇒ Object
- #process_watchdog(message) ⇒ Object
- #ready? ⇒ Boolean
- #revive(options) ⇒ Object
- #rsmp_versions ⇒ Object
- #run ⇒ Object
- #send_message(message, reason = nil, validate: true) ⇒ Object
- #send_version(site_id, rsmp_versions) ⇒ Object
- #send_watchdog(now = Clock.now) ⇒ Object
- #set_state(state) ⇒ Object
- #setup(options) ⇒ Object
- #should_validate_ingoing_message?(message) ⇒ Boolean
- #start ⇒ Object
- #start_reader ⇒ Object
- #start_timer ⇒ Object
- #start_watchdog ⇒ Object
- #stop ⇒ Object
- #stop_tasks ⇒ Object
- #timer(now) ⇒ Object
- #verify_sequence(message) ⇒ Object
- #version_acknowledged ⇒ Object
- #wait_for_acknowledgement(parent_task, options = {}, m_id) ⇒ Object
- #wait_for_state(state, timeout) ⇒ Object
- #watchdog_send_timer(now) ⇒ Object
- #will_not_handle(message) ⇒ Object
Methods included from Inspect
Methods included from Notifier
#add_listener, #clear_deferred_notify, #deferred_notify, #dequeue_notify, #distribute_error, #initialize_distributor, #notify, #notify_without_defer, #remove_listener
Methods included from Wait
Methods included from Logging
Constructor Details
#initialize(options) ⇒ Proxy
Returns a new instance of Proxy.
16 17 18 19 20 21 |
# File 'lib/rsmp/proxy.rb', line 16 def initialize initialize_logging initialize_distributor setup clear end |
Instance Attribute Details
#archive ⇒ Object (readonly)
Returns the value of attribute archive.
14 15 16 |
# File 'lib/rsmp/proxy.rb', line 14 def archive @archive end |
#collector ⇒ Object (readonly)
Returns the value of attribute collector.
14 15 16 |
# File 'lib/rsmp/proxy.rb', line 14 def collector @collector end |
#connection_info ⇒ Object (readonly)
Returns the value of attribute connection_info.
14 15 16 |
# File 'lib/rsmp/proxy.rb', line 14 def connection_info @connection_info end |
#ip ⇒ Object (readonly)
Returns the value of attribute ip.
14 15 16 |
# File 'lib/rsmp/proxy.rb', line 14 def ip @ip end |
#port ⇒ Object (readonly)
Returns the value of attribute port.
14 15 16 |
# File 'lib/rsmp/proxy.rb', line 14 def port @port end |
#state ⇒ Object (readonly)
Returns the value of attribute state.
14 15 16 |
# File 'lib/rsmp/proxy.rb', line 14 def state @state end |
#sxl ⇒ Object (readonly)
Returns the value of attribute sxl.
14 15 16 |
# File 'lib/rsmp/proxy.rb', line 14 def sxl @sxl end |
#task ⇒ Object (readonly)
Returns the value of attribute task.
14 15 16 |
# File 'lib/rsmp/proxy.rb', line 14 def task @task end |
Instance Method Details
#acknowledge(original) ⇒ Object
425 426 427 428 429 430 431 |
# File 'lib/rsmp/proxy.rb', line 425 def acknowledge original raise InvalidArgument unless original ack = MessageAck.build_from(original) ack.original = original.clone ack, "for #{ack.original.type} #{original.m_id_short}" check_ingoing_acknowledged original end |
#acknowledged_first_ingoing(message) ⇒ Object
503 504 |
# File 'lib/rsmp/proxy.rb', line 503 def acknowledged_first_ingoing end |
#acknowledged_first_outgoing(message) ⇒ Object
500 501 |
# File 'lib/rsmp/proxy.rb', line 500 def acknowledged_first_outgoing end |
#author ⇒ Object
577 578 579 |
# File 'lib/rsmp/proxy.rb', line 577 def node.site_id end |
#buffer_message(message) ⇒ Object
289 290 291 292 |
# File 'lib/rsmp/proxy.rb', line 289 def # TODO #log "Cannot send #{message.type} because the connection is closed.", message: message, level: :error end |
#check_ack_timeout(now) ⇒ Object
229 230 231 232 233 234 235 236 237 238 239 240 |
# File 'lib/rsmp/proxy.rb', line 229 def check_ack_timeout now timeout = @site_settings['timeouts']['acknowledgement'] # hash cannot be modify during iteration, so clone it @awaiting_acknowledgement.clone.each_pair do |m_id, | latest = . + timeout if now > latest log "No acknowledgements for #{.type} #{.m_id_short} within #{timeout} seconds", level: :error stop notify_error MissingAcknowledgment.new('No ack') end end end |
#check_ingoing_acknowledged(message) ⇒ Object
493 494 495 496 497 498 |
# File 'lib/rsmp/proxy.rb', line 493 def check_ingoing_acknowledged unless @ingoing_acknowledged[.type] @ingoing_acknowledged[.type] = true acknowledged_first_ingoing end end |
#check_outgoing_acknowledged(message) ⇒ Object
TODO this might be better handled by a proper event machine using e.g. the EventMachine gem
486 487 488 489 490 491 |
# File 'lib/rsmp/proxy.rb', line 486 def check_outgoing_acknowledged unless @outgoing_acknowledged[.type] @outgoing_acknowledged[.type] = true acknowledged_first_outgoing end end |
#check_rsmp_version(message) ⇒ Object
411 412 413 414 415 416 417 418 419 420 |
# File 'lib/rsmp/proxy.rb', line 411 def check_rsmp_version versions = rsmp_versions # find versions that both we and the client support candidates = .versions & versions if candidates.any? @rsmp_version = candidates.sort_by { |v| Gem::Version.new(v) }.last # pick latest version else raise HandshakeError.new "RSMP versions [#{.versions.join(',')}] requested, but only [#{versions.join(',')}] supported." end end |
#check_watchdog_timeout(now) ⇒ Object
242 243 244 245 246 247 248 249 250 |
# File 'lib/rsmp/proxy.rb', line 242 def check_watchdog_timeout now timeout = @site_settings['timeouts']['watchdog'] latest = @latest_watchdog_received + timeout left = latest - now if left < 0 log "No Watchdog within #{timeout} seconds", level: :error stop end end |
#clear ⇒ Object
92 93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/rsmp/proxy.rb', line 92 def clear @awaiting_acknowledgement = {} @latest_watchdog_received = nil @watchdog_started = false @version_determined = false @ingoing_acknowledged = {} @outgoing_acknowledged = {} @latest_watchdog_send_at = nil @state_condition = Async::Notification.new @acknowledgements = {} @acknowledgement_condition = Async::Notification.new end |
#clock ⇒ Object
51 52 53 |
# File 'lib/rsmp/proxy.rb', line 51 def clock node.clock end |
#close_socket ⇒ Object
106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/rsmp/proxy.rb', line 106 def close_socket if @stream @stream.close @stream = nil end if @socket @socket.close @socket = nil end end |
#collect(task, options, &block) ⇒ Object
55 56 57 58 59 |
# File 'lib/rsmp/proxy.rb', line 55 def collect task, , &block collector = RSMP::Collector.new self, collector.collect task, &block collector end |
#connected? ⇒ Boolean
72 73 74 |
# File 'lib/rsmp/proxy.rb', line 72 def connected? @state == :starting || @state == :ready end |
#connection_complete ⇒ Object
566 567 568 |
# File 'lib/rsmp/proxy.rb', line 566 def connection_complete set_state :ready end |
#dont_acknowledge(original, prefix = nil, reason = nil) ⇒ Object
433 434 435 436 437 438 439 440 441 442 443 |
# File 'lib/rsmp/proxy.rb', line 433 def dont_acknowledge original, prefix=nil, reason=nil raise InvalidArgument unless original str = [prefix,reason].join(' ') log str, message: original, level: :warning if reason = MessageNotAck.new({ "oMId" => original.m_id, "rea" => reason || "Unknown reason" }) .original = original.clone , "for #{original.type} #{original.m_id_short}" end |
#dont_expect_acknowledgement(message) ⇒ Object
397 398 399 |
# File 'lib/rsmp/proxy.rb', line 397 def dont_expect_acknowledgement @awaiting_acknowledgement.delete .attribute("oMId") end |
#expect_acknowledgement(message) ⇒ Object
391 392 393 394 395 |
# File 'lib/rsmp/proxy.rb', line 391 def expect_acknowledgement unless .is_a?(MessageAck) || .is_a?(MessageNotAck) @awaiting_acknowledgement[.m_id] = end end |
#expect_version_message(message) ⇒ Object
560 561 562 563 564 |
# File 'lib/rsmp/proxy.rb', line 560 def unless .is_a?(Version) || .is_a?(MessageAck) || .is_a?(MessageNotAck) raise HandshakeError.new "Version must be received first" end end |
#extraneous_version(message) ⇒ Object
401 402 403 |
# File 'lib/rsmp/proxy.rb', line 401 def extraneous_version dont_acknowledge , "Received", "extraneous Version message" end |
#find_original_for_message(message) ⇒ Object
481 482 483 |
# File 'lib/rsmp/proxy.rb', line 481 def @awaiting_acknowledgement[ .attribute("oMId") ] end |
#get_schemas ⇒ Object
261 262 263 264 265 266 267 268 269 |
# File 'lib/rsmp/proxy.rb', line 261 def get_schemas # normally we have an sxl, but during connection, it hasn't been established yet # at these times we only validate against the core schema # TODO # what schema should we use to validate the intial Version and MessageAck messages? schemas = { core: '3.1.5' } schemas[sxl] = RSMP::Schemer.sanitize_version(sxl_version) if sxl && sxl_version schemas end |
#inspect ⇒ Object
45 46 47 48 49 |
# File 'lib/rsmp/proxy.rb', line 45 def inspect "#<#{self.class.name}:#{self.object_id}, #{inspector( :@acknowledgements,:@settings,:@site_settings )}>" end |
#log(str, options = {}) ⇒ Object
257 258 259 |
# File 'lib/rsmp/proxy.rb', line 257 def log str, ={} super str, .merge(ip: @ip, port: @port, site_id: @site_id) end |
#log_acknowledgement_for_original(message, original) ⇒ Object
539 540 541 542 543 544 545 546 547 548 |
# File 'lib/rsmp/proxy.rb', line 539 def log_acknowledgement_for_original , original str = "Received #{.type} for #{original.type} #{.attribute("oMId")[0..3]}" if .type == 'MessageNotAck' reason = .attributes["rea"] str = "#{str}: #{reason}" if reason log str, message: , level: :warning else log str, message: , level: :log end end |
#log_acknowledgement_for_unknown(message) ⇒ Object
550 551 552 |
# File 'lib/rsmp/proxy.rb', line 550 def log_acknowledgement_for_unknown log "Received #{.type} for unknown message #{.attribute("oMId")[0..3]}", message: , level: :warning end |
#log_send(message, reason = nil) ⇒ Object
294 295 296 297 298 299 300 301 302 303 304 305 306 |
# File 'lib/rsmp/proxy.rb', line 294 def log_send , reason=nil if reason str = "Sent #{.type} #{reason}" else str = "Sent #{.type}" end if .type == "MessageNotAck" log str, message: , level: :warning else log str, message: , level: :log end end |
#node ⇒ Object
573 574 575 |
# File 'lib/rsmp/proxy.rb', line 573 def node raise 'Must be overridden' end |
#notify_error(e, options = {}) ⇒ Object
158 159 160 |
# File 'lib/rsmp/proxy.rb', line 158 def notify_error e, ={} node.notify_error e, end |
#process_ack(message) ⇒ Object
506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 |
# File 'lib/rsmp/proxy.rb', line 506 def process_ack original = if original dont_expect_acknowledgement .original = original log_acknowledgement_for_original , original if original.type == "Version" version_acknowledged end check_outgoing_acknowledged original @acknowledgements[ original.m_id ] = @acknowledgement_condition.signal else log_acknowledgement_for_unknown end end |
#process_deferred ⇒ Object
316 317 318 |
# File 'lib/rsmp/proxy.rb', line 316 def process_deferred node.process_deferred end |
#process_message(message) ⇒ Object
370 371 372 373 374 375 376 377 378 379 380 381 382 383 |
# File 'lib/rsmp/proxy.rb', line 370 def case when MessageAck process_ack when MessageNotAck process_not_ack when Version process_version when Watchdog process_watchdog else dont_acknowledge , "Received", "unknown message (#{.type})" end end |
#process_not_ack(message) ⇒ Object
526 527 528 529 530 531 532 533 534 535 536 537 |
# File 'lib/rsmp/proxy.rb', line 526 def process_not_ack original = if original dont_expect_acknowledgement .original = original log_acknowledgement_for_original , original @acknowledgements[ original.m_id ] = @acknowledgement_condition.signal else log_acknowledgement_for_unknown end end |
#process_packet(json) ⇒ Object
324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 |
# File 'lib/rsmp/proxy.rb', line 324 def process_packet json attributes = Message.parse_attributes json = Message.build attributes, json .validate(get_schemas) if () verify_sequence deferred_notify do notify end process_deferred rescue InvalidPacket => e str = "Received invalid package, must be valid JSON but got #{json.size} bytes: #{e.}" notify_error e.exception(str) log str, level: :warning nil rescue MalformedMessage => e str = "Received malformed message, #{e.}" notify_error e.exception(str) log str, message: Malformed.new(attributes), level: :warning # cannot send NotAcknowledged for a malformed message since we can't read it, just ignore it nil rescue SchemaError, RSMP::Schemer::Error => e reason = "schema errors: #{e.}" str = "Received invalid #{.type}, #{reason}" log str, message: , level: :warning notify_error e.exception(str), message: dont_acknowledge , str, reason rescue InvalidMessage => e reason = "#{e.}" str = "Received invalid #{.type}," notify_error e.exception("#{str} #{.json}"), message: dont_acknowledge , str, reason rescue FatalError => e reason = e. str = "Rejected #{.type}," notify_error e.exception(str), message: dont_acknowledge , str, reason stop ensure node.clear_deferred end |
#process_version(message) ⇒ Object
422 423 |
# File 'lib/rsmp/proxy.rb', line 422 def process_version end |
#process_watchdog(message) ⇒ Object
554 555 556 557 558 |
# File 'lib/rsmp/proxy.rb', line 554 def process_watchdog log "Received #{.type}", message: , level: :log @latest_watchdog_received = Clock.now acknowledge end |
#ready? ⇒ Boolean
68 69 70 |
# File 'lib/rsmp/proxy.rb', line 68 def ready? @state == :ready end |
#revive(options) ⇒ Object
23 24 25 |
# File 'lib/rsmp/proxy.rb', line 23 def revive setup end |
#rsmp_versions ⇒ Object
405 406 407 408 409 |
# File 'lib/rsmp/proxy.rb', line 405 def rsmp_versions return ['3.1.5'] if @site_settings["rsmp_versions"] == 'latest' return ['3.1.1','3.1.2','3.1.3','3.1.4','3.1.5'] if @site_settings["rsmp_versions"] == 'all' @site_settings["rsmp_versions"] end |
#run ⇒ Object
61 62 63 64 65 66 |
# File 'lib/rsmp/proxy.rb', line 61 def run start @reader.wait if @reader ensure stop unless [:stopped, :stopping].include? @state end |
#send_message(message, reason = nil, validate: true) ⇒ Object
271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 |
# File 'lib/rsmp/proxy.rb', line 271 def , reason=nil, validate: true raise NotReady unless connected? raise IOError unless @protocol .direction = :out .generate_json .validate get_schemas unless validate==false @protocol.write_lines .json expect_acknowledgement notify log_send , reason rescue EOFError, IOError rescue SchemaError, RSMP::Schemer::Error => e str = "Could not send #{.type} because schema validation failed: #{e.}" log str, message: , level: :error notify_error e.exception("#{str} #{.json}") end |
#send_version(site_id, rsmp_versions) ⇒ Object
461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 |
# File 'lib/rsmp/proxy.rb', line 461 def send_version site_id, rsmp_versions if rsmp_versions=='latest' versions = ['3.1.5'] elsif rsmp_versions=='all' versions = ['3.1.1','3.1.2','3.1.3','3.1.4','3.1.5'] else versions = [rsmp_versions].flatten end versions_array = versions.map {|v| {"vers" => v} } site_id_array = [site_id].flatten.map {|id| {"sId" => id} } version_response = Version.new({ "RSMP"=>versions_array, "siteId"=>site_id_array, "SXL"=>sxl_version }) version_response end |
#send_watchdog(now = Clock.now) ⇒ Object
223 224 225 226 227 |
# File 'lib/rsmp/proxy.rb', line 223 def send_watchdog now=Clock.now = Watchdog.new( {"wTs" => clock.to_s}) @latest_watchdog_send_at = now end |
#set_state(state) ⇒ Object
445 446 447 448 |
# File 'lib/rsmp/proxy.rb', line 445 def set_state state @state = state @state_condition.signal @state end |
#setup(options) ⇒ Object
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/rsmp/proxy.rb', line 27 def setup @settings = [:settings] @task = [:task] @socket = [:socket] @stream = [:stream] @protocol = [:protocol] @ip = [:ip] @port = [:port] @connection_info = [:info] @sxl = nil @site_settings = nil # can't pick until we know the site id @state = :stopped if [:collect] @collector = RSMP::Collector.new self, [:collect] @collector.start end end |
#should_validate_ingoing_message?(message) ⇒ Boolean
308 309 310 311 312 313 314 |
# File 'lib/rsmp/proxy.rb', line 308 def return true unless @site_settings skip = @site_settings.dig('skip_validation') return true unless skip klass = .class.name.split('::').last !skip.include?(klass) end |
#start ⇒ Object
77 78 79 |
# File 'lib/rsmp/proxy.rb', line 77 def start set_state :starting end |
#start_reader ⇒ Object
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 |
# File 'lib/rsmp/proxy.rb', line 118 def start_reader @reader = @task.async do |task| task.annotate "reader" @stream ||= Async::IO::Stream.new(@socket) @protocol ||= Async::IO::Protocol::Line.new(@stream,WRAPPING_DELIMITER) # rsmp messages are json terminated with a form-feed while json = @protocol.read_line beginning = Time.now = process_packet json duration = Time.now - beginning ms = (duration*1000).round(4) if duration > 0 per_second = (1.0 / duration).round else per_second = Float::INFINITY end if type = .type m_id = Logger.(.m_id) else type = 'Unknown' m_id = nil end str = [type,m_id,"processed in #{ms}ms, #{per_second}req/s"].compact.join(' ') log str, level: :statistics end rescue Async::Wrapper::Cancelled # ignore rescue EOFError log "Connection closed", level: :warning rescue IOError => e log "IOError: #{e}", level: :warning rescue Errno::ECONNRESET log "Connection reset by peer", level: :warning rescue Errno::EPIPE log "Broken pipe", level: :warning rescue StandardError => e notify_error e, level: :internal end end |
#start_timer ⇒ Object
168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 |
# File 'lib/rsmp/proxy.rb', line 168 def start_timer name = "timer" interval = @site_settings['intervals']['timer'] || 1 log "Starting #{name} with interval #{interval} seconds", level: :debug @latest_watchdog_received = Clock.now @timer = @task.async do |task| task.annotate "timer" next_time = Time.now.to_f loop do begin now = Clock.now timer(now) rescue RSMP::Schemer::Error => e puts "Timer: Schema error: #{e}" rescue EOFError => e log "Timer: Connection closed: #{e}", level: :warning rescue IOError => e log "Timer: IOError", level: :warning rescue Errno::ECONNRESET log "Timer: Connection reset by peer", level: :warning rescue Errno::EPIPE => e log "Timer: Broken pipe", level: :warning rescue StandardError => e notify_error e, level: :internal end ensure next_time += interval duration = next_time - Time.now.to_f task.sleep duration end end end |
#start_watchdog ⇒ Object
162 163 164 165 166 |
# File 'lib/rsmp/proxy.rb', line 162 def start_watchdog log "Starting watchdog with interval #{@site_settings['intervals']['watchdog']} seconds", level: :debug send_watchdog @watchdog_started = true end |
#stop ⇒ Object
81 82 83 84 85 86 87 88 89 90 |
# File 'lib/rsmp/proxy.rb', line 81 def stop return if @state == :stopped set_state :stopping stop_tasks notify_error DisconnectError.new("Connection was closed") ensure close_socket clear set_state :stopped end |
#stop_tasks ⇒ Object
252 253 254 255 |
# File 'lib/rsmp/proxy.rb', line 252 def stop_tasks @timer.stop if @timer @reader.stop if @reader end |
#timer(now) ⇒ Object
202 203 204 205 206 |
# File 'lib/rsmp/proxy.rb', line 202 def timer now watchdog_send_timer now check_ack_timeout now check_watchdog_timeout now end |
#verify_sequence(message) ⇒ Object
320 321 322 |
# File 'lib/rsmp/proxy.rb', line 320 def verify_sequence () unless @version_determined end |
#version_acknowledged ⇒ Object
570 571 |
# File 'lib/rsmp/proxy.rb', line 570 def version_acknowledged end |
#wait_for_acknowledgement(parent_task, options = {}, m_id) ⇒ Object
581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 |
# File 'lib/rsmp/proxy.rb', line 581 def wait_for_acknowledgement parent_task, ={}, m_id collect(parent_task,.merge({ type: ['MessageAck','MessageNotAck'], num: 1 })) do || if .is_a?(MessageNotAck) if .attribute('oMId') == m_id # set result to an exception, but don't raise it. # this will be returned by the task and stored as the task result # when the parent task call wait() on the task, the exception # will be raised in the parent task, and caught by rspec. # rspec will then show the error and record the test as failed m_id_short = RSMP::Message.shorten_m_id m_id, 8 result = RSMP::MessageRejected.new "Aggregated status request #{m_id_short} was rejected with '#{.attribute('rea')}'" next true # done, no more messages wanted end elsif .is_a?(MessageAck) next true if .attribute('oMId') == m_id end false end end |
#wait_for_state(state, timeout) ⇒ Object
450 451 452 453 454 455 456 457 458 459 |
# File 'lib/rsmp/proxy.rb', line 450 def wait_for_state state, timeout states = [state].flatten return if states.include?(@state) wait_for(@state_condition,timeout) do states.include?(@state) end @state rescue Async::TimeoutError raise RSMP::TimeoutError.new "Did not reach state #{state} within #{timeout}s" end |
#watchdog_send_timer(now) ⇒ Object
208 209 210 211 212 213 214 215 216 217 218 219 220 221 |
# File 'lib/rsmp/proxy.rb', line 208 def watchdog_send_timer now return unless @watchdog_started return if @site_settings['intervals']['watchdog'] == :never if @latest_watchdog_send_at == nil send_watchdog now else # we add half the timer interval to pick the timer # event closes to the wanted wathcdog interval diff = now - @latest_watchdog_send_at if (diff + 0.5*@site_settings['intervals']['timer']) >= (@site_settings['intervals']['watchdog']) send_watchdog now end end end |
#will_not_handle(message) ⇒ Object
385 386 387 388 389 |
# File 'lib/rsmp/proxy.rb', line 385 def will_not_handle reason = "since we're a #{self.class.name.downcase}" unless reason log "Ignoring #{.type}, #{reason}", message: , level: :warning dont_acknowledge , nil, reason end |