Class: RepoTender::Sync::Engine
- Inherits: Object
  - Object
  - RepoTender::Sync::Engine
- Defined in: lib/repo_tender/sync/engine.rb
Overview
The sync engine: one run that brings every tracked repo to the evergreen invariant (PRD §3.3). It splits observation (RepoPlan) from execution (this class). Concurrency is bounded by an Async::Semaphore sized to config.concurrency, inside one Sync{} block. A single repo’s Failure does NOT abort the run: it is captured, the state row is written with status: error, and the run continues.
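The failure-isolation rule above can be illustrated with a minimal sketch. It uses plain exceptions and a Struct in place of the engine's Result values and state rows, so `Row`, `sync_repo`, and `sync_all` are hypothetical names and not part of RepoTender.

```ruby
# Hypothetical stand-in for a state row; the real row type is not shown here.
Row = Struct.new(:name, :status, :last_error, keyword_init: true)

# Pretend per-repo step. One repo raises to simulate a per-repo Failure.
def sync_repo(name)
  raise "fetch failed" if name == "broken"

  :fresh
end

# Process every repo. A failure becomes an error row instead of
# aborting the loop, so later repos are still processed.
def sync_all(names)
  names.map do |name|
    status = sync_repo(name)
    Row.new(name: name, status: status, last_error: nil)
  rescue StandardError => e
    Row.new(name: name, status: :error, last_error: e.message)
  end
end
```

Calling `sync_all(%w[alpha broken gamma])` yields three rows; only the middle one carries `status: :error` and a `last_error` message.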
Slice 2 gate wiring:
G7 — Async::Semaphore(concurrency) bounds the in-flight count.
G8 — every processed repo gets a state row; failures are
recorded with status: error + last_error.
G9 — second run on a fresh set performs no network calls
(the :skip_fresh plan short-circuits the SCM.fetch).
G10 — OrgRef expansion via the injected forge; an org-list
Failure is recorded and does not abort the run.
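G7 bounds the in-flight count with Async::Semaphore. The sketch below shows the same bounding idea using only the standard library: threads plus a SizedQueue acting as a counting semaphore. It is a swapped-in technique for illustration, not the engine's Async code, and `bounded_map` and its parameters are hypothetical names.

```ruby
# Run the block for every item with at most `limit` executions in flight.
# Returns [results, peak], where peak is the highest in-flight count seen.
def bounded_map(items, limit, &work)
  slots = SizedQueue.new(limit) # one token per running task
  lock = Mutex.new
  in_flight = 0
  peak = 0
  results = []

  threads = items.map do |item|
    Thread.new do
      slots.push(:token) # blocks while `limit` tokens are already held
      begin
        lock.synchronize do
          in_flight += 1
          peak = [peak, in_flight].max
        end
        value = work.call(item)
        lock.synchronize { results << value }
      ensure
        lock.synchronize { in_flight -= 1 }
        slots.pop # release the slot for a waiting thread
      end
    end
  end

  threads.each(&:join)
  [results, peak]
end
```

Because the counter is incremented only after a slot is taken and decremented before it is released, `peak` can never exceed `limit`. Result order is not guaranteed.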
sync-startup gate wiring (GS1–GS6):
GS1 — expand_orgs fans org listings out concurrently via
Async::Barrier + Async::Semaphore(config.concurrency).
GS2 — forge.check_authenticated called ONCE before fan-out;
on auth Failure all orgs recorded failed, no list_org.
GS3 — CF3 / G10 resilience preserved (per-org failure isolated,
prior repo_count/last_listed_at preserved on failure).
GS4 — reporter events: attach → listing_started → org_listed*
→ listing_finished → run_started → repo* → run_finished
→ detach.
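The GS4 event order can be checked in a test with a recording double. This is a sketch under assumptions: only `attach`, `run_started`, `run_finished`, and `detach` appear in the source listing, so the other method names are taken from the GS4 line as written, `repo` is assumed to be the per-repo event name, and all argument shapes are ignored. `RecordingReporter`, `GS4_ORDER`, and `gs4_order?` are hypothetical.

```ruby
# Event names as listed under GS4; starred ones may repeat.
GS4_EVENTS = %i[attach listing_started org_listed listing_finished
                run_started repo run_finished detach].freeze

# Hypothetical test double: records the name of each event it receives
# and ignores whatever arguments are passed.
class RecordingReporter
  attr_reader :events

  def initialize
    @events = []
  end

  GS4_EVENTS.each do |name|
    define_method(name) do |*_args, **_opts|
      @events << name
      nil
    end
  end
end

# org_listed and repo may occur zero or more times; everything else once.
GS4_ORDER = /\Aattach listing_started (org_listed )*listing_finished run_started (repo )*run_finished detach\z/

def gs4_order?(events)
  GS4_ORDER.match?(events.join(" "))
end
```

A test would inject the double through the `reporter:` keyword of the constructor, run the engine, and then assert `gs4_order?(reporter.events)`.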
Constant Summary
- DEFAULT_URL_BUILDER =
Default clone URL: the scp-like SSH form `git@<host>:<owner>/<name>.git`. SSH uses the user's configured SSH keys (by default `~/.ssh/id_rsa`, or whatever `~/.ssh/config` resolves) with no interactive `Username for 'github.com':` prompt. That prompt was the field defect Slice 6 fixed: with the previous HTTPS default, cloning a missing repo prompted for credentials. This is the seam the Slice 2 disagreement-#6 ruling anticipated (“legit future seam (ssh/token)”). No new config field is added in this slice; the transport flip is on the default builder only. Tests can still inject a different builder (e.g. file:// for a local bare remote in the G6 missing-path test).
->(ref) { "git@#{ref.host}:#{ref.owner}/#{ref.name}.git" }.freeze
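A short sketch of how a builder of this shape behaves, and what an injected alternative might look like. `RepoRef` here is a hypothetical Struct that models only the three readers the default builder uses, and `file_builder` is an illustrative helper, not part of RepoTender.

```ruby
# Hypothetical stand-in for the engine's repo reference.
RepoRef = Struct.new(:host, :owner, :name, keyword_init: true)

# Same lambda body as DEFAULT_URL_BUILDER above.
SSH_BUILDER = ->(ref) { "git@#{ref.host}:#{ref.owner}/#{ref.name}.git" }.freeze

# An alternative a test might inject: resolve every repo to a local
# bare remote under `root` (the directory layout is an assumption).
def file_builder(root)
  ->(ref) { "file://#{File.join(root, ref.owner, "#{ref.name}.git")}" }
end

ref = RepoRef.new(host: "github.com", owner: "acme", name: "widgets")
puts SSH_BUILDER.call(ref)
puts file_builder("/tmp/remotes").call(ref)
```

Any callable that takes a ref and returns a URL string fits the `url_builder:` keyword, which is what keeps the transport choice out of the config.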
Instance Method Summary
- #call(config:, paths:) ⇒ Dry::Monads::Result<State::Store::State>
  Runs one sync pass.
- #initialize(scm: SCM::Git.new, forge: Forge::GitHub.new, clock: -> { Time.now }, url_builder: DEFAULT_URL_BUILDER, reporter: RepoTender::UI::NullReporter.new) ⇒ Engine (constructor)
  A new instance of Engine.
Constructor Details
#initialize(scm: SCM::Git.new, forge: Forge::GitHub.new, clock: -> { Time.now }, url_builder: DEFAULT_URL_BUILDER, reporter: RepoTender::UI::NullReporter.new) ⇒ Engine
Returns a new instance of Engine.
# File 'lib/repo_tender/sync/engine.rb', line 69

def initialize(scm: SCM::Git.new,
               forge: Forge::GitHub.new,
               clock: -> { Time.now },
               url_builder: DEFAULT_URL_BUILDER,
               reporter: RepoTender::UI::NullReporter.new)
  @scm = scm
  @forge = forge
  @clock = clock
  @url_builder = url_builder
  @reporter = reporter
end
Instance Method Details
#call(config:, paths:) ⇒ Dry::Monads::Result<State::Store::State>
Runs one sync pass.
# File 'lib/repo_tender/sync/engine.rb', line 83

def call(config:, paths:)
  Sync do |task|
    semaphore = Async::Semaphore.new(config.concurrency, parent: task)
    barrier = Async::Barrier.new

    # Acquire an exclusive advisory lock on the sidecar lockfile
    # BEFORE loading state. Held across the entire load→write span
    # so an overlapping run never writes and cannot clobber the
    # in-flight run's data (CF10). Non-blocking: if another run
    # holds the lock we bail cleanly rather than blocking forever
    # (a blocked launchd tick would pile up on every subsequent
    # StartInterval fire).
    lock_result = State::Lock.acquire(paths.state_file) do
      # State is loaded once at the start (or initialized empty for
      # a missing state.yaml). A new State object is built from
      # the run's outcomes and written atomically at the end.
      # Per-repo state rows that did not change are preserved.
      state = State::Store.load(paths.state_file).success
      now = @clock.call

      # Attach the reporter before expansion so the render fiber is
      # alive during listing (GS4: attach fires before listing_started).
      @reporter.attach(task)

      begin
        # Phase 1: org expansion (concurrent; per-org failures isolated).
        # CF3 (Slice 4 Lane 02) passes the prev state's org map so an
        # org-list Failure can preserve the prior good repo_count +
        # last_listed_at instead of clobbering with 0/nil.
        org_records, discovered_repos =
          expand_orgs(config, now, prev_orgs: state.orgs, task: task, semaphore: semaphore)

        # Phase 2: dedupe explicit + discovered repos by (host, owner,
        # name); explicit wins.
        repos_to_process = dedupe(config.repos, discovered_repos)

        @reporter.run_started(total: repos_to_process.size)

        # Phase 3: fan out per-repo work through barrier + semaphore.
        # Results are gathered in a mutex-protected array (barrier
        # tasks run on a Fiber scheduler; shared mutation must be
        # serialized). Each result is a [key, Repo | nil, error] tuple
        # (see process_one for the shape).
        results_mutex = Mutex.new
        results = []
        repos_to_process.each do |repo_ref|
          barrier.async do
            # `semaphore.async` spawns a child task and returns its
            # Task handle. The barrier only tracks this outer task;
            # if we don't `.wait` on the inner task, `barrier.wait`
            # would return before the per-repo work finishes and
            # `build_new_state` would see an empty results array.
            inner = semaphore.async do
              outcome = process_one(repo_ref, config, now)
              results_mutex.synchronize { results << outcome }
            end
            inner.wait
          end
        end
        barrier.wait

        summary = results.each_with_object(Hash.new(0)) do |outcome, h|
          _, repo, error = outcome
          h[error ? "error" : repo.status.to_s] += 1
        end
        @reporter.run_finished(summary)

        # Phase 4: assemble new state, write once.
        new_state = build_new_state(state, results, org_records)
        write_result = State::Store.write(paths.state_file, new_state)
        if write_result.failure?
          write_result
        else
          Dry::Monads::Success(new_state)
        end
      ensure
        @reporter.detach
      end
    end

    if lock_result == State::Lock::NOT_ACQUIRED
      warn "repo-tender: skipped — another sync in progress"
      Dry::Monads::Success(State::Store.load(paths.state_file).success)
    else
      lock_result
    end
  end
end
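The listing describes State::Lock.acquire as a non-blocking exclusive advisory lock that returns a NOT_ACQUIRED sentinel when another run holds it. Its implementation is not shown, so the following is only a sketch of one common way to get that behaviour with `File#flock`. `with_lock`, the local `NOT_ACQUIRED` constant, and the lockfile name are hypothetical.

```ruby
require "tmpdir"

# Hypothetical sentinel, mirroring the role of State::Lock::NOT_ACQUIRED.
NOT_ACQUIRED = :not_acquired

# Try to take an exclusive advisory lock without blocking.
# Yields and returns the block's value if the lock was acquired;
# returns NOT_ACQUIRED immediately if another holder has it.
def with_lock(lock_path)
  File.open(lock_path, File::RDWR | File::CREAT, 0o644) do |file|
    # With LOCK_NB, flock returns false instead of waiting.
    return NOT_ACQUIRED unless file.flock(File::LOCK_EX | File::LOCK_NB)

    begin
      yield
    ensure
      file.flock(File::LOCK_UN)
    end
  end
end

# Demonstration: a second attempt while the lock is held bails out,
# and a later attempt succeeds once the lock has been released.
def lock_demo
  Dir.mktmpdir do |dir|
    path = File.join(dir, "state.yaml.lock") # assumed lockfile name
    outer = with_lock(path) do
      inner = with_lock(path) { :should_not_run }
      [:held, inner]
    end
    later = with_lock(path) { :reacquired }
    [outer, later]
  end
end
```

Returning a sentinel rather than blocking is what lets a scheduled tick exit cleanly when a previous run is still in progress, which matches the behaviour the comment in the listing asks for.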