Bug Summary

File: src/t_stream.c
Warning: line 384, column 15
The left operand of '>' is a garbage value

Annotated Source Code

Press '?' to see keyboard shortcuts

clang -cc1 -triple x86_64-pc-linux-gnu -analyze -disable-free -disable-llvm-verifier -discard-value-names -main-file-name t_stream.c -analyzer-store=region -analyzer-opt-analyze-nested-blocks -analyzer-checker=core -analyzer-checker=apiModeling -analyzer-checker=unix -analyzer-checker=deadcode -analyzer-checker=security.insecureAPI.UncheckedReturn -analyzer-checker=security.insecureAPI.getpw -analyzer-checker=security.insecureAPI.gets -analyzer-checker=security.insecureAPI.mktemp -analyzer-checker=security.insecureAPI.mkstemp -analyzer-checker=security.insecureAPI.vfork -analyzer-checker=nullability.NullPassedToNonnull -analyzer-checker=nullability.NullReturnedFromNonnull -analyzer-output plist -w -setup-static-analyzer -mrelocation-model static -mthread-model posix -mframe-pointer=none -fmath-errno -fno-rounding-math -masm-verbose -mconstructor-aliases -munwind-tables -target-cpu x86-64 -dwarf-column-info -fno-split-dwarf-inlining -debugger-tuning=gdb -resource-dir /usr/lib/llvm-10/lib/clang/10.0.0 -D REDIS_STATIC= -I ../deps/hiredis -I ../deps/linenoise -I ../deps/lua/src -I ../deps/hdr_histogram -D USE_JEMALLOC -I ../deps/jemalloc/include -internal-isystem /usr/local/include -internal-isystem /usr/lib/llvm-10/lib/clang/10.0.0/include -internal-externc-isystem /usr/include/x86_64-linux-gnu -internal-externc-isystem /include -internal-externc-isystem /usr/include -O2 -Wno-c11-extensions -Wno-missing-field-initializers -std=c11 -fdebug-compilation-dir /home/netto/Desktop/redis-6.2.1/src -ferror-limit 19 -fmessage-length 0 -fgnuc-version=4.2.1 -fobjc-runtime=gcc -fdiagnostics-show-option -vectorize-loops -vectorize-slp -analyzer-output=html -faddrsig -o /tmp/scan-build-2021-03-14-133648-8817-1 -x c t_stream.c
1/*
2 * Copyright (c) 2017, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30#include "server.h"
31#include "endianconv.h"
32#include "stream.h"
33
34/* Every stream item inside the listpack, has a flags field that is used to
35 * mark the entry as deleted, or having the same field as the "master"
36 * entry at the start of the listpack. */
37#define STREAM_ITEM_FLAG_NONE0 0 /* No special flags. */
38#define STREAM_ITEM_FLAG_DELETED(1<<0) (1<<0) /* Entry is deleted. Skip it. */
39#define STREAM_ITEM_FLAG_SAMEFIELDS(1<<1) (1<<1) /* Same fields as master entry. */
40
41/* For stream commands that require multiple IDs
42 * when the number of IDs is less than 'STREAMID_STATIC_VECTOR_LEN',
43 * avoid malloc allocation.*/
44#define STREAMID_STATIC_VECTOR_LEN8 8
45
46/* Max pre-allocation for listpack. This is done to avoid abuse of a user
47 * setting stream_node_max_bytes to a huge number. */
48#define STREAM_LISTPACK_MAX_PRE_ALLOCATE4096 4096
49
50void streamFreeCG(streamCG *cg);
51void streamFreeNACK(streamNACK *na);
52size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer);
53int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq);
54int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq);
55
56/* -----------------------------------------------------------------------
57 * Low level stream encoding: a radix tree of listpacks.
58 * ----------------------------------------------------------------------- */
59
60/* Create a new stream data structure. */
61stream *streamNew(void) {
62 stream *s = zmalloc(sizeof(*s));
63 s->rax = raxNew();
64 s->length = 0;
65 s->last_id.ms = 0;
66 s->last_id.seq = 0;
67 s->cgroups = NULL((void*)0); /* Created on demand to save memory when not used. */
68 return s;
69}
70
71/* Free a stream, including the listpacks stored inside the radix tree. */
72void freeStream(stream *s) {
73 raxFreeWithCallback(s->rax,(void(*)(void*))lpFree);
74 if (s->cgroups)
75 raxFreeWithCallback(s->cgroups,(void(*)(void*))streamFreeCG);
76 zfree(s);
77}
78
79/* Return the length of a stream. */
80unsigned long streamLength(const robj *subject) {
81 stream *s = subject->ptr;
82 return s->length;
83}
84
85/* Set 'id' to be its successor stream ID.
86 * If 'id' is the maximal possible id, it is wrapped around to 0-0 and a
87 * C_ERR is returned. */
88int streamIncrID(streamID *id) {
89 int ret = C_OK0;
90 if (id->seq == UINT64_MAX(18446744073709551615UL)) {
91 if (id->ms == UINT64_MAX(18446744073709551615UL)) {
92 /* Special case where 'id' is the last possible streamID... */
93 id->ms = id->seq = 0;
94 ret = C_ERR-1;
95 } else {
96 id->ms++;
97 id->seq = 0;
98 }
99 } else {
100 id->seq++;
101 }
102 return ret;
103}
104
105/* Set 'id' to be its predecessor stream ID.
106 * If 'id' is the minimal possible id, it remains 0-0 and a C_ERR is
107 * returned. */
108int streamDecrID(streamID *id) {
109 int ret = C_OK0;
110 if (id->seq == 0) {
111 if (id->ms == 0) {
112 /* Special case where 'id' is the first possible streamID... */
113 id->ms = id->seq = UINT64_MAX(18446744073709551615UL);
114 ret = C_ERR-1;
115 } else {
116 id->ms--;
117 id->seq = UINT64_MAX(18446744073709551615UL);
118 }
119 } else {
120 id->seq--;
121 }
122 return ret;
123}
124
125/* Generate the next stream item ID given the previous one. If the current
126 * milliseconds Unix time is greater than the previous one, just use this
127 * as time part and start with sequence part of zero. Otherwise we use the
128 * previous time (and never go backward) and increment the sequence. */
129void streamNextID(streamID *last_id, streamID *new_id) {
130 uint64_t ms = mstime();
131 if (ms > last_id->ms) {
132 new_id->ms = ms;
133 new_id->seq = 0;
134 } else {
135 *new_id = *last_id;
136 streamIncrID(new_id);
137 }
138}
139
140/* This is a helper function for the COPY command.
141 * Duplicate a Stream object, with the guarantee that the returned object
142 * has the same encoding as the original one.
143 *
144 * The resulting object always has refcount set to 1 */
145robj *streamDup(robj *o) {
146 robj *sobj;
147
148 serverAssert(o->type == OBJ_STREAM)((o->type == 6)?(void)0 : (_serverAssert("o->type == OBJ_STREAM"
,"t_stream.c",148),__builtin_unreachable()))
;
149
150 switch (o->encoding) {
151 case OBJ_ENCODING_STREAM10:
152 sobj = createStreamObject();
153 break;
154 default:
155 serverPanic("Wrong encoding.")_serverPanic("t_stream.c",155,"Wrong encoding."),__builtin_unreachable
()
;
156 break;
157 }
158
159 stream *s;
160 stream *new_s;
161 s = o->ptr;
162 new_s = sobj->ptr;
163
164 raxIterator ri;
165 uint64_t rax_key[2];
166 raxStart(&ri, s->rax);
167 raxSeek(&ri, "^", NULL((void*)0), 0);
168 size_t lp_bytes = 0; /* Total bytes in the listpack. */
169 unsigned char *lp = NULL((void*)0); /* listpack pointer. */
170 /* Get a reference to the listpack node. */
171 while (raxNext(&ri)) {
172 lp = ri.data;
173 lp_bytes = lpBytes(lp);
174 unsigned char *new_lp = zmalloc(lp_bytes);
175 memcpy(new_lp, lp, lp_bytes);
176 memcpy(rax_key, ri.key, sizeof(rax_key));
177 raxInsert(new_s->rax, (unsigned char *)&rax_key, sizeof(rax_key),
178 new_lp, NULL((void*)0));
179 }
180 new_s->length = s->length;
181 new_s->last_id = s->last_id;
182 raxStop(&ri);
183
184 if (s->cgroups == NULL((void*)0)) return sobj;
185
186 /* Consumer Groups */
187 raxIterator ri_cgroups;
188 raxStart(&ri_cgroups, s->cgroups);
189 raxSeek(&ri_cgroups, "^", NULL((void*)0), 0);
190 while (raxNext(&ri_cgroups)) {
191 streamCG *cg = ri_cgroups.data;
192 streamCG *new_cg = streamCreateCG(new_s, (char *)ri_cgroups.key,
193 ri_cgroups.key_len, &cg->last_id);
194
195 serverAssert(new_cg != NULL)((new_cg != ((void*)0))?(void)0 : (_serverAssert("new_cg != NULL"
,"t_stream.c",195),__builtin_unreachable()))
;
196
197 /* Consumer Group PEL */
198 raxIterator ri_cg_pel;
199 raxStart(&ri_cg_pel,cg->pel);
200 raxSeek(&ri_cg_pel,"^",NULL((void*)0),0);
201 while(raxNext(&ri_cg_pel)){
202 streamNACK *nack = ri_cg_pel.data;
203 streamNACK *new_nack = streamCreateNACK(NULL((void*)0));
204 new_nack->delivery_time = nack->delivery_time;
205 new_nack->delivery_count = nack->delivery_count;
206 raxInsert(new_cg->pel, ri_cg_pel.key, sizeof(streamID), new_nack, NULL((void*)0));
207 }
208 raxStop(&ri_cg_pel);
209
210 /* Consumers */
211 raxIterator ri_consumers;
212 raxStart(&ri_consumers, cg->consumers);
213 raxSeek(&ri_consumers, "^", NULL((void*)0), 0);
214 while (raxNext(&ri_consumers)) {
215 streamConsumer *consumer = ri_consumers.data;
216 streamConsumer *new_consumer;
217 new_consumer = zmalloc(sizeof(*new_consumer));
218 new_consumer->name = sdsdup(consumer->name);
219 new_consumer->pel = raxNew();
220 raxInsert(new_cg->consumers,(unsigned char *)new_consumer->name,
221 sdslen(new_consumer->name), new_consumer, NULL((void*)0));
222 new_consumer->seen_time = consumer->seen_time;
223
224 /* Consumer PEL */
225 raxIterator ri_cpel;
226 raxStart(&ri_cpel, consumer->pel);
227 raxSeek(&ri_cpel, "^", NULL((void*)0), 0);
228 while (raxNext(&ri_cpel)) {
229 streamNACK *new_nack = raxFind(new_cg->pel,ri_cpel.key,sizeof(streamID));
230
231 serverAssert(new_nack != raxNotFound)((new_nack != raxNotFound)?(void)0 : (_serverAssert("new_nack != raxNotFound"
,"t_stream.c",231),__builtin_unreachable()))
;
232
233 new_nack->consumer = new_consumer;
234 raxInsert(new_consumer->pel,ri_cpel.key,sizeof(streamID),new_nack,NULL((void*)0));
235 }
236 raxStop(&ri_cpel);
237 }
238 raxStop(&ri_consumers);
239 }
240 raxStop(&ri_cgroups);
241 return sobj;
242}
243
244/* This is just a wrapper for lpAppend() to directly use a 64 bit integer
245 * instead of a string. */
246unsigned char *lpAppendInteger(unsigned char *lp, int64_t value) {
247 char buf[LONG_STR_SIZE21];
248 int slen = ll2string(buf,sizeof(buf),value);
249 return lpAppend(lp,(unsigned char*)buf,slen);
250}
251
252/* This is just a wrapper for lpReplace() to directly use a 64 bit integer
253 * instead of a string to replace the current element. The function returns
254 * the new listpack as return value, and also updates the current cursor
255 * by updating '*pos'. */
256unsigned char *lpReplaceInteger(unsigned char *lp, unsigned char **pos, int64_t value) {
257 char buf[LONG_STR_SIZE21];
258 int slen = ll2string(buf,sizeof(buf),value);
259 return lpInsert(lp, (unsigned char*)buf, slen, *pos, LP_REPLACE2, pos);
260}
261
262/* This is a wrapper function for lpGet() to directly get an integer value
263 * from the listpack (that may store numbers as a string), converting
264 * the string if needed.
265 * The 'valid' argument is an optional output parameter to get an indication
266 * if the record was valid, when this parameter is NULL, the function will
267 * fail with an assertion. */
268static inline int64_t lpGetIntegerIfValid(unsigned char *ele, int *valid) {
269 int64_t v;
270 unsigned char *e = lpGet(ele,&v,NULL((void*)0));
271 if (e == NULL((void*)0)) {
272 if (valid)
273 *valid = 1;
274 return v;
275 }
276 /* The following code path should never be used for how listpacks work:
277 * they should always be able to store an int64_t value in integer
278 * encoded form. However the implementation may change. */
279 long long ll;
280 int ret = string2ll((char*)e,v,&ll);
281 if (valid)
282 *valid = ret;
283 else
284 serverAssert(ret != 0)((ret != 0)?(void)0 : (_serverAssert("ret != 0","t_stream.c",
284),__builtin_unreachable()))
;
285 v = ll;
286 return v;
287}
288
289#define lpGetInteger(ele)lpGetIntegerIfValid(ele, ((void*)0)) lpGetIntegerIfValid(ele, NULL((void*)0))
290
291/* Get an edge streamID of a given listpack.
292 * 'master_id' is an input param, used to build the 'edge_id' output param */
293int lpGetEdgeStreamID(unsigned char *lp, int first, streamID *master_id, streamID *edge_id)
294{
295 if (lp == NULL((void*)0))
12
Assuming 'lp' is equal to NULL
13
Taking true branch
296 return 0;
14
Returning without writing to 'edge_id->ms'
297
298 unsigned char *lp_ele;
299
300 /* We need to seek either the first or the last entry depending
301 * on the direction of the iteration. */
302 if (first) {
303 /* Get the master fields count. */
304 lp_ele = lpFirst(lp); /* Seek items count */
305 lp_ele = lpNext(lp, lp_ele); /* Seek deleted count. */
306 lp_ele = lpNext(lp, lp_ele); /* Seek num fields. */
307 int64_t master_fields_count = lpGetInteger(lp_ele)lpGetIntegerIfValid(lp_ele, ((void*)0));
308 lp_ele = lpNext(lp, lp_ele); /* Seek first field. */
309
310 /* If we are iterating in normal order, skip the master fields
311 * to seek the first actual entry. */
312 for (int64_t i = 0; i < master_fields_count; i++)
313 lp_ele = lpNext(lp, lp_ele);
314
315 /* If we are going forward, skip the previous entry's
316 * lp-count field (or in case of the master entry, the zero
317 * term field) */
318 lp_ele = lpNext(lp, lp_ele);
319 if (lp_ele == NULL((void*)0))
320 return 0;
321 } else {
322 /* If we are iterating in reverse direction, just seek the
323 * last part of the last entry in the listpack (that is, the
324 * fields count). */
325 lp_ele = lpLast(lp);
326
327 /* If we are going backward, read the number of elements this
328 * entry is composed of, and jump backward N times to seek
329 * its start. */
330 int64_t lp_count = lpGetInteger(lp_ele)lpGetIntegerIfValid(lp_ele, ((void*)0));
331 if (lp_count == 0) /* We reached the master entry. */
332 return 0;
333
334 while (lp_count--)
335 lp_ele = lpPrev(lp, lp_ele);
336 }
337
338 lp_ele = lpNext(lp, lp_ele); /* Seek ID (lp_ele currently points to 'flags'). */
339
340 /* Get the ID: it is encoded as difference between the master
341 * ID and this entry ID. */
342 streamID id = *master_id;
343 id.ms += lpGetInteger(lp_ele)lpGetIntegerIfValid(lp_ele, ((void*)0));
344 lp_ele = lpNext(lp, lp_ele);
345 id.seq += lpGetInteger(lp_ele)lpGetIntegerIfValid(lp_ele, ((void*)0));
346 *edge_id = id;
347 return 1;
348}
349
350/* Debugging function to log the full content of a listpack. Useful
351 * for development and debugging. */
352void streamLogListpackContent(unsigned char *lp) {
353 unsigned char *p = lpFirst(lp);
354 while(p) {
355 unsigned char buf[LP_INTBUF_SIZE21];
356 int64_t v;
357 unsigned char *ele = lpGet(p,&v,buf);
358 serverLog(LL_WARNING3,"- [%d] '%.*s'", (int)v, (int)v, ele);
359 p = lpNext(lp,p);
360 }
361}
362
363/* Convert the specified stream entry ID as a 128 bit big endian number, so
364 * that the IDs can be sorted lexicographically. */
365void streamEncodeID(void *buf, streamID *id) {
366 uint64_t e[2];
367 e[0] = htonu64(id->ms)intrev64(id->ms);
368 e[1] = htonu64(id->seq)intrev64(id->seq);
369 memcpy(buf,e,sizeof(e));
370}
371
372/* This is the reverse of streamEncodeID(): the decoded ID will be stored
373 * in the 'id' structure passed by reference. The buffer 'buf' must point
374 * to a 128 bit big-endian encoded ID. */
375void streamDecodeID(void *buf, streamID *id) {
376 uint64_t e[2];
377 memcpy(e,buf,sizeof(e));
378 id->ms = ntohu64(e[0])intrev64(e[0]);
379 id->seq = ntohu64(e[1])intrev64(e[1]);
380}
381
382/* Compare two stream IDs. Return -1 if a < b, 0 if a == b, 1 if a > b. */
383int streamCompareID(streamID *a, streamID *b) {
384 if (a->ms > b->ms) return 1;
17
The left operand of '>' is a garbage value
385 else if (a->ms < b->ms) return -1;
386 /* The ms part is the same. Check the sequence part. */
387 else if (a->seq > b->seq) return 1;
388 else if (a->seq < b->seq) return -1;
389 /* Everything is the same: IDs are equal. */
390 return 0;
391}
392
393void streamGetEdgeID(stream *s, int first, streamID *edge_id)
394{
395 raxIterator ri;
396 raxStart(&ri, s->rax);
397 int empty;
398 if (first) {
399 raxSeek(&ri, "^", NULL((void*)0), 0);
400 empty = !raxNext(&ri);
401 } else {
402 raxSeek(&ri, "$", NULL((void*)0), 0);
403 empty = !raxPrev(&ri);
404 }
405
406 if (empty) {
407 /* Stream is empty, mark edge ID as lowest/highest possible. */
408 edge_id->ms = first ? UINT64_MAX(18446744073709551615UL) : 0;
409 edge_id->seq = first ? UINT64_MAX(18446744073709551615UL) : 0;
410 raxStop(&ri);
411 return;
412 }
413
414 unsigned char *lp = ri.data;
415
416 /* Read the master ID from the radix tree key. */
417 streamID master_id;
418 streamDecodeID(ri.key, &master_id);
419
420 /* Construct edge ID. */
421 lpGetEdgeStreamID(lp, first, &master_id, edge_id);
422
423 raxStop(&ri);
424}
425
426/* Adds a new item into the stream 's' having the specified number of
427 * field-value pairs as specified in 'numfields' and stored into 'argv'.
428 * Returns the new entry ID populating the 'added_id' structure.
429 *
430 * If 'use_id' is not NULL, the ID is not auto-generated by the function,
431 * but instead the passed ID is used to add the new entry. In this case
432 * adding the entry may fail as specified later in this comment.
433 *
434 * The function returns C_OK if the item was added, this is always true
435 * if the ID was generated by the function. However the function may return
436 * C_ERR if an ID was given via 'use_id', but adding it failed since the
437 * current top ID is greater or equal. */
438int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id) {
439
440 /* Generate the new entry ID. */
441 streamID id;
442 if (use_id)
443 id = *use_id;
444 else
445 streamNextID(&s->last_id,&id);
446
447 /* Check that the new ID is greater than the last entry ID
448 * or return an error. Automatically generated IDs might
449 * overflow (and wrap-around) when incrementing the sequence
450 part. */
451 if (streamCompareID(&id,&s->last_id) <= 0) return C_ERR-1;
452
453 /* Add the new entry. */
454 raxIterator ri;
455 raxStart(&ri,s->rax);
456 raxSeek(&ri,"$",NULL((void*)0),0);
457
458 size_t lp_bytes = 0; /* Total bytes in the tail listpack. */
459 unsigned char *lp = NULL((void*)0); /* Tail listpack pointer. */
460
461 /* Get a reference to the tail node listpack. */
462 if (raxNext(&ri)) {
463 lp = ri.data;
464 lp_bytes = lpBytes(lp);
465 }
466 raxStop(&ri);
467
468 /* We have to add the key into the radix tree in lexicographic order,
469 * to do so we consider the ID as a single 128 bit number written in
470 * big endian, so that the most significant bytes are the first ones. */
471 uint64_t rax_key[2]; /* Key in the radix tree containing the listpack.*/
472 streamID master_id; /* ID of the master entry in the listpack. */
473
474 /* Create a new listpack and radix tree node if needed. Note that when
475 * a new listpack is created, we populate it with a "master entry". This
476 * is just a set of fields that is taken as references in order to compress
477 * the stream entries that we'll add inside the listpack.
478 *
479 * Note that while we use the first added entry fields to create
480 * the master entry, the first added entry is NOT represented in the master
481 * entry, which is a stand alone object. But of course, the first entry
482 * will compress well because it's used as reference.
483 *
484 * The master entry is composed like in the following example:
485 *
486 * +-------+---------+------------+---------+--/--+---------+---------+-+
487 * | count | deleted | num-fields | field_1 | field_2 | ... | field_N |0|
488 * +-------+---------+------------+---------+--/--+---------+---------+-+
489 *
490 * count and deleted just represent respectively the total number of
491 * entries inside the listpack that are valid, and marked as deleted
492 * (deleted flag in the entry flags set). So the total number of items
493 * actually inside the listpack (both deleted and not) is count+deleted.
494 *
495 * The real entries will be encoded with an ID that is just the
496 * millisecond and sequence difference compared to the key stored at
497 * the radix tree node containing the listpack (delta encoding), and
498 * if the fields of the entry are the same as the master entry fields, the
499 * entry flags will specify this fact and the entry fields and number
500 * of fields will be omitted (see later in the code of this function).
501 *
502 * The "0" entry at the end is the same as the 'lp-count' entry in the
503 * regular stream entries (see below), and marks the fact that there are
504 * no more entries, when we scan the stream from right to left. */
505
506 /* First of all, check if we can append to the current macro node or
507 * if we need to switch to the next one. 'lp' will be set to NULL if
508 * the current node is full. */
509 if (lp != NULL((void*)0)) {
510 if (server.stream_node_max_bytes &&
511 lp_bytes >= server.stream_node_max_bytes)
512 {
513 lp = NULL((void*)0);
514 } else if (server.stream_node_max_entries) {
515 unsigned char *lp_ele = lpFirst(lp);
516 /* Count both live entries and deleted ones. */
517 int64_t count = lpGetInteger(lp_ele)lpGetIntegerIfValid(lp_ele, ((void*)0)) + lpGetInteger(lpNext(lp,lp_ele))lpGetIntegerIfValid(lpNext(lp,lp_ele), ((void*)0));
518 if (count >= server.stream_node_max_entries) {
519 /* Shrink extra pre-allocated memory */
520 lp = lpShrinkToFit(lp);
521 if (ri.data != lp)
522 raxInsert(s->rax,ri.key,ri.key_len,lp,NULL((void*)0));
523 lp = NULL((void*)0);
524 }
525 }
526 }
527
528 int flags = STREAM_ITEM_FLAG_NONE0;
529 if (lp == NULL((void*)0)) {
530 master_id = id;
531 streamEncodeID(rax_key,&id);
532 /* Create the listpack having the master entry ID and fields.
533 * Pre-allocate some bytes when creating listpack to avoid realloc on
534 * every XADD. Since listpack.c uses malloc_size, it'll grow in steps,
535 * and won't realloc on every XADD.
536 * When listpack reaches max number of entries, we'll shrink the
537 * allocation to fit the data. */
538 size_t prealloc = STREAM_LISTPACK_MAX_PRE_ALLOCATE4096;
539 if (server.stream_node_max_bytes > 0 && server.stream_node_max_bytes < prealloc) {
540 prealloc = server.stream_node_max_bytes;
541 }
542 lp = lpNew(prealloc);
543 lp = lpAppendInteger(lp,1); /* One item, the one we are adding. */
544 lp = lpAppendInteger(lp,0); /* Zero deleted so far. */
545 lp = lpAppendInteger(lp,numfields);
546 for (int64_t i = 0; i < numfields; i++) {
547 sds field = argv[i*2]->ptr;
548 lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
549 }
550 lp = lpAppendInteger(lp,0); /* Master entry zero terminator. */
551 raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL((void*)0));
552 /* The first entry we insert, has obviously the same fields of the
553 * master entry. */
554 flags |= STREAM_ITEM_FLAG_SAMEFIELDS(1<<1);
555 } else {
556 serverAssert(ri.key_len == sizeof(rax_key))((ri.key_len == sizeof(rax_key))?(void)0 : (_serverAssert("ri.key_len == sizeof(rax_key)"
,"t_stream.c",556),__builtin_unreachable()))
;
557 memcpy(rax_key,ri.key,sizeof(rax_key));
558
559 /* Read the master ID from the radix tree key. */
560 streamDecodeID(rax_key,&master_id);
561 unsigned char *lp_ele = lpFirst(lp);
562
563 /* Update count and skip the deleted fields. */
564 int64_t count = lpGetInteger(lp_ele)lpGetIntegerIfValid(lp_ele, ((void*)0));
565 lp = lpReplaceInteger(lp,&lp_ele,count+1);
566 lp_ele = lpNext(lp,lp_ele); /* seek deleted. */
567 lp_ele = lpNext(lp,lp_ele); /* seek master entry num fields. */
568
569 /* Check if the entry we are adding, have the same fields
570 * as the master entry. */
571 int64_t master_fields_count = lpGetInteger(lp_ele)lpGetIntegerIfValid(lp_ele, ((void*)0));
572 lp_ele = lpNext(lp,lp_ele);
573 if (numfields == master_fields_count) {
574 int64_t i;
575 for (i = 0; i < master_fields_count; i++) {
576 sds field = argv[i*2]->ptr;
577 int64_t e_len;
578 unsigned char buf[LP_INTBUF_SIZE21];
579 unsigned char *e = lpGet(lp_ele,&e_len,buf);
580 /* Stop if there is a mismatch. */
581 if (sdslen(field) != (size_t)e_len ||
582 memcmp(e,field,e_len) != 0) break;
583 lp_ele = lpNext(lp,lp_ele);
584 }
585 /* All fields are the same! We can compress the field names
586 * setting a single bit in the flags. */
587 if (i == master_fields_count) flags |= STREAM_ITEM_FLAG_SAMEFIELDS(1<<1);
588 }
589 }
590
591 /* Populate the listpack with the new entry. We use the following
592 * encoding:
593 *
594 * +-----+--------+----------+-------+-------+-/-+-------+-------+--------+
595 * |flags|entry-id|num-fields|field-1|value-1|...|field-N|value-N|lp-count|
596 * +-----+--------+----------+-------+-------+-/-+-------+-------+--------+
597 *
598 * However if the SAMEFIELD flag is set, we have just to populate
599 * the entry with the values, so it becomes:
600 *
601 * +-----+--------+-------+-/-+-------+--------+
602 * |flags|entry-id|value-1|...|value-N|lp-count|
603 * +-----+--------+-------+-/-+-------+--------+
604 *
605 * The entry-id field is actually two separated fields: the ms
606 * and seq difference compared to the master entry.
607 *
608 * The lp-count field is a number that states the number of listpack pieces
609 * that compose the entry, so that it's possible to travel the entry
610 * in reverse order: we can just start from the end of the listpack, read
611 * the entry, and jump back N times to seek the "flags" field to read
612 * the stream full entry. */
613 lp = lpAppendInteger(lp,flags);
614 lp = lpAppendInteger(lp,id.ms - master_id.ms);
615 lp = lpAppendInteger(lp,id.seq - master_id.seq);
616 if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS(1<<1)))
617 lp = lpAppendInteger(lp,numfields);
618 for (int64_t i = 0; i < numfields; i++) {
619 sds field = argv[i*2]->ptr, value = argv[i*2+1]->ptr;
620 if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS(1<<1)))
621 lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
622 lp = lpAppend(lp,(unsigned char*)value,sdslen(value));
623 }
624 /* Compute and store the lp-count field. */
625 int64_t lp_count = numfields;
626 lp_count += 3; /* Add the 3 fixed fields flags + ms-diff + seq-diff. */
627 if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS(1<<1))) {
628 /* If the item is not compressed, it also has the fields other than
629 * the values, and an additional num-fields field. */
630 lp_count += numfields+1;
631 }
632 lp = lpAppendInteger(lp,lp_count);
633
634 /* Insert back into the tree in order to update the listpack pointer. */
635 if (ri.data != lp)
636 raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL((void*)0));
637 s->length++;
638 s->last_id = id;
639 if (added_id) *added_id = id;
640 return C_OK0;
641}
642
643typedef struct {
644 /* XADD options */
645 streamID id; /* User-provided ID, for XADD only. */
646 int id_given; /* Was an ID different than "*" specified? for XADD only. */
647 int no_mkstream; /* if set to 1 do not create new stream */
648
649 /* XADD + XTRIM common options */
650 int trim_strategy; /* TRIM_STRATEGY_* */
651 int trim_strategy_arg_idx; /* Index of the count in MAXLEN/MINID, for rewriting. */
652 int approx_trim; /* If 1 only delete whole radix tree nodes, so
653 * the trim argument is not applied verbatim. */
654 long long limit; /* Maximum amount of entries to trim. If 0, no limitation
655 * on the amount of trimming work is enforced. */
656 /* TRIM_STRATEGY_MAXLEN options */
657 long long maxlen; /* After trimming, leave stream at this length . */
658 /* TRIM_STRATEGY_MINID options */
659 streamID minid; /* Trim by ID (No stream entries with ID < 'minid' will remain) */
660} streamAddTrimArgs;
661
662#define TRIM_STRATEGY_NONE0 0
663#define TRIM_STRATEGY_MAXLEN1 1
664#define TRIM_STRATEGY_MINID2 2
665
666/* Trim the stream 's' according to args->trim_strategy, and return the
667 * number of elements removed from the stream. The 'approx' option, if non-zero,
668 * specifies that the trimming must be performed in an approximated way in
669 * order to maximize performances. This means that the stream may contain
670 * entries with IDs < 'id' in case of MINID (or more elements than 'maxlen'
671 * in case of MAXLEN), and elements are only removed if we can remove
672 * a *whole* node of the radix tree. The elements are removed from the head
673 * of the stream (older elements).
674 *
675 * The function may return zero if:
676 *
677 * 1) The minimal entry ID of the stream is already >= 'id' (MINID); or
678 * 2) The stream is already shorter or equal to the specified max length (MAXLEN); or
679 * 3) The 'approx' option is true and the head node did not have enough elements
680 * to be deleted.
681 *
682 * args->limit is the maximum number of entries to delete. The purpose is to
683 * prevent this function from taking too long.
684 * If 'limit' is 0 then we do not limit the number of deleted entries.
685 * Much like the 'approx', if 'limit' is smaller than the number of entries
686 * that should be trimmed, there is a chance we will still have entries with
687 * IDs < 'id' (or number of elements >= maxlen in case of MAXLEN).
688 */
689int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
690 size_t maxlen = args->maxlen;
691 streamID *id = &args->minid;
692 int approx = args->approx_trim;
693 int64_t limit = args->limit;
694 int trim_strategy = args->trim_strategy;
695
696 if (trim_strategy == TRIM_STRATEGY_NONE0)
6
Assuming 'trim_strategy' is not equal to TRIM_STRATEGY_NONE
7
Taking false branch
697 return 0;
698
699 raxIterator ri;
700 raxStart(&ri,s->rax);
701 raxSeek(&ri,"^",NULL((void*)0),0);
702
703 int64_t deleted = 0;
704 while (raxNext(&ri)) {
8
Loop condition is true. Entering loop body
705 /* Check if we exceeded the amount of work we could do */
706 if (limit
8.1
'limit' is 0
&& deleted >= limit)
707 break;
708
709 if (trim_strategy == TRIM_STRATEGY_MAXLEN1 && s->length <= maxlen)
9
Assuming 'trim_strategy' is not equal to TRIM_STRATEGY_MAXLEN
710 break;
711
712 unsigned char *lp = ri.data, *p = lpFirst(lp);
713 int64_t entries = lpGetInteger(p)lpGetIntegerIfValid(p, ((void*)0));
714
715 /* Check if we can remove the whole node. */
716 int remove_node;
717 streamID master_id = {0}; /* For MINID */
718 if (trim_strategy
9.1
'trim_strategy' is not equal to TRIM_STRATEGY_MAXLEN
== TRIM_STRATEGY_MAXLEN1) {
10
Taking false branch
719 remove_node = s->length - entries >= maxlen;
720 } else {
721 /* Read the master ID from the radix tree key. */
722 streamDecodeID(ri.key, &master_id);
723
724 /* Read last ID. */
725 streamID last_id;
726 lpGetEdgeStreamID(lp, 0, &master_id, &last_id);
11
Calling 'lpGetEdgeStreamID'
15
Returning from 'lpGetEdgeStreamID'
727
728 /* We can remove the entire node if its last ID < 'id' */
729 remove_node = streamCompareID(&last_id, id) < 0;
16
Calling 'streamCompareID'
730 }
731
732 if (remove_node) {
733 lpFree(lp);
734 raxRemove(s->rax,ri.key,ri.key_len,NULL((void*)0));
735 raxSeek(&ri,">=",ri.key,ri.key_len);
736 s->length -= entries;
737 deleted += entries;
738 continue;
739 }
740
741 /* If we cannot remove a whole element, and approx is true,
742 * stop here. */
743 if (approx) break;
744
745 /* Now we have to trim entries from within 'lp' */
746 int64_t deleted_from_lp = 0;
747
748 p = lpNext(lp, p); /* Skip deleted field. */
749 p = lpNext(lp, p); /* Skip num-of-fields in the master entry. */
750
751 /* Skip all the master fields. */
752 int64_t master_fields_count = lpGetInteger(p)lpGetIntegerIfValid(p, ((void*)0));
753 p = lpNext(lp,p); /* Skip the first field. */
754 for (int64_t j = 0; j < master_fields_count; j++)
755 p = lpNext(lp,p); /* Skip all master fields. */
756 p = lpNext(lp,p); /* Skip the zero master entry terminator. */
757
758 /* 'p' is now pointing to the first entry inside the listpack.
759 * We have to run entry after entry, marking entries as deleted
760 * if they are already not deleted. */
761 while (p) {
762 /* We keep a copy of p (which points to the flags part) in order to
763 * update it after (and if) we actually remove the entry */
764 unsigned char *pcopy = p;
765
766 int flags = lpGetInteger(p)lpGetIntegerIfValid(p, ((void*)0));
767 p = lpNext(lp, p); /* Skip flags. */
768 int to_skip;
769
770 int ms_delta = lpGetInteger(p)lpGetIntegerIfValid(p, ((void*)0));
771 p = lpNext(lp, p); /* Skip ID ms delta */
772 int seq_delta = lpGetInteger(p)lpGetIntegerIfValid(p, ((void*)0));
773 p = lpNext(lp, p); /* Skip ID seq delta */
774
775 streamID currid = {0}; /* For MINID */
776 if (trim_strategy == TRIM_STRATEGY_MINID2) {
777 currid.ms = master_id.ms + ms_delta;
778 currid.seq = master_id.seq + seq_delta;
779 }
780
781 int stop;
782 if (trim_strategy == TRIM_STRATEGY_MAXLEN1) {
783 stop = s->length <= maxlen;
784 } else {
785 /* Following IDs will definitely be greater because the rax
786 * tree is sorted, no point of continuing. */
787 stop = streamCompareID(&currid, id) >= 0;
788 }
789 if (stop)
790 break;
791
792 if (flags & STREAM_ITEM_FLAG_SAMEFIELDS(1<<1)) {
793 to_skip = master_fields_count;
794 } else {
795 to_skip = lpGetInteger(p)lpGetIntegerIfValid(p, ((void*)0)); /* Get num-fields. */
796 p = lpNext(lp,p); /* Skip num-fields. */
797 to_skip *= 2; /* Fields and values. */
798 }
799
800 while(to_skip--) p = lpNext(lp,p); /* Skip the whole entry. */
801 p = lpNext(lp,p); /* Skip the final lp-count field. */
802
803 /* Mark the entry as deleted. */
804 if (!(flags & STREAM_ITEM_FLAG_DELETED(1<<0))) {
805 intptr_t delta = p - lp;
806 flags |= STREAM_ITEM_FLAG_DELETED(1<<0);
807 lp = lpReplaceInteger(lp, &pcopy, flags);
808 deleted_from_lp++;
809 s->length--;
810 p = lp + delta;
811 }
812 }
813 deleted += deleted_from_lp;
814
815 /* Now we update the entries/deleted counters. */
816 p = lpFirst(lp);
817 lp = lpReplaceInteger(lp,&p,entries-deleted_from_lp);
818 p = lpNext(lp,p); /* Skip deleted field. */
819 int64_t marked_deleted = lpGetInteger(p)lpGetIntegerIfValid(p, ((void*)0));
820 lp = lpReplaceInteger(lp,&p,marked_deleted+deleted_from_lp);
821 p = lpNext(lp,p); /* Skip num-of-fields in the master entry. */
822
823 /* Here we should perform garbage collection in case at this point
824 * there are too many entries deleted inside the listpack. */
825 entries -= deleted_from_lp;
826 marked_deleted += deleted_from_lp;
827 if (entries + marked_deleted > 10 && marked_deleted > entries/2) {
828 /* TODO: perform a garbage collection. */
829 }
830
831 /* Update the listpack with the new pointer. */
832 raxInsert(s->rax,ri.key,ri.key_len,lp,NULL((void*)0));
833
834 break; /* If we are here, there was enough to delete in the current
835 node, so no need to go to the next node. */
836 }
837
838 raxStop(&ri);
839 return deleted;
840}
841
842/* Trims a stream by length. Returns the number of deleted items. */
843int64_t streamTrimByLength(stream *s, long long maxlen, int approx) {
844 streamAddTrimArgs args = {
845 .trim_strategy = TRIM_STRATEGY_MAXLEN1,
846 .approx_trim = approx,
847 .limit = approx ? 100 * server.stream_node_max_entries : 0,
848 .maxlen = maxlen
849 };
850 return streamTrim(s, &args);
851}
852
853/* Trims a stream by minimum ID. Returns the number of deleted items. */
854int64_t streamTrimByID(stream *s, streamID minid, int approx) {
855 streamAddTrimArgs args = {
856 .trim_strategy = TRIM_STRATEGY_MINID2,
857 .approx_trim = approx,
858 .limit = approx ? 100 * server.stream_node_max_entries : 0,
859 .minid = minid
860 };
861 return streamTrim(s, &args);
862}
863
864/* Parse the arguments of XADD/XTRIM.
865 *
866 * See streamAddTrimArgs for more details about the arguments handled.
867 *
868 * This function returns the position of the ID argument (relevant only to XADD).
869 * On error -1 is returned and a reply is sent. */
870static int streamParseAddOrTrimArgsOrReply(client *c, streamAddTrimArgs *args, int xadd) {
871 /* Initialize arguments to defaults */
872 memset(args, 0, sizeof(*args));
873
874 /* Parse options. */
875 int i = 2; /* This is the first argument position where we could
876 find an option, or the ID. */
877 int limit_given = 0;
878 for (; i < c->argc; i++) {
879 int moreargs = (c->argc-1) - i; /* Number of additional arguments. */
880 char *opt = c->argv[i]->ptr;
881 if (xadd && opt[0] == '*' && opt[1] == '\0') {
882 /* This is just a fast path for the common case of auto-ID
883 * creation. */
884 break;
885 } else if (!strcasecmp(opt,"maxlen") && moreargs) {
886 if (args->trim_strategy != TRIM_STRATEGY_NONE0) {
887 addReplyError(c,"syntax error, MAXLEN and MINID options at the same time are not compatible");
888 return -1;
889 }
890 args->approx_trim = 0;
891 char *next = c->argv[i+1]->ptr;
892 /* Check for the form MAXLEN ~ <count>. */
893 if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') {
894 args->approx_trim = 1;
895 i++;
896 } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') {
897 i++;
898 }
899 if (getLongLongFromObjectOrReply(c,c->argv[i+1],&args->maxlen,NULL((void*)0))
900 != C_OK0) return -1;
901
902 if (args->maxlen < 0) {
903 addReplyError(c,"The MAXLEN argument must be >= 0.");
904 return -1;
905 }
906 i++;
907 args->trim_strategy = TRIM_STRATEGY_MAXLEN1;
908 args->trim_strategy_arg_idx = i;
909 } else if (!strcasecmp(opt,"minid") && moreargs) {
910 if (args->trim_strategy != TRIM_STRATEGY_NONE0) {
911 addReplyError(c,"syntax error, MAXLEN and MINID options at the same time are not compatible");
912 return -1;
913 }
914 args->approx_trim = 0;
915 char *next = c->argv[i+1]->ptr;
916 /* Check for the form MINID ~ <id>|<age>. */
917 if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') {
918 args->approx_trim = 1;
919 i++;
920 } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') {
921 i++;
922 }
923
924 if (streamParseStrictIDOrReply(c,c->argv[i+1],&args->minid,0) != C_OK0)
925 return -1;
926
927 i++;
928 args->trim_strategy = TRIM_STRATEGY_MINID2;
929 args->trim_strategy_arg_idx = i;
930 } else if (!strcasecmp(opt,"limit") && moreargs) {
931 /* Note about LIMIT: If it was not provided by the caller we set
932 * it to 100*server.stream_node_max_entries, and that's to prevent the
933 * trimming from taking too long, at the expense of not deleting entries
934 * that should be trimmed.
935 * If user wanted exact trimming (i.e. no '~') we never limit the number
936 * of trimmed entries */
937 if (getLongLongFromObjectOrReply(c,c->argv[i+1],&args->limit,NULL((void*)0)) != C_OK0)
938 return -1;
939
940 if (args->limit < 0) {
941 addReplyError(c,"The LIMIT argument must be >= 0.");
942 return -1;
943 }
944 limit_given = 1;
945 i++;
946 } else if (xadd && !strcasecmp(opt,"nomkstream")) {
947 args->no_mkstream = 1;
948 } else if (xadd) {
949 /* If we are here is a syntax error or a valid ID. */
950 if (streamParseStrictIDOrReply(c,c->argv[i],&args->id,0) != C_OK0)
951 return -1;
952 args->id_given = 1;
953 break;
954 } else {
955 addReplyErrorObject(c,shared.syntaxerr);
956 return -1;
957 }
958 }
959
960 if (args->limit && args->trim_strategy == TRIM_STRATEGY_NONE0) {
961 addReplyError(c,"syntax error, LIMIT cannot be used without specifying a trimming strategy");
962 return -1;
963 }
964
965 if (!xadd && args->trim_strategy == TRIM_STRATEGY_NONE0) {
966 addReplyError(c,"syntax error, XTRIM must be called with a trimming strategy");
967 return -1;
968 }
969
970 if (c == server.master || c->id == CLIENT_ID_AOF((18446744073709551615UL))) {
971 /* If command came from master or from AOF we must not enforce maxnodes
972 * (The maxlen/minid argument was re-written to make sure there's no
973 * inconsistency). */
974 args->limit = 0;
975 } else {
976 /* We need to set the limit (only if we got '~') */
977 if (limit_given) {
978 if (!args->approx_trim) {
979 /* LIMIT was provided without ~ */
980 addReplyError(c,"syntax error, LIMIT cannot be used without the special ~ option");
981 return -1;
982 }
983 } else {
984 /* User didn't provide LIMIT, we must set it. */
985
986 if (args->approx_trim) {
987 /* In order to prevent trimming from doing too much work and causing
988 * latency spikes, we limit the amount of work it can do */
989 args->limit = 100 * server.stream_node_max_entries; /* Maximum 100 rax nodes. */
990 } else {
991 /* No LIMIT for exact trimming */
992 args->limit = 0;
993 }
994 }
995 }
996
997 return i;
998}
999
1000/* Initialize the stream iterator, so that we can call iterating functions
1001 * to get the next items. This requires a corresponding streamIteratorStop()
1002 * at the end. The 'rev' parameter controls the direction. If it's zero the
1003 * iteration is from the start to the end element (inclusive), otherwise
1004 * if rev is non-zero, the iteration is reversed.
1005 *
1006 * Once the iterator is initialized, we iterate like this:
1007 *
1008 * streamIterator myiterator;
1009 * streamIteratorStart(&myiterator,...);
1010 * int64_t numfields;
1011 * while(streamIteratorGetID(&myiterator,&ID,&numfields)) {
1012 * while(numfields--) {
1013 * unsigned char *key, *value;
1014 * size_t key_len, value_len;
1015 * streamIteratorGetField(&myiterator,&key,&value,&key_len,&value_len);
1016 *
1017 * ... do what you want with key and value ...
1018 * }
1019 * }
1020 * streamIteratorStop(&myiterator); */
1021void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev) {
1022 /* Initialize the iterator and translates the iteration start/stop
1023 * elements into a 128 bit big-endian number. */
1024 if (start) {
1025 streamEncodeID(si->start_key,start);
1026 } else {
1027 si->start_key[0] = 0;
1028 si->start_key[1] = 0;
1029 }
1030
1031 if (end) {
1032 streamEncodeID(si->end_key,end);
1033 } else {
1034 si->end_key[0] = UINT64_MAX(18446744073709551615UL);
1035 si->end_key[1] = UINT64_MAX(18446744073709551615UL);
1036 }
1037
1038 /* Seek the correct node in the radix tree. */
1039 raxStart(&si->ri,s->rax);
1040 if (!rev) {
1041 if (start && (start->ms || start->seq)) {
1042 raxSeek(&si->ri,"<=",(unsigned char*)si->start_key,
1043 sizeof(si->start_key));
1044 if (raxEOF(&si->ri)) raxSeek(&si->ri,"^",NULL((void*)0),0);
1045 } else {
1046 raxSeek(&si->ri,"^",NULL((void*)0),0);
1047 }
1048 } else {
1049 if (end && (end->ms || end->seq)) {
1050 raxSeek(&si->ri,"<=",(unsigned char*)si->end_key,
1051 sizeof(si->end_key));
1052 if (raxEOF(&si->ri)) raxSeek(&si->ri,"$",NULL((void*)0),0);
1053 } else {
1054 raxSeek(&si->ri,"$",NULL((void*)0),0);
1055 }
1056 }
1057 si->stream = s;
1058 si->lp = NULL((void*)0); /* There is no current listpack right now. */
1059 si->lp_ele = NULL((void*)0); /* Current listpack cursor. */
1060 si->rev = rev; /* Direction, if non-zero reversed, from end to start. */
1061}
1062
1063/* Return 1 and store the current item ID at 'id' if there are still
1064 * elements within the iteration range, otherwise return 0 in order to
1065 * signal the iteration terminated. */
1066int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) {
1067 while(1) { /* Will stop when element > stop_key or end of radix tree. */
1068 /* If the current listpack is set to NULL, this is the start of the
1069 * iteration or the previous listpack was completely iterated.
1070 * Go to the next node. */
1071 if (si->lp == NULL((void*)0) || si->lp_ele == NULL((void*)0)) {
1072 if (!si->rev && !raxNext(&si->ri)) return 0;
1073 else if (si->rev && !raxPrev(&si->ri)) return 0;
1074 serverAssert(si->ri.key_len == sizeof(streamID))((si->ri.key_len == sizeof(streamID))?(void)0 : (_serverAssert
("si->ri.key_len == sizeof(streamID)","t_stream.c",1074),__builtin_unreachable
()))
;
1075 /* Get the master ID. */
1076 streamDecodeID(si->ri.key,&si->master_id);
1077 /* Get the master fields count. */
1078 si->lp = si->ri.data;
1079 si->lp_ele = lpFirst(si->lp); /* Seek items count */
1080 si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek deleted count. */
1081 si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek num fields. */
1082 si->master_fields_count = lpGetInteger(si->lp_ele)lpGetIntegerIfValid(si->lp_ele, ((void*)0));
1083 si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek first field. */
1084 si->master_fields_start = si->lp_ele;
1085 /* We are now pointing to the first field of the master entry.
1086 * We need to seek either the first or the last entry depending
1087 * on the direction of the iteration. */
1088 if (!si->rev) {
1089 /* If we are iterating in normal order, skip the master fields
1090 * to seek the first actual entry. */
1091 for (uint64_t i = 0; i < si->master_fields_count; i++)
1092 si->lp_ele = lpNext(si->lp,si->lp_ele);
1093 } else {
1094 /* If we are iterating in reverse direction, just seek the
1095 * last part of the last entry in the listpack (that is, the
1096 * fields count). */
1097 si->lp_ele = lpLast(si->lp);
1098 }
1099 } else if (si->rev) {
1100 /* If we are iterating in the reverse order, and this is not
1101 * the first entry emitted for this listpack, then we already
1102 * emitted the current entry, and have to go back to the previous
1103 * one. */
1104 int lp_count = lpGetInteger(si->lp_ele)lpGetIntegerIfValid(si->lp_ele, ((void*)0));
1105 while(lp_count--) si->lp_ele = lpPrev(si->lp,si->lp_ele);
1106 /* Seek lp-count of prev entry. */
1107 si->lp_ele = lpPrev(si->lp,si->lp_ele);
1108 }
1109
1110 /* For every radix tree node, iterate the corresponding listpack,
1111 * returning elements when they are within range. */
1112 while(1) {
1113 if (!si->rev) {
1114 /* If we are going forward, skip the previous entry
1115 * lp-count field (or in case of the master entry, the zero
1116 * term field) */
1117 si->lp_ele = lpNext(si->lp,si->lp_ele);
1118 if (si->lp_ele == NULL((void*)0)) break;
1119 } else {
1120 /* If we are going backward, read the number of elements this
1121 * entry is composed of, and jump backward N times to seek
1122 * its start. */
1123 int64_t lp_count = lpGetInteger(si->lp_ele)lpGetIntegerIfValid(si->lp_ele, ((void*)0));
1124 if (lp_count == 0) { /* We reached the master entry. */
1125 si->lp = NULL((void*)0);
1126 si->lp_ele = NULL((void*)0);
1127 break;
1128 }
1129 while(lp_count--) si->lp_ele = lpPrev(si->lp,si->lp_ele);
1130 }
1131
1132 /* Get the flags entry. */
1133 si->lp_flags = si->lp_ele;
1134 int flags = lpGetInteger(si->lp_ele)lpGetIntegerIfValid(si->lp_ele, ((void*)0));
1135 si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek ID. */
1136
1137 /* Get the ID: it is encoded as difference between the master
1138 * ID and this entry ID. */
1139 *id = si->master_id;
1140 id->ms += lpGetInteger(si->lp_ele)lpGetIntegerIfValid(si->lp_ele, ((void*)0));
1141 si->lp_ele = lpNext(si->lp,si->lp_ele);
1142 id->seq += lpGetInteger(si->lp_ele)lpGetIntegerIfValid(si->lp_ele, ((void*)0));
1143 si->lp_ele = lpNext(si->lp,si->lp_ele);
1144 unsigned char buf[sizeof(streamID)];
1145 streamEncodeID(buf,id);
1146
1147 /* The number of entries is here or not depending on the
1148 * flags. */
1149 if (flags & STREAM_ITEM_FLAG_SAMEFIELDS(1<<1)) {
1150 *numfields = si->master_fields_count;
1151 } else {
1152 *numfields = lpGetInteger(si->lp_ele)lpGetIntegerIfValid(si->lp_ele, ((void*)0));
1153 si->lp_ele = lpNext(si->lp,si->lp_ele);
1154 }
1155 serverAssert(*numfields>=0)((*numfields>=0)?(void)0 : (_serverAssert("*numfields>=0"
,"t_stream.c",1155),__builtin_unreachable()))
;
1156
1157 /* If current >= start, and the entry is not marked as
1158 * deleted, emit it. */
1159 if (!si->rev) {
1160 if (memcmp(buf,si->start_key,sizeof(streamID)) >= 0 &&
1161 !(flags & STREAM_ITEM_FLAG_DELETED(1<<0)))
1162 {
1163 if (memcmp(buf,si->end_key,sizeof(streamID)) > 0)
1164 return 0; /* We are already out of range. */
1165 si->entry_flags = flags;
1166 if (flags & STREAM_ITEM_FLAG_SAMEFIELDS(1<<1))
1167 si->master_fields_ptr = si->master_fields_start;
1168 return 1; /* Valid item returned. */
1169 }
1170 } else {
1171 if (memcmp(buf,si->end_key,sizeof(streamID)) <= 0 &&
1172 !(flags & STREAM_ITEM_FLAG_DELETED(1<<0)))
1173 {
1174 if (memcmp(buf,si->start_key,sizeof(streamID)) < 0)
1175 return 0; /* We are already out of range. */
1176 si->entry_flags = flags;
1177 if (flags & STREAM_ITEM_FLAG_SAMEFIELDS(1<<1))
1178 si->master_fields_ptr = si->master_fields_start;
1179 return 1; /* Valid item returned. */
1180 }
1181 }
1182
1183 /* If we do not emit, we have to discard if we are going
1184 * forward, or seek the previous entry if we are going
1185 * backward. */
1186 if (!si->rev) {
1187 int64_t to_discard = (flags & STREAM_ITEM_FLAG_SAMEFIELDS(1<<1)) ?
1188 *numfields : *numfields*2;
1189 for (int64_t i = 0; i < to_discard; i++)
1190 si->lp_ele = lpNext(si->lp,si->lp_ele);
1191 } else {
1192 int64_t prev_times = 4; /* flag + id ms + id seq + one more to
1193 go back to the previous entry "count"
1194 field. */
1195 /* If the entry was not flagged SAMEFIELD we also read the
1196 * number of fields, so go back one more. */
1197 if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS(1<<1))) prev_times++;
1198 while(prev_times--) si->lp_ele = lpPrev(si->lp,si->lp_ele);
1199 }
1200 }
1201
1202 /* End of listpack reached. Try the next/prev radix tree node. */
1203 }
1204}
1205
1206/* Get the field and value of the current item we are iterating. This should
1207 * be called immediately after streamIteratorGetID(), and for each field
1208 * according to the number of fields returned by streamIteratorGetID().
1209 * The function populates the field and value pointers and the corresponding
1210 * lengths by reference, that are valid until the next iterator call, assuming
1211 * no one touches the stream meanwhile. */
1212void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen) {
1213 if (si->entry_flags & STREAM_ITEM_FLAG_SAMEFIELDS(1<<1)) {
1214 *fieldptr = lpGet(si->master_fields_ptr,fieldlen,si->field_buf);
1215 si->master_fields_ptr = lpNext(si->lp,si->master_fields_ptr);
1216 } else {
1217 *fieldptr = lpGet(si->lp_ele,fieldlen,si->field_buf);
1218 si->lp_ele = lpNext(si->lp,si->lp_ele);
1219 }
1220 *valueptr = lpGet(si->lp_ele,valuelen,si->value_buf);
1221 si->lp_ele = lpNext(si->lp,si->lp_ele);
1222}
1223
1224/* Remove the current entry from the stream: can be called after the
1225 * GetID() API or after any GetField() call, however we need to iterate
1226 * a valid entry while calling this function. Moreover the function
1227 * requires the entry ID we are currently iterating, that was previously
1228 * returned by GetID().
1229 *
1230 * Note that after calling this function, next calls to GetField() can't
1231 * be performed: the entry is now deleted. Instead the iterator will
1232 * automatically re-seek to the next entry, so the caller should continue
1233 * with GetID(). */
1234void streamIteratorRemoveEntry(streamIterator *si, streamID *current) {
1235 unsigned char *lp = si->lp;
1236 int64_t aux;
1237
1238 /* We do not really delete the entry here. Instead we mark it as
1239 * deleted flagging it, and also incrementing the count of the
1240 * deleted entries in the listpack header.
1241 *
1242 * We start flagging: */
1243 int flags = lpGetInteger(si->lp_flags)lpGetIntegerIfValid(si->lp_flags, ((void*)0));
1244 flags |= STREAM_ITEM_FLAG_DELETED(1<<0);
1245 lp = lpReplaceInteger(lp,&si->lp_flags,flags);
1246
1247 /* Change the valid/deleted entries count in the master entry. */
1248 unsigned char *p = lpFirst(lp);
1249 aux = lpGetInteger(p)lpGetIntegerIfValid(p, ((void*)0));
1250
1251 if (aux == 1) {
1252 /* If this is the last element in the listpack, we can remove the whole
1253 * node. */
1254 lpFree(lp);
1255 raxRemove(si->stream->rax,si->ri.key,si->ri.key_len,NULL((void*)0));
1256 } else {
1257 /* In the base case we alter the counters of valid/deleted entries. */
1258 lp = lpReplaceInteger(lp,&p,aux-1);
1259 p = lpNext(lp,p); /* Seek deleted field. */
1260 aux = lpGetInteger(p)lpGetIntegerIfValid(p, ((void*)0));
1261 lp = lpReplaceInteger(lp,&p,aux+1);
1262
1263 /* Update the listpack with the new pointer. */
1264 if (si->lp != lp)
1265 raxInsert(si->stream->rax,si->ri.key,si->ri.key_len,lp,NULL((void*)0));
1266 }
1267
1268 /* Update the number of entries counter. */
1269 si->stream->length--;
1270
1271 /* Re-seek the iterator to fix the now messed up state. */
1272 streamID start, end;
1273 if (si->rev) {
1274 streamDecodeID(si->start_key,&start);
1275 end = *current;
1276 } else {
1277 start = *current;
1278 streamDecodeID(si->end_key,&end);
1279 }
1280 streamIteratorStop(si);
1281 streamIteratorStart(si,si->stream,&start,&end,si->rev);
1282
1283 /* TODO: perform a garbage collection here if the ratio between
1284 * deleted and valid goes over a certain limit. */
1285}
1286
1287/* Stop the stream iterator. The only cleanup we need is to free the rax
1288 * iterator, since the stream iterator itself is supposed to be stack
1289 * allocated. */
1290void streamIteratorStop(streamIterator *si) {
1291 raxStop(&si->ri);
1292}
1293
1294/* Delete the specified item ID from the stream, returning 1 if the item
1295 * was deleted 0 otherwise (if it does not exist). */
1296int streamDeleteItem(stream *s, streamID *id) {
1297 int deleted = 0;
1298 streamIterator si;
1299 streamIteratorStart(&si,s,id,id,0);
1300 streamID myid;
1301 int64_t numfields;
1302 if (streamIteratorGetID(&si,&myid,&numfields)) {
1303 streamIteratorRemoveEntry(&si,&myid);
1304 deleted = 1;
1305 }
1306 streamIteratorStop(&si);
1307 return deleted;
1308}
1309
1310/* Get the last valid (non-tombstone) streamID of 's'. */
1311void streamLastValidID(stream *s, streamID *maxid)
1312{
1313 streamIterator si;
1314 streamIteratorStart(&si,s,NULL((void*)0),NULL((void*)0),1);
1315 int64_t numfields;
1316 streamIteratorGetID(&si,maxid,&numfields);
1317 streamIteratorStop(&si);
1318}
1319
1320/* Emit a reply in the client output buffer by formatting a Stream ID
1321 * in the standard <ms>-<seq> format, using the simple string protocol
1322 * of REPL. */
1323void addReplyStreamID(client *c, streamID *id) {
1324 sds replyid = sdscatfmt(sdsempty(),"%U-%U",id->ms,id->seq);
1325 addReplyBulkSds(c,replyid);
1326}
1327
1328void setDeferredReplyStreamID(client *c, void *dr, streamID *id) {
1329 sds replyid = sdscatfmt(sdsempty(),"%U-%U",id->ms,id->seq);
1330 setDeferredReplyBulkSds(c, dr, replyid);
1331}
1332
1333/* Similar to the above function, but just creates an object, usually useful
1334 * for replication purposes to create arguments. */
1335robj *createObjectFromStreamID(streamID *id) {
1336 return createObject(OBJ_STRING0, sdscatfmt(sdsempty(),"%U-%U",
1337 id->ms,id->seq));
1338}
1339
1340/* As a result of an explicit XCLAIM or XREADGROUP command, new entries
1341 * are created in the pending list of the stream and consumers. We need
1342 * to propagate this changes in the form of XCLAIM commands. */
1343void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupname, robj *id, streamNACK *nack) {
1344 /* We need to generate an XCLAIM that will work in an idempotent fashion:
1345 *
1346 * XCLAIM <key> <group> <consumer> 0 <id> TIME <milliseconds-unix-time>
1347 * RETRYCOUNT <count> FORCE JUSTID LASTID <id>.
1348 *
1349 * Note that JUSTID is useful in order to avoid that XCLAIM will do
1350 * useless work in the slave side, trying to fetch the stream item. */
1351 robj *argv[14];
1352 argv[0] = shared.xclaim;
1353 argv[1] = key;
1354 argv[2] = groupname;
1355 argv[3] = createStringObject(nack->consumer->name,sdslen(nack->consumer->name));
1356 argv[4] = shared.integers[0];
1357 argv[5] = id;
1358 argv[6] = shared.time;
1359 argv[7] = createStringObjectFromLongLong(nack->delivery_time);
1360 argv[8] = shared.retrycount;
1361 argv[9] = createStringObjectFromLongLong(nack->delivery_count);
1362 argv[10] = shared.force;
1363 argv[11] = shared.justid;
1364 argv[12] = shared.lastid;
1365 argv[13] = createObjectFromStreamID(&group->last_id);
1366
1367 /* We use propagate() because this code path is not always called from
1368 * the command execution context. Moreover this will just alter the
1369 * consumer group state, and we don't need MULTI/EXEC wrapping because
1370 * there is no message state cross-message atomicity required. */
1371 propagate(server.xclaimCommand,c->db->id,argv,14,PROPAGATE_AOF1|PROPAGATE_REPL2);
1372 decrRefCount(argv[3]);
1373 decrRefCount(argv[7]);
1374 decrRefCount(argv[9]);
1375 decrRefCount(argv[13]);
1376}
1377
1378/* We need this when we want to propagate the new last-id of a consumer group
1379 * that was consumed by XREADGROUP with the NOACK option: in that case we can't
1380 * propagate the last ID just using the XCLAIM LASTID option, so we emit
1381 *
1382 * XGROUP SETID <key> <groupname> <id>
1383 */
1384void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupname) {
1385 robj *argv[5];
1386 argv[0] = shared.xgroup;
1387 argv[1] = shared.setid;
1388 argv[2] = key;
1389 argv[3] = groupname;
1390 argv[4] = createObjectFromStreamID(&group->last_id);
1391
1392 /* We use propagate() because this code path is not always called from
1393 * the command execution context. Moreover this will just alter the
1394 * consumer group state, and we don't need MULTI/EXEC wrapping because
1395 * there is no message state cross-message atomicity required. */
1396 propagate(server.xgroupCommand,c->db->id,argv,5,PROPAGATE_AOF1|PROPAGATE_REPL2);
1397 decrRefCount(argv[4]);
1398}
1399
1400/* We need this when we want to propagate creation of consumer that was created
1401 * by XREADGROUP with the NOACK option. In that case, the only way to create
1402 * the consumer at the replica is by using XGROUP CREATECONSUMER (see issue #7140)
1403 *
1404 * XGROUP CREATECONSUMER <key> <groupname> <consumername>
1405 */
1406void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds consumername) {
1407 robj *argv[5];
1408 argv[0] = shared.xgroup;
1409 argv[1] = shared.createconsumer;
1410 argv[2] = key;
1411 argv[3] = groupname;
1412 argv[4] = createObject(OBJ_STRING0,sdsdup(consumername));
1413
1414 /* We use propagate() because this code path is not always called from
1415 * the command execution context. Moreover this will just alter the
1416 * consumer group state, and we don't need MULTI/EXEC wrapping because
1417 * there is no message state cross-message atomicity required. */
1418 propagate(server.xgroupCommand,c->db->id,argv,5,PROPAGATE_AOF1|PROPAGATE_REPL2);
1419 decrRefCount(argv[4]);
1420}
1421
1422/* Send the stream items in the specified range to the client 'c'. The range
1423 * the client will receive is between start and end inclusive, if 'count' is
1424 * non zero, no more than 'count' elements are sent.
1425 *
1426 * The 'end' pointer can be NULL to mean that we want all the elements from
1427 * 'start' till the end of the stream. If 'rev' is non zero, elements are
1428 * produced in reversed order from end to start.
1429 *
1430 * The function returns the number of entries emitted.
1431 *
1432 * If group and consumer are not NULL, the function performs additional work:
1433 * 1. It updates the last delivered ID in the group in case we are
1434 * sending IDs greater than the current last ID.
1435 * 2. If the requested IDs are already assigned to some other consumer, the
1436 * function will not return it to the client.
1437 * 3. An entry in the pending list will be created for every entry delivered
1438 * for the first time to this consumer.
1439 *
1440 * The behavior may be modified passing non-zero flags:
1441 *
1442 * STREAM_RWR_NOACK: Do not create PEL entries, that is, the point "3" above
1443 * is not performed.
1444 * STREAM_RWR_RAWENTRIES: Do not emit array boundaries, but just the entries,
1445 * and return the number of entries emitted as usually.
1446 * This is used when the function is just used in order
1447 * to emit data and there is some higher level logic.
1448 *
1449 * The final argument 'spi' (stream propagation info pointer) is a structure
1450 * filled with information needed to propagate the command execution to AOF
1451 * and slaves, in the case a consumer group was passed: we need to generate
1452 * XCLAIM commands to create the pending list into AOF/slaves in that case.
1453 *
1454 * If 'spi' is set to NULL no propagation will happen even if the group was
1455 * given, but currently such a feature is never used by the code base that
1456 * will always pass 'spi' and propagate when a group is passed.
1457 *
1458 * Note that this function is recursive in certain cases. When it's called
1459 * with a non NULL group and consumer argument, it may call
1460 * streamReplyWithRangeFromConsumerPEL() in order to get entries from the
1461 * consumer pending entries list. However such a function will then call
1462 * streamReplyWithRange() in order to emit single entries (found in the
1463 * PEL by ID) to the client. This is the use case for the STREAM_RWR_RAWENTRIES
1464 * flag.
1465 */
1466#define STREAM_RWR_NOACK(1<<0) (1<<0) /* Do not create entries in the PEL. */
1467#define STREAM_RWR_RAWENTRIES(1<<1) (1<<1) /* Do not emit protocol for array
1468 boundaries, just the entries. */
1469#define STREAM_RWR_HISTORY(1<<2) (1<<2) /* Only serve consumer local PEL. */
/* Emit to client 'c' the entries of stream 's' that the stream iterator
 * yields for the range 'start'..'end' ('rev' is forwarded to the iterator),
 * stopping after 'count' entries when 'count' is non-zero.
 *
 * When 'group' is given, its last_id is advanced as newer entries are seen
 * and, unless STREAM_RWR_NOACK is set in 'flags', a NACK for every emitted
 * entry is registered in both the group PEL and the 'consumer' PEL. When
 * 'spi' is non-NULL those changes are also propagated. With
 * STREAM_RWR_HISTORY (and a group) the work is delegated to
 * streamReplyWithRangeFromConsumerPEL(). With STREAM_RWR_RAWENTRIES no
 * enclosing array length is emitted.
 *
 * Returns the number of entries emitted. */
1470size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi) {
1471 void *arraylen_ptr = NULL((void*)0);
1472 size_t arraylen = 0;
1473 streamIterator si;
1474 int64_t numfields;
1475 streamID id;
1476 int propagate_last_id = 0;
1477 int noack = flags & STREAM_RWR_NOACK(1<<0);
1478
1479 /* If the client is asking for some history, we serve it using a
1480 * different function, so that we return entries *solely* from its
1481 * own PEL. This ensures each consumer will always and only see
1482 * the history of messages delivered to it and not yet confirmed
1483 * as delivered. */
1484 if (group && (flags & STREAM_RWR_HISTORY(1<<2))) {
1485 return streamReplyWithRangeFromConsumerPEL(c,s,start,end,count,
1486 consumer);
1487 }
1488
 /* The number of entries is only known once the iteration is over, so
  * the enclosing array length is deferred. In raw-entries mode no
  * enclosing array is emitted at all and arraylen_ptr stays NULL. */
1489 if (!(flags & STREAM_RWR_RAWENTRIES(1<<1)))
1490 arraylen_ptr = addReplyDeferredLen(c);
1491 streamIteratorStart(&si,s,start,end,rev);
1492 while(streamIteratorGetID(&si,&id,&numfields)) {
1493 /* Update the group last_id if needed. */
1494 if (group && streamCompareID(&id,&group->last_id) > 0) {
1495 group->last_id = id;
1496 /* Group last ID should be propagated only if NOACK was
1497 * specified, otherwise the last id will be included
1498 * in the propagation of XCLAIM itself. */
1499 if (noack) propagate_last_id = 1;
1500 }
1501
1502 /* Emit a two elements array for each item. The first is
1503 * the ID, the second is an array of field-value pairs. */
1504 addReplyArrayLen(c,2);
1505 addReplyStreamID(c,&id);
1506
 /* Each field contributes two elements: its name and its value. */
1507 addReplyArrayLen(c,numfields*2);
1508
1509 /* Emit the field-value pairs. */
1510 while(numfields--) {
1511 unsigned char *key, *value;
1512 int64_t key_len, value_len;
1513 streamIteratorGetField(&si,&key,&value,&key_len,&value_len);
1514 addReplyBulkCBuffer(c,key,key_len);
1515 addReplyBulkCBuffer(c,value,value_len);
1516 }
1517
1518 /* If a group is passed, we need to create an entry in the
1519 * PEL (pending entries list) of this group *and* this consumer.
1520 *
1521 * Note that we cannot be sure about the fact the message is not
1522 * already owned by another consumer, because the admin is able
1523 * to change the consumer group last delivered ID using the
1524 * XGROUP SETID command. So if we find that there is already
1525 * a NACK for the entry, we need to associate it to the new
1526 * consumer. */
1527 if (group && !noack) {
 /* PEL entries are keyed by the encoded form of the entry ID. */
1528 unsigned char buf[sizeof(streamID)];
1529 streamEncodeID(buf,&id);
1530
1531 /* Try to add a new NACK. Most of the time this will work and
1532 * will not require extra lookups. We'll fix the problem later
1533 * if we find that there is already a entry for this ID. */
1534 streamNACK *nack = streamCreateNACK(consumer);
1535 int group_inserted =
1536 raxTryInsert(group->pel,buf,sizeof(buf),nack,NULL((void*)0));
1537 int consumer_inserted =
1538 raxTryInsert(consumer->pel,buf,sizeof(buf),nack,NULL((void*)0));
1539
1540 /* Now we can check if the entry was already busy, and
1541 * in that case reassign the entry to the new consumer,
1542 * or update it if the consumer is the same as before. */
1543 if (group_inserted == 0) {
 /* The freshly created NACK is not needed: reuse the one
  * already stored in the group PEL, detaching it from the
  * PEL of its previous owner first. */
1544 streamFreeNACK(nack);
1545 nack = raxFind(group->pel,buf,sizeof(buf));
1546 serverAssert(nack != raxNotFound)((nack != raxNotFound)?(void)0 : (_serverAssert("nack != raxNotFound"
,"t_stream.c",1546),__builtin_unreachable()))
;
1547 raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL((void*)0));
1548 /* Update the consumer and NACK metadata. */
1549 nack->consumer = consumer;
1550 nack->delivery_time = mstime();
1551 nack->delivery_count = 1;
1552 /* Add the entry in the new consumer local PEL. */
1553 raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL((void*)0));
1554 } else if (group_inserted == 1 && consumer_inserted == 0) {
1555 serverPanic("NACK half-created. Should not be possible.")_serverPanic("t_stream.c",1555,"NACK half-created. Should not be possible."
),__builtin_unreachable()
;
1556 }
1557
1558 /* Propagate as XCLAIM. */
1559 if (spi) {
1560 robj *idarg = createObjectFromStreamID(&id);
1561 streamPropagateXCLAIM(c,spi->keyname,group,spi->groupname,idarg,nack);
1562 decrRefCount(idarg);
1563 }
1564 }
1565
 /* A 'count' of zero means no limit on the number of entries. */
1566 arraylen++;
1567 if (count && count == arraylen) break;
1568 }
1569
1570 if (spi && propagate_last_id)
1571 streamPropagateGroupID(c,spi->keyname,group,spi->groupname);
1572
1573 streamIteratorStop(&si);
1574 if (arraylen_ptr) setDeferredArrayLen(c,arraylen_ptr,arraylen);
1575 return arraylen;
1576}
1577
1578/* This is an helper function for streamReplyWithRange() when called with
1579 * group and consumer arguments, but with a range that is referring to already
1580 * delivered messages. In this case we just emit messages that are already
1581 * in the history of the consumer, fetching the IDs from its PEL.
1582 *
1583 * Note that this function does not have a 'rev' argument because it's not
1584 * possible to iterate in reverse using a group. Basically this function
1585 * is only called as a result of the XREADGROUP command.
1586 *
1587 * This function is more expensive because it needs to inspect the PEL and then
1588 * seek into the radix tree of the messages in order to emit the full message
1589 * to the client. However clients only reach this code path when they are
1590 * fetching the history of already retrieved messages, which is rare. */
1591size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer) {
1592 raxIterator ri;
1593 unsigned char startkey[sizeof(streamID)];
1594 unsigned char endkey[sizeof(streamID)];
1595 streamEncodeID(startkey,start);
1596 if (end) streamEncodeID(endkey,end);
1597
1598 size_t arraylen = 0;
1599 void *arraylen_ptr = addReplyDeferredLen(c);
1600 raxStart(&ri,consumer->pel);
1601 raxSeek(&ri,">=",startkey,sizeof(startkey));
1602 while(raxNext(&ri) && (!count || arraylen < count)) {
1603 if (end && memcmp(ri.key,end,ri.key_len) > 0) break;
1604 streamID thisid;
1605 streamDecodeID(ri.key,&thisid);
1606 if (streamReplyWithRange(c,s,&thisid,&thisid,1,0,NULL((void*)0),NULL((void*)0),
1607 STREAM_RWR_RAWENTRIES(1<<1),NULL((void*)0)) == 0)
1608 {
1609 /* Note that we may have a not acknowledged entry in the PEL
1610 * about a message that's no longer here because was removed
1611 * by the user by other means. In that case we signal it emitting
1612 * the ID but then a NULL entry for the fields. */
1613 addReplyArrayLen(c,2);
1614 addReplyStreamID(c,&thisid);
1615 addReplyNullArray(c);
1616 } else {
1617 streamNACK *nack = ri.data;
1618 nack->delivery_time = mstime();
1619 nack->delivery_count++;
1620 }
1621 arraylen++;
1622 }
1623 raxStop(&ri);
1624 setDeferredArrayLen(c,arraylen_ptr,arraylen);
1625 return arraylen;
1626}
1627
1628/* -----------------------------------------------------------------------
1629 * Stream commands implementation
1630 * ----------------------------------------------------------------------- */
1631
1632/* Look the stream at 'key' and return the corresponding stream object.
1633 * The function creates a key setting it to an empty stream if needed. */
1634robj *streamTypeLookupWriteOrCreate(client *c, robj *key, int no_create) {
1635 robj *o = lookupKeyWrite(c->db,key);
1636 if (checkType(c,o,OBJ_STREAM6)) return NULL((void*)0);
1637 if (o == NULL((void*)0)) {
1638 if (no_create) {
1639 addReplyNull(c);
1640 return NULL((void*)0);
1641 }
1642 o = createStreamObject();
1643 dbAdd(c->db,key,o);
1644 }
1645 return o;
1646}
1647
1648/* Parse a stream ID in the format given by clients to Redis, that is
1649 * <ms>-<seq>, and converts it into a streamID structure. If
1650 * the specified ID is invalid C_ERR is returned and an error is reported
1651 * to the client, otherwise C_OK is returned. The ID may be in incomplete
1652 * form, just stating the milliseconds time part of the stream. In such a case
1653 * the missing part is set according to the value of 'missing_seq' parameter.
1654 *
1655 * The IDs "-" and "+" specify respectively the minimum and maximum IDs
1656 * that can be represented. If 'strict' is set to 1, "-" and "+" will be
1657 * treated as an invalid ID.
1658 *
1659 * If 'c' is set to NULL, no reply is sent to the client. */
1660int streamGenericParseIDOrReply(client *c, const robj *o, streamID *id, uint64_t missing_seq, int strict) {
1661 char buf[128];
1662 if (sdslen(o->ptr) > sizeof(buf)-1) goto invalid;
1663 memcpy(buf,o->ptr,sdslen(o->ptr)+1);
1664
1665 if (strict && (buf[0] == '-' || buf[0] == '+') && buf[1] == '\0')
1666 goto invalid;
1667
1668 /* Handle the "-" and "+" special cases. */
1669 if (buf[0] == '-' && buf[1] == '\0') {
1670 id->ms = 0;
1671 id->seq = 0;
1672 return C_OK0;
1673 } else if (buf[0] == '+' && buf[1] == '\0') {
1674 id->ms = UINT64_MAX(18446744073709551615UL);
1675 id->seq = UINT64_MAX(18446744073709551615UL);
1676 return C_OK0;
1677 }
1678
1679 /* Parse <ms>-<seq> form. */
1680 char *dot = strchr(buf,'-');
1681 if (dot) *dot = '\0';
1682 unsigned long long ms, seq;
1683 if (string2ull(buf,&ms) == 0) goto invalid;
1684 if (dot && string2ull(dot+1,&seq) == 0) goto invalid;
1685 if (!dot) seq = missing_seq;
1686 id->ms = ms;
1687 id->seq = seq;
1688 return C_OK0;
1689
1690invalid:
1691 if (c) addReplyError(c,"Invalid stream ID specified as stream "
1692 "command argument");
1693 return C_ERR-1;
1694}
1695
1696/* Wrapper for streamGenericParseIDOrReply() used by module API. */
1697int streamParseID(const robj *o, streamID *id) {
1698 return streamGenericParseIDOrReply(NULL((void*)0), o, id, 0, 0);
1699}
1700
1701/* Wrapper for streamGenericParseIDOrReply() with 'strict' argument set to
1702 * 0, to be used when - and + are acceptable IDs. */
1703int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) {
1704 return streamGenericParseIDOrReply(c,o,id,missing_seq,0);
1705}
1706
1707/* Wrapper for streamGenericParseIDOrReply() with 'strict' argument set to
1708 * 1, to be used when we want to return an error if the special IDs + or -
1709 * are provided. */
1710int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) {
1711 return streamGenericParseIDOrReply(c,o,id,missing_seq,1);
1712}
1713
1714/* Helper for parsing a stream ID that is a range query interval. When the
1715 * exclude argument is NULL, streamParseIDOrReply() is called and the interval
1716 * is treated as close (inclusive). Otherwise, the exclude argument is set if
1717 * the interval is open (the "(" prefix) and streamParseStrictIDOrReply() is
1718 * called in that case.
1719 */
1720int streamParseIntervalIDOrReply(client *c, robj *o, streamID *id, int *exclude, uint64_t missing_seq) {
1721 char *p = o->ptr;
1722 size_t len = sdslen(p);
1723 int invalid = 0;
1724
1725 if (exclude != NULL((void*)0)) *exclude = (len > 1 && p[0] == '(');
1726 if (exclude != NULL((void*)0) && *exclude) {
1727 robj *t = createStringObject(p+1,len-1);
1728 invalid = (streamParseStrictIDOrReply(c,t,id,missing_seq) == C_ERR-1);
1729 decrRefCount(t);
1730 } else
1731 invalid = (streamParseIDOrReply(c,o,id,missing_seq) == C_ERR-1);
1732 if (invalid)
1733 return C_ERR-1;
1734 return C_OK0;
1735}
1736
1737void streamRewriteApproxSpecifier(client *c, int idx) {
1738 rewriteClientCommandArgument(c,idx,shared.special_equals);
1739}
1740
1741/* We propagate MAXLEN/MINID ~ <count> as MAXLEN/MINID = <resulting-len-of-stream>
1742 * otherwise trimming is no longer deterministic on replicas / AOF. */
1743void streamRewriteTrimArgument(client *c, stream *s, int trim_strategy, int idx) {
1744 robj *arg;
1745 if (trim_strategy == TRIM_STRATEGY_MAXLEN1) {
1746 arg = createStringObjectFromLongLong(s->length);
1747 } else {
1748 streamID first_id;
1749 streamGetEdgeID(s, 1, &first_id);
1750 arg = createObjectFromStreamID(&first_id);
1751 }
1752
1753 rewriteClientCommandArgument(c,idx,arg);
1754 decrRefCount(arg);
1755}
1756
1757/* XADD key [(MAXLEN [~|=] <count> | MINID [~|=] <id>) [LIMIT <entries>]] [NOMKSTREAM] <ID or *> [field value] [field value] ...
 *
 * Parses the options, looks up (or creates) the stream at the key, appends
 * the entry, replies with the resulting ID, optionally trims the stream and
 * finally rewrites the command arguments so that what gets propagated to
 * AOF / replicas is deterministic. */
1758void xaddCommand(client *c) {
1759 /* Parse options. */
1760 streamAddTrimArgs parsed_args;
1761 int idpos = streamParseAddOrTrimArgsOrReply(c, &parsed_args, 1);
1762 if (idpos < 0)
1763 return; /* streamParseAddOrTrimArgsOrReply already replied. */
1764 int field_pos = idpos+1; /* The ID is always one argument before the first field */
1765
1766 /* Check arity: at least one field-value pair must follow the ID, and
 * the remaining arguments must come in pairs. */
1767 if ((c->argc - field_pos) < 2 || ((c->argc-field_pos) % 2) == 1) {
1768 addReplyError(c,"wrong number of arguments for XADD");
1769 return;
1770 }
1771
1772 /* Return ASAP if minimal ID (0-0) was given so we avoid possibly creating
1773 * a new stream and have streamAppendItem fail, leaving an empty key in the
1774 * database. */
1775 if (parsed_args.id_given &&
1776 parsed_args.id.ms == 0 && parsed_args.id.seq == 0)
1777 {
1778 addReplyError(c,"The ID specified in XADD must be greater than 0-0");
1779 return;
1780 }
1781
1782 /* Lookup the stream at key. When NULL is returned the lookup function
 * has either rejected the key type or already sent a null reply
 * (missing key with NOMKSTREAM), so there is nothing left to do. */
1783 robj *o;
1784 stream *s;
1785 if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1],parsed_args.no_mkstream)) == NULL((void*)0)) return;
1786 s = o->ptr;
1787
1788 /* Return ASAP if the stream has reached the last possible ID */
1789 if (s->last_id.ms == UINT64_MAX(18446744073709551615UL) && s->last_id.seq == UINT64_MAX(18446744073709551615UL)) {
1790 addReplyError(c,"The stream has exhausted the last possible ID, "
1791 "unable to add more items");
1792 return;
1793 }
1794
1795 /* Append using the low level function and return the ID. The last
 * argument is the user supplied ID, or NULL when none was given. */
1796 streamID id;
1797 if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2,
1798 &id, parsed_args.id_given ? &parsed_args.id : NULL((void*)0))
1799 == C_ERR-1)
1800 {
1801 addReplyError(c,"The ID specified in XADD is equal or smaller than the "
1802 "target stream top item");
1803 return;
1804 }
1805 addReplyStreamID(c,&id);
1806
1807 signalModifiedKey(c,c->db,c->argv[1]);
1808 notifyKeyspaceEvent(NOTIFY_STREAM(1<<10),"xadd",c->argv[1],c->db->id);
1809 server.dirty++;
1810
1811 /* Trim if needed. */
1812 if (parsed_args.trim_strategy != TRIM_STRATEGY_NONE0) {
1813 if (streamTrim(s, &parsed_args)) {
1814 notifyKeyspaceEvent(NOTIFY_STREAM(1<<10),"xtrim",c->argv[1],c->db->id);
1815 }
1816 if (parsed_args.approx_trim) {
1817 /* In case our trimming was limited (by LIMIT or by ~) we must
1818 * re-write the relevant trim argument to make sure there will be
1819 * no inconsistencies in AOF loading or in the replica.
1820 * It's enough to check only args->approx because there is no
1821 * way LIMIT is given without the ~ option. */
1822 streamRewriteApproxSpecifier(c,parsed_args.trim_strategy_arg_idx-1);
1823 streamRewriteTrimArgument(c,s,parsed_args.trim_strategy,parsed_args.trim_strategy_arg_idx);
1824 }
1825 }
1826
1827 /* Let's rewrite the ID argument with the one actually generated for
1828 * AOF/replication propagation. */
1829 robj *idarg = createObjectFromStreamID(&id);
1830 rewriteClientCommandArgument(c,idpos,idarg);
1831 decrRefCount(idarg);
1832
1833 /* We need to signal to blocked clients that there is new data on this
1834 * stream. */
1835 signalKeyAsReady(c->db, c->argv[1], OBJ_STREAM6);
1836}
1837
1838/* XRANGE/XREVRANGE actual implementation.
1839 * The 'start' and 'end' IDs are parsed as follows:
1840 * Incomplete 'start' has its sequence set to 0, and 'end' to UINT64_MAX.
1841 * "-" and "+"" mean the minimal and maximal ID values, respectively.
1842 * The "(" prefix means an open (exclusive) range, so XRANGE stream (1-0 (2-0
1843 * will match anything from 1-1 and 1-UINT64_MAX.
1844 */
1845void xrangeGenericCommand(client *c, int rev) {
1846 robj *o;
1847 stream *s;
1848 streamID startid, endid;
1849 long long count = -1;
1850 robj *startarg = rev ? c->argv[3] : c->argv[2];
1851 robj *endarg = rev ? c->argv[2] : c->argv[3];
1852 int startex = 0, endex = 0;
1853
1854 /* Parse start and end IDs. */
1855 if (streamParseIntervalIDOrReply(c,startarg,&startid,&startex,0) != C_OK0)
1856 return;
1857 if (startex && streamIncrID(&startid) != C_OK0) {
1858 addReplyError(c,"invalid start ID for the interval");
1859 return;
1860 }
1861 if (streamParseIntervalIDOrReply(c,endarg,&endid,&endex,UINT64_MAX(18446744073709551615UL)) != C_OK0)
1862 return;
1863 if (endex && streamDecrID(&endid) != C_OK0) {
1864 addReplyError(c,"invalid end ID for the interval");
1865 return;
1866 }
1867
1868 /* Parse the COUNT option if any. */
1869 if (c->argc > 4) {
1870 for (int j = 4; j < c->argc; j++) {
1871 int additional = c->argc-j-1;
1872 if (strcasecmp(c->argv[j]->ptr,"COUNT") == 0 && additional >= 1) {
1873 if (getLongLongFromObjectOrReply(c,c->argv[j+1],&count,NULL((void*)0))
1874 != C_OK0) return;
1875 if (count < 0) count = 0;
1876 j++; /* Consume additional arg. */
1877 } else {
1878 addReplyErrorObject(c,shared.syntaxerr);
1879 return;
1880 }
1881 }
1882 }
1883
1884 /* Return the specified range to the user. */
1885 if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyarray)) == NULL((void*)0) ||
1886 checkType(c,o,OBJ_STREAM6)) return;
1887
1888 s = o->ptr;
1889
1890 if (count == 0) {
1891 addReplyNullArray(c);
1892 } else {
1893 if (count == -1) count = 0;
1894 streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL((void*)0),NULL((void*)0),0,NULL((void*)0));
1895 }
1896}
1897
1898/* XRANGE key start end [COUNT <n>] */
1899void xrangeCommand(client *c) {
1900 xrangeGenericCommand(c,0);
1901}
1902
1903/* XREVRANGE key end start [COUNT <n>] */
1904void xrevrangeCommand(client *c) {
1905 xrangeGenericCommand(c,1);
1906}
1907
1908/* XLEN */
1909void xlenCommand(client *c) {
1910 robj *o;
1911 if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL((void*)0)
1912 || checkType(c,o,OBJ_STREAM6)) return;
1913 stream *s = o->ptr;
1914 addReplyLongLong(c,s->length);
1915}
1916
1917/* XREAD [BLOCK <milliseconds>] [COUNT <count>] STREAMS key_1 key_2 ... key_N
1918 * ID_1 ID_2 ... ID_N
1919 *
1920 * This function also implements the XREAD-GROUP command, which is like XREAD
1921 * but accepting the [GROUP group-name consumer-name] additional option.
1922 * This is useful because while XREAD is a read command and can be called
1923 * on slaves, XREAD-GROUP is not.
 *
 * The function works in four steps: parse the options, resolve every
 * key/ID pair (and the consumer group, if any), try to serve the client
 * synchronously, and otherwise either block the client (BLOCK option) or
 * reply with a null array. */
1924#define XREAD_BLOCKED_DEFAULT_COUNT1000 1000
1925void xreadCommand(client *c) {
1926 long long timeout = -1; /* -1 means, no BLOCK argument given. */
1927 long long count = 0;
1928 int streams_count = 0;
1929 int streams_arg = 0;
1930 int noack = 0; /* True if NOACK option was specified. */
 /* The IDs live in a stack array unless more streams are requested than
  * it can hold, in which case 'ids' is heap allocated (see below) and
  * released in the cleanup section. */
1931 streamID static_ids[STREAMID_STATIC_VECTOR_LEN8];
1932 streamID *ids = static_ids;
1933 streamCG **groups = NULL((void*)0);
 /* The two commands are told apart by the length of the command name:
  * "XREADGROUP" is 10 characters long. */
1934 int xreadgroup = sdslen(c->argv[0]->ptr) == 10; /* XREAD or XREADGROUP? */
1935 robj *groupname = NULL((void*)0);
1936 robj *consumername = NULL((void*)0);
1937
1938 /* Parse arguments. Nothing is allocated yet, so errors can return
 * directly without going through the cleanup section. */
1939 for (int i = 1; i < c->argc; i++) {
1940 int moreargs = c->argc-i-1;
1941 char *o = c->argv[i]->ptr;
1942 if (!strcasecmp(o,"BLOCK") && moreargs) {
1943 if (c->flags & CLIENT_LUA(1<<8)) {
1944 /*
1945 * Although the CLIENT_DENY_BLOCKING flag should protect from blocking the client
1946 * on Lua/MULTI/RM_Call we want special treatment for Lua to keep backward compatibility.
1947 * There is no sense to use BLOCK option within Lua. */
1948 addReplyErrorFormat(c, "%s command is not allowed with BLOCK option from scripts", (char *)c->argv[0]->ptr);
1949 return;
1950 }
1951 i++;
1952 if (getTimeoutFromObjectOrReply(c,c->argv[i],&timeout,
1953 UNIT_MILLISECONDS1) != C_OK0) return;
1954 } else if (!strcasecmp(o,"COUNT") && moreargs) {
1955 i++;
1956 if (getLongLongFromObjectOrReply(c,c->argv[i],&count,NULL((void*)0)) != C_OK0)
1957 return;
1958 if (count < 0) count = 0;
1959 } else if (!strcasecmp(o,"STREAMS") && moreargs) {
 /* Everything after STREAMS is the list of keys followed by the
  * list of IDs, so this is always the last option. */
1960 streams_arg = i+1;
1961 streams_count = (c->argc-streams_arg);
1962 if ((streams_count % 2) != 0) {
1963 addReplyError(c,"Unbalanced XREAD list of streams: "
1964 "for each stream key an ID or '$' must be "
1965 "specified.");
1966 return;
1967 }
1968 streams_count /= 2; /* We have two arguments for each stream. */
1969 break;
1970 } else if (!strcasecmp(o,"GROUP") && moreargs >= 2) {
1971 if (!xreadgroup) {
1972 addReplyError(c,"The GROUP option is only supported by "
1973 "XREADGROUP. You called XREAD instead.");
1974 return;
1975 }
1976 groupname = c->argv[i+1];
1977 consumername = c->argv[i+2];
1978 i += 2;
1979 } else if (!strcasecmp(o,"NOACK")) {
1980 if (!xreadgroup) {
1981 addReplyError(c,"The NOACK option is only supported by "
1982 "XREADGROUP. You called XREAD instead.");
1983 return;
1984 }
1985 noack = 1;
1986 } else {
1987 addReplyErrorObject(c,shared.syntaxerr);
1988 return;
1989 }
1990 }
1991
1992 /* STREAMS option is mandatory. */
1993 if (streams_arg == 0) {
1994 addReplyErrorObject(c,shared.syntaxerr);
1995 return;
1996 }
1997
1998 /* If the user specified XREADGROUP then it must also
1999 * provide the GROUP option. */
2000 if (xreadgroup && groupname == NULL((void*)0)) {
2001 addReplyError(c,"Missing GROUP option for XREADGROUP");
2002 return;
2003 }
2004
2005 /* Parse the IDs and resolve the group name. From here on errors must
 * jump to the cleanup label, since memory may have been allocated. */
2006 if (streams_count > STREAMID_STATIC_VECTOR_LEN8)
2007 ids = zmalloc(sizeof(streamID)*streams_count);
2008 if (groupname) groups = zmalloc(sizeof(streamCG*)*streams_count);
2009
 /* 'i' walks the ID arguments: the key matching each ID is found
  * 'streams_count' positions earlier in the argument vector. */
2010 for (int i = streams_arg + streams_count; i < c->argc; i++) {
2011 /* Specifying "$" as last-known-id means that the client wants to be
2012 * served with just the messages that will arrive into the stream
2013 * starting from now. */
2014 int id_idx = i - streams_arg - streams_count;
2015 robj *key = c->argv[i-streams_count];
2016 robj *o = lookupKeyRead(c->db,key);
2017 if (checkType(c,o,OBJ_STREAM6)) goto cleanup;
2018 streamCG *group = NULL((void*)0);
2019
2020 /* If a group was specified, then we need to be sure that the
2021 * key and group actually exist. */
2022 if (groupname) {
2023 if (o == NULL((void*)0) ||
2024 (group = streamLookupCG(o->ptr,groupname->ptr)) == NULL((void*)0))
2025 {
2026 addReplyErrorFormat(c, "-NOGROUP No such key '%s' or consumer "
2027 "group '%s' in XREADGROUP with GROUP "
2028 "option",
2029 (char*)key->ptr,(char*)groupname->ptr);
2030 goto cleanup;
2031 }
2032 groups[id_idx] = group;
2033 }
2034
2035 if (strcmp(c->argv[i]->ptr,"$") == 0) {
2036 if (xreadgroup) {
2037 addReplyError(c,"The $ ID is meaningless in the context of "
2038 "XREADGROUP: you want to read the history of "
2039 "this consumer by specifying a proper ID, or "
2040 "use the > ID to get new messages. The $ ID would "
2041 "just return an empty result set.");
2042 goto cleanup;
2043 }
 /* "$" resolves to the current last ID of the stream, or to 0-0
  * when the key does not exist. */
2044 if (o) {
2045 stream *s = o->ptr;
2046 ids[id_idx] = s->last_id;
2047 } else {
2048 ids[id_idx].ms = 0;
2049 ids[id_idx].seq = 0;
2050 }
2051 continue;
2052 } else if (strcmp(c->argv[i]->ptr,">") == 0) {
2053 if (!xreadgroup) {
2054 addReplyError(c,"The > ID can be specified only when calling "
2055 "XREADGROUP using the GROUP <group> "
2056 "<consumer> option.");
2057 goto cleanup;
2058 }
2059 /* We use just the maximum ID to signal this is a ">" ID, anyway
2060 * the code handling the blocking clients will have to update the
2061 * ID later in order to match the changing consumer group last ID. */
2062 ids[id_idx].ms = UINT64_MAX(18446744073709551615UL);
2063 ids[id_idx].seq = UINT64_MAX(18446744073709551615UL);
2064 continue;
2065 }
2066 if (streamParseStrictIDOrReply(c,c->argv[i],ids+id_idx,0) != C_OK0)
2067 goto cleanup;
2068 }
2069
2070 /* Try to serve the client synchronously. 'arraylen' counts the streams
 * for which something was emitted. */
2071 size_t arraylen = 0;
2072 void *arraylen_ptr = NULL((void*)0);
2073 for (int i = 0; i < streams_count; i++) {
2074 robj *o = lookupKeyRead(c->db,c->argv[streams_arg+i]);
2075 if (o == NULL((void*)0)) continue;
2076 stream *s = o->ptr;
2077 streamID *gt = ids+i; /* ID must be greater than this. */
2078 int serve_synchronously = 0;
2079 int serve_history = 0; /* True for XREADGROUP with ID != ">". */
2080
2081 /* Check if there are the conditions to serve the client
2082 * synchronously. */
2083 if (groups) {
2084 /* If the consumer is blocked on a group, we always serve it
2085 * synchronously (serving its local history) if the ID specified
2086 * was not the special ">" ID. */
2087 if (gt->ms != UINT64_MAX(18446744073709551615UL) ||
2088 gt->seq != UINT64_MAX(18446744073709551615UL))
2089 {
2090 serve_synchronously = 1;
2091 serve_history = 1;
2092 } else if (s->length) {
2093 /* We also want to serve a consumer in a consumer group
2094 * synchronously in case the group top item delivered is smaller
2095 * than what the stream has inside. */
2096 streamID maxid, *last = &groups[i]->last_id;
2097 streamLastValidID(s, &maxid);
2098 if (streamCompareID(&maxid, last) > 0) {
2099 serve_synchronously = 1;
 /* Replace the ">" placeholder with the group last ID. */
2100 *gt = *last;
2101 }
2102 }
2103 } else if (s->length) {
2104 /* For consumers without a group, we serve synchronously if we can
2105 * actually provide at least one item from the stream. */
2106 streamID maxid;
2107 streamLastValidID(s, &maxid);
2108 if (streamCompareID(&maxid, gt) > 0) {
2109 serve_synchronously = 1;
2110 }
2111 }
2112
2113 if (serve_synchronously) {
 /* The outer reply length is deferred, and it is only created
  * when the first stream is actually served. */
2114 arraylen++;
2115 if (arraylen == 1) arraylen_ptr = addReplyDeferredLen(c);
2116 /* streamReplyWithRange() handles the 'start' ID as inclusive,
2117 * so start from the next ID, since we want only messages with
2118 * IDs greater than start. */
2119 streamID start = *gt;
2120 streamIncrID(&start);
2121
2122 /* Emit the two elements sub-array consisting of the name
2123 * of the stream and the data we extracted from it. */
2124 if (c->resp == 2) addReplyArrayLen(c,2);
2125 addReplyBulk(c,c->argv[streams_arg+i]);
2126 int created = 0;
2127 streamConsumer *consumer = NULL((void*)0);
2128 if (groups) consumer = streamLookupConsumer(groups[i],
2129 consumername->ptr,
2130 SLC_NONE0,
2131 &created);
2132 streamPropInfo spi = {c->argv[i+streams_arg],groupname};
 /* With NOACK a newly created consumer is propagated explicitly
  * here. NOTE(review): presumably because no XCLAIM, which would
  * carry the consumer name, is propagated in that case - confirm. */
2133 if (created && noack)
2134 streamPropagateConsumerCreation(c,spi.keyname,
2135 spi.groupname,
2136 consumer->name);
2137 int flags = 0;
2138 if (noack) flags |= STREAM_RWR_NOACK(1<<0);
2139 if (serve_history) flags |= STREAM_RWR_HISTORY(1<<2);
2140 streamReplyWithRange(c,s,&start,NULL((void*)0),count,0,
2141 groups ? groups[i] : NULL((void*)0),
2142 consumer, flags, &spi);
2143 if (groups) server.dirty++;
2144 }
2145 }
2146
2147 /* We replied synchronously! Set the top array len and return to caller.
 * The top level container is an array when c->resp == 2 and a map
 * otherwise. */
2148 if (arraylen) {
2149 if (c->resp == 2)
2150 setDeferredArrayLen(c,arraylen_ptr,arraylen);
2151 else
2152 setDeferredMapLen(c,arraylen_ptr,arraylen);
2153 goto cleanup;
2154 }
2155
2156 /* Block if needed. */
2157 if (timeout != -1) {
2158 /* If we are not allowed to block the client, the only thing
2159 * we can do is treating it as a timeout (even with timeout 0). */
2160 if (c->flags & CLIENT_DENY_BLOCKING(1ULL<<41)) {
2161 addReplyNullArray(c);
2162 goto cleanup;
2163 }
2164 blockForKeys(c, BLOCKED_STREAM4, c->argv+streams_arg, streams_count,
2165 timeout, NULL((void*)0), NULL((void*)0), ids);
2166 /* If no COUNT is given and we block, set a relatively small count:
2167 * in case the ID provided is too low, we do not want the server to
2168 * block just to serve this client a huge stream of messages. */
2169 c->bpop.xread_count = count ? count : XREAD_BLOCKED_DEFAULT_COUNT1000;
2170
2171 /* If this is a XREADGROUP + GROUP we need to remember for which
2172 * group and consumer name we are blocking, so later when one of the
2173 * keys receive more data, we can call streamReplyWithRange() passing
2174 * the right arguments. */
2175 if (groupname) {
2176 incrRefCount(groupname);
2177 incrRefCount(consumername);
2178 c->bpop.xread_group = groupname;
2179 c->bpop.xread_consumer = consumername;
2180 c->bpop.xread_group_noack = noack;
2181 } else {
2182 c->bpop.xread_group = NULL((void*)0);
2183 c->bpop.xread_consumer = NULL((void*)0);
2184 }
2185 goto cleanup;
2186 }
2187
2188 /* No BLOCK option, nor any stream we can serve. Reply as with a
2189 * timeout happened. */
2190 addReplyNullArray(c);
2191 /* Continue to cleanup... */
2192
2193cleanup: /* Cleanup. */
2194
2195 /* The command is propagated (in the READGROUP form) as a side effect
2196 * of calling lower level APIs. So stop any implicit propagation. */
2197 preventCommandPropagation(c);
 /* 'ids' is released only if it was heap allocated; 'groups' is still
  * NULL here when no GROUP option was given. */
2198 if (ids != static_ids) zfree(ids);
2199 zfree(groups);
2200}
2201
2202/* -----------------------------------------------------------------------
2203 * Low level implementation of consumer groups
2204 * ----------------------------------------------------------------------- */
2205
2206/* Create a NACK entry setting the delivery count to 1 and the delivery
2207 * time to the current time. The NACK consumer will be set to the one
2208 * specified as argument of the function. */
2209streamNACK *streamCreateNACK(streamConsumer *consumer) {
2210 streamNACK *nack = zmalloc(sizeof(*nack));
2211 nack->delivery_time = mstime();
2212 nack->delivery_count = 1;
2213 nack->consumer = consumer;
2214 return nack;
2215}
2216
2217/* Free a NACK entry. */
2218void streamFreeNACK(streamNACK *na) {
2219 zfree(na);
2220}
2221
2222/* Free a consumer and associated data structures. Note that this function
2223 * will not reassign the pending messages associated with this consumer
2224 * nor will delete them from the stream, so when this function is called
2225 * to delete a consumer, and not when the whole stream is destroyed, the caller
2226 * should do some work before. */
2227void streamFreeConsumer(streamConsumer *sc) {
2228 raxFree(sc->pel); /* No value free callback: the PEL entries are shared
2229 between the consumer and the main stream PEL. */
2230 sdsfree(sc->name);
2231 zfree(sc);
2232}
2233
2234/* Create a new consumer group in the context of the stream 's', having the
2235 * specified name and last server ID. If a consumer group with the same name
2236 * already existed NULL is returned, otherwise the pointer to the consumer
2237 * group is returned. */
2238streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id) {
2239 if (s->cgroups == NULL((void*)0)) s->cgroups = raxNew();
2240 if (raxFind(s->cgroups,(unsigned char*)name,namelen) != raxNotFound)
2241 return NULL((void*)0);
2242
2243 streamCG *cg = zmalloc(sizeof(*cg));
2244 cg->pel = raxNew();
2245 cg->consumers = raxNew();
2246 cg->last_id = *id;
2247 raxInsert(s->cgroups,(unsigned char*)name,namelen,cg,NULL((void*)0));
2248 return cg;
2249}
2250
2251/* Free a consumer group and all its associated data. */
2252void streamFreeCG(streamCG *cg) {
2253 raxFreeWithCallback(cg->pel,(void(*)(void*))streamFreeNACK);
2254 raxFreeWithCallback(cg->consumers,(void(*)(void*))streamFreeConsumer);
2255 zfree(cg);
2256}
2257
2258/* Lookup the consumer group in the specified stream and returns its
2259 * pointer, otherwise if there is no such group, NULL is returned. */
2260streamCG *streamLookupCG(stream *s, sds groupname) {
2261 if (s->cgroups == NULL((void*)0)) return NULL((void*)0);
2262 streamCG *cg = raxFind(s->cgroups,(unsigned char*)groupname,
2263 sdslen(groupname));
2264 return (cg == raxNotFound) ? NULL((void*)0) : cg;
2265}
2266
2267/* Lookup the consumer with the specified name in the group 'cg': if the
2268 * consumer does not exist it is created unless SLC_NOCREAT flag was specified.
2269 * Its last seen time is updated unless SLC_NOREFRESH flag was specified. */
2270streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags, int *created) {
2271 if (created) *created = 0;
2272 int create = !(flags & SLC_NOCREAT(1<<0));
2273 int refresh = !(flags & SLC_NOREFRESH(1<<1));
2274 streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name,
2275 sdslen(name));
2276 if (consumer == raxNotFound) {
2277 if (!create) return NULL((void*)0);
2278 consumer = zmalloc(sizeof(*consumer));
2279 consumer->name = sdsdup(name);
2280 consumer->pel = raxNew();
2281 raxInsert(cg->consumers,(unsigned char*)name,sdslen(name),
2282 consumer,NULL((void*)0));
2283 consumer->seen_time = mstime();
2284 if (created) *created = 1;
2285 } else if (refresh)
2286 consumer->seen_time = mstime();
2287 return consumer;
2288}
2289
2290/* Delete the consumer specified in the consumer group 'cg'. The consumer
2291 * may have pending messages: they are removed from the PEL, and the number
2292 * of pending messages "lost" is returned. */
2293uint64_t streamDelConsumer(streamCG *cg, sds name) {
2294 streamConsumer *consumer =
2295 streamLookupConsumer(cg,name,SLC_NOCREAT(1<<0)|SLC_NOREFRESH(1<<1),NULL((void*)0));
2296 if (consumer == NULL((void*)0)) return 0;
2297
2298 uint64_t retval = raxSize(consumer->pel);
2299
2300 /* Iterate all the consumer pending messages, deleting every corresponding
2301 * entry from the global entry. */
2302 raxIterator ri;
2303 raxStart(&ri,consumer->pel);
2304 raxSeek(&ri,"^",NULL((void*)0),0);
2305 while(raxNext(&ri)) {
2306 streamNACK *nack = ri.data;
2307 raxRemove(cg->pel,ri.key,ri.key_len,NULL((void*)0));
2308 streamFreeNACK(nack);
2309 }
2310 raxStop(&ri);
2311
2312 /* Deallocate the consumer. */
2313 raxRemove(cg->consumers,(unsigned char*)name,sdslen(name),NULL((void*)0));
2314 streamFreeConsumer(consumer);
2315 return retval;
2316}
2317
2318/* -----------------------------------------------------------------------
2319 * Consumer groups commands
2320 * ----------------------------------------------------------------------- */
2321
2322/* XGROUP CREATE <key> <groupname> <id or $> [MKSTREAM]
2323 * XGROUP SETID <key> <groupname> <id or $>
2324 * XGROUP DESTROY <key> <groupname>
2325 * XGROUP CREATECONSUMER <key> <groupname> <consumer>
2326 * XGROUP DELCONSUMER <key> <groupname> <consumername> */
2327void xgroupCommand(client *c) {
2328 stream *s = NULL((void*)0);
2329 sds grpname = NULL((void*)0);
2330 streamCG *cg = NULL((void*)0);
2331 char *opt = c->argv[1]->ptr; /* Subcommand name. */
2332 int mkstream = 0;
2333 robj *o;
2334
2335 /* CREATE has an MKSTREAM option that creates the stream if it
2336 * does not exist. */
2337 if (c->argc == 6 && !strcasecmp(opt,"CREATE")) {
2338 if (strcasecmp(c->argv[5]->ptr,"MKSTREAM")) {
2339 addReplySubcommandSyntaxError(c);
2340 return;
2341 }
2342 mkstream = 1;
2343 grpname = c->argv[3]->ptr;
2344 }
2345
2346 /* Everything but the "HELP" option requires a key and group name. */
2347 if (c->argc >= 4) {
2348 o = lookupKeyWrite(c->db,c->argv[2]);
2349 if (o) {
2350 if (checkType(c,o,OBJ_STREAM6)) return;
2351 s = o->ptr;
2352 }
2353 grpname = c->argv[3]->ptr;
2354 }
2355
2356 /* Check for missing key/group. */
2357 if (c->argc >= 4 && !mkstream) {
2358 /* At this point key must exist, or there is an error. */
2359 if (s == NULL((void*)0)) {
2360 addReplyError(c,
2361 "The XGROUP subcommand requires the key to exist. "
2362 "Note that for CREATE you may want to use the MKSTREAM "
2363 "option to create an empty stream automatically.");
2364 return;
2365 }
2366
2367 /* Certain subcommands require the group to exist. */
2368 if ((cg = streamLookupCG(s,grpname)) == NULL((void*)0) &&
2369 (!strcasecmp(opt,"SETID") ||
2370 !strcasecmp(opt,"CREATECONSUMER") ||
2371 !strcasecmp(opt,"DELCONSUMER")))
2372 {
2373 addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' "
2374 "for key name '%s'",
2375 (char*)grpname, (char*)c->argv[2]->ptr);
2376 return;
2377 }
2378 }
2379
2380 /* Dispatch the different subcommands. */
2381 if (c->argc == 2 && !strcasecmp(opt,"HELP")) {
2382 const char *help[] = {
2383"CREATE <key> <groupname> <id|$> [option]",
2384" Create a new consumer group. Options are:",
2385" * MKSTREAM",
2386" Create the empty stream if it does not exist.",
2387"CREATECONSUMER <key> <groupname> <consumer>",
2388" Create a new consumer in the specified group.",
2389"DELCONSUMER <key> <groupname> <consumer>",
2390" Remove the specified consumer.",
2391"DESTROY <key> <groupname>"
2392" Remove the specified group.",
2393"SETID <key> <groupname> <id|$>",
2394" Set the current group ID.",
2395NULL((void*)0)
2396 };
2397 addReplyHelp(c, help);
2398 } else if (!strcasecmp(opt,"CREATE") && (c->argc == 5 || c->argc == 6)) {
2399 streamID id;
2400 if (!strcmp(c->argv[4]->ptr,"$")) {
2401 if (s) {
2402 id = s->last_id;
2403 } else {
2404 id.ms = 0;
2405 id.seq = 0;
2406 }
2407 } else if (streamParseStrictIDOrReply(c,c->argv[4],&id,0) != C_OK0) {
2408 return;
2409 }
2410
2411 /* Handle the MKSTREAM option now that the command can no longer fail. */
2412 if (s == NULL((void*)0)) {
2413 serverAssert(mkstream)((mkstream)?(void)0 : (_serverAssert("mkstream","t_stream.c",
2413),__builtin_unreachable()))
;
2414 o = createStreamObject();
2415 dbAdd(c->db,c->argv[2],o);
2416 s = o->ptr;
2417 signalModifiedKey(c,c->db,c->argv[2]);
2418 }
2419
2420 streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),&id);
2421 if (cg) {
2422 addReply(c,shared.ok);
2423 server.dirty++;
2424 notifyKeyspaceEvent(NOTIFY_STREAM(1<<10),"xgroup-create",
2425 c->argv[2],c->db->id);
2426 } else {
2427 addReplyError(c,"-BUSYGROUP Consumer Group name already exists");
2428 }
2429 } else if (!strcasecmp(opt,"SETID") && c->argc == 5) {
2430 streamID id;
2431 if (!strcmp(c->argv[4]->ptr,"$")) {
2432 id = s->last_id;
2433 } else if (streamParseIDOrReply(c,c->argv[4],&id,0) != C_OK0) {
2434 return;
2435 }
2436 cg->last_id = id;
2437 addReply(c,shared.ok);
2438 server.dirty++;
2439 notifyKeyspaceEvent(NOTIFY_STREAM(1<<10),"xgroup-setid",c->argv[2],c->db->id);
2440 } else if (!strcasecmp(opt,"DESTROY") && c->argc == 4) {
2441 if (cg) {
2442 raxRemove(s->cgroups,(unsigned char*)grpname,sdslen(grpname),NULL((void*)0));
2443 streamFreeCG(cg);
2444 addReply(c,shared.cone);
2445 server.dirty++;
2446 notifyKeyspaceEvent(NOTIFY_STREAM(1<<10),"xgroup-destroy",
2447 c->argv[2],c->db->id);
2448 /* We want to unblock any XREADGROUP consumers with -NOGROUP. */
2449 signalKeyAsReady(c->db,c->argv[2],OBJ_STREAM6);
2450 } else {
2451 addReply(c,shared.czero);
2452 }
2453 } else if (!strcasecmp(opt,"CREATECONSUMER") && c->argc == 5) {
2454 int created = 0;
2455 streamLookupConsumer(cg,c->argv[4]->ptr,SLC_NOREFRESH(1<<1),&created);
2456 if (created) {
2457 server.dirty++;
2458 notifyKeyspaceEvent(NOTIFY_STREAM(1<<10),"xgroup-createconsumer",
2459 c->argv[2],c->db->id);
2460 }
2461 addReplyLongLong(c,created);
2462 } else if (!strcasecmp(opt,"DELCONSUMER") && c->argc == 5) {
2463 /* Delete the consumer and returns the number of pending messages
2464 * that were yet associated with such a consumer. */
2465 long long pending = streamDelConsumer(cg,c->argv[4]->ptr);
2466 addReplyLongLong(c,pending);
2467 server.dirty++;
2468 notifyKeyspaceEvent(NOTIFY_STREAM(1<<10),"xgroup-delconsumer",
2469 c->argv[2],c->db->id);
2470 } else {
2471 addReplySubcommandSyntaxError(c);
2472 }
2473}
2474
2475/* XSETID <stream> <id>
2476 *
2477 * Set the internal "last ID" of a stream. */
2478void xsetidCommand(client *c) {
2479 robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
2480 if (o == NULL((void*)0) || checkType(c,o,OBJ_STREAM6)) return;
2481
2482 stream *s = o->ptr;
2483 streamID id;
2484 if (streamParseStrictIDOrReply(c,c->argv[2],&id,0) != C_OK0) return;
2485
2486 /* If the stream has at least one item, we want to check that the user
2487 * is setting a last ID that is equal or greater than the current top
2488 * item, otherwise the fundamental ID monotonicity assumption is violated. */
2489 if (s->length > 0) {
2490 streamID maxid;
2491 streamLastValidID(s,&maxid);
2492
2493 if (streamCompareID(&id,&maxid) < 0) {
2494 addReplyError(c,"The ID specified in XSETID is smaller than the "
2495 "target stream top item");
2496 return;
2497 }
2498 }
2499 s->last_id = id;
2500 addReply(c,shared.ok);
2501 server.dirty++;
2502 notifyKeyspaceEvent(NOTIFY_STREAM(1<<10),"xsetid",c->argv[1],c->db->id);
2503}
2504
2505/* XACK <key> <group> <id> <id> ... <id>
2506 *
2507 * Acknowledge a message as processed. In practical terms we just check the
2508 * pendine entries list (PEL) of the group, and delete the PEL entry both from
2509 * the group and the consumer (pending messages are referenced in both places).
2510 *
2511 * Return value of the command is the number of messages successfully
2512 * acknowledged, that is, the IDs we were actually able to resolve in the PEL.
2513 */
2514void xackCommand(client *c) {
2515 streamCG *group = NULL((void*)0);
2516 robj *o = lookupKeyRead(c->db,c->argv[1]);
2517 if (o) {
2518 if (checkType(c,o,OBJ_STREAM6)) return; /* Type error. */
2519 group = streamLookupCG(o->ptr,c->argv[2]->ptr);
2520 }
2521
2522 /* No key or group? Nothing to ack. */
2523 if (o == NULL((void*)0) || group == NULL((void*)0)) {
2524 addReply(c,shared.czero);
2525 return;
2526 }
2527
2528 /* Start parsing the IDs, so that we abort ASAP if there is a syntax
2529 * error: the return value of this command cannot be an error in case
2530 * the client successfully acknowledged some messages, so it should be
2531 * executed in a "all or nothing" fashion. */
2532 streamID static_ids[STREAMID_STATIC_VECTOR_LEN8];
2533 streamID *ids = static_ids;
2534 int id_count = c->argc-3;
2535 if (id_count > STREAMID_STATIC_VECTOR_LEN8)
2536 ids = zmalloc(sizeof(streamID)*id_count);
2537 for (int j = 3; j < c->argc; j++) {
2538 if (streamParseStrictIDOrReply(c,c->argv[j],&ids[j-3],0) != C_OK0) goto cleanup;
2539 }
2540
2541 int acknowledged = 0;
2542 for (int j = 3; j < c->argc; j++) {
2543 unsigned char buf[sizeof(streamID)];
2544 streamEncodeID(buf,&ids[j-3]);
2545
2546 /* Lookup the ID in the group PEL: it will have a reference to the
2547 * NACK structure that will have a reference to the consumer, so that
2548 * we are able to remove the entry from both PELs. */
2549 streamNACK *nack = raxFind(group->pel,buf,sizeof(buf));
2550 if (nack != raxNotFound) {
2551 raxRemove(group->pel,buf,sizeof(buf),NULL((void*)0));
2552 raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL((void*)0));
2553 streamFreeNACK(nack);
2554 acknowledged++;
2555 server.dirty++;
2556 }
2557 }
2558 addReplyLongLong(c,acknowledged);
2559cleanup:
2560 if (ids != static_ids) zfree(ids);
2561}
2562
2563/* XPENDING <key> <group> [[IDLE <idle>] <start> <stop> <count> [<consumer>]]
2564 *
2565 * If start and stop are omitted, the command just outputs information about
2566 * the amount of pending messages for the key/group pair, together with
2567 * the minimum and maximum ID of pending messages.
2568 *
2569 * If start and stop are provided instead, the pending messages are returned
2570 * with information about the current owner, number of deliveries and last
2571 * delivery time and so forth. */
2572void xpendingCommand(client *c) {
2573 int justinfo = c->argc == 3; /* Without the range just outputs general
2574 informations about the PEL. */
2575 robj *key = c->argv[1];
2576 robj *groupname = c->argv[2];
2577 robj *consumername = NULL((void*)0);
2578 streamID startid, endid;
2579 long long count = 0;
2580 long long minidle = 0;
2581 int startex = 0, endex = 0;
2582
2583 /* Start and stop, and the consumer, can be omitted. Also the IDLE modifier. */
2584 if (c->argc != 3 && (c->argc < 6 || c->argc > 9)) {
2585 addReplyErrorObject(c,shared.syntaxerr);
2586 return;
2587 }
2588
2589 /* Parse start/end/count arguments ASAP if needed, in order to report
2590 * syntax errors before any other error. */
2591 if (c->argc >= 6) {
2592 int startidx = 3; /* Without IDLE */
2593
2594 if (!strcasecmp(c->argv[3]->ptr, "IDLE")) {
2595 if (getLongLongFromObjectOrReply(c, c->argv[4], &minidle, NULL((void*)0)) == C_ERR-1)
2596 return;
2597 if (c->argc < 8) {
2598 /* If IDLE was provided we must have at least 'start end count' */
2599 addReplyErrorObject(c,shared.syntaxerr);
2600 return;
2601 }
2602 /* Search for rest of arguments after 'IDLE <idle>' */
2603 startidx += 2;
2604 }
2605
2606 /* count argument. */
2607 if (getLongLongFromObjectOrReply(c,c->argv[startidx+2],&count,NULL((void*)0)) == C_ERR-1)
2608 return;
2609 if (count < 0) count = 0;
2610
2611 /* start and end arguments. */
2612 if (streamParseIntervalIDOrReply(c,c->argv[startidx],&startid,&startex,0) != C_OK0)
2613 return;
2614 if (startex && streamIncrID(&startid) != C_OK0) {
2615 addReplyError(c,"invalid start ID for the interval");
2616 return;
2617 }
2618 if (streamParseIntervalIDOrReply(c,c->argv[startidx+1],&endid,&endex,UINT64_MAX(18446744073709551615UL)) != C_OK0)
2619 return;
2620 if (endex && streamDecrID(&endid) != C_OK0) {
2621 addReplyError(c,"invalid end ID for the interval");
2622 return;
2623 }
2624
2625 if (startidx+3 < c->argc) {
2626 /* 'consumer' was provided */
2627 consumername = c->argv[startidx+3];
2628 }
2629 }
2630
2631 /* Lookup the key and the group inside the stream. */
2632 robj *o = lookupKeyRead(c->db,c->argv[1]);
2633 streamCG *group;
2634
2635 if (checkType(c,o,OBJ_STREAM6)) return;
2636 if (o == NULL((void*)0) ||
2637 (group = streamLookupCG(o->ptr,groupname->ptr)) == NULL((void*)0))
2638 {
2639 addReplyErrorFormat(c, "-NOGROUP No such key '%s' or consumer "
2640 "group '%s'",
2641 (char*)key->ptr,(char*)groupname->ptr);
2642 return;
2643 }
2644
2645 /* XPENDING <key> <group> variant. */
2646 if (justinfo) {
2647 addReplyArrayLen(c,4);
2648 /* Total number of messages in the PEL. */
2649 addReplyLongLong(c,raxSize(group->pel));
2650 /* First and last IDs. */
2651 if (raxSize(group->pel) == 0) {
2652 addReplyNull(c); /* Start. */
2653 addReplyNull(c); /* End. */
2654 addReplyNullArray(c); /* Clients. */
2655 } else {
2656 /* Start. */
2657 raxIterator ri;
2658 raxStart(&ri,group->pel);
2659 raxSeek(&ri,"^",NULL((void*)0),0);
2660 raxNext(&ri);
2661 streamDecodeID(ri.key,&startid);
2662 addReplyStreamID(c,&startid);
2663
2664 /* End. */
2665 raxSeek(&ri,"$",NULL((void*)0),0);
2666 raxNext(&ri);
2667 streamDecodeID(ri.key,&endid);
2668 addReplyStreamID(c,&endid);
2669 raxStop(&ri);
2670
2671 /* Consumers with pending messages. */
2672 raxStart(&ri,group->consumers);
2673 raxSeek(&ri,"^",NULL((void*)0),0);
2674 void *arraylen_ptr = addReplyDeferredLen(c);
2675 size_t arraylen = 0;
2676 while(raxNext(&ri)) {
2677 streamConsumer *consumer = ri.data;
2678 if (raxSize(consumer->pel) == 0) continue;
2679 addReplyArrayLen(c,2);
2680 addReplyBulkCBuffer(c,ri.key,ri.key_len);
2681 addReplyBulkLongLong(c,raxSize(consumer->pel));
2682 arraylen++;
2683 }
2684 setDeferredArrayLen(c,arraylen_ptr,arraylen);
2685 raxStop(&ri);
2686 }
2687 } else { /* <start>, <stop> and <count> provided, return actual pending entries (not just info) */
2688 streamConsumer *consumer = NULL((void*)0);
2689 if (consumername) {
2690 consumer = streamLookupConsumer(group,
2691 consumername->ptr,
2692 SLC_NOCREAT(1<<0)|SLC_NOREFRESH(1<<1),
2693 NULL((void*)0));
2694
2695 /* If a consumer name was mentioned but it does not exist, we can
2696 * just return an empty array. */
2697 if (consumer == NULL((void*)0)) {
2698 addReplyArrayLen(c,0);
2699 return;
2700 }
2701 }
2702
2703 rax *pel = consumer ? consumer->pel : group->pel;
2704 unsigned char startkey[sizeof(streamID)];
2705 unsigned char endkey[sizeof(streamID)];
2706 raxIterator ri;
2707 mstime_t now = mstime();
2708
2709 streamEncodeID(startkey,&startid);
2710 streamEncodeID(endkey,&endid);
2711 raxStart(&ri,pel);
2712 raxSeek(&ri,">=",startkey,sizeof(startkey));
2713 void *arraylen_ptr = addReplyDeferredLen(c);
2714 size_t arraylen = 0;
2715
2716 while(count && raxNext(&ri) && memcmp(ri.key,endkey,ri.key_len) <= 0) {
2717 streamNACK *nack = ri.data;
2718
2719 if (minidle) {
2720 mstime_t this_idle = now - nack->delivery_time;
2721 if (this_idle < minidle) continue;
2722 }
2723
2724 arraylen++;
2725 count--;
2726 addReplyArrayLen(c,4);
2727
2728 /* Entry ID. */
2729 streamID id;
2730 streamDecodeID(ri.key,&id);
2731 addReplyStreamID(c,&id);
2732
2733 /* Consumer name. */
2734 addReplyBulkCBuffer(c,nack->consumer->name,
2735 sdslen(nack->consumer->name));
2736
2737 /* Milliseconds elapsed since last delivery. */
2738 mstime_t elapsed = now - nack->delivery_time;
2739 if (elapsed < 0) elapsed = 0;
2740 addReplyLongLong(c,elapsed);
2741
2742 /* Number of deliveries. */
2743 addReplyLongLong(c,nack->delivery_count);
2744 }
2745 raxStop(&ri);
2746 setDeferredArrayLen(c,arraylen_ptr,arraylen);
2747 }
2748}
2749
2750/* XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2>
2751 * [IDLE <milliseconds>] [TIME <mstime>] [RETRYCOUNT <count>]
2752 * [FORCE] [JUSTID] [LASTID <id>]
2753 *
2754 * Gets ownership of one or multiple messages in the Pending Entries List
2755 * of a given stream consumer group.
2756 *
2757 * If the message ID (among the specified ones) exists, and its idle
2758 * time greater or equal to <min-idle-time>, then the message new owner
2759 * becomes the specified <consumer>. If the minimum idle time specified
2760 * is zero, messages are claimed regardless of their idle time.
2761 *
2762 * All the messages that cannot be found inside the pending entries list
2763 * are ignored, but in case the FORCE option is used. In that case we
2764 * create the NACK (representing a not yet acknowledged message) entry in
2765 * the consumer group PEL.
2766 *
2767 * This command creates the consumer as side effect if it does not yet
2768 * exists. Moreover the command reset the idle time of the message to 0,
2769 * even if by using the IDLE or TIME options, the user can control the
2770 * new idle time.
2771 *
2772 * The options at the end can be used in order to specify more attributes
2773 * to set in the representation of the pending message:
2774 *
2775 * 1. IDLE <ms>:
2776 * Set the idle time (last time it was delivered) of the message.
2777 * If IDLE is not specified, an IDLE of 0 is assumed, that is,
2778 * the time count is reset because the message has now a new
2779 * owner trying to process it.
2780 *
2781 * 2. TIME <ms-unix-time>:
2782 * This is the same as IDLE but instead of a relative amount of
2783 * milliseconds, it sets the idle time to a specific unix time
2784 * (in milliseconds). This is useful in order to rewrite the AOF
2785 * file generating XCLAIM commands.
2786 *
2787 * 3. RETRYCOUNT <count>:
2788 * Set the retry counter to the specified value. This counter is
2789 * incremented every time a message is delivered again. Normally
2790 * XCLAIM does not alter this counter, which is just served to clients
2791 * when the XPENDING command is called: this way clients can detect
2792 * anomalies, like messages that are never processed for some reason
2793 * after a big number of delivery attempts.
2794 *
2795 * 4. FORCE:
2796 * Creates the pending message entry in the PEL even if certain
2797 * specified IDs are not already in the PEL assigned to a different
2798 * client. However the message must be exist in the stream, otherwise
2799 * the IDs of non existing messages are ignored.
2800 *
2801 * 5. JUSTID:
2802 * Return just an array of IDs of messages successfully claimed,
2803 * without returning the actual message.
2804 *
2805 * 6. LASTID <id>:
2806 * Update the consumer group last ID with the specified ID if the
2807 * current last ID is smaller than the provided one.
2808 * This is used for replication / AOF, so that when we read from a
2809 * consumer group, the XCLAIM that gets propagated to give ownership
2810 * to the consumer, is also used in order to update the group current
2811 * ID.
2812 *
2813 * The command returns an array of messages that the user
2814 * successfully claimed, so that the caller is able to understand
2815 * what messages it is now in charge of. */
2816void xclaimCommand(client *c) {
2817 streamCG *group = NULL((void*)0);
2818 robj *o = lookupKeyRead(c->db,c->argv[1]);
2819 long long minidle; /* Minimum idle time argument. */
2820 long long retrycount = -1; /* -1 means RETRYCOUNT option not given. */
2821 mstime_t deliverytime = -1; /* -1 means IDLE/TIME options not given. */
2822 int force = 0;
2823 int justid = 0;
2824
2825 if (o) {
2826 if (checkType(c,o,OBJ_STREAM6)) return; /* Type error. */
2827 group = streamLookupCG(o->ptr,c->argv[2]->ptr);
2828 }
2829
2830 /* No key or group? Send an error given that the group creation
2831 * is mandatory. */
2832 if (o == NULL((void*)0) || group == NULL((void*)0)) {
2833 addReplyErrorFormat(c,"-NOGROUP No such key '%s' or "
2834 "consumer group '%s'", (char*)c->argv[1]->ptr,
2835 (char*)c->argv[2]->ptr);
2836 return;
2837 }
2838
2839 if (getLongLongFromObjectOrReply(c,c->argv[4],&minidle,
2840 "Invalid min-idle-time argument for XCLAIM")
2841 != C_OK0) return;
2842 if (minidle < 0) minidle = 0;
2843
2844 /* Start parsing the IDs, so that we abort ASAP if there is a syntax
2845 * error: the return value of this command cannot be an error in case
2846 * the client successfully claimed some message, so it should be
2847 * executed in a "all or nothing" fashion. */
2848 int j;
2849 streamID static_ids[STREAMID_STATIC_VECTOR_LEN8];
2850 streamID *ids = static_ids;
2851 int id_count = c->argc-5;
2852 if (id_count > STREAMID_STATIC_VECTOR_LEN8)
2853 ids = zmalloc(sizeof(streamID)*id_count);
2854 for (j = 5; j < c->argc; j++) {
2855 if (streamParseStrictIDOrReply(NULL((void*)0),c->argv[j],&ids[j-5],0) != C_OK0) break;
2856 }
2857 int last_id_arg = j-1; /* Next time we iterate the IDs we now the range. */
2858
2859 /* If we stopped because some IDs cannot be parsed, perhaps they
2860 * are trailing options. */
2861 mstime_t now = mstime();
2862 streamID last_id = {0,0};
2863 int propagate_last_id = 0;
2864 for (; j < c->argc; j++) {
2865 int moreargs = (c->argc-1) - j; /* Number of additional arguments. */
2866 char *opt = c->argv[j]->ptr;
2867 if (!strcasecmp(opt,"FORCE")) {
2868 force = 1;
2869 } else if (!strcasecmp(opt,"JUSTID")) {
2870 justid = 1;
2871 } else if (!strcasecmp(opt,"IDLE") && moreargs) {
2872 j++;
2873 if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime,
2874 "Invalid IDLE option argument for XCLAIM")
2875 != C_OK0) goto cleanup;
2876 deliverytime = now - deliverytime;
2877 } else if (!strcasecmp(opt,"TIME") && moreargs) {
2878 j++;
2879 if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime,
2880 "Invalid TIME option argument for XCLAIM")
2881 != C_OK0) goto cleanup;
2882 } else if (!strcasecmp(opt,"RETRYCOUNT") && moreargs) {
2883 j++;
2884 if (getLongLongFromObjectOrReply(c,c->argv[j],&retrycount,
2885 "Invalid RETRYCOUNT option argument for XCLAIM")
2886 != C_OK0) goto cleanup;
2887 } else if (!strcasecmp(opt,"LASTID") && moreargs) {
2888 j++;
2889 if (streamParseStrictIDOrReply(c,c->argv[j],&last_id,0) != C_OK0) goto cleanup;
2890 } else {
2891 addReplyErrorFormat(c,"Unrecognized XCLAIM option '%s'",opt);
2892 goto cleanup;
2893 }
2894 }
2895
2896 if (streamCompareID(&last_id,&group->last_id) > 0) {
2897 group->last_id = last_id;
2898 propagate_last_id = 1;
2899 }
2900
2901 if (deliverytime != -1) {
2902 /* If a delivery time was passed, either with IDLE or TIME, we
2903 * do some sanity check on it, and set the deliverytime to now
2904 * (which is a sane choice usually) if the value is bogus.
2905 * To raise an error here is not wise because clients may compute
2906 * the idle time doing some math starting from their local time,
2907 * and this is not a good excuse to fail in case, for instance,
2908 * the computer time is a bit in the future from our POV. */
2909 if (deliverytime < 0 || deliverytime > now) deliverytime = now;
2910 } else {
2911 /* If no IDLE/TIME option was passed, we want the last delivery
2912 * time to be now, so that the idle time of the message will be
2913 * zero. */
2914 deliverytime = now;
2915 }
2916
2917 /* Do the actual claiming. */
2918 streamConsumer *consumer = NULL((void*)0);
2919 void *arraylenptr = addReplyDeferredLen(c);
2920 size_t arraylen = 0;
2921 for (int j = 5; j <= last_id_arg; j++) {
2922 streamID id = ids[j-5];
2923 unsigned char buf[sizeof(streamID)];
2924 streamEncodeID(buf,&id);
2925
2926 /* Lookup the ID in the group PEL. */
2927 streamNACK *nack = raxFind(group->pel,buf,sizeof(buf));
2928
2929 /* If FORCE is passed, let's check if at least the entry
2930 * exists in the Stream. In such case, we'll crate a new
2931 * entry in the PEL from scratch, so that XCLAIM can also
2932 * be used to create entries in the PEL. Useful for AOF
2933 * and replication of consumer groups. */
2934 if (force && nack == raxNotFound) {
2935 streamIterator myiterator;
2936 streamIteratorStart(&myiterator,o->ptr,&id,&id,0);
2937 int64_t numfields;
2938 int found = 0;
2939 streamID item_id;
2940 if (streamIteratorGetID(&myiterator,&item_id,&numfields)) found = 1;
2941 streamIteratorStop(&myiterator);
2942
2943 /* Item must exist for us to create a NACK for it. */
2944 if (!found) continue;
2945
2946 /* Create the NACK. */
2947 nack = streamCreateNACK(NULL((void*)0));
2948 raxInsert(group->pel,buf,sizeof(buf),nack,NULL((void*)0));
2949 }
2950
2951 if (nack != raxNotFound) {
2952 /* We need to check if the minimum idle time requested
2953 * by the caller is satisfied by this entry.
2954 *
2955 * Note that the nack could be created by FORCE, in this
2956 * case there was no pre-existing entry and minidle should
2957 * be ignored, but in that case nack->consumer is NULL. */
2958 if (nack->consumer && minidle) {
2959 mstime_t this_idle = now - nack->delivery_time;
2960 if (this_idle < minidle) continue;
2961 }
2962 if (consumer == NULL((void*)0))
2963 consumer = streamLookupConsumer(group,c->argv[3]->ptr,SLC_NONE0,NULL((void*)0));
2964 if (nack->consumer != consumer) {
2965 /* Remove the entry from the old consumer.
2966 * Note that nack->consumer is NULL if we created the
2967 * NACK above because of the FORCE option. */
2968 if (nack->consumer)
2969 raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL((void*)0));
2970 }
2971 nack->delivery_time = deliverytime;
2972 /* Set the delivery attempts counter if given, otherwise
2973 * autoincrement unless JUSTID option provided */
2974 if (retrycount >= 0) {
2975 nack->delivery_count = retrycount;
2976 } else if (!justid) {
2977 nack->delivery_count++;
2978 }
2979 if (nack->consumer != consumer) {
2980 /* Add the entry in the new consumer local PEL. */
2981 raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL((void*)0));
2982 nack->consumer = consumer;
2983 }
2984 /* Send the reply for this entry. */
2985 if (justid) {
2986 addReplyStreamID(c,&id);
2987 } else {
2988 size_t emitted = streamReplyWithRange(c,o->ptr,&id,&id,1,0,
2989 NULL((void*)0),NULL((void*)0),STREAM_RWR_RAWENTRIES(1<<1),NULL((void*)0));
2990 if (!emitted) addReplyNull(c);
2991 }
2992 arraylen++;
2993
2994 /* Propagate this change. */
2995 streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack);
2996 propagate_last_id = 0; /* Will be propagated by XCLAIM itself. */
2997 server.dirty++;
2998 }
2999 }
3000 if (propagate_last_id) {
3001 streamPropagateGroupID(c,c->argv[1],group,c->argv[2]);
3002 server.dirty++;
3003 }
3004 setDeferredArrayLen(c,arraylenptr,arraylen);
3005 preventCommandPropagation(c);
3006cleanup:
3007 if (ids != static_ids) zfree(ids);
3008}
3009
3010/* XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start> [COUNT <count>] [JUSTID]
3011 *
3012 * Gets ownership of one or multiple messages in the Pending Entries List
3013 * of a given stream consumer group.
3014 *
3015 * For each PEL entry, if its idle time greater or equal to <min-idle-time>,
3016 * then the message new owner becomes the specified <consumer>.
3017 * If the minimum idle time specified is zero, messages are claimed
3018 * regardless of their idle time.
3019 *
3020 * This command creates the consumer as side effect if it does not yet
3021 * exists. Moreover the command reset the idle time of the message to 0.
3022 *
3023 * The command returns an array of messages that the user
3024 * successfully claimed, so that the caller is able to understand
3025 * what messages it is now in charge of. */
3026void xautoclaimCommand(client *c) {
3027 streamCG *group = NULL((void*)0);
3028 robj *o = lookupKeyRead(c->db,c->argv[1]);
3029 long long minidle; /* Minimum idle time argument, in milliseconds. */
3030 long count = 100; /* Maximum entries to claim. */
3031 streamID startid;
3032 int startex;
3033 int justid = 0;
3034
3035 /* Parse idle/start/end/count arguments ASAP if needed, in order to report
3036 * syntax errors before any other error. */
3037 if (getLongLongFromObjectOrReply(c,c->argv[4],&minidle,"Invalid min-idle-time argument for XAUTOCLAIM") != C_OK0)
3038 return;
3039 if (minidle < 0) minidle = 0;
3040
3041 if (streamParseIntervalIDOrReply(c,c->argv[5],&startid,&startex,0) != C_OK0)
3042 return;
3043 if (startex && streamIncrID(&startid) != C_OK0) {
3044 addReplyError(c,"invalid start ID for the interval");
3045 return;
3046 }
3047
3048 int j = 6; /* options start at argv[6] */
3049 while(j < c->argc) {
3050 int moreargs = (c->argc-1) - j; /* Number of additional arguments. */
3051 char *opt = c->argv[j]->ptr;
3052 if (!strcasecmp(opt,"COUNT") && moreargs) {
3053 if (getPositiveLongFromObjectOrReply(c,c->argv[j+1],&count,NULL((void*)0)) != C_OK0)
3054 return;
3055 if (count == 0) {
3056 addReplyError(c,"COUNT must be > 0");
3057 return;
3058 }
3059 j++;
3060 } else if (!strcasecmp(opt,"JUSTID")) {
3061 justid = 1;
3062 } else {
3063 addReplyErrorObject(c,shared.syntaxerr);
3064 return;
3065 }
3066 j++;
3067 }
3068
3069 if (o) {
3070 if (checkType(c,o,OBJ_STREAM6))
3071 return; /* Type error. */
3072 group = streamLookupCG(o->ptr,c->argv[2]->ptr);
3073 }
3074
3075 /* No key or group? Send an error given that the group creation
3076 * is mandatory. */
3077 if (o == NULL((void*)0) || group == NULL((void*)0)) {
3078 addReplyErrorFormat(c,"-NOGROUP No such key '%s' or consumer group '%s'",
3079 (char*)c->argv[1]->ptr,
3080 (char*)c->argv[2]->ptr);
3081 return;
3082 }
3083
3084 /* Do the actual claiming. */
3085 streamConsumer *consumer = NULL((void*)0);
3086 long long attempts = count*10;
3087
3088 addReplyArrayLen(c, 2);
3089 void *endidptr = addReplyDeferredLen(c);
3090 void *arraylenptr = addReplyDeferredLen(c);
3091
3092 unsigned char startkey[sizeof(streamID)];
3093 streamEncodeID(startkey,&startid);
3094 raxIterator ri;
3095 raxStart(&ri,group->pel);
3096 raxSeek(&ri,">=",startkey,sizeof(startkey));
3097 size_t arraylen = 0;
3098 mstime_t now = mstime();
3099 while (attempts-- && count && raxNext(&ri)) {
3100 streamNACK *nack = ri.data;
3101
3102 if (minidle) {
3103 mstime_t this_idle = now - nack->delivery_time;
3104 if (this_idle < minidle)
3105 continue;
3106 }
3107
3108 streamID id;
3109 streamDecodeID(ri.key, &id);
3110
3111 if (consumer == NULL((void*)0))
3112 consumer = streamLookupConsumer(group,c->argv[3]->ptr,SLC_NONE0,NULL((void*)0));
3113 if (nack->consumer != consumer) {
3114 /* Remove the entry from the old consumer.
3115 * Note that nack->consumer is NULL if we created the
3116 * NACK above because of the FORCE option. */
3117 if (nack->consumer)
3118 raxRemove(nack->consumer->pel,ri.key,ri.key_len,NULL((void*)0));
3119 }
3120
3121 /* Update the consumer and idle time. */
3122 nack->delivery_time = now;
3123 nack->delivery_count++;
3124
3125 if (nack->consumer != consumer) {
3126 /* Add the entry in the new consumer local PEL. */
3127 raxInsert(consumer->pel,ri.key,ri.key_len,nack,NULL((void*)0));
3128 nack->consumer = consumer;
3129 }
3130
3131 /* Send the reply for this entry. */
3132 if (justid) {
3133 addReplyStreamID(c,&id);
3134 } else {
3135 size_t emitted =
3136 streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL((void*)0),NULL((void*)0),
3137 STREAM_RWR_RAWENTRIES(1<<1),NULL((void*)0));
3138 if (!emitted)
3139 addReplyNull(c);
3140 }
3141 arraylen++;
3142 count--;
3143
3144 /* Propagate this change. */
3145 robj *idstr = createObjectFromStreamID(&id);
3146 streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],idstr,nack);
3147 decrRefCount(idstr);
3148 server.dirty++;
3149 }
3150
3151 streamID endid;
3152 if (raxEOF(&ri)) {
3153 endid.ms = endid.seq = 0;
3154 } else {
3155 streamDecodeID(ri.key, &endid);
3156 }
3157 raxStop(&ri);
3158
3159 setDeferredArrayLen(c,arraylenptr,arraylen);
3160 setDeferredReplyStreamID(c,endidptr,&endid);
3161
3162 preventCommandPropagation(c);
3163}
3164
3165/* XDEL <key> [<ID1> <ID2> ... <IDN>]
3166 *
3167 * Removes the specified entries from the stream. Returns the number
3168 * of items actually deleted, that may be different from the number
3169 * of IDs passed in case certain IDs do not exist. */
3170void xdelCommand(client *c) {
3171 robj *o;
3172
3173 if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL((void*)0)
3174 || checkType(c,o,OBJ_STREAM6)) return;
3175 stream *s = o->ptr;
3176
3177 /* We need to sanity check the IDs passed to start. Even if not
3178 * a big issue, it is not great that the command is only partially
3179 * executed because at some point an invalid ID is parsed. */
3180 streamID static_ids[STREAMID_STATIC_VECTOR_LEN8];
3181 streamID *ids = static_ids;
3182 int id_count = c->argc-2;
3183 if (id_count > STREAMID_STATIC_VECTOR_LEN8)
3184 ids = zmalloc(sizeof(streamID)*id_count);
3185 for (int j = 2; j < c->argc; j++) {
3186 if (streamParseStrictIDOrReply(c,c->argv[j],&ids[j-2],0) != C_OK0) goto cleanup;
3187 }
3188
3189 /* Actually apply the command. */
3190 int deleted = 0;
3191 for (int j = 2; j < c->argc; j++) {
3192 deleted += streamDeleteItem(s,&ids[j-2]);
3193 }
3194
3195 /* Propagate the write if needed. */
3196 if (deleted) {
3197 signalModifiedKey(c,c->db,c->argv[1]);
3198 notifyKeyspaceEvent(NOTIFY_STREAM(1<<10),"xdel",c->argv[1],c->db->id);
3199 server.dirty += deleted;
3200 }
3201 addReplyLongLong(c,deleted);
3202cleanup:
3203 if (ids != static_ids) zfree(ids);
3204}
3205
3206/* General form: XTRIM <key> [... options ...]
3207 *
3208 * List of options:
3209 *
3210 * Trim strategies:
3211 *
3212 * MAXLEN [~|=] <count> -- Trim so that the stream will be capped at
3213 * the specified length. Use ~ before the
3214 * count in order to demand approximated trimming
3215 * (like XADD MAXLEN option).
3216 * MINID [~|=] <id> -- Trim so that the stream will not contain entries
3217 * with IDs smaller than 'id'. Use ~ before the
3218 * id in order to demand approximated trimming
3219 * (like XADD MINID option).
3220 *
3221 * Other options:
3222 *
3223 * LIMIT <entries> -- The maximum number of entries to trim.
3224 * 0 means unlimited. Unless specified, it is set
3225 * to a default of 100*server.stream_node_max_entries,
3226 * and that's in order to keep the trimming time sane.
3227 * Has meaning only if `~` was provided.
3228 */
3229void xtrimCommand(client *c) {
3230 robj *o;
3231
3232 /* Argument parsing. */
3233 streamAddTrimArgs parsed_args;
3234 if (streamParseAddOrTrimArgsOrReply(c, &parsed_args, 1) < 0)
1
Taking false branch
3235 return; /* streamParseAddOrTrimArgsOrReply already replied. */
3236
3237 /* If the key does not exist, we are ok returning zero, that is, the
3238 * number of elements removed from the stream. */
3239 if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL((void*)0)
2
Assuming the condition is false
4
Taking false branch
3240 || checkType(c,o,OBJ_STREAM6)) return;
3
Assuming the condition is false
3241 stream *s = o->ptr;
3242
3243 /* Perform the trimming. */
3244 int64_t deleted = streamTrim(s, &parsed_args);
5
Calling 'streamTrim'
3245 if (deleted) {
3246 notifyKeyspaceEvent(NOTIFY_STREAM(1<<10),"xtrim",c->argv[1],c->db->id);
3247 if (parsed_args.approx_trim) {
3248 /* In case our trimming was limited (by LIMIT or by ~) we must
3249 * re-write the relevant trim argument to make sure there will be
3250 * no inconsistencies in AOF loading or in the replica.
3251 * It's enough to check only args->approx because there is no
3252 * way LIMIT is given without the ~ option. */
3253 streamRewriteApproxSpecifier(c,parsed_args.trim_strategy_arg_idx-1);
3254 streamRewriteTrimArgument(c,s,parsed_args.trim_strategy,parsed_args.trim_strategy_arg_idx);
3255 }
3256
3257 /* Propagate the write. */
3258 signalModifiedKey(c, c->db,c->argv[1]);
3259 server.dirty += deleted;
3260 }
3261 addReplyLongLong(c,deleted);
3262}
3263
3264/* Helper function for xinfoCommand.
3265 * Handles the variants of XINFO STREAM */
3266void xinfoReplyWithStreamInfo(client *c, stream *s) {
3267 int full = 1;
3268 long long count = 10; /* Default COUNT is 10 so we don't block the server */
3269 robj **optv = c->argv + 3; /* Options start after XINFO STREAM <key> */
3270 int optc = c->argc - 3;
3271
3272 /* Parse options. */
3273 if (optc == 0) {
3274 full = 0;
3275 } else {
3276 /* Valid options are [FULL] or [FULL COUNT <count>] */
3277 if (optc != 1 && optc != 3) {
3278 addReplySubcommandSyntaxError(c);
3279 return;
3280 }
3281
3282 /* First option must be "FULL" */
3283 if (strcasecmp(optv[0]->ptr,"full")) {
3284 addReplySubcommandSyntaxError(c);
3285 return;
3286 }
3287
3288 if (optc == 3) {
3289 /* First option must be "FULL" */
3290 if (strcasecmp(optv[1]->ptr,"count")) {
3291 addReplySubcommandSyntaxError(c);
3292 return;
3293 }
3294 if (getLongLongFromObjectOrReply(c,optv[2],&count,NULL((void*)0)) == C_ERR-1)
3295 return;
3296 if (count < 0) count = 10;
3297 }
3298 }
3299
3300 addReplyMapLen(c,full ? 6 : 7);
3301 addReplyBulkCString(c,"length");
3302 addReplyLongLong(c,s->length);
3303 addReplyBulkCString(c,"radix-tree-keys");
3304 addReplyLongLong(c,raxSize(s->rax));
3305 addReplyBulkCString(c,"radix-tree-nodes");
3306 addReplyLongLong(c,s->rax->numnodes);
3307 addReplyBulkCString(c,"last-generated-id");
3308 addReplyStreamID(c,&s->last_id);
3309
3310 if (!full) {
3311 /* XINFO STREAM <key> */
3312
3313 addReplyBulkCString(c,"groups");
3314 addReplyLongLong(c,s->cgroups ? raxSize(s->cgroups) : 0);
3315
3316 /* To emit the first/last entry we use streamReplyWithRange(). */
3317 int emitted;
3318 streamID start, end;
3319 start.ms = start.seq = 0;
3320 end.ms = end.seq = UINT64_MAX(18446744073709551615UL);
3321 addReplyBulkCString(c,"first-entry");
3322 emitted = streamReplyWithRange(c,s,&start,&end,1,0,NULL((void*)0),NULL((void*)0),
3323 STREAM_RWR_RAWENTRIES(1<<1),NULL((void*)0));
3324 if (!emitted) addReplyNull(c);
3325 addReplyBulkCString(c,"last-entry");
3326 emitted = streamReplyWithRange(c,s,&start,&end,1,1,NULL((void*)0),NULL((void*)0),
3327 STREAM_RWR_RAWENTRIES(1<<1),NULL((void*)0));
3328 if (!emitted) addReplyNull(c);
3329 } else {
3330 /* XINFO STREAM <key> FULL [COUNT <count>] */
3331
3332 /* Stream entries */
3333 addReplyBulkCString(c,"entries");
3334 streamReplyWithRange(c,s,NULL((void*)0),NULL((void*)0),count,0,NULL((void*)0),NULL((void*)0),0,NULL((void*)0));
3335
3336 /* Consumer groups */
3337 addReplyBulkCString(c,"groups");
3338 if (s->cgroups == NULL((void*)0)) {
3339 addReplyArrayLen(c,0);
3340 } else {
3341 addReplyArrayLen(c,raxSize(s->cgroups));
3342 raxIterator ri_cgroups;
3343 raxStart(&ri_cgroups,s->cgroups);
3344 raxSeek(&ri_cgroups,"^",NULL((void*)0),0);
3345 while(raxNext(&ri_cgroups)) {
3346 streamCG *cg = ri_cgroups.data;
3347 addReplyMapLen(c,5);
3348
3349 /* Name */
3350 addReplyBulkCString(c,"name");
3351 addReplyBulkCBuffer(c,ri_cgroups.key,ri_cgroups.key_len);
3352
3353 /* Last delivered ID */
3354 addReplyBulkCString(c,"last-delivered-id");
3355 addReplyStreamID(c,&cg->last_id);
3356
3357 /* Group PEL count */
3358 addReplyBulkCString(c,"pel-count");
3359 addReplyLongLong(c,raxSize(cg->pel));
3360
3361 /* Group PEL */
3362 addReplyBulkCString(c,"pending");
3363 long long arraylen_cg_pel = 0;
3364 void *arrayptr_cg_pel = addReplyDeferredLen(c);
3365 raxIterator ri_cg_pel;
3366 raxStart(&ri_cg_pel,cg->pel);
3367 raxSeek(&ri_cg_pel,"^",NULL((void*)0),0);
3368 while(raxNext(&ri_cg_pel) && (!count || arraylen_cg_pel < count)) {
3369 streamNACK *nack = ri_cg_pel.data;
3370 addReplyArrayLen(c,4);
3371
3372 /* Entry ID. */
3373 streamID id;
3374 streamDecodeID(ri_cg_pel.key,&id);
3375 addReplyStreamID(c,&id);
3376
3377 /* Consumer name. */
3378 serverAssert(nack->consumer)((nack->consumer)?(void)0 : (_serverAssert("nack->consumer"
,"t_stream.c",3378),__builtin_unreachable()))
; /* assertion for valgrind (avoid NPD) */
3379 addReplyBulkCBuffer(c,nack->consumer->name,
3380 sdslen(nack->consumer->name));
3381
3382 /* Last delivery. */
3383 addReplyLongLong(c,nack->delivery_time);
3384
3385 /* Number of deliveries. */
3386 addReplyLongLong(c,nack->delivery_count);
3387
3388 arraylen_cg_pel++;
3389 }
3390 setDeferredArrayLen(c,arrayptr_cg_pel,arraylen_cg_pel);
3391 raxStop(&ri_cg_pel);
3392
3393 /* Consumers */
3394 addReplyBulkCString(c,"consumers");
3395 addReplyArrayLen(c,raxSize(cg->consumers));
3396 raxIterator ri_consumers;
3397 raxStart(&ri_consumers,cg->consumers);
3398 raxSeek(&ri_consumers,"^",NULL((void*)0),0);
3399 while(raxNext(&ri_consumers)) {
3400 streamConsumer *consumer = ri_consumers.data;
3401 addReplyMapLen(c,4);
3402
3403 /* Consumer name */
3404 addReplyBulkCString(c,"name");
3405 addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name));
3406
3407 /* Seen-time */
3408 addReplyBulkCString(c,"seen-time");
3409 addReplyLongLong(c,consumer->seen_time);
3410
3411 /* Consumer PEL count */
3412 addReplyBulkCString(c,"pel-count");
3413 addReplyLongLong(c,raxSize(consumer->pel));
3414
3415 /* Consumer PEL */
3416 addReplyBulkCString(c,"pending");
3417 long long arraylen_cpel = 0;
3418 void *arrayptr_cpel = addReplyDeferredLen(c);
3419 raxIterator ri_cpel;
3420 raxStart(&ri_cpel,consumer->pel);
3421 raxSeek(&ri_cpel,"^",NULL((void*)0),0);
3422 while(raxNext(&ri_cpel) && (!count || arraylen_cpel < count)) {
3423 streamNACK *nack = ri_cpel.data;
3424 addReplyArrayLen(c,3);
3425
3426 /* Entry ID. */
3427 streamID id;
3428 streamDecodeID(ri_cpel.key,&id);
3429 addReplyStreamID(c,&id);
3430
3431 /* Last delivery. */
3432 addReplyLongLong(c,nack->delivery_time);
3433
3434 /* Number of deliveries. */
3435 addReplyLongLong(c,nack->delivery_count);
3436
3437 arraylen_cpel++;
3438 }
3439 setDeferredArrayLen(c,arrayptr_cpel,arraylen_cpel);
3440 raxStop(&ri_cpel);
3441 }
3442 raxStop(&ri_consumers);
3443 }
3444 raxStop(&ri_cgroups);
3445 }
3446 }
3447}
3448
3449/* XINFO CONSUMERS <key> <group>
3450 * XINFO GROUPS <key>
3451 * XINFO STREAM <key> [FULL [COUNT <count>]]
3452 * XINFO HELP. */
3453void xinfoCommand(client *c) {
3454 stream *s = NULL((void*)0);
3455 char *opt;
3456 robj *key;
3457
3458 /* HELP is special. Handle it ASAP. */
3459 if (!strcasecmp(c->argv[1]->ptr,"HELP")) {
3460 const char *help[] = {
3461"CONSUMERS <key> <groupname>",
3462" Show consumers of <groupname>.",
3463"GROUPS <key>",
3464" Show the stream consumer groups.",
3465"STREAM <key> [FULL [COUNT <count>]",
3466" Show information about the stream.",
3467NULL((void*)0)
3468 };
3469 addReplyHelp(c, help);
3470 return;
3471 } else if (c->argc < 3) {
3472 addReplySubcommandSyntaxError(c);
3473 return;
3474 }
3475
3476 /* With the exception of HELP handled before any other sub commands, all
3477 * the ones are in the form of "<subcommand> <key>". */
3478 opt = c->argv[1]->ptr;
3479 key = c->argv[2];
3480
3481 /* Lookup the key now, this is common for all the subcommands but HELP. */
3482 robj *o = lookupKeyReadOrReply(c,key,shared.nokeyerr);
3483 if (o == NULL((void*)0) || checkType(c,o,OBJ_STREAM6)) return;
3484 s = o->ptr;
3485
3486 /* Dispatch the different subcommands. */
3487 if (!strcasecmp(opt,"CONSUMERS") && c->argc == 4) {
3488 /* XINFO CONSUMERS <key> <group>. */
3489 streamCG *cg = streamLookupCG(s,c->argv[3]->ptr);
3490 if (cg == NULL((void*)0)) {
3491 addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' "
3492 "for key name '%s'",
3493 (char*)c->argv[3]->ptr, (char*)key->ptr);
3494 return;
3495 }
3496
3497 addReplyArrayLen(c,raxSize(cg->consumers));
3498 raxIterator ri;
3499 raxStart(&ri,cg->consumers);
3500 raxSeek(&ri,"^",NULL((void*)0),0);
3501 mstime_t now = mstime();
3502 while(raxNext(&ri)) {
3503 streamConsumer *consumer = ri.data;
3504 mstime_t idle = now - consumer->seen_time;
3505 if (idle < 0) idle = 0;
3506
3507 addReplyMapLen(c,3);
3508 addReplyBulkCString(c,"name");
3509 addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name));
3510 addReplyBulkCString(c,"pending");
3511 addReplyLongLong(c,raxSize(consumer->pel));
3512 addReplyBulkCString(c,"idle");
3513 addReplyLongLong(c,idle);
3514 }
3515 raxStop(&ri);
3516 } else if (!strcasecmp(opt,"GROUPS") && c->argc == 3) {
3517 /* XINFO GROUPS <key>. */
3518 if (s->cgroups == NULL((void*)0)) {
3519 addReplyArrayLen(c,0);
3520 return;
3521 }
3522
3523 addReplyArrayLen(c,raxSize(s->cgroups));
3524 raxIterator ri;
3525 raxStart(&ri,s->cgroups);
3526 raxSeek(&ri,"^",NULL((void*)0),0);
3527 while(raxNext(&ri)) {
3528 streamCG *cg = ri.data;
3529 addReplyMapLen(c,4);
3530 addReplyBulkCString(c,"name");
3531 addReplyBulkCBuffer(c,ri.key,ri.key_len);
3532 addReplyBulkCString(c,"consumers");
3533 addReplyLongLong(c,raxSize(cg->consumers));
3534 addReplyBulkCString(c,"pending");
3535 addReplyLongLong(c,raxSize(cg->pel));
3536 addReplyBulkCString(c,"last-delivered-id");
3537 addReplyStreamID(c,&cg->last_id);
3538 }
3539 raxStop(&ri);
3540 } else if (!strcasecmp(opt,"STREAM")) {
3541 /* XINFO STREAM <key> [FULL [COUNT <count>]]. */
3542 xinfoReplyWithStreamInfo(c,s);
3543 } else {
3544 addReplySubcommandSyntaxError(c);
3545 }
3546}
3547
3548/* Validate the integrity stream listpack entries stracture. Both in term of a
3549 * valid listpack, but also that the stracture of the entires matches a valid
3550 * stream. return 1 if valid 0 if not valid. */
3551int streamValidateListpackIntegrity(unsigned char *lp, size_t size, int deep) {
3552 int valid_record;
3553 unsigned char *p, *next;
3554
3555 /* Since we don't want to run validation of all records twice, we'll
3556 * run the listpack validation of just the header and do the rest here. */
3557 if (!lpValidateIntegrity(lp, size, 0))
3558 return 0;
3559
3560 /* In non-deep mode we just validated the listpack header (encoded size) */
3561 if (!deep) return 1;
3562
3563 next = p = lpFirst(lp);
3564 if (!lpValidateNext(lp, &next, size)) return 0;
3565 if (!p) return 0;
3566
3567 /* entry count */
3568 int64_t entry_count = lpGetIntegerIfValid(p, &valid_record);
3569 if (!valid_record) return 0;
3570 p = next; if (!lpValidateNext(lp, &next, size)) return 0;
3571
3572 /* deleted */
3573 int64_t deleted_count = lpGetIntegerIfValid(p, &valid_record);
3574 if (!valid_record) return 0;
3575 p = next; if (!lpValidateNext(lp, &next, size)) return 0;
3576
3577 /* num-of-fields */
3578 int64_t master_fields = lpGetIntegerIfValid(p, &valid_record);
3579 if (!valid_record) return 0;
3580 p = next; if (!lpValidateNext(lp, &next, size)) return 0;
3581
3582 /* the field names */
3583 for (int64_t j = 0; j < master_fields; j++) {
3584 p = next; if (!lpValidateNext(lp, &next, size)) return 0;
3585 }
3586
3587 /* the zero master entry terminator. */
3588 int64_t zero = lpGetIntegerIfValid(p, &valid_record);
3589 if (!valid_record || zero != 0) return 0;
3590 p = next; if (!lpValidateNext(lp, &next, size)) return 0;
3591
3592 entry_count += deleted_count;
3593 while (entry_count--) {
3594 if (!p) return 0;
3595 int64_t fields = master_fields, extra_fields = 3;
3596 int64_t flags = lpGetIntegerIfValid(p, &valid_record);
3597 if (!valid_record) return 0;
3598 p = next; if (!lpValidateNext(lp, &next, size)) return 0;
3599
3600 /* entry id */
3601 p = next; if (!lpValidateNext(lp, &next, size)) return 0;
3602 p = next; if (!lpValidateNext(lp, &next, size)) return 0;
3603
3604 if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS(1<<1))) {
3605 /* num-of-fields */
3606 fields = lpGetIntegerIfValid(p, &valid_record);
3607 if (!valid_record) return 0;
3608 p = next; if (!lpValidateNext(lp, &next, size)) return 0;
3609
3610 /* the field names */
3611 for (int64_t j = 0; j < fields; j++) {
3612 p = next; if (!lpValidateNext(lp, &next, size)) return 0;
3613 }
3614
3615 extra_fields += fields + 1;
3616 }
3617
3618 /* the values */
3619 for (int64_t j = 0; j < fields; j++) {
3620 p = next; if (!lpValidateNext(lp, &next, size)) return 0;
3621 }
3622
3623 /* lp-count */
3624 int64_t lp_count = lpGetIntegerIfValid(p, &valid_record);
3625 if (!valid_record) return 0;
3626 if (lp_count != fields + extra_fields) return 0;
3627 p = next; if (!lpValidateNext(lp, &next, size)) return 0;
3628 }
3629
3630 if (next)
3631 return 0;
3632
3633 return 1;
3634}