Class: RepoTender::Sync::Engine

Inherits:
Object
Defined in:
lib/repo_tender/sync/engine.rb

Overview

The sync engine: a single run that brings every tracked repo to the evergreen invariant (PRD §3.3). Observation (RepoPlan) is split from execution (this class). In-flight work is bounded by an Async::Semaphore sized by config.concurrency, inside one Sync{} block. A single repo's Failure does NOT abort the run: it is captured, the state row is written with status: error, and the run continues.
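The failure-isolation rule described above can be sketched in plain Ruby. This is a minimal illustration under stated assumptions, not the engine's code: `sync_all`, the `fetch` lambda, the `:fresh` status, and the row hash shape are hypothetical; only `status: error` and `last_error` come from the text.

```ruby
# Hypothetical sketch: one failing repo is recorded and the loop keeps going.
# `fetch` stands in for the real per-repo work and returns either
# [:ok, sha] or [:error, message].
def sync_all(repos, fetch)
  repos.each_with_object({}) do |name, rows|
    kind, value = fetch.call(name)
    rows[name] =
      if kind == :ok
        { status: :fresh, head: value }          # assumed success row shape
      else
        { status: :error, last_error: value }    # failure is captured, not raised
      end
  end
end

fetch = lambda do |name|
  name == "broken" ? [:error, "remote unreachable"] : [:ok, "abc123"]
end

rows = sync_all(%w[alpha broken omega], fetch)
rows.each { |name, row| puts "#{name}: #{row[:status]}" }
```

Every repo ends up with a row, including the ones processed after the failure, which is the property the overview calls out.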

Slice 2 gate wiring:

G7  — Async::Semaphore(concurrency) bounds the in-flight count.
G8  — every processed repo gets a state row; failures are
      recorded with status: error + last_error.
G9  — second run on a fresh set performs no network calls
      (the :skip_fresh plan short-circuits the SCM.fetch).
G10 — OrgRef expansion via the injected forge; an org-list
      Failure is recorded and does not abort the run.
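
The G7 bound can be illustrated without the async gem. The sketch below swaps in stdlib threads and a `SizedQueue` used as a counting semaphore; it is a stand-in for `Async::Semaphore`, not how the engine does it. `BoundedGate` and `run_bounded` are hypothetical names.

```ruby
# Stand-in for Async::Semaphore built on stdlib threads (an assumption for
# illustration; the engine itself runs fibers via the async gem).
class BoundedGate
  def initialize(limit)
    @slots = SizedQueue.new(limit) # push blocks once `limit` slots are taken
  end

  def acquire
    @slots.push(true)
    begin
      yield
    ensure
      @slots.pop # free the slot even if the block raises
    end
  end
end

# Runs one thread per item but lets at most `limit` of them work at once.
# Returns the processed items and the highest in-flight count observed.
def run_bounded(items, limit)
  gate = BoundedGate.new(limit)
  lock = Mutex.new
  in_flight = 0
  peak = 0
  results = []

  threads = items.map do |item|
    Thread.new do
      gate.acquire do
        lock.synchronize do
          in_flight += 1
          peak = [peak, in_flight].max
        end
        sleep 0.01 # simulated network call
        lock.synchronize do
          in_flight -= 1
          results << item
        end
      end
    end
  end
  threads.each(&:join)
  [results, peak]
end

results, peak = run_bounded((1..8).to_a, 3)
puts "processed=#{results.size} peak_in_flight=#{peak}"
```

The observed peak never exceeds the limit, while all eight items still complete.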

sync-startup gate wiring (GS1–GS6):

GS1 — expand_orgs fans org listings out concurrently via
      Async::Barrier + Async::Semaphore(config.concurrency).
GS2 — forge.check_authenticated called ONCE before fan-out;
      on auth Failure all orgs recorded failed, no list_org.
GS3 — CF3 / G10 resilience preserved (per-org failure isolated,
      prior repo_count/last_listed_at preserved on failure).
GS4 — reporter events: attach → listing_started → org_listed*
      → listing_finished → run_started → repo* → run_finished
      → detach.
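
One way to check the GS4 ordering in a test is a recording double plus a pattern over the event names. This is a hedged sketch: `RecordingReporter` and `gs4_order?` are hypothetical, the event names are taken literally from the list above (`repo` in particular may be shorthand for a differently named method), and argument shapes are ignored because the text does not specify them.

```ruby
# Hypothetical test double: records the name of every reporter event it
# receives and discards the arguments.
class RecordingReporter
  EVENTS = %i[
    attach listing_started org_listed listing_finished
    run_started repo run_finished detach
  ].freeze

  attr_reader :events

  def initialize
    @events = []
  end

  EVENTS.each do |event|
    define_method(event) do |*_args|
      @events << event
      nil
    end
  end
end

# GS4 order as a pattern: org_listed and repo may each occur zero or more times.
GS4_ORDER = /\Aattach listing_started( org_listed)* listing_finished run_started( repo)* run_finished detach\z/

def gs4_order?(events)
  GS4_ORDER.match?(events.join(" "))
end

reporter = RecordingReporter.new
reporter.attach(:task)
reporter.listing_started
reporter.org_listed("acme")
reporter.listing_finished
reporter.run_started(total: 2)
reporter.repo("acme/a")
reporter.repo("acme/b")
reporter.run_finished({ "fresh" => 2 })
reporter.detach

puts gs4_order?(reporter.events)
```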

Constant Summary

DEFAULT_URL_BUILDER =

Default clone URL: scp-like SSH form `git@<host>:<owner>/<name>.git`. SSH uses the user's configured SSH keys (by default `~/.ssh/id_rsa`, or whatever `~/.ssh/config` resolves), with no interactive `Username for 'github.com':` prompt. That prompt was the field defect Slice 6 fixed: the previous HTTPS default made a missing-repo clone prompt for credentials. This is the seam the Slice 2 disagreement-#6 ruling anticipated (“legit future seam (ssh/token)”). No new config field is added in this slice; the transport flip is on the default builder only. Tests can still inject a different builder (e.g. file:// for a local bare remote in the G6 missing-path test).

->(ref) { "git@#{ref.host}:#{ref.owner}/#{ref.name}.git" }.freeze
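
A short usage sketch of the builder seam, assuming only that a ref responds to `host`, `owner`, and `name`. `RepoRef` and `local_builder` are hypothetical stand-ins (the `/tmp/remotes` path is invented for illustration); the SSH lambda body is copied from the constant.

```ruby
# RepoRef is a hypothetical stand-in for the engine's ref object; the builder
# only needs host, owner, and name.
RepoRef = Struct.new(:host, :owner, :name, keyword_init: true)

# Same lambda body as DEFAULT_URL_BUILDER.
ssh_builder = ->(ref) { "git@#{ref.host}:#{ref.owner}/#{ref.name}.git" }

# Hypothetical test builder pointing at a local bare remote.
local_builder = ->(ref) { "file:///tmp/remotes/#{ref.owner}/#{ref.name}.git" }

ref = RepoRef.new(host: "github.com", owner: "acme", name: "widgets")
puts ssh_builder.call(ref)   # git@github.com:acme/widgets.git
puts local_builder.call(ref) # file:///tmp/remotes/acme/widgets.git
```

Either lambda could be passed as `url_builder:` to the constructor, since the engine only calls it with a ref.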

Instance Method Summary

Constructor Details

#initialize(scm: SCM::Git.new, forge: Forge::GitHub.new, clock: -> { Time.now }, url_builder: DEFAULT_URL_BUILDER, reporter: RepoTender::UI::NullReporter.new) ⇒ Engine

Returns a new instance of Engine.



# File 'lib/repo_tender/sync/engine.rb', line 69

def initialize(scm: SCM::Git.new, forge: Forge::GitHub.new,
  clock: -> { Time.now }, url_builder: DEFAULT_URL_BUILDER,
  reporter: RepoTender::UI::NullReporter.new)
  @scm = scm
  @forge = forge
  @clock = clock
  @url_builder = url_builder
  @reporter = reporter
end

Instance Method Details

#call(config:, paths:) ⇒ Dry::Monads::Result<State::Store::State>

Runs one sync pass.

Parameters:

  • config (Config::Config)

    the validated config struct

  • paths (Paths)

    the XDG paths object

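Returns:

  • (Dry::Monads::Result<State::Store::State>)

    Success wrapping the newly written state, or the Failure from the state write; when another sync holds the lock, Success wrapping the state as loaded from disk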



# File 'lib/repo_tender/sync/engine.rb', line 83

def call(config:, paths:)
  Sync do |task|
    semaphore = Async::Semaphore.new(config.concurrency, parent: task)
    barrier = Async::Barrier.new

    # Acquire an exclusive advisory lock on the sidecar lockfile
    # BEFORE loading state. Held across the entire load→write span
    # so an overlapping run never writes and cannot clobber the
    # in-flight run's data (CF10). Non-blocking: if another run
    # holds the lock we bail cleanly rather than blocking forever
    # (a blocked launchd tick would pile up on every subsequent
    # StartInterval fire).
    lock_result = State::Lock.acquire(paths.state_file) do
      # State is loaded once at the start (or initialized empty for
      # a missing state.yaml). A new State object is built from
      # the run's outcomes and written atomically at the end. Per-
      # repo state rows that did not change are preserved.
      state = State::Store.load(paths.state_file).success
      now = @clock.call

      # Attach the reporter before expansion so the render fiber is
      # alive during listing (GS4: attach fires before listing_started).
      @reporter.attach(task)
      begin
        # Phase 1: org expansion (concurrent; per-org failures isolated).
        # CF3 (Slice 4 Lane 02) passes the prev state's org map so an
        # org-list Failure can preserve the prior good repo_count +
        # last_listed_at instead of clobbering with 0/nil.
        org_records, discovered_repos = expand_orgs(config, now, prev_orgs: state.orgs, task: task, semaphore: semaphore)

        # Phase 2: dedupe explicit + discovered repos by (host, owner,
        # name); explicit wins.
        repos_to_process = dedupe(config.repos, discovered_repos)

        @reporter.run_started(total: repos_to_process.size)

        # Phase 3: fan out per-repo work through barrier + semaphore.
        # Results are gathered in a mutex-protected array (barrier
        # tasks run on a Fiber scheduler; shared mutation must be
        # serialized). Each result is a [key, Repo | nil, error] tuple
        # (see process_one for the shape).
        results_mutex = Mutex.new
        results = []

        repos_to_process.each do |repo_ref|
          barrier.async do
            # `semaphore.async` spawns a child task and returns its
            # Task handle. The barrier only tracks this outer task;
            # if we don't `.wait` on the inner task, `barrier.wait`
            # would return before the per-repo work finishes and
            # `build_new_state` would see an empty results array.
            inner = semaphore.async do
              outcome = process_one(repo_ref, config, now)
              results_mutex.synchronize { results << outcome }
            end
            inner.wait
          end
        end
        barrier.wait

        summary = results.each_with_object(Hash.new(0)) do |outcome, h|
          _, repo, error = outcome
          h[error ? "error" : repo.status.to_s] += 1
        end
        @reporter.run_finished(summary)

        # Phase 4: assemble new state, write once.
        new_state = build_new_state(state, results, org_records)
        write_result = State::Store.write(paths.state_file, new_state)
        if write_result.failure?
          write_result
        else
          Dry::Monads::Success(new_state)
        end
      ensure
        @reporter.detach
      end
    end

    if lock_result == State::Lock::NOT_ACQUIRED
      warn "repo-tender: skipped — another sync in progress"
      Dry::Monads::Success(State::Store.load(paths.state_file).success)
    else
      lock_result
    end
  end
end
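
The non-blocking lock that `call` relies on could look roughly like the sketch below. `State::Lock`'s real implementation is not shown on this page, so everything here is an assumption: `SketchLock`, the `.lock` suffix for the sidecar file, and the symbol used for `NOT_ACQUIRED`. The only real API involved is `File#flock` with `File::LOCK_EX | File::LOCK_NB`, which returns `false` instead of waiting when another holder exists.

```ruby
require "tmpdir"

# Hypothetical sketch of a non-blocking advisory lock on a sidecar lockfile.
# NOT_ACQUIRED mirrors the sentinel the caller compares against.
module SketchLock
  NOT_ACQUIRED = :not_acquired

  def self.acquire(state_file)
    File.open("#{state_file}.lock", File::RDWR | File::CREAT, 0o644) do |f|
      # LOCK_NB makes flock return false rather than block on a held lock.
      return NOT_ACQUIRED unless f.flock(File::LOCK_EX | File::LOCK_NB)

      yield # the lock is released when the file is closed at block exit
    end
  end
end

result = Dir.mktmpdir do |dir|
  state_file = File.join(dir, "state.yaml")
  SketchLock.acquire(state_file) do
    # A second acquire while the first is held must bail, not wait.
    SketchLock.acquire(state_file) { :ran }
  end
end

puts result.inspect
```

Bailing out instead of waiting matches the rationale in the listing's comments: a blocked launchd tick would otherwise pile up behind the run that holds the lock.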