Class: SpreeCmCommissioner::WaitingGuestsCaller

Inherits:
BaseInteractor
Defined in:
app/interactors/spree_cm_commissioner/waiting_guests_caller.rb

Constant Summary

MIN_WAIT_TO_ENTER_SECONDS =

10-minute floor for the Waiting Room step ETA (full journey, most uncertainty)

(ENV['WAITING_ROOM_MIN_WAIT_TO_ENTER_SECONDS'] || 600).to_i
MIN_QUEUE_TO_ENTER_SECONDS =

5-minute floor for the Queue step ETA (position assigned, sub-segment of the journey)

(ENV['WAITING_ROOM_MIN_QUEUE_TO_ENTER_SECONDS'] || 300).to_i
FIRESTORE_BATCH_SIZE =

Firestore bounds a batch update by payload size (10 MiB); 500 ops/commit leaves us far under that.

(ENV['WAITING_ROOM_FIRESTORE_BATCH_SIZE'] || 500).to_i
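
All three constants follow the same pattern: read an optional environment override, fall back to a default, and coerce the result with to_i. A minimal sketch of that pattern, assuming a hypothetical helper `int_setting` and a plain Hash standing in for ENV (neither is part of the class):

```ruby
# Hypothetical helper, not part of the class: mirrors (ENV[key] || default).to_i.
def int_setting(env, key, default)
  (env[key] || default).to_i
end

env = { 'WAITING_ROOM_FIRESTORE_BATCH_SIZE' => '250' } # stand-in for ENV

# Override present: the string '250' is coerced to the Integer 250.
batch_size = int_setting(env, 'WAITING_ROOM_FIRESTORE_BATCH_SIZE', 500)

# Override absent: the default is used.
min_wait = int_setting(env, 'WAITING_ROOM_MIN_WAIT_TO_ENTER_SECONDS', 600)

# Non-numeric override: String#to_i yields 0 rather than raising.
bad_value = int_setting({ 'K' => 'abc' }, 'K', 500)
```

Because String#to_i does not validate its input, a mistyped override silently becomes 0. For the batch size that would matter, since each_slice raises ArgumentError for a slice size of 0.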

Instance Method Details

#call ⇒ Object



# File 'app/interactors/spree_cm_commissioner/waiting_guests_caller.rb', line 15

def call
  started_at = Time.zone.now
  available_slots = fetch_available_slots

  # Always run through to mark_as — even with no slots — so the lobby keeps a fresh heartbeat
  # (logs.waiting_guests_caller_job) during peak/full periods instead of going stale
  # and looking like a dead cron.
  long_waiting_guests = available_slots.positive? ? fetch_long_waiting_guests(available_slots) : []
  calling_all(long_waiting_guests) if long_waiting_guests.any?

  mark_as(
    full: long_waiting_guests.size >= available_slots,
    available_slots: available_slots - long_waiting_guests.size,
    has_queue: waiting_exists?([previous_records_path, records_path]),
    avg_wait_to_enter_seconds: compute_avg_wait_to_enter_seconds(long_waiting_guests),
    avg_queue_to_enter_seconds: compute_avg_queue_to_enter_seconds(long_waiting_guests),
    recent_slots_per_call: available_slots.positive? ? available_slots : nil,
    active_sessions: @active_sessions,
    max_sessions: @max_sessions,
    logs: call_logs(started_at: started_at, called_count: long_waiting_guests.size)
  )
end
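
The capacity fields that #call hands to #mark_as come down to simple arithmetic on two numbers. The sketch below isolates that arithmetic in a hypothetical helper, `capacity_snapshot`, which is not part of the class:

```ruby
# Hypothetical helper: reproduces only the capacity fields #call passes to #mark_as.
def capacity_snapshot(available_slots, called_count)
  {
    full: called_count >= available_slots,
    available_slots: available_slots - called_count,
    recent_slots_per_call: available_slots.positive? ? available_slots : nil
  }
end

no_slots   = capacity_snapshot(0, 0) # nothing to hand out; the run still reports
all_filled = capacity_snapshot(5, 5) # every free slot went to a waiting guest
partial    = capacity_snapshot(5, 2) # fewer guests waiting than free slots
```

With zero slots the comparison is 0 >= 0, so the lobby is reported as full and recent_slots_per_call is nil, which #mark_as then leaves out of the write.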

#call_logs(started_at:, called_count:) ⇒ Object

Point-in-time snapshot (overwritten each run, never cumulative) of this caller run: last-run timing (heartbeat) and how many guests were called. Capacity state (active/max sessions, available_slots) lives top-level, not here.



# File 'app/interactors/spree_cm_commissioner/waiting_guests_caller.rb', line 154

def call_logs(started_at:, called_count:)
  finished_at = Time.zone.now
  {
    called_count: called_count,
    last_started_at: started_at,
    last_finished_at: finished_at,
    last_duration_ms: ((finished_at - started_at) * 1000).round
  }
end

#calling_all(waiting_guests) ⇒ Object

To alert waiting guests that they may enter the room, we only update :allow_to_enter_room_at. The app listens to Firebase and, on seeing the change, refreshes its session token to enter the room.

Updates are committed in Firestore batches (chunks of FIRESTORE_BATCH_SIZE) instead of one update per guest, so e.g. 1000 guests = 2 commits, not 1000 round-trips. update merges, so we send only the changed field and leave the rest of each doc intact.



# File 'app/interactors/spree_cm_commissioner/waiting_guests_caller.rb', line 105

def calling_all(waiting_guests)
  allow_at = Time.zone.now

  waiting_guests.each_slice(FIRESTORE_BATCH_SIZE) do |slice|
    firestore.batch do |b|
      slice.each { |document| b.update(document.ref, { allow_to_enter_room_at: allow_at }) }
    end
  end
end
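
The commit count follows directly from each_slice. A small sketch with plain integers standing in for guest documents (the ids and the local `batch_size` are illustrative assumptions, and no Firestore call is made):

```ruby
batch_size = 500 # default value of FIRESTORE_BATCH_SIZE

# Each element of the result is the number of updates carried by one commit.
commits_for_1000 = (1..1000).each_slice(batch_size).map(&:size)
commits_for_1001 = (1..1001).each_slice(batch_size).map(&:size)
```

One guest over a multiple of the batch size costs a full extra commit, which is still far cheaper than one round-trip per guest.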

#compute_avg_queue_to_enter_seconds(guests) ⇒ Object

Average queue wait (position_set_at → allow_to_enter_room_at) for just-called guests. Used for the Queue step ETA: how long a guest waits between receiving a position and actually entering. Returns nil when no guests were called, or when none of the called guests had position_set_at (first run, or no prior stamp). Otherwise the average is floored at MIN_QUEUE_TO_ENTER_SECONDS.



# File 'app/interactors/spree_cm_commissioner/waiting_guests_caller.rb', line 177

def compute_avg_queue_to_enter_seconds(guests)
  return nil if guests.empty?

  now = Time.zone.now
  positioned = guests.select { |doc| doc.data[:position_set_at] }
  return nil if positioned.empty?

  total = positioned.sum { |doc| (now - doc.data[:position_set_at].to_time).to_f }
  [(total / positioned.size).round, MIN_QUEUE_TO_ENTER_SECONDS].max
end
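
A worked example of the floor, using plain hashes in place of Firestore documents and a fixed clock. The guest data and the local `min_queue_to_enter_seconds` are assumptions for illustration:

```ruby
min_queue_to_enter_seconds = 300 # default value of MIN_QUEUE_TO_ENTER_SECONDS

now = Time.at(1_700_000_000)

# Hypothetical guests; nil means the guest never received a position stamp.
guests = [
  { position_set_at: now - 120 },
  { position_set_at: now - 240 },
  { position_set_at: nil }
]

# Guests without a stamp are excluded from both the sum and the divisor.
positioned = guests.select { |g| g[:position_set_at] }
total = positioned.sum { |g| (now - g[:position_set_at]).to_f }

raw_average = (total / positioned.size).round
reported = [raw_average, min_queue_to_enter_seconds].max
```

Here the raw average is 180 seconds, so the reported ETA is lifted to the 300-second floor. The floor keeps a short burst of quickly admitted guests from advertising an unrealistically low ETA.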

#compute_avg_wait_to_enter_seconds(guests) ⇒ Object

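Average total wait (queued_at → allow_to_enter_room_at) for just-called guests. Used for the Waiting Room step ETA: how long a guest waits between joining and getting in. Returns nil when no guests were called; otherwise the average is floored at MIN_WAIT_TO_ENTER_SECONDS.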



# File 'app/interactors/spree_cm_commissioner/waiting_guests_caller.rb', line 166

def compute_avg_wait_to_enter_seconds(guests)
  return nil if guests.empty?

  now = Time.zone.now
  total = guests.sum { |doc| (now - doc.data[:queued_at].to_time).to_f }
  [(total / guests.size).round, MIN_WAIT_TO_ENTER_SECONDS].max
end

#current_date ⇒ Object



# File 'app/interactors/spree_cm_commissioner/waiting_guests_caller.rb', line 115

def current_date
  Time.zone.now.strftime('%Y-%m-%d')
end

#default_records_path(date) ⇒ Object



# File 'app/interactors/spree_cm_commissioner/waiting_guests_caller.rb', line 95

def default_records_path(date)
  "waiting_guests/#{date}/records"
end

#eligible_guests_in(records_path, limit) ⇒ Object



# File 'app/interactors/spree_cm_commissioner/waiting_guests_caller.rb', line 63

def eligible_guests_in(records_path, limit)
  firestore.col(records_path)
           .where('allow_to_enter_room_at', '==', nil)
           .order('queued_at')
           .limit(limit)
           .get.to_a
end

#fetch_available_slots ⇒ Object



# File 'app/interactors/spree_cm_commissioner/waiting_guests_caller.rb', line 38

def fetch_available_slots
  @max_sessions = fetch_max_sessions
  @active_sessions = SpreeCmCommissioner::WaitingRoomSession.active.count
  [@max_sessions - @active_sessions, 0].max
end

#fetch_long_waiting_guests(available_slots) ⇒ Object

This query filters on :allow_to_enter_room_at and orders by :queued_at, so it requires an index; create it in Firebase beforehand. The client must create waiting_guests documents with :queued_at set and :allow_to_enter_room_at explicitly set to nil, because a document that lacks the field entirely does not match the == nil filter.

Yesterday's guests are always older than today's, so fill from yesterday first, then use any leftover slots for today. This way no one queued before the midnight rollover gets skipped. For example, with 5 slots and 2 guests waiting in yesterday's partition, take both, then take 3 from today.



# File 'app/interactors/spree_cm_commissioner/waiting_guests_caller.rb', line 50

def fetch_long_waiting_guests(available_slots)
  previous_guests = eligible_guests_in(previous_records_path, available_slots)

  # Pre-flip window: the lobby pointer still points at yesterday, so both paths resolve to the
  # same partition — return now to avoid querying (and double-counting) it twice.
  return previous_guests if records_path == previous_records_path

  remaining_slots = available_slots - previous_guests.size
  return previous_guests if remaining_slots <= 0

  previous_guests + eligible_guests_in(records_path, remaining_slots)
end
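
The yesterday-first fill can be tried out without Firestore. In this sketch the partitions are in-memory arrays and `eligible` stands in for #eligible_guests_in; the dates, guest names, and both lambdas are assumptions for illustration:

```ruby
# Hypothetical partitions, each already sorted by queued_at (oldest first).
partitions = {
  'waiting_guests/2024-01-01/records' => %w[y1 y2],
  'waiting_guests/2024-01-02/records' => %w[t1 t2 t3 t4]
}

# Stand-in for #eligible_guests_in: the oldest `limit` guests of one partition.
eligible = ->(path, limit) { partitions.fetch(path, []).first(limit) }

fill = lambda do |previous_path, current_path, slots|
  previous = eligible.call(previous_path, slots)
  # Pre-flip window: both paths are the same partition, so query it only once.
  return previous if current_path == previous_path

  remaining = slots - previous.size
  return previous if remaining <= 0

  previous + eligible.call(current_path, remaining)
end

yesterday = 'waiting_guests/2024-01-01/records'
today = 'waiting_guests/2024-01-02/records'

cross_midnight = fill.call(yesterday, today, 5)     # drain yesterday, then today
pre_flip       = fill.call(yesterday, yesterday, 5) # same partition on both sides
tight          = fill.call(yesterday, today, 1)     # not enough slots to reach today
```

Without the pre-flip guard, the second call would return y1 and y2 twice and consume four slots for two guests.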

#fetch_max_sessions ⇒ Object



# File 'app/interactors/spree_cm_commissioner/waiting_guests_caller.rb', line 196

def fetch_max_sessions
  fetcher = SpreeCmCommissioner::WaitingRoomSystemMetadataFetcher.new(firestore: firestore)
  fetcher.load_document_data

  fetcher.max_sessions_count_with_min
end

#firestore ⇒ Object



# File 'app/interactors/spree_cm_commissioner/waiting_guests_caller.rb', line 203

def firestore
  @firestore ||= Google::Cloud::Firestore.new(project_id: [:project_id], credentials: )
end

#lobby_data ⇒ Object



# File 'app/interactors/spree_cm_commissioner/waiting_guests_caller.rb', line 188

def lobby_data
  @lobby_data ||= lobby_document.get.data
end

#lobby_document ⇒ Object



# File 'app/interactors/spree_cm_commissioner/waiting_guests_caller.rb', line 192

def lobby_document
  @lobby_document ||= firestore.col('waiting_rooms').doc('lobby')
end

#mark_as(full:, available_slots:, has_queue:, avg_wait_to_enter_seconds: nil, avg_queue_to_enter_seconds: nil, recent_slots_per_call: nil, active_sessions: nil, max_sessions: nil, logs: nil) ⇒ Object

merge: true so we preserve the published waiting_guests_records_path on the lobby doc. App-facing fields plus live capacity state (full / available_slots / active_sessions / max_sessions / ETAs) stay top-level; this run's observability snapshot is namespaced under logs.waiting_guests_caller_job (see #call_logs). merge: true deep-merges nested maps, so each job owns its own key under logs without clobbering the others.



# File 'app/interactors/spree_cm_commissioner/waiting_guests_caller.rb', line 128

def mark_as( # rubocop:disable Metrics/ParameterLists
  full:,
  available_slots:,
  has_queue:,
  avg_wait_to_enter_seconds: nil,
  avg_queue_to_enter_seconds: nil,
  recent_slots_per_call: nil,
  active_sessions: nil,
  max_sessions: nil,
  logs: nil
)
  data = { full: full, available_slots: available_slots, has_queue: has_queue }

  data[:avg_wait_to_enter_seconds] = avg_wait_to_enter_seconds if avg_wait_to_enter_seconds
  data[:avg_queue_to_enter_seconds] = avg_queue_to_enter_seconds if avg_queue_to_enter_seconds
  data[:recent_slots_per_call] = recent_slots_per_call if recent_slots_per_call
  data[:active_sessions] = active_sessions unless active_sessions.nil?
  data[:max_sessions] = max_sessions unless max_sessions.nil?
  data[:logs] = { waiting_guests_caller_job: logs } if logs

  lobby_document.set(data, merge: true)
end
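
The effect of merge: true on the lobby doc can be approximated with plain hashes. This is only a model of the behaviour described above, not Firestore itself; the `deep_merge` helper, the `other_job` key, and all values are assumptions for illustration:

```ruby
# Hypothetical helper approximating how a merge write combines nested maps.
def deep_merge(existing, incoming)
  existing.merge(incoming) do |_key, old, new|
    old.is_a?(Hash) && new.is_a?(Hash) ? deep_merge(old, new) : new
  end
end

# Lobby doc before the run: a published path plus another job's log entry.
lobby = {
  waiting_guests_records_path: 'waiting_guests/2024-01-02/records',
  full: true,
  logs: { other_job: { last_started_at: 'earlier' } }
}

# What this run writes: fresh capacity state and its own namespaced snapshot.
write = {
  full: false,
  available_slots: 3,
  logs: { waiting_guests_caller_job: { called_count: 2 } }
}

merged = deep_merge(lobby, write)
```

The published path survives because the write never mentions it, top-level capacity fields are overwritten, and the two jobs' entries sit side by side under logs.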

#previous_date ⇒ Object



# File 'app/interactors/spree_cm_commissioner/waiting_guests_caller.rb', line 119

def previous_date
  1.day.ago.strftime('%Y-%m-%d')
end

#previous_records_path ⇒ Object

Drain target is derived from the server date, never the (possibly stale) lobby pointer.



# File 'app/interactors/spree_cm_commissioner/waiting_guests_caller.rb', line 91

def previous_records_path
  default_records_path(previous_date)
end

#records_path ⇒ Object

Published path is authoritative; fall back to the server's own date if not yet published.



# File 'app/interactors/spree_cm_commissioner/waiting_guests_caller.rb', line 86

def records_path
  lobby_data&.dig(:waiting_guests_records_path).presence || default_records_path(current_date)
end
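
The fallback order can be sketched in plain Ruby. The method below, `resolve_records_path`, is a hypothetical stand-in that takes the lobby data and server date as arguments and replaces ActiveSupport's .presence with an explicit blank check:

```ruby
# Hypothetical stand-in for #records_path with its inputs made explicit.
def resolve_records_path(lobby_data, server_date)
  published = lobby_data && lobby_data[:waiting_guests_records_path]
  published = nil if published.to_s.strip.empty? # plain-Ruby stand-in for .presence
  published || "waiting_guests/#{server_date}/records"
end

no_lobby_doc = resolve_records_path(nil, '2024-01-02')
blank_path   = resolve_records_path({ waiting_guests_records_path: '' }, '2024-01-02')
published    = resolve_records_path(
  { waiting_guests_records_path: 'waiting_guests/2024-01-01/records' }, '2024-01-02'
)
```

The last call shows the pre-flip window: the server date has already rolled over, but the published pointer still names yesterday's partition and takes precedence.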

#service_account ⇒ Object



# File 'app/interactors/spree_cm_commissioner/waiting_guests_caller.rb', line 207

def service_account
  @service_account ||= Rails.application.credentials.
end

#waiting_exists?(records_paths) ⇒ Boolean

Is anyone still waiting across these partitions? A yes/no (limit 1), so we stop at the first hit instead of counting. uniq folds the pre-flip case (both paths == the same partition before the lobby pointer flips) into a single query, matching #fetch_long_waiting_guests's guard. Order matters for read count only (not correctness): yesterday-first exits after one read on the cross-midnight stragglers the caller drains first.

Returns:

  • (Boolean)


# File 'app/interactors/spree_cm_commissioner/waiting_guests_caller.rb', line 76

def waiting_exists?(records_paths)
  records_paths.uniq.any? do |records_path|
    firestore.col(records_path)
             .where('allow_to_enter_room_at', '==', nil)
             .limit(1)
             .get.any?
  end
end
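
The read-count claims can be checked with a stub in place of the limit(1) query. In this sketch `waiting_exists_sketch` is a hypothetical variant that takes the set of partitions with waiting guests and a `reads` array recording which partitions were queried:

```ruby
# Hypothetical variant of #waiting_exists? with the Firestore query stubbed out.
def waiting_exists_sketch(paths, waiting_partitions, reads)
  paths.uniq.any? do |path|
    reads << path                     # one simulated limit(1) read
    waiting_partitions.include?(path) # does this partition still have a waiter?
  end
end

yesterday = 'waiting_guests/2024-01-01/records'
today = 'waiting_guests/2024-01-02/records'

# Cross-midnight stragglers: the first partition hits, so today is never read.
straggler_reads = []
stragglers = waiting_exists_sketch([yesterday, today], [yesterday], straggler_reads)

# Pre-flip with an empty queue: uniq collapses the duplicate path to one read.
pre_flip_reads = []
pre_flip = waiting_exists_sketch([yesterday, yesterday], [], pre_flip_reads)
```

Swapping the order to today-first would return the same answers; it would only cost an extra read in the straggler case.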