Bug Summary

File:src/t_zset.c
Warning:line 180, column 30
The left operand of '==' is a garbage value

Annotated Source Code

Press '?' to see keyboard shortcuts

clang -cc1 -triple x86_64-pc-linux-gnu -analyze -disable-free -disable-llvm-verifier -discard-value-names -main-file-name t_zset.c -analyzer-store=region -analyzer-opt-analyze-nested-blocks -analyzer-checker=core -analyzer-checker=apiModeling -analyzer-checker=unix -analyzer-checker=deadcode -analyzer-checker=security.insecureAPI.UncheckedReturn -analyzer-checker=security.insecureAPI.getpw -analyzer-checker=security.insecureAPI.gets -analyzer-checker=security.insecureAPI.mktemp -analyzer-checker=security.insecureAPI.mkstemp -analyzer-checker=security.insecureAPI.vfork -analyzer-checker=nullability.NullPassedToNonnull -analyzer-checker=nullability.NullReturnedFromNonnull -analyzer-output plist -w -setup-static-analyzer -mrelocation-model static -mthread-model posix -mframe-pointer=none -fmath-errno -fno-rounding-math -masm-verbose -mconstructor-aliases -munwind-tables -target-cpu x86-64 -dwarf-column-info -fno-split-dwarf-inlining -debugger-tuning=gdb -resource-dir /usr/lib/llvm-10/lib/clang/10.0.0 -D REDIS_STATIC= -I ../deps/hiredis -I ../deps/linenoise -I ../deps/lua/src -I ../deps/hdr_histogram -D USE_JEMALLOC -I ../deps/jemalloc/include -internal-isystem /usr/local/include -internal-isystem /usr/lib/llvm-10/lib/clang/10.0.0/include -internal-externc-isystem /usr/include/x86_64-linux-gnu -internal-externc-isystem /include -internal-externc-isystem /usr/include -O2 -Wno-c11-extensions -Wno-missing-field-initializers -std=c11 -fdebug-compilation-dir /home/netto/Desktop/redis-6.2.1/src -ferror-limit 19 -fmessage-length 0 -fgnuc-version=4.2.1 -fobjc-runtime=gcc -fdiagnostics-show-option -vectorize-loops -vectorize-slp -analyzer-output=html -faddrsig -o /tmp/scan-build-2021-03-14-133648-8817-1 -x c t_zset.c
1/*
2 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3 * Copyright (c) 2009-2012, Pieter Noordhuis <pcnoordhuis at gmail dot com>
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
8 *
9 * * Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * * Neither the name of Redis nor the names of its contributors may be used
15 * to endorse or promote products derived from this software without
16 * specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
29 */
30
31/*-----------------------------------------------------------------------------
32 * Sorted set API
33 *----------------------------------------------------------------------------*/
34
35/* ZSETs are ordered sets using two data structures to hold the same elements
36 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
37 * data structure.
38 *
39 * The elements are added to a hash table mapping Redis objects to scores.
40 * At the same time the elements are added to a skip list mapping scores
41 * to Redis objects (so objects are sorted by scores in this "view").
42 *
43 * Note that the SDS string representing the element is the same in both
44 * the hash table and skiplist in order to save memory. What we do in order
45 * to manage the shared SDS string more easily is to free the SDS string
46 * only in zslFreeNode(). The dictionary has no value free method set.
47 * So we should always remove an element from the dictionary, and later from
48 * the skiplist.
49 *
50 * This skiplist implementation is almost a C translation of the original
51 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
52 * Alternative to Balanced Trees", modified in three ways:
53 * a) this implementation allows for repeated scores.
54 * b) the comparison is not just by key (our 'score') but by satellite data.
55 * c) there is a back pointer, so it's a doubly linked list with the back
56 * pointers being only at "level 1". This allows to traverse the list
57 * from tail to head, useful for ZREVRANGE. */
58
59#include "server.h"
60#include <math.h>
61
62/*-----------------------------------------------------------------------------
63 * Skiplist implementation of the low level API
64 *----------------------------------------------------------------------------*/
65
66int zslLexValueGteMin(sds value, zlexrangespec *spec);
67int zslLexValueLteMax(sds value, zlexrangespec *spec);
68
69/* Create a skiplist node with the specified number of levels.
70 * The SDS string 'ele' is referenced by the node after the call. */
71zskiplistNode *zslCreateNode(int level, double score, sds ele) {
72 zskiplistNode *zn =
73 zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
74 zn->score = score;
75 zn->ele = ele;
76 return zn;
77}
78
79/* Create a new skiplist. */
80zskiplist *zslCreate(void) {
81 int j;
82 zskiplist *zsl;
83
84 zsl = zmalloc(sizeof(*zsl));
85 zsl->level = 1;
86 zsl->length = 0;
87 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL32,0,NULL((void*)0));
88 for (j = 0; j < ZSKIPLIST_MAXLEVEL32; j++) {
89 zsl->header->level[j].forward = NULL((void*)0);
90 zsl->header->level[j].span = 0;
91 }
92 zsl->header->backward = NULL((void*)0);
93 zsl->tail = NULL((void*)0);
94 return zsl;
95}
96
97/* Free the specified skiplist node. The referenced SDS string representation
98 * of the element is freed too, unless node->ele is set to NULL before calling
99 * this function. */
100void zslFreeNode(zskiplistNode *node) {
101 sdsfree(node->ele);
102 zfree(node);
103}
104
105/* Free a whole skiplist. */
106void zslFree(zskiplist *zsl) {
107 zskiplistNode *node = zsl->header->level[0].forward, *next;
108
109 zfree(zsl->header);
110 while(node) {
111 next = node->level[0].forward;
112 zslFreeNode(node);
113 node = next;
114 }
115 zfree(zsl);
116}
117
118/* Returns a random level for the new skiplist node we are going to create.
119 * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
120 * (both inclusive), with a powerlaw-alike distribution where higher
121 * levels are less likely to be returned. */
122int zslRandomLevel(void) {
123 int level = 1;
124 while ((random()&0xFFFF) < (ZSKIPLIST_P0.25 * 0xFFFF))
125 level += 1;
126 return (level<ZSKIPLIST_MAXLEVEL32) ? level : ZSKIPLIST_MAXLEVEL32;
127}
128
129/* Insert a new node in the skiplist. Assumes the element does not already
130 * exist (up to the caller to enforce that). The skiplist takes ownership
131 * of the passed SDS string 'ele'. */
132zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
133 zskiplistNode *update[ZSKIPLIST_MAXLEVEL32], *x;
134 unsigned int rank[ZSKIPLIST_MAXLEVEL32];
135 int i, level;
136
137 serverAssert(!isnan(score))((!__builtin_isnan (score))?(void)0 : (_serverAssert("!isnan(score)"
,"t_zset.c",137),__builtin_unreachable()))
;
12
Assuming the condition is true
13
'?' condition is true
138 x = zsl->header;
139 for (i = zsl->level-1; i >= 0; i--) {
14
Assuming 'i' is < 0
15
Loop condition is false. Execution continues on line 156
140 /* store rank that is crossed to reach the insert position */
141 rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
142 while (x->level[i].forward &&
143 (x->level[i].forward->score < score ||
144 (x->level[i].forward->score == score &&
145 sdscmp(x->level[i].forward->ele,ele) < 0)))
146 {
147 rank[i] += x->level[i].span;
148 x = x->level[i].forward;
149 }
150 update[i] = x;
151 }
152 /* we assume the element is not already inside, since we allow duplicated
153 * scores, reinserting the same element should never happen since the
154 * caller of zslInsert() should test in the hash table if the element is
155 * already inside or not. */
156 level = zslRandomLevel();
157 if (level > zsl->level) {
16
Assuming 'level' is <= field 'level'
17
Taking false branch
158 for (i = zsl->level; i < level; i++) {
159 rank[i] = 0;
160 update[i] = zsl->header;
161 update[i]->level[i].span = zsl->length;
162 }
163 zsl->level = level;
164 }
165 x = zslCreateNode(level,score,ele);
166 for (i = 0; i < level; i++) {
18
Assuming 'i' is >= 'level'
19
Loop condition is false. Execution continues on line 176
167 x->level[i].forward = update[i]->level[i].forward;
168 update[i]->level[i].forward = x;
169
170 /* update span covered by update[i] as x is inserted here */
171 x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
172 update[i]->level[i].span = (rank[0] - rank[i]) + 1;
173 }
174
175 /* increment span for untouched levels */
176 for (i = level; i < zsl->level; i++) {
20
Assuming 'i' is >= field 'level'
21
Loop condition is false. Execution continues on line 180
177 update[i]->level[i].span++;
178 }
179
180 x->backward = (update[0] == zsl->header) ? NULL((void*)0) : update[0];
22
The left operand of '==' is a garbage value
181 if (x->level[0].forward)
182 x->level[0].forward->backward = x;
183 else
184 zsl->tail = x;
185 zsl->length++;
186 return x;
187}
188
189/* Internal function used by zslDelete, zslDeleteRangeByScore and
190 * zslDeleteRangeByRank. */
191void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
192 int i;
193 for (i = 0; i < zsl->level; i++) {
194 if (update[i]->level[i].forward == x) {
195 update[i]->level[i].span += x->level[i].span - 1;
196 update[i]->level[i].forward = x->level[i].forward;
197 } else {
198 update[i]->level[i].span -= 1;
199 }
200 }
201 if (x->level[0].forward) {
202 x->level[0].forward->backward = x->backward;
203 } else {
204 zsl->tail = x->backward;
205 }
206 while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL((void*)0))
207 zsl->level--;
208 zsl->length--;
209}
210
211/* Delete an element with matching score/element from the skiplist.
212 * The function returns 1 if the node was found and deleted, otherwise
213 * 0 is returned.
214 *
215 * If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise
216 * it is not freed (but just unlinked) and *node is set to the node pointer,
217 * so that it is possible for the caller to reuse the node (including the
218 * referenced SDS string at node->ele). */
219int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
220 zskiplistNode *update[ZSKIPLIST_MAXLEVEL32], *x;
221 int i;
222
223 x = zsl->header;
224 for (i = zsl->level-1; i >= 0; i--) {
225 while (x->level[i].forward &&
226 (x->level[i].forward->score < score ||
227 (x->level[i].forward->score == score &&
228 sdscmp(x->level[i].forward->ele,ele) < 0)))
229 {
230 x = x->level[i].forward;
231 }
232 update[i] = x;
233 }
234 /* We may have multiple elements with the same score, what we need
235 * is to find the element with both the right score and object. */
236 x = x->level[0].forward;
237 if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
238 zslDeleteNode(zsl, x, update);
239 if (!node)
240 zslFreeNode(x);
241 else
242 *node = x;
243 return 1;
244 }
245 return 0; /* not found */
246}
247
248/* Update the score of an element inside the sorted set skiplist.
249 * Note that the element must exist and must match 'score'.
250 * This function does not update the score in the hash table side, the
251 * caller should take care of it.
252 *
253 * Note that this function attempts to just update the node, in case after
254 * the score update, the node would be exactly at the same position.
255 * Otherwise the skiplist is modified by removing and re-adding a new
256 * element, which is more costly.
257 *
258 * The function returns the updated element skiplist node pointer. */
259zskiplistNode *zslUpdateScore(zskiplist *zsl, double curscore, sds ele, double newscore) {
260 zskiplistNode *update[ZSKIPLIST_MAXLEVEL32], *x;
261 int i;
262
263 /* We need to seek to element to update to start: this is useful anyway,
264 * we'll have to update or remove it. */
265 x = zsl->header;
266 for (i = zsl->level-1; i >= 0; i--) {
267 while (x->level[i].forward &&
268 (x->level[i].forward->score < curscore ||
269 (x->level[i].forward->score == curscore &&
270 sdscmp(x->level[i].forward->ele,ele) < 0)))
271 {
272 x = x->level[i].forward;
273 }
274 update[i] = x;
275 }
276
277 /* Jump to our element: note that this function assumes that the
278 * element with the matching score exists. */
279 x = x->level[0].forward;
280 serverAssert(x && curscore == x->score && sdscmp(x->ele,ele) == 0)((x && curscore == x->score && sdscmp(x->
ele,ele) == 0)?(void)0 : (_serverAssert("x && curscore == x->score && sdscmp(x->ele,ele) == 0"
,"t_zset.c",280),__builtin_unreachable()))
;
281
282 /* If the node, after the score update, would be still exactly
283 * at the same position, we can just update the score without
284 * actually removing and re-inserting the element in the skiplist. */
285 if ((x->backward == NULL((void*)0) || x->backward->score < newscore) &&
286 (x->level[0].forward == NULL((void*)0) || x->level[0].forward->score > newscore))
287 {
288 x->score = newscore;
289 return x;
290 }
291
292 /* No way to reuse the old node: we need to remove and insert a new
293 * one at a different place. */
294 zslDeleteNode(zsl, x, update);
295 zskiplistNode *newnode = zslInsert(zsl,newscore,x->ele);
296 /* We reused the old node x->ele SDS string, free the node now
297 * since zslInsert created a new one. */
298 x->ele = NULL((void*)0);
299 zslFreeNode(x);
300 return newnode;
301}
302
303int zslValueGteMin(double value, zrangespec *spec) {
304 return spec->minex ? (value > spec->min) : (value >= spec->min);
305}
306
307int zslValueLteMax(double value, zrangespec *spec) {
308 return spec->maxex ? (value < spec->max) : (value <= spec->max);
309}
310
311/* Returns if there is a part of the zset is in range. */
312int zslIsInRange(zskiplist *zsl, zrangespec *range) {
313 zskiplistNode *x;
314
315 /* Test for ranges that will always be empty. */
316 if (range->min > range->max ||
317 (range->min == range->max && (range->minex || range->maxex)))
318 return 0;
319 x = zsl->tail;
320 if (x == NULL((void*)0) || !zslValueGteMin(x->score,range))
321 return 0;
322 x = zsl->header->level[0].forward;
323 if (x == NULL((void*)0) || !zslValueLteMax(x->score,range))
324 return 0;
325 return 1;
326}
327
328/* Find the first node that is contained in the specified range.
329 * Returns NULL when no element is contained in the range. */
330zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range) {
331 zskiplistNode *x;
332 int i;
333
334 /* If everything is out of range, return early. */
335 if (!zslIsInRange(zsl,range)) return NULL((void*)0);
336
337 x = zsl->header;
338 for (i = zsl->level-1; i >= 0; i--) {
339 /* Go forward while *OUT* of range. */
340 while (x->level[i].forward &&
341 !zslValueGteMin(x->level[i].forward->score,range))
342 x = x->level[i].forward;
343 }
344
345 /* This is an inner range, so the next node cannot be NULL. */
346 x = x->level[0].forward;
347 serverAssert(x != NULL)((x != ((void*)0))?(void)0 : (_serverAssert("x != NULL","t_zset.c"
,347),__builtin_unreachable()))
;
348
349 /* Check if score <= max. */
350 if (!zslValueLteMax(x->score,range)) return NULL((void*)0);
351 return x;
352}
353
354/* Find the last node that is contained in the specified range.
355 * Returns NULL when no element is contained in the range. */
356zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec *range) {
357 zskiplistNode *x;
358 int i;
359
360 /* If everything is out of range, return early. */
361 if (!zslIsInRange(zsl,range)) return NULL((void*)0);
362
363 x = zsl->header;
364 for (i = zsl->level-1; i >= 0; i--) {
365 /* Go forward while *IN* range. */
366 while (x->level[i].forward &&
367 zslValueLteMax(x->level[i].forward->score,range))
368 x = x->level[i].forward;
369 }
370
371 /* This is an inner range, so this node cannot be NULL. */
372 serverAssert(x != NULL)((x != ((void*)0))?(void)0 : (_serverAssert("x != NULL","t_zset.c"
,372),__builtin_unreachable()))
;
373
374 /* Check if score >= min. */
375 if (!zslValueGteMin(x->score,range)) return NULL((void*)0);
376 return x;
377}
378
379/* Delete all the elements with score between min and max from the skiplist.
380 * Both min and max can be inclusive or exclusive (see range->minex and
381 * range->maxex). When inclusive a score >= min && score <= max is deleted.
382 * Note that this function takes the reference to the hash table view of the
383 * sorted set, in order to remove the elements from the hash table too. */
384unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec *range, dict *dict) {
385 zskiplistNode *update[ZSKIPLIST_MAXLEVEL32], *x;
386 unsigned long removed = 0;
387 int i;
388
389 x = zsl->header;
390 for (i = zsl->level-1; i >= 0; i--) {
391 while (x->level[i].forward && (range->minex ?
392 x->level[i].forward->score <= range->min :
393 x->level[i].forward->score < range->min))
394 x = x->level[i].forward;
395 update[i] = x;
396 }
397
398 /* Current node is the last with score < or <= min. */
399 x = x->level[0].forward;
400
401 /* Delete nodes while in range. */
402 while (x &&
403 (range->maxex ? x->score < range->max : x->score <= range->max))
404 {
405 zskiplistNode *next = x->level[0].forward;
406 zslDeleteNode(zsl,x,update);
407 dictDelete(dict,x->ele);
408 zslFreeNode(x); /* Here is where x->ele is actually released. */
409 removed++;
410 x = next;
411 }
412 return removed;
413}
414
415unsigned long zslDeleteRangeByLex(zskiplist *zsl, zlexrangespec *range, dict *dict) {
416 zskiplistNode *update[ZSKIPLIST_MAXLEVEL32], *x;
417 unsigned long removed = 0;
418 int i;
419
420
421 x = zsl->header;
422 for (i = zsl->level-1; i >= 0; i--) {
423 while (x->level[i].forward &&
424 !zslLexValueGteMin(x->level[i].forward->ele,range))
425 x = x->level[i].forward;
426 update[i] = x;
427 }
428
429 /* Current node is the last with score < or <= min. */
430 x = x->level[0].forward;
431
432 /* Delete nodes while in range. */
433 while (x && zslLexValueLteMax(x->ele,range)) {
434 zskiplistNode *next = x->level[0].forward;
435 zslDeleteNode(zsl,x,update);
436 dictDelete(dict,x->ele);
437 zslFreeNode(x); /* Here is where x->ele is actually released. */
438 removed++;
439 x = next;
440 }
441 return removed;
442}
443
444/* Delete all the elements with rank between start and end from the skiplist.
445 * Start and end are inclusive. Note that start and end need to be 1-based */
446unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
447 zskiplistNode *update[ZSKIPLIST_MAXLEVEL32], *x;
448 unsigned long traversed = 0, removed = 0;
449 int i;
450
451 x = zsl->header;
452 for (i = zsl->level-1; i >= 0; i--) {
453 while (x->level[i].forward && (traversed + x->level[i].span) < start) {
454 traversed += x->level[i].span;
455 x = x->level[i].forward;
456 }
457 update[i] = x;
458 }
459
460 traversed++;
461 x = x->level[0].forward;
462 while (x && traversed <= end) {
463 zskiplistNode *next = x->level[0].forward;
464 zslDeleteNode(zsl,x,update);
465 dictDelete(dict,x->ele);
466 zslFreeNode(x);
467 removed++;
468 traversed++;
469 x = next;
470 }
471 return removed;
472}
473
474/* Find the rank for an element by both score and key.
475 * Returns 0 when the element cannot be found, rank otherwise.
476 * Note that the rank is 1-based due to the span of zsl->header to the
477 * first element. */
478unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) {
479 zskiplistNode *x;
480 unsigned long rank = 0;
481 int i;
482
483 x = zsl->header;
484 for (i = zsl->level-1; i >= 0; i--) {
485 while (x->level[i].forward &&
486 (x->level[i].forward->score < score ||
487 (x->level[i].forward->score == score &&
488 sdscmp(x->level[i].forward->ele,ele) <= 0))) {
489 rank += x->level[i].span;
490 x = x->level[i].forward;
491 }
492
493 /* x might be equal to zsl->header, so test if obj is non-NULL */
494 if (x->ele && sdscmp(x->ele,ele) == 0) {
495 return rank;
496 }
497 }
498 return 0;
499}
500
501/* Finds an element by its rank. The rank argument needs to be 1-based. */
502zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
503 zskiplistNode *x;
504 unsigned long traversed = 0;
505 int i;
506
507 x = zsl->header;
508 for (i = zsl->level-1; i >= 0; i--) {
509 while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
510 {
511 traversed += x->level[i].span;
512 x = x->level[i].forward;
513 }
514 if (traversed == rank) {
515 return x;
516 }
517 }
518 return NULL((void*)0);
519}
520
521/* Populate the rangespec according to the objects min and max. */
522static int zslParseRange(robj *min, robj *max, zrangespec *spec) {
523 char *eptr;
524 spec->minex = spec->maxex = 0;
525
526 /* Parse the min-max interval. If one of the values is prefixed
527 * by the "(" character, it's considered "open". For instance
528 * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max
529 * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */
530 if (min->encoding == OBJ_ENCODING_INT1) {
531 spec->min = (long)min->ptr;
532 } else {
533 if (((char*)min->ptr)[0] == '(') {
534 spec->min = strtod((char*)min->ptr+1,&eptr);
535 if (eptr[0] != '\0' || isnan(spec->min)__builtin_isnan (spec->min)) return C_ERR-1;
536 spec->minex = 1;
537 } else {
538 spec->min = strtod((char*)min->ptr,&eptr);
539 if (eptr[0] != '\0' || isnan(spec->min)__builtin_isnan (spec->min)) return C_ERR-1;
540 }
541 }
542 if (max->encoding == OBJ_ENCODING_INT1) {
543 spec->max = (long)max->ptr;
544 } else {
545 if (((char*)max->ptr)[0] == '(') {
546 spec->max = strtod((char*)max->ptr+1,&eptr);
547 if (eptr[0] != '\0' || isnan(spec->max)__builtin_isnan (spec->max)) return C_ERR-1;
548 spec->maxex = 1;
549 } else {
550 spec->max = strtod((char*)max->ptr,&eptr);
551 if (eptr[0] != '\0' || isnan(spec->max)__builtin_isnan (spec->max)) return C_ERR-1;
552 }
553 }
554
555 return C_OK0;
556}
557
558/* ------------------------ Lexicographic ranges ---------------------------- */
559
560/* Parse max or min argument of ZRANGEBYLEX.
561 * (foo means foo (open interval)
562 * [foo means foo (closed interval)
563 * - means the min string possible
564 * + means the max string possible
565 *
566 * If the string is valid the *dest pointer is set to the redis object
567 * that will be used for the comparison, and ex will be set to 0 or 1
568 * respectively if the item is exclusive or inclusive. C_OK will be
569 * returned.
570 *
571 * If the string is not a valid range C_ERR is returned, and the value
572 * of *dest and *ex is undefined. */
573int zslParseLexRangeItem(robj *item, sds *dest, int *ex) {
574 char *c = item->ptr;
575
576 switch(c[0]) {
577 case '+':
578 if (c[1] != '\0') return C_ERR-1;
579 *ex = 1;
580 *dest = shared.maxstring;
581 return C_OK0;
582 case '-':
583 if (c[1] != '\0') return C_ERR-1;
584 *ex = 1;
585 *dest = shared.minstring;
586 return C_OK0;
587 case '(':
588 *ex = 1;
589 *dest = sdsnewlen(c+1,sdslen(c)-1);
590 return C_OK0;
591 case '[':
592 *ex = 0;
593 *dest = sdsnewlen(c+1,sdslen(c)-1);
594 return C_OK0;
595 default:
596 return C_ERR-1;
597 }
598}
599
600/* Free a lex range structure, must be called only after zelParseLexRange()
601 * populated the structure with success (C_OK returned). */
602void zslFreeLexRange(zlexrangespec *spec) {
603 if (spec->min != shared.minstring &&
604 spec->min != shared.maxstring) sdsfree(spec->min);
605 if (spec->max != shared.minstring &&
606 spec->max != shared.maxstring) sdsfree(spec->max);
607}
608
609/* Populate the lex rangespec according to the objects min and max.
610 *
611 * Return C_OK on success. On error C_ERR is returned.
612 * When OK is returned the structure must be freed with zslFreeLexRange(),
613 * otherwise no release is needed. */
614int zslParseLexRange(robj *min, robj *max, zlexrangespec *spec) {
615 /* The range can't be valid if objects are integer encoded.
616 * Every item must start with ( or [. */
617 if (min->encoding == OBJ_ENCODING_INT1 ||
618 max->encoding == OBJ_ENCODING_INT1) return C_ERR-1;
619
620 spec->min = spec->max = NULL((void*)0);
621 if (zslParseLexRangeItem(min, &spec->min, &spec->minex) == C_ERR-1 ||
622 zslParseLexRangeItem(max, &spec->max, &spec->maxex) == C_ERR-1) {
623 zslFreeLexRange(spec);
624 return C_ERR-1;
625 } else {
626 return C_OK0;
627 }
628}
629
630/* This is just a wrapper to sdscmp() that is able to
631 * handle shared.minstring and shared.maxstring as the equivalent of
632 * -inf and +inf for strings */
633int sdscmplex(sds a, sds b) {
634 if (a == b) return 0;
635 if (a == shared.minstring || b == shared.maxstring) return -1;
636 if (a == shared.maxstring || b == shared.minstring) return 1;
637 return sdscmp(a,b);
638}
639
640int zslLexValueGteMin(sds value, zlexrangespec *spec) {
641 return spec->minex ?
642 (sdscmplex(value,spec->min) > 0) :
643 (sdscmplex(value,spec->min) >= 0);
644}
645
646int zslLexValueLteMax(sds value, zlexrangespec *spec) {
647 return spec->maxex ?
648 (sdscmplex(value,spec->max) < 0) :
649 (sdscmplex(value,spec->max) <= 0);
650}
651
652/* Returns if there is a part of the zset is in the lex range. */
653int zslIsInLexRange(zskiplist *zsl, zlexrangespec *range) {
654 zskiplistNode *x;
655
656 /* Test for ranges that will always be empty. */
657 int cmp = sdscmplex(range->min,range->max);
658 if (cmp > 0 || (cmp == 0 && (range->minex || range->maxex)))
659 return 0;
660 x = zsl->tail;
661 if (x == NULL((void*)0) || !zslLexValueGteMin(x->ele,range))
662 return 0;
663 x = zsl->header->level[0].forward;
664 if (x == NULL((void*)0) || !zslLexValueLteMax(x->ele,range))
665 return 0;
666 return 1;
667}
668
669/* Find the first node that is contained in the specified lex range.
670 * Returns NULL when no element is contained in the range. */
671zskiplistNode *zslFirstInLexRange(zskiplist *zsl, zlexrangespec *range) {
672 zskiplistNode *x;
673 int i;
674
675 /* If everything is out of range, return early. */
676 if (!zslIsInLexRange(zsl,range)) return NULL((void*)0);
677
678 x = zsl->header;
679 for (i = zsl->level-1; i >= 0; i--) {
680 /* Go forward while *OUT* of range. */
681 while (x->level[i].forward &&
682 !zslLexValueGteMin(x->level[i].forward->ele,range))
683 x = x->level[i].forward;
684 }
685
686 /* This is an inner range, so the next node cannot be NULL. */
687 x = x->level[0].forward;
688 serverAssert(x != NULL)((x != ((void*)0))?(void)0 : (_serverAssert("x != NULL","t_zset.c"
,688),__builtin_unreachable()))
;
689
690 /* Check if score <= max. */
691 if (!zslLexValueLteMax(x->ele,range)) return NULL((void*)0);
692 return x;
693}
694
695/* Find the last node that is contained in the specified range.
696 * Returns NULL when no element is contained in the range. */
697zskiplistNode *zslLastInLexRange(zskiplist *zsl, zlexrangespec *range) {
698 zskiplistNode *x;
699 int i;
700
701 /* If everything is out of range, return early. */
702 if (!zslIsInLexRange(zsl,range)) return NULL((void*)0);
703
704 x = zsl->header;
705 for (i = zsl->level-1; i >= 0; i--) {
706 /* Go forward while *IN* range. */
707 while (x->level[i].forward &&
708 zslLexValueLteMax(x->level[i].forward->ele,range))
709 x = x->level[i].forward;
710 }
711
712 /* This is an inner range, so this node cannot be NULL. */
713 serverAssert(x != NULL)((x != ((void*)0))?(void)0 : (_serverAssert("x != NULL","t_zset.c"
,713),__builtin_unreachable()))
;
714
715 /* Check if score >= min. */
716 if (!zslLexValueGteMin(x->ele,range)) return NULL((void*)0);
717 return x;
718}
719
720/*-----------------------------------------------------------------------------
721 * Ziplist-backed sorted set API
722 *----------------------------------------------------------------------------*/
723
/* Convert the first 'vlen' bytes at 'vstr' (not necessarily null
 * terminated) to a double using strtod().
 *
 * The bytes are copied into a 128 byte stack buffer first, so at most
 * sizeof(buf)-1 bytes are considered: longer inputs are truncated. One
 * byte is always reserved for the terminator, otherwise an input of 128
 * bytes or more would write the null byte past the end of the buffer. */
double zzlStrtod(unsigned char *vstr, unsigned int vlen) {
    char buf[128];

    if (vlen > sizeof(buf) - 1)
        vlen = sizeof(buf) - 1;
    memcpy(buf,vstr,vlen);
    buf[vlen] = '\0';
    return strtod(buf,NULL);
}
732
/* Return as a double the score stored in the ziplist entry at 'sptr'.
 * The entry is read with ziplistGet(): a string value is converted with
 * zzlStrtod(), an integer value is converted directly. 'sptr' must not be
 * NULL and the read must succeed (both asserted). */
double zzlGetScore(unsigned char *sptr) {
    unsigned char *vstr;
    unsigned int vlen;
    long long vlong;

    serverAssert(sptr != NULL);
    serverAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));

    if (vstr != NULL) return zzlStrtod(vstr,vlen);
    return (double)vlong;
}
750
751/* Return a ziplist element as an SDS string. */
752sds ziplistGetObject(unsigned char *sptr) {
753 unsigned char *vstr;
754 unsigned int vlen;
755 long long vlong;
756
757 serverAssert(sptr != NULL)((sptr != ((void*)0))?(void)0 : (_serverAssert("sptr != NULL"
,"t_zset.c",757),__builtin_unreachable()))
;
758 serverAssert(ziplistGet(sptr,&vstr,&vlen,&vlong))((ziplistGet(sptr,&vstr,&vlen,&vlong))?(void)0 : (
_serverAssert("ziplistGet(sptr,&vstr,&vlen,&vlong)"
,"t_zset.c",758),__builtin_unreachable()))
;
759
760 if (vstr) {
761 return sdsnewlen((char*)vstr,vlen);
762 } else {
763 return sdsfromlonglong(vlong);
764 }
765}
766
/* Compare the ziplist entry at 'eptr' with the 'clen' bytes at 'cstr'.
 * Integer entries are compared through their decimal string form.
 * The common prefix is compared with memcmp(); when it is identical the
 * result is the difference between the two lengths. */
int zzlCompareElements(unsigned char *eptr, unsigned char *cstr, unsigned int clen) {
    unsigned char *vstr;
    unsigned int vlen;
    long long vlong;
    unsigned char vbuf[32];
    int shorter, cmp;

    serverAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
    if (vstr == NULL) {
        /* Integer entry: render it into the local buffer. */
        vlen = ll2string((char*)vbuf,sizeof(vbuf),vlong);
        vstr = vbuf;
    }

    if (vlen < clen)
        shorter = vlen;
    else
        shorter = clen;

    cmp = memcmp(vstr,cstr,shorter);
    if (cmp != 0) return cmp;
    return vlen-clen;
}
787
/* Return the number of sorted set elements stored in the ziplist 'zl':
 * half the number of ziplist entries, since the code in this file stores
 * each element as an (element, score) pair of entries. */
unsigned int zzlLength(unsigned char *zl) {
    return ziplistLen(zl) / 2;
}
791
/* Advance *eptr and *sptr to the next (element, score) pair of the
 * ziplist. Both are set to NULL when there is no next pair. On entry both
 * pointers must be non-NULL (asserted). */
void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
    unsigned char *_eptr, *_sptr = NULL;
    serverAssert(*eptr != NULL && *sptr != NULL);

    /* The next element, if any, follows the current score entry. */
    _eptr = ziplistNext(zl,*sptr);
    if (_eptr != NULL) {
        /* An element is always followed by its score. */
        _sptr = ziplistNext(zl,_eptr);
        serverAssert(_sptr != NULL);
    }

    *eptr = _eptr;
    *sptr = _sptr;
}
810
/* Move *eptr and *sptr back to the previous (element, score) pair of the
 * ziplist. Both are set to NULL when there is no previous pair. On entry
 * both pointers must be non-NULL (asserted). */
void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
    unsigned char *_eptr = NULL, *_sptr;
    serverAssert(*eptr != NULL && *sptr != NULL);

    /* The previous score, if any, precedes the current element entry. */
    _sptr = ziplistPrev(zl,*eptr);
    if (_sptr != NULL) {
        /* A score is always preceded by its element. */
        _eptr = ziplistPrev(zl,_sptr);
        serverAssert(_eptr != NULL);
    }

    *eptr = _eptr;
    *sptr = _sptr;
}
829
830/* Returns if there is a part of the zset is in range. Should only be used
831 * internally by zzlFirstInRange and zzlLastInRange. */
832int zzlIsInRange(unsigned char *zl, zrangespec *range) {
833 unsigned char *p;
834 double score;
835
836 /* Test for ranges that will always be empty. */
837 if (range->min > range->max ||
838 (range->min == range->max && (range->minex || range->maxex)))
839 return 0;
840
841 p = ziplistIndex(zl,-1); /* Last score. */
842 if (p == NULL((void*)0)) return 0; /* Empty sorted set */
843 score = zzlGetScore(p);
844 if (!zslValueGteMin(score,range))
845 return 0;
846
847 p = ziplistIndex(zl,1); /* First score. */
848 serverAssert(p != NULL)((p != ((void*)0))?(void)0 : (_serverAssert("p != NULL","t_zset.c"
,848),__builtin_unreachable()))
;
849 score = zzlGetScore(p);
850 if (!zslValueLteMax(score,range))
851 return 0;
852
853 return 1;
854}
855
856/* Find pointer to the first element contained in the specified range.
857 * Returns NULL when no element is contained in the range. */
858unsigned char *zzlFirstInRange(unsigned char *zl, zrangespec *range) {
859 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
860 double score;
861
862 /* If everything is out of range, return early. */
863 if (!zzlIsInRange(zl,range)) return NULL((void*)0);
864
865 while (eptr != NULL((void*)0)) {
866 sptr = ziplistNext(zl,eptr);
867 serverAssert(sptr != NULL)((sptr != ((void*)0))?(void)0 : (_serverAssert("sptr != NULL"
,"t_zset.c",867),__builtin_unreachable()))
;
868
869 score = zzlGetScore(sptr);
870 if (zslValueGteMin(score,range)) {
871 /* Check if score <= max. */
872 if (zslValueLteMax(score,range))
873 return eptr;
874 return NULL((void*)0);
875 }
876
877 /* Move to next element. */
878 eptr = ziplistNext(zl,sptr);
879 }
880
881 return NULL((void*)0);
882}
883
884/* Find pointer to the last element contained in the specified range.
885 * Returns NULL when no element is contained in the range. */
886unsigned char *zzlLastInRange(unsigned char *zl, zrangespec *range) {
887 unsigned char *eptr = ziplistIndex(zl,-2), *sptr;
888 double score;
889
890 /* If everything is out of range, return early. */
891 if (!zzlIsInRange(zl,range)) return NULL((void*)0);
892
893 while (eptr != NULL((void*)0)) {
894 sptr = ziplistNext(zl,eptr);
895 serverAssert(sptr != NULL)((sptr != ((void*)0))?(void)0 : (_serverAssert("sptr != NULL"
,"t_zset.c",895),__builtin_unreachable()))
;
896
897 score = zzlGetScore(sptr);
898 if (zslValueLteMax(score,range)) {
899 /* Check if score >= min. */
900 if (zslValueGteMin(score,range))
901 return eptr;
902 return NULL((void*)0);
903 }
904
905 /* Move to previous element by moving to the score of previous element.
906 * When this returns NULL, we know there also is no element. */
907 sptr = ziplistPrev(zl,eptr);
908 if (sptr != NULL((void*)0))
909 serverAssert((eptr = ziplistPrev(zl,sptr)) != NULL)(((eptr = ziplistPrev(zl,sptr)) != ((void*)0))?(void)0 : (_serverAssert
("(eptr = ziplistPrev(zl,sptr)) != NULL","t_zset.c",909),__builtin_unreachable
()))
;
910 else
911 eptr = NULL((void*)0);
912 }
913
914 return NULL((void*)0);
915}
916
917int zzlLexValueGteMin(unsigned char *p, zlexrangespec *spec) {
918 sds value = ziplistGetObject(p);
919 int res = zslLexValueGteMin(value,spec);
920 sdsfree(value);
921 return res;
922}
923
924int zzlLexValueLteMax(unsigned char *p, zlexrangespec *spec) {
925 sds value = ziplistGetObject(p);
926 int res = zslLexValueLteMax(value,spec);
927 sdsfree(value);
928 return res;
929}
930
931/* Returns if there is a part of the zset is in range. Should only be used
932 * internally by zzlFirstInRange and zzlLastInRange. */
933int zzlIsInLexRange(unsigned char *zl, zlexrangespec *range) {
934 unsigned char *p;
935
936 /* Test for ranges that will always be empty. */
937 int cmp = sdscmplex(range->min,range->max);
938 if (cmp > 0 || (cmp == 0 && (range->minex || range->maxex)))
939 return 0;
940
941 p = ziplistIndex(zl,-2); /* Last element. */
942 if (p == NULL((void*)0)) return 0;
943 if (!zzlLexValueGteMin(p,range))
944 return 0;
945
946 p = ziplistIndex(zl,0); /* First element. */
947 serverAssert(p != NULL)((p != ((void*)0))?(void)0 : (_serverAssert("p != NULL","t_zset.c"
,947),__builtin_unreachable()))
;
948 if (!zzlLexValueLteMax(p,range))
949 return 0;
950
951 return 1;
952}
953
954/* Find pointer to the first element contained in the specified lex range.
955 * Returns NULL when no element is contained in the range. */
956unsigned char *zzlFirstInLexRange(unsigned char *zl, zlexrangespec *range) {
957 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
958
959 /* If everything is out of range, return early. */
960 if (!zzlIsInLexRange(zl,range)) return NULL((void*)0);
961
962 while (eptr != NULL((void*)0)) {
963 if (zzlLexValueGteMin(eptr,range)) {
964 /* Check if score <= max. */
965 if (zzlLexValueLteMax(eptr,range))
966 return eptr;
967 return NULL((void*)0);
968 }
969
970 /* Move to next element. */
971 sptr = ziplistNext(zl,eptr); /* This element score. Skip it. */
972 serverAssert(sptr != NULL)((sptr != ((void*)0))?(void)0 : (_serverAssert("sptr != NULL"
,"t_zset.c",972),__builtin_unreachable()))
;
973 eptr = ziplistNext(zl,sptr); /* Next element. */
974 }
975
976 return NULL((void*)0);
977}
978
979/* Find pointer to the last element contained in the specified lex range.
980 * Returns NULL when no element is contained in the range. */
981unsigned char *zzlLastInLexRange(unsigned char *zl, zlexrangespec *range) {
982 unsigned char *eptr = ziplistIndex(zl,-2), *sptr;
983
984 /* If everything is out of range, return early. */
985 if (!zzlIsInLexRange(zl,range)) return NULL((void*)0);
986
987 while (eptr != NULL((void*)0)) {
988 if (zzlLexValueLteMax(eptr,range)) {
989 /* Check if score >= min. */
990 if (zzlLexValueGteMin(eptr,range))
991 return eptr;
992 return NULL((void*)0);
993 }
994
995 /* Move to previous element by moving to the score of previous element.
996 * When this returns NULL, we know there also is no element. */
997 sptr = ziplistPrev(zl,eptr);
998 if (sptr != NULL((void*)0))
999 serverAssert((eptr = ziplistPrev(zl,sptr)) != NULL)(((eptr = ziplistPrev(zl,sptr)) != ((void*)0))?(void)0 : (_serverAssert
("(eptr = ziplistPrev(zl,sptr)) != NULL","t_zset.c",999),__builtin_unreachable
()))
;
1000 else
1001 eptr = NULL((void*)0);
1002 }
1003
1004 return NULL((void*)0);
1005}
1006
1007unsigned char *zzlFind(unsigned char *zl, sds ele, double *score) {
1008 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
1009
1010 while (eptr != NULL((void*)0)) {
1011 sptr = ziplistNext(zl,eptr);
1012 serverAssert(sptr != NULL)((sptr != ((void*)0))?(void)0 : (_serverAssert("sptr != NULL"
,"t_zset.c",1012),__builtin_unreachable()))
;
1013
1014 if (ziplistCompare(eptr,(unsigned char*)ele,sdslen(ele))) {
1015 /* Matching element, pull out score. */
1016 if (score != NULL((void*)0)) *score = zzlGetScore(sptr);
1017 return eptr;
1018 }
1019
1020 /* Move to next element. */
1021 eptr = ziplistNext(zl,sptr);
1022 }
1023 return NULL((void*)0);
1024}
1025
/* Remove the (element,score) pair starting at 'eptr' and return the
 * resulting ziplist pointer. ziplistDelete() is handed the address of a
 * local copy of the position so the caller's 'eptr' is left untouched. */
unsigned char *zzlDelete(unsigned char *zl, unsigned char *eptr) {
    unsigned char *cursor = eptr;

    /* TODO: add function to ziplist API to delete N elements from offset. */
    zl = ziplistDelete(zl,&cursor); /* Element. */
    zl = ziplistDelete(zl,&cursor); /* Score. */
    return zl;
}
1036
1037unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, sds ele, double score) {
1038 unsigned char *sptr;
1039 char scorebuf[128];
1040 int scorelen;
1041 size_t offset;
1042
1043 scorelen = d2string(scorebuf,sizeof(scorebuf),score);
1044 if (eptr == NULL((void*)0)) {
1045 zl = ziplistPush(zl,(unsigned char*)ele,sdslen(ele),ZIPLIST_TAIL1);
1046 zl = ziplistPush(zl,(unsigned char*)scorebuf,scorelen,ZIPLIST_TAIL1);
1047 } else {
1048 /* Keep offset relative to zl, as it might be re-allocated. */
1049 offset = eptr-zl;
1050 zl = ziplistInsert(zl,eptr,(unsigned char*)ele,sdslen(ele));
1051 eptr = zl+offset;
1052
1053 /* Insert score after the element. */
1054 serverAssert((sptr = ziplistNext(zl,eptr)) != NULL)(((sptr = ziplistNext(zl,eptr)) != ((void*)0))?(void)0 : (_serverAssert
("(sptr = ziplistNext(zl,eptr)) != NULL","t_zset.c",1054),__builtin_unreachable
()))
;
1055 zl = ziplistInsert(zl,sptr,(unsigned char*)scorebuf,scorelen);
1056 }
1057 return zl;
1058}
1059
1060/* Insert (element,score) pair in ziplist. This function assumes the element is
1061 * not yet present in the list. */
1062unsigned char *zzlInsert(unsigned char *zl, sds ele, double score) {
1063 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
1064 double s;
1065
1066 while (eptr != NULL((void*)0)) {
1067 sptr = ziplistNext(zl,eptr);
1068 serverAssert(sptr != NULL)((sptr != ((void*)0))?(void)0 : (_serverAssert("sptr != NULL"
,"t_zset.c",1068),__builtin_unreachable()))
;
1069 s = zzlGetScore(sptr);
1070
1071 if (s > score) {
1072 /* First element with score larger than score for element to be
1073 * inserted. This means we should take its spot in the list to
1074 * maintain ordering. */
1075 zl = zzlInsertAt(zl,eptr,ele,score);
1076 break;
1077 } else if (s == score) {
1078 /* Ensure lexicographical ordering for elements. */
1079 if (zzlCompareElements(eptr,(unsigned char*)ele,sdslen(ele)) > 0) {
1080 zl = zzlInsertAt(zl,eptr,ele,score);
1081 break;
1082 }
1083 }
1084
1085 /* Move to next element. */
1086 eptr = ziplistNext(zl,sptr);
1087 }
1088
1089 /* Push on tail of list when it was not yet inserted. */
1090 if (eptr == NULL((void*)0))
1091 zl = zzlInsertAt(zl,NULL((void*)0),ele,score);
1092 return zl;
1093}
1094
1095unsigned char *zzlDeleteRangeByScore(unsigned char *zl, zrangespec *range, unsigned long *deleted) {
1096 unsigned char *eptr, *sptr;
1097 double score;
1098 unsigned long num = 0;
1099
1100 if (deleted != NULL((void*)0)) *deleted = 0;
1101
1102 eptr = zzlFirstInRange(zl,range);
1103 if (eptr == NULL((void*)0)) return zl;
1104
1105 /* When the tail of the ziplist is deleted, eptr will point to the sentinel
1106 * byte and ziplistNext will return NULL. */
1107 while ((sptr = ziplistNext(zl,eptr)) != NULL((void*)0)) {
1108 score = zzlGetScore(sptr);
1109 if (zslValueLteMax(score,range)) {
1110 /* Delete both the element and the score. */
1111 zl = ziplistDelete(zl,&eptr);
1112 zl = ziplistDelete(zl,&eptr);
1113 num++;
1114 } else {
1115 /* No longer in range. */
1116 break;
1117 }
1118 }
1119
1120 if (deleted != NULL((void*)0)) *deleted = num;
1121 return zl;
1122}
1123
1124unsigned char *zzlDeleteRangeByLex(unsigned char *zl, zlexrangespec *range, unsigned long *deleted) {
1125 unsigned char *eptr, *sptr;
1126 unsigned long num = 0;
1127
1128 if (deleted != NULL((void*)0)) *deleted = 0;
1129
1130 eptr = zzlFirstInLexRange(zl,range);
1131 if (eptr == NULL((void*)0)) return zl;
1132
1133 /* When the tail of the ziplist is deleted, eptr will point to the sentinel
1134 * byte and ziplistNext will return NULL. */
1135 while ((sptr = ziplistNext(zl,eptr)) != NULL((void*)0)) {
1136 if (zzlLexValueLteMax(eptr,range)) {
1137 /* Delete both the element and the score. */
1138 zl = ziplistDelete(zl,&eptr);
1139 zl = ziplistDelete(zl,&eptr);
1140 num++;
1141 } else {
1142 /* No longer in range. */
1143 break;
1144 }
1145 }
1146
1147 if (deleted != NULL((void*)0)) *deleted = num;
1148 return zl;
1149}
1150
/* Delete all the pairs with rank between 'start' and 'end' from the
 * ziplist. Start and end are inclusive and 1-based. If 'deleted' is not
 * NULL it receives end-start+1. Returns the new ziplist pointer. */
unsigned char *zzlDeleteRangeByRank(unsigned char *zl, unsigned int start, unsigned int end, unsigned long *deleted) {
    unsigned int num = (end-start)+1;

    if (deleted) *deleted = num;

    /* Each pair spans two ziplist entries, hence the factor of two on both
     * the starting index and the entry count. */
    return ziplistDeleteRange(zl,2*(start-1),2*num);
}
1159
1160/*-----------------------------------------------------------------------------
1161 * Common sorted set API
1162 *----------------------------------------------------------------------------*/
1163
1164unsigned long zsetLength(const robj *zobj) {
1165 unsigned long length = 0;
1166 if (zobj->encoding == OBJ_ENCODING_ZIPLIST5) {
1167 length = zzlLength(zobj->ptr);
1168 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST7) {
1169 length = ((const zset*)zobj->ptr)->zsl->length;
1170 } else {
1171 serverPanic("Unknown sorted set encoding")_serverPanic("t_zset.c",1171,"Unknown sorted set encoding"),__builtin_unreachable
()
;
1172 }
1173 return length;
1174}
1175
1176void zsetConvert(robj *zobj, int encoding) {
1177 zset *zs;
1178 zskiplistNode *node, *next;
1179 sds ele;
1180 double score;
1181
1182 if (zobj->encoding == encoding) return;
1183 if (zobj->encoding == OBJ_ENCODING_ZIPLIST5) {
1184 unsigned char *zl = zobj->ptr;
1185 unsigned char *eptr, *sptr;
1186 unsigned char *vstr;
1187 unsigned int vlen;
1188 long long vlong;
1189
1190 if (encoding != OBJ_ENCODING_SKIPLIST7)
1191 serverPanic("Unknown target encoding")_serverPanic("t_zset.c",1191,"Unknown target encoding"),__builtin_unreachable
()
;
1192
1193 zs = zmalloc(sizeof(*zs));
1194 zs->dict = dictCreate(&zsetDictType,NULL((void*)0));
1195 zs->zsl = zslCreate();
1196
1197 eptr = ziplistIndex(zl,0);
1198 serverAssertWithInfo(NULL,zobj,eptr != NULL)((eptr != ((void*)0))?(void)0 : (_serverAssertWithInfo(((void
*)0),zobj,"eptr != NULL","t_zset.c",1198),__builtin_unreachable
()))
;
1199 sptr = ziplistNext(zl,eptr);
1200 serverAssertWithInfo(NULL,zobj,sptr != NULL)((sptr != ((void*)0))?(void)0 : (_serverAssertWithInfo(((void
*)0),zobj,"sptr != NULL","t_zset.c",1200),__builtin_unreachable
()))
;
1201
1202 while (eptr != NULL((void*)0)) {
1203 score = zzlGetScore(sptr);
1204 serverAssertWithInfo(NULL,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong))((ziplistGet(eptr,&vstr,&vlen,&vlong))?(void)0 : (
_serverAssertWithInfo(((void*)0),zobj,"ziplistGet(eptr,&vstr,&vlen,&vlong)"
,"t_zset.c",1204),__builtin_unreachable()))
;
1205 if (vstr == NULL((void*)0))
1206 ele = sdsfromlonglong(vlong);
1207 else
1208 ele = sdsnewlen((char*)vstr,vlen);
1209
1210 node = zslInsert(zs->zsl,score,ele);
1211 serverAssert(dictAdd(zs->dict,ele,&node->score) == DICT_OK)((dictAdd(zs->dict,ele,&node->score) == 0)?(void)0 :
(_serverAssert("dictAdd(zs->dict,ele,&node->score) == DICT_OK"
,"t_zset.c",1211),__builtin_unreachable()))
;
1212 zzlNext(zl,&eptr,&sptr);
1213 }
1214
1215 zfree(zobj->ptr);
1216 zobj->ptr = zs;
1217 zobj->encoding = OBJ_ENCODING_SKIPLIST7;
1218 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST7) {
1219 unsigned char *zl = ziplistNew();
1220
1221 if (encoding != OBJ_ENCODING_ZIPLIST5)
1222 serverPanic("Unknown target encoding")_serverPanic("t_zset.c",1222,"Unknown target encoding"),__builtin_unreachable
()
;
1223
1224 /* Approach similar to zslFree(), since we want to free the skiplist at
1225 * the same time as creating the ziplist. */
1226 zs = zobj->ptr;
1227 dictRelease(zs->dict);
1228 node = zs->zsl->header->level[0].forward;
1229 zfree(zs->zsl->header);
1230 zfree(zs->zsl);
1231
1232 while (node) {
1233 zl = zzlInsertAt(zl,NULL((void*)0),node->ele,node->score);
1234 next = node->level[0].forward;
1235 zslFreeNode(node);
1236 node = next;
1237 }
1238
1239 zfree(zs);
1240 zobj->ptr = zl;
1241 zobj->encoding = OBJ_ENCODING_ZIPLIST5;
1242 } else {
1243 serverPanic("Unknown sorted set encoding")_serverPanic("t_zset.c",1243,"Unknown sorted set encoding"),__builtin_unreachable
()
;
1244 }
1245}
1246
1247/* Convert the sorted set object into a ziplist if it is not already a ziplist
1248 * and if the number of elements and the maximum element size is within the
1249 * expected ranges. */
1250void zsetConvertToZiplistIfNeeded(robj *zobj, size_t maxelelen) {
1251 if (zobj->encoding == OBJ_ENCODING_ZIPLIST5) return;
1252 zset *zset = zobj->ptr;
1253
1254 if (zset->zsl->length <= server.zset_max_ziplist_entries &&
1255 maxelelen <= server.zset_max_ziplist_value)
1256 zsetConvert(zobj,OBJ_ENCODING_ZIPLIST5);
1257}
1258
1259/* Return (by reference) the score of the specified member of the sorted set
1260 * storing it into *score. If the element does not exist C_ERR is returned
1261 * otherwise C_OK is returned and *score is correctly populated.
1262 * If 'zobj' or 'member' is NULL, C_ERR is returned. */
1263int zsetScore(robj *zobj, sds member, double *score) {
1264 if (!zobj || !member) return C_ERR-1;
1265
1266 if (zobj->encoding == OBJ_ENCODING_ZIPLIST5) {
1267 if (zzlFind(zobj->ptr, member, score) == NULL((void*)0)) return C_ERR-1;
1268 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST7) {
1269 zset *zs = zobj->ptr;
1270 dictEntry *de = dictFind(zs->dict, member);
1271 if (de == NULL((void*)0)) return C_ERR-1;
1272 *score = *(double*)dictGetVal(de)((de)->v.val);
1273 } else {
1274 serverPanic("Unknown sorted set encoding")_serverPanic("t_zset.c",1274,"Unknown sorted set encoding"),__builtin_unreachable
()
;
1275 }
1276 return C_OK0;
1277}
1278
1279/* Add a new element or update the score of an existing element in a sorted
1280 * set, regardless of its encoding.
1281 *
1282 * The set of flags change the command behavior. They are passed with an integer
1283 * pointer since the function will clear the flags and populate them with
1284 * other flags to indicate different conditions.
1285 *
1286 * The input flags are the following:
1287 *
1288 * ZADD_INCR: Increment the current element score by 'score' instead of updating
1289 * the current element score. If the element does not exist, we
1290 * assume 0 as previous score.
1291 * ZADD_NX: Perform the operation only if the element does not exist.
1292 * ZADD_XX: Perform the operation only if the element already exist.
1293 * ZADD_GT: Perform the operation on existing elements only if the new score is
1294 * greater than the current score.
1295 * ZADD_LT: Perform the operation on existing elements only if the new score is
1296 * less than the current score.
1297 *
1298 * When ZADD_INCR is used, the new score of the element is stored in
1299 * '*newscore' if 'newscore' is not NULL.
1300 *
1301 * The returned flags are the following:
1302 *
1303 * ZADD_NAN: The resulting score is not a number.
1304 * ZADD_ADDED: The element was added (not present before the call).
1305 * ZADD_UPDATED: The element score was updated.
1306 * ZADD_NOP: No operation was performed because of NX or XX.
1307 *
1308 * Return value:
1309 *
1310 * The function returns 1 on success, and sets the appropriate flags
1311 * ADDED or UPDATED to signal what happened during the operation (note that
1312 * none could be set if we re-added an element using the same score it used
1313 * to have, or in the case a zero increment is used).
1314 *
1315 * The function returns 0 on error, currently only when the increment
1316 * produces a NAN condition, or when the 'score' value is NAN since the
1317 * start.
1318 *
1319 * The command as a side effect of adding a new element may convert the sorted
1320 * set internal encoding from ziplist to hashtable+skiplist.
1321 *
1322 * Memory management of 'ele':
1323 *
1324 * The function does not take ownership of the 'ele' SDS string, but copies
1325 * it if needed. */
1326int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) {
1327 /* Turn options into simple to check vars. */
1328 int incr = (*flags & ZADD_INCR(1<<0)) != 0;
1329 int nx = (*flags & ZADD_NX(1<<1)) != 0;
1330 int xx = (*flags & ZADD_XX(1<<2)) != 0;
1331 int gt = (*flags & ZADD_GT(1<<7)) != 0;
1332 int lt = (*flags & ZADD_LT(1<<8)) != 0;
1333 *flags = 0; /* We'll return our response flags. */
1334 double curscore;
1335
1336 /* NaN as input is an error regardless of all the other parameters. */
1337 if (isnan(score)__builtin_isnan (score)) {
2
Assuming the condition is false
3
Taking false branch
1338 *flags = ZADD_NAN(1<<4);
1339 return 0;
1340 }
1341
1342 /* Update the sorted set according to its encoding. */
1343 if (zobj->encoding == OBJ_ENCODING_ZIPLIST5) {
4
Assuming field 'encoding' is not equal to OBJ_ENCODING_ZIPLIST
5
Taking false branch
1344 unsigned char *eptr;
1345
1346 if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL((void*)0)) {
1347 /* NX? Return, same element already exists. */
1348 if (nx) {
1349 *flags |= ZADD_NOP(1<<3);
1350 return 1;
1351 }
1352
1353 /* Prepare the score for the increment if needed. */
1354 if (incr) {
1355 score += curscore;
1356 if (isnan(score)__builtin_isnan (score)) {
1357 *flags |= ZADD_NAN(1<<4);
1358 return 0;
1359 }
1360 if (newscore) *newscore = score;
1361 }
1362
1363 /* Remove and re-insert when score changed. */
1364 if (score != curscore &&
1365 /* LT? Only update if score is less than current. */
1366 (!lt || score < curscore) &&
1367 /* GT? Only update if score is greater than current. */
1368 (!gt || score > curscore))
1369 {
1370 zobj->ptr = zzlDelete(zobj->ptr,eptr);
1371 zobj->ptr = zzlInsert(zobj->ptr,ele,score);
1372 *flags |= ZADD_UPDATED(1<<6);
1373 }
1374 return 1;
1375 } else if (!xx) {
1376 /* Optimize: check if the element is too large or the list
1377 * becomes too long *before* executing zzlInsert. */
1378 zobj->ptr = zzlInsert(zobj->ptr,ele,score);
1379 if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries ||
1380 sdslen(ele) > server.zset_max_ziplist_value)
1381 zsetConvert(zobj,OBJ_ENCODING_SKIPLIST7);
1382 if (newscore) *newscore = score;
1383 *flags |= ZADD_ADDED(1<<5);
1384 return 1;
1385 } else {
1386 *flags |= ZADD_NOP(1<<3);
1387 return 1;
1388 }
1389 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST7) {
6
Assuming field 'encoding' is equal to OBJ_ENCODING_SKIPLIST
7
Taking true branch
1390 zset *zs = zobj->ptr;
1391 zskiplistNode *znode;
1392 dictEntry *de;
1393
1394 de = dictFind(zs->dict,ele);
1395 if (de != NULL((void*)0)) {
8
Assuming 'de' is equal to NULL
9
Taking false branch
1396 /* NX? Return, same element already exists. */
1397 if (nx) {
1398 *flags |= ZADD_NOP(1<<3);
1399 return 1;
1400 }
1401 curscore = *(double*)dictGetVal(de)((de)->v.val);
1402
1403 /* Prepare the score for the increment if needed. */
1404 if (incr) {
1405 score += curscore;
1406 if (isnan(score)__builtin_isnan (score)) {
1407 *flags |= ZADD_NAN(1<<4);
1408 return 0;
1409 }
1410 if (newscore) *newscore = score;
1411 }
1412
1413 /* Remove and re-insert when score changes. */
1414 if (score != curscore &&
1415 /* LT? Only update if score is less than current. */
1416 (!lt || score < curscore) &&
1417 /* GT? Only update if score is greater than current. */
1418 (!gt || score > curscore))
1419 {
1420 znode = zslUpdateScore(zs->zsl,curscore,ele,score);
1421 /* Note that we did not removed the original element from
1422 * the hash table representing the sorted set, so we just
1423 * update the score. */
1424 dictGetVal(de)((de)->v.val) = &znode->score; /* Update score ptr. */
1425 *flags |= ZADD_UPDATED(1<<6);
1426 }
1427 return 1;
1428 } else if (!xx
9.1
'xx' is 0
) {
10
Taking true branch
1429 ele = sdsdup(ele);
1430 znode = zslInsert(zs->zsl,score,ele);
11
Calling 'zslInsert'
1431 serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK)((dictAdd(zs->dict,ele,&znode->score) == 0)?(void)0
: (_serverAssert("dictAdd(zs->dict,ele,&znode->score) == DICT_OK"
,"t_zset.c",1431),__builtin_unreachable()))
;
1432 *flags |= ZADD_ADDED(1<<5);
1433 if (newscore) *newscore = score;
1434 return 1;
1435 } else {
1436 *flags |= ZADD_NOP(1<<3);
1437 return 1;
1438 }
1439 } else {
1440 serverPanic("Unknown sorted set encoding")_serverPanic("t_zset.c",1440,"Unknown sorted set encoding"),__builtin_unreachable
()
;
1441 }
1442 return 0; /* Never reached. */
1443}
1444
1445/* Deletes the element 'ele' from the sorted set encoded as a skiplist+dict,
1446 * returning 1 if the element existed and was deleted, 0 otherwise (the
1447 * element was not there). It does not resize the dict after deleting the
1448 * element. */
1449static int zsetRemoveFromSkiplist(zset *zs, sds ele) {
1450 dictEntry *de;
1451 double score;
1452
1453 de = dictUnlink(zs->dict,ele);
1454 if (de != NULL((void*)0)) {
1455 /* Get the score in order to delete from the skiplist later. */
1456 score = *(double*)dictGetVal(de)((de)->v.val);
1457
1458 /* Delete from the hash table and later from the skiplist.
1459 * Note that the order is important: deleting from the skiplist
1460 * actually releases the SDS string representing the element,
1461 * which is shared between the skiplist and the hash table, so
1462 * we need to delete from the skiplist as the final step. */
1463 dictFreeUnlinkedEntry(zs->dict,de);
1464
1465 /* Delete from skiplist. */
1466 int retval = zslDelete(zs->zsl,score,ele,NULL((void*)0));
1467 serverAssert(retval)((retval)?(void)0 : (_serverAssert("retval","t_zset.c",1467),
__builtin_unreachable()))
;
1468
1469 return 1;
1470 }
1471
1472 return 0;
1473}
1474
1475/* Delete the element 'ele' from the sorted set, returning 1 if the element
1476 * existed and was deleted, 0 otherwise (the element was not there). */
1477int zsetDel(robj *zobj, sds ele) {
1478 if (zobj->encoding == OBJ_ENCODING_ZIPLIST5) {
1479 unsigned char *eptr;
1480
1481 if ((eptr = zzlFind(zobj->ptr,ele,NULL((void*)0))) != NULL((void*)0)) {
1482 zobj->ptr = zzlDelete(zobj->ptr,eptr);
1483 return 1;
1484 }
1485 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST7) {
1486 zset *zs = zobj->ptr;
1487 if (zsetRemoveFromSkiplist(zs, ele)) {
1488 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1489 return 1;
1490 }
1491 } else {
1492 serverPanic("Unknown sorted set encoding")_serverPanic("t_zset.c",1492,"Unknown sorted set encoding"),__builtin_unreachable
()
;
1493 }
1494 return 0; /* No such element found. */
1495}
1496
1497/* Given a sorted set object returns the 0-based rank of the object or
1498 * -1 if the object does not exist.
1499 *
1500 * For rank we mean the position of the element in the sorted collection
1501 * of elements. So the first element has rank 0, the second rank 1, and so
1502 * forth up to length-1 elements.
1503 *
1504 * If 'reverse' is false, the rank is returned considering as first element
1505 * the one with the lowest score. Otherwise if 'reverse' is non-zero
1506 * the rank is computed considering as element with rank 0 the one with
1507 * the highest score. */
1508long zsetRank(robj *zobj, sds ele, int reverse) {
1509 unsigned long llen;
1510 unsigned long rank;
1511
1512 llen = zsetLength(zobj);
1513
1514 if (zobj->encoding == OBJ_ENCODING_ZIPLIST5) {
1515 unsigned char *zl = zobj->ptr;
1516 unsigned char *eptr, *sptr;
1517
1518 eptr = ziplistIndex(zl,0);
1519 serverAssert(eptr != NULL)((eptr != ((void*)0))?(void)0 : (_serverAssert("eptr != NULL"
,"t_zset.c",1519),__builtin_unreachable()))
;
1520 sptr = ziplistNext(zl,eptr);
1521 serverAssert(sptr != NULL)((sptr != ((void*)0))?(void)0 : (_serverAssert("sptr != NULL"
,"t_zset.c",1521),__builtin_unreachable()))
;
1522
1523 rank = 1;
1524 while(eptr != NULL((void*)0)) {
1525 if (ziplistCompare(eptr,(unsigned char*)ele,sdslen(ele)))
1526 break;
1527 rank++;
1528 zzlNext(zl,&eptr,&sptr);
1529 }
1530
1531 if (eptr != NULL((void*)0)) {
1532 if (reverse)
1533 return llen-rank;
1534 else
1535 return rank-1;
1536 } else {
1537 return -1;
1538 }
1539 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST7) {
1540 zset *zs = zobj->ptr;
1541 zskiplist *zsl = zs->zsl;
1542 dictEntry *de;
1543 double score;
1544
1545 de = dictFind(zs->dict,ele);
1546 if (de != NULL((void*)0)) {
1547 score = *(double*)dictGetVal(de)((de)->v.val);
1548 rank = zslGetRank(zsl,score,ele);
1549 /* Existing elements always have a rank. */
1550 serverAssert(rank != 0)((rank != 0)?(void)0 : (_serverAssert("rank != 0","t_zset.c",
1550),__builtin_unreachable()))
;
1551 if (reverse)
1552 return llen-rank;
1553 else
1554 return rank-1;
1555 } else {
1556 return -1;
1557 }
1558 } else {
1559 serverPanic("Unknown sorted set encoding")_serverPanic("t_zset.c",1559,"Unknown sorted set encoding"),__builtin_unreachable
()
;
1560 }
1561}
1562
1563/* This is a helper function for the COPY command.
1564 * Duplicate a sorted set object, with the guarantee that the returned object
1565 * has the same encoding as the original one.
1566 *
1567 * The resulting object always has refcount set to 1 */
1568robj *zsetDup(robj *o) {
1569 robj *zobj;
1570 zset *zs;
1571 zset *new_zs;
1572
1573 serverAssert(o->type == OBJ_ZSET)((o->type == 3)?(void)0 : (_serverAssert("o->type == OBJ_ZSET"
,"t_zset.c",1573),__builtin_unreachable()))
;
1574
1575 /* Create a new sorted set object that have the same encoding as the original object's encoding */
1576 if (o->encoding == OBJ_ENCODING_ZIPLIST5) {
1577 unsigned char *zl = o->ptr;
1578 size_t sz = ziplistBlobLen(zl);
1579 unsigned char *new_zl = zmalloc(sz);
1580 memcpy(new_zl, zl, sz);
1581 zobj = createObject(OBJ_ZSET3, new_zl);
1582 zobj->encoding = OBJ_ENCODING_ZIPLIST5;
1583 } else if (o->encoding == OBJ_ENCODING_SKIPLIST7) {
1584 zobj = createZsetObject();
1585 zs = o->ptr;
1586 new_zs = zobj->ptr;
1587 dictExpand(new_zs->dict,dictSize(zs->dict)((zs->dict)->ht[0].used+(zs->dict)->ht[1].used));
1588 zskiplist *zsl = zs->zsl;
1589 zskiplistNode *ln;
1590 sds ele;
1591 long llen = zsetLength(o);
1592
1593 /* We copy the skiplist elements from the greatest to the
1594 * smallest (that's trivial since the elements are already ordered in
1595 * the skiplist): this improves the load process, since the next loaded
1596 * element will always be the smaller, so adding to the skiplist
1597 * will always immediately stop at the head, making the insertion
1598 * O(1) instead of O(log(N)). */
1599 ln = zsl->tail;
1600 while (llen--) {
1601 ele = ln->ele;
1602 sds new_ele = sdsdup(ele);
1603 zskiplistNode *znode = zslInsert(new_zs->zsl,ln->score,new_ele);
1604 dictAdd(new_zs->dict,new_ele,&znode->score);
1605 ln = ln->backward;
1606 }
1607 } else {
1608 serverPanic("Unknown sorted set encoding")_serverPanic("t_zset.c",1608,"Unknown sorted set encoding"),__builtin_unreachable
()
;
1609 }
1610 return zobj;
1611}
1612
1613/* callback for to check the ziplist doesn't have duplicate recoreds */
1614static int _zsetZiplistValidateIntegrity(unsigned char *p, void *userdata) {
1615 struct {
1616 long count;
1617 dict *fields;
1618 } *data = userdata;
1619
1620 /* Even records are field names, add to dict and check that's not a dup */
1621 if (((data->count) & 1) == 0) {
1622 unsigned char *str;
1623 unsigned int slen;
1624 long long vll;
1625 if (!ziplistGet(p, &str, &slen, &vll))
1626 return 0;
1627 sds field = str? sdsnewlen(str, slen): sdsfromlonglong(vll);;
1628 if (dictAdd(data->fields, field, NULL((void*)0)) != DICT_OK0) {
1629 /* Duplicate, return an error */
1630 sdsfree(field);
1631 return 0;
1632 }
1633 }
1634
1635 (data->count)++;
1636 return 1;
1637}
1638
1639/* Validate the integrity of the data stracture.
1640 * when `deep` is 0, only the integrity of the header is validated.
1641 * when `deep` is 1, we scan all the entries one by one. */
1642int zsetZiplistValidateIntegrity(unsigned char *zl, size_t size, int deep) {
1643 if (!deep)
1644 return ziplistValidateIntegrity(zl, size, 0, NULL((void*)0), NULL((void*)0));
1645
1646 /* Keep track of the field names to locate duplicate ones */
1647 struct {
1648 long count;
1649 dict *fields;
1650 } data = {0, dictCreate(&hashDictType, NULL((void*)0))};
1651
1652 int ret = ziplistValidateIntegrity(zl, size, 1, _zsetZiplistValidateIntegrity, &data);
1653
1654 /* make sure we have an even number of records. */
1655 if (data.count & 1)
1656 ret = 0;
1657
1658 dictRelease(data.fields);
1659 return ret;
1660}
1661
1662/* Create a new sds string from the ziplist entry. */
1663sds zsetSdsFromZiplistEntry(ziplistEntry *e) {
1664 return e->sval ? sdsnewlen(e->sval, e->slen) : sdsfromlonglong(e->lval);
1665}
1666
1667/* Reply with bulk string from the ziplist entry. */
1668void zsetReplyFromZiplistEntry(client *c, ziplistEntry *e) {
1669 if (e->sval)
1670 addReplyBulkCBuffer(c, e->sval, e->slen);
1671 else
1672 addReplyBulkLongLong(c, e->lval);
1673}
1674
1675
1676/* Return random element from a non empty zset.
1677 * 'key' and 'val' will be set to hold the element.
1678 * The memory in `key` is not to be freed or modified by the caller.
1679 * 'score' can be NULL in which case it's not extracted. */
1680void zsetTypeRandomElement(robj *zsetobj, unsigned long zsetsize, ziplistEntry *key, double *score) {
1681 if (zsetobj->encoding == OBJ_ENCODING_SKIPLIST7) {
1682 zset *zs = zsetobj->ptr;
1683 dictEntry *de = dictGetFairRandomKey(zs->dict);
1684 sds s = dictGetKey(de)((de)->key);
1685 key->sval = (unsigned char*)s;
1686 key->slen = sdslen(s);
1687 if (score)
1688 *score = *(double*)dictGetVal(de)((de)->v.val);
1689 } else if (zsetobj->encoding == OBJ_ENCODING_ZIPLIST5) {
1690 ziplistEntry val;
1691 ziplistRandomPair(zsetobj->ptr, zsetsize, key, &val);
1692 if (score) {
1693 if (val.sval) {
1694 *score = zzlStrtod(val.sval,val.slen);
1695 } else {
1696 *score = (double)val.lval;
1697 }
1698 }
1699 } else {
1700 serverPanic("Unknown zset encoding")_serverPanic("t_zset.c",1700,"Unknown zset encoding"),__builtin_unreachable
()
;
1701 }
1702}
1703
1704/*-----------------------------------------------------------------------------
1705 * Sorted set commands
1706 *----------------------------------------------------------------------------*/
1707
1708/* This generic command implements both ZADD and ZINCRBY. */
1709void zaddGenericCommand(client *c, int flags) {
1710 static char *nanerr = "resulting score is not a number (NaN)";
1711 robj *key = c->argv[1];
1712 robj *zobj;
1713 sds ele;
1714 double score = 0, *scores = NULL((void*)0);
1715 int j, elements;
1716 int scoreidx = 0;
1717 /* The following vars are used in order to track what the command actually
1718 * did during the execution, to reply to the client and to trigger the
1719 * notification of keyspace change. */
1720 int added = 0; /* Number of new elements added. */
1721 int updated = 0; /* Number of elements with updated score. */
1722 int processed = 0; /* Number of elements processed, may remain zero with
1723 options like XX. */
1724
1725 /* Parse options. At the end 'scoreidx' is set to the argument position
1726 * of the score of the first score-element pair. */
1727 scoreidx = 2;
1728 while(scoreidx < c->argc) {
1729 char *opt = c->argv[scoreidx]->ptr;
1730 if (!strcasecmp(opt,"nx")) flags |= ZADD_NX(1<<1);
1731 else if (!strcasecmp(opt,"xx")) flags |= ZADD_XX(1<<2);
1732 else if (!strcasecmp(opt,"ch")) flags |= ZADD_CH(1<<16);
1733 else if (!strcasecmp(opt,"incr")) flags |= ZADD_INCR(1<<0);
1734 else if (!strcasecmp(opt,"gt")) flags |= ZADD_GT(1<<7);
1735 else if (!strcasecmp(opt,"lt")) flags |= ZADD_LT(1<<8);
1736 else break;
1737 scoreidx++;
1738 }
1739
1740 /* Turn options into simple to check vars. */
1741 int incr = (flags & ZADD_INCR(1<<0)) != 0;
1742 int nx = (flags & ZADD_NX(1<<1)) != 0;
1743 int xx = (flags & ZADD_XX(1<<2)) != 0;
1744 int ch = (flags & ZADD_CH(1<<16)) != 0;
1745 int gt = (flags & ZADD_GT(1<<7)) != 0;
1746 int lt = (flags & ZADD_LT(1<<8)) != 0;
1747
1748 /* After the options, we expect to have an even number of args, since
1749 * we expect any number of score-element pairs. */
1750 elements = c->argc-scoreidx;
1751 if (elements % 2 || !elements) {
1752 addReplyErrorObject(c,shared.syntaxerr);
1753 return;
1754 }
1755 elements /= 2; /* Now this holds the number of score-element pairs. */
1756
1757 /* Check for incompatible options. */
1758 if (nx && xx) {
1759 addReplyError(c,
1760 "XX and NX options at the same time are not compatible");
1761 return;
1762 }
1763
1764 if ((gt && nx) || (lt && nx) || (gt && lt)) {
1765 addReplyError(c,
1766 "GT, LT, and/or NX options at the same time are not compatible");
1767 return;
1768 }
1769 /* Note that XX is compatible with either GT or LT */
1770
1771 if (incr && elements > 1) {
1772 addReplyError(c,
1773 "INCR option supports a single increment-element pair");
1774 return;
1775 }
1776
1777 /* Start parsing all the scores, we need to emit any syntax error
1778 * before executing additions to the sorted set, as the command should
1779 * either execute fully or nothing at all. */
1780 scores = zmalloc(sizeof(double)*elements);
1781 for (j = 0; j < elements; j++) {
1782 if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL((void*)0))
1783 != C_OK0) goto cleanup;
1784 }
1785
1786 /* Lookup the key and create the sorted set if does not exist. */
1787 zobj = lookupKeyWrite(c->db,key);
1788 if (checkType(c,zobj,OBJ_ZSET3)) goto cleanup;
1789 if (zobj == NULL((void*)0)) {
1790 if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
1791 if (server.zset_max_ziplist_entries == 0 ||
1792 server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
1793 {
1794 zobj = createZsetObject();
1795 } else {
1796 zobj = createZsetZiplistObject();
1797 }
1798 dbAdd(c->db,key,zobj);
1799 }
1800
1801 for (j = 0; j < elements; j++) {
1802 double newscore;
1803 score = scores[j];
1804 int retflags = flags;
1805
1806 ele = c->argv[scoreidx+1+j*2]->ptr;
1807 int retval = zsetAdd(zobj, score, ele, &retflags, &newscore);
1808 if (retval == 0) {
1809 addReplyError(c,nanerr);
1810 goto cleanup;
1811 }
1812 if (retflags & ZADD_ADDED(1<<5)) added++;
1813 if (retflags & ZADD_UPDATED(1<<6)) updated++;
1814 if (!(retflags & ZADD_NOP(1<<3))) processed++;
1815 score = newscore;
1816 }
1817 server.dirty += (added+updated);
1818
1819reply_to_client:
1820 if (incr) { /* ZINCRBY or INCR option. */
1821 if (processed)
1822 addReplyDouble(c,score);
1823 else
1824 addReplyNull(c);
1825 } else { /* ZADD. */
1826 addReplyLongLong(c,ch ? added+updated : added);
1827 }
1828
1829cleanup:
1830 zfree(scores);
1831 if (added || updated) {
1832 signalModifiedKey(c,c->db,key);
1833 notifyKeyspaceEvent(NOTIFY_ZSET(1<<7),
1834 incr ? "zincr" : "zadd", key, c->db->id);
1835 }
1836}
1837
1838void zaddCommand(client *c) {
1839 zaddGenericCommand(c,ZADD_NONE0);
1840}
1841
1842void zincrbyCommand(client *c) {
1843 zaddGenericCommand(c,ZADD_INCR(1<<0));
1844}
1845
1846void zremCommand(client *c) {
1847 robj *key = c->argv[1];
1848 robj *zobj;
1849 int deleted = 0, keyremoved = 0, j;
1850
1851 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL((void*)0) ||
1852 checkType(c,zobj,OBJ_ZSET3)) return;
1853
1854 for (j = 2; j < c->argc; j++) {
1855 if (zsetDel(zobj,c->argv[j]->ptr)) deleted++;
1856 if (zsetLength(zobj) == 0) {
1857 dbDelete(c->db,key);
1858 keyremoved = 1;
1859 break;
1860 }
1861 }
1862
1863 if (deleted) {
1864 notifyKeyspaceEvent(NOTIFY_ZSET(1<<7),"zrem",key,c->db->id);
1865 if (keyremoved)
1866 notifyKeyspaceEvent(NOTIFY_GENERIC(1<<2),"del",key,c->db->id);
1867 signalModifiedKey(c,c->db,key);
1868 server.dirty += deleted;
1869 }
1870 addReplyLongLong(c,deleted);
1871}
1872
/* Kind of range a range command operates on. zremrangeGenericCommand()
 * accepts RANK, SCORE and LEX and panics on any other value. */
1873typedef enum {
1874 ZRANGE_AUTO = 0,
1875 ZRANGE_RANK, /* Range of ranks (start/end indexes). */
1876 ZRANGE_SCORE, /* Range of scores (min/max). */
1877 ZRANGE_LEX, /* Range of strings (lexicographic min/max). */
1878} zrange_type;
1879
1880/* Implements ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZREMRANGEBYLEX commands. */
1881void zremrangeGenericCommand(client *c, zrange_type rangetype) {
1882 robj *key = c->argv[1];
1883 robj *zobj;
1884 int keyremoved = 0;
1885 unsigned long deleted = 0;
1886 zrangespec range;
1887 zlexrangespec lexrange;
1888 long start, end, llen;
1889 char *notify_type = NULL((void*)0);
1890
1891 /* Step 1: Parse the range. */
1892 if (rangetype == ZRANGE_RANK) {
1893 notify_type = "zremrangebyrank";
1894 if ((getLongFromObjectOrReply(c,c->argv[2],&start,NULL((void*)0)) != C_OK0) ||
1895 (getLongFromObjectOrReply(c,c->argv[3],&end,NULL((void*)0)) != C_OK0))
1896 return;
1897 } else if (rangetype == ZRANGE_SCORE) {
1898 notify_type = "zremrangebyscore";
1899 if (zslParseRange(c->argv[2],c->argv[3],&range) != C_OK0) {
1900 addReplyError(c,"min or max is not a float");
1901 return;
1902 }
1903 } else if (rangetype == ZRANGE_LEX) {
1904 notify_type = "zremrangebylex";
1905 if (zslParseLexRange(c->argv[2],c->argv[3],&lexrange) != C_OK0) {
1906 addReplyError(c,"min or max not valid string range item");
1907 return;
1908 }
1909 } else {
1910 serverPanic("unknown rangetype %d", (int)rangetype)_serverPanic("t_zset.c",1910,"unknown rangetype %d", (int)rangetype
),__builtin_unreachable()
;
1911 }
1912
1913 /* Step 2: Lookup & range sanity checks if needed. */
1914 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL((void*)0) ||
1915 checkType(c,zobj,OBJ_ZSET3)) goto cleanup;
1916
1917 if (rangetype == ZRANGE_RANK) {
1918 /* Sanitize indexes. */
1919 llen = zsetLength(zobj);
1920 if (start < 0) start = llen+start;
1921 if (end < 0) end = llen+end;
1922 if (start < 0) start = 0;
1923
1924 /* Invariant: start >= 0, so this test will be true when end < 0.
1925 * The range is empty when start > end or start >= length. */
1926 if (start > end || start >= llen) {
1927 addReply(c,shared.czero);
1928 goto cleanup;
1929 }
1930 if (end >= llen) end = llen-1;
1931 }
1932
1933 /* Step 3: Perform the range deletion operation. */
1934 if (zobj->encoding == OBJ_ENCODING_ZIPLIST5) {
1935 switch(rangetype) {
1936 case ZRANGE_AUTO:
1937 case ZRANGE_RANK:
1938 zobj->ptr = zzlDeleteRangeByRank(zobj->ptr,start+1,end+1,&deleted);
1939 break;
1940 case ZRANGE_SCORE:
1941 zobj->ptr = zzlDeleteRangeByScore(zobj->ptr,&range,&deleted);
1942 break;
1943 case ZRANGE_LEX:
1944 zobj->ptr = zzlDeleteRangeByLex(zobj->ptr,&lexrange,&deleted);
1945 break;
1946 }
1947 if (zzlLength(zobj->ptr) == 0) {
1948 dbDelete(c->db,key);
1949 keyremoved = 1;
1950 }
1951 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST7) {
1952 zset *zs = zobj->ptr;
1953 switch(rangetype) {
1954 case ZRANGE_AUTO:
1955 case ZRANGE_RANK:
1956 deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
1957 break;
1958 case ZRANGE_SCORE:
1959 deleted = zslDeleteRangeByScore(zs->zsl,&range,zs->dict);
1960 break;
1961 case ZRANGE_LEX:
1962 deleted = zslDeleteRangeByLex(zs->zsl,&lexrange,zs->dict);
1963 break;
1964 }
1965 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1966 if (dictSize(zs->dict)((zs->dict)->ht[0].used+(zs->dict)->ht[1].used) == 0) {
1967 dbDelete(c->db,key);
1968 keyremoved = 1;
1969 }
1970 } else {
1971 serverPanic("Unknown sorted set encoding")_serverPanic("t_zset.c",1971,"Unknown sorted set encoding"),__builtin_unreachable
()
;
1972 }
1973
1974 /* Step 4: Notifications and reply. */
1975 if (deleted) {
1976 signalModifiedKey(c,c->db,key);
1977 notifyKeyspaceEvent(NOTIFY_ZSET(1<<7),notify_type,key,c->db->id);
1978 if (keyremoved)
1979 notifyKeyspaceEvent(NOTIFY_GENERIC(1<<2),"del",key,c->db->id);
1980 }
1981 server.dirty += deleted;
1982 addReplyLongLong(c,deleted);
1983
1984cleanup:
1985 if (rangetype == ZRANGE_LEX) zslFreeLexRange(&lexrange);
1986}
1987
1988void zremrangebyrankCommand(client *c) {
1989 zremrangeGenericCommand(c,ZRANGE_RANK);
1990}
1991
1992void zremrangebyscoreCommand(client *c) {
1993 zremrangeGenericCommand(c,ZRANGE_SCORE);
1994}
1995
1996void zremrangebylexCommand(client *c) {
1997 zremrangeGenericCommand(c,ZRANGE_LEX);
1998}
1999
/* One input of a set operation: the source object together with the state
 * of the iterator used to walk it. Which member of 'iter' is in use depends
 * on 'type' and 'encoding' (see zuiInitIterator()). */
2000typedef struct {
2001 robj *subject; /* Source object, NULL is handled as an empty input. */
2002 int type; /* Set, sorted set */
2003 int encoding; /* Encoding of 'subject'. */
2004 double weight;
2005
2006 union {
2007 /* Set iterators. */
2008 union _iterset {
2009 struct {
2010 intset *is;
2011 int ii; /* Position of the next element to read. */
2012 } is;
2013 struct {
2014 dict *dict;
2015 dictIterator *di;
2016 dictEntry *de; /* Next entry to return, NULL at the end. */
2017 } ht;
2018 } set;
2019
2020 /* Sorted set iterators. */
2021 union _iterzset {
2022 struct {
2023 unsigned char *zl;
2024 unsigned char *eptr, *sptr; /* Current element and score entries. */
2025 } zl;
2026 struct {
2027 zset *zs;
2028 zskiplistNode *node; /* Next node to return, NULL at the end. */
2029 } sl;
2030 } zset;
2031 } iter;
2032} zsetopsrc;
2033
2034
2035/* Use dirty flags for pointers that need to be cleaned up in the next
2036 * iteration over the zsetopval. The dirty flag for the long long value is
2037 * special, since long long values don't need cleanup. Instead, it means that
2038 * we already checked that "ell" holds a long long, or tried to convert another
2039 * representation into a long long value. When this was successful,
2040 * OPVAL_VALID_LL is set as well. */
2041#define OPVAL_DIRTY_SDS1 1 /* 'ele' was allocated by zuiSdsFromValue() and must be freed. */
2042#define OPVAL_DIRTY_LL2 2 /* Conversion to long long already attempted. */
2043#define OPVAL_VALID_LL4 4 /* 'ell' holds a valid long long. */
2044
2045/* Store value retrieved from the iterator. The element is available in at
 * least one of three representations (sds, raw buffer, long long); the
 * zui*FromValue() helpers derive the missing ones on demand. */
2046typedef struct {
2047 int flags; /* OPVAL_* flags. */
2048 unsigned char _buf[32]; /* Private buffer. */
2049 sds ele; /* Element as sds, or NULL. */
2050 unsigned char *estr; /* Element as raw buffer, or NULL. */
2051 unsigned int elen; /* Length of 'estr'. */
2052 long long ell; /* Element as integer. */
2053 double score;
2054} zsetopval;
2055
/* Short names for the iterator unions declared inside zsetopsrc. */
2056typedef union _iterset iterset;
2057typedef union _iterzset iterzset;
2058
2059void zuiInitIterator(zsetopsrc *op) {
2060 if (op->subject == NULL((void*)0))
2061 return;
2062
2063 if (op->type == OBJ_SET2) {
2064 iterset *it = &op->iter.set;
2065 if (op->encoding == OBJ_ENCODING_INTSET6) {
2066 it->is.is = op->subject->ptr;
2067 it->is.ii = 0;
2068 } else if (op->encoding == OBJ_ENCODING_HT2) {
2069 it->ht.dict = op->subject->ptr;
2070 it->ht.di = dictGetIterator(op->subject->ptr);
2071 it->ht.de = dictNext(it->ht.di);
2072 } else {
2073 serverPanic("Unknown set encoding")_serverPanic("t_zset.c",2073,"Unknown set encoding"),__builtin_unreachable
()
;
2074 }
2075 } else if (op->type == OBJ_ZSET3) {
2076 /* Sorted sets are traversed in reverse order to optimize for
2077 * the insertion of the elements in a new list as in
2078 * ZDIFF/ZINTER/ZUNION */
2079 iterzset *it = &op->iter.zset;
2080 if (op->encoding == OBJ_ENCODING_ZIPLIST5) {
2081 it->zl.zl = op->subject->ptr;
2082 it->zl.eptr = ziplistIndex(it->zl.zl,-2);
2083 if (it->zl.eptr != NULL((void*)0)) {
2084 it->zl.sptr = ziplistNext(it->zl.zl,it->zl.eptr);
2085 serverAssert(it->zl.sptr != NULL)((it->zl.sptr != ((void*)0))?(void)0 : (_serverAssert("it->zl.sptr != NULL"
,"t_zset.c",2085),__builtin_unreachable()))
;
2086 }
2087 } else if (op->encoding == OBJ_ENCODING_SKIPLIST7) {
2088 it->sl.zs = op->subject->ptr;
2089 it->sl.node = it->sl.zs->zsl->tail;
2090 } else {
2091 serverPanic("Unknown sorted set encoding")_serverPanic("t_zset.c",2091,"Unknown sorted set encoding"),__builtin_unreachable
()
;
2092 }
2093 } else {
2094 serverPanic("Unsupported type")_serverPanic("t_zset.c",2094,"Unsupported type"),__builtin_unreachable
()
;
2095 }
2096}
2097
2098void zuiClearIterator(zsetopsrc *op) {
2099 if (op->subject == NULL((void*)0))
2100 return;
2101
2102 if (op->type == OBJ_SET2) {
2103 iterset *it = &op->iter.set;
2104 if (op->encoding == OBJ_ENCODING_INTSET6) {
2105 UNUSED(it)((void) it); /* skip */
2106 } else if (op->encoding == OBJ_ENCODING_HT2) {
2107 dictReleaseIterator(it->ht.di);
2108 } else {
2109 serverPanic("Unknown set encoding")_serverPanic("t_zset.c",2109,"Unknown set encoding"),__builtin_unreachable
()
;
2110 }
2111 } else if (op->type == OBJ_ZSET3) {
2112 iterzset *it = &op->iter.zset;
2113 if (op->encoding == OBJ_ENCODING_ZIPLIST5) {
2114 UNUSED(it)((void) it); /* skip */
2115 } else if (op->encoding == OBJ_ENCODING_SKIPLIST7) {
2116 UNUSED(it)((void) it); /* skip */
2117 } else {
2118 serverPanic("Unknown sorted set encoding")_serverPanic("t_zset.c",2118,"Unknown sorted set encoding"),__builtin_unreachable
()
;
2119 }
2120 } else {
2121 serverPanic("Unsupported type")_serverPanic("t_zset.c",2121,"Unsupported type"),__builtin_unreachable
()
;
2122 }
2123}
2124
2125unsigned long zuiLength(zsetopsrc *op) {
2126 if (op->subject == NULL((void*)0))
2127 return 0;
2128
2129 if (op->type == OBJ_SET2) {
2130 if (op->encoding == OBJ_ENCODING_INTSET6) {
2131 return intsetLen(op->subject->ptr);
2132 } else if (op->encoding == OBJ_ENCODING_HT2) {
2133 dict *ht = op->subject->ptr;
2134 return dictSize(ht)((ht)->ht[0].used+(ht)->ht[1].used);
2135 } else {
2136 serverPanic("Unknown set encoding")_serverPanic("t_zset.c",2136,"Unknown set encoding"),__builtin_unreachable
()
;
2137 }
2138 } else if (op->type == OBJ_ZSET3) {
2139 if (op->encoding == OBJ_ENCODING_ZIPLIST5) {
2140 return zzlLength(op->subject->ptr);
2141 } else if (op->encoding == OBJ_ENCODING_SKIPLIST7) {
2142 zset *zs = op->subject->ptr;
2143 return zs->zsl->length;
2144 } else {
2145 serverPanic("Unknown sorted set encoding")_serverPanic("t_zset.c",2145,"Unknown sorted set encoding"),__builtin_unreachable
()
;
2146 }
2147 } else {
2148 serverPanic("Unsupported type")_serverPanic("t_zset.c",2148,"Unsupported type"),__builtin_unreachable
()
;
2149 }
2150}
2151
2152/* Check if the current value is valid. If so, store it in the passed structure
2153 * and move to the next element. If not valid, this means we have reached the
2154 * end of the structure and can abort. */
2155int zuiNext(zsetopsrc *op, zsetopval *val) {
2156 if (op->subject == NULL((void*)0))
2157 return 0;
2158
2159 if (val->flags & OPVAL_DIRTY_SDS1)
2160 sdsfree(val->ele);
2161
2162 memset(val,0,sizeof(zsetopval));
2163
2164 if (op->type == OBJ_SET2) {
2165 iterset *it = &op->iter.set;
2166 if (op->encoding == OBJ_ENCODING_INTSET6) {
2167 int64_t ell;
2168
2169 if (!intsetGet(it->is.is,it->is.ii,&ell))
2170 return 0;
2171 val->ell = ell;
2172 val->score = 1.0;
2173
2174 /* Move to next element. */
2175 it->is.ii++;
2176 } else if (op->encoding == OBJ_ENCODING_HT2) {
2177 if (it->ht.de == NULL((void*)0))
2178 return 0;
2179 val->ele = dictGetKey(it->ht.de)((it->ht.de)->key);
2180 val->score = 1.0;
2181
2182 /* Move to next element. */
2183 it->ht.de = dictNext(it->ht.di);
2184 } else {
2185 serverPanic("Unknown set encoding")_serverPanic("t_zset.c",2185,"Unknown set encoding"),__builtin_unreachable
()
;
2186 }
2187 } else if (op->type == OBJ_ZSET3) {
2188 iterzset *it = &op->iter.zset;
2189 if (op->encoding == OBJ_ENCODING_ZIPLIST5) {
2190 /* No need to check both, but better be explicit. */
2191 if (it->zl.eptr == NULL((void*)0) || it->zl.sptr == NULL((void*)0))
2192 return 0;
2193 serverAssert(ziplistGet(it->zl.eptr,&val->estr,&val->elen,&val->ell))((ziplistGet(it->zl.eptr,&val->estr,&val->elen
,&val->ell))?(void)0 : (_serverAssert("ziplistGet(it->zl.eptr,&val->estr,&val->elen,&val->ell)"
,"t_zset.c",2193),__builtin_unreachable()))
;
2194 val->score = zzlGetScore(it->zl.sptr);
2195
2196 /* Move to next element (going backwards, see zuiInitIterator). */
2197 zzlPrev(it->zl.zl,&it->zl.eptr,&it->zl.sptr);
2198 } else if (op->encoding == OBJ_ENCODING_SKIPLIST7) {
2199 if (it->sl.node == NULL((void*)0))
2200 return 0;
2201 val->ele = it->sl.node->ele;
2202 val->score = it->sl.node->score;
2203
2204 /* Move to next element. (going backwards, see zuiInitIterator) */
2205 it->sl.node = it->sl.node->backward;
2206 } else {
2207 serverPanic("Unknown sorted set encoding")_serverPanic("t_zset.c",2207,"Unknown sorted set encoding"),__builtin_unreachable
()
;
2208 }
2209 } else {
2210 serverPanic("Unsupported type")_serverPanic("t_zset.c",2210,"Unsupported type"),__builtin_unreachable
()
;
2211 }
2212 return 1;
2213}
2214
2215int zuiLongLongFromValue(zsetopval *val) {
2216 if (!(val->flags & OPVAL_DIRTY_LL2)) {
2217 val->flags |= OPVAL_DIRTY_LL2;
2218
2219 if (val->ele != NULL((void*)0)) {
2220 if (string2ll(val->ele,sdslen(val->ele),&val->ell))
2221 val->flags |= OPVAL_VALID_LL4;
2222 } else if (val->estr != NULL((void*)0)) {
2223 if (string2ll((char*)val->estr,val->elen,&val->ell))
2224 val->flags |= OPVAL_VALID_LL4;
2225 } else {
2226 /* The long long was already set, flag as valid. */
2227 val->flags |= OPVAL_VALID_LL4;
2228 }
2229 }
2230 return val->flags & OPVAL_VALID_LL4;
2231}
2232
2233sds zuiSdsFromValue(zsetopval *val) {
2234 if (val->ele == NULL((void*)0)) {
2235 if (val->estr != NULL((void*)0)) {
2236 val->ele = sdsnewlen((char*)val->estr,val->elen);
2237 } else {
2238 val->ele = sdsfromlonglong(val->ell);
2239 }
2240 val->flags |= OPVAL_DIRTY_SDS1;
2241 }
2242 return val->ele;
2243}
2244
2245/* This is different from zuiSdsFromValue since returns a new SDS string
2246 * which is up to the caller to free. */
2247sds zuiNewSdsFromValue(zsetopval *val) {
2248 if (val->flags & OPVAL_DIRTY_SDS1) {
2249 /* We have already one to return! */
2250 sds ele = val->ele;
2251 val->flags &= ~OPVAL_DIRTY_SDS1;
2252 val->ele = NULL((void*)0);
2253 return ele;
2254 } else if (val->ele) {
2255 return sdsdup(val->ele);
2256 } else if (val->estr) {
2257 return sdsnewlen((char*)val->estr,val->elen);
2258 } else {
2259 return sdsfromlonglong(val->ell);
2260 }
2261}
2262
2263int zuiBufferFromValue(zsetopval *val) {
2264 if (val->estr == NULL((void*)0)) {
2265 if (val->ele != NULL((void*)0)) {
2266 val->elen = sdslen(val->ele);
2267 val->estr = (unsigned char*)val->ele;
2268 } else {
2269 val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),val->ell);
2270 val->estr = val->_buf;
2271 }
2272 }
2273 return 1;
2274}
2275
2276/* Find value pointed to by val in the source pointer to by op. When found,
2277 * return 1 and store its score in target. Return 0 otherwise. */
2278int zuiFind(zsetopsrc *op, zsetopval *val, double *score) {
2279 if (op->subject == NULL((void*)0))
2280 return 0;
2281
2282 if (op->type == OBJ_SET2) {
2283 if (op->encoding == OBJ_ENCODING_INTSET6) {
2284 if (zuiLongLongFromValue(val) &&
2285 intsetFind(op->subject->ptr,val->ell))
2286 {
2287 *score = 1.0;
2288 return 1;
2289 } else {
2290 return 0;
2291 }
2292 } else if (op->encoding == OBJ_ENCODING_HT2) {
2293 dict *ht = op->subject->ptr;
2294 zuiSdsFromValue(val);
2295 if (dictFind(ht,val->ele) != NULL((void*)0)) {
2296 *score = 1.0;
2297 return 1;
2298 } else {
2299 return 0;
2300 }
2301 } else {
2302 serverPanic("Unknown set encoding")_serverPanic("t_zset.c",2302,"Unknown set encoding"),__builtin_unreachable
()
;
2303 }
2304 } else if (op->type == OBJ_ZSET3) {
2305 zuiSdsFromValue(val);
2306
2307 if (op->encoding == OBJ_ENCODING_ZIPLIST5) {
2308 if (zzlFind(op->subject->ptr,val->ele,score) != NULL((void*)0)) {
2309 /* Score is already set by zzlFind. */
2310 return 1;
2311 } else {
2312 return 0;
2313 }
2314 } else if (op->encoding == OBJ_ENCODING_SKIPLIST7) {
2315 zset *zs = op->subject->ptr;
2316 dictEntry *de;
2317 if ((de = dictFind(zs->dict,val->ele)) != NULL((void*)0)) {
2318 *score = *(double*)dictGetVal(de)((de)->v.val);
2319 return 1;
2320 } else {
2321 return 0;
2322 }
2323 } else {
2324 serverPanic("Unknown sorted set encoding")_serverPanic("t_zset.c",2324,"Unknown sorted set encoding"),__builtin_unreachable
()
;
2325 }
2326 } else {
2327 serverPanic("Unsupported type")_serverPanic("t_zset.c",2327,"Unsupported type"),__builtin_unreachable
()
;
2328 }
2329}
2330
2331int zuiCompareByCardinality(const void *s1, const void *s2) {
2332 unsigned long first = zuiLength((zsetopsrc*)s1);
2333 unsigned long second = zuiLength((zsetopsrc*)s2);
2334 if (first > second) return 1;
2335 if (first < second) return -1;
2336 return 0;
2337}
2338
/* qsort() comparator ordering zsetopsrc items by decreasing number of
 * elements: the cardinality comparison with the operands swapped. */
static int zuiCompareByRevCardinality(const void *s1, const void *s2) {
    return zuiCompareByCardinality(s2, s1);
}
2342
/* Aggregation modes understood by zunionInterAggregate(). */
2343#define REDIS_AGGR_SUM1 1
2344#define REDIS_AGGR_MIN2 2
2345#define REDIS_AGGR_MAX3 3
/* Score stored in a dict entry: 1.0 when the entry has no value, otherwise
 * the double the value points to. */
2346#define zunionInterDictValue(_e)(((_e)->v.val) == ((void*)0) ? 1.0 : *(double*)((_e)->v
.val))
(dictGetVal(_e)((_e)->v.val) == NULL((void*)0) ? 1.0 : *(double*)dictGetVal(_e)((_e)->v.val))
2347
2348inline static void zunionInterAggregate(double *target, double val, int aggregate) {
2349 if (aggregate == REDIS_AGGR_SUM1) {
2350 *target = *target + val;
2351 /* The result of adding two doubles is NaN when one variable
2352 * is +inf and the other is -inf. When these numbers are added,
2353 * we maintain the convention of the result being 0.0. */
2354 if (isnan(*target)__builtin_isnan (*target)) *target = 0.0;
2355 } else if (aggregate == REDIS_AGGR_MIN2) {
2356 *target = val < *target ? val : *target;
2357 } else if (aggregate == REDIS_AGGR_MAX3) {
2358 *target = val > *target ? val : *target;
2359 } else {
2360 /* safety net */
2361 serverPanic("Unknown ZUNION/INTER aggregate type")_serverPanic("t_zset.c",2361,"Unknown ZUNION/INTER aggregate type"
),__builtin_unreachable()
;
2362 }
2363}
2364
2365static int zsetDictGetMaxElementLength(dict *d) {
2366 dictIterator *di;
2367 dictEntry *de;
2368 size_t maxelelen = 0;
2369
2370 di = dictGetIterator(d);
2371
2372 while((de = dictNext(di)) != NULL((void*)0)) {
2373 sds ele = dictGetKey(de)((de)->key);
2374 if (sdslen(ele) > maxelelen) maxelelen = sdslen(ele);
2375 }
2376
2377 dictReleaseIterator(di);
2378
2379 return maxelelen;
2380}
2381
2382static void zdiffAlgorithm1(zsetopsrc *src, long setnum, zset *dstzset, size_t *maxelelen) {
2383 /* DIFF Algorithm 1:
2384 *
2385 * We perform the diff by iterating all the elements of the first set,
2386 * and only adding it to the target set if the element does not exist
2387 * into all the other sets.
2388 *
2389 * This way we perform at max N*M operations, where N is the size of
2390 * the first set, and M the number of sets.
2391 *
2392 * There is also a O(K*log(K)) cost for adding the resulting elements
2393 * to the target set, where K is the final size of the target set.
2394 *
2395 * The final complexity of this algorithm is O(N*M + K*log(K)). */
2396 int j;
2397 zsetopval zval;
2398 zskiplistNode *znode;
2399 sds tmp;
2400
2401 /* With algorithm 1 it is better to order the sets to subtract
2402 * by decreasing size, so that we are more likely to find
2403 * duplicated elements ASAP. */
2404 qsort(src+1,setnum-1,sizeof(zsetopsrc),zuiCompareByRevCardinality);
2405
2406 memset(&zval, 0, sizeof(zval));
2407 zuiInitIterator(&src[0]);
2408 while (zuiNext(&src[0],&zval)) {
2409 double value;
2410 int exists = 0;
2411
2412 for (j = 1; j < setnum; j++) {
2413 /* It is not safe to access the zset we are
2414 * iterating, so explicitly check for equal object.
2415 * This check isn't really needed anymore since we already
2416 * check for a duplicate set in the zsetChooseDiffAlgorithm
2417 * function, but we're leaving it for future-proofing. */
2418 if (src[j].subject == src[0].subject ||
2419 zuiFind(&src[j],&zval,&value)) {
2420 exists = 1;
2421 break;
2422 }
2423 }
2424
2425 if (!exists) {
2426 tmp = zuiNewSdsFromValue(&zval);
2427 znode = zslInsert(dstzset->zsl,zval.score,tmp);
2428 dictAdd(dstzset->dict,tmp,&znode->score);
2429 if (sdslen(tmp) > *maxelelen) *maxelelen = sdslen(tmp);
2430 }
2431 }
2432 zuiClearIterator(&src[0]);
2433}
2434
2435
2436static void zdiffAlgorithm2(zsetopsrc *src, long setnum, zset *dstzset, size_t *maxelelen) {
2437 /* DIFF Algorithm 2:
2438 *
2439 * Add all the elements of the first set to the auxiliary set.
2440 * Then remove all the elements of all the next sets from it.
2441 *
2442
2443 * This is O(L + (N-K)log(N)) where L is the sum of all the elements in every
2444 * set, N is the size of the first set, and K is the size of the result set.
2445 *
2446 * Note that from the (L-N) dict searches, (N-K) got to the zsetRemoveFromSkiplist
2447 * which costs log(N)
2448 *
2449 * There is also a O(K) cost at the end for finding the largest element
2450 * size, but this doesn't change the algorithm complexity since K < L, and
2451 * O(2L) is the same as O(L). */
2452 int j;
2453 int cardinality = 0;
2454 zsetopval zval;
2455 zskiplistNode *znode;
2456 sds tmp;
2457
2458 for (j = 0; j < setnum; j++) {
2459 if (zuiLength(&src[j]) == 0) continue;
2460
2461 memset(&zval, 0, sizeof(zval));
2462 zuiInitIterator(&src[j]);
2463 while (zuiNext(&src[j],&zval)) {
2464 if (j == 0) {
2465 tmp = zuiNewSdsFromValue(&zval);
2466 znode = zslInsert(dstzset->zsl,zval.score,tmp);
2467 dictAdd(dstzset->dict,tmp,&znode->score);
2468 cardinality++;
2469 } else {
2470 tmp = zuiSdsFromValue(&zval);
2471 if (zsetRemoveFromSkiplist(dstzset, tmp)) {
2472 cardinality--;
2473 }
2474 }
2475
2476 /* Exit if result set is empty as any additional removal
2477 * of elements will have no effect. */
2478 if (cardinality == 0) break;
2479 }
2480 zuiClearIterator(&src[j]);
2481
2482 if (cardinality == 0) break;
2483 }
2484
2485 /* Redize dict if needed after removing multiple elements */
2486 if (htNeedsResize(dstzset->dict)) dictResize(dstzset->dict);
2487
2488 /* Using this algorithm, we can't calculate the max element as we go,
2489 * we have to iterate through all elements to find the max one after. */
2490 *maxelelen = zsetDictGetMaxElementLength(dstzset->dict);
2491}
2492
2493static int zsetChooseDiffAlgorithm(zsetopsrc *src, long setnum) {
2494 int j;
2495
2496 /* Select what DIFF algorithm to use.
2497 *
2498 * Algorithm 1 is O(N*M + K*log(K)) where N is the size of the
2499 * first set, M the total number of sets, and K is the size of the
2500 * result set.
2501 *
2502 * Algorithm 2 is O(L + (N-K)log(N)) where L is the total number of elements
2503 * in all the sets, N is the size of the first set, and K is the size of the
2504 * result set.
2505 *
2506 * We compute what is the best bet with the current input here. */
2507 long long algo_one_work = 0;
2508 long long algo_two_work = 0;
2509
2510 for (j = 0; j < setnum; j++) {
2511 /* If any other set is equal to the first set, there is nothing to be
2512 * done, since we would remove all elements anyway. */
2513 if (j > 0 && src[0].subject == src[j].subject) {
2514 return 0;
2515 }
2516
2517 algo_one_work += zuiLength(&src[0]);
2518 algo_two_work += zuiLength(&src[j]);
2519 }
2520
2521 /* Algorithm 1 has better constant times and performs less operations
2522 * if there are elements in common. Give it some advantage. */
2523 algo_one_work /= 2;
2524 return (algo_one_work <= algo_two_work) ? 1 : 2;
2525}
2526
2527static void zdiff(zsetopsrc *src, long setnum, zset *dstzset, size_t *maxelelen) {
2528 /* Skip everything if the smallest input is empty. */
2529 if (zuiLength(&src[0]) > 0) {
2530 int diff_algo = zsetChooseDiffAlgorithm(src, setnum);
2531 if (diff_algo == 1) {
2532 zdiffAlgorithm1(src, setnum, dstzset, maxelelen);
2533 } else if (diff_algo == 2) {
2534 zdiffAlgorithm2(src, setnum, dstzset, maxelelen);
2535 } else if (diff_algo != 0) {
2536 serverPanic("Unknown algorithm")_serverPanic("t_zset.c",2536,"Unknown algorithm"),__builtin_unreachable
()
;
2537 }
2538 }
2539}
2540
/* Callbacks defined outside of this block. */
2541uint64_t dictSdsHash(const void *key);
2542int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2);
2543
/* Dict type that hashes and compares its keys with the sds callbacks above.
 * No dup or destructor callback is set, so keys and values are neither
 * copied nor freed by the dict. */
2544dictType setAccumulatorDictType = {
2545 dictSdsHash, /* hash function */
2546 NULL((void*)0), /* key dup */
2547 NULL((void*)0), /* val dup */
2548 dictSdsKeyCompare, /* key compare */
2549 NULL((void*)0), /* key destructor */
2550 NULL((void*)0), /* val destructor */
2551 NULL((void*)0) /* allow to expand */
2552};
2553
2554/* The zunionInterDiffGenericCommand() function is called in order to implement the
2555 * following commands: ZUNION, ZINTER, ZDIFF, ZUNIONSTORE, ZINTERSTORE, ZDIFFSTORE.
2556 *
2557 * 'numkeysIndex' parameter position of key number. for ZUNION/ZINTER/ZDIFF command,
2558 * this value is 1, for ZUNIONSTORE/ZINTERSTORE/ZDIFFSTORE command, this value is 2.
2559 *
2560 * 'op' SET_OP_INTER, SET_OP_UNION or SET_OP_DIFF.
2561 */
2562void zunionInterDiffGenericCommand(client *c, robj *dstkey, int numkeysIndex, int op) {
2563 int i, j;
2564 long setnum;
2565 int aggregate = REDIS_AGGR_SUM1;
2566 zsetopsrc *src;
2567 zsetopval zval;
2568 sds tmp;
2569 size_t maxelelen = 0;
2570 robj *dstobj;
2571 zset *dstzset;
2572 zskiplistNode *znode;
2573 int withscores = 0;
2574
2575 /* expect setnum input keys to be given */
2576 if ((getLongFromObjectOrReply(c, c->argv[numkeysIndex], &setnum, NULL((void*)0)) != C_OK0))
2577 return;
2578
2579 if (setnum < 1) {
2580 addReplyError(c,
2581 "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE/ZDIFFSTORE");
2582 return;
2583 }
2584
2585 /* test if the expected number of keys would overflow */
2586 if (setnum > (c->argc-(numkeysIndex+1))) {
2587 addReplyErrorObject(c,shared.syntaxerr);
2588 return;
2589 }
2590
2591 /* read keys to be used for input */
2592 src = zcalloc(sizeof(zsetopsrc) * setnum);
2593 for (i = 0, j = numkeysIndex+1; i < setnum; i++, j++) {
2594 robj *obj = dstkey ?
2595 lookupKeyWrite(c->db,c->argv[j]) :
2596 lookupKeyRead(c->db,c->argv[j]);
2597 if (obj != NULL((void*)0)) {
2598 if (obj->type != OBJ_ZSET3 && obj->type != OBJ_SET2) {
2599 zfree(src);
2600 addReplyErrorObject(c,shared.wrongtypeerr);
2601 return;
2602 }
2603
2604 src[i].subject = obj;
2605 src[i].type = obj->type;
2606 src[i].encoding = obj->encoding;
2607 } else {
2608 src[i].subject = NULL((void*)0);
2609 }
2610
2611 /* Default all weights to 1. */
2612 src[i].weight = 1.0;
2613 }
2614
2615 /* parse optional extra arguments */
2616 if (j < c->argc) {
2617 int remaining = c->argc - j;
2618
2619 while (remaining) {
2620 if (op != SET_OP_DIFF1 &&
2621 remaining >= (setnum + 1) &&
2622 !strcasecmp(c->argv[j]->ptr,"weights"))
2623 {
2624 j++; remaining--;
2625 for (i = 0; i < setnum; i++, j++, remaining--) {
2626 if (getDoubleFromObjectOrReply(c,c->argv[j],&src[i].weight,
2627 "weight value is not a float") != C_OK0)
2628 {
2629 zfree(src);
2630 return;
2631 }
2632 }
2633 } else if (op != SET_OP_DIFF1 &&
2634 remaining >= 2 &&
2635 !strcasecmp(c->argv[j]->ptr,"aggregate"))
2636 {
2637 j++; remaining--;
2638 if (!strcasecmp(c->argv[j]->ptr,"sum")) {
2639 aggregate = REDIS_AGGR_SUM1;
2640 } else if (!strcasecmp(c->argv[j]->ptr,"min")) {
2641 aggregate = REDIS_AGGR_MIN2;
2642 } else if (!strcasecmp(c->argv[j]->ptr,"max")) {
2643 aggregate = REDIS_AGGR_MAX3;
2644 } else {
2645 zfree(src);
2646 addReplyErrorObject(c,shared.syntaxerr);
2647 return;
2648 }
2649 j++; remaining--;
2650 } else if (remaining >= 1 &&
2651 !dstkey &&
2652 !strcasecmp(c->argv[j]->ptr,"withscores"))
2653 {
2654 j++; remaining--;
2655 withscores = 1;
2656 } else {
2657 zfree(src);
2658 addReplyErrorObject(c,shared.syntaxerr);
2659 return;
2660 }
2661 }
2662 }
2663
2664 if (op != SET_OP_DIFF1) {
2665 /* sort sets from the smallest to largest, this will improve our
2666 * algorithm's performance */
2667 qsort(src,setnum,sizeof(zsetopsrc),zuiCompareByCardinality);
2668 }
2669
2670 dstobj = createZsetObject();
2671 dstzset = dstobj->ptr;
2672 memset(&zval, 0, sizeof(zval));
2673
2674 if (op == SET_OP_INTER2) {
2675 /* Skip everything if the smallest input is empty. */
2676 if (zuiLength(&src[0]) > 0) {
2677 /* Precondition: as src[0] is non-empty and the inputs are ordered
2678 * by size, all src[i > 0] are non-empty too. */
2679 zuiInitIterator(&src[0]);
2680 while (zuiNext(&src[0],&zval)) {
2681 double score, value;
2682
2683 score = src[0].weight * zval.score;
2684 if (isnan(score)__builtin_isnan (score)) score = 0;
2685
2686 for (j = 1; j < setnum; j++) {
2687 /* It is not safe to access the zset we are
2688 * iterating, so explicitly check for equal object. */
2689 if (src[j].subject == src[0].subject) {
2690 value = zval.score*src[j].weight;
2691 zunionInterAggregate(&score,value,aggregate);
2692 } else if (zuiFind(&src[j],&zval,&value)) {
2693 value *= src[j].weight;
2694 zunionInterAggregate(&score,value,aggregate);
2695 } else {
2696 break;
2697 }
2698 }
2699
2700 /* Only continue when present in every input. */
2701 if (j == setnum) {
2702 tmp = zuiNewSdsFromValue(&zval);
2703 znode = zslInsert(dstzset->zsl,score,tmp);
2704 dictAdd(dstzset->dict,tmp,&znode->score);
2705 if (sdslen(tmp) > maxelelen) maxelelen = sdslen(tmp);
2706 }
2707 }
2708 zuiClearIterator(&src[0]);
2709 }
2710 } else if (op == SET_OP_UNION0) {
2711 dict *accumulator = dictCreate(&setAccumulatorDictType,NULL((void*)0));
2712 dictIterator *di;
2713 dictEntry *de, *existing;
2714 double score;
2715
2716 if (setnum) {
2717 /* Our union is at least as large as the largest set.
2718 * Resize the dictionary ASAP to avoid useless rehashing. */
2719 dictExpand(accumulator,zuiLength(&src[setnum-1]));
2720 }
2721
2722 /* Step 1: Create a dictionary of elements -> aggregated-scores
2723 * by iterating one sorted set after the other. */
2724 for (i = 0; i < setnum; i++) {
2725 if (zuiLength(&src[i]) == 0) continue;
2726
2727 zuiInitIterator(&src[i]);
2728 while (zuiNext(&src[i],&zval)) {
2729 /* Initialize value */
2730 score = src[i].weight * zval.score;
2731 if (isnan(score)__builtin_isnan (score)) score = 0;
2732
2733 /* Search for this element in the accumulating dictionary. */
2734 de = dictAddRaw(accumulator,zuiSdsFromValue(&zval),&existing);
2735 /* If we don't have it, we need to create a new entry. */
2736 if (!existing) {
2737 tmp = zuiNewSdsFromValue(&zval);
2738 /* Remember the longest single element encountered,
2739 * to understand if it's possible to convert to ziplist
2740 * at the end. */
2741 if (sdslen(tmp) > maxelelen) maxelelen = sdslen(tmp);
2742 /* Update the element with its initial score. */
2743 dictSetKey(accumulator, de, tmp)do { if ((accumulator)->type->keyDup) (de)->key = (accumulator
)->type->keyDup((accumulator)->privdata, tmp); else (
de)->key = (tmp); } while(0)
;
2744 dictSetDoubleVal(de,score)do { (de)->v.d = score; } while(0);
2745 } else {
2746 /* Update the score with the score of the new instance
2747 * of the element found in the current sorted set.
2748 *
2749 * Here we access directly the dictEntry double
2750 * value inside the union as it is a big speedup
2751 * compared to using the getDouble/setDouble API. */
2752 zunionInterAggregate(&existing->v.d,score,aggregate);
2753 }
2754 }
2755 zuiClearIterator(&src[i]);
2756 }
2757
2758 /* Step 2: convert the dictionary into the final sorted set. */
2759 di = dictGetIterator(accumulator);
2760
2761 /* We now are aware of the final size of the resulting sorted set,
2762 * let's resize the dictionary embedded inside the sorted set to the
2763 * right size, in order to save rehashing time. */
2764 dictExpand(dstzset->dict,dictSize(accumulator)((accumulator)->ht[0].used+(accumulator)->ht[1].used));
2765
2766 while((de = dictNext(di)) != NULL((void*)0)) {
2767 sds ele = dictGetKey(de)((de)->key);
2768 score = dictGetDoubleVal(de)((de)->v.d);
2769 znode = zslInsert(dstzset->zsl,score,ele);
2770 dictAdd(dstzset->dict,ele,&znode->score);
2771 }
2772 dictReleaseIterator(di);
2773 dictRelease(accumulator);
2774 } else if (op == SET_OP_DIFF1) {
2775 zdiff(src, setnum, dstzset, &maxelelen);
2776 } else {
2777 serverPanic("Unknown operator")_serverPanic("t_zset.c",2777,"Unknown operator"),__builtin_unreachable
()
;
2778 }
2779
2780 if (dstkey) {
2781 if (dstzset->zsl->length) {
2782 zsetConvertToZiplistIfNeeded(dstobj, maxelelen);
2783 setKey(c, c->db, dstkey, dstobj);
2784 addReplyLongLong(c, zsetLength(dstobj));
2785 notifyKeyspaceEvent(NOTIFY_ZSET(1<<7),
2786 (op == SET_OP_UNION0) ? "zunionstore" :
2787 (op == SET_OP_INTER2 ? "zinterstore" : "zdiffstore"),
2788 dstkey, c->db->id);
2789 server.dirty++;
2790 } else {
2791 addReply(c, shared.czero);
2792 if (dbDelete(c->db, dstkey)) {
2793 signalModifiedKey(c, c->db, dstkey);
2794 notifyKeyspaceEvent(NOTIFY_GENERIC(1<<2), "del", dstkey, c->db->id);
2795 server.dirty++;
2796 }
2797 }
2798 } else {
2799 unsigned long length = dstzset->zsl->length;
2800 zskiplist *zsl = dstzset->zsl;
2801 zskiplistNode *zn = zsl->header->level[0].forward;
2802 /* In case of WITHSCORES, respond with a single array in RESP2, and
2803 * nested arrays in RESP3. We can't use a map response type since the
2804 * client library needs to know to respect the order. */
2805 if (withscores && c->resp == 2)
2806 addReplyArrayLen(c, length*2);
2807 else
2808 addReplyArrayLen(c, length);
2809
2810 while (zn != NULL((void*)0)) {
2811 if (withscores && c->resp > 2) addReplyArrayLen(c,2);
2812 addReplyBulkCBuffer(c,zn->ele,sdslen(zn->ele));
2813 if (withscores) addReplyDouble(c,zn->score);
2814 zn = zn->level[0].forward;
2815 }
2816 }
2817 decrRefCount(dstobj);
2818 zfree(src);
2819}
2820
2821void zunionstoreCommand(client *c) {
2822 zunionInterDiffGenericCommand(c, c->argv[1], 2, SET_OP_UNION0);
2823}
2824
2825void zinterstoreCommand(client *c) {
2826 zunionInterDiffGenericCommand(c, c->argv[1], 2, SET_OP_INTER2);
2827}
2828
2829void zdiffstoreCommand(client *c) {
2830 zunionInterDiffGenericCommand(c, c->argv[1], 2, SET_OP_DIFF1);
2831}
2832
2833void zunionCommand(client *c) {
2834 zunionInterDiffGenericCommand(c, NULL((void*)0), 1, SET_OP_UNION0);
2835}
2836
2837void zinterCommand(client *c) {
2838 zunionInterDiffGenericCommand(c, NULL((void*)0), 1, SET_OP_INTER2);
2839}
2840
2841void zdiffCommand(client *c) {
2842 zunionInterDiffGenericCommand(c, NULL((void*)0), 1, SET_OP_DIFF1);
2843}
2844
/* Iteration direction requested from zrangeGenericCommand(). The plain
 * ZRANGE / ZRANGESTORE entry points pass _AUTO; the other range commands
 * pass an explicit direction. */
typedef enum {
    ZRANGE_DIRECTION_AUTO = 0,
    ZRANGE_DIRECTION_FORWARD,
    ZRANGE_DIRECTION_REVERSE
} zrange_direction;
2850
/* Who consumes the elements produced by a range command: _CLIENT selects
 * the handlers that reply to the client, _INTERNAL the handlers that store
 * the result into a zset object (see zrangeResultHandlerInit()). */
typedef enum {
    ZRANGE_CONSUMER_TYPE_CLIENT = 0,
    ZRANGE_CONSUMER_TYPE_INTERNAL
} zrange_consumer_type;
2855
2856typedef struct zrange_result_handler zrange_result_handler;
2857
2858typedef void (*zrangeResultBeginFunction)(zrange_result_handler *c);
2859typedef void (*zrangeResultFinalizeFunction)(
2860 zrange_result_handler *c, size_t result_count);
2861typedef void (*zrangeResultEmitCBufferFunction)(
2862 zrange_result_handler *c, const void *p, size_t len, double score);
2863typedef void (*zrangeResultEmitLongLongFunction)(
2864 zrange_result_handler *c, long long ll, double score);
2865
2866void zrangeGenericCommand (zrange_result_handler *handler, int argc_start, int store,
2867 zrange_type rangetype, zrange_direction direction);
2868
2869/* Interface struct for ZRANGE/ZRANGESTORE generic implementation.
2870 * There is one implementation of this interface that sends a RESP reply to clients.
2871 * and one implementation that stores the range result into a zset object. */
2872struct zrange_result_handler {
2873 zrange_consumer_type type;
2874 client *client;
2875 robj *dstkey;
2876 robj *dstobj;
2877 void *userdata;
2878 int withscores;
2879 int should_emit_array_length;
2880 zrangeResultBeginFunction beginResultEmission;
2881 zrangeResultFinalizeFunction finalizeResultEmission;
2882 zrangeResultEmitCBufferFunction emitResultFromCBuffer;
2883 zrangeResultEmitLongLongFunction emitResultFromLongLong;
2884};
2885
2886/* Result handler methods for responding the ZRANGE to clients. */
2887static void zrangeResultBeginClient(zrange_result_handler *handler) {
2888 handler->userdata = addReplyDeferredLen(handler->client);
2889}
2890
2891static void zrangeResultEmitCBufferToClient(zrange_result_handler *handler,
2892 const void *value, size_t value_length_in_bytes, double score)
2893{
2894 if (handler->should_emit_array_length) {
2895 addReplyArrayLen(handler->client, 2);
2896 }
2897
2898 addReplyBulkCBuffer(handler->client, value, value_length_in_bytes);
2899
2900 if (handler->withscores) {
2901 addReplyDouble(handler->client, score);
2902 }
2903}
2904
2905static void zrangeResultEmitLongLongToClient(zrange_result_handler *handler,
2906 long long value, double score)
2907{
2908 if (handler->should_emit_array_length) {
2909 addReplyArrayLen(handler->client, 2);
2910 }
2911
2912 addReplyBulkLongLong(handler->client, value);
2913
2914 if (handler->withscores) {
2915 addReplyDouble(handler->client, score);
2916 }
2917}
2918
2919static void zrangeResultFinalizeClient(zrange_result_handler *handler,
2920 size_t result_count)
2921{
2922 /* In case of WITHSCORES, respond with a single array in RESP2, and
2923 * nested arrays in RESP3. We can't use a map response type since the
2924 * client library needs to know to respect the order. */
2925 if (handler->withscores && (handler->client->resp == 2)) {
2926 result_count *= 2;
2927 }
2928
2929 setDeferredArrayLen(handler->client, handler->userdata, result_count);
2930}
2931
2932/* Result handler methods for storing the ZRANGESTORE to a zset. */
2933static void zrangeResultBeginStore(zrange_result_handler *handler)
2934{
2935 handler->dstobj = createZsetZiplistObject();
2936}
2937
2938static void zrangeResultEmitCBufferForStore(zrange_result_handler *handler,
2939 const void *value, size_t value_length_in_bytes, double score)
2940{
2941 double newscore;
2942 int retflags = 0;
2943 sds ele = sdsnewlen(value, value_length_in_bytes);
2944 int retval = zsetAdd(handler->dstobj, score, ele, &retflags, &newscore);
2945 sdsfree(ele);
2946 serverAssert(retval)((retval)?(void)0 : (_serverAssert("retval","t_zset.c",2946),
__builtin_unreachable()))
;
2947}
2948
2949static void zrangeResultEmitLongLongForStore(zrange_result_handler *handler,
2950 long long value, double score)
2951{
2952 double newscore;
2953 int retflags = 0;
2954 sds ele = sdsfromlonglong(value);
2955 int retval = zsetAdd(handler->dstobj, score, ele, &retflags, &newscore);
1
Calling 'zsetAdd'
2956 sdsfree(ele);
2957 serverAssert(retval)((retval)?(void)0 : (_serverAssert("retval","t_zset.c",2957),
__builtin_unreachable()))
;
2958}
2959
2960static void zrangeResultFinalizeStore(zrange_result_handler *handler, size_t result_count)
2961{
2962 if (result_count) {
2963 setKey(handler->client, handler->client->db, handler->dstkey, handler->dstobj);
2964 addReplyLongLong(handler->client, result_count);
2965 notifyKeyspaceEvent(NOTIFY_ZSET(1<<7), "zrangestore", handler->dstkey, handler->client->db->id);
2966 server.dirty++;
2967 } else {
2968 addReply(handler->client, shared.czero);
2969 if (dbDelete(handler->client->db, handler->dstkey)) {
2970 signalModifiedKey(handler->client, handler->client->db, handler->dstkey);
2971 notifyKeyspaceEvent(NOTIFY_GENERIC(1<<2), "del", handler->dstkey, handler->client->db->id);
2972 server.dirty++;
2973 }
2974 }
2975 decrRefCount(handler->dstobj);
2976}
2977
2978/* Initialize the consumer interface type with the requested type. */
2979static void zrangeResultHandlerInit(zrange_result_handler *handler,
2980 client *client, zrange_consumer_type type)
2981{
2982 memset(handler, 0, sizeof(*handler));
2983
2984 handler->client = client;
2985
2986 switch (type) {
2987 case ZRANGE_CONSUMER_TYPE_CLIENT:
2988 handler->beginResultEmission = zrangeResultBeginClient;
2989 handler->finalizeResultEmission = zrangeResultFinalizeClient;
2990 handler->emitResultFromCBuffer = zrangeResultEmitCBufferToClient;
2991 handler->emitResultFromLongLong = zrangeResultEmitLongLongToClient;
2992 break;
2993
2994 case ZRANGE_CONSUMER_TYPE_INTERNAL:
2995 handler->beginResultEmission = zrangeResultBeginStore;
2996 handler->finalizeResultEmission = zrangeResultFinalizeStore;
2997 handler->emitResultFromCBuffer = zrangeResultEmitCBufferForStore;
2998 handler->emitResultFromLongLong = zrangeResultEmitLongLongForStore;
2999 break;
3000 }
3001}
3002
3003static void zrangeResultHandlerScoreEmissionEnable(zrange_result_handler *handler) {
3004 handler->withscores = 1;
3005 handler->should_emit_array_length = (handler->client->resp > 2);
3006}
3007
3008static void zrangeResultHandlerDestinationKeySet (zrange_result_handler *handler,
3009 robj *dstkey)
3010{
3011 handler->dstkey = dstkey;
3012}
3013
3014/* This command implements ZRANGE, ZREVRANGE. */
3015void genericZrangebyrankCommand(zrange_result_handler *handler,
3016 robj *zobj, long start, long end, int withscores, int reverse) {
3017
3018 client *c = handler->client;
3019 long llen;
3020 long rangelen;
3021 size_t result_cardinality;
3022
3023 /* Sanitize indexes. */
3024 llen = zsetLength(zobj);
3025 if (start < 0) start = llen+start;
3026 if (end < 0) end = llen+end;
3027 if (start < 0) start = 0;
3028
3029 handler->beginResultEmission(handler);
3030
3031 /* Invariant: start >= 0, so this test will be true when end < 0.
3032 * The range is empty when start > end or start >= length. */
3033 if (start > end || start >= llen) {
3034 handler->finalizeResultEmission(handler, 0);
3035 return;
3036 }
3037 if (end >= llen) end = llen-1;
3038 rangelen = (end-start)+1;
3039 result_cardinality = rangelen;
3040
3041 if (zobj->encoding == OBJ_ENCODING_ZIPLIST5) {
3042 unsigned char *zl = zobj->ptr;
3043 unsigned char *eptr, *sptr;
3044 unsigned char *vstr;
3045 unsigned int vlen;
3046 long long vlong;
3047 double score = 0.0;
3048
3049 if (reverse)
3050 eptr = ziplistIndex(zl,-2-(2*start));
3051 else
3052 eptr = ziplistIndex(zl,2*start);
3053
3054 serverAssertWithInfo(c,zobj,eptr != NULL)((eptr != ((void*)0))?(void)0 : (_serverAssertWithInfo(c,zobj
,"eptr != NULL","t_zset.c",3054),__builtin_unreachable()))
;
3055 sptr = ziplistNext(zl,eptr);
3056
3057 while (rangelen--) {
3058 serverAssertWithInfo(c,zobj,eptr != NULL && sptr != NULL)((eptr != ((void*)0) && sptr != ((void*)0))?(void)0 :
(_serverAssertWithInfo(c,zobj,"eptr != NULL && sptr != NULL"
,"t_zset.c",3058),__builtin_unreachable()))
;
3059 serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong))((ziplistGet(eptr,&vstr,&vlen,&vlong))?(void)0 : (
_serverAssertWithInfo(c,zobj,"ziplistGet(eptr,&vstr,&vlen,&vlong)"
,"t_zset.c",3059),__builtin_unreachable()))
;
3060
3061 if (withscores) /* don't bother to extract the score if it's gonna be ignored. */
3062 score = zzlGetScore(sptr);
3063
3064 if (vstr == NULL((void*)0)) {
3065 handler->emitResultFromLongLong(handler, vlong, score);
3066 } else {
3067 handler->emitResultFromCBuffer(handler, vstr, vlen, score);
3068 }
3069
3070 if (reverse)
3071 zzlPrev(zl,&eptr,&sptr);
3072 else
3073 zzlNext(zl,&eptr,&sptr);
3074 }
3075
3076 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST7) {
3077 zset *zs = zobj->ptr;
3078 zskiplist *zsl = zs->zsl;
3079 zskiplistNode *ln;
3080
3081 /* Check if starting point is trivial, before doing log(N) lookup. */
3082 if (reverse) {
3083 ln = zsl->tail;
3084 if (start > 0)
3085 ln = zslGetElementByRank(zsl,llen-start);
3086 } else {
3087 ln = zsl->header->level[0].forward;
3088 if (start > 0)
3089 ln = zslGetElementByRank(zsl,start+1);
3090 }
3091
3092 while(rangelen--) {
3093 serverAssertWithInfo(c,zobj,ln != NULL)((ln != ((void*)0))?(void)0 : (_serverAssertWithInfo(c,zobj,"ln != NULL"
,"t_zset.c",3093),__builtin_unreachable()))
;
3094 sds ele = ln->ele;
3095 handler->emitResultFromCBuffer(handler, ele, sdslen(ele), ln->score);
3096 ln = reverse ? ln->backward : ln->level[0].forward;
3097 }
3098 } else {
3099 serverPanic("Unknown sorted set encoding")_serverPanic("t_zset.c",3099,"Unknown sorted set encoding"),__builtin_unreachable
()
;
3100 }
3101
3102 handler->finalizeResultEmission(handler, result_cardinality);
3103}
3104
3105/* ZRANGESTORE <dst> <src> <min> <max> [BYSCORE | BYLEX] [REV] [LIMIT offset count] */
3106void zrangestoreCommand (client *c) {
3107 robj *dstkey = c->argv[1];
3108 zrange_result_handler handler;
3109 zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_INTERNAL);
3110 zrangeResultHandlerDestinationKeySet(&handler, dstkey);
3111 zrangeGenericCommand(&handler, 2, 1, ZRANGE_AUTO, ZRANGE_DIRECTION_AUTO);
3112}
3113
3114/* ZRANGE <key> <min> <max> [BYSCORE | BYLEX] [REV] [WITHSCORES] [LIMIT offset count] */
3115void zrangeCommand(client *c) {
3116 zrange_result_handler handler;
3117 zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_CLIENT);
3118 zrangeGenericCommand(&handler, 1, 0, ZRANGE_AUTO, ZRANGE_DIRECTION_AUTO);
3119}
3120
3121/* ZREVRANGE <key> <min> <max> [WITHSCORES] */
3122void zrevrangeCommand(client *c) {
3123 zrange_result_handler handler;
3124 zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_CLIENT);
3125 zrangeGenericCommand(&handler, 1, 0, ZRANGE_RANK, ZRANGE_DIRECTION_REVERSE);
3126}
3127
3128/* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE. */
3129void genericZrangebyscoreCommand(zrange_result_handler *handler,
3130 zrangespec *range, robj *zobj, long offset, long limit,
3131 int reverse) {
3132
3133 client *c = handler->client;
3134 unsigned long rangelen = 0;
3135
3136 handler->beginResultEmission(handler);
3137
3138 /* For invalid offset, return directly. */
3139 if (offset > 0 && offset >= (long)zsetLength(zobj)) {
3140 handler->finalizeResultEmission(handler, 0);
3141 return;
3142 }
3143
3144 if (zobj->encoding == OBJ_ENCODING_ZIPLIST5) {
3145 unsigned char *zl = zobj->ptr;
3146 unsigned char *eptr, *sptr;
3147 unsigned char *vstr;
3148 unsigned int vlen;
3149 long long vlong;
3150
3151 /* If reversed, get the last node in range as starting point. */
3152 if (reverse) {
3153 eptr = zzlLastInRange(zl,range);
3154 } else {
3155 eptr = zzlFirstInRange(zl,range);
3156 }
3157
3158 /* Get score pointer for the first element. */
3159 if (eptr)
3160 sptr = ziplistNext(zl,eptr);
3161
3162 /* If there is an offset, just traverse the number of elements without
3163 * checking the score because that is done in the next loop. */
3164 while (eptr && offset--) {
3165 if (reverse) {
3166 zzlPrev(zl,&eptr,&sptr);
3167 } else {
3168 zzlNext(zl,&eptr,&sptr);
3169 }
3170 }
3171
3172 while (eptr && limit--) {
3173 double score = zzlGetScore(sptr);
3174
3175 /* Abort when the node is no longer in range. */
3176 if (reverse) {
3177 if (!zslValueGteMin(score,range)) break;
3178 } else {
3179 if (!zslValueLteMax(score,range)) break;
3180 }
3181
3182 /* We know the element exists, so ziplistGet should always
3183 * succeed */
3184 serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong))((ziplistGet(eptr,&vstr,&vlen,&vlong))?(void)0 : (
_serverAssertWithInfo(c,zobj,"ziplistGet(eptr,&vstr,&vlen,&vlong)"
,"t_zset.c",3184),__builtin_unreachable()))
;
3185
3186 rangelen++;
3187 if (vstr == NULL((void*)0)) {
3188 handler->emitResultFromLongLong(handler, vlong, score);
3189 } else {
3190 handler->emitResultFromCBuffer(handler, vstr, vlen, score);
3191 }
3192
3193 /* Move to next node */
3194 if (reverse) {
3195 zzlPrev(zl,&eptr,&sptr);
3196 } else {
3197 zzlNext(zl,&eptr,&sptr);
3198 }
3199 }
3200 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST7) {
3201 zset *zs = zobj->ptr;
3202 zskiplist *zsl = zs->zsl;
3203 zskiplistNode *ln;
3204
3205 /* If reversed, get the last node in range as starting point. */
3206 if (reverse) {
3207 ln = zslLastInRange(zsl,range);
3208 } else {
3209 ln = zslFirstInRange(zsl,range);
3210 }
3211
3212 /* If there is an offset, just traverse the number of elements without
3213 * checking the score because that is done in the next loop. */
3214 while (ln && offset--) {
3215 if (reverse) {
3216 ln = ln->backward;
3217 } else {
3218 ln = ln->level[0].forward;
3219 }
3220 }
3221
3222 while (ln && limit--) {
3223 /* Abort when the node is no longer in range. */
3224 if (reverse) {
3225 if (!zslValueGteMin(ln->score,range)) break;
3226 } else {
3227 if (!zslValueLteMax(ln->score,range)) break;
3228 }
3229
3230 rangelen++;
3231 handler->emitResultFromCBuffer(handler, ln->ele, sdslen(ln->ele), ln->score);
3232
3233 /* Move to next node */
3234 if (reverse) {
3235 ln = ln->backward;
3236 } else {
3237 ln = ln->level[0].forward;
3238 }
3239 }
3240 } else {
3241 serverPanic("Unknown sorted set encoding")_serverPanic("t_zset.c",3241,"Unknown sorted set encoding"),__builtin_unreachable
()
;
3242 }
3243
3244 handler->finalizeResultEmission(handler, rangelen);
3245}
3246
3247/* ZRANGEBYSCORE <key> <min> <max> [WITHSCORES] [LIMIT offset count] */
3248void zrangebyscoreCommand(client *c) {
3249 zrange_result_handler handler;
3250 zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_CLIENT);
3251 zrangeGenericCommand(&handler, 1, 0, ZRANGE_SCORE, ZRANGE_DIRECTION_FORWARD);
3252}
3253
3254/* ZREVRANGEBYSCORE <key> <min> <max> [WITHSCORES] [LIMIT offset count] */
3255void zrevrangebyscoreCommand(client *c) {
3256 zrange_result_handler handler;
3257 zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_CLIENT);
3258 zrangeGenericCommand(&handler, 1, 0, ZRANGE_SCORE, ZRANGE_DIRECTION_REVERSE);
3259}
3260
3261void zcountCommand(client *c) {
3262 robj *key = c->argv[1];
3263 robj *zobj;
3264 zrangespec range;
3265 unsigned long count = 0;
3266
3267 /* Parse the range arguments */
3268 if (zslParseRange(c->argv[2],c->argv[3],&range) != C_OK0) {
3269 addReplyError(c,"min or max is not a float");
3270 return;
3271 }
3272
3273 /* Lookup the sorted set */
3274 if ((zobj = lookupKeyReadOrReply(c, key, shared.czero)) == NULL((void*)0) ||
3275 checkType(c, zobj, OBJ_ZSET3)) return;
3276
3277 if (zobj->encoding == OBJ_ENCODING_ZIPLIST5) {
3278 unsigned char *zl = zobj->ptr;
3279 unsigned char *eptr, *sptr;
3280 double score;
3281
3282 /* Use the first element in range as the starting point */
3283 eptr = zzlFirstInRange(zl,&range);
3284
3285 /* No "first" element */
3286 if (eptr == NULL((void*)0)) {
3287 addReply(c, shared.czero);
3288 return;
3289 }
3290
3291 /* First element is in range */
3292 sptr = ziplistNext(zl,eptr);
3293 score = zzlGetScore(sptr);
3294 serverAssertWithInfo(c,zobj,zslValueLteMax(score,&range))((zslValueLteMax(score,&range))?(void)0 : (_serverAssertWithInfo
(c,zobj,"zslValueLteMax(score,&range)","t_zset.c",3294),__builtin_unreachable
()))
;
3295
3296 /* Iterate over elements in range */
3297 while (eptr) {
3298 score = zzlGetScore(sptr);
3299
3300 /* Abort when the node is no longer in range. */
3301 if (!zslValueLteMax(score,&range)) {
3302 break;
3303 } else {
3304 count++;
3305 zzlNext(zl,&eptr,&sptr);
3306 }
3307 }
3308 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST7) {
3309 zset *zs = zobj->ptr;
3310 zskiplist *zsl = zs->zsl;
3311 zskiplistNode *zn;
3312 unsigned long rank;
3313
3314 /* Find first element in range */
3315 zn = zslFirstInRange(zsl, &range);
3316
3317 /* Use rank of first element, if any, to determine preliminary count */
3318 if (zn != NULL((void*)0)) {
3319 rank = zslGetRank(zsl, zn->score, zn->ele);
3320 count = (zsl->length - (rank - 1));
3321
3322 /* Find last element in range */
3323 zn = zslLastInRange(zsl, &range);
3324
3325 /* Use rank of last element, if any, to determine the actual count */
3326 if (zn != NULL((void*)0)) {
3327 rank = zslGetRank(zsl, zn->score, zn->ele);
3328 count -= (zsl->length - rank);
3329 }
3330 }
3331 } else {
3332 serverPanic("Unknown sorted set encoding")_serverPanic("t_zset.c",3332,"Unknown sorted set encoding"),__builtin_unreachable
()
;
3333 }
3334
3335 addReplyLongLong(c, count);
3336}
3337
3338void zlexcountCommand(client *c) {
3339 robj *key = c->argv[1];
3340 robj *zobj;
3341 zlexrangespec range;
3342 unsigned long count = 0;
3343
3344 /* Parse the range arguments */
3345 if (zslParseLexRange(c->argv[2],c->argv[3],&range) != C_OK0) {
3346 addReplyError(c,"min or max not valid string range item");
3347 return;
3348 }
3349
3350 /* Lookup the sorted set */
3351 if ((zobj = lookupKeyReadOrReply(c, key, shared.czero)) == NULL((void*)0) ||
3352 checkType(c, zobj, OBJ_ZSET3))
3353 {
3354 zslFreeLexRange(&range);
3355 return;
3356 }
3357
3358 if (zobj->encoding == OBJ_ENCODING_ZIPLIST5) {
3359 unsigned char *zl = zobj->ptr;
3360 unsigned char *eptr, *sptr;
3361
3362 /* Use the first element in range as the starting point */
3363 eptr = zzlFirstInLexRange(zl,&range);
3364
3365 /* No "first" element */
3366 if (eptr == NULL((void*)0)) {
3367 zslFreeLexRange(&range);
3368 addReply(c, shared.czero);
3369 return;
3370 }
3371
3372 /* First element is in range */
3373 sptr = ziplistNext(zl,eptr);
3374 serverAssertWithInfo(c,zobj,zzlLexValueLteMax(eptr,&range))((zzlLexValueLteMax(eptr,&range))?(void)0 : (_serverAssertWithInfo
(c,zobj,"zzlLexValueLteMax(eptr,&range)","t_zset.c",3374)
,__builtin_unreachable()))
;
3375
3376 /* Iterate over elements in range */
3377 while (eptr) {
3378 /* Abort when the node is no longer in range. */
3379 if (!zzlLexValueLteMax(eptr,&range)) {
3380 break;
3381 } else {
3382 count++;
3383 zzlNext(zl,&eptr,&sptr);
3384 }
3385 }
3386 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST7) {
3387 zset *zs = zobj->ptr;
3388 zskiplist *zsl = zs->zsl;
3389 zskiplistNode *zn;
3390 unsigned long rank;
3391
3392 /* Find first element in range */
3393 zn = zslFirstInLexRange(zsl, &range);
3394
3395 /* Use rank of first element, if any, to determine preliminary count */
3396 if (zn != NULL((void*)0)) {
3397 rank = zslGetRank(zsl, zn->score, zn->ele);
3398 count = (zsl->length - (rank - 1));
3399
3400 /* Find last element in range */
3401 zn = zslLastInLexRange(zsl, &range);
3402
3403 /* Use rank of last element, if any, to determine the actual count */
3404 if (zn != NULL((void*)0)) {
3405 rank = zslGetRank(zsl, zn->score, zn->ele);
3406 count -= (zsl->length - rank);
3407 }
3408 }
3409 } else {
3410 serverPanic("Unknown sorted set encoding")_serverPanic("t_zset.c",3410,"Unknown sorted set encoding"),__builtin_unreachable
()
;
3411 }
3412
3413 zslFreeLexRange(&range);
3414 addReplyLongLong(c, count);
3415}
3416
3417/* This command implements ZRANGEBYLEX, ZREVRANGEBYLEX. */
3418void genericZrangebylexCommand(zrange_result_handler *handler,
3419 zlexrangespec *range, robj *zobj, int withscores, long offset, long limit,
3420 int reverse)
3421{
3422 client *c = handler->client;
3423 unsigned long rangelen = 0;
3424
3425 handler->beginResultEmission(handler);
3426
3427 if (zobj->encoding == OBJ_ENCODING_ZIPLIST5) {
3428 unsigned char *zl = zobj->ptr;
3429 unsigned char *eptr, *sptr;
3430 unsigned char *vstr;
3431 unsigned int vlen;
3432 long long vlong;
3433
3434 /* If reversed, get the last node in range as starting point. */
3435 if (reverse) {
3436 eptr = zzlLastInLexRange(zl,range);
3437 } else {
3438 eptr = zzlFirstInLexRange(zl,range);
3439 }
3440
3441 /* Get score pointer for the first element. */
3442 if (eptr)
3443 sptr = ziplistNext(zl,eptr);
3444
3445 /* If there is an offset, just traverse the number of elements without
3446 * checking the score because that is done in the next loop. */
3447 while (eptr && offset--) {
3448 if (reverse) {
3449 zzlPrev(zl,&eptr,&sptr);
3450 } else {
3451 zzlNext(zl,&eptr,&sptr);
3452 }
3453 }
3454
3455 while (eptr && limit--) {
3456 double score = 0;
3457 if (withscores) /* don't bother to extract the score if it's gonna be ignored. */
3458 score = zzlGetScore(sptr);
3459
3460 /* Abort when the node is no longer in range. */
3461 if (reverse) {
3462 if (!zzlLexValueGteMin(eptr,range)) break;
3463 } else {
3464 if (!zzlLexValueLteMax(eptr,range)) break;
3465 }
3466
3467 /* We know the element exists, so ziplistGet should always
3468 * succeed. */
3469 serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong))((ziplistGet(eptr,&vstr,&vlen,&vlong))?(void)0 : (
_serverAssertWithInfo(c,zobj,"ziplistGet(eptr,&vstr,&vlen,&vlong)"
,"t_zset.c",3469),__builtin_unreachable()))
;
3470
3471 rangelen++;
3472 if (vstr == NULL((void*)0)) {
3473 handler->emitResultFromLongLong(handler, vlong, score);
3474 } else {
3475 handler->emitResultFromCBuffer(handler, vstr, vlen, score);
3476 }
3477
3478 /* Move to next node */
3479 if (reverse) {
3480 zzlPrev(zl,&eptr,&sptr);
3481 } else {
3482 zzlNext(zl,&eptr,&sptr);
3483 }
3484 }
3485 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST7) {
3486 zset *zs = zobj->ptr;
3487 zskiplist *zsl = zs->zsl;
3488 zskiplistNode *ln;
3489
3490 /* If reversed, get the last node in range as starting point. */
3491 if (reverse) {
3492 ln = zslLastInLexRange(zsl,range);
3493 } else {
3494 ln = zslFirstInLexRange(zsl,range);
3495 }
3496
3497 /* If there is an offset, just traverse the number of elements without
3498 * checking the score because that is done in the next loop. */
3499 while (ln && offset--) {
3500 if (reverse) {
3501 ln = ln->backward;
3502 } else {
3503 ln = ln->level[0].forward;
3504 }
3505 }
3506
3507 while (ln && limit--) {
3508 /* Abort when the node is no longer in range. */
3509 if (reverse) {
3510 if (!zslLexValueGteMin(ln->ele,range)) break;
3511 } else {
3512 if (!zslLexValueLteMax(ln->ele,range)) break;
3513 }
3514
3515 rangelen++;
3516 handler->emitResultFromCBuffer(handler, ln->ele, sdslen(ln->ele), ln->score);
3517
3518 /* Move to next node */
3519 if (reverse) {
3520 ln = ln->backward;
3521 } else {
3522 ln = ln->level[0].forward;
3523 }
3524 }
3525 } else {
3526 serverPanic("Unknown sorted set encoding")_serverPanic("t_zset.c",3526,"Unknown sorted set encoding"),__builtin_unreachable
()
;
3527 }
3528
3529 handler->finalizeResultEmission(handler, rangelen);
3530}
3531
3532/* ZRANGEBYLEX <key> <min> <max> [LIMIT offset count]
 *
 * Initializes a result handler with ZRANGE_CONSUMER_TYPE_CLIENT and delegates
 * to zrangeGenericCommand() with the source key at argv[1], store disabled
 * (0), an explicit ZRANGE_LEX range type and forward direction. */
3533void zrangebylexCommand(client *c) {
3534 zrange_result_handler handler;
3535 zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_CLIENT);
3536 zrangeGenericCommand(&handler, 1, 0, ZRANGE_LEX, ZRANGE_DIRECTION_FORWARD);
3537}
3538
3539/* ZREVRANGEBYLEX <key> <min> <max> [LIMIT offset count]
 *
 * Same as zrangebylexCommand() but passes ZRANGE_DIRECTION_REVERSE, which
 * makes zrangeGenericCommand() swap the min/max argument positions and walk
 * the set backwards. */
3540void zrevrangebylexCommand(client *c) {
3541 zrange_result_handler handler;
3542 zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_CLIENT);
3543 zrangeGenericCommand(&handler, 1, 0, ZRANGE_LEX, ZRANGE_DIRECTION_REVERSE);
3544}
3545
3546/**
3547 * This function handles ZRANGE and ZRANGESTORE, and also the deprecated
3548 * Z[REV]RANGE[BYSCORE|BYLEX] commands.
3549 *
3550 * The simple ZRANGE and ZRANGESTORE can take _AUTO in rangetype and direction,
3551 * other commands pass explicit values.
3552 *
3553 * The argc_start points to the src key argument, so following syntax is like:
3554 * <src> <min> <max> [BYSCORE | BYLEX] [REV] [WITHSCORES] [LIMIT offset count]
 *
 * Parameters:
 *   handler    - result handler; its 'client' field supplies the client, its
 *                'dstkey' field selects a write vs. read lookup of the source
 *                key, and its finalize/emit callbacks are used by the
 *                range-specific helpers.
 *   argc_start - index in c->argv of the source key.
 *   store      - non-zero when called for a storing variant: WITHSCORES is
 *                then not accepted as an option and score emission is always
 *                enabled.
 *   rangetype  - ZRANGE_AUTO lets BYSCORE/BYLEX options pick the type
 *                (default ZRANGE_RANK); any other value is used as is.
 *   direction  - ZRANGE_DIRECTION_AUTO lets the REV option pick the direction
 *                (default forward); any other value is used as is.
 *
 * The function replies to the client on every error path and returns nothing.
3555 */
3556void zrangeGenericCommand(zrange_result_handler *handler, int argc_start, int store,
3557 zrange_type rangetype, zrange_direction direction)
3558{
3559 client *c = handler->client;
3560 robj *key = c->argv[argc_start];
3561 robj *zobj;
3562 zrangespec range;
3563 zlexrangespec lexrange;
3564 int minidx = argc_start + 1;
3565 int maxidx = argc_start + 2;
3566
3567 /* Options common to all */
3568 long opt_start = 0;
3569 long opt_end = 0;
3570 int opt_withscores = 0;
3571 long opt_offset = 0;
     /* -1 doubles as the "no LIMIT given" sentinel checked further down, so an
      * explicit "LIMIT <offset> -1" is indistinguishable from no LIMIT. */
3572 long opt_limit = -1;
3573
3574 /* Step 1: Skip the <src> <min> <max> args and parse remaining optional arguments. */
3575 for (int j=argc_start + 3; j < c->argc; j++) {
3576 int leftargs = c->argc-j-1;
     /* WITHSCORES is only recognized when not storing; with 'store' set the
      * token falls through to the syntax error branch below. */
3577 if (!store && !strcasecmp(c->argv[j]->ptr,"withscores")) {
3578 opt_withscores = 1;
     /* LIMIT needs two more arguments; with fewer it matches none of the
      * remaining branches and ends up as a syntax error. */
3579 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
3580 if ((getLongFromObjectOrReply(c, c->argv[j+1], &opt_offset, NULL((void*)0)) != C_OK0) ||
3581 (getLongFromObjectOrReply(c, c->argv[j+2], &opt_limit, NULL((void*)0)) != C_OK0))
3582 {
3583 return;
3584 }
3585 j += 2;
     /* REV / BYLEX / BYSCORE are accepted only while the corresponding
      * parameter is still _AUTO, i.e. not fixed by the calling command (and,
      * for the range type, not already chosen by an earlier option). */
3586 } else if (direction == ZRANGE_DIRECTION_AUTO &&
3587 !strcasecmp(c->argv[j]->ptr,"rev"))
3588 {
3589 direction = ZRANGE_DIRECTION_REVERSE;
3590 } else if (rangetype == ZRANGE_AUTO &&
3591 !strcasecmp(c->argv[j]->ptr,"bylex"))
3592 {
3593 rangetype = ZRANGE_LEX;
3594 } else if (rangetype == ZRANGE_AUTO &&
3595 !strcasecmp(c->argv[j]->ptr,"byscore"))
3596 {
3597 rangetype = ZRANGE_SCORE;
3598 } else {
3599 addReplyErrorObject(c,shared.syntaxerr);
3600 return;
3601 }
3602 }
3603
3604 /* Use defaults if not overridden by arguments. */
3605 if (direction == ZRANGE_DIRECTION_AUTO)
3606 direction = ZRANGE_DIRECTION_FORWARD;
3607 if (rangetype == ZRANGE_AUTO)
3608 rangetype = ZRANGE_RANK;
3609
3610 /* Check for conflicting arguments. */
3611 if (opt_limit != -1 && rangetype == ZRANGE_RANK) {
3612 addReplyError(c,"syntax error, LIMIT is only supported in combination with either BYSCORE or BYLEX");
3613 return;
3614 }
3615 if (opt_withscores && rangetype == ZRANGE_LEX) {
3616 addReplyError(c,"syntax error, WITHSCORES not supported in combination with BYLEX");
3617 return;
3618 }
3619
3620 if (direction == ZRANGE_DIRECTION_REVERSE &&
3621 ((ZRANGE_SCORE == rangetype) || (ZRANGE_LEX == rangetype)))
3622 {
3623 /* Range is given as [max,min] */
3624 int tmp = maxidx;
3625 maxidx = minidx;
3626 minidx = tmp;
3627 }
3628
3629 /* Step 2: Parse the range. */
     /* rangetype can no longer be ZRANGE_AUTO here (defaulted above); the
      * ZRANGE_AUTO labels in both switches are therefore never taken. */
3630 switch (rangetype) {
3631 case ZRANGE_AUTO:
3632 case ZRANGE_RANK:
3633 /* Z[REV]RANGE, ZRANGESTORE [REV]RANGE */
3634 if ((getLongFromObjectOrReply(c, c->argv[minidx], &opt_start,NULL((void*)0)) != C_OK0) ||
3635 (getLongFromObjectOrReply(c, c->argv[maxidx], &opt_end,NULL((void*)0)) != C_OK0))
3636 {
3637 return;
3638 }
3639 break;
3640
3641 case ZRANGE_SCORE:
3642 /* Z[REV]RANGEBYSCORE, ZRANGESTORE [REV]RANGEBYSCORE */
3643 if (zslParseRange(c->argv[minidx], c->argv[maxidx], &range) != C_OK0) {
3644 addReplyError(c, "min or max is not a float");
3645 return;
3646 }
3647 break;
3648
3649 case ZRANGE_LEX:
3650 /* Z[REV]RANGEBYLEX, ZRANGESTORE [REV]RANGEBYLEX */
     /* On failure we return directly instead of jumping to cleanup, so
      * zslFreeLexRange() is only ever reached after a successful parse. */
3651 if (zslParseLexRange(c->argv[minidx], c->argv[maxidx], &lexrange) != C_OK0) {
3652 addReplyError(c, "min or max not valid string range item");
3653 return;
3654 }
3655 break;
3656 }
3657
3658 if (opt_withscores || store) {
3659 zrangeResultHandlerScoreEmissionEnable(handler);
3660 }
3661
3662 /* Step 3: Lookup the key and get the range. */
3663 zobj = handler->dstkey ?
3664 lookupKeyWrite(c->db,key) :
3665 lookupKeyRead(c->db,key);
     /* NOTE(review): a missing source key replies with an empty array even
      * when 'store' is set and never goes through the handler's finalize
      * callback -- confirm this is the intended reply for the storing
      * variant. */
3666 if (zobj == NULL((void*)0)) {
3667 addReply(c,shared.emptyarray);
3668 goto cleanup;
3669 }
3670
3671 if (checkType(c,zobj,OBJ_ZSET3)) goto cleanup;
3672
3673 /* Step 4: Pass this to the command-specific handler. */
3674 switch (rangetype) {
3675 case ZRANGE_AUTO:
3676 case ZRANGE_RANK:
3677 genericZrangebyrankCommand(handler, zobj, opt_start, opt_end,
3678 opt_withscores || store, direction == ZRANGE_DIRECTION_REVERSE);
3679 break;
3680
3681 case ZRANGE_SCORE:
3682 genericZrangebyscoreCommand(handler, &range, zobj, opt_offset,
3683 opt_limit, direction == ZRANGE_DIRECTION_REVERSE);
3684 break;
3685
3686 case ZRANGE_LEX:
3687 genericZrangebylexCommand(handler, &lexrange, zobj, opt_withscores || store,
3688 opt_offset, opt_limit, direction == ZRANGE_DIRECTION_REVERSE);
3689 break;
3690 }
3691
3692 /* Instead of returning here, we'll just fall-through the clean-up. */
3693
3694cleanup:
3695
     /* Only the lex range is released here; 'range' (zrangespec) gets no
      * cleanup call in this function. */
3696 if (rangetype == ZRANGE_LEX) {
3697 zslFreeLexRange(&lexrange);
3698 }
3699}
3700
/* Replies with zsetLength() of the sorted set stored at argv[1].
 * lookupKeyReadOrReply() is handed shared.czero as the reply to use when it
 * returns NULL; a non-zero checkType() result also ends the command early. */
3701void zcardCommand(client *c) {
3702 robj *key = c->argv[1];
3703 robj *zobj;
3704
3705 if ((zobj = lookupKeyReadOrReply(c,key,shared.czero)) == NULL((void*)0) ||
3706 checkType(c,zobj,OBJ_ZSET3)) return;
3707
3708 addReplyLongLong(c,zsetLength(zobj));
3709}
3710
/* Replies with the score of member argv[2] in the sorted set at argv[1].
 * A null is replied when zsetScore() returns C_ERR; when the key lookup
 * returns NULL, lookupKeyReadOrReply() is given shared.null[c->resp] as the
 * reply. */
3711void zscoreCommand(client *c) {
3712 robj *key = c->argv[1];
3713 robj *zobj;
3714 double score;
3715
3716 if ((zobj = lookupKeyReadOrReply(c,key,shared.null[c->resp])) == NULL((void*)0) ||
3717 checkType(c,zobj,OBJ_ZSET3)) return;
3718
3719 if (zsetScore(zobj,c->argv[2]->ptr,&score) == C_ERR-1) {
3720 addReplyNull(c);
3721 } else {
3722 addReplyDouble(c,score);
3723 }
3724}
3725
/* Multi-member variant of zscoreCommand(): replies with an array of
 * c->argc - 2 entries, one per member argv[2..], each being the member's
 * score or a null when the key is missing or zsetScore() returns C_ERR. */
3726void zmscoreCommand(client *c) {
3727 robj *key = c->argv[1];
3728 robj *zobj;
3729 double score;
3730 zobj = lookupKeyRead(c->db,key);
     /* NOTE(review): zobj may be NULL here (it is tested for NULL in the loop
      * below), so this relies on checkType() tolerating a NULL object --
      * confirm against its definition. */
3731 if (checkType(c,zobj,OBJ_ZSET3)) return;
3732
3733 addReplyArrayLen(c,c->argc - 2);
3734 for (int j = 2; j < c->argc; j++) {
3735 /* Treat a missing set the same way as an empty set */
3736 if (zobj == NULL((void*)0) || zsetScore(zobj,c->argv[j]->ptr,&score) == C_ERR-1) {
3737 addReplyNull(c);
3738 } else {
3739 addReplyDouble(c,score);
3740 }
3741 }
3742}
3743
/* Shared implementation of zrankCommand() (reverse = 0) and
 * zrevrankCommand() (reverse = 1).
 *
 * Looks up the sorted set at argv[1] and replies with the value returned by
 * zsetRank() for member argv[2] when it is >= 0, or with a null otherwise.
 * 'reverse' is forwarded unchanged to zsetRank(). */
3744void zrankGenericCommand(client *c, int reverse) {
3745 robj *key = c->argv[1];
3746 robj *ele = c->argv[2];
3747 robj *zobj;
3748 long rank;
3749
3750 if ((zobj = lookupKeyReadOrReply(c,key,shared.null[c->resp])) == NULL((void*)0) ||
3751 checkType(c,zobj,OBJ_ZSET3)) return;
3752
     /* ele->ptr is handed to zsetRank() below, so the member object must use
      * one of the two encodings accepted by sdsEncodedObject(). */
3753 serverAssertWithInfo(c,ele,sdsEncodedObject(ele))(((ele->encoding == 0 || ele->encoding == 8))?(void)0 :
(_serverAssertWithInfo(c,ele,"sdsEncodedObject(ele)","t_zset.c"
,3753),__builtin_unreachable()))
;
3754 rank = zsetRank(zobj,ele->ptr,reverse);
3755 if (rank >= 0) {
3756 addReplyLongLong(c,rank);
3757 } else {
3758 addReplyNull(c);
3759 }
3760}
3761
/* Thin wrapper: zrankGenericCommand() with reverse = 0. */
3762void zrankCommand(client *c) {
3763 zrankGenericCommand(c, 0);
3764}
3765
/* Thin wrapper: zrankGenericCommand() with reverse = 1. */
3766void zrevrankCommand(client *c) {
3767 zrankGenericCommand(c, 1);
3768}
3769
/* Parses the cursor from argv[2], looks up the sorted set at argv[1] and
 * hands both to scanGenericCommand(). The cursor is parsed before the key is
 * looked up, so a bad cursor is reported even when the key does not exist.
 * When the lookup returns NULL, lookupKeyReadOrReply() is given
 * shared.emptyscan as the reply. */
3770void zscanCommand(client *c) {
3771 robj *o;
3772 unsigned long cursor;
3773
3774 if (parseScanCursorOrReply(c,c->argv[2],&cursor) == C_ERR-1) return;
3775 if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan)) == NULL((void*)0) ||
3776 checkType(c,o,OBJ_ZSET3)) return;
3777 scanGenericCommand(c,o,cursor);
3778}
3779
3780/* This command implements the generic zpop operation, used by:
3781 * ZPOPMIN, ZPOPMAX, BZPOPMIN and BZPOPMAX. This function is also used
3782 * inside blocked.c in the unblocking stage of BZPOPMIN and BZPOPMAX.
3783 *
3784 * If 'emitkey' is true also the key name is emitted, useful for the blocking
3785 * behavior of BZPOP[MIN|MAX], since we can block into multiple keys.
3786 *
3787 * The synchronous version instead does not need to emit the key, but may
3788 * use the 'count' argument to return multiple items if available.
 *
 * Parameters:
 *   keyv/keyc - candidate keys; the first one that exists is popped from.
 *   where     - ZSET_MIN or ZSET_MAX; also used as an index into a
 *               two-entry event-name table, so it must be 0 or 1.
 *   emitkey   - when non-zero the key name is added to the reply first.
 *   countarg  - optional count object; NULL means pop a single element.
 *
 * The reply is a deferred-length array of alternating element / score
 * entries (plus the key when 'emitkey' is set). */
3789void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey, robj *countarg) {
3790 int idx;
3791 robj *key = NULL((void*)0);
3792 robj *zobj = NULL((void*)0);
3793 sds ele;
3794 double score;
3795 long count = 1;
3796
3797 /* If a count argument was passed, parse it or return an error. */
3798 if (countarg) {
3799 if (getLongFromObjectOrReply(c,countarg,&count,NULL((void*)0)) != C_OK0)
3800 return;
3801 if (count <= 0) {
3802 addReply(c,shared.emptyarray);
3803 return;
3804 }
3805 }
3806
3807 /* Check type and break on the first error, otherwise identify candidate. */
3808 idx = 0;
3809 while (idx < keyc) {
3810 key = keyv[idx++];
3811 zobj = lookupKeyWrite(c->db,key);
3812 if (!zobj) continue;
3813 if (checkType(c,zobj,OBJ_ZSET3)) return;
3814 break;
3815 }
3816
3817 /* No candidate for zpopping, return empty. */
3818 if (!zobj) {
3819 addReply(c,shared.emptyarray);
3820 return;
3821 }
3822
3823 void *arraylen_ptr = addReplyDeferredLen(c);
3824 long arraylen = 0;
3825
3826 /* We emit the key only for the blocking variant. */
3827 if (emitkey) addReplyBulk(c,key);
3828
3829 /* Remove the element. */
     /* count is >= 1 at this point (default 1, non-positive values returned
      * above), so the do/while pops at least one element and "--count" stops
      * the loop after exactly 'count' iterations. */
3830 do {
3831 if (zobj->encoding == OBJ_ENCODING_ZIPLIST5) {
3832 unsigned char *zl = zobj->ptr;
3833 unsigned char *eptr, *sptr;
3834 unsigned char *vstr;
3835 unsigned int vlen;
3836 long long vlong;
3837
3838 /* Get the first or last element in the sorted set. */
     /* Index -2 rather than -1: the entry right after the element is read
      * as its score below, so the last element is the second-to-last
      * ziplist entry. */
3839 eptr = ziplistIndex(zl,where == ZSET_MAX1 ? -2 : 0);
3840 serverAssertWithInfo(c,zobj,eptr != NULL)((eptr != ((void*)0))?(void)0 : (_serverAssertWithInfo(c,zobj
,"eptr != NULL","t_zset.c",3840),__builtin_unreachable()))
;
3841 serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong))((ziplistGet(eptr,&vstr,&vlen,&vlong))?(void)0 : (
_serverAssertWithInfo(c,zobj,"ziplistGet(eptr,&vstr,&vlen,&vlong)"
,"t_zset.c",3841),__builtin_unreachable()))
;
     /* A NULL vstr means the entry was returned as an integer in vlong. */
3842 if (vstr == NULL((void*)0))
3843 ele = sdsfromlonglong(vlong);
3844 else
3845 ele = sdsnewlen(vstr,vlen);
3846
3847 /* Get the score. */
3848 sptr = ziplistNext(zl,eptr);
3849 serverAssertWithInfo(c,zobj,sptr != NULL)((sptr != ((void*)0))?(void)0 : (_serverAssertWithInfo(c,zobj
,"sptr != NULL","t_zset.c",3849),__builtin_unreachable()))
;
3850 score = zzlGetScore(sptr);
3851 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST7) {
3852 zset *zs = zobj->ptr;
3853 zskiplist *zsl = zs->zsl;
3854 zskiplistNode *zln;
3855
3856 /* Get the first or last element in the sorted set. */
3857 zln = (where == ZSET_MAX1 ? zsl->tail :
3858 zsl->header->level[0].forward);
3859
3860 /* There must be an element in the sorted set. */
3861 serverAssertWithInfo(c,zobj,zln != NULL)((zln != ((void*)0))?(void)0 : (_serverAssertWithInfo(c,zobj,
"zln != NULL","t_zset.c",3861),__builtin_unreachable()))
;
     /* Copy the element: the node is removed by zsetDel() below, while
      * 'ele' is still needed for the reply afterwards. */
3862 ele = sdsdup(zln->ele);
3863 score = zln->score;
3864 } else {
3865 serverPanic("Unknown sorted set encoding")_serverPanic("t_zset.c",3865,"Unknown sorted set encoding"),__builtin_unreachable
()
;
3866 }
3867
3868 serverAssertWithInfo(c,zobj,zsetDel(zobj,ele))((zsetDel(zobj,ele))?(void)0 : (_serverAssertWithInfo(c,zobj,
"zsetDel(zobj,ele)","t_zset.c",3868),__builtin_unreachable())
)
;
3869 server.dirty++;
3870
3871 if (arraylen == 0) { /* Do this only for the first iteration. */
     /* 'where' indexes the table directly: entry 0 is "zpopmin", entry 1
      * is "zpopmax". */
3872 char *events[2] = {"zpopmin","zpopmax"};
3873 notifyKeyspaceEvent(NOTIFY_ZSET(1<<7),events[where],key,c->db->id);
3874 signalModifiedKey(c,c->db,key);
3875 }
3876
3877 addReplyBulkCBuffer(c,ele,sdslen(ele));
3878 addReplyDouble(c,score);
3879 sdsfree(ele);
3880 arraylen += 2;
3881
3882 /* Remove the key, if indeed needed. */
3883 if (zsetLength(zobj) == 0) {
3884 dbDelete(c->db,key);
3885 notifyKeyspaceEvent(NOTIFY_GENERIC(1<<2),"del",key,c->db->id);
3886 break;
3887 }
3888 } while(--count);
3889
     /* One extra slot accounts for the key emitted before the loop. */
3890 setDeferredArrayLen(c,arraylen_ptr,arraylen + (emitkey != 0));
3891}
3892
3893/* ZPOPMIN key [<count>]
 *
 * Rejects more than three arguments with a syntax error, then calls
 * genericZpopCommand() on the single key argv[1] with ZSET_MIN, no key
 * emission, and argv[2] as the count object when present (NULL otherwise). */
3894void zpopminCommand(client *c) {
3895 if (c->argc > 3) {
3896 addReplyErrorObject(c,shared.syntaxerr);
3897 return;
3898 }
3899 genericZpopCommand(c,&c->argv[1],1,ZSET_MIN0,0,
3900 c->argc == 3 ? c->argv[2] : NULL((void*)0));
3901}
3902
3903/* ZPOPMAX key [<count>]
 *
 * Rejects more than three arguments with a syntax error, then calls
 * genericZpopCommand() on the single key argv[1] with ZSET_MAX, no key
 * emission, and argv[2] as the count object when present (NULL otherwise). */
3904void zpopmaxCommand(client *c) {
3905 if (c->argc > 3) {
3906 addReplyErrorObject(c,shared.syntaxerr);
3907 return;
3908 }
3909 genericZpopCommand(c,&c->argv[1],1,ZSET_MAX1,0,
3910 c->argc == 3 ? c->argv[2] : NULL((void*)0));
3911}
3912
3913/* BZPOPMIN / BZPOPMAX actual implementation.
 *
 * The last argument is parsed as a timeout in seconds; argv[1..argc-2] are
 * the keys. The keys are scanned in order: the first one holding a non-empty
 * sorted set is popped from immediately through genericZpopCommand() (with
 * key emission enabled and no count). If none qualifies the client is either
 * answered with a null array (when CLIENT_DENY_BLOCKING is set in c->flags)
 * or blocked on all the keys.
 *
 * 'where' is forwarded to genericZpopCommand() and selects which shared
 * command object (zpopmin / zpopmax) the command vector is rewritten to. */
3914void blockingGenericZpopCommand(client *c, int where) {
3915 robj *o;
3916 mstime_t timeout;
3917 int j;
3918
3919 if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS0)
3920 != C_OK0) return;
3921
3922 for (j = 1; j < c->argc-1; j++) {
3923 o = lookupKeyWrite(c->db,c->argv[j]);
     /* NOTE(review): 'o' may be NULL here (it is tested right below), so
      * this relies on checkType() accepting a NULL object -- confirm. */
3924 if (checkType(c,o,OBJ_ZSET3)) return;
3925 if (o != NULL((void*)0)) {
3926 if (zsetLength(o) != 0) {
3927 /* Non empty zset, this is like a normal ZPOP[MIN|MAX]. */
3928 genericZpopCommand(c,&c->argv[j],1,where,1,NULL((void*)0));
3929 /* Replicate it as an ZPOP[MIN|MAX] instead of BZPOP[MIN|MAX]. */
3930 rewriteClientCommandVector(c,2,
3931 where == ZSET_MAX1 ? shared.zpopmax : shared.zpopmin,
3932 c->argv[j]);
3933 return;
3934 }
3935 }
3936 }
3937
3938 /* If we are not allowed to block the client and the zset is empty the only thing
3939 * we can do is treating it as a timeout (even with timeout 0). */
3940 if (c->flags & CLIENT_DENY_BLOCKING(1ULL<<41)) {
3941 addReplyNullArray(c);
3942 return;
3943 }
3944
3945 /* If the keys do not exist we must block */
     /* argc - 2 keys: everything except the command name and the timeout. */
3946 blockForKeys(c,BLOCKED_ZSET5,c->argv + 1,c->argc - 2,timeout,NULL((void*)0),NULL((void*)0),NULL((void*)0));
3947}
3948
3949// BZPOPMIN key [key ...] timeout
// Thin wrapper: blockingGenericZpopCommand() with ZSET_MIN.
3950void bzpopminCommand(client *c) {
3951 blockingGenericZpopCommand(c,ZSET_MIN0);
3952}
3953
3954// BZPOPMAX key [key ...] timeout
// Thin wrapper: blockingGenericZpopCommand() with ZSET_MAX.
3955void bzpopmaxCommand(client *c) {
3956 blockingGenericZpopCommand(c,ZSET_MAX1);
3957}
3958
/* Emits 'count' sampled entries to the client.
 *
 * keys - array of 'count' entries; an entry with a non-NULL sval is sent as
 *        a bulk string of slen bytes, otherwise lval is sent as a bulk
 *        integer.
 * vals - matching array of scores, or NULL when scores are not wanted. A
 *        score with a non-NULL sval is converted with zzlStrtod(), otherwise
 *        lval is replied as a double.
 *
 * When scores are present and c->resp > 2, each element/score pair is wrapped
 * in its own two-element array; otherwise the entries are emitted flat. The
 * caller is expected to have emitted the enclosing array length already. */
3959static void zarndmemberReplyWithZiplist(client *c, unsigned int count, ziplistEntry *keys, ziplistEntry *vals) {
3960 for (unsigned long i = 0; i < count; i++) {
3961 if (vals && c->resp > 2)
3962 addReplyArrayLen(c,2);
3963 if (keys[i].sval)
3964 addReplyBulkCBuffer(c, keys[i].sval, keys[i].slen);
3965 else
3966 addReplyBulkLongLong(c, keys[i].lval);
3967 if (vals) {
3968 if (vals[i].sval) {
3969 addReplyDouble(c, zzlStrtod(vals[i].sval,vals[i].slen));
3970 } else
3971 addReplyDouble(c, vals[i].lval);
3972 }
3973 }
3974}
3975
3976/* How many times larger than the requested count the zset must be
3977 * for us to not use the "remove elements" strategy. See CASE 3 and CASE 4
3978 * in zrandmemberWithCountCommand() for more info. */
3979#define ZRANDMEMBER_SUB_STRATEGY_MUL3 3
3980
3981/* If client is trying to ask for a very large number of random elements,
3982 * queuing may consume an unlimited amount of memory, so we want to limit
3983 * the number of random elements sampled per batch. */
3984#define ZRANDMEMBER_RANDOM_SAMPLE_LIMIT1000 1000
3985
/* Implements the ZRANDMEMBER variant that takes a count.
 *
 *   l          - the requested count as parsed by the caller. l >= 0 asks for
 *                up to l distinct elements; l < 0 asks for exactly -l
 *                elements with repetitions allowed.
 *   withscores - non-zero to reply with scores as well.
 *
 * With RESP2 (c->resp == 2) and scores, the reply is a flat array of
 * element, score, element, score...; with c->resp > 2 each pair is a nested
 * two-element array. Four strategies (CASE 1-4 below) are selected from the
 * relation between the requested count and the zset size. */
3986void zrandmemberWithCountCommand(client *c, long l, int withscores) {
3987 unsigned long count, size;
3988 int uniq = 1;
3989 robj *zsetobj;
3990
3991 if ((zsetobj = lookupKeyReadOrReply(c, c->argv[1], shared.null[c->resp]))
3992 == NULL((void*)0) || checkType(c, zsetobj, OBJ_ZSET3)) return;
3993 size = zsetLength(zsetobj);
3994
3995 if(l >= 0) {
3996 count = (unsigned long) l;
3997 } else {
     /* NOTE(review): "-l" is evaluated as a signed long, which overflows
      * (undefined behavior) when l == LONG_MIN; zrandmemberCommand() in
      * this file passes the parsed value through without a range check. */
3998 count = -l;
3999 uniq = 0;
4000 }
4001
4002 /* If count is zero, serve it ASAP to avoid special cases later. */
4003 if (count == 0) {
4004 addReply(c,shared.emptyarray);
4005 return;
4006 }
4007
4008 /* CASE 1: The count was negative, so the extraction method is just:
4009 * "return N random elements" sampling the whole set every time.
4010 * This case is trivial and can be served without auxiliary data
4011 * structures. This case is the only one that also needs to return the
4012 * elements in random order. */
4013 if (!uniq || count == 1) {
     /* NOTE(review): count*2 is unsigned arithmetic and can wrap for very
      * large counts -- confirm the caller bounds the value. */
4014 if (withscores && c->resp == 2)
4015 addReplyArrayLen(c, count*2);
4016 else
4017 addReplyArrayLen(c, count);
4018 if (zsetobj->encoding == OBJ_ENCODING_SKIPLIST7) {
4019 zset *zs = zsetobj->ptr;
4020 while (count--) {
4021 dictEntry *de = dictGetFairRandomKey(zs->dict);
4022 sds key = dictGetKey(de)((de)->key);
4023 if (withscores && c->resp > 2)
4024 addReplyArrayLen(c,2);
4025 addReplyBulkCBuffer(c, key, sdslen(key));
4026 if (withscores)
4027 addReplyDouble(c, dictGetDoubleVal(de)((de)->v.d));
4028 }
4029 } else if (zsetobj->encoding == OBJ_ENCODING_ZIPLIST5) {
4030 ziplistEntry *keys, *vals = NULL((void*)0);
4031 unsigned long limit, sample_count;
     /* Sample in batches of at most ZRANDMEMBER_RANDOM_SAMPLE_LIMIT so the
      * temporary arrays stay bounded regardless of count. */
4032 limit = count > ZRANDMEMBER_RANDOM_SAMPLE_LIMIT1000 ? ZRANDMEMBER_RANDOM_SAMPLE_LIMIT1000 : count;
4033 keys = zmalloc(sizeof(ziplistEntry)*limit);
4034 if (withscores)
4035 vals = zmalloc(sizeof(ziplistEntry)*limit);
4036 while (count) {
4037 sample_count = count > limit ? limit : count;
4038 count -= sample_count;
4039 ziplistRandomPairs(zsetobj->ptr, sample_count, keys, vals);
4040 zarndmemberReplyWithZiplist(c, sample_count, keys, vals);
4041 }
4042 zfree(keys);
     /* vals is NULL when scores were not requested; zfree() is called on
      * it unconditionally. */
4043 zfree(vals);
4044 }
4045 return;
4046 }
4047
     /* NOTE(review): no iterator cleanup call is visible on any of the return
      * paths below after zuiInitIterator() -- confirm none is required for a
      * sorted set source. */
4048 zsetopsrc src;
4049 zsetopval zval;
4050 src.subject = zsetobj;
4051 src.type = zsetobj->type;
4052 src.encoding = zsetobj->encoding;
4053 zuiInitIterator(&src);
4054 memset(&zval, 0, sizeof(zval));
4055
4056 /* Initiate reply count, RESP3 responds with nested array, RESP2 with flat one. */
4057 long reply_size = count < size ? count : size;
4058 if (withscores && c->resp == 2)
4059 addReplyArrayLen(c, reply_size*2);
4060 else
4061 addReplyArrayLen(c, reply_size);
4062
4063 /* CASE 2:
4064 * The number of requested elements is greater than the number of
4065 * elements inside the zset: simply return the whole zset. */
4066 if (count >= size) {
4067 while (zuiNext(&src, &zval)) {
4068 if (withscores && c->resp > 2)
4069 addReplyArrayLen(c,2);
4070 addReplyBulkSds(c, zuiNewSdsFromValue(&zval));
4071 if (withscores)
4072 addReplyDouble(c, zval.score);
4073 }
4074 return;
4075 }
4076
4077 /* CASE 3:
4078 * The number of elements inside the zset is not greater than
4079 * ZRANDMEMBER_SUB_STRATEGY_MUL times the number of requested elements.
4080 * In this case we create a dict from scratch with all the elements, and
4081 * subtract random elements to reach the requested number of elements.
4082 *
4083 * This is done because if the number of requested elements is just
4084 * a bit less than the number of elements in the set, the natural approach
4085 * used into CASE 4 is highly inefficient. */
4086 if (count*ZRANDMEMBER_SUB_STRATEGY_MUL3 > size) {
4087 dict *d = dictCreate(&sdsReplyDictType, NULL((void*)0));
4088 dictExpand(d, size);
4089 /* Add all the elements into the temporary dictionary. */
4090 while (zuiNext(&src, &zval)) {
4091 sds key = zuiNewSdsFromValue(&zval);
4092 dictEntry *de = dictAddRaw(d, key, NULL((void*)0));
4093 serverAssert(de)((de)?(void)0 : (_serverAssert("de","t_zset.c",4093),__builtin_unreachable
()))
;
4094 if (withscores)
4095 dictSetDoubleVal(de, zval.score)do { (de)->v.d = zval.score; } while(0);
4096 }
4097 serverAssert(dictSize(d) == size)((((d)->ht[0].used+(d)->ht[1].used) == size)?(void)0 : (
_serverAssert("dictSize(d) == size","t_zset.c",4097),__builtin_unreachable
()))
;
4098
4099 /* Remove random elements to reach the right count. */
4100 while (size > count) {
4101 dictEntry *de;
4102 de = dictGetRandomKey(d);
     /* The key is freed explicitly here after unlinking the entry;
      * keys that survive are handed to addReplyBulkSds() below. */
4103 dictUnlink(d,dictGetKey(de)((de)->key));
4104 sdsfree(dictGetKey(de)((de)->key));
4105 dictFreeUnlinkedEntry(d,de);
4106 size--;
4107 }
4108
4109 /* Reply with what's in the dict and release memory */
4110 dictIterator *di;
4111 dictEntry *de;
4112 di = dictGetIterator(d);
4113 while ((de = dictNext(di)) != NULL((void*)0)) {
4114 if (withscores && c->resp > 2)
4115 addReplyArrayLen(c,2);
4116 addReplyBulkSds(c, dictGetKey(de)((de)->key));
4117 if (withscores)
4118 addReplyDouble(c, dictGetDoubleVal(de)((de)->v.d));
4119 }
4120
4121 dictReleaseIterator(di);
4122 dictRelease(d);
4123 }
4124
4125 /* CASE 4: We have a big zset compared to the requested number of elements.
4126 * In this case we can simply get random elements from the zset and add
4127 * to the temporary set, trying to eventually get enough unique elements
4128 * to reach the specified count. */
4129 else {
4130 if (zsetobj->encoding == OBJ_ENCODING_ZIPLIST5) {
4131 /* it is inefficient to repeatedly pick one random element from a
4132 * ziplist. so we use this instead: */
4133 ziplistEntry *keys, *vals = NULL((void*)0);
4134 keys = zmalloc(sizeof(ziplistEntry)*count);
4135 if (withscores)
4136 vals = zmalloc(sizeof(ziplistEntry)*count);
4137 serverAssert(ziplistRandomPairsUnique(zsetobj->ptr, count, keys, vals) == count)((ziplistRandomPairsUnique(zsetobj->ptr, count, keys, vals
) == count)?(void)0 : (_serverAssert("ziplistRandomPairsUnique(zsetobj->ptr, count, keys, vals) == count"
,"t_zset.c",4137),__builtin_unreachable()))
;
4138 zarndmemberReplyWithZiplist(c, count, keys, vals);
4139 zfree(keys);
4140 zfree(vals);
4141 return;
4142 }
4143
4144 /* Non-ziplist encoding (generic implementation) */
     /* The dict below only tracks which elements were already replied;
      * replies are emitted incrementally inside the loop as each new
      * unique element is found. */
4145 unsigned long added = 0;
4146 dict *d = dictCreate(&hashDictType, NULL((void*)0));
4147 dictExpand(d, count);
4148
4149 while (added < count) {
4150 ziplistEntry key;
4151 double score;
4152 zsetTypeRandomElement(zsetobj, size, &key, withscores ? &score: NULL((void*)0));
4153
4154 /* Try to add the object to the dictionary. If it already exists
4155 * free it, otherwise increment the number of objects we have
4156 * in the result dictionary. */
4157 sds skey = zsetSdsFromZiplistEntry(&key);
4158 if (dictAdd(d,skey,NULL((void*)0)) != DICT_OK0) {
4159 sdsfree(skey);
4160 continue;
4161 }
4162 added++;
4163
4164 if (withscores && c->resp > 2)
4165 addReplyArrayLen(c,2);
4166 zsetReplyFromZiplistEntry(c, &key);
4167 if (withscores)
4168 addReplyDouble(c, score);
4169 }
4170
4171 /* Release memory */
4172 dictRelease(d);
4173 }
4174}
4175
4176/* ZRANDMEMBER key [<count> [WITHSCORES]]
 *
 * With three or more arguments, argv[2] is parsed as the count and the work
 * is delegated to zrandmemberWithCountCommand(); a fourth argument must be
 * "withscores" (case-insensitive) and anything beyond that is a syntax
 * error. With only the key, a single random element is replied through
 * zsetReplyFromZiplistEntry(). */
4177void zrandmemberCommand(client *c) {
4178 long l;
4179 int withscores = 0;
4180 robj *zset;
4181 ziplistEntry ele;
4182
4183 if (c->argc >= 3) {
     /* NOTE(review): the parsed count is not range-checked here, and the
      * callee negates negative values -- see the LONG_MIN concern there. */
4184 if (getLongFromObjectOrReply(c,c->argv[2],&l,NULL((void*)0)) != C_OK0) return;
4185 if (c->argc > 4 || (c->argc == 4 && strcasecmp(c->argv[3]->ptr,"withscores"))) {
4186 addReplyErrorObject(c,shared.syntaxerr);
4187 return;
4188 } else if (c->argc == 4)
4189 withscores = 1;
4190 zrandmemberWithCountCommand(c, l, withscores);
4191 return;
4192 }
4193
4194 /* Handle variant without <count> argument. Reply with simple bulk string */
4195 if ((zset = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp]))== NULL((void*)0) ||
4196 checkType(c,zset,OBJ_ZSET3)) {
4197 return;
4198 }
4199
4200 zsetTypeRandomElement(zset, zsetLength(zset), &ele,NULL((void*)0));
4201 zsetReplyFromZiplistEntry(c,&ele);
4202}