Class: Harnex::Inbox

Inherits:
Object
  • Object
show all
Defined in:
lib/harnex/runtime/inbox.rb

Constant Summary collapse

DEFAULT_TTL =
120
MAX_PENDING =
64
DELIVERY_TIMEOUT =
300

Instance Method Summary collapse

Constructor Details

#initialize(session, state_machine, ttl: DEFAULT_TTL) ⇒ Inbox

Returns a new instance of Inbox.



9
10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/harnex/runtime/inbox.rb', line 9

def initialize(session, state_machine, ttl: DEFAULT_TTL)
  @session = session
  @state_machine = state_machine
  @ttl = ttl.to_f
  @queue = []
  @messages = {}
  @mutex = Mutex.new
  @condvar = ConditionVariable.new
  @thread = nil
  @running = false
  @delivered_total = 0
  @expired_total = 0
end

Instance Method Details

#clearObject



109
110
111
112
113
114
115
116
# File 'lib/harnex/runtime/inbox.rb', line 109

def clear
  @mutex.synchronize do
    count = @queue.size
    @queue.each { |msg| msg.status = :dropped }
    @queue.clear
    count
  end
end

#drop(message_id) ⇒ Object



98
99
100
101
102
103
104
105
106
107
# File 'lib/harnex/runtime/inbox.rb', line 98

def drop(message_id)
  @mutex.synchronize do
    msg = @messages[message_id]
    return nil unless msg && @queue.any? { |queued| queued.id == message_id }

    @queue.delete_if { |queued| queued.id == message_id }
    msg.status = :dropped
    msg.to_h
  end
end

#enqueue(text:, submit:, enter_only:, force: false) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/harnex/runtime/inbox.rb', line 35

def enqueue(text:, submit:, enter_only:, force: false)
  msg = Message.new(
    id: SecureRandom.hex(8),
    text: text,
    submit: submit,
    enter_only: enter_only,
    force: force,
    queued_at: Time.now,
    status: :queued
  )

  # Force messages bypass the queue entirely
  if force
    return deliver_now(msg)
  end

  # Fast path: prompt ready and queue empty — deliver immediately.
  # Check under lock, then release before calling deliver_now to
  # avoid recursive locking (deliver_now also acquires @mutex).
  try_fast = @mutex.synchronize do
    @queue.empty? && @state_machine.state == :prompt
  end

  if try_fast
    begin
      result = deliver_now(msg)
      return result if msg.status == :delivered
    rescue StandardError
      # Fall through to queue if delivery failed
    end
    msg.status = :queued
    msg.error = nil
  end

  @mutex.synchronize do
    raise "inbox full (#{MAX_PENDING} pending messages)" if @queue.size >= MAX_PENDING

    @queue << msg
    @messages[msg.id] = msg
    @condvar.broadcast
  end

  { ok: true, status: "queued", message_id: msg.id, http_status: 202 }
end

#message_status(id) ⇒ Object



80
81
82
83
84
85
86
# File 'lib/harnex/runtime/inbox.rb', line 80

def message_status(id)
  @mutex.synchronize do
    msg = @messages[id]
    return nil unless msg
    msg.to_h
  end
end

#pending_messagesObject



94
95
96
# File 'lib/harnex/runtime/inbox.rb', line 94

def pending_messages
  @mutex.synchronize { @queue.map(&:to_h) }
end

#startObject



23
24
25
26
# File 'lib/harnex/runtime/inbox.rb', line 23

def start
  @running = true
  @thread = Thread.new { delivery_loop }
end

#statsObject



88
89
90
91
92
# File 'lib/harnex/runtime/inbox.rb', line 88

def stats
  @mutex.synchronize do
    { pending: @queue.size, delivered_total: @delivered_total, expired_total: @expired_total }
  end
end

#stopObject



28
29
30
31
32
33
# File 'lib/harnex/runtime/inbox.rb', line 28

def stop
  @running = false
  @mutex.synchronize { @condvar.broadcast }
  @thread&.join(2)
  @thread&.kill
end