Class: GtfsDf::Feed

Inherits:
Object
  • Object
show all
Defined in:
lib/gtfs_df/feed.rb

Constant Summary collapse

# All GTFS file names (without the .txt extension) that a Feed can carry;
# initialize/by_dataframe_name iterate this list to read and expose one
# DataFrame attribute per file.
GTFS_FILES =
%w[
  agency
  stops
  routes
  trips
  stop_times
  calendar
  calendar_dates
  pathways
  levels
  feed_info
  shapes
  frequencies
  transfers
  fare_attributes
  fare_rules
  attributions
  translations
  stop_areas
  stop_attributes
  rider_categories
  fare_media
  fare_products
  fare_leg_rules
  fare_leg_join_rules
  fare_transfer_rules
  areas
  networks
  route_networks
  location_groups
  location_group_stops
  booking_rules
].freeze
REQUIRED_GTFS_FILES =
%w[agency stops routes trips stop_times].freeze

Initialize with a hash of DataFrames

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(data = {}, parse_times: false) ⇒ Feed

Returns a new instance of Feed.

Parameters:

  • data (Hash) (defaults to: {})

    Hash of DataFrames for each GTFS file

  • parse_times (Boolean) (defaults to: false)

    Whether to parse time fields to seconds since midnight (default: false)



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/gtfs_df/feed.rb', line 48

# Builds a feed from a hash of Polars DataFrames keyed by GTFS file name
# (without the .txt extension). Each known file is normalized through its
# schema class (when one exists) and stored in an instance variable.
#
# @param data [Hash] Hash of DataFrames for each GTFS file
# @param parse_times [Boolean] Whether to parse time fields to seconds
#   since midnight (default: false)
# @raise [GtfsDf::Error] if any required GTFS file is missing
def initialize(data = {}, parse_times: false)
  @parse_times = parse_times

  missing = REQUIRED_GTFS_FILES.reject { |file| data[file].is_a?(Polars::DataFrame) }
  # At least one of calendar or calendar_dates must be present
  unless data["calendar"].is_a?(Polars::DataFrame) || data["calendar_dates"].is_a?(Polars::DataFrame)
    missing << "calendar.txt or calendar_dates.txt"
  end
  unless missing.empty?
    raise GtfsDf::Error, "Missing required GTFS files: #{missing.map do |f|
      f.end_with?(".txt") ? f : f + ".txt"
    end.join(", ")}"
  end

  @graph = GtfsDf::Graph.build

  GTFS_FILES.each do |file|
    df = data[file]
    # e.g. "stop_times" -> "StopTimes"; not every file has a schema class.
    schema_class_name = file.split("_").map(&:capitalize).join
    schema_class = begin
      GtfsDf::Schema.const_get(schema_class_name)
    rescue NameError
      # Only swallow the missing-constant case; the previous bare rescue
      # would also hide unrelated errors raised while resolving the class.
      nil
    end
    if df.is_a?(Polars::DataFrame) && schema_class && schema_class.const_defined?(:SCHEMA)
      df = schema_class.new(df).df
      # Parse time fields if enabled and they're still strings
      if @parse_times && schema_class.respond_to?(:time_fields)
        time_fields = schema_class.time_fields
        time_fields.each do |field|
          next unless df.columns.include?(field)
          # Only parse if the field is still a string (not already parsed)
          if df[field].dtype == Polars::String
            df = df.with_columns(
              GtfsDf::Utils.as_seconds_since_midnight(field)
            )
          end
        end
      end
    end
    # Unknown/absent files are stored as nil so readers stay safe to call.
    instance_variable_set("@#{file}", df.is_a?(Polars::DataFrame) ? df : nil)
  end
end

Instance Attribute Details

#graphObject (readonly)

Returns the value of attribute graph.



41
42
43
# File 'lib/gtfs_df/feed.rb', line 41

# The dependency graph built during initialization (read-only).
def graph
  instance_variable_get(:@graph)
end

#parse_timesObject

Returns the value of attribute parse_times.



40
41
42
# File 'lib/gtfs_df/feed.rb', line 40

# Whether time fields were parsed to seconds since midnight at load time.
def parse_times
  instance_variable_get(:@parse_times)
end

Instance Method Details

#[](file_name) ⇒ dataframe

Utility method for getting a dataframe, e.g. feed["stops"]

Parameters:

  • file_name (String)

    the GTFS file name (without the .txt extension)

Returns:

  • (dataframe)


165
166
167
# File 'lib/gtfs_df/feed.rb', line 165

# Hash-style reader: feed["stops"] delegates to the matching attribute
# reader for that GTFS file.
def [](file_name)
  __send__(file_name)
end

#[]=(file_name, value) ⇒ Object

Utility method for setting a dataframe, e.g. feed["stops"] = new_dataframe

Parameters:

  • file_name (String)

    the GTFS file name (without the .txt extension)



173
174
175
# File 'lib/gtfs_df/feed.rb', line 173

# Hash-style writer: feed["stops"] = df delegates to the matching
# attribute writer for that GTFS file.
def []=(file_name, value)
  __send__("#{file_name}=", value)
end

#busiest_weekDate

Identifies the start date of the busiest week in the feed by trip count.

Returns:

  • (Date)

    The Monday of the busiest week



321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
# File 'lib/gtfs_df/feed.rb', line 321

# Finds the Monday of the week carrying the most trips.
# Returns nil when the feed yields no dated trip counts.
def busiest_week
  per_day = trip_count_dates
  return nil unless per_day && per_day.height.positive?

  # Bucket daily counts into ISO weeks (truncate snaps to Monday).
  per_week = per_day
    .with_columns(Polars.col("date").dt.truncate("1w").alias("week_start"))
    .group_by("week_start")
    .agg(Polars.col("count").sum.alias("total_trips"))

  # Highest total first; earliest week wins a tie.
  top = per_week
    .sort(["total_trips", "week_start"], descending: [true, false])
    .head(1)

  top.height.zero? ? nil : top["week_start"][0]
end

#by_dataframe_name{file_name => dataframe}

Utility method that returns a hash of dataframes by file name

Returns:

  • ({file_name => dataframe})


154
155
156
157
158
159
# File 'lib/gtfs_df/feed.rb', line 154

# Builds a hash of the loaded dataframes keyed by GTFS file name,
# skipping files that were not present in the feed.
def by_dataframe_name
  result = {}
  GTFS_FILES.each do |file|
    df = send(file)
    result[file] = df if df
  end
  result
end

#filter(view, filter_only_children: false) ⇒ Object

Filter the feed using a view hash

Parameters:

  • view (Hash)

    The view used to filter the feed, with format { file => filters }. Example view: { ‘routes’ => { ‘route_id’ => ‘123’ }, ‘trips’ => { ‘service_id’ => ‘A’ } }

  • filter_only_children (Boolean) (defaults to: false)

    Whether only child dependencies should be pruned. When false, we:

    • Treat trips as the atomic unit of GTFS. Therefore, if we filter to one stop referenced by TripA, we will preserve _all stops_ referenced by TripA.

    • Prune unreferenced parent objects (e.g. route is a parent of trip. Unreferenced routes will be pruned.)

    When true we:

    • Do not treat trips as atomic. I can filter stopA without maintaining other stops for trips that reference it.

    • Only filter child objects



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/gtfs_df/feed.rb', line 106

# Filters the feed with a { file => filters } view and returns a NEW Feed;
# the receiver is not mutated. See the method docs above for the semantics
# of filter_only_children.
def filter(view, filter_only_children: false)
  filtered = {}

  # Start from a snapshot of every loaded dataframe.
  GTFS_FILES.each do |file|
    df = send(file)
    next unless df

    filtered[file] = df
  end

  if filter_only_children
    # Apply each file's filters independently, pruning children only.
    view.each do |file, filters|
      filtered = filter!(file, filters, filtered.dup, filter_only_children: true)
    end
  else
    # Trips are the atomic unit of GTFS, we will generate a new view
    # based on the set of trips that would be included for each individual filter
    # and cascade changes from this view in order to retain referential integrity
    trip_ids = Polars::Series.new.alias("trip_id")

    view.each do |file, filters|
      new_filtered = filter!(file, filters, filtered.dup)
      # First filter seeds the trip set; later filters intersect with it.
      trip_ids = if trip_ids.empty?
        new_filtered["trips"]["trip_id"]
      else
        trip_ids.filter(trip_ids.is_in(new_filtered["trips"]["trip_id"].implode))
      end
    end

    # NOTE(review): a Polars::Series is always truthy, so this guard never
    # skips the re-filter — even when `view` was empty and trip_ids is the
    # initial empty series (which would then drop every trip). Confirm
    # whether an emptiness check was intended here.
    if trip_ids
      filtered = filter!("trips", {"trip_id" => trip_ids}, filtered.dup)
    end
  end

  # Remove files that are empty, but keep required files even if empty
  # (&& binds tighter than ||, so each calendar clause groups as intended).
  filtered.delete_if do |file, df|
    is_required_file = REQUIRED_GTFS_FILES.include?(file) ||
      file == "calendar" && !filtered["calendar_dates"] ||
      file == "calendar_dates" && !filtered["calendar"]

    (!df || df.height == 0) && !is_required_file
  end
  self.class.new(filtered, parse_times: @parse_times)
end

#frequency_based_trip_countsPolars::DataFrame

Returns a DataFrame of trip counts from the frequencies table Columns: [trip_id, freq_count]

Returns:

  • (Polars::DataFrame)


290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
# File 'lib/gtfs_df/feed.rb', line 290

# Computes per-trip counts implied by the frequencies table.
# Returns a DataFrame with columns [trip_id, freq_count]; when the feed
# has no frequencies table, an empty correctly-typed frame is returned.
def frequency_based_trip_counts
  # When the feed was loaded with parse_times, these columns already hold
  # seconds since midnight; otherwise convert them on the fly so we can
  # take a duration in seconds.
  if @parse_times
    end_secs = Polars.col("end_time")
    start_secs = Polars.col("start_time")
  else
    end_secs = GtfsDf::Utils.as_seconds_since_midnight("end_time")
    start_secs = GtfsDf::Utils.as_seconds_since_midnight("start_time")
  end

  window = (end_secs - start_secs).alias("duration_seconds")
  freq_count = (window / Polars.col("headway_secs")).floor.sum.alias("freq_count")

  if @frequencies
    return @frequencies.group_by("trip_id").agg(freq_count).select("trip_id", "freq_count")
  end

  # frequencies.txt is optional; defaulting to an empty frame removes
  # friction in the join with trips.
  Polars::DataFrame.new(
    {"trip_id" => [], "freq_count" => []},
    schema: {"trip_id" => Polars::String, "freq_count" => Polars::Float64}
  )
end

#service_datesPolars::DataFrame

Returns a DataFrame of all service_id/date pairs active in the feed. Columns: [date, service_id]

Returns:

  • (Polars::DataFrame)


181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
# File 'lib/gtfs_df/feed.rb', line 181

# Expands calendar.txt date ranges and applies calendar_dates.txt
# exceptions, yielding every (date, service_id) pair active in the feed.
# Returns nil when neither calendar nor calendar_dates is loaded.
def service_dates
  start_date_col = Polars.col("start_date")
  end_date_col = Polars.col("end_date")
  date_col = Polars.col("date")

  # Parse the GTFS date strings up front; both tables are optional.
  calendar_df = @calendar&.with_columns(
    GtfsDf::Utils.parse_date(start_date_col),
    GtfsDf::Utils.parse_date(end_date_col)
  )

  calendar_dates_df = @calendar_dates&.with_columns(
    GtfsDf::Utils.parse_date(date_col)
  )

  # Expand calendar to a range of (service_id, date)
  services_by_date = nil
  if calendar_df
    # One row per day in each service's [start_date, end_date] range.
    expanded = calendar_df.with_columns(
      Polars.date_ranges(start_date_col, end_date_col, "1d").alias("date")
    ).explode("date")

    dow_col_names = [
      "monday",
      "tuesday",
      "wednesday",
      "thursday",
      "friday",
      "saturday",
      "sunday"
    ]

    # Each day in the calendar table defines if a day of the week has service or not
    # 1 - Service is available for all Mondays in the date range.
    # 0 - Service is not available for Mondays in the date range.
    # https://gtfs.org/documentation/schedule/reference/#calendartxt
    #
    # This filter will be applied to the expanded calendar dates, where the
    # ranges become rows of individual dates, we need to ensure that each
    # individual date matches the day of the week (DOW) before we check if
    # it's enabled.
    filter_expr = dow_col_names.each_with_index.reduce(Polars.lit(false)) do |expr, (dow_col_name, idx)|
      # Polars weekday: Monday=1, Sunday=7
      expr | ((Polars.col("date").dt.weekday == (idx + 1)) & (Polars.col(dow_col_name) == "1"))
    end

    services_by_date = expanded.filter(filter_expr).select("date", "service_id")
  end

  # Apply calendar_dates exceptions
  if calendar_dates_df
    exception_type_col = Polars.col("exception_type")

    # exception_type 1 = service added on this date, 2 = service removed.
    additions = calendar_dates_df
      .filter(exception_type_col == "1")
      .select("date", "service_id")

    subtractions = calendar_dates_df
      .filter(exception_type_col == "2")
      .select("date", "service_id")

    services_by_date = if services_by_date
      # If we found service dates from the calendar table, we need to first
      # add the inclusions, then remove the exceptions coming from the calendar_dates
      services_by_date
        .vstack(additions).unique
        .join(subtractions, on: ["service_id", "date"], how: "anti")
    else
      # Otherwise, we can just use the additions as the new services_by_date
      additions.unique
    end
  end

  services_by_date
end

#trip_count_datesPolars::DataFrame

Returns a DataFrame of trip counts per date. Columns: [date, count]

Returns:

  • (Polars::DataFrame)


260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
# File 'lib/gtfs_df/feed.rb', line 260

# Totals trips per active service date.
# Returns a DataFrame with columns [date, count], or nil when the feed
# has no service dates at all.
def trip_count_dates
  dates = service_dates
  return nil unless dates && dates.height.positive?

  # Each trip counts as 1 unless the frequencies table says it expands to
  # more departures; naming the expression keeps the join below tidy.
  size_expr = Polars.coalesce("freq_count", Polars.lit(1)).alias("trip_size")

  # Trips per service_id, weighted by their frequency-based size.
  per_service = @trips
    .join(frequency_based_trip_counts, on: "trip_id", how: "left")
    .group_by("service_id")
    .agg(size_expr.sum.alias("trip_count"))

  # Attach counts to each active date (0 for services with no trips),
  # then sum per date.
  dates
    .join(per_service, on: "service_id", how: "left")
    .with_columns(Polars.col("trip_count").fill_null(0))
    .group_by("date")
    .agg(Polars.col("trip_count").sum.alias("count"))
end