Class: Dalli::Protocol::Meta::ResponseProcessor
- Inherits:
-
Object
- Object
- Dalli::Protocol::Meta::ResponseProcessor
- Defined in:
- lib/dalli/protocol/response_processor.rb
Overview
Class that encapsulates logic for processing meta protocol responses from memcached. Includes logic for pulling data from an IO source and parsing into local values. Handles errors on unexpected values.
Constant Summary collapse
# Keywords compared against the first token of a response line.
EN = 'EN'
END_TOKEN = 'END'
EX = 'EX'
HD = 'HD'
MN = 'MN'
NF = 'NF'
NS = 'NS'
OK = 'OK'
RESET = 'RESET'
STAT = 'STAT'
VA = 'VA'
VERSION = 'VERSION'
SERVER_ERROR = 'SERVER_ERROR'

# Frozen lists of the first tokens that each response handler accepts;
# they are passed to error_on_unexpected!.
T_OK = [OK].freeze
T_RESET = [RESET].freeze
T_EN_HD = [EN, HD].freeze
T_VERSION = [VERSION].freeze
T_VA_EN_HD = [VA, EN, HD].freeze
T_HD_NF_EX = [HD, NF, EX].freeze
T_HD_NS_NF_EX = [HD, NS, NF, EX].freeze
T_VA_NF_NS_EX = [VA, NF, NS, EX].freeze
T_END_TOKEN_STAT = [END_TOKEN, STAT].freeze
Instance Method Summary collapse
- #bitflags_from_tokens(tokens) ⇒ Object
- #body_len_from_tokens(tokens) ⇒ Object
- #build_metadata_result(tokens) ⇒ Object
- #cas_from_tokens(tokens) ⇒ Object
- #consume_all_responses_until_mn ⇒ Object
- #decr_incr ⇒ Object
- #error_on_unexpected!(expected_codes) ⇒ Object
- #flush ⇒ Object
- #full_response_from_buffer(tokens, body, resp_size) ⇒ Object
-
#getk_response_from_buffer(buf, offset = 0) ⇒ Object
This method returns an array of values used in a pipelined getk process.
-
#hit_status_from_tokens(tokens) ⇒ Object
Returns true if the item was previously hit, false if this is the first access, or nil if the hit status was not requested. The h flag returns h0 (first access) or h1 (previously accessed).
-
#initialize(io_source, value_marshaller) ⇒ ResponseProcessor
constructor
A new instance of ResponseProcessor.
- #key_from_tokens(tokens) ⇒ Object
-
#last_access_from_tokens(tokens) ⇒ Object
Returns seconds since last access, or nil if not requested. The l flag returns l<seconds>.
- #meta_delete ⇒ Object
-
#meta_get_with_metadata(cache_nils: false, return_hit_status: false, return_last_access: false) ⇒ Object
Returns a hash with all requested metadata: :value - the cached value (or nil if miss); :cas - the CAS value (if return_cas was requested); :won_recache - true if client won the right to recache (W flag); :stale - true if the item is stale (X flag); :lost_recache - true if another client is already recaching (Z flag); :hit_before - true/false if item was previously accessed (h flag, if requested); :last_access - seconds since last access (l flag, if requested).
- #meta_get_with_value(cache_nils: false) ⇒ Object
- #meta_get_with_value_and_cas ⇒ Object
- #meta_get_without_value ⇒ Object
- #meta_set_append_prepend ⇒ Object
- #meta_set_with_cas ⇒ Object
- #next_line_to_tokens ⇒ Object
- #parse_value_from_tokens(tokens, cache_nils) ⇒ Object
- #read_data(data_size) ⇒ Object
- #read_line ⇒ Object
- #reset ⇒ Object
- #stats ⇒ Object
- #value_from_tokens(tokens, flag) ⇒ Object
- #version ⇒ Object
Constructor Details
#initialize(io_source, value_marshaller) ⇒ ResponseProcessor
Returns a new instance of ResponseProcessor.
36 37 38 39 |
# File 'lib/dalli/protocol/response_processor.rb', line 36
#
# Stores the two collaborators used by the response handlers.
#
# @param io_source [Object] object responses are read from; the handlers
#   call read_line and read(size) on it
# @param value_marshaller [Object] object whose retrieve(body, bitflags)
#   converts a raw body into a value
def initialize(io_source, value_marshaller)
  @io_source = io_source
  @value_marshaller = value_marshaller
end
Instance Method Details
#bitflags_from_tokens(tokens) ⇒ Object
212 213 214 |
# File 'lib/dalli/protocol/response_processor.rb', line 212
#
# Extracts the value of the f flag from a tokenized response line.
#
# @param tokens [Array<String>] tokens of a response line
# @return [Integer] the f flag value converted with to_i; 0 when
#   value_from_tokens finds no token starting with 'f'
def bitflags_from_tokens(tokens)
  value_from_tokens(tokens, 'f')&.to_i
end
#body_len_from_tokens(tokens) ⇒ Object
241 242 243 |
# File 'lib/dalli/protocol/response_processor.rb', line 241
#
# Extracts the value of the s flag (used as the body length) from a
# tokenized response line.
#
# @param tokens [Array<String>] tokens of a response line
# @return [Integer] the s flag value converted with to_i; 0 when
#   value_from_tokens finds no token starting with 's'
def body_len_from_tokens(tokens)
  value_from_tokens(tokens, 's')&.to_i
end
#build_metadata_result(tokens) ⇒ Object
84 85 86 87 88 89 90 |
# File 'lib/dalli/protocol/response_processor.rb', line 84
#
# Builds the base result hash for a metadata-carrying get response.
# :value is left nil here; the caller fills it in afterwards.
#
# @param tokens [Array<String>] tokens of a response line
# @return [Hash] :value (nil), :cas, and the booleans :won_recache,
#   :stale and :lost_recache, set when the bare tokens 'W', 'X' and 'Z'
#   are present
def build_metadata_result(tokens)
  {
    value: nil,
    cas: cas_from_tokens(tokens),
    won_recache: tokens.include?('W'),
    stale: tokens.include?('X'),
    lost_recache: tokens.include?('Z')
  }
end
#cas_from_tokens(tokens) ⇒ Object
216 217 218 |
# File 'lib/dalli/protocol/response_processor.rb', line 216
#
# Extracts the value of the c flag (the CAS value) from a tokenized
# response line.
#
# @param tokens [Array<String>] tokens of a response line
# @return [Integer] the c flag value converted with to_i; 0 when
#   value_from_tokens finds no token starting with 'c'
def cas_from_tokens(tokens)
  value_from_tokens(tokens, 'c')&.to_i
end
#consume_all_responses_until_mn ⇒ Object
153 154 155 156 157 158 |
# File 'lib/dalli/protocol/response_processor.rb', line 153
#
# Reads and discards response lines until one whose first token is MN
# has been read.
#
# @return [true]
def consume_all_responses_until_mn
  tokens = next_line_to_tokens

  # Keep reading; every line before the MN line is dropped.
  tokens = next_line_to_tokens while tokens.first != MN
  true
end
#decr_incr ⇒ Object
118 119 120 121 122 123 124 |
# File 'lib/dalli/protocol/response_processor.rb', line 118
#
# Processes the response to an increment/decrement request.
#
# @return [false] when the response code is NS or EX
# @return [nil] when the response code is NF
# @return [Integer] otherwise, the following line converted with to_i
# @raise [Dalli::ServerError, Dalli::DalliError] via error_on_unexpected!
#   when the response code is not one of VA, NF, NS, EX
def decr_incr
  tokens = error_on_unexpected!(T_VA_NF_NS_EX)
  return false if [NS, EX].include?(tokens.first)
  return nil if tokens.first == NF

  # Only VA remains: the value is on the next line.
  read_line.to_i
end
#error_on_unexpected!(expected_codes) ⇒ Object
202 203 204 205 206 207 208 209 210 |
# File 'lib/dalli/protocol/response_processor.rb', line 202
#
# Reads the next response line and checks that its first token is one of
# the expected codes.
#
# @param expected_codes [Array<String>] acceptable first tokens
# @return [Array<String>] the tokens of the line when its code is expected
# @raise [Dalli::ServerError] when the first token is SERVER_ERROR; the
#   message is the whole line re-joined with single spaces
# @raise [Dalli::DalliError] for any other unexpected first token
def error_on_unexpected!(expected_codes)
  tokens = next_line_to_tokens

  return tokens if expected_codes.include?(tokens.first)

  # Array#join already returns a String, so no further conversion is needed.
  raise Dalli::ServerError, tokens.join(' ') if tokens.first == SERVER_ERROR

  raise Dalli::DalliError, "Response error: #{tokens.first}"
end
#flush ⇒ Object
136 137 138 139 140 |
# File 'lib/dalli/protocol/response_processor.rb', line 136
#
# Processes the response to a flush request, which must be OK.
#
# @return [true]
# @raise [Dalli::ServerError, Dalli::DalliError] via error_on_unexpected!
#   when the response code is not OK
def flush
  error_on_unexpected!(T_OK)

  true
end
#full_response_from_buffer(tokens, body, resp_size) ⇒ Object
160 161 162 163 |
# File 'lib/dalli/protocol/response_processor.rb', line 160
#
# Builds the result array for a response whose header and body are both
# available.
#
# @param tokens [Array<String>] tokens of the response header
# @param body [String] raw body bytes
# @param resp_size [Integer] total size of the response; returned unchanged
#   as the first element
# @return [Array] [resp_size, whether the first token is VA, CAS value,
#   decoded key, value produced by the value marshaller]
def full_response_from_buffer(tokens, body, resp_size)
  value = @value_marshaller.retrieve(body, bitflags_from_tokens(tokens))
  [resp_size, tokens.first == VA, cas_from_tokens(tokens), key_from_tokens(tokens), value]
end
#getk_response_from_buffer(buf, offset = 0) ⇒ Object
This method returns an array of values used in a pipelined getk process. The first value is the number of bytes by which to advance the pointer in the buffer. If the complete response is found in the buffer, this will be the response size. Otherwise it is zero.
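The remaining four values in the array are a flag that is true when the response status is VA, the CAS value, the key, and the value. For a complete response that has no body, the flag is true and the other three values are nil.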
175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 |
# File 'lib/dalli/protocol/response_processor.rb', line 175
#
# Tries to parse one response out of buf, starting at offset.
#
# @param buf [String] buffer holding response data
# @param offset [Integer] position in buf at which the response starts
# @return [Array] a five-element array. The first element is the number
#   of bytes to advance: 0 when the response is not yet complete, the
#   header length for a body-less response, or the full response size.
#   For a full response the rest is built by full_response_from_buffer.
def getk_response_from_buffer(buf, offset = 0)
  # Find the header terminator starting from offset
  term_idx = buf.index(TERMINATOR, offset)
  return [0, nil, nil, nil, nil] unless term_idx

  header = buf.byteslice(offset, term_idx - offset)
  tokens = header.split
  header_len = header.bytesize + TERMINATOR.length
  body_len = body_len_from_tokens(tokens)

  # We have a complete response that has no body.
  # This is either the response to the terminating
  # noop or, if the status is not MN, an intermediate
  # error response that needs to be discarded.
  return [header_len, true, nil, nil, nil] if body_len.zero?

  resp_size = header_len + body_len + TERMINATOR.length
  # The header is in the buffer, but the body is not. As we don't have
  # a complete response, don't advance the buffer
  return [0, nil, nil, nil, nil] unless buf.bytesize >= offset + resp_size

  # The full response is in our buffer, so parse it and return
  # the values
  body = buf.byteslice(offset + header_len, body_len)
  full_response_from_buffer(tokens, body, resp_size)
end
#hit_status_from_tokens(tokens) ⇒ Object
Returns true if the item was previously hit, false if this is the first access, or nil if the hit status was not requested. The h flag returns h0 (first access) or h1 (previously accessed).
228 229 230 231 232 233 |
# File 'lib/dalli/protocol/response_processor.rb', line 228
#
# Reports the hit status carried by the h flag of a response.
#
# @param tokens [Array<String>] tokens of a response line
# @return [Boolean, nil] true when the h token is 'h1', false for any
#   other two-character token starting with 'h', nil when there is no
#   such token
def hit_status_from_tokens(tokens)
  # Only exactly two-character tokens count, so longer tokens that merely
  # start with 'h' are ignored.
  hit_token = tokens.find { |t| t.start_with?('h') && t.length == 2 }
  hit_token && hit_token[1] == '1'
end
#key_from_tokens(tokens) ⇒ Object
220 221 222 223 224 |
# File 'lib/dalli/protocol/response_processor.rb', line 220
#
# Extracts the key carried by the k flag and decodes it.
#
# @param tokens [Array<String>] tokens of a response line
# @return [Object] whatever KeyRegularizer.decode returns for the k flag
#   value; the second argument is true when the bare token 'b' is present
def key_from_tokens(tokens)
  encoded_key = value_from_tokens(tokens, 'k')
  base64_encoded = tokens.any?('b')
  KeyRegularizer.decode(encoded_key, base64_encoded)
end
#last_access_from_tokens(tokens) ⇒ Object
Returns seconds since last access, or nil if not requested. The l flag returns l<seconds>.
237 238 239 |
# File 'lib/dalli/protocol/response_processor.rb', line 237
#
# Extracts the seconds since last access carried by the l flag.
#
# The token is looked up directly rather than through value_from_tokens,
# because that helper falls back to 0 for a missing flag, which would be
# indistinguishable from a real "0 seconds" answer.
#
# @param tokens [Array<String>] tokens of a response line
# @return [Integer, nil] the l flag value converted with to_i, or nil
#   when no token starts with 'l'
def last_access_from_tokens(tokens)
  last_access_token = tokens.find { |t| t.start_with?('l') }
  last_access_token && last_access_token[1..].to_i
end
#meta_delete ⇒ Object
113 114 115 116 |
# File 'lib/dalli/protocol/response_processor.rb', line 113
#
# Processes the response to a delete request.
#
# @return [Boolean] true when the response code is HD, false for NF or EX
# @raise [Dalli::ServerError, Dalli::DalliError] via error_on_unexpected!
#   when the response code is not one of HD, NF, EX
def meta_delete
  tokens = error_on_unexpected!(T_HD_NF_EX)
  tokens.first == HD
end
#meta_get_with_metadata(cache_nils: false, return_hit_status: false, return_last_access: false) ⇒ Object
Returns a hash with all requested metadata:
-
:value - the cached value (or nil if miss)
-
:cas - the CAS value (if return_cas was requested)
-
:won_recache - true if client won the right to recache (W flag)
-
:stale - true if the item is stale (X flag)
-
:lost_recache - true if another client is already recaching (Z flag)
-
:hit_before - true/false if item was previously accessed (h flag, if requested)
-
:last_access - seconds since last access (l flag, if requested)
Used by meta_get for comprehensive metadata retrieval. Supports thundering herd protection (N/R flags) and metadata flags (h/l/u).
75 76 77 78 79 80 81 82 |
# File 'lib/dalli/protocol/response_processor.rb', line 75
#
# Processes a get response and returns the value with its metadata.
#
# @param cache_nils [Boolean] forwarded to parse_value_from_tokens
# @param return_hit_status [Boolean] when true, adds :hit_before
# @param return_last_access [Boolean] when true, adds :last_access
# @return [Hash] the hash from build_metadata_result with :value filled
#   in, plus :hit_before and :last_access when requested
# @raise [Dalli::ServerError, Dalli::DalliError] via error_on_unexpected!
#   when the response code is not one of VA, EN, HD
def meta_get_with_metadata(cache_nils: false, return_hit_status: false, return_last_access: false)
  tokens = error_on_unexpected!(T_VA_EN_HD)
  result = build_metadata_result(tokens)
  result[:hit_before] = hit_status_from_tokens(tokens) if return_hit_status
  result[:last_access] = last_access_from_tokens(tokens) if return_last_access
  result[:value] = parse_value_from_tokens(tokens, cache_nils)
  result
end
#meta_get_with_value(cache_nils: false) ⇒ Object
41 42 43 44 45 46 47 |
# File 'lib/dalli/protocol/response_processor.rb', line 41
#
# Processes a get response that is expected to carry a value.
#
# @param cache_nils [Boolean] selects what is returned for an EN response
# @return [Object] for EN: ::Dalli::NOT_FOUND when cache_nils is true,
#   otherwise nil; for HD: true; for VA: the result of the value
#   marshaller's retrieve on the body
# @raise [Dalli::ServerError, Dalli::DalliError] via error_on_unexpected!
#   when the response code is not one of VA, EN, HD
def meta_get_with_value(cache_nils: false)
  tokens = error_on_unexpected!(T_VA_EN_HD)
  return cache_nils ? ::Dalli::NOT_FOUND : nil if tokens.first == EN
  return true unless tokens.first == VA

  # For VA the second token is the body size.
  @value_marshaller.retrieve(read_data(tokens[1].to_i), bitflags_from_tokens(tokens))
end
#meta_get_with_value_and_cas ⇒ Object
49 50 51 52 53 54 55 56 57 |
# File 'lib/dalli/protocol/response_processor.rb', line 49
#
# Processes a get response and returns the value together with its CAS.
#
# @return [Array] [nil, 0] for EN; [nil, cas] for HD; [value, cas] for VA,
#   where value is the result of the value marshaller's retrieve
# @raise [Dalli::ServerError, Dalli::DalliError] via error_on_unexpected!
#   when the response code is not one of VA, EN, HD
def meta_get_with_value_and_cas
  tokens = error_on_unexpected!(T_VA_EN_HD)
  return [nil, 0] if tokens.first == EN

  cas = cas_from_tokens(tokens)
  return [nil, cas] unless tokens.first == VA

  # For VA the second token is the body size.
  [@value_marshaller.retrieve(read_data(tokens[1].to_i), bitflags_from_tokens(tokens)), cas]
end
#meta_get_without_value ⇒ Object
59 60 61 62 |
# File 'lib/dalli/protocol/response_processor.rb', line 59
#
# Processes a get response that carries no value.
#
# @return [true, nil] nil when the response code is EN, true when it is HD
# @raise [Dalli::ServerError, Dalli::DalliError] via error_on_unexpected!
#   when the response code is neither EN nor HD
def meta_get_without_value
  tokens = error_on_unexpected!(T_EN_HD)
  tokens.first == EN ? nil : true
end
#meta_set_append_prepend ⇒ Object
106 107 108 109 110 111 |
# File 'lib/dalli/protocol/response_processor.rb', line 106
#
# Processes the response to an append/prepend request.
#
# @return [Boolean] true when the response code is HD, false for NS, NF
#   or EX
# @raise [Dalli::ServerError, Dalli::DalliError] via error_on_unexpected!
#   when the response code is not one of HD, NS, NF, EX
def meta_set_append_prepend
  tokens = error_on_unexpected!(T_HD_NS_NF_EX)
  tokens.first == HD
end
#meta_set_with_cas ⇒ Object
99 100 101 102 103 104 |
# File 'lib/dalli/protocol/response_processor.rb', line 99
#
# Processes the response to a set request and returns the CAS value.
#
# @return [Integer, false] the CAS value from the response when the
#   response code is HD, false for NS, NF or EX
# @raise [Dalli::ServerError, Dalli::DalliError] via error_on_unexpected!
#   when the response code is not one of HD, NS, NF, EX
def meta_set_with_cas
  tokens = error_on_unexpected!(T_HD_NS_NF_EX)
  return false unless tokens.first == HD

  cas_from_tokens(tokens)
end
#next_line_to_tokens ⇒ Object
256 257 258 259 |
# File 'lib/dalli/protocol/response_processor.rb', line 256
#
# Reads the next response line and splits it on whitespace.
#
# @return [Array<String>] the tokens of the line, or an empty array when
#   read_line returns nil
def next_line_to_tokens
  line = read_line
  line&.split || []
end
#parse_value_from_tokens(tokens, cache_nils) ⇒ Object
92 93 94 95 96 97 |
# File 'lib/dalli/protocol/response_processor.rb', line 92
#
# Produces the value for an already-tokenized get response.
#
# @param tokens [Array<String>] tokens of the response line
# @param cache_nils [Boolean] selects what is returned for an EN response
# @return [Object, nil] for EN: ::Dalli::NOT_FOUND when cache_nils is
#   true, otherwise nil; nil for any other non-VA code; for VA: the result
#   of the value marshaller's retrieve on the body
def parse_value_from_tokens(tokens, cache_nils)
  return cache_nils ? ::Dalli::NOT_FOUND : nil if tokens.first == EN
  return unless tokens.first == VA

  # For VA the second token is the body size.
  @value_marshaller.retrieve(read_data(tokens[1].to_i), bitflags_from_tokens(tokens))
end
#read_data(data_size) ⇒ Object
261 262 263 |
# File 'lib/dalli/protocol/response_processor.rb', line 261
#
# Reads a body of data_size bytes plus its trailing terminator from the
# IO source and strips the terminator.
#
# @param data_size [Integer] size of the body, excluding the terminator
# @return [String, nil] the body without the terminator; nil when the IO
#   source returns nil, or when the data read does not end with
#   TERMINATOR (chomp! returns nil if it removed nothing)
def read_data(data_size)
  @io_source.read(data_size + TERMINATOR.bytesize)&.chomp!(TERMINATOR)
end
#read_line ⇒ Object
252 253 254 |
# File 'lib/dalli/protocol/response_processor.rb', line 252
#
# Reads one line from the IO source and strips the trailing terminator.
#
# @return [String, nil] the line without the terminator; nil when the IO
#   source returns nil, or when the line does not end with TERMINATOR
#   (chomp! returns nil if it removed nothing)
def read_line
  @io_source.read_line&.chomp!(TERMINATOR)
end
#reset ⇒ Object
142 143 144 145 146 |
# File 'lib/dalli/protocol/response_processor.rb', line 142
#
# Processes the response to a stats reset request, which must be RESET.
#
# @return [true]
# @raise [Dalli::ServerError, Dalli::DalliError] via error_on_unexpected!
#   when the response code is not RESET
def reset
  error_on_unexpected!(T_RESET)

  true
end
#stats ⇒ Object
126 127 128 129 130 131 132 133 134 |
# File 'lib/dalli/protocol/response_processor.rb', line 126
#
# Collects the lines of a stats response into a hash.
#
# @return [Hash{String => String}] second token => third token for every
#   line read before the END line
# @raise [Dalli::ServerError, Dalli::DalliError] via error_on_unexpected!
#   when the first line starts with neither STAT nor END
def stats
  tokens = error_on_unexpected!(T_END_TOKEN_STAT)
  values = {}

  # Only the first line is validated; later lines are read as they come
  # until one starts with END.
  while tokens.first != END_TOKEN
    values[tokens[1]] = tokens[2]
    tokens = next_line_to_tokens
  end
  values
end
#value_from_tokens(tokens, flag) ⇒ Object
245 246 247 248 249 250 |
# File 'lib/dalli/protocol/response_processor.rb', line 245
#
# Finds the first token starting with flag and returns the text after its
# first character.
#
# @param tokens [Array<String>] tokens of a response line
# @param flag [String] flag prefix to look for; one character is stripped
#   from the matching token
# @return [String, Integer] the remainder of the matching token, or the
#   Integer 0 (not nil) when no token matches
def value_from_tokens(tokens, flag)
  flag_token = tokens.find { |t| t.start_with?(flag) }
  flag_token ? flag_token[1..] : 0
end
#version ⇒ Object
148 149 150 151 |
# File 'lib/dalli/protocol/response_processor.rb', line 148
#
# Processes the response to a version request.
#
# @return [String] the last token of the VERSION response line
# @raise [Dalli::ServerError, Dalli::DalliError] via error_on_unexpected!
#   when the response code is not VERSION
def version
  tokens = error_on_unexpected!(T_VERSION)
  tokens.last
end