Class: Legate::Tools::BaseAsyncJobTool

Inherits:
Legate::Tool show all
Defined in:
lib/legate/tools/base_async_job_tool.rb

Overview

Abstract base class for tools that initiate asynchronous background tasks via threads.

Direct Known Subclasses

SleepyTool

Instance Attribute Summary

Attributes inherited from Legate::Tool

#description, #name, #parameters

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Legate::Tool

define_metadata, #execute, inherited, #initialize, #validate_and_coerce_params, #validate_params

Methods included from Legate::Tool::MetadataDsl

included

Constructor Details

This class inherits a constructor from Legate::Tool

Class Method Details

.job_resultsObject



18
19
20
# File 'lib/legate/tools/base_async_job_tool.rb', line 18

def job_results
  @job_results ||= Concurrent::Map.new
end

.store_job_error(jid, error_message, _error_class = 'StandardError') ⇒ Object

Helper method for workers to call upon failure to store error information.

Parameters:

  • jid (String)

    The Job ID.

  • error_message (String)

    The error message.

  • _error_class (String) (defaults to: 'StandardError')

    The class name of the error (kept for API compatibility).



85
86
87
88
# File 'lib/legate/tools/base_async_job_tool.rb', line 85

def self.store_job_error(jid, error_message, _error_class = 'StandardError')
  job_results[jid] = { 'status' => 'error', 'error_message' => error_message }
  Legate.logger.debug("Stored error result for job #{jid}")
end

.store_job_pending(jid) ⇒ Object

Helper method for workers to call at the beginning of their perform method to indicate the job has started processing.

Parameters:

  • jid (String)

    The Job ID.



68
69
70
71
# File 'lib/legate/tools/base_async_job_tool.rb', line 68

def self.store_job_pending(jid)
  job_results[jid] = { 'status' => 'pending' }
  Legate.logger.debug("Stored pending status for job #{jid}")
end

.store_job_result(jid, result) ⇒ Object

Helper method for workers to call upon completion to store their results.

Parameters:

  • jid (String)

    The Job ID.

  • result (Object)

    The result data.



76
77
78
79
# File 'lib/legate/tools/base_async_job_tool.rb', line 76

def self.store_job_result(jid, result)
  job_results[jid] = { 'status' => 'completed', 'result' => result.is_a?(String) ? result : result.to_json }
  Legate.logger.debug("Stored successful result for job #{jid}")
end

Instance Method Details

#prepare_job_arguments(params, context) ⇒ Array

Subclasses MUST override this method to prepare the arguments for the worker’s perform method based on the Legate tool’s parameters and context. Note: Arguments must be simple types serializable to JSON (strings, numbers, bools, arrays, hashes).

Parameters:

  • params (Hash)

    The validated parameters passed to the Legate tool.

  • context (Legate::ToolContext)

    Contextual information (session_id, etc.).

Returns:

  • (Array)

    An array of arguments to be passed to the worker’s perform method.

Raises:

  • (NotImplementedError)


36
37
38
# File 'lib/legate/tools/base_async_job_tool.rb', line 36

def prepare_job_arguments(params, context)
  raise NotImplementedError, "#{self.class.name} must implement #prepare_job_arguments(params, context)"
end

#worker_classClass

Subclasses MUST override this method to return the worker class that should be executed.

Returns:

  • (Class)

    The worker class (must respond to #perform).

Raises:

  • (NotImplementedError)


26
27
28
# File 'lib/legate/tools/base_async_job_tool.rb', line 26

def worker_class
  raise NotImplementedError, "#{self.class.name} must implement #worker_class"
end