Class: Tempest::Jetstream::Client
- Inherits: Object
- Inheritance hierarchy: Object → Tempest::Jetstream::Client
- Defined in:
- lib/tempest/jetstream/client.rb
Instance Method Summary
- #each_event(cursor: nil, &block) ⇒ Object
- #initialize(url: DEFAULT_URL, wanted_collections: [], wanted_dids: [], decoder: Decoder, transport: nil) ⇒ Client (constructor)
  A new instance of Client.
- #subscribe_url(cursor: nil) ⇒ Object
Constructor Details
#initialize(url: DEFAULT_URL, wanted_collections: [], wanted_dids: [], decoder: Decoder, transport: nil) ⇒ Client
Returns a new instance of Client.
11 12 13 14 15 16 17 |
# File 'lib/tempest/jetstream/client.rb', line 11

# Builds a client from a base endpoint, optional filters, and collaborators.
#
# Nothing is connected here; the arguments are only stored for later use.
#
# @param url [Object] base endpoint, stored as given (defaults to DEFAULT_URL)
# @param wanted_collections [Object] a single value, a list, or nil; coerced
#   with Kernel#Array, so nil becomes [] and a scalar becomes a one-element list
# @param wanted_dids [Object] coerced with Kernel#Array the same way
# @param decoder [Object] stored as given (defaults to Decoder)
# @param transport [Object, nil] stored as given; nil is kept as nil here
def initialize(url: DEFAULT_URL, wanted_collections: [], wanted_dids: [], decoder: Decoder, transport: nil)
  @decoder = decoder
  @url = url

  # Kernel#Array normalises nil / scalar / array input to an array.
  @wanted_collections = Array(wanted_collections)
  @wanted_dids = Array(wanted_dids)

  @transport = transport
end
Instance Method Details
#each_event(cursor: nil, &block) ⇒ Object
32 33 34 35 36 37 38 39 |
# File 'lib/tempest/jetstream/client.rb', line 32

# Yields each decoded event received for the subscription.
#
# The object returned by `transport` is invoked with the subscription URL and
# a block that receives raw payloads. Each payload goes through
# `@decoder.decode`; results that are nil or false are skipped.
# NOTE(review): `transport` is defined outside this listing; presumably it
# returns the injected transport or a default one. Confirm there.
#
# @param cursor [Object, nil] forwarded unchanged to #subscribe_url
# @yield [event] each truthy value returned by the decoder
# @return [Enumerator] when called without a block
# @return [Object] otherwise, whatever the transport call returns
def each_event(cursor: nil, &block)
  return enum_for(:each_event, cursor: cursor) if block.nil?

  stream = transport
  stream.call(subscribe_url(cursor: cursor)) do |payload|
    decoded = @decoder.decode(payload)
    next unless decoded

    yield decoded
  end
end
#subscribe_url(cursor: nil) ⇒ Object
19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/tempest/jetstream/client.rb', line 19

# Builds the URL used to open the subscription.
#
# One `wantedCollections` pair is added per entry in @wanted_collections, one
# `wantedDids` pair per entry in @wanted_dids, and a `cursor` pair when a
# cursor is given. Query parameters already on @url are kept and placed
# before the added ones. With nothing to add, @url is returned untouched.
#
# @param cursor [Object, nil] resume position; sent as `cursor.to_s`. Only
#   nil/false mean "no cursor", so 0 is sent as "0".
# @return [String] the subscription URL (the @url object itself when no
#   parameters are added)
def subscribe_url(cursor: nil)
  params = @wanted_collections.map { |collection| ["wantedCollections", collection] }
  params.concat(@wanted_dids.map { |did| ["wantedDids", did] })
  params << ["cursor", cursor.to_s] if cursor
  return @url if params.empty?

  uri = URI(@url)
  existing = uri.query ? URI.decode_www_form(uri.query) : []
  # An explicit cursor must replace one baked into the base URL; otherwise the
  # query carries two `cursor` keys with the stale one first, which is
  # ambiguous for the receiving server.
  existing.reject! { |key, _value| key == "cursor" } if cursor
  uri.query = URI.encode_www_form(existing + params)
  uri.to_s
end