10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
# File 'lib/legion/extensions/metering/runners/rollup.rb', line 10
def rollup_hour(hour: nil, **)
return { status: 'skipped', reason: 'data_unavailable' } unless data_available?
hour = resolve_hour(hour)
hour_end = hour + 3600
records_ds = Legion::Data.connection[:metering_records]
.where(::Sequel.lit('recorded_at >= ? AND recorded_at < ?', hour, hour_end))
raw_count = records_ds.count
groups = records_ds.group_by { |r| [r[:worker_id], r[:provider], r[:model_id]] }
rollup_dataset = Legion::Data.connection[:metering_hourly_rollup]
rolled_up = 0
groups.each do |(w, p, m), rows|
rollup_row = build_rollup_row(w, p, m, hour, rows)
existing = rollup_dataset.where(worker_id: w, provider: p, model_id: m, hour: hour).first
if existing
rollup_dataset.where(id: existing[:id]).update(
rollup_row.except(:worker_id, :provider, :model_id, :hour)
)
else
rollup_dataset.insert(rollup_row)
end
rolled_up += 1
end
log.info("[metering] rollup_hour: hour=#{hour.iso8601} groups=#{rolled_up} raw_records=#{raw_count}")
{ rolled_up: rolled_up, hour: hour.iso8601, raw_records: raw_count }
end
|