Class: Clacky::Channel::Adapters::Weixin::SendQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/clacky/server/channel/adapters/weixin/adapter.rb

Overview

Per-user send queue with buffering, throttling, and retry for Weixin iLink.

Design:

- Each chat_id has a pending buffer of text fragments.
- A background flusher thread periodically checks all buffers.
- Flush triggers: char threshold reached, time interval elapsed, or explicit flush.
- Actual send calls are spaced by MIN_SEND_INTERVAL to avoid rate-limiting.
- ret:-2 (rate-limited) triggers exponential backoff retry.

Defined Under Namespace

Classes: Entry

Constant Summary collapse

FLUSH_CHAR_THRESHOLD =
400
FLUSH_INTERVAL =
0.8
MIN_SEND_INTERVAL =
1.0
RETRY_BACKOFFS =
[1.0, 2.0, 4.0]

Instance Method Summary collapse

Constructor Details

#initialize(api_client, logger: Clacky::Logger) ⇒ SendQueue

Returns a new instance of SendQueue.



27
28
29
30
31
32
33
34
35
36
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 27

def initialize(api_client, logger: Clacky::Logger)
  @api_client   = api_client
  @logger       = logger
  @buffers      = {}
  @buffer_mutex = Mutex.new
  @last_sent_at = {}
  @last_mutex   = Mutex.new
  @running      = true
  @flusher      = Thread.new { flush_loop }
end

Instance Method Details

#enqueue(chat_id, text, context_token) ⇒ Object

Enqueue text for a chat_id. Non-blocking.



39
40
41
42
43
44
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 39

def enqueue(chat_id, text, context_token)
  @buffer_mutex.synchronize do
    @buffers[chat_id] ||= []
    @buffers[chat_id] << Entry.new(text: text, context_token: context_token, enqueued_at: Time.now)
  end
end

#flush(chat_id) ⇒ Object

Force-flush all pending text for a chat_id. Non-blocking.



47
48
49
50
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 47

def flush(chat_id)
  entries = @buffer_mutex.synchronize { @buffers.delete(chat_id) || [] }
  send_entries(chat_id, entries) unless entries.empty?
end

#stopObject

Stop the flusher thread. Waits up to 30s for pending messages to drain.



53
54
55
56
57
58
# File 'lib/clacky/server/channel/adapters/weixin/adapter.rb', line 53

def stop
  @running = false
  @flusher.join(30)
  # Force-flush any remaining entries regardless of threshold.
  drain_all_buffers
end