Bug Summary

File:src/t_zset.c
Warning:line 1135, column 13
Although the value stored to 'sptr' is used in the enclosing expression, the value is never actually read from 'sptr'

Annotated Source Code

Press '?' to see keyboard shortcuts

clang -cc1 -triple x86_64-pc-linux-gnu -analyze -disable-free -disable-llvm-verifier -discard-value-names -main-file-name t_zset.c -analyzer-store=region -analyzer-opt-analyze-nested-blocks -analyzer-checker=core -analyzer-checker=apiModeling -analyzer-checker=unix -analyzer-checker=deadcode -analyzer-checker=security.insecureAPI.UncheckedReturn -analyzer-checker=security.insecureAPI.getpw -analyzer-checker=security.insecureAPI.gets -analyzer-checker=security.insecureAPI.mktemp -analyzer-checker=security.insecureAPI.mkstemp -analyzer-checker=security.insecureAPI.vfork -analyzer-checker=nullability.NullPassedToNonnull -analyzer-checker=nullability.NullReturnedFromNonnull -analyzer-output plist -w -setup-static-analyzer -mrelocation-model static -mthread-model posix -mframe-pointer=none -fmath-errno -fno-rounding-math -masm-verbose -mconstructor-aliases -munwind-tables -target-cpu x86-64 -dwarf-column-info -fno-split-dwarf-inlining -debugger-tuning=gdb -resource-dir /usr/lib/llvm-10/lib/clang/10.0.0 -D REDIS_STATIC= -I ../deps/hiredis -I ../deps/linenoise -I ../deps/lua/src -I ../deps/hdr_histogram -D USE_JEMALLOC -I ../deps/jemalloc/include -internal-isystem /usr/local/include -internal-isystem /usr/lib/llvm-10/lib/clang/10.0.0/include -internal-externc-isystem /usr/include/x86_64-linux-gnu -internal-externc-isystem /include -internal-externc-isystem /usr/include -O2 -Wno-c11-extensions -Wno-missing-field-initializers -std=c11 -fdebug-compilation-dir /home/netto/Desktop/redis-6.2.1/src -ferror-limit 19 -fmessage-length 0 -fgnuc-version=4.2.1 -fobjc-runtime=gcc -fdiagnostics-show-option -vectorize-loops -vectorize-slp -analyzer-output=html -faddrsig -o /tmp/scan-build-2021-03-14-133648-8817-1 -x c t_zset.c
1/*
2 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3 * Copyright (c) 2009-2012, Pieter Noordhuis <pcnoordhuis at gmail dot com>
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
8 *
9 * * Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * * Neither the name of Redis nor the names of its contributors may be used
15 * to endorse or promote products derived from this software without
16 * specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
29 */
30
31/*-----------------------------------------------------------------------------
32 * Sorted set API
33 *----------------------------------------------------------------------------*/
34
35/* ZSETs are ordered sets using two data structures to hold the same elements
36 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
37 * data structure.
38 *
39 * The elements are added to a hash table mapping Redis objects to scores.
40 * At the same time the elements are added to a skip list mapping scores
41 * to Redis objects (so objects are sorted by scores in this "view").
42 *
43 * Note that the SDS string representing the element is the same in both
44 * the hash table and skiplist in order to save memory. What we do in order
45 * to manage the shared SDS string more easily is to free the SDS string
46 * only in zslFreeNode(). The dictionary has no value free method set.
47 * So we should always remove an element from the dictionary, and later from
48 * the skiplist.
49 *
50 * This skiplist implementation is almost a C translation of the original
51 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
52 * Alternative to Balanced Trees", modified in three ways:
53 * a) this implementation allows for repeated scores.
54 * b) the comparison is not just by key (our 'score') but by satellite data.
55 * c) there is a back pointer, so it's a doubly linked list with the back
56 * pointers being only at "level 1". This allows to traverse the list
57 * from tail to head, useful for ZREVRANGE. */
58
59#include "server.h"
60#include <math.h>
61
62/*-----------------------------------------------------------------------------
63 * Skiplist implementation of the low level API
64 *----------------------------------------------------------------------------*/
65
66int zslLexValueGteMin(sds value, zlexrangespec *spec);
67int zslLexValueLteMax(sds value, zlexrangespec *spec);
68
69/* Create a skiplist node with the specified number of levels.
70 * The SDS string 'ele' is referenced by the node after the call. */
71zskiplistNode *zslCreateNode(int level, double score, sds ele) {
72 zskiplistNode *zn =
73 zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
74 zn->score = score;
75 zn->ele = ele;
76 return zn;
77}
78
79/* Create a new skiplist. */
80zskiplist *zslCreate(void) {
81 int j;
82 zskiplist *zsl;
83
84 zsl = zmalloc(sizeof(*zsl));
85 zsl->level = 1;
86 zsl->length = 0;
87 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL32,0,NULL((void*)0));
88 for (j = 0; j < ZSKIPLIST_MAXLEVEL32; j++) {
89 zsl->header->level[j].forward = NULL((void*)0);
90 zsl->header->level[j].span = 0;
91 }
92 zsl->header->backward = NULL((void*)0);
93 zsl->tail = NULL((void*)0);
94 return zsl;
95}
96
97/* Free the specified skiplist node. The referenced SDS string representation
98 * of the element is freed too, unless node->ele is set to NULL before calling
99 * this function. */
100void zslFreeNode(zskiplistNode *node) {
101 sdsfree(node->ele);
102 zfree(node);
103}
104
105/* Free a whole skiplist. */
106void zslFree(zskiplist *zsl) {
107 zskiplistNode *node = zsl->header->level[0].forward, *next;
108
109 zfree(zsl->header);
110 while(node) {
111 next = node->level[0].forward;
112 zslFreeNode(node);
113 node = next;
114 }
115 zfree(zsl);
116}
117
118/* Returns a random level for the new skiplist node we are going to create.
119 * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
120 * (both inclusive), with a powerlaw-alike distribution where higher
121 * levels are less likely to be returned. */
122int zslRandomLevel(void) {
123 int level = 1;
124 while ((random()&0xFFFF) < (ZSKIPLIST_P0.25 * 0xFFFF))
125 level += 1;
126 return (level<ZSKIPLIST_MAXLEVEL32) ? level : ZSKIPLIST_MAXLEVEL32;
127}
128
129/* Insert a new node in the skiplist. Assumes the element does not already
130 * exist (up to the caller to enforce that). The skiplist takes ownership
131 * of the passed SDS string 'ele'. */
132zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
133 zskiplistNode *update[ZSKIPLIST_MAXLEVEL32], *x;
134 unsigned int rank[ZSKIPLIST_MAXLEVEL32];
135 int i, level;
136
137 serverAssert(!isnan(score))((!__builtin_isnan (score))?(void)0 : (_serverAssert("!isnan(score)"
,"t_zset.c",137),__builtin_unreachable()))
;
138 x = zsl->header;
139 for (i = zsl->level-1; i >= 0; i--) {
140 /* store rank that is crossed to reach the insert position */
141 rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
142 while (x->level[i].forward &&
143 (x->level[i].forward->score < score ||
144 (x->level[i].forward->score == score &&
145 sdscmp(x->level[i].forward->ele,ele) < 0)))
146 {
147 rank[i] += x->level[i].span;
148 x = x->level[i].forward;
149 }
150 update[i] = x;
151 }
152 /* we assume the element is not already inside, since we allow duplicated
153 * scores, reinserting the same element should never happen since the
154 * caller of zslInsert() should test in the hash table if the element is
155 * already inside or not. */
156 level = zslRandomLevel();
157 if (level > zsl->level) {
158 for (i = zsl->level; i < level; i++) {
159 rank[i] = 0;
160 update[i] = zsl->header;
161 update[i]->level[i].span = zsl->length;
162 }
163 zsl->level = level;
164 }
165 x = zslCreateNode(level,score,ele);
166 for (i = 0; i < level; i++) {
167 x->level[i].forward = update[i]->level[i].forward;
168 update[i]->level[i].forward = x;
169
170 /* update span covered by update[i] as x is inserted here */
171 x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
172 update[i]->level[i].span = (rank[0] - rank[i]) + 1;
173 }
174
175 /* increment span for untouched levels */
176 for (i = level; i < zsl->level; i++) {
177 update[i]->level[i].span++;
178 }
179
180 x->backward = (update[0] == zsl->header) ? NULL((void*)0) : update[0];
181 if (x->level[0].forward)
182 x->level[0].forward->backward = x;
183 else
184 zsl->tail = x;
185 zsl->length++;
186 return x;
187}
188
189/* Internal function used by zslDelete, zslDeleteRangeByScore and
190 * zslDeleteRangeByRank. */
191void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
192 int i;
193 for (i = 0; i < zsl->level; i++) {
194 if (update[i]->level[i].forward == x) {
195 update[i]->level[i].span += x->level[i].span - 1;
196 update[i]->level[i].forward = x->level[i].forward;
197 } else {
198 update[i]->level[i].span -= 1;
199 }
200 }
201 if (x->level[0].forward) {
202 x->level[0].forward->backward = x->backward;
203 } else {
204 zsl->tail = x->backward;
205 }
206 while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL((void*)0))
207 zsl->level--;
208 zsl->length--;
209}
210
211/* Delete an element with matching score/element from the skiplist.
212 * The function returns 1 if the node was found and deleted, otherwise
213 * 0 is returned.
214 *
215 * If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise
216 * it is not freed (but just unlinked) and *node is set to the node pointer,
217 * so that it is possible for the caller to reuse the node (including the
218 * referenced SDS string at node->ele). */
219int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
220 zskiplistNode *update[ZSKIPLIST_MAXLEVEL32], *x;
221 int i;
222
223 x = zsl->header;
224 for (i = zsl->level-1; i >= 0; i--) {
225 while (x->level[i].forward &&
226 (x->level[i].forward->score < score ||
227 (x->level[i].forward->score == score &&
228 sdscmp(x->level[i].forward->ele,ele) < 0)))
229 {
230 x = x->level[i].forward;
231 }
232 update[i] = x;
233 }
234 /* We may have multiple elements with the same score, what we need
235 * is to find the element with both the right score and object. */
236 x = x->level[0].forward;
237 if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
238 zslDeleteNode(zsl, x, update);
239 if (!node)
240 zslFreeNode(x);
241 else
242 *node = x;
243 return 1;
244 }
245 return 0; /* not found */
246}
247
248/* Update the score of an element inside the sorted set skiplist.
249 * Note that the element must exist and must match 'score'.
250 * This function does not update the score in the hash table side, the
251 * caller should take care of it.
252 *
253 * Note that this function attempts to just update the node, in case after
254 * the score update, the node would be exactly at the same position.
255 * Otherwise the skiplist is modified by removing and re-adding a new
256 * element, which is more costly.
257 *
258 * The function returns the updated element skiplist node pointer. */
259zskiplistNode *zslUpdateScore(zskiplist *zsl, double curscore, sds ele, double newscore) {
260 zskiplistNode *update[ZSKIPLIST_MAXLEVEL32], *x;
261 int i;
262
263 /* We need to seek to element to update to start: this is useful anyway,
264 * we'll have to update or remove it. */
265 x = zsl->header;
266 for (i = zsl->level-1; i >= 0; i--) {
267 while (x->level[i].forward &&
268 (x->level[i].forward->score < curscore ||
269 (x->level[i].forward->score == curscore &&
270 sdscmp(x->level[i].forward->ele,ele) < 0)))
271 {
272 x = x->level[i].forward;
273 }
274 update[i] = x;
275 }
276
277 /* Jump to our element: note that this function assumes that the
278 * element with the matching score exists. */
279 x = x->level[0].forward;
280 serverAssert(x && curscore == x->score && sdscmp(x->ele,ele) == 0)((x && curscore == x->score && sdscmp(x->
ele,ele) == 0)?(void)0 : (_serverAssert("x && curscore == x->score && sdscmp(x->ele,ele) == 0"
,"t_zset.c",280),__builtin_unreachable()))
;
281
282 /* If the node, after the score update, would be still exactly
283 * at the same position, we can just update the score without
284 * actually removing and re-inserting the element in the skiplist. */
285 if ((x->backward == NULL((void*)0) || x->backward->score < newscore) &&
286 (x->level[0].forward == NULL((void*)0) || x->level[0].forward->score > newscore))
287 {
288 x->score = newscore;
289 return x;
290 }
291
292 /* No way to reuse the old node: we need to remove and insert a new
293 * one at a different place. */
294 zslDeleteNode(zsl, x, update);
295 zskiplistNode *newnode = zslInsert(zsl,newscore,x->ele);
296 /* We reused the old node x->ele SDS string, free the node now
297 * since zslInsert created a new one. */
298 x->ele = NULL((void*)0);
299 zslFreeNode(x);
300 return newnode;
301}
302
303int zslValueGteMin(double value, zrangespec *spec) {
304 return spec->minex ? (value > spec->min) : (value >= spec->min);
305}
306
307int zslValueLteMax(double value, zrangespec *spec) {
308 return spec->maxex ? (value < spec->max) : (value <= spec->max);
309}
310
311/* Returns if there is a part of the zset is in range. */
312int zslIsInRange(zskiplist *zsl, zrangespec *range) {
313 zskiplistNode *x;
314
315 /* Test for ranges that will always be empty. */
316 if (range->min > range->max ||
317 (range->min == range->max && (range->minex || range->maxex)))
318 return 0;
319 x = zsl->tail;
320 if (x == NULL((void*)0) || !zslValueGteMin(x->score,range))
321 return 0;
322 x = zsl->header->level[0].forward;
323 if (x == NULL((void*)0) || !zslValueLteMax(x->score,range))
324 return 0;
325 return 1;
326}
327
328/* Find the first node that is contained in the specified range.
329 * Returns NULL when no element is contained in the range. */
330zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range) {
331 zskiplistNode *x;
332 int i;
333
334 /* If everything is out of range, return early. */
335 if (!zslIsInRange(zsl,range)) return NULL((void*)0);
336
337 x = zsl->header;
338 for (i = zsl->level-1; i >= 0; i--) {
339 /* Go forward while *OUT* of range. */
340 while (x->level[i].forward &&
341 !zslValueGteMin(x->level[i].forward->score,range))
342 x = x->level[i].forward;
343 }
344
345 /* This is an inner range, so the next node cannot be NULL. */
346 x = x->level[0].forward;
347 serverAssert(x != NULL)((x != ((void*)0))?(void)0 : (_serverAssert("x != NULL","t_zset.c"
,347),__builtin_unreachable()))
;
348
349 /* Check if score <= max. */
350 if (!zslValueLteMax(x->score,range)) return NULL((void*)0);
351 return x;
352}
353
354/* Find the last node that is contained in the specified range.
355 * Returns NULL when no element is contained in the range. */
356zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec *range) {
357 zskiplistNode *x;
358 int i;
359
360 /* If everything is out of range, return early. */
361 if (!zslIsInRange(zsl,range)) return NULL((void*)0);
362
363 x = zsl->header;
364 for (i = zsl->level-1; i >= 0; i--) {
365 /* Go forward while *IN* range. */
366 while (x->level[i].forward &&
367 zslValueLteMax(x->level[i].forward->score,range))
368 x = x->level[i].forward;
369 }
370
371 /* This is an inner range, so this node cannot be NULL. */
372 serverAssert(x != NULL)((x != ((void*)0))?(void)0 : (_serverAssert("x != NULL","t_zset.c"
,372),__builtin_unreachable()))
;
373
374 /* Check if score >= min. */
375 if (!zslValueGteMin(x->score,range)) return NULL((void*)0);
376 return x;
377}
378
379/* Delete all the elements with score between min and max from the skiplist.
380 * Both min and max can be inclusive or exclusive (see range->minex and
381 * range->maxex). When inclusive a score >= min && score <= max is deleted.
382 * Note that this function takes the reference to the hash table view of the
383 * sorted set, in order to remove the elements from the hash table too. */
384unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec *range, dict *dict) {
385 zskiplistNode *update[ZSKIPLIST_MAXLEVEL32], *x;
386 unsigned long removed = 0;
387 int i;
388
389 x = zsl->header;
390 for (i = zsl->level-1; i >= 0; i--) {
391 while (x->level[i].forward && (range->minex ?
392 x->level[i].forward->score <= range->min :
393 x->level[i].forward->score < range->min))
394 x = x->level[i].forward;
395 update[i] = x;
396 }
397
398 /* Current node is the last with score < or <= min. */
399 x = x->level[0].forward;
400
401 /* Delete nodes while in range. */
402 while (x &&
403 (range->maxex ? x->score < range->max : x->score <= range->max))
404 {
405 zskiplistNode *next = x->level[0].forward;
406 zslDeleteNode(zsl,x,update);
407 dictDelete(dict,x->ele);
408 zslFreeNode(x); /* Here is where x->ele is actually released. */
409 removed++;
410 x = next;
411 }
412 return removed;
413}
414
415unsigned long zslDeleteRangeByLex(zskiplist *zsl, zlexrangespec *range, dict *dict) {
416 zskiplistNode *update[ZSKIPLIST_MAXLEVEL32], *x;
417 unsigned long removed = 0;
418 int i;
419
420
421 x = zsl->header;
422 for (i = zsl->level-1; i >= 0; i--) {
423 while (x->level[i].forward &&
424 !zslLexValueGteMin(x->level[i].forward->ele,range))
425 x = x->level[i].forward;
426 update[i] = x;
427 }
428
429 /* Current node is the last with score < or <= min. */
430 x = x->level[0].forward;
431
432 /* Delete nodes while in range. */
433 while (x && zslLexValueLteMax(x->ele,range)) {
434 zskiplistNode *next = x->level[0].forward;
435 zslDeleteNode(zsl,x,update);
436 dictDelete(dict,x->ele);
437 zslFreeNode(x); /* Here is where x->ele is actually released. */
438 removed++;
439 x = next;
440 }
441 return removed;
442}
443
444/* Delete all the elements with rank between start and end from the skiplist.
445 * Start and end are inclusive. Note that start and end need to be 1-based */
446unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
447 zskiplistNode *update[ZSKIPLIST_MAXLEVEL32], *x;
448 unsigned long traversed = 0, removed = 0;
449 int i;
450
451 x = zsl->header;
452 for (i = zsl->level-1; i >= 0; i--) {
453 while (x->level[i].forward && (traversed + x->level[i].span) < start) {
454 traversed += x->level[i].span;
455 x = x->level[i].forward;
456 }
457 update[i] = x;
458 }
459
460 traversed++;
461 x = x->level[0].forward;
462 while (x && traversed <= end) {
463 zskiplistNode *next = x->level[0].forward;
464 zslDeleteNode(zsl,x,update);
465 dictDelete(dict,x->ele);
466 zslFreeNode(x);
467 removed++;
468 traversed++;
469 x = next;
470 }
471 return removed;
472}
473
474/* Find the rank for an element by both score and key.
475 * Returns 0 when the element cannot be found, rank otherwise.
476 * Note that the rank is 1-based due to the span of zsl->header to the
477 * first element. */
478unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) {
479 zskiplistNode *x;
480 unsigned long rank = 0;
481 int i;
482
483 x = zsl->header;
484 for (i = zsl->level-1; i >= 0; i--) {
485 while (x->level[i].forward &&
486 (x->level[i].forward->score < score ||
487 (x->level[i].forward->score == score &&
488 sdscmp(x->level[i].forward->ele,ele) <= 0))) {
489 rank += x->level[i].span;
490 x = x->level[i].forward;
491 }
492
493 /* x might be equal to zsl->header, so test if obj is non-NULL */
494 if (x->ele && sdscmp(x->ele,ele) == 0) {
495 return rank;
496 }
497 }
498 return 0;
499}
500
501/* Finds an element by its rank. The rank argument needs to be 1-based. */
502zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
503 zskiplistNode *x;
504 unsigned long traversed = 0;
505 int i;
506
507 x = zsl->header;
508 for (i = zsl->level-1; i >= 0; i--) {
509 while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
510 {
511 traversed += x->level[i].span;
512 x = x->level[i].forward;
513 }
514 if (traversed == rank) {
515 return x;
516 }
517 }
518 return NULL((void*)0);
519}
520
521/* Populate the rangespec according to the objects min and max. */
522static int zslParseRange(robj *min, robj *max, zrangespec *spec) {
523 char *eptr;
524 spec->minex = spec->maxex = 0;
525
526 /* Parse the min-max interval. If one of the values is prefixed
527 * by the "(" character, it's considered "open". For instance
528 * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max
529 * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */
530 if (min->encoding == OBJ_ENCODING_INT1) {
531 spec->min = (long)min->ptr;
532 } else {
533 if (((char*)min->ptr)[0] == '(') {
534 spec->min = strtod((char*)min->ptr+1,&eptr);
535 if (eptr[0] != '\0' || isnan(spec->min)__builtin_isnan (spec->min)) return C_ERR-1;
536 spec->minex = 1;
537 } else {
538 spec->min = strtod((char*)min->ptr,&eptr);
539 if (eptr[0] != '\0' || isnan(spec->min)__builtin_isnan (spec->min)) return C_ERR-1;
540 }
541 }
542 if (max->encoding == OBJ_ENCODING_INT1) {
543 spec->max = (long)max->ptr;
544 } else {
545 if (((char*)max->ptr)[0] == '(') {
546 spec->max = strtod((char*)max->ptr+1,&eptr);
547 if (eptr[0] != '\0' || isnan(spec->max)__builtin_isnan (spec->max)) return C_ERR-1;
548 spec->maxex = 1;
549 } else {
550 spec->max = strtod((char*)max->ptr,&eptr);
551 if (eptr[0] != '\0' || isnan(spec->max)__builtin_isnan (spec->max)) return C_ERR-1;
552 }
553 }
554
555 return C_OK0;
556}
557
558/* ------------------------ Lexicographic ranges ---------------------------- */
559
560/* Parse max or min argument of ZRANGEBYLEX.
561 * (foo means foo (open interval)
562 * [foo means foo (closed interval)
563 * - means the min string possible
564 * + means the max string possible
565 *
566 * If the string is valid the *dest pointer is set to the redis object
567 * that will be used for the comparison, and ex will be set to 0 or 1
568 * respectively if the item is exclusive or inclusive. C_OK will be
569 * returned.
570 *
571 * If the string is not a valid range C_ERR is returned, and the value
572 * of *dest and *ex is undefined. */
573int zslParseLexRangeItem(robj *item, sds *dest, int *ex) {
574 char *c = item->ptr;
575
576 switch(c[0]) {
577 case '+':
578 if (c[1] != '\0') return C_ERR-1;
579 *ex = 1;
580 *dest = shared.maxstring;
581 return C_OK0;
582 case '-':
583 if (c[1] != '\0') return C_ERR-1;
584 *ex = 1;
585 *dest = shared.minstring;
586 return C_OK0;
587 case '(':
588 *ex = 1;
589 *dest = sdsnewlen(c+1,sdslen(c)-1);
590 return C_OK0;
591 case '[':
592 *ex = 0;
593 *dest = sdsnewlen(c+1,sdslen(c)-1);
594 return C_OK0;
595 default:
596 return C_ERR-1;
597 }
598}
599
600/* Free a lex range structure, must be called only after zelParseLexRange()
601 * populated the structure with success (C_OK returned). */
602void zslFreeLexRange(zlexrangespec *spec) {
603 if (spec->min != shared.minstring &&
604 spec->min != shared.maxstring) sdsfree(spec->min);
605 if (spec->max != shared.minstring &&
606 spec->max != shared.maxstring) sdsfree(spec->max);
607}
608
609/* Populate the lex rangespec according to the objects min and max.
610 *
611 * Return C_OK on success. On error C_ERR is returned.
612 * When OK is returned the structure must be freed with zslFreeLexRange(),
613 * otherwise no release is needed. */
614int zslParseLexRange(robj *min, robj *max, zlexrangespec *spec) {
615 /* The range can't be valid if objects are integer encoded.
616 * Every item must start with ( or [. */
617 if (min->encoding == OBJ_ENCODING_INT1 ||
618 max->encoding == OBJ_ENCODING_INT1) return C_ERR-1;
619
620 spec->min = spec->max = NULL((void*)0);
621 if (zslParseLexRangeItem(min, &spec->min, &spec->minex) == C_ERR-1 ||
622 zslParseLexRangeItem(max, &spec->max, &spec->maxex) == C_ERR-1) {
623 zslFreeLexRange(spec);
624 return C_ERR-1;
625 } else {
626 return C_OK0;
627 }
628}
629
630/* This is just a wrapper to sdscmp() that is able to
631 * handle shared.minstring and shared.maxstring as the equivalent of
632 * -inf and +inf for strings */
633int sdscmplex(sds a, sds b) {
634 if (a == b) return 0;
635 if (a == shared.minstring || b == shared.maxstring) return -1;
636 if (a == shared.maxstring || b == shared.minstring) return 1;
637 return sdscmp(a,b);
638}
639
640int zslLexValueGteMin(sds value, zlexrangespec *spec) {
641 return spec->minex ?
642 (sdscmplex(value,spec->min) > 0) :
643 (sdscmplex(value,spec->min) >= 0);
644}
645
646int zslLexValueLteMax(sds value, zlexrangespec *spec) {
647 return spec->maxex ?
648 (sdscmplex(value,spec->max) < 0) :
649 (sdscmplex(value,spec->max) <= 0);
650}
651
652/* Returns if there is a part of the zset is in the lex range. */
653int zslIsInLexRange(zskiplist *zsl, zlexrangespec *range) {
654 zskiplistNode *x;
655
656 /* Test for ranges that will always be empty. */
657 int cmp = sdscmplex(range->min,range->max);
658 if (cmp > 0 || (cmp == 0 && (range->minex || range->maxex)))
659 return 0;
660 x = zsl->tail;
661 if (x == NULL((void*)0) || !zslLexValueGteMin(x->ele,range))
662 return 0;
663 x = zsl->header->level[0].forward;
664 if (x == NULL((void*)0) || !zslLexValueLteMax(x->ele,range))
665 return 0;
666 return 1;
667}
668
669/* Find the first node that is contained in the specified lex range.
670 * Returns NULL when no element is contained in the range. */
671zskiplistNode *zslFirstInLexRange(zskiplist *zsl, zlexrangespec *range) {
672 zskiplistNode *x;
673 int i;
674
675 /* If everything is out of range, return early. */
676 if (!zslIsInLexRange(zsl,range)) return NULL((void*)0);
677
678 x = zsl->header;
679 for (i = zsl->level-1; i >= 0; i--) {
680 /* Go forward while *OUT* of range. */
681 while (x->level[i].forward &&
682 !zslLexValueGteMin(x->level[i].forward->ele,range))
683 x = x->level[i].forward;
684 }
685
686 /* This is an inner range, so the next node cannot be NULL. */
687 x = x->level[0].forward;
688 serverAssert(x != NULL)((x != ((void*)0))?(void)0 : (_serverAssert("x != NULL","t_zset.c"
,688),__builtin_unreachable()))
;
689
690 /* Check if score <= max. */
691 if (!zslLexValueLteMax(x->ele,range)) return NULL((void*)0);
692 return x;
693}
694
695/* Find the last node that is contained in the specified range.
696 * Returns NULL when no element is contained in the range. */
697zskiplistNode *zslLastInLexRange(zskiplist *zsl, zlexrangespec *range) {
698 zskiplistNode *x;
699 int i;
700
701 /* If everything is out of range, return early. */
702 if (!zslIsInLexRange(zsl,range)) return NULL((void*)0);
703
704 x = zsl->header;
705 for (i = zsl->level-1; i >= 0; i--) {
706 /* Go forward while *IN* range. */
707 while (x->level[i].forward &&
708 zslLexValueLteMax(x->level[i].forward->ele,range))
709 x = x->level[i].forward;
710 }
711
712 /* This is an inner range, so this node cannot be NULL. */
713 serverAssert(x != NULL)((x != ((void*)0))?(void)0 : (_serverAssert("x != NULL","t_zset.c"
,713),__builtin_unreachable()))
;
714
715 /* Check if score >= min. */
716 if (!zslLexValueGteMin(x->ele,range)) return NULL((void*)0);
717 return x;
718}
719
720/*-----------------------------------------------------------------------------
721 * Ziplist-backed sorted set API
722 *----------------------------------------------------------------------------*/
723
724double zzlStrtod(unsigned char *vstr, unsigned int vlen) {
725 char buf[128];
726 if (vlen > sizeof(buf))
727 vlen = sizeof(buf);
728 memcpy(buf,vstr,vlen);
729 buf[vlen] = '\0';
730 return strtod(buf,NULL((void*)0));
731 }
732
733double zzlGetScore(unsigned char *sptr) {
734 unsigned char *vstr;
735 unsigned int vlen;
736 long long vlong;
737 double score;
738
739 serverAssert(sptr != NULL)((sptr != ((void*)0))?(void)0 : (_serverAssert("sptr != NULL"
,"t_zset.c",739),__builtin_unreachable()))
;
740 serverAssert(ziplistGet(sptr,&vstr,&vlen,&vlong))((ziplistGet(sptr,&vstr,&vlen,&vlong))?(void)0 : (
_serverAssert("ziplistGet(sptr,&vstr,&vlen,&vlong)"
,"t_zset.c",740),__builtin_unreachable()))
;
741
742 if (vstr) {
743 score = zzlStrtod(vstr,vlen);
744 } else {
745 score = vlong;
746 }
747
748 return score;
749}
750
751/* Return a ziplist element as an SDS string. */
752sds ziplistGetObject(unsigned char *sptr) {
753 unsigned char *vstr;
754 unsigned int vlen;
755 long long vlong;
756
757 serverAssert(sptr != NULL)((sptr != ((void*)0))?(void)0 : (_serverAssert("sptr != NULL"
,"t_zset.c",757),__builtin_unreachable()))
;
758 serverAssert(ziplistGet(sptr,&vstr,&vlen,&vlong))((ziplistGet(sptr,&vstr,&vlen,&vlong))?(void)0 : (
_serverAssert("ziplistGet(sptr,&vstr,&vlen,&vlong)"
,"t_zset.c",758),__builtin_unreachable()))
;
759
760 if (vstr) {
761 return sdsnewlen((char*)vstr,vlen);
762 } else {
763 return sdsfromlonglong(vlong);
764 }
765}
766
767/* Compare element in sorted set with given element. */
768int zzlCompareElements(unsigned char *eptr, unsigned char *cstr, unsigned int clen) {
769 unsigned char *vstr;
770 unsigned int vlen;
771 long long vlong;
772 unsigned char vbuf[32];
773 int minlen, cmp;
774
775 serverAssert(ziplistGet(eptr,&vstr,&vlen,&vlong))((ziplistGet(eptr,&vstr,&vlen,&vlong))?(void)0 : (
_serverAssert("ziplistGet(eptr,&vstr,&vlen,&vlong)"
,"t_zset.c",775),__builtin_unreachable()))
;
776 if (vstr == NULL((void*)0)) {
777 /* Store string representation of long long in buf. */
778 vlen = ll2string((char*)vbuf,sizeof(vbuf),vlong);
779 vstr = vbuf;
780 }
781
782 minlen = (vlen < clen) ? vlen : clen;
783 cmp = memcmp(vstr,cstr,minlen);
784 if (cmp == 0) return vlen-clen;
785 return cmp;
786}
787
788unsigned int zzlLength(unsigned char *zl) {
789 return ziplistLen(zl)/2;
790}
791
792/* Move to next entry based on the values in eptr and sptr. Both are set to
793 * NULL when there is no next entry. */
794void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
795 unsigned char *_eptr, *_sptr;
796 serverAssert(*eptr != NULL && *sptr != NULL)((*eptr != ((void*)0) && *sptr != ((void*)0))?(void)0
: (_serverAssert("*eptr != NULL && *sptr != NULL","t_zset.c"
,796),__builtin_unreachable()))
;
797
798 _eptr = ziplistNext(zl,*sptr);
799 if (_eptr != NULL((void*)0)) {
800 _sptr = ziplistNext(zl,_eptr);
801 serverAssert(_sptr != NULL)((_sptr != ((void*)0))?(void)0 : (_serverAssert("_sptr != NULL"
,"t_zset.c",801),__builtin_unreachable()))
;
802 } else {
803 /* No next entry. */
804 _sptr = NULL((void*)0);
805 }
806
807 *eptr = _eptr;
808 *sptr = _sptr;
809}
810
811/* Move to the previous entry based on the values in eptr and sptr. Both are
812 * set to NULL when there is no next entry. */
813void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
814 unsigned char *_eptr, *_sptr;
815 serverAssert(*eptr != NULL && *sptr != NULL)((*eptr != ((void*)0) && *sptr != ((void*)0))?(void)0
: (_serverAssert("*eptr != NULL && *sptr != NULL","t_zset.c"
,815),__builtin_unreachable()))
;
816
817 _sptr = ziplistPrev(zl,*eptr);
818 if (_sptr != NULL((void*)0)) {
819 _eptr = ziplistPrev(zl,_sptr);
820 serverAssert(_eptr != NULL)((_eptr != ((void*)0))?(void)0 : (_serverAssert("_eptr != NULL"
,"t_zset.c",820),__builtin_unreachable()))
;
821 } else {
822 /* No previous entry. */
823 _eptr = NULL((void*)0);
824 }
825
826 *eptr = _eptr;
827 *sptr = _sptr;
828}
829
830/* Returns if there is a part of the zset is in range. Should only be used
831 * internally by zzlFirstInRange and zzlLastInRange. */
832int zzlIsInRange(unsigned char *zl, zrangespec *range) {
833 unsigned char *p;
834 double score;
835
836 /* Test for ranges that will always be empty. */
837 if (range->min > range->max ||
838 (range->min == range->max && (range->minex || range->maxex)))
839 return 0;
840
841 p = ziplistIndex(zl,-1); /* Last score. */
842 if (p == NULL((void*)0)) return 0; /* Empty sorted set */
843 score = zzlGetScore(p);
844 if (!zslValueGteMin(score,range))
845 return 0;
846
847 p = ziplistIndex(zl,1); /* First score. */
848 serverAssert(p != NULL)((p != ((void*)0))?(void)0 : (_serverAssert("p != NULL","t_zset.c"
,848),__builtin_unreachable()))
;
849 score = zzlGetScore(p);
850 if (!zslValueLteMax(score,range))
851 return 0;
852
853 return 1;
854}
855
856/* Find pointer to the first element contained in the specified range.
857 * Returns NULL when no element is contained in the range. */
858unsigned char *zzlFirstInRange(unsigned char *zl, zrangespec *range) {
859 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
860 double score;
861
862 /* If everything is out of range, return early. */
863 if (!zzlIsInRange(zl,range)) return NULL((void*)0);
864
865 while (eptr != NULL((void*)0)) {
866 sptr = ziplistNext(zl,eptr);
867 serverAssert(sptr != NULL)((sptr != ((void*)0))?(void)0 : (_serverAssert("sptr != NULL"
,"t_zset.c",867),__builtin_unreachable()))
;
868
869 score = zzlGetScore(sptr);
870 if (zslValueGteMin(score,range)) {
871 /* Check if score <= max. */
872 if (zslValueLteMax(score,range))
873 return eptr;
874 return NULL((void*)0);
875 }
876
877 /* Move to next element. */
878 eptr = ziplistNext(zl,sptr);
879 }
880
881 return NULL((void*)0);
882}
883
884/* Find pointer to the last element contained in the specified range.
885 * Returns NULL when no element is contained in the range. */
886unsigned char *zzlLastInRange(unsigned char *zl, zrangespec *range) {
887 unsigned char *eptr = ziplistIndex(zl,-2), *sptr;
888 double score;
889
890 /* If everything is out of range, return early. */
891 if (!zzlIsInRange(zl,range)) return NULL((void*)0);
892
893 while (eptr != NULL((void*)0)) {
894 sptr = ziplistNext(zl,eptr);
895 serverAssert(sptr != NULL)((sptr != ((void*)0))?(void)0 : (_serverAssert("sptr != NULL"
,"t_zset.c",895),__builtin_unreachable()))
;
896
897 score = zzlGetScore(sptr);
898 if (zslValueLteMax(score,range)) {
899 /* Check if score >= min. */
900 if (zslValueGteMin(score,range))
901 return eptr;
902 return NULL((void*)0);
903 }
904
905 /* Move to previous element by moving to the score of previous element.
906 * When this returns NULL, we know there also is no element. */
907 sptr = ziplistPrev(zl,eptr);
908 if (sptr != NULL((void*)0))
909 serverAssert((eptr = ziplistPrev(zl,sptr)) != NULL)(((eptr = ziplistPrev(zl,sptr)) != ((void*)0))?(void)0 : (_serverAssert
("(eptr = ziplistPrev(zl,sptr)) != NULL","t_zset.c",909),__builtin_unreachable
()))
;
910 else
911 eptr = NULL((void*)0);
912 }
913
914 return NULL((void*)0);
915}
916
917int zzlLexValueGteMin(unsigned char *p, zlexrangespec *spec) {
918 sds value = ziplistGetObject(p);
919 int res = zslLexValueGteMin(value,spec);
920 sdsfree(value);
921 return res;
922}
923
924int zzlLexValueLteMax(unsigned char *p, zlexrangespec *spec) {
925 sds value = ziplistGetObject(p);
926 int res = zslLexValueLteMax(value,spec);
927 sdsfree(value);
928 return res;
929}
930
931/* Returns if there is a part of the zset is in range. Should only be used
932 * internally by zzlFirstInRange and zzlLastInRange. */
933int zzlIsInLexRange(unsigned char *zl, zlexrangespec *range) {
934 unsigned char *p;
935
936 /* Test for ranges that will always be empty. */
937 int cmp = sdscmplex(range->min,range->max);
938 if (cmp > 0 || (cmp == 0 && (range->minex || range->maxex)))
939 return 0;
940
941 p = ziplistIndex(zl,-2); /* Last element. */
942 if (p == NULL((void*)0)) return 0;
943 if (!zzlLexValueGteMin(p,range))
944 return 0;
945
946 p = ziplistIndex(zl,0); /* First element. */
947 serverAssert(p != NULL)((p != ((void*)0))?(void)0 : (_serverAssert("p != NULL","t_zset.c"
,947),__builtin_unreachable()))
;
948 if (!zzlLexValueLteMax(p,range))
949 return 0;
950
951 return 1;
952}
953
954/* Find pointer to the first element contained in the specified lex range.
955 * Returns NULL when no element is contained in the range. */
956unsigned char *zzlFirstInLexRange(unsigned char *zl, zlexrangespec *range) {
957 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
958
959 /* If everything is out of range, return early. */
960 if (!zzlIsInLexRange(zl,range)) return NULL((void*)0);
961
962 while (eptr != NULL((void*)0)) {
963 if (zzlLexValueGteMin(eptr,range)) {
964 /* Check if score <= max. */
965 if (zzlLexValueLteMax(eptr,range))
966 return eptr;
967 return NULL((void*)0);
968 }
969
970 /* Move to next element. */
971 sptr = ziplistNext(zl,eptr); /* This element score. Skip it. */
972 serverAssert(sptr != NULL)((sptr != ((void*)0))?(void)0 : (_serverAssert("sptr != NULL"
,"t_zset.c",972),__builtin_unreachable()))
;
973 eptr = ziplistNext(zl,sptr); /* Next element. */
974 }
975
976 return NULL((void*)0);
977}
978
979/* Find pointer to the last element contained in the specified lex range.
980 * Returns NULL when no element is contained in the range. */
981unsigned char *zzlLastInLexRange(unsigned char *zl, zlexrangespec *range) {
982 unsigned char *eptr = ziplistIndex(zl,-2), *sptr;
983
984 /* If everything is out of range, return early. */
985 if (!zzlIsInLexRange(zl,range)) return NULL((void*)0);
986
987 while (eptr != NULL((void*)0)) {
988 if (zzlLexValueLteMax(eptr,range)) {
989 /* Check if score >= min. */
990 if (zzlLexValueGteMin(eptr,range))
991 return eptr;
992 return NULL((void*)0);
993 }
994
995 /* Move to previous element by moving to the score of previous element.
996 * When this returns NULL, we know there also is no element. */
997 sptr = ziplistPrev(zl,eptr);
998 if (sptr != NULL((void*)0))
999 serverAssert((eptr = ziplistPrev(zl,sptr)) != NULL)(((eptr = ziplistPrev(zl,sptr)) != ((void*)0))?(void)0 : (_serverAssert
("(eptr = ziplistPrev(zl,sptr)) != NULL","t_zset.c",999),__builtin_unreachable
()))
;
1000 else
1001 eptr = NULL((void*)0);
1002 }
1003
1004 return NULL((void*)0);
1005}
1006
1007unsigned char *zzlFind(unsigned char *zl, sds ele, double *score) {
1008 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
1009
1010 while (eptr != NULL((void*)0)) {
1011 sptr = ziplistNext(zl,eptr);
1012 serverAssert(sptr != NULL)((sptr != ((void*)0))?(void)0 : (_serverAssert("sptr != NULL"
,"t_zset.c",1012),__builtin_unreachable()))
;
1013
1014 if (ziplistCompare(eptr,(unsigned char*)ele,sdslen(ele))) {
1015 /* Matching element, pull out score. */
1016 if (score != NULL((void*)0)) *score = zzlGetScore(sptr);
1017 return eptr;
1018 }
1019
1020 /* Move to next element. */
1021 eptr = ziplistNext(zl,sptr);
1022 }
1023 return NULL((void*)0);
1024}
1025
1026/* Delete (element,score) pair from ziplist. Use local copy of eptr because we
1027 * don't want to modify the one given as argument. */
1028unsigned char *zzlDelete(unsigned char *zl, unsigned char *eptr) {
1029 unsigned char *p = eptr;
1030
1031 /* TODO: add function to ziplist API to delete N elements from offset. */
1032 zl = ziplistDelete(zl,&p);
1033 zl = ziplistDelete(zl,&p);
1034 return zl;
1035}
1036
1037unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, sds ele, double score) {
1038 unsigned char *sptr;
1039 char scorebuf[128];
1040 int scorelen;
1041 size_t offset;
1042
1043 scorelen = d2string(scorebuf,sizeof(scorebuf),score);
1044 if (eptr == NULL((void*)0)) {
1045 zl = ziplistPush(zl,(unsigned char*)ele,sdslen(ele),ZIPLIST_TAIL1);
1046 zl = ziplistPush(zl,(unsigned char*)scorebuf,scorelen,ZIPLIST_TAIL1);
1047 } else {
1048 /* Keep offset relative to zl, as it might be re-allocated. */
1049 offset = eptr-zl;
1050 zl = ziplistInsert(zl,eptr,(unsigned char*)ele,sdslen(ele));
1051 eptr = zl+offset;
1052
1053 /* Insert score after the element. */
1054 serverAssert((sptr = ziplistNext(zl,eptr)) != NULL)(((sptr = ziplistNext(zl,eptr)) != ((void*)0))?(void)0 : (_serverAssert
("(sptr = ziplistNext(zl,eptr)) != NULL","t_zset.c",1054),__builtin_unreachable
()))
;
1055 zl = ziplistInsert(zl,sptr,(unsigned char*)scorebuf,scorelen);
1056 }
1057 return zl;
1058}
1059
1060/* Insert (element,score) pair in ziplist. This function assumes the element is
1061 * not yet present in the list. */
1062unsigned char *zzlInsert(unsigned char *zl, sds ele, double score) {
1063 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
1064 double s;
1065
1066 while (eptr != NULL((void*)0)) {
1067 sptr = ziplistNext(zl,eptr);
1068 serverAssert(sptr != NULL)((sptr != ((void*)0))?(void)0 : (_serverAssert("sptr != NULL"
,"t_zset.c",1068),__builtin_unreachable()))
;
1069 s = zzlGetScore(sptr);
1070
1071 if (s > score) {
1072 /* First element with score larger than score for element to be
1073 * inserted. This means we should take its spot in the list to
1074 * maintain ordering. */
1075 zl = zzlInsertAt(zl,eptr,ele,score);
1076 break;
1077 } else if (s == score) {
1078 /* Ensure lexicographical ordering for elements. */
1079 if (zzlCompareElements(eptr,(unsigned char*)ele,sdslen(ele)) > 0) {
1080 zl = zzlInsertAt(zl,eptr,ele,score);
1081 break;
1082 }
1083 }
1084
1085 /* Move to next element. */
1086 eptr = ziplistNext(zl,sptr);
1087 }
1088
1089 /* Push on tail of list when it was not yet inserted. */
1090 if (eptr == NULL((void*)0))
1091 zl = zzlInsertAt(zl,NULL((void*)0),ele,score);
1092 return zl;
1093}
1094
1095unsigned char *zzlDeleteRangeByScore(unsigned char *zl, zrangespec *range, unsigned long *deleted) {
1096 unsigned char *eptr, *sptr;
1097 double score;
1098 unsigned long num = 0;
1099
1100 if (deleted != NULL((void*)0)) *deleted = 0;
1101
1102 eptr = zzlFirstInRange(zl,range);
1103 if (eptr == NULL((void*)0)) return zl;
1104
1105 /* When the tail of the ziplist is deleted, eptr will point to the sentinel
1106 * byte and ziplistNext will return NULL. */
1107 while ((sptr = ziplistNext(zl,eptr)) != NULL((void*)0)) {
1108 score = zzlGetScore(sptr);
1109 if (zslValueLteMax(score,range)) {
1110 /* Delete both the element and the score. */
1111 zl = ziplistDelete(zl,&eptr);
1112 zl = ziplistDelete(zl,&eptr);
1113 num++;
1114 } else {
1115 /* No longer in range. */
1116 break;
1117 }
1118 }
1119
1120 if (deleted != NULL((void*)0)) *deleted = num;
1121 return zl;
1122}
1123
1124unsigned char *zzlDeleteRangeByLex(unsigned char *zl, zlexrangespec *range, unsigned long *deleted) {
1125 unsigned char *eptr, *sptr;
1126 unsigned long num = 0;
1127
1128 if (deleted != NULL((void*)0)) *deleted = 0;
1129
1130 eptr = zzlFirstInLexRange(zl,range);
1131 if (eptr == NULL((void*)0)) return zl;
1132
1133 /* When the tail of the ziplist is deleted, eptr will point to the sentinel
1134 * byte and ziplistNext will return NULL. */
1135 while ((sptr = ziplistNext(zl,eptr)) != NULL((void*)0)) {
Although the value stored to 'sptr' is used in the enclosing expression, the value is never actually read from 'sptr'
1136 if (zzlLexValueLteMax(eptr,range)) {
1137 /* Delete both the element and the score. */
1138 zl = ziplistDelete(zl,&eptr);
1139 zl = ziplistDelete(zl,&eptr);
1140 num++;
1141 } else {
1142 /* No longer in range. */
1143 break;
1144 }
1145 }
1146
1147 if (deleted != NULL((void*)0)) *deleted = num;
1148 return zl;
1149}
1150
1151/* Delete all the elements with rank between start and end from the skiplist.
1152 * Start and end are inclusive. Note that start and end need to be 1-based */
1153unsigned char *zzlDeleteRangeByRank(unsigned char *zl, unsigned int start, unsigned int end, unsigned long *deleted) {
1154 unsigned int num = (end-start)+1;
1155 if (deleted) *deleted = num;
1156 zl = ziplistDeleteRange(zl,2*(start-1),2*num);
1157 return zl;
1158}
1159
1160/*-----------------------------------------------------------------------------
1161 * Common sorted set API
1162 *----------------------------------------------------------------------------*/
1163
1164unsigned long zsetLength(const robj *zobj) {
1165 unsigned long length = 0;
1166 if (zobj->encoding == OBJ_ENCODING_ZIPLIST5) {
1167 length = zzlLength(zobj->ptr);
1168 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST7) {
1169 length = ((const zset*)zobj->ptr)->zsl->length;
1170 } else {
1171 serverPanic("Unknown sorted set encoding")_serverPanic("t_zset.c",1171,"Unknown sorted set encoding"),__builtin_unreachable
()
;
1172 }
1173 return length;
1174}
1175
1176void zsetConvert(robj *zobj, int encoding) {
1177 zset *zs;
1178 zskiplistNode *node, *next;
1179 sds ele;
1180 double score;
1181
1182 if (zobj->encoding == encoding) return;
1183 if (zobj->encoding == OBJ_ENCODING_ZIPLIST5) {
1184 unsigned char *zl = zobj->ptr;
1185 unsigned char *eptr, *sptr;
1186 unsigned char *vstr;
1187 unsigned int vlen;
1188 long long vlong;
1189
1190 if (encoding != OBJ_ENCODING_SKIPLIST7)
1191 serverPanic("Unknown target encoding")_serverPanic("t_zset.c",1191,"Unknown target encoding"),__builtin_unreachable
()
;
1192
1193 zs = zmalloc(sizeof(*zs));
1194 zs->dict = dictCreate(&zsetDictType,NULL((void*)0));
1195 zs->zsl = zslCreate();
1196
1197 eptr = ziplistIndex(zl,0);
1198 serverAssertWithInfo(NULL,zobj,eptr != NULL)((eptr != ((void*)0))?(void)0 : (_serverAssertWithInfo(((void
*)0),zobj,"eptr != NULL","t_zset.c",1198),__builtin_unreachable
()))
;
1199 sptr = ziplistNext(zl,eptr);
1200 serverAssertWithInfo(NULL,zobj,sptr != NULL)((sptr != ((void*)0))?(void)0 : (_serverAssertWithInfo(((void
*)0),zobj,"sptr != NULL","t_zset.c",1200),__builtin_unreachable
()))
;
1201
1202 while (eptr != NULL((void*)0)) {
1203 score = zzlGetScore(sptr);
1204 serverAssertWithInfo(NULL,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong))((ziplistGet(eptr,&vstr,&vlen,&vlong))?(void)0 : (
_serverAssertWithInfo(((void*)0),zobj,"ziplistGet(eptr,&vstr,&vlen,&vlong)"
,"t_zset.c",1204),__builtin_unreachable()))
;
1205 if (vstr == NULL((void*)0))
1206 ele = sdsfromlonglong(vlong);
1207 else
1208 ele = sdsnewlen((char*)vstr,vlen);
1209
1210 node = zslInsert(zs->zsl,score,ele);
1211 serverAssert(dictAdd(zs->dict,ele,&node->score) == DICT_OK)((dictAdd(zs->dict,ele,&node->score) == 0)?(void)0 :
(_serverAssert("dictAdd(zs->dict,ele,&node->score) == DICT_OK"
,"t_zset.c",1211),__builtin_unreachable()))
;
1212 zzlNext(zl,&eptr,&sptr);
1213 }
1214
1215 zfree(zobj->ptr);
1216 zobj->ptr = zs;
1217 zobj->encoding = OBJ_ENCODING_SKIPLIST7;
1218 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST7) {
1219 unsigned char *zl = ziplistNew();
1220
1221 if (encoding != OBJ_ENCODING_ZIPLIST5)
1222 serverPanic("Unknown target encoding")_serverPanic("t_zset.c",1222,"Unknown target encoding"),__builtin_unreachable
()
;
1223
1224 /* Approach similar to zslFree(), since we want to free the skiplist at
1225 * the same time as creating the ziplist. */
1226 zs = zobj->ptr;
1227 dictRelease(zs->dict);
1228 node = zs->zsl->header->level[0].forward;
1229 zfree(zs->zsl->header);
1230 zfree(zs->zsl);
1231
1232 while (node) {
1233 zl = zzlInsertAt(zl,NULL((void*)0),node->ele,node->score);
1234 next = node->level[0].forward;
1235 zslFreeNode(node);
1236 node = next;
1237 }
1238
1239 zfree(zs);
1240 zobj->ptr = zl;
1241 zobj->encoding = OBJ_ENCODING_ZIPLIST5;
1242 } else {
1243 serverPanic("Unknown sorted set encoding")_serverPanic("t_zset.c",1243,"Unknown sorted set encoding"),__builtin_unreachable
()
;
1244 }
1245}
1246
1247/* Convert the sorted set object into a ziplist if it is not already a ziplist
1248 * and if the number of elements and the maximum element size is within the
1249 * expected ranges. */
1250void zsetConvertToZiplistIfNeeded(robj *zobj, size_t maxelelen) {
1251 if (zobj->encoding == OBJ_ENCODING_ZIPLIST5) return;
1252 zset *zset = zobj->ptr;
1253
1254 if (zset->zsl->length <= server.zset_max_ziplist_entries &&
1255 maxelelen <= server.zset_max_ziplist_value)
1256 zsetConvert(zobj,OBJ_ENCODING_ZIPLIST5);
1257}
1258
1259/* Return (by reference) the score of the specified member of the sorted set
1260 * storing it into *score. If the element does not exist C_ERR is returned
1261 * otherwise C_OK is returned and *score is correctly populated.
1262 * If 'zobj' or 'member' is NULL, C_ERR is returned. */
1263int zsetScore(robj *zobj, sds member, double *score) {
1264 if (!zobj || !member) return C_ERR-1;
1265
1266 if (zobj->encoding == OBJ_ENCODING_ZIPLIST5) {
1267 if (zzlFind(zobj->ptr, member, score) == NULL((void*)0)) return C_ERR-1;
1268 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST7) {
1269 zset *zs = zobj->ptr;
1270 dictEntry *de = dictFind(zs->dict, member);
1271 if (de == NULL((void*)0)) return C_ERR-1;
1272 *score = *(double*)dictGetVal(de)((de)->v.val);
1273 } else {
1274 serverPanic("Unknown sorted set encoding")_serverPanic("t_zset.c",1274,"Unknown sorted set encoding"),__builtin_unreachable
()
;
1275 }
1276 return C_OK0;
1277}
1278
1279/* Add a new element or update the score of an existing element in a sorted
1280 * set, regardless of its encoding.
1281 *
1282 * The set of flags change the command behavior. They are passed with an integer
1283 * pointer since the function will clear the flags and populate them with
1284 * other flags to indicate different conditions.
1285 *
1286 * The input flags are the following:
1287 *
1288 * ZADD_INCR: Increment the current element score by 'score' instead of updating
1289 * the current element score. If the element does not exist, we
1290 * assume 0 as previous score.
1291 * ZADD_NX: Perform the operation only if the element does not exist.
1292 * ZADD_XX: Perform the operation only if the element already exist.
1293 * ZADD_GT: Perform the operation on existing elements only if the new score is
1294 * greater than the current score.
1295 * ZADD_LT: Perform the operation on existing elements only if the new score is
1296 * less than the current score.
1297 *
1298 * When ZADD_INCR is used, the new score of the element is stored in
1299 * '*newscore' if 'newscore' is not NULL.
1300 *
1301 * The returned flags are the following:
1302 *
1303 * ZADD_NAN: The resulting score is not a number.
1304 * ZADD_ADDED: The element was added (not present before the call).
1305 * ZADD_UPDATED: The element score was updated.
1306 * ZADD_NOP: No operation was performed because of NX or XX.
1307 *
1308 * Return value:
1309 *
1310 * The function returns 1 on success, and sets the appropriate flags
1311 * ADDED or UPDATED to signal what happened during the operation (note that
1312 * none could be set if we re-added an element using the same score it used
1313 * to have, or in the case a zero increment is used).
1314 *
1315 * The function returns 0 on error, currently only when the increment
1316 * produces a NAN condition, or when the 'score' value is NAN since the
1317 * start.
1318 *
1319 * The command as a side effect of adding a new element may convert the sorted
1320 * set internal encoding from ziplist to hashtable+skiplist.
1321 *
1322 * Memory management of 'ele':
1323 *
1324 * The function does not take ownership of the 'ele' SDS string, but copies
1325 * it if needed. */
1326int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) {
1327 /* Turn options into simple to check vars. */
1328 int incr = (*flags & ZADD_INCR(1<<0)) != 0;
1329 int nx = (*flags & ZADD_NX(1<<1)) != 0;
1330 int xx = (*flags & ZADD_XX(1<<2)) != 0;
1331 int gt = (*flags & ZADD_GT(1<<7)) != 0;
1332 int lt = (*flags & ZADD_LT(1<<8)) != 0;
1333 *flags = 0; /* We'll return our response flags. */
1334 double curscore;
1335
1336 /* NaN as input is an error regardless of all the other parameters. */
1337 if (isnan(score)__builtin_isnan (score)) {
1338 *flags = ZADD_NAN(1<<4);
1339 return 0;
1340 }
1341
1342 /* Update the sorted set according to its encoding. */
1343 if (zobj->encoding == OBJ_ENCODING_ZIPLIST5) {
1344 unsigned char *eptr;
1345
1346 if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL((void*)0)) {
1347 /* NX? Return, same element already exists. */
1348 if (nx) {
1349 *flags |= ZADD_NOP(1<<3);
1350 return 1;
1351 }
1352
1353 /* Prepare the score for the increment if needed. */
1354 if (incr) {
1355 score += curscore;
1356 if (isnan(score)__builtin_isnan (score)) {
1357 *flags |= ZADD_NAN(1<<4);
1358 return 0;
1359 }
1360 if (newscore) *newscore = score;
1361 }
1362
1363 /* Remove and re-insert when score changed. */
1364 if (score != curscore &&
1365 /* LT? Only update if score is less than current. */
1366 (!lt || score < curscore) &&
1367 /* GT? Only update if score is greater than current. */
1368 (!gt || score > curscore))
1369 {
1370 zobj->ptr = zzlDelete(zobj->ptr,eptr);
1371 zobj->ptr = zzlInsert(zobj->ptr,ele,score);
1372 *flags |= ZADD_UPDATED(1<<6);
1373 }
1374 return 1;
1375 } else if (!xx) {
1376 /* Optimize: check if the element is too large or the list
1377 * becomes too long *before* executing zzlInsert. */
1378 zobj->ptr = zzlInsert(zobj->ptr,ele,score);
1379 if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries ||
1380 sdslen(ele) > server.zset_max_ziplist_value)
1381 zsetConvert(zobj,OBJ_ENCODING_SKIPLIST7);
1382 if (newscore) *newscore = score;
1383 *flags |= ZADD_ADDED(1<<5);
1384 return 1;
1385 } else {
1386 *flags |= ZADD_NOP(1<<3);
1387 return 1;
1388 }
1389 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST7) {
1390 zset *zs = zobj->ptr;
1391 zskiplistNode *znode;
1392 dictEntry *de;
1393
1394 de = dictFind(zs->dict,ele);
1395 if (de != NULL((void*)0)) {
1396 /* NX? Return, same element already exists. */
1397 if (nx) {
1398 *flags |= ZADD_NOP(1<<3);
1399 return 1;
1400 }
1401 curscore = *(double*)dictGetVal(de)((de)->v.val);
1402
1403 /* Prepare the score for the increment if needed. */
1404 if (incr) {
1405 score += curscore;
1406 if (isnan(score)__builtin_isnan (score)) {
1407 *flags |= ZADD_NAN(1<<4);
1408 return 0;
1409 }
1410 if (newscore) *newscore = score;
1411 }
1412
1413 /* Remove and re-insert when score changes. */
1414 if (score != curscore &&
1415 /* LT? Only update if score is less than current. */
1416 (!lt || score < curscore) &&
1417 /* GT? Only update if score is greater than current. */
1418 (!gt || score > curscore))
1419 {
1420 znode = zslUpdateScore(zs->zsl,curscore,ele,score);
1421 /* Note that we did not removed the original element from
1422 * the hash table representing the sorted set, so we just
1423 * update the score. */
1424 dictGetVal(de)((de)->v.val) = &znode->score; /* Update score ptr. */
1425 *flags |= ZADD_UPDATED(1<<6);
1426 }
1427 return 1;
1428 } else if (!xx) {
1429 ele = sdsdup(ele);
1430 znode = zslInsert(zs->zsl,score,ele);
1431 serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK)((dictAdd(zs->dict,ele,&znode->score) == 0)?(void)0
: (_serverAssert("dictAdd(zs->dict,ele,&znode->score) == DICT_OK"
,"t_zset.c",1431),__builtin_unreachable()))
;
1432 *flags |= ZADD_ADDED(1<<5);
1433 if (newscore) *newscore = score;
1434 return 1;
1435 } else {
1436 *flags |= ZADD_NOP(1<<3);
1437 return 1;
1438 }
1439 } else {
1440 serverPanic("Unknown sorted set encoding")_serverPanic("t_zset.c",1440,"Unknown sorted set encoding"),__builtin_unreachable
()
;
1441 }
1442 return 0; /* Never reached. */
1443}
1444
1445/* Deletes the element 'ele' from the sorted set encoded as a skiplist+dict,
1446 * returning 1 if the element existed and was deleted, 0 otherwise (the
1447 * element was not there). It does not resize the dict after deleting the
1448 * element. */
1449static int zsetRemoveFromSkiplist(zset *zs, sds ele) {
1450 dictEntry *de;
1451 double score;
1452
1453 de = dictUnlink(zs->dict,ele);
1454 if (de != NULL((void*)0)) {
1455 /* Get the score in order to delete from the skiplist later. */
1456 score = *(double*)dictGetVal(de)((de)->v.val);
1457
1458 /* Delete from the hash table and later from the skiplist.
1459 * Note that the order is important: deleting from the skiplist
1460 * actually releases the SDS string representing the element,
1461 * which is shared between the skiplist and the hash table, so
1462 * we need to delete from the skiplist as the final step. */
1463 dictFreeUnlinkedEntry(zs->dict,de);
1464
1465 /* Delete from skiplist. */
1466 int retval = zslDelete(zs->zsl,score,ele,NULL((void*)0));
1467 serverAssert(retval)((retval)?(void)0 : (_serverAssert("retval","t_zset.c",1467),
__builtin_unreachable()))
;
1468
1469 return 1;
1470 }
1471
1472 return 0;
1473}
1474
1475/* Delete the element 'ele' from the sorted set, returning 1 if the element
1476 * existed and was deleted, 0 otherwise (the element was not there). */
1477int zsetDel(robj *zobj, sds ele) {
1478 if (zobj->encoding == OBJ_ENCODING_ZIPLIST5) {
1479 unsigned char *eptr;
1480
1481 if ((eptr = zzlFind(zobj->ptr,ele,NULL((void*)0))) != NULL((void*)0)) {
1482 zobj->ptr = zzlDelete(zobj->ptr,eptr);
1483 return 1;
1484 }
1485 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST7) {
1486 zset *zs = zobj->ptr;
1487 if (zsetRemoveFromSkiplist(zs, ele)) {
1488 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1489 return 1;
1490 }
1491 } else {
1492 serverPanic("Unknown sorted set encoding")_serverPanic("t_zset.c",1492,"Unknown sorted set encoding"),__builtin_unreachable
()
;
1493 }
1494 return 0; /* No such element found. */
1495}
1496
1497/* Given a sorted set object returns the 0-based rank of the object or
1498 * -1 if the object does not exist.
1499 *
1500 * For rank we mean the position of the element in the sorted collection
1501 * of elements. So the first element has rank 0, the second rank 1, and so
1502 * forth up to length-1 elements.
1503 *
1504 * If 'reverse' is false, the rank is returned considering as first element
1505 * the one with the lowest score. Otherwise if 'reverse' is non-zero
1506 * the rank is computed considering as element with rank 0 the one with
1507 * the highest score. */
1508long zsetRank(robj *zobj, sds ele, int reverse) {
1509 unsigned long llen;
1510 unsigned long rank;
1511
1512 llen = zsetLength(zobj);
1513
1514 if (zobj->encoding == OBJ_ENCODING_ZIPLIST5) {
1515 unsigned char *zl = zobj->ptr;
1516 unsigned char *eptr, *sptr;
1517
1518 eptr = ziplistIndex(zl,0);
1519 serverAssert(eptr != NULL)((eptr != ((void*)0))?(void)0 : (_serverAssert("eptr != NULL"
,"t_zset.c",1519),__builtin_unreachable()))
;
1520 sptr = ziplistNext(zl,eptr);
1521 serverAssert(sptr != NULL)((sptr != ((void*)0))?(void)0 : (_serverAssert("sptr != NULL"
,"t_zset.c",1521),__builtin_unreachable()))
;
1522
1523 rank = 1;
1524 while(eptr != NULL((void*)0)) {
1525 if (ziplistCompare(eptr,(unsigned char*)ele,sdslen(ele)))
1526 break;
1527 rank++;
1528 zzlNext(zl,&eptr,&sptr);
1529 }
1530
1531 if (eptr != NULL((void*)0)) {
1532 if (reverse)
1533 return llen-rank;
1534 else
1535 return rank-1;
1536 } else {
1537 return -1;
1538 }
1539 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST7) {
1540 zset *zs = zobj->ptr;
1541 zskiplist *zsl = zs->zsl;
1542 dictEntry *de;
1543 double score;
1544
1545 de = dictFind(zs->dict,ele);
1546 if (de != NULL((void*)0)) {
1547 score = *(double*)dictGetVal(de)((de)->v.val);
1548 rank = zslGetRank(zsl,score,ele);
1549 /* Existing elements always have a rank. */
1550 serverAssert(rank != 0)((rank != 0)?(void)0 : (_serverAssert("rank != 0","t_zset.c",
1550),__builtin_unreachable()))
;
1551 if (reverse)
1552 return llen-rank;
1553 else
1554 return rank-1;
1555 } else {
1556 return -1;
1557 }
1558 } else {
1559 serverPanic("Unknown sorted set encoding")_serverPanic("t_zset.c",1559,"Unknown sorted set encoding"),__builtin_unreachable
()
;
1560 }
1561}
1562
1563/* This is a helper function for the COPY command.
1564 * Duplicate a sorted set object, with the guarantee that the returned object
1565 * has the same encoding as the original one.
1566 *
1567 * The resulting object always has refcount set to 1 */
1568robj *zsetDup(robj *o) {
1569 robj *zobj;
1570 zset *zs;
1571 zset *new_zs;
1572
1573 serverAssert(o->type == OBJ_ZSET)((o->type == 3)?(void)0 : (_serverAssert("o->type == OBJ_ZSET"
,"t_zset.c",1573),__builtin_unreachable()))
;
1574
1575 /* Create a new sorted set object that have the same encoding as the original object's encoding */
1576 if (o->encoding == OBJ_ENCODING_ZIPLIST5) {
1577 unsigned char *zl = o->ptr;
1578 size_t sz = ziplistBlobLen(zl);
1579 unsigned char *new_zl = zmalloc(sz);
1580 memcpy(new_zl, zl, sz);
1581 zobj = createObject(OBJ_ZSET3, new_zl);
1582 zobj->encoding = OBJ_ENCODING_ZIPLIST5;
1583 } else if (o->encoding == OBJ_ENCODING_SKIPLIST7) {
1584 zobj = createZsetObject();
1585 zs = o->ptr;
1586 new_zs = zobj->ptr;
1587 dictExpand(new_zs->dict,dictSize(zs->dict)((zs->dict)->ht[0].used+(zs->dict)->ht[1].used));
1588 zskiplist *zsl = zs->zsl;
1589 zskiplistNode *ln;
1590 sds ele;
1591 long llen = zsetLength(o);
1592
1593 /* We copy the skiplist elements from the greatest to the
1594 * smallest (that's trivial since the elements are already ordered in
1595 * the skiplist): this improves the load process, since the next loaded
1596 * element will always be the smaller, so adding to the skiplist
1597 * will always immediately stop at the head, making the insertion
1598 * O(1) instead of O(log(N)). */
1599 ln = zsl->tail;
1600 while (llen--) {
1601 ele = ln->ele;
1602 sds new_ele = sdsdup(ele);
1603 zskiplistNode *znode = zslInsert(new_zs->zsl,ln->score,new_ele);
1604 dictAdd(new_zs->dict,new_ele,&znode->score);
1605 ln = ln->backward;
1606 }
1607 } else {
1608 serverPanic("Unknown sorted set encoding")_serverPanic("t_zset.c",1608,"Unknown sorted set encoding"),__builtin_unreachable
()
;
1609 }
1610 return zobj;
1611}
1612
1613/* callback for to check the ziplist doesn't have duplicate recoreds */
1614static int _zsetZiplistValidateIntegrity(unsigned char *p, void *userdata) {
1615 struct {
1616 long count;
1617 dict *fields;
1618 } *data = userdata;
1619
1620 /* Even records are field names, add to dict and check that's not a dup */
1621 if (((data->count) & 1) == 0) {
1622 unsigned char *str;
1623 unsigned int slen;
1624 long long vll;
1625 if (!ziplistGet(p, &str, &slen, &vll))
1626 return 0;
1627 sds field = str? sdsnewlen(str, slen): sdsfromlonglong(vll);;
1628 if (dictAdd(data->fields, field, NULL((void*)0)) != DICT_OK0) {
1629 /* Duplicate, return an error */
1630 sdsfree(field);
1631 return 0;
1632 }
1633 }
1634
1635 (data->count)++;
1636 return 1;
1637}
1638
1639/* Validate the integrity of the data stracture.
1640 * when `deep` is 0, only the integrity of the header is validated.
1641 * when `deep` is 1, we scan all the entries one by one. */
1642int zsetZiplistValidateIntegrity(unsigned char *zl, size_t size, int deep) {
1643 if (!deep)
1644 return ziplistValidateIntegrity(zl, size, 0, NULL((void*)0), NULL((void*)0));
1645
1646 /* Keep track of the field names to locate duplicate ones */
1647 struct {
1648 long count;
1649 dict *fields;
1650 } data = {0, dictCreate(&hashDictType, NULL((void*)0))};
1651
1652 int ret = ziplistValidateIntegrity(zl, size, 1, _zsetZiplistValidateIntegrity, &data);
1653
1654 /* make sure we have an even number of records. */
1655 if (data.count & 1)
1656 ret = 0;
1657
1658 dictRelease(data.fields);
1659 return ret;
1660}
1661
1662/* Create a new sds string from the ziplist entry. */
1663sds zsetSdsFromZiplistEntry(ziplistEntry *e) {
1664 return e->sval ? sdsnewlen(e->sval, e->slen) : sdsfromlonglong(e->lval);
1665}
1666
1667/* Reply with bulk string from the ziplist entry. */
1668void zsetReplyFromZiplistEntry(client *c, ziplistEntry *e) {
1669 if (e->sval)
1670 addReplyBulkCBuffer(c, e->sval, e->slen);
1671 else
1672 addReplyBulkLongLong(c, e->lval);
1673}
1674
1675
1676/* Return random element from a non empty zset.
1677 * 'key' and 'val' will be set to hold the element.
1678 * The memory in `key` is not to be freed or modified by the caller.
1679 * 'score' can be NULL in which case it's not extracted. */
1680void zsetTypeRandomElement(robj *zsetobj, unsigned long zsetsize, ziplistEntry *key, double *score) {
1681 if (zsetobj->encoding == OBJ_ENCODING_SKIPLIST7) {
1682 zset *zs = zsetobj->ptr;
1683 dictEntry *de = dictGetFairRandomKey(zs->dict);
1684 sds s = dictGetKey(de)((de)->key);
1685 key->sval = (unsigned char*)s;
1686 key->slen = sdslen(s);
1687 if (score)
1688 *score = *(double*)dictGetVal(de)((de)->v.val);
1689 } else if (zsetobj->encoding == OBJ_ENCODING_ZIPLIST5) {
1690 ziplistEntry val;
1691 ziplistRandomPair(zsetobj->ptr, zsetsize, key, &val);
1692 if (score) {
1693 if (val.sval) {
1694 *score = zzlStrtod(val.sval,val.slen);
1695 } else {
1696 *score = (double)val.lval;
1697 }
1698 }
1699 } else {
1700 serverPanic("Unknown zset encoding")_serverPanic("t_zset.c",1700,"Unknown zset encoding"),__builtin_unreachable
()
;
1701 }
1702}
1703
1704/*-----------------------------------------------------------------------------
1705 * Sorted set commands
1706 *----------------------------------------------------------------------------*/
1707
1708/* This generic command implements both ZADD and ZINCRBY. */
1709void zaddGenericCommand(client *c, int flags) {
1710 static char *nanerr = "resulting score is not a number (NaN)";
1711 robj *key = c->argv[1];
1712 robj *zobj;
1713 sds ele;
1714 double score = 0, *scores = NULL((void*)0);
1715 int j, elements;
1716 int scoreidx = 0;
1717 /* The following vars are used in order to track what the command actually
1718 * did during the execution, to reply to the client and to trigger the
1719 * notification of keyspace change. */
1720 int added = 0; /* Number of new elements added. */
1721 int updated = 0; /* Number of elements with updated score. */
1722 int processed = 0; /* Number of elements processed, may remain zero with
1723 options like XX. */
1724
1725 /* Parse options. At the end 'scoreidx' is set to the argument position
1726 * of the score of the first score-element pair. */
1727 scoreidx = 2;
1728 while(scoreidx < c->argc) {
1729 char *opt = c->argv[scoreidx]->ptr;
1730 if (!strcasecmp(opt,"nx")) flags |= ZADD_NX(1<<1);
1731 else if (!strcasecmp(opt,"xx")) flags |= ZADD_XX(1<<2);
1732 else if (!strcasecmp(opt,"ch")) flags |= ZADD_CH(1<<16);
1733 else if (!strcasecmp(opt,"incr")) flags |= ZADD_INCR(1<<0);
1734 else if (!strcasecmp(opt,"gt")) flags |= ZADD_GT(1<<7);
1735 else if (!strcasecmp(opt,"lt")) flags |= ZADD_LT(1<<8);
1736 else break;
1737 scoreidx++;
1738 }
1739
1740 /* Turn options into simple to check vars. */
1741 int incr = (flags & ZADD_INCR(1<<0)) != 0;
1742 int nx = (flags & ZADD_NX(1<<1)) != 0;
1743 int xx = (flags & ZADD_XX(1<<2)) != 0;
1744 int ch = (flags & ZADD_CH(1<<16)) != 0;
1745 int gt = (flags & ZADD_GT(1<<7)) != 0;
1746 int lt = (flags & ZADD_LT(1<<8)) != 0;
1747
1748 /* After the options, we expect to have an even number of args, since
1749 * we expect any number of score-element pairs. */
1750 elements = c->argc-scoreidx;
1751 if (elements % 2 || !elements) {
1752 addReplyErrorObject(c,shared.syntaxerr);
1753 return;
1754 }
1755 elements /= 2; /* Now this holds the number of score-element pairs. */
1756
1757 /* Check for incompatible options. */
1758 if (nx && xx) {
1759 addReplyError(c,
1760 "XX and NX options at the same time are not compatible");
1761 return;
1762 }
1763
1764 if ((gt && nx) || (lt && nx) || (gt && lt)) {
1765 addReplyError(c,
1766 "GT, LT, and/or NX options at the same time are not compatible");
1767 return;
1768 }
1769 /* Note that XX is compatible with either GT or LT */
1770
1771 if (incr && elements > 1) {
1772 addReplyError(c,
1773 "INCR option supports a single increment-element pair");
1774 return;
1775 }
1776
1777 /* Start parsing all the scores, we need to emit any syntax error
1778 * before executing additions to the sorted set, as the command should
1779 * either execute fully or nothing at all. */
1780 scores = zmalloc(sizeof(double)*elements);
1781 for (j = 0; j < elements; j++) {
1782 if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL((void*)0))
1783 != C_OK0) goto cleanup;
1784 }
1785
1786 /* Lookup the key and create the sorted set if does not exist. */
1787 zobj = lookupKeyWrite(c->db,key);
1788 if (checkType(c,zobj,OBJ_ZSET3)) goto cleanup;
1789 if (zobj == NULL((void*)0)) {
1790 if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
1791 if (server.zset_max_ziplist_entries == 0 ||
1792 server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
1793 {
1794 zobj = createZsetObject();
1795 } else {
1796 zobj = createZsetZiplistObject();
1797 }
1798 dbAdd(c->db,key,zobj);
1799 }
1800
1801 for (j = 0; j < elements; j++) {
1802 double newscore;
1803 score = scores[j];
1804 int retflags = flags;
1805
1806 ele = c->argv[scoreidx+1+j*2]->ptr;
1807 int retval = zsetAdd(zobj, score, ele, &retflags, &newscore);
1808 if (retval == 0) {
1809 addReplyError(c,nanerr);
1810 goto cleanup;
1811 }
1812 if (retflags & ZADD_ADDED(1<<5)) added++;
1813 if (retflags & ZADD_UPDATED(1<<6)) updated++;
1814 if (!(retflags & ZADD_NOP(1<<3))) processed++;
1815 score = newscore;
1816 }
1817 server.dirty += (added+updated);
1818
1819reply_to_client:
1820 if (incr) { /* ZINCRBY or INCR option. */
1821 if (processed)
1822 addReplyDouble(c,score);
1823 else
1824 addReplyNull(c);
1825 } else { /* ZADD. */
1826 addReplyLongLong(c,ch ? added+updated : added);
1827 }
1828
1829cleanup:
1830 zfree(scores);
1831 if (added || updated) {
1832 signalModifiedKey(c,c->db,key);
1833 notifyKeyspaceEvent(NOTIFY_ZSET(1<<7),
1834 incr ? "zincr" : "zadd", key, c->db->id);
1835 }
1836}
1837
1838void zaddCommand(client *c) {
1839 zaddGenericCommand(c,ZADD_NONE0);
1840}
1841
1842void zincrbyCommand(client *c) {
1843 zaddGenericCommand(c,ZADD_INCR(1<<0));
1844}
1845
1846void zremCommand(client *c) {
1847 robj *key = c->argv[1];
1848 robj *zobj;
1849 int deleted = 0, keyremoved = 0, j;
1850
1851 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL((void*)0) ||
1852 checkType(c,zobj,OBJ_ZSET3)) return;
1853
1854 for (j = 2; j < c->argc; j++) {
1855 if (zsetDel(zobj,c->argv[j]->ptr)) deleted++;
1856 if (zsetLength(zobj) == 0) {
1857 dbDelete(c->db,key);
1858 keyremoved = 1;
1859 break;
1860 }
1861 }
1862
1863 if (deleted) {
1864 notifyKeyspaceEvent(NOTIFY_ZSET(1<<7),"zrem",key,c->db->id);
1865 if (keyremoved)
1866 notifyKeyspaceEvent(NOTIFY_GENERIC(1<<2),"del",key,c->db->id);
1867 signalModifiedKey(c,c->db,key);
1868 server.dirty += deleted;
1869 }
1870 addReplyLongLong(c,deleted);
1871}
1872
1873typedef enum {
1874 ZRANGE_AUTO = 0,
1875 ZRANGE_RANK,
1876 ZRANGE_SCORE,
1877 ZRANGE_LEX,
1878} zrange_type;
1879
1880/* Implements ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZREMRANGEBYLEX commands. */
1881void zremrangeGenericCommand(client *c, zrange_type rangetype) {
1882 robj *key = c->argv[1];
1883 robj *zobj;
1884 int keyremoved = 0;
1885 unsigned long deleted = 0;
1886 zrangespec range;
1887 zlexrangespec lexrange;
1888 long start, end, llen;
1889 char *notify_type = NULL((void*)0);
1890
1891 /* Step 1: Parse the range. */
1892 if (rangetype == ZRANGE_RANK) {
1893 notify_type = "zremrangebyrank";
1894 if ((getLongFromObjectOrReply(c,c->argv[2],&start,NULL((void*)0)) != C_OK0) ||
1895 (getLongFromObjectOrReply(c,c->argv[3],&end,NULL((void*)0)) != C_OK0))
1896 return;
1897 } else if (rangetype == ZRANGE_SCORE) {
1898 notify_type = "zremrangebyscore";
1899 if (zslParseRange(c->argv[2],c->argv[3],&range) != C_OK0) {
1900 addReplyError(c,"min or max is not a float");
1901 return;
1902 }
1903 } else if (rangetype == ZRANGE_LEX) {
1904 notify_type = "zremrangebylex";
1905 if (zslParseLexRange(c->argv[2],c->argv[3],&lexrange) != C_OK0) {
1906 addReplyError(c,"min or max not valid string range item");
1907 return;
1908 }
1909 } else {
1910 serverPanic("unknown rangetype %d", (int)rangetype)_serverPanic("t_zset.c",1910,"unknown rangetype %d", (int)rangetype
),__builtin_unreachable()
;
1911 }
1912
1913 /* Step 2: Lookup & range sanity checks if needed. */
1914 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL((void*)0) ||
1915 checkType(c,zobj,OBJ_ZSET3)) goto cleanup;
1916
1917 if (rangetype == ZRANGE_RANK) {
1918 /* Sanitize indexes. */
1919 llen = zsetLength(zobj);
1920 if (start < 0) start = llen+start;
1921 if (end < 0) end = llen+end;
1922 if (start < 0) start = 0;
1923
1924 /* Invariant: start >= 0, so this test will be true when end < 0.
1925 * The range is empty when start > end or start >= length. */
1926 if (start > end || start >= llen) {
1927 addReply(c,shared.czero);
1928 goto cleanup;
1929 }
1930 if (end >= llen) end = llen-1;
1931 }
1932
1933 /* Step 3: Perform the range deletion operation. */
1934 if (zobj->encoding == OBJ_ENCODING_ZIPLIST5) {
1935 switch(rangetype) {
1936 case ZRANGE_AUTO:
1937 case ZRANGE_RANK:
1938 zobj->ptr = zzlDeleteRangeByRank(zobj->ptr,start+1,end+1,&deleted);
1939 break;
1940 case ZRANGE_SCORE:
1941 zobj->ptr = zzlDeleteRangeByScore(zobj->ptr,&range,&deleted);
1942 break;
1943 case ZRANGE_LEX:
1944 zobj->ptr = zzlDeleteRangeByLex(zobj->ptr,&lexrange,&deleted);
1945 break;
1946 }
1947 if (zzlLength(zobj->ptr) == 0) {
1948 dbDelete(c->db,key);
1949 keyremoved = 1;
1950 }
1951 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST7) {
1952 zset *zs = zobj->ptr;
1953 switch(rangetype) {
1954 case ZRANGE_AUTO:
1955 case ZRANGE_RANK:
1956 deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
1957 break;
1958 case ZRANGE_SCORE:
1959 deleted = zslDeleteRangeByScore(zs->zsl,&range,zs->dict);
1960 break;
1961 case ZRANGE_LEX:
1962 deleted = zslDeleteRangeByLex(zs->zsl,&lexrange,zs->dict);
1963 break;
1964 }
1965 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1966 if (dictSize(zs->dict)((zs->dict)->ht[0].used+(zs->dict)->ht[1].used) == 0) {
1967 dbDelete(c->db,key);
1968 keyremoved = 1;
1969 }
1970 } else {
1971 serverPanic("Unknown sorted set encoding")_serverPanic("t_zset.c",1971,"Unknown sorted set encoding"),__builtin_unreachable
()
;
1972 }
1973
1974 /* Step 4: Notifications and reply. */
1975 if (deleted) {
1976 signalModifiedKey(c,c->db,key);
1977 notifyKeyspaceEvent(NOTIFY_ZSET(1<<7),notify_type,key,c->db->id);
1978 if (keyremoved)
1979 notifyKeyspaceEvent(NOTIFY_GENERIC(1<<2),"del",key,c->db->id);
1980 }
1981 server.dirty += deleted;
1982 addReplyLongLong(c,deleted);
1983
1984cleanup:
1985 if (rangetype == ZRANGE_LEX) zslFreeLexRange(&lexrange);
1986}
1987
1988void zremrangebyrankCommand(client *c) {
1989 zremrangeGenericCommand(c,ZRANGE_RANK);
1990}
1991
1992void zremrangebyscoreCommand(client *c) {
1993 zremrangeGenericCommand(c,ZRANGE_SCORE);
1994}
1995
1996void zremrangebylexCommand(client *c) {
1997 zremrangeGenericCommand(c,ZRANGE_LEX);
1998}
1999
2000typedef struct {
2001 robj *subject;
2002 int type; /* Set, sorted set */
2003 int encoding;
2004 double weight;
2005
2006 union {
2007 /* Set iterators. */
2008 union _iterset {
2009 struct {
2010 intset *is;
2011 int ii;
2012 } is;
2013 struct {
2014 dict *dict;
2015 dictIterator *di;
2016 dictEntry *de;
2017 } ht;
2018 } set;
2019
2020 /* Sorted set iterators. */
2021 union _iterzset {
2022 struct {
2023 unsigned char *zl;
2024 unsigned char *eptr, *sptr;
2025 } zl;
2026 struct {
2027 zset *zs;
2028 zskiplistNode *node;
2029 } sl;
2030 } zset;
2031 } iter;
2032} zsetopsrc;
2033
2034
2035/* Use dirty flags for pointers that need to be cleaned up in the next
2036 * iteration over the zsetopval. The dirty flag for the long long value is
2037 * special, since long long values don't need cleanup. Instead, it means that
2038 * we already checked that "ell" holds a long long, or tried to convert another
2039 * representation into a long long value. When this was successful,
2040 * OPVAL_VALID_LL is set as well. */
2041#define OPVAL_DIRTY_SDS1 1
2042#define OPVAL_DIRTY_LL2 2
2043#define OPVAL_VALID_LL4 4
2044
2045/* Store value retrieved from the iterator. */
2046typedef struct {
2047 int flags;
2048 unsigned char _buf[32]; /* Private buffer. */
2049 sds ele;
2050 unsigned char *estr;
2051 unsigned int elen;
2052 long long ell;
2053 double score;
2054} zsetopval;
2055
2056typedef union _iterset iterset;
2057typedef union _iterzset iterzset;
2058
2059void zuiInitIterator(zsetopsrc *op) {
2060 if (op->subject == NULL((void*)0))
2061 return;
2062
2063 if (op->type == OBJ_SET2) {
2064 iterset *it = &op->iter.set;
2065 if (op->encoding == OBJ_ENCODING_INTSET6) {
2066 it->is.is = op->subject->ptr;
2067 it->is.ii = 0;
2068 } else if (op->encoding == OBJ_ENCODING_HT2) {
2069 it->ht.dict = op->subject->ptr;
2070 it->ht.di = dictGetIterator(op->subject->ptr);
2071 it->ht.de = dictNext(it->ht.di);
2072 } else {
2073 serverPanic("Unknown set encoding")_serverPanic("t_zset.c",2073,"Unknown set encoding"),__builtin_unreachable
()
;
2074 }
2075 } else if (op->type == OBJ_ZSET3) {
2076 /* Sorted sets are traversed in reverse order to optimize for
2077 * the insertion of the elements in a new list as in
2078 * ZDIFF/ZINTER/ZUNION */
2079 iterzset *it = &op->iter.zset;
2080 if (op->encoding == OBJ_ENCODING_ZIPLIST5) {
2081 it->zl.zl = op->subject->ptr;
2082 it->zl.eptr = ziplistIndex(it->zl.zl,-2);
2083 if (it->zl.eptr != NULL((void*)0)) {
2084 it->zl.sptr = ziplistNext(it->zl.zl,it->zl.eptr);
2085 serverAssert(it->zl.sptr != NULL)((it->zl.sptr != ((void*)0))?(void)0 : (_serverAssert("it->zl.sptr != NULL"
,"t_zset.c",2085),__builtin_unreachable()))
;
2086 }
2087 } else if (op->encoding == OBJ_ENCODING_SKIPLIST7) {
2088 it->sl.zs = op->subject->ptr;
2089 it->sl.node = it->sl.zs->zsl->tail;
2090 } else {
2091 serverPanic("Unknown sorted set encoding")_serverPanic("t_zset.c",2091,"Unknown sorted set encoding"),__builtin_unreachable
()
;
2092 }
2093 } else {
2094 serverPanic("Unsupported type")_serverPanic("t_zset.c",2094,"Unsupported type"),__builtin_unreachable
()
;
2095 }
2096}
2097
2098void zuiClearIterator(zsetopsrc *op) {
2099 if (op->subject == NULL((void*)0))
2100 return;
2101
2102 if (op->type == OBJ_SET2) {
2103 iterset *it = &op->iter.set;
2104 if (op->encoding == OBJ_ENCODING_INTSET6) {
2105 UNUSED(it)((void) it); /* skip */
2106 } else if (op->encoding == OBJ_ENCODING_HT2) {
2107 dictReleaseIterator(it->ht.di);
2108 } else {
2109 serverPanic("Unknown set encoding")_serverPanic("t_zset.c",2109,"Unknown set encoding"),__builtin_unreachable
()
;
2110 }
2111 } else if (op->type == OBJ_ZSET3) {
2112 iterzset *it = &op->iter.zset;
2113 if (op->encoding == OBJ_ENCODING_ZIPLIST5) {
2114 UNUSED(it)((void) it); /* skip */
2115 } else if (op->encoding == OBJ_ENCODING_SKIPLIST7) {
2116 UNUSED(it)((void) it); /* skip */
2117 } else {
2118 serverPanic("Unknown sorted set encoding")_serverPanic("t_zset.c",2118,"Unknown sorted set encoding"),__builtin_unreachable
()
;
2119 }
2120 } else {
2121 serverPanic("Unsupported type")_serverPanic("t_zset.c",2121,"Unsupported type"),__builtin_unreachable
()
;
2122 }
2123}
2124
2125unsigned long zuiLength(zsetopsrc *op) {
2126 if (op->subject == NULL((void*)0))
2127 return 0;
2128
2129 if (op->type == OBJ_SET2) {
2130 if (op->encoding == OBJ_ENCODING_INTSET6) {
2131 return intsetLen(op->subject->ptr);
2132 } else if (op->encoding == OBJ_ENCODING_HT2) {
2133 dict *ht = op->subject->ptr;
2134 return dictSize(ht)((ht)->ht[0].used+(ht)->ht[1].used);
2135 } else {
2136 serverPanic("Unknown set encoding")_serverPanic("t_zset.c",2136,"Unknown set encoding"),__builtin_unreachable
()
;
2137 }
2138 } else if (op->type == OBJ_ZSET3) {
2139 if (op->encoding == OBJ_ENCODING_ZIPLIST5) {
2140 return zzlLength(op->subject->ptr);
2141 } else if (op->encoding == OBJ_ENCODING_SKIPLIST7) {
2142 zset *zs = op->subject->ptr;
2143 return zs->zsl->length;
2144 } else {
2145 serverPanic("Unknown sorted set encoding")_serverPanic("t_zset.c",2145,"Unknown sorted set encoding"),__builtin_unreachable
()
;
2146 }
2147 } else {
2148 serverPanic("Unsupported type")_serverPanic("t_zset.c",2148,"Unsupported type"),__builtin_unreachable
()
;
2149 }
2150}
2151
2152/* Check if the current value is valid. If so, store it in the passed structure
2153 * and move to the next element. If not valid, this means we have reached the
2154 * end of the structure and can abort. */
2155int zuiNext(zsetopsrc *op, zsetopval *val) {
2156 if (op->subject == NULL((void*)0))
2157 return 0;
2158
2159 if (val->flags & OPVAL_DIRTY_SDS1)
2160 sdsfree(val->ele);
2161
2162 memset(val,0,sizeof(zsetopval));
2163
2164 if (op->type == OBJ_SET2) {
2165 iterset *it = &op->iter.set;
2166 if (op->encoding == OBJ_ENCODING_INTSET6) {
2167 int64_t ell;
2168
2169 if (!intsetGet(it->is.is,it->is.ii,&ell))
2170 return 0;
2171 val->ell = ell;
2172 val->score = 1.0;
2173
2174 /* Move to next element. */
2175 it->is.ii++;
2176 } else if (op->encoding == OBJ_ENCODING_HT2) {
2177 if (it->ht.de == NULL((void*)0))
2178 return 0;
2179 val->ele = dictGetKey(it->ht.de)((it->ht.de)->key);
2180 val->score = 1.0;
2181
2182 /* Move to next element. */
2183 it->ht.de = dictNext(it->ht.di);
2184 } else {
2185 serverPanic("Unknown set encoding")_serverPanic("t_zset.c",2185,"Unknown set encoding"),__builtin_unreachable
()
;
2186 }
2187 } else if (op->type == OBJ_ZSET3) {
2188 iterzset *it = &op->iter.zset;
2189 if (op->encoding == OBJ_ENCODING_ZIPLIST5) {
2190 /* No need to check both, but better be explicit. */
2191 if (it->zl.eptr == NULL((void*)0) || it->zl.sptr == NULL((void*)0))
2192 return 0;
2193 serverAssert(ziplistGet(it->zl.eptr,&val->estr,&val->elen,&val->ell))((ziplistGet(it->zl.eptr,&val->estr,&val->elen
,&val->ell))?(void)0 : (_serverAssert("ziplistGet(it->zl.eptr,&val->estr,&val->elen,&val->ell)"
,"t_zset.c",2193),__builtin_unreachable()))
;
2194 val->score = zzlGetScore(it->zl.sptr);
2195
2196 /* Move to next element (going backwards, see zuiInitIterator). */
2197 zzlPrev(it->zl.zl,&it->zl.eptr,&it->zl.sptr);
2198 } else if (op->encoding == OBJ_ENCODING_SKIPLIST7) {
2199 if (it->sl.node == NULL((void*)0))
2200 return 0;
2201 val->ele = it->sl.node->ele;
2202 val->score = it->sl.node->score;
2203
2204 /* Move to next element. (going backwards, see zuiInitIterator) */
2205 it->sl.node = it->sl.node->backward;
2206 } else {
2207 serverPanic("Unknown sorted set encoding")_serverPanic("t_zset.c",2207,"Unknown sorted set encoding"),__builtin_unreachable
()
;
2208 }
2209 } else {
2210 serverPanic("Unsupported type")_serverPanic("t_zset.c",2210,"Unsupported type"),__builtin_unreachable
()
;
2211 }
2212 return 1;
2213}
2214
2215int zuiLongLongFromValue(zsetopval *val) {
2216 if (!(val->flags & OPVAL_DIRTY_LL2)) {
2217 val->flags |= OPVAL_DIRTY_LL2;
2218
2219 if (val->ele != NULL((void*)0)) {
2220 if (string2ll(val->ele,sdslen(val->ele),&val->ell))
2221 val->flags |= OPVAL_VALID_LL4;
2222 } else if (val->estr != NULL((void*)0)) {
2223 if (string2ll((char*)val->estr,val->elen,&val->ell))
2224 val->flags |= OPVAL_VALID_LL4;
2225 } else {
2226 /* The long long was already set, flag as valid. */
2227 val->flags |= OPVAL_VALID_LL4;
2228 }
2229 }
2230 return val->flags & OPVAL_VALID_LL4;
2231}
2232
2233sds zuiSdsFromValue(zsetopval *val) {
2234 if (val->ele == NULL((void*)0)) {
2235 if (val->estr != NULL((void*)0)) {
2236 val->ele = sdsnewlen((char*)val->estr,val->elen);
2237 } else {
2238 val->ele = sdsfromlonglong(val->ell);
2239 }
2240 val->flags |= OPVAL_DIRTY_SDS1;
2241 }
2242 return val->ele;
2243}
2244
2245/* This is different from zuiSdsFromValue since returns a new SDS string
2246 * which is up to the caller to free. */
2247sds zuiNewSdsFromValue(zsetopval *val) {
2248 if (val->flags & OPVAL_DIRTY_SDS1) {
2249 /* We have already one to return! */
2250 sds ele = val->ele;
2251 val->flags &= ~OPVAL_DIRTY_SDS1;
2252 val->ele = NULL((void*)0);
2253 return ele;
2254 } else if (val->ele) {
2255 return sdsdup(val->ele);
2256 } else if (val->estr) {
2257 return sdsnewlen((char*)val->estr,val->elen);
2258 } else {
2259 return sdsfromlonglong(val->ell);
2260 }
2261}
2262
2263int zuiBufferFromValue(zsetopval *val) {
2264 if (val->estr == NULL((void*)0)) {
2265 if (val->ele != NULL((void*)0)) {
2266 val->elen = sdslen(val->ele);
2267 val->estr = (unsigned char*)val->ele;
2268 } else {
2269 val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),val->ell);
2270 val->estr = val->_buf;
2271 }
2272 }
2273 return 1;
2274}
2275
2276/* Find value pointed to by val in the source pointer to by op. When found,
2277 * return 1 and store its score in target. Return 0 otherwise. */
2278int zuiFind(zsetopsrc *op, zsetopval *val, double *score) {
2279 if (op->subject == NULL((void*)0))
2280 return 0;
2281
2282 if (op->type == OBJ_SET2) {
2283 if (op->encoding == OBJ_ENCODING_INTSET6) {
2284 if (zuiLongLongFromValue(val) &&
2285 intsetFind(op->subject->ptr,val->ell))
2286 {
2287 *score = 1.0;
2288 return 1;
2289 } else {
2290 return 0;
2291 }
2292 } else if (op->encoding == OBJ_ENCODING_HT2) {
2293 dict *ht = op->subject->ptr;
2294 zuiSdsFromValue(val);
2295 if (dictFind(ht,val->ele) != NULL((void*)0)) {
2296 *score = 1.0;
2297 return 1;
2298 } else {
2299 return 0;
2300 }
2301 } else {
2302 serverPanic("Unknown set encoding")_serverPanic("t_zset.c",2302,"Unknown set encoding"),__builtin_unreachable
()
;
2303 }
2304 } else if (op->type == OBJ_ZSET3) {
2305 zuiSdsFromValue(val);
2306
2307 if (op->encoding == OBJ_ENCODING_ZIPLIST5) {
2308 if (zzlFind(op->subject->ptr,val->ele,score) != NULL((void*)0)) {
2309 /* Score is already set by zzlFind. */
2310 return 1;
2311 } else {
2312 return 0;
2313 }
2314 } else if (op->encoding == OBJ_ENCODING_SKIPLIST7) {
2315 zset *zs = op->subject->ptr;
2316 dictEntry *de;
2317 if ((de = dictFind(zs->dict,val->ele)) != NULL((void*)0)) {
2318 *score = *(double*)dictGetVal(de)((de)->v.val);
2319 return 1;
2320 } else {
2321 return 0;
2322 }
2323 } else {
2324 serverPanic("Unknown sorted set encoding")_serverPanic("t_zset.c",2324,"Unknown sorted set encoding"),__builtin_unreachable
()
;
2325 }
2326 } else {
2327 serverPanic("Unsupported type")_serverPanic("t_zset.c",2327,"Unsupported type"),__builtin_unreachable
()
;
2328 }
2329}
2330
2331int zuiCompareByCardinality(const void *s1, const void *s2) {
2332 unsigned long first = zuiLength((zsetopsrc*)s1);
2333 unsigned long second = zuiLength((zsetopsrc*)s2);
2334 if (first > second) return 1;
2335 if (first < second) return -1;
2336 return 0;
2337}
2338
2339static int zuiCompareByRevCardinality(const void *s1, const void *s2) {
2340 return zuiCompareByCardinality(s1, s2) * -1;
2341}
2342
2343#define REDIS_AGGR_SUM1 1
2344#define REDIS_AGGR_MIN2 2
2345#define REDIS_AGGR_MAX3 3
2346#define zunionInterDictValue(_e)(((_e)->v.val) == ((void*)0) ? 1.0 : *(double*)((_e)->v
.val))
(dictGetVal(_e)((_e)->v.val) == NULL((void*)0) ? 1.0 : *(double*)dictGetVal(_e)((_e)->v.val))
2347
2348inline static void zunionInterAggregate(double *target, double val, int aggregate) {
2349 if (aggregate == REDIS_AGGR_SUM1) {
2350 *target = *target + val;
2351 /* The result of adding two doubles is NaN when one variable
2352 * is +inf and the other is -inf. When these numbers are added,
2353 * we maintain the convention of the result being 0.0. */
2354 if (isnan(*target)__builtin_isnan (*target)) *target = 0.0;
2355 } else if (aggregate == REDIS_AGGR_MIN2) {
2356 *target = val < *target ? val : *target;
2357 } else if (aggregate == REDIS_AGGR_MAX3) {
2358 *target = val > *target ? val : *target;
2359 } else {
2360 /* safety net */
2361 serverPanic("Unknown ZUNION/INTER aggregate type")_serverPanic("t_zset.c",2361,"Unknown ZUNION/INTER aggregate type"
),__builtin_unreachable()
;
2362 }
2363}
2364
2365static int zsetDictGetMaxElementLength(dict *d) {
2366 dictIterator *di;
2367 dictEntry *de;
2368 size_t maxelelen = 0;
2369
2370 di = dictGetIterator(d);
2371
2372 while((de = dictNext(di)) != NULL((void*)0)) {
2373 sds ele = dictGetKey(de)((de)->key);
2374 if (sdslen(ele) > maxelelen) maxelelen = sdslen(ele);
2375 }
2376
2377 dictReleaseIterator(di);
2378
2379 return maxelelen;
2380}
2381
2382static void zdiffAlgorithm1(zsetopsrc *src, long setnum, zset *dstzset, size_t *maxelelen) {
2383 /* DIFF Algorithm 1:
2384 *
2385 * We perform the diff by iterating all the elements of the first set,
2386 * and only adding it to the target set if the element does not exist
2387 * into all the other sets.
2388 *
2389 * This way we perform at max N*M operations, where N is the size of
2390 * the first set, and M the number of sets.
2391 *
2392 * There is also a O(K*log(K)) cost for adding the resulting elements
2393 * to the target set, where K is the final size of the target set.
2394 *
2395 * The final complexity of this algorithm is O(N*M + K*log(K)). */
2396 int j;
2397 zsetopval zval;
2398 zskiplistNode *znode;
2399 sds tmp;
2400
2401 /* With algorithm 1 it is better to order the sets to subtract
2402 * by decreasing size, so that we are more likely to find
2403 * duplicated elements ASAP. */
2404 qsort(src+1,setnum-1,sizeof(zsetopsrc),zuiCompareByRevCardinality);
2405
2406 memset(&zval, 0, sizeof(zval));
2407 zuiInitIterator(&src[0]);
2408 while (zuiNext(&src[0],&zval)) {
2409 double value;
2410 int exists = 0;
2411
2412 for (j = 1; j < setnum; j++) {
2413 /* It is not safe to access the zset we are
2414 * iterating, so explicitly check for equal object.
2415 * This check isn't really needed anymore since we already
2416 * check for a duplicate set in the zsetChooseDiffAlgorithm
2417 * function, but we're leaving it for future-proofing. */
2418 if (src[j].subject == src[0].subject ||
2419 zuiFind(&src[j],&zval,&value)) {
2420 exists = 1;
2421 break;
2422 }
2423 }
2424
2425 if (!exists) {
2426 tmp = zuiNewSdsFromValue(&zval);
2427 znode = zslInsert(dstzset->zsl,zval.score,tmp);
2428 dictAdd(dstzset->dict,tmp,&znode->score);
2429 if (sdslen(tmp) > *maxelelen) *maxelelen = sdslen(tmp);
2430 }
2431 }
2432 zuiClearIterator(&src[0]);
2433}
2434
2435
2436static void zdiffAlgorithm2(zsetopsrc *src, long setnum, zset *dstzset, size_t *maxelelen) {
2437 /* DIFF Algorithm 2:
2438 *
2439 * Add all the elements of the first set to the auxiliary set.
2440 * Then remove all the elements of all the next sets from it.
2441 *
2442
2443 * This is O(L + (N-K)log(N)) where L is the sum of all the elements in every
2444 * set, N is the size of the first set, and K is the size of the result set.
2445 *
2446 * Note that from the (L-N) dict searches, (N-K) got to the zsetRemoveFromSkiplist
2447 * which costs log(N)
2448 *
2449 * There is also a O(K) cost at the end for finding the largest element
2450 * size, but this doesn't change the algorithm complexity since K < L, and
2451 * O(2L) is the same as O(L). */
2452 int j;
2453 int cardinality = 0;
2454 zsetopval zval;
2455 zskiplistNode *znode;
2456 sds tmp;
2457
2458 for (j = 0; j < setnum; j++) {
2459 if (zuiLength(&src[j]) == 0) continue;
2460
2461 memset(&zval, 0, sizeof(zval));
2462 zuiInitIterator(&src[j]);
2463 while (zuiNext(&src[j],&zval)) {
2464 if (j == 0) {
2465 tmp = zuiNewSdsFromValue(&zval);
2466 znode = zslInsert(dstzset->zsl,zval.score,tmp);
2467 dictAdd(dstzset->dict,tmp,&znode->score);
2468 cardinality++;
2469 } else {
2470 tmp = zuiSdsFromValue(&zval);
2471 if (zsetRemoveFromSkiplist(dstzset, tmp)) {
2472 cardinality--;
2473 }
2474 }
2475
2476 /* Exit if result set is empty as any additional removal
2477 * of elements will have no effect. */
2478 if (cardinality == 0) break;
2479 }
2480 zuiClearIterator(&src[j]);
2481
2482 if (cardinality == 0) break;
2483 }
2484
2485 /* Redize dict if needed after removing multiple elements */
2486 if (htNeedsResize(dstzset->dict)) dictResize(dstzset->dict);
2487
2488 /* Using this algorithm, we can't calculate the max element as we go,
2489 * we have to iterate through all elements to find the max one after. */
2490 *maxelelen = zsetDictGetMaxElementLength(dstzset->dict);
2491}
2492
2493static int zsetChooseDiffAlgorithm(zsetopsrc *src, long setnum) {
2494 int j;
2495
2496 /* Select what DIFF algorithm to use.
2497 *
2498 * Algorithm 1 is O(N*M + K*log(K)) where N is the size of the
2499 * first set, M the total number of sets, and K is the size of the
2500 * result set.
2501 *
2502 * Algorithm 2 is O(L + (N-K)log(N)) where L is the total number of elements
2503 * in all the sets, N is the size of the first set, and K is the size of the
2504 * result set.
2505 *
2506 * We compute what is the best bet with the current input here. */
2507 long long algo_one_work = 0;
2508 long long algo_two_work = 0;
2509
2510 for (j = 0; j < setnum; j++) {
2511 /* If any other set is equal to the first set, there is nothing to be
2512 * done, since we would remove all elements anyway. */
2513 if (j > 0 && src[0].subject == src[j].subject) {
2514 return 0;
2515 }
2516
2517 algo_one_work += zuiLength(&src[0]);
2518 algo_two_work += zuiLength(&src[j]);
2519 }
2520
2521 /* Algorithm 1 has better constant times and performs less operations
2522 * if there are elements in common. Give it some advantage. */
2523 algo_one_work /= 2;
2524 return (algo_one_work <= algo_two_work) ? 1 : 2;
2525}
2526
2527static void zdiff(zsetopsrc *src, long setnum, zset *dstzset, size_t *maxelelen) {
2528 /* Skip everything if the smallest input is empty. */
2529 if (zuiLength(&src[0]) > 0) {
2530 int diff_algo = zsetChooseDiffAlgorithm(src, setnum);
2531 if (diff_algo == 1) {
2532 zdiffAlgorithm1(src, setnum, dstzset, maxelelen);
2533 } else if (diff_algo == 2) {
2534 zdiffAlgorithm2(src, setnum, dstzset, maxelelen);
2535 } else if (diff_algo != 0) {
2536 serverPanic("Unknown algorithm")_serverPanic("t_zset.c",2536,"Unknown algorithm"),__builtin_unreachable
()
;
2537 }
2538 }
2539}
2540
2541uint64_t dictSdsHash(const void *key);
2542int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2);
2543
2544dictType setAccumulatorDictType = {
2545 dictSdsHash, /* hash function */
2546 NULL((void*)0), /* key dup */
2547 NULL((void*)0), /* val dup */
2548 dictSdsKeyCompare, /* key compare */
2549 NULL((void*)0), /* key destructor */
2550 NULL((void*)0), /* val destructor */
2551 NULL((void*)0) /* allow to expand */
2552};
2553
2554/* The zunionInterDiffGenericCommand() function is called in order to implement the
2555 * following commands: ZUNION, ZINTER, ZDIFF, ZUNIONSTORE, ZINTERSTORE, ZDIFFSTORE.
2556 *
2557 * 'numkeysIndex' parameter position of key number. for ZUNION/ZINTER/ZDIFF command,
2558 * this value is 1, for ZUNIONSTORE/ZINTERSTORE/ZDIFFSTORE command, this value is 2.
2559 *
2560 * 'op' SET_OP_INTER, SET_OP_UNION or SET_OP_DIFF.
2561 */
2562void zunionInterDiffGenericCommand(client *c, robj *dstkey, int numkeysIndex, int op) {
2563 int i, j;
2564 long setnum;
2565 int aggregate = REDIS_AGGR_SUM1;
2566 zsetopsrc *src;
2567 zsetopval zval;
2568 sds tmp;
2569 size_t maxelelen = 0;
2570 robj *dstobj;
2571 zset *dstzset;
2572 zskiplistNode *znode;
2573 int withscores = 0;
2574
2575 /* expect setnum input keys to be given */
2576 if ((getLongFromObjectOrReply(c, c->argv[numkeysIndex], &setnum, NULL((void*)0)) != C_OK0))
2577 return;
2578
2579 if (setnum < 1) {
2580 addReplyError(c,
2581 "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE/ZDIFFSTORE");
2582 return;
2583 }
2584
2585 /* test if the expected number of keys would overflow */
2586 if (setnum > (c->argc-(numkeysIndex+1))) {
2587 addReplyErrorObject(c,shared.syntaxerr);
2588 return;
2589 }
2590
2591 /* read keys to be used for input */
2592 src = zcalloc(sizeof(zsetopsrc) * setnum);
2593 for (i = 0, j = numkeysIndex+1; i < setnum; i++, j++) {
2594 robj *obj = dstkey ?
2595 lookupKeyWrite(c->db,c->argv[j]) :
2596 lookupKeyRead(c->db,c->argv[j]);
2597 if (obj != NULL((void*)0)) {
2598 if (obj->type != OBJ_ZSET3 && obj->type != OBJ_SET2) {
2599 zfree(src);
2600 addReplyErrorObject(c,shared.wrongtypeerr);
2601 return;
2602 }
2603
2604 src[i].subject = obj;
2605 src[i].type = obj->type;
2606 src[i].encoding = obj->encoding;
2607 } else {
2608 src[i].subject = NULL((void*)0);
2609 }
2610
2611 /* Default all weights to 1. */
2612 src[i].weight = 1.0;
2613 }
2614
2615 /* parse optional extra arguments */
2616 if (j < c->argc) {
2617 int remaining = c->argc - j;
2618
2619 while (remaining) {
2620 if (op != SET_OP_DIFF1 &&
2621 remaining >= (setnum + 1) &&
2622 !strcasecmp(c->argv[j]->ptr,"weights"))
2623 {
2624 j++; remaining--;
2625 for (i = 0; i < setnum; i++, j++, remaining--) {
2626 if (getDoubleFromObjectOrReply(c,c->argv[j],&src[i].weight,
2627 "weight value is not a float") != C_OK0)
2628 {
2629 zfree(src);
2630 return;
2631 }
2632 }
2633 } else if (op != SET_OP_DIFF1 &&
2634 remaining >= 2 &&
2635 !strcasecmp(c->argv[j]->ptr,"aggregate"))
2636 {
2637 j++; remaining--;
2638 if (!strcasecmp(c->argv[j]->ptr,"sum")) {
2639 aggregate = REDIS_AGGR_SUM1;
2640 } else if (!strcasecmp(c->argv[j]->ptr,"min")) {
2641 aggregate = REDIS_AGGR_MIN2;
2642 } else if (!strcasecmp(c->argv[j]->ptr,"max")) {
2643 aggregate = REDIS_AGGR_MAX3;
2644 } else {
2645 zfree(src);
2646 addReplyErrorObject(c,shared.syntaxerr);
2647 return;
2648 }
2649 j++; remaining--;
2650 } else if (remaining >= 1 &&
2651 !dstkey &&
2652 !strcasecmp(c->argv[j]->ptr,"withscores"))
2653 {
2654 j++; remaining--;
2655 withscores = 1;
2656 } else {
2657 zfree(src);
2658 addReplyErrorObject(c,shared.syntaxerr);
2659 return;
2660 }
2661 }
2662 }
2663
2664 if (op != SET_OP_DIFF1) {
2665 /* sort sets from the smallest to largest, this will improve our
2666 * algorithm's performance */
2667 qsort(src,setnum,sizeof(zsetopsrc),zuiCompareByCardinality);
2668 }
2669
2670 dstobj = createZsetObject();
2671 dstzset = dstobj->ptr;
2672 memset(&zval, 0, sizeof(zval));
2673
2674 if (op == SET_OP_INTER2) {
2675 /* Skip everything if the smallest input is empty. */
2676 if (zuiLength(&src[0]) > 0) {
2677 /* Precondition: as src[0] is non-empty and the inputs are ordered
2678 * by size, all src[i > 0] are non-empty too. */
2679 zuiInitIterator(&src[0]);
2680 while (zuiNext(&src[0],&zval)) {
2681 double score, value;
2682
2683 score = src[0].weight * zval.score;
2684 if (isnan(score)__builtin_isnan (score)) score = 0;
2685
2686 for (j = 1; j < setnum; j++) {
2687 /* It is not safe to access the zset we are
2688 * iterating, so explicitly check for equal object. */
2689 if (src[j].subject == src[0].subject) {
2690 value = zval.score*src[j].weight;
2691 zunionInterAggregate(&score,value,aggregate);
2692 } else if (zuiFind(&src[j],&zval,&value)) {
2693 value *= src[j].weight;
2694 zunionInterAggregate(&score,value,aggregate);
2695 } else {
2696 break;
2697 }
2698 }
2699
2700 /* Only continue when present in every input. */
2701 if (j == setnum) {
2702 tmp = zuiNewSdsFromValue(&zval);
2703 znode = zslInsert(dstzset->zsl,score,tmp);
2704 dictAdd(dstzset->dict,tmp,&znode->score);
2705 if (sdslen(tmp) > maxelelen) maxelelen = sdslen(tmp);
2706 }
2707 }
2708 zuiClearIterator(&src[0]);
2709 }
2710 } else if (op == SET_OP_UNION0) {
2711 dict *accumulator = dictCreate(&setAccumulatorDictType,NULL((void*)0));
2712 dictIterator *di;
2713 dictEntry *de, *existing;
2714 double score;
2715
2716 if (setnum) {
2717 /* Our union is at least as large as the largest set.
2718 * Resize the dictionary ASAP to avoid useless rehashing. */
2719 dictExpand(accumulator,zuiLength(&src[setnum-1]));
2720 }
2721
2722 /* Step 1: Create a dictionary of elements -> aggregated-scores
2723 * by iterating one sorted set after the other. */
2724 for (i = 0; i < setnum; i++) {
2725 if (zuiLength(&src[i]) == 0) continue;
2726
2727 zuiInitIterator(&src[i]);
2728 while (zuiNext(&src[i],&zval)) {
2729 /* Initialize value */
2730 score = src[i].weight * zval.score;
2731 if (isnan(score)__builtin_isnan (score)) score = 0;
2732
2733 /* Search for this element in the accumulating dictionary. */
2734 de = dictAddRaw(accumulator,zuiSdsFromValue(&zval),&existing);
2735 /* If we don't have it, we need to create a new entry. */
2736 if (!existing) {
2737 tmp = zuiNewSdsFromValue(&zval);
2738 /* Remember the longest single element encountered,
2739 * to understand if it's possible to convert to ziplist
2740 * at the end. */
2741 if (sdslen(tmp) > maxelelen) maxelelen = sdslen(tmp);
2742 /* Update the element with its initial score. */
2743 dictSetKey(accumulator, de, tmp)do { if ((accumulator)->type->keyDup) (de)->key = (accumulator
)->type->keyDup((accumulator)->privdata, tmp); else (
de)->key = (tmp); } while(0)
;
2744 dictSetDoubleVal(de,score)do { (de)->v.d = score; } while(0);
2745 } else {
2746 /* Update the score with the score of the new instance
2747 * of the element found in the current sorted set.
2748 *
2749 * Here we access directly the dictEntry double
2750 * value inside the union as it is a big speedup
2751 * compared to using the getDouble/setDouble API. */
2752 zunionInterAggregate(&existing->v.d,score,aggregate);
2753 }
2754 }
2755 zuiClearIterator(&src[i]);
2756 }
2757
2758 /* Step 2: convert the dictionary into the final sorted set. */
2759 di = dictGetIterator(accumulator);
2760
2761 /* We now are aware of the final size of the resulting sorted set,
2762 * let's resize the dictionary embedded inside the sorted set to the
2763 * right size, in order to save rehashing time. */
2764 dictExpand(dstzset->dict,dictSize(accumulator)((accumulator)->ht[0].used+(accumulator)->ht[1].used));
2765
2766 while((de = dictNext(di)) != NULL((void*)0)) {
2767 sds ele = dictGetKey(de)((de)->key);
2768 score = dictGetDoubleVal(de)((de)->v.d);
2769 znode = zslInsert(dstzset->zsl,score,ele);
2770 dictAdd(dstzset->dict,ele,&znode->score);
2771 }
2772 dictReleaseIterator(di);
2773 dictRelease(accumulator);
2774 } else if (op == SET_OP_DIFF1) {
2775 zdiff(src, setnum, dstzset, &maxelelen);
2776 } else {
2777 serverPanic("Unknown operator")_serverPanic("t_zset.c",2777,"Unknown operator"),__builtin_unreachable
()
;
2778 }
2779
2780 if (dstkey) {
2781 if (dstzset->zsl->length) {
2782 zsetConvertToZiplistIfNeeded(dstobj, maxelelen);
2783 setKey(c, c->db, dstkey, dstobj);
2784 addReplyLongLong(c, zsetLength(dstobj));
2785 notifyKeyspaceEvent(NOTIFY_ZSET(1<<7),
2786 (op == SET_OP_UNION0) ? "zunionstore" :
2787 (op == SET_OP_INTER2 ? "zinterstore" : "zdiffstore"),
2788 dstkey, c->db->id);
2789 server.dirty++;
2790 } else {
2791 addReply(c, shared.czero);
2792 if (dbDelete(c->db, dstkey)) {
2793 signalModifiedKey(c, c->db, dstkey);
2794 notifyKeyspaceEvent(NOTIFY_GENERIC(1<<2), "del", dstkey, c->db->id);
2795 server.dirty++;
2796 }
2797 }
2798 } else {
2799 unsigned long length = dstzset->zsl->length;
2800 zskiplist *zsl = dstzset->zsl;
2801 zskiplistNode *zn = zsl->header->level[0].forward;
2802 /* In case of WITHSCORES, respond with a single array in RESP2, and
2803 * nested arrays in RESP3. We can't use a map response type since the
2804 * client library needs to know to respect the order. */
2805 if (withscores && c->resp == 2)
2806 addReplyArrayLen(c, length*2);
2807 else
2808 addReplyArrayLen(c, length);
2809
2810 while (zn != NULL((void*)0)) {
2811 if (withscores && c->resp > 2) addReplyArrayLen(c,2);
2812 addReplyBulkCBuffer(c,zn->ele,sdslen(zn->ele));
2813 if (withscores) addReplyDouble(c,zn->score);
2814 zn = zn->level[0].forward;
2815 }
2816 }
2817 decrRefCount(dstobj);
2818 zfree(src);
2819}
2820
2821void zunionstoreCommand(client *c) {
2822 zunionInterDiffGenericCommand(c, c->argv[1], 2, SET_OP_UNION0);
2823}
2824
2825void zinterstoreCommand(client *c) {
2826 zunionInterDiffGenericCommand(c, c->argv[1], 2, SET_OP_INTER2);
2827}
2828
2829void zdiffstoreCommand(client *c) {
2830 zunionInterDiffGenericCommand(c, c->argv[1], 2, SET_OP_DIFF1);
2831}
2832
2833void zunionCommand(client *c) {
2834 zunionInterDiffGenericCommand(c, NULL((void*)0), 1, SET_OP_UNION0);
2835}
2836
2837void zinterCommand(client *c) {
2838 zunionInterDiffGenericCommand(c, NULL((void*)0), 1, SET_OP_INTER2);
2839}
2840
2841void zdiffCommand(client *c) {
2842 zunionInterDiffGenericCommand(c, NULL((void*)0), 1, SET_OP_DIFF1);
2843}
2844
2845typedef enum {
2846 ZRANGE_DIRECTION_AUTO = 0,
2847 ZRANGE_DIRECTION_FORWARD,
2848 ZRANGE_DIRECTION_REVERSE
2849} zrange_direction;
2850
2851typedef enum {
2852 ZRANGE_CONSUMER_TYPE_CLIENT = 0,
2853 ZRANGE_CONSUMER_TYPE_INTERNAL
2854} zrange_consumer_type;
2855
2856typedef struct zrange_result_handler zrange_result_handler;
2857
2858typedef void (*zrangeResultBeginFunction)(zrange_result_handler *c);
2859typedef void (*zrangeResultFinalizeFunction)(
2860 zrange_result_handler *c, size_t result_count);
2861typedef void (*zrangeResultEmitCBufferFunction)(
2862 zrange_result_handler *c, const void *p, size_t len, double score);
2863typedef void (*zrangeResultEmitLongLongFunction)(
2864 zrange_result_handler *c, long long ll, double score);
2865
2866void zrangeGenericCommand (zrange_result_handler *handler, int argc_start, int store,
2867 zrange_type rangetype, zrange_direction direction);
2868
2869/* Interface struct for ZRANGE/ZRANGESTORE generic implementation.
2870 * There is one implementation of this interface that sends a RESP reply to clients.
2871 * and one implementation that stores the range result into a zset object. */
2872struct zrange_result_handler {
2873 zrange_consumer_type type;
2874 client *client;
2875 robj *dstkey;
2876 robj *dstobj;
2877 void *userdata;
2878 int withscores;
2879 int should_emit_array_length;
2880 zrangeResultBeginFunction beginResultEmission;
2881 zrangeResultFinalizeFunction finalizeResultEmission;
2882 zrangeResultEmitCBufferFunction emitResultFromCBuffer;
2883 zrangeResultEmitLongLongFunction emitResultFromLongLong;
2884};
2885
2886/* Result handler methods for responding the ZRANGE to clients. */
2887static void zrangeResultBeginClient(zrange_result_handler *handler) {
2888 handler->userdata = addReplyDeferredLen(handler->client);
2889}
2890
2891static void zrangeResultEmitCBufferToClient(zrange_result_handler *handler,
2892 const void *value, size_t value_length_in_bytes, double score)
2893{
2894 if (handler->should_emit_array_length) {
2895 addReplyArrayLen(handler->client, 2);
2896 }
2897
2898 addReplyBulkCBuffer(handler->client, value, value_length_in_bytes);
2899
2900 if (handler->withscores) {
2901 addReplyDouble(handler->client, score);
2902 }
2903}
2904
2905static void zrangeResultEmitLongLongToClient(zrange_result_handler *handler,
2906 long long value, double score)
2907{
2908 if (handler->should_emit_array_length) {
2909 addReplyArrayLen(handler->client, 2);
2910 }
2911
2912 addReplyBulkLongLong(handler->client, value);
2913
2914 if (handler->withscores) {
2915 addReplyDouble(handler->client, score);
2916 }
2917}
2918
2919static void zrangeResultFinalizeClient(zrange_result_handler *handler,
2920 size_t result_count)
2921{
2922 /* In case of WITHSCORES, respond with a single array in RESP2, and
2923 * nested arrays in RESP3. We can't use a map response type since the
2924 * client library needs to know to respect the order. */
2925 if (handler->withscores && (handler->client->resp == 2)) {
2926 result_count *= 2;
2927 }
2928
2929 setDeferredArrayLen(handler->client, handler->userdata, result_count);
2930}
2931
2932/* Result handler methods for storing the ZRANGESTORE to a zset. */
2933static void zrangeResultBeginStore(zrange_result_handler *handler)
2934{
2935 handler->dstobj = createZsetZiplistObject();
2936}
2937
2938static void zrangeResultEmitCBufferForStore(zrange_result_handler *handler,
2939 const void *value, size_t value_length_in_bytes, double score)
2940{
2941 double newscore;
2942 int retflags = 0;
2943 sds ele = sdsnewlen(value, value_length_in_bytes);
2944 int retval = zsetAdd(handler->dstobj, score, ele, &retflags, &newscore);
2945 sdsfree(ele);
2946 serverAssert(retval)((retval)?(void)0 : (_serverAssert("retval","t_zset.c",2946),
__builtin_unreachable()))
;
2947}
2948
2949static void zrangeResultEmitLongLongForStore(zrange_result_handler *handler,
2950 long long value, double score)
2951{
2952 double newscore;
2953 int retflags = 0;
2954 sds ele = sdsfromlonglong(value);
2955 int retval = zsetAdd(handler->dstobj, score, ele, &retflags, &newscore);
2956 sdsfree(ele);
2957 serverAssert(retval)((retval)?(void)0 : (_serverAssert("retval","t_zset.c",2957),
__builtin_unreachable()))
;
2958}
2959
2960static void zrangeResultFinalizeStore(zrange_result_handler *handler, size_t result_count)
2961{
2962 if (result_count) {
2963 setKey(handler->client, handler->client->db, handler->dstkey, handler->dstobj);
2964 addReplyLongLong(handler->client, result_count);
2965 notifyKeyspaceEvent(NOTIFY_ZSET(1<<7), "zrangestore", handler->dstkey, handler->client->db->id);
2966 server.dirty++;
2967 } else {
2968 addReply(handler->client, shared.czero);
2969 if (dbDelete(handler->client->db, handler->dstkey)) {
2970 signalModifiedKey(handler->client, handler->client->db, handler->dstkey);
2971 notifyKeyspaceEvent(NOTIFY_GENERIC(1<<2), "del", handler->dstkey, handler->client->db->id);
2972 server.dirty++;
2973 }
2974 }
2975 decrRefCount(handler->dstobj);
2976}
2977
2978/* Initialize the consumer interface type with the requested type. */
2979static void zrangeResultHandlerInit(zrange_result_handler *handler,
2980 client *client, zrange_consumer_type type)
2981{
2982 memset(handler, 0, sizeof(*handler));
2983
2984 handler->client = client;
2985
2986 switch (type) {
2987 case ZRANGE_CONSUMER_TYPE_CLIENT:
2988 handler->beginResultEmission = zrangeResultBeginClient;
2989 handler->finalizeResultEmission = zrangeResultFinalizeClient;
2990 handler->emitResultFromCBuffer = zrangeResultEmitCBufferToClient;
2991 handler->emitResultFromLongLong = zrangeResultEmitLongLongToClient;
2992 break;
2993
2994 case ZRANGE_CONSUMER_TYPE_INTERNAL:
2995 handler->beginResultEmission = zrangeResultBeginStore;
2996 handler->finalizeResultEmission = zrangeResultFinalizeStore;
2997 handler->emitResultFromCBuffer = zrangeResultEmitCBufferForStore;
2998 handler->emitResultFromLongLong = zrangeResultEmitLongLongForStore;
2999 break;
3000 }
3001}
3002
3003static void zrangeResultHandlerScoreEmissionEnable(zrange_result_handler *handler) {
3004 handler->withscores = 1;
3005 handler->should_emit_array_length = (handler->client->resp > 2);
3006}
3007
3008static void zrangeResultHandlerDestinationKeySet (zrange_result_handler *handler,
3009 robj *dstkey)
3010{
3011 handler->dstkey = dstkey;
3012}
3013
3014/* This command implements ZRANGE, ZREVRANGE. */
3015void genericZrangebyrankCommand(zrange_result_handler *handler,
3016 robj *zobj, long start, long end, int withscores, int reverse) {
3017
3018 client *c = handler->client;
3019 long llen;
3020 long rangelen;
3021 size_t result_cardinality;
3022
3023 /* Sanitize indexes. */
3024 llen = zsetLength(zobj);
3025 if (start < 0) start = llen+start;
3026 if (end < 0) end = llen+end;
3027 if (start < 0) start = 0;
3028
3029 handler->beginResultEmission(handler);
3030
3031 /* Invariant: start >= 0, so this test will be true when end < 0.
3032 * The range is empty when start > end or start >= length. */
3033 if (start > end || start >= llen) {
3034 handler->finalizeResultEmission(handler, 0);
3035 return;
3036 }
3037 if (end >= llen) end = llen-1;
3038 rangelen = (end-start)+1;
3039 result_cardinality = rangelen;
3040
3041 if (zobj->encoding == OBJ_ENCODING_ZIPLIST5) {
3042 unsigned char *zl = zobj->ptr;
3043 unsigned char *eptr, *sptr;
3044 unsigned char *vstr;
3045 unsigned int vlen;
3046 long long vlong;
3047 double score = 0.0;
3048
3049 if (reverse)
3050 eptr = ziplistIndex(zl,-2-(2*start));
3051 else
3052 eptr = ziplistIndex(zl,2*start);
3053
3054 serverAssertWithInfo(c,zobj,eptr != NULL)((eptr != ((void*)0))?(void)0 : (_serverAssertWithInfo(c,zobj
,"eptr != NULL","t_zset.c",3054),__builtin_unreachable()))
;
3055 sptr = ziplistNext(zl,eptr);
3056
3057 while (rangelen--) {
3058 serverAssertWithInfo(c,zobj,eptr != NULL && sptr != NULL)((eptr != ((void*)0) && sptr != ((void*)0))?(void)0 :
(_serverAssertWithInfo(c,zobj,"eptr != NULL && sptr != NULL"
,"t_zset.c",3058),__builtin_unreachable()))
;
3059 serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong))((ziplistGet(eptr,&vstr,&vlen,&vlong))?(void)0 : (
_serverAssertWithInfo(c,zobj,"ziplistGet(eptr,&vstr,&vlen,&vlong)"
,"t_zset.c",3059),__builtin_unreachable()))
;
3060
3061 if (withscores) /* don't bother to extract the score if it's gonna be ignored. */
3062 score = zzlGetScore(sptr);
3063
3064 if (vstr == NULL((void*)0)) {
3065 handler->emitResultFromLongLong(handler, vlong, score);
3066 } else {
3067 handler->emitResultFromCBuffer(handler, vstr, vlen, score);
3068 }
3069
3070 if (reverse)
3071 zzlPrev(zl,&eptr,&sptr);
3072 else
3073 zzlNext(zl,&eptr,&sptr);
3074 }
3075
3076 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST7) {
3077 zset *zs = zobj->ptr;
3078 zskiplist *zsl = zs->zsl;
3079 zskiplistNode *ln;
3080
3081 /* Check if starting point is trivial, before doing log(N) lookup. */
3082 if (reverse) {
3083 ln = zsl->tail;
3084 if (start > 0)
3085 ln = zslGetElementByRank(zsl,llen-start);
3086 } else {
3087 ln = zsl->header->level[0].forward;
3088 if (start > 0)
3089 ln = zslGetElementByRank(zsl,start+1);
3090 }
3091
3092 while(rangelen--) {
3093 serverAssertWithInfo(c,zobj,ln != NULL)((ln != ((void*)0))?(void)0 : (_serverAssertWithInfo(c,zobj,"ln != NULL"
,"t_zset.c",3093),__builtin_unreachable()))
;
3094 sds ele = ln->ele;
3095 handler->emitResultFromCBuffer(handler, ele, sdslen(ele), ln->score);
3096 ln = reverse ? ln->backward : ln->level[0].forward;
3097 }
3098 } else {
3099 serverPanic("Unknown sorted set encoding")_serverPanic("t_zset.c",3099,"Unknown sorted set encoding"),__builtin_unreachable
()
;
3100 }
3101
3102 handler->finalizeResultEmission(handler, result_cardinality);
3103}
3104
3105/* ZRANGESTORE <dst> <src> <min> <max> [BYSCORE | BYLEX] [REV] [LIMIT offset count] */
3106void zrangestoreCommand (client *c) {
3107 robj *dstkey = c->argv[1];
3108 zrange_result_handler handler;
3109 zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_INTERNAL);
3110 zrangeResultHandlerDestinationKeySet(&handler, dstkey);
3111 zrangeGenericCommand(&handler, 2, 1, ZRANGE_AUTO, ZRANGE_DIRECTION_AUTO);
3112}
3113
3114/* ZRANGE <key> <min> <max> [BYSCORE | BYLEX] [REV] [WITHSCORES] [LIMIT offset count] */
3115void zrangeCommand(client *c) {
3116 zrange_result_handler handler;
3117 zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_CLIENT);
3118 zrangeGenericCommand(&handler, 1, 0, ZRANGE_AUTO, ZRANGE_DIRECTION_AUTO);
3119}
3120
3121/* ZREVRANGE <key> <min> <max> [WITHSCORES] */
3122void zrevrangeCommand(client *c) {
3123 zrange_result_handler handler;
3124 zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_CLIENT);
3125 zrangeGenericCommand(&handler, 1, 0, ZRANGE_RANK, ZRANGE_DIRECTION_REVERSE);
3126}
3127
3128/* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE. */
3129void genericZrangebyscoreCommand(zrange_result_handler *handler,
3130 zrangespec *range, robj *zobj, long offset, long limit,
3131 int reverse) {
3132
3133 client *c = handler->client;
3134 unsigned long rangelen = 0;
3135
3136 handler->beginResultEmission(handler);
3137
3138 /* For invalid offset, return directly. */
3139 if (offset > 0 && offset >= (long)zsetLength(zobj)) {
3140 handler->finalizeResultEmission(handler, 0);
3141 return;
3142 }
3143
3144 if (zobj->encoding == OBJ_ENCODING_ZIPLIST5) {
3145 unsigned char *zl = zobj->ptr;
3146 unsigned char *eptr, *sptr;
3147 unsigned char *vstr;
3148 unsigned int vlen;
3149 long long vlong;
3150
3151 /* If reversed, get the last node in range as starting point. */
3152 if (reverse) {
3153 eptr = zzlLastInRange(zl,range);
3154 } else {
3155 eptr = zzlFirstInRange(zl,range);
3156 }
3157
3158 /* Get score pointer for the first element. */
3159 if (eptr)
3160 sptr = ziplistNext(zl,eptr);
3161
3162 /* If there is an offset, just traverse the number of elements without
3163 * checking the score because that is done in the next loop. */
3164 while (eptr && offset--) {
3165 if (reverse) {
3166 zzlPrev(zl,&eptr,&sptr);
3167 } else {
3168 zzlNext(zl,&eptr,&sptr);
3169 }
3170 }
3171
3172 while (eptr && limit--) {
3173 double score = zzlGetScore(sptr);
3174
3175 /* Abort when the node is no longer in range. */
3176 if (reverse) {
3177 if (!zslValueGteMin(score,range)) break;
3178 } else {
3179 if (!zslValueLteMax(score,range)) break;
3180 }
3181
3182 /* We know the element exists, so ziplistGet should always
3183 * succeed */
3184 serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong))((ziplistGet(eptr,&vstr,&vlen,&vlong))?(void)0 : (
_serverAssertWithInfo(c,zobj,"ziplistGet(eptr,&vstr,&vlen,&vlong)"
,"t_zset.c",3184),__builtin_unreachable()))
;
3185
3186 rangelen++;
3187 if (vstr == NULL((void*)0)) {
3188 handler->emitResultFromLongLong(handler, vlong, score);
3189 } else {
3190 handler->emitResultFromCBuffer(handler, vstr, vlen, score);
3191 }
3192
3193 /* Move to next node */
3194 if (reverse) {
3195 zzlPrev(zl,&eptr,&sptr);
3196 } else {
3197 zzlNext(zl,&eptr,&sptr);
3198 }
3199 }
3200 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST7) {
3201 zset *zs = zobj->ptr;
3202 zskiplist *zsl = zs->zsl;
3203 zskiplistNode *ln;
3204
3205 /* If reversed, get the last node in range as starting point. */
3206 if (reverse) {
3207 ln = zslLastInRange(zsl,range);
3208 } else {
3209 ln = zslFirstInRange(zsl,range);
3210 }
3211
3212 /* If there is an offset, just traverse the number of elements without
3213 * checking the score because that is done in the next loop. */
3214 while (ln && offset--) {
3215 if (reverse) {
3216 ln = ln->backward;
3217 } else {
3218 ln = ln->level[0].forward;
3219 }
3220 }
3221
3222 while (ln && limit--) {
3223 /* Abort when the node is no longer in range. */
3224 if (reverse) {
3225 if (!zslValueGteMin(ln->score,range)) break;
3226 } else {
3227 if (!zslValueLteMax(ln->score,range)) break;
3228 }
3229
3230 rangelen++;
3231 handler->emitResultFromCBuffer(handler, ln->ele, sdslen(ln->ele), ln->score);
3232
3233 /* Move to next node */
3234 if (reverse) {
3235 ln = ln->backward;
3236 } else {
3237 ln = ln->level[0].forward;
3238 }
3239 }
3240 } else {
3241 serverPanic("Unknown sorted set encoding")_serverPanic("t_zset.c",3241,"Unknown sorted set encoding"),__builtin_unreachable
()
;
3242 }
3243
3244 handler->finalizeResultEmission(handler, rangelen);
3245}
3246
3247/* ZRANGEBYSCORE <key> <min> <max> [WITHSCORES] [LIMIT offset count] */
3248void zrangebyscoreCommand(client *c) {
3249 zrange_result_handler handler;
3250 zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_CLIENT);
3251 zrangeGenericCommand(&handler, 1, 0, ZRANGE_SCORE, ZRANGE_DIRECTION_FORWARD);
3252}
3253
3254/* ZREVRANGEBYSCORE <key> <min> <max> [WITHSCORES] [LIMIT offset count] */
3255void zrevrangebyscoreCommand(client *c) {
3256 zrange_result_handler handler;
3257 zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_CLIENT);
3258 zrangeGenericCommand(&handler, 1, 0, ZRANGE_SCORE, ZRANGE_DIRECTION_REVERSE);
3259}
3260
3261void zcountCommand(client *c) {
3262 robj *key = c->argv[1];
3263 robj *zobj;
3264 zrangespec range;
3265 unsigned long count = 0;
3266
3267 /* Parse the range arguments */
3268 if (zslParseRange(c->argv[2],c->argv[3],&range) != C_OK0) {
3269 addReplyError(c,"min or max is not a float");
3270 return;
3271 }
3272
3273 /* Lookup the sorted set */
3274 if ((zobj = lookupKeyReadOrReply(c, key, shared.czero)) == NULL((void*)0) ||
3275 checkType(c, zobj, OBJ_ZSET3)) return;
3276
3277 if (zobj->encoding == OBJ_ENCODING_ZIPLIST5) {
3278 unsigned char *zl = zobj->ptr;
3279 unsigned char *eptr, *sptr;
3280 double score;
3281
3282 /* Use the first element in range as the starting point */
3283 eptr = zzlFirstInRange(zl,&range);
3284
3285 /* No "first" element */
3286 if (eptr == NULL((void*)0)) {
3287 addReply(c, shared.czero);
3288 return;
3289 }
3290
3291 /* First element is in range */
3292 sptr = ziplistNext(zl,eptr);
3293 score = zzlGetScore(sptr);
3294 serverAssertWithInfo(c,zobj,zslValueLteMax(score,&range))((zslValueLteMax(score,&range))?(void)0 : (_serverAssertWithInfo
(c,zobj,"zslValueLteMax(score,&range)","t_zset.c",3294),__builtin_unreachable
()))
;
3295
3296 /* Iterate over elements in range */
3297 while (eptr) {
3298 score = zzlGetScore(sptr);
3299
3300 /* Abort when the node is no longer in range. */
3301 if (!zslValueLteMax(score,&range)) {
3302 break;
3303 } else {
3304 count++;
3305 zzlNext(zl,&eptr,&sptr);
3306 }
3307 }
3308 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST7) {
3309 zset *zs = zobj->ptr;
3310 zskiplist *zsl = zs->zsl;
3311 zskiplistNode *zn;
3312 unsigned long rank;
3313
3314 /* Find first element in range */
3315 zn = zslFirstInRange(zsl, &range);
3316
3317 /* Use rank of first element, if any, to determine preliminary count */
3318 if (zn != NULL((void*)0)) {
3319 rank = zslGetRank(zsl, zn->score, zn->ele);
3320 count = (zsl->length - (rank - 1));
3321
3322 /* Find last element in range */
3323 zn = zslLastInRange(zsl, &range);
3324
3325 /* Use rank of last element, if any, to determine the actual count */
3326 if (zn != NULL((void*)0)) {
3327 rank = zslGetRank(zsl, zn->score, zn->ele);
3328 count -= (zsl->length - rank);
3329 }
3330 }
3331 } else {
3332 serverPanic("Unknown sorted set encoding")_serverPanic("t_zset.c",3332,"Unknown sorted set encoding"),__builtin_unreachable
()
;
3333 }
3334
3335 addReplyLongLong(c, count);
3336}
3337
3338void zlexcountCommand(client *c) {
3339 robj *key = c->argv[1];
3340 robj *zobj;
3341 zlexrangespec range;
3342 unsigned long count = 0;
3343
3344 /* Parse the range arguments */
3345 if (zslParseLexRange(c->argv[2],c->argv[3],&range) != C_OK0) {
3346 addReplyError(c,"min or max not valid string range item");
3347 return;
3348 }
3349
3350 /* Lookup the sorted set */
3351 if ((zobj = lookupKeyReadOrReply(c, key, shared.czero)) == NULL((void*)0) ||
3352 checkType(c, zobj, OBJ_ZSET3))
3353 {
3354 zslFreeLexRange(&range);
3355 return;
3356 }
3357
3358 if (zobj->encoding == OBJ_ENCODING_ZIPLIST5) {
3359 unsigned char *zl = zobj->ptr;
3360 unsigned char *eptr, *sptr;
3361
3362 /* Use the first element in range as the starting point */
3363 eptr = zzlFirstInLexRange(zl,&range);
3364
3365 /* No "first" element */
3366 if (eptr == NULL((void*)0)) {
3367 zslFreeLexRange(&range);
3368 addReply(c, shared.czero);
3369 return;
3370 }
3371
3372 /* First element is in range */
3373 sptr = ziplistNext(zl,eptr);
3374 serverAssertWithInfo(c,zobj,zzlLexValueLteMax(eptr,&range))((zzlLexValueLteMax(eptr,&range))?(void)0 : (_serverAssertWithInfo
(c,zobj,"zzlLexValueLteMax(eptr,&range)","t_zset.c",3374)
,__builtin_unreachable()))
;
3375
3376 /* Iterate over elements in range */
3377 while (eptr) {
3378 /* Abort when the node is no longer in range. */
3379 if (!zzlLexValueLteMax(eptr,&range)) {
3380 break;
3381 } else {
3382 count++;
3383 zzlNext(zl,&eptr,&sptr);
3384 }
3385 }
3386 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST7) {
3387 zset *zs = zobj->ptr;
3388 zskiplist *zsl = zs->zsl;
3389 zskiplistNode *zn;
3390 unsigned long rank;
3391
3392 /* Find first element in range */
3393 zn = zslFirstInLexRange(zsl, &range);
3394
3395 /* Use rank of first element, if any, to determine preliminary count */
3396 if (zn != NULL((void*)0)) {
3397 rank = zslGetRank(zsl, zn->score, zn->ele);
3398 count = (zsl->length - (rank - 1));
3399
3400 /* Find last element in range */
3401 zn = zslLastInLexRange(zsl, &range);
3402
3403 /* Use rank of last element, if any, to determine the actual count */
3404 if (zn != NULL((void*)0)) {
3405 rank = zslGetRank(zsl, zn->score, zn->ele);
3406 count -= (zsl->length - rank);
3407 }
3408 }
3409 } else {
3410 serverPanic("Unknown sorted set encoding")_serverPanic("t_zset.c",3410,"Unknown sorted set encoding"),__builtin_unreachable
()
;
3411 }
3412
3413 zslFreeLexRange(&range);
3414 addReplyLongLong(c, count);
3415}
3416
3417/* This command implements ZRANGEBYLEX, ZREVRANGEBYLEX. */
3418void genericZrangebylexCommand(zrange_result_handler *handler,
3419 zlexrangespec *range, robj *zobj, int withscores, long offset, long limit,
3420 int reverse)
3421{
3422 client *c = handler->client;
3423 unsigned long rangelen = 0;
3424
3425 handler->beginResultEmission(handler);
3426
3427 if (zobj->encoding == OBJ_ENCODING_ZIPLIST5) {
3428 unsigned char *zl = zobj->ptr;
3429 unsigned char *eptr, *sptr;
3430 unsigned char *vstr;
3431 unsigned int vlen;
3432 long long vlong;
3433
3434 /* If reversed, get the last node in range as starting point. */
3435 if (reverse) {
3436 eptr = zzlLastInLexRange(zl,range);
3437 } else {
3438 eptr = zzlFirstInLexRange(zl,range);
3439 }
3440
3441 /* Get score pointer for the first element. */
3442 if (eptr)
3443 sptr = ziplistNext(zl,eptr);
3444
3445 /* If there is an offset, just traverse the number of elements without
3446 * checking the score because that is done in the next loop. */
3447 while (eptr && offset--) {
3448 if (reverse) {
3449 zzlPrev(zl,&eptr,&sptr);
3450 } else {
3451 zzlNext(zl,&eptr,&sptr);
3452 }
3453 }
3454
3455 while (eptr && limit--) {
3456 double score = 0;
3457 if (withscores) /* don't bother to extract the score if it's gonna be ignored. */
3458 score = zzlGetScore(sptr);
3459
3460 /* Abort when the node is no longer in range. */
3461 if (reverse) {
3462 if (!zzlLexValueGteMin(eptr,range)) break;
3463 } else {
3464 if (!zzlLexValueLteMax(eptr,range)) break;
3465 }
3466
3467 /* We know the element exists, so ziplistGet should always
3468 * succeed. */
3469 serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong))((ziplistGet(eptr,&vstr,&vlen,&vlong))?(void)0 : (
_serverAssertWithInfo(c,zobj,"ziplistGet(eptr,&vstr,&vlen,&vlong)"
,"t_zset.c",3469),__builtin_unreachable()))
;
3470
3471 rangelen++;
3472 if (vstr == NULL((void*)0)) {
3473 handler->emitResultFromLongLong(handler, vlong, score);
3474 } else {
3475 handler->emitResultFromCBuffer(handler, vstr, vlen, score);
3476 }
3477
3478 /* Move to next node */
3479 if (reverse) {
3480 zzlPrev(zl,&eptr,&sptr);
3481 } else {
3482 zzlNext(zl,&eptr,&sptr);
3483 }
3484 }
3485 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST7) {
3486 zset *zs = zobj->ptr;
3487 zskiplist *zsl = zs->zsl;
3488 zskiplistNode *ln;
3489
3490 /* If reversed, get the last node in range as starting point. */
3491 if (reverse) {
3492 ln = zslLastInLexRange(zsl,range);
3493 } else {
3494 ln = zslFirstInLexRange(zsl,range);
3495 }
3496
3497 /* If there is an offset, just traverse the number of elements without
3498 * checking the score because that is done in the next loop. */
3499 while (ln && offset--) {
3500 if (reverse) {
3501 ln = ln->backward;
3502 } else {
3503 ln = ln->level[0].forward;
3504 }
3505 }
3506
3507 while (ln && limit--) {
3508 /* Abort when the node is no longer in range. */
3509 if (reverse) {
3510 if (!zslLexValueGteMin(ln->ele,range)) break;
3511 } else {
3512 if (!zslLexValueLteMax(ln->ele,range)) break;
3513 }
3514
3515 rangelen++;
3516 handler->emitResultFromCBuffer(handler, ln->ele, sdslen(ln->ele), ln->score);
3517
3518 /* Move to next node */
3519 if (reverse) {
3520 ln = ln->backward;
3521 } else {
3522 ln = ln->level[0].forward;
3523 }
3524 }
3525 } else {
3526 serverPanic("Unknown sorted set encoding")_serverPanic("t_zset.c",3526,"Unknown sorted set encoding"),__builtin_unreachable
()
;
3527 }
3528
3529 handler->finalizeResultEmission(handler, rangelen);
3530}
3531
3532/* ZRANGEBYLEX <key> <min> <max> [LIMIT offset count] */
3533void zrangebylexCommand(client *c) {
3534 zrange_result_handler handler;
3535 zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_CLIENT);
3536 zrangeGenericCommand(&handler, 1, 0, ZRANGE_LEX, ZRANGE_DIRECTION_FORWARD);
3537}
3538
3539/* ZREVRANGEBYLEX <key> <min> <max> [LIMIT offset count] */
3540void zrevrangebylexCommand(client *c) {
3541 zrange_result_handler handler;
3542 zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_CLIENT);
3543 zrangeGenericCommand(&handler, 1, 0, ZRANGE_LEX, ZRANGE_DIRECTION_REVERSE);
3544}
3545
3546/**
3547 * This function handles ZRANGE and ZRANGESTORE, and also the deprecated
3548 * Z[REV]RANGE[BYPOS|BYLEX] commands.
3549 *
3550 * The simple ZRANGE and ZRANGESTORE can take _AUTO in rangetype and direction,
3551 * other command pass explicit value.
3552 *
3553 * The argc_start points to the src key argument, so following syntax is like:
3554 * <src> <min> <max> [BYSCORE | BYLEX] [REV] [WITHSCORES] [LIMIT offset count]
3555 */
3556void zrangeGenericCommand(zrange_result_handler *handler, int argc_start, int store,
3557 zrange_type rangetype, zrange_direction direction)
3558{
3559 client *c = handler->client;
3560 robj *key = c->argv[argc_start];
3561 robj *zobj;
3562 zrangespec range;
3563 zlexrangespec lexrange;
3564 int minidx = argc_start + 1;
3565 int maxidx = argc_start + 2;
3566
3567 /* Options common to all */
3568 long opt_start = 0;
3569 long opt_end = 0;
3570 int opt_withscores = 0;
3571 long opt_offset = 0;
3572 long opt_limit = -1;
3573
3574 /* Step 1: Skip the <src> <min> <max> args and parse remaining optional arguments. */
3575 for (int j=argc_start + 3; j < c->argc; j++) {
3576 int leftargs = c->argc-j-1;
3577 if (!store && !strcasecmp(c->argv[j]->ptr,"withscores")) {
3578 opt_withscores = 1;
3579 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
3580 if ((getLongFromObjectOrReply(c, c->argv[j+1], &opt_offset, NULL((void*)0)) != C_OK0) ||
3581 (getLongFromObjectOrReply(c, c->argv[j+2], &opt_limit, NULL((void*)0)) != C_OK0))
3582 {
3583 return;
3584 }
3585 j += 2;
3586 } else if (direction == ZRANGE_DIRECTION_AUTO &&
3587 !strcasecmp(c->argv[j]->ptr,"rev"))
3588 {
3589 direction = ZRANGE_DIRECTION_REVERSE;
3590 } else if (rangetype == ZRANGE_AUTO &&
3591 !strcasecmp(c->argv[j]->ptr,"bylex"))
3592 {
3593 rangetype = ZRANGE_LEX;
3594 } else if (rangetype == ZRANGE_AUTO &&
3595 !strcasecmp(c->argv[j]->ptr,"byscore"))
3596 {
3597 rangetype = ZRANGE_SCORE;
3598 } else {
3599 addReplyErrorObject(c,shared.syntaxerr);
3600 return;
3601 }
3602 }
3603
3604 /* Use defaults if not overriden by arguments. */
3605 if (direction == ZRANGE_DIRECTION_AUTO)
3606 direction = ZRANGE_DIRECTION_FORWARD;
3607 if (rangetype == ZRANGE_AUTO)
3608 rangetype = ZRANGE_RANK;
3609
3610 /* Check for conflicting arguments. */
3611 if (opt_limit != -1 && rangetype == ZRANGE_RANK) {
3612 addReplyError(c,"syntax error, LIMIT is only supported in combination with either BYSCORE or BYLEX");
3613 return;
3614 }
3615 if (opt_withscores && rangetype == ZRANGE_LEX) {
3616 addReplyError(c,"syntax error, WITHSCORES not supported in combination with BYLEX");
3617 return;
3618 }
3619
3620 if (direction == ZRANGE_DIRECTION_REVERSE &&
3621 ((ZRANGE_SCORE == rangetype) || (ZRANGE_LEX == rangetype)))
3622 {
3623 /* Range is given as [max,min] */
3624 int tmp = maxidx;
3625 maxidx = minidx;
3626 minidx = tmp;
3627 }
3628
3629 /* Step 2: Parse the range. */
3630 switch (rangetype) {
3631 case ZRANGE_AUTO:
3632 case ZRANGE_RANK:
3633 /* Z[REV]RANGE, ZRANGESTORE [REV]RANGE */
3634 if ((getLongFromObjectOrReply(c, c->argv[minidx], &opt_start,NULL((void*)0)) != C_OK0) ||
3635 (getLongFromObjectOrReply(c, c->argv[maxidx], &opt_end,NULL((void*)0)) != C_OK0))
3636 {
3637 return;
3638 }
3639 break;
3640
3641 case ZRANGE_SCORE:
3642 /* Z[REV]RANGEBYSCORE, ZRANGESTORE [REV]RANGEBYSCORE */
3643 if (zslParseRange(c->argv[minidx], c->argv[maxidx], &range) != C_OK0) {
3644 addReplyError(c, "min or max is not a float");
3645 return;
3646 }
3647 break;
3648
3649 case ZRANGE_LEX:
3650 /* Z[REV]RANGEBYLEX, ZRANGESTORE [REV]RANGEBYLEX */
3651 if (zslParseLexRange(c->argv[minidx], c->argv[maxidx], &lexrange) != C_OK0) {
3652 addReplyError(c, "min or max not valid string range item");
3653 return;
3654 }
3655 break;
3656 }
3657
3658 if (opt_withscores || store) {
3659 zrangeResultHandlerScoreEmissionEnable(handler);
3660 }
3661
3662 /* Step 3: Lookup the key and get the range. */
3663 zobj = handler->dstkey ?
3664 lookupKeyWrite(c->db,key) :
3665 lookupKeyRead(c->db,key);
3666 if (zobj == NULL((void*)0)) {
3667 addReply(c,shared.emptyarray);
3668 goto cleanup;
3669 }
3670
3671 if (checkType(c,zobj,OBJ_ZSET3)) goto cleanup;
3672
3673 /* Step 4: Pass this to the command-specific handler. */
3674 switch (rangetype) {
3675 case ZRANGE_AUTO:
3676 case ZRANGE_RANK:
3677 genericZrangebyrankCommand(handler, zobj, opt_start, opt_end,
3678 opt_withscores || store, direction == ZRANGE_DIRECTION_REVERSE);
3679 break;
3680
3681 case ZRANGE_SCORE:
3682 genericZrangebyscoreCommand(handler, &range, zobj, opt_offset,
3683 opt_limit, direction == ZRANGE_DIRECTION_REVERSE);
3684 break;
3685
3686 case ZRANGE_LEX:
3687 genericZrangebylexCommand(handler, &lexrange, zobj, opt_withscores || store,
3688 opt_offset, opt_limit, direction == ZRANGE_DIRECTION_REVERSE);
3689 break;
3690 }
3691
3692 /* Instead of returning here, we'll just fall-through the clean-up. */
3693
3694cleanup:
3695
3696 if (rangetype == ZRANGE_LEX) {
3697 zslFreeLexRange(&lexrange);
3698 }
3699}
3700
3701void zcardCommand(client *c) {
3702 robj *key = c->argv[1];
3703 robj *zobj;
3704
3705 if ((zobj = lookupKeyReadOrReply(c,key,shared.czero)) == NULL((void*)0) ||
3706 checkType(c,zobj,OBJ_ZSET3)) return;
3707
3708 addReplyLongLong(c,zsetLength(zobj));
3709}
3710
3711void zscoreCommand(client *c) {
3712 robj *key = c->argv[1];
3713 robj *zobj;
3714 double score;
3715
3716 if ((zobj = lookupKeyReadOrReply(c,key,shared.null[c->resp])) == NULL((void*)0) ||
3717 checkType(c,zobj,OBJ_ZSET3)) return;
3718
3719 if (zsetScore(zobj,c->argv[2]->ptr,&score) == C_ERR-1) {
3720 addReplyNull(c);
3721 } else {
3722 addReplyDouble(c,score);
3723 }
3724}
3725
3726void zmscoreCommand(client *c) {
3727 robj *key = c->argv[1];
3728 robj *zobj;
3729 double score;
3730 zobj = lookupKeyRead(c->db,key);
3731 if (checkType(c,zobj,OBJ_ZSET3)) return;
3732
3733 addReplyArrayLen(c,c->argc - 2);
3734 for (int j = 2; j < c->argc; j++) {
3735 /* Treat a missing set the same way as an empty set */
3736 if (zobj == NULL((void*)0) || zsetScore(zobj,c->argv[j]->ptr,&score) == C_ERR-1) {
3737 addReplyNull(c);
3738 } else {
3739 addReplyDouble(c,score);
3740 }
3741 }
3742}
3743
3744void zrankGenericCommand(client *c, int reverse) {
3745 robj *key = c->argv[1];
3746 robj *ele = c->argv[2];
3747 robj *zobj;
3748 long rank;
3749
3750 if ((zobj = lookupKeyReadOrReply(c,key,shared.null[c->resp])) == NULL((void*)0) ||
3751 checkType(c,zobj,OBJ_ZSET3)) return;
3752
3753 serverAssertWithInfo(c,ele,sdsEncodedObject(ele))(((ele->encoding == 0 || ele->encoding == 8))?(void)0 :
(_serverAssertWithInfo(c,ele,"sdsEncodedObject(ele)","t_zset.c"
,3753),__builtin_unreachable()))
;
3754 rank = zsetRank(zobj,ele->ptr,reverse);
3755 if (rank >= 0) {
3756 addReplyLongLong(c,rank);
3757 } else {
3758 addReplyNull(c);
3759 }
3760}
3761
3762void zrankCommand(client *c) {
3763 zrankGenericCommand(c, 0);
3764}
3765
3766void zrevrankCommand(client *c) {
3767 zrankGenericCommand(c, 1);
3768}
3769
3770void zscanCommand(client *c) {
3771 robj *o;
3772 unsigned long cursor;
3773
3774 if (parseScanCursorOrReply(c,c->argv[2],&cursor) == C_ERR-1) return;
3775 if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan)) == NULL((void*)0) ||
3776 checkType(c,o,OBJ_ZSET3)) return;
3777 scanGenericCommand(c,o,cursor);
3778}
3779
3780/* This command implements the generic zpop operation, used by:
3781 * ZPOPMIN, ZPOPMAX, BZPOPMIN and BZPOPMAX. This function is also used
3782 * inside blocked.c in the unblocking stage of BZPOPMIN and BZPOPMAX.
3783 *
3784 * If 'emitkey' is true also the key name is emitted, useful for the blocking
3785 * behavior of BZPOP[MIN|MAX], since we can block into multiple keys.
3786 *
3787 * The synchronous version instead does not need to emit the key, but may
3788 * use the 'count' argument to return multiple items if available. */
3789void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey, robj *countarg) {
3790 int idx;
3791 robj *key = NULL((void*)0);
3792 robj *zobj = NULL((void*)0);
3793 sds ele;
3794 double score;
3795 long count = 1;
3796
3797 /* If a count argument as passed, parse it or return an error. */
3798 if (countarg) {
3799 if (getLongFromObjectOrReply(c,countarg,&count,NULL((void*)0)) != C_OK0)
3800 return;
3801 if (count <= 0) {
3802 addReply(c,shared.emptyarray);
3803 return;
3804 }
3805 }
3806
3807 /* Check type and break on the first error, otherwise identify candidate. */
3808 idx = 0;
3809 while (idx < keyc) {
3810 key = keyv[idx++];
3811 zobj = lookupKeyWrite(c->db,key);
3812 if (!zobj) continue;
3813 if (checkType(c,zobj,OBJ_ZSET3)) return;
3814 break;
3815 }
3816
3817 /* No candidate for zpopping, return empty. */
3818 if (!zobj) {
3819 addReply(c,shared.emptyarray);
3820 return;
3821 }
3822
3823 void *arraylen_ptr = addReplyDeferredLen(c);
3824 long arraylen = 0;
3825
3826 /* We emit the key only for the blocking variant. */
3827 if (emitkey) addReplyBulk(c,key);
3828
3829 /* Remove the element. */
3830 do {
3831 if (zobj->encoding == OBJ_ENCODING_ZIPLIST5) {
3832 unsigned char *zl = zobj->ptr;
3833 unsigned char *eptr, *sptr;
3834 unsigned char *vstr;
3835 unsigned int vlen;
3836 long long vlong;
3837
3838 /* Get the first or last element in the sorted set. */
3839 eptr = ziplistIndex(zl,where == ZSET_MAX1 ? -2 : 0);
3840 serverAssertWithInfo(c,zobj,eptr != NULL)((eptr != ((void*)0))?(void)0 : (_serverAssertWithInfo(c,zobj
,"eptr != NULL","t_zset.c",3840),__builtin_unreachable()))
;
3841 serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong))((ziplistGet(eptr,&vstr,&vlen,&vlong))?(void)0 : (
_serverAssertWithInfo(c,zobj,"ziplistGet(eptr,&vstr,&vlen,&vlong)"
,"t_zset.c",3841),__builtin_unreachable()))
;
3842 if (vstr == NULL((void*)0))
3843 ele = sdsfromlonglong(vlong);
3844 else
3845 ele = sdsnewlen(vstr,vlen);
3846
3847 /* Get the score. */
3848 sptr = ziplistNext(zl,eptr);
3849 serverAssertWithInfo(c,zobj,sptr != NULL)((sptr != ((void*)0))?(void)0 : (_serverAssertWithInfo(c,zobj
,"sptr != NULL","t_zset.c",3849),__builtin_unreachable()))
;
3850 score = zzlGetScore(sptr);
3851 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST7) {
3852 zset *zs = zobj->ptr;
3853 zskiplist *zsl = zs->zsl;
3854 zskiplistNode *zln;
3855
3856 /* Get the first or last element in the sorted set. */
3857 zln = (where == ZSET_MAX1 ? zsl->tail :
3858 zsl->header->level[0].forward);
3859
3860 /* There must be an element in the sorted set. */
3861 serverAssertWithInfo(c,zobj,zln != NULL)((zln != ((void*)0))?(void)0 : (_serverAssertWithInfo(c,zobj,
"zln != NULL","t_zset.c",3861),__builtin_unreachable()))
;
3862 ele = sdsdup(zln->ele);
3863 score = zln->score;
3864 } else {
3865 serverPanic("Unknown sorted set encoding")_serverPanic("t_zset.c",3865,"Unknown sorted set encoding"),__builtin_unreachable
()
;
3866 }
3867
3868 serverAssertWithInfo(c,zobj,zsetDel(zobj,ele))((zsetDel(zobj,ele))?(void)0 : (_serverAssertWithInfo(c,zobj,
"zsetDel(zobj,ele)","t_zset.c",3868),__builtin_unreachable())
)
;
3869 server.dirty++;
3870
3871 if (arraylen == 0) { /* Do this only for the first iteration. */
3872 char *events[2] = {"zpopmin","zpopmax"};
3873 notifyKeyspaceEvent(NOTIFY_ZSET(1<<7),events[where],key,c->db->id);
3874 signalModifiedKey(c,c->db,key);
3875 }
3876
3877 addReplyBulkCBuffer(c,ele,sdslen(ele));
3878 addReplyDouble(c,score);
3879 sdsfree(ele);
3880 arraylen += 2;
3881
3882 /* Remove the key, if indeed needed. */
3883 if (zsetLength(zobj) == 0) {
3884 dbDelete(c->db,key);
3885 notifyKeyspaceEvent(NOTIFY_GENERIC(1<<2),"del",key,c->db->id);
3886 break;
3887 }
3888 } while(--count);
3889
3890 setDeferredArrayLen(c,arraylen_ptr,arraylen + (emitkey != 0));
3891}
3892
3893/* ZPOPMIN key [<count>] */
3894void zpopminCommand(client *c) {
3895 if (c->argc > 3) {
3896 addReplyErrorObject(c,shared.syntaxerr);
3897 return;
3898 }
3899 genericZpopCommand(c,&c->argv[1],1,ZSET_MIN0,0,
3900 c->argc == 3 ? c->argv[2] : NULL((void*)0));
3901}
3902
3903/* ZMAXPOP key [<count>] */
3904void zpopmaxCommand(client *c) {
3905 if (c->argc > 3) {
3906 addReplyErrorObject(c,shared.syntaxerr);
3907 return;
3908 }
3909 genericZpopCommand(c,&c->argv[1],1,ZSET_MAX1,0,
3910 c->argc == 3 ? c->argv[2] : NULL((void*)0));
3911}
3912
3913/* BZPOPMIN / BZPOPMAX actual implementation. */
3914void blockingGenericZpopCommand(client *c, int where) {
3915 robj *o;
3916 mstime_t timeout;
3917 int j;
3918
3919 if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS0)
3920 != C_OK0) return;
3921
3922 for (j = 1; j < c->argc-1; j++) {
3923 o = lookupKeyWrite(c->db,c->argv[j]);
3924 if (checkType(c,o,OBJ_ZSET3)) return;
3925 if (o != NULL((void*)0)) {
3926 if (zsetLength(o) != 0) {
3927 /* Non empty zset, this is like a normal ZPOP[MIN|MAX]. */
3928 genericZpopCommand(c,&c->argv[j],1,where,1,NULL((void*)0));
3929 /* Replicate it as an ZPOP[MIN|MAX] instead of BZPOP[MIN|MAX]. */
3930 rewriteClientCommandVector(c,2,
3931 where == ZSET_MAX1 ? shared.zpopmax : shared.zpopmin,
3932 c->argv[j]);
3933 return;
3934 }
3935 }
3936 }
3937
3938 /* If we are not allowed to block the client and the zset is empty the only thing
3939 * we can do is treating it as a timeout (even with timeout 0). */
3940 if (c->flags & CLIENT_DENY_BLOCKING(1ULL<<41)) {
3941 addReplyNullArray(c);
3942 return;
3943 }
3944
3945 /* If the keys do not exist we must block */
3946 blockForKeys(c,BLOCKED_ZSET5,c->argv + 1,c->argc - 2,timeout,NULL((void*)0),NULL((void*)0),NULL((void*)0));
3947}
3948
3949// BZPOPMIN key [key ...] timeout
3950void bzpopminCommand(client *c) {
3951 blockingGenericZpopCommand(c,ZSET_MIN0);
3952}
3953
3954// BZPOPMAX key [key ...] timeout
3955void bzpopmaxCommand(client *c) {
3956 blockingGenericZpopCommand(c,ZSET_MAX1);
3957}
3958
3959static void zarndmemberReplyWithZiplist(client *c, unsigned int count, ziplistEntry *keys, ziplistEntry *vals) {
3960 for (unsigned long i = 0; i < count; i++) {
3961 if (vals && c->resp > 2)
3962 addReplyArrayLen(c,2);
3963 if (keys[i].sval)
3964 addReplyBulkCBuffer(c, keys[i].sval, keys[i].slen);
3965 else
3966 addReplyBulkLongLong(c, keys[i].lval);
3967 if (vals) {
3968 if (vals[i].sval) {
3969 addReplyDouble(c, zzlStrtod(vals[i].sval,vals[i].slen));
3970 } else
3971 addReplyDouble(c, vals[i].lval);
3972 }
3973 }
3974}
3975
3976/* How many times bigger should be the zset compared to the requested size
3977 * for us to not use the "remove elements" strategy? Read later in the
3978 * implementation for more info. */
3979#define ZRANDMEMBER_SUB_STRATEGY_MUL3 3
3980
3981/* If client is trying to ask for a very large number of random elements,
3982 * queuing may consume an unlimited amount of memory, so we want to limit
3983 * the number of randoms per time. */
3984#define ZRANDMEMBER_RANDOM_SAMPLE_LIMIT1000 1000
3985
3986void zrandmemberWithCountCommand(client *c, long l, int withscores) {
3987 unsigned long count, size;
3988 int uniq = 1;
3989 robj *zsetobj;
3990
3991 if ((zsetobj = lookupKeyReadOrReply(c, c->argv[1], shared.null[c->resp]))
3992 == NULL((void*)0) || checkType(c, zsetobj, OBJ_ZSET3)) return;
3993 size = zsetLength(zsetobj);
3994
3995 if(l >= 0) {
3996 count = (unsigned long) l;
3997 } else {
3998 count = -l;
3999 uniq = 0;
4000 }
4001
4002 /* If count is zero, serve it ASAP to avoid special cases later. */
4003 if (count == 0) {
4004 addReply(c,shared.emptyarray);
4005 return;
4006 }
4007
4008 /* CASE 1: The count was negative, so the extraction method is just:
4009 * "return N random elements" sampling the whole set every time.
4010 * This case is trivial and can be served without auxiliary data
4011 * structures. This case is the only one that also needs to return the
4012 * elements in random order. */
4013 if (!uniq || count == 1) {
4014 if (withscores && c->resp == 2)
4015 addReplyArrayLen(c, count*2);
4016 else
4017 addReplyArrayLen(c, count);
4018 if (zsetobj->encoding == OBJ_ENCODING_SKIPLIST7) {
4019 zset *zs = zsetobj->ptr;
4020 while (count--) {
4021 dictEntry *de = dictGetFairRandomKey(zs->dict);
4022 sds key = dictGetKey(de)((de)->key);
4023 if (withscores && c->resp > 2)
4024 addReplyArrayLen(c,2);
4025 addReplyBulkCBuffer(c, key, sdslen(key));
4026 if (withscores)
4027 addReplyDouble(c, dictGetDoubleVal(de)((de)->v.d));
4028 }
4029 } else if (zsetobj->encoding == OBJ_ENCODING_ZIPLIST5) {
4030 ziplistEntry *keys, *vals = NULL((void*)0);
4031 unsigned long limit, sample_count;
4032 limit = count > ZRANDMEMBER_RANDOM_SAMPLE_LIMIT1000 ? ZRANDMEMBER_RANDOM_SAMPLE_LIMIT1000 : count;
4033 keys = zmalloc(sizeof(ziplistEntry)*limit);
4034 if (withscores)
4035 vals = zmalloc(sizeof(ziplistEntry)*limit);
4036 while (count) {
4037 sample_count = count > limit ? limit : count;
4038 count -= sample_count;
4039 ziplistRandomPairs(zsetobj->ptr, sample_count, keys, vals);
4040 zarndmemberReplyWithZiplist(c, sample_count, keys, vals);
4041 }
4042 zfree(keys);
4043 zfree(vals);
4044 }
4045 return;
4046 }
4047
4048 zsetopsrc src;
4049 zsetopval zval;
4050 src.subject = zsetobj;
4051 src.type = zsetobj->type;
4052 src.encoding = zsetobj->encoding;
4053 zuiInitIterator(&src);
4054 memset(&zval, 0, sizeof(zval));
4055
4056 /* Initiate reply count, RESP3 responds with nested array, RESP2 with flat one. */
4057 long reply_size = count < size ? count : size;
4058 if (withscores && c->resp == 2)
4059 addReplyArrayLen(c, reply_size*2);
4060 else
4061 addReplyArrayLen(c, reply_size);
4062
4063 /* CASE 2:
4064 * The number of requested elements is greater than the number of
4065 * elements inside the zset: simply return the whole zset. */
4066 if (count >= size) {
4067 while (zuiNext(&src, &zval)) {
4068 if (withscores && c->resp > 2)
4069 addReplyArrayLen(c,2);
4070 addReplyBulkSds(c, zuiNewSdsFromValue(&zval));
4071 if (withscores)
4072 addReplyDouble(c, zval.score);
4073 }
4074 return;
4075 }
4076
4077 /* CASE 3:
4078 * The number of elements inside the zset is not greater than
4079 * ZRANDMEMBER_SUB_STRATEGY_MUL times the number of requested elements.
4080 * In this case we create a dict from scratch with all the elements, and
4081 * subtract random elements to reach the requested number of elements.
4082 *
4083 * This is done because if the number of requested elements is just
4084 * a bit less than the number of elements in the set, the natural approach
4085 * used into CASE 4 is highly inefficient. */
4086 if (count*ZRANDMEMBER_SUB_STRATEGY_MUL3 > size) {
4087 dict *d = dictCreate(&sdsReplyDictType, NULL((void*)0));
4088 dictExpand(d, size);
4089 /* Add all the elements into the temporary dictionary. */
4090 while (zuiNext(&src, &zval)) {
4091 sds key = zuiNewSdsFromValue(&zval);
4092 dictEntry *de = dictAddRaw(d, key, NULL((void*)0));
4093 serverAssert(de)((de)?(void)0 : (_serverAssert("de","t_zset.c",4093),__builtin_unreachable
()))
;
4094 if (withscores)
4095 dictSetDoubleVal(de, zval.score)do { (de)->v.d = zval.score; } while(0);
4096 }
4097 serverAssert(dictSize(d) == size)((((d)->ht[0].used+(d)->ht[1].used) == size)?(void)0 : (
_serverAssert("dictSize(d) == size","t_zset.c",4097),__builtin_unreachable
()))
;
4098
4099 /* Remove random elements to reach the right count. */
4100 while (size > count) {
4101 dictEntry *de;
4102 de = dictGetRandomKey(d);
4103 dictUnlink(d,dictGetKey(de)((de)->key));
4104 sdsfree(dictGetKey(de)((de)->key));
4105 dictFreeUnlinkedEntry(d,de);
4106 size--;
4107 }
4108
4109 /* Reply with what's in the dict and release memory */
4110 dictIterator *di;
4111 dictEntry *de;
4112 di = dictGetIterator(d);
4113 while ((de = dictNext(di)) != NULL((void*)0)) {
4114 if (withscores && c->resp > 2)
4115 addReplyArrayLen(c,2);
4116 addReplyBulkSds(c, dictGetKey(de)((de)->key));
4117 if (withscores)
4118 addReplyDouble(c, dictGetDoubleVal(de)((de)->v.d));
4119 }
4120
4121 dictReleaseIterator(di);
4122 dictRelease(d);
4123 }
4124
4125 /* CASE 4: We have a big zset compared to the requested number of elements.
4126 * In this case we can simply get random elements from the zset and add
4127 * to the temporary set, trying to eventually get enough unique elements
4128 * to reach the specified count. */
4129 else {
4130 if (zsetobj->encoding == OBJ_ENCODING_ZIPLIST5) {
4131 /* it is inefficient to repeatedly pick one random element from a
4132 * ziplist. so we use this instead: */
4133 ziplistEntry *keys, *vals = NULL((void*)0);
4134 keys = zmalloc(sizeof(ziplistEntry)*count);
4135 if (withscores)
4136 vals = zmalloc(sizeof(ziplistEntry)*count);
4137 serverAssert(ziplistRandomPairsUnique(zsetobj->ptr, count, keys, vals) == count)((ziplistRandomPairsUnique(zsetobj->ptr, count, keys, vals
) == count)?(void)0 : (_serverAssert("ziplistRandomPairsUnique(zsetobj->ptr, count, keys, vals) == count"
,"t_zset.c",4137),__builtin_unreachable()))
;
4138 zarndmemberReplyWithZiplist(c, count, keys, vals);
4139 zfree(keys);
4140 zfree(vals);
4141 return;
4142 }
4143
4144 /* Hashtable encoding (generic implementation) */
4145 unsigned long added = 0;
4146 dict *d = dictCreate(&hashDictType, NULL((void*)0));
4147 dictExpand(d, count);
4148
4149 while (added < count) {
4150 ziplistEntry key;
4151 double score;
4152 zsetTypeRandomElement(zsetobj, size, &key, withscores ? &score: NULL((void*)0));
4153
4154 /* Try to add the object to the dictionary. If it already exists
4155 * free it, otherwise increment the number of objects we have
4156 * in the result dictionary. */
4157 sds skey = zsetSdsFromZiplistEntry(&key);
4158 if (dictAdd(d,skey,NULL((void*)0)) != DICT_OK0) {
4159 sdsfree(skey);
4160 continue;
4161 }
4162 added++;
4163
4164 if (withscores && c->resp > 2)
4165 addReplyArrayLen(c,2);
4166 zsetReplyFromZiplistEntry(c, &key);
4167 if (withscores)
4168 addReplyDouble(c, score);
4169 }
4170
4171 /* Release memory */
4172 dictRelease(d);
4173 }
4174}
4175
4176/* ZRANDMEMBER [<count> WITHSCORES] */
4177void zrandmemberCommand(client *c) {
4178 long l;
4179 int withscores = 0;
4180 robj *zset;
4181 ziplistEntry ele;
4182
4183 if (c->argc >= 3) {
4184 if (getLongFromObjectOrReply(c,c->argv[2],&l,NULL((void*)0)) != C_OK0) return;
4185 if (c->argc > 4 || (c->argc == 4 && strcasecmp(c->argv[3]->ptr,"withscores"))) {
4186 addReplyErrorObject(c,shared.syntaxerr);
4187 return;
4188 } else if (c->argc == 4)
4189 withscores = 1;
4190 zrandmemberWithCountCommand(c, l, withscores);
4191 return;
4192 }
4193
4194 /* Handle variant without <count> argument. Reply with simple bulk string */
4195 if ((zset = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp]))== NULL((void*)0) ||
4196 checkType(c,zset,OBJ_ZSET3)) {
4197 return;
4198 }
4199
4200 zsetTypeRandomElement(zset, zsetLength(zset), &ele,NULL((void*)0));
4201 zsetReplyFromZiplistEntry(c,&ele);
4202}