File: | src/networking.c |
Warning: | line 1535, column 13 Value stored to 'nwritten' is never read |
Press '?' to see keyboard shortcuts
Keyboard shortcuts:
1 | /* |
2 | * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> |
3 | * All rights reserved. |
4 | * |
5 | * Redistribution and use in source and binary forms, with or without |
6 | * modification, are permitted provided that the following conditions are met: |
7 | * |
8 | * * Redistributions of source code must retain the above copyright notice, |
9 | * this list of conditions and the following disclaimer. |
10 | * * Redistributions in binary form must reproduce the above copyright |
11 | * notice, this list of conditions and the following disclaimer in the |
12 | * documentation and/or other materials provided with the distribution. |
13 | * * Neither the name of Redis nor the names of its contributors may be used |
14 | * to endorse or promote products derived from this software without |
15 | * specific prior written permission. |
16 | * |
17 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
18 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
19 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
20 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
21 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
22 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
23 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
24 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
25 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
26 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
27 | * POSSIBILITY OF SUCH DAMAGE. |
28 | */ |
29 | |
30 | #include "server.h" |
31 | #include "atomicvar.h" |
32 | #include "cluster.h" |
33 | #include <sys/socket.h> |
34 | #include <sys/uio.h> |
35 | #include <math.h> |
36 | #include <ctype.h> |
37 | |
38 | static void setProtocolError(const char *errstr, client *c); |
39 | int postponeClientRead(client *c); |
40 | int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). */ |
41 | |
42 | /* Return the size consumed from the allocator, for the specified SDS string, |
43 | * including internal fragmentation. This function is used in order to compute |
44 | * the client output buffer size. */ |
45 | size_t sdsZmallocSize(sds s) { |
46 | void *sh = sdsAllocPtr(s); |
47 | return zmalloc_size(sh)je_malloc_usable_size(sh); |
48 | } |
49 | |
50 | /* Return the amount of memory used by the sds string at object->ptr |
51 | * for a string object. This includes internal fragmentation. */ |
52 | size_t getStringObjectSdsUsedMemory(robj *o) { |
53 | serverAssertWithInfo(NULL,o,o->type == OBJ_STRING)((o->type == 0)?(void)0 : (_serverAssertWithInfo(((void*)0 ),o,"o->type == OBJ_STRING","networking.c",53),__builtin_unreachable ())); |
54 | switch(o->encoding) { |
55 | case OBJ_ENCODING_RAW0: return sdsZmallocSize(o->ptr); |
56 | case OBJ_ENCODING_EMBSTR8: return zmalloc_size(o)je_malloc_usable_size(o)-sizeof(robj); |
57 | default: return 0; /* Just integer encoding for now. */ |
58 | } |
59 | } |
60 | |
61 | /* Return the length of a string object. |
62 | * This does NOT includes internal fragmentation or sds unused space. */ |
63 | size_t getStringObjectLen(robj *o) { |
64 | serverAssertWithInfo(NULL,o,o->type == OBJ_STRING)((o->type == 0)?(void)0 : (_serverAssertWithInfo(((void*)0 ),o,"o->type == OBJ_STRING","networking.c",64),__builtin_unreachable ())); |
65 | switch(o->encoding) { |
66 | case OBJ_ENCODING_RAW0: return sdslen(o->ptr); |
67 | case OBJ_ENCODING_EMBSTR8: return sdslen(o->ptr); |
68 | default: return 0; /* Just integer encoding for now. */ |
69 | } |
70 | } |
71 | |
72 | /* Client.reply list dup and free methods. */ |
73 | void *dupClientReplyValue(void *o) { |
74 | clientReplyBlock *old = o; |
75 | clientReplyBlock *buf = zmalloc(sizeof(clientReplyBlock) + old->size); |
76 | memcpy(buf, o, sizeof(clientReplyBlock) + old->size); |
77 | return buf; |
78 | } |
79 | |
/* Free method for the client reply list: releases one clientReplyBlock. */
void freeClientReplyValue(void *o) {
    void *block = o;
    zfree(block);
}
83 | |
84 | int listMatchObjects(void *a, void *b) { |
85 | return equalStringObjects(a,b); |
86 | } |
87 | |
88 | /* This function links the client to the global linked list of clients. |
89 | * unlinkClient() does the opposite, among other things. */ |
90 | void linkClient(client *c) { |
91 | listAddNodeTail(server.clients,c); |
92 | /* Note that we remember the linked list node where the client is stored, |
93 | * this way removing the client in unlinkClient() will not require |
94 | * a linear scan, but just a constant time operation. */ |
95 | c->client_list_node = listLast(server.clients)((server.clients)->tail); |
96 | uint64_t id = htonu64(c->id)intrev64(c->id); |
97 | raxInsert(server.clients_index,(unsigned char*)&id,sizeof(id),c,NULL((void*)0)); |
98 | } |
99 | |
100 | /* Initialize client authentication state. |
101 | */ |
102 | static void clientSetDefaultAuth(client *c) { |
103 | /* If the default user does not require authentication, the user is |
104 | * directly authenticated. */ |
105 | c->user = DefaultUser; |
106 | c->authenticated = (c->user->flags & USER_FLAG_NOPASS(1<<4)) && |
107 | !(c->user->flags & USER_FLAG_DISABLED(1<<1)); |
108 | } |
109 | |
110 | client *createClient(connection *conn) { |
111 | client *c = zmalloc(sizeof(client)); |
112 | |
113 | /* passing NULL as conn it is possible to create a non connected client. |
114 | * This is useful since all the commands needs to be executed |
115 | * in the context of a client. When commands are executed in other |
116 | * contexts (for instance a Lua script) we need a non connected client. */ |
117 | if (conn) { |
118 | connNonBlock(conn); |
119 | connEnableTcpNoDelay(conn); |
120 | if (server.tcpkeepalive) |
121 | connKeepAlive(conn,server.tcpkeepalive); |
122 | connSetReadHandler(conn, readQueryFromClient); |
123 | connSetPrivateData(conn, c); |
124 | } |
125 | |
126 | selectDb(c,0); |
127 | uint64_t client_id; |
128 | atomicGetIncr(server.next_client_id, client_id, 1)do { client_id = __c11_atomic_fetch_add(&server.next_client_id ,(1),memory_order_relaxed); } while(0); |
129 | c->id = client_id; |
130 | c->resp = 2; |
131 | c->conn = conn; |
132 | c->name = NULL((void*)0); |
133 | c->bufpos = 0; |
134 | c->qb_pos = 0; |
135 | c->querybuf = sdsempty(); |
136 | c->pending_querybuf = sdsempty(); |
137 | c->querybuf_peak = 0; |
138 | c->reqtype = 0; |
139 | c->argc = 0; |
140 | c->argv = NULL((void*)0); |
141 | c->argv_len_sum = 0; |
142 | c->original_argc = 0; |
143 | c->original_argv = NULL((void*)0); |
144 | c->cmd = c->lastcmd = NULL((void*)0); |
145 | c->multibulklen = 0; |
146 | c->bulklen = -1; |
147 | c->sentlen = 0; |
148 | c->flags = 0; |
149 | c->ctime = c->lastinteraction = server.unixtime; |
150 | clientSetDefaultAuth(c); |
151 | c->replstate = REPL_STATE_NONE; |
152 | c->repl_put_online_on_ack = 0; |
153 | c->reploff = 0; |
154 | c->read_reploff = 0; |
155 | c->repl_ack_off = 0; |
156 | c->repl_ack_time = 0; |
157 | c->slave_listening_port = 0; |
158 | c->slave_addr = NULL((void*)0); |
159 | c->slave_capa = SLAVE_CAPA_NONE0; |
160 | c->reply = listCreate(); |
161 | c->reply_bytes = 0; |
162 | c->obuf_soft_limit_reached_time = 0; |
163 | listSetFreeMethod(c->reply,freeClientReplyValue)((c->reply)->free = (freeClientReplyValue)); |
164 | listSetDupMethod(c->reply,dupClientReplyValue)((c->reply)->dup = (dupClientReplyValue)); |
165 | c->btype = BLOCKED_NONE0; |
166 | c->bpop.timeout = 0; |
167 | c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType,NULL((void*)0)); |
168 | c->bpop.target = NULL((void*)0); |
169 | c->bpop.xread_group = NULL((void*)0); |
170 | c->bpop.xread_consumer = NULL((void*)0); |
171 | c->bpop.xread_group_noack = 0; |
172 | c->bpop.numreplicas = 0; |
173 | c->bpop.reploffset = 0; |
174 | c->woff = 0; |
175 | c->watched_keys = listCreate(); |
176 | c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType,NULL((void*)0)); |
177 | c->pubsub_patterns = listCreate(); |
178 | c->peerid = NULL((void*)0); |
179 | c->sockname = NULL((void*)0); |
180 | c->client_list_node = NULL((void*)0); |
181 | c->paused_list_node = NULL((void*)0); |
182 | c->client_tracking_redirection = 0; |
183 | c->client_tracking_prefixes = NULL((void*)0); |
184 | c->client_cron_last_memory_usage = 0; |
185 | c->client_cron_last_memory_type = CLIENT_TYPE_NORMAL0; |
186 | c->auth_callback = NULL((void*)0); |
187 | c->auth_callback_privdata = NULL((void*)0); |
188 | c->auth_module = NULL((void*)0); |
189 | listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid)((c->pubsub_patterns)->free = (decrRefCountVoid)); |
190 | listSetMatchMethod(c->pubsub_patterns,listMatchObjects)((c->pubsub_patterns)->match = (listMatchObjects)); |
191 | if (conn) linkClient(c); |
192 | initClientMultiState(c); |
193 | return c; |
194 | } |
195 | |
196 | /* This function puts the client in the queue of clients that should write |
197 | * their output buffers to the socket. Note that it does not *yet* install |
198 | * the write handler, to start clients are put in a queue of clients that need |
199 | * to write, so we try to do that before returning in the event loop (see the |
200 | * handleClientsWithPendingWrites() function). |
201 | * If we fail and there is more data to write, compared to what the socket |
202 | * buffers can hold, then we'll really install the handler. */ |
203 | void clientInstallWriteHandler(client *c) { |
204 | /* Schedule the client to write the output buffers to the socket only |
205 | * if not already done and, for slaves, if the slave can actually receive |
206 | * writes at this stage. */ |
207 | if (!(c->flags & CLIENT_PENDING_WRITE(1<<21)) && |
208 | (c->replstate == REPL_STATE_NONE || |
209 | (c->replstate == SLAVE_STATE_ONLINE9 && !c->repl_put_online_on_ack))) |
210 | { |
211 | /* Here instead of installing the write handler, we just flag the |
212 | * client and put it into a list of clients that have something |
213 | * to write to the socket. This way before re-entering the event |
214 | * loop, we can try to directly write to the client sockets avoiding |
215 | * a system call. We'll only really install the write handler if |
216 | * we'll not be able to write the whole reply at once. */ |
217 | c->flags |= CLIENT_PENDING_WRITE(1<<21); |
218 | listAddNodeHead(server.clients_pending_write,c); |
219 | } |
220 | } |
221 | |
222 | /* This function is called every time we are going to transmit new data |
223 | * to the client. The behavior is the following: |
224 | * |
225 | * If the client should receive new data (normal clients will) the function |
226 | * returns C_OK, and make sure to install the write handler in our event |
227 | * loop so that when the socket is writable new data gets written. |
228 | * |
229 | * If the client should not receive new data, because it is a fake client |
230 | * (used to load AOF in memory), a master or because the setup of the write |
231 | * handler failed, the function returns C_ERR. |
232 | * |
233 | * The function may return C_OK without actually installing the write |
234 | * event handler in the following cases: |
235 | * |
236 | * 1) The event handler should already be installed since the output buffer |
237 | * already contains something. |
238 | * 2) The client is a slave but not yet online, so we want to just accumulate |
239 | * writes in the buffer but not actually sending them yet. |
240 | * |
241 | * Typically gets called every time a reply is built, before adding more |
242 | * data to the clients output buffers. If the function returns C_ERR no |
243 | * data should be appended to the output buffers. */ |
244 | int prepareClientToWrite(client *c) { |
245 | /* If it's the Lua client we always return ok without installing any |
246 | * handler since there is no socket at all. */ |
247 | if (c->flags & (CLIENT_LUA(1<<8)|CLIENT_MODULE(1<<27))) return C_OK0; |
248 | |
249 | /* If CLIENT_CLOSE_ASAP flag is set, we need not write anything. */ |
250 | if (c->flags & CLIENT_CLOSE_ASAP(1<<10)) return C_ERR-1; |
251 | |
252 | /* CLIENT REPLY OFF / SKIP handling: don't send replies. */ |
253 | if (c->flags & (CLIENT_REPLY_OFF(1<<22)|CLIENT_REPLY_SKIP(1<<24))) return C_ERR-1; |
254 | |
255 | /* Masters don't receive replies, unless CLIENT_MASTER_FORCE_REPLY flag |
256 | * is set. */ |
257 | if ((c->flags & CLIENT_MASTER(1<<1)) && |
258 | !(c->flags & CLIENT_MASTER_FORCE_REPLY(1<<13))) return C_ERR-1; |
259 | |
260 | if (!c->conn) return C_ERR-1; /* Fake client for AOF loading. */ |
261 | |
262 | /* Schedule the client to write the output buffers to the socket, unless |
263 | * it should already be setup to do so (it has already pending data). |
264 | * |
265 | * If CLIENT_PENDING_READ is set, we're in an IO thread and should |
266 | * not install a write handler. Instead, it will be done by |
267 | * handleClientsWithPendingReadsUsingThreads() upon return. |
268 | */ |
269 | if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_READ(1<<29))) |
270 | clientInstallWriteHandler(c); |
271 | |
272 | /* Authorize the caller to queue in the output buffer of this client. */ |
273 | return C_OK0; |
274 | } |
275 | |
276 | /* ----------------------------------------------------------------------------- |
277 | * Low level functions to add more data to output buffers. |
278 | * -------------------------------------------------------------------------- */ |
279 | |
280 | /* Attempts to add the reply to the static buffer in the client struct. |
281 | * Returns C_ERR if the buffer is full, or the reply list is not empty, |
282 | * in which case the reply must be added to the reply list. */ |
283 | int _addReplyToBuffer(client *c, const char *s, size_t len) { |
284 | size_t available = sizeof(c->buf)-c->bufpos; |
285 | |
286 | if (c->flags & CLIENT_CLOSE_AFTER_REPLY(1<<6)) return C_OK0; |
287 | |
288 | /* If there already are entries in the reply list, we cannot |
289 | * add anything more to the static buffer. */ |
290 | if (listLength(c->reply)((c->reply)->len) > 0) return C_ERR-1; |
291 | |
292 | /* Check that the buffer has enough space available for this string. */ |
293 | if (len > available) return C_ERR-1; |
294 | |
295 | memcpy(c->buf+c->bufpos,s,len); |
296 | c->bufpos+=len; |
297 | return C_OK0; |
298 | } |
299 | |
300 | /* Adds the reply to the reply linked list. |
301 | * Note: some edits to this function need to be relayed to AddReplyFromClient. */ |
302 | void _addReplyProtoToList(client *c, const char *s, size_t len) { |
303 | if (c->flags & CLIENT_CLOSE_AFTER_REPLY(1<<6)) return; |
304 | |
305 | listNode *ln = listLast(c->reply)((c->reply)->tail); |
306 | clientReplyBlock *tail = ln? listNodeValue(ln)((ln)->value): NULL((void*)0); |
307 | |
308 | /* Note that 'tail' may be NULL even if we have a tail node, because when |
309 | * addReplyDeferredLen() is used, it sets a dummy node to NULL just |
310 | * fo fill it later, when the size of the bulk length is set. */ |
311 | |
312 | /* Append to tail string when possible. */ |
313 | if (tail) { |
314 | /* Copy the part we can fit into the tail, and leave the rest for a |
315 | * new node */ |
316 | size_t avail = tail->size - tail->used; |
317 | size_t copy = avail >= len? len: avail; |
318 | memcpy(tail->buf + tail->used, s, copy); |
319 | tail->used += copy; |
320 | s += copy; |
321 | len -= copy; |
322 | } |
323 | if (len) { |
324 | /* Create a new node, make sure it is allocated to at |
325 | * least PROTO_REPLY_CHUNK_BYTES */ |
326 | size_t size = len < PROTO_REPLY_CHUNK_BYTES(16*1024)? PROTO_REPLY_CHUNK_BYTES(16*1024): len; |
327 | tail = zmalloc(size + sizeof(clientReplyBlock)); |
328 | /* take over the allocation's internal fragmentation */ |
329 | tail->size = zmalloc_usable_size(tail)je_malloc_usable_size(tail) - sizeof(clientReplyBlock); |
330 | tail->used = len; |
331 | memcpy(tail->buf, s, len); |
332 | listAddNodeTail(c->reply, tail); |
333 | c->reply_bytes += tail->size; |
334 | } |
335 | asyncCloseClientOnOutputBufferLimitReached(c); |
336 | } |
337 | |
338 | /* ----------------------------------------------------------------------------- |
339 | * Higher level functions to queue data on the client output buffer. |
340 | * The following functions are the ones that commands implementations will call. |
341 | * -------------------------------------------------------------------------- */ |
342 | |
343 | /* Add the object 'obj' string representation to the client output buffer. */ |
344 | void addReply(client *c, robj *obj) { |
345 | if (prepareClientToWrite(c) != C_OK0) return; |
346 | |
347 | if (sdsEncodedObject(obj)(obj->encoding == 0 || obj->encoding == 8)) { |
348 | if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK0) |
349 | _addReplyProtoToList(c,obj->ptr,sdslen(obj->ptr)); |
350 | } else if (obj->encoding == OBJ_ENCODING_INT1) { |
351 | /* For integer encoded strings we just convert it into a string |
352 | * using our optimized function, and attach the resulting string |
353 | * to the output buffer. */ |
354 | char buf[32]; |
355 | size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr); |
356 | if (_addReplyToBuffer(c,buf,len) != C_OK0) |
357 | _addReplyProtoToList(c,buf,len); |
358 | } else { |
359 | serverPanic("Wrong obj->encoding in addReply()")_serverPanic("networking.c",359,"Wrong obj->encoding in addReply()" ),__builtin_unreachable(); |
360 | } |
361 | } |
362 | |
363 | /* Add the SDS 's' string to the client output buffer, as a side effect |
364 | * the SDS string is freed. */ |
365 | void addReplySds(client *c, sds s) { |
366 | if (prepareClientToWrite(c) != C_OK0) { |
367 | /* The caller expects the sds to be free'd. */ |
368 | sdsfree(s); |
369 | return; |
370 | } |
371 | if (_addReplyToBuffer(c,s,sdslen(s)) != C_OK0) |
372 | _addReplyProtoToList(c,s,sdslen(s)); |
373 | sdsfree(s); |
374 | } |
375 | |
376 | /* This low level function just adds whatever protocol you send it to the |
377 | * client buffer, trying the static buffer initially, and using the string |
378 | * of objects if not possible. |
379 | * |
380 | * It is efficient because does not create an SDS object nor an Redis object |
381 | * if not needed. The object will only be created by calling |
382 | * _addReplyProtoToList() if we fail to extend the existing tail object |
383 | * in the list of objects. */ |
384 | void addReplyProto(client *c, const char *s, size_t len) { |
385 | if (prepareClientToWrite(c) != C_OK0) return; |
386 | if (_addReplyToBuffer(c,s,len) != C_OK0) |
387 | _addReplyProtoToList(c,s,len); |
388 | } |
389 | |
390 | /* Low level function called by the addReplyError...() functions. |
391 | * It emits the protocol for a Redis error, in the form: |
392 | * |
393 | * -ERRORCODE Error Message<CR><LF> |
394 | * |
395 | * If the error code is already passed in the string 's', the error |
396 | * code provided is used, otherwise the string "-ERR " for the generic |
397 | * error code is automatically added. |
398 | * Note that 's' must NOT end with \r\n. */ |
399 | void addReplyErrorLength(client *c, const char *s, size_t len) { |
400 | /* If the string already starts with "-..." then the error code |
401 | * is provided by the caller. Otherwise we use "-ERR". */ |
402 | if (!len || s[0] != '-') addReplyProto(c,"-ERR ",5); |
403 | addReplyProto(c,s,len); |
404 | addReplyProto(c,"\r\n",2); |
405 | } |
406 | |
407 | /* Do some actions after an error reply was sent (Log if needed, updates stats, etc.) */ |
408 | void afterErrorReply(client *c, const char *s, size_t len) { |
409 | /* Increment the global error counter */ |
410 | server.stat_total_error_replies++; |
411 | /* Increment the error stats |
412 | * If the string already starts with "-..." then the error prefix |
413 | * is provided by the caller ( we limit the search to 32 chars). Otherwise we use "-ERR". */ |
414 | if (s[0] != '-') { |
415 | incrementErrorCount("ERR", 3); |
416 | } else { |
417 | char *spaceloc = memchr(s, ' ', len < 32 ? len : 32); |
418 | if (spaceloc) { |
419 | const size_t errEndPos = (size_t)(spaceloc - s); |
420 | incrementErrorCount(s+1, errEndPos-1); |
421 | } else { |
422 | /* Fallback to ERR if we can't retrieve the error prefix */ |
423 | incrementErrorCount("ERR", 3); |
424 | } |
425 | } |
426 | |
427 | /* Sometimes it could be normal that a slave replies to a master with |
428 | * an error and this function gets called. Actually the error will never |
429 | * be sent because addReply*() against master clients has no effect... |
430 | * A notable example is: |
431 | * |
432 | * EVAL 'redis.call("incr",KEYS[1]); redis.call("nonexisting")' 1 x |
433 | * |
434 | * Where the master must propagate the first change even if the second |
435 | * will produce an error. However it is useful to log such events since |
436 | * they are rare and may hint at errors in a script or a bug in Redis. */ |
437 | int ctype = getClientType(c); |
438 | if (ctype == CLIENT_TYPE_MASTER3 || ctype == CLIENT_TYPE_SLAVE1 || c->id == CLIENT_ID_AOF((18446744073709551615UL))) { |
439 | char *to, *from; |
440 | |
441 | if (c->id == CLIENT_ID_AOF((18446744073709551615UL))) { |
442 | to = "AOF-loading-client"; |
443 | from = "server"; |
444 | } else if (ctype == CLIENT_TYPE_MASTER3) { |
445 | to = "master"; |
446 | from = "replica"; |
447 | } else { |
448 | to = "replica"; |
449 | from = "master"; |
450 | } |
451 | |
452 | if (len > 4096) len = 4096; |
453 | char *cmdname = c->lastcmd ? c->lastcmd->name : "<unknown>"; |
454 | serverLog(LL_WARNING3,"== CRITICAL == This %s is sending an error " |
455 | "to its %s: '%.*s' after processing the command " |
456 | "'%s'", from, to, (int)len, s, cmdname); |
457 | if (ctype == CLIENT_TYPE_MASTER3 && server.repl_backlog && |
458 | server.repl_backlog_histlen > 0) |
459 | { |
460 | showLatestBacklog(); |
461 | } |
462 | server.stat_unexpected_error_replies++; |
463 | } |
464 | } |
465 | |
466 | /* The 'err' object is expected to start with -ERRORCODE and end with \r\n. |
467 | * Unlike addReplyErrorSds and others alike which rely on addReplyErrorLength. */ |
468 | void addReplyErrorObject(client *c, robj *err) { |
469 | addReply(c, err); |
470 | afterErrorReply(c, err->ptr, sdslen(err->ptr)-2); /* Ignore trailing \r\n */ |
471 | } |
472 | |
473 | /* See addReplyErrorLength for expectations from the input string. */ |
474 | void addReplyError(client *c, const char *err) { |
475 | addReplyErrorLength(c,err,strlen(err)); |
476 | afterErrorReply(c,err,strlen(err)); |
477 | } |
478 | |
479 | /* See addReplyErrorLength for expectations from the input string. */ |
480 | /* As a side effect the SDS string is freed. */ |
481 | void addReplyErrorSds(client *c, sds err) { |
482 | addReplyErrorLength(c,err,sdslen(err)); |
483 | afterErrorReply(c,err,sdslen(err)); |
484 | sdsfree(err); |
485 | } |
486 | |
487 | /* See addReplyErrorLength for expectations from the formatted string. |
488 | * The formatted string is safe to contain \r and \n anywhere. */ |
489 | void addReplyErrorFormat(client *c, const char *fmt, ...) { |
490 | va_list ap; |
491 | va_start(ap,fmt)__builtin_va_start(ap, fmt); |
492 | sds s = sdscatvprintf(sdsempty(),fmt,ap); |
493 | va_end(ap)__builtin_va_end(ap); |
494 | /* Trim any newlines at the end (ones will be added by addReplyErrorLength) */ |
495 | s = sdstrim(s, "\r\n"); |
496 | /* Make sure there are no newlines in the middle of the string, otherwise |
497 | * invalid protocol is emitted. */ |
498 | s = sdsmapchars(s, "\r\n", " ", 2); |
499 | addReplyErrorLength(c,s,sdslen(s)); |
500 | afterErrorReply(c,s,sdslen(s)); |
501 | sdsfree(s); |
502 | } |
503 | |
504 | void addReplyStatusLength(client *c, const char *s, size_t len) { |
505 | addReplyProto(c,"+",1); |
506 | addReplyProto(c,s,len); |
507 | addReplyProto(c,"\r\n",2); |
508 | } |
509 | |
510 | void addReplyStatus(client *c, const char *status) { |
511 | addReplyStatusLength(c,status,strlen(status)); |
512 | } |
513 | |
514 | void addReplyStatusFormat(client *c, const char *fmt, ...) { |
515 | va_list ap; |
516 | va_start(ap,fmt)__builtin_va_start(ap, fmt); |
517 | sds s = sdscatvprintf(sdsempty(),fmt,ap); |
518 | va_end(ap)__builtin_va_end(ap); |
519 | addReplyStatusLength(c,s,sdslen(s)); |
520 | sdsfree(s); |
521 | } |
522 | |
523 | /* Sometimes we are forced to create a new reply node, and we can't append to |
524 | * the previous one, when that happens, we wanna try to trim the unused space |
525 | * at the end of the last reply node which we won't use anymore. */ |
526 | void trimReplyUnusedTailSpace(client *c) { |
527 | listNode *ln = listLast(c->reply)((c->reply)->tail); |
528 | clientReplyBlock *tail = ln? listNodeValue(ln)((ln)->value): NULL((void*)0); |
529 | |
530 | /* Note that 'tail' may be NULL even if we have a tail node, because when |
531 | * addReplyDeferredLen() is used */ |
532 | if (!tail) return; |
533 | |
534 | /* We only try to trim the space is relatively high (more than a 1/4 of the |
535 | * allocation), otherwise there's a high chance realloc will NOP. |
536 | * Also, to avoid large memmove which happens as part of realloc, we only do |
537 | * that if the used part is small. */ |
538 | if (tail->size - tail->used > tail->size / 4 && |
539 | tail->used < PROTO_REPLY_CHUNK_BYTES(16*1024)) |
540 | { |
541 | size_t old_size = tail->size; |
542 | tail = zrealloc(tail, tail->used + sizeof(clientReplyBlock)); |
543 | /* take over the allocation's internal fragmentation (at least for |
544 | * memory usage tracking) */ |
545 | tail->size = zmalloc_usable_size(tail)je_malloc_usable_size(tail) - sizeof(clientReplyBlock); |
546 | c->reply_bytes = c->reply_bytes + tail->size - old_size; |
547 | listNodeValue(ln)((ln)->value) = tail; |
548 | } |
549 | } |
550 | |
551 | /* Adds an empty object to the reply list that will contain the multi bulk |
552 | * length, which is not known when this function is called. */ |
553 | void *addReplyDeferredLen(client *c) { |
554 | /* Note that we install the write event here even if the object is not |
555 | * ready to be sent, since we are sure that before returning to the |
556 | * event loop setDeferredAggregateLen() will be called. */ |
557 | if (prepareClientToWrite(c) != C_OK0) return NULL((void*)0); |
558 | trimReplyUnusedTailSpace(c); |
559 | listAddNodeTail(c->reply,NULL((void*)0)); /* NULL is our placeholder. */ |
560 | return listLast(c->reply)((c->reply)->tail); |
561 | } |
562 | |
563 | void setDeferredReply(client *c, void *node, const char *s, size_t length) { |
564 | listNode *ln = (listNode*)node; |
565 | clientReplyBlock *next; |
566 | |
567 | /* Abort when *node is NULL: when the client should not accept writes |
568 | * we return NULL in addReplyDeferredLen() */ |
569 | if (node == NULL((void*)0)) return; |
570 | serverAssert(!listNodeValue(ln))((!((ln)->value))?(void)0 : (_serverAssert("!listNodeValue(ln)" ,"networking.c",570),__builtin_unreachable())); |
571 | |
572 | /* Normally we fill this dummy NULL node, added by addReplyDeferredLen(), |
573 | * with a new buffer structure containing the protocol needed to specify |
574 | * the length of the array following. However sometimes when there is |
575 | * little memory to move, we may instead remove this NULL node, and prefix |
576 | * our protocol in the node immediately after to it, in order to save a |
577 | * write(2) syscall later. Conditions needed to do it: |
578 | * |
579 | * - The next node is non-NULL, |
580 | * - It has enough room already allocated |
581 | * - And not too large (avoid large memmove) */ |
582 | if (ln->next != NULL((void*)0) && (next = listNodeValue(ln->next)((ln->next)->value)) && |
583 | next->size - next->used >= length && |
584 | next->used < PROTO_REPLY_CHUNK_BYTES(16*1024) * 4) |
585 | { |
586 | memmove(next->buf + length, next->buf, next->used); |
587 | memcpy(next->buf, s, length); |
588 | next->used += length; |
589 | listDelNode(c->reply,ln); |
590 | } else { |
591 | /* Create a new node */ |
592 | clientReplyBlock *buf = zmalloc(length + sizeof(clientReplyBlock)); |
593 | /* Take over the allocation's internal fragmentation */ |
594 | buf->size = zmalloc_usable_size(buf)je_malloc_usable_size(buf) - sizeof(clientReplyBlock); |
595 | buf->used = length; |
596 | memcpy(buf->buf, s, length); |
597 | listNodeValue(ln)((ln)->value) = buf; |
598 | c->reply_bytes += buf->size; |
599 | } |
600 | asyncCloseClientOnOutputBufferLimitReached(c); |
601 | } |
602 | |
603 | /* Populate the length object and try gluing it to the next chunk. */ |
604 | void setDeferredAggregateLen(client *c, void *node, long length, char prefix) { |
605 | serverAssert(length >= 0)((length >= 0)?(void)0 : (_serverAssert("length >= 0","networking.c" ,605),__builtin_unreachable())); |
606 | |
607 | /* Abort when *node is NULL: when the client should not accept writes |
608 | * we return NULL in addReplyDeferredLen() */ |
609 | if (node == NULL((void*)0)) return; |
610 | |
611 | char lenstr[128]; |
612 | size_t lenstr_len = sprintf(lenstr, "%c%ld\r\n", prefix, length); |
613 | setDeferredReply(c, node, lenstr, lenstr_len); |
614 | } |
615 | |
616 | void setDeferredArrayLen(client *c, void *node, long length) { |
617 | setDeferredAggregateLen(c,node,length,'*'); |
618 | } |
619 | |
620 | void setDeferredMapLen(client *c, void *node, long length) { |
621 | int prefix = c->resp == 2 ? '*' : '%'; |
622 | if (c->resp == 2) length *= 2; |
623 | setDeferredAggregateLen(c,node,length,prefix); |
624 | } |
625 | |
626 | void setDeferredSetLen(client *c, void *node, long length) { |
627 | int prefix = c->resp == 2 ? '*' : '~'; |
628 | setDeferredAggregateLen(c,node,length,prefix); |
629 | } |
630 | |
631 | void setDeferredAttributeLen(client *c, void *node, long length) { |
632 | int prefix = c->resp == 2 ? '*' : '|'; |
633 | if (c->resp == 2) length *= 2; |
634 | setDeferredAggregateLen(c,node,length,prefix); |
635 | } |
636 | |
637 | void setDeferredPushLen(client *c, void *node, long length) { |
638 | int prefix = c->resp == 2 ? '*' : '>'; |
639 | setDeferredAggregateLen(c,node,length,prefix); |
640 | } |
641 | |
642 | /* Add a double as a bulk reply */ |
643 | void addReplyDouble(client *c, double d) { |
644 | if (isinf(d)__builtin_isinf_sign (d)) { |
645 | /* Libc in odd systems (Hi Solaris!) will format infinite in a |
646 | * different way, so better to handle it in an explicit way. */ |
647 | if (c->resp == 2) { |
648 | addReplyBulkCString(c, d > 0 ? "inf" : "-inf"); |
649 | } else { |
650 | addReplyProto(c, d > 0 ? ",inf\r\n" : ",-inf\r\n", |
651 | d > 0 ? 6 : 7); |
652 | } |
653 | } else { |
654 | char dbuf[MAX_LONG_DOUBLE_CHARS5*1024+3], |
655 | sbuf[MAX_LONG_DOUBLE_CHARS5*1024+32]; |
656 | int dlen, slen; |
657 | if (c->resp == 2) { |
658 | dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d); |
659 | slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf); |
660 | addReplyProto(c,sbuf,slen); |
661 | } else { |
662 | dlen = snprintf(dbuf,sizeof(dbuf),",%.17g\r\n",d); |
663 | addReplyProto(c,dbuf,dlen); |
664 | } |
665 | } |
666 | } |
667 | |
668 | /* Add a long double as a bulk reply, but uses a human readable formatting |
669 | * of the double instead of exposing the crude behavior of doubles to the |
670 | * dear user. */ |
671 | void addReplyHumanLongDouble(client *c, long double d) { |
672 | if (c->resp == 2) { |
673 | robj *o = createStringObjectFromLongDouble(d,1); |
674 | addReplyBulk(c,o); |
675 | decrRefCount(o); |
676 | } else { |
677 | char buf[MAX_LONG_DOUBLE_CHARS5*1024]; |
678 | int len = ld2string(buf,sizeof(buf),d,LD_STR_HUMAN); |
679 | addReplyProto(c,",",1); |
680 | addReplyProto(c,buf,len); |
681 | addReplyProto(c,"\r\n",2); |
682 | } |
683 | } |
684 | |
685 | /* Add a long long as integer reply or bulk len / multi bulk count. |
686 | * Basically this is used to output <prefix><long long><crlf>. */ |
687 | void addReplyLongLongWithPrefix(client *c, long long ll, char prefix) { |
688 | char buf[128]; |
689 | int len; |
690 | |
691 | /* Things like $3\r\n or *2\r\n are emitted very often by the protocol |
692 | * so we have a few shared objects to use if the integer is small |
693 | * like it is most of the times. */ |
694 | if (prefix == '*' && ll < OBJ_SHARED_BULKHDR_LEN32 && ll >= 0) { |
695 | addReply(c,shared.mbulkhdr[ll]); |
696 | return; |
697 | } else if (prefix == '$' && ll < OBJ_SHARED_BULKHDR_LEN32 && ll >= 0) { |
698 | addReply(c,shared.bulkhdr[ll]); |
699 | return; |
700 | } |
701 | |
702 | buf[0] = prefix; |
703 | len = ll2string(buf+1,sizeof(buf)-1,ll); |
704 | buf[len+1] = '\r'; |
705 | buf[len+2] = '\n'; |
706 | addReplyProto(c,buf,len+3); |
707 | } |
708 | |
709 | void addReplyLongLong(client *c, long long ll) { |
710 | if (ll == 0) |
711 | addReply(c,shared.czero); |
712 | else if (ll == 1) |
713 | addReply(c,shared.cone); |
714 | else |
715 | addReplyLongLongWithPrefix(c,ll,':'); |
716 | } |
717 | |
718 | void addReplyAggregateLen(client *c, long length, int prefix) { |
719 | serverAssert(length >= 0)((length >= 0)?(void)0 : (_serverAssert("length >= 0","networking.c" ,719),__builtin_unreachable())); |
720 | addReplyLongLongWithPrefix(c,length,prefix); |
721 | } |
722 | |
723 | void addReplyArrayLen(client *c, long length) { |
724 | addReplyAggregateLen(c,length,'*'); |
725 | } |
726 | |
727 | void addReplyMapLen(client *c, long length) { |
728 | int prefix = c->resp == 2 ? '*' : '%'; |
729 | if (c->resp == 2) length *= 2; |
730 | addReplyAggregateLen(c,length,prefix); |
731 | } |
732 | |
733 | void addReplySetLen(client *c, long length) { |
734 | int prefix = c->resp == 2 ? '*' : '~'; |
735 | addReplyAggregateLen(c,length,prefix); |
736 | } |
737 | |
738 | void addReplyAttributeLen(client *c, long length) { |
739 | int prefix = c->resp == 2 ? '*' : '|'; |
740 | if (c->resp == 2) length *= 2; |
741 | addReplyAggregateLen(c,length,prefix); |
742 | } |
743 | |
744 | void addReplyPushLen(client *c, long length) { |
745 | int prefix = c->resp == 2 ? '*' : '>'; |
746 | addReplyAggregateLen(c,length,prefix); |
747 | } |
748 | |
749 | void addReplyNull(client *c) { |
750 | if (c->resp == 2) { |
751 | addReplyProto(c,"$-1\r\n",5); |
752 | } else { |
753 | addReplyProto(c,"_\r\n",3); |
754 | } |
755 | } |
756 | |
757 | void addReplyBool(client *c, int b) { |
758 | if (c->resp == 2) { |
759 | addReply(c, b ? shared.cone : shared.czero); |
760 | } else { |
761 | addReplyProto(c, b ? "#t\r\n" : "#f\r\n",4); |
762 | } |
763 | } |
764 | |
765 | /* A null array is a concept that no longer exists in RESP3. However |
766 | * RESP2 had it, so API-wise we have this call, that will emit the correct |
767 | * RESP2 protocol, however for RESP3 the reply will always be just the |
768 | * Null type "_\r\n". */ |
769 | void addReplyNullArray(client *c) { |
770 | if (c->resp == 2) { |
771 | addReplyProto(c,"*-1\r\n",5); |
772 | } else { |
773 | addReplyProto(c,"_\r\n",3); |
774 | } |
775 | } |
776 | |
777 | /* Create the length prefix of a bulk reply, example: $2234 */ |
778 | void addReplyBulkLen(client *c, robj *obj) { |
779 | size_t len = stringObjectLen(obj); |
780 | |
781 | addReplyLongLongWithPrefix(c,len,'$'); |
782 | } |
783 | |
784 | /* Add a Redis Object as a bulk reply */ |
785 | void addReplyBulk(client *c, robj *obj) { |
786 | addReplyBulkLen(c,obj); |
787 | addReply(c,obj); |
788 | addReply(c,shared.crlf); |
789 | } |
790 | |
791 | /* Add a C buffer as bulk reply */ |
792 | void addReplyBulkCBuffer(client *c, const void *p, size_t len) { |
793 | addReplyLongLongWithPrefix(c,len,'$'); |
794 | addReplyProto(c,p,len); |
795 | addReply(c,shared.crlf); |
796 | } |
797 | |
798 | /* Add sds to reply (takes ownership of sds and frees it) */ |
799 | void addReplyBulkSds(client *c, sds s) { |
800 | addReplyLongLongWithPrefix(c,sdslen(s),'$'); |
801 | addReplySds(c,s); |
802 | addReply(c,shared.crlf); |
803 | } |
804 | |
805 | /* Set sds to a deferred reply (for symmetry with addReplyBulkSds it also frees the sds) */ |
806 | void setDeferredReplyBulkSds(client *c, void *node, sds s) { |
807 | sds reply = sdscatprintf(sdsempty(), "$%d\r\n%s\r\n", (unsigned)sdslen(s), s); |
808 | setDeferredReply(c, node, reply, sdslen(reply)); |
809 | sdsfree(reply); |
810 | sdsfree(s); |
811 | } |
812 | |
813 | /* Add a C null term string as bulk reply */ |
814 | void addReplyBulkCString(client *c, const char *s) { |
815 | if (s == NULL((void*)0)) { |
816 | addReplyNull(c); |
817 | } else { |
818 | addReplyBulkCBuffer(c,s,strlen(s)); |
819 | } |
820 | } |
821 | |
822 | /* Add a long long as a bulk reply */ |
823 | void addReplyBulkLongLong(client *c, long long ll) { |
824 | char buf[64]; |
825 | int len; |
826 | |
827 | len = ll2string(buf,64,ll); |
828 | addReplyBulkCBuffer(c,buf,len); |
829 | } |
830 | |
831 | /* Reply with a verbatim type having the specified extension. |
832 | * |
833 | * The 'ext' is the "extension" of the file, actually just a three |
834 | * character type that describes the format of the verbatim string. |
835 | * For instance "txt" means it should be interpreted as a text only |
836 | * file by the receiver, "md " as markdown, and so forth. Only the |
837 | * three first characters of the extension are used, and if the |
838 | * provided one is shorter than that, the remaining is filled with |
839 | * spaces. */ |
840 | void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext) { |
841 | if (c->resp == 2) { |
842 | addReplyBulkCBuffer(c,s,len); |
843 | } else { |
844 | char buf[32]; |
845 | size_t preflen = snprintf(buf,sizeof(buf),"=%zu\r\nxxx:",len+4); |
846 | char *p = buf+preflen-4; |
847 | for (int i = 0; i < 3; i++) { |
848 | if (*ext == '\0') { |
849 | p[i] = ' '; |
850 | } else { |
851 | p[i] = *ext++; |
852 | } |
853 | } |
854 | addReplyProto(c,buf,preflen); |
855 | addReplyProto(c,s,len); |
856 | addReplyProto(c,"\r\n",2); |
857 | } |
858 | } |
859 | |
860 | /* Add an array of C strings as status replies with a heading. |
861 | * This function is typically invoked by from commands that support |
862 | * subcommands in response to the 'help' subcommand. The help array |
863 | * is terminated by NULL sentinel. */ |
864 | void addReplyHelp(client *c, const char **help) { |
865 | sds cmd = sdsnew((char*) c->argv[0]->ptr); |
866 | void *blenp = addReplyDeferredLen(c); |
867 | int blen = 0; |
868 | |
869 | sdstoupper(cmd); |
870 | addReplyStatusFormat(c, |
871 | "%s <subcommand> [<arg> [value] [opt] ...]. Subcommands are:",cmd); |
872 | sdsfree(cmd); |
873 | |
874 | while (help[blen]) addReplyStatus(c,help[blen++]); |
875 | |
876 | addReplyStatus(c,"HELP"); |
877 | addReplyStatus(c," Prints this help."); |
878 | |
879 | blen += 1; /* Account for the header. */ |
880 | blen += 2; /* Account for the footer. */ |
881 | setDeferredArrayLen(c,blenp,blen); |
882 | } |
883 | |
884 | /* Add a suggestive error reply. |
885 | * This function is typically invoked by from commands that support |
886 | * subcommands in response to an unknown subcommand or argument error. */ |
887 | void addReplySubcommandSyntaxError(client *c) { |
888 | sds cmd = sdsnew((char*) c->argv[0]->ptr); |
889 | sdstoupper(cmd); |
890 | addReplyErrorFormat(c, |
891 | "Unknown subcommand or wrong number of arguments for '%s'. Try %s HELP.", |
892 | (char*)c->argv[1]->ptr,cmd); |
893 | sdsfree(cmd); |
894 | } |
895 | |
896 | /* Append 'src' client output buffers into 'dst' client output buffers. |
897 | * This function clears the output buffers of 'src' */ |
898 | void AddReplyFromClient(client *dst, client *src) { |
899 | /* If the source client contains a partial response due to client output |
900 | * buffer limits, propagate that to the dest rather than copy a partial |
901 | * reply. We don't wanna run the risk of copying partial response in case |
902 | * for some reason the output limits don't reach the same decision (maybe |
903 | * they changed) */ |
904 | if (src->flags & CLIENT_CLOSE_ASAP(1<<10)) { |
905 | sds client = catClientInfoString(sdsempty(),dst); |
906 | freeClientAsync(dst); |
907 | serverLog(LL_WARNING3,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client); |
908 | sdsfree(client); |
909 | return; |
910 | } |
911 | |
912 | /* First add the static buffer (either into the static buffer or reply list) */ |
913 | addReplyProto(dst,src->buf, src->bufpos); |
914 | |
915 | /* We need to check with prepareClientToWrite again (after addReplyProto) |
916 | * since addReplyProto may have changed something (like CLIENT_CLOSE_ASAP) */ |
917 | if (prepareClientToWrite(dst) != C_OK0) |
918 | return; |
919 | |
920 | /* We're bypassing _addReplyProtoToList, so we need to add the pre/post |
921 | * checks in it. */ |
922 | if (dst->flags & CLIENT_CLOSE_AFTER_REPLY(1<<6)) return; |
923 | |
924 | /* Concatenate the reply list into the dest */ |
925 | if (listLength(src->reply)((src->reply)->len)) |
926 | listJoin(dst->reply,src->reply); |
927 | dst->reply_bytes += src->reply_bytes; |
928 | src->reply_bytes = 0; |
929 | src->bufpos = 0; |
930 | |
931 | /* Check output buffer limits */ |
932 | asyncCloseClientOnOutputBufferLimitReached(dst); |
933 | } |
934 | |
935 | /* Copy 'src' client output buffers into 'dst' client output buffers. |
936 | * The function takes care of freeing the old output buffers of the |
937 | * destination client. */ |
938 | void copyClientOutputBuffer(client *dst, client *src) { |
939 | listRelease(dst->reply); |
940 | dst->sentlen = 0; |
941 | dst->reply = listDup(src->reply); |
942 | memcpy(dst->buf,src->buf,src->bufpos); |
943 | dst->bufpos = src->bufpos; |
944 | dst->reply_bytes = src->reply_bytes; |
945 | } |
946 | |
947 | /* Return true if the specified client has pending reply buffers to write to |
948 | * the socket. */ |
949 | int clientHasPendingReplies(client *c) { |
950 | return c->bufpos || listLength(c->reply)((c->reply)->len); |
951 | } |
952 | |
953 | void clientAcceptHandler(connection *conn) { |
954 | client *c = connGetPrivateData(conn); |
955 | |
956 | if (connGetState(conn) != CONN_STATE_CONNECTED) { |
957 | serverLog(LL_WARNING3, |
958 | "Error accepting a client connection: %s", |
959 | connGetLastError(conn)); |
960 | freeClientAsync(c); |
961 | return; |
962 | } |
963 | |
964 | /* If the server is running in protected mode (the default) and there |
965 | * is no password set, nor a specific interface is bound, we don't accept |
966 | * requests from non loopback interfaces. Instead we try to explain the |
967 | * user what to do to fix it if needed. */ |
968 | if (server.protected_mode && |
969 | server.bindaddr_count == 0 && |
970 | DefaultUser->flags & USER_FLAG_NOPASS(1<<4) && |
971 | !(c->flags & CLIENT_UNIX_SOCKET(1<<11))) |
972 | { |
973 | char cip[NET_IP_STR_LEN46+1] = { 0 }; |
974 | connPeerToString(conn, cip, sizeof(cip)-1, NULL((void*)0)); |
975 | |
976 | if (strcmp(cip,"127.0.0.1") && strcmp(cip,"::1")) { |
977 | char *err = |
978 | "-DENIED Redis is running in protected mode because protected " |
979 | "mode is enabled, no bind address was specified, no " |
980 | "authentication password is requested to clients. In this mode " |
981 | "connections are only accepted from the loopback interface. " |
982 | "If you want to connect from external computers to Redis you " |
983 | "may adopt one of the following solutions: " |
984 | "1) Just disable protected mode sending the command " |
985 | "'CONFIG SET protected-mode no' from the loopback interface " |
986 | "by connecting to Redis from the same host the server is " |
987 | "running, however MAKE SURE Redis is not publicly accessible " |
988 | "from internet if you do so. Use CONFIG REWRITE to make this " |
989 | "change permanent. " |
990 | "2) Alternatively you can just disable the protected mode by " |
991 | "editing the Redis configuration file, and setting the protected " |
992 | "mode option to 'no', and then restarting the server. " |
993 | "3) If you started the server manually just for testing, restart " |
994 | "it with the '--protected-mode no' option. " |
995 | "4) Setup a bind address or an authentication password. " |
996 | "NOTE: You only need to do one of the above things in order for " |
997 | "the server to start accepting connections from the outside.\r\n"; |
998 | if (connWrite(c->conn,err,strlen(err)) == -1) { |
999 | /* Nothing to do, Just to avoid the warning... */ |
1000 | } |
1001 | server.stat_rejected_conn++; |
1002 | freeClientAsync(c); |
1003 | return; |
1004 | } |
1005 | } |
1006 | |
1007 | server.stat_numconnections++; |
1008 | moduleFireServerEvent(REDISMODULE_EVENT_CLIENT_CHANGE4, |
1009 | REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED0, |
1010 | c); |
1011 | } |
1012 | |
1013 | #define MAX_ACCEPTS_PER_CALL1000 1000 |
1014 | static void acceptCommonHandler(connection *conn, int flags, char *ip) { |
1015 | client *c; |
1016 | char conninfo[100]; |
1017 | UNUSED(ip)((void) ip); |
1018 | |
1019 | if (connGetState(conn) != CONN_STATE_ACCEPTING) { |
1020 | serverLog(LL_VERBOSE1, |
1021 | "Accepted client connection in error state: %s (conn: %s)", |
1022 | connGetLastError(conn), |
1023 | connGetInfo(conn, conninfo, sizeof(conninfo))); |
1024 | connClose(conn); |
1025 | return; |
1026 | } |
1027 | |
1028 | /* Limit the number of connections we take at the same time. |
1029 | * |
1030 | * Admission control will happen before a client is created and connAccept() |
1031 | * called, because we don't want to even start transport-level negotiation |
1032 | * if rejected. */ |
1033 | if (listLength(server.clients)((server.clients)->len) + getClusterConnectionsCount() |
1034 | >= server.maxclients) |
1035 | { |
1036 | char *err; |
1037 | if (server.cluster_enabled) |
1038 | err = "-ERR max number of clients + cluster " |
1039 | "connections reached\r\n"; |
1040 | else |
1041 | err = "-ERR max number of clients reached\r\n"; |
1042 | |
1043 | /* That's a best effort error message, don't check write errors. |
1044 | * Note that for TLS connections, no handshake was done yet so nothing |
1045 | * is written and the connection will just drop. */ |
1046 | if (connWrite(conn,err,strlen(err)) == -1) { |
1047 | /* Nothing to do, Just to avoid the warning... */ |
1048 | } |
1049 | server.stat_rejected_conn++; |
1050 | connClose(conn); |
1051 | return; |
1052 | } |
1053 | |
1054 | /* Create connection and client */ |
1055 | if ((c = createClient(conn)) == NULL((void*)0)) { |
1056 | serverLog(LL_WARNING3, |
1057 | "Error registering fd event for the new client: %s (conn: %s)", |
1058 | connGetLastError(conn), |
1059 | connGetInfo(conn, conninfo, sizeof(conninfo))); |
1060 | connClose(conn); /* May be already closed, just ignore errors */ |
1061 | return; |
1062 | } |
1063 | |
1064 | /* Last chance to keep flags */ |
1065 | c->flags |= flags; |
1066 | |
1067 | /* Initiate accept. |
1068 | * |
1069 | * Note that connAccept() is free to do two things here: |
1070 | * 1. Call clientAcceptHandler() immediately; |
1071 | * 2. Schedule a future call to clientAcceptHandler(). |
1072 | * |
1073 | * Because of that, we must do nothing else afterwards. |
1074 | */ |
1075 | if (connAccept(conn, clientAcceptHandler) == C_ERR-1) { |
1076 | char conninfo[100]; |
1077 | if (connGetState(conn) == CONN_STATE_ERROR) |
1078 | serverLog(LL_WARNING3, |
1079 | "Error accepting a client connection: %s (conn: %s)", |
1080 | connGetLastError(conn), connGetInfo(conn, conninfo, sizeof(conninfo))); |
1081 | freeClient(connGetPrivateData(conn)); |
1082 | return; |
1083 | } |
1084 | } |
1085 | |
1086 | void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { |
1087 | int cport, cfd, max = MAX_ACCEPTS_PER_CALL1000; |
1088 | char cip[NET_IP_STR_LEN46]; |
1089 | UNUSED(el)((void) el); |
1090 | UNUSED(mask)((void) mask); |
1091 | UNUSED(privdata)((void) privdata); |
1092 | |
1093 | while(max--) { |
1094 | cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); |
1095 | if (cfd == ANET_ERR-1) { |
1096 | if (errno(*__errno_location ()) != EWOULDBLOCK11) |
1097 | serverLog(LL_WARNING3, |
1098 | "Accepting client connection: %s", server.neterr); |
1099 | return; |
1100 | } |
1101 | anetCloexec(cfd); |
1102 | serverLog(LL_VERBOSE1,"Accepted %s:%d", cip, cport); |
1103 | acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip); |
1104 | } |
1105 | } |
1106 | |
1107 | void acceptTLSHandler(aeEventLoop *el, int fd, void *privdata, int mask) { |
1108 | int cport, cfd, max = MAX_ACCEPTS_PER_CALL1000; |
1109 | char cip[NET_IP_STR_LEN46]; |
1110 | UNUSED(el)((void) el); |
1111 | UNUSED(mask)((void) mask); |
1112 | UNUSED(privdata)((void) privdata); |
1113 | |
1114 | while(max--) { |
1115 | cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); |
1116 | if (cfd == ANET_ERR-1) { |
1117 | if (errno(*__errno_location ()) != EWOULDBLOCK11) |
1118 | serverLog(LL_WARNING3, |
1119 | "Accepting client connection: %s", server.neterr); |
1120 | return; |
1121 | } |
1122 | anetCloexec(cfd); |
1123 | serverLog(LL_VERBOSE1,"Accepted %s:%d", cip, cport); |
1124 | acceptCommonHandler(connCreateAcceptedTLS(cfd, server.tls_auth_clients),0,cip); |
1125 | } |
1126 | } |
1127 | |
1128 | void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) { |
1129 | int cfd, max = MAX_ACCEPTS_PER_CALL1000; |
1130 | UNUSED(el)((void) el); |
1131 | UNUSED(mask)((void) mask); |
1132 | UNUSED(privdata)((void) privdata); |
1133 | |
1134 | while(max--) { |
1135 | cfd = anetUnixAccept(server.neterr, fd); |
1136 | if (cfd == ANET_ERR-1) { |
1137 | if (errno(*__errno_location ()) != EWOULDBLOCK11) |
1138 | serverLog(LL_WARNING3, |
1139 | "Accepting client connection: %s", server.neterr); |
1140 | return; |
1141 | } |
1142 | anetCloexec(cfd); |
1143 | serverLog(LL_VERBOSE1,"Accepted connection to %s", server.unixsocket); |
1144 | acceptCommonHandler(connCreateAcceptedSocket(cfd),CLIENT_UNIX_SOCKET(1<<11),NULL((void*)0)); |
1145 | } |
1146 | } |
1147 | |
1148 | void freeClientOriginalArgv(client *c) { |
1149 | /* We didn't rewrite this client */ |
1150 | if (!c->original_argv) return; |
1151 | |
1152 | for (int j = 0; j < c->original_argc; j++) |
1153 | decrRefCount(c->original_argv[j]); |
1154 | zfree(c->original_argv); |
1155 | c->original_argv = NULL((void*)0); |
1156 | c->original_argc = 0; |
1157 | } |
1158 | |
1159 | static void freeClientArgv(client *c) { |
1160 | int j; |
1161 | for (j = 0; j < c->argc; j++) |
1162 | decrRefCount(c->argv[j]); |
1163 | c->argc = 0; |
1164 | c->cmd = NULL((void*)0); |
1165 | c->argv_len_sum = 0; |
1166 | } |
1167 | |
1168 | /* Close all the slaves connections. This is useful in chained replication |
1169 | * when we resync with our own master and want to force all our slaves to |
1170 | * resync with us as well. */ |
1171 | void disconnectSlaves(void) { |
1172 | listIter li; |
1173 | listNode *ln; |
1174 | listRewind(server.slaves,&li); |
1175 | while((ln = listNext(&li))) { |
1176 | freeClient((client*)ln->value); |
1177 | } |
1178 | } |
1179 | |
1180 | /* Check if there is any other slave waiting dumping RDB finished expect me. |
1181 | * This function is useful to judge current dumping RDB can be used for full |
1182 | * synchronization or not. */ |
1183 | int anyOtherSlaveWaitRdb(client *except_me) { |
1184 | listIter li; |
1185 | listNode *ln; |
1186 | |
1187 | listRewind(server.slaves, &li); |
1188 | while((ln = listNext(&li))) { |
1189 | client *slave = ln->value; |
1190 | if (slave != except_me && |
1191 | slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END7) |
1192 | { |
1193 | return 1; |
1194 | } |
1195 | } |
1196 | return 0; |
1197 | } |
1198 | |
1199 | /* Remove the specified client from global lists where the client could |
1200 | * be referenced, not including the Pub/Sub channels. |
1201 | * This is used by freeClient() and replicationCacheMaster(). */ |
1202 | void unlinkClient(client *c) { |
1203 | listNode *ln; |
1204 | |
1205 | /* If this is marked as current client unset it. */ |
1206 | if (server.current_client == c) server.current_client = NULL((void*)0); |
1207 | |
1208 | /* Certain operations must be done only if the client has an active connection. |
1209 | * If the client was already unlinked or if it's a "fake client" the |
1210 | * conn is already set to NULL. */ |
1211 | if (c->conn) { |
1212 | /* Remove from the list of active clients. */ |
1213 | if (c->client_list_node) { |
1214 | uint64_t id = htonu64(c->id)intrev64(c->id); |
1215 | raxRemove(server.clients_index,(unsigned char*)&id,sizeof(id),NULL((void*)0)); |
1216 | listDelNode(server.clients,c->client_list_node); |
1217 | c->client_list_node = NULL((void*)0); |
1218 | } |
1219 | |
1220 | /* Check if this is a replica waiting for diskless replication (rdb pipe), |
1221 | * in which case it needs to be cleaned from that list */ |
1222 | if (c->flags & CLIENT_SLAVE(1<<0) && |
1223 | c->replstate == SLAVE_STATE_WAIT_BGSAVE_END7 && |
1224 | server.rdb_pipe_conns) |
1225 | { |
1226 | int i; |
1227 | for (i=0; i < server.rdb_pipe_numconns; i++) { |
1228 | if (server.rdb_pipe_conns[i] == c->conn) { |
1229 | rdbPipeWriteHandlerConnRemoved(c->conn); |
1230 | server.rdb_pipe_conns[i] = NULL((void*)0); |
1231 | break; |
1232 | } |
1233 | } |
1234 | } |
1235 | connClose(c->conn); |
1236 | c->conn = NULL((void*)0); |
1237 | } |
1238 | |
1239 | /* Remove from the list of pending writes if needed. */ |
1240 | if (c->flags & CLIENT_PENDING_WRITE(1<<21)) { |
1241 | ln = listSearchKey(server.clients_pending_write,c); |
1242 | serverAssert(ln != NULL)((ln != ((void*)0))?(void)0 : (_serverAssert("ln != NULL","networking.c" ,1242),__builtin_unreachable())); |
1243 | listDelNode(server.clients_pending_write,ln); |
1244 | c->flags &= ~CLIENT_PENDING_WRITE(1<<21); |
1245 | } |
1246 | |
1247 | /* Remove from the list of pending reads if needed. */ |
1248 | if (c->flags & CLIENT_PENDING_READ(1<<29)) { |
1249 | ln = listSearchKey(server.clients_pending_read,c); |
1250 | serverAssert(ln != NULL)((ln != ((void*)0))?(void)0 : (_serverAssert("ln != NULL","networking.c" ,1250),__builtin_unreachable())); |
1251 | listDelNode(server.clients_pending_read,ln); |
1252 | c->flags &= ~CLIENT_PENDING_READ(1<<29); |
1253 | } |
1254 | |
1255 | /* When client was just unblocked because of a blocking operation, |
1256 | * remove it from the list of unblocked clients. */ |
1257 | if (c->flags & CLIENT_UNBLOCKED(1<<7)) { |
1258 | ln = listSearchKey(server.unblocked_clients,c); |
1259 | serverAssert(ln != NULL)((ln != ((void*)0))?(void)0 : (_serverAssert("ln != NULL","networking.c" ,1259),__builtin_unreachable())); |
1260 | listDelNode(server.unblocked_clients,ln); |
1261 | c->flags &= ~CLIENT_UNBLOCKED(1<<7); |
1262 | } |
1263 | |
1264 | /* Clear the tracking status. */ |
1265 | if (c->flags & CLIENT_TRACKING(1ULL<<31)) disableTracking(c); |
1266 | } |
1267 | |
1268 | void freeClient(client *c) { |
1269 | listNode *ln; |
1270 | |
1271 | /* If a client is protected, yet we need to free it right now, make sure |
1272 | * to at least use asynchronous freeing. */ |
1273 | if (c->flags & CLIENT_PROTECTED(1<<28)) { |
1274 | freeClientAsync(c); |
1275 | return; |
1276 | } |
1277 | |
1278 | /* For connected clients, call the disconnection event of modules hooks. */ |
1279 | if (c->conn) { |
1280 | moduleFireServerEvent(REDISMODULE_EVENT_CLIENT_CHANGE4, |
1281 | REDISMODULE_SUBEVENT_CLIENT_CHANGE_DISCONNECTED1, |
1282 | c); |
1283 | } |
1284 | |
1285 | /* Notify module system that this client auth status changed. */ |
1286 | moduleNotifyUserChanged(c); |
1287 | |
1288 | /* If this client was scheduled for async freeing we need to remove it |
1289 | * from the queue. Note that we need to do this here, because later |
1290 | * we may call replicationCacheMaster() and the client should already |
1291 | * be removed from the list of clients to free. */ |
1292 | if (c->flags & CLIENT_CLOSE_ASAP(1<<10)) { |
1293 | ln = listSearchKey(server.clients_to_close,c); |
1294 | serverAssert(ln != NULL)((ln != ((void*)0))?(void)0 : (_serverAssert("ln != NULL","networking.c" ,1294),__builtin_unreachable())); |
1295 | listDelNode(server.clients_to_close,ln); |
1296 | } |
1297 | |
1298 | /* If it is our master that's being disconnected we should make sure |
1299 | * to cache the state to try a partial resynchronization later. |
1300 | * |
1301 | * Note that before doing this we make sure that the client is not in |
1302 | * some unexpected state, by checking its flags. */ |
1303 | if (server.master && c->flags & CLIENT_MASTER(1<<1)) { |
1304 | serverLog(LL_WARNING3,"Connection with master lost."); |
1305 | if (!(c->flags & (CLIENT_PROTOCOL_ERROR(1ULL<<39)|CLIENT_BLOCKED(1<<4)))) { |
1306 | c->flags &= ~(CLIENT_CLOSE_ASAP(1<<10)|CLIENT_CLOSE_AFTER_REPLY(1<<6)); |
1307 | replicationCacheMaster(c); |
1308 | return; |
1309 | } |
1310 | } |
1311 | |
1312 | /* Log link disconnection with slave */ |
1313 | if (getClientType(c) == CLIENT_TYPE_SLAVE1) { |
1314 | serverLog(LL_WARNING3,"Connection with replica %s lost.", |
1315 | replicationGetSlaveName(c)); |
1316 | } |
1317 | |
1318 | /* Free the query buffer */ |
1319 | sdsfree(c->querybuf); |
1320 | sdsfree(c->pending_querybuf); |
1321 | c->querybuf = NULL((void*)0); |
1322 | |
1323 | /* Deallocate structures used to block on blocking ops. */ |
1324 | if (c->flags & CLIENT_BLOCKED(1<<4)) unblockClient(c); |
1325 | dictRelease(c->bpop.keys); |
1326 | |
1327 | /* UNWATCH all the keys */ |
1328 | unwatchAllKeys(c); |
1329 | listRelease(c->watched_keys); |
1330 | |
1331 | /* Unsubscribe from all the pubsub channels */ |
1332 | pubsubUnsubscribeAllChannels(c,0); |
1333 | pubsubUnsubscribeAllPatterns(c,0); |
1334 | dictRelease(c->pubsub_channels); |
1335 | listRelease(c->pubsub_patterns); |
1336 | |
1337 | /* Free data structures. */ |
1338 | listRelease(c->reply); |
1339 | freeClientArgv(c); |
1340 | freeClientOriginalArgv(c); |
1341 | |
1342 | /* Unlink the client: this will close the socket, remove the I/O |
1343 | * handlers, and remove references of the client from different |
1344 | * places where active clients may be referenced. */ |
1345 | unlinkClient(c); |
1346 | |
1347 | /* Master/slave cleanup Case 1: |
1348 | * we lost the connection with a slave. */ |
1349 | if (c->flags & CLIENT_SLAVE(1<<0)) { |
1350 | /* If there is no any other slave waiting dumping RDB finished, the |
1351 | * current child process need not continue to dump RDB, then we kill it. |
1352 | * So child process won't use more memory, and we also can fork a new |
1353 | * child process asap to dump rdb for next full synchronization or bgsave. |
1354 | * But we also need to check if users enable 'save' RDB, if enable, we |
1355 | * should not remove directly since that means RDB is important for users |
1356 | * to keep data safe and we may delay configured 'save' for full sync. */ |
1357 | if (server.saveparamslen == 0 && |
1358 | c->replstate == SLAVE_STATE_WAIT_BGSAVE_END7 && |
1359 | server.child_type == CHILD_TYPE_RDB1 && |
1360 | server.rdb_child_type == RDB_CHILD_TYPE_DISK1 && |
1361 | anyOtherSlaveWaitRdb(c) == 0) |
1362 | { |
1363 | killRDBChild(); |
1364 | } |
1365 | if (c->replstate == SLAVE_STATE_SEND_BULK8) { |
1366 | if (c->repldbfd != -1) close(c->repldbfd); |
1367 | if (c->replpreamble) sdsfree(c->replpreamble); |
1368 | } |
1369 | list *l = (c->flags & CLIENT_MONITOR(1<<2)) ? server.monitors : server.slaves; |
1370 | ln = listSearchKey(l,c); |
1371 | serverAssert(ln != NULL)((ln != ((void*)0))?(void)0 : (_serverAssert("ln != NULL","networking.c" ,1371),__builtin_unreachable())); |
1372 | listDelNode(l,ln); |
1373 | /* We need to remember the time when we started to have zero |
1374 | * attached slaves, as after some time we'll free the replication |
1375 | * backlog. */ |
1376 | if (getClientType(c) == CLIENT_TYPE_SLAVE1 && listLength(server.slaves)((server.slaves)->len) == 0) |
1377 | server.repl_no_slaves_since = server.unixtime; |
1378 | refreshGoodSlavesCount(); |
1379 | /* Fire the replica change modules event. */ |
1380 | if (c->replstate == SLAVE_STATE_ONLINE9) |
1381 | moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE6, |
1382 | REDISMODULE_SUBEVENT_REPLICA_CHANGE_OFFLINE1, |
1383 | NULL((void*)0)); |
1384 | } |
1385 | |
1386 | /* Master/slave cleanup Case 2: |
1387 | * we lost the connection with the master. */ |
1388 | if (c->flags & CLIENT_MASTER(1<<1)) replicationHandleMasterDisconnection(); |
1389 | |
1390 | /* Remove the contribution that this client gave to our |
1391 | * incrementally computed memory usage. */ |
1392 | server.stat_clients_type_memory[c->client_cron_last_memory_type] -= |
1393 | c->client_cron_last_memory_usage; |
1394 | |
1395 | /* Release other dynamically allocated client structure fields, |
1396 | * and finally release the client structure itself. */ |
1397 | if (c->name) decrRefCount(c->name); |
1398 | zfree(c->argv); |
1399 | c->argv_len_sum = 0; |
1400 | freeClientMultiState(c); |
1401 | sdsfree(c->peerid); |
1402 | sdsfree(c->sockname); |
1403 | sdsfree(c->slave_addr); |
1404 | zfree(c); |
1405 | } |
1406 | |
1407 | /* Schedule a client to free it at a safe time in the serverCron() function. |
1408 | * This function is useful when we need to terminate a client but we are in |
1409 | * a context where calling freeClient() is not possible, because the client |
1410 | * should be valid for the continuation of the flow of the program. */ |
1411 | void freeClientAsync(client *c) { |
1412 | /* We need to handle concurrent access to the server.clients_to_close list |
1413 | * only in the freeClientAsync() function, since it's the only function that |
1414 | * may access the list while Redis uses I/O threads. All the other accesses |
1415 | * are in the context of the main thread while the other threads are |
1416 | * idle. */ |
1417 | if (c->flags & CLIENT_CLOSE_ASAP(1<<10) || c->flags & CLIENT_LUA(1<<8)) return; |
1418 | c->flags |= CLIENT_CLOSE_ASAP(1<<10); |
1419 | if (server.io_threads_num == 1) { |
1420 | /* no need to bother with locking if there's just one thread (the main thread) */ |
1421 | listAddNodeTail(server.clients_to_close,c); |
1422 | return; |
1423 | } |
1424 | static pthread_mutex_t async_free_queue_mutex = PTHREAD_MUTEX_INITIALIZER{ { 0, 0, 0, 0, PTHREAD_MUTEX_TIMED_NP, 0, 0, { 0, 0 } } }; |
1425 | pthread_mutex_lock(&async_free_queue_mutex); |
1426 | listAddNodeTail(server.clients_to_close,c); |
1427 | pthread_mutex_unlock(&async_free_queue_mutex); |
1428 | } |
1429 | |
1430 | /* Free the clients marked as CLOSE_ASAP, return the number of clients |
1431 | * freed. */ |
1432 | int freeClientsInAsyncFreeQueue(void) { |
1433 | int freed = 0; |
1434 | listIter li; |
1435 | listNode *ln; |
1436 | |
1437 | listRewind(server.clients_to_close,&li); |
1438 | while ((ln = listNext(&li)) != NULL((void*)0)) { |
1439 | client *c = listNodeValue(ln)((ln)->value); |
1440 | |
1441 | if (c->flags & CLIENT_PROTECTED(1<<28)) continue; |
1442 | |
1443 | c->flags &= ~CLIENT_CLOSE_ASAP(1<<10); |
1444 | freeClient(c); |
1445 | listDelNode(server.clients_to_close,ln); |
1446 | freed++; |
1447 | } |
1448 | return freed; |
1449 | } |
1450 | |
1451 | /* Return a client by ID, or NULL if the client ID is not in the set |
1452 | * of registered clients. Note that "fake clients", created with -1 as FD, |
1453 | * are not registered clients. */ |
1454 | client *lookupClientByID(uint64_t id) { |
1455 | id = htonu64(id)intrev64(id); |
1456 | client *c = raxFind(server.clients_index,(unsigned char*)&id,sizeof(id)); |
1457 | return (c == raxNotFound) ? NULL((void*)0) : c; |
1458 | } |
1459 | |
1460 | /* Write data in output buffers to client. Return C_OK if the client |
1461 | * is still valid after the call, C_ERR if it was freed because of some |
1462 | * error. If handler_installed is set, it will attempt to clear the |
1463 | * write event. |
1464 | * |
1465 | * This function is called by threads, but always with handler_installed |
1466 | * set to 0. So when handler_installed is set to 0 the function must be |
1467 | * thread safe. */ |
1468 | int writeToClient(client *c, int handler_installed) { |
1469 | /* Update total number of writes on server */ |
1470 | atomicIncr(server.stat_total_writes_processed, 1)__c11_atomic_fetch_add(&server.stat_total_writes_processed ,(1),memory_order_relaxed); |
1471 | |
1472 | ssize_t nwritten = 0, totwritten = 0; |
1473 | size_t objlen; |
1474 | clientReplyBlock *o; |
1475 | |
1476 | while(clientHasPendingReplies(c)) { |
1477 | if (c->bufpos > 0) { |
1478 | nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen); |
1479 | if (nwritten <= 0) break; |
1480 | c->sentlen += nwritten; |
1481 | totwritten += nwritten; |
1482 | |
1483 | /* If the buffer was sent, set bufpos to zero to continue with |
1484 | * the remainder of the reply. */ |
1485 | if ((int)c->sentlen == c->bufpos) { |
1486 | c->bufpos = 0; |
1487 | c->sentlen = 0; |
1488 | } |
1489 | } else { |
1490 | o = listNodeValue(listFirst(c->reply))((((c->reply)->head))->value); |
1491 | objlen = o->used; |
1492 | |
1493 | if (objlen == 0) { |
1494 | c->reply_bytes -= o->size; |
1495 | listDelNode(c->reply,listFirst(c->reply)((c->reply)->head)); |
1496 | continue; |
1497 | } |
1498 | |
1499 | nwritten = connWrite(c->conn, o->buf + c->sentlen, objlen - c->sentlen); |
1500 | if (nwritten <= 0) break; |
1501 | c->sentlen += nwritten; |
1502 | totwritten += nwritten; |
1503 | |
1504 | /* If we fully sent the object on head go to the next one */ |
1505 | if (c->sentlen == objlen) { |
1506 | c->reply_bytes -= o->size; |
1507 | listDelNode(c->reply,listFirst(c->reply)((c->reply)->head)); |
1508 | c->sentlen = 0; |
1509 | /* If there are no longer objects in the list, we expect |
1510 | * the count of reply bytes to be exactly zero. */ |
1511 | if (listLength(c->reply)((c->reply)->len) == 0) |
1512 | serverAssert(c->reply_bytes == 0)((c->reply_bytes == 0)?(void)0 : (_serverAssert("c->reply_bytes == 0" ,"networking.c",1512),__builtin_unreachable())); |
1513 | } |
1514 | } |
1515 | /* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT |
1516 | * bytes, in a single threaded server it's a good idea to serve |
1517 | * other clients as well, even if a very large request comes from |
1518 | * super fast link that is always able to accept data (in real world |
1519 | * scenario think about 'KEYS *' against the loopback interface). |
1520 | * |
1521 | * However if we are over the maxmemory limit we ignore that and |
1522 | * just deliver as much data as it is possible to deliver. |
1523 | * |
1524 | * Moreover, we also send as much as possible if the client is |
1525 | * a slave or a monitor (otherwise, on high-speed traffic, the |
1526 | * replication/output buffer will grow indefinitely) */ |
1527 | if (totwritten > NET_MAX_WRITES_PER_EVENT(1024*64) && |
1528 | (server.maxmemory == 0 || |
1529 | zmalloc_used_memory() < server.maxmemory) && |
1530 | !(c->flags & CLIENT_SLAVE(1<<0))) break; |
1531 | } |
1532 | atomicIncr(server.stat_net_output_bytes, totwritten)__c11_atomic_fetch_add(&server.stat_net_output_bytes,(totwritten ),memory_order_relaxed); |
1533 | if (nwritten == -1) { |
1534 | if (connGetState(c->conn) == CONN_STATE_CONNECTED) { |
1535 | nwritten = 0; |
Value stored to 'nwritten' is never read | |
1536 | } else { |
1537 | serverLog(LL_VERBOSE1, |
1538 | "Error writing to client: %s", connGetLastError(c->conn)); |
1539 | freeClientAsync(c); |
1540 | return C_ERR-1; |
1541 | } |
1542 | } |
1543 | if (totwritten > 0) { |
1544 | /* For clients representing masters we don't count sending data |
1545 | * as an interaction, since we always send REPLCONF ACK commands |
1546 | * that take some time to just fill the socket output buffer. |
1547 | * We just rely on data / pings received for timeout detection. */ |
1548 | if (!(c->flags & CLIENT_MASTER(1<<1))) c->lastinteraction = server.unixtime; |
1549 | } |
1550 | if (!clientHasPendingReplies(c)) { |
1551 | c->sentlen = 0; |
1552 | /* Note that writeToClient() is called in a threaded way, but |
1553 | * adDeleteFileEvent() is not thread safe: however writeToClient() |
1554 | * is always called with handler_installed set to 0 from threads |
1555 | * so we are fine. */ |
1556 | if (handler_installed) connSetWriteHandler(c->conn, NULL((void*)0)); |
1557 | |
1558 | /* Close connection after entire reply has been sent. */ |
1559 | if (c->flags & CLIENT_CLOSE_AFTER_REPLY(1<<6)) { |
1560 | freeClientAsync(c); |
1561 | return C_ERR-1; |
1562 | } |
1563 | } |
1564 | return C_OK0; |
1565 | } |
1566 | |
1567 | /* Write event handler. Just send data to the client. */ |
1568 | void sendReplyToClient(connection *conn) { |
1569 | client *c = connGetPrivateData(conn); |
1570 | writeToClient(c,1); |
1571 | } |
1572 | |
1573 | /* This function is called just before entering the event loop, in the hope |
1574 | * we can just write the replies to the client output buffer without any |
1575 | * need to use a syscall in order to install the writable event handler, |
1576 | * get it called, and so forth. */ |
1577 | int handleClientsWithPendingWrites(void) { |
1578 | listIter li; |
1579 | listNode *ln; |
1580 | int processed = listLength(server.clients_pending_write)((server.clients_pending_write)->len); |
1581 | |
1582 | listRewind(server.clients_pending_write,&li); |
1583 | while((ln = listNext(&li))) { |
1584 | client *c = listNodeValue(ln)((ln)->value); |
1585 | c->flags &= ~CLIENT_PENDING_WRITE(1<<21); |
1586 | listDelNode(server.clients_pending_write,ln); |
1587 | |
1588 | /* If a client is protected, don't do anything, |
1589 | * that may trigger write error or recreate handler. */ |
1590 | if (c->flags & CLIENT_PROTECTED(1<<28)) continue; |
1591 | |
1592 | /* Don't write to clients that are going to be closed anyway. */ |
1593 | if (c->flags & CLIENT_CLOSE_ASAP(1<<10)) continue; |
1594 | |
1595 | /* Try to write buffers to the client socket. */ |
1596 | if (writeToClient(c,0) == C_ERR-1) continue; |
1597 | |
1598 | /* If after the synchronous writes above we still have data to |
1599 | * output to the client, we need to install the writable handler. */ |
1600 | if (clientHasPendingReplies(c)) { |
1601 | int ae_barrier = 0; |
1602 | /* For the fsync=always policy, we want that a given FD is never |
1603 | * served for reading and writing in the same event loop iteration, |
1604 | * so that in the middle of receiving the query, and serving it |
1605 | * to the client, we'll call beforeSleep() that will do the |
1606 | * actual fsync of AOF to disk. the write barrier ensures that. */ |
1607 | if (server.aof_state == AOF_ON1 && |
1608 | server.aof_fsync == AOF_FSYNC_ALWAYS1) |
1609 | { |
1610 | ae_barrier = 1; |
1611 | } |
1612 | if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR-1) { |
1613 | freeClientAsync(c); |
1614 | } |
1615 | } |
1616 | } |
1617 | return processed; |
1618 | } |
1619 | |
1620 | /* resetClient prepare the client to process the next command */ |
1621 | void resetClient(client *c) { |
1622 | redisCommandProc *prevcmd = c->cmd ? c->cmd->proc : NULL((void*)0); |
1623 | |
1624 | freeClientArgv(c); |
1625 | c->reqtype = 0; |
1626 | c->multibulklen = 0; |
1627 | c->bulklen = -1; |
1628 | |
1629 | /* We clear the ASKING flag as well if we are not inside a MULTI, and |
1630 | * if what we just executed is not the ASKING command itself. */ |
1631 | if (!(c->flags & CLIENT_MULTI(1<<3)) && prevcmd != askingCommand) |
1632 | c->flags &= ~CLIENT_ASKING(1<<9); |
1633 | |
1634 | /* We do the same for the CACHING command as well. It also affects |
1635 | * the next command or transaction executed, in a way very similar |
1636 | * to ASKING. */ |
1637 | if (!(c->flags & CLIENT_MULTI(1<<3)) && prevcmd != clientCommand) |
1638 | c->flags &= ~CLIENT_TRACKING_CACHING(1ULL<<36); |
1639 | |
1640 | /* Remove the CLIENT_REPLY_SKIP flag if any so that the reply |
1641 | * to the next command will be sent, but set the flag if the command |
1642 | * we just processed was "CLIENT REPLY SKIP". */ |
1643 | c->flags &= ~CLIENT_REPLY_SKIP(1<<24); |
1644 | if (c->flags & CLIENT_REPLY_SKIP_NEXT(1<<23)) { |
1645 | c->flags |= CLIENT_REPLY_SKIP(1<<24); |
1646 | c->flags &= ~CLIENT_REPLY_SKIP_NEXT(1<<23); |
1647 | } |
1648 | } |
1649 | |
1650 | /* This function is used when we want to re-enter the event loop but there |
1651 | * is the risk that the client we are dealing with will be freed in some |
1652 | * way. This happens for instance in: |
1653 | * |
1654 | * * DEBUG RELOAD and similar. |
1655 | * * When a Lua script is in -BUSY state. |
1656 | * |
1657 | * So the function will protect the client by doing two things: |
1658 | * |
1659 | * 1) It removes the file events. This way it is not possible that an |
1660 | * error is signaled on the socket, freeing the client. |
1661 | * 2) Moreover it makes sure that if the client is freed in a different code |
1662 | * path, it is not really released, but only marked for later release. */ |
1663 | void protectClient(client *c) { |
1664 | c->flags |= CLIENT_PROTECTED(1<<28); |
1665 | if (c->conn) { |
1666 | connSetReadHandler(c->conn,NULL((void*)0)); |
1667 | connSetWriteHandler(c->conn,NULL((void*)0)); |
1668 | } |
1669 | } |
1670 | |
1671 | /* This will undo the client protection done by protectClient() */ |
1672 | void unprotectClient(client *c) { |
1673 | if (c->flags & CLIENT_PROTECTED(1<<28)) { |
1674 | c->flags &= ~CLIENT_PROTECTED(1<<28); |
1675 | if (c->conn) { |
1676 | connSetReadHandler(c->conn,readQueryFromClient); |
1677 | if (clientHasPendingReplies(c)) clientInstallWriteHandler(c); |
1678 | } |
1679 | } |
1680 | } |
1681 | |
1682 | /* Like processMultibulkBuffer(), but for the inline protocol instead of RESP, |
1683 | * this function consumes the client query buffer and creates a command ready |
1684 | * to be executed inside the client structure. Returns C_OK if the command |
1685 | * is ready to be executed, or C_ERR if there is still protocol to read to |
1686 | * have a well formed command. The function also returns C_ERR when there is |
1687 | * a protocol error: in such a case the client structure is setup to reply |
1688 | * with the error and close the connection. */ |
1689 | int processInlineBuffer(client *c) { |
1690 | char *newline; |
1691 | int argc, j, linefeed_chars = 1; |
1692 | sds *argv, aux; |
1693 | size_t querylen; |
1694 | |
1695 | /* Search for end of line */ |
1696 | newline = strchr(c->querybuf+c->qb_pos,'\n'); |
1697 | |
1698 | /* Nothing to do without a \r\n */ |
1699 | if (newline == NULL((void*)0)) { |
1700 | if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE(1024*64)) { |
1701 | addReplyError(c,"Protocol error: too big inline request"); |
1702 | setProtocolError("too big inline request",c); |
1703 | } |
1704 | return C_ERR-1; |
1705 | } |
1706 | |
1707 | /* Handle the \r\n case. */ |
1708 | if (newline != c->querybuf+c->qb_pos && *(newline-1) == '\r') |
1709 | newline--, linefeed_chars++; |
1710 | |
1711 | /* Split the input buffer up to the \r\n */ |
1712 | querylen = newline-(c->querybuf+c->qb_pos); |
1713 | aux = sdsnewlen(c->querybuf+c->qb_pos,querylen); |
1714 | argv = sdssplitargs(aux,&argc); |
1715 | sdsfree(aux); |
1716 | if (argv == NULL((void*)0)) { |
1717 | addReplyError(c,"Protocol error: unbalanced quotes in request"); |
1718 | setProtocolError("unbalanced quotes in inline request",c); |
1719 | return C_ERR-1; |
1720 | } |
1721 | |
1722 | /* Newline from slaves can be used to refresh the last ACK time. |
1723 | * This is useful for a slave to ping back while loading a big |
1724 | * RDB file. */ |
1725 | if (querylen == 0 && getClientType(c) == CLIENT_TYPE_SLAVE1) |
1726 | c->repl_ack_time = server.unixtime; |
1727 | |
1728 | /* Masters should never send us inline protocol to run actual |
1729 | * commands. If this happens, it is likely due to a bug in Redis where |
1730 | * we got some desynchronization in the protocol, for example |
1731 | * beause of a PSYNC gone bad. |
1732 | * |
1733 | * However the is an exception: masters may send us just a newline |
1734 | * to keep the connection active. */ |
1735 | if (querylen != 0 && c->flags & CLIENT_MASTER(1<<1)) { |
1736 | sdsfreesplitres(argv,argc); |
1737 | serverLog(LL_WARNING3,"WARNING: Receiving inline protocol from master, master stream corruption? Closing the master connection and discarding the cached master."); |
1738 | setProtocolError("Master using the inline protocol. Desync?",c); |
1739 | return C_ERR-1; |
1740 | } |
1741 | |
1742 | /* Move querybuffer position to the next query in the buffer. */ |
1743 | c->qb_pos += querylen+linefeed_chars; |
1744 | |
1745 | /* Setup argv array on client structure */ |
1746 | if (argc) { |
1747 | if (c->argv) zfree(c->argv); |
1748 | c->argv = zmalloc(sizeof(robj*)*argc); |
1749 | c->argv_len_sum = 0; |
1750 | } |
1751 | |
1752 | /* Create redis objects for all arguments. */ |
1753 | for (c->argc = 0, j = 0; j < argc; j++) { |
1754 | c->argv[c->argc] = createObject(OBJ_STRING0,argv[j]); |
1755 | c->argc++; |
1756 | c->argv_len_sum += sdslen(argv[j]); |
1757 | } |
1758 | zfree(argv); |
1759 | return C_OK0; |
1760 | } |
1761 | |
1762 | /* Helper function. Record protocol erro details in server log, |
1763 | * and set the client as CLIENT_CLOSE_AFTER_REPLY and |
1764 | * CLIENT_PROTOCOL_ERROR. */ |
1765 | #define PROTO_DUMP_LEN128 128 |
1766 | static void setProtocolError(const char *errstr, client *c) { |
1767 | if (server.verbosity <= LL_VERBOSE1 || c->flags & CLIENT_MASTER(1<<1)) { |
1768 | sds client = catClientInfoString(sdsempty(),c); |
1769 | |
1770 | /* Sample some protocol to given an idea about what was inside. */ |
1771 | char buf[256]; |
1772 | if (sdslen(c->querybuf)-c->qb_pos < PROTO_DUMP_LEN128) { |
1773 | snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%s'", c->querybuf+c->qb_pos); |
1774 | } else { |
1775 | snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%.*s' (... more %zu bytes ...) '%.*s'", PROTO_DUMP_LEN128/2, c->querybuf+c->qb_pos, sdslen(c->querybuf)-c->qb_pos-PROTO_DUMP_LEN128, PROTO_DUMP_LEN128/2, c->querybuf+sdslen(c->querybuf)-PROTO_DUMP_LEN128/2); |
1776 | } |
1777 | |
1778 | /* Remove non printable chars. */ |
1779 | char *p = buf; |
1780 | while (*p != '\0') { |
1781 | if (!isprint(*p)((*__ctype_b_loc ())[(int) ((*p))] & (unsigned short int) _ISprint)) *p = '.'; |
1782 | p++; |
1783 | } |
1784 | |
1785 | /* Log all the client and protocol info. */ |
1786 | int loglevel = (c->flags & CLIENT_MASTER(1<<1)) ? LL_WARNING3 : |
1787 | LL_VERBOSE1; |
1788 | serverLog(loglevel, |
1789 | "Protocol error (%s) from client: %s. %s", errstr, client, buf); |
1790 | sdsfree(client); |
1791 | } |
1792 | c->flags |= (CLIENT_CLOSE_AFTER_REPLY(1<<6)|CLIENT_PROTOCOL_ERROR(1ULL<<39)); |
1793 | } |
1794 | |
1795 | /* Process the query buffer for client 'c', setting up the client argument |
1796 | * vector for command execution. Returns C_OK if after running the function |
1797 | * the client has a well-formed ready to be processed command, otherwise |
1798 | * C_ERR if there is still to read more buffer to get the full command. |
1799 | * The function also returns C_ERR when there is a protocol error: in such a |
1800 | * case the client structure is setup to reply with the error and close |
1801 | * the connection. |
1802 | * |
1803 | * This function is called if processInputBuffer() detects that the next |
1804 | * command is in RESP format, so the first byte in the command is found |
1805 | * to be '*'. Otherwise for inline commands processInlineBuffer() is called. */ |
1806 | int processMultibulkBuffer(client *c) { |
1807 | char *newline = NULL((void*)0); |
1808 | int ok; |
1809 | long long ll; |
1810 | |
1811 | if (c->multibulklen == 0) { |
1812 | /* The client should have been reset */ |
1813 | serverAssertWithInfo(c,NULL,c->argc == 0)((c->argc == 0)?(void)0 : (_serverAssertWithInfo(c,((void* )0),"c->argc == 0","networking.c",1813),__builtin_unreachable ())); |
1814 | |
1815 | /* Multi bulk length cannot be read without a \r\n */ |
1816 | newline = strchr(c->querybuf+c->qb_pos,'\r'); |
1817 | if (newline == NULL((void*)0)) { |
1818 | if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE(1024*64)) { |
1819 | addReplyError(c,"Protocol error: too big mbulk count string"); |
1820 | setProtocolError("too big mbulk count string",c); |
1821 | } |
1822 | return C_ERR-1; |
1823 | } |
1824 | |
1825 | /* Buffer should also contain \n */ |
1826 | if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2)) |
1827 | return C_ERR-1; |
1828 | |
1829 | /* We know for sure there is a whole line since newline != NULL, |
1830 | * so go ahead and find out the multi bulk length. */ |
1831 | serverAssertWithInfo(c,NULL,c->querybuf[c->qb_pos] == '*')((c->querybuf[c->qb_pos] == '*')?(void)0 : (_serverAssertWithInfo (c,((void*)0),"c->querybuf[c->qb_pos] == '*'","networking.c" ,1831),__builtin_unreachable())); |
1832 | ok = string2ll(c->querybuf+1+c->qb_pos,newline-(c->querybuf+1+c->qb_pos),&ll); |
1833 | if (!ok || ll > 1024*1024) { |
1834 | addReplyError(c,"Protocol error: invalid multibulk length"); |
1835 | setProtocolError("invalid mbulk count",c); |
1836 | return C_ERR-1; |
1837 | } |
1838 | |
1839 | c->qb_pos = (newline-c->querybuf)+2; |
1840 | |
1841 | if (ll <= 0) return C_OK0; |
1842 | |
1843 | c->multibulklen = ll; |
1844 | |
1845 | /* Setup argv array on client structure */ |
1846 | if (c->argv) zfree(c->argv); |
1847 | c->argv = zmalloc(sizeof(robj*)*c->multibulklen); |
1848 | c->argv_len_sum = 0; |
1849 | } |
1850 | |
1851 | serverAssertWithInfo(c,NULL,c->multibulklen > 0)((c->multibulklen > 0)?(void)0 : (_serverAssertWithInfo (c,((void*)0),"c->multibulklen > 0","networking.c",1851 ),__builtin_unreachable())); |
1852 | while(c->multibulklen) { |
1853 | /* Read bulk length if unknown */ |
1854 | if (c->bulklen == -1) { |
1855 | newline = strchr(c->querybuf+c->qb_pos,'\r'); |
1856 | if (newline == NULL((void*)0)) { |
1857 | if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE(1024*64)) { |
1858 | addReplyError(c, |
1859 | "Protocol error: too big bulk count string"); |
1860 | setProtocolError("too big bulk count string",c); |
1861 | return C_ERR-1; |
1862 | } |
1863 | break; |
1864 | } |
1865 | |
1866 | /* Buffer should also contain \n */ |
1867 | if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2)) |
1868 | break; |
1869 | |
1870 | if (c->querybuf[c->qb_pos] != '$') { |
1871 | addReplyErrorFormat(c, |
1872 | "Protocol error: expected '$', got '%c'", |
1873 | c->querybuf[c->qb_pos]); |
1874 | setProtocolError("expected $ but got something else",c); |
1875 | return C_ERR-1; |
1876 | } |
1877 | |
1878 | ok = string2ll(c->querybuf+c->qb_pos+1,newline-(c->querybuf+c->qb_pos+1),&ll); |
1879 | if (!ok || ll < 0 || |
1880 | (!(c->flags & CLIENT_MASTER(1<<1)) && ll > server.proto_max_bulk_len)) { |
1881 | addReplyError(c,"Protocol error: invalid bulk length"); |
1882 | setProtocolError("invalid bulk length",c); |
1883 | return C_ERR-1; |
1884 | } |
1885 | |
1886 | c->qb_pos = newline-c->querybuf+2; |
1887 | if (ll >= PROTO_MBULK_BIG_ARG(1024*32)) { |
1888 | /* If we are going to read a large object from network |
1889 | * try to make it likely that it will start at c->querybuf |
1890 | * boundary so that we can optimize object creation |
1891 | * avoiding a large copy of data. |
1892 | * |
1893 | * But only when the data we have not parsed is less than |
1894 | * or equal to ll+2. If the data length is greater than |
1895 | * ll+2, trimming querybuf is just a waste of time, because |
1896 | * at this time the querybuf contains not only our bulk. */ |
1897 | if (sdslen(c->querybuf)-c->qb_pos <= (size_t)ll+2) { |
1898 | sdsrange(c->querybuf,c->qb_pos,-1); |
1899 | c->qb_pos = 0; |
1900 | /* Hint the sds library about the amount of bytes this string is |
1901 | * going to contain. */ |
1902 | c->querybuf = sdsMakeRoomFor(c->querybuf,ll+2-sdslen(c->querybuf)); |
1903 | } |
1904 | } |
1905 | c->bulklen = ll; |
1906 | } |
1907 | |
1908 | /* Read bulk argument */ |
1909 | if (sdslen(c->querybuf)-c->qb_pos < (size_t)(c->bulklen+2)) { |
1910 | /* Not enough data (+2 == trailing \r\n) */ |
1911 | break; |
1912 | } else { |
1913 | /* Optimization: if the buffer contains JUST our bulk element |
1914 | * instead of creating a new object by *copying* the sds we |
1915 | * just use the current sds string. */ |
1916 | if (c->qb_pos == 0 && |
1917 | c->bulklen >= PROTO_MBULK_BIG_ARG(1024*32) && |
1918 | sdslen(c->querybuf) == (size_t)(c->bulklen+2)) |
1919 | { |
1920 | c->argv[c->argc++] = createObject(OBJ_STRING0,c->querybuf); |
1921 | c->argv_len_sum += c->bulklen; |
1922 | sdsIncrLen(c->querybuf,-2); /* remove CRLF */ |
1923 | /* Assume that if we saw a fat argument we'll see another one |
1924 | * likely... */ |
1925 | c->querybuf = sdsnewlen(SDS_NOINIT,c->bulklen+2); |
1926 | sdsclear(c->querybuf); |
1927 | } else { |
1928 | c->argv[c->argc++] = |
1929 | createStringObject(c->querybuf+c->qb_pos,c->bulklen); |
1930 | c->argv_len_sum += c->bulklen; |
1931 | c->qb_pos += c->bulklen+2; |
1932 | } |
1933 | c->bulklen = -1; |
1934 | c->multibulklen--; |
1935 | } |
1936 | } |
1937 | |
1938 | /* We're done when c->multibulk == 0 */ |
1939 | if (c->multibulklen == 0) return C_OK0; |
1940 | |
1941 | /* Still not ready to process the command */ |
1942 | return C_ERR-1; |
1943 | } |
1944 | |
1945 | /* Perform necessary tasks after a command was executed: |
1946 | * |
1947 | * 1. The client is reset unless there are reasons to avoid doing it. |
1948 | * 2. In the case of master clients, the replication offset is updated. |
1949 | * 3. Propagate commands we got from our master to replicas down the line. */ |
1950 | void commandProcessed(client *c) { |
1951 | long long prev_offset = c->reploff; |
1952 | if (c->flags & CLIENT_MASTER(1<<1) && !(c->flags & CLIENT_MULTI(1<<3))) { |
1953 | /* Update the applied replication offset of our master. */ |
1954 | c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; |
1955 | } |
1956 | |
1957 | /* Don't reset the client structure for clients blocked in a |
1958 | * module blocking command, so that the reply callback will |
1959 | * still be able to access the client argv and argc field. |
1960 | * The client will be reset in unblockClientFromModule(). */ |
1961 | if (!(c->flags & CLIENT_BLOCKED(1<<4)) || |
1962 | (c->btype != BLOCKED_MODULE3 && c->btype != BLOCKED_PAUSE6)) |
1963 | { |
1964 | resetClient(c); |
1965 | } |
1966 | |
1967 | /* If the client is a master we need to compute the difference |
1968 | * between the applied offset before and after processing the buffer, |
1969 | * to understand how much of the replication stream was actually |
1970 | * applied to the master state: this quantity, and its corresponding |
1971 | * part of the replication stream, will be propagated to the |
1972 | * sub-replicas and to the replication backlog. */ |
1973 | if (c->flags & CLIENT_MASTER(1<<1)) { |
1974 | long long applied = c->reploff - prev_offset; |
1975 | if (applied) { |
1976 | replicationFeedSlavesFromMasterStream(server.slaves, |
1977 | c->pending_querybuf, applied); |
1978 | sdsrange(c->pending_querybuf,applied,-1); |
1979 | } |
1980 | } |
1981 | } |
1982 | |
1983 | /* This function calls processCommand(), but also performs a few sub tasks |
1984 | * for the client that are useful in that context: |
1985 | * |
1986 | * 1. It sets the current client to the client 'c'. |
1987 | * 2. calls commandProcessed() if the command was handled. |
1988 | * |
1989 | * The function returns C_ERR in case the client was freed as a side effect |
1990 | * of processing the command, otherwise C_OK is returned. */ |
1991 | int processCommandAndResetClient(client *c) { |
1992 | int deadclient = 0; |
1993 | server.current_client = c; |
1994 | if (processCommand(c) == C_OK0) { |
1995 | commandProcessed(c); |
1996 | } |
1997 | if (server.current_client == NULL((void*)0)) deadclient = 1; |
1998 | server.current_client = NULL((void*)0); |
1999 | /* performEvictions may flush slave output buffers. This may |
2000 | * result in a slave, that may be the active client, to be |
2001 | * freed. */ |
2002 | return deadclient ? C_ERR-1 : C_OK0; |
2003 | } |
2004 | |
2005 | |
2006 | /* This function will execute any fully parsed commands pending on |
2007 | * the client. Returns C_ERR if the client is no longer valid after executing |
2008 | * the command, and C_OK for all other cases. */ |
2009 | int processPendingCommandsAndResetClient(client *c) { |
2010 | if (c->flags & CLIENT_PENDING_COMMAND(1<<30)) { |
2011 | c->flags &= ~CLIENT_PENDING_COMMAND(1<<30); |
2012 | if (processCommandAndResetClient(c) == C_ERR-1) { |
2013 | return C_ERR-1; |
2014 | } |
2015 | } |
2016 | return C_OK0; |
2017 | } |
2018 | |
2019 | /* This function is called every time, in the client structure 'c', there is |
2020 | * more query buffer to process, because we read more data from the socket |
2021 | * or because a client was blocked and later reactivated, so there could be |
2022 | * pending query buffer, already representing a full command, to process. */ |
2023 | void processInputBuffer(client *c) { |
2024 | /* Keep processing while there is something in the input buffer */ |
2025 | while(c->qb_pos < sdslen(c->querybuf)) { |
2026 | /* Immediately abort if the client is in the middle of something. */ |
2027 | if (c->flags & CLIENT_BLOCKED(1<<4)) break; |
2028 | |
2029 | /* Don't process more buffers from clients that have already pending |
2030 | * commands to execute in c->argv. */ |
2031 | if (c->flags & CLIENT_PENDING_COMMAND(1<<30)) break; |
2032 | |
2033 | /* Don't process input from the master while there is a busy script |
2034 | * condition on the slave. We want just to accumulate the replication |
2035 | * stream (instead of replying -BUSY like we do with other clients) and |
2036 | * later resume the processing. */ |
2037 | if (server.lua_timedout && c->flags & CLIENT_MASTER(1<<1)) break; |
2038 | |
2039 | /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is |
2040 | * written to the client. Make sure to not let the reply grow after |
2041 | * this flag has been set (i.e. don't process more commands). |
2042 | * |
2043 | * The same applies for clients we want to terminate ASAP. */ |
2044 | if (c->flags & (CLIENT_CLOSE_AFTER_REPLY(1<<6)|CLIENT_CLOSE_ASAP(1<<10))) break; |
2045 | |
2046 | /* Determine request type when unknown. */ |
2047 | if (!c->reqtype) { |
2048 | if (c->querybuf[c->qb_pos] == '*') { |
2049 | c->reqtype = PROTO_REQ_MULTIBULK2; |
2050 | } else { |
2051 | c->reqtype = PROTO_REQ_INLINE1; |
2052 | } |
2053 | } |
2054 | |
2055 | if (c->reqtype == PROTO_REQ_INLINE1) { |
2056 | if (processInlineBuffer(c) != C_OK0) break; |
2057 | /* If the Gopher mode and we got zero or one argument, process |
2058 | * the request in Gopher mode. To avoid data race, Redis won't |
2059 | * support Gopher if enable io threads to read queries. */ |
2060 | if (server.gopher_enabled && !server.io_threads_do_reads && |
2061 | ((c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '/') || |
2062 | c->argc == 0)) |
2063 | { |
2064 | processGopherRequest(c); |
2065 | resetClient(c); |
2066 | c->flags |= CLIENT_CLOSE_AFTER_REPLY(1<<6); |
2067 | break; |
2068 | } |
2069 | } else if (c->reqtype == PROTO_REQ_MULTIBULK2) { |
2070 | if (processMultibulkBuffer(c) != C_OK0) break; |
2071 | } else { |
2072 | serverPanic("Unknown request type")_serverPanic("networking.c",2072,"Unknown request type"),__builtin_unreachable (); |
2073 | } |
2074 | |
2075 | /* Multibulk processing could see a <= 0 length. */ |
2076 | if (c->argc == 0) { |
2077 | resetClient(c); |
2078 | } else { |
2079 | /* If we are in the context of an I/O thread, we can't really |
2080 | * execute the command here. All we can do is to flag the client |
2081 | * as one that needs to process the command. */ |
2082 | if (c->flags & CLIENT_PENDING_READ(1<<29)) { |
2083 | c->flags |= CLIENT_PENDING_COMMAND(1<<30); |
2084 | break; |
2085 | } |
2086 | |
2087 | /* We are finally ready to execute the command. */ |
2088 | if (processCommandAndResetClient(c) == C_ERR-1) { |
2089 | /* If the client is no longer valid, we avoid exiting this |
2090 | * loop and trimming the client buffer later. So we return |
2091 | * ASAP in that case. */ |
2092 | return; |
2093 | } |
2094 | } |
2095 | } |
2096 | |
2097 | /* Trim to pos */ |
2098 | if (c->qb_pos) { |
2099 | sdsrange(c->querybuf,c->qb_pos,-1); |
2100 | c->qb_pos = 0; |
2101 | } |
2102 | } |
2103 | |
2104 | void readQueryFromClient(connection *conn) { |
2105 | client *c = connGetPrivateData(conn); |
2106 | int nread, readlen; |
2107 | size_t qblen; |
2108 | |
2109 | /* Check if we want to read from the client later when exiting from |
2110 | * the event loop. This is the case if threaded I/O is enabled. */ |
2111 | if (postponeClientRead(c)) return; |
2112 | |
2113 | /* Update total number of reads on server */ |
2114 | atomicIncr(server.stat_total_reads_processed, 1)__c11_atomic_fetch_add(&server.stat_total_reads_processed ,(1),memory_order_relaxed); |
2115 | |
2116 | readlen = PROTO_IOBUF_LEN(1024*16); |
2117 | /* If this is a multi bulk request, and we are processing a bulk reply |
2118 | * that is large enough, try to maximize the probability that the query |
2119 | * buffer contains exactly the SDS string representing the object, even |
2120 | * at the risk of requiring more read(2) calls. This way the function |
2121 | * processMultiBulkBuffer() can avoid copying buffers to create the |
2122 | * Redis Object representing the argument. */ |
2123 | if (c->reqtype == PROTO_REQ_MULTIBULK2 && c->multibulklen && c->bulklen != -1 |
2124 | && c->bulklen >= PROTO_MBULK_BIG_ARG(1024*32)) |
2125 | { |
2126 | ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf); |
2127 | |
2128 | /* Note that the 'remaining' variable may be zero in some edge case, |
2129 | * for example once we resume a blocked client after CLIENT PAUSE. */ |
2130 | if (remaining > 0 && remaining < readlen) readlen = remaining; |
2131 | } |
2132 | |
2133 | qblen = sdslen(c->querybuf); |
2134 | if (c->querybuf_peak < qblen) c->querybuf_peak = qblen; |
2135 | c->querybuf = sdsMakeRoomFor(c->querybuf, readlen); |
2136 | nread = connRead(c->conn, c->querybuf+qblen, readlen); |
2137 | if (nread == -1) { |
2138 | if (connGetState(conn) == CONN_STATE_CONNECTED) { |
2139 | return; |
2140 | } else { |
2141 | serverLog(LL_VERBOSE1, "Reading from client: %s",connGetLastError(c->conn)); |
2142 | freeClientAsync(c); |
2143 | return; |
2144 | } |
2145 | } else if (nread == 0) { |
2146 | serverLog(LL_VERBOSE1, "Client closed connection"); |
2147 | freeClientAsync(c); |
2148 | return; |
2149 | } else if (c->flags & CLIENT_MASTER(1<<1)) { |
2150 | /* Append the query buffer to the pending (not applied) buffer |
2151 | * of the master. We'll use this buffer later in order to have a |
2152 | * copy of the string applied by the last command executed. */ |
2153 | c->pending_querybuf = sdscatlen(c->pending_querybuf, |
2154 | c->querybuf+qblen,nread); |
2155 | } |
2156 | |
2157 | sdsIncrLen(c->querybuf,nread); |
2158 | c->lastinteraction = server.unixtime; |
2159 | if (c->flags & CLIENT_MASTER(1<<1)) c->read_reploff += nread; |
2160 | atomicIncr(server.stat_net_input_bytes, nread)__c11_atomic_fetch_add(&server.stat_net_input_bytes,(nread ),memory_order_relaxed); |
2161 | if (sdslen(c->querybuf) > server.client_max_querybuf_len) { |
2162 | sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty(); |
2163 | |
2164 | bytes = sdscatrepr(bytes,c->querybuf,64); |
2165 | serverLog(LL_WARNING3,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); |
2166 | sdsfree(ci); |
2167 | sdsfree(bytes); |
2168 | freeClientAsync(c); |
2169 | return; |
2170 | } |
2171 | |
2172 | /* There is more data in the client input buffer, continue parsing it |
2173 | * in case to check if there is a full command to execute. */ |
2174 | processInputBuffer(c); |
2175 | } |
2176 | |
2177 | void getClientsMaxBuffers(unsigned long *longest_output_list, |
2178 | unsigned long *biggest_input_buffer) { |
2179 | client *c; |
2180 | listNode *ln; |
2181 | listIter li; |
2182 | unsigned long lol = 0, bib = 0; |
2183 | |
2184 | listRewind(server.clients,&li); |
2185 | while ((ln = listNext(&li)) != NULL((void*)0)) { |
2186 | c = listNodeValue(ln)((ln)->value); |
2187 | |
2188 | if (listLength(c->reply)((c->reply)->len) > lol) lol = listLength(c->reply)((c->reply)->len); |
2189 | if (sdslen(c->querybuf) > bib) bib = sdslen(c->querybuf); |
2190 | } |
2191 | *longest_output_list = lol; |
2192 | *biggest_input_buffer = bib; |
2193 | } |
2194 | |
2195 | /* A Redis "Address String" is a colon separated ip:port pair. |
2196 | * For IPv4 it's in the form x.y.z.k:port, example: "127.0.0.1:1234". |
2197 | * For IPv6 addresses we use [] around the IP part, like in "[::1]:1234". |
2198 | * For Unix sockets we use path:0, like in "/tmp/redis:0". |
2199 | * |
2200 | * An Address String always fits inside a buffer of NET_ADDR_STR_LEN bytes, |
2201 | * including the null term. |
2202 | * |
2203 | * On failure the function still populates 'addr' with the "?:0" string in case |
2204 | * you want to relax error checking or need to display something anyway (see |
2205 | * anetFdToString implementation for more info). */ |
2206 | void genClientAddrString(client *client, char *addr, |
2207 | size_t addr_len, int fd_to_str_type) { |
2208 | if (client->flags & CLIENT_UNIX_SOCKET(1<<11)) { |
2209 | /* Unix socket client. */ |
2210 | snprintf(addr,addr_len,"%s:0",server.unixsocket); |
2211 | } else { |
2212 | /* TCP client. */ |
2213 | connFormatFdAddr(client->conn,addr,addr_len,fd_to_str_type); |
2214 | } |
2215 | } |
2216 | |
2217 | /* This function returns the client peer id, by creating and caching it |
2218 | * if client->peerid is NULL, otherwise returning the cached value. |
2219 | * The Peer ID never changes during the life of the client, however it |
2220 | * is expensive to compute. */ |
2221 | char *getClientPeerId(client *c) { |
2222 | char peerid[NET_ADDR_STR_LEN(46 +32)]; |
2223 | |
2224 | if (c->peerid == NULL((void*)0)) { |
2225 | genClientAddrString(c,peerid,sizeof(peerid),FD_TO_PEER_NAME0); |
2226 | c->peerid = sdsnew(peerid); |
2227 | } |
2228 | return c->peerid; |
2229 | } |
2230 | |
2231 | /* This function returns the client bound socket name, by creating and caching |
2232 | * it if client->sockname is NULL, otherwise returning the cached value. |
2233 | * The Socket Name never changes during the life of the client, however it |
2234 | * is expensive to compute. */ |
2235 | char *getClientSockname(client *c) { |
2236 | char sockname[NET_ADDR_STR_LEN(46 +32)]; |
2237 | |
2238 | if (c->sockname == NULL((void*)0)) { |
2239 | genClientAddrString(c,sockname,sizeof(sockname),FD_TO_SOCK_NAME1); |
2240 | c->sockname = sdsnew(sockname); |
2241 | } |
2242 | return c->sockname; |
2243 | } |
2244 | |
/* Concatenate a string representing the state of a client in a human
 * readable format, into the sds string 's'.
 *
 * The appended text is a single line of space separated "field=value"
 * pairs (the format used by CLIENT LIST / CLIENT INFO) without a trailing
 * newline; callers add the "\n" themselves. Returns the possibly
 * reallocated sds, which must be used in place of 's'. */
sds catClientInfoString(sds s, client *client) {
    /* flags: at most 15 letters (one of O/S plus the 14 single-flag
     * letters below) + '\0'. events: at most "rw" + '\0'. */
    char flags[16], events[3], conninfo[CONN_INFO_LEN], *p;

    /* Build the one-letter flags summary; 'N' means no flag is set. */
    p = flags;
    if (client->flags & CLIENT_SLAVE) {
        /* MONITOR clients also carry CLIENT_SLAVE: report them as 'O'. */
        if (client->flags & CLIENT_MONITOR)
            *p++ = 'O';
        else
            *p++ = 'S';
    }
    if (client->flags & CLIENT_MASTER) *p++ = 'M';
    if (client->flags & CLIENT_PUBSUB) *p++ = 'P';
    if (client->flags & CLIENT_MULTI) *p++ = 'x';
    if (client->flags & CLIENT_BLOCKED) *p++ = 'b';
    if (client->flags & CLIENT_TRACKING) *p++ = 't';
    if (client->flags & CLIENT_TRACKING_BROKEN_REDIR) *p++ = 'R';
    if (client->flags & CLIENT_TRACKING_BCAST) *p++ = 'B';
    if (client->flags & CLIENT_DIRTY_CAS) *p++ = 'd';
    if (client->flags & CLIENT_CLOSE_AFTER_REPLY) *p++ = 'c';
    if (client->flags & CLIENT_UNBLOCKED) *p++ = 'u';
    if (client->flags & CLIENT_CLOSE_ASAP) *p++ = 'A';
    if (client->flags & CLIENT_UNIX_SOCKET) *p++ = 'U';
    if (client->flags & CLIENT_READONLY) *p++ = 'r';
    if (p == flags) *p++ = 'N';
    *p++ = '\0';

    /* Installed event handlers: 'r' = read handler, 'w' = write handler.
     * Clients without a connection report an empty string. */
    p = events;
    if (client->conn) {
        if (connHasReadHandler(client->conn)) *p++ = 'r';
        if (connHasWriteHandler(client->conn)) *p++ = 'w';
    }
    *p = '\0';

    /* Compute the total memory consumed by this client. */
    size_t obufmem = getClientOutputBufferMemoryUsage(client);
    size_t total_mem = obufmem;
    total_mem += zmalloc_size(client); /* includes client->buf */
    total_mem += sdsZmallocSize(client->querybuf);
    /* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory
     * i.e. unused sds space and internal fragmentation, just the string length. but this is enough to
     * spot problematic clients. */
    total_mem += client->argv_len_sum;
    if (client->argv)
        total_mem += zmalloc_size(client->argv);

    /* The argument list below must stay in the same order as the fields
     * in the format string. 'name' cannot contain spaces (enforced by
     * clientSetNameOrReply), so the line stays splittable on spaces. */
    return sdscatfmt(s,
        "id=%U addr=%s laddr=%s %s name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U argv-mem=%U obl=%U oll=%U omem=%U tot-mem=%U events=%s cmd=%s user=%s redir=%I",
        (unsigned long long) client->id,
        getClientPeerId(client),
        getClientSockname(client),
        connGetInfo(client->conn, conninfo, sizeof(conninfo)),
        client->name ? (char*)client->name->ptr : "",
        (long long)(server.unixtime - client->ctime),
        (long long)(server.unixtime - client->lastinteraction),
        flags,
        client->db->id,
        (int) dictSize(client->pubsub_channels),
        (int) listLength(client->pubsub_patterns),
        (client->flags & CLIENT_MULTI) ? client->mstate.count : -1,
        (unsigned long long) sdslen(client->querybuf),
        (unsigned long long) sdsavail(client->querybuf),
        (unsigned long long) client->argv_len_sum,
        (unsigned long long) client->bufpos,
        (unsigned long long) listLength(client->reply),
        (unsigned long long) obufmem, /* should not include client->buf since we want to see 0 for static clients. */
        (unsigned long long) total_mem,
        events,
        client->lastcmd ? client->lastcmd->name : "NULL",
        client->user ? client->user->name : "(superuser)",
        (client->flags & CLIENT_TRACKING) ? (long long) client->client_tracking_redirection : -1);
}
2318 | |
2319 | sds getAllClientsInfoString(int type) { |
2320 | listNode *ln; |
2321 | listIter li; |
2322 | client *client; |
2323 | sds o = sdsnewlen(SDS_NOINIT,200*listLength(server.clients)((server.clients)->len)); |
2324 | sdsclear(o); |
2325 | listRewind(server.clients,&li); |
2326 | while ((ln = listNext(&li)) != NULL((void*)0)) { |
2327 | client = listNodeValue(ln)((ln)->value); |
2328 | if (type != -1 && getClientType(client) != type) continue; |
2329 | o = catClientInfoString(o,client); |
2330 | o = sdscatlen(o,"\n",1); |
2331 | } |
2332 | return o; |
2333 | } |
2334 | |
2335 | /* This function implements CLIENT SETNAME, including replying to the |
2336 | * user with an error if the charset is wrong (in that case C_ERR is |
2337 | * returned). If the function succeeeded C_OK is returned, and it's up |
2338 | * to the caller to send a reply if needed. |
2339 | * |
2340 | * Setting an empty string as name has the effect of unsetting the |
2341 | * currently set name: the client will remain unnamed. |
2342 | * |
2343 | * This function is also used to implement the HELLO SETNAME option. */ |
2344 | int clientSetNameOrReply(client *c, robj *name) { |
2345 | int len = sdslen(name->ptr); |
2346 | char *p = name->ptr; |
2347 | |
2348 | /* Setting the client name to an empty string actually removes |
2349 | * the current name. */ |
2350 | if (len == 0) { |
2351 | if (c->name) decrRefCount(c->name); |
2352 | c->name = NULL((void*)0); |
2353 | return C_OK0; |
2354 | } |
2355 | |
2356 | /* Otherwise check if the charset is ok. We need to do this otherwise |
2357 | * CLIENT LIST format will break. You should always be able to |
2358 | * split by space to get the different fields. */ |
2359 | for (int j = 0; j < len; j++) { |
2360 | if (p[j] < '!' || p[j] > '~') { /* ASCII is assumed. */ |
2361 | addReplyError(c, |
2362 | "Client names cannot contain spaces, " |
2363 | "newlines or special characters."); |
2364 | return C_ERR-1; |
2365 | } |
2366 | } |
2367 | if (c->name) decrRefCount(c->name); |
2368 | c->name = name; |
2369 | incrRefCount(name); |
2370 | return C_OK0; |
2371 | } |
2372 | |
/* Reset the client state to resemble a newly connected client.
 *
 * RESET command. It performs these steps in order:
 *   - leaves MONITOR mode;
 *   - disables client-side tracking;
 *   - selects DB 0 and switches back to RESP2;
 *   - restores the default authentication state (clientSetDefaultAuth)
 *     and notifies modules of the user change;
 *   - discards any pending transaction;
 *   - drops all Pub/Sub channel and pattern subscriptions;
 *   - clears the client name and a set of per-connection state flags.
 * It then replies +RESET. Replicas, masters and module clients are refused
 * with an error instead.
 */
void resetCommand(client *c) {
    listNode *ln;

    /* MONITOR clients are also marked with CLIENT_SLAVE, we need to
     * distinguish between the two.
     */
    /* Leave MONITOR mode first and clear both flags, so that the replica
     * check below does not reject a plain monitor client. */
    if (c->flags & CLIENT_MONITOR) {
        ln = listSearchKey(server.monitors,c);
        serverAssert(ln != NULL);
        listDelNode(server.monitors,ln);

        c->flags &= ~(CLIENT_MONITOR|CLIENT_SLAVE);
    }

    if (c->flags & (CLIENT_SLAVE|CLIENT_MASTER|CLIENT_MODULE)) {
        addReplyError(c,"can only reset normal client connections");
        return;
    }

    if (c->flags & CLIENT_TRACKING) disableTracking(c);
    selectDb(c,0);
    c->resp = 2;

    clientSetDefaultAuth(c);
    moduleNotifyUserChanged(c);
    discardTransaction(c);

    /* NOTE(review): the 0 argument presumably suppresses the per-channel
     * unsubscribe notifications to the client — confirm in pubsub.c. */
    pubsubUnsubscribeAllChannels(c,0);
    pubsubUnsubscribeAllPatterns(c,0);

    if (c->name) {
        decrRefCount(c->name);
        c->name = NULL;
    }

    /* Selectively clear state flags not covered above */
    c->flags &= ~(CLIENT_ASKING|CLIENT_READONLY|CLIENT_PUBSUB|
                  CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP_NEXT);

    addReplyStatus(c,"RESET");
}
2416 | |
2417 | void clientCommand(client *c) { |
2418 | listNode *ln; |
2419 | listIter li; |
2420 | |
2421 | if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) { |
2422 | const char *help[] = { |
2423 | "CACHING (YES|NO)", |
2424 | " Enable/disable tracking of the keys for next command in OPTIN/OPTOUT modes.", |
2425 | "GETREDIR", |
2426 | " Return the client ID we are redirecting to when tracking is enabled.", |
2427 | "GETNAME", |
2428 | " Return the name of the current connection.", |
2429 | "ID", |
2430 | " Return the ID of the current connection.", |
2431 | "INFO", |
2432 | " Return information about the current client connection.", |
2433 | "KILL <ip:port>", |
2434 | " Kill connection made from <ip:port>.", |
2435 | "KILL <option> <value> [<option> <value> [...]]", |
2436 | " Kill connections. Options are:", |
2437 | " * ADDR (<ip:port>|<unixsocket>:0)", |
2438 | " Kill connections made from the specified address", |
2439 | " * LADDR (<ip:port>|<unixsocket>:0)", |
2440 | " Kill connections made to specified local address", |
2441 | " * TYPE (normal|master|replica|pubsub)", |
2442 | " Kill connections by type.", |
2443 | " * USER <username>", |
2444 | " Kill connections authenticated by <username>.", |
2445 | " * SKIPME (YES|NO)", |
2446 | " Skip killing current connection (default: yes).", |
2447 | "LIST [options ...]", |
2448 | " Return information about client connections. Options:", |
2449 | " * TYPE (NORMAL|MASTER|REPLICA|PUBSUB)", |
2450 | " Return clients of specified type.", |
2451 | "UNPAUSE", |
2452 | " Stop the current client pause, resuming traffic.", |
2453 | "PAUSE <timeout> [WRITE|ALL]", |
2454 | " Suspend all, or just write, clients for <timout> milliseconds.", |
2455 | "REPLY (ON|OFF|SKIP)", |
2456 | " Control the replies sent to the current connection.", |
2457 | "SETNAME <name>", |
2458 | " Assign the name <name> to the current connection.", |
2459 | "UNBLOCK <clientid> [TIMEOUT|ERROR]", |
2460 | " Unblock the specified blocked client.", |
2461 | "TRACKING (ON|OFF) [REDIRECT <id>] [BCAST] [PREFIX <prefix> [...]]", |
2462 | " [OPTIN] [OPTOUT]", |
2463 | " Control server assisted client side caching.", |
2464 | "TRACKINGINFO", |
2465 | " Report tracking status for the current connection.", |
2466 | NULL((void*)0) |
2467 | }; |
2468 | addReplyHelp(c, help); |
2469 | } else if (!strcasecmp(c->argv[1]->ptr,"id") && c->argc == 2) { |
2470 | /* CLIENT ID */ |
2471 | addReplyLongLong(c,c->id); |
2472 | } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) { |
2473 | /* CLIENT INFO */ |
2474 | sds o = catClientInfoString(sdsempty(), c); |
2475 | o = sdscatlen(o,"\n",1); |
2476 | addReplyVerbatim(c,o,sdslen(o),"txt"); |
2477 | sdsfree(o); |
2478 | } else if (!strcasecmp(c->argv[1]->ptr,"list")) { |
2479 | /* CLIENT LIST */ |
2480 | int type = -1; |
2481 | sds o = NULL((void*)0); |
2482 | if (c->argc == 4 && !strcasecmp(c->argv[2]->ptr,"type")) { |
2483 | type = getClientTypeByName(c->argv[3]->ptr); |
2484 | if (type == -1) { |
2485 | addReplyErrorFormat(c,"Unknown client type '%s'", |
2486 | (char*) c->argv[3]->ptr); |
2487 | return; |
2488 | } |
2489 | } else if (c->argc > 3 && !strcasecmp(c->argv[2]->ptr,"id")) { |
2490 | int j; |
2491 | o = sdsempty(); |
2492 | for (j = 3; j < c->argc; j++) { |
2493 | long long cid; |
2494 | if (getLongLongFromObjectOrReply(c, c->argv[j], &cid, |
2495 | "Invalid client ID")) { |
2496 | sdsfree(o); |
2497 | return; |
2498 | } |
2499 | client *cl = lookupClientByID(cid); |
2500 | if (cl) { |
2501 | o = catClientInfoString(o, cl); |
2502 | o = sdscatlen(o, "\n", 1); |
2503 | } |
2504 | } |
2505 | } else if (c->argc != 2) { |
2506 | addReplyErrorObject(c,shared.syntaxerr); |
2507 | return; |
2508 | } |
2509 | |
2510 | if (!o) |
2511 | o = getAllClientsInfoString(type); |
2512 | addReplyVerbatim(c,o,sdslen(o),"txt"); |
2513 | sdsfree(o); |
2514 | } else if (!strcasecmp(c->argv[1]->ptr,"reply") && c->argc == 3) { |
2515 | /* CLIENT REPLY ON|OFF|SKIP */ |
2516 | if (!strcasecmp(c->argv[2]->ptr,"on")) { |
2517 | c->flags &= ~(CLIENT_REPLY_SKIP(1<<24)|CLIENT_REPLY_OFF(1<<22)); |
2518 | addReply(c,shared.ok); |
2519 | } else if (!strcasecmp(c->argv[2]->ptr,"off")) { |
2520 | c->flags |= CLIENT_REPLY_OFF(1<<22); |
2521 | } else if (!strcasecmp(c->argv[2]->ptr,"skip")) { |
2522 | if (!(c->flags & CLIENT_REPLY_OFF(1<<22))) |
2523 | c->flags |= CLIENT_REPLY_SKIP_NEXT(1<<23); |
2524 | } else { |
2525 | addReplyErrorObject(c,shared.syntaxerr); |
2526 | return; |
2527 | } |
2528 | } else if (!strcasecmp(c->argv[1]->ptr,"kill")) { |
2529 | /* CLIENT KILL <ip:port> |
2530 | * CLIENT KILL <option> [value] ... <option> [value] */ |
2531 | char *addr = NULL((void*)0); |
2532 | char *laddr = NULL((void*)0); |
2533 | user *user = NULL((void*)0); |
2534 | int type = -1; |
2535 | uint64_t id = 0; |
2536 | int skipme = 1; |
2537 | int killed = 0, close_this_client = 0; |
2538 | |
2539 | if (c->argc == 3) { |
2540 | /* Old style syntax: CLIENT KILL <addr> */ |
2541 | addr = c->argv[2]->ptr; |
2542 | skipme = 0; /* With the old form, you can kill yourself. */ |
2543 | } else if (c->argc > 3) { |
2544 | int i = 2; /* Next option index. */ |
2545 | |
2546 | /* New style syntax: parse options. */ |
2547 | while(i < c->argc) { |
2548 | int moreargs = c->argc > i+1; |
2549 | |
2550 | if (!strcasecmp(c->argv[i]->ptr,"id") && moreargs) { |
2551 | long long tmp; |
2552 | |
2553 | if (getLongLongFromObjectOrReply(c,c->argv[i+1],&tmp,NULL((void*)0)) |
2554 | != C_OK0) return; |
2555 | id = tmp; |
2556 | } else if (!strcasecmp(c->argv[i]->ptr,"type") && moreargs) { |
2557 | type = getClientTypeByName(c->argv[i+1]->ptr); |
2558 | if (type == -1) { |
2559 | addReplyErrorFormat(c,"Unknown client type '%s'", |
2560 | (char*) c->argv[i+1]->ptr); |
2561 | return; |
2562 | } |
2563 | } else if (!strcasecmp(c->argv[i]->ptr,"addr") && moreargs) { |
2564 | addr = c->argv[i+1]->ptr; |
2565 | } else if (!strcasecmp(c->argv[i]->ptr,"laddr") && moreargs) { |
2566 | laddr = c->argv[i+1]->ptr; |
2567 | } else if (!strcasecmp(c->argv[i]->ptr,"user") && moreargs) { |
2568 | user = ACLGetUserByName(c->argv[i+1]->ptr, |
2569 | sdslen(c->argv[i+1]->ptr)); |
2570 | if (user == NULL((void*)0)) { |
2571 | addReplyErrorFormat(c,"No such user '%s'", |
2572 | (char*) c->argv[i+1]->ptr); |
2573 | return; |
2574 | } |
2575 | } else if (!strcasecmp(c->argv[i]->ptr,"skipme") && moreargs) { |
2576 | if (!strcasecmp(c->argv[i+1]->ptr,"yes")) { |
2577 | skipme = 1; |
2578 | } else if (!strcasecmp(c->argv[i+1]->ptr,"no")) { |
2579 | skipme = 0; |
2580 | } else { |
2581 | addReplyErrorObject(c,shared.syntaxerr); |
2582 | return; |
2583 | } |
2584 | } else { |
2585 | addReplyErrorObject(c,shared.syntaxerr); |
2586 | return; |
2587 | } |
2588 | i += 2; |
2589 | } |
2590 | } else { |
2591 | addReplyErrorObject(c,shared.syntaxerr); |
2592 | return; |
2593 | } |
2594 | |
2595 | /* Iterate clients killing all the matching clients. */ |
2596 | listRewind(server.clients,&li); |
2597 | while ((ln = listNext(&li)) != NULL((void*)0)) { |
2598 | client *client = listNodeValue(ln)((ln)->value); |
2599 | if (addr && strcmp(getClientPeerId(client),addr) != 0) continue; |
2600 | if (laddr && strcmp(getClientSockname(client),laddr) != 0) continue; |
2601 | if (type != -1 && getClientType(client) != type) continue; |
2602 | if (id != 0 && client->id != id) continue; |
2603 | if (user && client->user != user) continue; |
2604 | if (c == client && skipme) continue; |
2605 | |
2606 | /* Kill it. */ |
2607 | if (c == client) { |
2608 | close_this_client = 1; |
2609 | } else { |
2610 | freeClient(client); |
2611 | } |
2612 | killed++; |
2613 | } |
2614 | |
2615 | /* Reply according to old/new format. */ |
2616 | if (c->argc == 3) { |
2617 | if (killed == 0) |
2618 | addReplyError(c,"No such client"); |
2619 | else |
2620 | addReply(c,shared.ok); |
2621 | } else { |
2622 | addReplyLongLong(c,killed); |
2623 | } |
2624 | |
2625 | /* If this client has to be closed, flag it as CLOSE_AFTER_REPLY |
2626 | * only after we queued the reply to its output buffers. */ |
2627 | if (close_this_client) c->flags |= CLIENT_CLOSE_AFTER_REPLY(1<<6); |
2628 | } else if (!strcasecmp(c->argv[1]->ptr,"unblock") && (c->argc == 3 || |
2629 | c->argc == 4)) |
2630 | { |
2631 | /* CLIENT UNBLOCK <id> [timeout|error] */ |
2632 | long long id; |
2633 | int unblock_error = 0; |
2634 | |
2635 | if (c->argc == 4) { |
2636 | if (!strcasecmp(c->argv[3]->ptr,"timeout")) { |
2637 | unblock_error = 0; |
2638 | } else if (!strcasecmp(c->argv[3]->ptr,"error")) { |
2639 | unblock_error = 1; |
2640 | } else { |
2641 | addReplyError(c, |
2642 | "CLIENT UNBLOCK reason should be TIMEOUT or ERROR"); |
2643 | return; |
2644 | } |
2645 | } |
2646 | if (getLongLongFromObjectOrReply(c,c->argv[2],&id,NULL((void*)0)) |
2647 | != C_OK0) return; |
2648 | struct client *target = lookupClientByID(id); |
2649 | if (target && target->flags & CLIENT_BLOCKED(1<<4)) { |
2650 | if (unblock_error) |
2651 | addReplyError(target, |
2652 | "-UNBLOCKED client unblocked via CLIENT UNBLOCK"); |
2653 | else |
2654 | replyToBlockedClientTimedOut(target); |
2655 | unblockClient(target); |
2656 | addReply(c,shared.cone); |
2657 | } else { |
2658 | addReply(c,shared.czero); |
2659 | } |
2660 | } else if (!strcasecmp(c->argv[1]->ptr,"setname") && c->argc == 3) { |
2661 | /* CLIENT SETNAME */ |
2662 | if (clientSetNameOrReply(c,c->argv[2]) == C_OK0) |
2663 | addReply(c,shared.ok); |
2664 | } else if (!strcasecmp(c->argv[1]->ptr,"getname") && c->argc == 2) { |
2665 | /* CLIENT GETNAME */ |
2666 | if (c->name) |
2667 | addReplyBulk(c,c->name); |
2668 | else |
2669 | addReplyNull(c); |
2670 | } else if (!strcasecmp(c->argv[1]->ptr,"unpause") && c->argc == 2) { |
2671 | /* CLIENT UNPAUSE */ |
2672 | unpauseClients(); |
2673 | addReply(c,shared.ok); |
2674 | } else if (!strcasecmp(c->argv[1]->ptr,"pause") && (c->argc == 3 || |
2675 | c->argc == 4)) |
2676 | { |
2677 | /* CLIENT PAUSE TIMEOUT [WRITE|ALL] */ |
2678 | mstime_t end; |
2679 | int type = CLIENT_PAUSE_ALL; |
2680 | if (c->argc == 4) { |
2681 | if (!strcasecmp(c->argv[3]->ptr,"write")) { |
2682 | type = CLIENT_PAUSE_WRITE; |
2683 | } else if (!strcasecmp(c->argv[3]->ptr,"all")) { |
2684 | type = CLIENT_PAUSE_ALL; |
2685 | } else { |
2686 | addReplyError(c, |
2687 | "CLIENT PAUSE mode must be WRITE or ALL"); |
2688 | return; |
2689 | } |
2690 | } |
2691 | |
2692 | if (getTimeoutFromObjectOrReply(c,c->argv[2],&end, |
2693 | UNIT_MILLISECONDS1) != C_OK0) return; |
2694 | pauseClients(end, type); |
2695 | addReply(c,shared.ok); |
2696 | } else if (!strcasecmp(c->argv[1]->ptr,"tracking") && c->argc >= 3) { |
2697 | /* CLIENT TRACKING (on|off) [REDIRECT <id>] [BCAST] [PREFIX first] |
2698 | * [PREFIX second] [OPTIN] [OPTOUT] ... */ |
2699 | long long redir = 0; |
2700 | uint64_t options = 0; |
2701 | robj **prefix = NULL((void*)0); |
2702 | size_t numprefix = 0; |
2703 | |
2704 | /* Parse the options. */ |
2705 | for (int j = 3; j < c->argc; j++) { |
2706 | int moreargs = (c->argc-1) - j; |
2707 | |
2708 | if (!strcasecmp(c->argv[j]->ptr,"redirect") && moreargs) { |
2709 | j++; |
2710 | if (redir != 0) { |
2711 | addReplyError(c,"A client can only redirect to a single " |
2712 | "other client"); |
2713 | zfree(prefix); |
2714 | return; |
2715 | } |
2716 | |
2717 | if (getLongLongFromObjectOrReply(c,c->argv[j],&redir,NULL((void*)0)) != |
2718 | C_OK0) |
2719 | { |
2720 | zfree(prefix); |
2721 | return; |
2722 | } |
2723 | /* We will require the client with the specified ID to exist |
2724 | * right now, even if it is possible that it gets disconnected |
2725 | * later. Still a valid sanity check. */ |
2726 | if (lookupClientByID(redir) == NULL((void*)0)) { |
2727 | addReplyError(c,"The client ID you want redirect to " |
2728 | "does not exist"); |
2729 | zfree(prefix); |
2730 | return; |
2731 | } |
2732 | } else if (!strcasecmp(c->argv[j]->ptr,"bcast")) { |
2733 | options |= CLIENT_TRACKING_BCAST(1ULL<<33); |
2734 | } else if (!strcasecmp(c->argv[j]->ptr,"optin")) { |
2735 | options |= CLIENT_TRACKING_OPTIN(1ULL<<34); |
2736 | } else if (!strcasecmp(c->argv[j]->ptr,"optout")) { |
2737 | options |= CLIENT_TRACKING_OPTOUT(1ULL<<35); |
2738 | } else if (!strcasecmp(c->argv[j]->ptr,"noloop")) { |
2739 | options |= CLIENT_TRACKING_NOLOOP(1ULL<<37); |
2740 | } else if (!strcasecmp(c->argv[j]->ptr,"prefix") && moreargs) { |
2741 | j++; |
2742 | prefix = zrealloc(prefix,sizeof(robj*)*(numprefix+1)); |
2743 | prefix[numprefix++] = c->argv[j]; |
2744 | } else { |
2745 | zfree(prefix); |
2746 | addReplyErrorObject(c,shared.syntaxerr); |
2747 | return; |
2748 | } |
2749 | } |
2750 | |
2751 | /* Options are ok: enable or disable the tracking for this client. */ |
2752 | if (!strcasecmp(c->argv[2]->ptr,"on")) { |
2753 | /* Before enabling tracking, make sure options are compatible |
2754 | * among each other and with the current state of the client. */ |
2755 | if (!(options & CLIENT_TRACKING_BCAST(1ULL<<33)) && numprefix) { |
2756 | addReplyError(c, |
2757 | "PREFIX option requires BCAST mode to be enabled"); |
2758 | zfree(prefix); |
2759 | return; |
2760 | } |
2761 | |
2762 | if (c->flags & CLIENT_TRACKING(1ULL<<31)) { |
2763 | int oldbcast = !!(c->flags & CLIENT_TRACKING_BCAST(1ULL<<33)); |
2764 | int newbcast = !!(options & CLIENT_TRACKING_BCAST(1ULL<<33)); |
2765 | if (oldbcast != newbcast) { |
2766 | addReplyError(c, |
2767 | "You can't switch BCAST mode on/off before disabling " |
2768 | "tracking for this client, and then re-enabling it with " |
2769 | "a different mode."); |
2770 | zfree(prefix); |
2771 | return; |
2772 | } |
2773 | } |
2774 | |
2775 | if (options & CLIENT_TRACKING_BCAST(1ULL<<33) && |
2776 | options & (CLIENT_TRACKING_OPTIN(1ULL<<34)|CLIENT_TRACKING_OPTOUT(1ULL<<35))) |
2777 | { |
2778 | addReplyError(c, |
2779 | "OPTIN and OPTOUT are not compatible with BCAST"); |
2780 | zfree(prefix); |
2781 | return; |
2782 | } |
2783 | |
2784 | if (options & CLIENT_TRACKING_OPTIN(1ULL<<34) && options & CLIENT_TRACKING_OPTOUT(1ULL<<35)) |
2785 | { |
2786 | addReplyError(c, |
2787 | "You can't specify both OPTIN mode and OPTOUT mode"); |
2788 | zfree(prefix); |
2789 | return; |
2790 | } |
2791 | |
2792 | if ((options & CLIENT_TRACKING_OPTIN(1ULL<<34) && c->flags & CLIENT_TRACKING_OPTOUT(1ULL<<35)) || |
2793 | (options & CLIENT_TRACKING_OPTOUT(1ULL<<35) && c->flags & CLIENT_TRACKING_OPTIN(1ULL<<34))) |
2794 | { |
2795 | addReplyError(c, |
2796 | "You can't switch OPTIN/OPTOUT mode before disabling " |
2797 | "tracking for this client, and then re-enabling it with " |
2798 | "a different mode."); |
2799 | zfree(prefix); |
2800 | return; |
2801 | } |
2802 | |
2803 | if (options & CLIENT_TRACKING_BCAST(1ULL<<33)) { |
2804 | if (!checkPrefixCollisionsOrReply(c,prefix,numprefix)) { |
2805 | zfree(prefix); |
2806 | return; |
2807 | } |
2808 | } |
2809 | |
2810 | enableTracking(c,redir,options,prefix,numprefix); |
2811 | } else if (!strcasecmp(c->argv[2]->ptr,"off")) { |
2812 | disableTracking(c); |
2813 | } else { |
2814 | zfree(prefix); |
2815 | addReplyErrorObject(c,shared.syntaxerr); |
2816 | return; |
2817 | } |
2818 | zfree(prefix); |
2819 | addReply(c,shared.ok); |
2820 | } else if (!strcasecmp(c->argv[1]->ptr,"caching") && c->argc >= 3) { |
2821 | if (!(c->flags & CLIENT_TRACKING(1ULL<<31))) { |
2822 | addReplyError(c,"CLIENT CACHING can be called only when the " |
2823 | "client is in tracking mode with OPTIN or " |
2824 | "OPTOUT mode enabled"); |
2825 | return; |
2826 | } |
2827 | |
2828 | char *opt = c->argv[2]->ptr; |
2829 | if (!strcasecmp(opt,"yes")) { |
2830 | if (c->flags & CLIENT_TRACKING_OPTIN(1ULL<<34)) { |
2831 | c->flags |= CLIENT_TRACKING_CACHING(1ULL<<36); |
2832 | } else { |
2833 | addReplyError(c,"CLIENT CACHING YES is only valid when tracking is enabled in OPTIN mode."); |
2834 | return; |
2835 | } |
2836 | } else if (!strcasecmp(opt,"no")) { |
2837 | if (c->flags & CLIENT_TRACKING_OPTOUT(1ULL<<35)) { |
2838 | c->flags |= CLIENT_TRACKING_CACHING(1ULL<<36); |
2839 | } else { |
2840 | addReplyError(c,"CLIENT CACHING NO is only valid when tracking is enabled in OPTOUT mode."); |
2841 | return; |
2842 | } |
2843 | } else { |
2844 | addReplyErrorObject(c,shared.syntaxerr); |
2845 | return; |
2846 | } |
2847 | |
2848 | /* Common reply for when we succeeded. */ |
2849 | addReply(c,shared.ok); |
2850 | } else if (!strcasecmp(c->argv[1]->ptr,"getredir") && c->argc == 2) { |
2851 | /* CLIENT GETREDIR */ |
2852 | if (c->flags & CLIENT_TRACKING(1ULL<<31)) { |
2853 | addReplyLongLong(c,c->client_tracking_redirection); |
2854 | } else { |
2855 | addReplyLongLong(c,-1); |
2856 | } |
2857 | } else if (!strcasecmp(c->argv[1]->ptr,"trackinginfo") && c->argc == 2) { |
2858 | addReplyMapLen(c,3); |
2859 | |
2860 | /* Flags */ |
2861 | addReplyBulkCString(c,"flags"); |
2862 | void *arraylen_ptr = addReplyDeferredLen(c); |
2863 | int numflags = 0; |
2864 | addReplyBulkCString(c,c->flags & CLIENT_TRACKING(1ULL<<31) ? "on" : "off"); |
2865 | numflags++; |
2866 | if (c->flags & CLIENT_TRACKING_BCAST(1ULL<<33)) { |
2867 | addReplyBulkCString(c,"bcast"); |
2868 | numflags++; |
2869 | } |
2870 | if (c->flags & CLIENT_TRACKING_OPTIN(1ULL<<34)) { |
2871 | addReplyBulkCString(c,"optin"); |
2872 | numflags++; |
2873 | if (c->flags & CLIENT_TRACKING_CACHING(1ULL<<36)) { |
2874 | addReplyBulkCString(c,"caching-yes"); |
2875 | numflags++; |
2876 | } |
2877 | } |
2878 | if (c->flags & CLIENT_TRACKING_OPTOUT(1ULL<<35)) { |
2879 | addReplyBulkCString(c,"optout"); |
2880 | numflags++; |
2881 | if (c->flags & CLIENT_TRACKING_CACHING(1ULL<<36)) { |
2882 | addReplyBulkCString(c,"caching-no"); |
2883 | numflags++; |
2884 | } |
2885 | } |
2886 | if (c->flags & CLIENT_TRACKING_NOLOOP(1ULL<<37)) { |
2887 | addReplyBulkCString(c,"noloop"); |
2888 | numflags++; |
2889 | } |
2890 | if (c->flags & CLIENT_TRACKING_BROKEN_REDIR(1ULL<<32)) { |
2891 | addReplyBulkCString(c,"broken_redirect"); |
2892 | numflags++; |
2893 | } |
2894 | setDeferredSetLen(c,arraylen_ptr,numflags); |
2895 | |
2896 | /* Redirect */ |
2897 | addReplyBulkCString(c,"redirect"); |
2898 | if (c->flags & CLIENT_TRACKING(1ULL<<31)) { |
2899 | addReplyLongLong(c,c->client_tracking_redirection); |
2900 | } else { |
2901 | addReplyLongLong(c,-1); |
2902 | } |
2903 | |
2904 | /* Prefixes */ |
2905 | addReplyBulkCString(c,"prefixes"); |
2906 | if (c->client_tracking_prefixes) { |
2907 | addReplyArrayLen(c,raxSize(c->client_tracking_prefixes)); |
2908 | raxIterator ri; |
2909 | raxStart(&ri,c->client_tracking_prefixes); |
2910 | raxSeek(&ri,"^",NULL((void*)0),0); |
2911 | while(raxNext(&ri)) { |
2912 | addReplyBulkCBuffer(c,ri.key,ri.key_len); |
2913 | } |
2914 | raxStop(&ri); |
2915 | } else { |
2916 | addReplyArrayLen(c,0); |
2917 | } |
2918 | } else { |
2919 | addReplySubcommandSyntaxError(c); |
2920 | } |
2921 | } |
2922 | |
2923 | /* HELLO [<protocol-version> [AUTH <user> <password>] [SETNAME <name>] ] */ |
2924 | void helloCommand(client *c) { |
2925 | long long ver = 0; |
2926 | int next_arg = 1; |
2927 | |
2928 | if (c->argc >= 2) { |
2929 | if (getLongLongFromObjectOrReply(c, c->argv[next_arg++], &ver, |
2930 | "Protocol version is not an integer or out of range") != C_OK0) { |
2931 | return; |
2932 | } |
2933 | |
2934 | if (ver < 2 || ver > 3) { |
2935 | addReplyError(c,"-NOPROTO unsupported protocol version"); |
2936 | return; |
2937 | } |
2938 | } |
2939 | |
2940 | for (int j = next_arg; j < c->argc; j++) { |
2941 | int moreargs = (c->argc-1) - j; |
2942 | const char *opt = c->argv[j]->ptr; |
2943 | if (!strcasecmp(opt,"AUTH") && moreargs >= 2) { |
2944 | if (ACLAuthenticateUser(c, c->argv[j+1], c->argv[j+2]) == C_ERR-1) { |
2945 | addReplyError(c,"-WRONGPASS invalid username-password pair or user is disabled."); |
2946 | return; |
2947 | } |
2948 | j += 2; |
2949 | } else if (!strcasecmp(opt,"SETNAME") && moreargs) { |
2950 | if (clientSetNameOrReply(c, c->argv[j+1]) == C_ERR-1) return; |
2951 | j++; |
2952 | } else { |
2953 | addReplyErrorFormat(c,"Syntax error in HELLO option '%s'",opt); |
2954 | return; |
2955 | } |
2956 | } |
2957 | |
2958 | /* At this point we need to be authenticated to continue. */ |
2959 | if (!c->authenticated) { |
2960 | addReplyError(c,"-NOAUTH HELLO must be called with the client already " |
2961 | "authenticated, otherwise the HELLO AUTH <user> <pass> " |
2962 | "option can be used to authenticate the client and " |
2963 | "select the RESP protocol version at the same time"); |
2964 | return; |
2965 | } |
2966 | |
2967 | /* Let's switch to the specified RESP mode. */ |
2968 | if (ver) c->resp = ver; |
2969 | addReplyMapLen(c,6 + !server.sentinel_mode); |
2970 | |
2971 | addReplyBulkCString(c,"server"); |
2972 | addReplyBulkCString(c,"redis"); |
2973 | |
2974 | addReplyBulkCString(c,"version"); |
2975 | addReplyBulkCString(c,REDIS_VERSION"6.2.1"); |
2976 | |
2977 | addReplyBulkCString(c,"proto"); |
2978 | addReplyLongLong(c,c->resp); |
2979 | |
2980 | addReplyBulkCString(c,"id"); |
2981 | addReplyLongLong(c,c->id); |
2982 | |
2983 | addReplyBulkCString(c,"mode"); |
2984 | if (server.sentinel_mode) addReplyBulkCString(c,"sentinel"); |
2985 | else if (server.cluster_enabled) addReplyBulkCString(c,"cluster"); |
2986 | else addReplyBulkCString(c,"standalone"); |
2987 | |
2988 | if (!server.sentinel_mode) { |
2989 | addReplyBulkCString(c,"role"); |
2990 | addReplyBulkCString(c,server.masterhost ? "replica" : "master"); |
2991 | } |
2992 | |
2993 | addReplyBulkCString(c,"modules"); |
2994 | addReplyLoadedModules(c); |
2995 | } |
2996 | |
2997 | /* This callback is bound to POST and "Host:" command names. Those are not |
2998 | * really commands, but are used in security attacks in order to talk to |
2999 | * Redis instances via HTTP, with a technique called "cross protocol scripting" |
3000 | * which exploits the fact that services like Redis will discard invalid |
3001 | * HTTP headers and will process what follows. |
3002 | * |
3003 | * As a protection against this attack, Redis will terminate the connection |
3004 | * when a POST or "Host:" header is seen, and will log the event from |
3005 | * time to time (to avoid creating a DOS as a result of too many logs). */ |
3006 | void securityWarningCommand(client *c) { |
3007 | static time_t logged_time; |
3008 | time_t now = time(NULL((void*)0)); |
3009 | |
3010 | if (labs(now-logged_time) > 60) { |
3011 | serverLog(LL_WARNING3,"Possible SECURITY ATTACK detected. It looks like somebody is sending POST or Host: commands to Redis. This is likely due to an attacker attempting to use Cross Protocol Scripting to compromise your Redis instance. Connection aborted."); |
3012 | logged_time = now; |
3013 | } |
3014 | freeClientAsync(c); |
3015 | } |
3016 | |
3017 | /* Keep track of the original command arguments so that we can generate |
3018 | * an accurate slowlog entry after the command has been executed. */ |
3019 | static void retainOriginalCommandVector(client *c) { |
3020 | /* We already rewrote this command, so don't rewrite it again */ |
3021 | if (c->original_argv) return; |
3022 | c->original_argc = c->argc; |
3023 | c->original_argv = zmalloc(sizeof(robj*)*(c->argc)); |
3024 | for (int j = 0; j < c->argc; j++) { |
3025 | c->original_argv[j] = c->argv[j]; |
3026 | incrRefCount(c->argv[j]); |
3027 | } |
3028 | } |
3029 | |
3030 | /* Rewrite the command vector of the client. All the new objects ref count |
3031 | * is incremented. The old command vector is freed, and the old objects |
3032 | * ref count is decremented. */ |
3033 | void rewriteClientCommandVector(client *c, int argc, ...) { |
3034 | va_list ap; |
3035 | int j; |
3036 | robj **argv; /* The new argument vector */ |
3037 | |
3038 | argv = zmalloc(sizeof(robj*)*argc); |
3039 | va_start(ap,argc)__builtin_va_start(ap, argc); |
3040 | for (j = 0; j < argc; j++) { |
3041 | robj *a; |
3042 | |
3043 | a = va_arg(ap, robj*)__builtin_va_arg(ap, robj*); |
3044 | argv[j] = a; |
3045 | incrRefCount(a); |
3046 | } |
3047 | replaceClientCommandVector(c, argc, argv); |
3048 | va_end(ap)__builtin_va_end(ap); |
3049 | } |
3050 | |
3051 | /* Completely replace the client command vector with the provided one. */ |
3052 | void replaceClientCommandVector(client *c, int argc, robj **argv) { |
3053 | int j; |
3054 | retainOriginalCommandVector(c); |
3055 | freeClientArgv(c); |
3056 | zfree(c->argv); |
3057 | c->argv = argv; |
3058 | c->argc = argc; |
3059 | c->argv_len_sum = 0; |
3060 | for (j = 0; j < c->argc; j++) |
3061 | if (c->argv[j]) |
3062 | c->argv_len_sum += getStringObjectLen(c->argv[j]); |
3063 | c->cmd = lookupCommandOrOriginal(c->argv[0]->ptr); |
3064 | serverAssertWithInfo(c,NULL,c->cmd != NULL)((c->cmd != ((void*)0))?(void)0 : (_serverAssertWithInfo(c ,((void*)0),"c->cmd != NULL","networking.c",3064),__builtin_unreachable ())); |
3065 | } |
3066 | |
3067 | /* Rewrite a single item in the command vector. |
3068 | * The new val ref count is incremented, and the old decremented. |
3069 | * |
3070 | * It is possible to specify an argument over the current size of the |
3071 | * argument vector: in this case the array of objects gets reallocated |
3072 | * and c->argc set to the max value. However it's up to the caller to |
3073 | * |
3074 | * 1. Make sure there are no "holes" and all the arguments are set. |
3075 | * 2. If the original argument vector was longer than the one we |
3076 | * want to end with, it's up to the caller to set c->argc and |
3077 | * free the no longer used objects on c->argv. */ |
3078 | void rewriteClientCommandArgument(client *c, int i, robj *newval) { |
3079 | robj *oldval; |
3080 | retainOriginalCommandVector(c); |
3081 | if (i >= c->argc) { |
3082 | c->argv = zrealloc(c->argv,sizeof(robj*)*(i+1)); |
3083 | c->argc = i+1; |
3084 | c->argv[i] = NULL((void*)0); |
3085 | } |
3086 | oldval = c->argv[i]; |
3087 | if (oldval) c->argv_len_sum -= getStringObjectLen(oldval); |
3088 | if (newval) c->argv_len_sum += getStringObjectLen(newval); |
3089 | c->argv[i] = newval; |
3090 | incrRefCount(newval); |
3091 | if (oldval) decrRefCount(oldval); |
3092 | |
3093 | /* If this is the command name make sure to fix c->cmd. */ |
3094 | if (i == 0) { |
3095 | c->cmd = lookupCommandOrOriginal(c->argv[0]->ptr); |
3096 | serverAssertWithInfo(c,NULL,c->cmd != NULL)((c->cmd != ((void*)0))?(void)0 : (_serverAssertWithInfo(c ,((void*)0),"c->cmd != NULL","networking.c",3096),__builtin_unreachable ())); |
3097 | } |
3098 | } |
3099 | |
3100 | /* This function returns the number of bytes that Redis is |
3101 | * using to store the reply still not read by the client. |
3102 | * |
3103 | * Note: this function is very fast so can be called as many time as |
3104 | * the caller wishes. The main usage of this function currently is |
3105 | * enforcing the client output length limits. */ |
3106 | unsigned long getClientOutputBufferMemoryUsage(client *c) { |
3107 | unsigned long list_item_size = sizeof(listNode) + sizeof(clientReplyBlock); |
3108 | return c->reply_bytes + (list_item_size*listLength(c->reply)((c->reply)->len)); |
3109 | } |
3110 | |
3111 | /* Get the class of a client, used in order to enforce limits to different |
3112 | * classes of clients. |
3113 | * |
3114 | * The function will return one of the following: |
3115 | * CLIENT_TYPE_NORMAL -> Normal client |
3116 | * CLIENT_TYPE_SLAVE -> Slave |
3117 | * CLIENT_TYPE_PUBSUB -> Client subscribed to Pub/Sub channels |
3118 | * CLIENT_TYPE_MASTER -> The client representing our replication master. |
3119 | */ |
3120 | int getClientType(client *c) { |
3121 | if (c->flags & CLIENT_MASTER(1<<1)) return CLIENT_TYPE_MASTER3; |
3122 | /* Even though MONITOR clients are marked as replicas, we |
3123 | * want the expose them as normal clients. */ |
3124 | if ((c->flags & CLIENT_SLAVE(1<<0)) && !(c->flags & CLIENT_MONITOR(1<<2))) |
3125 | return CLIENT_TYPE_SLAVE1; |
3126 | if (c->flags & CLIENT_PUBSUB(1<<18)) return CLIENT_TYPE_PUBSUB2; |
3127 | return CLIENT_TYPE_NORMAL0; |
3128 | } |
3129 | |
3130 | int getClientTypeByName(char *name) { |
3131 | if (!strcasecmp(name,"normal")) return CLIENT_TYPE_NORMAL0; |
3132 | else if (!strcasecmp(name,"slave")) return CLIENT_TYPE_SLAVE1; |
3133 | else if (!strcasecmp(name,"replica")) return CLIENT_TYPE_SLAVE1; |
3134 | else if (!strcasecmp(name,"pubsub")) return CLIENT_TYPE_PUBSUB2; |
3135 | else if (!strcasecmp(name,"master")) return CLIENT_TYPE_MASTER3; |
3136 | else return -1; |
3137 | } |
3138 | |
3139 | char *getClientTypeName(int class) { |
3140 | switch(class) { |
3141 | case CLIENT_TYPE_NORMAL0: return "normal"; |
3142 | case CLIENT_TYPE_SLAVE1: return "slave"; |
3143 | case CLIENT_TYPE_PUBSUB2: return "pubsub"; |
3144 | case CLIENT_TYPE_MASTER3: return "master"; |
3145 | default: return NULL((void*)0); |
3146 | } |
3147 | } |
3148 | |
3149 | /* The function checks if the client reached output buffer soft or hard |
3150 | * limit, and also update the state needed to check the soft limit as |
3151 | * a side effect. |
3152 | * |
3153 | * Return value: non-zero if the client reached the soft or the hard limit. |
3154 | * Otherwise zero is returned. */ |
3155 | int checkClientOutputBufferLimits(client *c) { |
3156 | int soft = 0, hard = 0, class; |
3157 | unsigned long used_mem = getClientOutputBufferMemoryUsage(c); |
3158 | |
3159 | class = getClientType(c); |
3160 | /* For the purpose of output buffer limiting, masters are handled |
3161 | * like normal clients. */ |
3162 | if (class == CLIENT_TYPE_MASTER3) class = CLIENT_TYPE_NORMAL0; |
3163 | |
3164 | if (server.client_obuf_limits[class].hard_limit_bytes && |
3165 | used_mem >= server.client_obuf_limits[class].hard_limit_bytes) |
3166 | hard = 1; |
3167 | if (server.client_obuf_limits[class].soft_limit_bytes && |
3168 | used_mem >= server.client_obuf_limits[class].soft_limit_bytes) |
3169 | soft = 1; |
3170 | |
3171 | /* We need to check if the soft limit is reached continuously for the |
3172 | * specified amount of seconds. */ |
3173 | if (soft) { |
3174 | if (c->obuf_soft_limit_reached_time == 0) { |
3175 | c->obuf_soft_limit_reached_time = server.unixtime; |
3176 | soft = 0; /* First time we see the soft limit reached */ |
3177 | } else { |
3178 | time_t elapsed = server.unixtime - c->obuf_soft_limit_reached_time; |
3179 | |
3180 | if (elapsed <= |
3181 | server.client_obuf_limits[class].soft_limit_seconds) { |
3182 | soft = 0; /* The client still did not reached the max number of |
3183 | seconds for the soft limit to be considered |
3184 | reached. */ |
3185 | } |
3186 | } |
3187 | } else { |
3188 | c->obuf_soft_limit_reached_time = 0; |
3189 | } |
3190 | return soft || hard; |
3191 | } |
3192 | |
3193 | /* Asynchronously close a client if soft or hard limit is reached on the |
3194 | * output buffer size. The caller can check if the client will be closed |
3195 | * checking if the client CLIENT_CLOSE_ASAP flag is set. |
3196 | * |
3197 | * Note: we need to close the client asynchronously because this function is |
3198 | * called from contexts where the client can't be freed safely, i.e. from the |
3199 | * lower level functions pushing data inside the client output buffers. */ |
3200 | void asyncCloseClientOnOutputBufferLimitReached(client *c) { |
3201 | if (!c->conn) return; /* It is unsafe to free fake clients. */ |
3202 | serverAssert(c->reply_bytes < SIZE_MAX-(1024*64))((c->reply_bytes < (18446744073709551615UL)-(1024*64))? (void)0 : (_serverAssert("c->reply_bytes < SIZE_MAX-(1024*64)" ,"networking.c",3202),__builtin_unreachable())); |
3203 | if (c->reply_bytes == 0 || c->flags & CLIENT_CLOSE_ASAP(1<<10)) return; |
3204 | if (checkClientOutputBufferLimits(c)) { |
3205 | sds client = catClientInfoString(sdsempty(),c); |
3206 | |
3207 | freeClientAsync(c); |
3208 | serverLog(LL_WARNING3,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client); |
3209 | sdsfree(client); |
3210 | } |
3211 | } |
3212 | |
3213 | /* Helper function used by performEvictions() in order to flush slaves |
3214 | * output buffers without returning control to the event loop. |
3215 | * This is also called by SHUTDOWN for a best-effort attempt to send |
3216 | * slaves the latest writes. */ |
3217 | void flushSlavesOutputBuffers(void) { |
3218 | listIter li; |
3219 | listNode *ln; |
3220 | |
3221 | listRewind(server.slaves,&li); |
3222 | while((ln = listNext(&li))) { |
3223 | client *slave = listNodeValue(ln)((ln)->value); |
3224 | int can_receive_writes = connHasWriteHandler(slave->conn) || |
3225 | (slave->flags & CLIENT_PENDING_WRITE(1<<21)); |
3226 | |
3227 | /* We don't want to send the pending data to the replica in a few |
3228 | * cases: |
3229 | * |
3230 | * 1. For some reason there is neither the write handler installed |
3231 | * nor the client is flagged as to have pending writes: for some |
3232 | * reason this replica may not be set to receive data. This is |
3233 | * just for the sake of defensive programming. |
3234 | * |
3235 | * 2. The put_online_on_ack flag is true. To know why we don't want |
3236 | * to send data to the replica in this case, please grep for the |
3237 | * flag for this flag. |
3238 | * |
3239 | * 3. Obviously if the slave is not ONLINE. |
3240 | */ |
3241 | if (slave->replstate == SLAVE_STATE_ONLINE9 && |
3242 | can_receive_writes && |
3243 | !slave->repl_put_online_on_ack && |
3244 | clientHasPendingReplies(slave)) |
3245 | { |
3246 | writeToClient(slave,0); |
3247 | } |
3248 | } |
3249 | } |
3250 | |
3251 | /* Pause clients up to the specified unixtime (in ms) for a given type of |
3252 | * commands. |
3253 | * |
3254 | * A main use case of this function is to allow pausing replication traffic |
3255 | * so that a failover without data loss to occur. Replicas will continue to receive |
3256 | * traffic to faciliate this functionality. |
3257 | * |
3258 | * This function is also internally used by Redis Cluster for the manual |
3259 | * failover procedure implemented by CLUSTER FAILOVER. |
3260 | * |
3261 | * The function always succeed, even if there is already a pause in progress. |
3262 | * In such a case, the duration is set to the maximum and new end time and the |
3263 | * type is set to the more restrictive type of pause. */ |
3264 | void pauseClients(mstime_t end, pause_type type) { |
3265 | if (type > server.client_pause_type) { |
3266 | server.client_pause_type = type; |
3267 | } |
3268 | |
3269 | if (end > server.client_pause_end_time) { |
3270 | server.client_pause_end_time = end; |
3271 | } |
3272 | |
3273 | /* We allow write commands that were queued |
3274 | * up before and after to execute. We need |
3275 | * to track this state so that we don't assert |
3276 | * in propagate(). */ |
3277 | if (server.in_exec) { |
3278 | server.client_pause_in_transaction = 1; |
3279 | } |
3280 | } |
3281 | |
3282 | /* Unpause clients and queue them for reprocessing. */ |
3283 | void unpauseClients(void) { |
3284 | listNode *ln; |
3285 | listIter li; |
3286 | client *c; |
3287 | |
3288 | server.client_pause_type = CLIENT_PAUSE_OFF; |
3289 | |
3290 | /* Unblock all of the clients so they are reprocessed. */ |
3291 | listRewind(server.paused_clients,&li); |
3292 | while ((ln = listNext(&li)) != NULL((void*)0)) { |
3293 | c = listNodeValue(ln)((ln)->value); |
3294 | unblockClient(c); |
3295 | } |
3296 | } |
3297 | |
3298 | /* Returns true if clients are paused and false otherwise. */ |
3299 | int areClientsPaused(void) { |
3300 | return server.client_pause_type != CLIENT_PAUSE_OFF; |
3301 | } |
3302 | |
3303 | /* Checks if the current client pause has elapsed and unpause clients |
3304 | * if it has. Also returns true if clients are now paused and false |
3305 | * otherwise. */ |
3306 | int checkClientPauseTimeoutAndReturnIfPaused(void) { |
3307 | if (server.client_pause_end_time < server.mstime) { |
3308 | unpauseClients(); |
3309 | } |
3310 | return areClientsPaused(); |
3311 | } |
3312 | |
3313 | /* This function is called by Redis in order to process a few events from |
3314 | * time to time while blocked into some not interruptible operation. |
3315 | * This allows to reply to clients with the -LOADING error while loading the |
3316 | * data set at startup or after a full resynchronization with the master |
3317 | * and so forth. |
3318 | * |
3319 | * It calls the event loop in order to process a few events. Specifically we |
3320 | * try to call the event loop 4 times as long as we receive acknowledge that |
3321 | * some event was processed, in order to go forward with the accept, read, |
3322 | * write, close sequence needed to serve a client. |
3323 | * |
3324 | * The function returns the total number of events processed. */ |
3325 | void processEventsWhileBlocked(void) { |
3326 | int iterations = 4; /* See the function top-comment. */ |
3327 | |
3328 | /* Update our cached time since it is used to create and update the last |
3329 | * interaction time with clients and for other important things. */ |
3330 | updateCachedTime(0); |
3331 | |
3332 | /* Note: when we are processing events while blocked (for instance during |
3333 | * busy Lua scripts), we set a global flag. When such flag is set, we |
3334 | * avoid handling the read part of clients using threaded I/O. |
3335 | * See https://github.com/antirez/redis/issues/6988 for more info. */ |
3336 | ProcessingEventsWhileBlocked = 1; |
3337 | while (iterations--) { |
3338 | long long startval = server.events_processed_while_blocked; |
3339 | long long ae_events = aeProcessEvents(server.el, |
3340 | AE_FILE_EVENTS(1<<0)|AE_DONT_WAIT(1<<2)| |
3341 | AE_CALL_BEFORE_SLEEP(1<<3)|AE_CALL_AFTER_SLEEP(1<<4)); |
3342 | /* Note that server.events_processed_while_blocked will also get |
3343 | * incremeted by callbacks called by the event loop handlers. */ |
3344 | server.events_processed_while_blocked += ae_events; |
3345 | long long events = server.events_processed_while_blocked - startval; |
3346 | if (!events) break; |
3347 | } |
3348 | |
3349 | whileBlockedCron(); |
3350 | |
3351 | ProcessingEventsWhileBlocked = 0; |
3352 | } |
3353 | |
3354 | /* ========================================================================== |
3355 | * Threaded I/O |
3356 | * ========================================================================== */ |
3357 | |
3358 | #define IO_THREADS_MAX_NUM128 128 |
3359 | #define IO_THREADS_OP_READ0 0 |
3360 | #define IO_THREADS_OP_WRITE1 1 |
3361 | |
3362 | pthread_t io_threads[IO_THREADS_MAX_NUM128]; |
3363 | pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM128]; |
3364 | redisAtomic_Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM128]; |
3365 | int io_threads_op; /* IO_THREADS_OP_WRITE or IO_THREADS_OP_READ. */ |
3366 | |
3367 | /* This is the list of clients each thread will serve when threaded I/O is |
3368 | * used. We spawn io_threads_num-1 threads, since one is the main thread |
3369 | * itself. */ |
3370 | list *io_threads_list[IO_THREADS_MAX_NUM128]; |
3371 | |
3372 | static inline unsigned long getIOPendingCount(int i) { |
3373 | unsigned long count = 0; |
3374 | atomicGetWithSync(io_threads_pending[i], count)do { count = __c11_atomic_load(&io_threads_pending[i],memory_order_seq_cst ); } while(0); |
3375 | return count; |
3376 | } |
3377 | |
3378 | static inline void setIOPendingCount(int i, unsigned long count) { |
3379 | atomicSetWithSync(io_threads_pending[i], count)__c11_atomic_store(&io_threads_pending[i],count,memory_order_seq_cst ); |
3380 | } |
3381 | |
3382 | void *IOThreadMain(void *myid) { |
3383 | /* The ID is the thread number (from 0 to server.iothreads_num-1), and is |
3384 | * used by the thread to just manipulate a single sub-array of clients. */ |
3385 | long id = (unsigned long)myid; |
3386 | char thdname[16]; |
3387 | |
3388 | snprintf(thdname, sizeof(thdname), "io_thd_%ld", id); |
3389 | redis_set_thread_title(thdname)pthread_setname_np(pthread_self(), thdname); |
3390 | redisSetCpuAffinity(server.server_cpulist); |
3391 | makeThreadKillable(); |
3392 | |
3393 | while(1) { |
3394 | /* Wait for start */ |
3395 | for (int j = 0; j < 1000000; j++) { |
3396 | if (getIOPendingCount(id) != 0) break; |
3397 | } |
3398 | |
3399 | /* Give the main thread a chance to stop this thread. */ |
3400 | if (getIOPendingCount(id) == 0) { |
3401 | pthread_mutex_lock(&io_threads_mutex[id]); |
3402 | pthread_mutex_unlock(&io_threads_mutex[id]); |
3403 | continue; |
3404 | } |
3405 | |
3406 | serverAssert(getIOPendingCount(id) != 0)((getIOPendingCount(id) != 0)?(void)0 : (_serverAssert("getIOPendingCount(id) != 0" ,"networking.c",3406),__builtin_unreachable())); |
3407 | |
3408 | /* Process: note that the main thread will never touch our list |
3409 | * before we drop the pending count to 0. */ |
3410 | listIter li; |
3411 | listNode *ln; |
3412 | listRewind(io_threads_list[id],&li); |
3413 | while((ln = listNext(&li))) { |
3414 | client *c = listNodeValue(ln)((ln)->value); |
3415 | if (io_threads_op == IO_THREADS_OP_WRITE1) { |
3416 | writeToClient(c,0); |
3417 | } else if (io_threads_op == IO_THREADS_OP_READ0) { |
3418 | readQueryFromClient(c->conn); |
3419 | } else { |
3420 | serverPanic("io_threads_op value is unknown")_serverPanic("networking.c",3420,"io_threads_op value is unknown" ),__builtin_unreachable(); |
3421 | } |
3422 | } |
3423 | listEmpty(io_threads_list[id]); |
3424 | setIOPendingCount(id, 0); |
3425 | } |
3426 | } |
3427 | |
3428 | /* Initialize the data structures needed for threaded I/O. */ |
3429 | void initThreadedIO(void) { |
3430 | server.io_threads_active = 0; /* We start with threads not active. */ |
3431 | |
3432 | /* Don't spawn any thread if the user selected a single thread: |
3433 | * we'll handle I/O directly from the main thread. */ |
3434 | if (server.io_threads_num == 1) return; |
3435 | |
3436 | if (server.io_threads_num > IO_THREADS_MAX_NUM128) { |
3437 | serverLog(LL_WARNING3,"Fatal: too many I/O threads configured. " |
3438 | "The maximum number is %d.", IO_THREADS_MAX_NUM128); |
3439 | exit(1); |
3440 | } |
3441 | |
3442 | /* Spawn and initialize the I/O threads. */ |
3443 | for (int i = 0; i < server.io_threads_num; i++) { |
3444 | /* Things we do for all the threads including the main thread. */ |
3445 | io_threads_list[i] = listCreate(); |
3446 | if (i == 0) continue; /* Thread 0 is the main thread. */ |
3447 | |
3448 | /* Things we do only for the additional threads. */ |
3449 | pthread_t tid; |
3450 | pthread_mutex_init(&io_threads_mutex[i],NULL((void*)0)); |
3451 | setIOPendingCount(i, 0); |
3452 | pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */ |
3453 | if (pthread_create(&tid,NULL((void*)0),IOThreadMain,(void*)(long)i) != 0) { |
3454 | serverLog(LL_WARNING3,"Fatal: Can't initialize IO thread."); |
3455 | exit(1); |
3456 | } |
3457 | io_threads[i] = tid; |
3458 | } |
3459 | } |
3460 | |
3461 | void killIOThreads(void) { |
3462 | int err, j; |
3463 | for (j = 0; j < server.io_threads_num; j++) { |
3464 | if (io_threads[j] == pthread_self()) continue; |
3465 | if (io_threads[j] && pthread_cancel(io_threads[j]) == 0) { |
3466 | if ((err = pthread_join(io_threads[j],NULL((void*)0))) != 0) { |
3467 | serverLog(LL_WARNING3, |
3468 | "IO thread(tid:%lu) can not be joined: %s", |
3469 | (unsigned long)io_threads[j], strerror(err)); |
3470 | } else { |
3471 | serverLog(LL_WARNING3, |
3472 | "IO thread(tid:%lu) terminated",(unsigned long)io_threads[j]); |
3473 | } |
3474 | } |
3475 | } |
3476 | } |
3477 | |
3478 | void startThreadedIO(void) { |
3479 | serverAssert(server.io_threads_active == 0)((server.io_threads_active == 0)?(void)0 : (_serverAssert("server.io_threads_active == 0" ,"networking.c",3479),__builtin_unreachable())); |
3480 | for (int j = 1; j < server.io_threads_num; j++) |
3481 | pthread_mutex_unlock(&io_threads_mutex[j]); |
3482 | server.io_threads_active = 1; |
3483 | } |
3484 | |
3485 | void stopThreadedIO(void) { |
3486 | /* We may have still clients with pending reads when this function |
3487 | * is called: handle them before stopping the threads. */ |
3488 | handleClientsWithPendingReadsUsingThreads(); |
3489 | serverAssert(server.io_threads_active == 1)((server.io_threads_active == 1)?(void)0 : (_serverAssert("server.io_threads_active == 1" ,"networking.c",3489),__builtin_unreachable())); |
3490 | for (int j = 1; j < server.io_threads_num; j++) |
3491 | pthread_mutex_lock(&io_threads_mutex[j]); |
3492 | server.io_threads_active = 0; |
3493 | } |
3494 | |
3495 | /* This function checks if there are not enough pending clients to justify |
3496 | * taking the I/O threads active: in that case I/O threads are stopped if |
3497 | * currently active. We track the pending writes as a measure of clients |
3498 | * we need to handle in parallel, however the I/O threading is disabled |
3499 | * globally for reads as well if we have too little pending clients. |
3500 | * |
3501 | * The function returns 0 if the I/O threading should be used because there |
3502 | * are enough active threads, otherwise 1 is returned and the I/O threads |
3503 | * could be possibly stopped (if already active) as a side effect. */ |
3504 | int stopThreadedIOIfNeeded(void) { |
3505 | int pending = listLength(server.clients_pending_write)((server.clients_pending_write)->len); |
3506 | |
3507 | /* Return ASAP if IO threads are disabled (single threaded mode). */ |
3508 | if (server.io_threads_num == 1) return 1; |
3509 | |
3510 | if (pending < (server.io_threads_num*2)) { |
3511 | if (server.io_threads_active) stopThreadedIO(); |
3512 | return 1; |
3513 | } else { |
3514 | return 0; |
3515 | } |
3516 | } |
3517 | |
3518 | int handleClientsWithPendingWritesUsingThreads(void) { |
3519 | int processed = listLength(server.clients_pending_write)((server.clients_pending_write)->len); |
3520 | if (processed == 0) return 0; /* Return ASAP if there are no clients. */ |
3521 | |
3522 | /* If I/O threads are disabled or we have few clients to serve, don't |
3523 | * use I/O threads, but the boring synchronous code. */ |
3524 | if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) { |
3525 | return handleClientsWithPendingWrites(); |
3526 | } |
3527 | |
3528 | /* Start threads if needed. */ |
3529 | if (!server.io_threads_active) startThreadedIO(); |
3530 | |
3531 | /* Distribute the clients across N different lists. */ |
3532 | listIter li; |
3533 | listNode *ln; |
3534 | listRewind(server.clients_pending_write,&li); |
3535 | int item_id = 0; |
3536 | while((ln = listNext(&li))) { |
3537 | client *c = listNodeValue(ln)((ln)->value); |
3538 | c->flags &= ~CLIENT_PENDING_WRITE(1<<21); |
3539 | |
3540 | /* Remove clients from the list of pending writes since |
3541 | * they are going to be closed ASAP. */ |
3542 | if (c->flags & CLIENT_CLOSE_ASAP(1<<10)) { |
3543 | listDelNode(server.clients_pending_write, ln); |
3544 | continue; |
3545 | } |
3546 | |
3547 | int target_id = item_id % server.io_threads_num; |
3548 | listAddNodeTail(io_threads_list[target_id],c); |
3549 | item_id++; |
3550 | } |
3551 | |
3552 | /* Give the start condition to the waiting threads, by setting the |
3553 | * start condition atomic var. */ |
3554 | io_threads_op = IO_THREADS_OP_WRITE1; |
3555 | for (int j = 1; j < server.io_threads_num; j++) { |
3556 | int count = listLength(io_threads_list[j])((io_threads_list[j])->len); |
3557 | setIOPendingCount(j, count); |
3558 | } |
3559 | |
3560 | /* Also use the main thread to process a slice of clients. */ |
3561 | listRewind(io_threads_list[0],&li); |
3562 | while((ln = listNext(&li))) { |
3563 | client *c = listNodeValue(ln)((ln)->value); |
3564 | writeToClient(c,0); |
3565 | } |
3566 | listEmpty(io_threads_list[0]); |
3567 | |
3568 | /* Wait for all the other threads to end their work. */ |
3569 | while(1) { |
3570 | unsigned long pending = 0; |
3571 | for (int j = 1; j < server.io_threads_num; j++) |
3572 | pending += getIOPendingCount(j); |
3573 | if (pending == 0) break; |
3574 | } |
3575 | |
3576 | /* Run the list of clients again to install the write handler where |
3577 | * needed. */ |
3578 | listRewind(server.clients_pending_write,&li); |
3579 | while((ln = listNext(&li))) { |
3580 | client *c = listNodeValue(ln)((ln)->value); |
3581 | |
3582 | /* Install the write handler if there are pending writes in some |
3583 | * of the clients. */ |
3584 | if (clientHasPendingReplies(c) && |
3585 | connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR-1) |
3586 | { |
3587 | freeClientAsync(c); |
3588 | } |
3589 | } |
3590 | listEmpty(server.clients_pending_write); |
3591 | |
3592 | /* Update processed count on server */ |
3593 | server.stat_io_writes_processed += processed; |
3594 | |
3595 | return processed; |
3596 | } |
3597 | |
3598 | /* Return 1 if we want to handle the client read later using threaded I/O. |
3599 | * This is called by the readable handler of the event loop. |
3600 | * As a side effect of calling this function the client is put in the |
3601 | * pending read clients and flagged as such. */ |
3602 | int postponeClientRead(client *c) { |
3603 | if (server.io_threads_active && |
3604 | server.io_threads_do_reads && |
3605 | !ProcessingEventsWhileBlocked && |
3606 | !(c->flags & (CLIENT_MASTER(1<<1)|CLIENT_SLAVE(1<<0)|CLIENT_PENDING_READ(1<<29)))) |
3607 | { |
3608 | c->flags |= CLIENT_PENDING_READ(1<<29); |
3609 | listAddNodeHead(server.clients_pending_read,c); |
3610 | return 1; |
3611 | } else { |
3612 | return 0; |
3613 | } |
3614 | } |
3615 | |
3616 | /* When threaded I/O is also enabled for the reading + parsing side, the |
3617 | * readable handler will just put normal clients into a queue of clients to |
3618 | * process (instead of serving them synchronously). This function runs |
3619 | * the queue using the I/O threads, and process them in order to accumulate |
3620 | * the reads in the buffers, and also parse the first command available |
3621 | * rendering it in the client structures. */ |
3622 | int handleClientsWithPendingReadsUsingThreads(void) { |
3623 | if (!server.io_threads_active || !server.io_threads_do_reads) return 0; |
3624 | int processed = listLength(server.clients_pending_read)((server.clients_pending_read)->len); |
3625 | if (processed == 0) return 0; |
3626 | |
3627 | /* Distribute the clients across N different lists. */ |
3628 | listIter li; |
3629 | listNode *ln; |
3630 | listRewind(server.clients_pending_read,&li); |
3631 | int item_id = 0; |
3632 | while((ln = listNext(&li))) { |
3633 | client *c = listNodeValue(ln)((ln)->value); |
3634 | int target_id = item_id % server.io_threads_num; |
3635 | listAddNodeTail(io_threads_list[target_id],c); |
3636 | item_id++; |
3637 | } |
3638 | |
3639 | /* Give the start condition to the waiting threads, by setting the |
3640 | * start condition atomic var. */ |
3641 | io_threads_op = IO_THREADS_OP_READ0; |
3642 | for (int j = 1; j < server.io_threads_num; j++) { |
3643 | int count = listLength(io_threads_list[j])((io_threads_list[j])->len); |
3644 | setIOPendingCount(j, count); |
3645 | } |
3646 | |
3647 | /* Also use the main thread to process a slice of clients. */ |
3648 | listRewind(io_threads_list[0],&li); |
3649 | while((ln = listNext(&li))) { |
3650 | client *c = listNodeValue(ln)((ln)->value); |
3651 | readQueryFromClient(c->conn); |
3652 | } |
3653 | listEmpty(io_threads_list[0]); |
3654 | |
3655 | /* Wait for all the other threads to end their work. */ |
3656 | while(1) { |
3657 | unsigned long pending = 0; |
3658 | for (int j = 1; j < server.io_threads_num; j++) |
3659 | pending += getIOPendingCount(j); |
3660 | if (pending == 0) break; |
3661 | } |
3662 | |
3663 | /* Run the list of clients again to process the new buffers. */ |
3664 | while(listLength(server.clients_pending_read)((server.clients_pending_read)->len)) { |
3665 | ln = listFirst(server.clients_pending_read)((server.clients_pending_read)->head); |
3666 | client *c = listNodeValue(ln)((ln)->value); |
3667 | c->flags &= ~CLIENT_PENDING_READ(1<<29); |
3668 | listDelNode(server.clients_pending_read,ln); |
3669 | |
3670 | if (processPendingCommandsAndResetClient(c) == C_ERR-1) { |
3671 | /* If the client is no longer valid, we avoid |
3672 | * processing the client later. So we just go |
3673 | * to the next. */ |
3674 | continue; |
3675 | } |
3676 | |
3677 | processInputBuffer(c); |
3678 | |
3679 | /* We may have pending replies if a thread readQueryFromClient() produced |
3680 | * replies and did not install a write handler (it can't). |
3681 | */ |
3682 | if (!(c->flags & CLIENT_PENDING_WRITE(1<<21)) && clientHasPendingReplies(c)) |
3683 | clientInstallWriteHandler(c); |
3684 | } |
3685 | |
3686 | /* Update processed count on server */ |
3687 | server.stat_io_reads_processed += processed; |
3688 | |
3689 | return processed; |
3690 | } |