File: | src/t_stream.c |
Warning: | line 2274, column 40 Access to field 'consumers' results in a dereference of a null pointer (loaded from variable 'cg') |
Press '?' to see keyboard shortcuts
Keyboard shortcuts:
1 | /* | |||
2 | * Copyright (c) 2017, Salvatore Sanfilippo <antirez at gmail dot com> | |||
3 | * All rights reserved. | |||
4 | * | |||
5 | * Redistribution and use in source and binary forms, with or without | |||
6 | * modification, are permitted provided that the following conditions are met: | |||
7 | * | |||
8 | * * Redistributions of source code must retain the above copyright notice, | |||
9 | * this list of conditions and the following disclaimer. | |||
10 | * * Redistributions in binary form must reproduce the above copyright | |||
11 | * notice, this list of conditions and the following disclaimer in the | |||
12 | * documentation and/or other materials provided with the distribution. | |||
13 | * * Neither the name of Redis nor the names of its contributors may be used | |||
14 | * to endorse or promote products derived from this software without | |||
15 | * specific prior written permission. | |||
16 | * | |||
17 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" | |||
18 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE | |||
19 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE | |||
20 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE | |||
21 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR | |||
22 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF | |||
23 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS | |||
24 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN | |||
25 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) | |||
26 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE | |||
27 | * POSSIBILITY OF SUCH DAMAGE. | |||
28 | */ | |||
29 | ||||
30 | #include "server.h" | |||
31 | #include "endianconv.h" | |||
32 | #include "stream.h" | |||
33 | ||||
/* Every stream item inside the listpack has a flags field that is used to
 * mark the entry as deleted, or as having the same fields as the "master"
 * entry at the start of the listpack. */
#define STREAM_ITEM_FLAG_NONE 0             /* No special flags. */
#define STREAM_ITEM_FLAG_DELETED (1<<0)     /* Entry is deleted. Skip it. */
#define STREAM_ITEM_FLAG_SAMEFIELDS (1<<1)  /* Same fields as master entry. */

/* For stream commands that require multiple IDs: when the number of IDs
 * is less than 'STREAMID_STATIC_VECTOR_LEN', avoid a malloc allocation. */
#define STREAMID_STATIC_VECTOR_LEN 8

/* Max pre-allocation for listpack. This is done to avoid abuse of a user
 * setting stream_node_max_bytes to a huge number. */
#define STREAM_LISTPACK_MAX_PRE_ALLOCATE 4096
49 | ||||
50 | void streamFreeCG(streamCG *cg); | |||
51 | void streamFreeNACK(streamNACK *na); | |||
52 | size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer); | |||
53 | int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq); | |||
54 | int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq); | |||
55 | ||||
56 | /* ----------------------------------------------------------------------- | |||
57 | * Low level stream encoding: a radix tree of listpacks. | |||
58 | * ----------------------------------------------------------------------- */ | |||
59 | ||||
60 | /* Create a new stream data structure. */ | |||
61 | stream *streamNew(void) { | |||
62 | stream *s = zmalloc(sizeof(*s)); | |||
63 | s->rax = raxNew(); | |||
64 | s->length = 0; | |||
65 | s->last_id.ms = 0; | |||
66 | s->last_id.seq = 0; | |||
67 | s->cgroups = NULL((void*)0); /* Created on demand to save memory when not used. */ | |||
68 | return s; | |||
69 | } | |||
70 | ||||
71 | /* Free a stream, including the listpacks stored inside the radix tree. */ | |||
72 | void freeStream(stream *s) { | |||
73 | raxFreeWithCallback(s->rax,(void(*)(void*))lpFree); | |||
74 | if (s->cgroups) | |||
75 | raxFreeWithCallback(s->cgroups,(void(*)(void*))streamFreeCG); | |||
76 | zfree(s); | |||
77 | } | |||
78 | ||||
79 | /* Return the length of a stream. */ | |||
80 | unsigned long streamLength(const robj *subject) { | |||
81 | stream *s = subject->ptr; | |||
82 | return s->length; | |||
83 | } | |||
84 | ||||
85 | /* Set 'id' to be its successor stream ID. | |||
86 | * If 'id' is the maximal possible id, it is wrapped around to 0-0 and a | |||
87 | * C_ERR is returned. */ | |||
88 | int streamIncrID(streamID *id) { | |||
89 | int ret = C_OK0; | |||
90 | if (id->seq == UINT64_MAX(18446744073709551615UL)) { | |||
91 | if (id->ms == UINT64_MAX(18446744073709551615UL)) { | |||
92 | /* Special case where 'id' is the last possible streamID... */ | |||
93 | id->ms = id->seq = 0; | |||
94 | ret = C_ERR-1; | |||
95 | } else { | |||
96 | id->ms++; | |||
97 | id->seq = 0; | |||
98 | } | |||
99 | } else { | |||
100 | id->seq++; | |||
101 | } | |||
102 | return ret; | |||
103 | } | |||
104 | ||||
105 | /* Set 'id' to be its predecessor stream ID. | |||
106 | * If 'id' is the minimal possible id, it remains 0-0 and a C_ERR is | |||
107 | * returned. */ | |||
108 | int streamDecrID(streamID *id) { | |||
109 | int ret = C_OK0; | |||
110 | if (id->seq == 0) { | |||
111 | if (id->ms == 0) { | |||
112 | /* Special case where 'id' is the first possible streamID... */ | |||
113 | id->ms = id->seq = UINT64_MAX(18446744073709551615UL); | |||
114 | ret = C_ERR-1; | |||
115 | } else { | |||
116 | id->ms--; | |||
117 | id->seq = UINT64_MAX(18446744073709551615UL); | |||
118 | } | |||
119 | } else { | |||
120 | id->seq--; | |||
121 | } | |||
122 | return ret; | |||
123 | } | |||
124 | ||||
125 | /* Generate the next stream item ID given the previous one. If the current | |||
126 | * milliseconds Unix time is greater than the previous one, just use this | |||
127 | * as time part and start with sequence part of zero. Otherwise we use the | |||
128 | * previous time (and never go backward) and increment the sequence. */ | |||
129 | void streamNextID(streamID *last_id, streamID *new_id) { | |||
130 | uint64_t ms = mstime(); | |||
131 | if (ms > last_id->ms) { | |||
132 | new_id->ms = ms; | |||
133 | new_id->seq = 0; | |||
134 | } else { | |||
135 | *new_id = *last_id; | |||
136 | streamIncrID(new_id); | |||
137 | } | |||
138 | } | |||
139 | ||||
140 | /* This is a helper function for the COPY command. | |||
141 | * Duplicate a Stream object, with the guarantee that the returned object | |||
142 | * has the same encoding as the original one. | |||
143 | * | |||
144 | * The resulting object always has refcount set to 1 */ | |||
145 | robj *streamDup(robj *o) { | |||
146 | robj *sobj; | |||
147 | ||||
148 | serverAssert(o->type == OBJ_STREAM)((o->type == 6)?(void)0 : (_serverAssert("o->type == OBJ_STREAM" ,"t_stream.c",148),__builtin_unreachable())); | |||
149 | ||||
150 | switch (o->encoding) { | |||
151 | case OBJ_ENCODING_STREAM10: | |||
152 | sobj = createStreamObject(); | |||
153 | break; | |||
154 | default: | |||
155 | serverPanic("Wrong encoding.")_serverPanic("t_stream.c",155,"Wrong encoding."),__builtin_unreachable (); | |||
156 | break; | |||
157 | } | |||
158 | ||||
159 | stream *s; | |||
160 | stream *new_s; | |||
161 | s = o->ptr; | |||
162 | new_s = sobj->ptr; | |||
163 | ||||
164 | raxIterator ri; | |||
165 | uint64_t rax_key[2]; | |||
166 | raxStart(&ri, s->rax); | |||
167 | raxSeek(&ri, "^", NULL((void*)0), 0); | |||
168 | size_t lp_bytes = 0; /* Total bytes in the listpack. */ | |||
169 | unsigned char *lp = NULL((void*)0); /* listpack pointer. */ | |||
170 | /* Get a reference to the listpack node. */ | |||
171 | while (raxNext(&ri)) { | |||
172 | lp = ri.data; | |||
173 | lp_bytes = lpBytes(lp); | |||
174 | unsigned char *new_lp = zmalloc(lp_bytes); | |||
175 | memcpy(new_lp, lp, lp_bytes); | |||
176 | memcpy(rax_key, ri.key, sizeof(rax_key)); | |||
177 | raxInsert(new_s->rax, (unsigned char *)&rax_key, sizeof(rax_key), | |||
178 | new_lp, NULL((void*)0)); | |||
179 | } | |||
180 | new_s->length = s->length; | |||
181 | new_s->last_id = s->last_id; | |||
182 | raxStop(&ri); | |||
183 | ||||
184 | if (s->cgroups == NULL((void*)0)) return sobj; | |||
185 | ||||
186 | /* Consumer Groups */ | |||
187 | raxIterator ri_cgroups; | |||
188 | raxStart(&ri_cgroups, s->cgroups); | |||
189 | raxSeek(&ri_cgroups, "^", NULL((void*)0), 0); | |||
190 | while (raxNext(&ri_cgroups)) { | |||
191 | streamCG *cg = ri_cgroups.data; | |||
192 | streamCG *new_cg = streamCreateCG(new_s, (char *)ri_cgroups.key, | |||
193 | ri_cgroups.key_len, &cg->last_id); | |||
194 | ||||
195 | serverAssert(new_cg != NULL)((new_cg != ((void*)0))?(void)0 : (_serverAssert("new_cg != NULL" ,"t_stream.c",195),__builtin_unreachable())); | |||
196 | ||||
197 | /* Consumer Group PEL */ | |||
198 | raxIterator ri_cg_pel; | |||
199 | raxStart(&ri_cg_pel,cg->pel); | |||
200 | raxSeek(&ri_cg_pel,"^",NULL((void*)0),0); | |||
201 | while(raxNext(&ri_cg_pel)){ | |||
202 | streamNACK *nack = ri_cg_pel.data; | |||
203 | streamNACK *new_nack = streamCreateNACK(NULL((void*)0)); | |||
204 | new_nack->delivery_time = nack->delivery_time; | |||
205 | new_nack->delivery_count = nack->delivery_count; | |||
206 | raxInsert(new_cg->pel, ri_cg_pel.key, sizeof(streamID), new_nack, NULL((void*)0)); | |||
207 | } | |||
208 | raxStop(&ri_cg_pel); | |||
209 | ||||
210 | /* Consumers */ | |||
211 | raxIterator ri_consumers; | |||
212 | raxStart(&ri_consumers, cg->consumers); | |||
213 | raxSeek(&ri_consumers, "^", NULL((void*)0), 0); | |||
214 | while (raxNext(&ri_consumers)) { | |||
215 | streamConsumer *consumer = ri_consumers.data; | |||
216 | streamConsumer *new_consumer; | |||
217 | new_consumer = zmalloc(sizeof(*new_consumer)); | |||
218 | new_consumer->name = sdsdup(consumer->name); | |||
219 | new_consumer->pel = raxNew(); | |||
220 | raxInsert(new_cg->consumers,(unsigned char *)new_consumer->name, | |||
221 | sdslen(new_consumer->name), new_consumer, NULL((void*)0)); | |||
222 | new_consumer->seen_time = consumer->seen_time; | |||
223 | ||||
224 | /* Consumer PEL */ | |||
225 | raxIterator ri_cpel; | |||
226 | raxStart(&ri_cpel, consumer->pel); | |||
227 | raxSeek(&ri_cpel, "^", NULL((void*)0), 0); | |||
228 | while (raxNext(&ri_cpel)) { | |||
229 | streamNACK *new_nack = raxFind(new_cg->pel,ri_cpel.key,sizeof(streamID)); | |||
230 | ||||
231 | serverAssert(new_nack != raxNotFound)((new_nack != raxNotFound)?(void)0 : (_serverAssert("new_nack != raxNotFound" ,"t_stream.c",231),__builtin_unreachable())); | |||
232 | ||||
233 | new_nack->consumer = new_consumer; | |||
234 | raxInsert(new_consumer->pel,ri_cpel.key,sizeof(streamID),new_nack,NULL((void*)0)); | |||
235 | } | |||
236 | raxStop(&ri_cpel); | |||
237 | } | |||
238 | raxStop(&ri_consumers); | |||
239 | } | |||
240 | raxStop(&ri_cgroups); | |||
241 | return sobj; | |||
242 | } | |||
243 | ||||
244 | /* This is just a wrapper for lpAppend() to directly use a 64 bit integer | |||
245 | * instead of a string. */ | |||
246 | unsigned char *lpAppendInteger(unsigned char *lp, int64_t value) { | |||
247 | char buf[LONG_STR_SIZE21]; | |||
248 | int slen = ll2string(buf,sizeof(buf),value); | |||
249 | return lpAppend(lp,(unsigned char*)buf,slen); | |||
250 | } | |||
251 | ||||
252 | /* This is just a wrapper for lpReplace() to directly use a 64 bit integer | |||
253 | * instead of a string to replace the current element. The function returns | |||
254 | * the new listpack as return value, and also updates the current cursor | |||
255 | * by updating '*pos'. */ | |||
256 | unsigned char *lpReplaceInteger(unsigned char *lp, unsigned char **pos, int64_t value) { | |||
257 | char buf[LONG_STR_SIZE21]; | |||
258 | int slen = ll2string(buf,sizeof(buf),value); | |||
259 | return lpInsert(lp, (unsigned char*)buf, slen, *pos, LP_REPLACE2, pos); | |||
260 | } | |||
261 | ||||
/* This is a wrapper function for lpGet() to directly get an integer value
 * from the listpack (that may store numbers as a string), converting
 * the string if needed.
 *
 * The 'valid' argument is an optional output parameter to get an indication
 * if the record was valid: it is set to 1 when the element was read as an
 * integer, or to the result of the string conversion otherwise. When this
 * parameter is NULL, the function will fail with an assertion if the
 * string conversion fails. */
static inline int64_t lpGetIntegerIfValid(unsigned char *ele, int *valid) {
    int64_t v;
    unsigned char *e = lpGet(ele,&v,NULL);
    if (e == NULL) {
        /* lpGet() returned no string pointer: 'v' holds the integer. */
        if (valid)
            *valid = 1;
        return v;
    }
    /* The following code path should never be used for how listpacks work:
     * they should always be able to store an int64_t value in integer
     * encoded form. However the implementation may change. */
    long long ll;
    int ret = string2ll((char*)e,v,&ll); /* Here 'v' is the string length. */
    if (valid)
        *valid = ret;
    else
        serverAssert(ret != 0);
    v = ll;
    return v;
}

/* Same as lpGetIntegerIfValid(), asserting that the element is valid. */
#define lpGetInteger(ele) lpGetIntegerIfValid(ele, NULL)
290 | ||||
291 | /* Get an edge streamID of a given listpack. | |||
292 | * 'master_id' is an input param, used to build the 'edge_id' output param */ | |||
293 | int lpGetEdgeStreamID(unsigned char *lp, int first, streamID *master_id, streamID *edge_id) | |||
294 | { | |||
295 | if (lp == NULL((void*)0)) | |||
296 | return 0; | |||
297 | ||||
298 | unsigned char *lp_ele; | |||
299 | ||||
300 | /* We need to seek either the first or the last entry depending | |||
301 | * on the direction of the iteration. */ | |||
302 | if (first) { | |||
303 | /* Get the master fields count. */ | |||
304 | lp_ele = lpFirst(lp); /* Seek items count */ | |||
305 | lp_ele = lpNext(lp, lp_ele); /* Seek deleted count. */ | |||
306 | lp_ele = lpNext(lp, lp_ele); /* Seek num fields. */ | |||
307 | int64_t master_fields_count = lpGetInteger(lp_ele)lpGetIntegerIfValid(lp_ele, ((void*)0)); | |||
308 | lp_ele = lpNext(lp, lp_ele); /* Seek first field. */ | |||
309 | ||||
310 | /* If we are iterating in normal order, skip the master fields | |||
311 | * to seek the first actual entry. */ | |||
312 | for (int64_t i = 0; i < master_fields_count; i++) | |||
313 | lp_ele = lpNext(lp, lp_ele); | |||
314 | ||||
315 | /* If we are going forward, skip the previous entry's | |||
316 | * lp-count field (or in case of the master entry, the zero | |||
317 | * term field) */ | |||
318 | lp_ele = lpNext(lp, lp_ele); | |||
319 | if (lp_ele == NULL((void*)0)) | |||
320 | return 0; | |||
321 | } else { | |||
322 | /* If we are iterating in reverse direction, just seek the | |||
323 | * last part of the last entry in the listpack (that is, the | |||
324 | * fields count). */ | |||
325 | lp_ele = lpLast(lp); | |||
326 | ||||
327 | /* If we are going backward, read the number of elements this | |||
328 | * entry is composed of, and jump backward N times to seek | |||
329 | * its start. */ | |||
330 | int64_t lp_count = lpGetInteger(lp_ele)lpGetIntegerIfValid(lp_ele, ((void*)0)); | |||
331 | if (lp_count == 0) /* We reached the master entry. */ | |||
332 | return 0; | |||
333 | ||||
334 | while (lp_count--) | |||
335 | lp_ele = lpPrev(lp, lp_ele); | |||
336 | } | |||
337 | ||||
338 | lp_ele = lpNext(lp, lp_ele); /* Seek ID (lp_ele currently points to 'flags'). */ | |||
339 | ||||
340 | /* Get the ID: it is encoded as difference between the master | |||
341 | * ID and this entry ID. */ | |||
342 | streamID id = *master_id; | |||
343 | id.ms += lpGetInteger(lp_ele)lpGetIntegerIfValid(lp_ele, ((void*)0)); | |||
344 | lp_ele = lpNext(lp, lp_ele); | |||
345 | id.seq += lpGetInteger(lp_ele)lpGetIntegerIfValid(lp_ele, ((void*)0)); | |||
346 | *edge_id = id; | |||
347 | return 1; | |||
348 | } | |||
349 | ||||
350 | /* Debugging function to log the full content of a listpack. Useful | |||
351 | * for development and debugging. */ | |||
352 | void streamLogListpackContent(unsigned char *lp) { | |||
353 | unsigned char *p = lpFirst(lp); | |||
354 | while(p) { | |||
355 | unsigned char buf[LP_INTBUF_SIZE21]; | |||
356 | int64_t v; | |||
357 | unsigned char *ele = lpGet(p,&v,buf); | |||
358 | serverLog(LL_WARNING3,"- [%d] '%.*s'", (int)v, (int)v, ele); | |||
359 | p = lpNext(lp,p); | |||
360 | } | |||
361 | } | |||
362 | ||||
363 | /* Convert the specified stream entry ID as a 128 bit big endian number, so | |||
364 | * that the IDs can be sorted lexicographically. */ | |||
365 | void streamEncodeID(void *buf, streamID *id) { | |||
366 | uint64_t e[2]; | |||
367 | e[0] = htonu64(id->ms)intrev64(id->ms); | |||
368 | e[1] = htonu64(id->seq)intrev64(id->seq); | |||
369 | memcpy(buf,e,sizeof(e)); | |||
370 | } | |||
371 | ||||
372 | /* This is the reverse of streamEncodeID(): the decoded ID will be stored | |||
373 | * in the 'id' structure passed by reference. The buffer 'buf' must point | |||
374 | * to a 128 bit big-endian encoded ID. */ | |||
375 | void streamDecodeID(void *buf, streamID *id) { | |||
376 | uint64_t e[2]; | |||
377 | memcpy(e,buf,sizeof(e)); | |||
378 | id->ms = ntohu64(e[0])intrev64(e[0]); | |||
379 | id->seq = ntohu64(e[1])intrev64(e[1]); | |||
380 | } | |||
381 | ||||
382 | /* Compare two stream IDs. Return -1 if a < b, 0 if a == b, 1 if a > b. */ | |||
383 | int streamCompareID(streamID *a, streamID *b) { | |||
384 | if (a->ms > b->ms) return 1; | |||
385 | else if (a->ms < b->ms) return -1; | |||
386 | /* The ms part is the same. Check the sequence part. */ | |||
387 | else if (a->seq > b->seq) return 1; | |||
388 | else if (a->seq < b->seq) return -1; | |||
389 | /* Everything is the same: IDs are equal. */ | |||
390 | return 0; | |||
391 | } | |||
392 | ||||
393 | void streamGetEdgeID(stream *s, int first, streamID *edge_id) | |||
394 | { | |||
395 | raxIterator ri; | |||
396 | raxStart(&ri, s->rax); | |||
397 | int empty; | |||
398 | if (first) { | |||
399 | raxSeek(&ri, "^", NULL((void*)0), 0); | |||
400 | empty = !raxNext(&ri); | |||
401 | } else { | |||
402 | raxSeek(&ri, "$", NULL((void*)0), 0); | |||
403 | empty = !raxPrev(&ri); | |||
404 | } | |||
405 | ||||
406 | if (empty) { | |||
407 | /* Stream is empty, mark edge ID as lowest/highest possible. */ | |||
408 | edge_id->ms = first ? UINT64_MAX(18446744073709551615UL) : 0; | |||
409 | edge_id->seq = first ? UINT64_MAX(18446744073709551615UL) : 0; | |||
410 | raxStop(&ri); | |||
411 | return; | |||
412 | } | |||
413 | ||||
414 | unsigned char *lp = ri.data; | |||
415 | ||||
416 | /* Read the master ID from the radix tree key. */ | |||
417 | streamID master_id; | |||
418 | streamDecodeID(ri.key, &master_id); | |||
419 | ||||
420 | /* Construct edge ID. */ | |||
421 | lpGetEdgeStreamID(lp, first, &master_id, edge_id); | |||
422 | ||||
423 | raxStop(&ri); | |||
424 | } | |||
425 | ||||
426 | /* Adds a new item into the stream 's' having the specified number of | |||
427 | * field-value pairs as specified in 'numfields' and stored into 'argv'. | |||
428 | * Returns the new entry ID populating the 'added_id' structure. | |||
429 | * | |||
430 | * If 'use_id' is not NULL, the ID is not auto-generated by the function, | |||
431 | * but instead the passed ID is used to add the new entry. In this case | |||
432 | * adding the entry may fail as specified later in this comment. | |||
433 | * | |||
434 | * The function returns C_OK if the item was added, this is always true | |||
435 | * if the ID was generated by the function. However the function may return | |||
436 | * C_ERR if an ID was given via 'use_id', but adding it failed since the | |||
437 | * current top ID is greater or equal. */ | |||
438 | int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id) { | |||
439 | ||||
440 | /* Generate the new entry ID. */ | |||
441 | streamID id; | |||
442 | if (use_id) | |||
443 | id = *use_id; | |||
444 | else | |||
445 | streamNextID(&s->last_id,&id); | |||
446 | ||||
447 | /* Check that the new ID is greater than the last entry ID | |||
448 | * or return an error. Automatically generated IDs might | |||
449 | * overflow (and wrap-around) when incrementing the sequence | |||
450 | part. */ | |||
451 | if (streamCompareID(&id,&s->last_id) <= 0) return C_ERR-1; | |||
452 | ||||
453 | /* Add the new entry. */ | |||
454 | raxIterator ri; | |||
455 | raxStart(&ri,s->rax); | |||
456 | raxSeek(&ri,"$",NULL((void*)0),0); | |||
457 | ||||
458 | size_t lp_bytes = 0; /* Total bytes in the tail listpack. */ | |||
459 | unsigned char *lp = NULL((void*)0); /* Tail listpack pointer. */ | |||
460 | ||||
461 | /* Get a reference to the tail node listpack. */ | |||
462 | if (raxNext(&ri)) { | |||
463 | lp = ri.data; | |||
464 | lp_bytes = lpBytes(lp); | |||
465 | } | |||
466 | raxStop(&ri); | |||
467 | ||||
468 | /* We have to add the key into the radix tree in lexicographic order, | |||
469 | * to do so we consider the ID as a single 128 bit number written in | |||
470 | * big endian, so that the most significant bytes are the first ones. */ | |||
471 | uint64_t rax_key[2]; /* Key in the radix tree containing the listpack.*/ | |||
472 | streamID master_id; /* ID of the master entry in the listpack. */ | |||
473 | ||||
474 | /* Create a new listpack and radix tree node if needed. Note that when | |||
475 | * a new listpack is created, we populate it with a "master entry". This | |||
476 | * is just a set of fields that is taken as references in order to compress | |||
477 | * the stream entries that we'll add inside the listpack. | |||
478 | * | |||
479 | * Note that while we use the first added entry fields to create | |||
480 | * the master entry, the first added entry is NOT represented in the master | |||
481 | * entry, which is a stand alone object. But of course, the first entry | |||
482 | * will compress well because it's used as reference. | |||
483 | * | |||
484 | * The master entry is composed like in the following example: | |||
485 | * | |||
486 | * +-------+---------+------------+---------+--/--+---------+---------+-+ | |||
487 | * | count | deleted | num-fields | field_1 | field_2 | ... | field_N |0| | |||
488 | * +-------+---------+------------+---------+--/--+---------+---------+-+ | |||
489 | * | |||
490 | * count and deleted just represent respectively the total number of | |||
491 | * entries inside the listpack that are valid, and marked as deleted | |||
492 | * (deleted flag in the entry flags set). So the total number of items | |||
493 | * actually inside the listpack (both deleted and not) is count+deleted. | |||
494 | * | |||
495 | * The real entries will be encoded with an ID that is just the | |||
496 | * millisecond and sequence difference compared to the key stored at | |||
497 | * the radix tree node containing the listpack (delta encoding), and | |||
498 | * if the fields of the entry are the same as the master entry fields, the | |||
499 | * entry flags will specify this fact and the entry fields and number | |||
500 | * of fields will be omitted (see later in the code of this function). | |||
501 | * | |||
502 | * The "0" entry at the end is the same as the 'lp-count' entry in the | |||
503 | * regular stream entries (see below), and marks the fact that there are | |||
504 | * no more entries, when we scan the stream from right to left. */ | |||
505 | ||||
506 | /* First of all, check if we can append to the current macro node or | |||
507 | * if we need to switch to the next one. 'lp' will be set to NULL if | |||
508 | * the current node is full. */ | |||
509 | if (lp != NULL((void*)0)) { | |||
510 | if (server.stream_node_max_bytes && | |||
511 | lp_bytes >= server.stream_node_max_bytes) | |||
512 | { | |||
513 | lp = NULL((void*)0); | |||
514 | } else if (server.stream_node_max_entries) { | |||
515 | unsigned char *lp_ele = lpFirst(lp); | |||
516 | /* Count both live entries and deleted ones. */ | |||
517 | int64_t count = lpGetInteger(lp_ele)lpGetIntegerIfValid(lp_ele, ((void*)0)) + lpGetInteger(lpNext(lp,lp_ele))lpGetIntegerIfValid(lpNext(lp,lp_ele), ((void*)0)); | |||
518 | if (count >= server.stream_node_max_entries) { | |||
519 | /* Shrink extra pre-allocated memory */ | |||
520 | lp = lpShrinkToFit(lp); | |||
521 | if (ri.data != lp) | |||
522 | raxInsert(s->rax,ri.key,ri.key_len,lp,NULL((void*)0)); | |||
523 | lp = NULL((void*)0); | |||
524 | } | |||
525 | } | |||
526 | } | |||
527 | ||||
528 | int flags = STREAM_ITEM_FLAG_NONE0; | |||
529 | if (lp == NULL((void*)0)) { | |||
530 | master_id = id; | |||
531 | streamEncodeID(rax_key,&id); | |||
532 | /* Create the listpack having the master entry ID and fields. | |||
533 | * Pre-allocate some bytes when creating listpack to avoid realloc on | |||
534 | * every XADD. Since listpack.c uses malloc_size, it'll grow in steps, | |||
535 | * and won't realloc on every XADD. | |||
536 | * When listpack reaches max number of entries, we'll shrink the | |||
537 | * allocation to fit the data. */ | |||
538 | size_t prealloc = STREAM_LISTPACK_MAX_PRE_ALLOCATE4096; | |||
539 | if (server.stream_node_max_bytes > 0 && server.stream_node_max_bytes < prealloc) { | |||
540 | prealloc = server.stream_node_max_bytes; | |||
541 | } | |||
542 | lp = lpNew(prealloc); | |||
543 | lp = lpAppendInteger(lp,1); /* One item, the one we are adding. */ | |||
544 | lp = lpAppendInteger(lp,0); /* Zero deleted so far. */ | |||
545 | lp = lpAppendInteger(lp,numfields); | |||
546 | for (int64_t i = 0; i < numfields; i++) { | |||
547 | sds field = argv[i*2]->ptr; | |||
548 | lp = lpAppend(lp,(unsigned char*)field,sdslen(field)); | |||
549 | } | |||
550 | lp = lpAppendInteger(lp,0); /* Master entry zero terminator. */ | |||
551 | raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL((void*)0)); | |||
552 | /* The first entry we insert, has obviously the same fields of the | |||
553 | * master entry. */ | |||
554 | flags |= STREAM_ITEM_FLAG_SAMEFIELDS(1<<1); | |||
555 | } else { | |||
556 | serverAssert(ri.key_len == sizeof(rax_key))((ri.key_len == sizeof(rax_key))?(void)0 : (_serverAssert("ri.key_len == sizeof(rax_key)" ,"t_stream.c",556),__builtin_unreachable())); | |||
557 | memcpy(rax_key,ri.key,sizeof(rax_key)); | |||
558 | ||||
559 | /* Read the master ID from the radix tree key. */ | |||
560 | streamDecodeID(rax_key,&master_id); | |||
561 | unsigned char *lp_ele = lpFirst(lp); | |||
562 | ||||
563 | /* Update count and skip the deleted fields. */ | |||
564 | int64_t count = lpGetInteger(lp_ele)lpGetIntegerIfValid(lp_ele, ((void*)0)); | |||
565 | lp = lpReplaceInteger(lp,&lp_ele,count+1); | |||
566 | lp_ele = lpNext(lp,lp_ele); /* seek deleted. */ | |||
567 | lp_ele = lpNext(lp,lp_ele); /* seek master entry num fields. */ | |||
568 | ||||
569 | /* Check if the entry we are adding, have the same fields | |||
570 | * as the master entry. */ | |||
571 | int64_t master_fields_count = lpGetInteger(lp_ele)lpGetIntegerIfValid(lp_ele, ((void*)0)); | |||
572 | lp_ele = lpNext(lp,lp_ele); | |||
573 | if (numfields == master_fields_count) { | |||
574 | int64_t i; | |||
575 | for (i = 0; i < master_fields_count; i++) { | |||
576 | sds field = argv[i*2]->ptr; | |||
577 | int64_t e_len; | |||
578 | unsigned char buf[LP_INTBUF_SIZE21]; | |||
579 | unsigned char *e = lpGet(lp_ele,&e_len,buf); | |||
580 | /* Stop if there is a mismatch. */ | |||
581 | if (sdslen(field) != (size_t)e_len || | |||
582 | memcmp(e,field,e_len) != 0) break; | |||
583 | lp_ele = lpNext(lp,lp_ele); | |||
584 | } | |||
585 | /* All fields are the same! We can compress the field names | |||
586 | * setting a single bit in the flags. */ | |||
587 | if (i == master_fields_count) flags |= STREAM_ITEM_FLAG_SAMEFIELDS(1<<1); | |||
588 | } | |||
589 | } | |||
590 | ||||
591 | /* Populate the listpack with the new entry. We use the following | |||
592 | * encoding: | |||
593 | * | |||
594 | * +-----+--------+----------+-------+-------+-/-+-------+-------+--------+ | |||
595 | * |flags|entry-id|num-fields|field-1|value-1|...|field-N|value-N|lp-count| | |||
596 | * +-----+--------+----------+-------+-------+-/-+-------+-------+--------+ | |||
597 | * | |||
598 | * However if the SAMEFIELD flag is set, we have just to populate | |||
599 | * the entry with the values, so it becomes: | |||
600 | * | |||
601 | * +-----+--------+-------+-/-+-------+--------+ | |||
602 | * |flags|entry-id|value-1|...|value-N|lp-count| | |||
603 | * +-----+--------+-------+-/-+-------+--------+ | |||
604 | * | |||
605 | * The entry-id field is actually two separated fields: the ms | |||
606 | * and seq difference compared to the master entry. | |||
607 | * | |||
608 | * The lp-count field is a number that states the number of listpack pieces | |||
609 | * that compose the entry, so that it's possible to travel the entry | |||
610 | * in reverse order: we can just start from the end of the listpack, read | |||
611 | * the entry, and jump back N times to seek the "flags" field to read | |||
612 | * the stream full entry. */ | |||
613 | lp = lpAppendInteger(lp,flags); | |||
614 | lp = lpAppendInteger(lp,id.ms - master_id.ms); | |||
615 | lp = lpAppendInteger(lp,id.seq - master_id.seq); | |||
616 | if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS(1<<1))) | |||
617 | lp = lpAppendInteger(lp,numfields); | |||
618 | for (int64_t i = 0; i < numfields; i++) { | |||
619 | sds field = argv[i*2]->ptr, value = argv[i*2+1]->ptr; | |||
620 | if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS(1<<1))) | |||
621 | lp = lpAppend(lp,(unsigned char*)field,sdslen(field)); | |||
622 | lp = lpAppend(lp,(unsigned char*)value,sdslen(value)); | |||
623 | } | |||
624 | /* Compute and store the lp-count field. */ | |||
625 | int64_t lp_count = numfields; | |||
626 | lp_count += 3; /* Add the 3 fixed fields flags + ms-diff + seq-diff. */ | |||
627 | if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS(1<<1))) { | |||
628 | /* If the item is not compressed, it also has the fields other than | |||
629 | * the values, and an additional num-fileds field. */ | |||
630 | lp_count += numfields+1; | |||
631 | } | |||
632 | lp = lpAppendInteger(lp,lp_count); | |||
633 | ||||
634 | /* Insert back into the tree in order to update the listpack pointer. */ | |||
635 | if (ri.data != lp) | |||
636 | raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL((void*)0)); | |||
637 | s->length++; | |||
638 | s->last_id = id; | |||
639 | if (added_id) *added_id = id; | |||
640 | return C_OK0; | |||
641 | } | |||
642 | ||||
643 | typedef struct { | |||
644 | /* XADD options */ | |||
645 | streamID id; /* User-provided ID, for XADD only. */ | |||
646 | int id_given; /* Was an ID different than "*" specified? for XADD only. */ | |||
647 | int no_mkstream; /* if set to 1 do not create new stream */ | |||
648 | ||||
649 | /* XADD + XTRIM common options */ | |||
650 | int trim_strategy; /* TRIM_STRATEGY_* */ | |||
651 | int trim_strategy_arg_idx; /* Index of the count in MAXLEN/MINID, for rewriting. */ | |||
652 | int approx_trim; /* If 1 only delete whole radix tree nodes, so | |||
653 | * the trim argument is not applied verbatim. */ | |||
654 | long long limit; /* Maximum amount of entries to trim. If 0, no limitation | |||
655 | * on the amount of trimming work is enforced. */ | |||
656 | /* TRIM_STRATEGY_MAXLEN options */ | |||
657 | long long maxlen; /* After trimming, leave stream at this length . */ | |||
658 | /* TRIM_STRATEGY_MINID options */ | |||
659 | streamID minid; /* Trim by ID (No stream entries with ID < 'minid' will remain) */ | |||
660 | } streamAddTrimArgs; | |||
661 | ||||
/* Values of streamAddTrimArgs.trim_strategy. */
#define TRIM_STRATEGY_NONE 0   /* No trimming was requested. */
#define TRIM_STRATEGY_MAXLEN 1 /* Trim by stream length (MAXLEN). */
#define TRIM_STRATEGY_MINID 2  /* Trim by minimum entry ID (MINID). */
665 | ||||
666 | /* Trim the stream 's' according to args->trim_strategy, and return the | |||
667 | * number of elements removed from the stream. The 'approx' option, if non-zero, | |||
668 | * specifies that the trimming must be performed in a approximated way in | |||
669 | * order to maximize performances. This means that the stream may contain | |||
670 | * entries with IDs < 'id' in case of MINID (or more elements than 'maxlen' | |||
671 | * in case of MAXLEN), and elements are only removed if we can remove | |||
672 | * a *whole* node of the radix tree. The elements are removed from the head | |||
673 | * of the stream (older elements). | |||
674 | * | |||
675 | * The function may return zero if: | |||
676 | * | |||
677 | * 1) The minimal entry ID of the stream is already < 'id' (MINID); or | |||
678 | * 2) The stream is already shorter or equal to the specified max length (MAXLEN); or | |||
679 | * 3) The 'approx' option is true and the head node did not have enough elements | |||
680 | * to be deleted. | |||
681 | * | |||
682 | * args->limit is the maximum number of entries to delete. The purpose is to | |||
683 | * prevent this function from taking to long. | |||
684 | * If 'limit' is 0 then we do not limit the number of deleted entries. | |||
685 | * Much like the 'approx', if 'limit' is smaller than the number of entries | |||
686 | * that should be trimmed, there is a chance we will still have entries with | |||
687 | * IDs < 'id' (or number of elements >= maxlen in case of MAXLEN). | |||
688 | */ | |||
689 | int64_t streamTrim(stream *s, streamAddTrimArgs *args) { | |||
690 | size_t maxlen = args->maxlen; | |||
691 | streamID *id = &args->minid; | |||
692 | int approx = args->approx_trim; | |||
693 | int64_t limit = args->limit; | |||
694 | int trim_strategy = args->trim_strategy; | |||
695 | ||||
696 | if (trim_strategy == TRIM_STRATEGY_NONE0) | |||
697 | return 0; | |||
698 | ||||
699 | raxIterator ri; | |||
700 | raxStart(&ri,s->rax); | |||
701 | raxSeek(&ri,"^",NULL((void*)0),0); | |||
702 | ||||
703 | int64_t deleted = 0; | |||
704 | while (raxNext(&ri)) { | |||
705 | /* Check if we exceeded the amount of work we could do */ | |||
706 | if (limit && deleted >= limit) | |||
707 | break; | |||
708 | ||||
709 | if (trim_strategy == TRIM_STRATEGY_MAXLEN1 && s->length <= maxlen) | |||
710 | break; | |||
711 | ||||
712 | unsigned char *lp = ri.data, *p = lpFirst(lp); | |||
713 | int64_t entries = lpGetInteger(p)lpGetIntegerIfValid(p, ((void*)0)); | |||
714 | ||||
715 | /* Check if we can remove the whole node. */ | |||
716 | int remove_node; | |||
717 | streamID master_id = {0}; /* For MINID */ | |||
718 | if (trim_strategy == TRIM_STRATEGY_MAXLEN1) { | |||
719 | remove_node = s->length - entries >= maxlen; | |||
720 | } else { | |||
721 | /* Read the master ID from the radix tree key. */ | |||
722 | streamDecodeID(ri.key, &master_id); | |||
723 | ||||
724 | /* Read last ID. */ | |||
725 | streamID last_id; | |||
726 | lpGetEdgeStreamID(lp, 0, &master_id, &last_id); | |||
727 | ||||
728 | /* We can remove the entire node id its last ID < 'id' */ | |||
729 | remove_node = streamCompareID(&last_id, id) < 0; | |||
730 | } | |||
731 | ||||
732 | if (remove_node) { | |||
733 | lpFree(lp); | |||
734 | raxRemove(s->rax,ri.key,ri.key_len,NULL((void*)0)); | |||
735 | raxSeek(&ri,">=",ri.key,ri.key_len); | |||
736 | s->length -= entries; | |||
737 | deleted += entries; | |||
738 | continue; | |||
739 | } | |||
740 | ||||
741 | /* If we cannot remove a whole element, and approx is true, | |||
742 | * stop here. */ | |||
743 | if (approx) break; | |||
744 | ||||
745 | /* Now we have to trim entries from within 'lp' */ | |||
746 | int64_t deleted_from_lp = 0; | |||
747 | ||||
748 | p = lpNext(lp, p); /* Skip deleted field. */ | |||
749 | p = lpNext(lp, p); /* Skip num-of-fields in the master entry. */ | |||
750 | ||||
751 | /* Skip all the master fields. */ | |||
752 | int64_t master_fields_count = lpGetInteger(p)lpGetIntegerIfValid(p, ((void*)0)); | |||
753 | p = lpNext(lp,p); /* Skip the first field. */ | |||
754 | for (int64_t j = 0; j < master_fields_count; j++) | |||
755 | p = lpNext(lp,p); /* Skip all master fields. */ | |||
756 | p = lpNext(lp,p); /* Skip the zero master entry terminator. */ | |||
757 | ||||
758 | /* 'p' is now pointing to the first entry inside the listpack. | |||
759 | * We have to run entry after entry, marking entries as deleted | |||
760 | * if they are already not deleted. */ | |||
761 | while (p) { | |||
762 | /* We keep a copy of p (which point to flags part) in order to | |||
763 | * update it after (and if) we actually remove the entry */ | |||
764 | unsigned char *pcopy = p; | |||
765 | ||||
766 | int flags = lpGetInteger(p)lpGetIntegerIfValid(p, ((void*)0)); | |||
767 | p = lpNext(lp, p); /* Skip flags. */ | |||
768 | int to_skip; | |||
769 | ||||
770 | int ms_delta = lpGetInteger(p)lpGetIntegerIfValid(p, ((void*)0)); | |||
771 | p = lpNext(lp, p); /* Skip ID ms delta */ | |||
772 | int seq_delta = lpGetInteger(p)lpGetIntegerIfValid(p, ((void*)0)); | |||
773 | p = lpNext(lp, p); /* Skip ID seq delta */ | |||
774 | ||||
775 | streamID currid = {0}; /* For MINID */ | |||
776 | if (trim_strategy == TRIM_STRATEGY_MINID2) { | |||
777 | currid.ms = master_id.ms + ms_delta; | |||
778 | currid.seq = master_id.seq + seq_delta; | |||
779 | } | |||
780 | ||||
781 | int stop; | |||
782 | if (trim_strategy == TRIM_STRATEGY_MAXLEN1) { | |||
783 | stop = s->length <= maxlen; | |||
784 | } else { | |||
785 | /* Following IDs will definitely be greater because the rax | |||
786 | * tree is sorted, no point of continuing. */ | |||
787 | stop = streamCompareID(&currid, id) >= 0; | |||
788 | } | |||
789 | if (stop) | |||
790 | break; | |||
791 | ||||
792 | if (flags & STREAM_ITEM_FLAG_SAMEFIELDS(1<<1)) { | |||
793 | to_skip = master_fields_count; | |||
794 | } else { | |||
795 | to_skip = lpGetInteger(p)lpGetIntegerIfValid(p, ((void*)0)); /* Get num-fields. */ | |||
796 | p = lpNext(lp,p); /* Skip num-fields. */ | |||
797 | to_skip *= 2; /* Fields and values. */ | |||
798 | } | |||
799 | ||||
800 | while(to_skip--) p = lpNext(lp,p); /* Skip the whole entry. */ | |||
801 | p = lpNext(lp,p); /* Skip the final lp-count field. */ | |||
802 | ||||
803 | /* Mark the entry as deleted. */ | |||
804 | if (!(flags & STREAM_ITEM_FLAG_DELETED(1<<0))) { | |||
805 | intptr_t delta = p - lp; | |||
806 | flags |= STREAM_ITEM_FLAG_DELETED(1<<0); | |||
807 | lp = lpReplaceInteger(lp, &pcopy, flags); | |||
808 | deleted_from_lp++; | |||
809 | s->length--; | |||
810 | p = lp + delta; | |||
811 | } | |||
812 | } | |||
813 | deleted += deleted_from_lp; | |||
814 | ||||
815 | /* Now we the entries/deleted counters. */ | |||
816 | p = lpFirst(lp); | |||
817 | lp = lpReplaceInteger(lp,&p,entries-deleted_from_lp); | |||
818 | p = lpNext(lp,p); /* Skip deleted field. */ | |||
819 | int64_t marked_deleted = lpGetInteger(p)lpGetIntegerIfValid(p, ((void*)0)); | |||
820 | lp = lpReplaceInteger(lp,&p,marked_deleted+deleted_from_lp); | |||
821 | p = lpNext(lp,p); /* Skip num-of-fields in the master entry. */ | |||
822 | ||||
823 | /* Here we should perform garbage collection in case at this point | |||
824 | * there are too many entries deleted inside the listpack. */ | |||
825 | entries -= deleted_from_lp; | |||
826 | marked_deleted += deleted_from_lp; | |||
827 | if (entries + marked_deleted > 10 && marked_deleted > entries/2) { | |||
828 | /* TODO: perform a garbage collection. */ | |||
829 | } | |||
830 | ||||
831 | /* Update the listpack with the new pointer. */ | |||
832 | raxInsert(s->rax,ri.key,ri.key_len,lp,NULL((void*)0)); | |||
833 | ||||
834 | break; /* If we are here, there was enough to delete in the current | |||
835 | node, so no need to go to the next node. */ | |||
836 | } | |||
837 | ||||
838 | raxStop(&ri); | |||
839 | return deleted; | |||
840 | } | |||
841 | ||||
842 | /* Trims a stream by length. Returns the number of deleted items. */ | |||
843 | int64_t streamTrimByLength(stream *s, long long maxlen, int approx) { | |||
844 | streamAddTrimArgs args = { | |||
845 | .trim_strategy = TRIM_STRATEGY_MAXLEN1, | |||
846 | .approx_trim = approx, | |||
847 | .limit = approx ? 100 * server.stream_node_max_entries : 0, | |||
848 | .maxlen = maxlen | |||
849 | }; | |||
850 | return streamTrim(s, &args); | |||
851 | } | |||
852 | ||||
853 | /* Trims a stream by minimum ID. Returns the number of deleted items. */ | |||
854 | int64_t streamTrimByID(stream *s, streamID minid, int approx) { | |||
855 | streamAddTrimArgs args = { | |||
856 | .trim_strategy = TRIM_STRATEGY_MINID2, | |||
857 | .approx_trim = approx, | |||
858 | .limit = approx ? 100 * server.stream_node_max_entries : 0, | |||
859 | .minid = minid | |||
860 | }; | |||
861 | return streamTrim(s, &args); | |||
862 | } | |||
863 | ||||
864 | /* Parse the arguements of XADD/XTRIM. | |||
865 | * | |||
866 | * See streamAddTrimArgs for more details about the arguments handled. | |||
867 | * | |||
868 | * This function returns the position of the ID argument (relevant only to XADD). | |||
869 | * On error -1 is returned and a reply is sent. */ | |||
870 | static int streamParseAddOrTrimArgsOrReply(client *c, streamAddTrimArgs *args, int xadd) { | |||
871 | /* Initialize arguments to defaults */ | |||
872 | memset(args, 0, sizeof(*args)); | |||
873 | ||||
874 | /* Parse options. */ | |||
875 | int i = 2; /* This is the first argument position where we could | |||
876 | find an option, or the ID. */ | |||
877 | int limit_given = 0; | |||
878 | for (; i < c->argc; i++) { | |||
879 | int moreargs = (c->argc-1) - i; /* Number of additional arguments. */ | |||
880 | char *opt = c->argv[i]->ptr; | |||
881 | if (xadd && opt[0] == '*' && opt[1] == '\0') { | |||
882 | /* This is just a fast path for the common case of auto-ID | |||
883 | * creation. */ | |||
884 | break; | |||
885 | } else if (!strcasecmp(opt,"maxlen") && moreargs) { | |||
886 | if (args->trim_strategy != TRIM_STRATEGY_NONE0) { | |||
887 | addReplyError(c,"syntax error, MAXLEN and MINID options at the same time are not compatible"); | |||
888 | return -1; | |||
889 | } | |||
890 | args->approx_trim = 0; | |||
891 | char *next = c->argv[i+1]->ptr; | |||
892 | /* Check for the form MAXLEN ~ <count>. */ | |||
893 | if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') { | |||
894 | args->approx_trim = 1; | |||
895 | i++; | |||
896 | } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') { | |||
897 | i++; | |||
898 | } | |||
899 | if (getLongLongFromObjectOrReply(c,c->argv[i+1],&args->maxlen,NULL((void*)0)) | |||
900 | != C_OK0) return -1; | |||
901 | ||||
902 | if (args->maxlen < 0) { | |||
903 | addReplyError(c,"The MAXLEN argument must be >= 0."); | |||
904 | return -1; | |||
905 | } | |||
906 | i++; | |||
907 | args->trim_strategy = TRIM_STRATEGY_MAXLEN1; | |||
908 | args->trim_strategy_arg_idx = i; | |||
909 | } else if (!strcasecmp(opt,"minid") && moreargs) { | |||
910 | if (args->trim_strategy != TRIM_STRATEGY_NONE0) { | |||
911 | addReplyError(c,"syntax error, MAXLEN and MINID options at the same time are not compatible"); | |||
912 | return -1; | |||
913 | } | |||
914 | args->approx_trim = 0; | |||
915 | char *next = c->argv[i+1]->ptr; | |||
916 | /* Check for the form MINID ~ <id>|<age>. */ | |||
917 | if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') { | |||
918 | args->approx_trim = 1; | |||
919 | i++; | |||
920 | } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') { | |||
921 | i++; | |||
922 | } | |||
923 | ||||
924 | if (streamParseStrictIDOrReply(c,c->argv[i+1],&args->minid,0) != C_OK0) | |||
925 | return -1; | |||
926 | ||||
927 | i++; | |||
928 | args->trim_strategy = TRIM_STRATEGY_MINID2; | |||
929 | args->trim_strategy_arg_idx = i; | |||
930 | } else if (!strcasecmp(opt,"limit") && moreargs) { | |||
931 | /* Note about LIMIT: If it was not provided by the caller we set | |||
932 | * it to 100*server.stream_node_max_entries, and that's to prevent the | |||
933 | * trimming from taking too long, on the expense of not deleting entries | |||
934 | * that should be trimmed. | |||
935 | * If user wanted exact trimming (i.e. no '~') we never limit the number | |||
936 | * of trimmed entries */ | |||
937 | if (getLongLongFromObjectOrReply(c,c->argv[i+1],&args->limit,NULL((void*)0)) != C_OK0) | |||
938 | return -1; | |||
939 | ||||
940 | if (args->limit < 0) { | |||
941 | addReplyError(c,"The LIMIT argument must be >= 0."); | |||
942 | return -1; | |||
943 | } | |||
944 | limit_given = 1; | |||
945 | i++; | |||
946 | } else if (xadd && !strcasecmp(opt,"nomkstream")) { | |||
947 | args->no_mkstream = 1; | |||
948 | } else if (xadd) { | |||
949 | /* If we are here is a syntax error or a valid ID. */ | |||
950 | if (streamParseStrictIDOrReply(c,c->argv[i],&args->id,0) != C_OK0) | |||
951 | return -1; | |||
952 | args->id_given = 1; | |||
953 | break; | |||
954 | } else { | |||
955 | addReplyErrorObject(c,shared.syntaxerr); | |||
956 | return -1; | |||
957 | } | |||
958 | } | |||
959 | ||||
960 | if (args->limit && args->trim_strategy == TRIM_STRATEGY_NONE0) { | |||
961 | addReplyError(c,"syntax error, LIMIT cannot be used without specifying a trimming strategy"); | |||
962 | return -1; | |||
963 | } | |||
964 | ||||
965 | if (!xadd && args->trim_strategy == TRIM_STRATEGY_NONE0) { | |||
966 | addReplyError(c,"syntax error, XTRIM must be called with a trimming strategy"); | |||
967 | return -1; | |||
968 | } | |||
969 | ||||
970 | if (c == server.master || c->id == CLIENT_ID_AOF((18446744073709551615UL))) { | |||
971 | /* If command cam from master or from AOF we must not enforce maxnodes | |||
972 | * (The maxlen/minid argument was re-written to make sure there's no | |||
973 | * inconsistency). */ | |||
974 | args->limit = 0; | |||
975 | } else { | |||
976 | /* We need to set the limit (only if we got '~') */ | |||
977 | if (limit_given) { | |||
978 | if (!args->approx_trim) { | |||
979 | /* LIMIT was provided without ~ */ | |||
980 | addReplyError(c,"syntax error, LIMIT cannot be used without the special ~ option"); | |||
981 | return -1; | |||
982 | } | |||
983 | } else { | |||
984 | /* User didn't provide LIMIT, we must set it. */ | |||
985 | ||||
986 | if (args->approx_trim) { | |||
987 | /* In order to prevent from trimming to do too much work and cause | |||
988 | * latency spikes we limit the amount of work it can do */ | |||
989 | args->limit = 100 * server.stream_node_max_entries; /* Maximum 100 rax nodes. */ | |||
990 | } else { | |||
991 | /* No LIMIT for exact trimming */ | |||
992 | args->limit = 0; | |||
993 | } | |||
994 | } | |||
995 | } | |||
996 | ||||
997 | return i; | |||
998 | } | |||
999 | ||||
1000 | /* Initialize the stream iterator, so that we can call iterating functions | |||
1001 | * to get the next items. This requires a corresponding streamIteratorStop() | |||
1002 | * at the end. The 'rev' parameter controls the direction. If it's zero the | |||
1003 | * iteration is from the start to the end element (inclusive), otherwise | |||
1004 | * if rev is non-zero, the iteration is reversed. | |||
1005 | * | |||
1006 | * Once the iterator is initialized, we iterate like this: | |||
1007 | * | |||
1008 | * streamIterator myiterator; | |||
1009 | * streamIteratorStart(&myiterator,...); | |||
1010 | * int64_t numfields; | |||
1011 | * while(streamIteratorGetID(&myiterator,&ID,&numfields)) { | |||
1012 | * while(numfields--) { | |||
1013 | * unsigned char *key, *value; | |||
1014 | * size_t key_len, value_len; | |||
1015 | * streamIteratorGetField(&myiterator,&key,&value,&key_len,&value_len); | |||
1016 | * | |||
1017 | * ... do what you want with key and value ... | |||
1018 | * } | |||
1019 | * } | |||
1020 | * streamIteratorStop(&myiterator); */ | |||
1021 | void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev) { | |||
1022 | /* Initialize the iterator and translates the iteration start/stop | |||
1023 | * elements into a 128 big big-endian number. */ | |||
1024 | if (start) { | |||
1025 | streamEncodeID(si->start_key,start); | |||
1026 | } else { | |||
1027 | si->start_key[0] = 0; | |||
1028 | si->start_key[1] = 0; | |||
1029 | } | |||
1030 | ||||
1031 | if (end) { | |||
1032 | streamEncodeID(si->end_key,end); | |||
1033 | } else { | |||
1034 | si->end_key[0] = UINT64_MAX(18446744073709551615UL); | |||
1035 | si->end_key[1] = UINT64_MAX(18446744073709551615UL); | |||
1036 | } | |||
1037 | ||||
1038 | /* Seek the correct node in the radix tree. */ | |||
1039 | raxStart(&si->ri,s->rax); | |||
1040 | if (!rev) { | |||
1041 | if (start && (start->ms || start->seq)) { | |||
1042 | raxSeek(&si->ri,"<=",(unsigned char*)si->start_key, | |||
1043 | sizeof(si->start_key)); | |||
1044 | if (raxEOF(&si->ri)) raxSeek(&si->ri,"^",NULL((void*)0),0); | |||
1045 | } else { | |||
1046 | raxSeek(&si->ri,"^",NULL((void*)0),0); | |||
1047 | } | |||
1048 | } else { | |||
1049 | if (end && (end->ms || end->seq)) { | |||
1050 | raxSeek(&si->ri,"<=",(unsigned char*)si->end_key, | |||
1051 | sizeof(si->end_key)); | |||
1052 | if (raxEOF(&si->ri)) raxSeek(&si->ri,"$",NULL((void*)0),0); | |||
1053 | } else { | |||
1054 | raxSeek(&si->ri,"$",NULL((void*)0),0); | |||
1055 | } | |||
1056 | } | |||
1057 | si->stream = s; | |||
1058 | si->lp = NULL((void*)0); /* There is no current listpack right now. */ | |||
1059 | si->lp_ele = NULL((void*)0); /* Current listpack cursor. */ | |||
1060 | si->rev = rev; /* Direction, if non-zero reversed, from end to start. */ | |||
1061 | } | |||
1062 | ||||
1063 | /* Return 1 and store the current item ID at 'id' if there are still | |||
1064 | * elements within the iteration range, otherwise return 0 in order to | |||
1065 | * signal the iteration terminated. */ | |||
1066 | int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) { | |||
1067 | while(1) { /* Will stop when element > stop_key or end of radix tree. */ | |||
1068 | /* If the current listpack is set to NULL, this is the start of the | |||
1069 | * iteration or the previous listpack was completely iterated. | |||
1070 | * Go to the next node. */ | |||
1071 | if (si->lp == NULL((void*)0) || si->lp_ele == NULL((void*)0)) { | |||
1072 | if (!si->rev && !raxNext(&si->ri)) return 0; | |||
1073 | else if (si->rev && !raxPrev(&si->ri)) return 0; | |||
1074 | serverAssert(si->ri.key_len == sizeof(streamID))((si->ri.key_len == sizeof(streamID))?(void)0 : (_serverAssert ("si->ri.key_len == sizeof(streamID)","t_stream.c",1074),__builtin_unreachable ())); | |||
1075 | /* Get the master ID. */ | |||
1076 | streamDecodeID(si->ri.key,&si->master_id); | |||
1077 | /* Get the master fields count. */ | |||
1078 | si->lp = si->ri.data; | |||
1079 | si->lp_ele = lpFirst(si->lp); /* Seek items count */ | |||
1080 | si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek deleted count. */ | |||
1081 | si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek num fields. */ | |||
1082 | si->master_fields_count = lpGetInteger(si->lp_ele)lpGetIntegerIfValid(si->lp_ele, ((void*)0)); | |||
1083 | si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek first field. */ | |||
1084 | si->master_fields_start = si->lp_ele; | |||
1085 | /* We are now pointing to the first field of the master entry. | |||
1086 | * We need to seek either the first or the last entry depending | |||
1087 | * on the direction of the iteration. */ | |||
1088 | if (!si->rev) { | |||
1089 | /* If we are iterating in normal order, skip the master fields | |||
1090 | * to seek the first actual entry. */ | |||
1091 | for (uint64_t i = 0; i < si->master_fields_count; i++) | |||
1092 | si->lp_ele = lpNext(si->lp,si->lp_ele); | |||
1093 | } else { | |||
1094 | /* If we are iterating in reverse direction, just seek the | |||
1095 | * last part of the last entry in the listpack (that is, the | |||
1096 | * fields count). */ | |||
1097 | si->lp_ele = lpLast(si->lp); | |||
1098 | } | |||
1099 | } else if (si->rev) { | |||
1100 | /* If we are iterating in the reverse order, and this is not | |||
1101 | * the first entry emitted for this listpack, then we already | |||
1102 | * emitted the current entry, and have to go back to the previous | |||
1103 | * one. */ | |||
1104 | int lp_count = lpGetInteger(si->lp_ele)lpGetIntegerIfValid(si->lp_ele, ((void*)0)); | |||
1105 | while(lp_count--) si->lp_ele = lpPrev(si->lp,si->lp_ele); | |||
1106 | /* Seek lp-count of prev entry. */ | |||
1107 | si->lp_ele = lpPrev(si->lp,si->lp_ele); | |||
1108 | } | |||
1109 | ||||
1110 | /* For every radix tree node, iterate the corresponding listpack, | |||
1111 | * returning elements when they are within range. */ | |||
1112 | while(1) { | |||
1113 | if (!si->rev) { | |||
1114 | /* If we are going forward, skip the previous entry | |||
1115 | * lp-count field (or in case of the master entry, the zero | |||
1116 | * term field) */ | |||
1117 | si->lp_ele = lpNext(si->lp,si->lp_ele); | |||
1118 | if (si->lp_ele == NULL((void*)0)) break; | |||
1119 | } else { | |||
1120 | /* If we are going backward, read the number of elements this | |||
1121 | * entry is composed of, and jump backward N times to seek | |||
1122 | * its start. */ | |||
1123 | int64_t lp_count = lpGetInteger(si->lp_ele)lpGetIntegerIfValid(si->lp_ele, ((void*)0)); | |||
1124 | if (lp_count == 0) { /* We reached the master entry. */ | |||
1125 | si->lp = NULL((void*)0); | |||
1126 | si->lp_ele = NULL((void*)0); | |||
1127 | break; | |||
1128 | } | |||
1129 | while(lp_count--) si->lp_ele = lpPrev(si->lp,si->lp_ele); | |||
1130 | } | |||
1131 | ||||
1132 | /* Get the flags entry. */ | |||
1133 | si->lp_flags = si->lp_ele; | |||
1134 | int flags = lpGetInteger(si->lp_ele)lpGetIntegerIfValid(si->lp_ele, ((void*)0)); | |||
1135 | si->lp_ele = lpNext(si->lp,si->lp_ele); /* Seek ID. */ | |||
1136 | ||||
1137 | /* Get the ID: it is encoded as difference between the master | |||
1138 | * ID and this entry ID. */ | |||
1139 | *id = si->master_id; | |||
1140 | id->ms += lpGetInteger(si->lp_ele)lpGetIntegerIfValid(si->lp_ele, ((void*)0)); | |||
1141 | si->lp_ele = lpNext(si->lp,si->lp_ele); | |||
1142 | id->seq += lpGetInteger(si->lp_ele)lpGetIntegerIfValid(si->lp_ele, ((void*)0)); | |||
1143 | si->lp_ele = lpNext(si->lp,si->lp_ele); | |||
1144 | unsigned char buf[sizeof(streamID)]; | |||
1145 | streamEncodeID(buf,id); | |||
1146 | ||||
1147 | /* The number of entries is here or not depending on the | |||
1148 | * flags. */ | |||
1149 | if (flags & STREAM_ITEM_FLAG_SAMEFIELDS(1<<1)) { | |||
1150 | *numfields = si->master_fields_count; | |||
1151 | } else { | |||
1152 | *numfields = lpGetInteger(si->lp_ele)lpGetIntegerIfValid(si->lp_ele, ((void*)0)); | |||
1153 | si->lp_ele = lpNext(si->lp,si->lp_ele); | |||
1154 | } | |||
1155 | serverAssert(*numfields>=0)((*numfields>=0)?(void)0 : (_serverAssert("*numfields>=0" ,"t_stream.c",1155),__builtin_unreachable())); | |||
1156 | ||||
1157 | /* If current >= start, and the entry is not marked as | |||
1158 | * deleted, emit it. */ | |||
1159 | if (!si->rev) { | |||
1160 | if (memcmp(buf,si->start_key,sizeof(streamID)) >= 0 && | |||
1161 | !(flags & STREAM_ITEM_FLAG_DELETED(1<<0))) | |||
1162 | { | |||
1163 | if (memcmp(buf,si->end_key,sizeof(streamID)) > 0) | |||
1164 | return 0; /* We are already out of range. */ | |||
1165 | si->entry_flags = flags; | |||
1166 | if (flags & STREAM_ITEM_FLAG_SAMEFIELDS(1<<1)) | |||
1167 | si->master_fields_ptr = si->master_fields_start; | |||
1168 | return 1; /* Valid item returned. */ | |||
1169 | } | |||
1170 | } else { | |||
1171 | if (memcmp(buf,si->end_key,sizeof(streamID)) <= 0 && | |||
1172 | !(flags & STREAM_ITEM_FLAG_DELETED(1<<0))) | |||
1173 | { | |||
1174 | if (memcmp(buf,si->start_key,sizeof(streamID)) < 0) | |||
1175 | return 0; /* We are already out of range. */ | |||
1176 | si->entry_flags = flags; | |||
1177 | if (flags & STREAM_ITEM_FLAG_SAMEFIELDS(1<<1)) | |||
1178 | si->master_fields_ptr = si->master_fields_start; | |||
1179 | return 1; /* Valid item returned. */ | |||
1180 | } | |||
1181 | } | |||
1182 | ||||
1183 | /* If we do not emit, we have to discard if we are going | |||
1184 | * forward, or seek the previous entry if we are going | |||
1185 | * backward. */ | |||
1186 | if (!si->rev) { | |||
1187 | int64_t to_discard = (flags & STREAM_ITEM_FLAG_SAMEFIELDS(1<<1)) ? | |||
1188 | *numfields : *numfields*2; | |||
1189 | for (int64_t i = 0; i < to_discard; i++) | |||
1190 | si->lp_ele = lpNext(si->lp,si->lp_ele); | |||
1191 | } else { | |||
1192 | int64_t prev_times = 4; /* flag + id ms + id seq + one more to | |||
1193 | go back to the previous entry "count" | |||
1194 | field. */ | |||
1195 | /* If the entry was not flagged SAMEFIELD we also read the | |||
1196 | * number of fields, so go back one more. */ | |||
1197 | if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS(1<<1))) prev_times++; | |||
1198 | while(prev_times--) si->lp_ele = lpPrev(si->lp,si->lp_ele); | |||
1199 | } | |||
1200 | } | |||
1201 | ||||
1202 | /* End of listpack reached. Try the next/prev radix tree node. */ | |||
1203 | } | |||
1204 | } | |||
1205 | ||||
1206 | /* Get the field and value of the current item we are iterating. This should | |||
1207 | * be called immediately after streamIteratorGetID(), and for each field | |||
1208 | * according to the number of fields returned by streamIteratorGetID(). | |||
1209 | * The function populates the field and value pointers and the corresponding | |||
1210 | * lengths by reference, that are valid until the next iterator call, assuming | |||
1211 | * no one touches the stream meanwhile. */ | |||
1212 | void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen) { | |||
1213 | if (si->entry_flags & STREAM_ITEM_FLAG_SAMEFIELDS(1<<1)) { | |||
1214 | *fieldptr = lpGet(si->master_fields_ptr,fieldlen,si->field_buf); | |||
1215 | si->master_fields_ptr = lpNext(si->lp,si->master_fields_ptr); | |||
1216 | } else { | |||
1217 | *fieldptr = lpGet(si->lp_ele,fieldlen,si->field_buf); | |||
1218 | si->lp_ele = lpNext(si->lp,si->lp_ele); | |||
1219 | } | |||
1220 | *valueptr = lpGet(si->lp_ele,valuelen,si->value_buf); | |||
1221 | si->lp_ele = lpNext(si->lp,si->lp_ele); | |||
1222 | } | |||
1223 | ||||
1224 | /* Remove the current entry from the stream: can be called after the | |||
1225 | * GetID() API or after any GetField() call, however we need to iterate | |||
1226 | * a valid entry while calling this function. Moreover the function | |||
1227 | * requires the entry ID we are currently iterating, that was previously | |||
1228 | * returned by GetID(). | |||
1229 | * | |||
1230 | * Note that after calling this function, next calls to GetField() can't | |||
1231 | * be performed: the entry is now deleted. Instead the iterator will | |||
1232 | * automatically re-seek to the next entry, so the caller should continue | |||
1233 | * with GetID(). */ | |||
1234 | void streamIteratorRemoveEntry(streamIterator *si, streamID *current) { | |||
1235 | unsigned char *lp = si->lp; | |||
1236 | int64_t aux; | |||
1237 | ||||
1238 | /* We do not really delete the entry here. Instead we mark it as | |||
1239 | * deleted flagging it, and also incrementing the count of the | |||
1240 | * deleted entries in the listpack header. | |||
1241 | * | |||
1242 | * We start flagging: */ | |||
1243 | int flags = lpGetInteger(si->lp_flags)lpGetIntegerIfValid(si->lp_flags, ((void*)0)); | |||
1244 | flags |= STREAM_ITEM_FLAG_DELETED(1<<0); | |||
1245 | lp = lpReplaceInteger(lp,&si->lp_flags,flags); | |||
1246 | ||||
1247 | /* Change the valid/deleted entries count in the master entry. */ | |||
1248 | unsigned char *p = lpFirst(lp); | |||
1249 | aux = lpGetInteger(p)lpGetIntegerIfValid(p, ((void*)0)); | |||
1250 | ||||
1251 | if (aux == 1) { | |||
1252 | /* If this is the last element in the listpack, we can remove the whole | |||
1253 | * node. */ | |||
1254 | lpFree(lp); | |||
1255 | raxRemove(si->stream->rax,si->ri.key,si->ri.key_len,NULL((void*)0)); | |||
1256 | } else { | |||
1257 | /* In the base case we alter the counters of valid/deleted entries. */ | |||
1258 | lp = lpReplaceInteger(lp,&p,aux-1); | |||
1259 | p = lpNext(lp,p); /* Seek deleted field. */ | |||
1260 | aux = lpGetInteger(p)lpGetIntegerIfValid(p, ((void*)0)); | |||
1261 | lp = lpReplaceInteger(lp,&p,aux+1); | |||
1262 | ||||
1263 | /* Update the listpack with the new pointer. */ | |||
1264 | if (si->lp != lp) | |||
1265 | raxInsert(si->stream->rax,si->ri.key,si->ri.key_len,lp,NULL((void*)0)); | |||
1266 | } | |||
1267 | ||||
1268 | /* Update the number of entries counter. */ | |||
1269 | si->stream->length--; | |||
1270 | ||||
1271 | /* Re-seek the iterator to fix the now messed up state. */ | |||
1272 | streamID start, end; | |||
1273 | if (si->rev) { | |||
1274 | streamDecodeID(si->start_key,&start); | |||
1275 | end = *current; | |||
1276 | } else { | |||
1277 | start = *current; | |||
1278 | streamDecodeID(si->end_key,&end); | |||
1279 | } | |||
1280 | streamIteratorStop(si); | |||
1281 | streamIteratorStart(si,si->stream,&start,&end,si->rev); | |||
1282 | ||||
1283 | /* TODO: perform a garbage collection here if the ration between | |||
1284 | * deleted and valid goes over a certain limit. */ | |||
1285 | } | |||
1286 | ||||
1287 | /* Stop the stream iterator. The only cleanup we need is to free the rax | |||
1288 | * iterator, since the stream iterator itself is supposed to be stack | |||
1289 | * allocated. */ | |||
1290 | void streamIteratorStop(streamIterator *si) { | |||
1291 | raxStop(&si->ri); | |||
1292 | } | |||
1293 | ||||
1294 | /* Delete the specified item ID from the stream, returning 1 if the item | |||
1295 | * was deleted 0 otherwise (if it does not exist). */ | |||
1296 | int streamDeleteItem(stream *s, streamID *id) { | |||
1297 | int deleted = 0; | |||
1298 | streamIterator si; | |||
1299 | streamIteratorStart(&si,s,id,id,0); | |||
1300 | streamID myid; | |||
1301 | int64_t numfields; | |||
1302 | if (streamIteratorGetID(&si,&myid,&numfields)) { | |||
1303 | streamIteratorRemoveEntry(&si,&myid); | |||
1304 | deleted = 1; | |||
1305 | } | |||
1306 | streamIteratorStop(&si); | |||
1307 | return deleted; | |||
1308 | } | |||
1309 | ||||
1310 | /* Get the last valid (non-tombstone) streamID of 's'. */ | |||
1311 | void streamLastValidID(stream *s, streamID *maxid) | |||
1312 | { | |||
1313 | streamIterator si; | |||
1314 | streamIteratorStart(&si,s,NULL((void*)0),NULL((void*)0),1); | |||
1315 | int64_t numfields; | |||
1316 | streamIteratorGetID(&si,maxid,&numfields); | |||
1317 | streamIteratorStop(&si); | |||
1318 | } | |||
1319 | ||||
1320 | /* Emit a reply in the client output buffer by formatting a Stream ID | |||
1321 | * in the standard <ms>-<seq> format, using the simple string protocol | |||
1322 | * of REPL. */ | |||
1323 | void addReplyStreamID(client *c, streamID *id) { | |||
1324 | sds replyid = sdscatfmt(sdsempty(),"%U-%U",id->ms,id->seq); | |||
1325 | addReplyBulkSds(c,replyid); | |||
1326 | } | |||
1327 | ||||
1328 | void setDeferredReplyStreamID(client *c, void *dr, streamID *id) { | |||
1329 | sds replyid = sdscatfmt(sdsempty(),"%U-%U",id->ms,id->seq); | |||
1330 | setDeferredReplyBulkSds(c, dr, replyid); | |||
1331 | } | |||
1332 | ||||
1333 | /* Similar to the above function, but just creates an object, usually useful | |||
1334 | * for replication purposes to create arguments. */ | |||
1335 | robj *createObjectFromStreamID(streamID *id) { | |||
1336 | return createObject(OBJ_STRING0, sdscatfmt(sdsempty(),"%U-%U", | |||
1337 | id->ms,id->seq)); | |||
1338 | } | |||
1339 | ||||
1340 | /* As a result of an explicit XCLAIM or XREADGROUP command, new entries | |||
1341 | * are created in the pending list of the stream and consumers. We need | |||
1342 | * to propagate this changes in the form of XCLAIM commands. */ | |||
1343 | void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupname, robj *id, streamNACK *nack) { | |||
1344 | /* We need to generate an XCLAIM that will work in a idempotent fashion: | |||
1345 | * | |||
1346 | * XCLAIM <key> <group> <consumer> 0 <id> TIME <milliseconds-unix-time> | |||
1347 | * RETRYCOUNT <count> FORCE JUSTID LASTID <id>. | |||
1348 | * | |||
1349 | * Note that JUSTID is useful in order to avoid that XCLAIM will do | |||
1350 | * useless work in the slave side, trying to fetch the stream item. */ | |||
1351 | robj *argv[14]; | |||
1352 | argv[0] = shared.xclaim; | |||
1353 | argv[1] = key; | |||
1354 | argv[2] = groupname; | |||
1355 | argv[3] = createStringObject(nack->consumer->name,sdslen(nack->consumer->name)); | |||
1356 | argv[4] = shared.integers[0]; | |||
1357 | argv[5] = id; | |||
1358 | argv[6] = shared.time; | |||
1359 | argv[7] = createStringObjectFromLongLong(nack->delivery_time); | |||
1360 | argv[8] = shared.retrycount; | |||
1361 | argv[9] = createStringObjectFromLongLong(nack->delivery_count); | |||
1362 | argv[10] = shared.force; | |||
1363 | argv[11] = shared.justid; | |||
1364 | argv[12] = shared.lastid; | |||
1365 | argv[13] = createObjectFromStreamID(&group->last_id); | |||
1366 | ||||
1367 | /* We use progagate() because this code path is not always called from | |||
1368 | * the command execution context. Moreover this will just alter the | |||
1369 | * consumer group state, and we don't need MULTI/EXEC wrapping because | |||
1370 | * there is no message state cross-message atomicity required. */ | |||
1371 | propagate(server.xclaimCommand,c->db->id,argv,14,PROPAGATE_AOF1|PROPAGATE_REPL2); | |||
1372 | decrRefCount(argv[3]); | |||
1373 | decrRefCount(argv[7]); | |||
1374 | decrRefCount(argv[9]); | |||
1375 | decrRefCount(argv[13]); | |||
1376 | } | |||
1377 | ||||
1378 | /* We need this when we want to propoagate the new last-id of a consumer group | |||
1379 | * that was consumed by XREADGROUP with the NOACK option: in that case we can't | |||
1380 | * propagate the last ID just using the XCLAIM LASTID option, so we emit | |||
1381 | * | |||
1382 | * XGROUP SETID <key> <groupname> <id> | |||
1383 | */ | |||
1384 | void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupname) { | |||
1385 | robj *argv[5]; | |||
1386 | argv[0] = shared.xgroup; | |||
1387 | argv[1] = shared.setid; | |||
1388 | argv[2] = key; | |||
1389 | argv[3] = groupname; | |||
1390 | argv[4] = createObjectFromStreamID(&group->last_id); | |||
1391 | ||||
1392 | /* We use progagate() because this code path is not always called from | |||
1393 | * the command execution context. Moreover this will just alter the | |||
1394 | * consumer group state, and we don't need MULTI/EXEC wrapping because | |||
1395 | * there is no message state cross-message atomicity required. */ | |||
1396 | propagate(server.xgroupCommand,c->db->id,argv,5,PROPAGATE_AOF1|PROPAGATE_REPL2); | |||
1397 | decrRefCount(argv[4]); | |||
1398 | } | |||
1399 | ||||
1400 | /* We need this when we want to propagate creation of consumer that was created | |||
1401 | * by XREADGROUP with the NOACK option. In that case, the only way to create | |||
1402 | * the consumer at the replica is by using XGROUP CREATECONSUMER (see issue #7140) | |||
1403 | * | |||
1404 | * XGROUP CREATECONSUMER <key> <groupname> <consumername> | |||
1405 | */ | |||
1406 | void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds consumername) { | |||
1407 | robj *argv[5]; | |||
1408 | argv[0] = shared.xgroup; | |||
1409 | argv[1] = shared.createconsumer; | |||
1410 | argv[2] = key; | |||
1411 | argv[3] = groupname; | |||
1412 | argv[4] = createObject(OBJ_STRING0,sdsdup(consumername)); | |||
1413 | ||||
1414 | /* We use progagate() because this code path is not always called from | |||
1415 | * the command execution context. Moreover this will just alter the | |||
1416 | * consumer group state, and we don't need MULTI/EXEC wrapping because | |||
1417 | * there is no message state cross-message atomicity required. */ | |||
1418 | propagate(server.xgroupCommand,c->db->id,argv,5,PROPAGATE_AOF1|PROPAGATE_REPL2); | |||
1419 | decrRefCount(argv[4]); | |||
1420 | } | |||
1421 | ||||
1422 | /* Send the stream items in the specified range to the client 'c'. The range | |||
1423 | * the client will receive is between start and end inclusive, if 'count' is | |||
1424 | * non zero, no more than 'count' elements are sent. | |||
1425 | * | |||
1426 | * The 'end' pointer can be NULL to mean that we want all the elements from | |||
1427 | * 'start' till the end of the stream. If 'rev' is non zero, elements are | |||
1428 | * produced in reversed order from end to start. | |||
1429 | * | |||
1430 | * The function returns the number of entries emitted. | |||
1431 | * | |||
1432 | * If group and consumer are not NULL, the function performs additional work: | |||
1433 | * 1. It updates the last delivered ID in the group in case we are | |||
1434 | * sending IDs greater than the current last ID. | |||
1435 | * 2. If the requested IDs are already assigned to some other consumer, the | |||
1436 | * function will not return it to the client. | |||
1437 | * 3. An entry in the pending list will be created for every entry delivered | |||
1438 | * for the first time to this consumer. | |||
1439 | * | |||
1440 | * The behavior may be modified passing non-zero flags: | |||
1441 | * | |||
1442 | * STREAM_RWR_NOACK: Do not create PEL entries, that is, the point "3" above | |||
1443 | * is not performed. | |||
1444 | * STREAM_RWR_RAWENTRIES: Do not emit array boundaries, but just the entries, | |||
1445 | * and return the number of entries emitted as usually. | |||
1446 | * This is used when the function is just used in order | |||
1447 | * to emit data and there is some higher level logic. | |||
1448 | * | |||
1449 | * The final argument 'spi' (stream propagation info pointer) is a structure | |||
1450 | * filled with information needed to propagate the command execution to AOF | |||
1451 | * and slaves, in the case a consumer group was passed: we need to generate | |||
1452 | * XCLAIM commands to create the pending list into AOF/slaves in that case. | |||
1453 | * | |||
1454 | * If 'spi' is set to NULL no propagation will happen even if the group was | |||
1455 | * given, but currently such a feature is never used by the code base that | |||
1456 | * will always pass 'spi' and propagate when a group is passed. | |||
1457 | * | |||
1458 | * Note that this function is recursive in certain cases. When it's called | |||
1459 | * with a non NULL group and consumer argument, it may call | |||
1460 | * streamReplyWithRangeFromConsumerPEL() in order to get entries from the | |||
1461 | * consumer pending entries list. However such a function will then call | |||
1462 | * streamReplyWithRange() in order to emit single entries (found in the | |||
1463 | * PEL by ID) to the client. This is the use case for the STREAM_RWR_RAWENTRIES | |||
1464 | * flag. | |||
1465 | */ | |||
1466 | #define STREAM_RWR_NOACK(1<<0) (1<<0) /* Do not create entries in the PEL. */ | |||
1467 | #define STREAM_RWR_RAWENTRIES(1<<1) (1<<1) /* Do not emit protocol for array | |||
1468 | boundaries, just the entries. */ | |||
1469 | #define STREAM_RWR_HISTORY(1<<2) (1<<2) /* Only serve consumer local PEL. */ | |||
1470 | size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi) { | |||
1471 | void *arraylen_ptr = NULL((void*)0); | |||
1472 | size_t arraylen = 0; | |||
1473 | streamIterator si; | |||
1474 | int64_t numfields; | |||
1475 | streamID id; | |||
1476 | int propagate_last_id = 0; | |||
1477 | int noack = flags & STREAM_RWR_NOACK(1<<0); | |||
1478 | ||||
1479 | /* If the client is asking for some history, we serve it using a | |||
1480 | * different function, so that we return entries *solely* from its | |||
1481 | * own PEL. This ensures each consumer will always and only see | |||
1482 | * the history of messages delivered to it and not yet confirmed | |||
1483 | * as delivered. */ | |||
1484 | if (group && (flags & STREAM_RWR_HISTORY(1<<2))) { | |||
1485 | return streamReplyWithRangeFromConsumerPEL(c,s,start,end,count, | |||
1486 | consumer); | |||
1487 | } | |||
1488 | ||||
1489 | if (!(flags & STREAM_RWR_RAWENTRIES(1<<1))) | |||
1490 | arraylen_ptr = addReplyDeferredLen(c); | |||
1491 | streamIteratorStart(&si,s,start,end,rev); | |||
1492 | while(streamIteratorGetID(&si,&id,&numfields)) { | |||
1493 | /* Update the group last_id if needed. */ | |||
1494 | if (group && streamCompareID(&id,&group->last_id) > 0) { | |||
1495 | group->last_id = id; | |||
1496 | /* Group last ID should be propagated only if NOACK was | |||
1497 | * specified, otherwise the last id will be included | |||
1498 | * in the propagation of XCLAIM itself. */ | |||
1499 | if (noack) propagate_last_id = 1; | |||
1500 | } | |||
1501 | ||||
1502 | /* Emit a two elements array for each item. The first is | |||
1503 | * the ID, the second is an array of field-value pairs. */ | |||
1504 | addReplyArrayLen(c,2); | |||
1505 | addReplyStreamID(c,&id); | |||
1506 | ||||
1507 | addReplyArrayLen(c,numfields*2); | |||
1508 | ||||
1509 | /* Emit the field-value pairs. */ | |||
1510 | while(numfields--) { | |||
1511 | unsigned char *key, *value; | |||
1512 | int64_t key_len, value_len; | |||
1513 | streamIteratorGetField(&si,&key,&value,&key_len,&value_len); | |||
1514 | addReplyBulkCBuffer(c,key,key_len); | |||
1515 | addReplyBulkCBuffer(c,value,value_len); | |||
1516 | } | |||
1517 | ||||
1518 | /* If a group is passed, we need to create an entry in the | |||
1519 | * PEL (pending entries list) of this group *and* this consumer. | |||
1520 | * | |||
1521 | * Note that we cannot be sure about the fact the message is not | |||
1522 | * already owned by another consumer, because the admin is able | |||
1523 | * to change the consumer group last delivered ID using the | |||
1524 | * XGROUP SETID command. So if we find that there is already | |||
1525 | * a NACK for the entry, we need to associate it to the new | |||
1526 | * consumer. */ | |||
1527 | if (group && !noack) { | |||
1528 | unsigned char buf[sizeof(streamID)]; | |||
1529 | streamEncodeID(buf,&id); | |||
1530 | ||||
1531 | /* Try to add a new NACK. Most of the time this will work and | |||
1532 | * will not require extra lookups. We'll fix the problem later | |||
1533 | * if we find that there is already a entry for this ID. */ | |||
1534 | streamNACK *nack = streamCreateNACK(consumer); | |||
1535 | int group_inserted = | |||
1536 | raxTryInsert(group->pel,buf,sizeof(buf),nack,NULL((void*)0)); | |||
1537 | int consumer_inserted = | |||
1538 | raxTryInsert(consumer->pel,buf,sizeof(buf),nack,NULL((void*)0)); | |||
1539 | ||||
1540 | /* Now we can check if the entry was already busy, and | |||
1541 | * in that case reassign the entry to the new consumer, | |||
1542 | * or update it if the consumer is the same as before. */ | |||
1543 | if (group_inserted == 0) { | |||
1544 | streamFreeNACK(nack); | |||
1545 | nack = raxFind(group->pel,buf,sizeof(buf)); | |||
1546 | serverAssert(nack != raxNotFound)((nack != raxNotFound)?(void)0 : (_serverAssert("nack != raxNotFound" ,"t_stream.c",1546),__builtin_unreachable())); | |||
1547 | raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL((void*)0)); | |||
1548 | /* Update the consumer and NACK metadata. */ | |||
1549 | nack->consumer = consumer; | |||
1550 | nack->delivery_time = mstime(); | |||
1551 | nack->delivery_count = 1; | |||
1552 | /* Add the entry in the new consumer local PEL. */ | |||
1553 | raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL((void*)0)); | |||
1554 | } else if (group_inserted == 1 && consumer_inserted == 0) { | |||
1555 | serverPanic("NACK half-created. Should not be possible.")_serverPanic("t_stream.c",1555,"NACK half-created. Should not be possible." ),__builtin_unreachable(); | |||
1556 | } | |||
1557 | ||||
1558 | /* Propagate as XCLAIM. */ | |||
1559 | if (spi) { | |||
1560 | robj *idarg = createObjectFromStreamID(&id); | |||
1561 | streamPropagateXCLAIM(c,spi->keyname,group,spi->groupname,idarg,nack); | |||
1562 | decrRefCount(idarg); | |||
1563 | } | |||
1564 | } | |||
1565 | ||||
1566 | arraylen++; | |||
1567 | if (count && count == arraylen) break; | |||
1568 | } | |||
1569 | ||||
1570 | if (spi && propagate_last_id) | |||
1571 | streamPropagateGroupID(c,spi->keyname,group,spi->groupname); | |||
1572 | ||||
1573 | streamIteratorStop(&si); | |||
1574 | if (arraylen_ptr) setDeferredArrayLen(c,arraylen_ptr,arraylen); | |||
1575 | return arraylen; | |||
1576 | } | |||
1577 | ||||
1578 | /* This is an helper function for streamReplyWithRange() when called with | |||
1579 | * group and consumer arguments, but with a range that is referring to already | |||
1580 | * delivered messages. In this case we just emit messages that are already | |||
1581 | * in the history of the consumer, fetching the IDs from its PEL. | |||
1582 | * | |||
1583 | * Note that this function does not have a 'rev' argument because it's not | |||
1584 | * possible to iterate in reverse using a group. Basically this function | |||
1585 | * is only called as a result of the XREADGROUP command. | |||
1586 | * | |||
1587 | * This function is more expensive because it needs to inspect the PEL and then | |||
1588 | * seek into the radix tree of the messages in order to emit the full message | |||
1589 | * to the client. However clients only reach this code path when they are | |||
1590 | * fetching the history of already retrieved messages, which is rare. */ | |||
1591 | size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer) { | |||
1592 | raxIterator ri; | |||
1593 | unsigned char startkey[sizeof(streamID)]; | |||
1594 | unsigned char endkey[sizeof(streamID)]; | |||
1595 | streamEncodeID(startkey,start); | |||
1596 | if (end) streamEncodeID(endkey,end); | |||
1597 | ||||
1598 | size_t arraylen = 0; | |||
1599 | void *arraylen_ptr = addReplyDeferredLen(c); | |||
1600 | raxStart(&ri,consumer->pel); | |||
1601 | raxSeek(&ri,">=",startkey,sizeof(startkey)); | |||
1602 | while(raxNext(&ri) && (!count || arraylen < count)) { | |||
1603 | if (end && memcmp(ri.key,end,ri.key_len) > 0) break; | |||
1604 | streamID thisid; | |||
1605 | streamDecodeID(ri.key,&thisid); | |||
1606 | if (streamReplyWithRange(c,s,&thisid,&thisid,1,0,NULL((void*)0),NULL((void*)0), | |||
1607 | STREAM_RWR_RAWENTRIES(1<<1),NULL((void*)0)) == 0) | |||
1608 | { | |||
1609 | /* Note that we may have a not acknowledged entry in the PEL | |||
1610 | * about a message that's no longer here because was removed | |||
1611 | * by the user by other means. In that case we signal it emitting | |||
1612 | * the ID but then a NULL entry for the fields. */ | |||
1613 | addReplyArrayLen(c,2); | |||
1614 | addReplyStreamID(c,&thisid); | |||
1615 | addReplyNullArray(c); | |||
1616 | } else { | |||
1617 | streamNACK *nack = ri.data; | |||
1618 | nack->delivery_time = mstime(); | |||
1619 | nack->delivery_count++; | |||
1620 | } | |||
1621 | arraylen++; | |||
1622 | } | |||
1623 | raxStop(&ri); | |||
1624 | setDeferredArrayLen(c,arraylen_ptr,arraylen); | |||
1625 | return arraylen; | |||
1626 | } | |||
1627 | ||||
1628 | /* ----------------------------------------------------------------------- | |||
1629 | * Stream commands implementation | |||
1630 | * ----------------------------------------------------------------------- */ | |||
1631 | ||||
1632 | /* Look the stream at 'key' and return the corresponding stream object. | |||
1633 | * The function creates a key setting it to an empty stream if needed. */ | |||
1634 | robj *streamTypeLookupWriteOrCreate(client *c, robj *key, int no_create) { | |||
1635 | robj *o = lookupKeyWrite(c->db,key); | |||
1636 | if (checkType(c,o,OBJ_STREAM6)) return NULL((void*)0); | |||
1637 | if (o == NULL((void*)0)) { | |||
1638 | if (no_create) { | |||
1639 | addReplyNull(c); | |||
1640 | return NULL((void*)0); | |||
1641 | } | |||
1642 | o = createStreamObject(); | |||
1643 | dbAdd(c->db,key,o); | |||
1644 | } | |||
1645 | return o; | |||
1646 | } | |||
1647 | ||||
1648 | /* Parse a stream ID in the format given by clients to Redis, that is | |||
1649 | * <ms>-<seq>, and converts it into a streamID structure. If | |||
1650 | * the specified ID is invalid C_ERR is returned and an error is reported | |||
1651 | * to the client, otherwise C_OK is returned. The ID may be in incomplete | |||
1652 | * form, just stating the milliseconds time part of the stream. In such a case | |||
1653 | * the missing part is set according to the value of 'missing_seq' parameter. | |||
1654 | * | |||
1655 | * The IDs "-" and "+" specify respectively the minimum and maximum IDs | |||
1656 | * that can be represented. If 'strict' is set to 1, "-" and "+" will be | |||
1657 | * treated as an invalid ID. | |||
1658 | * | |||
1659 | * If 'c' is set to NULL, no reply is sent to the client. */ | |||
1660 | int streamGenericParseIDOrReply(client *c, const robj *o, streamID *id, uint64_t missing_seq, int strict) { | |||
1661 | char buf[128]; | |||
1662 | if (sdslen(o->ptr) > sizeof(buf)-1) goto invalid; | |||
1663 | memcpy(buf,o->ptr,sdslen(o->ptr)+1); | |||
1664 | ||||
1665 | if (strict && (buf[0] == '-' || buf[0] == '+') && buf[1] == '\0') | |||
1666 | goto invalid; | |||
1667 | ||||
1668 | /* Handle the "-" and "+" special cases. */ | |||
1669 | if (buf[0] == '-' && buf[1] == '\0') { | |||
1670 | id->ms = 0; | |||
1671 | id->seq = 0; | |||
1672 | return C_OK0; | |||
1673 | } else if (buf[0] == '+' && buf[1] == '\0') { | |||
1674 | id->ms = UINT64_MAX(18446744073709551615UL); | |||
1675 | id->seq = UINT64_MAX(18446744073709551615UL); | |||
1676 | return C_OK0; | |||
1677 | } | |||
1678 | ||||
1679 | /* Parse <ms>-<seq> form. */ | |||
1680 | char *dot = strchr(buf,'-'); | |||
1681 | if (dot) *dot = '\0'; | |||
1682 | unsigned long long ms, seq; | |||
1683 | if (string2ull(buf,&ms) == 0) goto invalid; | |||
1684 | if (dot && string2ull(dot+1,&seq) == 0) goto invalid; | |||
1685 | if (!dot) seq = missing_seq; | |||
1686 | id->ms = ms; | |||
1687 | id->seq = seq; | |||
1688 | return C_OK0; | |||
1689 | ||||
1690 | invalid: | |||
1691 | if (c) addReplyError(c,"Invalid stream ID specified as stream " | |||
1692 | "command argument"); | |||
1693 | return C_ERR-1; | |||
1694 | } | |||
1695 | ||||
1696 | /* Wrapper for streamGenericParseIDOrReply() used by module API. */ | |||
1697 | int streamParseID(const robj *o, streamID *id) { | |||
1698 | return streamGenericParseIDOrReply(NULL((void*)0), o, id, 0, 0); | |||
1699 | } | |||
1700 | ||||
1701 | /* Wrapper for streamGenericParseIDOrReply() with 'strict' argument set to | |||
1702 | * 0, to be used when - and + are acceptable IDs. */ | |||
1703 | int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) { | |||
1704 | return streamGenericParseIDOrReply(c,o,id,missing_seq,0); | |||
1705 | } | |||
1706 | ||||
1707 | /* Wrapper for streamGenericParseIDOrReply() with 'strict' argument set to | |||
1708 | * 1, to be used when we want to return an error if the special IDs + or - | |||
1709 | * are provided. */ | |||
1710 | int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) { | |||
1711 | return streamGenericParseIDOrReply(c,o,id,missing_seq,1); | |||
1712 | } | |||
1713 | ||||
1714 | /* Helper for parsing a stream ID that is a range query interval. When the | |||
1715 | * exclude argument is NULL, streamParseIDOrReply() is called and the interval | |||
1716 | * is treated as close (inclusive). Otherwise, the exclude argument is set if | |||
1717 | * the interval is open (the "(" prefix) and streamParseStrictIDOrReply() is | |||
1718 | * called in that case. | |||
1719 | */ | |||
1720 | int streamParseIntervalIDOrReply(client *c, robj *o, streamID *id, int *exclude, uint64_t missing_seq) { | |||
1721 | char *p = o->ptr; | |||
1722 | size_t len = sdslen(p); | |||
1723 | int invalid = 0; | |||
1724 | ||||
1725 | if (exclude != NULL((void*)0)) *exclude = (len > 1 && p[0] == '('); | |||
1726 | if (exclude != NULL((void*)0) && *exclude) { | |||
1727 | robj *t = createStringObject(p+1,len-1); | |||
1728 | invalid = (streamParseStrictIDOrReply(c,t,id,missing_seq) == C_ERR-1); | |||
1729 | decrRefCount(t); | |||
1730 | } else | |||
1731 | invalid = (streamParseIDOrReply(c,o,id,missing_seq) == C_ERR-1); | |||
1732 | if (invalid) | |||
1733 | return C_ERR-1; | |||
1734 | return C_OK0; | |||
1735 | } | |||
1736 | ||||
1737 | void streamRewriteApproxSpecifier(client *c, int idx) { | |||
1738 | rewriteClientCommandArgument(c,idx,shared.special_equals); | |||
1739 | } | |||
1740 | ||||
1741 | /* We propagate MAXLEN/MINID ~ <count> as MAXLEN/MINID = <resulting-len-of-stream> | |||
1742 | * otherwise trimming is no longer deterministic on replicas / AOF. */ | |||
1743 | void streamRewriteTrimArgument(client *c, stream *s, int trim_strategy, int idx) { | |||
1744 | robj *arg; | |||
1745 | if (trim_strategy == TRIM_STRATEGY_MAXLEN1) { | |||
1746 | arg = createStringObjectFromLongLong(s->length); | |||
1747 | } else { | |||
1748 | streamID first_id; | |||
1749 | streamGetEdgeID(s, 1, &first_id); | |||
1750 | arg = createObjectFromStreamID(&first_id); | |||
1751 | } | |||
1752 | ||||
1753 | rewriteClientCommandArgument(c,idx,arg); | |||
1754 | decrRefCount(arg); | |||
1755 | } | |||
1756 | ||||
1757 | /* XADD key [(MAXLEN [~|=] <count> | MINID [~|=] <id>) [LIMIT <entries>]] [NOMKSTREAM] <ID or *> [field value] [field value] ... */ | |||
1758 | void xaddCommand(client *c) { | |||
1759 | /* Parse options. */ | |||
1760 | streamAddTrimArgs parsed_args; | |||
1761 | int idpos = streamParseAddOrTrimArgsOrReply(c, &parsed_args, 1); | |||
1762 | if (idpos < 0) | |||
1763 | return; /* streamParseAddOrTrimArgsOrReply already replied. */ | |||
1764 | int field_pos = idpos+1; /* The ID is always one argument before the first field */ | |||
1765 | ||||
1766 | /* Check arity. */ | |||
1767 | if ((c->argc - field_pos) < 2 || ((c->argc-field_pos) % 2) == 1) { | |||
1768 | addReplyError(c,"wrong number of arguments for XADD"); | |||
1769 | return; | |||
1770 | } | |||
1771 | ||||
1772 | /* Return ASAP if minimal ID (0-0) was given so we avoid possibly creating | |||
1773 | * a new stream and have streamAppendItem fail, leaving an empty key in the | |||
1774 | * database. */ | |||
1775 | if (parsed_args.id_given && | |||
1776 | parsed_args.id.ms == 0 && parsed_args.id.seq == 0) | |||
1777 | { | |||
1778 | addReplyError(c,"The ID specified in XADD must be greater than 0-0"); | |||
1779 | return; | |||
1780 | } | |||
1781 | ||||
1782 | /* Lookup the stream at key. */ | |||
1783 | robj *o; | |||
1784 | stream *s; | |||
1785 | if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1],parsed_args.no_mkstream)) == NULL((void*)0)) return; | |||
1786 | s = o->ptr; | |||
1787 | ||||
1788 | /* Return ASAP if the stream has reached the last possible ID */ | |||
1789 | if (s->last_id.ms == UINT64_MAX(18446744073709551615UL) && s->last_id.seq == UINT64_MAX(18446744073709551615UL)) { | |||
1790 | addReplyError(c,"The stream has exhausted the last possible ID, " | |||
1791 | "unable to add more items"); | |||
1792 | return; | |||
1793 | } | |||
1794 | ||||
1795 | /* Append using the low level function and return the ID. */ | |||
1796 | streamID id; | |||
1797 | if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2, | |||
1798 | &id, parsed_args.id_given ? &parsed_args.id : NULL((void*)0)) | |||
1799 | == C_ERR-1) | |||
1800 | { | |||
1801 | addReplyError(c,"The ID specified in XADD is equal or smaller than the " | |||
1802 | "target stream top item"); | |||
1803 | return; | |||
1804 | } | |||
1805 | addReplyStreamID(c,&id); | |||
1806 | ||||
1807 | signalModifiedKey(c,c->db,c->argv[1]); | |||
1808 | notifyKeyspaceEvent(NOTIFY_STREAM(1<<10),"xadd",c->argv[1],c->db->id); | |||
1809 | server.dirty++; | |||
1810 | ||||
1811 | /* Trim if needed. */ | |||
1812 | if (parsed_args.trim_strategy != TRIM_STRATEGY_NONE0) { | |||
1813 | if (streamTrim(s, &parsed_args)) { | |||
1814 | notifyKeyspaceEvent(NOTIFY_STREAM(1<<10),"xtrim",c->argv[1],c->db->id); | |||
1815 | } | |||
1816 | if (parsed_args.approx_trim) { | |||
1817 | /* In case our trimming was limited (by LIMIT or by ~) we must | |||
1818 | * re-write the relevant trim argument to make sure there will be | |||
1819 | * no inconsistencies in AOF loading or in the replica. | |||
1820 | * It's enough to check only args->approx because there is no | |||
1821 | * way LIMIT is given without the ~ option. */ | |||
1822 | streamRewriteApproxSpecifier(c,parsed_args.trim_strategy_arg_idx-1); | |||
1823 | streamRewriteTrimArgument(c,s,parsed_args.trim_strategy,parsed_args.trim_strategy_arg_idx); | |||
1824 | } | |||
1825 | } | |||
1826 | ||||
1827 | /* Let's rewrite the ID argument with the one actually generated for | |||
1828 | * AOF/replication propagation. */ | |||
1829 | robj *idarg = createObjectFromStreamID(&id); | |||
1830 | rewriteClientCommandArgument(c,idpos,idarg); | |||
1831 | decrRefCount(idarg); | |||
1832 | ||||
1833 | /* We need to signal to blocked clients that there is new data on this | |||
1834 | * stream. */ | |||
1835 | signalKeyAsReady(c->db, c->argv[1], OBJ_STREAM6); | |||
1836 | } | |||
1837 | ||||
1838 | /* XRANGE/XREVRANGE actual implementation. | |||
1839 | * The 'start' and 'end' IDs are parsed as follows: | |||
1840 | * Incomplete 'start' has its sequence set to 0, and 'end' to UINT64_MAX. | |||
1841 | * "-" and "+"" mean the minimal and maximal ID values, respectively. | |||
1842 | * The "(" prefix means an open (exclusive) range, so XRANGE stream (1-0 (2-0 | |||
1843 | * will match anything from 1-1 and 1-UINT64_MAX. | |||
1844 | */ | |||
1845 | void xrangeGenericCommand(client *c, int rev) { | |||
1846 | robj *o; | |||
1847 | stream *s; | |||
1848 | streamID startid, endid; | |||
1849 | long long count = -1; | |||
1850 | robj *startarg = rev ? c->argv[3] : c->argv[2]; | |||
1851 | robj *endarg = rev ? c->argv[2] : c->argv[3]; | |||
1852 | int startex = 0, endex = 0; | |||
1853 | ||||
1854 | /* Parse start and end IDs. */ | |||
1855 | if (streamParseIntervalIDOrReply(c,startarg,&startid,&startex,0) != C_OK0) | |||
1856 | return; | |||
1857 | if (startex && streamIncrID(&startid) != C_OK0) { | |||
1858 | addReplyError(c,"invalid start ID for the interval"); | |||
1859 | return; | |||
1860 | } | |||
1861 | if (streamParseIntervalIDOrReply(c,endarg,&endid,&endex,UINT64_MAX(18446744073709551615UL)) != C_OK0) | |||
1862 | return; | |||
1863 | if (endex && streamDecrID(&endid) != C_OK0) { | |||
1864 | addReplyError(c,"invalid end ID for the interval"); | |||
1865 | return; | |||
1866 | } | |||
1867 | ||||
1868 | /* Parse the COUNT option if any. */ | |||
1869 | if (c->argc > 4) { | |||
1870 | for (int j = 4; j < c->argc; j++) { | |||
1871 | int additional = c->argc-j-1; | |||
1872 | if (strcasecmp(c->argv[j]->ptr,"COUNT") == 0 && additional >= 1) { | |||
1873 | if (getLongLongFromObjectOrReply(c,c->argv[j+1],&count,NULL((void*)0)) | |||
1874 | != C_OK0) return; | |||
1875 | if (count < 0) count = 0; | |||
1876 | j++; /* Consume additional arg. */ | |||
1877 | } else { | |||
1878 | addReplyErrorObject(c,shared.syntaxerr); | |||
1879 | return; | |||
1880 | } | |||
1881 | } | |||
1882 | } | |||
1883 | ||||
1884 | /* Return the specified range to the user. */ | |||
1885 | if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyarray)) == NULL((void*)0) || | |||
1886 | checkType(c,o,OBJ_STREAM6)) return; | |||
1887 | ||||
1888 | s = o->ptr; | |||
1889 | ||||
1890 | if (count == 0) { | |||
1891 | addReplyNullArray(c); | |||
1892 | } else { | |||
1893 | if (count == -1) count = 0; | |||
1894 | streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL((void*)0),NULL((void*)0),0,NULL((void*)0)); | |||
1895 | } | |||
1896 | } | |||
1897 | ||||
1898 | /* XRANGE key start end [COUNT <n>] */ | |||
1899 | void xrangeCommand(client *c) { | |||
1900 | xrangeGenericCommand(c,0); | |||
1901 | } | |||
1902 | ||||
1903 | /* XREVRANGE key end start [COUNT <n>] */ | |||
1904 | void xrevrangeCommand(client *c) { | |||
1905 | xrangeGenericCommand(c,1); | |||
1906 | } | |||
1907 | ||||
1908 | /* XLEN */ | |||
1909 | void xlenCommand(client *c) { | |||
1910 | robj *o; | |||
1911 | if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL((void*)0) | |||
1912 | || checkType(c,o,OBJ_STREAM6)) return; | |||
1913 | stream *s = o->ptr; | |||
1914 | addReplyLongLong(c,s->length); | |||
1915 | } | |||
1916 | ||||
1917 | /* XREAD [BLOCK <milliseconds>] [COUNT <count>] STREAMS key_1 key_2 ... key_N | |||
1918 | * ID_1 ID_2 ... ID_N | |||
1919 | * | |||
1920 | * This function also implements the XREAD-GROUP command, which is like XREAD | |||
1921 | * but accepting the [GROUP group-name consumer-name] additional option. | |||
1922 | * This is useful because while XREAD is a read command and can be called | |||
1923 | * on slaves, XREAD-GROUP is not. */ | |||
1924 | #define XREAD_BLOCKED_DEFAULT_COUNT1000 1000 | |||
1925 | void xreadCommand(client *c) { | |||
1926 | long long timeout = -1; /* -1 means, no BLOCK argument given. */ | |||
1927 | long long count = 0; | |||
1928 | int streams_count = 0; | |||
1929 | int streams_arg = 0; | |||
1930 | int noack = 0; /* True if NOACK option was specified. */ | |||
1931 | streamID static_ids[STREAMID_STATIC_VECTOR_LEN8]; | |||
1932 | streamID *ids = static_ids; | |||
1933 | streamCG **groups = NULL((void*)0); | |||
1934 | int xreadgroup = sdslen(c->argv[0]->ptr) == 10; /* XREAD or XREADGROUP? */ | |||
1935 | robj *groupname = NULL((void*)0); | |||
1936 | robj *consumername = NULL((void*)0); | |||
1937 | ||||
1938 | /* Parse arguments. */ | |||
1939 | for (int i = 1; i < c->argc; i++) { | |||
1940 | int moreargs = c->argc-i-1; | |||
1941 | char *o = c->argv[i]->ptr; | |||
1942 | if (!strcasecmp(o,"BLOCK") && moreargs) { | |||
1943 | if (c->flags & CLIENT_LUA(1<<8)) { | |||
1944 | /* | |||
1945 | * Although the CLIENT_DENY_BLOCKING flag should protect from blocking the client | |||
1946 | * on Lua/MULTI/RM_Call we want special treatment for Lua to keep backword compatibility. | |||
1947 | * There is no sense to use BLOCK option within Lua. */ | |||
1948 | addReplyErrorFormat(c, "%s command is not allowed with BLOCK option from scripts", (char *)c->argv[0]->ptr); | |||
1949 | return; | |||
1950 | } | |||
1951 | i++; | |||
1952 | if (getTimeoutFromObjectOrReply(c,c->argv[i],&timeout, | |||
1953 | UNIT_MILLISECONDS1) != C_OK0) return; | |||
1954 | } else if (!strcasecmp(o,"COUNT") && moreargs) { | |||
1955 | i++; | |||
1956 | if (getLongLongFromObjectOrReply(c,c->argv[i],&count,NULL((void*)0)) != C_OK0) | |||
1957 | return; | |||
1958 | if (count < 0) count = 0; | |||
1959 | } else if (!strcasecmp(o,"STREAMS") && moreargs) { | |||
1960 | streams_arg = i+1; | |||
1961 | streams_count = (c->argc-streams_arg); | |||
1962 | if ((streams_count % 2) != 0) { | |||
1963 | addReplyError(c,"Unbalanced XREAD list of streams: " | |||
1964 | "for each stream key an ID or '$' must be " | |||
1965 | "specified."); | |||
1966 | return; | |||
1967 | } | |||
1968 | streams_count /= 2; /* We have two arguments for each stream. */ | |||
1969 | break; | |||
1970 | } else if (!strcasecmp(o,"GROUP") && moreargs >= 2) { | |||
1971 | if (!xreadgroup) { | |||
1972 | addReplyError(c,"The GROUP option is only supported by " | |||
1973 | "XREADGROUP. You called XREAD instead."); | |||
1974 | return; | |||
1975 | } | |||
1976 | groupname = c->argv[i+1]; | |||
1977 | consumername = c->argv[i+2]; | |||
1978 | i += 2; | |||
1979 | } else if (!strcasecmp(o,"NOACK")) { | |||
1980 | if (!xreadgroup) { | |||
1981 | addReplyError(c,"The NOACK option is only supported by " | |||
1982 | "XREADGROUP. You called XREAD instead."); | |||
1983 | return; | |||
1984 | } | |||
1985 | noack = 1; | |||
1986 | } else { | |||
1987 | addReplyErrorObject(c,shared.syntaxerr); | |||
1988 | return; | |||
1989 | } | |||
1990 | } | |||
1991 | ||||
1992 | /* STREAMS option is mandatory. */ | |||
1993 | if (streams_arg == 0) { | |||
1994 | addReplyErrorObject(c,shared.syntaxerr); | |||
1995 | return; | |||
1996 | } | |||
1997 | ||||
1998 | /* If the user specified XREADGROUP then it must also | |||
1999 | * provide the GROUP option. */ | |||
2000 | if (xreadgroup && groupname == NULL((void*)0)) { | |||
2001 | addReplyError(c,"Missing GROUP option for XREADGROUP"); | |||
2002 | return; | |||
2003 | } | |||
2004 | ||||
2005 | /* Parse the IDs and resolve the group name. */ | |||
2006 | if (streams_count > STREAMID_STATIC_VECTOR_LEN8) | |||
2007 | ids = zmalloc(sizeof(streamID)*streams_count); | |||
2008 | if (groupname) groups = zmalloc(sizeof(streamCG*)*streams_count); | |||
2009 | ||||
2010 | for (int i = streams_arg + streams_count; i < c->argc; i++) { | |||
2011 | /* Specifying "$" as last-known-id means that the client wants to be | |||
2012 | * served with just the messages that will arrive into the stream | |||
2013 | * starting from now. */ | |||
2014 | int id_idx = i - streams_arg - streams_count; | |||
2015 | robj *key = c->argv[i-streams_count]; | |||
2016 | robj *o = lookupKeyRead(c->db,key); | |||
2017 | if (checkType(c,o,OBJ_STREAM6)) goto cleanup; | |||
2018 | streamCG *group = NULL((void*)0); | |||
2019 | ||||
2020 | /* If a group was specified, than we need to be sure that the | |||
2021 | * key and group actually exist. */ | |||
2022 | if (groupname) { | |||
2023 | if (o == NULL((void*)0) || | |||
2024 | (group = streamLookupCG(o->ptr,groupname->ptr)) == NULL((void*)0)) | |||
2025 | { | |||
2026 | addReplyErrorFormat(c, "-NOGROUP No such key '%s' or consumer " | |||
2027 | "group '%s' in XREADGROUP with GROUP " | |||
2028 | "option", | |||
2029 | (char*)key->ptr,(char*)groupname->ptr); | |||
2030 | goto cleanup; | |||
2031 | } | |||
2032 | groups[id_idx] = group; | |||
2033 | } | |||
2034 | ||||
2035 | if (strcmp(c->argv[i]->ptr,"$") == 0) { | |||
2036 | if (xreadgroup) { | |||
2037 | addReplyError(c,"The $ ID is meaningless in the context of " | |||
2038 | "XREADGROUP: you want to read the history of " | |||
2039 | "this consumer by specifying a proper ID, or " | |||
2040 | "use the > ID to get new messages. The $ ID would " | |||
2041 | "just return an empty result set."); | |||
2042 | goto cleanup; | |||
2043 | } | |||
2044 | if (o) { | |||
2045 | stream *s = o->ptr; | |||
2046 | ids[id_idx] = s->last_id; | |||
2047 | } else { | |||
2048 | ids[id_idx].ms = 0; | |||
2049 | ids[id_idx].seq = 0; | |||
2050 | } | |||
2051 | continue; | |||
2052 | } else if (strcmp(c->argv[i]->ptr,">") == 0) { | |||
2053 | if (!xreadgroup) { | |||
2054 | addReplyError(c,"The > ID can be specified only when calling " | |||
2055 | "XREADGROUP using the GROUP <group> " | |||
2056 | "<consumer> option."); | |||
2057 | goto cleanup; | |||
2058 | } | |||
2059 | /* We use just the maximum ID to signal this is a ">" ID, anyway | |||
2060 | * the code handling the blocking clients will have to update the | |||
2061 | * ID later in order to match the changing consumer group last ID. */ | |||
2062 | ids[id_idx].ms = UINT64_MAX(18446744073709551615UL); | |||
2063 | ids[id_idx].seq = UINT64_MAX(18446744073709551615UL); | |||
2064 | continue; | |||
2065 | } | |||
2066 | if (streamParseStrictIDOrReply(c,c->argv[i],ids+id_idx,0) != C_OK0) | |||
2067 | goto cleanup; | |||
2068 | } | |||
2069 | ||||
2070 | /* Try to serve the client synchronously. */ | |||
2071 | size_t arraylen = 0; | |||
2072 | void *arraylen_ptr = NULL((void*)0); | |||
2073 | for (int i = 0; i < streams_count; i++) { | |||
2074 | robj *o = lookupKeyRead(c->db,c->argv[streams_arg+i]); | |||
2075 | if (o == NULL((void*)0)) continue; | |||
2076 | stream *s = o->ptr; | |||
2077 | streamID *gt = ids+i; /* ID must be greater than this. */ | |||
2078 | int serve_synchronously = 0; | |||
2079 | int serve_history = 0; /* True for XREADGROUP with ID != ">". */ | |||
2080 | ||||
2081 | /* Check if there are the conditions to serve the client | |||
2082 | * synchronously. */ | |||
2083 | if (groups) { | |||
2084 | /* If the consumer is blocked on a group, we always serve it | |||
2085 | * synchronously (serving its local history) if the ID specified | |||
2086 | * was not the special ">" ID. */ | |||
2087 | if (gt->ms != UINT64_MAX(18446744073709551615UL) || | |||
2088 | gt->seq != UINT64_MAX(18446744073709551615UL)) | |||
2089 | { | |||
2090 | serve_synchronously = 1; | |||
2091 | serve_history = 1; | |||
2092 | } else if (s->length) { | |||
2093 | /* We also want to serve a consumer in a consumer group | |||
2094 | * synchronously in case the group top item delivered is smaller | |||
2095 | * than what the stream has inside. */ | |||
2096 | streamID maxid, *last = &groups[i]->last_id; | |||
2097 | streamLastValidID(s, &maxid); | |||
2098 | if (streamCompareID(&maxid, last) > 0) { | |||
2099 | serve_synchronously = 1; | |||
2100 | *gt = *last; | |||
2101 | } | |||
2102 | } | |||
2103 | } else if (s->length) { | |||
2104 | /* For consumers without a group, we serve synchronously if we can | |||
2105 | * actually provide at least one item from the stream. */ | |||
2106 | streamID maxid; | |||
2107 | streamLastValidID(s, &maxid); | |||
2108 | if (streamCompareID(&maxid, gt) > 0) { | |||
2109 | serve_synchronously = 1; | |||
2110 | } | |||
2111 | } | |||
2112 | ||||
2113 | if (serve_synchronously) { | |||
2114 | arraylen++; | |||
2115 | if (arraylen == 1) arraylen_ptr = addReplyDeferredLen(c); | |||
2116 | /* streamReplyWithRange() handles the 'start' ID as inclusive, | |||
2117 | * so start from the next ID, since we want only messages with | |||
2118 | * IDs greater than start. */ | |||
2119 | streamID start = *gt; | |||
2120 | streamIncrID(&start); | |||
2121 | ||||
2122 | /* Emit the two elements sub-array consisting of the name | |||
2123 | * of the stream and the data we extracted from it. */ | |||
2124 | if (c->resp == 2) addReplyArrayLen(c,2); | |||
2125 | addReplyBulk(c,c->argv[streams_arg+i]); | |||
2126 | int created = 0; | |||
2127 | streamConsumer *consumer = NULL((void*)0); | |||
2128 | if (groups) consumer = streamLookupConsumer(groups[i], | |||
2129 | consumername->ptr, | |||
2130 | SLC_NONE0, | |||
2131 | &created); | |||
2132 | streamPropInfo spi = {c->argv[i+streams_arg],groupname}; | |||
2133 | if (created && noack) | |||
2134 | streamPropagateConsumerCreation(c,spi.keyname, | |||
2135 | spi.groupname, | |||
2136 | consumer->name); | |||
2137 | int flags = 0; | |||
2138 | if (noack) flags |= STREAM_RWR_NOACK(1<<0); | |||
2139 | if (serve_history) flags |= STREAM_RWR_HISTORY(1<<2); | |||
2140 | streamReplyWithRange(c,s,&start,NULL((void*)0),count,0, | |||
2141 | groups ? groups[i] : NULL((void*)0), | |||
2142 | consumer, flags, &spi); | |||
2143 | if (groups) server.dirty++; | |||
2144 | } | |||
2145 | } | |||
2146 | ||||
2147 | /* We replied synchronously! Set the top array len and return to caller. */ | |||
2148 | if (arraylen) { | |||
2149 | if (c->resp == 2) | |||
2150 | setDeferredArrayLen(c,arraylen_ptr,arraylen); | |||
2151 | else | |||
2152 | setDeferredMapLen(c,arraylen_ptr,arraylen); | |||
2153 | goto cleanup; | |||
2154 | } | |||
2155 | ||||
2156 | /* Block if needed. */ | |||
2157 | if (timeout != -1) { | |||
2158 | /* If we are not allowed to block the client, the only thing | |||
2159 | * we can do is treating it as a timeout (even with timeout 0). */ | |||
2160 | if (c->flags & CLIENT_DENY_BLOCKING(1ULL<<41)) { | |||
2161 | addReplyNullArray(c); | |||
2162 | goto cleanup; | |||
2163 | } | |||
2164 | blockForKeys(c, BLOCKED_STREAM4, c->argv+streams_arg, streams_count, | |||
2165 | timeout, NULL((void*)0), NULL((void*)0), ids); | |||
2166 | /* If no COUNT is given and we block, set a relatively small count: | |||
2167 | * in case the ID provided is too low, we do not want the server to | |||
2168 | * block just to serve this client a huge stream of messages. */ | |||
2169 | c->bpop.xread_count = count ? count : XREAD_BLOCKED_DEFAULT_COUNT1000; | |||
2170 | ||||
2171 | /* If this is a XREADGROUP + GROUP we need to remember for which | |||
2172 | * group and consumer name we are blocking, so later when one of the | |||
2173 | * keys receive more data, we can call streamReplyWithRange() passing | |||
2174 | * the right arguments. */ | |||
2175 | if (groupname) { | |||
2176 | incrRefCount(groupname); | |||
2177 | incrRefCount(consumername); | |||
2178 | c->bpop.xread_group = groupname; | |||
2179 | c->bpop.xread_consumer = consumername; | |||
2180 | c->bpop.xread_group_noack = noack; | |||
2181 | } else { | |||
2182 | c->bpop.xread_group = NULL((void*)0); | |||
2183 | c->bpop.xread_consumer = NULL((void*)0); | |||
2184 | } | |||
2185 | goto cleanup; | |||
2186 | } | |||
2187 | ||||
2188 | /* No BLOCK option, nor any stream we can serve. Reply as with a | |||
2189 | * timeout happened. */ | |||
2190 | addReplyNullArray(c); | |||
2191 | /* Continue to cleanup... */ | |||
2192 | ||||
2193 | cleanup: /* Cleanup. */ | |||
2194 | ||||
2195 | /* The command is propagated (in the READGROUP form) as a side effect | |||
2196 | * of calling lower level APIs. So stop any implicit propagation. */ | |||
2197 | preventCommandPropagation(c); | |||
2198 | if (ids != static_ids) zfree(ids); | |||
2199 | zfree(groups); | |||
2200 | } | |||
2201 | ||||
2202 | /* ----------------------------------------------------------------------- | |||
2203 | * Low level implementation of consumer groups | |||
2204 | * ----------------------------------------------------------------------- */ | |||
2205 | ||||
2206 | /* Create a NACK entry setting the delivery count to 1 and the delivery | |||
2207 | * time to the current time. The NACK consumer will be set to the one | |||
2208 | * specified as argument of the function. */ | |||
2209 | streamNACK *streamCreateNACK(streamConsumer *consumer) { | |||
2210 | streamNACK *nack = zmalloc(sizeof(*nack)); | |||
2211 | nack->delivery_time = mstime(); | |||
2212 | nack->delivery_count = 1; | |||
2213 | nack->consumer = consumer; | |||
2214 | return nack; | |||
2215 | } | |||
2216 | ||||
2217 | /* Free a NACK entry. */ | |||
2218 | void streamFreeNACK(streamNACK *na) { | |||
2219 | zfree(na); | |||
2220 | } | |||
2221 | ||||
2222 | /* Free a consumer and associated data structures. Note that this function | |||
2223 | * will not reassign the pending messages associated with this consumer | |||
2224 | * nor will delete them from the stream, so when this function is called | |||
2225 | * to delete a consumer, and not when the whole stream is destroyed, the caller | |||
2226 | * should do some work before. */ | |||
2227 | void streamFreeConsumer(streamConsumer *sc) { | |||
2228 | raxFree(sc->pel); /* No value free callback: the PEL entries are shared | |||
2229 | between the consumer and the main stream PEL. */ | |||
2230 | sdsfree(sc->name); | |||
2231 | zfree(sc); | |||
2232 | } | |||
2233 | ||||
2234 | /* Create a new consumer group in the context of the stream 's', having the | |||
2235 | * specified name and last server ID. If a consumer group with the same name | |||
2236 | * already existed NULL is returned, otherwise the pointer to the consumer | |||
2237 | * group is returned. */ | |||
2238 | streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id) { | |||
2239 | if (s->cgroups == NULL((void*)0)) s->cgroups = raxNew(); | |||
2240 | if (raxFind(s->cgroups,(unsigned char*)name,namelen) != raxNotFound) | |||
2241 | return NULL((void*)0); | |||
2242 | ||||
2243 | streamCG *cg = zmalloc(sizeof(*cg)); | |||
2244 | cg->pel = raxNew(); | |||
2245 | cg->consumers = raxNew(); | |||
2246 | cg->last_id = *id; | |||
2247 | raxInsert(s->cgroups,(unsigned char*)name,namelen,cg,NULL((void*)0)); | |||
2248 | return cg; | |||
2249 | } | |||
2250 | ||||
2251 | /* Free a consumer group and all its associated data. */ | |||
2252 | void streamFreeCG(streamCG *cg) { | |||
2253 | raxFreeWithCallback(cg->pel,(void(*)(void*))streamFreeNACK); | |||
2254 | raxFreeWithCallback(cg->consumers,(void(*)(void*))streamFreeConsumer); | |||
2255 | zfree(cg); | |||
2256 | } | |||
2257 | ||||
2258 | /* Lookup the consumer group in the specified stream and returns its | |||
2259 | * pointer, otherwise if there is no such group, NULL is returned. */ | |||
2260 | streamCG *streamLookupCG(stream *s, sds groupname) { | |||
2261 | if (s->cgroups == NULL((void*)0)) return NULL((void*)0); | |||
2262 | streamCG *cg = raxFind(s->cgroups,(unsigned char*)groupname, | |||
2263 | sdslen(groupname)); | |||
2264 | return (cg == raxNotFound) ? NULL((void*)0) : cg; | |||
2265 | } | |||
2266 | ||||
2267 | /* Lookup the consumer with the specified name in the group 'cg': if the | |||
2268 | * consumer does not exist it is created unless SLC_NOCREAT flag was specified. | |||
2269 | * Its last seen time is updated unless SLC_NOREFRESH flag was specified. */ | |||
2270 | streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags, int *created) { | |||
2271 | if (created
| |||
2272 | int create = !(flags & SLC_NOCREAT(1<<0)); | |||
2273 | int refresh = !(flags & SLC_NOREFRESH(1<<1)); | |||
2274 | streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name, | |||
| ||||
2275 | sdslen(name)); | |||
2276 | if (consumer == raxNotFound) { | |||
2277 | if (!create) return NULL((void*)0); | |||
2278 | consumer = zmalloc(sizeof(*consumer)); | |||
2279 | consumer->name = sdsdup(name); | |||
2280 | consumer->pel = raxNew(); | |||
2281 | raxInsert(cg->consumers,(unsigned char*)name,sdslen(name), | |||
2282 | consumer,NULL((void*)0)); | |||
2283 | consumer->seen_time = mstime(); | |||
2284 | if (created) *created = 1; | |||
2285 | } else if (refresh) | |||
2286 | consumer->seen_time = mstime(); | |||
2287 | return consumer; | |||
2288 | } | |||
2289 | ||||
2290 | /* Delete the consumer specified in the consumer group 'cg'. The consumer | |||
2291 | * may have pending messages: they are removed from the PEL, and the number | |||
2292 | * of pending messages "lost" is returned. */ | |||
2293 | uint64_t streamDelConsumer(streamCG *cg, sds name) { | |||
2294 | streamConsumer *consumer = | |||
2295 | streamLookupConsumer(cg,name,SLC_NOCREAT(1<<0)|SLC_NOREFRESH(1<<1),NULL((void*)0)); | |||
2296 | if (consumer == NULL((void*)0)) return 0; | |||
2297 | ||||
2298 | uint64_t retval = raxSize(consumer->pel); | |||
2299 | ||||
2300 | /* Iterate all the consumer pending messages, deleting every corresponding | |||
2301 | * entry from the global entry. */ | |||
2302 | raxIterator ri; | |||
2303 | raxStart(&ri,consumer->pel); | |||
2304 | raxSeek(&ri,"^",NULL((void*)0),0); | |||
2305 | while(raxNext(&ri)) { | |||
2306 | streamNACK *nack = ri.data; | |||
2307 | raxRemove(cg->pel,ri.key,ri.key_len,NULL((void*)0)); | |||
2308 | streamFreeNACK(nack); | |||
2309 | } | |||
2310 | raxStop(&ri); | |||
2311 | ||||
2312 | /* Deallocate the consumer. */ | |||
2313 | raxRemove(cg->consumers,(unsigned char*)name,sdslen(name),NULL((void*)0)); | |||
2314 | streamFreeConsumer(consumer); | |||
2315 | return retval; | |||
2316 | } | |||
2317 | ||||
2318 | /* ----------------------------------------------------------------------- | |||
2319 | * Consumer groups commands | |||
2320 | * ----------------------------------------------------------------------- */ | |||
2321 | ||||
2322 | /* XGROUP CREATE <key> <groupname> <id or $> [MKSTREAM] | |||
2323 | * XGROUP SETID <key> <groupname> <id or $> | |||
2324 | * XGROUP DESTROY <key> <groupname> | |||
2325 | * XGROUP CREATECONSUMER <key> <groupname> <consumer> | |||
2326 | * XGROUP DELCONSUMER <key> <groupname> <consumername> */ | |||
2327 | void xgroupCommand(client *c) { | |||
2328 | stream *s = NULL((void*)0); | |||
2329 | sds grpname = NULL((void*)0); | |||
2330 | streamCG *cg = NULL((void*)0); | |||
| ||||
2331 | char *opt = c->argv[1]->ptr; /* Subcommand name. */ | |||
2332 | int mkstream = 0; | |||
2333 | robj *o; | |||
2334 | ||||
2335 | /* CREATE has an MKSTREAM option that creates the stream if it | |||
2336 | * does not exist. */ | |||
2337 | if (c->argc == 6 && !strcasecmp(opt,"CREATE")) { | |||
2338 | if (strcasecmp(c->argv[5]->ptr,"MKSTREAM")) { | |||
2339 | addReplySubcommandSyntaxError(c); | |||
2340 | return; | |||
2341 | } | |||
2342 | mkstream = 1; | |||
2343 | grpname = c->argv[3]->ptr; | |||
2344 | } | |||
2345 | ||||
2346 | /* Everything but the "HELP" option requires a key and group name. */ | |||
2347 | if (c->argc
| |||
2348 | o = lookupKeyWrite(c->db,c->argv[2]); | |||
2349 | if (o) { | |||
2350 | if (checkType(c,o,OBJ_STREAM6)) return; | |||
2351 | s = o->ptr; | |||
2352 | } | |||
2353 | grpname = c->argv[3]->ptr; | |||
2354 | } | |||
2355 | ||||
2356 | /* Check for missing key/group. */ | |||
2357 | if (c->argc >= 4 && !mkstream
| |||
2358 | /* At this point key must exist, or there is an error. */ | |||
2359 | if (s == NULL((void*)0)) { | |||
2360 | addReplyError(c, | |||
2361 | "The XGROUP subcommand requires the key to exist. " | |||
2362 | "Note that for CREATE you may want to use the MKSTREAM " | |||
2363 | "option to create an empty stream automatically."); | |||
2364 | return; | |||
2365 | } | |||
2366 | ||||
2367 | /* Certain subcommands require the group to exist. */ | |||
2368 | if ((cg = streamLookupCG(s,grpname)) == NULL((void*)0) && | |||
2369 | (!strcasecmp(opt,"SETID") || | |||
2370 | !strcasecmp(opt,"CREATECONSUMER") || | |||
2371 | !strcasecmp(opt,"DELCONSUMER"))) | |||
2372 | { | |||
2373 | addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' " | |||
2374 | "for key name '%s'", | |||
2375 | (char*)grpname, (char*)c->argv[2]->ptr); | |||
2376 | return; | |||
2377 | } | |||
2378 | } | |||
2379 | ||||
2380 | /* Dispatch the different subcommands. */ | |||
2381 | if (c->argc
| |||
2382 | const char *help[] = { | |||
2383 | "CREATE <key> <groupname> <id|$> [option]", | |||
2384 | " Create a new consumer group. Options are:", | |||
2385 | " * MKSTREAM", | |||
2386 | " Create the empty stream if it does not exist.", | |||
2387 | "CREATECONSUMER <key> <groupname> <consumer>", | |||
2388 | " Create a new consumer in the specified group.", | |||
2389 | "DELCONSUMER <key> <groupname> <consumer>", | |||
2390 | " Remove the specified consumer.", | |||
2391 | "DESTROY <key> <groupname>" | |||
2392 | " Remove the specified group.", | |||
2393 | "SETID <key> <groupname> <id|$>", | |||
2394 | " Set the current group ID.", | |||
2395 | NULL((void*)0) | |||
2396 | }; | |||
2397 | addReplyHelp(c, help); | |||
2398 | } else if (!strcasecmp(opt,"CREATE") && (c->argc == 5 || c->argc == 6)) { | |||
2399 | streamID id; | |||
2400 | if (!strcmp(c->argv[4]->ptr,"$")) { | |||
2401 | if (s) { | |||
2402 | id = s->last_id; | |||
2403 | } else { | |||
2404 | id.ms = 0; | |||
2405 | id.seq = 0; | |||
2406 | } | |||
2407 | } else if (streamParseStrictIDOrReply(c,c->argv[4],&id,0) != C_OK0) { | |||
2408 | return; | |||
2409 | } | |||
2410 | ||||
2411 | /* Handle the MKSTREAM option now that the command can no longer fail. */ | |||
2412 | if (s == NULL((void*)0)) { | |||
2413 | serverAssert(mkstream)((mkstream)?(void)0 : (_serverAssert("mkstream","t_stream.c", 2413),__builtin_unreachable())); | |||
2414 | o = createStreamObject(); | |||
2415 | dbAdd(c->db,c->argv[2],o); | |||
2416 | s = o->ptr; | |||
2417 | signalModifiedKey(c,c->db,c->argv[2]); | |||
2418 | } | |||
2419 | ||||
2420 | streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),&id); | |||
2421 | if (cg) { | |||
2422 | addReply(c,shared.ok); | |||
2423 | server.dirty++; | |||
2424 | notifyKeyspaceEvent(NOTIFY_STREAM(1<<10),"xgroup-create", | |||
2425 | c->argv[2],c->db->id); | |||
2426 | } else { | |||
2427 | addReplyError(c,"-BUSYGROUP Consumer Group name already exists"); | |||
2428 | } | |||
2429 | } else if (!strcasecmp(opt,"SETID") && c->argc == 5) { | |||
2430 | streamID id; | |||
2431 | if (!strcmp(c->argv[4]->ptr,"$")) { | |||
2432 | id = s->last_id; | |||
2433 | } else if (streamParseIDOrReply(c,c->argv[4],&id,0) != C_OK0) { | |||
2434 | return; | |||
2435 | } | |||
2436 | cg->last_id = id; | |||
2437 | addReply(c,shared.ok); | |||
2438 | server.dirty++; | |||
2439 | notifyKeyspaceEvent(NOTIFY_STREAM(1<<10),"xgroup-setid",c->argv[2],c->db->id); | |||
2440 | } else if (!strcasecmp(opt,"DESTROY") && c->argc == 4) { | |||
2441 | if (cg) { | |||
2442 | raxRemove(s->cgroups,(unsigned char*)grpname,sdslen(grpname),NULL((void*)0)); | |||
2443 | streamFreeCG(cg); | |||
2444 | addReply(c,shared.cone); | |||
2445 | server.dirty++; | |||
2446 | notifyKeyspaceEvent(NOTIFY_STREAM(1<<10),"xgroup-destroy", | |||
2447 | c->argv[2],c->db->id); | |||
2448 | /* We want to unblock any XREADGROUP consumers with -NOGROUP. */ | |||
2449 | signalKeyAsReady(c->db,c->argv[2],OBJ_STREAM6); | |||
2450 | } else { | |||
2451 | addReply(c,shared.czero); | |||
2452 | } | |||
2453 | } else if (!strcasecmp(opt,"CREATECONSUMER") && c->argc == 5) { | |||
2454 | int created = 0; | |||
2455 | streamLookupConsumer(cg,c->argv[4]->ptr,SLC_NOREFRESH(1<<1),&created); | |||
2456 | if (created) { | |||
2457 | server.dirty++; | |||
2458 | notifyKeyspaceEvent(NOTIFY_STREAM(1<<10),"xgroup-createconsumer", | |||
2459 | c->argv[2],c->db->id); | |||
2460 | } | |||
2461 | addReplyLongLong(c,created); | |||
2462 | } else if (!strcasecmp(opt,"DELCONSUMER") && c->argc == 5) { | |||
2463 | /* Delete the consumer and returns the number of pending messages | |||
2464 | * that were yet associated with such a consumer. */ | |||
2465 | long long pending = streamDelConsumer(cg,c->argv[4]->ptr); | |||
2466 | addReplyLongLong(c,pending); | |||
2467 | server.dirty++; | |||
2468 | notifyKeyspaceEvent(NOTIFY_STREAM(1<<10),"xgroup-delconsumer", | |||
2469 | c->argv[2],c->db->id); | |||
2470 | } else { | |||
2471 | addReplySubcommandSyntaxError(c); | |||
2472 | } | |||
2473 | } | |||
2474 | ||||
2475 | /* XSETID <stream> <id> | |||
2476 | * | |||
2477 | * Set the internal "last ID" of a stream. */ | |||
2478 | void xsetidCommand(client *c) { | |||
2479 | robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr); | |||
2480 | if (o == NULL((void*)0) || checkType(c,o,OBJ_STREAM6)) return; | |||
2481 | ||||
2482 | stream *s = o->ptr; | |||
2483 | streamID id; | |||
2484 | if (streamParseStrictIDOrReply(c,c->argv[2],&id,0) != C_OK0) return; | |||
2485 | ||||
2486 | /* If the stream has at least one item, we want to check that the user | |||
2487 | * is setting a last ID that is equal or greater than the current top | |||
2488 | * item, otherwise the fundamental ID monotonicity assumption is violated. */ | |||
2489 | if (s->length > 0) { | |||
2490 | streamID maxid; | |||
2491 | streamLastValidID(s,&maxid); | |||
2492 | ||||
2493 | if (streamCompareID(&id,&maxid) < 0) { | |||
2494 | addReplyError(c,"The ID specified in XSETID is smaller than the " | |||
2495 | "target stream top item"); | |||
2496 | return; | |||
2497 | } | |||
2498 | } | |||
2499 | s->last_id = id; | |||
2500 | addReply(c,shared.ok); | |||
2501 | server.dirty++; | |||
2502 | notifyKeyspaceEvent(NOTIFY_STREAM(1<<10),"xsetid",c->argv[1],c->db->id); | |||
2503 | } | |||
2504 | ||||
2505 | /* XACK <key> <group> <id> <id> ... <id> | |||
2506 | * | |||
2507 | * Acknowledge a message as processed. In practical terms we just check the | |||
2508 | * pendine entries list (PEL) of the group, and delete the PEL entry both from | |||
2509 | * the group and the consumer (pending messages are referenced in both places). | |||
2510 | * | |||
2511 | * Return value of the command is the number of messages successfully | |||
2512 | * acknowledged, that is, the IDs we were actually able to resolve in the PEL. | |||
2513 | */ | |||
2514 | void xackCommand(client *c) { | |||
2515 | streamCG *group = NULL((void*)0); | |||
2516 | robj *o = lookupKeyRead(c->db,c->argv[1]); | |||
2517 | if (o) { | |||
2518 | if (checkType(c,o,OBJ_STREAM6)) return; /* Type error. */ | |||
2519 | group = streamLookupCG(o->ptr,c->argv[2]->ptr); | |||
2520 | } | |||
2521 | ||||
2522 | /* No key or group? Nothing to ack. */ | |||
2523 | if (o == NULL((void*)0) || group == NULL((void*)0)) { | |||
2524 | addReply(c,shared.czero); | |||
2525 | return; | |||
2526 | } | |||
2527 | ||||
2528 | /* Start parsing the IDs, so that we abort ASAP if there is a syntax | |||
2529 | * error: the return value of this command cannot be an error in case | |||
2530 | * the client successfully acknowledged some messages, so it should be | |||
2531 | * executed in a "all or nothing" fashion. */ | |||
2532 | streamID static_ids[STREAMID_STATIC_VECTOR_LEN8]; | |||
2533 | streamID *ids = static_ids; | |||
2534 | int id_count = c->argc-3; | |||
2535 | if (id_count > STREAMID_STATIC_VECTOR_LEN8) | |||
2536 | ids = zmalloc(sizeof(streamID)*id_count); | |||
2537 | for (int j = 3; j < c->argc; j++) { | |||
2538 | if (streamParseStrictIDOrReply(c,c->argv[j],&ids[j-3],0) != C_OK0) goto cleanup; | |||
2539 | } | |||
2540 | ||||
2541 | int acknowledged = 0; | |||
2542 | for (int j = 3; j < c->argc; j++) { | |||
2543 | unsigned char buf[sizeof(streamID)]; | |||
2544 | streamEncodeID(buf,&ids[j-3]); | |||
2545 | ||||
2546 | /* Lookup the ID in the group PEL: it will have a reference to the | |||
2547 | * NACK structure that will have a reference to the consumer, so that | |||
2548 | * we are able to remove the entry from both PELs. */ | |||
2549 | streamNACK *nack = raxFind(group->pel,buf,sizeof(buf)); | |||
2550 | if (nack != raxNotFound) { | |||
2551 | raxRemove(group->pel,buf,sizeof(buf),NULL((void*)0)); | |||
2552 | raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL((void*)0)); | |||
2553 | streamFreeNACK(nack); | |||
2554 | acknowledged++; | |||
2555 | server.dirty++; | |||
2556 | } | |||
2557 | } | |||
2558 | addReplyLongLong(c,acknowledged); | |||
2559 | cleanup: | |||
2560 | if (ids != static_ids) zfree(ids); | |||
2561 | } | |||
2562 | ||||
2563 | /* XPENDING <key> <group> [[IDLE <idle>] <start> <stop> <count> [<consumer>]] | |||
2564 | * | |||
2565 | * If start and stop are omitted, the command just outputs information about | |||
2566 | * the amount of pending messages for the key/group pair, together with | |||
2567 | * the minimum and maximum ID of pending messages. | |||
2568 | * | |||
2569 | * If start and stop are provided instead, the pending messages are returned | |||
2570 | * with information about the current owner, number of deliveries and last | |||
2571 | * delivery time and so forth. */ | |||
2572 | void xpendingCommand(client *c) { | |||
2573 | int justinfo = c->argc == 3; /* Without the range just outputs general | |||
2574 | informations about the PEL. */ | |||
2575 | robj *key = c->argv[1]; | |||
2576 | robj *groupname = c->argv[2]; | |||
2577 | robj *consumername = NULL((void*)0); | |||
2578 | streamID startid, endid; | |||
2579 | long long count = 0; | |||
2580 | long long minidle = 0; | |||
2581 | int startex = 0, endex = 0; | |||
2582 | ||||
2583 | /* Start and stop, and the consumer, can be omitted. Also the IDLE modifier. */ | |||
2584 | if (c->argc != 3 && (c->argc < 6 || c->argc > 9)) { | |||
2585 | addReplyErrorObject(c,shared.syntaxerr); | |||
2586 | return; | |||
2587 | } | |||
2588 | ||||
2589 | /* Parse start/end/count arguments ASAP if needed, in order to report | |||
2590 | * syntax errors before any other error. */ | |||
2591 | if (c->argc >= 6) { | |||
2592 | int startidx = 3; /* Without IDLE */ | |||
2593 | ||||
2594 | if (!strcasecmp(c->argv[3]->ptr, "IDLE")) { | |||
2595 | if (getLongLongFromObjectOrReply(c, c->argv[4], &minidle, NULL((void*)0)) == C_ERR-1) | |||
2596 | return; | |||
2597 | if (c->argc < 8) { | |||
2598 | /* If IDLE was provided we must have at least 'start end count' */ | |||
2599 | addReplyErrorObject(c,shared.syntaxerr); | |||
2600 | return; | |||
2601 | } | |||
2602 | /* Search for rest of arguments after 'IDLE <idle>' */ | |||
2603 | startidx += 2; | |||
2604 | } | |||
2605 | ||||
2606 | /* count argument. */ | |||
2607 | if (getLongLongFromObjectOrReply(c,c->argv[startidx+2],&count,NULL((void*)0)) == C_ERR-1) | |||
2608 | return; | |||
2609 | if (count < 0) count = 0; | |||
2610 | ||||
2611 | /* start and end arguments. */ | |||
2612 | if (streamParseIntervalIDOrReply(c,c->argv[startidx],&startid,&startex,0) != C_OK0) | |||
2613 | return; | |||
2614 | if (startex && streamIncrID(&startid) != C_OK0) { | |||
2615 | addReplyError(c,"invalid start ID for the interval"); | |||
2616 | return; | |||
2617 | } | |||
2618 | if (streamParseIntervalIDOrReply(c,c->argv[startidx+1],&endid,&endex,UINT64_MAX(18446744073709551615UL)) != C_OK0) | |||
2619 | return; | |||
2620 | if (endex && streamDecrID(&endid) != C_OK0) { | |||
2621 | addReplyError(c,"invalid end ID for the interval"); | |||
2622 | return; | |||
2623 | } | |||
2624 | ||||
2625 | if (startidx+3 < c->argc) { | |||
2626 | /* 'consumer' was provided */ | |||
2627 | consumername = c->argv[startidx+3]; | |||
2628 | } | |||
2629 | } | |||
2630 | ||||
2631 | /* Lookup the key and the group inside the stream. */ | |||
2632 | robj *o = lookupKeyRead(c->db,c->argv[1]); | |||
2633 | streamCG *group; | |||
2634 | ||||
2635 | if (checkType(c,o,OBJ_STREAM6)) return; | |||
2636 | if (o == NULL((void*)0) || | |||
2637 | (group = streamLookupCG(o->ptr,groupname->ptr)) == NULL((void*)0)) | |||
2638 | { | |||
2639 | addReplyErrorFormat(c, "-NOGROUP No such key '%s' or consumer " | |||
2640 | "group '%s'", | |||
2641 | (char*)key->ptr,(char*)groupname->ptr); | |||
2642 | return; | |||
2643 | } | |||
2644 | ||||
2645 | /* XPENDING <key> <group> variant. */ | |||
2646 | if (justinfo) { | |||
2647 | addReplyArrayLen(c,4); | |||
2648 | /* Total number of messages in the PEL. */ | |||
2649 | addReplyLongLong(c,raxSize(group->pel)); | |||
2650 | /* First and last IDs. */ | |||
2651 | if (raxSize(group->pel) == 0) { | |||
2652 | addReplyNull(c); /* Start. */ | |||
2653 | addReplyNull(c); /* End. */ | |||
2654 | addReplyNullArray(c); /* Clients. */ | |||
2655 | } else { | |||
2656 | /* Start. */ | |||
2657 | raxIterator ri; | |||
2658 | raxStart(&ri,group->pel); | |||
2659 | raxSeek(&ri,"^",NULL((void*)0),0); | |||
2660 | raxNext(&ri); | |||
2661 | streamDecodeID(ri.key,&startid); | |||
2662 | addReplyStreamID(c,&startid); | |||
2663 | ||||
2664 | /* End. */ | |||
2665 | raxSeek(&ri,"$",NULL((void*)0),0); | |||
2666 | raxNext(&ri); | |||
2667 | streamDecodeID(ri.key,&endid); | |||
2668 | addReplyStreamID(c,&endid); | |||
2669 | raxStop(&ri); | |||
2670 | ||||
2671 | /* Consumers with pending messages. */ | |||
2672 | raxStart(&ri,group->consumers); | |||
2673 | raxSeek(&ri,"^",NULL((void*)0),0); | |||
2674 | void *arraylen_ptr = addReplyDeferredLen(c); | |||
2675 | size_t arraylen = 0; | |||
2676 | while(raxNext(&ri)) { | |||
2677 | streamConsumer *consumer = ri.data; | |||
2678 | if (raxSize(consumer->pel) == 0) continue; | |||
2679 | addReplyArrayLen(c,2); | |||
2680 | addReplyBulkCBuffer(c,ri.key,ri.key_len); | |||
2681 | addReplyBulkLongLong(c,raxSize(consumer->pel)); | |||
2682 | arraylen++; | |||
2683 | } | |||
2684 | setDeferredArrayLen(c,arraylen_ptr,arraylen); | |||
2685 | raxStop(&ri); | |||
2686 | } | |||
2687 | } else { /* <start>, <stop> and <count> provided, return actual pending entries (not just info) */ | |||
2688 | streamConsumer *consumer = NULL((void*)0); | |||
2689 | if (consumername) { | |||
2690 | consumer = streamLookupConsumer(group, | |||
2691 | consumername->ptr, | |||
2692 | SLC_NOCREAT(1<<0)|SLC_NOREFRESH(1<<1), | |||
2693 | NULL((void*)0)); | |||
2694 | ||||
2695 | /* If a consumer name was mentioned but it does not exist, we can | |||
2696 | * just return an empty array. */ | |||
2697 | if (consumer == NULL((void*)0)) { | |||
2698 | addReplyArrayLen(c,0); | |||
2699 | return; | |||
2700 | } | |||
2701 | } | |||
2702 | ||||
2703 | rax *pel = consumer ? consumer->pel : group->pel; | |||
2704 | unsigned char startkey[sizeof(streamID)]; | |||
2705 | unsigned char endkey[sizeof(streamID)]; | |||
2706 | raxIterator ri; | |||
2707 | mstime_t now = mstime(); | |||
2708 | ||||
2709 | streamEncodeID(startkey,&startid); | |||
2710 | streamEncodeID(endkey,&endid); | |||
2711 | raxStart(&ri,pel); | |||
2712 | raxSeek(&ri,">=",startkey,sizeof(startkey)); | |||
2713 | void *arraylen_ptr = addReplyDeferredLen(c); | |||
2714 | size_t arraylen = 0; | |||
2715 | ||||
2716 | while(count && raxNext(&ri) && memcmp(ri.key,endkey,ri.key_len) <= 0) { | |||
2717 | streamNACK *nack = ri.data; | |||
2718 | ||||
2719 | if (minidle) { | |||
2720 | mstime_t this_idle = now - nack->delivery_time; | |||
2721 | if (this_idle < minidle) continue; | |||
2722 | } | |||
2723 | ||||
2724 | arraylen++; | |||
2725 | count--; | |||
2726 | addReplyArrayLen(c,4); | |||
2727 | ||||
2728 | /* Entry ID. */ | |||
2729 | streamID id; | |||
2730 | streamDecodeID(ri.key,&id); | |||
2731 | addReplyStreamID(c,&id); | |||
2732 | ||||
2733 | /* Consumer name. */ | |||
2734 | addReplyBulkCBuffer(c,nack->consumer->name, | |||
2735 | sdslen(nack->consumer->name)); | |||
2736 | ||||
2737 | /* Milliseconds elapsed since last delivery. */ | |||
2738 | mstime_t elapsed = now - nack->delivery_time; | |||
2739 | if (elapsed < 0) elapsed = 0; | |||
2740 | addReplyLongLong(c,elapsed); | |||
2741 | ||||
2742 | /* Number of deliveries. */ | |||
2743 | addReplyLongLong(c,nack->delivery_count); | |||
2744 | } | |||
2745 | raxStop(&ri); | |||
2746 | setDeferredArrayLen(c,arraylen_ptr,arraylen); | |||
2747 | } | |||
2748 | } | |||
2749 | ||||
2750 | /* XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> | |||
2751 | * [IDLE <milliseconds>] [TIME <mstime>] [RETRYCOUNT <count>] | |||
2752 | * [FORCE] [JUSTID] [LASTID <id>] | |||
2753 | * | |||
2754 | * Gets ownership of one or multiple messages in the Pending Entries List | |||
2755 | * of a given stream consumer group. | |||
2756 | * | |||
2757 | * If the message ID (among the specified ones) exists, and its idle | |||
2758 | * time is greater than or equal to <min-idle-time>, then the new owner of | |||
2759 | * the message becomes the specified <consumer>. If the minimum idle time | |||
2760 | * specified is zero, messages are claimed regardless of their idle time. | |||
2761 | * | |||
2762 | * All the messages that cannot be found inside the pending entries list | |||
2763 | * are ignored, unless the FORCE option is used. In that case we | |||
2764 | * create the NACK (representing a not yet acknowledged message) entry in | |||
2765 | * the consumer group PEL. | |||
2766 | * | |||
2767 | * This command creates the consumer as a side effect if it does not yet | |||
2768 | * exist. Moreover the command resets the idle time of the message to 0, | |||
2769 | * although by using the IDLE or TIME options the user can control the | |||
2770 | * new idle time. | |||
2771 | * | |||
2772 | * The options at the end can be used in order to specify more attributes | |||
2773 | * to set in the representation of the pending message: | |||
2774 | * | |||
2775 | * 1. IDLE <ms>: | |||
2776 | * Set the idle time (last time it was delivered) of the message. | |||
2777 | * If IDLE is not specified, an IDLE of 0 is assumed, that is, | |||
2778 | * the time count is reset because the message has now a new | |||
2779 | * owner trying to process it. | |||
2780 | * | |||
2781 | * 2. TIME <ms-unix-time>: | |||
2782 | * This is the same as IDLE but instead of a relative amount of | |||
2783 | * milliseconds, it sets the idle time to a specific unix time | |||
2784 | * (in milliseconds). This is useful in order to rewrite the AOF | |||
2785 | * file generating XCLAIM commands. | |||
2786 | * | |||
2787 | * 3. RETRYCOUNT <count>: | |||
2788 | * Set the retry counter to the specified value. This counter is | |||
2789 | * incremented every time a message is delivered again. When RETRYCOUNT | |||
2790 | * is not given, XCLAIM increments the counter unless JUSTID is used. The | |||
2791 | * counter is served to clients when the XPENDING command is called: this | |||
2792 | * way clients can detect anomalies, like messages that are never processed | |||
2793 | * for some reason after a big number of delivery attempts. | |||
2794 | * | |||
2795 | * 4. FORCE: | |||
2796 | * Creates the pending message entry in the PEL even if certain | |||
2797 | * specified IDs are not already in the PEL assigned to a different | |||
2798 | * client. However the message must exist in the stream, otherwise | |||
2799 | * the IDs of non existing messages are ignored. | |||
2800 | * | |||
2801 | * 5. JUSTID: | |||
2802 | * Return just an array of IDs of messages successfully claimed, | |||
2803 | * without returning the actual message. | |||
2804 | * | |||
2805 | * 6. LASTID <id>: | |||
2806 | * Update the consumer group last ID with the specified ID if the | |||
2807 | * current last ID is smaller than the provided one. | |||
2808 | * This is used for replication / AOF, so that when we read from a | |||
2809 | * consumer group, the XCLAIM that gets propagated to give ownership | |||
2810 | * to the consumer, is also used in order to update the group current | |||
2811 | * ID. | |||
2812 | * | |||
2813 | * The command returns an array of messages that the user | |||
2814 | * successfully claimed, so that the caller is able to understand | |||
2815 | * what messages it is now in charge of. */ | |||
2816 | void xclaimCommand(client *c) { /* XCLAIM implementation: validates the arguments, then reassigns the listed pending entries to the consumer named in argv[3] and replies with what was claimed. */ | |||
2817 | streamCG *group = NULL((void*)0); | |||
2818 | robj *o = lookupKeyRead(c->db,c->argv[1]); | |||
2819 | long long minidle; /* Minimum idle time argument. */ | |||
2820 | long long retrycount = -1; /* -1 means RETRYCOUNT option not given. */ | |||
2821 | mstime_t deliverytime = -1; /* -1 means IDLE/TIME options not given. */ | |||
2822 | int force = 0; | |||
2823 | int justid = 0; | |||
2824 | ||||
2825 | if (o) { | |||
2826 | if (checkType(c,o,OBJ_STREAM6)) return; /* Type error. */ | |||
2827 | group = streamLookupCG(o->ptr,c->argv[2]->ptr); | |||
2828 | } | |||
2829 | ||||
2830 | /* No key or group? Send an error given that the group creation | |||
2831 | * is mandatory. */ | |||
2832 | if (o == NULL((void*)0) || group == NULL((void*)0)) { | |||
2833 | addReplyErrorFormat(c,"-NOGROUP No such key '%s' or " | |||
2834 | "consumer group '%s'", (char*)c->argv[1]->ptr, | |||
2835 | (char*)c->argv[2]->ptr); | |||
2836 | return; | |||
2837 | } | |||
2838 | ||||
2839 | if (getLongLongFromObjectOrReply(c,c->argv[4],&minidle, | |||
2840 | "Invalid min-idle-time argument for XCLAIM") | |||
2841 | != C_OK0) return; | |||
2842 | if (minidle < 0) minidle = 0; /* Negative values are clamped to 0; 0 disables the idle check in the claim loop. */ | |||
2843 | ||||
2844 | /* Start parsing the IDs, so that we abort ASAP if there is a syntax | |||
2845 | * error: the return value of this command cannot be an error in case | |||
2846 | * the client successfully claimed some message, so it should be | |||
2847 | * executed in a "all or nothing" fashion. */ | |||
2848 | int j; | |||
2849 | streamID static_ids[STREAMID_STATIC_VECTOR_LEN8]; | |||
2850 | streamID *ids = static_ids; | |||
2851 | int id_count = c->argc-5; /* Counts every argument from argv[5] on, so it may include trailing options. */ | |||
2852 | if (id_count > STREAMID_STATIC_VECTOR_LEN8) | |||
2853 | ids = zmalloc(sizeof(streamID)*id_count); | |||
2854 | for (j = 5; j < c->argc; j++) { | |||
2855 | if (streamParseStrictIDOrReply(NULL((void*)0),c->argv[j],&ids[j-5],0) != C_OK0) break; /* NULL client: a parse failure only stops the ID scan; presumably no error reply is emitted here -- confirm in streamParseStrictIDOrReply. */ | |||
2856 | } | |||
2857 | int last_id_arg = j-1; /* Next time we iterate the IDs we know the range. */ | |||
2858 | ||||
2859 | /* If we stopped because some IDs cannot be parsed, perhaps they | |||
2860 | * are trailing options. */ | |||
2861 | mstime_t now = mstime(); | |||
2862 | streamID last_id = {0,0}; | |||
2863 | int propagate_last_id = 0; | |||
2864 | for (; j < c->argc; j++) { | |||
2865 | int moreargs = (c->argc-1) - j; /* Number of additional arguments. */ | |||
2866 | char *opt = c->argv[j]->ptr; | |||
2867 | if (!strcasecmp(opt,"FORCE")) { | |||
2868 | force = 1; | |||
2869 | } else if (!strcasecmp(opt,"JUSTID")) { | |||
2870 | justid = 1; | |||
2871 | } else if (!strcasecmp(opt,"IDLE") && moreargs) { | |||
2872 | j++; | |||
2873 | if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime, | |||
2874 | "Invalid IDLE option argument for XCLAIM") | |||
2875 | != C_OK0) goto cleanup; | |||
2876 | deliverytime = now - deliverytime; /* Convert the relative idle time into an absolute delivery time. */ | |||
2877 | } else if (!strcasecmp(opt,"TIME") && moreargs) { | |||
2878 | j++; | |||
2879 | if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime, | |||
2880 | "Invalid TIME option argument for XCLAIM") | |||
2881 | != C_OK0) goto cleanup; | |||
2882 | } else if (!strcasecmp(opt,"RETRYCOUNT") && moreargs) { | |||
2883 | j++; | |||
2884 | if (getLongLongFromObjectOrReply(c,c->argv[j],&retrycount, | |||
2885 | "Invalid RETRYCOUNT option argument for XCLAIM") | |||
2886 | != C_OK0) goto cleanup; | |||
2887 | } else if (!strcasecmp(opt,"LASTID") && moreargs) { | |||
2888 | j++; | |||
2889 | if (streamParseStrictIDOrReply(c,c->argv[j],&last_id,0) != C_OK0) goto cleanup; | |||
2890 | } else { | |||
2891 | addReplyErrorFormat(c,"Unrecognized XCLAIM option '%s'",opt); | |||
2892 | goto cleanup; | |||
2893 | } | |||
2894 | } | |||
2895 | ||||
2896 | if (streamCompareID(&last_id,&group->last_id) > 0) { /* LASTID only ever moves the group's last ID forward. */ | |||
2897 | group->last_id = last_id; | |||
2898 | propagate_last_id = 1; | |||
2899 | } | |||
2900 | ||||
2901 | if (deliverytime != -1) { | |||
2902 | /* If a delivery time was passed, either with IDLE or TIME, we | |||
2903 | * do some sanity check on it, and set the deliverytime to now | |||
2904 | * (which is a sane choice usually) if the value is bogus. | |||
2905 | * To raise an error here is not wise because clients may compute | |||
2906 | * the idle time doing some math starting from their local time, | |||
2907 | * and this is not a good excuse to fail in case, for instance, | |||
2908 | * the computer time is a bit in the future from our POV. */ | |||
2909 | if (deliverytime < 0 || deliverytime > now) deliverytime = now; | |||
2910 | } else { | |||
2911 | /* If no IDLE/TIME option was passed, we want the last delivery | |||
2912 | * time to be now, so that the idle time of the message will be | |||
2913 | * zero. */ | |||
2914 | deliverytime = now; | |||
2915 | } | |||
2916 | ||||
2917 | /* Do the actual claiming. */ | |||
2918 | streamConsumer *consumer = NULL((void*)0); | |||
2919 | void *arraylenptr = addReplyDeferredLen(c); /* Length is filled in after the loop, once the number of claimed entries is known. */ | |||
2920 | size_t arraylen = 0; | |||
2921 | for (int j = 5; j <= last_id_arg; j++) { /* This 'j' shadows the outer 'j' used for argument parsing. */ | |||
2922 | streamID id = ids[j-5]; | |||
2923 | unsigned char buf[sizeof(streamID)]; | |||
2924 | streamEncodeID(buf,&id); | |||
2925 | ||||
2926 | /* Lookup the ID in the group PEL. */ | |||
2927 | streamNACK *nack = raxFind(group->pel,buf,sizeof(buf)); | |||
2928 | ||||
2929 | /* If FORCE is passed, let's check if at least the entry | |||
2930 | * exists in the Stream. In such case, we'll create a new | |||
2931 | * entry in the PEL from scratch, so that XCLAIM can also | |||
2932 | * be used to create entries in the PEL. Useful for AOF | |||
2933 | * and replication of consumer groups. */ | |||
2934 | if (force && nack == raxNotFound) { | |||
2935 | streamIterator myiterator; | |||
2936 | streamIteratorStart(&myiterator,o->ptr,&id,&id,0); | |||
2937 | int64_t numfields; | |||
2938 | int found = 0; | |||
2939 | streamID item_id; | |||
2940 | if (streamIteratorGetID(&myiterator,&item_id,&numfields)) found = 1; | |||
2941 | streamIteratorStop(&myiterator); | |||
2942 | ||||
2943 | /* Item must exist for us to create a NACK for it. */ | |||
2944 | if (!found) continue; | |||
2945 | ||||
2946 | /* Create the NACK. */ | |||
2947 | nack = streamCreateNACK(NULL((void*)0)); | |||
2948 | raxInsert(group->pel,buf,sizeof(buf),nack,NULL((void*)0)); | |||
2949 | } | |||
2950 | ||||
2951 | if (nack != raxNotFound) { | |||
2952 | /* We need to check if the minimum idle time requested | |||
2953 | * by the caller is satisfied by this entry. | |||
2954 | * | |||
2955 | * Note that the nack could be created by FORCE, in this | |||
2956 | * case there was no pre-existing entry and minidle should | |||
2957 | * be ignored, but in that case nack->consumer is NULL. */ | |||
2958 | if (nack->consumer && minidle) { | |||
2959 | mstime_t this_idle = now - nack->delivery_time; | |||
2960 | if (this_idle < minidle) continue; | |||
2961 | } | |||
2962 | if (consumer == NULL((void*)0)) | |||
2963 | consumer = streamLookupConsumer(group,c->argv[3]->ptr,SLC_NONE0,NULL((void*)0)); | |||
2964 | if (nack->consumer != consumer) { | |||
2965 | /* Remove the entry from the old consumer. | |||
2966 | * Note that nack->consumer is NULL if we created the | |||
2967 | * NACK above because of the FORCE option. */ | |||
2968 | if (nack->consumer) | |||
2969 | raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL((void*)0)); | |||
2970 | } | |||
2971 | nack->delivery_time = deliverytime; | |||
2972 | /* Set the delivery attempts counter if given, otherwise | |||
2973 | * autoincrement unless JUSTID option provided */ | |||
2974 | if (retrycount >= 0) { | |||
2975 | nack->delivery_count = retrycount; | |||
2976 | } else if (!justid) { | |||
2977 | nack->delivery_count++; | |||
2978 | } | |||
2979 | if (nack->consumer != consumer) { | |||
2980 | /* Add the entry in the new consumer local PEL. */ | |||
2981 | raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL((void*)0)); | |||
2982 | nack->consumer = consumer; | |||
2983 | } | |||
2984 | /* Send the reply for this entry. */ | |||
2985 | if (justid) { | |||
2986 | addReplyStreamID(c,&id); | |||
2987 | } else { | |||
2988 | size_t emitted = streamReplyWithRange(c,o->ptr,&id,&id,1,0, | |||
2989 | NULL((void*)0),NULL((void*)0),STREAM_RWR_RAWENTRIES(1<<1),NULL((void*)0)); | |||
2990 | if (!emitted) addReplyNull(c); /* Nothing was emitted for this ID: reply with a null placeholder so the array length still matches. */ | |||
2991 | } | |||
2992 | arraylen++; | |||
2993 | ||||
2994 | /* Propagate this change. */ | |||
2995 | streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack); | |||
2996 | propagate_last_id = 0; /* Will be propagated by XCLAIM itself. */ | |||
2997 | server.dirty++; | |||
2998 | } | |||
2999 | } | |||
3000 | if (propagate_last_id) { | |||
3001 | streamPropagateGroupID(c,c->argv[1],group,c->argv[2]); | |||
3002 | server.dirty++; | |||
3003 | } | |||
3004 | setDeferredArrayLen(c,arraylenptr,arraylen); | |||
3005 | preventCommandPropagation(c); /* NOTE(review): presumably suppresses propagation of the original command because each claim was already propagated individually above -- confirm. */ | |||
3006 | cleanup: | |||
3007 | if (ids != static_ids) zfree(ids); /* Free the ID vector only if it was heap-allocated. */ | |||
3008 | } | |||
3009 | ||||
3010 | /* XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start> [COUNT <count>] [JUSTID] | |||
3011 | * | |||
3012 | * Gets ownership of one or multiple messages in the Pending Entries List | |||
3013 | * of a given stream consumer group. | |||
3014 | * | |||
3015 | * For each PEL entry, if its idle time greater or equal to <min-idle-time>, | |||
3016 | * then the message new owner becomes the specified <consumer>. | |||
3017 | * If the minimum idle time specified is zero, messages are claimed | |||
3018 | * regardless of their idle time. | |||
3019 | * | |||
3020 | * This command creates the consumer as side effect if it does not yet | |||
3021 | * exists. Moreover the command reset the idle time of the message to 0. | |||
3022 | * | |||
3023 | * The command returns an array of messages that the user | |||
3024 | * successfully claimed, so that the caller is able to understand | |||
3025 | * what messages it is now in charge of. */ | |||
3026 | void xautoclaimCommand(client *c) { | |||
3027 | streamCG *group = NULL((void*)0); | |||
3028 | robj *o = lookupKeyRead(c->db,c->argv[1]); | |||
3029 | long long minidle; /* Minimum idle time argument, in milliseconds. */ | |||
3030 | long count = 100; /* Maximum entries to claim. */ | |||
3031 | streamID startid; | |||
3032 | int startex; | |||
3033 | int justid = 0; | |||
3034 | ||||
3035 | /* Parse idle/start/end/count arguments ASAP if needed, in order to report | |||
3036 | * syntax errors before any other error. */ | |||
3037 | if (getLongLongFromObjectOrReply(c,c->argv[4],&minidle,"Invalid min-idle-time argument for XAUTOCLAIM") != C_OK0) | |||
3038 | return; | |||
3039 | if (minidle < 0) minidle = 0; | |||
3040 | ||||
3041 | if (streamParseIntervalIDOrReply(c,c->argv[5],&startid,&startex,0) != C_OK0) | |||
3042 | return; | |||
3043 | if (startex && streamIncrID(&startid) != C_OK0) { | |||
3044 | addReplyError(c,"invalid start ID for the interval"); | |||
3045 | return; | |||
3046 | } | |||
3047 | ||||
3048 | int j = 6; /* options start at argv[6] */ | |||
3049 | while(j < c->argc) { | |||
3050 | int moreargs = (c->argc-1) - j; /* Number of additional arguments. */ | |||
3051 | char *opt = c->argv[j]->ptr; | |||
3052 | if (!strcasecmp(opt,"COUNT") && moreargs) { | |||
3053 | if (getPositiveLongFromObjectOrReply(c,c->argv[j+1],&count,NULL((void*)0)) != C_OK0) | |||
3054 | return; | |||
3055 | if (count == 0) { | |||
3056 | addReplyError(c,"COUNT must be > 0"); | |||
3057 | return; | |||
3058 | } | |||
3059 | j++; | |||
3060 | } else if (!strcasecmp(opt,"JUSTID")) { | |||
3061 | justid = 1; | |||
3062 | } else { | |||
3063 | addReplyErrorObject(c,shared.syntaxerr); | |||
3064 | return; | |||
3065 | } | |||
3066 | j++; | |||
3067 | } | |||
3068 | ||||
3069 | if (o) { | |||
3070 | if (checkType(c,o,OBJ_STREAM6)) | |||
3071 | return; /* Type error. */ | |||
3072 | group = streamLookupCG(o->ptr,c->argv[2]->ptr); | |||
3073 | } | |||
3074 | ||||
3075 | /* No key or group? Send an error given that the group creation | |||
3076 | * is mandatory. */ | |||
3077 | if (o == NULL((void*)0) || group == NULL((void*)0)) { | |||
3078 | addReplyErrorFormat(c,"-NOGROUP No such key '%s' or consumer group '%s'", | |||
3079 | (char*)c->argv[1]->ptr, | |||
3080 | (char*)c->argv[2]->ptr); | |||
3081 | return; | |||
3082 | } | |||
3083 | ||||
3084 | /* Do the actual claiming. */ | |||
3085 | streamConsumer *consumer = NULL((void*)0); | |||
3086 | long long attempts = count*10; | |||
3087 | ||||
3088 | addReplyArrayLen(c, 2); | |||
3089 | void *endidptr = addReplyDeferredLen(c); | |||
3090 | void *arraylenptr = addReplyDeferredLen(c); | |||
3091 | ||||
3092 | unsigned char startkey[sizeof(streamID)]; | |||
3093 | streamEncodeID(startkey,&startid); | |||
3094 | raxIterator ri; | |||
3095 | raxStart(&ri,group->pel); | |||
3096 | raxSeek(&ri,">=",startkey,sizeof(startkey)); | |||
3097 | size_t arraylen = 0; | |||
3098 | mstime_t now = mstime(); | |||
3099 | while (attempts-- && count && raxNext(&ri)) { | |||
3100 | streamNACK *nack = ri.data; | |||
3101 | ||||
3102 | if (minidle) { | |||
3103 | mstime_t this_idle = now - nack->delivery_time; | |||
3104 | if (this_idle < minidle) | |||
3105 | continue; | |||
3106 | } | |||
3107 | ||||
3108 | streamID id; | |||
3109 | streamDecodeID(ri.key, &id); | |||
3110 | ||||
3111 | if (consumer == NULL((void*)0)) | |||
3112 | consumer = streamLookupConsumer(group,c->argv[3]->ptr,SLC_NONE0,NULL((void*)0)); | |||
3113 | if (nack->consumer != consumer) { | |||
3114 | /* Remove the entry from the old consumer. | |||
3115 | * Note that nack->consumer is NULL if we created the | |||
3116 | * NACK above because of the FORCE option. */ | |||
3117 | if (nack->consumer) | |||
3118 | raxRemove(nack->consumer->pel,ri.key,ri.key_len,NULL((void*)0)); | |||
3119 | } | |||
3120 | ||||
3121 | /* Update the consumer and idle time. */ | |||
3122 | nack->delivery_time = now; | |||
3123 | nack->delivery_count++; | |||
3124 | ||||
3125 | if (nack->consumer != consumer) { | |||
3126 | /* Add the entry in the new consumer local PEL. */ | |||
3127 | raxInsert(consumer->pel,ri.key,ri.key_len,nack,NULL((void*)0)); | |||
3128 | nack->consumer = consumer; | |||
3129 | } | |||
3130 | ||||
3131 | /* Send the reply for this entry. */ | |||
3132 | if (justid) { | |||
3133 | addReplyStreamID(c,&id); | |||
3134 | } else { | |||
3135 | size_t emitted = | |||
3136 | streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL((void*)0),NULL((void*)0), | |||
3137 | STREAM_RWR_RAWENTRIES(1<<1),NULL((void*)0)); | |||
3138 | if (!emitted) | |||
3139 | addReplyNull(c); | |||
3140 | } | |||
3141 | arraylen++; | |||
3142 | count--; | |||
3143 | ||||
3144 | /* Propagate this change. */ | |||
3145 | robj *idstr = createObjectFromStreamID(&id); | |||
3146 | streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],idstr,nack); | |||
3147 | decrRefCount(idstr); | |||
3148 | server.dirty++; | |||
3149 | } | |||
3150 | ||||
3151 | streamID endid; | |||
3152 | if (raxEOF(&ri)) { | |||
3153 | endid.ms = endid.seq = 0; | |||
3154 | } else { | |||
3155 | streamDecodeID(ri.key, &endid); | |||
3156 | } | |||
3157 | raxStop(&ri); | |||
3158 | ||||
3159 | setDeferredArrayLen(c,arraylenptr,arraylen); | |||
3160 | setDeferredReplyStreamID(c,endidptr,&endid); | |||
3161 | ||||
3162 | preventCommandPropagation(c); | |||
3163 | } | |||
3164 | ||||
3165 | /* XDEL <key> [<ID1> <ID2> ... <IDN>] | |||
3166 | * | |||
3167 | * Removes the specified entries from the stream. Returns the number | |||
3168 | * of items actually deleted, that may be different from the number | |||
3169 | * of IDs passed in case certain IDs do not exist. */ | |||
3170 | void xdelCommand(client *c) { | |||
3171 | robj *o; | |||
3172 | ||||
3173 | if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL((void*)0) | |||
3174 | || checkType(c,o,OBJ_STREAM6)) return; | |||
3175 | stream *s = o->ptr; | |||
3176 | ||||
3177 | /* We need to sanity check the IDs passed to start. Even if not | |||
3178 | * a big issue, it is not great that the command is only partially | |||
3179 | * executed because at some point an invalid ID is parsed. */ | |||
3180 | streamID static_ids[STREAMID_STATIC_VECTOR_LEN8]; | |||
3181 | streamID *ids = static_ids; | |||
3182 | int id_count = c->argc-2; | |||
3183 | if (id_count > STREAMID_STATIC_VECTOR_LEN8) | |||
3184 | ids = zmalloc(sizeof(streamID)*id_count); | |||
3185 | for (int j = 2; j < c->argc; j++) { | |||
3186 | if (streamParseStrictIDOrReply(c,c->argv[j],&ids[j-2],0) != C_OK0) goto cleanup; | |||
3187 | } | |||
3188 | ||||
3189 | /* Actually apply the command. */ | |||
3190 | int deleted = 0; | |||
3191 | for (int j = 2; j < c->argc; j++) { | |||
3192 | deleted += streamDeleteItem(s,&ids[j-2]); | |||
3193 | } | |||
3194 | ||||
3195 | /* Propagate the write if needed. */ | |||
3196 | if (deleted) { | |||
3197 | signalModifiedKey(c,c->db,c->argv[1]); | |||
3198 | notifyKeyspaceEvent(NOTIFY_STREAM(1<<10),"xdel",c->argv[1],c->db->id); | |||
3199 | server.dirty += deleted; | |||
3200 | } | |||
3201 | addReplyLongLong(c,deleted); | |||
3202 | cleanup: | |||
3203 | if (ids != static_ids) zfree(ids); | |||
3204 | } | |||
3205 | ||||
3206 | /* General form: XTRIM <key> [... options ...] | |||
3207 | * | |||
3208 | * List of options: | |||
3209 | * | |||
3210 | * Trim strategies: | |||
3211 | * | |||
3212 | * MAXLEN [~|=] <count> -- Trim so that the stream will be capped at | |||
3213 | * the specified length. Use ~ before the | |||
3214 | * count in order to demand approximated trimming | |||
3215 | * (like XADD MAXLEN option). | |||
3216 | * MINID [~|=] <id> -- Trim so that the stream will not contain entries | |||
3217 | * with IDs smaller than 'id'. Use ~ before the | |||
3218 | * count in order to demand approximated trimming | |||
3219 | * (like XADD MINID option). | |||
3220 | * | |||
3221 | * Other options: | |||
3222 | * | |||
3223 | * LIMIT <entries> -- The maximum number of entries to trim. | |||
3224 | * 0 means unlimited. Unless specified, it is set | |||
3225 | * to a default of 100*server.stream_node_max_entries, | |||
3226 | * and that's in order to keep the trimming time sane. | |||
3227 | * Has meaning only if `~` was provided. | |||
3228 | */ | |||
3229 | void xtrimCommand(client *c) { | |||
3230 | robj *o; | |||
3231 | ||||
3232 | /* Argument parsing. */ | |||
3233 | streamAddTrimArgs parsed_args; | |||
3234 | if (streamParseAddOrTrimArgsOrReply(c, &parsed_args, 1) < 0) | |||
3235 | return; /* streamParseAddOrTrimArgsOrReply already replied. */ | |||
3236 | ||||
3237 | /* If the key does not exist, we are ok returning zero, that is, the | |||
3238 | * number of elements removed from the stream. */ | |||
3239 | if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL((void*)0) | |||
3240 | || checkType(c,o,OBJ_STREAM6)) return; | |||
3241 | stream *s = o->ptr; | |||
3242 | ||||
3243 | /* Perform the trimming. */ | |||
3244 | int64_t deleted = streamTrim(s, &parsed_args); | |||
3245 | if (deleted) { | |||
3246 | notifyKeyspaceEvent(NOTIFY_STREAM(1<<10),"xtrim",c->argv[1],c->db->id); | |||
3247 | if (parsed_args.approx_trim) { | |||
3248 | /* In case our trimming was limited (by LIMIT or by ~) we must | |||
3249 | * re-write the relevant trim argument to make sure there will be | |||
3250 | * no inconsistencies in AOF loading or in the replica. | |||
3251 | * It's enough to check only args->approx because there is no | |||
3252 | * way LIMIT is given without the ~ option. */ | |||
3253 | streamRewriteApproxSpecifier(c,parsed_args.trim_strategy_arg_idx-1); | |||
3254 | streamRewriteTrimArgument(c,s,parsed_args.trim_strategy,parsed_args.trim_strategy_arg_idx); | |||
3255 | } | |||
3256 | ||||
3257 | /* Propagate the write. */ | |||
3258 | signalModifiedKey(c, c->db,c->argv[1]); | |||
3259 | server.dirty += deleted; | |||
3260 | } | |||
3261 | addReplyLongLong(c,deleted); | |||
3262 | } | |||
3263 | ||||
3264 | /* Helper function for xinfoCommand. | |||
3265 | * Handles the variants of XINFO STREAM: the summary form (no options) and the FULL [COUNT <count>] form. Replies directly to client 'c' with information about stream 's'. */ | |||
3266 | void xinfoReplyWithStreamInfo(client *c, stream *s) { | |||
3267 | int full = 1; /* Cleared below when no options are given. */ | |||
3268 | long long count = 10; /* Default COUNT is 10 so we don't block the server */ | |||
3269 | robj **optv = c->argv + 3; /* Options start after XINFO STREAM <key> */ | |||
3270 | int optc = c->argc - 3; | |||
3271 | ||||
3272 | /* Parse options. */ | |||
3273 | if (optc == 0) { | |||
3274 | full = 0; | |||
3275 | } else { | |||
3276 | /* Valid options are [FULL] or [FULL COUNT <count>] */ | |||
3277 | if (optc != 1 && optc != 3) { | |||
3278 | addReplySubcommandSyntaxError(c); | |||
3279 | return; | |||
3280 | } | |||
3281 | ||||
3282 | /* First option must be "FULL" */ | |||
3283 | if (strcasecmp(optv[0]->ptr,"full")) { | |||
3284 | addReplySubcommandSyntaxError(c); | |||
3285 | return; | |||
3286 | } | |||
3287 | ||||
3288 | if (optc == 3) { | |||
3289 | /* Second option must be "COUNT", followed by its value */ | |||
3290 | if (strcasecmp(optv[1]->ptr,"count")) { | |||
3291 | addReplySubcommandSyntaxError(c); | |||
3292 | return; | |||
3293 | } | |||
3294 | if (getLongLongFromObjectOrReply(c,optv[2],&count,NULL((void*)0)) == C_ERR-1) | |||
3295 | return; | |||
3296 | if (count < 0) count = 10; /* A negative COUNT falls back to the default. */ | |||
3297 | } | |||
3298 | } | |||
3299 | ||||
3300 | addReplyMapLen(c,full ? 6 : 7); /* FULL: length, radix-tree-keys, radix-tree-nodes, last-generated-id, entries, groups. Summary: the first four plus groups, first-entry, last-entry. */ | |||
3301 | addReplyBulkCString(c,"length"); | |||
3302 | addReplyLongLong(c,s->length); | |||
3303 | addReplyBulkCString(c,"radix-tree-keys"); | |||
3304 | addReplyLongLong(c,raxSize(s->rax)); | |||
3305 | addReplyBulkCString(c,"radix-tree-nodes"); | |||
3306 | addReplyLongLong(c,s->rax->numnodes); | |||
3307 | addReplyBulkCString(c,"last-generated-id"); | |||
3308 | addReplyStreamID(c,&s->last_id); | |||
3309 | ||||
3310 | if (!full) { | |||
3311 | /* XINFO STREAM <key> */ | |||
3312 | ||||
3313 | addReplyBulkCString(c,"groups"); | |||
3314 | addReplyLongLong(c,s->cgroups ? raxSize(s->cgroups) : 0); | |||
3315 | ||||
3316 | /* To emit the first/last entry we use streamReplyWithRange(). */ | |||
3317 | int emitted; | |||
3318 | streamID start, end; | |||
3319 | start.ms = start.seq = 0; | |||
3320 | end.ms = end.seq = UINT64_MAX(18446744073709551615UL); | |||
3321 | addReplyBulkCString(c,"first-entry"); | |||
3322 | emitted = streamReplyWithRange(c,s,&start,&end,1,0,NULL((void*)0),NULL((void*)0), | |||
3323 | STREAM_RWR_RAWENTRIES(1<<1),NULL((void*)0)); | |||
3324 | if (!emitted) addReplyNull(c); | |||
3325 | addReplyBulkCString(c,"last-entry"); | |||
3326 | emitted = streamReplyWithRange(c,s,&start,&end,1,1,NULL((void*)0),NULL((void*)0), | |||
3327 | STREAM_RWR_RAWENTRIES(1<<1),NULL((void*)0)); | |||
3328 | if (!emitted) addReplyNull(c); | |||
3329 | } else { | |||
3330 | /* XINFO STREAM <key> FULL [COUNT <count>] */ | |||
3331 | ||||
3332 | /* Stream entries */ | |||
3333 | addReplyBulkCString(c,"entries"); | |||
3334 | streamReplyWithRange(c,s,NULL((void*)0),NULL((void*)0),count,0,NULL((void*)0),NULL((void*)0),0,NULL((void*)0)); /* NOTE(review): NULL start/end presumably means the whole stream, capped at 'count' entries -- confirm in streamReplyWithRange. */ | |||
3335 | ||||
3336 | /* Consumer groups */ | |||
3337 | addReplyBulkCString(c,"groups"); | |||
3338 | if (s->cgroups == NULL((void*)0)) { | |||
3339 | addReplyArrayLen(c,0); | |||
3340 | } else { | |||
3341 | addReplyArrayLen(c,raxSize(s->cgroups)); | |||
3342 | raxIterator ri_cgroups; | |||
3343 | raxStart(&ri_cgroups,s->cgroups); | |||
3344 | raxSeek(&ri_cgroups,"^",NULL((void*)0),0); | |||
3345 | while(raxNext(&ri_cgroups)) { | |||
3346 | streamCG *cg = ri_cgroups.data; | |||
3347 | addReplyMapLen(c,5); /* name, last-delivered-id, pel-count, pending, consumers */ | |||
3348 | ||||
3349 | /* Name */ | |||
3350 | addReplyBulkCString(c,"name"); | |||
3351 | addReplyBulkCBuffer(c,ri_cgroups.key,ri_cgroups.key_len); | |||
3352 | ||||
3353 | /* Last delivered ID */ | |||
3354 | addReplyBulkCString(c,"last-delivered-id"); | |||
3355 | addReplyStreamID(c,&cg->last_id); | |||
3356 | ||||
3357 | /* Group PEL count */ | |||
3358 | addReplyBulkCString(c,"pel-count"); | |||
3359 | addReplyLongLong(c,raxSize(cg->pel)); | |||
3360 | ||||
3361 | /* Group PEL */ | |||
3362 | addReplyBulkCString(c,"pending"); | |||
3363 | long long arraylen_cg_pel = 0; | |||
3364 | void *arrayptr_cg_pel = addReplyDeferredLen(c); | |||
3365 | raxIterator ri_cg_pel; | |||
3366 | raxStart(&ri_cg_pel,cg->pel); | |||
3367 | raxSeek(&ri_cg_pel,"^",NULL((void*)0),0); | |||
3368 | while(raxNext(&ri_cg_pel) && (!count || arraylen_cg_pel < count)) { /* A COUNT of 0 means no limit. */ | |||
3369 | streamNACK *nack = ri_cg_pel.data; | |||
3370 | addReplyArrayLen(c,4); | |||
3371 | ||||
3372 | /* Entry ID. */ | |||
3373 | streamID id; | |||
3374 | streamDecodeID(ri_cg_pel.key,&id); | |||
3375 | addReplyStreamID(c,&id); | |||
3376 | ||||
3377 | /* Consumer name. */ | |||
3378 | serverAssert(nack->consumer)((nack->consumer)?(void)0 : (_serverAssert("nack->consumer" ,"t_stream.c",3378),__builtin_unreachable())); /* assertion for valgrind (avoid NPD) */ | |||
3379 | addReplyBulkCBuffer(c,nack->consumer->name, | |||
3380 | sdslen(nack->consumer->name)); | |||
3381 | ||||
3382 | /* Last delivery. */ | |||
3383 | addReplyLongLong(c,nack->delivery_time); | |||
3384 | ||||
3385 | /* Number of deliveries. */ | |||
3386 | addReplyLongLong(c,nack->delivery_count); | |||
3387 | ||||
3388 | arraylen_cg_pel++; | |||
3389 | } | |||
3390 | setDeferredArrayLen(c,arrayptr_cg_pel,arraylen_cg_pel); | |||
3391 | raxStop(&ri_cg_pel); | |||
3392 | ||||
3393 | /* Consumers */ | |||
3394 | addReplyBulkCString(c,"consumers"); | |||
3395 | addReplyArrayLen(c,raxSize(cg->consumers)); | |||
3396 | raxIterator ri_consumers; | |||
3397 | raxStart(&ri_consumers,cg->consumers); | |||
3398 | raxSeek(&ri_consumers,"^",NULL((void*)0),0); | |||
3399 | while(raxNext(&ri_consumers)) { | |||
3400 | streamConsumer *consumer = ri_consumers.data; | |||
3401 | addReplyMapLen(c,4); /* name, seen-time, pel-count, pending */ | |||
3402 | ||||
3403 | /* Consumer name */ | |||
3404 | addReplyBulkCString(c,"name"); | |||
3405 | addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name)); | |||
3406 | ||||
3407 | /* Seen-time */ | |||
3408 | addReplyBulkCString(c,"seen-time"); | |||
3409 | addReplyLongLong(c,consumer->seen_time); | |||
3410 | ||||
3411 | /* Consumer PEL count */ | |||
3412 | addReplyBulkCString(c,"pel-count"); | |||
3413 | addReplyLongLong(c,raxSize(consumer->pel)); | |||
3414 | ||||
3415 | /* Consumer PEL */ | |||
3416 | addReplyBulkCString(c,"pending"); | |||
3417 | long long arraylen_cpel = 0; | |||
3418 | void *arrayptr_cpel = addReplyDeferredLen(c); | |||
3419 | raxIterator ri_cpel; | |||
3420 | raxStart(&ri_cpel,consumer->pel); | |||
3421 | raxSeek(&ri_cpel,"^",NULL((void*)0),0); | |||
3422 | while(raxNext(&ri_cpel) && (!count || arraylen_cpel < count)) { /* A COUNT of 0 means no limit. */ | |||
3423 | streamNACK *nack = ri_cpel.data; | |||
3424 | addReplyArrayLen(c,3); | |||
3425 | ||||
3426 | /* Entry ID. */ | |||
3427 | streamID id; | |||
3428 | streamDecodeID(ri_cpel.key,&id); | |||
3429 | addReplyStreamID(c,&id); | |||
3430 | ||||
3431 | /* Last delivery. */ | |||
3432 | addReplyLongLong(c,nack->delivery_time); | |||
3433 | ||||
3434 | /* Number of deliveries. */ | |||
3435 | addReplyLongLong(c,nack->delivery_count); | |||
3436 | ||||
3437 | arraylen_cpel++; | |||
3438 | } | |||
3439 | setDeferredArrayLen(c,arrayptr_cpel,arraylen_cpel); | |||
3440 | raxStop(&ri_cpel); | |||
3441 | } | |||
3442 | raxStop(&ri_consumers); | |||
3443 | } | |||
3444 | raxStop(&ri_cgroups); | |||
3445 | } | |||
3446 | } | |||
3447 | } | |||
3448 | ||||
3449 | /* XINFO CONSUMERS <key> <group> | |||
3450 | * XINFO GROUPS <key> | |||
3451 | * XINFO STREAM <key> [FULL [COUNT <count>]] | |||
3452 | * XINFO HELP. */ | |||
3453 | void xinfoCommand(client *c) { | |||
3454 | stream *s = NULL((void*)0); | |||
3455 | char *opt; | |||
3456 | robj *key; | |||
3457 | ||||
3458 | /* HELP is special. Handle it ASAP. */ | |||
3459 | if (!strcasecmp(c->argv[1]->ptr,"HELP")) { | |||
3460 | const char *help[] = { | |||
3461 | "CONSUMERS <key> <groupname>", | |||
3462 | " Show consumers of <groupname>.", | |||
3463 | "GROUPS <key>", | |||
3464 | " Show the stream consumer groups.", | |||
3465 | "STREAM <key> [FULL [COUNT <count>]", | |||
3466 | " Show information about the stream.", | |||
3467 | NULL((void*)0) | |||
3468 | }; | |||
3469 | addReplyHelp(c, help); | |||
3470 | return; | |||
3471 | } else if (c->argc < 3) { | |||
3472 | addReplySubcommandSyntaxError(c); | |||
3473 | return; | |||
3474 | } | |||
3475 | ||||
3476 | /* With the exception of HELP handled before any other sub commands, all | |||
3477 | * the ones are in the form of "<subcommand> <key>". */ | |||
3478 | opt = c->argv[1]->ptr; | |||
3479 | key = c->argv[2]; | |||
3480 | ||||
3481 | /* Lookup the key now, this is common for all the subcommands but HELP. */ | |||
3482 | robj *o = lookupKeyReadOrReply(c,key,shared.nokeyerr); | |||
3483 | if (o == NULL((void*)0) || checkType(c,o,OBJ_STREAM6)) return; | |||
3484 | s = o->ptr; | |||
3485 | ||||
3486 | /* Dispatch the different subcommands. */ | |||
3487 | if (!strcasecmp(opt,"CONSUMERS") && c->argc == 4) { | |||
3488 | /* XINFO CONSUMERS <key> <group>. */ | |||
3489 | streamCG *cg = streamLookupCG(s,c->argv[3]->ptr); | |||
3490 | if (cg == NULL((void*)0)) { | |||
3491 | addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' " | |||
3492 | "for key name '%s'", | |||
3493 | (char*)c->argv[3]->ptr, (char*)key->ptr); | |||
3494 | return; | |||
3495 | } | |||
3496 | ||||
3497 | addReplyArrayLen(c,raxSize(cg->consumers)); | |||
3498 | raxIterator ri; | |||
3499 | raxStart(&ri,cg->consumers); | |||
3500 | raxSeek(&ri,"^",NULL((void*)0),0); | |||
3501 | mstime_t now = mstime(); | |||
3502 | while(raxNext(&ri)) { | |||
3503 | streamConsumer *consumer = ri.data; | |||
3504 | mstime_t idle = now - consumer->seen_time; | |||
3505 | if (idle < 0) idle = 0; | |||
3506 | ||||
3507 | addReplyMapLen(c,3); | |||
3508 | addReplyBulkCString(c,"name"); | |||
3509 | addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name)); | |||
3510 | addReplyBulkCString(c,"pending"); | |||
3511 | addReplyLongLong(c,raxSize(consumer->pel)); | |||
3512 | addReplyBulkCString(c,"idle"); | |||
3513 | addReplyLongLong(c,idle); | |||
3514 | } | |||
3515 | raxStop(&ri); | |||
3516 | } else if (!strcasecmp(opt,"GROUPS") && c->argc == 3) { | |||
3517 | /* XINFO GROUPS <key>. */ | |||
3518 | if (s->cgroups == NULL((void*)0)) { | |||
3519 | addReplyArrayLen(c,0); | |||
3520 | return; | |||
3521 | } | |||
3522 | ||||
3523 | addReplyArrayLen(c,raxSize(s->cgroups)); | |||
3524 | raxIterator ri; | |||
3525 | raxStart(&ri,s->cgroups); | |||
3526 | raxSeek(&ri,"^",NULL((void*)0),0); | |||
3527 | while(raxNext(&ri)) { | |||
3528 | streamCG *cg = ri.data; | |||
3529 | addReplyMapLen(c,4); | |||
3530 | addReplyBulkCString(c,"name"); | |||
3531 | addReplyBulkCBuffer(c,ri.key,ri.key_len); | |||
3532 | addReplyBulkCString(c,"consumers"); | |||
3533 | addReplyLongLong(c,raxSize(cg->consumers)); | |||
3534 | addReplyBulkCString(c,"pending"); | |||
3535 | addReplyLongLong(c,raxSize(cg->pel)); | |||
3536 | addReplyBulkCString(c,"last-delivered-id"); | |||
3537 | addReplyStreamID(c,&cg->last_id); | |||
3538 | } | |||
3539 | raxStop(&ri); | |||
3540 | } else if (!strcasecmp(opt,"STREAM")) { | |||
3541 | /* XINFO STREAM <key> [FULL [COUNT <count>]]. */ | |||
3542 | xinfoReplyWithStreamInfo(c,s); | |||
3543 | } else { | |||
3544 | addReplySubcommandSyntaxError(c); | |||
3545 | } | |||
3546 | } | |||
3547 | ||||
3548 | /* Validate the integrity stream listpack entries stracture. Both in term of a | |||
3549 | * valid listpack, but also that the stracture of the entires matches a valid | |||
3550 | * stream. return 1 if valid 0 if not valid. */ | |||
3551 | int streamValidateListpackIntegrity(unsigned char *lp, size_t size, int deep) { | |||
3552 | int valid_record; | |||
3553 | unsigned char *p, *next; | |||
3554 | ||||
3555 | /* Since we don't want to run validation of all records twice, we'll | |||
3556 | * run the listpack validation of just the header and do the rest here. */ | |||
3557 | if (!lpValidateIntegrity(lp, size, 0)) | |||
3558 | return 0; | |||
3559 | ||||
3560 | /* In non-deep mode we just validated the listpack header (encoded size) */ | |||
3561 | if (!deep) return 1; | |||
3562 | ||||
3563 | next = p = lpFirst(lp); | |||
3564 | if (!lpValidateNext(lp, &next, size)) return 0; | |||
3565 | if (!p) return 0; | |||
3566 | ||||
3567 | /* entry count */ | |||
3568 | int64_t entry_count = lpGetIntegerIfValid(p, &valid_record); | |||
3569 | if (!valid_record) return 0; | |||
3570 | p = next; if (!lpValidateNext(lp, &next, size)) return 0; | |||
3571 | ||||
3572 | /* deleted */ | |||
3573 | int64_t deleted_count = lpGetIntegerIfValid(p, &valid_record); | |||
3574 | if (!valid_record) return 0; | |||
3575 | p = next; if (!lpValidateNext(lp, &next, size)) return 0; | |||
3576 | ||||
3577 | /* num-of-fields */ | |||
3578 | int64_t master_fields = lpGetIntegerIfValid(p, &valid_record); | |||
3579 | if (!valid_record) return 0; | |||
3580 | p = next; if (!lpValidateNext(lp, &next, size)) return 0; | |||
3581 | ||||
3582 | /* the field names */ | |||
3583 | for (int64_t j = 0; j < master_fields; j++) { | |||
3584 | p = next; if (!lpValidateNext(lp, &next, size)) return 0; | |||
3585 | } | |||
3586 | ||||
3587 | /* the zero master entry terminator. */ | |||
3588 | int64_t zero = lpGetIntegerIfValid(p, &valid_record); | |||
3589 | if (!valid_record || zero != 0) return 0; | |||
3590 | p = next; if (!lpValidateNext(lp, &next, size)) return 0; | |||
3591 | ||||
3592 | entry_count += deleted_count; | |||
3593 | while (entry_count--) { | |||
3594 | if (!p) return 0; | |||
3595 | int64_t fields = master_fields, extra_fields = 3; | |||
3596 | int64_t flags = lpGetIntegerIfValid(p, &valid_record); | |||
3597 | if (!valid_record) return 0; | |||
3598 | p = next; if (!lpValidateNext(lp, &next, size)) return 0; | |||
3599 | ||||
3600 | /* entry id */ | |||
3601 | p = next; if (!lpValidateNext(lp, &next, size)) return 0; | |||
3602 | p = next; if (!lpValidateNext(lp, &next, size)) return 0; | |||
3603 | ||||
3604 | if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS(1<<1))) { | |||
3605 | /* num-of-fields */ | |||
3606 | fields = lpGetIntegerIfValid(p, &valid_record); | |||
3607 | if (!valid_record) return 0; | |||
3608 | p = next; if (!lpValidateNext(lp, &next, size)) return 0; | |||
3609 | ||||
3610 | /* the field names */ | |||
3611 | for (int64_t j = 0; j < fields; j++) { | |||
3612 | p = next; if (!lpValidateNext(lp, &next, size)) return 0; | |||
3613 | } | |||
3614 | ||||
3615 | extra_fields += fields + 1; | |||
3616 | } | |||
3617 | ||||
3618 | /* the values */ | |||
3619 | for (int64_t j = 0; j < fields; j++) { | |||
3620 | p = next; if (!lpValidateNext(lp, &next, size)) return 0; | |||
3621 | } | |||
3622 | ||||
3623 | /* lp-count */ | |||
3624 | int64_t lp_count = lpGetIntegerIfValid(p, &valid_record); | |||
3625 | if (!valid_record) return 0; | |||
3626 | if (lp_count != fields + extra_fields) return 0; | |||
3627 | p = next; if (!lpValidateNext(lp, &next, size)) return 0; | |||
3628 | } | |||
3629 | ||||
3630 | if (next) | |||
3631 | return 0; | |||
3632 | ||||
3633 | return 1; | |||
3634 | } |