Module: Legion::MCP::Audit
- Extended by:
- Logging::Helper
- Defined in:
- lib/legion/mcp/audit.rb
Constant Summary collapse
- ROUTING_KEYS =
{ tool_call: 'mcp.audit.tool_call', client_call: 'mcp.audit.client_call', governance: 'mcp.audit.governance' }.freeze
Class Method Summary collapse
- .emit_client_call(**event) ⇒ Object
- .emit_governance(**event) ⇒ Object
- .emit_tool_call(**event) ⇒ Object
- .publish(routing_key, event, message_sym) ⇒ Object
- .transport_available? ⇒ Boolean
- .transport_connected? ⇒ Boolean
Class Method Details
.emit_client_call(**event) ⇒ Object
20 21 22 |
# File 'lib/legion/mcp/audit.rb', line 20 def emit_client_call(**event) publish(ROUTING_KEYS[:client_call], event, :ClientCallEvent) end |
.emit_governance(**event) ⇒ Object
24 25 26 |
# File 'lib/legion/mcp/audit.rb', line 24 def emit_governance(**event) publish(ROUTING_KEYS[:governance], event, :GovernanceEvent) end |
.emit_tool_call(**event) ⇒ Object
16 17 18 |
# File 'lib/legion/mcp/audit.rb', line 16 def emit_tool_call(**event) publish(ROUTING_KEYS[:tool_call], event, :ToolCallEvent) end |
.publish(routing_key, event, message_sym) ⇒ Object
40 41 42 43 44 45 46 47 |
# File 'lib/legion/mcp/audit.rb', line 40 def publish(routing_key, event, message_sym) return unless transport_available? && transport_connected? message_class = Legion::MCP::Transport::Messages.const_get(message_sym) message_class.new(routing_key: routing_key, **event).publish rescue StandardError => e handle_exception(e, level: :warn, handled: true, operation: 'mcp.audit.publish') end |
.transport_available? ⇒ Boolean
28 29 30 31 |
# File 'lib/legion/mcp/audit.rb', line 28 def transport_available? !!(defined?(Legion::Transport::Message) && defined?(Legion::MCP::Transport::Exchanges::Audit)) end |
.transport_connected? ⇒ Boolean
33 34 35 36 37 38 |
# File 'lib/legion/mcp/audit.rb', line 33 def transport_connected? Legion::Settings.dig(:transport, :connected) == true rescue StandardError => e handle_exception(e, level: :debug, handled: true, operation: 'mcp.audit.transport_connected?') false end |