Class: Clacky::Channel::Adapters::Weixin::SendQueue
- Inherits: Object
  - Object
  - Clacky::Channel::Adapters::Weixin::SendQueue
- Defined in: lib/clacky/server/channel/adapters/weixin/adapter.rb
Overview
Per-user send queue with buffering, throttling, and retry for Weixin iLink.
Design:
- Each chat_id has a pending buffer of text fragments.
- A background flusher thread periodically checks all buffers.
- Flush triggers: char threshold reached, time interval elapsed, or explicit flush.
- Actual send calls are spaced by MIN_SEND_INTERVAL to avoid rate-limiting.
- ret:-2 (rate-limited) triggers exponential backoff retry.
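The flush triggers in the design list can be sketched as a single predicate. This is a minimal illustration, not the adapter's actual code: `flush_due?` is a hypothetical helper, the `Entry` struct is a stand-in with the same keyword fields used by `#enqueue`, and measuring the time interval from the oldest buffered entry is an assumption.

```ruby
# Hypothetical stand-in for SendQueue::Entry (fields taken from #enqueue).
Entry = Struct.new(:text, :context_token, :enqueued_at, keyword_init: true)

FLUSH_CHAR_THRESHOLD = 400
FLUSH_INTERVAL = 0.8

# Returns true when a buffer should be flushed: either the buffered text has
# reached the character threshold, or the oldest entry has waited at least
# FLUSH_INTERVAL (assumption: the interval is measured from the oldest entry).
def flush_due?(entries, now: Time.now)
  return false if entries.empty?

  total_chars = entries.sum { |entry| entry.text.length }
  return true if total_chars >= FLUSH_CHAR_THRESHOLD

  (now - entries.first.enqueued_at) >= FLUSH_INTERVAL
end
```

An explicit `#flush` bypasses a check like this entirely and sends whatever is buffered.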
Defined Under Namespace
Classes: Entry
Constant Summary
- FLUSH_CHAR_THRESHOLD = 400
- FLUSH_INTERVAL = 0.8
- MIN_SEND_INTERVAL = 1.0
- RETRY_BACKOFFS = [1.0, 2.0, 4.0]
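How MIN_SEND_INTERVAL and RETRY_BACKOFFS might be applied is sketched below. The private sending code is not shown on this page, so `throttle_delay` and `send_with_retry` are hypothetical helpers, and treating each RETRY_BACKOFFS element as the delay before one retry is an assumption. The `-2` value comes from the overview's `ret:-2` note.

```ruby
MIN_SEND_INTERVAL = 1.0
RETRY_BACKOFFS = [1.0, 2.0, 4.0].freeze
RATE_LIMITED = -2 # "ret:-2 (rate-limited)" from the overview

# Seconds to wait before the next send so that consecutive sends to the same
# chat are at least MIN_SEND_INTERVAL apart. Times are plain Float seconds.
def throttle_delay(last_sent_at, now)
  return 0.0 if last_sent_at.nil?

  [MIN_SEND_INTERVAL - (now - last_sent_at), 0.0].max
end

# Calls the block once, then retries after each backoff delay for as long as
# the block keeps returning the rate-limited code. `sleeper` is injectable so
# the sketch can be exercised without actually sleeping.
def send_with_retry(sleeper: ->(seconds) { sleep(seconds) })
  ret = yield
  RETRY_BACKOFFS.each do |delay|
    break unless ret == RATE_LIMITED

    sleeper.call(delay)
    ret = yield
  end
  ret
end
```

Under these assumptions a send is attempted at most four times (one initial call plus three retries), and the final return value is handed back to the caller even if it is still rate-limited.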
Instance Method Summary
- #enqueue(chat_id, text, context_token) ⇒ Object
  Enqueue text for a chat_id.
- #flush(chat_id) ⇒ Object
  Force-flush all pending text for a chat_id.
- #initialize(api_client, logger: Clacky::Logger) ⇒ SendQueue (constructor)
  A new instance of SendQueue.
- #stop ⇒ Object
  Stop the flusher thread.
Constructor Details
#initialize(api_client, logger: Clacky::Logger) ⇒ SendQueue
Returns a new instance of SendQueue.
    # File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 27

    def initialize(api_client, logger: Clacky::Logger)
      @api_client = api_client
      @logger = logger
      @buffers = {}
      @buffer_mutex = Mutex.new
      @last_sent_at = {}
      @last_mutex = Mutex.new
      @running = true
      @flusher = Thread.new { flush_loop }
    end
Instance Method Details
#enqueue(chat_id, text, context_token) ⇒ Object
Enqueue text for a chat_id. Non-blocking.
    # File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 39

    def enqueue(chat_id, text, context_token)
      @buffer_mutex.synchronize do
        @buffers[chat_id] ||= []
        @buffers[chat_id] << Entry.new(text: text, context_token: context_token, enqueued_at: Time.now)
      end
    end
#flush(chat_id) ⇒ Object
Force-flush all pending text for a chat_id. Non-blocking.
    # File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 47

    def flush(chat_id)
      entries = @buffer_mutex.synchronize { @buffers.delete(chat_id) || [] }
      send_entries(chat_id, entries) unless entries.empty?
    end
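Both `#enqueue` and `#flush` hold the buffer mutex only long enough to touch the hash, and `#flush` detaches the whole buffer before sending. The pattern can be shown in isolation with a small stand-in class; `TinyBuffers` and its `take` method are hypothetical names used only for this sketch.

```ruby
# Hypothetical miniature of the buffer handling, without any sending logic.
class TinyBuffers
  def initialize
    @buffers = {}
    @mutex = Mutex.new
  end

  # Append under the lock; the critical section is just a hash update.
  def enqueue(chat_id, text)
    @mutex.synchronize { (@buffers[chat_id] ||= []) << text }
  end

  # Detach the buffer while holding the lock and return it, so the caller can
  # do the slow work (sending) without blocking concurrent enqueues.
  def take(chat_id)
    @mutex.synchronize { @buffers.delete(chat_id) || [] }
  end
end
```

Because the buffer is removed from the hash in one step, text enqueued after `take` returns lands in a fresh buffer and is not lost or sent twice.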
#stop ⇒ Object
Stop the flusher thread. Waits up to 30s for pending messages to drain.
    # File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 53

    def stop
      @running = false
      @flusher.join(30)
      # Force-flush any remaining entries regardless of threshold.
      drain_all_buffers
    end
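The stop sequence (clear the running flag, join with a timeout, then drain) can be illustrated with a self-contained miniature. `TinyFlusher` is a hypothetical class, its loop only idles, and "sending" is modeled as appending to an array; the real `flush_loop` and `drain_all_buffers` are not shown on this page.

```ruby
# Hypothetical miniature of the shutdown pattern used by #stop.
class TinyFlusher
  attr_reader :sent

  def initialize
    @pending = []
    @mutex = Mutex.new
    @sent = []
    @running = true
    # The loop only idles here; a real flusher would check buffers each tick.
    @thread = Thread.new { sleep(0.01) while @running }
  end

  def enqueue(item)
    @mutex.synchronize { @pending << item }
  end

  # Signal the loop to exit, wait a bounded time for it, then drain leftovers.
  def stop(timeout = 30)
    @running = false
    # Thread#join with a limit returns nil if the thread has not finished in time.
    @thread.join(timeout)
    leftovers = @mutex.synchronize do
      items = @pending.dup
      @pending.clear
      items
    end
    @sent.concat(leftovers)
  end
end
```

The bounded join keeps shutdown from hanging indefinitely if the background thread is stuck, while the final drain ensures entries that never hit a flush trigger are still delivered.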