Class: Dalli::Protocol::Meta::ResponseProcessor

Inherits:
Object
  • Object
show all
Defined in:
lib/dalli/protocol/response_processor.rb

Overview

Class that encapsulates logic for processing meta protocol responses from memcached. Includes logic for pulling data from an IO source and parsing into local values. Handles errors on unexpected values.

Constant Summary collapse

# Individual response tokens. The methods in this class compare them against
# the first token of a response line.
EN =
'EN'
END_TOKEN =
'END'
EX =
'EX'
HD =
'HD'
MN =
'MN'
NF =
'NF'
NS =
'NS'
OK =
'OK'
RESET =
'RESET'
STAT =
'STAT'
VA =
'VA'
VERSION =
'VERSION'
SERVER_ERROR =
'SERVER_ERROR'
# Frozen lists of acceptable leading tokens. Each is passed to
# error_on_unexpected! by the method handling the matching response.
T_OK =
[OK].freeze
T_RESET =
[RESET].freeze
T_EN_HD =
[EN, HD].freeze
T_VERSION =
[VERSION].freeze
T_VA_EN_HD =
[VA, EN, HD].freeze
T_HD_NF_EX =
[HD, NF, EX].freeze
T_HD_NS_NF_EX =
[HD, NS, NF, EX].freeze
T_VA_NF_NS_EX =
[VA, NF, NS, EX].freeze
T_END_TOKEN_STAT =
[END_TOKEN, STAT].freeze

Instance Method Summary collapse

Constructor Details

#initialize(io_source, value_marshaller) ⇒ ResponseProcessor

Returns a new instance of ResponseProcessor.



36
37
38
39
# File 'lib/dalli/protocol/response_processor.rb', line 36

# Stores the collaborators used by every response-handling method.
#
# @param io_source object that responds to +read_line+ and +read+
# @param value_marshaller object that responds to +retrieve(body, bitflags)+
def initialize(io_source, value_marshaller)
  @value_marshaller = value_marshaller
  @io_source = io_source
end

Instance Method Details

#bitflags_from_tokens(tokens) ⇒ Object



212
213
214
# File 'lib/dalli/protocol/response_processor.rb', line 212

# Extracts the numeric payload of the 'f' token.
#
# @param tokens [Array<String>] tokens of a response header line
# @return [Integer] the flag value; 0 when no 'f' token is present
#   (value_from_tokens falls back to 0)
def bitflags_from_tokens(tokens)
  raw_flags = value_from_tokens(tokens, 'f')
  raw_flags&.to_i
end

#body_len_from_tokens(tokens) ⇒ Object



241
242
243
# File 'lib/dalli/protocol/response_processor.rb', line 241

# Extracts the numeric payload of the 's' token.
#
# @param tokens [Array<String>] tokens of a response header line
# @return [Integer] the body length; 0 when no 's' token is present
#   (value_from_tokens falls back to 0)
def body_len_from_tokens(tokens)
  raw_size = value_from_tokens(tokens, 's')
  raw_size&.to_i
end

#build_metadata_result(tokens) ⇒ Object



84
85
86
87
88
89
90
# File 'lib/dalli/protocol/response_processor.rb', line 84

# Builds the base metadata hash for a meta-get response. The value slot is
# left nil so the caller can fill it in afterwards.
#
# @param tokens [Array<String>] tokens of the response header line
# @return [Hash] with keys :value (always nil here), :cas,
#   :won_recache ('W' token present), :stale ('X' token present) and
#   :lost_recache ('Z' token present)
def build_metadata_result(tokens)
  {
    value: nil, cas: cas_from_tokens(tokens),
    won_recache: tokens.include?('W'), stale: tokens.include?('X'),
    lost_recache: tokens.include?('Z')
  }
end

#cas_from_tokens(tokens) ⇒ Object



216
217
218
# File 'lib/dalli/protocol/response_processor.rb', line 216

# Extracts the numeric payload of the 'c' token.
#
# @param tokens [Array<String>] tokens of a response header line
# @return [Integer] the CAS value; 0 when no 'c' token is present
#   (value_from_tokens falls back to 0)
def cas_from_tokens(tokens)
  raw_cas = value_from_tokens(tokens, 'c')
  raw_cas&.to_i
end

#consume_all_responses_until_mn ⇒ Object



153
154
155
156
157
158
# File 'lib/dalli/protocol/response_processor.rb', line 153

# Reads and discards response lines until one whose first token is MN.
# At least one line is always read.
#
# @return [true]
def consume_all_responses_until_mn
  leading = next_line_to_tokens.first
  # Keep pulling lines; everything before the MN line is thrown away.
  leading = next_line_to_tokens.first until leading == MN
  true
end

#decr_incr ⇒ Object



118
119
120
121
122
123
124
# File 'lib/dalli/protocol/response_processor.rb', line 118

# Handles the response to an increment/decrement request.
#
# @return [false] when the first token is NS or EX
# @return [nil] when the first token is NF
# @return [Integer] otherwise, the next line converted with +to_i+
# @raise whatever error_on_unexpected! raises for a first token outside
#   T_VA_NF_NS_EX
def decr_incr
  status = error_on_unexpected!(T_VA_NF_NS_EX).first
  case status
  when NS, EX then false
  when NF then nil
  else read_line.to_i
  end
end

#error_on_unexpected!(expected_codes) ⇒ Object

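Raises:

  • (Dalli::ServerError) — if the first token of the response is SERVER_ERROR
  • (Dalli::DalliError) — if the first token is any other unexpected value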



202
203
204
205
206
207
208
209
210
# File 'lib/dalli/protocol/response_processor.rb', line 202

# Reads the next response line and checks its first token.
#
# @param expected_codes [Array<String>] acceptable first tokens
# @return [Array<String>] the tokens of the line when the first token is expected
# @raise [Dalli::ServerError] when the first token is SERVER_ERROR; the
#   message is the whole line re-joined with single spaces
# @raise [Dalli::DalliError] for any other unexpected first token
def error_on_unexpected!(expected_codes)
  tokens = next_line_to_tokens
  status = tokens.first
  return tokens if expected_codes.include?(status)

  raise Dalli::ServerError, tokens.join(' ') if status == SERVER_ERROR

  raise Dalli::DalliError, "Response error: #{status}"
end

#flush ⇒ Object



136
137
138
139
140
# File 'lib/dalli/protocol/response_processor.rb', line 136

# Handles the response to a flush request.
#
# @return [true] when the first token of the response is OK
# @raise whatever error_on_unexpected! raises for any other first token
def flush
  error_on_unexpected!(T_OK) # only called for its raise-on-mismatch effect
  true
end

#full_response_from_buffer(tokens, body, resp_size) ⇒ Object



160
161
162
163
# File 'lib/dalli/protocol/response_processor.rb', line 160

# Assembles the result array for a response whose header and body are both
# already available.
#
# @param tokens [Array<String>] tokens of the response header line
# @param body [String] raw body bytes
# @param resp_size [Integer] total size of the response in bytes
# @return [Array] [resp_size, status, cas, key, value], where status is
#   true only if the first token is VA
def full_response_from_buffer(tokens, body, resp_size)
  bitflags = bitflags_from_tokens(tokens)
  value = @value_marshaller.retrieve(body, bitflags)
  status = tokens.first == VA
  [resp_size, status, cas_from_tokens(tokens), key_from_tokens(tokens), value]
end

#getk_response_from_buffer(buf, offset = 0) ⇒ Object

This method returns an array of values used in a pipelined getk process. The first value is the number of bytes by which to advance the pointer in the buffer. If the complete response is found in the buffer, this will be the response size. Otherwise it is zero.

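The remaining four values in the array are a status flag (true when the header's first token is VA or when the response has no body, nil when the response is incomplete), the CAS value, the key, and the value.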



175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/dalli/protocol/response_processor.rb', line 175

# Tries to parse one response out of +buf+, starting at +offset+.
#
# @param buf [String] buffer holding raw response bytes
# @param offset [Integer] position in +buf+ at which the response starts
# @return [Array] five elements: [bytes_to_advance, status, cas, key, value].
#   * [0, nil, nil, nil, nil] when the header or the body is not fully in +buf+
#   * [header_len, true, nil, nil, nil] when the response has no body
#   * otherwise the result of full_response_from_buffer
#
# NOTE(review): String#index returns a character index, while byteslice and
# bytesize work in bytes. These agree only for single-byte (e.g. binary)
# encoded buffers. Confirm callers always pass such a buffer.
def getk_response_from_buffer(buf, offset = 0)
  incomplete = [0, nil, nil, nil, nil]

  # Without a header terminator there is nothing to parse yet.
  header_end = buf.index(TERMINATOR, offset)
  return incomplete if header_end.nil?

  header = buf.byteslice(offset, header_end - offset)
  tokens = header.split
  header_len = header.bytesize + TERMINATOR.length
  body_len = body_len_from_tokens(tokens)

  # A body-less response is complete as soon as its header is. This is
  # either the reply to the terminating noop or, if the status is not MN,
  # an intermediate error response that the caller discards.
  return [header_len, true, nil, nil, nil] if body_len.zero?

  # The header is present but the body may not be. Do not advance the
  # buffer until the whole response has arrived.
  resp_size = header_len + body_len + TERMINATOR.length
  return incomplete if buf.bytesize < offset + resp_size

  full_response_from_buffer(tokens, buf.byteslice(offset + header_len, body_len), resp_size)
end

#hit_status_from_tokens(tokens) ⇒ Object

Returns true if the item was previously hit, false if this is the first access, and nil if not requested. The h flag returns h0 (first access) or h1 (previously accessed).



228
229
230
231
232
233
# File 'lib/dalli/protocol/response_processor.rb', line 228

# Interprets the two-character 'h' token (h0 / h1).
#
# @param tokens [Array<String>] tokens of a response header line
# @return [Boolean, nil] true when the first matching token's second
#   character is '1', false for any other second character, nil when no
#   two-character token starting with 'h' exists
def hit_status_from_tokens(tokens)
  tokens.each do |token|
    next unless token.length == 2 && token.start_with?('h')

    return token[1] == '1'
  end
  nil
end

#key_from_tokens(tokens) ⇒ Object



220
221
222
223
224
# File 'lib/dalli/protocol/response_processor.rb', line 220

# Extracts the key from the 'k' token and passes it to KeyRegularizer.decode.
#
# @param tokens [Array<String>] tokens of a response header line
# @return the result of KeyRegularizer.decode, called with the 'k' payload
#   (0 when there is no 'k' token, per value_from_tokens) and a boolean
#   telling whether a bare 'b' token is present
def key_from_tokens(tokens)
  KeyRegularizer.decode(value_from_tokens(tokens, 'k'), tokens.any?('b'))
end

#last_access_from_tokens(tokens) ⇒ Object

Returns seconds since last access, or nil if not requested. The l flag returns l<seconds>.



237
238
239
# File 'lib/dalli/protocol/response_processor.rb', line 237

# Extracts the numeric payload of the 'l' token (seconds since last access).
#
# The token is looked up directly rather than through value_from_tokens,
# because that helper falls back to 0 when the token is missing. A 0 here
# would read as "accessed 0 seconds ago" instead of "not reported".
#
# @param tokens [Array<String>] tokens of a response header line
# @return [Integer, nil] the value after the 'l' prefix, or nil when no
#   token starting with 'l' is present
def last_access_from_tokens(tokens)
  access_token = tokens.find { |t| t.start_with?('l') }
  return nil unless access_token

  access_token[1..].to_i
end

#meta_delete ⇒ Object



113
114
115
116
# File 'lib/dalli/protocol/response_processor.rb', line 113

# Handles the response to a delete request.
#
# @return [Boolean] true when the first token is HD, false for NF or EX
# @raise whatever error_on_unexpected! raises for a first token outside
#   T_HD_NF_EX
def meta_delete
  error_on_unexpected!(T_HD_NF_EX).first == HD
end

#meta_get_with_metadata(cache_nils: false, return_hit_status: false, return_last_access: false) ⇒ Object

Returns a hash with all requested metadata:

  • :value - the cached value (or nil if miss)

  • :cas - the CAS value (if return_cas was requested)

  • :won_recache - true if client won the right to recache (W flag)

  • :stale - true if the item is stale (X flag)

  • :lost_recache - true if another client is already recaching (Z flag)

  • :hit_before - true/false if item was previously accessed (h flag, if requested)

  • :last_access - seconds since last access (l flag, if requested)

Used by meta_get for comprehensive metadata retrieval. Supports thundering herd protection (N/R flags) and metadata flags (h/l/u).



75
76
77
78
79
80
81
82
# File 'lib/dalli/protocol/response_processor.rb', line 75

# Handles a meta-get response and returns the value together with metadata.
#
# @param cache_nils [Boolean] forwarded to parse_value_from_tokens
# @param return_hit_status [Boolean] when true, adds :hit_before to the result
# @param return_last_access [Boolean] when true, adds :last_access to the result
# @return [Hash] the hash from build_metadata_result with :value filled in,
#   plus :hit_before / :last_access when requested
# @raise whatever error_on_unexpected! raises for a first token outside
#   T_VA_EN_HD
def meta_get_with_metadata(cache_nils: false, return_hit_status: false, return_last_access: false)
  tokens = error_on_unexpected!(T_VA_EN_HD)
  result = build_metadata_result(tokens)
  result[:hit_before] = hit_status_from_tokens(tokens) if return_hit_status
  result[:last_access] = last_access_from_tokens(tokens) if return_last_access
  # The value is parsed last; for a VA response this reads the body from the IO source.
  result[:value] = parse_value_from_tokens(tokens, cache_nils)
  result
end

#meta_get_with_value(cache_nils: false) ⇒ Object



41
42
43
44
45
46
47
# File 'lib/dalli/protocol/response_processor.rb', line 41

# Handles a meta-get response when only the value is wanted.
#
# @param cache_nils [Boolean] when true, an EN response yields
#   ::Dalli::NOT_FOUND instead of nil
# @return [nil, Object] for EN: ::Dalli::NOT_FOUND or nil (see cache_nils);
#   for VA: the unmarshalled value, whose body size is taken from the second
#   token; for anything else (HD): true
# @raise whatever error_on_unexpected! raises for a first token outside
#   T_VA_EN_HD
def meta_get_with_value(cache_nils: false)
  tokens = error_on_unexpected!(T_VA_EN_HD)
  case tokens.first
  when EN
    cache_nils ? ::Dalli::NOT_FOUND : nil
  when VA
    @value_marshaller.retrieve(read_data(tokens[1].to_i), bitflags_from_tokens(tokens))
  else
    true
  end
end

#meta_get_with_value_and_cas ⇒ Object



49
50
51
52
53
54
55
56
57
# File 'lib/dalli/protocol/response_processor.rb', line 49

# Handles a meta-get response when both value and CAS are wanted.
#
# @return [Array] [value, cas]: [nil, 0] for EN; [nil, cas] for a non-VA hit;
#   [unmarshalled value, cas] for VA
# @raise whatever error_on_unexpected! raises for a first token outside
#   T_VA_EN_HD
def meta_get_with_value_and_cas
  tokens = error_on_unexpected!(T_VA_EN_HD)
  status = tokens.first
  return [nil, 0] if status == EN

  cas = cas_from_tokens(tokens)
  value = nil
  # Only a VA response carries a body; its size is the second token.
  if status == VA
    value = @value_marshaller.retrieve(read_data(tokens[1].to_i), bitflags_from_tokens(tokens))
  end
  [value, cas]
end

#meta_get_without_value ⇒ Object



59
60
61
62
# File 'lib/dalli/protocol/response_processor.rb', line 59

# Handles a meta-get response when no value was requested.
#
# @return [nil] when the first token is EN
# @return [true] otherwise (HD)
# @raise whatever error_on_unexpected! raises for a first token outside
#   T_EN_HD
def meta_get_without_value
  return if error_on_unexpected!(T_EN_HD).first == EN

  true
end

#meta_set_append_prepend ⇒ Object



106
107
108
109
110
111
# File 'lib/dalli/protocol/response_processor.rb', line 106

# Handles the response to an append/prepend request.
#
# @return [Boolean] true when the first token is HD, false for NS, NF or EX
# @raise whatever error_on_unexpected! raises for a first token outside
#   T_HD_NS_NF_EX
def meta_set_append_prepend
  status = error_on_unexpected!(T_HD_NS_NF_EX).first
  status == HD
end

#meta_set_with_cas ⇒ Object



99
100
101
102
103
104
# File 'lib/dalli/protocol/response_processor.rb', line 99

# Handles the response to a set request.
#
# @return [false] when the first token is not HD (NS, NF or EX)
# @return [Integer] the CAS value taken from the tokens when it is HD
# @raise whatever error_on_unexpected! raises for a first token outside
#   T_HD_NS_NF_EX
def meta_set_with_cas
  tokens = error_on_unexpected!(T_HD_NS_NF_EX)
  tokens.first == HD ? cas_from_tokens(tokens) : false
end

#next_line_to_tokens ⇒ Object



256
257
258
259
# File 'lib/dalli/protocol/response_processor.rb', line 256

# Reads the next line and splits it on whitespace.
#
# @return [Array<String>] the tokens of the line; an empty array when
#   read_line returns nil
def next_line_to_tokens
  line = read_line
  line.nil? ? [] : line.split
end

#parse_value_from_tokens(tokens, cache_nils) ⇒ Object



92
93
94
95
96
97
# File 'lib/dalli/protocol/response_processor.rb', line 92

# Derives the value for a meta-get response from its header tokens.
#
# @param tokens [Array<String>] tokens of the response header line
# @param cache_nils [Boolean] when true, an EN response yields
#   ::Dalli::NOT_FOUND instead of nil
# @return for EN: ::Dalli::NOT_FOUND or nil; for VA: the unmarshalled body
#   (its size is the second token); for anything else: nil
def parse_value_from_tokens(tokens, cache_nils)
  case tokens.first
  when EN
    cache_nils ? ::Dalli::NOT_FOUND : nil
  when VA
    @value_marshaller.retrieve(read_data(tokens[1].to_i), bitflags_from_tokens(tokens))
  end
end

#read_data(data_size) ⇒ Object



261
262
263
# File 'lib/dalli/protocol/response_processor.rb', line 261

# Reads a body of +data_size+ bytes plus the trailing TERMINATOR from the IO
# source and strips the terminator.
#
# @param data_size [Integer] body size, excluding the terminator
# @return [String, nil] the body; nil when the source returns nil or when
#   the data does not end with TERMINATOR (chomp! returns nil if it removed
#   nothing)
def read_data(data_size)
  raw = @io_source.read(data_size + TERMINATOR.bytesize)
  raw&.chomp!(TERMINATOR)
end

#read_line ⇒ Object



252
253
254
# File 'lib/dalli/protocol/response_processor.rb', line 252

# Reads one line from the IO source and strips the trailing TERMINATOR.
#
# @return [String, nil] the line without its terminator; nil when the
#   source returns nil or when the line does not end with TERMINATOR
#   (chomp! returns nil if it removed nothing)
def read_line
  raw = @io_source.read_line
  raw&.chomp!(TERMINATOR)
end

#reset ⇒ Object



142
143
144
145
146
# File 'lib/dalli/protocol/response_processor.rb', line 142

# Handles a response whose first token must be RESET.
#
# @return [true] when the first token of the response is RESET
# @raise whatever error_on_unexpected! raises for any other first token
def reset
  error_on_unexpected!(T_RESET) # only called for its raise-on-mismatch effect
  true
end

#stats ⇒ Object



126
127
128
129
130
131
132
133
134
# File 'lib/dalli/protocol/response_processor.rb', line 126

# Collects a stats response into a hash.
#
# The first line is validated against T_END_TOKEN_STAT. Every following line
# is read without validation until a line whose first token is END_TOKEN.
#
# @return [Hash{String => String}] second token => third token of each line
#   read before the END line; empty when END comes first
# @raise whatever error_on_unexpected! raises for an unexpected first line
def stats
  line_tokens = error_on_unexpected!(T_END_TOKEN_STAT)
  collected = {}
  until line_tokens.first == END_TOKEN
    _, stat_name, stat_value = line_tokens
    collected[stat_name] = stat_value
    line_tokens = next_line_to_tokens
  end
  collected
end

#value_from_tokens(tokens, flag) ⇒ Object



245
246
247
248
249
250
# File 'lib/dalli/protocol/response_processor.rb', line 245

# Finds the first token starting with +flag+ and returns what follows its
# first character.
#
# @param tokens [Array<String>] tokens of a response header line
# @param flag [String] prefix to look for
# @return [String] the matching token minus its first character
# @return [Integer] 0 when no token starts with +flag+
def value_from_tokens(tokens, flag)
  tokens.each { |token| return token[1..] if token.start_with?(flag) }
  0
end

#version ⇒ Object



148
149
150
151
# File 'lib/dalli/protocol/response_processor.rb', line 148

# Handles the response to a version request.
#
# @return [String] the last token of the response line
# @raise whatever error_on_unexpected! raises when the first token is not
#   VERSION
def version
  error_on_unexpected!(T_VERSION).last
end