Module: NatsWorker::Worker::ClassMethods

Defined in:
lib/nats_worker/worker.rb

Instance Method Summary collapse

Instance Method Details

#from_stream(stream, subject: nil, durable: nil, threads: nil, fetch: nil, fetch_timeout: nil, ack_wait: nil, consumer_config: {}) ⇒ Object

Subscription configuration for this worker.

Parameters:

  • stream (String)

    JetStream stream name (required)

  • subject (String) (defaults to: nil)

    subject filter (defaults to “>” inside the stream)

  • durable (String) (defaults to: nil)

    durable consumer name (defaults to class name underscored)

  • threads (Integer) (defaults to: nil)

    number of pull-loop threads

  • fetch (Integer) (defaults to: nil)

    batch size for each pull

  • fetch_timeout (Integer) (defaults to: nil)

    seconds to wait for a batch

  • ack_wait (Integer) (defaults to: nil)

    consumer ack_wait in seconds

  • consumer_config (Hash) (defaults to: {})

    extra fields merged into the consumer config



47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/nats_worker/worker.rb', line 47

def from_stream(stream, subject: nil, durable: nil, threads: nil,
                fetch: nil, fetch_timeout: nil, ack_wait: nil,
                consumer_config: {})
  @nats_subscription = {
    stream:          stream,
    subject:         subject,
    durable:         durable || default_durable_name,
    threads:         threads,
    fetch:           fetch,
    fetch_timeout:   fetch_timeout,
    ack_wait:        ack_wait,
    consumer_config: consumer_config
  }
end

#nats_subscriptionObject



62
63
64
65
# File 'lib/nats_worker/worker.rb', line 62

def nats_subscription
  @nats_subscription or raise NatsWorker::Error,
    "#{name} did not declare from_stream(...)"
end