Class: RSMP::Proxy
- Inherits:
-
Object
- Object
- RSMP::Proxy
- Defined in:
- lib/rsmp/proxy.rb
Direct Known Subclasses
Constant Summary collapse
- WRAPPING_DELIMITER =
"\f"
Instance Attribute Summary collapse
-
#archive ⇒ Object
readonly
Returns the value of attribute archive.
-
#collector ⇒ Object
readonly
Returns the value of attribute collector.
-
#connection_info ⇒ Object
readonly
Returns the value of attribute connection_info.
-
#ip ⇒ Object
readonly
Returns the value of attribute ip.
-
#port ⇒ Object
readonly
Returns the value of attribute port.
-
#state ⇒ Object
readonly
Returns the value of attribute state.
-
#sxl ⇒ Object
readonly
Returns the value of attribute sxl.
-
#task ⇒ Object
readonly
Returns the value of attribute task.
Attributes included from Notifier
Attributes included from Logging
Instance Method Summary collapse
- #acknowledge(original) ⇒ Object
- #acknowledged_first_ingoing(message) ⇒ Object
- #acknowledged_first_outgoing(message) ⇒ Object
- #author ⇒ Object
- #buffer_message(message) ⇒ Object
- #check_ack_timeout(now) ⇒ Object
- #check_ingoing_acknowledged(message) ⇒ Object
-
#check_outgoing_acknowledged(message) ⇒ Object
TODO this might be better handled by a proper event machine using e.g.
- #check_rsmp_version(message) ⇒ Object
- #check_watchdog_timeout(now) ⇒ Object
- #clear ⇒ Object
- #clock ⇒ Object
- #close_socket ⇒ Object
- #connected? ⇒ Boolean
- #connection_complete ⇒ Object
- #dont_acknowledge(original, prefix = nil, reason = nil) ⇒ Object
- #dont_expect_acknowledgement(message) ⇒ Object
- #expect_acknowledgement(message) ⇒ Object
- #expect_version_message(message) ⇒ Object
- #extraneous_version(message) ⇒ Object
- #find_original_for_message(message) ⇒ Object
- #get_schemas ⇒ Object
-
#initialize(options) ⇒ Proxy
constructor
A new instance of Proxy.
- #inspect ⇒ Object
- #log(str, options = {}) ⇒ Object
- #log_acknowledgement_for_original(message, original) ⇒ Object
- #log_acknowledgement_for_unknown(message) ⇒ Object
- #log_send(message, reason = nil) ⇒ Object
- #node ⇒ Object
- #notify_error(e, options = {}) ⇒ Object
- #process_ack(message) ⇒ Object
- #process_deferred ⇒ Object
- #process_message(message) ⇒ Object
- #process_not_ack(message) ⇒ Object
- #process_packet(json) ⇒ Object
- #process_version(message) ⇒ Object
- #process_watchdog(message) ⇒ Object
- #ready? ⇒ Boolean
- #revive(options) ⇒ Object
- #rsmp_versions ⇒ Object
- #run ⇒ Object
- #send_and_optionally_collect(message, options, &block) ⇒ Object
- #send_message(message, reason = nil, validate: true) ⇒ Object
- #send_version(site_id, rsmp_versions) ⇒ Object
- #send_watchdog(now = Clock.now) ⇒ Object
- #set_state(state) ⇒ Object
- #setup(options) ⇒ Object
- #should_validate_ingoing_message?(message) ⇒ Boolean
- #start ⇒ Object
- #start_reader ⇒ Object
- #start_timer ⇒ Object
- #start_watchdog ⇒ Object
- #stop ⇒ Object
- #stop_tasks ⇒ Object
- #timer(now) ⇒ Object
- #verify_sequence(message) ⇒ Object
- #version_acknowledged ⇒ Object
- #wait_for_acknowledgement(parent_task, options = {}, m_id) ⇒ Object
- #wait_for_state(state, timeout) ⇒ Object
- #watchdog_send_timer(now) ⇒ Object
- #will_not_handle(message) ⇒ Object
Methods included from Inspect
Methods included from Notifier
#add_listener, #clear_deferred_notify, #deferred_notify, #dequeue_notify, #distribute_error, #initialize_distributor, #notify, #notify_without_defer, #remove_listener
Methods included from Wait
Methods included from Logging
Constructor Details
#initialize(options) ⇒ Proxy
Returns a new instance of Proxy.
16 17 18 19 20 21 |
# File 'lib/rsmp/proxy.rb', line 16 def initialize initialize_logging initialize_distributor setup clear end |
Instance Attribute Details
#archive ⇒ Object (readonly)
Returns the value of attribute archive.
14 15 16 |
# File 'lib/rsmp/proxy.rb', line 14 def archive @archive end |
#collector ⇒ Object (readonly)
Returns the value of attribute collector.
14 15 16 |
# File 'lib/rsmp/proxy.rb', line 14 def collector @collector end |
#connection_info ⇒ Object (readonly)
Returns the value of attribute connection_info.
14 15 16 |
# File 'lib/rsmp/proxy.rb', line 14 def connection_info @connection_info end |
#ip ⇒ Object (readonly)
Returns the value of attribute ip.
14 15 16 |
# File 'lib/rsmp/proxy.rb', line 14 def ip @ip end |
#port ⇒ Object (readonly)
Returns the value of attribute port.
14 15 16 |
# File 'lib/rsmp/proxy.rb', line 14 def port @port end |
#state ⇒ Object (readonly)
Returns the value of attribute state.
14 15 16 |
# File 'lib/rsmp/proxy.rb', line 14 def state @state end |
#sxl ⇒ Object (readonly)
Returns the value of attribute sxl.
14 15 16 |
# File 'lib/rsmp/proxy.rb', line 14 def sxl @sxl end |
#task ⇒ Object (readonly)
Returns the value of attribute task.
14 15 16 |
# File 'lib/rsmp/proxy.rb', line 14 def task @task end |
Instance Method Details
#acknowledge(original) ⇒ Object
419 420 421 422 423 424 425 |
# File 'lib/rsmp/proxy.rb', line 419 def acknowledge original raise InvalidArgument unless original ack = MessageAck.build_from(original) ack.original = original.clone ack, "for #{ack.original.type} #{original.m_id_short}" check_ingoing_acknowledged original end |
#acknowledged_first_ingoing(message) ⇒ Object
497 498 |
# File 'lib/rsmp/proxy.rb', line 497 def acknowledged_first_ingoing end |
#acknowledged_first_outgoing(message) ⇒ Object
494 495 |
# File 'lib/rsmp/proxy.rb', line 494 def acknowledged_first_outgoing end |
#author ⇒ Object
571 572 573 |
# File 'lib/rsmp/proxy.rb', line 571 def node.site_id end |
#buffer_message(message) ⇒ Object
283 284 285 286 |
# File 'lib/rsmp/proxy.rb', line 283 def # TODO #log "Cannot send #{message.type} because the connection is closed.", message: message, level: :error end |
#check_ack_timeout(now) ⇒ Object
223 224 225 226 227 228 229 230 231 232 233 234 |
# File 'lib/rsmp/proxy.rb', line 223 def check_ack_timeout now timeout = @site_settings['timeouts']['acknowledgement'] # hash cannot be modify during iteration, so clone it @awaiting_acknowledgement.clone.each_pair do |m_id, | latest = . + timeout if now > latest log "No acknowledgements for #{.type} #{.m_id_short} within #{timeout} seconds", level: :error stop notify_error MissingAcknowledgment.new('No ack') end end end |
#check_ingoing_acknowledged(message) ⇒ Object
487 488 489 490 491 492 |
# File 'lib/rsmp/proxy.rb', line 487 def check_ingoing_acknowledged unless @ingoing_acknowledged[.type] @ingoing_acknowledged[.type] = true acknowledged_first_ingoing end end |
#check_outgoing_acknowledged(message) ⇒ Object
TODO this might be better handled by a proper event machine using e.g. the EventMachine gem
480 481 482 483 484 485 |
# File 'lib/rsmp/proxy.rb', line 480 def check_outgoing_acknowledged unless @outgoing_acknowledged[.type] @outgoing_acknowledged[.type] = true acknowledged_first_outgoing end end |
#check_rsmp_version(message) ⇒ Object
405 406 407 408 409 410 411 412 413 414 |
# File 'lib/rsmp/proxy.rb', line 405 def check_rsmp_version versions = rsmp_versions # find versions that both we and the client support candidates = .versions & versions if candidates.any? @rsmp_version = candidates.sort_by { |v| Gem::Version.new(v) }.last # pick latest version else raise HandshakeError.new "RSMP versions [#{.versions.join(',')}] requested, but only [#{versions.join(',')}] supported." end end |
#check_watchdog_timeout(now) ⇒ Object
236 237 238 239 240 241 242 243 244 |
# File 'lib/rsmp/proxy.rb', line 236 def check_watchdog_timeout now timeout = @site_settings['timeouts']['watchdog'] latest = @latest_watchdog_received + timeout left = latest - now if left < 0 log "No Watchdog within #{timeout} seconds", level: :error stop end end |
#clear ⇒ Object
86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/rsmp/proxy.rb', line 86 def clear @awaiting_acknowledgement = {} @latest_watchdog_received = nil @watchdog_started = false @version_determined = false @ingoing_acknowledged = {} @outgoing_acknowledged = {} @latest_watchdog_send_at = nil @state_condition = Async::Notification.new @acknowledgements = {} @acknowledgement_condition = Async::Notification.new end |
#clock ⇒ Object
51 52 53 |
# File 'lib/rsmp/proxy.rb', line 51 def clock node.clock end |
#close_socket ⇒ Object
100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/rsmp/proxy.rb', line 100 def close_socket if @stream @stream.close @stream = nil end if @socket @socket.close @socket = nil end end |
#connected? ⇒ Boolean
66 67 68 |
# File 'lib/rsmp/proxy.rb', line 66 def connected? @state == :starting || @state == :ready end |
#connection_complete ⇒ Object
560 561 562 |
# File 'lib/rsmp/proxy.rb', line 560 def connection_complete set_state :ready end |
#dont_acknowledge(original, prefix = nil, reason = nil) ⇒ Object
427 428 429 430 431 432 433 434 435 436 437 |
# File 'lib/rsmp/proxy.rb', line 427 def dont_acknowledge original, prefix=nil, reason=nil raise InvalidArgument unless original str = [prefix,reason].join(' ') log str, message: original, level: :warning if reason = MessageNotAck.new({ "oMId" => original.m_id, "rea" => reason || "Unknown reason" }) .original = original.clone , "for #{original.type} #{original.m_id_short}" end |
#dont_expect_acknowledgement(message) ⇒ Object
391 392 393 |
# File 'lib/rsmp/proxy.rb', line 391 def dont_expect_acknowledgement @awaiting_acknowledgement.delete .attribute("oMId") end |
#expect_acknowledgement(message) ⇒ Object
385 386 387 388 389 |
# File 'lib/rsmp/proxy.rb', line 385 def expect_acknowledgement unless .is_a?(MessageAck) || .is_a?(MessageNotAck) @awaiting_acknowledgement[.m_id] = end end |
#expect_version_message(message) ⇒ Object
554 555 556 557 558 |
# File 'lib/rsmp/proxy.rb', line 554 def unless .is_a?(Version) || .is_a?(MessageAck) || .is_a?(MessageNotAck) raise HandshakeError.new "Version must be received first" end end |
#extraneous_version(message) ⇒ Object
395 396 397 |
# File 'lib/rsmp/proxy.rb', line 395 def extraneous_version dont_acknowledge , "Received", "extraneous Version message" end |
#find_original_for_message(message) ⇒ Object
475 476 477 |
# File 'lib/rsmp/proxy.rb', line 475 def @awaiting_acknowledgement[ .attribute("oMId") ] end |
#get_schemas ⇒ Object
255 256 257 258 259 260 261 262 263 |
# File 'lib/rsmp/proxy.rb', line 255 def get_schemas # normally we have an sxl, but during connection, it hasn't been established yet # at these times we only validate against the core schema # TODO # what schema should we use to validate the intial Version and MessageAck messages? schemas = { core: '3.1.5' } schemas[sxl] = RSMP::Schemer.sanitize_version(sxl_version) if sxl && sxl_version schemas end |
#inspect ⇒ Object
45 46 47 48 49 |
# File 'lib/rsmp/proxy.rb', line 45 def inspect "#<#{self.class.name}:#{self.object_id}, #{inspector( :@acknowledgements,:@settings,:@site_settings )}>" end |
#log(str, options = {}) ⇒ Object
251 252 253 |
# File 'lib/rsmp/proxy.rb', line 251 def log str, ={} super str, .merge(ip: @ip, port: @port, site_id: @site_id) end |
#log_acknowledgement_for_original(message, original) ⇒ Object
533 534 535 536 537 538 539 540 541 542 |
# File 'lib/rsmp/proxy.rb', line 533 def log_acknowledgement_for_original , original str = "Received #{.type} for #{original.type} #{.attribute("oMId")[0..3]}" if .type == 'MessageNotAck' reason = .attributes["rea"] str = "#{str}: #{reason}" if reason log str, message: , level: :warning else log str, message: , level: :log end end |
#log_acknowledgement_for_unknown(message) ⇒ Object
544 545 546 |
# File 'lib/rsmp/proxy.rb', line 544 def log_acknowledgement_for_unknown log "Received #{.type} for unknown message #{.attribute("oMId")[0..3]}", message: , level: :warning end |
#log_send(message, reason = nil) ⇒ Object
288 289 290 291 292 293 294 295 296 297 298 299 300 |
# File 'lib/rsmp/proxy.rb', line 288 def log_send , reason=nil if reason str = "Sent #{.type} #{reason}" else str = "Sent #{.type}" end if .type == "MessageNotAck" log str, message: , level: :warning else log str, message: , level: :log end end |
#node ⇒ Object
567 568 569 |
# File 'lib/rsmp/proxy.rb', line 567 def node raise 'Must be overridden' end |
#notify_error(e, options = {}) ⇒ Object
152 153 154 |
# File 'lib/rsmp/proxy.rb', line 152 def notify_error e, ={} node.notify_error e, end |
#process_ack(message) ⇒ Object
500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 |
# File 'lib/rsmp/proxy.rb', line 500 def process_ack original = if original dont_expect_acknowledgement .original = original log_acknowledgement_for_original , original if original.type == "Version" version_acknowledged end check_outgoing_acknowledged original @acknowledgements[ original.m_id ] = @acknowledgement_condition.signal else log_acknowledgement_for_unknown end end |
#process_deferred ⇒ Object
310 311 312 |
# File 'lib/rsmp/proxy.rb', line 310 def process_deferred node.process_deferred end |
#process_message(message) ⇒ Object
364 365 366 367 368 369 370 371 372 373 374 375 376 377 |
# File 'lib/rsmp/proxy.rb', line 364 def case when MessageAck process_ack when MessageNotAck process_not_ack when Version process_version when Watchdog process_watchdog else dont_acknowledge , "Received", "unknown message (#{.type})" end end |
#process_not_ack(message) ⇒ Object
520 521 522 523 524 525 526 527 528 529 530 531 |
# File 'lib/rsmp/proxy.rb', line 520 def process_not_ack original = if original dont_expect_acknowledgement .original = original log_acknowledgement_for_original , original @acknowledgements[ original.m_id ] = @acknowledgement_condition.signal else log_acknowledgement_for_unknown end end |
#process_packet(json) ⇒ Object
318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 |
# File 'lib/rsmp/proxy.rb', line 318 def process_packet json attributes = Message.parse_attributes json = Message.build attributes, json .validate(get_schemas) if () verify_sequence deferred_notify do notify end process_deferred rescue InvalidPacket => e str = "Received invalid package, must be valid JSON but got #{json.size} bytes: #{e.}" notify_error e.exception(str) log str, level: :warning nil rescue MalformedMessage => e str = "Received malformed message, #{e.}" notify_error e.exception(str) log str, message: Malformed.new(attributes), level: :warning # cannot send NotAcknowledged for a malformed message since we can't read it, just ignore it nil rescue SchemaError, RSMP::Schemer::Error => e reason = "schema errors: #{e.}" str = "Received invalid #{.type}, #{reason}" log str, message: , level: :warning notify_error e.exception(str), message: dont_acknowledge , str, reason rescue InvalidMessage => e reason = "#{e.}" str = "Received invalid #{.type}," notify_error e.exception("#{str} #{.json}"), message: dont_acknowledge , str, reason rescue FatalError => e reason = e. str = "Rejected #{.type}," notify_error e.exception(str), message: dont_acknowledge , str, reason stop ensure node.clear_deferred end |
#process_version(message) ⇒ Object
416 417 |
# File 'lib/rsmp/proxy.rb', line 416 def process_version end |
#process_watchdog(message) ⇒ Object
548 549 550 551 552 |
# File 'lib/rsmp/proxy.rb', line 548 def process_watchdog log "Received #{.type}", message: , level: :log @latest_watchdog_received = Clock.now acknowledge end |
#ready? ⇒ Boolean
62 63 64 |
# File 'lib/rsmp/proxy.rb', line 62 def ready? @state == :ready end |
#revive(options) ⇒ Object
23 24 25 |
# File 'lib/rsmp/proxy.rb', line 23 def revive setup end |
#rsmp_versions ⇒ Object
399 400 401 402 403 |
# File 'lib/rsmp/proxy.rb', line 399 def rsmp_versions return ['3.1.5'] if @site_settings["rsmp_versions"] == 'latest' return ['3.1.1','3.1.2','3.1.3','3.1.4','3.1.5'] if @site_settings["rsmp_versions"] == 'all' @site_settings["rsmp_versions"] end |
#run ⇒ Object
55 56 57 58 59 60 |
# File 'lib/rsmp/proxy.rb', line 55 def run start @reader.wait if @reader ensure stop unless [:stopped, :stopping].include? @state end |
#send_and_optionally_collect(message, options, &block) ⇒ Object
589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 |
# File 'lib/rsmp/proxy.rb', line 589 def send_and_optionally_collect , , &block = [:collect] || [:collect!] if task = @task.async do |task| collector = yield # call block to create collector collector.collect collector.ok! if [:collect!] # raise any errors if the bang version was specified collector end , validate: [:validate] { sent: , collector: task.wait } else , validate: [:validate] return { sent: } end end |
#send_message(message, reason = nil, validate: true) ⇒ Object
265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 |
# File 'lib/rsmp/proxy.rb', line 265 def , reason=nil, validate: true raise NotReady unless connected? raise IOError unless @protocol .direction = :out .generate_json .validate get_schemas unless validate==false @protocol.write_lines .json expect_acknowledgement notify log_send , reason rescue EOFError, IOError rescue SchemaError, RSMP::Schemer::Error => e str = "Could not send #{.type} because schema validation failed: #{e.}" log str, message: , level: :error notify_error e.exception("#{str} #{.json}") end |
#send_version(site_id, rsmp_versions) ⇒ Object
455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 |
# File 'lib/rsmp/proxy.rb', line 455 def send_version site_id, rsmp_versions if rsmp_versions=='latest' versions = ['3.1.5'] elsif rsmp_versions=='all' versions = ['3.1.1','3.1.2','3.1.3','3.1.4','3.1.5'] else versions = [rsmp_versions].flatten end versions_array = versions.map {|v| {"vers" => v} } site_id_array = [site_id].flatten.map {|id| {"sId" => id} } version_response = Version.new({ "RSMP"=>versions_array, "siteId"=>site_id_array, "SXL"=>sxl_version }) version_response end |
#send_watchdog(now = Clock.now) ⇒ Object
217 218 219 220 221 |
# File 'lib/rsmp/proxy.rb', line 217 def send_watchdog now=Clock.now = Watchdog.new( {"wTs" => clock.to_s}) @latest_watchdog_send_at = now end |
#set_state(state) ⇒ Object
439 440 441 442 |
# File 'lib/rsmp/proxy.rb', line 439 def set_state state @state = state @state_condition.signal @state end |
#setup(options) ⇒ Object
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/rsmp/proxy.rb', line 27 def setup @settings = [:settings] @task = [:task] @socket = [:socket] @stream = [:stream] @protocol = [:protocol] @ip = [:ip] @port = [:port] @connection_info = [:info] @sxl = nil @site_settings = nil # can't pick until we know the site id @state = :stopped if [:collect] @collector = RSMP::Collector.new self, [:collect] @collector.start end end |
#should_validate_ingoing_message?(message) ⇒ Boolean
302 303 304 305 306 307 308 |
# File 'lib/rsmp/proxy.rb', line 302 def return true unless @site_settings skip = @site_settings.dig('skip_validation') return true unless skip klass = .class.name.split('::').last !skip.include?(klass) end |
#start ⇒ Object
71 72 73 |
# File 'lib/rsmp/proxy.rb', line 71 def start set_state :starting end |
#start_reader ⇒ Object
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
# File 'lib/rsmp/proxy.rb', line 112 def start_reader @reader = @task.async do |task| task.annotate "reader" @stream ||= Async::IO::Stream.new(@socket) @protocol ||= Async::IO::Protocol::Line.new(@stream,WRAPPING_DELIMITER) # rsmp messages are json terminated with a form-feed while json = @protocol.read_line beginning = Time.now = process_packet json duration = Time.now - beginning ms = (duration*1000).round(4) if duration > 0 per_second = (1.0 / duration).round else per_second = Float::INFINITY end if type = .type m_id = Logger.(.m_id) else type = 'Unknown' m_id = nil end str = [type,m_id,"processed in #{ms}ms, #{per_second}req/s"].compact.join(' ') log str, level: :statistics end rescue Async::Wrapper::Cancelled # ignore rescue EOFError log "Connection closed", level: :warning rescue IOError => e log "IOError: #{e}", level: :warning rescue Errno::ECONNRESET log "Connection reset by peer", level: :warning rescue Errno::EPIPE log "Broken pipe", level: :warning rescue StandardError => e notify_error e, level: :internal end end |
#start_timer ⇒ Object
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 |
# File 'lib/rsmp/proxy.rb', line 162 def start_timer name = "timer" interval = @site_settings['intervals']['timer'] || 1 log "Starting #{name} with interval #{interval} seconds", level: :debug @latest_watchdog_received = Clock.now @timer = @task.async do |task| task.annotate "timer" next_time = Time.now.to_f loop do begin now = Clock.now timer(now) rescue RSMP::Schemer::Error => e puts "Timer: Schema error: #{e}" rescue EOFError => e log "Timer: Connection closed: #{e}", level: :warning rescue IOError => e log "Timer: IOError", level: :warning rescue Errno::ECONNRESET log "Timer: Connection reset by peer", level: :warning rescue Errno::EPIPE => e log "Timer: Broken pipe", level: :warning rescue StandardError => e notify_error e, level: :internal end ensure next_time += interval duration = next_time - Time.now.to_f task.sleep duration end end end |
#start_watchdog ⇒ Object
156 157 158 159 160 |
# File 'lib/rsmp/proxy.rb', line 156 def start_watchdog log "Starting watchdog with interval #{@site_settings['intervals']['watchdog']} seconds", level: :debug send_watchdog @watchdog_started = true end |
#stop ⇒ Object
75 76 77 78 79 80 81 82 83 84 |
# File 'lib/rsmp/proxy.rb', line 75 def stop return if @state == :stopped set_state :stopping stop_tasks notify_error DisconnectError.new("Connection was closed") ensure close_socket clear set_state :stopped end |
#stop_tasks ⇒ Object
246 247 248 249 |
# File 'lib/rsmp/proxy.rb', line 246 def stop_tasks @timer.stop if @timer @reader.stop if @reader end |
#timer(now) ⇒ Object
196 197 198 199 200 |
# File 'lib/rsmp/proxy.rb', line 196 def timer now watchdog_send_timer now check_ack_timeout now check_watchdog_timeout now end |
#verify_sequence(message) ⇒ Object
314 315 316 |
# File 'lib/rsmp/proxy.rb', line 314 def verify_sequence () unless @version_determined end |
#version_acknowledged ⇒ Object
564 565 |
# File 'lib/rsmp/proxy.rb', line 564 def version_acknowledged end |
#wait_for_acknowledgement(parent_task, options = {}, m_id) ⇒ Object
575 576 577 578 579 580 581 582 583 584 585 586 587 |
# File 'lib/rsmp/proxy.rb', line 575 def wait_for_acknowledgement parent_task, ={}, m_id collector = Collector.new self, .merge(task: parent_task, type: ['MessageAck','MessageNotAck']) collector.collect do || if .is_a?(MessageNotAck) if .attribute('oMId') == m_id m_id_short = RSMP::Message.shorten_m_id m_id, 8 raise RSMP::MessageRejected.new "Aggregated status request #{m_id_short} was rejected with '#{.attribute('rea')}'" end elsif .is_a?(MessageAck) collector.complete if .attribute('oMId') == m_id end end end |
#wait_for_state(state, timeout) ⇒ Object
444 445 446 447 448 449 450 451 452 453 |
# File 'lib/rsmp/proxy.rb', line 444 def wait_for_state state, timeout states = [state].flatten return if states.include?(@state) wait_for(@state_condition,timeout) do states.include?(@state) end @state rescue Async::TimeoutError raise RSMP::TimeoutError.new "Did not reach state #{state} within #{timeout}s" end |
#watchdog_send_timer(now) ⇒ Object
202 203 204 205 206 207 208 209 210 211 212 213 214 215 |
# File 'lib/rsmp/proxy.rb', line 202 def watchdog_send_timer now return unless @watchdog_started return if @site_settings['intervals']['watchdog'] == :never if @latest_watchdog_send_at == nil send_watchdog now else # we add half the timer interval to pick the timer # event closes to the wanted wathcdog interval diff = now - @latest_watchdog_send_at if (diff + 0.5*@site_settings['intervals']['timer']) >= (@site_settings['intervals']['watchdog']) send_watchdog now end end end |
#will_not_handle(message) ⇒ Object
379 380 381 382 383 |
# File 'lib/rsmp/proxy.rb', line 379 def will_not_handle reason = "since we're a #{self.class.name.downcase}" unless reason log "Ignoring #{.type}, #{reason}", message: , level: :warning dont_acknowledge , nil, reason end |