File: | src/t_stream.c |
Warning: | line 3601, column 9 Value stored to 'p' is never read |
Press '?' to see keyboard shortcuts
Keyboard shortcuts:
1 | /* |
2 | * Copyright (c) 2017, Salvatore Sanfilippo <antirez at gmail dot com> |
3 | * All rights reserved. |
4 | * |
5 | * Redistribution and use in source and binary forms, with or without |
6 | * modification, are permitted provided that the following conditions are met: |
7 | * |
8 | * * Redistributions of source code must retain the above copyright notice, |
9 | * this list of conditions and the following disclaimer. |
10 | * * Redistributions in binary form must reproduce the above copyright |
11 | * notice, this list of conditions and the following disclaimer in the |
12 | * documentation and/or other materials provided with the distribution. |
13 | * * Neither the name of Redis nor the names of its contributors may be used |
14 | * to endorse or promote products derived from this software without |
15 | * specific prior written permission. |
16 | * |
17 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
18 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
19 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
20 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
21 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
22 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
23 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
24 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
25 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
26 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
27 | * POSSIBILITY OF SUCH DAMAGE. |
28 | */ |
29 | |
30 | #include "server.h" |
31 | #include "endianconv.h" |
32 | #include "stream.h" |
33 | |
/* Every stream item inside the listpack has a flags field that is used to
 * mark the entry as deleted, or as having the same fields as the "master"
 * entry at the start of the listpack. */
#define STREAM_ITEM_FLAG_NONE 0             /* No special flags. */
#define STREAM_ITEM_FLAG_DELETED (1<<0)     /* Entry is deleted. Skip it. */
#define STREAM_ITEM_FLAG_SAMEFIELDS (1<<1)  /* Same fields as master entry. */
40 | |
/* For stream commands that require multiple IDs: when the number of IDs is
 * less than 'STREAMID_STATIC_VECTOR_LEN', a static vector is used so that a
 * malloc allocation is avoided. */
#define STREAMID_STATIC_VECTOR_LEN 8
45 | |
/* Max pre-allocation for a listpack. This cap avoids abuse by a user
 * setting stream_node_max_bytes to a huge number. */
#define STREAM_LISTPACK_MAX_PRE_ALLOCATE 4096
49 | |
50 | void streamFreeCG(streamCG *cg); |
51 | void streamFreeNACK(streamNACK *na); |
52 | size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer); |
53 | int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq); |
54 | int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq); |
55 | |
56 | /* ----------------------------------------------------------------------- |
57 | * Low level stream encoding: a radix tree of listpacks. |
58 | * ----------------------------------------------------------------------- */ |
59 | |
60 | /* Create a new stream data structure. */ |
61 | stream *streamNew(void) { |
62 | stream *s = zmalloc(sizeof(*s)); |
63 | s->rax = raxNew(); |
64 | s->length = 0; |
65 | s->last_id.ms = 0; |
66 | s->last_id.seq = 0; |
67 | s->cgroups = NULL((void*)0); /* Created on demand to save memory when not used. */ |
68 | return s; |
69 | } |
70 | |
71 | /* Free a stream, including the listpacks stored inside the radix tree. */ |
72 | void freeStream(stream *s) { |
73 | raxFreeWithCallback(s->rax,(void(*)(void*))lpFree); |
74 | if (s->cgroups) |
75 | raxFreeWithCallback(s->cgroups,(void(*)(void*))streamFreeCG); |
76 | zfree(s); |
77 | } |
78 | |
79 | /* Return the length of a stream. */ |
80 | unsigned long streamLength(const robj *subject) { |
81 | stream *s = subject->ptr; |
82 | return s->length; |
83 | } |
84 | |
85 | /* Set 'id' to be its successor stream ID. |
86 | * If 'id' is the maximal possible id, it is wrapped around to 0-0 and a |
87 | * C_ERR is returned. */ |
88 | int streamIncrID(streamID *id) { |
89 | int ret = C_OK0; |
90 | if (id->seq == UINT64_MAX(18446744073709551615UL)) { |
91 | if (id->ms == UINT64_MAX(18446744073709551615UL)) { |
92 | /* Special case where 'id' is the last possible streamID... */ |
93 | id->ms = id->seq = 0; |
94 | ret = C_ERR-1; |
95 | } else { |
96 | id->ms++; |
97 | id->seq = 0; |
98 | } |
99 | } else { |
100 | id->seq++; |
101 | } |
102 | return ret; |
103 | } |
104 | |
105 | /* Set 'id' to be its predecessor stream ID. |
106 | * If 'id' is the minimal possible id, it remains 0-0 and a C_ERR is |
107 | * returned. */ |
108 | int streamDecrID(streamID *id) { |
109 | int ret = C_OK0; |
110 | if (id->seq == 0) { |
111 | if (id->ms == 0) { |
112 | /* Special case where 'id' is the first possible streamID... */ |
113 | id->ms = id->seq = UINT64_MAX(18446744073709551615UL); |
114 | ret = C_ERR-1; |
115 | } else { |
116 | id->ms--; |
117 | id->seq = UINT64_MAX(18446744073709551615UL); |
118 | } |
119 | } else { |
120 | id->seq--; |
121 | } |
122 | return ret; |
123 | } |
124 | |
125 | /* Generate the next stream item ID given the previous one. If the current |
126 | * milliseconds Unix time is greater than the previous one, just use this |
127 | * as time part and start with sequence part of zero. Otherwise we use the |
128 | * previous time (and never go backward) and increment the sequence. */ |
129 | void streamNextID(streamID *last_id, streamID *new_id) { |
130 | uint64_t ms = mstime(); |
131 | if (ms > last_id->ms) { |
132 | new_id->ms = ms; |
133 | new_id->seq = 0; |
134 | } else { |
135 | *new_id = *last_id; |
136 | streamIncrID(new_id); |
137 | } |
138 | } |
139 | |
140 | /* This is a helper function for the COPY command. |
141 | * Duplicate a Stream object, with the guarantee that the returned object |
142 | * has the same encoding as the original one. |
143 | * |
144 | * The resulting object always has refcount set to 1 */ |
145 | robj *streamDup(robj *o) { |
146 | robj *sobj; |
147 | |
148 | serverAssert(o->type == OBJ_STREAM)((o->type == 6)?(void)0 : (_serverAssert("o->type == OBJ_STREAM" ,"t_stream.c",148),__builtin_unreachable())); |
149 | |
150 | switch (o->encoding) { |
151 | case OBJ_ENCODING_STREAM10: |
152 | sobj = createStreamObject(); |
153 | break; |
154 | default: |
155 | serverPanic("Wrong encoding.")_serverPanic("t_stream.c",155,"Wrong encoding."),__builtin_unreachable (); |
156 | break; |
157 | } |
158 | |
159 | stream *s; |
160 | stream *new_s; |
161 | s = o->ptr; |
162 | new_s = sobj->ptr; |
163 | |
164 | raxIterator ri; |
165 | uint64_t rax_key[2]; |
166 | raxStart(&ri, s->rax); |
167 | raxSeek(&ri, "^", NULL((void*)0), 0); |
168 | size_t lp_bytes = 0; /* Total bytes in the listpack. */ |
169 | unsigned char *lp = NULL((void*)0); /* listpack pointer. */ |
170 | /* Get a reference to the listpack node. */ |
171 | while (raxNext(&ri)) { |
172 | lp = ri.data; |
173 | lp_bytes = lpBytes(lp); |
174 | unsigned char *new_lp = zmalloc(lp_bytes); |
175 | memcpy(new_lp, lp, lp_bytes); |
176 | memcpy(rax_key, ri.key, sizeof(rax_key)); |
177 | raxInsert(new_s->rax, (unsigned char *)&rax_key, sizeof(rax_key), |
178 | new_lp, NULL((void*)0)); |
179 | } |
180 | new_s->length = s->length; |
181 | new_s->last_id = s->last_id; |
182 | raxStop(&ri); |
183 | |
184 | if (s->cgroups == NULL((void*)0)) return sobj; |
185 | |
186 | /* Consumer Groups */ |
187 | raxIterator ri_cgroups; |
188 | raxStart(&ri_cgroups, s->cgroups); |
189 | raxSeek(&ri_cgroups, "^", NULL((void*)0), 0); |
190 | while (raxNext(&ri_cgroups)) { |
191 | streamCG *cg = ri_cgroups.data; |
192 | streamCG *new_cg = streamCreateCG(new_s, (char *)ri_cgroups.key, |
193 | ri_cgroups.key_len, &cg->last_id); |
194 | |
195 | serverAssert(new_cg != NULL)((new_cg != ((void*)0))?(void)0 : (_serverAssert("new_cg != NULL" ,"t_stream.c",195),__builtin_unreachable())); |
196 | |
197 | /* Consumer Group PEL */ |
198 | raxIterator ri_cg_pel; |
199 | raxStart(&ri_cg_pel,cg->pel); |
200 | raxSeek(&ri_cg_pel,"^",NULL((void*)0),0); |
201 | while(raxNext(&ri_cg_pel)){ |
202 | streamNACK *nack = ri_cg_pel.data; |
203 | streamNACK *new_nack = streamCreateNACK(NULL((void*)0)); |
204 | new_nack->delivery_time = nack->delivery_time; |
205 | new_nack->delivery_count = nack->delivery_count; |
206 | raxInsert(new_cg->pel, ri_cg_pel.key, sizeof(streamID), new_nack, NULL((void*)0)); |
207 | } |
208 | raxStop(&ri_cg_pel); |
209 | |
210 | /* Consumers */ |
211 | raxIterator ri_consumers; |
212 | raxStart(&ri_consumers, cg->consumers); |
213 | raxSeek(&ri_consumers, "^", NULL((void*)0), 0); |
214 | while (raxNext(&ri_consumers)) { |
215 | streamConsumer *consumer = ri_consumers.data; |
216 | streamConsumer *new_consumer; |
217 | new_consumer = zmalloc(sizeof(*new_consumer)); |
218 | new_consumer->name = sdsdup(consumer->name); |
219 | new_consumer->pel = raxNew(); |
220 | raxInsert(new_cg->consumers,(unsigned char *)new_consumer->name, |
221 | sdslen(new_consumer->name), new_consumer, NULL((void*)0)); |
222 | new_consumer->seen_time = consumer->seen_time; |
223 | |
224 | /* Consumer PEL */ |
225 | raxIterator ri_cpel; |
226 | raxStart(&ri_cpel, consumer->pel); |
227 | raxSeek(&ri_cpel, "^", NULL((void*)0), 0); |
228 | while (raxNext(&ri_cpel)) { |
229 | streamNACK *new_nack = raxFind(new_cg->pel,ri_cpel.key,sizeof(streamID)); |
230 | |
231 | serverAssert(new_nack != raxNotFound)((new_nack != raxNotFound)?(void)0 : (_serverAssert("new_nack != raxNotFound" ,"t_stream.c",231),__builtin_unreachable())); |
232 | |
233 | new_nack->consumer = new_consumer; |
234 | raxInsert(new_consumer->pel,ri_cpel.key,sizeof(streamID),new_nack,NULL((void*)0)); |
235 | } |
236 | raxStop(&ri_cpel); |
237 | } |
238 | raxStop(&ri_consumers); |
239 | } |
240 | raxStop(&ri_cgroups); |
241 | return sobj; |
242 | } |
243 | |
244 | /* This is just a wrapper for lpAppend() to directly use a 64 bit integer |
245 | * instead of a string. */ |
246 | unsigned char *lpAppendInteger(unsigned char *lp, int64_t value) { |
247 | char buf[LONG_STR_SIZE21]; |
248 | int slen = ll2string(buf,sizeof(buf),value); |
249 | return lpAppend(lp,(unsigned char*)buf,slen); |
250 | } |
251 | |
252 | /* This is just a wrapper for lpReplace() to directly use a 64 bit integer |
253 | * instead of a string to replace the current element. The function returns |
254 | * the new listpack as return value, and also updates the current cursor |
255 | * by updating '*pos'. */ |
256 | unsigned char *lpReplaceInteger(unsigned char *lp, unsigned char **pos, int64_t value) { |
257 | char buf[LONG_STR_SIZE21]; |
258 | int slen = ll2string(buf,sizeof(buf),value); |
259 | return lpInsert(lp, (unsigned char*)buf, slen, *pos, LP_REPLACE2, pos); |
260 | } |
261 | |
/* This is a wrapper function for lpGet() to directly get an integer value
 * from the listpack (that may store numbers as a string), converting
 * the string if needed.
 *
 * The 'valid' argument is an optional output parameter: it is set to 1 when
 * the element was read as an integer, or to the result of the string to
 * integer conversion otherwise. When 'valid' is NULL and the conversion
 * fails, the function fails with an assertion. */
static inline int64_t lpGetIntegerIfValid(unsigned char *ele, int *valid) {
    int64_t v;
    unsigned char *e = lpGet(ele,&v,NULL);
    if (e == NULL) {
        /* Integer encoded element: lpGet() stored the value in 'v'. */
        if (valid)
            *valid = 1;
        return v;
    }
    /* The following code path should never be used for how listpacks work:
     * they should always be able to store an int64_t value in integer
     * encoded form. However the implementation may change.
     * Here 'v' holds the length of the string pointed to by 'e'. */
    long long ll;
    int ret = string2ll((char*)e,v,&ll);
    if (valid)
        *valid = ret;
    else
        serverAssert(ret != 0);
    v = ll;
    return v;
}
288 | |
/* Read an integer from a listpack element, asserting that it is valid. */
#define lpGetInteger(ele) lpGetIntegerIfValid(ele, NULL)
290 | |
291 | /* Get an edge streamID of a given listpack. |
292 | * 'master_id' is an input param, used to build the 'edge_id' output param */ |
293 | int lpGetEdgeStreamID(unsigned char *lp, int first, streamID *master_id, streamID *edge_id) |
294 | { |
295 | if (lp == NULL((void*)0)) |
296 | return 0; |
297 | |
298 | unsigned char *lp_ele; |
299 | |
300 | /* We need to seek either the first or the last entry depending |
301 | * on the direction of the iteration. */ |
302 | if (first) { |
303 | /* Get the master fields count. */ |
304 | lp_ele = lpFirst(lp); /* Seek items count */ |
305 | lp_ele = lpNext(lp, lp_ele); /* Seek deleted count. */ |
306 | lp_ele = lpNext(lp, lp_ele); /* Seek num fields. */ |
307 | int64_t master_fields_count = lpGetInteger(lp_ele)lpGetIntegerIfValid(lp_ele, ((void*)0)); |
308 | lp_ele = lpNext(lp, lp_ele); /* Seek first field. */ |
309 | |
310 | /* If we are iterating in normal order, skip the master fields |
311 | * to seek the first actual entry. */ |
312 | for (int64_t i = 0; i < master_fields_count; i++) |
313 | lp_ele = lpNext(lp, lp_ele); |
314 | |
315 | /* If we are going forward, skip the previous entry's |
316 | * lp-count field (or in case of the master entry, the zero |
317 | * term field) */ |
318 | lp_ele = lpNext(lp, lp_ele); |
319 | if (lp_ele == NULL((void*)0)) |
320 | return 0; |
321 | } else { |
322 | /* If we are iterating in reverse direction, just seek the |
323 | * last part of the last entry in the listpack (that is, the |
324 | * fields count). */ |
325 | lp_ele = lpLast(lp); |
326 | |
327 | /* If we are going backward, read the number of elements this |
328 | * entry is composed of, and jump backward N times to seek |
329 | * its start. */ |
330 | int64_t lp_count = lpGetInteger(lp_ele)lpGetIntegerIfValid(lp_ele, ((void*)0)); |
331 | if (lp_count == 0) /* We reached the master entry. */ |
332 | return 0; |
333 | |
334 | while (lp_count--) |
335 | lp_ele = lpPrev(lp, lp_ele); |
336 | } |
337 | |
338 | lp_ele = lpNext(lp, lp_ele); /* Seek ID (lp_ele currently points to 'flags'). */ |
339 | |
340 | /* Get the ID: it is encoded as difference between the master |
341 | * ID and this entry ID. */ |
342 | streamID id = *master_id; |
343 | id.ms += lpGetInteger(lp_ele)lpGetIntegerIfValid(lp_ele, ((void*)0)); |
344 | lp_ele = lpNext(lp, lp_ele); |
345 | id.seq += lpGetInteger(lp_ele)lpGetIntegerIfValid(lp_ele, ((void*)0)); |
346 | *edge_id = id; |
347 | return 1; |
348 | } |
349 | |
350 | /* Debugging function to log the full content of a listpack. Useful |
351 | * for development and debugging. */ |
352 | void streamLogListpackContent(unsigned char *lp) { |
353 | unsigned char *p = lpFirst(lp); |
354 | while(p) { |
355 | unsigned char buf[LP_INTBUF_SIZE21]; |
356 | int64_t v; |
357 | unsigned char *ele = lpGet(p,&v,buf); |
358 | serverLog(LL_WARNING3,"- [%d] '%.*s'", (int)v, (int)v, ele); |
359 | p = lpNext(lp,p); |
360 | } |
361 | } |
362 | |
363 | /* Convert the specified stream entry ID as a 128 bit big endian number, so |
364 | * that the IDs can be sorted lexicographically. */ |
365 | void streamEncodeID(void *buf, streamID *id) { |
366 | uint64_t e[2]; |
367 | e[0] = htonu64(id->ms)intrev64(id->ms); |
368 | e[1] = htonu64(id->seq)intrev64(id->seq); |
369 | memcpy(buf,e,sizeof(e)); |
370 | } |
371 | |
372 | /* This is the reverse of streamEncodeID(): the decoded ID will be stored |
373 | * in the 'id' structure passed by reference. The buffer 'buf' must point |
374 | * to a 128 bit big-endian encoded ID. */ |
375 | void streamDecodeID(void *buf, streamID *id) { |
376 | uint64_t e[2]; |
377 | memcpy(e,buf,sizeof(e)); |
378 | id->ms = ntohu64(e[0])intrev64(e[0]); |
379 | id->seq = ntohu64(e[1])intrev64(e[1]); |
380 | } |
381 | |
382 | /* Compare two stream IDs. Return -1 if a < b, 0 if a == b, 1 if a > b. */ |
383 | int streamCompareID(streamID *a, streamID *b) { |
384 | if (a->ms > b->ms) return 1; |
385 | else if (a->ms < b->ms) return -1; |
386 | /* The ms part is the same. Check the sequence part. */ |
387 | else if (a->seq > b->seq) return 1; |
388 | else if (a->seq < b->seq) return -1; |
389 | /* Everything is the same: IDs are equal. */ |
390 | return 0; |
391 | } |
392 | |
393 | void streamGetEdgeID(stream *s, int first, streamID *edge_id) |
394 | { |
395 | raxIterator ri; |
396 | raxStart(&ri, s->rax); |
397 | int empty; |
398 | if (first) { |
399 | raxSeek(&ri, "^", NULL((void*)0), 0); |
400 | empty = !raxNext(&ri); |
401 | } else { |
402 | raxSeek(&ri, "$", NULL((void*)0), 0); |
403 | empty = !raxPrev(&ri); |
404 | } |
405 | |
406 | if (empty) { |
407 | /* Stream is empty, mark edge ID as lowest/highest possible. */ |
408 | edge_id->ms = first ? UINT64_MAX(18446744073709551615UL) : 0; |
409 | edge_id->seq = first ? UINT64_MAX(18446744073709551615UL) : 0; |
410 | raxStop(&ri); |
411 | return; |
412 | } |
413 | |
414 | unsigned char *lp = ri.data; |
415 | |
416 | /* Read the master ID from the radix tree key. */ |
417 | streamID master_id; |
418 | streamDecodeID(ri.key, &master_id); |
419 | |
420 | /* Construct edge ID. */ |
421 | lpGetEdgeStreamID(lp, first, &master_id, edge_id); |
422 | |
423 | raxStop(&ri); |
424 | } |
425 | |
426 | /* Adds a new item into the stream 's' having the specified number of |
427 | * field-value pairs as specified in 'numfields' and stored into 'argv'. |
428 | * Returns the new entry ID populating the 'added_id' structure. |
429 | * |
430 | * If 'use_id' is not NULL, the ID is not auto-generated by the function, |
431 | * but instead the passed ID is used to add the new entry. In this case |
432 | * adding the entry may fail as specified later in this comment. |
433 | * |
434 | * The function returns C_OK if the item was added, this is always true |
435 | * if the ID was generated by the function. However the function may return |
436 | * C_ERR if an ID was given via 'use_id', but adding it failed since the |
437 | * current top ID is greater or equal. */ |
438 | int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id) { |
439 | |
440 | /* Generate the new entry ID. */ |
441 | streamID id; |
442 | if (use_id) |
443 | id = *use_id; |
444 | else |
445 | streamNextID(&s->last_id,&id); |
446 | |
447 | /* Check that the new ID is greater than the last entry ID |
448 | * or return an error. Automatically generated IDs might |
449 | * overflow (and wrap-around) when incrementing the sequence |
450 | part. */ |
451 | if (streamCompareID(&id,&s->last_id) <= 0) return C_ERR-1; |
452 | |
453 | /* Add the new entry. */ |
454 | raxIterator ri; |
455 | raxStart(&ri,s->rax); |
456 | raxSeek(&ri,"$",NULL((void*)0),0); |
457 | |
458 | size_t lp_bytes = 0; /* Total bytes in the tail listpack. */ |
459 | unsigned char *lp = NULL((void*)0); /* Tail listpack pointer. */ |
460 | |
461 | /* Get a reference to the tail node listpack. */ |
462 | if (raxNext(&ri)) { |
463 | lp = ri.data; |
464 | lp_bytes = lpBytes(lp); |
465 | } |
466 | raxStop(&ri); |
467 | |
468 | /* We have to add the key into the radix tree in lexicographic order, |
469 | * to do so we consider the ID as a single 128 bit number written in |
470 | * big endian, so that the most significant bytes are the first ones. */ |
471 | uint64_t rax_key[2]; /* Key in the radix tree containing the listpack.*/ |
472 | streamID master_id; /* ID of the master entry in the listpack. */ |
473 | |
474 | /* Create a new listpack and radix tree node if needed. Note that when |
475 | * a new listpack is created, we populate it with a "master entry". This |
476 | * is just a set of fields that is taken as references in order to compress |
477 | * the stream entries that we'll add inside the listpack. |
478 | * |
479 | * Note that while we use the first added entry fields to create |
480 | * the master entry, the first added entry is NOT represented in the master |
481 | * entry, which is a stand alone object. But of course, the first entry |
482 | * will compress well because it's used as reference. |
483 | * |
484 | * The master entry is composed like in the following example: |
485 | * |
486 | * +-------+---------+------------+---------+--/--+---------+---------+-+ |
487 | * | count | deleted | num-fields | field_1 | field_2 | ... | field_N |0| |
488 | * +-------+---------+------------+---------+--/--+---------+---------+-+ |
489 | * |
490 | * count and deleted just represent respectively the total number of |
491 | * entries inside the listpack that are valid, and marked as deleted |
492 | * (deleted flag in the entry flags set). So the total number of items |
493 | * actually inside the listpack (both deleted and not) is count+deleted. |
494 | * |
495 | * The real entries will be encoded with an ID that is just the |
496 | * millisecond and sequence difference compared to the key stored at |
497 | * the radix tree node containing the listpack (delta encoding), and |
498 | * if the fields of the entry are the same as the master entry fields, the |
499 | * entry flags will specify this fact and the entry fields and number |
500 | * of fields will be omitted (see later in the code of this function). |
501 | * |
502 | * The "0" entry at the end is the same as the 'lp-count' entry in the |
503 | * regular stream entries (see below), and marks the fact that there are |
504 | * no more entries, when we scan the stream from right to left. */ |
505 | |
506 | /* First of all, check if we can append to the current macro node or |
507 | * if we need to switch to the next one. 'lp' will be set to NULL if |
508 | * the current node is full. */ |
509 | if (lp != NULL((void*)0)) { |
510 | if (server.stream_node_max_bytes && |
511 | lp_bytes >= server.stream_node_max_bytes) |
512 | { |
513 | lp = NULL((void*)0); |
514 | } else if (server.stream_node_max_entries) { |
515 | unsigned char *lp_ele = lpFirst(lp); |
516 | /* Count both live entries and deleted ones. */ |
517 | int64_t count = lpGetInteger(lp_ele)lpGetIntegerIfValid(lp_ele, ((void*)0)) + lpGetInteger(lpNext(lp,lp_ele))lpGetIntegerIfValid(lpNext(lp,lp_ele), ((void*)0)); |
518 | if (count >= server.stream_node_max_entries) { |
519 | /* Shrink extra pre-allocated memory */ |
520 | lp = lpShrinkToFit(lp); |
521 | if (ri.data != lp) |
522 | raxInsert(s->rax,ri.key,ri.key_len,lp,NULL((void*)0)); |
523 | lp = NULL((void*)0); |
524 | } |
525 | } |
526 | } |
527 | |
528 | int flags = STREAM_ITEM_FLAG_NONE0; |
529 | if (lp == NULL((void*)0)) { |
530 | master_id = id; |
531 | streamEncodeID(rax_key,&id); |
532 | /* Create the listpack having the master entry ID and fields. |
533 | * Pre-allocate some bytes when creating listpack to avoid realloc on |
534 | * every XADD. Since listpack.c uses malloc_size, it'll grow in steps, |
535 | * and won't realloc on every XADD. |
536 | * When listpack reaches max number of entries, we'll shrink the |
537 | * allocation to fit the data. */ |
538 | size_t prealloc = STREAM_LISTPACK_MAX_PRE_ALLOCATE4096; |
539 | if (server.stream_node_max_bytes > 0 && server.stream_node_max_bytes < prealloc) { |
540 | prealloc = server.stream_node_max_bytes; |
541 | } |
542 | lp = lpNew(prealloc); |
543 | lp = lpAppendInteger(lp,1); /* One item, the one we are adding. */ |
544 | lp = lpAppendInteger(lp,0); /* Zero deleted so far. */ |
545 | lp = lpAppendInteger(lp,numfields); |
546 | for (int64_t i = 0; i < numfields; i++) { |
547 | sds field = argv[i*2]->ptr; |
548 | lp = lpAppend(lp,(unsigned char*)field,sdslen(field)); |
549 | } |
550 | lp = lpAppendInteger(lp,0); /* Master entry zero terminator. */ |
551 | raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL((void*)0)); |
552 | /* The first entry we insert, has obviously the same fields of the |
553 | * master entry. */ |
554 | flags |= STREAM_ITEM_FLAG_SAMEFIELDS(1<<1); |
555 | } else { |
556 | serverAssert(ri.key_len == sizeof(rax_key))((ri.key_len == sizeof(rax_key))?(void)0 : (_serverAssert("ri.key_len == sizeof(rax_key)" ,"t_stream.c",556),__builtin_unreachable())); |
557 | memcpy(rax_key,ri.key,sizeof(rax_key)); |
558 | |
559 | /* Read the master ID from the radix tree key. */ |
560 | streamDecodeID(rax_key,&master_id); |
561 | unsigned char *lp_ele = lpFirst(lp); |
562 | |
563 | /* Update count and skip the deleted fields. */ |
564 | int64_t count = lpGetInteger(lp_ele)lpGetIntegerIfValid(lp_ele, ((void*)0)); |
565 | lp = lpReplaceInteger(lp,&lp_ele,count+1); |
566 | lp_ele = lpNext(lp,lp_ele); /* seek deleted. */ |
567 | lp_ele = lpNext(lp,lp_ele); /* seek master entry num fields. */ |
568 | |
569 | /* Check if the entry we are adding, have the same fields |
570 | * as the master entry. */ |
571 | int64_t master_fields_count = lpGetInteger(lp_ele)lpGetIntegerIfValid(lp_ele, ((void*)0)); |
572 | lp_ele = lpNext(lp,lp_ele); |
573 | if (numfields == master_fields_count) { |
574 | int64_t i; |
575 | for (i = 0; i < master_fields_count; i++) { |
576 | sds field = argv[i*2]->ptr; |
577 | int64_t e_len; |
578 | unsigned char buf[LP_INTBUF_SIZE21]; |
579 | unsigned char *e = lpGet(lp_ele,&e_len,buf); |
580 | /* Stop if there is a mismatch. */ |
581 | if (sdslen(field) != (size_t)e_len || |
582 | memcmp(e,field,e_len) != 0) break; |
583 | lp_ele = lpNext(lp,lp_ele); |
584 | } |
585 | /* All fields are the same! We can compress the field names |
586 | * setting a single bit in the flags. */ |
587 | if (i == master_fields_count) flags |= STREAM_ITEM_FLAG_SAMEFIELDS(1<<1); |
588 | } |
589 | } |
590 | |
591 | /* Populate the listpack with the new entry. We use the following |
592 | * encoding: |
593 | * |
594 | * +-----+--------+----------+-------+-------+-/-+-------+-------+--------+ |
595 | * |flags|entry-id|num-fields|field-1|value-1|...|field-N|value-N|lp-count| |
596 | * +-----+--------+----------+-------+-------+-/-+-------+-------+--------+ |
597 | * |
598 | * However if the SAMEFIELD flag is set, we have just to populate |
599 | * the entry with the values, so it becomes: |
600 | * |
601 | * +-----+--------+-------+-/-+-------+--------+ |
602 | * |flags|entry-id|value-1|...|value-N|lp-count| |
603 | * +-----+--------+-------+-/-+-------+--------+ |
604 | * |
605 | * The entry-id field is actually two separated fields: the ms |
606 | * and seq difference compared to the master entry. |
607 | * |
608 | * The lp-count field is a number that states the number of listpack pieces |
609 | * that compose the entry, so that it's possible to travel the entry |
610 | * in reverse order: we can just start from the end of the listpack, read |
611 | * the entry, and jump back N times to seek the "flags" field to read |
612 | * the stream full entry. */ |
613 | lp = lpAppendInteger(lp,flags); |
614 | lp = lpAppendInteger(lp,id.ms - master_id.ms); |
615 | lp = lpAppendInteger(lp,id.seq - master_id.seq); |
616 | if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS(1<<1))) |
617 | lp = lpAppendInteger(lp,numfields); |
618 | for (int64_t i = 0; i < numfields; i++) { |
619 | sds field = argv[i*2]->ptr, value = argv[i*2+1]->ptr; |
620 | if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS(1<<1))) |
621 | lp = lpAppend(lp,(unsigned char*)field,sdslen(field)); |
622 | lp = lpAppend(lp,(unsigned char*)value,sdslen(value)); |
623 | } |
624 | /* Compute and store the lp-count field. */ |
625 | int64_t lp_count = numfields; |
626 | lp_count += 3; /* Add the 3 fixed fields flags + ms-diff + seq-diff. */ |
627 | if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS(1<<1))) { |
628 | /* If the item is not compressed, it also has the fields other than |
629 | * the values, and an additional num-fileds field. */ |
630 | lp_count += numfields+1; |
631 | } |
632 | lp = lpAppendInteger(lp,lp_count); |
633 | |
634 | /* Insert back into the tree in order to update the listpack pointer. */ |
635 | if (ri.data != lp) |
636 | raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL((void*)0)); |
637 | s->length++; |
638 | s->last_id = id; |
639 | if (added_id) *added_id = id; |
640 | return C_OK0; |
641 | } |
642 | |
643 | typedef struct { |
644 | /* XADD options */ |
645 | streamID id; /* User-provided ID, for XADD only. */ |
646 | int id_given; /* Was an ID different than "*" specified? for XADD only. */ |
647 | int no_mkstream; /* if set to 1 do not create new stream */ |
648 | |
649 | /* XADD + XTRIM common options */ |
650 | int trim_strategy; /* TRIM_STRATEGY_* */ |
651 | int trim_strategy_arg_idx; /* Index of the count in MAXLEN/MINID, for rewriting. */ |
652 | int approx_trim; /* If 1 only delete whole radix tree nodes, so |
653 | * the trim argument is not applied verbatim. */ |
654 | long long limit; /* Maximum amount of entries to trim. If 0, no limitation |
655 | * on the amount of trimming work is enforced. */ |
656 | /* TRIM_STRATEGY_MAXLEN options */ |
657 | long long maxlen; /* After trimming, leave stream at this length . */ |
658 | /* TRIM_STRATEGY_MINID options */ |
659 | streamID minid; /* Trim by ID (No stream entries with ID < 'minid' will remain) */ |
660 | } streamAddTrimArgs; |
661 | |
/* Values for the 'trim_strategy' field of streamAddTrimArgs. */
#define TRIM_STRATEGY_NONE 0
#define TRIM_STRATEGY_MAXLEN 1
#define TRIM_STRATEGY_MINID 2
665 | |
666 | /* Trim the stream 's' according to args->trim_strategy, and return the |
667 | * number of elements removed from the stream. The 'approx' option, if non-zero, |
668 | * specifies that the trimming must be performed in a approximated way in |
669 | * order to maximize performances. This means that the stream may contain |
670 | * entries with IDs < 'id' in case of MINID (or more elements than 'maxlen' |
671 | * in case of MAXLEN), and elements are only removed if we can remove |
672 | * a *whole* node of the radix tree. The elements are removed from the head |
673 | * of the stream (older elements). |
674 | * |
675 | * The function may return zero if: |
676 | * |
677 | * 1) The minimal entry ID of the stream is already >= 'id' (MINID); or |
678 | * 2) The stream is already shorter or equal to the specified max length (MAXLEN); or |
679 | * 3) The 'approx' option is true and the head node did not have enough elements |
680 | * to be deleted. |
681 | * |
682 | * args->limit is the maximum number of entries to delete. The purpose is to |
683 | * prevent this function from taking too long. |
684 | * If 'limit' is 0 then we do not limit the number of deleted entries. |
685 | * Much like the 'approx', if 'limit' is smaller than the number of entries |
686 | * that should be trimmed, there is a chance we will still have entries with |
687 | * IDs < 'id' (or number of elements >= maxlen in case of MAXLEN). |
688 | */ |
689 | int64_t streamTrim(stream *s, streamAddTrimArgs *args) { |
690 | size_t maxlen = args->maxlen; |
691 | streamID *id = &args->minid; |
692 | int approx = args->approx_trim; |
693 | int64_t limit = args->limit; |
694 | int trim_strategy = args->trim_strategy; |
695 | |
696 | if (trim_strategy == TRIM_STRATEGY_NONE0) |
697 | return 0; |
698 | |
699 | raxIterator ri; |
700 | raxStart(&ri,s->rax); |
701 | raxSeek(&ri,"^",NULL((void*)0),0); |
702 | |
703 | int64_t deleted = 0; |
704 | while (raxNext(&ri)) { |
705 | /* Check if we exceeded the amount of work we could do */ |
706 | if (limit && deleted >= limit) |
707 | break; |
708 | |
709 | if (trim_strategy == TRIM_STRATEGY_MAXLEN1 && s->length <= maxlen) |
710 | break; |
711 | |
712 | unsigned char *lp = ri.data, *p = lpFirst(lp); |
713 | int64_t entries = lpGetInteger(p)lpGetIntegerIfValid(p, ((void*)0)); |
714 | |
715 | /* Check if we can remove the whole node. */ |
716 | int remove_node; |
717 | streamID master_id = {0}; /* For MINID */ |
718 | if (trim_strategy == TRIM_STRATEGY_MAXLEN1) { |
719 | remove_node = s->length - entries >= maxlen; |
720 | } else { |
721 | /* Read the master ID from the radix tree key. */ |
722 | streamDecodeID(ri.key, &master_id); |
723 | |
724 | /* Read last ID. */ |
725 | streamID last_id; |
726 | lpGetEdgeStreamID(lp, 0, &master_id, &last_id); |
727 | |
728 | /* We can remove the entire node id its last ID < 'id' */ |
729 | remove_node = streamCompareID(&last_id, id) < 0; |
730 | } |
731 | |
732 | if (remove_node) { |
733 | lpFree(lp); |
734 | raxRemove(s->rax,ri.key,ri.key_len,NULL((void*)0)); |
735 | raxSeek(&ri,">=",ri.key,ri.key_len); |
736 | s->length -= entries; |
737 | deleted += entries; |
738 | continue; |
739 | } |
740 | |
741 | /* If we cannot remove a whole element, and approx is true, |
742 | * stop here. */ |
743 | if (approx) break; |
744 | |
745 | /* Now we have to trim entries from within 'lp' */ |
746 | int64_t deleted_from_lp = 0; |
747 | |
748 | p = lpNext(lp, p); /* Skip deleted field. */ |
749 | p = lpNext(lp, p); /* Skip num-of-fields in the master entry. */ |
750 | |
751 | /* Skip all the master fields. */ |
752 | int64_t master_fields_count = lpGetInteger(p)lpGetIntegerIfValid(p, ((void*)0)); |
753 | p = lpNext(lp,p); /* Skip the first field. */ |
754 | for (int64_t j = 0; j < master_fields_count; j++) |
755 | p = lpNext(lp,p); /* Skip all master fields. */ |
756 | p = lpNext(lp,p); /* Skip the zero master entry terminator. */ |
757 | |
758 | /* 'p' is now pointing to the first entry inside the listpack. |
759 | * We have to run entry after entry, marking entries as deleted |
760 | * if they are already not deleted. */ |
761 | while (p) { |
762 | /* We keep a copy of p (which point to flags part) in order to |
763 | * update it after (and if) we actually remove the entry */ |
764 | unsigned char *pcopy = p; |
765 | |
766 | int flags = lpGetInteger(p)lpGetIntegerIfValid(p, ((void*)0)); |
767 | p = lpNext(lp, p); /* Skip flags. */ |
768 | int to_skip; |
769 | |
770 | int ms_delta = lpGetInteger(p)lpGetIntegerIfValid(p, ((void*)0)); |
771 | p = lpNext(lp, p); /* Skip ID ms delta */ |
772 | int seq_delta = lpGetInteger(p)lpGetIntegerIfValid(p, ((void*)0)); |
773 | p = lpNext(lp, p); /* Skip ID seq delta */ |
774 | |
775 | streamID currid = {0}; /* For MINID */ |
776 | if (trim_strategy == TRIM_STRATEGY_MINID2) { |
777 | currid.ms = master_id.ms + ms_delta; |
778 | currid.seq = master_id.seq + seq_delta; |
779 | } |
780 | |
781 | int stop; |
782 | if (trim_strategy == TRIM_STRATEGY_MAXLEN1) { |
783 | stop = s->length <= maxlen; |
784 | } else { |
785 | /* Following IDs will definitely be greater because the rax |
786 | * tree is sorted, no point of continuing. */ |
787 | stop = streamCompareID(&currid, id) >= 0; |
788 | } |
789 | if (stop) |
790 | break; |
791 | |
792 | if (flags & STREAM_ITEM_FLAG_SAMEFIELDS(1<<1)) { |
793 | to_skip = master_fields_count; |
794 | } else { |
795 | to_skip = lpGetInteger(p)lpGetIntegerIfValid(p, ((void*)0)); /* Get num-fields. */ |
796 | p = lpNext(lp,p); /* Skip num-fields. */ |
797 | to_skip *= 2; /* Fields and values. */ |
798 | } |
799 | |
800 | while(to_skip--) p = lpNext(lp,p); /* Skip the whole entry. */ |
801 | p = lpNext(lp,p); /* Skip the final lp-count field. */ |
802 | |
803 | /* Mark the entry as deleted. */ |
804 | if (!(flags & STREAM_ITEM_FLAG_DELETED(1<<0))) { |
805 | intptr_t delta = p - lp; |
806 | flags |= STREAM_ITEM_FLAG_DELETED(1<<0); |
807 | lp = lpReplaceInteger(lp, &pcopy, flags); |
808 | deleted_from_lp++; |
809 | s->length--; |
810 | p = lp + delta; |
811 | } |
812 | } |
813 | deleted += deleted_from_lp; |
814 | |
815 | /* Now we the entries/deleted counters. */ |
816 | p = lpFirst(lp); |
817 | lp = lpReplaceInteger(lp,&p,entries-deleted_from_lp); |
818 | p = lpNext(lp,p); /* Skip deleted field. */ |
819 | int64_t marked_deleted = lpGetInteger(p)lpGetIntegerIfValid(p, ((void*)0)); |
820 | lp = lpReplaceInteger(lp,&p,marked_deleted+deleted_from_lp); |
821 | p = lpNext(lp,p); /* Skip num-of-fields in the master entry. */ |
822 | |
823 | /* Here we should perform garbage collection in case at this point |
824 | * there are too many entries deleted inside the listpack. */ |
825 | entries -= deleted_from_lp; |
826 | marked_deleted += deleted_from_lp; |
827 | if (entries + marked_deleted > 10 && marked_deleted > entries/2) { |
828 | /* TODO: perform a garbage collection. */ |
829 | } |
830 | |
831 | /* Update the listpack with the new pointer. */ |
832 | raxInsert(s->rax,ri.key,ri.key_len,lp,NULL((void*)0)); |
833 | |
834 | break; /* If we are here, there was enough to delete in the current |
835 | node, so no need to go to the next node. */ |
836 | } |
837 | |
838 | raxStop(&ri); |
839 | return deleted; |
840 | } |
841 | |
842 | /* Trims a stream by length. Returns the number of deleted items. */ |
843 | int64_t streamTrimByLength(stream *s, long long maxlen, int approx) { |
844 | streamAddTrimArgs args = { |
845 | .trim_strategy = TRIM_STRATEGY_MAXLEN1, |
846 | .approx_trim = approx, |
847 | .limit = approx ? 100 * server.stream_node_max_entries : 0, |
848 | .maxlen = maxlen |
849 | }; |
850 | return streamTrim(s, &args); |
851 | } |
852 | |
853 | /* Trims a stream by minimum ID. Returns the number of deleted items. */ |
854 | int64_t streamTrimByID(stream *s, streamID minid, int approx) { |
855 | streamAddTrimArgs args = { |
856 | .trim_strategy = TRIM_STRATEGY_MINID2, |
857 | .approx_trim = approx, |
858 | .limit = approx ? 100 * server.stream_node_max_entries : 0, |
859 | .minid = minid |
860 | }; |
861 | return streamTrim(s, &args); |
862 | } |
863 | |
864 | /* Parse the arguements of XADD/XTRIM. |
865 | * |
866 | * See streamAddTrimArgs for more details about the arguments handled. |
867 | * |
868 | * This function returns the position of the ID argument (relevant only to XADD). |
869 | * On error -1 is returned and a reply is sent. */ |
870 | static int streamParseAddOrTrimArgsOrReply(client *c, streamAddTrimArgs *args, int xadd) { |
871 | /* Initialize arguments to defaults */ |
872 | memset(args, 0, sizeof(*args)); |
873 | |
874 | /* Parse options. */ |
875 | int i = 2; /* This is the first argument position where we could |
876 | find an option, or the ID. */ |
877 | int limit_given = 0; |
878 | for (; i < c->argc; i++) { |
879 | int moreargs = (c->argc-1) - i; /* Number of additional arguments. */ |
880 | char *opt = c->argv[i]->ptr; |
881 | if (xadd && opt[0] == '*' && opt[1] == '\0') { |
882 | /* This is just a fast path for the common case of auto-ID |
883 | * creation. */ |
884 | break; |
885 | } else if (!strcasecmp(opt,"maxlen") && moreargs) { |
886 | if (args->trim_strategy != TRIM_STRATEGY_NONE0) { |
887 | addReplyError(c,"syntax error, MAXLEN and MINID options at the same time are not compatible"); |
888 | return -1; |
889 | } |
890 | args->approx_trim = 0; |
891 | char *next = c->argv[i+1]->ptr; |
892 | /* Check for the form MAXLEN ~ <count>. */ |
893 | if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') { |
894 | args->approx_trim = 1; |
895 | i++; |
896 | } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') { |
897 | i++; |
898 | } |
899 | if (getLongLongFromObjectOrReply(c,c->argv[i+1],&args->maxlen,NULL((void*)0)) |
900 | != C_OK0) return -1; |
901 | |
902 | if (args->maxlen < 0) { |
903 | addReplyError(c,"The MAXLEN argument must be >= 0."); |
904 | return -1; |
905 | } |
906 | i++; |
907 | args->trim_strategy = TRIM_STRATEGY_MAXLEN1; |
908 | args->trim_strategy_arg_idx = i; |
909 | } else if (!strcasecmp(opt,"minid") && moreargs) { |
910 | if (args->trim_strategy != TRIM_STRATEGY_NONE0) { |
911 | addReplyError(c,"syntax error, MAXLEN and MINID options at the same time are not compatible"); |
912 | return -1; |
913 | } |
914 | args->approx_trim = 0; |
915 | char *next = c->argv[i+1]->ptr; |
916 | /* Check for the form MINID ~ <id>|<age>. */ |
917 | if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') { |
918 | args->approx_trim = 1; |
919 | i++; |
920 | } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') { |
921 | i++; |
922 | } |
923 | |
924 | if (streamParseStrictIDOrReply(c,c->argv[i+1],&args->minid,0) != C_OK0) |
925 | return -1; |
926 | |
927 | i++; |
928 | args->trim_strategy = TRIM_STRATEGY_MINID2; |
929 | args->trim_strategy_arg_idx = i; |
930 | } else if (!strcasecmp(opt,"limit") && moreargs) { |
931 | /* Note about LIMIT: If it was not provided by the caller we set |
932 | * it to 100*server.stream_node_max_entries, and that's to prevent the |
933 | * trimming from taking too long, on the expense of not deleting entries |
934 | * that should be trimmed. |
935 | * If user wanted exact trimming (i.e. no '~') we never limit the number |
936 | * of trimmed entries */ |
937 | if (getLongLongFromObjectOrReply(c,c->argv[i+1],&args->limit,NULL((void*)0)) != C_OK0) |
938 | return -1; |
939 | |
940 | if (args->limit < 0) { |
941 | addReplyError(c,"The LIMIT argument must be >= 0."); |
942 | return -1; |
943 | } |
944 | limit_given = 1; |
945 | i++; |
946 | } else if (xadd && !strcasecmp(opt,"nomkstream")) { |
947 | args->no_mkstream = 1; |
948 | } else if (xadd) { |
949 | /* If we are here is a syntax error or a valid ID. */ |
950 | if (streamParseStrictIDOrReply(c,c->argv[i],&args->id,0) != C_OK0) |
951 | return -1; |
952 | args->id_given = 1; |
953 | break; |
954 | } else { |
955 | addReplyErrorObject(c,shared.syntaxerr); |
956 | return -1; |
957 | } |
958 | } |
959 | |
960 | if (args->limit && args->trim_strategy == TRIM_STRATEGY_NONE0) { |
961 | addReplyError(c,"syntax error, LIMIT cannot be used without specifying a trimming strategy"); |
962 | return -1; |
963 | } |
964 | |
965 | if (!xadd && args->trim_strategy == TRIM_STRATEGY_NONE0) { |
966 | addReplyError(c,"syntax error, XTRIM must be called with a trimming strategy"); |
967 | return -1; |
968 | } |
969 | |
970 | if (c == server.master || c->id == CLIENT_ID_AOF((18446744073709551615UL))) { |
971 | /* If command cam from master or from AOF we must not enforce maxnodes |
972 | * (The maxlen/minid argument was re-written to make sure there's no |
973 | * inconsistency). */ |
974 | args->limit = 0; |
975 | } else { |
976 | /* We need to set the limit (only if we got '~') */ |
977 | if (limit_given) { |
978 | if (!args->approx_trim) { |
979 | /* LIMIT was provided without ~ */ |
980 | addReplyError(c,"syntax error, LIMIT cannot be used without the special ~ option"); |
981 | return -1; |
982 | } |
983 | } else { |
984 | /* User didn't provide LIMIT, we must set it. */ |
985 | |
986 | if (args->approx_trim) { |
987 | /* In order to prevent from trimming to do too much work and cause |
988 | * latency spikes we limit the amount of work it can do */ |
989 | args->limit = 100 * server.stream_node_max_entries; /* Maximum 100 rax nodes. */ |
990 | } else { |
991 | /* No LIMIT for exact trimming */ |
992 | args->limit = 0; |
993 | } |
994 | } |
995 | } |
996 | |
997 | return i; |
998 | } |
999 | |
1000 | /* Initialize the stream iterator, so that we can call iterating functions |
1001 | * to get the next items. This requires a corresponding streamIteratorStop() |
1002 | * at the end. The 'rev' parameter controls the direction. If it's zero the |
1003 | * iteration is from the start to the end element (inclusive), otherwise |
1004 | * if rev is non-zero, the iteration is reversed. |
1005 | * |
1006 | * Once the iterator is initialized, we iterate like this: |
1007 | * |
1008 | * streamIterator myiterator; |
1009 | * streamIteratorStart(&myiterator,...); |
1010 | * int64_t numfields; |
1011 | * while(streamIteratorGetID(&myiterator,&ID,&numfields)) { |
1012 | * while(numfields--) { |
1013 | * unsigned char *key, *value; |
1014 | * size_t key_len, value_len; |
1015 | * streamIteratorGetField(&myiterator,&key,&value,&key_len,&value_len); |
1016 | * |
1017 | * ... do what you want with key and value ... |
1018 | * } |
1019 | * } |
1020 | * streamIteratorStop(&myiterator); */ |
1021 | void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev) { |
1022 | /* Initialize the iterator and translates the iteration start/stop |
1023 | * elements into a 128 big big-endian number. */ |
1024 | if (start) { |
1025 | streamEncodeID(si->start_key,start); |
1026 | } else { |
1027 | si->start_key[0] = 0; |
1028 | si->start_key[1] = 0; |
1029 | } |
1030 | |
1031 | if (end) { |
1032 | streamEncodeID(si->end_key,end); |
1033 | } else { |
1034 | si->end_key[0] = UINT64_MAX(18446744073709551615UL); |
1035 | si->end_key[1] = UINT64_MAX(18446744073709551615UL); |
1036 | } |
1037 | |
1038 | /* Seek the correct node in the radix tree. */ |
1039 | raxStart(&si->ri,s->rax); |
1040 | if (!rev) { |
1041 | if (start && (start->ms || start->seq)) { |
1042 | raxSeek(&si->ri,"<=",(unsigned char*)si->start_key, |
1043 | sizeof(si->start_key)); |
1044 | if (raxEOF(&si->ri)) raxSeek(&si->ri,"^",NULL((void*)0),0); |
1045 | } else { |
1046 | raxSeek(&si->ri,"^",NULL((void*)0),0); |
1047 | } |
1048 | } else { |
1049 | if (end && (end->ms || end->seq)) { |
1050 | raxSeek(&si->ri,"<=",(unsigned char*)si->end_key, |
1051 | sizeof(si->end_key)); |
1052 | if (raxEOF(&si->ri)) raxSeek(&si->ri,"$",NULL((void*)0),0); |
1053 | } else { |
1054 | raxSeek(&si->ri,"$",NULL((void*)0),0); |
1055 | } |
1056 | } |
1057 | si->stream = s; |
1058 | si->lp = NULL((void*)0); /* There is no current listpack right now. */ |
1059 | si->lp_ele = NULL((void*)0); /* Current listpack cursor. */ |
1060 | si->rev = rev; /* Direction, if non-zero reversed, from end to start. */ |
1061 | } |
1062 | |
1063 | /* Return 1 and store the current item ID at 'id' if there are still |
1064 | * elements within the iteration range, otherwise return 0 in order to |
1065 | * signal the iteration terminated. */ |
1066 | int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) { |
1067 | while(1) { /* Will stop when element > stop_key or end of radix tree. */ |
1068 | /* If the current listpack is set to NULL, this is the start of the |
1069 | * iteration or the previous listpack was completely iterated. |
1070 | * Go to the next node. */ |
1071 | if (si->lp == NULL((void*)0) || si->lp_ele == NULL((void*)0)) { |
1072 | if (!si->rev && !raxNext(&si->ri)) return 0; |
1073 | else if (si->rev && !raxPrev(&si->ri)) return 0; |
1074 | serverAssert(si->ri.key_len == sizeof(streamID))((si->ri.key_len == sizeof(streamID))?(void)0 : (_serverAssert ("si->ri.key_len == sizeof(streamID)","t_stream.c",1074),__builtin_unreachable ())); |
1075 | /* Get the master ID. */ |
1076 | streamDecodeID(si->ri.key,&si->master_id); |
1077 | /* Get the master fields count. */ |
1078 | si->lp = si->ri.data; |
1079 | si->lp_ele = lpFirst(si->lp); /* Seek items count */ |
1080 | si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek deleted count. */ |
1081 | si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek num fields. */ |
1082 | si->master_fields_count = lpGetInteger(si->lp_ele)lpGetIntegerIfValid(si->lp_ele, ((void*)0)); |
1083 | si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek first field. */ |
1084 | si->master_fields_start = si->lp_ele; |
1085 | /* We are now pointing to the first field of the master entry. |
1086 | * We need to seek either the first or the last entry depending |
1087 | * on the direction of the iteration. */ |
1088 | if (!si->rev) { |
1089 | /* If we are iterating in normal order, skip the master fields |
1090 | * to seek the first actual entry. */ |
1091 | for (uint64_t i = 0; i < si->master_fields_count; i++) |
1092 | si->lp_ele = lpNext(si->lp,si->lp_ele); |
1093 | } else { |
1094 | /* If we are iterating in reverse direction, just seek the |
1095 | * last part of the last entry in the listpack (that is, the |
1096 | * fields count). */ |
1097 | si->lp_ele = lpLast(si->lp); |
1098 | } |
1099 | } else if (si->rev) { |
1100 | /* If we are iterating in the reverse order, and this is not |
1101 | * the first entry emitted for this listpack, then we already |
1102 | * emitted the current entry, and have to go back to the previous |
1103 | * one. */ |
1104 | int lp_count = lpGetInteger(si->lp_ele)lpGetIntegerIfValid(si->lp_ele, ((void*)0)); |
1105 | while(lp_count--) si->lp_ele = lpPrev(si->lp,si->lp_ele); |
1106 | /* Seek lp-count of prev entry. */ |
1107 | si->lp_ele = lpPrev(si->lp,si->lp_ele); |
1108 | } |
1109 | |
1110 | /* For every radix tree node, iterate the corresponding listpack, |
1111 | * returning elements when they are within range. */ |
1112 | while(1) { |
1113 | if (!si->rev) { |
1114 | /* If we are going forward, skip the previous entry |
1115 | * lp-count field (or in case of the master entry, the zero |
1116 | * term field) */ |
1117 | si->lp_ele = lpNext(si->lp,si->lp_ele); |
1118 | if (si->lp_ele == NULL((void*)0)) break; |
1119 | } else { |
1120 | /* If we are going backward, read the number of elements this |
1121 | * entry is composed of, and jump backward N times to seek |
1122 | * its start. */ |
1123 | int64_t lp_count = lpGetInteger(si->lp_ele)lpGetIntegerIfValid(si->lp_ele, ((void*)0)); |
1124 | if (lp_count == 0) { /* We reached the master entry. */ |
1125 | si->lp = NULL((void*)0); |
1126 | si->lp_ele = NULL((void*)0); |
1127 | break; |
1128 | } |
1129 | while(lp_count--) si->lp_ele = lpPrev(si->lp,si->lp_ele); |
1130 | } |
1131 | |
1132 | /* Get the flags entry. */ |
1133 | si->lp_flags = si->lp_ele; |
1134 | int flags = lpGetInteger(si->lp_ele)lpGetIntegerIfValid(si->lp_ele, ((void*)0)); |
1135 | si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek ID. */ |
1136 | |
1137 | /* Get the ID: it is encoded as difference between the master |
1138 | * ID and this entry ID. */ |
1139 | *id = si->master_id; |
1140 | id->ms += lpGetInteger(si->lp_ele)lpGetIntegerIfValid(si->lp_ele, ((void*)0)); |
1141 | si->lp_ele = lpNext(si->lp,si->lp_ele); |
1142 | id->seq += lpGetInteger(si->lp_ele)lpGetIntegerIfValid(si->lp_ele, ((void*)0)); |
1143 | si->lp_ele = lpNext(si->lp,si->lp_ele); |
1144 | unsigned char buf[sizeof(streamID)]; |
1145 | streamEncodeID(buf,id); |
1146 | |
1147 | /* The number of entries is here or not depending on the |
1148 | * flags. */ |
1149 | if (flags & STREAM_ITEM_FLAG_SAMEFIELDS(1<<1)) { |
1150 | *numfields = si->master_fields_count; |
1151 | } else { |
1152 | *numfields = lpGetInteger(si->lp_ele)lpGetIntegerIfValid(si->lp_ele, ((void*)0)); |
1153 | si->lp_ele = lpNext(si->lp,si->lp_ele); |
1154 | } |
1155 | serverAssert(*numfields>=0)((*numfields>=0)?(void)0 : (_serverAssert("*numfields>=0" ,"t_stream.c",1155),__builtin_unreachable())); |
1156 | |
1157 | /* If current >= start, and the entry is not marked as |
1158 | * deleted, emit it. */ |
1159 | if (!si->rev) { |
1160 | if (memcmp(buf,si->start_key,sizeof(streamID)) >= 0 && |
1161 | !(flags & STREAM_ITEM_FLAG_DELETED(1<<0))) |
1162 | { |
1163 | if (memcmp(buf,si->end_key,sizeof(streamID)) > 0) |
1164 | return 0; /* We are already out of range. */ |
1165 | si->entry_flags = flags; |
1166 | if (flags & STREAM_ITEM_FLAG_SAMEFIELDS(1<<1)) |
1167 | si->master_fields_ptr = si->master_fields_start; |
1168 | return 1; /* Valid item returned. */ |
1169 | } |
1170 | } else { |
1171 | if (memcmp(buf,si->end_key,sizeof(streamID)) <= 0 && |
1172 | !(flags & STREAM_ITEM_FLAG_DELETED(1<<0))) |
1173 | { |
1174 | if (memcmp(buf,si->start_key,sizeof(streamID)) < 0) |
1175 | return 0; /* We are already out of range. */ |
1176 | si->entry_flags = flags; |
1177 | if (flags & STREAM_ITEM_FLAG_SAMEFIELDS(1<<1)) |
1178 | si->master_fields_ptr = si->master_fields_start; |
1179 | return 1; /* Valid item returned. */ |
1180 | } |
1181 | } |
1182 | |
1183 | /* If we do not emit, we have to discard if we are going |
1184 | * forward, or seek the previous entry if we are going |
1185 | * backward. */ |
1186 | if (!si->rev) { |
1187 | int64_t to_discard = (flags & STREAM_ITEM_FLAG_SAMEFIELDS(1<<1)) ? |
1188 | *numfields : *numfields*2; |
1189 | for (int64_t i = 0; i < to_discard; i++) |
1190 | si->lp_ele = lpNext(si->lp,si->lp_ele); |
1191 | } else { |
1192 | int64_t prev_times = 4; /* flag + id ms + id seq + one more to |
1193 | go back to the previous entry "count" |
1194 | field. */ |
1195 | /* If the entry was not flagged SAMEFIELD we also read the |
1196 | * number of fields, so go back one more. */ |
1197 | if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS(1<<1))) prev_times++; |
1198 | while(prev_times--) si->lp_ele = lpPrev(si->lp,si->lp_ele); |
1199 | } |
1200 | } |
1201 | |
1202 | /* End of listpack reached. Try the next/prev radix tree node. */ |
1203 | } |
1204 | } |
1205 | |
1206 | /* Get the field and value of the current item we are iterating. This should |
1207 | * be called immediately after streamIteratorGetID(), and for each field |
1208 | * according to the number of fields returned by streamIteratorGetID(). |
1209 | * The function populates the field and value pointers and the corresponding |
1210 | * lengths by reference, that are valid until the next iterator call, assuming |
1211 | * no one touches the stream meanwhile. */ |
1212 | void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen) { |
1213 | if (si->entry_flags & STREAM_ITEM_FLAG_SAMEFIELDS(1<<1)) { |
1214 | *fieldptr = lpGet(si->master_fields_ptr,fieldlen,si->field_buf); |
1215 | si->master_fields_ptr = lpNext(si->lp,si->master_fields_ptr); |
1216 | } else { |
1217 | *fieldptr = lpGet(si->lp_ele,fieldlen,si->field_buf); |
1218 | si->lp_ele = lpNext(si->lp,si->lp_ele); |
1219 | } |
1220 | *valueptr = lpGet(si->lp_ele,valuelen,si->value_buf); |
1221 | si->lp_ele = lpNext(si->lp,si->lp_ele); |
1222 | } |
1223 | |
1224 | /* Remove the current entry from the stream: can be called after the |
1225 | * GetID() API or after any GetField() call, however we need to iterate |
1226 | * a valid entry while calling this function. Moreover the function |
1227 | * requires the entry ID we are currently iterating, that was previously |
1228 | * returned by GetID(). |
1229 | * |
1230 | * Note that after calling this function, next calls to GetField() can't |
1231 | * be performed: the entry is now deleted. Instead the iterator will |
1232 | * automatically re-seek to the next entry, so the caller should continue |
1233 | * with GetID(). */ |
1234 | void streamIteratorRemoveEntry(streamIterator *si, streamID *current) { |
1235 | unsigned char *lp = si->lp; |
1236 | int64_t aux; |
1237 | |
1238 | /* We do not really delete the entry here. Instead we mark it as |
1239 | * deleted flagging it, and also incrementing the count of the |
1240 | * deleted entries in the listpack header. |
1241 | * |
1242 | * We start flagging: */ |
1243 | int flags = lpGetInteger(si->lp_flags)lpGetIntegerIfValid(si->lp_flags, ((void*)0)); |
1244 | flags |= STREAM_ITEM_FLAG_DELETED(1<<0); |
1245 | lp = lpReplaceInteger(lp,&si->lp_flags,flags); |
1246 | |
1247 | /* Change the valid/deleted entries count in the master entry. */ |
1248 | unsigned char *p = lpFirst(lp); |
1249 | aux = lpGetInteger(p)lpGetIntegerIfValid(p, ((void*)0)); |
1250 | |
1251 | if (aux == 1) { |
1252 | /* If this is the last element in the listpack, we can remove the whole |
1253 | * node. */ |
1254 | lpFree(lp); |
1255 | raxRemove(si->stream->rax,si->ri.key,si->ri.key_len,NULL((void*)0)); |
1256 | } else { |
1257 | /* In the base case we alter the counters of valid/deleted entries. */ |
1258 | lp = lpReplaceInteger(lp,&p,aux-1); |
1259 | p = lpNext(lp,p); /* Seek deleted field. */ |
1260 | aux = lpGetInteger(p)lpGetIntegerIfValid(p, ((void*)0)); |
1261 | lp = lpReplaceInteger(lp,&p,aux+1); |
1262 | |
1263 | /* Update the listpack with the new pointer. */ |
1264 | if (si->lp != lp) |
1265 | raxInsert(si->stream->rax,si->ri.key,si->ri.key_len,lp,NULL((void*)0)); |
1266 | } |
1267 | |
1268 | /* Update the number of entries counter. */ |
1269 | si->stream->length--; |
1270 | |
1271 | /* Re-seek the iterator to fix the now messed up state. */ |
1272 | streamID start, end; |
1273 | if (si->rev) { |
1274 | streamDecodeID(si->start_key,&start); |
1275 | end = *current; |
1276 | } else { |
1277 | start = *current; |
1278 | streamDecodeID(si->end_key,&end); |
1279 | } |
1280 | streamIteratorStop(si); |
1281 | streamIteratorStart(si,si->stream,&start,&end,si->rev); |
1282 | |
1283 | /* TODO: perform a garbage collection here if the ration between |
1284 | * deleted and valid goes over a certain limit. */ |
1285 | } |
1286 | |
1287 | /* Stop the stream iterator. The only cleanup we need is to free the rax |
1288 | * iterator, since the stream iterator itself is supposed to be stack |
1289 | * allocated. */ |
1290 | void streamIteratorStop(streamIterator *si) { |
1291 | raxStop(&si->ri); |
1292 | } |
1293 | |
1294 | /* Delete the specified item ID from the stream, returning 1 if the item |
1295 | * was deleted 0 otherwise (if it does not exist). */ |
1296 | int streamDeleteItem(stream *s, streamID *id) { |
1297 | int deleted = 0; |
1298 | streamIterator si; |
1299 | streamIteratorStart(&si,s,id,id,0); |
1300 | streamID myid; |
1301 | int64_t numfields; |
1302 | if (streamIteratorGetID(&si,&myid,&numfields)) { |
1303 | streamIteratorRemoveEntry(&si,&myid); |
1304 | deleted = 1; |
1305 | } |
1306 | streamIteratorStop(&si); |
1307 | return deleted; |
1308 | } |
1309 | |
1310 | /* Get the last valid (non-tombstone) streamID of 's'. */ |
1311 | void streamLastValidID(stream *s, streamID *maxid) |
1312 | { |
1313 | streamIterator si; |
1314 | streamIteratorStart(&si,s,NULL((void*)0),NULL((void*)0),1); |
1315 | int64_t numfields; |
1316 | streamIteratorGetID(&si,maxid,&numfields); |
1317 | streamIteratorStop(&si); |
1318 | } |
1319 | |
1320 | /* Emit a reply in the client output buffer by formatting a Stream ID |
1321 | * in the standard <ms>-<seq> format, using the simple string protocol |
1322 | * of REPL. */ |
1323 | void addReplyStreamID(client *c, streamID *id) { |
1324 | sds replyid = sdscatfmt(sdsempty(),"%U-%U",id->ms,id->seq); |
1325 | addReplyBulkSds(c,replyid); |
1326 | } |
1327 | |
1328 | void setDeferredReplyStreamID(client *c, void *dr, streamID *id) { |
1329 | sds replyid = sdscatfmt(sdsempty(),"%U-%U",id->ms,id->seq); |
1330 | setDeferredReplyBulkSds(c, dr, replyid); |
1331 | } |
1332 | |
1333 | /* Similar to the above function, but just creates an object, usually useful |
1334 | * for replication purposes to create arguments. */ |
1335 | robj *createObjectFromStreamID(streamID *id) { |
1336 | return createObject(OBJ_STRING0, sdscatfmt(sdsempty(),"%U-%U", |
1337 | id->ms,id->seq)); |
1338 | } |
1339 | |
1340 | /* As a result of an explicit XCLAIM or XREADGROUP command, new entries |
1341 | * are created in the pending list of the stream and consumers. We need |
1342 | * to propagate this changes in the form of XCLAIM commands. */ |
1343 | void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupname, robj *id, streamNACK *nack) { |
1344 | /* We need to generate an XCLAIM that will work in a idempotent fashion: |
1345 | * |
1346 | * XCLAIM <key> <group> <consumer> 0 <id> TIME <milliseconds-unix-time> |
1347 | * RETRYCOUNT <count> FORCE JUSTID LASTID <id>. |
1348 | * |
1349 | * Note that JUSTID is useful in order to avoid that XCLAIM will do |
1350 | * useless work in the slave side, trying to fetch the stream item. */ |
1351 | robj *argv[14]; |
1352 | argv[0] = shared.xclaim; |
1353 | argv[1] = key; |
1354 | argv[2] = groupname; |
1355 | argv[3] = createStringObject(nack->consumer->name,sdslen(nack->consumer->name)); |
1356 | argv[4] = shared.integers[0]; |
1357 | argv[5] = id; |
1358 | argv[6] = shared.time; |
1359 | argv[7] = createStringObjectFromLongLong(nack->delivery_time); |
1360 | argv[8] = shared.retrycount; |
1361 | argv[9] = createStringObjectFromLongLong(nack->delivery_count); |
1362 | argv[10] = shared.force; |
1363 | argv[11] = shared.justid; |
1364 | argv[12] = shared.lastid; |
1365 | argv[13] = createObjectFromStreamID(&group->last_id); |
1366 | |
1367 | /* We use progagate() because this code path is not always called from |
1368 | * the command execution context. Moreover this will just alter the |
1369 | * consumer group state, and we don't need MULTI/EXEC wrapping because |
1370 | * there is no message state cross-message atomicity required. */ |
1371 | propagate(server.xclaimCommand,c->db->id,argv,14,PROPAGATE_AOF1|PROPAGATE_REPL2); |
1372 | decrRefCount(argv[3]); |
1373 | decrRefCount(argv[7]); |
1374 | decrRefCount(argv[9]); |
1375 | decrRefCount(argv[13]); |
1376 | } |
1377 | |
1378 | /* We need this when we want to propoagate the new last-id of a consumer group |
1379 | * that was consumed by XREADGROUP with the NOACK option: in that case we can't |
1380 | * propagate the last ID just using the XCLAIM LASTID option, so we emit |
1381 | * |
1382 | * XGROUP SETID <key> <groupname> <id> |
1383 | */ |
1384 | void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupname) { |
1385 | robj *argv[5]; |
1386 | argv[0] = shared.xgroup; |
1387 | argv[1] = shared.setid; |
1388 | argv[2] = key; |
1389 | argv[3] = groupname; |
1390 | argv[4] = createObjectFromStreamID(&group->last_id); |
1391 | |
1392 | /* We use progagate() because this code path is not always called from |
1393 | * the command execution context. Moreover this will just alter the |
1394 | * consumer group state, and we don't need MULTI/EXEC wrapping because |
1395 | * there is no message state cross-message atomicity required. */ |
1396 | propagate(server.xgroupCommand,c->db->id,argv,5,PROPAGATE_AOF1|PROPAGATE_REPL2); |
1397 | decrRefCount(argv[4]); |
1398 | } |
1399 | |
1400 | /* We need this when we want to propagate creation of consumer that was created |
1401 | * by XREADGROUP with the NOACK option. In that case, the only way to create |
1402 | * the consumer at the replica is by using XGROUP CREATECONSUMER (see issue #7140) |
1403 | * |
1404 | * XGROUP CREATECONSUMER <key> <groupname> <consumername> |
1405 | */ |
1406 | void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds consumername) { |
1407 | robj *argv[5]; |
1408 | argv[0] = shared.xgroup; |
1409 | argv[1] = shared.createconsumer; |
1410 | argv[2] = key; |
1411 | argv[3] = groupname; |
1412 | argv[4] = createObject(OBJ_STRING0,sdsdup(consumername)); |
1413 | |
1414 | /* We use progagate() because this code path is not always called from |
1415 | * the command execution context. Moreover this will just alter the |
1416 | * consumer group state, and we don't need MULTI/EXEC wrapping because |
1417 | * there is no message state cross-message atomicity required. */ |
1418 | propagate(server.xgroupCommand,c->db->id,argv,5,PROPAGATE_AOF1|PROPAGATE_REPL2); |
1419 | decrRefCount(argv[4]); |
1420 | } |
1421 | |
1422 | /* Send the stream items in the specified range to the client 'c'. The range |
1423 | * the client will receive is between start and end inclusive, if 'count' is |
1424 | * non zero, no more than 'count' elements are sent. |
1425 | * |
1426 | * The 'end' pointer can be NULL to mean that we want all the elements from |
1427 | * 'start' till the end of the stream. If 'rev' is non zero, elements are |
1428 | * produced in reversed order from end to start. |
1429 | * |
1430 | * The function returns the number of entries emitted. |
1431 | * |
1432 | * If group and consumer are not NULL, the function performs additional work: |
1433 | * 1. It updates the last delivered ID in the group in case we are |
1434 | * sending IDs greater than the current last ID. |
1435 | * 2. If the requested IDs are already assigned to some other consumer, the |
1436 | * function will not return it to the client. |
1437 | * 3. An entry in the pending list will be created for every entry delivered |
1438 | * for the first time to this consumer. |
1439 | * |
1440 | * The behavior may be modified passing non-zero flags: |
1441 | * |
1442 | * STREAM_RWR_NOACK: Do not create PEL entries, that is, the point "3" above |
1443 | * is not performed. |
1444 | * STREAM_RWR_RAWENTRIES: Do not emit array boundaries, but just the entries, |
1445 | * and return the number of entries emitted as usually. |
1446 | * This is used when the function is just used in order |
1447 | * to emit data and there is some higher level logic. |
1448 | * |
1449 | * The final argument 'spi' (stream propagation info pointer) is a structure |
1450 | * filled with information needed to propagate the command execution to AOF |
1451 | * and slaves, in the case a consumer group was passed: we need to generate |
1452 | * XCLAIM commands to create the pending list into AOF/slaves in that case. |
1453 | * |
1454 | * If 'spi' is set to NULL no propagation will happen even if the group was |
1455 | * given, but currently such a feature is never used by the code base that |
1456 | * will always pass 'spi' and propagate when a group is passed. |
1457 | * |
1458 | * Note that this function is recursive in certain cases. When it's called |
1459 | * with a non NULL group and consumer argument, it may call |
1460 | * streamReplyWithRangeFromConsumerPEL() in order to get entries from the |
1461 | * consumer pending entries list. However such a function will then call |
1462 | * streamReplyWithRange() in order to emit single entries (found in the |
1463 | * PEL by ID) to the client. This is the use case for the STREAM_RWR_RAWENTRIES |
1464 | * flag. |
1465 | */ |
1466 | #define STREAM_RWR_NOACK(1<<0) (1<<0) /* Do not create entries in the PEL. */ |
1467 | #define STREAM_RWR_RAWENTRIES(1<<1) (1<<1) /* Do not emit protocol for array |
1468 | boundaries, just the entries. */ |
1469 | #define STREAM_RWR_HISTORY(1<<2) (1<<2) /* Only serve consumer local PEL. */ |
1470 | size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi) { |
1471 | void *arraylen_ptr = NULL((void*)0); |
1472 | size_t arraylen = 0; |
1473 | streamIterator si; |
1474 | int64_t numfields; |
1475 | streamID id; |
1476 | int propagate_last_id = 0; |
1477 | int noack = flags & STREAM_RWR_NOACK(1<<0); |
1478 | |
1479 | /* If the client is asking for some history, we serve it using a |
1480 | * different function, so that we return entries *solely* from its |
1481 | * own PEL. This ensures each consumer will always and only see |
1482 | * the history of messages delivered to it and not yet confirmed |
1483 | * as delivered. */ |
1484 | if (group && (flags & STREAM_RWR_HISTORY(1<<2))) { |
1485 | return streamReplyWithRangeFromConsumerPEL(c,s,start,end,count, |
1486 | consumer); |
1487 | } |
1488 | |
1489 | if (!(flags & STREAM_RWR_RAWENTRIES(1<<1))) |
1490 | arraylen_ptr = addReplyDeferredLen(c); |
1491 | streamIteratorStart(&si,s,start,end,rev); |
1492 | while(streamIteratorGetID(&si,&id,&numfields)) { |
1493 | /* Update the group last_id if needed. */ |
1494 | if (group && streamCompareID(&id,&group->last_id) > 0) { |
1495 | group->last_id = id; |
1496 | /* Group last ID should be propagated only if NOACK was |
1497 | * specified, otherwise the last id will be included |
1498 | * in the propagation of XCLAIM itself. */ |
1499 | if (noack) propagate_last_id = 1; |
1500 | } |
1501 | |
1502 | /* Emit a two elements array for each item. The first is |
1503 | * the ID, the second is an array of field-value pairs. */ |
1504 | addReplyArrayLen(c,2); |
1505 | addReplyStreamID(c,&id); |
1506 | |
1507 | addReplyArrayLen(c,numfields*2); |
1508 | |
1509 | /* Emit the field-value pairs. */ |
1510 | while(numfields--) { |
1511 | unsigned char *key, *value; |
1512 | int64_t key_len, value_len; |
1513 | streamIteratorGetField(&si,&key,&value,&key_len,&value_len); |
1514 | addReplyBulkCBuffer(c,key,key_len); |
1515 | addReplyBulkCBuffer(c,value,value_len); |
1516 | } |
1517 | |
1518 | /* If a group is passed, we need to create an entry in the |
1519 | * PEL (pending entries list) of this group *and* this consumer. |
1520 | * |
1521 | * Note that we cannot be sure about the fact the message is not |
1522 | * already owned by another consumer, because the admin is able |
1523 | * to change the consumer group last delivered ID using the |
1524 | * XGROUP SETID command. So if we find that there is already |
1525 | * a NACK for the entry, we need to associate it to the new |
1526 | * consumer. */ |
1527 | if (group && !noack) { |
1528 | unsigned char buf[sizeof(streamID)]; |
1529 | streamEncodeID(buf,&id); |
1530 | |
1531 | /* Try to add a new NACK. Most of the time this will work and |
1532 | * will not require extra lookups. We'll fix the problem later |
1533 | * if we find that there is already a entry for this ID. */ |
1534 | streamNACK *nack = streamCreateNACK(consumer); |
1535 | int group_inserted = |
1536 | raxTryInsert(group->pel,buf,sizeof(buf),nack,NULL((void*)0)); |
1537 | int consumer_inserted = |
1538 | raxTryInsert(consumer->pel,buf,sizeof(buf),nack,NULL((void*)0)); |
1539 | |
1540 | /* Now we can check if the entry was already busy, and |
1541 | * in that case reassign the entry to the new consumer, |
1542 | * or update it if the consumer is the same as before. */ |
1543 | if (group_inserted == 0) { |
1544 | streamFreeNACK(nack); |
1545 | nack = raxFind(group->pel,buf,sizeof(buf)); |
1546 | serverAssert(nack != raxNotFound)((nack != raxNotFound)?(void)0 : (_serverAssert("nack != raxNotFound" ,"t_stream.c",1546),__builtin_unreachable())); |
1547 | raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL((void*)0)); |
1548 | /* Update the consumer and NACK metadata. */ |
1549 | nack->consumer = consumer; |
1550 | nack->delivery_time = mstime(); |
1551 | nack->delivery_count = 1; |
1552 | /* Add the entry in the new consumer local PEL. */ |
1553 | raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL((void*)0)); |
1554 | } else if (group_inserted == 1 && consumer_inserted == 0) { |
1555 | serverPanic("NACK half-created. Should not be possible.")_serverPanic("t_stream.c",1555,"NACK half-created. Should not be possible." ),__builtin_unreachable(); |
1556 | } |
1557 | |
1558 | /* Propagate as XCLAIM. */ |
1559 | if (spi) { |
1560 | robj *idarg = createObjectFromStreamID(&id); |
1561 | streamPropagateXCLAIM(c,spi->keyname,group,spi->groupname,idarg,nack); |
1562 | decrRefCount(idarg); |
1563 | } |
1564 | } |
1565 | |
1566 | arraylen++; |
1567 | if (count && count == arraylen) break; |
1568 | } |
1569 | |
1570 | if (spi && propagate_last_id) |
1571 | streamPropagateGroupID(c,spi->keyname,group,spi->groupname); |
1572 | |
1573 | streamIteratorStop(&si); |
1574 | if (arraylen_ptr) setDeferredArrayLen(c,arraylen_ptr,arraylen); |
1575 | return arraylen; |
1576 | } |
1577 | |
1578 | /* This is an helper function for streamReplyWithRange() when called with |
1579 | * group and consumer arguments, but with a range that is referring to already |
1580 | * delivered messages. In this case we just emit messages that are already |
1581 | * in the history of the consumer, fetching the IDs from its PEL. |
1582 | * |
1583 | * Note that this function does not have a 'rev' argument because it's not |
1584 | * possible to iterate in reverse using a group. Basically this function |
1585 | * is only called as a result of the XREADGROUP command. |
1586 | * |
1587 | * This function is more expensive because it needs to inspect the PEL and then |
1588 | * seek into the radix tree of the messages in order to emit the full message |
1589 | * to the client. However clients only reach this code path when they are |
1590 | * fetching the history of already retrieved messages, which is rare. */ |
1591 | size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer) { |
1592 | raxIterator ri; |
1593 | unsigned char startkey[sizeof(streamID)]; |
1594 | unsigned char endkey[sizeof(streamID)]; |
1595 | streamEncodeID(startkey,start); |
1596 | if (end) streamEncodeID(endkey,end); |
1597 | |
1598 | size_t arraylen = 0; |
1599 | void *arraylen_ptr = addReplyDeferredLen(c); |
1600 | raxStart(&ri,consumer->pel); |
1601 | raxSeek(&ri,">=",startkey,sizeof(startkey)); |
1602 | while(raxNext(&ri) && (!count || arraylen < count)) { |
1603 | if (end && memcmp(ri.key,end,ri.key_len) > 0) break; |
1604 | streamID thisid; |
1605 | streamDecodeID(ri.key,&thisid); |
1606 | if (streamReplyWithRange(c,s,&thisid,&thisid,1,0,NULL((void*)0),NULL((void*)0), |
1607 | STREAM_RWR_RAWENTRIES(1<<1),NULL((void*)0)) == 0) |
1608 | { |
1609 | /* Note that we may have a not acknowledged entry in the PEL |
1610 | * about a message that's no longer here because was removed |
1611 | * by the user by other means. In that case we signal it emitting |
1612 | * the ID but then a NULL entry for the fields. */ |
1613 | addReplyArrayLen(c,2); |
1614 | addReplyStreamID(c,&thisid); |
1615 | addReplyNullArray(c); |
1616 | } else { |
1617 | streamNACK *nack = ri.data; |
1618 | nack->delivery_time = mstime(); |
1619 | nack->delivery_count++; |
1620 | } |
1621 | arraylen++; |
1622 | } |
1623 | raxStop(&ri); |
1624 | setDeferredArrayLen(c,arraylen_ptr,arraylen); |
1625 | return arraylen; |
1626 | } |
1627 | |
1628 | /* ----------------------------------------------------------------------- |
1629 | * Stream commands implementation |
1630 | * ----------------------------------------------------------------------- */ |
1631 | |
1632 | /* Look the stream at 'key' and return the corresponding stream object. |
1633 | * The function creates a key setting it to an empty stream if needed. */ |
1634 | robj *streamTypeLookupWriteOrCreate(client *c, robj *key, int no_create) { |
1635 | robj *o = lookupKeyWrite(c->db,key); |
1636 | if (checkType(c,o,OBJ_STREAM6)) return NULL((void*)0); |
1637 | if (o == NULL((void*)0)) { |
1638 | if (no_create) { |
1639 | addReplyNull(c); |
1640 | return NULL((void*)0); |
1641 | } |
1642 | o = createStreamObject(); |
1643 | dbAdd(c->db,key,o); |
1644 | } |
1645 | return o; |
1646 | } |
1647 | |
1648 | /* Parse a stream ID in the format given by clients to Redis, that is |
1649 | * <ms>-<seq>, and converts it into a streamID structure. If |
1650 | * the specified ID is invalid C_ERR is returned and an error is reported |
1651 | * to the client, otherwise C_OK is returned. The ID may be in incomplete |
1652 | * form, just stating the milliseconds time part of the stream. In such a case |
1653 | * the missing part is set according to the value of 'missing_seq' parameter. |
1654 | * |
1655 | * The IDs "-" and "+" specify respectively the minimum and maximum IDs |
1656 | * that can be represented. If 'strict' is set to 1, "-" and "+" will be |
1657 | * treated as an invalid ID. |
1658 | * |
1659 | * If 'c' is set to NULL, no reply is sent to the client. */ |
1660 | int streamGenericParseIDOrReply(client *c, const robj *o, streamID *id, uint64_t missing_seq, int strict) { |
1661 | char buf[128]; |
1662 | if (sdslen(o->ptr) > sizeof(buf)-1) goto invalid; |
1663 | memcpy(buf,o->ptr,sdslen(o->ptr)+1); |
1664 | |
1665 | if (strict && (buf[0] == '-' || buf[0] == '+') && buf[1] == '\0') |
1666 | goto invalid; |
1667 | |
1668 | /* Handle the "-" and "+" special cases. */ |
1669 | if (buf[0] == '-' && buf[1] == '\0') { |
1670 | id->ms = 0; |
1671 | id->seq = 0; |
1672 | return C_OK0; |
1673 | } else if (buf[0] == '+' && buf[1] == '\0') { |
1674 | id->ms = UINT64_MAX(18446744073709551615UL); |
1675 | id->seq = UINT64_MAX(18446744073709551615UL); |
1676 | return C_OK0; |
1677 | } |
1678 | |
1679 | /* Parse <ms>-<seq> form. */ |
1680 | char *dot = strchr(buf,'-'); |
1681 | if (dot) *dot = '\0'; |
1682 | unsigned long long ms, seq; |
1683 | if (string2ull(buf,&ms) == 0) goto invalid; |
1684 | if (dot && string2ull(dot+1,&seq) == 0) goto invalid; |
1685 | if (!dot) seq = missing_seq; |
1686 | id->ms = ms; |
1687 | id->seq = seq; |
1688 | return C_OK0; |
1689 | |
1690 | invalid: |
1691 | if (c) addReplyError(c,"Invalid stream ID specified as stream " |
1692 | "command argument"); |
1693 | return C_ERR-1; |
1694 | } |
1695 | |
1696 | /* Wrapper for streamGenericParseIDOrReply() used by module API. */ |
1697 | int streamParseID(const robj *o, streamID *id) { |
1698 | return streamGenericParseIDOrReply(NULL((void*)0), o, id, 0, 0); |
1699 | } |
1700 | |
1701 | /* Wrapper for streamGenericParseIDOrReply() with 'strict' argument set to |
1702 | * 0, to be used when - and + are acceptable IDs. */ |
1703 | int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) { |
1704 | return streamGenericParseIDOrReply(c,o,id,missing_seq,0); |
1705 | } |
1706 | |
1707 | /* Wrapper for streamGenericParseIDOrReply() with 'strict' argument set to |
1708 | * 1, to be used when we want to return an error if the special IDs + or - |
1709 | * are provided. */ |
1710 | int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) { |
1711 | return streamGenericParseIDOrReply(c,o,id,missing_seq,1); |
1712 | } |
1713 | |
1714 | /* Helper for parsing a stream ID that is a range query interval. When the |
1715 | * exclude argument is NULL, streamParseIDOrReply() is called and the interval |
1716 | * is treated as close (inclusive). Otherwise, the exclude argument is set if |
1717 | * the interval is open (the "(" prefix) and streamParseStrictIDOrReply() is |
1718 | * called in that case. |
1719 | */ |
1720 | int streamParseIntervalIDOrReply(client *c, robj *o, streamID *id, int *exclude, uint64_t missing_seq) { |
1721 | char *p = o->ptr; |
1722 | size_t len = sdslen(p); |
1723 | int invalid = 0; |
1724 | |
1725 | if (exclude != NULL((void*)0)) *exclude = (len > 1 && p[0] == '('); |
1726 | if (exclude != NULL((void*)0) && *exclude) { |
1727 | robj *t = createStringObject(p+1,len-1); |
1728 | invalid = (streamParseStrictIDOrReply(c,t,id,missing_seq) == C_ERR-1); |
1729 | decrRefCount(t); |
1730 | } else |
1731 | invalid = (streamParseIDOrReply(c,o,id,missing_seq) == C_ERR-1); |
1732 | if (invalid) |
1733 | return C_ERR-1; |
1734 | return C_OK0; |
1735 | } |
1736 | |
1737 | void streamRewriteApproxSpecifier(client *c, int idx) { |
1738 | rewriteClientCommandArgument(c,idx,shared.special_equals); |
1739 | } |
1740 | |
1741 | /* We propagate MAXLEN/MINID ~ <count> as MAXLEN/MINID = <resulting-len-of-stream> |
1742 | * otherwise trimming is no longer deterministic on replicas / AOF. */ |
1743 | void streamRewriteTrimArgument(client *c, stream *s, int trim_strategy, int idx) { |
1744 | robj *arg; |
1745 | if (trim_strategy == TRIM_STRATEGY_MAXLEN1) { |
1746 | arg = createStringObjectFromLongLong(s->length); |
1747 | } else { |
1748 | streamID first_id; |
1749 | streamGetEdgeID(s, 1, &first_id); |
1750 | arg = createObjectFromStreamID(&first_id); |
1751 | } |
1752 | |
1753 | rewriteClientCommandArgument(c,idx,arg); |
1754 | decrRefCount(arg); |
1755 | } |
1756 | |
1757 | /* XADD key [(MAXLEN [~|=] <count> | MINID [~|=] <id>) [LIMIT <entries>]] [NOMKSTREAM] <ID or *> [field value] [field value] ... */ |
1758 | void xaddCommand(client *c) { |
1759 | /* Parse options. */ |
1760 | streamAddTrimArgs parsed_args; |
1761 | int idpos = streamParseAddOrTrimArgsOrReply(c, &parsed_args, 1); |
1762 | if (idpos < 0) |
1763 | return; /* streamParseAddOrTrimArgsOrReply already replied. */ |
1764 | int field_pos = idpos+1; /* The ID is always one argument before the first field */ |
1765 | |
1766 | /* Check arity. */ |
1767 | if ((c->argc - field_pos) < 2 || ((c->argc-field_pos) % 2) == 1) { |
1768 | addReplyError(c,"wrong number of arguments for XADD"); |
1769 | return; |
1770 | } |
1771 | |
1772 | /* Return ASAP if minimal ID (0-0) was given so we avoid possibly creating |
1773 | * a new stream and have streamAppendItem fail, leaving an empty key in the |
1774 | * database. */ |
1775 | if (parsed_args.id_given && |
1776 | parsed_args.id.ms == 0 && parsed_args.id.seq == 0) |
1777 | { |
1778 | addReplyError(c,"The ID specified in XADD must be greater than 0-0"); |
1779 | return; |
1780 | } |
1781 | |
1782 | /* Lookup the stream at key. */ |
1783 | robj *o; |
1784 | stream *s; |
1785 | if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1],parsed_args.no_mkstream)) == NULL((void*)0)) return; |
1786 | s = o->ptr; |
1787 | |
1788 | /* Return ASAP if the stream has reached the last possible ID */ |
1789 | if (s->last_id.ms == UINT64_MAX(18446744073709551615UL) && s->last_id.seq == UINT64_MAX(18446744073709551615UL)) { |
1790 | addReplyError(c,"The stream has exhausted the last possible ID, " |
1791 | "unable to add more items"); |
1792 | return; |
1793 | } |
1794 | |
1795 | /* Append using the low level function and return the ID. */ |
1796 | streamID id; |
1797 | if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2, |
1798 | &id, parsed_args.id_given ? &parsed_args.id : NULL((void*)0)) |
1799 | == C_ERR-1) |
1800 | { |
1801 | addReplyError(c,"The ID specified in XADD is equal or smaller than the " |
1802 | "target stream top item"); |
1803 | return; |
1804 | } |
1805 | addReplyStreamID(c,&id); |
1806 | |
1807 | signalModifiedKey(c,c->db,c->argv[1]); |
1808 | notifyKeyspaceEvent(NOTIFY_STREAM(1<<10),"xadd",c->argv[1],c->db->id); |
1809 | server.dirty++; |
1810 | |
1811 | /* Trim if needed. */ |
1812 | if (parsed_args.trim_strategy != TRIM_STRATEGY_NONE0) { |
1813 | if (streamTrim(s, &parsed_args)) { |
1814 | notifyKeyspaceEvent(NOTIFY_STREAM(1<<10),"xtrim",c->argv[1],c->db->id); |
1815 | } |
1816 | if (parsed_args.approx_trim) { |
1817 | /* In case our trimming was limited (by LIMIT or by ~) we must |
1818 | * re-write the relevant trim argument to make sure there will be |
1819 | * no inconsistencies in AOF loading or in the replica. |
1820 | * It's enough to check only args->approx because there is no |
1821 | * way LIMIT is given without the ~ option. */ |
1822 | streamRewriteApproxSpecifier(c,parsed_args.trim_strategy_arg_idx-1); |
1823 | streamRewriteTrimArgument(c,s,parsed_args.trim_strategy,parsed_args.trim_strategy_arg_idx); |
1824 | } |
1825 | } |
1826 | |
1827 | /* Let's rewrite the ID argument with the one actually generated for |
1828 | * AOF/replication propagation. */ |
1829 | robj *idarg = createObjectFromStreamID(&id); |
1830 | rewriteClientCommandArgument(c,idpos,idarg); |
1831 | decrRefCount(idarg); |
1832 | |
1833 | /* We need to signal to blocked clients that there is new data on this |
1834 | * stream. */ |
1835 | signalKeyAsReady(c->db, c->argv[1], OBJ_STREAM6); |
1836 | } |
1837 | |
1838 | /* XRANGE/XREVRANGE actual implementation. |
1839 | * The 'start' and 'end' IDs are parsed as follows: |
1840 | * Incomplete 'start' has its sequence set to 0, and 'end' to UINT64_MAX. |
1841 | * "-" and "+"" mean the minimal and maximal ID values, respectively. |
1842 | * The "(" prefix means an open (exclusive) range, so XRANGE stream (1-0 (2-0 |
1843 | * will match anything from 1-1 and 1-UINT64_MAX. |
1844 | */ |
1845 | void xrangeGenericCommand(client *c, int rev) { |
1846 | robj *o; |
1847 | stream *s; |
1848 | streamID startid, endid; |
1849 | long long count = -1; |
1850 | robj *startarg = rev ? c->argv[3] : c->argv[2]; |
1851 | robj *endarg = rev ? c->argv[2] : c->argv[3]; |
1852 | int startex = 0, endex = 0; |
1853 | |
1854 | /* Parse start and end IDs. */ |
1855 | if (streamParseIntervalIDOrReply(c,startarg,&startid,&startex,0) != C_OK0) |
1856 | return; |
1857 | if (startex && streamIncrID(&startid) != C_OK0) { |
1858 | addReplyError(c,"invalid start ID for the interval"); |
1859 | return; |
1860 | } |
1861 | if (streamParseIntervalIDOrReply(c,endarg,&endid,&endex,UINT64_MAX(18446744073709551615UL)) != C_OK0) |
1862 | return; |
1863 | if (endex && streamDecrID(&endid) != C_OK0) { |
1864 | addReplyError(c,"invalid end ID for the interval"); |
1865 | return; |
1866 | } |
1867 | |
1868 | /* Parse the COUNT option if any. */ |
1869 | if (c->argc > 4) { |
1870 | for (int j = 4; j < c->argc; j++) { |
1871 | int additional = c->argc-j-1; |
1872 | if (strcasecmp(c->argv[j]->ptr,"COUNT") == 0 && additional >= 1) { |
1873 | if (getLongLongFromObjectOrReply(c,c->argv[j+1],&count,NULL((void*)0)) |
1874 | != C_OK0) return; |
1875 | if (count < 0) count = 0; |
1876 | j++; /* Consume additional arg. */ |
1877 | } else { |
1878 | addReplyErrorObject(c,shared.syntaxerr); |
1879 | return; |
1880 | } |
1881 | } |
1882 | } |
1883 | |
1884 | /* Return the specified range to the user. */ |
1885 | if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyarray)) == NULL((void*)0) || |
1886 | checkType(c,o,OBJ_STREAM6)) return; |
1887 | |
1888 | s = o->ptr; |
1889 | |
1890 | if (count == 0) { |
1891 | addReplyNullArray(c); |
1892 | } else { |
1893 | if (count == -1) count = 0; |
1894 | streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL((void*)0),NULL((void*)0),0,NULL((void*)0)); |
1895 | } |
1896 | } |
1897 | |
1898 | /* XRANGE key start end [COUNT <n>] */ |
1899 | void xrangeCommand(client *c) { |
1900 | xrangeGenericCommand(c,0); |
1901 | } |
1902 | |
1903 | /* XREVRANGE key end start [COUNT <n>] */ |
1904 | void xrevrangeCommand(client *c) { |
1905 | xrangeGenericCommand(c,1); |
1906 | } |
1907 | |
1908 | /* XLEN */ |
1909 | void xlenCommand(client *c) { |
1910 | robj *o; |
1911 | if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL((void*)0) |
1912 | || checkType(c,o,OBJ_STREAM6)) return; |
1913 | stream *s = o->ptr; |
1914 | addReplyLongLong(c,s->length); |
1915 | } |
1916 | |
1917 | /* XREAD [BLOCK <milliseconds>] [COUNT <count>] STREAMS key_1 key_2 ... key_N |
1918 | * ID_1 ID_2 ... ID_N |
1919 | * |
1920 | * This function also implements the XREAD-GROUP command, which is like XREAD |
1921 | * but accepting the [GROUP group-name consumer-name] additional option. |
1922 | * This is useful because while XREAD is a read command and can be called |
1923 | * on slaves, XREAD-GROUP is not. */ |
1924 | #define XREAD_BLOCKED_DEFAULT_COUNT1000 1000 |
1925 | void xreadCommand(client *c) { |
1926 | long long timeout = -1; /* -1 means, no BLOCK argument given. */ |
1927 | long long count = 0; |
1928 | int streams_count = 0; |
1929 | int streams_arg = 0; |
1930 | int noack = 0; /* True if NOACK option was specified. */ |
1931 | streamID static_ids[STREAMID_STATIC_VECTOR_LEN8]; |
1932 | streamID *ids = static_ids; |
1933 | streamCG **groups = NULL((void*)0); |
1934 | int xreadgroup = sdslen(c->argv[0]->ptr) == 10; /* XREAD or XREADGROUP? */ |
1935 | robj *groupname = NULL((void*)0); |
1936 | robj *consumername = NULL((void*)0); |
1937 | |
1938 | /* Parse arguments. */ |
1939 | for (int i = 1; i < c->argc; i++) { |
1940 | int moreargs = c->argc-i-1; |
1941 | char *o = c->argv[i]->ptr; |
1942 | if (!strcasecmp(o,"BLOCK") && moreargs) { |
1943 | if (c->flags & CLIENT_LUA(1<<8)) { |
1944 | /* |
1945 | * Although the CLIENT_DENY_BLOCKING flag should protect from blocking the client |
1946 | * on Lua/MULTI/RM_Call we want special treatment for Lua to keep backword compatibility. |
1947 | * There is no sense to use BLOCK option within Lua. */ |
1948 | addReplyErrorFormat(c, "%s command is not allowed with BLOCK option from scripts", (char *)c->argv[0]->ptr); |
1949 | return; |
1950 | } |
1951 | i++; |
1952 | if (getTimeoutFromObjectOrReply(c,c->argv[i],&timeout, |
1953 | UNIT_MILLISECONDS1) != C_OK0) return; |
1954 | } else if (!strcasecmp(o,"COUNT") && moreargs) { |
1955 | i++; |
1956 | if (getLongLongFromObjectOrReply(c,c->argv[i],&count,NULL((void*)0)) != C_OK0) |
1957 | return; |
1958 | if (count < 0) count = 0; |
1959 | } else if (!strcasecmp(o,"STREAMS") && moreargs) { |
1960 | streams_arg = i+1; |
1961 | streams_count = (c->argc-streams_arg); |
1962 | if ((streams_count % 2) != 0) { |
1963 | addReplyError(c,"Unbalanced XREAD list of streams: " |
1964 | "for each stream key an ID or '$' must be " |
1965 | "specified."); |
1966 | return; |
1967 | } |
1968 | streams_count /= 2; /* We have two arguments for each stream. */ |
1969 | break; |
1970 | } else if (!strcasecmp(o,"GROUP") && moreargs >= 2) { |
1971 | if (!xreadgroup) { |
1972 | addReplyError(c,"The GROUP option is only supported by " |
1973 | "XREADGROUP. You called XREAD instead."); |
1974 | return; |
1975 | } |
1976 | groupname = c->argv[i+1]; |
1977 | consumername = c->argv[i+2]; |
1978 | i += 2; |
1979 | } else if (!strcasecmp(o,"NOACK")) { |
1980 | if (!xreadgroup) { |
1981 | addReplyError(c,"The NOACK option is only supported by " |
1982 | "XREADGROUP. You called XREAD instead."); |
1983 | return; |
1984 | } |
1985 | noack = 1; |
1986 | } else { |
1987 | addReplyErrorObject(c,shared.syntaxerr); |
1988 | return; |
1989 | } |
1990 | } |
1991 | |
1992 | /* STREAMS option is mandatory. */ |
1993 | if (streams_arg == 0) { |
1994 | addReplyErrorObject(c,shared.syntaxerr); |
1995 | return; |
1996 | } |
1997 | |
1998 | /* If the user specified XREADGROUP then it must also |
1999 | * provide the GROUP option. */ |
2000 | if (xreadgroup && groupname == NULL((void*)0)) { |
2001 | addReplyError(c,"Missing GROUP option for XREADGROUP"); |
2002 | return; |
2003 | } |
2004 | |
2005 | /* Parse the IDs and resolve the group name. */ |
2006 | if (streams_count > STREAMID_STATIC_VECTOR_LEN8) |
2007 | ids = zmalloc(sizeof(streamID)*streams_count); |
2008 | if (groupname) groups = zmalloc(sizeof(streamCG*)*streams_count); |
2009 | |
2010 | for (int i = streams_arg + streams_count; i < c->argc; i++) { |
2011 | /* Specifying "$" as last-known-id means that the client wants to be |
2012 | * served with just the messages that will arrive into the stream |
2013 | * starting from now. */ |
2014 | int id_idx = i - streams_arg - streams_count; |
2015 | robj *key = c->argv[i-streams_count]; |
2016 | robj *o = lookupKeyRead(c->db,key); |
2017 | if (checkType(c,o,OBJ_STREAM6)) goto cleanup; |
2018 | streamCG *group = NULL((void*)0); |
2019 | |
2020 | /* If a group was specified, than we need to be sure that the |
2021 | * key and group actually exist. */ |
2022 | if (groupname) { |
2023 | if (o == NULL((void*)0) || |
2024 | (group = streamLookupCG(o->ptr,groupname->ptr)) == NULL((void*)0)) |
2025 | { |
2026 | addReplyErrorFormat(c, "-NOGROUP No such key '%s' or consumer " |
2027 | "group '%s' in XREADGROUP with GROUP " |
2028 | "option", |
2029 | (char*)key->ptr,(char*)groupname->ptr); |
2030 | goto cleanup; |
2031 | } |
2032 | groups[id_idx] = group; |
2033 | } |
2034 | |
2035 | if (strcmp(c->argv[i]->ptr,"$") == 0) { |
2036 | if (xreadgroup) { |
2037 | addReplyError(c,"The $ ID is meaningless in the context of " |
2038 | "XREADGROUP: you want to read the history of " |
2039 | "this consumer by specifying a proper ID, or " |
2040 | "use the > ID to get new messages. The $ ID would " |
2041 | "just return an empty result set."); |
2042 | goto cleanup; |
2043 | } |
2044 | if (o) { |
2045 | stream *s = o->ptr; |
2046 | ids[id_idx] = s->last_id; |
2047 | } else { |
2048 | ids[id_idx].ms = 0; |
2049 | ids[id_idx].seq = 0; |
2050 | } |
2051 | continue; |
2052 | } else if (strcmp(c->argv[i]->ptr,">") == 0) { |
2053 | if (!xreadgroup) { |
2054 | addReplyError(c,"The > ID can be specified only when calling " |
2055 | "XREADGROUP using the GROUP <group> " |
2056 | "<consumer> option."); |
2057 | goto cleanup; |
2058 | } |
2059 | /* We use just the maximum ID to signal this is a ">" ID, anyway |
2060 | * the code handling the blocking clients will have to update the |
2061 | * ID later in order to match the changing consumer group last ID. */ |
2062 | ids[id_idx].ms = UINT64_MAX(18446744073709551615UL); |
2063 | ids[id_idx].seq = UINT64_MAX(18446744073709551615UL); |
2064 | continue; |
2065 | } |
2066 | if (streamParseStrictIDOrReply(c,c->argv[i],ids+id_idx,0) != C_OK0) |
2067 | goto cleanup; |
2068 | } |
2069 | |
2070 | /* Try to serve the client synchronously. */ |
2071 | size_t arraylen = 0; |
2072 | void *arraylen_ptr = NULL((void*)0); |
2073 | for (int i = 0; i < streams_count; i++) { |
2074 | robj *o = lookupKeyRead(c->db,c->argv[streams_arg+i]); |
2075 | if (o == NULL((void*)0)) continue; |
2076 | stream *s = o->ptr; |
2077 | streamID *gt = ids+i; /* ID must be greater than this. */ |
2078 | int serve_synchronously = 0; |
2079 | int serve_history = 0; /* True for XREADGROUP with ID != ">". */ |
2080 | |
2081 | /* Check if there are the conditions to serve the client |
2082 | * synchronously. */ |
2083 | if (groups) { |
2084 | /* If the consumer is blocked on a group, we always serve it |
2085 | * synchronously (serving its local history) if the ID specified |
2086 | * was not the special ">" ID. */ |
2087 | if (gt->ms != UINT64_MAX(18446744073709551615UL) || |
2088 | gt->seq != UINT64_MAX(18446744073709551615UL)) |
2089 | { |
2090 | serve_synchronously = 1; |
2091 | serve_history = 1; |
2092 | } else if (s->length) { |
2093 | /* We also want to serve a consumer in a consumer group |
2094 | * synchronously in case the group top item delivered is smaller |
2095 | * than what the stream has inside. */ |
2096 | streamID maxid, *last = &groups[i]->last_id; |
2097 | streamLastValidID(s, &maxid); |
2098 | if (streamCompareID(&maxid, last) > 0) { |
2099 | serve_synchronously = 1; |
2100 | *gt = *last; |
2101 | } |
2102 | } |
2103 | } else if (s->length) { |
2104 | /* For consumers without a group, we serve synchronously if we can |
2105 | * actually provide at least one item from the stream. */ |
2106 | streamID maxid; |
2107 | streamLastValidID(s, &maxid); |
2108 | if (streamCompareID(&maxid, gt) > 0) { |
2109 | serve_synchronously = 1; |
2110 | } |
2111 | } |
2112 | |
2113 | if (serve_synchronously) { |
2114 | arraylen++; |
2115 | if (arraylen == 1) arraylen_ptr = addReplyDeferredLen(c); |
2116 | /* streamReplyWithRange() handles the 'start' ID as inclusive, |
2117 | * so start from the next ID, since we want only messages with |
2118 | * IDs greater than start. */ |
2119 | streamID start = *gt; |
2120 | streamIncrID(&start); |
2121 | |
2122 | /* Emit the two elements sub-array consisting of the name |
2123 | * of the stream and the data we extracted from it. */ |
2124 | if (c->resp == 2) addReplyArrayLen(c,2); |
2125 | addReplyBulk(c,c->argv[streams_arg+i]); |
2126 | int created = 0; |
2127 | streamConsumer *consumer = NULL((void*)0); |
2128 | if (groups) consumer = streamLookupConsumer(groups[i], |
2129 | consumername->ptr, |
2130 | SLC_NONE0, |
2131 | &created); |
2132 | streamPropInfo spi = {c->argv[i+streams_arg],groupname}; |
2133 | if (created && noack) |
2134 | streamPropagateConsumerCreation(c,spi.keyname, |
2135 | spi.groupname, |
2136 | consumer->name); |
2137 | int flags = 0; |
2138 | if (noack) flags |= STREAM_RWR_NOACK(1<<0); |
2139 | if (serve_history) flags |= STREAM_RWR_HISTORY(1<<2); |
2140 | streamReplyWithRange(c,s,&start,NULL((void*)0),count,0, |
2141 | groups ? groups[i] : NULL((void*)0), |
2142 | consumer, flags, &spi); |
2143 | if (groups) server.dirty++; |
2144 | } |
2145 | } |
2146 | |
2147 | /* We replied synchronously! Set the top array len and return to caller. */ |
2148 | if (arraylen) { |
2149 | if (c->resp == 2) |
2150 | setDeferredArrayLen(c,arraylen_ptr,arraylen); |
2151 | else |
2152 | setDeferredMapLen(c,arraylen_ptr,arraylen); |
2153 | goto cleanup; |
2154 | } |
2155 | |
2156 | /* Block if needed. */ |
2157 | if (timeout != -1) { |
2158 | /* If we are not allowed to block the client, the only thing |
2159 | * we can do is treating it as a timeout (even with timeout 0). */ |
2160 | if (c->flags & CLIENT_DENY_BLOCKING(1ULL<<41)) { |
2161 | addReplyNullArray(c); |
2162 | goto cleanup; |
2163 | } |
2164 | blockForKeys(c, BLOCKED_STREAM4, c->argv+streams_arg, streams_count, |
2165 | timeout, NULL((void*)0), NULL((void*)0), ids); |
2166 | /* If no COUNT is given and we block, set a relatively small count: |
2167 | * in case the ID provided is too low, we do not want the server to |
2168 | * block just to serve this client a huge stream of messages. */ |
2169 | c->bpop.xread_count = count ? count : XREAD_BLOCKED_DEFAULT_COUNT1000; |
2170 | |
2171 | /* If this is a XREADGROUP + GROUP we need to remember for which |
2172 | * group and consumer name we are blocking, so later when one of the |
2173 | * keys receive more data, we can call streamReplyWithRange() passing |
2174 | * the right arguments. */ |
2175 | if (groupname) { |
2176 | incrRefCount(groupname); |
2177 | incrRefCount(consumername); |
2178 | c->bpop.xread_group = groupname; |
2179 | c->bpop.xread_consumer = consumername; |
2180 | c->bpop.xread_group_noack = noack; |
2181 | } else { |
2182 | c->bpop.xread_group = NULL((void*)0); |
2183 | c->bpop.xread_consumer = NULL((void*)0); |
2184 | } |
2185 | goto cleanup; |
2186 | } |
2187 | |
2188 | /* No BLOCK option, nor any stream we can serve. Reply as with a |
2189 | * timeout happened. */ |
2190 | addReplyNullArray(c); |
2191 | /* Continue to cleanup... */ |
2192 | |
2193 | cleanup: /* Cleanup. */ |
2194 | |
2195 | /* The command is propagated (in the READGROUP form) as a side effect |
2196 | * of calling lower level APIs. So stop any implicit propagation. */ |
2197 | preventCommandPropagation(c); |
2198 | if (ids != static_ids) zfree(ids); |
2199 | zfree(groups); |
2200 | } |
2201 | |
2202 | /* ----------------------------------------------------------------------- |
2203 | * Low level implementation of consumer groups |
2204 | * ----------------------------------------------------------------------- */ |
2205 | |
2206 | /* Create a NACK entry setting the delivery count to 1 and the delivery |
2207 | * time to the current time. The NACK consumer will be set to the one |
2208 | * specified as argument of the function. */ |
2209 | streamNACK *streamCreateNACK(streamConsumer *consumer) { |
2210 | streamNACK *nack = zmalloc(sizeof(*nack)); |
2211 | nack->delivery_time = mstime(); |
2212 | nack->delivery_count = 1; |
2213 | nack->consumer = consumer; |
2214 | return nack; |
2215 | } |
2216 | |
2217 | /* Free a NACK entry. */ |
2218 | void streamFreeNACK(streamNACK *na) { |
2219 | zfree(na); |
2220 | } |
2221 | |
2222 | /* Free a consumer and associated data structures. Note that this function |
2223 | * will not reassign the pending messages associated with this consumer |
2224 | * nor will delete them from the stream, so when this function is called |
2225 | * to delete a consumer, and not when the whole stream is destroyed, the caller |
2226 | * should do some work before. */ |
2227 | void streamFreeConsumer(streamConsumer *sc) { |
2228 | raxFree(sc->pel); /* No value free callback: the PEL entries are shared |
2229 | between the consumer and the main stream PEL. */ |
2230 | sdsfree(sc->name); |
2231 | zfree(sc); |
2232 | } |
2233 | |
2234 | /* Create a new consumer group in the context of the stream 's', having the |
2235 | * specified name and last server ID. If a consumer group with the same name |
2236 | * already existed NULL is returned, otherwise the pointer to the consumer |
2237 | * group is returned. */ |
2238 | streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id) { |
2239 | if (s->cgroups == NULL((void*)0)) s->cgroups = raxNew(); |
2240 | if (raxFind(s->cgroups,(unsigned char*)name,namelen) != raxNotFound) |
2241 | return NULL((void*)0); |
2242 | |
2243 | streamCG *cg = zmalloc(sizeof(*cg)); |
2244 | cg->pel = raxNew(); |
2245 | cg->consumers = raxNew(); |
2246 | cg->last_id = *id; |
2247 | raxInsert(s->cgroups,(unsigned char*)name,namelen,cg,NULL((void*)0)); |
2248 | return cg; |
2249 | } |
2250 | |
2251 | /* Free a consumer group and all its associated data. */ |
2252 | void streamFreeCG(streamCG *cg) { |
2253 | raxFreeWithCallback(cg->pel,(void(*)(void*))streamFreeNACK); |
2254 | raxFreeWithCallback(cg->consumers,(void(*)(void*))streamFreeConsumer); |
2255 | zfree(cg); |
2256 | } |
2257 | |
2258 | /* Lookup the consumer group in the specified stream and returns its |
2259 | * pointer, otherwise if there is no such group, NULL is returned. */ |
2260 | streamCG *streamLookupCG(stream *s, sds groupname) { |
2261 | if (s->cgroups == NULL((void*)0)) return NULL((void*)0); |
2262 | streamCG *cg = raxFind(s->cgroups,(unsigned char*)groupname, |
2263 | sdslen(groupname)); |
2264 | return (cg == raxNotFound) ? NULL((void*)0) : cg; |
2265 | } |
2266 | |
2267 | /* Lookup the consumer with the specified name in the group 'cg': if the |
2268 | * consumer does not exist it is created unless SLC_NOCREAT flag was specified. |
2269 | * Its last seen time is updated unless SLC_NOREFRESH flag was specified. */ |
2270 | streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags, int *created) { |
2271 | if (created) *created = 0; |
2272 | int create = !(flags & SLC_NOCREAT(1<<0)); |
2273 | int refresh = !(flags & SLC_NOREFRESH(1<<1)); |
2274 | streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name, |
2275 | sdslen(name)); |
2276 | if (consumer == raxNotFound) { |
2277 | if (!create) return NULL((void*)0); |
2278 | consumer = zmalloc(sizeof(*consumer)); |
2279 | consumer->name = sdsdup(name); |
2280 | consumer->pel = raxNew(); |
2281 | raxInsert(cg->consumers,(unsigned char*)name,sdslen(name), |
2282 | consumer,NULL((void*)0)); |
2283 | consumer->seen_time = mstime(); |
2284 | if (created) *created = 1; |
2285 | } else if (refresh) |
2286 | consumer->seen_time = mstime(); |
2287 | return consumer; |
2288 | } |
2289 | |
2290 | /* Delete the consumer specified in the consumer group 'cg'. The consumer |
2291 | * may have pending messages: they are removed from the PEL, and the number |
2292 | * of pending messages "lost" is returned. */ |
2293 | uint64_t streamDelConsumer(streamCG *cg, sds name) { |
2294 | streamConsumer *consumer = |
2295 | streamLookupConsumer(cg,name,SLC_NOCREAT(1<<0)|SLC_NOREFRESH(1<<1),NULL((void*)0)); |
2296 | if (consumer == NULL((void*)0)) return 0; |
2297 | |
2298 | uint64_t retval = raxSize(consumer->pel); |
2299 | |
2300 | /* Iterate all the consumer pending messages, deleting every corresponding |
2301 | * entry from the global entry. */ |
2302 | raxIterator ri; |
2303 | raxStart(&ri,consumer->pel); |
2304 | raxSeek(&ri,"^",NULL((void*)0),0); |
2305 | while(raxNext(&ri)) { |
2306 | streamNACK *nack = ri.data; |
2307 | raxRemove(cg->pel,ri.key,ri.key_len,NULL((void*)0)); |
2308 | streamFreeNACK(nack); |
2309 | } |
2310 | raxStop(&ri); |
2311 | |
2312 | /* Deallocate the consumer. */ |
2313 | raxRemove(cg->consumers,(unsigned char*)name,sdslen(name),NULL((void*)0)); |
2314 | streamFreeConsumer(consumer); |
2315 | return retval; |
2316 | } |
2317 | |
2318 | /* ----------------------------------------------------------------------- |
2319 | * Consumer groups commands |
2320 | * ----------------------------------------------------------------------- */ |
2321 | |
2322 | /* XGROUP CREATE <key> <groupname> <id or $> [MKSTREAM] |
2323 | * XGROUP SETID <key> <groupname> <id or $> |
2324 | * XGROUP DESTROY <key> <groupname> |
2325 | * XGROUP CREATECONSUMER <key> <groupname> <consumer> |
2326 | * XGROUP DELCONSUMER <key> <groupname> <consumername> */ |
2327 | void xgroupCommand(client *c) { |
2328 | stream *s = NULL((void*)0); |
2329 | sds grpname = NULL((void*)0); |
2330 | streamCG *cg = NULL((void*)0); |
2331 | char *opt = c->argv[1]->ptr; /* Subcommand name. */ |
2332 | int mkstream = 0; |
2333 | robj *o; |
2334 | |
2335 | /* CREATE has an MKSTREAM option that creates the stream if it |
2336 | * does not exist. */ |
2337 | if (c->argc == 6 && !strcasecmp(opt,"CREATE")) { |
2338 | if (strcasecmp(c->argv[5]->ptr,"MKSTREAM")) { |
2339 | addReplySubcommandSyntaxError(c); |
2340 | return; |
2341 | } |
2342 | mkstream = 1; |
2343 | grpname = c->argv[3]->ptr; |
2344 | } |
2345 | |
2346 | /* Everything but the "HELP" option requires a key and group name. */ |
2347 | if (c->argc >= 4) { |
2348 | o = lookupKeyWrite(c->db,c->argv[2]); |
2349 | if (o) { |
2350 | if (checkType(c,o,OBJ_STREAM6)) return; |
2351 | s = o->ptr; |
2352 | } |
2353 | grpname = c->argv[3]->ptr; |
2354 | } |
2355 | |
2356 | /* Check for missing key/group. */ |
2357 | if (c->argc >= 4 && !mkstream) { |
2358 | /* At this point key must exist, or there is an error. */ |
2359 | if (s == NULL((void*)0)) { |
2360 | addReplyError(c, |
2361 | "The XGROUP subcommand requires the key to exist. " |
2362 | "Note that for CREATE you may want to use the MKSTREAM " |
2363 | "option to create an empty stream automatically."); |
2364 | return; |
2365 | } |
2366 | |
2367 | /* Certain subcommands require the group to exist. */ |
2368 | if ((cg = streamLookupCG(s,grpname)) == NULL((void*)0) && |
2369 | (!strcasecmp(opt,"SETID") || |
2370 | !strcasecmp(opt,"CREATECONSUMER") || |
2371 | !strcasecmp(opt,"DELCONSUMER"))) |
2372 | { |
2373 | addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' " |
2374 | "for key name '%s'", |
2375 | (char*)grpname, (char*)c->argv[2]->ptr); |
2376 | return; |
2377 | } |
2378 | } |
2379 | |
2380 | /* Dispatch the different subcommands. */ |
2381 | if (c->argc == 2 && !strcasecmp(opt,"HELP")) { |
2382 | const char *help[] = { |
2383 | "CREATE <key> <groupname> <id|$> [option]", |
2384 | " Create a new consumer group. Options are:", |
2385 | " * MKSTREAM", |
2386 | " Create the empty stream if it does not exist.", |
2387 | "CREATECONSUMER <key> <groupname> <consumer>", |
2388 | " Create a new consumer in the specified group.", |
2389 | "DELCONSUMER <key> <groupname> <consumer>", |
2390 | " Remove the specified consumer.", |
2391 | "DESTROY <key> <groupname>" |
2392 | " Remove the specified group.", |
2393 | "SETID <key> <groupname> <id|$>", |
2394 | " Set the current group ID.", |
2395 | NULL((void*)0) |
2396 | }; |
2397 | addReplyHelp(c, help); |
2398 | } else if (!strcasecmp(opt,"CREATE") && (c->argc == 5 || c->argc == 6)) { |
2399 | streamID id; |
2400 | if (!strcmp(c->argv[4]->ptr,"$")) { |
2401 | if (s) { |
2402 | id = s->last_id; |
2403 | } else { |
2404 | id.ms = 0; |
2405 | id.seq = 0; |
2406 | } |
2407 | } else if (streamParseStrictIDOrReply(c,c->argv[4],&id,0) != C_OK0) { |
2408 | return; |
2409 | } |
2410 | |
2411 | /* Handle the MKSTREAM option now that the command can no longer fail. */ |
2412 | if (s == NULL((void*)0)) { |
2413 | serverAssert(mkstream)((mkstream)?(void)0 : (_serverAssert("mkstream","t_stream.c", 2413),__builtin_unreachable())); |
2414 | o = createStreamObject(); |
2415 | dbAdd(c->db,c->argv[2],o); |
2416 | s = o->ptr; |
2417 | signalModifiedKey(c,c->db,c->argv[2]); |
2418 | } |
2419 | |
2420 | streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),&id); |
2421 | if (cg) { |
2422 | addReply(c,shared.ok); |
2423 | server.dirty++; |
2424 | notifyKeyspaceEvent(NOTIFY_STREAM(1<<10),"xgroup-create", |
2425 | c->argv[2],c->db->id); |
2426 | } else { |
2427 | addReplyError(c,"-BUSYGROUP Consumer Group name already exists"); |
2428 | } |
2429 | } else if (!strcasecmp(opt,"SETID") && c->argc == 5) { |
2430 | streamID id; |
2431 | if (!strcmp(c->argv[4]->ptr,"$")) { |
2432 | id = s->last_id; |
2433 | } else if (streamParseIDOrReply(c,c->argv[4],&id,0) != C_OK0) { |
2434 | return; |
2435 | } |
2436 | cg->last_id = id; |
2437 | addReply(c,shared.ok); |
2438 | server.dirty++; |
2439 | notifyKeyspaceEvent(NOTIFY_STREAM(1<<10),"xgroup-setid",c->argv[2],c->db->id); |
2440 | } else if (!strcasecmp(opt,"DESTROY") && c->argc == 4) { |
2441 | if (cg) { |
2442 | raxRemove(s->cgroups,(unsigned char*)grpname,sdslen(grpname),NULL((void*)0)); |
2443 | streamFreeCG(cg); |
2444 | addReply(c,shared.cone); |
2445 | server.dirty++; |
2446 | notifyKeyspaceEvent(NOTIFY_STREAM(1<<10),"xgroup-destroy", |
2447 | c->argv[2],c->db->id); |
2448 | /* We want to unblock any XREADGROUP consumers with -NOGROUP. */ |
2449 | signalKeyAsReady(c->db,c->argv[2],OBJ_STREAM6); |
2450 | } else { |
2451 | addReply(c,shared.czero); |
2452 | } |
2453 | } else if (!strcasecmp(opt,"CREATECONSUMER") && c->argc == 5) { |
2454 | int created = 0; |
2455 | streamLookupConsumer(cg,c->argv[4]->ptr,SLC_NOREFRESH(1<<1),&created); |
2456 | if (created) { |
2457 | server.dirty++; |
2458 | notifyKeyspaceEvent(NOTIFY_STREAM(1<<10),"xgroup-createconsumer", |
2459 | c->argv[2],c->db->id); |
2460 | } |
2461 | addReplyLongLong(c,created); |
2462 | } else if (!strcasecmp(opt,"DELCONSUMER") && c->argc == 5) { |
2463 | /* Delete the consumer and returns the number of pending messages |
2464 | * that were yet associated with such a consumer. */ |
2465 | long long pending = streamDelConsumer(cg,c->argv[4]->ptr); |
2466 | addReplyLongLong(c,pending); |
2467 | server.dirty++; |
2468 | notifyKeyspaceEvent(NOTIFY_STREAM(1<<10),"xgroup-delconsumer", |
2469 | c->argv[2],c->db->id); |
2470 | } else { |
2471 | addReplySubcommandSyntaxError(c); |
2472 | } |
2473 | } |
2474 | |
2475 | /* XSETID <stream> <id> |
2476 | * |
2477 | * Set the internal "last ID" of a stream. */ |
2478 | void xsetidCommand(client *c) { |
2479 | robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr); |
2480 | if (o == NULL((void*)0) || checkType(c,o,OBJ_STREAM6)) return; |
2481 | |
2482 | stream *s = o->ptr; |
2483 | streamID id; |
2484 | if (streamParseStrictIDOrReply(c,c->argv[2],&id,0) != C_OK0) return; |
2485 | |
2486 | /* If the stream has at least one item, we want to check that the user |
2487 | * is setting a last ID that is equal or greater than the current top |
2488 | * item, otherwise the fundamental ID monotonicity assumption is violated. */ |
2489 | if (s->length > 0) { |
2490 | streamID maxid; |
2491 | streamLastValidID(s,&maxid); |
2492 | |
2493 | if (streamCompareID(&id,&maxid) < 0) { |
2494 | addReplyError(c,"The ID specified in XSETID is smaller than the " |
2495 | "target stream top item"); |
2496 | return; |
2497 | } |
2498 | } |
2499 | s->last_id = id; |
2500 | addReply(c,shared.ok); |
2501 | server.dirty++; |
2502 | notifyKeyspaceEvent(NOTIFY_STREAM(1<<10),"xsetid",c->argv[1],c->db->id); |
2503 | } |
2504 | |
2505 | /* XACK <key> <group> <id> <id> ... <id> |
2506 | * |
2507 | * Acknowledge a message as processed. In practical terms we just check the |
2508 | * pendine entries list (PEL) of the group, and delete the PEL entry both from |
2509 | * the group and the consumer (pending messages are referenced in both places). |
2510 | * |
2511 | * Return value of the command is the number of messages successfully |
2512 | * acknowledged, that is, the IDs we were actually able to resolve in the PEL. |
2513 | */ |
2514 | void xackCommand(client *c) { |
2515 | streamCG *group = NULL((void*)0); |
2516 | robj *o = lookupKeyRead(c->db,c->argv[1]); |
2517 | if (o) { |
2518 | if (checkType(c,o,OBJ_STREAM6)) return; /* Type error. */ |
2519 | group = streamLookupCG(o->ptr,c->argv[2]->ptr); |
2520 | } |
2521 | |
2522 | /* No key or group? Nothing to ack. */ |
2523 | if (o == NULL((void*)0) || group == NULL((void*)0)) { |
2524 | addReply(c,shared.czero); |
2525 | return; |
2526 | } |
2527 | |
2528 | /* Start parsing the IDs, so that we abort ASAP if there is a syntax |
2529 | * error: the return value of this command cannot be an error in case |
2530 | * the client successfully acknowledged some messages, so it should be |
2531 | * executed in a "all or nothing" fashion. */ |
2532 | streamID static_ids[STREAMID_STATIC_VECTOR_LEN8]; |
2533 | streamID *ids = static_ids; |
2534 | int id_count = c->argc-3; |
2535 | if (id_count > STREAMID_STATIC_VECTOR_LEN8) |
2536 | ids = zmalloc(sizeof(streamID)*id_count); |
2537 | for (int j = 3; j < c->argc; j++) { |
2538 | if (streamParseStrictIDOrReply(c,c->argv[j],&ids[j-3],0) != C_OK0) goto cleanup; |
2539 | } |
2540 | |
2541 | int acknowledged = 0; |
2542 | for (int j = 3; j < c->argc; j++) { |
2543 | unsigned char buf[sizeof(streamID)]; |
2544 | streamEncodeID(buf,&ids[j-3]); |
2545 | |
2546 | /* Lookup the ID in the group PEL: it will have a reference to the |
2547 | * NACK structure that will have a reference to the consumer, so that |
2548 | * we are able to remove the entry from both PELs. */ |
2549 | streamNACK *nack = raxFind(group->pel,buf,sizeof(buf)); |
2550 | if (nack != raxNotFound) { |
2551 | raxRemove(group->pel,buf,sizeof(buf),NULL((void*)0)); |
2552 | raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL((void*)0)); |
2553 | streamFreeNACK(nack); |
2554 | acknowledged++; |
2555 | server.dirty++; |
2556 | } |
2557 | } |
2558 | addReplyLongLong(c,acknowledged); |
2559 | cleanup: |
2560 | if (ids != static_ids) zfree(ids); |
2561 | } |
2562 | |
2563 | /* XPENDING <key> <group> [[IDLE <idle>] <start> <stop> <count> [<consumer>]] |
2564 | * |
2565 | * If start and stop are omitted, the command just outputs information about |
2566 | * the amount of pending messages for the key/group pair, together with |
2567 | * the minimum and maximum ID of pending messages. |
2568 | * |
2569 | * If start and stop are provided instead, the pending messages are returned |
2570 | * with information about the current owner, number of deliveries and last |
2571 | * delivery time and so forth. */ |
2572 | void xpendingCommand(client *c) { |
2573 | int justinfo = c->argc == 3; /* Without the range just outputs general |
2574 | informations about the PEL. */ |
2575 | robj *key = c->argv[1]; |
2576 | robj *groupname = c->argv[2]; |
2577 | robj *consumername = NULL((void*)0); |
2578 | streamID startid, endid; |
2579 | long long count = 0; |
2580 | long long minidle = 0; |
2581 | int startex = 0, endex = 0; |
2582 | |
2583 | /* Start and stop, and the consumer, can be omitted. Also the IDLE modifier. */ |
2584 | if (c->argc != 3 && (c->argc < 6 || c->argc > 9)) { |
2585 | addReplyErrorObject(c,shared.syntaxerr); |
2586 | return; |
2587 | } |
2588 | |
2589 | /* Parse start/end/count arguments ASAP if needed, in order to report |
2590 | * syntax errors before any other error. */ |
2591 | if (c->argc >= 6) { |
2592 | int startidx = 3; /* Without IDLE */ |
2593 | |
2594 | if (!strcasecmp(c->argv[3]->ptr, "IDLE")) { |
2595 | if (getLongLongFromObjectOrReply(c, c->argv[4], &minidle, NULL((void*)0)) == C_ERR-1) |
2596 | return; |
2597 | if (c->argc < 8) { |
2598 | /* If IDLE was provided we must have at least 'start end count' */ |
2599 | addReplyErrorObject(c,shared.syntaxerr); |
2600 | return; |
2601 | } |
2602 | /* Search for rest of arguments after 'IDLE <idle>' */ |
2603 | startidx += 2; |
2604 | } |
2605 | |
2606 | /* count argument. */ |
2607 | if (getLongLongFromObjectOrReply(c,c->argv[startidx+2],&count,NULL((void*)0)) == C_ERR-1) |
2608 | return; |
2609 | if (count < 0) count = 0; |
2610 | |
2611 | /* start and end arguments. */ |
2612 | if (streamParseIntervalIDOrReply(c,c->argv[startidx],&startid,&startex,0) != C_OK0) |
2613 | return; |
2614 | if (startex && streamIncrID(&startid) != C_OK0) { |
2615 | addReplyError(c,"invalid start ID for the interval"); |
2616 | return; |
2617 | } |
2618 | if (streamParseIntervalIDOrReply(c,c->argv[startidx+1],&endid,&endex,UINT64_MAX(18446744073709551615UL)) != C_OK0) |
2619 | return; |
2620 | if (endex && streamDecrID(&endid) != C_OK0) { |
2621 | addReplyError(c,"invalid end ID for the interval"); |
2622 | return; |
2623 | } |
2624 | |
2625 | if (startidx+3 < c->argc) { |
2626 | /* 'consumer' was provided */ |
2627 | consumername = c->argv[startidx+3]; |
2628 | } |
2629 | } |
2630 | |
2631 | /* Lookup the key and the group inside the stream. */ |
2632 | robj *o = lookupKeyRead(c->db,c->argv[1]); |
2633 | streamCG *group; |
2634 | |
2635 | if (checkType(c,o,OBJ_STREAM6)) return; |
2636 | if (o == NULL((void*)0) || |
2637 | (group = streamLookupCG(o->ptr,groupname->ptr)) == NULL((void*)0)) |
2638 | { |
2639 | addReplyErrorFormat(c, "-NOGROUP No such key '%s' or consumer " |
2640 | "group '%s'", |
2641 | (char*)key->ptr,(char*)groupname->ptr); |
2642 | return; |
2643 | } |
2644 | |
2645 | /* XPENDING <key> <group> variant. */ |
2646 | if (justinfo) { |
2647 | addReplyArrayLen(c,4); |
2648 | /* Total number of messages in the PEL. */ |
2649 | addReplyLongLong(c,raxSize(group->pel)); |
2650 | /* First and last IDs. */ |
2651 | if (raxSize(group->pel) == 0) { |
2652 | addReplyNull(c); /* Start. */ |
2653 | addReplyNull(c); /* End. */ |
2654 | addReplyNullArray(c); /* Clients. */ |
2655 | } else { |
2656 | /* Start. */ |
2657 | raxIterator ri; |
2658 | raxStart(&ri,group->pel); |
2659 | raxSeek(&ri,"^",NULL((void*)0),0); |
2660 | raxNext(&ri); |
2661 | streamDecodeID(ri.key,&startid); |
2662 | addReplyStreamID(c,&startid); |
2663 | |
2664 | /* End. */ |
2665 | raxSeek(&ri,"$",NULL((void*)0),0); |
2666 | raxNext(&ri); |
2667 | streamDecodeID(ri.key,&endid); |
2668 | addReplyStreamID(c,&endid); |
2669 | raxStop(&ri); |
2670 | |
2671 | /* Consumers with pending messages. */ |
2672 | raxStart(&ri,group->consumers); |
2673 | raxSeek(&ri,"^",NULL((void*)0),0); |
2674 | void *arraylen_ptr = addReplyDeferredLen(c); |
2675 | size_t arraylen = 0; |
2676 | while(raxNext(&ri)) { |
2677 | streamConsumer *consumer = ri.data; |
2678 | if (raxSize(consumer->pel) == 0) continue; |
2679 | addReplyArrayLen(c,2); |
2680 | addReplyBulkCBuffer(c,ri.key,ri.key_len); |
2681 | addReplyBulkLongLong(c,raxSize(consumer->pel)); |
2682 | arraylen++; |
2683 | } |
2684 | setDeferredArrayLen(c,arraylen_ptr,arraylen); |
2685 | raxStop(&ri); |
2686 | } |
2687 | } else { /* <start>, <stop> and <count> provided, return actual pending entries (not just info) */ |
2688 | streamConsumer *consumer = NULL((void*)0); |
2689 | if (consumername) { |
2690 | consumer = streamLookupConsumer(group, |
2691 | consumername->ptr, |
2692 | SLC_NOCREAT(1<<0)|SLC_NOREFRESH(1<<1), |
2693 | NULL((void*)0)); |
2694 | |
2695 | /* If a consumer name was mentioned but it does not exist, we can |
2696 | * just return an empty array. */ |
2697 | if (consumer == NULL((void*)0)) { |
2698 | addReplyArrayLen(c,0); |
2699 | return; |
2700 | } |
2701 | } |
2702 | |
2703 | rax *pel = consumer ? consumer->pel : group->pel; |
2704 | unsigned char startkey[sizeof(streamID)]; |
2705 | unsigned char endkey[sizeof(streamID)]; |
2706 | raxIterator ri; |
2707 | mstime_t now = mstime(); |
2708 | |
2709 | streamEncodeID(startkey,&startid); |
2710 | streamEncodeID(endkey,&endid); |
2711 | raxStart(&ri,pel); |
2712 | raxSeek(&ri,">=",startkey,sizeof(startkey)); |
2713 | void *arraylen_ptr = addReplyDeferredLen(c); |
2714 | size_t arraylen = 0; |
2715 | |
2716 | while(count && raxNext(&ri) && memcmp(ri.key,endkey,ri.key_len) <= 0) { |
2717 | streamNACK *nack = ri.data; |
2718 | |
2719 | if (minidle) { |
2720 | mstime_t this_idle = now - nack->delivery_time; |
2721 | if (this_idle < minidle) continue; |
2722 | } |
2723 | |
2724 | arraylen++; |
2725 | count--; |
2726 | addReplyArrayLen(c,4); |
2727 | |
2728 | /* Entry ID. */ |
2729 | streamID id; |
2730 | streamDecodeID(ri.key,&id); |
2731 | addReplyStreamID(c,&id); |
2732 | |
2733 | /* Consumer name. */ |
2734 | addReplyBulkCBuffer(c,nack->consumer->name, |
2735 | sdslen(nack->consumer->name)); |
2736 | |
2737 | /* Milliseconds elapsed since last delivery. */ |
2738 | mstime_t elapsed = now - nack->delivery_time; |
2739 | if (elapsed < 0) elapsed = 0; |
2740 | addReplyLongLong(c,elapsed); |
2741 | |
2742 | /* Number of deliveries. */ |
2743 | addReplyLongLong(c,nack->delivery_count); |
2744 | } |
2745 | raxStop(&ri); |
2746 | setDeferredArrayLen(c,arraylen_ptr,arraylen); |
2747 | } |
2748 | } |
2749 | |
2750 | /* XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> |
2751 | * [IDLE <milliseconds>] [TIME <mstime>] [RETRYCOUNT <count>] |
2752 | * [FORCE] [JUSTID] [LASTID <id>] |
2753 | * |
2754 | * Gets ownership of one or multiple messages in the Pending Entries List |
2755 | * of a given stream consumer group. |
2756 | * |
2757 | * If the message ID (among the specified ones) exists, and its idle |
2758 | * time greater or equal to <min-idle-time>, then the message new owner |
2759 | * becomes the specified <consumer>. If the minimum idle time specified |
2760 | * is zero, messages are claimed regardless of their idle time. |
2761 | * |
2762 | * All the messages that cannot be found inside the pending entries list |
2763 | * are ignored, unless the FORCE option is used. In that case we |
2764 | * create the NACK (representing a not yet acknowledged message) entry in |
2765 | * the consumer group PEL. |
2766 | * |
2767 | * This command creates the consumer as a side effect if it does not yet |
2768 | * exist. Moreover the command resets the idle time of the message to 0, |
2769 | * although by using the IDLE or TIME options the user can control the |
2770 | * new idle time. |
2771 | * |
2772 | * The options at the end can be used in order to specify more attributes |
2773 | * to set in the representation of the pending message: |
2774 | * |
2775 | * 1. IDLE <ms>: |
2776 | * Set the idle time (last time it was delivered) of the message. |
2777 | * If IDLE is not specified, an IDLE of 0 is assumed, that is, |
2778 | * the time count is reset because the message has now a new |
2779 | * owner trying to process it. |
2780 | * |
2781 | * 2. TIME <ms-unix-time>: |
2782 | * This is the same as IDLE but instead of a relative amount of |
2783 | * milliseconds, it sets the idle time to a specific unix time |
2784 | * (in milliseconds). This is useful in order to rewrite the AOF |
2785 | * file generating XCLAIM commands. |
2786 | * |
2787 | * 3. RETRYCOUNT <count>: |
2788 | * Set the retry counter to the specified value. This counter is |
2789 | * incremented every time a message is delivered again. Normally |
2790 | * XCLAIM does not alter this counter, which is just served to clients |
2791 | * when the XPENDING command is called: this way clients can detect |
2792 | * anomalies, like messages that are never processed for some reason |
2793 | * after a big number of delivery attempts. |
2794 | * |
2795 | * 4. FORCE: |
2796 | * Creates the pending message entry in the PEL even if certain |
2797 | * specified IDs are not already in the PEL assigned to a different |
2798 | * client. However the message must exist in the stream, otherwise |
2799 | * the IDs of non existing messages are ignored. |
2800 | * |
2801 | * 5. JUSTID: |
2802 | * Return just an array of IDs of messages successfully claimed, |
2803 | * without returning the actual message. |
2804 | * |
2805 | * 6. LASTID <id>: |
2806 | * Update the consumer group last ID with the specified ID if the |
2807 | * current last ID is smaller than the provided one. |
2808 | * This is used for replication / AOF, so that when we read from a |
2809 | * consumer group, the XCLAIM that gets propagated to give ownership |
2810 | * to the consumer, is also used in order to update the group current |
2811 | * ID. |
2812 | * |
2813 | * The command returns an array of messages that the user |
2814 | * successfully claimed, so that the caller is able to understand |
2815 | * what messages it is now in charge of. */ |
2816 | void xclaimCommand(client *c) { |
2817 | streamCG *group = NULL((void*)0); |
2818 | robj *o = lookupKeyRead(c->db,c->argv[1]); |
2819 | long long minidle; /* Minimum idle time argument. */ |
2820 | long long retrycount = -1; /* -1 means RETRYCOUNT option not given. */ |
2821 | mstime_t deliverytime = -1; /* -1 means IDLE/TIME options not given. */ |
2822 | int force = 0; |
2823 | int justid = 0; |
2824 | |
2825 | if (o) { |
2826 | if (checkType(c,o,OBJ_STREAM6)) return; /* Type error. */ |
2827 | group = streamLookupCG(o->ptr,c->argv[2]->ptr); |
2828 | } |
2829 | |
2830 | /* No key or group? Send an error given that the group creation |
2831 | * is mandatory. */ |
2832 | if (o == NULL((void*)0) || group == NULL((void*)0)) { |
2833 | addReplyErrorFormat(c,"-NOGROUP No such key '%s' or " |
2834 | "consumer group '%s'", (char*)c->argv[1]->ptr, |
2835 | (char*)c->argv[2]->ptr); |
2836 | return; |
2837 | } |
2838 | |
2839 | if (getLongLongFromObjectOrReply(c,c->argv[4],&minidle, |
2840 | "Invalid min-idle-time argument for XCLAIM") |
2841 | != C_OK0) return; |
2842 | if (minidle < 0) minidle = 0; |
2843 | |
2844 | /* Start parsing the IDs, so that we abort ASAP if there is a syntax |
2845 | * error: the return value of this command cannot be an error in case |
2846 | * the client successfully claimed some message, so it should be |
2847 | * executed in a "all or nothing" fashion. */ |
2848 | int j; |
2849 | streamID static_ids[STREAMID_STATIC_VECTOR_LEN8]; |
2850 | streamID *ids = static_ids; |
2851 | int id_count = c->argc-5; |
2852 | if (id_count > STREAMID_STATIC_VECTOR_LEN8) |
2853 | ids = zmalloc(sizeof(streamID)*id_count); |
2854 | for (j = 5; j < c->argc; j++) { |
2855 | if (streamParseStrictIDOrReply(NULL((void*)0),c->argv[j],&ids[j-5],0) != C_OK0) break; |
2856 | } |
2857 | int last_id_arg = j-1; /* Next time we iterate the IDs we now the range. */ |
2858 | |
2859 | /* If we stopped because some IDs cannot be parsed, perhaps they |
2860 | * are trailing options. */ |
2861 | mstime_t now = mstime(); |
2862 | streamID last_id = {0,0}; |
2863 | int propagate_last_id = 0; |
2864 | for (; j < c->argc; j++) { |
2865 | int moreargs = (c->argc-1) - j; /* Number of additional arguments. */ |
2866 | char *opt = c->argv[j]->ptr; |
2867 | if (!strcasecmp(opt,"FORCE")) { |
2868 | force = 1; |
2869 | } else if (!strcasecmp(opt,"JUSTID")) { |
2870 | justid = 1; |
2871 | } else if (!strcasecmp(opt,"IDLE") && moreargs) { |
2872 | j++; |
2873 | if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime, |
2874 | "Invalid IDLE option argument for XCLAIM") |
2875 | != C_OK0) goto cleanup; |
2876 | deliverytime = now - deliverytime; |
2877 | } else if (!strcasecmp(opt,"TIME") && moreargs) { |
2878 | j++; |
2879 | if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime, |
2880 | "Invalid TIME option argument for XCLAIM") |
2881 | != C_OK0) goto cleanup; |
2882 | } else if (!strcasecmp(opt,"RETRYCOUNT") && moreargs) { |
2883 | j++; |
2884 | if (getLongLongFromObjectOrReply(c,c->argv[j],&retrycount, |
2885 | "Invalid RETRYCOUNT option argument for XCLAIM") |
2886 | != C_OK0) goto cleanup; |
2887 | } else if (!strcasecmp(opt,"LASTID") && moreargs) { |
2888 | j++; |
2889 | if (streamParseStrictIDOrReply(c,c->argv[j],&last_id,0) != C_OK0) goto cleanup; |
2890 | } else { |
2891 | addReplyErrorFormat(c,"Unrecognized XCLAIM option '%s'",opt); |
2892 | goto cleanup; |
2893 | } |
2894 | } |
2895 | |
2896 | if (streamCompareID(&last_id,&group->last_id) > 0) { |
2897 | group->last_id = last_id; |
2898 | propagate_last_id = 1; |
2899 | } |
2900 | |
2901 | if (deliverytime != -1) { |
2902 | /* If a delivery time was passed, either with IDLE or TIME, we |
2903 | * do some sanity check on it, and set the deliverytime to now |
2904 | * (which is a sane choice usually) if the value is bogus. |
2905 | * To raise an error here is not wise because clients may compute |
2906 | * the idle time doing some math starting from their local time, |
2907 | * and this is not a good excuse to fail in case, for instance, |
2908 | * the computer time is a bit in the future from our POV. */ |
2909 | if (deliverytime < 0 || deliverytime > now) deliverytime = now; |
2910 | } else { |
2911 | /* If no IDLE/TIME option was passed, we want the last delivery |
2912 | * time to be now, so that the idle time of the message will be |
2913 | * zero. */ |
2914 | deliverytime = now; |
2915 | } |
2916 | |
2917 | /* Do the actual claiming. */ |
2918 | streamConsumer *consumer = NULL((void*)0); |
2919 | void *arraylenptr = addReplyDeferredLen(c); |
2920 | size_t arraylen = 0; |
2921 | for (int j = 5; j <= last_id_arg; j++) { |
2922 | streamID id = ids[j-5]; |
2923 | unsigned char buf[sizeof(streamID)]; |
2924 | streamEncodeID(buf,&id); |
2925 | |
2926 | /* Lookup the ID in the group PEL. */ |
2927 | streamNACK *nack = raxFind(group->pel,buf,sizeof(buf)); |
2928 | |
2929 | /* If FORCE is passed, let's check if at least the entry |
2930 | * exists in the Stream. In such case, we'll crate a new |
2931 | * entry in the PEL from scratch, so that XCLAIM can also |
2932 | * be used to create entries in the PEL. Useful for AOF |
2933 | * and replication of consumer groups. */ |
2934 | if (force && nack == raxNotFound) { |
2935 | streamIterator myiterator; |
2936 | streamIteratorStart(&myiterator,o->ptr,&id,&id,0); |
2937 | int64_t numfields; |
2938 | int found = 0; |
2939 | streamID item_id; |
2940 | if (streamIteratorGetID(&myiterator,&item_id,&numfields)) found = 1; |
2941 | streamIteratorStop(&myiterator); |
2942 | |
2943 | /* Item must exist for us to create a NACK for it. */ |
2944 | if (!found) continue; |
2945 | |
2946 | /* Create the NACK. */ |
2947 | nack = streamCreateNACK(NULL((void*)0)); |
2948 | raxInsert(group->pel,buf,sizeof(buf),nack,NULL((void*)0)); |
2949 | } |
2950 | |
2951 | if (nack != raxNotFound) { |
2952 | /* We need to check if the minimum idle time requested |
2953 | * by the caller is satisfied by this entry. |
2954 | * |
2955 | * Note that the nack could be created by FORCE, in this |
2956 | * case there was no pre-existing entry and minidle should |
2957 | * be ignored, but in that case nack->consumer is NULL. */ |
2958 | if (nack->consumer && minidle) { |
2959 | mstime_t this_idle = now - nack->delivery_time; |
2960 | if (this_idle < minidle) continue; |
2961 | } |
2962 | if (consumer == NULL((void*)0)) |
2963 | consumer = streamLookupConsumer(group,c->argv[3]->ptr,SLC_NONE0,NULL((void*)0)); |
2964 | if (nack->consumer != consumer) { |
2965 | /* Remove the entry from the old consumer. |
2966 | * Note that nack->consumer is NULL if we created the |
2967 | * NACK above because of the FORCE option. */ |
2968 | if (nack->consumer) |
2969 | raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL((void*)0)); |
2970 | } |
2971 | nack->delivery_time = deliverytime; |
2972 | /* Set the delivery attempts counter if given, otherwise |
2973 | * autoincrement unless JUSTID option provided */ |
2974 | if (retrycount >= 0) { |
2975 | nack->delivery_count = retrycount; |
2976 | } else if (!justid) { |
2977 | nack->delivery_count++; |
2978 | } |
2979 | if (nack->consumer != consumer) { |
2980 | /* Add the entry in the new consumer local PEL. */ |
2981 | raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL((void*)0)); |
2982 | nack->consumer = consumer; |
2983 | } |
2984 | /* Send the reply for this entry. */ |
2985 | if (justid) { |
2986 | addReplyStreamID(c,&id); |
2987 | } else { |
2988 | size_t emitted = streamReplyWithRange(c,o->ptr,&id,&id,1,0, |
2989 | NULL((void*)0),NULL((void*)0),STREAM_RWR_RAWENTRIES(1<<1),NULL((void*)0)); |
2990 | if (!emitted) addReplyNull(c); |
2991 | } |
2992 | arraylen++; |
2993 | |
2994 | /* Propagate this change. */ |
2995 | streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack); |
2996 | propagate_last_id = 0; /* Will be propagated by XCLAIM itself. */ |
2997 | server.dirty++; |
2998 | } |
2999 | } |
3000 | if (propagate_last_id) { |
3001 | streamPropagateGroupID(c,c->argv[1],group,c->argv[2]); |
3002 | server.dirty++; |
3003 | } |
3004 | setDeferredArrayLen(c,arraylenptr,arraylen); |
3005 | preventCommandPropagation(c); |
3006 | cleanup: |
3007 | if (ids != static_ids) zfree(ids); |
3008 | } |
3009 | |
3010 | /* XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start> [COUNT <count>] [JUSTID] |
3011 | * |
3012 | * Gets ownership of one or multiple messages in the Pending Entries List |
3013 | * of a given stream consumer group. |
3014 | * |
3015 | * For each PEL entry, if its idle time greater or equal to <min-idle-time>, |
3016 | * then the message new owner becomes the specified <consumer>. |
3017 | * If the minimum idle time specified is zero, messages are claimed |
3018 | * regardless of their idle time. |
3019 | * |
3020 | * This command creates the consumer as side effect if it does not yet |
3021 | * exists. Moreover the command reset the idle time of the message to 0. |
3022 | * |
3023 | * The command returns an array of messages that the user |
3024 | * successfully claimed, so that the caller is able to understand |
3025 | * what messages it is now in charge of. */ |
3026 | void xautoclaimCommand(client *c) { |
3027 | streamCG *group = NULL((void*)0); |
3028 | robj *o = lookupKeyRead(c->db,c->argv[1]); |
3029 | long long minidle; /* Minimum idle time argument, in milliseconds. */ |
3030 | long count = 100; /* Maximum entries to claim. */ |
3031 | streamID startid; |
3032 | int startex; |
3033 | int justid = 0; |
3034 | |
3035 | /* Parse idle/start/end/count arguments ASAP if needed, in order to report |
3036 | * syntax errors before any other error. */ |
3037 | if (getLongLongFromObjectOrReply(c,c->argv[4],&minidle,"Invalid min-idle-time argument for XAUTOCLAIM") != C_OK0) |
3038 | return; |
3039 | if (minidle < 0) minidle = 0; |
3040 | |
3041 | if (streamParseIntervalIDOrReply(c,c->argv[5],&startid,&startex,0) != C_OK0) |
3042 | return; |
3043 | if (startex && streamIncrID(&startid) != C_OK0) { |
3044 | addReplyError(c,"invalid start ID for the interval"); |
3045 | return; |
3046 | } |
3047 | |
3048 | int j = 6; /* options start at argv[6] */ |
3049 | while(j < c->argc) { |
3050 | int moreargs = (c->argc-1) - j; /* Number of additional arguments. */ |
3051 | char *opt = c->argv[j]->ptr; |
3052 | if (!strcasecmp(opt,"COUNT") && moreargs) { |
3053 | if (getPositiveLongFromObjectOrReply(c,c->argv[j+1],&count,NULL((void*)0)) != C_OK0) |
3054 | return; |
3055 | if (count == 0) { |
3056 | addReplyError(c,"COUNT must be > 0"); |
3057 | return; |
3058 | } |
3059 | j++; |
3060 | } else if (!strcasecmp(opt,"JUSTID")) { |
3061 | justid = 1; |
3062 | } else { |
3063 | addReplyErrorObject(c,shared.syntaxerr); |
3064 | return; |
3065 | } |
3066 | j++; |
3067 | } |
3068 | |
3069 | if (o) { |
3070 | if (checkType(c,o,OBJ_STREAM6)) |
3071 | return; /* Type error. */ |
3072 | group = streamLookupCG(o->ptr,c->argv[2]->ptr); |
3073 | } |
3074 | |
3075 | /* No key or group? Send an error given that the group creation |
3076 | * is mandatory. */ |
3077 | if (o == NULL((void*)0) || group == NULL((void*)0)) { |
3078 | addReplyErrorFormat(c,"-NOGROUP No such key '%s' or consumer group '%s'", |
3079 | (char*)c->argv[1]->ptr, |
3080 | (char*)c->argv[2]->ptr); |
3081 | return; |
3082 | } |
3083 | |
3084 | /* Do the actual claiming. */ |
3085 | streamConsumer *consumer = NULL((void*)0); |
3086 | long long attempts = count*10; |
3087 | |
3088 | addReplyArrayLen(c, 2); |
3089 | void *endidptr = addReplyDeferredLen(c); |
3090 | void *arraylenptr = addReplyDeferredLen(c); |
3091 | |
3092 | unsigned char startkey[sizeof(streamID)]; |
3093 | streamEncodeID(startkey,&startid); |
3094 | raxIterator ri; |
3095 | raxStart(&ri,group->pel); |
3096 | raxSeek(&ri,">=",startkey,sizeof(startkey)); |
3097 | size_t arraylen = 0; |
3098 | mstime_t now = mstime(); |
3099 | while (attempts-- && count && raxNext(&ri)) { |
3100 | streamNACK *nack = ri.data; |
3101 | |
3102 | if (minidle) { |
3103 | mstime_t this_idle = now - nack->delivery_time; |
3104 | if (this_idle < minidle) |
3105 | continue; |
3106 | } |
3107 | |
3108 | streamID id; |
3109 | streamDecodeID(ri.key, &id); |
3110 | |
3111 | if (consumer == NULL((void*)0)) |
3112 | consumer = streamLookupConsumer(group,c->argv[3]->ptr,SLC_NONE0,NULL((void*)0)); |
3113 | if (nack->consumer != consumer) { |
3114 | /* Remove the entry from the old consumer. |
3115 | * Note that nack->consumer is NULL if we created the |
3116 | * NACK above because of the FORCE option. */ |
3117 | if (nack->consumer) |
3118 | raxRemove(nack->consumer->pel,ri.key,ri.key_len,NULL((void*)0)); |
3119 | } |
3120 | |
3121 | /* Update the consumer and idle time. */ |
3122 | nack->delivery_time = now; |
3123 | nack->delivery_count++; |
3124 | |
3125 | if (nack->consumer != consumer) { |
3126 | /* Add the entry in the new consumer local PEL. */ |
3127 | raxInsert(consumer->pel,ri.key,ri.key_len,nack,NULL((void*)0)); |
3128 | nack->consumer = consumer; |
3129 | } |
3130 | |
3131 | /* Send the reply for this entry. */ |
3132 | if (justid) { |
3133 | addReplyStreamID(c,&id); |
3134 | } else { |
3135 | size_t emitted = |
3136 | streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL((void*)0),NULL((void*)0), |
3137 | STREAM_RWR_RAWENTRIES(1<<1),NULL((void*)0)); |
3138 | if (!emitted) |
3139 | addReplyNull(c); |
3140 | } |
3141 | arraylen++; |
3142 | count--; |
3143 | |
3144 | /* Propagate this change. */ |
3145 | robj *idstr = createObjectFromStreamID(&id); |
3146 | streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],idstr,nack); |
3147 | decrRefCount(idstr); |
3148 | server.dirty++; |
3149 | } |
3150 | |
3151 | streamID endid; |
3152 | if (raxEOF(&ri)) { |
3153 | endid.ms = endid.seq = 0; |
3154 | } else { |
3155 | streamDecodeID(ri.key, &endid); |
3156 | } |
3157 | raxStop(&ri); |
3158 | |
3159 | setDeferredArrayLen(c,arraylenptr,arraylen); |
3160 | setDeferredReplyStreamID(c,endidptr,&endid); |
3161 | |
3162 | preventCommandPropagation(c); |
3163 | } |
3164 | |
3165 | /* XDEL <key> [<ID1> <ID2> ... <IDN>] |
3166 | * |
3167 | * Removes the specified entries from the stream. Returns the number |
3168 | * of items actually deleted, that may be different from the number |
3169 | * of IDs passed in case certain IDs do not exist. */ |
3170 | void xdelCommand(client *c) { |
3171 | robj *o; |
3172 | |
3173 | if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL((void*)0) |
3174 | || checkType(c,o,OBJ_STREAM6)) return; |
3175 | stream *s = o->ptr; |
3176 | |
3177 | /* We need to sanity check the IDs passed to start. Even if not |
3178 | * a big issue, it is not great that the command is only partially |
3179 | * executed because at some point an invalid ID is parsed. */ |
3180 | streamID static_ids[STREAMID_STATIC_VECTOR_LEN8]; |
3181 | streamID *ids = static_ids; |
3182 | int id_count = c->argc-2; |
3183 | if (id_count > STREAMID_STATIC_VECTOR_LEN8) |
3184 | ids = zmalloc(sizeof(streamID)*id_count); |
3185 | for (int j = 2; j < c->argc; j++) { |
3186 | if (streamParseStrictIDOrReply(c,c->argv[j],&ids[j-2],0) != C_OK0) goto cleanup; |
3187 | } |
3188 | |
3189 | /* Actually apply the command. */ |
3190 | int deleted = 0; |
3191 | for (int j = 2; j < c->argc; j++) { |
3192 | deleted += streamDeleteItem(s,&ids[j-2]); |
3193 | } |
3194 | |
3195 | /* Propagate the write if needed. */ |
3196 | if (deleted) { |
3197 | signalModifiedKey(c,c->db,c->argv[1]); |
3198 | notifyKeyspaceEvent(NOTIFY_STREAM(1<<10),"xdel",c->argv[1],c->db->id); |
3199 | server.dirty += deleted; |
3200 | } |
3201 | addReplyLongLong(c,deleted); |
3202 | cleanup: |
3203 | if (ids != static_ids) zfree(ids); |
3204 | } |
3205 | |
3206 | /* General form: XTRIM <key> [... options ...] |
3207 | * |
3208 | * List of options: |
3209 | * |
3210 | * Trim strategies: |
3211 | * |
3212 | * MAXLEN [~|=] <count> -- Trim so that the stream will be capped at |
3213 | * the specified length. Use ~ before the |
3214 | * count in order to demand approximated trimming |
3215 | * (like XADD MAXLEN option). |
3216 | * MINID [~|=] <id> -- Trim so that the stream will not contain entries |
3217 | * with IDs smaller than 'id'. Use ~ before the |
3218 | * count in order to demand approximated trimming |
3219 | * (like XADD MINID option). |
3220 | * |
3221 | * Other options: |
3222 | * |
3223 | * LIMIT <entries> -- The maximum number of entries to trim. |
3224 | * 0 means unlimited. Unless specified, it is set |
3225 | * to a default of 100*server.stream_node_max_entries, |
3226 | * and that's in order to keep the trimming time sane. |
3227 | * Has meaning only if `~` was provided. |
3228 | */ |
3229 | void xtrimCommand(client *c) { |
3230 | robj *o; |
3231 | |
3232 | /* Argument parsing. */ |
3233 | streamAddTrimArgs parsed_args; |
3234 | if (streamParseAddOrTrimArgsOrReply(c, &parsed_args, 1) < 0) |
3235 | return; /* streamParseAddOrTrimArgsOrReply already replied. */ |
3236 | |
3237 | /* If the key does not exist, we are ok returning zero, that is, the |
3238 | * number of elements removed from the stream. */ |
3239 | if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL((void*)0) |
3240 | || checkType(c,o,OBJ_STREAM6)) return; |
3241 | stream *s = o->ptr; |
3242 | |
3243 | /* Perform the trimming. */ |
3244 | int64_t deleted = streamTrim(s, &parsed_args); |
3245 | if (deleted) { |
3246 | notifyKeyspaceEvent(NOTIFY_STREAM(1<<10),"xtrim",c->argv[1],c->db->id); |
3247 | if (parsed_args.approx_trim) { |
3248 | /* In case our trimming was limited (by LIMIT or by ~) we must |
3249 | * re-write the relevant trim argument to make sure there will be |
3250 | * no inconsistencies in AOF loading or in the replica. |
3251 | * It's enough to check only args->approx because there is no |
3252 | * way LIMIT is given without the ~ option. */ |
3253 | streamRewriteApproxSpecifier(c,parsed_args.trim_strategy_arg_idx-1); |
3254 | streamRewriteTrimArgument(c,s,parsed_args.trim_strategy,parsed_args.trim_strategy_arg_idx); |
3255 | } |
3256 | |
3257 | /* Propagate the write. */ |
3258 | signalModifiedKey(c, c->db,c->argv[1]); |
3259 | server.dirty += deleted; |
3260 | } |
3261 | addReplyLongLong(c,deleted); |
3262 | } |
3263 | |
3264 | /* Helper function for xinfoCommand. |
3265 | * Handles the variants of XINFO STREAM */ |
3266 | void xinfoReplyWithStreamInfo(client *c, stream *s) { |
3267 | int full = 1; |
3268 | long long count = 10; /* Default COUNT is 10 so we don't block the server */ |
3269 | robj **optv = c->argv + 3; /* Options start after XINFO STREAM <key> */ |
3270 | int optc = c->argc - 3; |
3271 | |
3272 | /* Parse options. */ |
3273 | if (optc == 0) { |
3274 | full = 0; |
3275 | } else { |
3276 | /* Valid options are [FULL] or [FULL COUNT <count>] */ |
3277 | if (optc != 1 && optc != 3) { |
3278 | addReplySubcommandSyntaxError(c); |
3279 | return; |
3280 | } |
3281 | |
3282 | /* First option must be "FULL" */ |
3283 | if (strcasecmp(optv[0]->ptr,"full")) { |
3284 | addReplySubcommandSyntaxError(c); |
3285 | return; |
3286 | } |
3287 | |
3288 | if (optc == 3) { |
3289 | /* First option must be "FULL" */ |
3290 | if (strcasecmp(optv[1]->ptr,"count")) { |
3291 | addReplySubcommandSyntaxError(c); |
3292 | return; |
3293 | } |
3294 | if (getLongLongFromObjectOrReply(c,optv[2],&count,NULL((void*)0)) == C_ERR-1) |
3295 | return; |
3296 | if (count < 0) count = 10; |
3297 | } |
3298 | } |
3299 | |
3300 | addReplyMapLen(c,full ? 6 : 7); |
3301 | addReplyBulkCString(c,"length"); |
3302 | addReplyLongLong(c,s->length); |
3303 | addReplyBulkCString(c,"radix-tree-keys"); |
3304 | addReplyLongLong(c,raxSize(s->rax)); |
3305 | addReplyBulkCString(c,"radix-tree-nodes"); |
3306 | addReplyLongLong(c,s->rax->numnodes); |
3307 | addReplyBulkCString(c,"last-generated-id"); |
3308 | addReplyStreamID(c,&s->last_id); |
3309 | |
3310 | if (!full) { |
3311 | /* XINFO STREAM <key> */ |
3312 | |
3313 | addReplyBulkCString(c,"groups"); |
3314 | addReplyLongLong(c,s->cgroups ? raxSize(s->cgroups) : 0); |
3315 | |
3316 | /* To emit the first/last entry we use streamReplyWithRange(). */ |
3317 | int emitted; |
3318 | streamID start, end; |
3319 | start.ms = start.seq = 0; |
3320 | end.ms = end.seq = UINT64_MAX(18446744073709551615UL); |
3321 | addReplyBulkCString(c,"first-entry"); |
3322 | emitted = streamReplyWithRange(c,s,&start,&end,1,0,NULL((void*)0),NULL((void*)0), |
3323 | STREAM_RWR_RAWENTRIES(1<<1),NULL((void*)0)); |
3324 | if (!emitted) addReplyNull(c); |
3325 | addReplyBulkCString(c,"last-entry"); |
3326 | emitted = streamReplyWithRange(c,s,&start,&end,1,1,NULL((void*)0),NULL((void*)0), |
3327 | STREAM_RWR_RAWENTRIES(1<<1),NULL((void*)0)); |
3328 | if (!emitted) addReplyNull(c); |
3329 | } else { |
3330 | /* XINFO STREAM <key> FULL [COUNT <count>] */ |
3331 | |
3332 | /* Stream entries */ |
3333 | addReplyBulkCString(c,"entries"); |
3334 | streamReplyWithRange(c,s,NULL((void*)0),NULL((void*)0),count,0,NULL((void*)0),NULL((void*)0),0,NULL((void*)0)); |
3335 | |
3336 | /* Consumer groups */ |
3337 | addReplyBulkCString(c,"groups"); |
3338 | if (s->cgroups == NULL((void*)0)) { |
3339 | addReplyArrayLen(c,0); |
3340 | } else { |
3341 | addReplyArrayLen(c,raxSize(s->cgroups)); |
3342 | raxIterator ri_cgroups; |
3343 | raxStart(&ri_cgroups,s->cgroups); |
3344 | raxSeek(&ri_cgroups,"^",NULL((void*)0),0); |
3345 | while(raxNext(&ri_cgroups)) { |
3346 | streamCG *cg = ri_cgroups.data; |
3347 | addReplyMapLen(c,5); |
3348 | |
3349 | /* Name */ |
3350 | addReplyBulkCString(c,"name"); |
3351 | addReplyBulkCBuffer(c,ri_cgroups.key,ri_cgroups.key_len); |
3352 | |
3353 | /* Last delivered ID */ |
3354 | addReplyBulkCString(c,"last-delivered-id"); |
3355 | addReplyStreamID(c,&cg->last_id); |
3356 | |
3357 | /* Group PEL count */ |
3358 | addReplyBulkCString(c,"pel-count"); |
3359 | addReplyLongLong(c,raxSize(cg->pel)); |
3360 | |
3361 | /* Group PEL */ |
3362 | addReplyBulkCString(c,"pending"); |
3363 | long long arraylen_cg_pel = 0; |
3364 | void *arrayptr_cg_pel = addReplyDeferredLen(c); |
3365 | raxIterator ri_cg_pel; |
3366 | raxStart(&ri_cg_pel,cg->pel); |
3367 | raxSeek(&ri_cg_pel,"^",NULL((void*)0),0); |
3368 | while(raxNext(&ri_cg_pel) && (!count || arraylen_cg_pel < count)) { |
3369 | streamNACK *nack = ri_cg_pel.data; |
3370 | addReplyArrayLen(c,4); |
3371 | |
3372 | /* Entry ID. */ |
3373 | streamID id; |
3374 | streamDecodeID(ri_cg_pel.key,&id); |
3375 | addReplyStreamID(c,&id); |
3376 | |
3377 | /* Consumer name. */ |
3378 | serverAssert(nack->consumer)((nack->consumer)?(void)0 : (_serverAssert("nack->consumer" ,"t_stream.c",3378),__builtin_unreachable())); /* assertion for valgrind (avoid NPD) */ |
3379 | addReplyBulkCBuffer(c,nack->consumer->name, |
3380 | sdslen(nack->consumer->name)); |
3381 | |
3382 | /* Last delivery. */ |
3383 | addReplyLongLong(c,nack->delivery_time); |
3384 | |
3385 | /* Number of deliveries. */ |
3386 | addReplyLongLong(c,nack->delivery_count); |
3387 | |
3388 | arraylen_cg_pel++; |
3389 | } |
3390 | setDeferredArrayLen(c,arrayptr_cg_pel,arraylen_cg_pel); |
3391 | raxStop(&ri_cg_pel); |
3392 | |
3393 | /* Consumers */ |
3394 | addReplyBulkCString(c,"consumers"); |
3395 | addReplyArrayLen(c,raxSize(cg->consumers)); |
3396 | raxIterator ri_consumers; |
3397 | raxStart(&ri_consumers,cg->consumers); |
3398 | raxSeek(&ri_consumers,"^",NULL((void*)0),0); |
3399 | while(raxNext(&ri_consumers)) { |
3400 | streamConsumer *consumer = ri_consumers.data; |
3401 | addReplyMapLen(c,4); |
3402 | |
3403 | /* Consumer name */ |
3404 | addReplyBulkCString(c,"name"); |
3405 | addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name)); |
3406 | |
3407 | /* Seen-time */ |
3408 | addReplyBulkCString(c,"seen-time"); |
3409 | addReplyLongLong(c,consumer->seen_time); |
3410 | |
3411 | /* Consumer PEL count */ |
3412 | addReplyBulkCString(c,"pel-count"); |
3413 | addReplyLongLong(c,raxSize(consumer->pel)); |
3414 | |
3415 | /* Consumer PEL */ |
3416 | addReplyBulkCString(c,"pending"); |
3417 | long long arraylen_cpel = 0; |
3418 | void *arrayptr_cpel = addReplyDeferredLen(c); |
3419 | raxIterator ri_cpel; |
3420 | raxStart(&ri_cpel,consumer->pel); |
3421 | raxSeek(&ri_cpel,"^",NULL((void*)0),0); |
3422 | while(raxNext(&ri_cpel) && (!count || arraylen_cpel < count)) { |
3423 | streamNACK *nack = ri_cpel.data; |
3424 | addReplyArrayLen(c,3); |
3425 | |
3426 | /* Entry ID. */ |
3427 | streamID id; |
3428 | streamDecodeID(ri_cpel.key,&id); |
3429 | addReplyStreamID(c,&id); |
3430 | |
3431 | /* Last delivery. */ |
3432 | addReplyLongLong(c,nack->delivery_time); |
3433 | |
3434 | /* Number of deliveries. */ |
3435 | addReplyLongLong(c,nack->delivery_count); |
3436 | |
3437 | arraylen_cpel++; |
3438 | } |
3439 | setDeferredArrayLen(c,arrayptr_cpel,arraylen_cpel); |
3440 | raxStop(&ri_cpel); |
3441 | } |
3442 | raxStop(&ri_consumers); |
3443 | } |
3444 | raxStop(&ri_cgroups); |
3445 | } |
3446 | } |
3447 | } |
3448 | |
3449 | /* XINFO CONSUMERS <key> <group> |
3450 | * XINFO GROUPS <key> |
3451 | * XINFO STREAM <key> [FULL [COUNT <count>]] |
3452 | * XINFO HELP. */ |
3453 | void xinfoCommand(client *c) { |
3454 | stream *s = NULL((void*)0); |
3455 | char *opt; |
3456 | robj *key; |
3457 | |
3458 | /* HELP is special. Handle it ASAP. */ |
3459 | if (!strcasecmp(c->argv[1]->ptr,"HELP")) { |
3460 | const char *help[] = { |
3461 | "CONSUMERS <key> <groupname>", |
3462 | " Show consumers of <groupname>.", |
3463 | "GROUPS <key>", |
3464 | " Show the stream consumer groups.", |
3465 | "STREAM <key> [FULL [COUNT <count>]", |
3466 | " Show information about the stream.", |
3467 | NULL((void*)0) |
3468 | }; |
3469 | addReplyHelp(c, help); |
3470 | return; |
3471 | } else if (c->argc < 3) { |
3472 | addReplySubcommandSyntaxError(c); |
3473 | return; |
3474 | } |
3475 | |
3476 | /* With the exception of HELP handled before any other sub commands, all |
3477 | * the ones are in the form of "<subcommand> <key>". */ |
3478 | opt = c->argv[1]->ptr; |
3479 | key = c->argv[2]; |
3480 | |
3481 | /* Lookup the key now, this is common for all the subcommands but HELP. */ |
3482 | robj *o = lookupKeyReadOrReply(c,key,shared.nokeyerr); |
3483 | if (o == NULL((void*)0) || checkType(c,o,OBJ_STREAM6)) return; |
3484 | s = o->ptr; |
3485 | |
3486 | /* Dispatch the different subcommands. */ |
3487 | if (!strcasecmp(opt,"CONSUMERS") && c->argc == 4) { |
3488 | /* XINFO CONSUMERS <key> <group>. */ |
3489 | streamCG *cg = streamLookupCG(s,c->argv[3]->ptr); |
3490 | if (cg == NULL((void*)0)) { |
3491 | addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' " |
3492 | "for key name '%s'", |
3493 | (char*)c->argv[3]->ptr, (char*)key->ptr); |
3494 | return; |
3495 | } |
3496 | |
3497 | addReplyArrayLen(c,raxSize(cg->consumers)); |
3498 | raxIterator ri; |
3499 | raxStart(&ri,cg->consumers); |
3500 | raxSeek(&ri,"^",NULL((void*)0),0); |
3501 | mstime_t now = mstime(); |
3502 | while(raxNext(&ri)) { |
3503 | streamConsumer *consumer = ri.data; |
3504 | mstime_t idle = now - consumer->seen_time; |
3505 | if (idle < 0) idle = 0; |
3506 | |
3507 | addReplyMapLen(c,3); |
3508 | addReplyBulkCString(c,"name"); |
3509 | addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name)); |
3510 | addReplyBulkCString(c,"pending"); |
3511 | addReplyLongLong(c,raxSize(consumer->pel)); |
3512 | addReplyBulkCString(c,"idle"); |
3513 | addReplyLongLong(c,idle); |
3514 | } |
3515 | raxStop(&ri); |
3516 | } else if (!strcasecmp(opt,"GROUPS") && c->argc == 3) { |
3517 | /* XINFO GROUPS <key>. */ |
3518 | if (s->cgroups == NULL((void*)0)) { |
3519 | addReplyArrayLen(c,0); |
3520 | return; |
3521 | } |
3522 | |
3523 | addReplyArrayLen(c,raxSize(s->cgroups)); |
3524 | raxIterator ri; |
3525 | raxStart(&ri,s->cgroups); |
3526 | raxSeek(&ri,"^",NULL((void*)0),0); |
3527 | while(raxNext(&ri)) { |
3528 | streamCG *cg = ri.data; |
3529 | addReplyMapLen(c,4); |
3530 | addReplyBulkCString(c,"name"); |
3531 | addReplyBulkCBuffer(c,ri.key,ri.key_len); |
3532 | addReplyBulkCString(c,"consumers"); |
3533 | addReplyLongLong(c,raxSize(cg->consumers)); |
3534 | addReplyBulkCString(c,"pending"); |
3535 | addReplyLongLong(c,raxSize(cg->pel)); |
3536 | addReplyBulkCString(c,"last-delivered-id"); |
3537 | addReplyStreamID(c,&cg->last_id); |
3538 | } |
3539 | raxStop(&ri); |
3540 | } else if (!strcasecmp(opt,"STREAM")) { |
3541 | /* XINFO STREAM <key> [FULL [COUNT <count>]]. */ |
3542 | xinfoReplyWithStreamInfo(c,s); |
3543 | } else { |
3544 | addReplySubcommandSyntaxError(c); |
3545 | } |
3546 | } |
3547 | |
3548 | /* Validate the integrity stream listpack entries stracture. Both in term of a |
3549 | * valid listpack, but also that the stracture of the entires matches a valid |
3550 | * stream. return 1 if valid 0 if not valid. */ |
3551 | int streamValidateListpackIntegrity(unsigned char *lp, size_t size, int deep) { |
3552 | int valid_record; |
3553 | unsigned char *p, *next; |
3554 | |
3555 | /* Since we don't want to run validation of all records twice, we'll |
3556 | * run the listpack validation of just the header and do the rest here. */ |
3557 | if (!lpValidateIntegrity(lp, size, 0)) |
3558 | return 0; |
3559 | |
3560 | /* In non-deep mode we just validated the listpack header (encoded size) */ |
3561 | if (!deep) return 1; |
3562 | |
3563 | next = p = lpFirst(lp); |
3564 | if (!lpValidateNext(lp, &next, size)) return 0; |
3565 | if (!p) return 0; |
3566 | |
3567 | /* entry count */ |
3568 | int64_t entry_count = lpGetIntegerIfValid(p, &valid_record); |
3569 | if (!valid_record) return 0; |
3570 | p = next; if (!lpValidateNext(lp, &next, size)) return 0; |
3571 | |
3572 | /* deleted */ |
3573 | int64_t deleted_count = lpGetIntegerIfValid(p, &valid_record); |
3574 | if (!valid_record) return 0; |
3575 | p = next; if (!lpValidateNext(lp, &next, size)) return 0; |
3576 | |
3577 | /* num-of-fields */ |
3578 | int64_t master_fields = lpGetIntegerIfValid(p, &valid_record); |
3579 | if (!valid_record) return 0; |
3580 | p = next; if (!lpValidateNext(lp, &next, size)) return 0; |
3581 | |
3582 | /* the field names */ |
3583 | for (int64_t j = 0; j < master_fields; j++) { |
3584 | p = next; if (!lpValidateNext(lp, &next, size)) return 0; |
3585 | } |
3586 | |
3587 | /* the zero master entry terminator. */ |
3588 | int64_t zero = lpGetIntegerIfValid(p, &valid_record); |
3589 | if (!valid_record || zero != 0) return 0; |
3590 | p = next; if (!lpValidateNext(lp, &next, size)) return 0; |
3591 | |
3592 | entry_count += deleted_count; |
3593 | while (entry_count--) { |
3594 | if (!p) return 0; |
3595 | int64_t fields = master_fields, extra_fields = 3; |
3596 | int64_t flags = lpGetIntegerIfValid(p, &valid_record); |
3597 | if (!valid_record) return 0; |
3598 | p = next; if (!lpValidateNext(lp, &next, size)) return 0; |
3599 | |
3600 | /* entry id */ |
3601 | p = next; if (!lpValidateNext(lp, &next, size)) return 0; |
Value stored to 'p' is never read | |
3602 | p = next; if (!lpValidateNext(lp, &next, size)) return 0; |
3603 | |
3604 | if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS(1<<1))) { |
3605 | /* num-of-fields */ |
3606 | fields = lpGetIntegerIfValid(p, &valid_record); |
3607 | if (!valid_record) return 0; |
3608 | p = next; if (!lpValidateNext(lp, &next, size)) return 0; |
3609 | |
3610 | /* the field names */ |
3611 | for (int64_t j = 0; j < fields; j++) { |
3612 | p = next; if (!lpValidateNext(lp, &next, size)) return 0; |
3613 | } |
3614 | |
3615 | extra_fields += fields + 1; |
3616 | } |
3617 | |
3618 | /* the values */ |
3619 | for (int64_t j = 0; j < fields; j++) { |
3620 | p = next; if (!lpValidateNext(lp, &next, size)) return 0; |
3621 | } |
3622 | |
3623 | /* lp-count */ |
3624 | int64_t lp_count = lpGetIntegerIfValid(p, &valid_record); |
3625 | if (!valid_record) return 0; |
3626 | if (lp_count != fields + extra_fields) return 0; |
3627 | p = next; if (!lpValidateNext(lp, &next, size)) return 0; |
3628 | } |
3629 | |
3630 | if (next) |
3631 | return 0; |
3632 | |
3633 | return 1; |
3634 | } |