Class: Mycel::Channel::Job

Inherits:
Context
  • Object
show all
Includes:
Mycel::Callbacks
Defined in:
lib/mycel.rb

Overview

Inbound side of a single request.

Lifecycle hooks (subscribe with ‘job.on(…)`):

:request  — fired with the request payload when start runs. The
            handler executes off main_loop via Session#spawn_handler
            (default: a fresh Thread; configurable via Session's
            `executor:` parameter).
:advise   — fired when the requester sends a mid-flight advise.
:error    — fired if the :request handler raises StandardError
            that escapes user-level rescue. SystemExit and
            Interrupt still propagate.

Send-side methods (‘respond`, `abort`, `progress`) write to the underlying session. They will raise IOError out of Session#send if the session has already closed — call `session.closed?` if you need to bail out gracefully:

j.on(:request) { |payload|
  result = compute(payload)
  break if @session.closed?
  j.respond(result)
}

Peer’s wrapper handles this for users of the RPC layer; only direct Channel users need to think about it.

Constant Summary

Constants inherited from Context

Context::STATE_INIT, Context::STATE_RUNNING, Context::STATE_STOPPED

Instance Method Summary collapse

Methods included from Mycel::Callbacks

#__mycel_callback_handlers__, #callback, #on

Methods inherited from Context

#wait

Constructor Details

#initialize(session, id) ⇒ Job

Returns a new instance of Job.



482
483
484
485
486
# File 'lib/mycel.rb', line 482

def initialize(session, id)
  super(session)
  @session.register_id(DIR_JOB, id, self)
  @id = id
end

Instance Method Details

#abortObject



546
547
548
549
550
551
552
# File 'lib/mycel.rb', line 546

def abort
  guard_assert_state(STATE_RUNNING) {
    @session.send(Message.generate_msg('abort', @id))
    @session.unregister_id(DIR_JOB, @id)
    change_state(STATE_STOPPED)
  }
end

#progress(payload) ⇒ Object



554
555
556
557
558
# File 'lib/mycel.rb', line 554

def progress(payload)
  guard_assert_state(STATE_RUNNING) {
    @session.send(Message.generate_payload_msg('progress', @id, payload))
  }
end

#respond(payload) ⇒ Object



538
539
540
541
542
543
544
# File 'lib/mycel.rb', line 538

def respond(payload)
  guard_assert_state(STATE_RUNNING) {
    @session.send(Message.generate_payload_msg('response', @id, payload))
    @session.unregister_id(DIR_JOB, @id)
    change_state(STATE_STOPPED)
  }
end

#startObject

Start the job: transition to RUNNING and dispatch the :request event to the user’s handler.

The state transition is synchronous so that subsequent messages (advise / abort / etc.) routed by Session#main_loop see STATE_RUNNING. The handler itself runs in a fresh thread so that main_loop is never blocked by user code — a long-running handler would otherwise stall every other in-flight Command and Job on the same session. Callers needing serialisation apply their own mutex/queue inside the handler.

Why the rescue catches StandardError and routes it to :error rather than propagating:

* Handler exceptions are a NORMAL part of RPC semantics, not
  a bug. The expected protocol is "raise → peer receives an
  error response". Peer#handle_method_request wraps the user
  method in its own rescue and converts the exception into a
  wire-format error response via Job#respond. So in the RPC
  case we never reach this rescue at all.

* What we ARE catching here is exceptions that escape Peer's
  wrapper — i.e. errors from Channel-direct users (no Peer)
  or failures inside Job#respond itself (e.g. IO closed mid-
  respond during shutdown). For those cases we still want to
  give observers a chance to react without polluting stderr.

* Re-raising would surface via Thread#report_on_exception.
  During normal shutdown that prints stack traces to stderr
  for benign races (Job#respond losing to a concurrent close).
  The :error callback gives the same information to anyone
  who actually wants it (loggers, tests, supervisors) without
  the noise.

* StandardError (not Exception) so that SystemExit / Interrupt
  / SignalException still propagate and let the process exit
  cleanly.


525
526
527
528
529
530
531
532
533
534
535
536
# File 'lib/mycel.rb', line 525

def start(...)
  guard_assert_state(STATE_INIT) {
    change_state(STATE_RUNNING)
  }
  @session.spawn_handler {
    begin
      callback(:request, ...)
    rescue StandardError => e
      callback(:error, e)
    end
  }
end

#they_advise(payload) ⇒ Object



560
561
562
563
564
# File 'lib/mycel.rb', line 560

def they_advise(payload)
  guard_assert_state(STATE_RUNNING) {
    callback(:advise, payload)
  }
end