Class: AgentC::Pipeline

Inherits:
Object
  • Object
show all
Defined in:
lib/agent_c/pipeline.rb

Defined Under Namespace

Classes: Step

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(session:, task:, git: ->(dir) { Utils::Git.new(dir) }) ⇒ Pipeline

Returns a new instance of Pipeline.



10
11
12
13
14
15
16
17
18
# File 'lib/agent_c/pipeline.rb', line 10

def initialize(
  session:,
  task:,
  git: ->(dir) { Utils::Git.new(dir) }
)
  @session = session
  @task = task
  @git = git
end

Instance Attribute Details

#sessionObject (readonly)

Returns the value of attribute session.



9
10
11
# File 'lib/agent_c/pipeline.rb', line 9

def session
  @session
end

#taskObject (readonly)

Returns the value of attribute task.



9
10
11
# File 'lib/agent_c/pipeline.rb', line 9

def task
  @task
end

Class Method Details

.agent_step(name, **params, &block) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/agent_c/pipeline.rb', line 39

def agent_step(name, **params, &block)
  step(name) do
    resolved_params = (
      if block
        instance_exec(&block)
      elsif params.empty?
        i18n_attributes = (
          if record.respond_to?(:i18n_attributes)
            record.i18n_attributes
          else
            record.attributes
          end
        )

        {
          tool_args: {
            workspace_dir: workspace.dir,
            env: workspace.env,
          },
          cached_prompt: I18n.t("#{name}.cached_prompts"),
          prompt: I18n.t("#{name}.prompt", **i18n_attributes.symbolize_keys),
          tools: I18n.t("#{name}.tools"),
          schema: -> {
            next unless I18n.exists?("#{name}.response_schema")

            I18n.t("#{name}.response_schema").each do |name, spec|
              extra = spec.except(:required, :description, :type)

              if extra.key?(:of)
                extra[:of] = extra[:of]&.to_sym
              end

              send(
                spec.fetch(:type, "string"),
                name,
                required: spec.fetch(:required, true),
                description: spec.fetch(:description),
                **extra
              )
            end
          }
        }
      else
        i18n_attributes = (
          if record.respond_to?(:i18n_attributes)
            record.i18n_attributes
          else
            record.attributes
          end
        )

        {
          tool_args: {
            workspace_dir: workspace.dir,
            env: workspace.env,
          }
        }.tap { |hash|
          if params.key?(:prompt_key)
            hash[:prompt] = I18n.t(params[:prompt_key],  **i18n_attributes.symbolize_keys)
          end

          if params.key?(:cached_prompt_keys)
            hash[:cached_prompt] = params[:cached_prompt_keys].map { I18n.t(_1) }
          end
        }.merge(params.except(:cached_prompt_keys, :prompt_key))
      end
    )

    result = session.prompt(
      on_chat_created: -> (id) { task.chat_ids << id},
      **resolved_params
    )

    if result.success?
      task.record.update!(result.data)
    else
      task.fail!(result.error_message)
    end
  end
end

.callObject



5
6
7
# File 'lib/agent_c/pipeline.rb', line 5

def self.call(...)
  new(...).tap(&:call)
end

.on_failure(&block) ⇒ Object



31
32
33
# File 'lib/agent_c/pipeline.rb', line 31

def on_failure(&block)
  self.on_failures << block
end

.on_failuresObject



23
24
25
# File 'lib/agent_c/pipeline.rb', line 23

def on_failures
  @on_failures ||= []
end

.step(name, &block) ⇒ Object



35
36
37
# File 'lib/agent_c/pipeline.rb', line 35

def step(name, &block)
  self.steps << Step.new(name:, block:)
end

.stepsObject



27
28
29
# File 'lib/agent_c/pipeline.rb', line 27

def steps
  @steps ||= []
end

Instance Method Details

#callObject



121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/agent_c/pipeline.rb', line 121

def call
  raise "Task.workspace is nil" unless task.workspace

  log("start")

  while(task.pending?)
    break if task.failed?

    step = self.class.steps.find { !task.completed_steps.include?(_1.name.to_s) }
    break if step.nil?

    @rewind_to = nil

    store.transaction do
      log_prefix = "step: '#{step.name}'"

      log("#{log_prefix} start")

      instance_exec(&step.block)

      if task.failed?
        log("#{log_prefix} failed, executing on_failures")
        self.class.on_failures.each { instance_exec(&_1)}
      elsif @rewind_to
        matching_steps = task.completed_steps.select { _1 == @rewind_to }

        if matching_steps.count == 0
          raise ArgumentError, <<~TXT
            Cannot rewind to a step that's not been completed yet:

            rewind_to!(#{@rewind_to.inspect})
            completed_steps: #{task.completed_steps.inspect}
          TXT
        elsif matching_steps.count > 1
          raise ArgumentError, <<~TXT
            Cannot rewind to a step with a non-distinct name. The step
            name appears multiple times:

            rewind_to!(#{@rewind_to.inspect})
            completed_steps: #{task.completed_steps.inspect}
          TXT
        end

        log("#{log_prefix} rewind_to! #{@rewind_to.inspect}")
        task
          .completed_steps
          .index(@rewind_to)
          .then { task.update!(completed_steps: task.completed_steps[0..._1]) }
      else
        log("#{log_prefix} done")
        task.completed_steps << step.name.to_s
      end
    end
  end

  store.transaction do
    log("done")
    task.done! unless task.failed?
  end
rescue => e
  store.transaction do
    log("Exception raised, running on_failures")
    task.fail!(["#{e.class.name}:#{e.message}", e.backtrace].join("\n"))
    self.class.on_failures.each { instance_exec(&_1) }
  end
end

#log(msg) ⇒ Object



208
209
210
# File 'lib/agent_c/pipeline.rb', line 208

def log(msg)
  logger.info("task: #{task.id}: #{msg}")
end

#loggerObject



212
213
214
# File 'lib/agent_c/pipeline.rb', line 212

def logger
  session.logger
end

#recordObject



192
193
194
# File 'lib/agent_c/pipeline.rb', line 192

def record
  task.record
end

#repoObject



204
205
206
# File 'lib/agent_c/pipeline.rb', line 204

def repo
  @repo ||= @git.call(workspace.dir)
end

#rewind_to!(step) ⇒ Object



200
201
202
# File 'lib/agent_c/pipeline.rb', line 200

def rewind_to!(step)
  @rewind_to = step.to_s
end

#storeObject



196
197
198
# File 'lib/agent_c/pipeline.rb', line 196

def store
  task.store
end

#workspaceObject



188
189
190
# File 'lib/agent_c/pipeline.rb', line 188

def workspace
  task.workspace
end