Class: SpreeCmCommissioner::WaitingGuestsCaller
- Inherits: BaseInteractor
  - Object
    - BaseInteractor
      - SpreeCmCommissioner::WaitingGuestsCaller
- Defined in: app/interactors/spree_cm_commissioner/waiting_guests_caller.rb
Constant Summary
- MIN_WAIT_TO_ENTER_SECONDS =
10 min floor — Waiting Room step (full journey, most uncertainty)
(ENV['WAITING_ROOM_MIN_WAIT_TO_ENTER_SECONDS'] || 600).to_i
- MIN_QUEUE_TO_ENTER_SECONDS =
5 min floor — Queue step (position assigned, sub-segment of the journey)
(ENV['WAITING_ROOM_MIN_QUEUE_TO_ENTER_SECONDS'] || 300).to_i
- FIRESTORE_BATCH_SIZE =
Firestore bounds a batch update by payload size (10 MiB); 500 ops/commit leaves us far under that.
(ENV['WAITING_ROOM_FIRESTORE_BATCH_SIZE'] || 500).to_i
Instance Method Summary
- #call ⇒ Object
- #call_logs(started_at:, called_count:) ⇒ Object
  Point-in-time snapshot (overwritten each run, never cumulative) of this caller run: last-run timing (heartbeat) and how many guests were called.
- #calling_all(waiting_guests) ⇒ Object
  To alert waiting guests that they may enter the room, we only update :allow_to_enter_room_at.
- #compute_avg_queue_to_enter_seconds(guests) ⇒ Object
  Average queue wait (position_set_at → allow_to_enter_room_at) for just-called guests.
- #compute_avg_wait_to_enter_seconds(guests) ⇒ Object
  Average total wait (queued_at → allow_to_enter_room_at) for just-called guests.
- #current_date ⇒ Object
- #default_records_path(date) ⇒ Object
- #eligible_guests_in(records_path, limit) ⇒ Object
- #fetch_available_slots ⇒ Object
- #fetch_long_waiting_guests(available_slots) ⇒ Object
  This query requires an index; create it in Firebase beforehand.
- #fetch_max_sessions ⇒ Object
- #firestore ⇒ Object
- #lobby_data ⇒ Object
- #lobby_document ⇒ Object
- #mark_as(full:, available_slots:, has_queue:, avg_wait_to_enter_seconds: nil, avg_queue_to_enter_seconds: nil, recent_slots_per_call: nil, active_sessions: nil, max_sessions: nil, logs: nil) ⇒ Object
  merge: true so we preserve the published waiting_guests_records_path on the lobby doc.
- #previous_date ⇒ Object
- #previous_records_path ⇒ Object
  Drain target is derived from the server date, never the (possibly stale) lobby pointer.
- #records_path ⇒ Object
  Published path is authoritative; fall back to the server's own date if not yet published.
- #service_account ⇒ Object
- #waiting_exists?(records_paths) ⇒ Boolean
  Is anyone still waiting across these partitions? A yes/no (limit 1), so we stop at the first hit instead of counting.
Instance Method Details
#call ⇒ Object
# File 'app/interactors/spree_cm_commissioner/waiting_guests_caller.rb', line 15
def call
  started_at = Time.zone.now
  available_slots = fetch_available_slots

  # Always run through to mark_as, even with no slots, so the lobby keeps a fresh heartbeat
  # (logs.waiting_guests_caller_job) during peak/full periods instead of going stale
  # and looking like a dead cron.
  long_waiting_guests = available_slots.positive? ? fetch_long_waiting_guests(available_slots) : []
  calling_all(long_waiting_guests) if long_waiting_guests.any?

  mark_as(
    full: long_waiting_guests.size >= available_slots,
    available_slots: available_slots - long_waiting_guests.size,
    has_queue: waiting_exists?([previous_records_path, records_path]),
    avg_wait_to_enter_seconds: compute_avg_wait_to_enter_seconds(long_waiting_guests),
    avg_queue_to_enter_seconds: compute_avg_queue_to_enter_seconds(long_waiting_guests),
    recent_slots_per_call: available_slots.positive? ? available_slots : nil,
    active_sessions: @active_sessions,
    max_sessions: @max_sessions,
    logs: call_logs(started_at: started_at, called_count: long_waiting_guests.size)
  )
end
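The capacity fields that #call passes to mark_as follow from two numbers: how many slots were free and how many guests were actually called. A minimal sketch of that arithmetic in plain Ruby; the helper name capacity_state is hypothetical and not part of the interactor:

```ruby
# Hypothetical helper mirroring the arithmetic in #call; not part of the class.
def capacity_state(available_slots:, called_count:)
  {
    # The room counts as full when every free slot was handed out (including 0 of 0).
    full: called_count >= available_slots,
    # Slots left over after this run's calls.
    available_slots: available_slots - called_count,
    # Only reported when there was something to hand out.
    recent_slots_per_call: available_slots.positive? ? available_slots : nil
  }
end

no_slots = capacity_state(available_slots: 0, called_count: 0)
partial  = capacity_state(available_slots: 5, called_count: 3)
puts no_slots.inspect
puts partial.inspect
```

With no free slots the room is reported as full and recent_slots_per_call is nil, which mark_as leaves out of the payload.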
#call_logs(started_at:, called_count:) ⇒ Object
Point-in-time snapshot (overwritten each run, never cumulative) of this caller run: last-run timing (heartbeat) and how many guests were called. Capacity state (active/max sessions, available_slots) lives top-level, not here.
# File 'app/interactors/spree_cm_commissioner/waiting_guests_caller.rb', line 154
def call_logs(started_at:, called_count:)
  finished_at = Time.zone.now
  {
    called_count: called_count,
    last_started_at: started_at,
    last_finished_at: finished_at,
    last_duration_ms: ((finished_at - started_at) * 1000).round
  }
end
#calling_all(waiting_guests) ⇒ Object
To alert waiting guests that they may enter the room, we only update :allow_to_enter_room_at. The app listens to Firebase and then refreshes its session token to enter the room.
Commit in Firestore batches (chunks of FIRESTORE_BATCH_SIZE) instead of one update per guest, so e.g. 1000 guests = 2 commits, not 1000 round-trips. update merges, so we only send the changed field and leave the rest of each doc intact.
# File 'app/interactors/spree_cm_commissioner/waiting_guests_caller.rb', line 105
def calling_all(waiting_guests)
  allow_at = Time.zone.now

  waiting_guests.each_slice(FIRESTORE_BATCH_SIZE) do |slice|
    firestore.batch do |b|
      slice.each { |document| b.update(document.ref, { allow_to_enter_room_at: allow_at }) }
    end
  end
end
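The "1000 guests = 2 commits" claim comes straight from how each_slice chunks the list. A small sketch without any Firestore involvement; commit_sizes is a hypothetical helper, and the default of 500 only stands in for FIRESTORE_BATCH_SIZE:

```ruby
# Hypothetical helper: how many writes each batch commit would carry.
# Uses a plain range of guest numbers instead of Firestore documents.
def commit_sizes(guest_count, batch_size = 500)
  (1..guest_count).each_slice(batch_size).map(&:size)
end

puts commit_sizes(1000).inspect # two full batches
puts commit_sizes(1001).inspect # a third batch carries the single leftover guest
```

The number of round-trips is the length of the returned array, so it grows with guest_count / batch_size rather than with guest_count.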
#compute_avg_queue_to_enter_seconds(guests) ⇒ Object
Average queue wait (position_set_at → allow_to_enter_room_at) for just-called guests. Used for the Queue step ETA: how long after getting a position until I actually enter. Returns nil when none of the called guests had position_set_at (first run, or no prior stamp).
# File 'app/interactors/spree_cm_commissioner/waiting_guests_caller.rb', line 177
def compute_avg_queue_to_enter_seconds(guests)
  return nil if guests.empty?

  now = Time.zone.now
  positioned = guests.select { |doc| doc.data[:position_set_at] }
  return nil if positioned.empty?

  total = positioned.sum { |doc| (now - doc.data[:position_set_at].to_time).to_f }
  [(total / positioned.size).round, MIN_QUEUE_TO_ENTER_SECONDS].max
end
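To make the filtering and the floor concrete, here is a stand-alone sketch of the same calculation. It assumes guests are plain hashes rather than Firestore document snapshots, injects now so the result is deterministic, and uses 300 as a stand-in for MIN_QUEUE_TO_ENTER_SECONDS; the method name is hypothetical:

```ruby
# Hypothetical stand-alone version of the floored queue average.
def avg_queue_to_enter_seconds(guests, now:, floor_seconds: 300)
  # Guests without a position stamp are ignored rather than counted as zero.
  positioned = guests.select { |g| g[:position_set_at] }
  return nil if positioned.empty?

  total = positioned.sum { |g| (now - g[:position_set_at]).to_f }
  # Never report less than the floor, even if the measured average is shorter.
  [(total / positioned.size).round, floor_seconds].max
end

now = Time.utc(2024, 1, 1, 12, 0, 0)
guests = [
  { position_set_at: now - 400 },
  { position_set_at: nil },        # skipped: no position stamp
  { position_set_at: now - 800 }
]
puts avg_queue_to_enter_seconds(guests, now: now) # => 600
```

The same floor pattern applies to the total-wait average, with queued_at as the start time and the 10-minute floor.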
#compute_avg_wait_to_enter_seconds(guests) ⇒ Object
Average total wait (queued_at → allow_to_enter_room_at) for just-called guests. Used for the Waiting Room step ETA: how long until I get in from when I joined.
# File 'app/interactors/spree_cm_commissioner/waiting_guests_caller.rb', line 166
def compute_avg_wait_to_enter_seconds(guests)
  return nil if guests.empty?

  now = Time.zone.now
  total = guests.sum { |doc| (now - doc.data[:queued_at].to_time).to_f }
  [(total / guests.size).round, MIN_WAIT_TO_ENTER_SECONDS].max
end
#current_date ⇒ Object
# File 'app/interactors/spree_cm_commissioner/waiting_guests_caller.rb', line 115
def current_date
  Time.zone.now.strftime('%Y-%m-%d')
end
#default_records_path(date) ⇒ Object
# File 'app/interactors/spree_cm_commissioner/waiting_guests_caller.rb', line 95
def default_records_path(date)
  "waiting_guests/#{date}/records"
end
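Records are partitioned by date, so each run deals with at most two collection paths: yesterday's and today's. A short sketch of how those two paths could be derived from a server date, using Ruby's standard Date class instead of Time.zone; partition_paths is a hypothetical helper:

```ruby
require 'date'

# Hypothetical helper: the [previous, current] partition paths for a given server date.
def partition_paths(today)
  [today - 1, today].map { |date| "waiting_guests/#{date.strftime('%Y-%m-%d')}/records" }
end

puts partition_paths(Date.new(2024, 3, 1)).inspect
# => ["waiting_guests/2024-02-29/records", "waiting_guests/2024-03-01/records"]
```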
#eligible_guests_in(records_path, limit) ⇒ Object
# File 'app/interactors/spree_cm_commissioner/waiting_guests_caller.rb', line 63
def eligible_guests_in(records_path, limit)
  firestore.col(records_path)
           .where('allow_to_enter_room_at', '==', nil)
           .order('queued_at')
           .limit(limit)
           .get.to_a
end
#fetch_available_slots ⇒ Object
# File 'app/interactors/spree_cm_commissioner/waiting_guests_caller.rb', line 38
def fetch_available_slots
  @max_sessions = fetch_max_sessions
  @active_sessions = SpreeCmCommissioner::WaitingRoomSession.active.count
  [@max_sessions - @active_sessions, 0].max
end
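The [x, 0].max idiom clamps the result so that an over-subscribed room reports zero free slots instead of a negative number. A tiny illustration with plain integers; free_slots is a hypothetical name:

```ruby
# Hypothetical helper showing the clamp used for available slots.
def free_slots(max_sessions, active_sessions)
  [max_sessions - active_sessions, 0].max
end

puts free_slots(100, 40)  # => 60
puts free_slots(100, 120) # => 0 (never negative, even when over capacity)
```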
#fetch_long_waiting_guests(available_slots) ⇒ Object
This query requires an index; create it in Firebase beforehand. The client must create waiting_guests documents with :queued_at set and :allow_to_enter_room_at explicitly set to nil, so that the filter + order query can match them.
Yesterday's guests are always older than today's, so fill from yesterday first, then use any leftover slots for today. This way no one queued before the midnight rollover gets skipped. e.g. 5 slots, 2 waiting in yesterday -> take both, then take 3 from today.
# File 'app/interactors/spree_cm_commissioner/waiting_guests_caller.rb', line 50
def fetch_long_waiting_guests(available_slots)
  previous_guests = eligible_guests_in(previous_records_path, available_slots)

  # Pre-flip window: the lobby pointer still points at yesterday, so both paths resolve to the
  # same partition; return now to avoid querying (and double-counting) it twice.
  return previous_guests if records_path == previous_records_path

  remaining_slots = available_slots - previous_guests.size
  return previous_guests if remaining_slots <= 0

  previous_guests + eligible_guests_in(records_path, remaining_slots)
end
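The yesterday-first fill can be tried out with in-memory arrays. This sketch assumes guests are plain hashes with an integer queued_at; take_oldest stands in for eligible_guests_in, and both method names plus the same_partition flag are hypothetical:

```ruby
# Hypothetical in-memory stand-in for eligible_guests_in: the oldest `limit` guests of a partition.
def take_oldest(partition, limit)
  partition.sort_by { |g| g[:queued_at] }.first(limit)
end

# Fill from yesterday's partition first, then spend leftover slots on today's.
def fill_slots(previous_partition, current_partition, available_slots, same_partition: false)
  previous = take_oldest(previous_partition, available_slots)
  # Pre-flip window: both paths are the same partition, so do not read it twice.
  return previous if same_partition

  remaining = available_slots - previous.size
  return previous if remaining <= 0

  previous + take_oldest(current_partition, remaining)
end

yesterday = [{ id: 'y2', queued_at: 20 }, { id: 'y1', queued_at: 10 }]
today = [{ id: 't1', queued_at: 30 }, { id: 't2', queued_at: 40 },
         { id: 't3', queued_at: 50 }, { id: 't4', queued_at: 60 }]

puts fill_slots(yesterday, today, 5).map { |g| g[:id] }.inspect
# => ["y1", "y2", "t1", "t2", "t3"]
```

This reproduces the documented example: 5 slots with 2 guests waiting in yesterday's partition takes both, then 3 from today.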
#fetch_max_sessions ⇒ Object
# File 'app/interactors/spree_cm_commissioner/waiting_guests_caller.rb', line 196
def fetch_max_sessions
  fetcher = SpreeCmCommissioner::WaitingRoomSystemMetadataFetcher.new(firestore: firestore)
  fetcher.load_document_data
  fetcher.max_sessions_count_with_min
end
#firestore ⇒ Object
# File 'app/interactors/spree_cm_commissioner/waiting_guests_caller.rb', line 203
def firestore
  @firestore ||= Google::Cloud::Firestore.new(project_id: service_account[:project_id], credentials: service_account)
end
#lobby_data ⇒ Object
# File 'app/interactors/spree_cm_commissioner/waiting_guests_caller.rb', line 188
def lobby_data
  @lobby_data ||= lobby_document.get.data
end
#lobby_document ⇒ Object
# File 'app/interactors/spree_cm_commissioner/waiting_guests_caller.rb', line 192
def lobby_document
  @lobby_document ||= firestore.col('waiting_rooms').doc('lobby')
end
#mark_as(full:, available_slots:, has_queue:, avg_wait_to_enter_seconds: nil, avg_queue_to_enter_seconds: nil, recent_slots_per_call: nil, active_sessions: nil, max_sessions: nil, logs: nil) ⇒ Object
merge: true so we preserve the published waiting_guests_records_path on the lobby doc.
App-facing fields plus live capacity state (full / available_slots / active_sessions /
max_sessions / ETAs) stay top-level; this run's observability snapshot is namespaced under
logs.waiting_guests_caller_job (see #call_logs). merge: true deep-merges nested maps, so
each job owns its own key under logs without clobbering the others.
# File 'app/interactors/spree_cm_commissioner/waiting_guests_caller.rb', line 128
def mark_as( # rubocop:disable Metrics/ParameterLists
  full:,
  available_slots:,
  has_queue:,
  avg_wait_to_enter_seconds: nil,
  avg_queue_to_enter_seconds: nil,
  recent_slots_per_call: nil,
  active_sessions: nil,
  max_sessions: nil,
  logs: nil
)
  data = { full: full, available_slots: available_slots, has_queue: has_queue }
  data[:avg_wait_to_enter_seconds] = avg_wait_to_enter_seconds if avg_wait_to_enter_seconds
  data[:avg_queue_to_enter_seconds] = avg_queue_to_enter_seconds if avg_queue_to_enter_seconds
  data[:recent_slots_per_call] = recent_slots_per_call if recent_slots_per_call
  data[:active_sessions] = active_sessions unless active_sessions.nil?
  data[:max_sessions] = max_sessions unless max_sessions.nil?
  data[:logs] = { waiting_guests_caller_job: logs } if logs

  lobby_document.set(data, merge: true)
end
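The effect of the deep merge described above can be simulated with plain Ruby hashes. This is only an illustration of the merge semantics, not the Firestore client; deep_merge is a hand-written helper, and the other_job key and sample values are made up for the example:

```ruby
# Plain-Ruby simulation of a deep merge; this does not call Firestore.
def deep_merge(existing, incoming)
  existing.merge(incoming) do |_key, old_value, new_value|
    # Nested maps are merged key by key; everything else is overwritten.
    old_value.is_a?(Hash) && new_value.is_a?(Hash) ? deep_merge(old_value, new_value) : new_value
  end
end

# Hypothetical lobby document before this run.
lobby = {
  waiting_guests_records_path: 'waiting_guests/2024-03-01/records',
  full: true,
  logs: { other_job: { runs: 1 } }
}

# Payload shaped like the one mark_as builds.
update = {
  full: false,
  available_slots: 2,
  has_queue: true,
  logs: { waiting_guests_caller_job: { called_count: 3 } }
}

merged = deep_merge(lobby, update)
puts merged[:waiting_guests_records_path] # untouched by the update
puts merged[:logs].keys.inspect           # both jobs keep their own key
```

Fields that mark_as omits (for example a nil ETA) simply do not appear in the payload, so under these merge semantics the previously stored value would remain.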
#previous_date ⇒ Object
# File 'app/interactors/spree_cm_commissioner/waiting_guests_caller.rb', line 119
def previous_date
  1.day.ago.strftime('%Y-%m-%d')
end
#previous_records_path ⇒ Object
Drain target is derived from the server date, never the (possibly stale) lobby pointer.
# File 'app/interactors/spree_cm_commissioner/waiting_guests_caller.rb', line 91
def previous_records_path
  default_records_path(previous_date)
end
#records_path ⇒ Object
Published path is authoritative; fall back to the server's own date if not yet published.
# File 'app/interactors/spree_cm_commissioner/waiting_guests_caller.rb', line 86
def records_path
  lobby_data&.dig(:waiting_guests_records_path).presence || default_records_path(current_date)
end
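The fallback rule can be sketched without Rails. The version below approximates ActiveSupport's presence with a nil/blank check and takes the lobby data and date as arguments; resolve_records_path is a hypothetical name:

```ruby
# Hypothetical stand-alone version of the published-path-or-default rule.
def resolve_records_path(lobby_data, today)
  published = lobby_data && lobby_data[:waiting_guests_records_path]
  # A published, non-blank path always wins.
  return published unless published.nil? || published.strip.empty?

  # Otherwise fall back to the server's own date partition.
  "waiting_guests/#{today}/records"
end

puts resolve_records_path({ waiting_guests_records_path: 'waiting_guests/2024-02-29/records' }, '2024-03-01')
puts resolve_records_path({ waiting_guests_records_path: '' }, '2024-03-01')
puts resolve_records_path(nil, '2024-03-01')
```

The first call keeps the published (still un-flipped) path, while the other two fall back to the 2024-03-01 partition.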
#service_account ⇒ Object
# File 'app/interactors/spree_cm_commissioner/waiting_guests_caller.rb', line 207
def service_account
  @service_account ||= Rails.application.credentials.cloud_firestore_service_account
end
#waiting_exists?(records_paths) ⇒ Boolean
Is anyone still waiting across these partitions? A yes/no (limit 1), so we stop at the first hit instead of counting. uniq folds the pre-flip case (both paths == the same partition before the lobby pointer flips) into a single query, matching #fetch_long_waiting_guests's guard. Order matters for read count only (not correctness): yesterday-first exits after one read on the cross-midnight stragglers the caller drains first.
# File 'app/interactors/spree_cm_commissioner/waiting_guests_caller.rb', line 76
def waiting_exists?(records_paths)
  records_paths.uniq.any? do |records_path|
    firestore.col(records_path)
             .where('allow_to_enter_room_at', '==', nil)
             .limit(1)
             .get.any?
  end
end
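The read-count behaviour described above (uniq collapsing duplicate paths, any? stopping at the first hit) can be observed with an in-memory stand-in. Here partitions is a hash of path to guest hashes and query_log records which partitions were "queried"; all of these names are hypothetical:

```ruby
# Hypothetical in-memory version of the existence check that logs each simulated query.
def waiting_in_any?(paths, partitions, query_log)
  paths.uniq.any? do |path|
    query_log << path
    partitions.fetch(path, []).any? { |guest| guest[:allow_to_enter_room_at].nil? }
  end
end

partitions = {
  'yesterday' => [{ allow_to_enter_room_at: nil }],
  'today' => [{ allow_to_enter_room_at: nil }]
}

log = []
puts waiting_in_any?(%w[yesterday today], partitions, log) # true
puts log.inspect # only "yesterday" was queried, because any? stopped at the first hit

log = []
waiting_in_any?(%w[yesterday yesterday], partitions, log)
puts log.inspect # the duplicate path was folded into a single query
```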