Class: Tempest::Jetstream::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/tempest/jetstream/client.rb

Instance Method Summary collapse

Constructor Details

#initialize(url: DEFAULT_URL, wanted_collections: [], wanted_dids: [], decoder: Decoder, transport: nil) ⇒ Client

Returns a new instance of Client.



11
12
13
14
15
16
17
# File 'lib/tempest/jetstream/client.rb', line 11

# Builds a client from its connection settings; nothing is opened here, the
# arguments are only stored on the instance.
#
# @param url [String] base endpoint the subscription URL is derived from
# @param wanted_collections [Array, Object, nil] collection filter(s); coerced with Array()
# @param wanted_dids [Array, Object, nil] DID filter(s); coerced with Array()
# @param decoder [#decode] object used to turn raw messages into events
# @param transport [Object, nil] stored as-is (presumably the message source
#   used when streaming — confirm against the rest of the class)
def initialize(url: DEFAULT_URL, wanted_collections: [], wanted_dids: [], decoder: Decoder, transport: nil)
  @url = url
  @decoder = decoder
  @transport = transport

  # Array() turns nil into [], wraps a lone value, and passes an Array through.
  @wanted_collections, @wanted_dids =
    [wanted_collections, wanted_dids].map { |filter| Array(filter) }
end

Instance Method Details

#each_event(cursor: nil, &block) ⇒ Object



32
33
34
35
36
37
38
39
# File 'lib/tempest/jetstream/client.rb', line 32

# Streams decoded events to the given block.
#
# Each raw message delivered by `transport.each_message` is passed through
# `@decoder.decode`; messages that decode to nil/false are dropped.
#
# @param cursor [Object, nil] forwarded to #subscribe_url
# @yieldparam event [Object] a truthy value returned by the decoder
# @return [Enumerator] when called without a block
# @return [Object] otherwise, whatever `each_message` returns
def each_event(cursor: nil, &block)
  if block.nil?
    return enum_for(:each_event, cursor: cursor)
  end

  source = transport
  endpoint = subscribe_url(cursor: cursor)
  source.each_message(endpoint) do |payload|
    decoded = @decoder.decode(payload)
    next unless decoded

    block.call(decoded)
  end
end

#subscribe_url(cursor: nil) ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/tempest/jetstream/client.rb', line 19

# Builds the URL to subscribe to, appending the configured filters and the
# optional cursor as query parameters.
#
# One `wantedCollections` pair is emitted per entry of @wanted_collections,
# then one `wantedDids` pair per entry of @wanted_dids, then `cursor` last.
# Query parameters already present on @url are kept and placed first.
#
# @param cursor [#to_s, nil] appended as `cursor=<value>` when truthy
# @return [String] @url itself when there is nothing to append, otherwise a
#   newly built URL string
def subscribe_url(cursor: nil)
  additions =
    @wanted_collections.map { |collection| ["wantedCollections", collection] } +
    @wanted_dids.map { |did| ["wantedDids", did] }
  additions << ["cursor", cursor.to_s] if cursor
  return @url if additions.empty?

  endpoint = URI(@url).tap do |target|
    # Existing pairs are decoded and re-encoded together with the new ones.
    carried = target.query ? URI.decode_www_form(target.query) : []
    target.query = URI.encode_www_form(carried + additions)
  end
  endpoint.to_s
end