Class: Dynflow::Dispatcher::ClientDispatcher
Defined Under Namespace
Modules: TrackedRequest Classes: PingCache
Instance Attribute Summary collapse
-
#ping_cache ⇒ Object
readonly
Returns the value of attribute ping_cache.
Instance Method Summary collapse
-
#add_ping_cache_record(id) ⇒ Object
Records when the world with the provided id was last seen, using a PingCache.
- #dispatch_request(request, client_world_id, request_id) ⇒ Object
- #dispatch_response(envelope) ⇒ Object
-
#initialize(world, ping_cache_age) ⇒ ClientDispatcher
constructor
A new instance of ClientDispatcher.
- #publish_request(future, request, timeout) ⇒ Object
- #start_termination(*args) ⇒ Object
- #timeout(request_id) ⇒ Object
Methods inherited from Abstract
Methods inherited from Actor
#behaviour_definition, #finish_termination, #terminating?
Methods included from MethodicActor
Methods included from Actor::LogWithFullBacktrace
Constructor Details
#initialize(world, ping_cache_age) ⇒ ClientDispatcher
Returns a new instance of ClientDispatcher.
107 108 109 110 111 112 113 |
# File 'lib/dynflow/dispatcher/client_dispatcher.rb', line 107
#
# Sets up the dispatcher's bookkeeping for the given world.
#
# @param world [World] the owning world; validated with +Type!+
# @param ping_cache_age handed to +PingCache.new+ together with +world+
def initialize(world, ping_cache_age)
  @world            = Type! world, World
  @last_id_suffix   = 0
  @tracked_requests = {}
  @terminated       = nil
  @ping_cache       = PingCache.new world, ping_cache_age
end
Instance Attribute Details
#ping_cache ⇒ Object (readonly)
Returns the value of attribute ping_cache.
106 107 108 |
# File 'lib/dynflow/dispatcher/client_dispatcher.rb', line 106
#
# Read-only accessor for the ping cache.
#
# @return [Object] the value of the +@ping_cache+ instance variable
def ping_cache
  @ping_cache
end
Instance Method Details
#add_ping_cache_record(id) ⇒ Object
Records when the world with the provided id was last seen, using a PingCache.
190 191 192 193 |
# File 'lib/dynflow/dispatcher/client_dispatcher.rb', line 190
#
# Records when the world with the provided id was last seen, using the
# PingCache held in +@ping_cache+. A DEBUG log entry is written first.
#
# @param id identifier of the world to record
# @return whatever +@ping_cache.add_record+ returns
def add_ping_cache_record(id)
  log Logger::DEBUG, "adding ping cache record for #{id}"
  @ping_cache.add_record id
end
#dispatch_request(request, client_world_id, request_id) ⇒ Object
134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
# File 'lib/dynflow/dispatcher/client_dispatcher.rb', line 134
#
# Picks a receiver for +request+, wraps it in an Envelope and sends it through
# the connector.
#
# Receiver selection by request type:
# * Execution / Planning - AnyExecutor
# * Event                - the executor found for the event's execution plan;
#                          +event.optional+ decides whether an unknown executor
#                          is tolerated
# * Halt                 - the executor found for the plan, falling back to
#                          AnyExecutor when it is Dispatcher::UnknownWorld
# * Ping / Status        - the receiver id carried by the request itself
#
# @param request the request to dispatch
# @param client_world_id id used as the envelope's sender
# @param request_id id used as the envelope's request id
def dispatch_request(request, client_world_id, request_id)
  ignore_unknown = false
  executor_id = match request,
                      (on ~Execution | ~Planning do |_execution|
                         AnyExecutor
                       end),
                      (on ~Event do |event|
                         ignore_unknown = event.optional
                         find_executor(event.execution_plan_id)
                       end),
                      (on ~Halt do |event|
                         executor = find_executor(event.execution_plan_id)
                         executor == Dispatcher::UnknownWorld ? AnyExecutor : executor
                       end),
                      (on Ping.(~any, ~any) | Status.(~any, ~any) do |receiver_id, _|
                         receiver_id
                       end)
  envelope = Envelope[request_id, client_world_id, executor_id, request]
  if Dispatcher::UnknownWorld === envelope.receiver_id
    # Non-optional requests fail loudly; the raise is picked up by the
    # method-level rescue below (assuming Dynflow::Error is a StandardError),
    # which logs it and answers the request with Failed.
    raise Dynflow::Error, "Could not find an executor for #{envelope}" unless ignore_unknown

    # Optional requests without an executor are dropped with a DEBUG note.
    message = "Could not find an executor for optional #{envelope}, discarding."
    log(Logger::DEBUG, message)
    return respond(envelope, Failed[message])
  end
  connector.send(envelope).value!
rescue => e
  log(Logger::ERROR, e)
  # envelope is nil when the failure happened before it was built.
  respond(envelope, Failed[e.message]) if envelope
end
#dispatch_response(envelope) ⇒ Object
165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 |
# File 'lib/dynflow/dispatcher/client_dispatcher.rb', line 165
#
# Handles a response envelope for a request this dispatcher is tracking.
# Envelopes whose request id is not in +@tracked_requests+ are ignored.
#
# Handling by message type:
# * Accepted        - marks the tracked request as accepted
# * Failed          - resolves the tracked request with a Dynflow::Error
#                     built from the message's error
# * Done            - resolves the tracked request
# * Pong            - records the sender in the ping cache, then resolves
# * ExecutionStatus - removes the tracked request and succeeds it with the
#                     reported steps
#
# @param envelope the response envelope
def dispatch_response(envelope)
  return unless @tracked_requests.key?(envelope.request_id)

  match envelope.message,
        (on ~Accepted do
           @tracked_requests[envelope.request_id].accept!
         end),
        (on ~Failed do |msg|
           resolve_tracked_request(envelope.request_id, Dynflow::Error.new(msg.error))
         end),
        (on Done do
           resolve_tracked_request(envelope.request_id)
         end),
        (on Pong do
           add_ping_cache_record(envelope.sender_id)
           resolve_tracked_request(envelope.request_id)
         end),
        (on ExecutionStatus.(~any) do |steps|
           @tracked_requests.delete(envelope.request_id).success! steps
         end)
end
#publish_request(future, request, timeout) ⇒ Object
115 116 117 118 119 120 121 |
# File 'lib/dynflow/dispatcher/client_dispatcher.rb', line 115
#
# Publishes +request+ on behalf of this world: the request is tracked under
# the given future and timeout, then dispatched with this world's id as the
# client and the tracked request's id as the request id. The whole operation
# is wrapped in +with_ping_request_caching+.
#
# @param future passed to +with_ping_request_caching+ and +track_request+
# @param request the request to publish
# @param timeout passed to +track_request+
def publish_request(future, request, timeout)
  with_ping_request_caching(request, future) do
    track_request(future, request, timeout) do |tracked_request|
      dispatch_request(request, @world.id, tracked_request.id)
    end
  end
end
#start_termination(*args) ⇒ Object
127 128 129 130 131 132 |
# File 'lib/dynflow/dispatcher/client_dispatcher.rb', line 127
#
# Begins termination: after the inherited behaviour runs, every tracked
# request is failed with a 'Dispatcher terminated' error, the tracking table
# is emptied, and termination is finished.
#
# @param args forwarded unchanged to the inherited implementation
def start_termination(*args)
  super
  # each_value walks the hash directly instead of building a values array.
  @tracked_requests.each_value do |tracked_request|
    tracked_request.fail!(Dynflow::Error.new('Dispatcher terminated'))
  end
  @tracked_requests.clear
  finish_termination
end