Class: TUI::MessageStore

Inherits:
Object
Defined in:
lib/tui/message_store.rb

Overview

Thread-safe in-memory store for chat entries displayed in the TUI. Holds the WebSocket-delivered view of the session’s conversation with no dependency on Rails or the Events module.

Accepts Action Cable message payloads and stores typed entries:

  • `{type: :rendered, data:, message_type:, id:}` for messages with structured decorator output

  • `{type: :message, role:, content:, id:}` for user/agent messages (fallback)

  • `{type: :tool_counter, calls:, responses:}` for tool activity

Structured data takes priority when available. Messages whose rendered content is nil fall back to type-based handling: tool messages aggregate into counters, and conversation messages store role and content.

Entries with message IDs are maintained in ID order (ascending) regardless of arrival order, preventing misordering from race conditions between live broadcasts and viewport replays. Duplicate IDs are deduplicated by updating the existing entry.
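The ordering and deduplication rule can be illustrated with a small stand-in. This is a minimal sketch, not the store's actual implementation: the `OrderedEntries` class and its `upsert` method are hypothetical names, and the sketch assumes every entry carries an `:id` (the real store also holds ID-less entries such as tool counters).

```ruby
# Hypothetical stand-in for the ordering logic; not the TUI::MessageStore source.
class OrderedEntries
  def initialize
    @entries = []
    @by_id = {}
  end

  # Inserts an entry at its ID-sorted position, or updates the existing
  # entry in place when the ID has been seen before.
  def upsert(entry)
    existing = @by_id[entry[:id]]
    if existing
      existing.replace(entry) # same Hash object, so its position is preserved
      return existing
    end

    # Binary search for the first stored entry with a larger ID.
    index = @entries.bsearch_index { |e| e[:id] > entry[:id] } || @entries.size
    @entries.insert(index, entry)
    @by_id[entry[:id]] = entry
  end

  def ids
    @entries.map { |e| e[:id] }
  end

  def to_a
    @entries.dup
  end
end

store = OrderedEntries.new
store.upsert({id: 3, content: "third"})
store.upsert({id: 1, content: "first"})          # arrives late, still sorts first
store.upsert({id: 2, content: "second"})
store.upsert({id: 3, content: "third (edited)"}) # duplicate ID updates in place
store.ids # => [1, 2, 3]
```

Because the array stays sorted, a binary search finds the insertion point without scanning the whole list on every arrival.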

Tool counters aggregate per agent turn: a new counter starts when a tool_call arrives after a conversation entry. Consecutive tool messages increment the same counter until the next conversation message breaks the chain.
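The per-turn aggregation described above might look roughly like the following. This is a sketch under stated assumptions: `ToolCounterSketch` and `current_counter` are hypothetical names, and starting a new counter when a tool_response arrives without a preceding tool_call is an assumption the source does not confirm.

```ruby
# Hypothetical sketch of per-turn tool counter aggregation.
class ToolCounterSketch
  attr_reader :entries

  def initialize
    @entries = []
  end

  def record_message(role, content)
    @entries << {type: :message, role: role, content: content}
  end

  def record_tool_call
    current_counter[:calls] += 1
  end

  def record_tool_response
    current_counter[:responses] += 1
  end

  private

  # Reuses the trailing counter, or starts a new one when the last entry
  # is a conversation message (or the list is empty).
  def current_counter
    last = @entries.last
    return last if last && last[:type] == :tool_counter

    counter = {type: :tool_counter, calls: 0, responses: 0}
    @entries << counter
    counter
  end
end

log = ToolCounterSketch.new
log.record_message("user", "list files")
log.record_tool_call
log.record_tool_response
log.record_tool_call
log.record_message("assistant", "done") # breaks the chain
log.record_tool_call                    # starts a second counter
log.entries.map { |e| e[:type] }
# => [:message, :tool_counter, :message, :tool_counter]
```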

When a message arrives with `"action" => "update"` and a known `"id"`, the existing entry is replaced in-place, preserving display order.

Constant Summary

MESSAGE_TYPES =
%w[user_message agent_message].freeze
ROLE_MAP =
{
  "user_message" => "user",
  "agent_message" => "assistant"
}.freeze
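
As an illustration of how `ROLE_MAP` could feed the fallback `:message` entry described in the overview, consider the sketch below. The helper name `fallback_message_entry` is hypothetical; the private method that actually builds these entries is not shown on this page.

```ruby
ROLE_MAP = {
  "user_message" => "user",
  "agent_message" => "assistant"
}.freeze

# Hypothetical helper: maps a payload's "type" to a role and builds the
# fallback entry. Returns nil for types that are not conversation messages.
def fallback_message_entry(event_data)
  role = ROLE_MAP[event_data["type"]]
  return nil unless role

  {type: :message, role: role, content: event_data["content"], id: event_data["id"]}
end

fallback_message_entry({"type" => "agent_message", "content" => "Done.", "id" => 42})
# => {type: :message, role: "assistant", content: "Done.", id: 42}
fallback_message_entry({"type" => "tool_call"})
# => nil
```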

Constructor Details

#initialize ⇒ MessageStore

Returns a new instance of MessageStore.



# File 'lib/tui/message_store.rb', line 38

def initialize
  @entries = []
  @entries_by_id = {}
  @pending_entries = []
  @pending_by_id = {}
  @mutex = Mutex.new
  @version = 0
  @token_economy = default_token_economy
end

Instance Method Details

#add_pending(pending_message_id, payload) ⇒ void

This method returns an undefined value.

Adds a pending message to the separate pending list, replacing any existing entry with the same ID. When the decorator hides the pending message in the current view mode (a rendered hash is present but its sole value is nil), the existing entry is removed and nothing is added.

Accepts the full pending_message_created event payload. Falls back to a dimmed user-message envelope when no rendered key is present (legacy producers, simple test fixtures).

Pending messages always render after real messages.

Parameters:

  • pending_message_id (Integer)

    PendingMessage database ID

  • payload (Hash)

    event payload with "content", "message_type", and optionally "rendered" (hash of mode => decorator output)



# File 'lib/tui/message_store.rb', line 164

def add_pending(pending_message_id, payload)
  data = pending_entry_data(payload)
  message_type = payload["message_type"] || "user_message"

  @mutex.synchronize do
    old = @pending_by_id.delete(pending_message_id)
    @pending_entries.delete(old) if old

    if data
      entry = {
        type: :rendered,
        data: data,
        message_type: message_type,
        pending_message_id: pending_message_id
      }
      @pending_entries << entry
      @pending_by_id[pending_message_id] = entry
    end

    @version += 1
  end
end

#clear ⇒ void

This method returns an undefined value.

Removes all entries. Called on view mode change and session switch to prepare for re-decorated viewport messages from the server. Resets token economy totals since we’re starting fresh.



# File 'lib/tui/message_store.rb', line 139

def clear
  @mutex.synchronize do
    @entries = []
    @entries_by_id = {}
    @pending_entries = []
    @pending_by_id = {}
    @token_economy = default_token_economy
    @version += 1
  end
end

#last_pending_user_message ⇒ Hash?

Returns the last pending user message for recall editing.

Returns:

  • (Hash, nil)

    `{pending_message_id: Integer, content: String}` or nil



# File 'lib/tui/message_store.rb', line 205

def last_pending_user_message
  @mutex.synchronize do
    entry = @pending_entries.last
    return nil unless entry

    {pending_message_id: entry[:pending_message_id], content: entry.dig(:data, "content")}
  end
end

#messages ⇒ Array<Hash>

Returns thread-safe copy of stored entries (pending messages at the end).

Returns:

  • (Array<Hash>)

    thread-safe copy of stored entries (pending messages at the end)



# File 'lib/tui/message_store.rb', line 57

def messages
  @mutex.synchronize { @entries.dup + @pending_entries.dup }
end
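
One property worth keeping in mind is that `Array#dup` makes a shallow copy: the returned array is independent, but the entry hashes inside it are the same objects the store holds. The snippet below demonstrates this with plain arrays rather than the store itself; treating returned entries as read-only is a suggestion here, not something the source states.

```ruby
# Plain-array illustration of shallow-copy semantics.
entries = [{type: :message, role: "user", content: "hello"}]
snapshot = entries.dup

# Appending to the copy does not touch the original array.
snapshot << {type: :message, role: "assistant", content: "hi"}
entries.size # => 1

# Mutating a hash inside the copy is visible through the original,
# because both arrays reference the same Hash object.
snapshot.first[:content] = "changed"
entries.first[:content] # => "changed"
```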

#process_event(event_data) ⇒ Boolean

Processes a raw event payload from the WebSocket channel. Uses structured decorator data when available; falls back to role/content extraction for messages and tool counter aggregation.

Events with `"action" => "update"` and a matching `"id"` replace the existing entry’s data in-place rather than appending.

Extracts api_metrics when present and records the latest call’s token economy data for the HUD.

Parameters:

  • event_data (Hash)

    Action Cable event payload with "type", "content", and optionally "rendered" (hash of mode => lines), "id", "action", "api_metrics"

Returns:

  • (Boolean)

    true if the event type was recognized and handled



# File 'lib/tui/message_store.rb', line 109

def process_event(event_data)
  message_id = event_data["id"]

  # Track API metrics for token economy HUD (only on create, not update)
  if event_data["action"] != "update"
    track_api_metrics(event_data["api_metrics"])
  end

  if event_data["action"] == "update" && message_id
    return update_existing(event_data, message_id)
  end

  rendered = extract_rendered(event_data)

  if rendered
    record_rendered(rendered, message_type: event_data["type"], id: message_id)
  else
    case event_data["type"]
    when "tool_call" then record_tool_call
    when "tool_response" then record_tool_response
    when *MESSAGE_TYPES then record_message(event_data)
    else false
    end
  end
end

#remove_above(cutoff_id) ⇒ Integer

Removes all entries with message ID <= cutoff. Used when Mneme evicts messages above the cutoff in the chat view (older messages at the top with smaller IDs).

Parameters:

  • cutoff_id (Integer)

    last evicted message ID

Returns:

  • (Integer)

    count of entries actually removed



# File 'lib/tui/message_store.rb', line 236

def remove_above(cutoff_id)
  @mutex.synchronize do
    evicted = @entries.select { |e| e[:id] && e[:id] <= cutoff_id }
    evicted.each do |entry|
      @entries.delete(entry)
      @entries_by_id.delete(entry[:id])
    end
    @version += 1 if evicted.any?
    evicted.size
  end
end
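
The selection rule can be tried out on plain data. The sketch below uses `Array#partition` purely for illustration (the method itself uses `select` followed by `delete`); the sample entries and IDs are made up. Note that entries without an `:id`, such as tool counters, never match the predicate and therefore survive eviction.

```ruby
# Made-up sample data; only the predicate mirrors the method above.
entries = [
  {type: :message, role: "user", content: "a", id: 10},
  {type: :tool_counter, calls: 1, responses: 1},
  {type: :message, role: "assistant", content: "b", id: 11},
  {type: :message, role: "user", content: "c", id: 12}
]
cutoff_id = 11

# Split into evicted (ID at or below the cutoff) and kept entries.
evicted, kept = entries.partition { |e| e[:id] && e[:id] <= cutoff_id }

evicted.map { |e| e[:id] } # => [10, 11]
kept.map { |e| e[:type] }  # => [:tool_counter, :message]
```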

#remove_by_id(message_id) ⇒ Boolean

Removes an entry by its message ID. Used when a pending message is recalled for editing or deleted by another client.

Parameters:

  • message_id (Integer)

    database ID of the message to remove

Returns:

  • (Boolean)

    true if the entry was found and removed



# File 'lib/tui/message_store.rb', line 219

def remove_by_id(message_id)
  @mutex.synchronize do
    entry = @entries_by_id.delete(message_id)
    return false unless entry

    @entries.delete(entry)
    @version += 1
    true
  end
end

#remove_pending(pending_message_id) ⇒ Boolean

Removes a pending message by its PendingMessage ID.

Parameters:

  • pending_message_id (Integer)

    PendingMessage database ID

Returns:

  • (Boolean)

    true if found and removed



# File 'lib/tui/message_store.rb', line 191

def remove_pending(pending_message_id)
  @mutex.synchronize do
    entry = @pending_by_id.delete(pending_message_id)
    return false unless entry

    @pending_entries.delete(entry)
    @version += 1
    true
  end
end

#size ⇒ Integer

Returns number of stored entries including pending (no array copy).

Returns:

  • (Integer)

    number of stored entries including pending (no array copy)



# File 'lib/tui/message_store.rb', line 62

def size
  @mutex.synchronize { @entries.size + @pending_entries.size }
end

#token_economy ⇒ Hash

Returns token economy data for HUD display.

Token counts, rate limits, and cache hit rate reflect the most recent API call; accumulating them across a session produces values larger than the context window, which is meaningless. `call_count` and `cache_history` remain session-wide so the HUD can detect "any metrics yet?" and render the per-call sparkline.

Returns:

  • (Hash)

    token economy stats:

    • :input_tokens [Integer] uncached input tokens from the latest call

    • :output_tokens [Integer] output tokens from the latest call

    • :cache_read_input_tokens [Integer] cached token reads from the latest call

    • :cache_creation_input_tokens [Integer] cache writes from the latest call

    • :call_count [Integer] number of API calls tracked this session

    • :cache_hit_rate [Float] hit rate of the latest call (0.0-1.0)

    • :rate_limits [Hash, nil] latest rate limit values from API

    • :cache_history [Array<Float>] per-call hit rates for the sparkline



# File 'lib/tui/message_store.rb', line 83

def token_economy
  @mutex.synchronize do
    stats = @token_economy.dup
    total_input = stats[:input_tokens] + stats[:cache_read_input_tokens] + stats[:cache_creation_input_tokens]
    stats[:cache_hit_rate] = if total_input > 0
      stats[:cache_read_input_tokens].to_f / total_input
    else
      0.0
    end
    stats
  end
end
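
As a worked example of the hit-rate arithmetic, the same formula can be pulled into a standalone function. The function name `cache_hit_rate` and the token counts are illustrative, not taken from a real API response.

```ruby
# Standalone version of the hit-rate formula used in #token_economy.
def cache_hit_rate(stats)
  total_input = stats[:input_tokens] +
    stats[:cache_read_input_tokens] +
    stats[:cache_creation_input_tokens]
  return 0.0 unless total_input > 0

  stats[:cache_read_input_tokens].to_f / total_input
end

# Illustrative numbers: 700 of 1000 input tokens were served from cache.
latest = {
  input_tokens: 200,
  cache_read_input_tokens: 700,
  cache_creation_input_tokens: 100
}
cache_hit_rate(latest) # => 0.7

# Before any API call is tracked, all counts are zero and the rate is 0.0.
empty = {input_tokens: 0, cache_read_input_tokens: 0, cache_creation_input_tokens: 0}
cache_hit_rate(empty) # => 0.0
```

Output tokens do not enter the denominator: the rate is the share of input tokens read from cache.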

#version ⇒ Integer

Monotonically increasing counter that bumps on every mutation. Consumers compare this to a cached value to detect changes without copying the full entries array on every frame.

Returns:

  • (Integer)


# File 'lib/tui/message_store.rb', line 52

def version
  @mutex.synchronize { @version }
end
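
A consumer of `#version` might cache a snapshot and refresh it only when the counter changes. The sketch below shows that pattern; `VersionedStoreSketch` is a hypothetical stand-in exposing just `#version`, `#messages`, and an `add` method invented for the demo, and `FrameCache` is likewise a made-up name.

```ruby
# Hypothetical stand-in exposing only what the consumer needs.
class VersionedStoreSketch
  def initialize
    @mutex = Mutex.new
    @entries = []
    @version = 0
  end

  def version
    @mutex.synchronize { @version }
  end

  def messages
    @mutex.synchronize { @entries.dup }
  end

  # Demo-only mutator: appends an entry and bumps the version.
  def add(entry)
    @mutex.synchronize do
      @entries << entry
      @version += 1
    end
  end
end

# Copies the entries array only when the store's version has changed.
class FrameCache
  attr_reader :copies

  def initialize(store)
    @store = store
    @seen_version = nil
    @cached = []
    @copies = 0
  end

  def frame
    current = @store.version
    if current != @seen_version
      @cached = @store.messages
      @seen_version = current
      @copies += 1
    end
    @cached
  end
end

store = VersionedStoreSketch.new
view = FrameCache.new(store)
view.frame # first frame copies
view.frame # unchanged version, cache reused
store.add({type: :message, role: "user", content: "hi"})
view.frame # version bumped, copies again
view.copies # => 2
```

The version read and the snapshot read are separate lock acquisitions, so a mutation landing between them yields a snapshot newer than the recorded version. The only cost is one redundant copy on the next frame.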