Class: Mycel::Channel::Job
- Includes:
- Mycel::Callbacks
- Defined in:
- lib/mycel.rb
Overview
Inbound side of a single request.
Lifecycle hooks (subscribe with ‘job.on(…)`):
:request — fired with the request payload when start runs. The
handler executes off main_loop via Session#spawn_handler
(default: a fresh Thread; configurable via Session's
`executor:` parameter).
:advise — fired when the requester sends a mid-flight advise.
:error — fired if the :request handler raises StandardError
that escapes user-level rescue. SystemExit and
Interrupt still propagate.
Send-side methods (‘respond`, `abort`, `progress`) write to the underlying session. They will raise IOError out of Session#send if the session has already closed — call `session.closed?` if you need to bail out gracefully:
j.on(:request) { |payload|
result = compute(payload)
break if @session.closed?
j.respond(result)
}
Peer’s wrapper handles this for users of the RPC layer; only direct Channel users need to think about it.
Constant Summary
Constants inherited from Context
Context::STATE_INIT, Context::STATE_RUNNING, Context::STATE_STOPPED
Instance Attribute Summary
Attributes inherited from Context
Instance Method Summary collapse
- #abort ⇒ Object
-
#initialize(session, id) ⇒ Job
constructor
A new instance of Job.
- #progress(payload) ⇒ Object
- #respond(payload) ⇒ Object
-
#start ⇒ Object
Start the job: transition to RUNNING and dispatch the :request event to the user’s handler.
- #they_advise(payload) ⇒ Object
Methods included from Mycel::Callbacks
#__mycel_callback_handlers__, #callback, #on
Methods inherited from Context
Constructor Details
Instance Method Details
#abort ⇒ Object
584 585 586 587 588 589 590 |
# File 'lib/mycel.rb', line 584 def abort guard_assert_state(STATE_RUNNING) { @session.send(Message.generate_msg('abort', @id)) @session.unregister_id(DIR_JOB, @id) change_state(STATE_STOPPED) } end |
#progress(payload) ⇒ Object
592 593 594 595 596 |
# File 'lib/mycel.rb', line 592 def progress(payload) guard_assert_state(STATE_RUNNING) { @session.send(Message.generate_payload_msg('progress', @id, payload)) } end |
#respond(payload) ⇒ Object
576 577 578 579 580 581 582 |
# File 'lib/mycel.rb', line 576 def respond(payload) guard_assert_state(STATE_RUNNING) { @session.send(Message.generate_payload_msg('response', @id, payload)) @session.unregister_id(DIR_JOB, @id) change_state(STATE_STOPPED) } end |
#start ⇒ Object
Start the job: transition to RUNNING and dispatch the :request event to the user’s handler.
The state transition is synchronous so that subsequent messages (advise / abort / etc.) routed by Session#main_loop see STATE_RUNNING. The handler itself runs in a fresh thread so that main_loop is never blocked by user code — a long-running handler would otherwise stall every other in-flight Command and Job on the same session. Callers needing serialisation apply their own mutex/queue inside the handler.
Why the rescue catches StandardError and routes it to :error rather than propagating:
* Handler exceptions are a NORMAL part of RPC semantics, not
a bug. The expected protocol is "raise → peer receives an
error response". Peer#handle_method_request wraps the user
method in its own rescue and converts the exception into a
wire-format error response via Job#respond. So in the RPC
case we never reach this rescue at all.
* What we ARE catching here is exceptions that escape Peer's
wrapper — i.e. errors from Channel-direct users (no Peer)
or failures inside Job#respond itself (e.g. IO closed mid-
respond during shutdown). For those cases we still want to
give observers a chance to react without polluting stderr.
* Re-raising would surface via Thread#report_on_exception.
During normal shutdown that prints stack traces to stderr
for benign races (Job#respond losing to a concurrent close).
The :error callback gives the same information to anyone
who actually wants it (loggers, tests, supervisors) without
the noise.
* StandardError (not Exception) so that SystemExit / Interrupt
/ SignalException still propagate and let the process exit
cleanly.
563 564 565 566 567 568 569 570 571 572 573 574 |
# File 'lib/mycel.rb', line 563 def start(...) guard_assert_state(STATE_INIT) { change_state(STATE_RUNNING) } @session.spawn_handler { begin callback(:request, ...) rescue StandardError => e callback(:error, e) end } end |
#they_advise(payload) ⇒ Object
598 599 600 601 602 |
# File 'lib/mycel.rb', line 598 def they_advise(payload) guard_assert_state(STATE_RUNNING) { callback(:advise, payload) } end |