Bug Summary

File: src/defrag.c
Warning: line 1127, column 17
Value stored to 'quit' is never read
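
The deadcode.DeadStores checker flags assignments whose value can never be
observed. A minimal sketch of the flagged pattern (hypothetical code, not
taken from defrag.c): the break leaves the loop before the loop condition
can read the freshly stored value.

    int example(int n) {
        int quit = 0;
        do {
            if (--n <= 0) {
                quit = 1;   /* dead store: the break below exits the loop, */
                break;      /* so nothing ever reads this value of 'quit' */
            }
        } while (!quit);
        return n;
    }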

Annotated Source Code


clang -cc1 -triple x86_64-pc-linux-gnu -analyze -disable-free -disable-llvm-verifier -discard-value-names -main-file-name defrag.c -analyzer-store=region -analyzer-opt-analyze-nested-blocks -analyzer-checker=core -analyzer-checker=apiModeling -analyzer-checker=unix -analyzer-checker=deadcode -analyzer-checker=security.insecureAPI.UncheckedReturn -analyzer-checker=security.insecureAPI.getpw -analyzer-checker=security.insecureAPI.gets -analyzer-checker=security.insecureAPI.mktemp -analyzer-checker=security.insecureAPI.mkstemp -analyzer-checker=security.insecureAPI.vfork -analyzer-checker=nullability.NullPassedToNonnull -analyzer-checker=nullability.NullReturnedFromNonnull -analyzer-output plist -w -setup-static-analyzer -mrelocation-model static -mthread-model posix -mframe-pointer=none -fmath-errno -fno-rounding-math -masm-verbose -mconstructor-aliases -munwind-tables -target-cpu x86-64 -dwarf-column-info -fno-split-dwarf-inlining -debugger-tuning=gdb -resource-dir /usr/lib/llvm-10/lib/clang/10.0.0 -D REDIS_STATIC= -I ../deps/hiredis -I ../deps/linenoise -I ../deps/lua/src -I ../deps/hdr_histogram -D USE_JEMALLOC -I ../deps/jemalloc/include -internal-isystem /usr/local/include -internal-isystem /usr/lib/llvm-10/lib/clang/10.0.0/include -internal-externc-isystem /usr/include/x86_64-linux-gnu -internal-externc-isystem /include -internal-externc-isystem /usr/include -O2 -Wno-c11-extensions -Wno-missing-field-initializers -std=c11 -fdebug-compilation-dir /home/netto/Desktop/redis-6.2.1/src -ferror-limit 19 -fmessage-length 0 -fgnuc-version=4.2.1 -fobjc-runtime=gcc -fdiagnostics-show-option -vectorize-loops -vectorize-slp -analyzer-output=html -faddrsig -o /tmp/scan-build-2021-03-14-133648-8817-1 -x c defrag.c
1/*
2 * Active memory defragmentation
3 * Try to find key / value allocations that need to be re-allocated in order
4 * to reduce external fragmentation.
5 * We do that by scanning the keyspace and for each pointer we have, we can try to
6 * ask the allocator if moving it to a new address will help reduce fragmentation.
7 *
8 * Copyright (c) 2020, Redis Labs, Inc
9 * All rights reserved.
10 *
11 * Redistribution and use in source and binary forms, with or without
12 * modification, are permitted provided that the following conditions are met:
13 *
14 * * Redistributions of source code must retain the above copyright notice,
15 * this list of conditions and the following disclaimer.
16 * * Redistributions in binary form must reproduce the above copyright
17 * notice, this list of conditions and the following disclaimer in the
18 * documentation and/or other materials provided with the distribution.
19 * * Neither the name of Redis nor the names of its contributors may be used
20 * to endorse or promote products derived from this software without
21 * specific prior written permission.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
33 * POSSIBILITY OF SUCH DAMAGE.
34 */
35
36#include "server.h"
37#include <time.h>
38#include <assert.h>
39#include <stddef.h>
40
41#ifdef HAVE_DEFRAG
42
43/* this method was added to jemalloc in order to help us understand which
44 * pointers are worthwhile moving and which aren't */
45int je_get_defrag_hint(void* ptr);
46
47/* forward declarations */
48void defragDictBucketCallback(void *privdata, dictEntry **bucketref);
49dictEntry* replaceSatelliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged);
50
51/* Defrag helper for generic allocations.
52 *
53 * returns NULL in case the allocation wasn't moved.
54 * when it returns a non-null value, the old pointer was already released
55 * and should NOT be accessed. */
56void* activeDefragAlloc(void *ptr) {
57 size_t size;
58 void *newptr;
59 if(!je_get_defrag_hint(ptr)) {
60 server.stat_active_defrag_misses++;
61 return NULL;
62 }
63 /* move this allocation to a new allocation.
64 * make sure not to use the thread cache, so that we don't get back the same
65 * pointers we try to free */
66 size = zmalloc_size(ptr);
67 newptr = zmalloc_no_tcache(size);
68 memcpy(newptr, ptr, size);
69 zfree_no_tcache(ptr);
70 return newptr;
71}
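/* Hedged usage sketch (illustrative, not part of defrag.c): given the
 * contract above, a caller must treat a non-NULL return as a replacement
 * and must stop using the old pointer. */
void example_update_slot(void **slot) {
    void *moved = activeDefragAlloc(*slot);
    if (moved)
        *slot = moved; /* the old allocation was already freed */
}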
72
73/* Defrag helper for sds strings
74 *
75 * returns NULL in case the allocation wasn't moved.
76 * when it returns a non-null value, the old pointer was already released
77 * and should NOT be accessed. */
78sds activeDefragSds(sds sdsptr) {
79 void* ptr = sdsAllocPtr(sdsptr);
80 void* newptr = activeDefragAlloc(ptr);
81 if (newptr) {
82 size_t offset = sdsptr - (char*)ptr;
83 sdsptr = (char*)newptr + offset;
84 return sdsptr;
85 }
86 return NULL;
87}
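/* Why the offset arithmetic above works (assumed sds layout): an sds handle
 * points past the header inside a single allocation, so when the block moves
 * we rebase the handle by the same byte offset:
 *
 *   old: [hdr][string bytes...]   sdsptr == (char*)ptr    + offset
 *   new: [hdr][string bytes...]   result == (char*)newptr + offset */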
88
89/* Defrag helper for robj and/or string objects
90 *
91 * returns NULL in case the allocation wasn't moved.
92 * when it returns a non-null value, the old pointer was already released
93 * and should NOT be accessed. */
94robj *activeDefragStringOb(robj* ob, long *defragged) {
95 robj *ret = NULL;
96 if (ob->refcount!=1)
97 return NULL;
98
99 /* try to defrag robj (only if not an EMBSTR type, which is handled below). */
100 if (ob->type!=OBJ_STRING || ob->encoding!=OBJ_ENCODING_EMBSTR) {
101 if ((ret = activeDefragAlloc(ob))) {
102 ob = ret;
103 (*defragged)++;
104 }
105 }
106
107 /* try to defrag string object */
108 if (ob->type == OBJ_STRING) {
109 if(ob->encoding==OBJ_ENCODING_RAW) {
110 sds newsds = activeDefragSds((sds)ob->ptr);
111 if (newsds) {
112 ob->ptr = newsds;
113 (*defragged)++;
114 }
115 } else if (ob->encoding==OBJ_ENCODING_EMBSTR) {
116 /* The sds is embedded in the object allocation, calculate the
117 * offset and update the pointer in the new allocation. */
118 long ofs = (intptr_t)ob->ptr - (intptr_t)ob;
119 if ((ret = activeDefragAlloc(ob))) {
120 ret->ptr = (void*)((intptr_t)ret + ofs);
121 (*defragged)++;
122 }
123 } else if (ob->encoding!=OBJ_ENCODING_INT) {
124 serverPanic("Unknown string encoding");
125 }
126 }
127 return ret;
128}
129
130/* Defrag helper for dictEntries to be used during dict iteration (called on
131 * each step). Returns a stat of how many pointers were moved. */
132long dictIterDefragEntry(dictIterator *iter) {
133 /* This function is a little bit dirty since it messes with the internals
134 * of the dict and its iterator, but the benefit is that it is very easy
135 * to use, and requires no other changes to the dict. */
136 long defragged = 0;
137 dictht *ht;
138 /* Handle the next entry (if there is one), and update the pointer in the
139 * current entry. */
140 if (iter->nextEntry) {
141 dictEntry *newde = activeDefragAlloc(iter->nextEntry);
142 if (newde) {
143 defragged++;
144 iter->nextEntry = newde;
145 iter->entry->next = newde;
146 }
147 }
148 /* handle the case of the first entry in the hash bucket. */
149 ht = &iter->d->ht[iter->table];
150 if (ht->table[iter->index] == iter->entry) {
151 dictEntry *newde = activeDefragAlloc(iter->entry);
152 if (newde) {
153 iter->entry = newde;
154 ht->table[iter->index] = newde;
155 defragged++;
156 }
157 }
158 return defragged;
159}
160
161/* Defrag helper for dict main allocations (dict struct, and hash tables).
162 * receives a pointer to the dict* and implicitly updates it when the dict
163 * struct itself was moved. Returns a stat of how many pointers were moved. */
164long dictDefragTables(dict* d) {
165 dictEntry **newtable;
166 long defragged = 0;
167 /* handle the first hash table */
168 newtable = activeDefragAlloc(d->ht[0].table);
169 if (newtable)
170 defragged++, d->ht[0].table = newtable;
171 /* handle the second hash table */
172 if (d->ht[1].table) {
173 newtable = activeDefragAlloc(d->ht[1].table);
174 if (newtable)
175 defragged++, d->ht[1].table = newtable;
176 }
177 return defragged;
178}
179
180/* Internal function used by zslDefrag */
181void zslUpdateNode(zskiplist *zsl, zskiplistNode *oldnode, zskiplistNode *newnode, zskiplistNode **update) {
182 int i;
183 for (i = 0; i < zsl->level; i++) {
184 if (update[i]->level[i].forward == oldnode)
185 update[i]->level[i].forward = newnode;
186 }
187 serverAssert(zsl->header!=oldnode);
188 if (newnode->level[0].forward) {
189 serverAssert(newnode->level[0].forward->backward==oldnode);
190 newnode->level[0].forward->backward = newnode;
191 } else {
192 serverAssert(zsl->tail==oldnode);
193 zsl->tail = newnode;
194 }
195}
196
197/* Defrag helper for sorted set.
198 * Update the robj pointer, defrag the skiplist struct and return the new score
199 * reference. We may not access the oldele pointer (not even the pointer stored in
200 * the skiplist), as it was already freed. Newele may be null, in which case we
201 * only need to defrag the skiplist, but not update the obj pointer.
202 * When the return value is non-NULL, it is the score reference that must be updated
203 * in the dict record. */
204double *zslDefrag(zskiplist *zsl, double score, sds oldele, sds newele) {
205 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x, *newx;
206 int i;
207 sds ele = newele? newele: oldele;
208
209 /* find the skiplist node referring to the object that was moved,
210 * and all pointers that need to be updated if we'll end up moving the skiplist node. */
211 x = zsl->header;
212 for (i = zsl->level-1; i >= 0; i--) {
213 while (x->level[i].forward &&
214 x->level[i].forward->ele != oldele && /* make sure not to access the
215 ->obj pointer if it matches
216 oldele */
217 (x->level[i].forward->score < score ||
218 (x->level[i].forward->score == score &&
219 sdscmp(x->level[i].forward->ele,ele) < 0)))
220 x = x->level[i].forward;
221 update[i] = x;
222 }
223
224 /* update the robj pointer inside the skip list record. */
225 x = x->level[0].forward;
226 serverAssert(x && score == x->score && x->ele==oldele);
227 if (newele)
228 x->ele = newele;
229
230 /* try to defrag the skiplist record itself */
231 newx = activeDefragAlloc(x);
232 if (newx) {
233 zslUpdateNode(zsl, x, newx, update);
234 return &newx->score;
235 }
236 return NULL;
237}
238
239/* Defrag helper for sorted set.
240 * Defrag a single dict entry key name, and corresponding skiplist struct */
241long activeDefragZsetEntry(zset *zs, dictEntry *de) {
242 sds newsds;
243 double* newscore;
244 long defragged = 0;
245 sds sdsele = dictGetKey(de);
246 if ((newsds = activeDefragSds(sdsele)))
247 defragged++, de->key = newsds;
248 newscore = zslDefrag(zs->zsl, *(double*)dictGetVal(de), sdsele, newsds);
249 if (newscore) {
250 dictSetVal(zs->dict, de, newscore);
251 defragged++;
252 }
253 return defragged;
254}
255
256#define DEFRAG_SDS_DICT_NO_VAL 0
257#define DEFRAG_SDS_DICT_VAL_IS_SDS 1
258#define DEFRAG_SDS_DICT_VAL_IS_STROB 2
259#define DEFRAG_SDS_DICT_VAL_VOID_PTR 3
260
261/* Defrag a dict with sds key and optional value (either ptr, sds or robj string) */
262long activeDefragSdsDict(dict* d, int val_type) {
263 dictIterator *di;
264 dictEntry *de;
265 long defragged = 0;
266 di = dictGetIterator(d);
267 while((de = dictNext(di)) != NULL) {
268 sds sdsele = dictGetKey(de), newsds;
269 if ((newsds = activeDefragSds(sdsele)))
270 de->key = newsds, defragged++;
271 /* defrag the value */
272 if (val_type == DEFRAG_SDS_DICT_VAL_IS_SDS) {
273 sdsele = dictGetVal(de);
274 if ((newsds = activeDefragSds(sdsele)))
275 de->v.val = newsds, defragged++;
276 } else if (val_type == DEFRAG_SDS_DICT_VAL_IS_STROB) {
277 robj *newele, *ele = dictGetVal(de);
278 if ((newele = activeDefragStringOb(ele, &defragged)))
279 de->v.val = newele;
280 } else if (val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR) {
281 void *newptr, *ptr = dictGetVal(de);
282 if ((newptr = activeDefragAlloc(ptr)))
283 de->v.val = newptr, defragged++;
284 }
285 defragged += dictIterDefragEntry(di);
286 }
287 dictReleaseIterator(di);
288 return defragged;
289}
290
291/* Defrag a list of ptr, sds or robj string values */
292long activeDefragList(list *l, int val_type) {
293 long defragged = 0;
294 listNode *ln, *newln;
295 for (ln = l->head; ln; ln = ln->next) {
296 if ((newln = activeDefragAlloc(ln))) {
297 if (newln->prev)
298 newln->prev->next = newln;
299 else
300 l->head = newln;
301 if (newln->next)
302 newln->next->prev = newln;
303 else
304 l->tail = newln;
305 ln = newln;
306 defragged++;
307 }
308 if (val_type == DEFRAG_SDS_DICT_VAL_IS_SDS) {
309 sds newsds, sdsele = ln->value;
310 if ((newsds = activeDefragSds(sdsele)))
311 ln->value = newsds, defragged++;
312 } else if (val_type == DEFRAG_SDS_DICT_VAL_IS_STROB) {
313 robj *newele, *ele = ln->value;
314 if ((newele = activeDefragStringOb(ele, &defragged)))
315 ln->value = newele;
316 } else if (val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR) {
317 void *newptr, *ptr = ln->value;
318 if ((newptr = activeDefragAlloc(ptr)))
319 ln->value = newptr, defragged++;
320 }
321 }
322 return defragged;
323}
324
325/* Defrag a list of sds values and a dict with the same sds keys */
326long activeDefragSdsListAndDict(list *l, dict *d, int dict_val_type) {
327 long defragged = 0;
328 sds newsds, sdsele;
329 listNode *ln, *newln;
330 dictIterator *di;
331 dictEntry *de;
332 /* Defrag the list and its sds values */
333 for (ln = l->head; ln; ln = ln->next) {
334 if ((newln = activeDefragAlloc(ln))) {
335 if (newln->prev)
336 newln->prev->next = newln;
337 else
338 l->head = newln;
339 if (newln->next)
340 newln->next->prev = newln;
341 else
342 l->tail = newln;
343 ln = newln;
344 defragged++;
345 }
346 sdsele = ln->value;
347 if ((newsds = activeDefragSds(sdsele))) {
348 /* When defragging an sds value, we need to update the dict key */
349 uint64_t hash = dictGetHash(d, newsds);
350 replaceSatelliteDictKeyPtrAndOrDefragDictEntry(d, sdsele, newsds, hash, &defragged);
351 ln->value = newsds;
352 defragged++;
353 }
354 }
355
356 /* Defrag the dict values (keys were already handled) */
357 di = dictGetIterator(d);
358 while((de = dictNext(di)) != NULL) {
359 if (dict_val_type == DEFRAG_SDS_DICT_VAL_IS_SDS) {
360 sds newsds, sdsele = dictGetVal(de);
361 if ((newsds = activeDefragSds(sdsele)))
362 de->v.val = newsds, defragged++;
363 } else if (dict_val_type == DEFRAG_SDS_DICT_VAL_IS_STROB) {
364 robj *newele, *ele = dictGetVal(de);
365 if ((newele = activeDefragStringOb(ele, &defragged)))
366 de->v.val = newele, defragged++;
367 } else if (dict_val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR) {
368 void *newptr, *ptr = dictGetVal(de);
369 if ((newptr = activeDefragAlloc(ptr)))
370 de->v.val = newptr, defragged++;
371 }
372 defragged += dictIterDefragEntry(di);
373 }
374 dictReleaseIterator(di);
375
376 return defragged;
377}
378
379/* Utility function that replaces an old key pointer in the dictionary with a
380 * new pointer. Additionally, we try to defrag the dictEntry in that dict.
381 * Oldkey may be a dead pointer and should not be accessed (we get a
382 * pre-calculated hash value). Newkey may be null if the key pointer wasn't
383 * moved. Return value is the dictEntry if found, or NULL if not found.
384 * NOTE: this is very ugly code, but it lets us avoid the complication of
385 * doing a scan on another dict. */
386dictEntry* replaceSatelliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged) {
387 dictEntry **deref = dictFindEntryRefByPtrAndHash(d, oldkey, hash);
388 if (deref) {
389 dictEntry *de = *deref;
390 dictEntry *newde = activeDefragAlloc(de);
391 if (newde) {
392 de = *deref = newde;
393 (*defragged)++;
394 }
395 if (newkey)
396 de->key = newkey;
397 return de;
398 }
399 return NULL;
400}
401
402long activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) {
403 quicklistNode *newnode, *node = *node_ref;
404 long defragged = 0;
405 unsigned char *newzl;
406 if ((newnode = activeDefragAlloc(node))) {
407 if (newnode->prev)
408 newnode->prev->next = newnode;
409 else
410 ql->head = newnode;
411 if (newnode->next)
412 newnode->next->prev = newnode;
413 else
414 ql->tail = newnode;
415 *node_ref = node = newnode;
416 defragged++;
417 }
418 if ((newzl = activeDefragAlloc(node->zl)))
419 defragged++, node->zl = newzl;
420 return defragged;
421}
422
423long activeDefragQuickListNodes(quicklist *ql) {
424 quicklistNode *node = ql->head;
425 long defragged = 0;
426 while (node) {
427 defragged += activeDefragQuickListNode(ql, &node);
428 node = node->next;
429 }
430 return defragged;
431}
432
433/* when the value has lots of elements, we want to handle it later and not as
434 * part of the main dictionary scan. this is needed in order to prevent latency
435 * spikes when handling large items */
436void defragLater(redisDb *db, dictEntry *kde) {
437 sds key = sdsdup(dictGetKey(kde));
438 listAddNodeTail(db->defrag_later, key);
439}
440
441/* returns 0 if no more work needs to be done, and 1 if time is up and more work is needed. */
442long scanLaterList(robj *ob, unsigned long *cursor, long long endtime, long long *defragged) {
443 quicklist *ql = ob->ptr;
444 quicklistNode *node;
445 long iterations = 0;
446 int bookmark_failed = 0;
447 if (ob->type != OBJ_LIST || ob->encoding != OBJ_ENCODING_QUICKLIST)
448 return 0;
449
450 if (*cursor == 0) {
451 /* if cursor is 0, we start new iteration */
452 node = ql->head;
453 } else {
454 node = quicklistBookmarkFind(ql, "_AD");
455 if (!node) {
456 /* if the bookmark was deleted, it means we reached the end. */
457 *cursor = 0;
458 return 0;
459 }
460 node = node->next;
461 }
462
463 (*cursor)++;
464 while (node) {
465 (*defragged) += activeDefragQuickListNode(ql, &node);
466 server.stat_active_defrag_scanned++;
467 if (++iterations > 128 && !bookmark_failed) {
468 if (ustime() > endtime) {
469 if (!quicklistBookmarkCreate(&ql, "_AD", node)) {
470 bookmark_failed = 1;
471 } else {
472 ob->ptr = ql; /* bookmark creation may have re-allocated the quicklist */
473 return 1;
474 }
475 }
476 iterations = 0;
477 }
478 node = node->next;
479 }
480 quicklistBookmarkDelete(ql, "_AD");
481 *cursor = 0;
482 return bookmark_failed? 1: 0;
483}
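/* Hedged sketch of the resume pattern this function supports (hypothetical
 * caller, simplified from defragLaterItem/defragLaterStep below): keep
 * calling until the cursor wraps to zero; the "_AD" bookmark parks our
 * position in the quicklist between calls. */
void example_drain_list(robj *ob) {
    unsigned long cursor = 0;
    long long hits = 0;
    do {
        scanLaterList(ob, &cursor, ustime() + 1000, &hits); /* ~1ms budget */
    } while (cursor); /* a zero cursor means the scan completed */
}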
484
485typedef struct {
486 zset *zs;
487 long defragged;
488} scanLaterZsetData;
489
490void scanLaterZsetCallback(void *privdata, const dictEntry *_de) {
491 dictEntry *de = (dictEntry*)_de;
492 scanLaterZsetData *data = privdata;
493 data->defragged += activeDefragZsetEntry(data->zs, de);
494 server.stat_active_defrag_scanned++;
495}
496
497long scanLaterZset(robj *ob, unsigned long *cursor) {
498 if (ob->type != OBJ_ZSET || ob->encoding != OBJ_ENCODING_SKIPLIST)
499 return 0;
500 zset *zs = (zset*)ob->ptr;
501 dict *d = zs->dict;
502 scanLaterZsetData data = {zs, 0};
503 *cursor = dictScan(d, *cursor, scanLaterZsetCallback, defragDictBucketCallback, &data);
504 return data.defragged;
505}
506
507void scanLaterSetCallback(void *privdata, const dictEntry *_de) {
508 dictEntry *de = (dictEntry*)_de;
509 long *defragged = privdata;
510 sds sdsele = dictGetKey(de), newsds;
511 if ((newsds = activeDefragSds(sdsele)))
512 (*defragged)++, de->key = newsds;
513 server.stat_active_defrag_scanned++;
514}
515
516long scanLaterSet(robj *ob, unsigned long *cursor) {
517 long defragged = 0;
518 if (ob->type != OBJ_SET || ob->encoding != OBJ_ENCODING_HT)
519 return 0;
520 dict *d = ob->ptr;
521 *cursor = dictScan(d, *cursor, scanLaterSetCallback, defragDictBucketCallback, &defragged);
522 return defragged;
523}
524
525void scanLaterHashCallback(void *privdata, const dictEntry *_de) {
526 dictEntry *de = (dictEntry*)_de;
527 long *defragged = privdata;
528 sds sdsele = dictGetKey(de), newsds;
529 if ((newsds = activeDefragSds(sdsele)))
530 (*defragged)++, de->key = newsds;
531 sdsele = dictGetVal(de);
532 if ((newsds = activeDefragSds(sdsele)))
533 (*defragged)++, de->v.val = newsds;
534 server.stat_active_defrag_scanned++;
535}
536
537long scanLaterHash(robj *ob, unsigned long *cursor) {
538 long defragged = 0;
539 if (ob->type != OBJ_HASH || ob->encoding != OBJ_ENCODING_HT)
540 return 0;
541 dict *d = ob->ptr;
542 *cursor = dictScan(d, *cursor, scanLaterHashCallback, defragDictBucketCallback, &defragged);
543 return defragged;
544}
545
546long defragQuicklist(redisDb *db, dictEntry *kde) {
547 robj *ob = dictGetVal(kde);
548 long defragged = 0;
549 quicklist *ql = ob->ptr, *newql;
550 serverAssert(ob->type == OBJ_LIST && ob->encoding == OBJ_ENCODING_QUICKLIST);
551 if ((newql = activeDefragAlloc(ql)))
552 defragged++, ob->ptr = ql = newql;
553 if (ql->len > server.active_defrag_max_scan_fields)
554 defragLater(db, kde);
555 else
556 defragged += activeDefragQuickListNodes(ql);
557 return defragged;
558}
559
560long defragZsetSkiplist(redisDb *db, dictEntry *kde) {
561 robj *ob = dictGetVal(kde);
562 long defragged = 0;
563 zset *zs = (zset*)ob->ptr;
564 zset *newzs;
565 zskiplist *newzsl;
566 dict *newdict;
567 dictEntry *de;
568 struct zskiplistNode *newheader;
569 serverAssert(ob->type == OBJ_ZSET && ob->encoding == OBJ_ENCODING_SKIPLIST);
570 if ((newzs = activeDefragAlloc(zs)))
571 defragged++, ob->ptr = zs = newzs;
572 if ((newzsl = activeDefragAlloc(zs->zsl)))
573 defragged++, zs->zsl = newzsl;
574 if ((newheader = activeDefragAlloc(zs->zsl->header)))
575 defragged++, zs->zsl->header = newheader;
576 if (dictSize(zs->dict) > server.active_defrag_max_scan_fields)
577 defragLater(db, kde);
578 else {
579 dictIterator *di = dictGetIterator(zs->dict);
580 while((de = dictNext(di)) != NULL) {
581 defragged += activeDefragZsetEntry(zs, de);
582 }
583 dictReleaseIterator(di);
584 }
585 /* handle the dict struct */
586 if ((newdict = activeDefragAlloc(zs->dict)))
587 defragged++, zs->dict = newdict;
588 /* defrag the dict tables */
589 defragged += dictDefragTables(zs->dict);
590 return defragged;
591}
592
593long defragHash(redisDb *db, dictEntry *kde) {
594 long defragged = 0;
595 robj *ob = dictGetVal(kde);
596 dict *d, *newd;
597 serverAssert(ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HT);
598 d = ob->ptr;
599 if (dictSize(d) > server.active_defrag_max_scan_fields)
600 defragLater(db, kde);
601 else
602 defragged += activeDefragSdsDict(d, DEFRAG_SDS_DICT_VAL_IS_SDS);
603 /* handle the dict struct */
604 if ((newd = activeDefragAlloc(ob->ptr)))
605 defragged++, ob->ptr = newd;
606 /* defrag the dict tables */
607 defragged += dictDefragTables(ob->ptr);
608 return defragged;
609}
610
611long defragSet(redisDb *db, dictEntry *kde) {
612 long defragged = 0;
613 robj *ob = dictGetVal(kde);
614 dict *d, *newd;
615 serverAssert(ob->type == OBJ_SET && ob->encoding == OBJ_ENCODING_HT);
616 d = ob->ptr;
617 if (dictSize(d) > server.active_defrag_max_scan_fields)
618 defragLater(db, kde);
619 else
620 defragged += activeDefragSdsDict(d, DEFRAG_SDS_DICT_NO_VAL);
621 /* handle the dict struct */
622 if ((newd = activeDefragAlloc(ob->ptr)))
623 defragged++, ob->ptr = newd;
624 /* defrag the dict tables */
625 defragged += dictDefragTables(ob->ptr);
626 return defragged;
627}
628
629/* Defrag callback for radix tree iterator, called for each node,
630 * used in order to defrag the nodes allocations. */
631int defragRaxNode(raxNode **noderef) {
632 raxNode *newnode = activeDefragAlloc(*noderef);
633 if (newnode) {
634 *noderef = newnode;
635 return 1;
636 }
637 return 0;
638}
639
640/* returns 0 if no more work needs to be done, and 1 if time is up and more work is needed. */
641int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, long long endtime, long long *defragged) {
642 static unsigned char last[sizeof(streamID)];
643 raxIterator ri;
644 long iterations = 0;
645 if (ob->type != OBJ_STREAM || ob->encoding != OBJ_ENCODING_STREAM) {
646 *cursor = 0;
647 return 0;
648 }
649
650 stream *s = ob->ptr;
651 raxStart(&ri,s->rax);
652 if (*cursor == 0) {
653 /* if cursor is 0, we start new iteration */
654 defragRaxNode(&s->rax->head);
655 /* assign the iterator node callback before the seek, so that the
656 * initial nodes that are processed till the first item are covered */
657 ri.node_cb = defragRaxNode;
658 raxSeek(&ri,"^",NULL,0);
659 } else {
660 /* if cursor is non-zero, we seek to the static 'last' */
661 if (!raxSeek(&ri,">", last, sizeof(last))) {
662 *cursor = 0;
663 raxStop(&ri);
664 return 0;
665 }
666 /* assign the iterator node callback after the seek, so that the
667 * initial nodes that are processed till now aren't covered */
668 ri.node_cb = defragRaxNode;
669 }
670
671 (*cursor)++;
672 while (raxNext(&ri)) {
673 void *newdata = activeDefragAlloc(ri.data);
674 if (newdata)
675 raxSetData(ri.node, ri.data=newdata), (*defragged)++;
676 server.stat_active_defrag_scanned++;
677 if (++iterations > 128) {
678 if (ustime() > endtime) {
679 serverAssert(ri.key_len==sizeof(last));
680 memcpy(last,ri.key,ri.key_len);
681 raxStop(&ri);
682 return 1;
683 }
684 iterations = 0;
685 }
686 }
687 raxStop(&ri);
688 *cursor = 0;
689 return 0;
690}
691
692/* optional callback used to defrag each rax element (not including the element pointer itself) */
693typedef void *(raxDefragFunction)(raxIterator *ri, void *privdata, long *defragged);
694
695/* defrag radix tree including:
696 * 1) rax struct
697 * 2) rax nodes
698 * 3) rax entry data (only if defrag_data is specified)
699 * 4) call a callback per element, and allow the callback to return a new pointer for the element */
700long defragRadixTree(rax **raxref, int defrag_data, raxDefragFunction *element_cb, void *element_cb_data) {
701 long defragged = 0;
702 raxIterator ri;
703 rax* rax;
704 if ((rax = activeDefragAlloc(*raxref)))
705 defragged++, *raxref = rax;
706 rax = *raxref;
707 raxStart(&ri,rax);
708 ri.node_cb = defragRaxNode;
709 defragRaxNode(&rax->head);
710 raxSeek(&ri,"^",NULL,0);
711 while (raxNext(&ri)) {
712 void *newdata = NULL;
713 if (element_cb)
714 newdata = element_cb(&ri, element_cb_data, &defragged);
715 if (defrag_data && !newdata)
716 newdata = activeDefragAlloc(ri.data);
717 if (newdata)
718 raxSetData(ri.node, ri.data=newdata), defragged++;
719 }
720 raxStop(&ri);
721 return defragged;
722}
723
724typedef struct {
725 streamCG *cg;
726 streamConsumer *c;
727} PendingEntryContext;
728
729void* defragStreamConsumerPendingEntry(raxIterator *ri, void *privdata, long *defragged) {
730 UNUSED(defragged);
731 PendingEntryContext *ctx = privdata;
732 streamNACK *nack = ri->data, *newnack;
733 nack->consumer = ctx->c; /* update nack pointer to consumer */
734 newnack = activeDefragAlloc(nack);
735 if (newnack) {
736 /* update consumer group pointer to the nack */
737 void *prev;
738 raxInsert(ctx->cg->pel, ri->key, ri->key_len, newnack, &prev);
739 serverAssert(prev==nack);
740 /* note: we don't increment 'defragged'; that's done by the caller */
741 }
742 return newnack;
743}
744
745void* defragStreamConsumer(raxIterator *ri, void *privdata, long *defragged) {
746 streamConsumer *c = ri->data;
747 streamCG *cg = privdata;
748 void *newc = activeDefragAlloc(c);
749 if (newc) {
750 /* note: we don't increment 'defragged'; that's done by the caller */
751 c = newc;
752 }
753 sds newsds = activeDefragSds(c->name);
754 if (newsds)
755 (*defragged)++, c->name = newsds;
756 if (c->pel) {
757 PendingEntryContext pel_ctx = {cg, c};
758 *defragged += defragRadixTree(&c->pel, 0, defragStreamConsumerPendingEntry, &pel_ctx);
759 }
760 return newc; /* returns NULL if c was not defragged */
761}
762
763void* defragStreamConsumerGroup(raxIterator *ri, void *privdata, long *defragged) {
764 streamCG *cg = ri->data;
765 UNUSED(privdata);
766 if (cg->consumers)
767 *defragged += defragRadixTree(&cg->consumers, 0, defragStreamConsumer, cg);
768 if (cg->pel)
769 *defragged += defragRadixTree(&cg->pel, 0, NULL, NULL);
770 return NULL;
771}
772
773long defragStream(redisDb *db, dictEntry *kde) {
774 long defragged = 0;
775 robj *ob = dictGetVal(kde);
776 serverAssert(ob->type == OBJ_STREAM && ob->encoding == OBJ_ENCODING_STREAM);
777 stream *s = ob->ptr, *news;
778
779 /* handle the main struct */
780 if ((news = activeDefragAlloc(s)))
781 defragged++, ob->ptr = s = news;
782
783 if (raxSize(s->rax) > server.active_defrag_max_scan_fields) {
784 rax *newrax = activeDefragAlloc(s->rax);
785 if (newrax)
786 defragged++, s->rax = newrax;
787 defragLater(db, kde);
788 } else
789 defragged += defragRadixTree(&s->rax, 1, NULL, NULL);
790
791 if (s->cgroups)
792 defragged += defragRadixTree(&s->cgroups, 1, defragStreamConsumerGroup, NULL);
793 return defragged;
794}
795
796/* Defrag a module key. This is either done immediately or scheduled
797 * for later. Returns the number of pointers defragged.
798 */
799long defragModule(redisDb *db, dictEntry *kde) {
800 robj *obj = dictGetVal(kde);
801 serverAssert(obj->type == OBJ_MODULE);
802 long defragged = 0;
803
804 if (!moduleDefragValue(dictGetKey(kde), obj, &defragged))
805 defragLater(db, kde);
806
807 return defragged;
808}
809
810/* for each key we scan in the main dict, this function will attempt to defrag
811 * all the various pointers it has. Returns a stat of how many pointers were
812 * moved. */
813long defragKey(redisDb *db, dictEntry *de) {
814 sds keysds = dictGetKey(de);
815 robj *newob, *ob;
816 unsigned char *newzl;
817 long defragged = 0;
818 sds newsds;
819
820 /* Try to defrag the key name. */
821 newsds = activeDefragSds(keysds);
822 if (newsds)
823 defragged++, de->key = newsds;
824 if (dictSize(db->expires)) {
825 /* Dirty code:
826 * we can't search db->expires for that key after we've already released
827 * the pointer it holds, since the string compare would no longer work */
828 uint64_t hash = dictGetHash(db->dict, de->key);
829 replaceSatelliteDictKeyPtrAndOrDefragDictEntry(db->expires, keysds, newsds, hash, &defragged);
830 }
831
832 /* Try to defrag robj and / or string value. */
833 ob = dictGetVal(de);
834 if ((newob = activeDefragStringOb(ob, &defragged))) {
835 de->v.val = newob;
836 ob = newob;
837 }
838
839 if (ob->type == OBJ_STRING) {
840 /* Already handled in activeDefragStringOb. */
841 } else if (ob->type == OBJ_LIST) {
842 if (ob->encoding == OBJ_ENCODING_QUICKLIST) {
843 defragged += defragQuicklist(db, de);
844 } else if (ob->encoding == OBJ_ENCODING_ZIPLIST) {
845 if ((newzl = activeDefragAlloc(ob->ptr)))
846 defragged++, ob->ptr = newzl;
847 } else {
848 serverPanic("Unknown list encoding");
849 }
850 } else if (ob->type == OBJ_SET) {
851 if (ob->encoding == OBJ_ENCODING_HT) {
852 defragged += defragSet(db, de);
853 } else if (ob->encoding == OBJ_ENCODING_INTSET) {
854 intset *newis, *is = ob->ptr;
855 if ((newis = activeDefragAlloc(is)))
856 defragged++, ob->ptr = newis;
857 } else {
858 serverPanic("Unknown set encoding");
859 }
860 } else if (ob->type == OBJ_ZSET) {
861 if (ob->encoding == OBJ_ENCODING_ZIPLIST) {
862 if ((newzl = activeDefragAlloc(ob->ptr)))
863 defragged++, ob->ptr = newzl;
864 } else if (ob->encoding == OBJ_ENCODING_SKIPLIST) {
865 defragged += defragZsetSkiplist(db, de);
866 } else {
867 serverPanic("Unknown sorted set encoding");
868 }
869 } else if (ob->type == OBJ_HASH) {
870 if (ob->encoding == OBJ_ENCODING_ZIPLIST) {
871 if ((newzl = activeDefragAlloc(ob->ptr)))
872 defragged++, ob->ptr = newzl;
873 } else if (ob->encoding == OBJ_ENCODING_HT) {
874 defragged += defragHash(db, de);
875 } else {
876 serverPanic("Unknown hash encoding");
877 }
878 } else if (ob->type == OBJ_STREAM) {
879 defragged += defragStream(db, de);
880 } else if (ob->type == OBJ_MODULE) {
881 defragged += defragModule(db, de);
882 } else {
883 serverPanic("Unknown object type");
884 }
885 return defragged;
886}
887
888/* Defrag scan callback for the main db dictionary. */
889void defragScanCallback(void *privdata, const dictEntry *de) {
890 long defragged = defragKey((redisDb*)privdata, (dictEntry*)de);
891 server.stat_active_defrag_hits += defragged;
892 if(defragged)
893 server.stat_active_defrag_key_hits++;
894 else
895 server.stat_active_defrag_key_misses++;
896 server.stat_active_defrag_scanned++;
897}
898
899/* Defrag scan callback for each hash table bucket,
900 * used in order to defrag the dictEntry allocations. */
901void defragDictBucketCallback(void *privdata, dictEntry **bucketref) {
902 UNUSED(privdata); /* NOTE: this function is used by activeDefragCycle, scanLaterHash, etc., so don't use privdata */
903 while(*bucketref) {
904 dictEntry *de = *bucketref, *newde;
905 if ((newde = activeDefragAlloc(de))) {
906 *bucketref = newde;
907 }
908 bucketref = &(*bucketref)->next;
909 }
910}
911
912/* Utility function to get the fragmentation ratio from jemalloc.
913 * It is critical to do that by comparing only heap maps that belong to
914 * jemalloc, and skip the ones jemalloc keeps as spare. Since we use this
915 * fragmentation ratio in order to decide if a defrag action should be taken
916 * or not, a false detection can cause the defragmenter to waste a lot of CPU
917 * without the possibility of getting any results. */
918float getAllocatorFragmentation(size_t *out_frag_bytes) {
919 size_t resident, active, allocated;
920 zmalloc_get_allocator_info(&allocated, &active, &resident);
921 float frag_pct = ((float)active / allocated)*100 - 100;
922 size_t frag_bytes = active - allocated;
923 float rss_pct = ((float)resident / allocated)*100 - 100;
924 size_t rss_bytes = resident - allocated;
925 if(out_frag_bytes)
926 *out_frag_bytes = frag_bytes;
927 serverLog(LL_DEBUG,
928 "allocated=%zu, active=%zu, resident=%zu, frag=%.0f%% (%.0f%% rss), frag_bytes=%zu (%zu rss)",
929 allocated, active, resident, frag_pct, rss_pct, frag_bytes, rss_bytes);
930 return frag_pct;
931}
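/* Worked example (illustrative numbers): with allocated=100MB and
 * active=130MB, frag_pct = (130/100)*100 - 100 = 30% and frag_bytes = 30MB,
 * i.e. 30MB sit in partially used runs that defragmentation can try to
 * compact. */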
932
933/* We may need to defrag other globals: one small allocation can hold a full allocator run,
934 * so although small, it is still important to defrag these */
935long defragOtherGlobals() {
936 long defragged = 0;
937
938 /* there are many more pointers to defrag (e.g. client argv, output / aof buffers, etc.),
939 * but we assume most of these are short lived; we only need to defrag allocations
940 * that remain static for a long time */
941 defragged += activeDefragSdsDict(server.lua_scripts, DEFRAG_SDS_DICT_VAL_IS_STROB);
942 defragged += activeDefragSdsListAndDict(server.repl_scriptcache_fifo, server.repl_scriptcache_dict, DEFRAG_SDS_DICT_NO_VAL);
943 defragged += moduleDefragGlobals();
944 return defragged;
945}
946
947/* returns 0 when more work may or may not be needed (check for a non-zero cursor),
948 * and 1 if time is up and more work is needed. */
949int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime) {
950 if (de) {
951 robj *ob = dictGetVal(de);
952 if (ob->type == OBJ_LIST) {
953 return scanLaterList(ob, cursor, endtime, &server.stat_active_defrag_hits);
954 } else if (ob->type == OBJ_SET) {
955 server.stat_active_defrag_hits += scanLaterSet(ob, cursor);
956 } else if (ob->type == OBJ_ZSET) {
957 server.stat_active_defrag_hits += scanLaterZset(ob, cursor);
958 } else if (ob->type == OBJ_HASH) {
959 server.stat_active_defrag_hits += scanLaterHash(ob, cursor);
960 } else if (ob->type == OBJ_STREAM) {
961 return scanLaterStreamListpacks(ob, cursor, endtime, &server.stat_active_defrag_hits);
962 } else if (ob->type == OBJ_MODULE) {
963 return moduleLateDefrag(dictGetKey(de), ob, cursor, endtime, &server.stat_active_defrag_hits);
964 } else {
965 *cursor = 0; /* object type may have changed since we scheduled it for later */
966 }
967 } else {
968 *cursor = 0; /* object may have been deleted already */
969 }
970 return 0;
971}
972
973/* static variables serving defragLaterStep to continue scanning a key from where we stopped last time. */
974static sds defrag_later_current_key = NULL;
975static unsigned long defrag_later_cursor = 0;
976
977/* returns 0 if no more work needs to be done, and 1 if time is up and more work is needed. */
978int defragLaterStep(redisDb *db, long long endtime) {
979 unsigned int iterations = 0;
980 unsigned long long prev_defragged = server.stat_active_defrag_hits;
981 unsigned long long prev_scanned = server.stat_active_defrag_scanned;
982 long long key_defragged;
983
984 do {
985 /* if we're not continuing a scan from the last call or loop, start a new one */
986 if (!defrag_later_cursor) {
987 listNode *head = listFirst(db->defrag_later);
988
989 /* Move on to next key */
990 if (defrag_later_current_key) {
991 serverAssert(defrag_later_current_key == head->value);
992 listDelNode(db->defrag_later, head);
993 defrag_later_cursor = 0;
994 defrag_later_current_key = NULL;
995 }
996
997 /* stop if we reached the last one. */
998 head = listFirst(db->defrag_later);
999 if (!head)
1000 return 0;
1001
1002 /* start a new key */
1003 defrag_later_current_key = head->value;
1004 defrag_later_cursor = 0;
1005 }
1006
1007 /* each time we enter this function we need to fetch the key from the dict again (if it still exists) */
1008 dictEntry *de = dictFind(db->dict, defrag_later_current_key);
1009 key_defragged = server.stat_active_defrag_hits;
1010 do {
1011 int quit = 0;
1012 if (defragLaterItem(de, &defrag_later_cursor, endtime))
1013 quit = 1; /* time is up, we didn't finish all the work */
1014
1015 /* Once in 16 scan iterations, 512 pointer reallocations, or 64 fields
1016 * (if we have a lot of pointers in one hash bucket, or rehashing),
1017 * check if we reached the time limit. */
1018 if (quit || (++iterations > 16 ||
1019 server.stat_active_defrag_hits - prev_defragged > 512 ||
1020 server.stat_active_defrag_scanned - prev_scanned > 64)) {
1021 if (quit || ustime() > endtime) {
1022 if(key_defragged != server.stat_active_defrag_hits)
1023 server.stat_active_defrag_key_hits++;
1024 else
1025 server.stat_active_defrag_key_misses++;
1026 return 1;
1027 }
1028 iterations = 0;
1029 prev_defragged = server.stat_active_defrag_hits;
1030 prev_scanned = server.stat_active_defrag_scanned;
1031 }
1032 } while(defrag_later_cursor);
1033 if(key_defragged != server.stat_active_defrag_hits)
1034 server.stat_active_defrag_key_hits++;
1035 else
1036 server.stat_active_defrag_key_misses++;
1037 } while(1);
1038}
1039
1040#define INTERPOLATE(x, x1, x2, y1, y2) ( (y1) + ((x)-(x1)) * ((y2)-(y1)) / ((x2)-(x1)) )
1041#define LIMIT(y, min, max) ((y)<(min)? min: ((y)>(max)? max: (y)))
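/* Worked example (illustrative config values, not necessarily the defaults):
 * with threshold_lower=10, threshold_upper=100, cycle_min=1, cycle_max=25
 * and frag_pct=50, INTERPOLATE gives 1 + (50-10)*(25-1)/(100-10) ~= 11,
 * i.e. the CPU effort scales linearly with fragmentation between cycle_min
 * and cycle_max, and LIMIT clamps the result into that range. */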
1042
1043/* decide if defrag is needed, and at what CPU effort to invest in it */
1044void computeDefragCycles() {
1045 size_t frag_bytes;
1046 float frag_pct = getAllocatorFragmentation(&frag_bytes);
1047 /* If we're not already running, and below the threshold, exit. */
1048 if (!server.active_defrag_running) {
1049 if(frag_pct < server.active_defrag_threshold_lower || frag_bytes < server.active_defrag_ignore_bytes)
1050 return;
1051 }
1052
1053 /* Calculate the adaptive aggressiveness of the defrag */
1054 int cpu_pct = INTERPOLATE(frag_pct,
1055 server.active_defrag_threshold_lower,
1056 server.active_defrag_threshold_upper,
1057 server.active_defrag_cycle_min,
1058 server.active_defrag_cycle_max);
1059 cpu_pct = LIMIT(cpu_pct,
1060 server.active_defrag_cycle_min,
1061 server.active_defrag_cycle_max);
1062 /* We allow increasing the aggressiveness during a scan, but don't
1063 * reduce it. */
1064 if (!server.active_defrag_running ||
1065 cpu_pct > server.active_defrag_running)
1066 {
1067 server.active_defrag_running = cpu_pct;
1068 serverLog(LL_VERBOSE,
1069 "Starting active defrag, frag=%.0f%%, frag_bytes=%zu, cpu=%d%%",
1070 frag_pct, frag_bytes, cpu_pct);
1071 }
1072}
1073
1074/* Perform incremental defragmentation work from the serverCron.
1075 * This works in a similar way to activeExpireCycle, in the sense that
1076 * we do incremental work across calls. */
1077void activeDefragCycle(void) {
1078 static int current_db = -1;
1079 static unsigned long cursor = 0;
1080 static redisDb *db = NULL;
1081 static long long start_scan, start_stat;
1082 unsigned int iterations = 0;
1083 unsigned long long prev_defragged = server.stat_active_defrag_hits;
1084 unsigned long long prev_scanned = server.stat_active_defrag_scanned;
1085 long long start, timelimit, endtime;
1086 mstime_t latency;
1087 int quit = 0;
1088
1089 if (!server.active_defrag_enabled) {
1090 if (server.active_defrag_running) {
1091 /* if active defrag was disabled mid-run, start from fresh next time. */
1092 server.active_defrag_running = 0;
1093 if (db)
1094 listEmpty(db->defrag_later);
1095 defrag_later_current_key = NULL;
1096 defrag_later_cursor = 0;
1097 current_db = -1;
1098 cursor = 0;
1099 db = NULL;
1100 }
1101 return;
1102 }
1103
1104 if (hasActiveChildProcess())
1105 return; /* Defragging memory while there's a fork will just do damage. */
1106
1107 /* Once a second, check if the fragmentation justifies starting a scan
1108 * or making it more aggressive. */
1109 run_with_period(1000) {
1110 computeDefragCycles();
1111 }
1112 if (!server.active_defrag_running)
1113 return;
1114
1115 /* See activeExpireCycle for how timelimit is handled. */
1116 start = ustime();
1117 timelimit = 1000000*server.active_defrag_running/server.hz/100;
1118 if (timelimit <= 0) timelimit = 1;
1119 endtime = start + timelimit;
1120 latencyStartMonitor(latency);
1121
1122 do {
1123 /* if we're not continuing a scan from the last call or loop, start a new one */
1124 if (!cursor) {
1125 /* finish any leftovers from previous db before moving to the next one */
1126 if (db && defragLaterStep(db, endtime)) {
1127 quit = 1; /* time is up, we didn't finish all the work */
Value stored to 'quit' is never read
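/* The store above is dead because the 'break' on the next line exits the
 * enclosing do { ... } while(!quit) loop directly, so no code ever reads
 * 'quit' on this path; dropping the assignment would silence the warning. */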
1128 break; /* this will exit the function and we'll continue on the next cycle */
1129 }
1130
1131 /* Move on to next database, and stop if we reached the last one. */
1132 if (++current_db >= server.dbnum) {
1133 /* defrag other items not part of the db / keys */
1134 defragOtherGlobals();
1135
1136 long long now = ustime();
1137 size_t frag_bytes;
1138 float frag_pct = getAllocatorFragmentation(&frag_bytes);
1139 serverLog(LL_VERBOSE,
1140 "Active defrag done in %dms, reallocated=%d, frag=%.0f%%, frag_bytes=%zu",
1141 (int)((now - start_scan)/1000), (int)(server.stat_active_defrag_hits - start_stat), frag_pct, frag_bytes);
1142
1143 start_scan = now;
1144 current_db = -1;
1145 cursor = 0;
1146 db = NULL;
1147 server.active_defrag_running = 0;
1148
1149 computeDefragCycles(); /* if another scan is needed, start it right away */
1150 if (server.active_defrag_running != 0 && ustime() < endtime)
1151 continue;
1152 break;
1153 }
1154 else if (current_db==0) {
1155 /* Start a scan from the first database. */
1156 start_scan = ustime();
1157 start_stat = server.stat_active_defrag_hits;
1158 }
1159
1160 db = &server.db[current_db];
1161 cursor = 0;
1162 }
1163
1164 do {
1165 /* before scanning the next bucket, see if we have big keys left from the previous bucket to scan */
1166 if (defragLaterStep(db, endtime)) {
1167 quit = 1; /* time is up, we didn't finish all the work */
1168 break; /* this will exit the function and we'll continue on the next cycle */
1169 }
1170
1171 cursor = dictScan(db->dict, cursor, defragScanCallback, defragDictBucketCallback, db);
1172
1173 /* Once in 16 scan iterations, 512 pointer reallocations, or 64 keys
1174 * (if we have a lot of pointers in one hash bucket or are rehashing),
1175 * check if we reached the time limit.
1176 * But regardless, don't start a new db in this loop; this is because after
1177 * the last db we call defragOtherGlobals, which must be done in one cycle */
1178 if (!cursor || (++iterations > 16 ||
1179 server.stat_active_defrag_hits - prev_defragged > 512 ||
1180 server.stat_active_defrag_scanned - prev_scanned > 64)) {
1181 if (!cursor || ustime() > endtime) {
1182 quit = 1;
1183 break;
1184 }
1185 iterations = 0;
1186 prev_defragged = server.stat_active_defrag_hits;
1187 prev_scanned = server.stat_active_defrag_scanned;
1188 }
1189 } while(cursor && !quit);
1190 } while(!quit);
1191
1192 latencyEndMonitor(latency);
1193 latencyAddSampleIfNeeded("active-defrag-cycle",latency);
1194}
1195
1196#else /* HAVE_DEFRAG */
1197
1198void activeDefragCycle(void) {
1199 /* Not implemented yet. */
1200}
1201
1202void *activeDefragAlloc(void *ptr) {
1203 UNUSED(ptr);
1204 return NULL;
1205}
1206
1207robj *activeDefragStringOb(robj *ob, long *defragged) {
1208 UNUSED(ob);
1209 UNUSED(defragged);
1210 return NULL;
1211}
1212
1213#endif