Class: OpenC3::ActivityModel
- Defined in:
- lib/openc3/models/activity_model.rb
Constant Summary collapse
- MAX_DURATION =
Time::SEC_PER_DAY
- START_GRACE_SECONDS =
Grace window (in seconds) to allow creating activities slightly in the past. This handles race conditions where real-time activity notifications arrive after the start time has already passed (e.g. from external systems). This is consistent with the -15 second window in the timeline microservice.
15- PRIMARY_KEY =
MUST be equal to ‘TimelineModel::PRIMARY_KEY` minus the leading __
'__openc3_timelines'.freeze
- VALID_KINDS =
See run_activity(activity) in openc3/lib/openc3/microservices/timeline_microservice.rb
%w(command script reserve expire)
Instance Attribute Summary collapse
-
#data ⇒ Object
readonly
Returns the value of attribute data.
-
#events ⇒ Object
readonly
Returns the value of attribute events.
-
#fulfillment ⇒ Object
readonly
Returns the value of attribute fulfillment.
-
#kind ⇒ Object
readonly
Returns the value of attribute kind.
-
#recurring ⇒ Object
readonly
Returns the value of attribute recurring.
-
#start ⇒ Object
readonly
Returns the value of attribute start.
-
#stop ⇒ Object
readonly
Returns the value of attribute stop.
-
#uuid ⇒ Object
readonly
Returns the value of attribute uuid.
Attributes inherited from Model
#name, #plugin, #scope, #updated_at
Class Method Summary collapse
-
.activities(name:, scope:) ⇒ Array|nil
Called via the microservice this gets the previous 00:00:15 to 01:01:00.
-
.all(name:, scope:, limit: 100) ⇒ Array<Hash>
Array up to the limit of the models (as Hash objects) stored under the primary key.
-
.count(name:, scope:) ⇒ Integer
Count of the members stored under the primary key.
-
.destroy(name:, scope:, score:, uuid: nil, recurring: nil) ⇒ Integer
Remove one member from a sorted set.
-
.from_json(json, name:, scope:) ⇒ ActivityModel
Model generated from the passed JSON.
-
.get(name:, start:, stop:, scope:, limit: 100) ⇒ Array|nil
Array up to 100 of this model or empty array if name not found under primary_key.
-
.range_destroy(name:, scope:, min:, max:) ⇒ Integer
Remove members from min to max of the sorted set.
-
.score(name:, score:, scope:, uuid: nil) ⇒ String|nil
String of the saved json or nil if score not found under primary_key.
Instance Method Summary collapse
-
#add_event(status:, username: nil, changes: nil) ⇒ Object
add_event will make an event.
-
#as_json(*a) ⇒ Hash
Generated from the ActivityModel.
-
#commit(status:, message: nil, fulfillment: nil, timestamp: nil) ⇒ Object
commit will make an event and save the object to the redis database.
-
#create(overlap: true, username: nil) ⇒ Object
Update the Redis hash at primary_key and set the score equal to the start Epoch time the member is set to the JSON generated via calling as_json.
- #diff_field(changes, field, old_val, new_val) ⇒ Object
-
#initialize(name:, start:, stop:, kind:, data:, scope:, updated_at: 0, fulfillment: nil, uuid: nil, events: nil, recurring: {}) ⇒ ActivityModel
constructor
A new instance of ActivityModel.
-
#notify(kind:, extra: nil) ⇒ Object
update the redis stream / timeline topic that something has changed.
-
#set_input(start:, stop:, kind: nil, data: nil, uuid: nil, events: nil, fulfillment: nil, recurring: nil) ⇒ Object
Set the values of the instance, @start, @kind, @data, @events…
-
#update(start:, stop:, kind:, data:, overlap: true, username: nil) ⇒ Object
Update the Redis hash at primary_key and remove the current activity at the current score and update the score to the new score equal to the start Epoch time this uses a multi to execute both the remove and create.
-
#validate_input(start:, stop:, kind:, data:) ⇒ Object
validate the input to the rules we have created for timelines.
-
#validate_time(start, stop, ignore_score: nil) ⇒ Object
validate_time searches from the current activity @stop (exclusive because we allow overlap of stop with start) back through @start - MAX_DURATION.
Methods inherited from Model
#check_disable_erb, #deploy, #destroy, #destroyed?, #diff, filter, find_all_by_plugin, get_all_models, get_model, handle_config, names, set, store, store_queued, #undeploy
Constructor Details
#initialize(name:, start:, stop:, kind:, data:, scope:, updated_at: 0, fulfillment: nil, uuid: nil, events: nil, recurring: {}) ⇒ ActivityModel
Returns a new instance of ActivityModel.
165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 |
# File 'lib/openc3/models/activity_model.rb', line 165 def initialize( name:, # part of Model start:, stop:, kind:, data:, scope:, # part of Model updated_at: 0, # part of Model fulfillment: nil, uuid: nil, events: nil, recurring: {} ) super("#{scope}#{PRIMARY_KEY}__#{name}", name: name, scope: scope) # Validate everything that isn't already in Model set_input( start: start, stop: stop, kind: kind, data: data, fulfillment: fulfillment, uuid: uuid, events: events, recurring: recurring, ) @updated_at = updated_at end |
Instance Attribute Details
#data ⇒ Object (readonly)
Returns the value of attribute data.
163 164 165 |
# File 'lib/openc3/models/activity_model.rb', line 163 def data @data end |
#events ⇒ Object (readonly)
Returns the value of attribute events.
163 164 165 |
# File 'lib/openc3/models/activity_model.rb', line 163 def events @events end |
#fulfillment ⇒ Object (readonly)
Returns the value of attribute fulfillment.
163 164 165 |
# File 'lib/openc3/models/activity_model.rb', line 163 def fulfillment @fulfillment end |
#kind ⇒ Object (readonly)
Returns the value of attribute kind.
163 164 165 |
# File 'lib/openc3/models/activity_model.rb', line 163 def kind @kind end |
#recurring ⇒ Object (readonly)
Returns the value of attribute recurring.
163 164 165 |
# File 'lib/openc3/models/activity_model.rb', line 163 def recurring @recurring end |
#start ⇒ Object (readonly)
Returns the value of attribute start.
163 164 165 |
# File 'lib/openc3/models/activity_model.rb', line 163 def start @start end |
#stop ⇒ Object (readonly)
Returns the value of attribute stop.
163 164 165 |
# File 'lib/openc3/models/activity_model.rb', line 163 def stop @stop end |
#uuid ⇒ Object (readonly)
Returns the value of attribute uuid.
163 164 165 |
# File 'lib/openc3/models/activity_model.rb', line 163 def uuid @uuid end |
Class Method Details
.activities(name:, scope:) ⇒ Array|nil
Called via the microservice this gets the previous 00:00:15 to 01:01:00. This should allow for a small buffer around the timeline to make sure the schedule doesn’t get stale. 00:00:15 was selected as the schedule queue used in the microservice has round robin array with 15 slots to make sure we don’t miss a planned task.
46 47 48 49 50 51 52 |
# File 'lib/openc3/models/activity_model.rb', line 46 def self.activities(name:, scope:) now = Time.now.to_f start_score = now - 15 stop_score = (now + 3660) array = Store.zrangebyscore("#{scope}#{PRIMARY_KEY}__#{name}", start_score, stop_score) return array.map { |value| ActivityModel.from_json(value, name: name, scope: scope) } end |
.all(name:, scope:, limit: 100) ⇒ Array<Hash>
Returns Array up to the limit of the models (as Hash objects) stored under the primary key.
64 65 66 67 |
# File 'lib/openc3/models/activity_model.rb', line 64 def self.all(name:, scope:, limit: 100) array = Store.zrange("#{scope}#{PRIMARY_KEY}__#{name}", 0, -1, :limit => [0, limit]) return array.map { |value| JSON.parse(value, allow_nan: true, create_additions: true) } end |
.count(name:, scope:) ⇒ Integer
Returns count of the members stored under the primary key.
86 87 88 |
# File 'lib/openc3/models/activity_model.rb', line 86 def self.count(name:, scope:) return Store.zcard("#{scope}#{PRIMARY_KEY}__#{name}") end |
.destroy(name:, scope:, score:, uuid: nil, recurring: nil) ⇒ Integer
Remove one member from a sorted set.
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/openc3/models/activity_model.rb', line 92 def self.destroy(name:, scope:, score:, uuid: nil, recurring: nil) result = 0 # Delete all recurring activities if recurring activity = self.score(name: name, score: score, scope: scope) if activity and activity.recurring['end'] and activity.recurring['uuid'] json = Store.zrangebyscore("#{scope}#{PRIMARY_KEY}__#{name}", activity.recurring['start'], activity.recurring['end']) parsed = json.map { |value| ActivityModel.from_json(value, name: name, scope: scope) } parsed.each_with_index do |value, index| if value.recurring['uuid'] == uuid Store.zrem("#{scope}#{PRIMARY_KEY}__#{name}", json[index]) result += 1 end end end end # First find all the activities at the score json = Store.zrangebyscore("#{scope}#{PRIMARY_KEY}__#{name}", score, score, :limit => [0, 100]) parsed = json.map { |value| JSON.parse(value, allow_nan: true, create_additions: true) } parsed.each_with_index do |value, index| if uuid # If the uuid is given then only delete activities that match the uuid if value['uuid'] == uuid Store.zrem("#{scope}#{PRIMARY_KEY}__#{name}", json[index]) result += 1 break end else # If the uuid is not given (backwards compatibility) then delete all activities # at the score that do NOT have a uuid next if value['uuid'] Store.zrem("#{scope}#{PRIMARY_KEY}__#{name}", json[index]) result += 1 end end notification = { # start / stop to match SortedModel 'data' => JSON.generate({'start' => score, 'uuid' => uuid}, allow_nan: true), 'kind' => 'deleted', 'type' => 'activity', 'timeline' => name } TimelineTopic.write_activity(notification, scope: scope) return result end |
.from_json(json, name:, scope:) ⇒ ActivityModel
Returns Model generated from the passed JSON.
157 158 159 160 161 |
# File 'lib/openc3/models/activity_model.rb', line 157 def self.from_json(json, name:, scope:) json = JSON.parse(json, allow_nan: true, create_additions: true) if String === json raise "json data is nil" if json.nil? self.new(**json.transform_keys(&:to_sym), name: name, scope: scope) end |
.get(name:, start:, stop:, scope:, limit: 100) ⇒ Array|nil
Returns Array up to 100 of this model or empty array if name not found under primary_key.
55 56 57 58 59 60 61 |
# File 'lib/openc3/models/activity_model.rb', line 55 def self.get(name:, start:, stop:, scope:, limit: 100) if start > stop raise ActivityInputError.new "start: #{start} must be <= stop: #{stop}" end array = Store.zrangebyscore("#{scope}#{PRIMARY_KEY}__#{name}", start, stop, :limit => [0, limit]) return array.map { |value| JSON.parse(value, allow_nan: true, create_additions: true) } end |
.range_destroy(name:, scope:, min:, max:) ⇒ Integer
Remove members from min to max of the sorted set.
143 144 145 146 147 148 149 150 151 152 153 154 |
# File 'lib/openc3/models/activity_model.rb', line 143 def self.range_destroy(name:, scope:, min:, max:) result = Store.zremrangebyscore("#{scope}#{PRIMARY_KEY}__#{name}", min, max) notification = { # start / stop to match SortedModel 'data' => JSON.generate({'start' => min, 'stop' => max}, allow_nan: true), 'kind' => 'deleted', 'type' => 'activity', 'timeline' => name } TimelineTopic.write_activity(notification, scope: scope) return result end |
.score(name:, score:, scope:, uuid: nil) ⇒ String|nil
Returns String of the saved json or nil if score not found under primary_key.
70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/openc3/models/activity_model.rb', line 70 def self.score(name:, score:, scope:, uuid: nil) values = Store.zrangebyscore("#{scope}#{PRIMARY_KEY}__#{name}", score, score) if values and values.length > 0 if uuid values.each do |value| activity = ActivityModel.from_json(value, name: name, scope: scope) return activity if activity.uuid == uuid end else return ActivityModel.from_json(values[0], name: name, scope: scope) end end return nil end |
Instance Method Details
#add_event(status:, username: nil, changes: nil) ⇒ Object
add_event will make an event. This will NOT save the object to the redis database
426 427 428 429 430 431 432 433 434 |
# File 'lib/openc3/models/activity_model.rb', line 426 def add_event(status:, username: nil, changes: nil) event = { 'time' => Time.now.to_i, 'event' => status } event['username'] = username if username event['changes'] = changes if changes @events << event end |
#as_json(*a) ⇒ Hash
Returns generated from the ActivityModel.
461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 |
# File 'lib/openc3/models/activity_model.rb', line 461 def as_json(*a) { 'name' => @name, 'updated_at' => @updated_at, 'start' => @start, 'stop' => @stop, 'kind' => @kind, 'data' => @data.as_json(*a), 'scope' => @scope, 'fulfillment' => @fulfillment, 'uuid' => @uuid, 'events' => @events, 'recurring' => @recurring.as_json(*a) } end |
#commit(status:, message: nil, fulfillment: nil, timestamp: nil) ⇒ Object
commit will make an event and save the object to the redis database
399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 |
# File 'lib/openc3/models/activity_model.rb', line 399 def commit(status:, message: nil, fulfillment: nil, timestamp: nil) event = { 'time' => ? .to_i : Time.now.to_i, 'event' => status, 'commit' => true } event['message'] = unless .nil? @fulfillment = fulfillment.nil? ? @fulfillment : fulfillment @events << event json = Store.zrangebyscore(@primary_key, @start, @start) parsed = json.map { |value| JSON.parse(value, allow_nan: true, create_additions: true) } parsed.each_with_index do |value, index| if value['uuid'] == @uuid Store.multi do |multi| multi.zrem(@primary_key, json[index]) multi.zadd(@primary_key, @start, JSON.generate(self.as_json, allow_nan: true)) end end end notify(kind: 'event') end |
#create(overlap: true, username: nil) ⇒ Object
Update the Redis hash at primary_key and set the score equal to the start Epoch time the member is set to the JSON generated via calling as_json
270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 |
# File 'lib/openc3/models/activity_model.rb', line 270 def create(overlap: true, username: nil) # Validate that the timeline exists in this scope before creating activities. # Activities must be attached to an existing timeline within the same scope. unless TimelineModel.get(name: @name, scope: @scope) raise ActivityError.new "timeline '#{@name}' does not exist in scope '#{@scope}'" end if @recurring['end'] and @recurring['frequency'] and @recurring['span'] # First validate the initial recurring activity ... all others are just offsets validate_input(start: @start, stop: @stop, kind: @kind, data: @data) # Create a uuid for deleting related recurring in the future @recurring['uuid'] = SecureRandom.uuid @recurring['start'] = @start duration = @stop - @start recurrence = 0 case @recurring['span'] when 'minutes' recurrence = @recurring['frequency'].to_i * 60 when 'hours' recurrence = @recurring['frequency'].to_i * 3600 when 'days' recurrence = @recurring['frequency'].to_i * 86400 end unless overlap # Get all the existing events in the recurring time range as well as those before # the start of the recurring time range to ensure we don't start inside an existing event existing = Store.zrevrangebyscore(@primary_key, @recurring['end'] - 1, @recurring['start'] - MAX_DURATION) existing.map! {|value| JSON.parse(value, allow_nan: true, create_additions: true) } end last_stop = nil # Update @updated_at and add an event assuming it all completes ok @updated_at = Time.now.to_nsec_from_epoch add_event(status: 'created', username: username) Store.multi do |multi| (@start..@recurring['end']).step(recurrence).each do |start_time| @start = start_time @stop = start_time + duration @uuid = SecureRandom.uuid if last_stop and @start < last_stop @events.pop # Remove previously created event raise ActivityOverlapError.new "Recurring activity overlap. Increase recurrence delta or decrease activity duration." end unless overlap existing.each do |value| if (@start >= value['start'] and @start < value['stop']) || (@stop > value['start'] and @stop <= value['stop']) @events.pop # Remove previously created event raise ActivityOverlapError.new "activity overlaps existing at #{value['start']}" end end end multi.zadd(@primary_key, @start, JSON.generate(self.as_json, allow_nan: true)) last_stop = @stop end end notify(kind: 'created') else validate_input(start: @start, stop: @stop, kind: @kind, data: @data) unless overlap # If we don't allow overlap we need to validate the time collision = validate_time(@start, @stop) unless collision.nil? raise ActivityOverlapError.new "activity overlaps existing at #{collision}" end end @updated_at = Time.now.to_nsec_from_epoch add_event(status: 'created', username: username) Store.zadd(@primary_key, @start, JSON.generate(self.as_json, allow_nan: true)) notify(kind: 'created') end end |
#diff_field(changes, field, old_val, new_val) ⇒ Object
436 437 438 |
# File 'lib/openc3/models/activity_model.rb', line 436 def diff_field(changes, field, old_val, new_val) changes[field] = {'old' => old_val, 'new' => new_val} unless old_val == new_val end |
#notify(kind:, extra: nil) ⇒ Object
update the redis stream / timeline topic that something has changed
441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 |
# File 'lib/openc3/models/activity_model.rb', line 441 def notify(kind:, extra: nil) notification = { 'data' => JSON.generate(as_json, allow_nan: true), 'kind' => kind, 'type' => 'activity', 'timeline' => @name } if extra extra.each do |key, value| notification[key.to_s] = value end end begin TimelineTopic.write_activity(notification, scope: @scope) rescue StandardError => e raise ActivityError.new "Failed to write to stream: #{notification}, #{e}" end end |
#set_input(start:, stop:, kind: nil, data: nil, uuid: nil, events: nil, fulfillment: nil, recurring: nil) ⇒ Object
Set the values of the instance, @start, @kind, @data, @events…
255 256 257 258 259 260 261 262 263 264 265 266 |
# File 'lib/openc3/models/activity_model.rb', line 255 def set_input(start:, stop:, kind: nil, data: nil, uuid: nil, events: nil, fulfillment: nil, recurring: nil) kind = kind.to_s.downcase validate_input(start: start, stop: stop, kind: kind, data: data) @start = start @stop = stop @fulfillment = fulfillment.nil? ? false : fulfillment @kind = kind @data = data.nil? ? @data : data @uuid = uuid.nil? ? SecureRandom.uuid : uuid @events = events.nil? ? Array.new : events @recurring = recurring.nil? ? @recurring : recurring end |
#update(start:, stop:, kind:, data:, overlap: true, username: nil) ⇒ Object
Update the Redis hash at primary_key and remove the current activity at the current score and update the score to the new score equal to the start Epoch time this uses a multi to execute both the remove and create.
350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 |
# File 'lib/openc3/models/activity_model.rb', line 350 def update(start:, stop:, kind:, data:, overlap: true, username: nil) array = Store.zrangebyscore(@primary_key, @start, @start) if array.length == 0 raise ActivityError.new "failed to find activity at: #{@start}" end old_start = @start old_uuid = @uuid unless overlap # If we don't allow overlap we need to validate the time collision = validate_time(start, stop, ignore_score: old_start) unless collision.nil? raise ActivityOverlapError.new "failed to update #{old_start}, no activities can overlap, collision: #{collision}" end end # Compute changeset for audit trail before applying changes changes = {} diff_field(changes, 'start', @start, start) diff_field(changes, 'stop', @stop, stop) diff_field(changes, 'kind', @kind, kind) old_data = @data.reject { |k, _| k == 'username' } new_data = data.reject { |k, _| k == 'username' } (old_data.keys | new_data.keys).each do |key| diff_field(changes, "data.#{key}", old_data[key], new_data[key]) end set_input(start: start, stop: stop, kind: kind, data: data, events: @events) @updated_at = Time.now.to_nsec_from_epoch add_event(status: 'updated', username: username, changes: changes.empty? ? nil : changes) json = Store.zrangebyscore(@primary_key, old_start, old_start) parsed = json.map { |value| JSON.parse(value, allow_nan: true, create_additions: true) } parsed.each_with_index do |value, index| if value['uuid'] == old_uuid Store.multi do |multi| multi.zrem(@primary_key, json[index]) multi.zadd(@primary_key, @start, JSON.generate(self.as_json, allow_nan: true)) end end end notify(kind: 'updated', extra: {old_start: old_start, old_uuid: old_uuid}) return @start end |
#validate_input(start:, stop:, kind:, data:) ⇒ Object
validate the input to the rules we have created for timelines.
-
A task’s start MUST NOT be more than START_GRACE_SECONDS in the past.
-
A task’s start MUST be before the stop.
-
A task CAN NOT be longer than MAX_DURATION (86400) in seconds.
-
A task MUST have a kind.
-
A task MUST have a data object/hash.
226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 |
# File 'lib/openc3/models/activity_model.rb', line 226 def validate_input(start:, stop:, kind:, data:) begin DateTime.strptime(start.to_s, '%s') DateTime.strptime(stop.to_s, '%s') rescue Date::Error raise ActivityInputError.new "start and stop must be seconds: #{start}, #{stop}" end now_f = Time.now.to_f begin duration = stop - start rescue NoMethodError raise ActivityInputError.new "start and stop must be seconds: #{start}, #{stop}" end if now_f >= start + START_GRACE_SECONDS and kind != 'expire' raise ActivityInputError.new "activity must not be more than #{START_GRACE_SECONDS} seconds in the past, current_time: #{now_f} vs #{start}" elsif duration > MAX_DURATION and kind != 'expire' raise ActivityInputError.new "activity can not be longer than #{MAX_DURATION} seconds" elsif duration <= 0 raise ActivityInputError.new "start: #{start} must be before stop: #{stop}" elsif !VALID_KINDS.include?(kind) raise ActivityInputError.new "unknown kind: #{kind}, must be one of #{VALID_KINDS.join(', ')}" elsif data.nil? raise ActivityInputError.new "data must not be nil: #{data}" elsif data.is_a?(Hash) == false raise ActivityInputError.new "data must be a json object/hash: #{data}" end end |
#validate_time(start, stop, ignore_score: nil) ⇒ Object
validate_time searches from the current activity @stop (exclusive because we allow overlap of stop with start) back through @start - MAX_DURATION. The method is trying to validate that this new activity does not overlap with anything else. The reason we search back past @start through MAX_DURATION is because we need to return all the activities that may start before us and verify that we don’t overlap them. Activities are only inserted by @start time so we need to go back to verify we don’t overlap existing @stop. Note: Score is the Seconds since the Unix Epoch: (%s) Number of seconds since 1970-01-01 00:00:00 UTC. zrange rev byscore finds activities from in reverse order so the first task is the closest task to the current score. In this case a parameter ignore_score allows the request to ignore that time and skip to the next time but if nothing is found in the time range we can return nil.
204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 |
# File 'lib/openc3/models/activity_model.rb', line 204 def validate_time(start, stop, ignore_score: nil) # Adding a '(' makes the max value exclusive array = Store.zrevrangebyscore(@primary_key, "(#{stop}", start - MAX_DURATION) array.each do |value| activity = JSON.parse(value, allow_nan: true, create_additions: true) if ignore_score == activity['start'] next elsif activity['stop'] > start return activity['start'] else return nil end end return nil end |