/* src/defrag.c
 * NOTE(review): static analyzer reported at line 1127, column 17:
 * "Value stored to 'quit' is never read" (outside this chunk — verify there). */
/*
 * Active memory defragmentation
 * Try to find key / value allocations that need to be re-allocated in order
 * to reduce external fragmentation.
 * We do that by scanning the keyspace and for each pointer we have, we can try to
 * ask the allocator if moving it to a new address will help reduce fragmentation.
 *
 * Copyright (c) 2020, Redis Labs, Inc
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *   * Neither the name of Redis nor the names of its contributors may be used
 *     to endorse or promote products derived from this software without
 *     specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */
35 | |
36 | #include "server.h" |
37 | #include <time.h> |
38 | #include <assert.h> |
39 | #include <stddef.h> |
40 | |
41 | #ifdef HAVE_DEFRAG |
42 | |
43 | /* this method was added to jemalloc in order to help us understand which |
44 | * pointers are worthwhile moving and which aren't */ |
45 | int je_get_defrag_hint(void* ptr); |
46 | |
47 | /* forward declarations*/ |
48 | void defragDictBucketCallback(void *privdata, dictEntry **bucketref); |
49 | dictEntry* replaceSatelliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged); |
50 | |
51 | /* Defrag helper for generic allocations. |
52 | * |
53 | * returns NULL in case the allocation wasn't moved. |
54 | * when it returns a non-null value, the old pointer was already released |
55 | * and should NOT be accessed. */ |
56 | void* activeDefragAlloc(void *ptr) { |
57 | size_t size; |
58 | void *newptr; |
59 | if(!je_get_defrag_hint(ptr)) { |
60 | server.stat_active_defrag_misses++; |
61 | return NULL((void*)0); |
62 | } |
63 | /* move this allocation to a new allocation. |
64 | * make sure not to use the thread cache. so that we don't get back the same |
65 | * pointers we try to free */ |
66 | size = zmalloc_size(ptr)je_malloc_usable_size(ptr); |
67 | newptr = zmalloc_no_tcache(size); |
68 | memcpy(newptr, ptr, size); |
69 | zfree_no_tcache(ptr); |
70 | return newptr; |
71 | } |
72 | |
73 | /*Defrag helper for sds strings |
74 | * |
75 | * returns NULL in case the allocation wasn't moved. |
76 | * when it returns a non-null value, the old pointer was already released |
77 | * and should NOT be accessed. */ |
78 | sds activeDefragSds(sds sdsptr) { |
79 | void* ptr = sdsAllocPtr(sdsptr); |
80 | void* newptr = activeDefragAlloc(ptr); |
81 | if (newptr) { |
82 | size_t offset = sdsptr - (char*)ptr; |
83 | sdsptr = (char*)newptr + offset; |
84 | return sdsptr; |
85 | } |
86 | return NULL((void*)0); |
87 | } |
88 | |
89 | /* Defrag helper for robj and/or string objects |
90 | * |
91 | * returns NULL in case the allocation wasn't moved. |
92 | * when it returns a non-null value, the old pointer was already released |
93 | * and should NOT be accessed. */ |
94 | robj *activeDefragStringOb(robj* ob, long *defragged) { |
95 | robj *ret = NULL((void*)0); |
96 | if (ob->refcount!=1) |
97 | return NULL((void*)0); |
98 | |
99 | /* try to defrag robj (only if not an EMBSTR type (handled below). */ |
100 | if (ob->type!=OBJ_STRING0 || ob->encoding!=OBJ_ENCODING_EMBSTR8) { |
101 | if ((ret = activeDefragAlloc(ob))) { |
102 | ob = ret; |
103 | (*defragged)++; |
104 | } |
105 | } |
106 | |
107 | /* try to defrag string object */ |
108 | if (ob->type == OBJ_STRING0) { |
109 | if(ob->encoding==OBJ_ENCODING_RAW0) { |
110 | sds newsds = activeDefragSds((sds)ob->ptr); |
111 | if (newsds) { |
112 | ob->ptr = newsds; |
113 | (*defragged)++; |
114 | } |
115 | } else if (ob->encoding==OBJ_ENCODING_EMBSTR8) { |
116 | /* The sds is embedded in the object allocation, calculate the |
117 | * offset and update the pointer in the new allocation. */ |
118 | long ofs = (intptr_t)ob->ptr - (intptr_t)ob; |
119 | if ((ret = activeDefragAlloc(ob))) { |
120 | ret->ptr = (void*)((intptr_t)ret + ofs); |
121 | (*defragged)++; |
122 | } |
123 | } else if (ob->encoding!=OBJ_ENCODING_INT1) { |
124 | serverPanic("Unknown string encoding")_serverPanic("defrag.c",124,"Unknown string encoding"),__builtin_unreachable (); |
125 | } |
126 | } |
127 | return ret; |
128 | } |
129 | |
130 | /* Defrag helper for dictEntries to be used during dict iteration (called on |
131 | * each step). Returns a stat of how many pointers were moved. */ |
132 | long dictIterDefragEntry(dictIterator *iter) { |
133 | /* This function is a little bit dirty since it messes with the internals |
134 | * of the dict and it's iterator, but the benefit is that it is very easy |
135 | * to use, and require no other changes in the dict. */ |
136 | long defragged = 0; |
137 | dictht *ht; |
138 | /* Handle the next entry (if there is one), and update the pointer in the |
139 | * current entry. */ |
140 | if (iter->nextEntry) { |
141 | dictEntry *newde = activeDefragAlloc(iter->nextEntry); |
142 | if (newde) { |
143 | defragged++; |
144 | iter->nextEntry = newde; |
145 | iter->entry->next = newde; |
146 | } |
147 | } |
148 | /* handle the case of the first entry in the hash bucket. */ |
149 | ht = &iter->d->ht[iter->table]; |
150 | if (ht->table[iter->index] == iter->entry) { |
151 | dictEntry *newde = activeDefragAlloc(iter->entry); |
152 | if (newde) { |
153 | iter->entry = newde; |
154 | ht->table[iter->index] = newde; |
155 | defragged++; |
156 | } |
157 | } |
158 | return defragged; |
159 | } |
160 | |
161 | /* Defrag helper for dict main allocations (dict struct, and hash tables). |
162 | * receives a pointer to the dict* and implicitly updates it when the dict |
163 | * struct itself was moved. Returns a stat of how many pointers were moved. */ |
164 | long dictDefragTables(dict* d) { |
165 | dictEntry **newtable; |
166 | long defragged = 0; |
167 | /* handle the first hash table */ |
168 | newtable = activeDefragAlloc(d->ht[0].table); |
169 | if (newtable) |
170 | defragged++, d->ht[0].table = newtable; |
171 | /* handle the second hash table */ |
172 | if (d->ht[1].table) { |
173 | newtable = activeDefragAlloc(d->ht[1].table); |
174 | if (newtable) |
175 | defragged++, d->ht[1].table = newtable; |
176 | } |
177 | return defragged; |
178 | } |
179 | |
180 | /* Internal function used by zslDefrag */ |
181 | void zslUpdateNode(zskiplist *zsl, zskiplistNode *oldnode, zskiplistNode *newnode, zskiplistNode **update) { |
182 | int i; |
183 | for (i = 0; i < zsl->level; i++) { |
184 | if (update[i]->level[i].forward == oldnode) |
185 | update[i]->level[i].forward = newnode; |
186 | } |
187 | serverAssert(zsl->header!=oldnode)((zsl->header!=oldnode)?(void)0 : (_serverAssert("zsl->header!=oldnode" ,"defrag.c",187),__builtin_unreachable())); |
188 | if (newnode->level[0].forward) { |
189 | serverAssert(newnode->level[0].forward->backward==oldnode)((newnode->level[0].forward->backward==oldnode)?(void)0 : (_serverAssert("newnode->level[0].forward->backward==oldnode" ,"defrag.c",189),__builtin_unreachable())); |
190 | newnode->level[0].forward->backward = newnode; |
191 | } else { |
192 | serverAssert(zsl->tail==oldnode)((zsl->tail==oldnode)?(void)0 : (_serverAssert("zsl->tail==oldnode" ,"defrag.c",192),__builtin_unreachable())); |
193 | zsl->tail = newnode; |
194 | } |
195 | } |
196 | |
197 | /* Defrag helper for sorted set. |
198 | * Update the robj pointer, defrag the skiplist struct and return the new score |
199 | * reference. We may not access oldele pointer (not even the pointer stored in |
200 | * the skiplist), as it was already freed. Newele may be null, in which case we |
201 | * only need to defrag the skiplist, but not update the obj pointer. |
202 | * When return value is non-NULL, it is the score reference that must be updated |
203 | * in the dict record. */ |
204 | double *zslDefrag(zskiplist *zsl, double score, sds oldele, sds newele) { |
205 | zskiplistNode *update[ZSKIPLIST_MAXLEVEL32], *x, *newx; |
206 | int i; |
207 | sds ele = newele? newele: oldele; |
208 | |
209 | /* find the skiplist node referring to the object that was moved, |
210 | * and all pointers that need to be updated if we'll end up moving the skiplist node. */ |
211 | x = zsl->header; |
212 | for (i = zsl->level-1; i >= 0; i--) { |
213 | while (x->level[i].forward && |
214 | x->level[i].forward->ele != oldele && /* make sure not to access the |
215 | ->obj pointer if it matches |
216 | oldele */ |
217 | (x->level[i].forward->score < score || |
218 | (x->level[i].forward->score == score && |
219 | sdscmp(x->level[i].forward->ele,ele) < 0))) |
220 | x = x->level[i].forward; |
221 | update[i] = x; |
222 | } |
223 | |
224 | /* update the robj pointer inside the skip list record. */ |
225 | x = x->level[0].forward; |
226 | serverAssert(x && score == x->score && x->ele==oldele)((x && score == x->score && x->ele==oldele )?(void)0 : (_serverAssert("x && score == x->score && x->ele==oldele" ,"defrag.c",226),__builtin_unreachable())); |
227 | if (newele) |
228 | x->ele = newele; |
229 | |
230 | /* try to defrag the skiplist record itself */ |
231 | newx = activeDefragAlloc(x); |
232 | if (newx) { |
233 | zslUpdateNode(zsl, x, newx, update); |
234 | return &newx->score; |
235 | } |
236 | return NULL((void*)0); |
237 | } |
238 | |
239 | /* Defrag helper for sorted set. |
240 | * Defrag a single dict entry key name, and corresponding skiplist struct */ |
241 | long activeDefragZsetEntry(zset *zs, dictEntry *de) { |
242 | sds newsds; |
243 | double* newscore; |
244 | long defragged = 0; |
245 | sds sdsele = dictGetKey(de)((de)->key); |
246 | if ((newsds = activeDefragSds(sdsele))) |
247 | defragged++, de->key = newsds; |
248 | newscore = zslDefrag(zs->zsl, *(double*)dictGetVal(de)((de)->v.val), sdsele, newsds); |
249 | if (newscore) { |
250 | dictSetVal(zs->dict, de, newscore)do { if ((zs->dict)->type->valDup) (de)->v.val = ( zs->dict)->type->valDup((zs->dict)->privdata, newscore ); else (de)->v.val = (newscore); } while(0); |
251 | defragged++; |
252 | } |
253 | return defragged; |
254 | } |
255 | |
/* Value-handling modes for activeDefragSdsDict() / activeDefragList(). */
#define DEFRAG_SDS_DICT_NO_VAL 0       /* keys only, values untouched */
#define DEFRAG_SDS_DICT_VAL_IS_SDS 1   /* values are sds strings */
#define DEFRAG_SDS_DICT_VAL_IS_STROB 2 /* values are string robj objects */
#define DEFRAG_SDS_DICT_VAL_VOID_PTR 3 /* values are opaque allocations */
260 | |
261 | /* Defrag a dict with sds key and optional value (either ptr, sds or robj string) */ |
262 | long activeDefragSdsDict(dict* d, int val_type) { |
263 | dictIterator *di; |
264 | dictEntry *de; |
265 | long defragged = 0; |
266 | di = dictGetIterator(d); |
267 | while((de = dictNext(di)) != NULL((void*)0)) { |
268 | sds sdsele = dictGetKey(de)((de)->key), newsds; |
269 | if ((newsds = activeDefragSds(sdsele))) |
270 | de->key = newsds, defragged++; |
271 | /* defrag the value */ |
272 | if (val_type == DEFRAG_SDS_DICT_VAL_IS_SDS1) { |
273 | sdsele = dictGetVal(de)((de)->v.val); |
274 | if ((newsds = activeDefragSds(sdsele))) |
275 | de->v.val = newsds, defragged++; |
276 | } else if (val_type == DEFRAG_SDS_DICT_VAL_IS_STROB2) { |
277 | robj *newele, *ele = dictGetVal(de)((de)->v.val); |
278 | if ((newele = activeDefragStringOb(ele, &defragged))) |
279 | de->v.val = newele; |
280 | } else if (val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR3) { |
281 | void *newptr, *ptr = dictGetVal(de)((de)->v.val); |
282 | if ((newptr = activeDefragAlloc(ptr))) |
283 | de->v.val = newptr, defragged++; |
284 | } |
285 | defragged += dictIterDefragEntry(di); |
286 | } |
287 | dictReleaseIterator(di); |
288 | return defragged; |
289 | } |
290 | |
291 | /* Defrag a list of ptr, sds or robj string values */ |
292 | long activeDefragList(list *l, int val_type) { |
293 | long defragged = 0; |
294 | listNode *ln, *newln; |
295 | for (ln = l->head; ln; ln = ln->next) { |
296 | if ((newln = activeDefragAlloc(ln))) { |
297 | if (newln->prev) |
298 | newln->prev->next = newln; |
299 | else |
300 | l->head = newln; |
301 | if (newln->next) |
302 | newln->next->prev = newln; |
303 | else |
304 | l->tail = newln; |
305 | ln = newln; |
306 | defragged++; |
307 | } |
308 | if (val_type == DEFRAG_SDS_DICT_VAL_IS_SDS1) { |
309 | sds newsds, sdsele = ln->value; |
310 | if ((newsds = activeDefragSds(sdsele))) |
311 | ln->value = newsds, defragged++; |
312 | } else if (val_type == DEFRAG_SDS_DICT_VAL_IS_STROB2) { |
313 | robj *newele, *ele = ln->value; |
314 | if ((newele = activeDefragStringOb(ele, &defragged))) |
315 | ln->value = newele; |
316 | } else if (val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR3) { |
317 | void *newptr, *ptr = ln->value; |
318 | if ((newptr = activeDefragAlloc(ptr))) |
319 | ln->value = newptr, defragged++; |
320 | } |
321 | } |
322 | return defragged; |
323 | } |
324 | |
325 | /* Defrag a list of sds values and a dict with the same sds keys */ |
326 | long activeDefragSdsListAndDict(list *l, dict *d, int dict_val_type) { |
327 | long defragged = 0; |
328 | sds newsds, sdsele; |
329 | listNode *ln, *newln; |
330 | dictIterator *di; |
331 | dictEntry *de; |
332 | /* Defrag the list and it's sds values */ |
333 | for (ln = l->head; ln; ln = ln->next) { |
334 | if ((newln = activeDefragAlloc(ln))) { |
335 | if (newln->prev) |
336 | newln->prev->next = newln; |
337 | else |
338 | l->head = newln; |
339 | if (newln->next) |
340 | newln->next->prev = newln; |
341 | else |
342 | l->tail = newln; |
343 | ln = newln; |
344 | defragged++; |
345 | } |
346 | sdsele = ln->value; |
347 | if ((newsds = activeDefragSds(sdsele))) { |
348 | /* When defragging an sds value, we need to update the dict key */ |
349 | uint64_t hash = dictGetHash(d, newsds); |
350 | replaceSatelliteDictKeyPtrAndOrDefragDictEntry(d, sdsele, newsds, hash, &defragged); |
351 | ln->value = newsds; |
352 | defragged++; |
353 | } |
354 | } |
355 | |
356 | /* Defrag the dict values (keys were already handled) */ |
357 | di = dictGetIterator(d); |
358 | while((de = dictNext(di)) != NULL((void*)0)) { |
359 | if (dict_val_type == DEFRAG_SDS_DICT_VAL_IS_SDS1) { |
360 | sds newsds, sdsele = dictGetVal(de)((de)->v.val); |
361 | if ((newsds = activeDefragSds(sdsele))) |
362 | de->v.val = newsds, defragged++; |
363 | } else if (dict_val_type == DEFRAG_SDS_DICT_VAL_IS_STROB2) { |
364 | robj *newele, *ele = dictGetVal(de)((de)->v.val); |
365 | if ((newele = activeDefragStringOb(ele, &defragged))) |
366 | de->v.val = newele, defragged++; |
367 | } else if (dict_val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR3) { |
368 | void *newptr, *ptr = dictGetVal(de)((de)->v.val); |
369 | if ((newptr = activeDefragAlloc(ptr))) |
370 | de->v.val = newptr, defragged++; |
371 | } |
372 | defragged += dictIterDefragEntry(di); |
373 | } |
374 | dictReleaseIterator(di); |
375 | |
376 | return defragged; |
377 | } |
378 | |
379 | /* Utility function that replaces an old key pointer in the dictionary with a |
380 | * new pointer. Additionally, we try to defrag the dictEntry in that dict. |
381 | * Oldkey mey be a dead pointer and should not be accessed (we get a |
382 | * pre-calculated hash value). Newkey may be null if the key pointer wasn't |
383 | * moved. Return value is the the dictEntry if found, or NULL if not found. |
384 | * NOTE: this is very ugly code, but it let's us avoid the complication of |
385 | * doing a scan on another dict. */ |
386 | dictEntry* replaceSatelliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged) { |
387 | dictEntry **deref = dictFindEntryRefByPtrAndHash(d, oldkey, hash); |
388 | if (deref) { |
389 | dictEntry *de = *deref; |
390 | dictEntry *newde = activeDefragAlloc(de); |
391 | if (newde) { |
392 | de = *deref = newde; |
393 | (*defragged)++; |
394 | } |
395 | if (newkey) |
396 | de->key = newkey; |
397 | return de; |
398 | } |
399 | return NULL((void*)0); |
400 | } |
401 | |
402 | long activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) { |
403 | quicklistNode *newnode, *node = *node_ref; |
404 | long defragged = 0; |
405 | unsigned char *newzl; |
406 | if ((newnode = activeDefragAlloc(node))) { |
407 | if (newnode->prev) |
408 | newnode->prev->next = newnode; |
409 | else |
410 | ql->head = newnode; |
411 | if (newnode->next) |
412 | newnode->next->prev = newnode; |
413 | else |
414 | ql->tail = newnode; |
415 | *node_ref = node = newnode; |
416 | defragged++; |
417 | } |
418 | if ((newzl = activeDefragAlloc(node->zl))) |
419 | defragged++, node->zl = newzl; |
420 | return defragged; |
421 | } |
422 | |
423 | long activeDefragQuickListNodes(quicklist *ql) { |
424 | quicklistNode *node = ql->head; |
425 | long defragged = 0; |
426 | while (node) { |
427 | defragged += activeDefragQuickListNode(ql, &node); |
428 | node = node->next; |
429 | } |
430 | return defragged; |
431 | } |
432 | |
433 | /* when the value has lots of elements, we want to handle it later and not as |
434 | * part of the main dictionary scan. this is needed in order to prevent latency |
435 | * spikes when handling large items */ |
436 | void defragLater(redisDb *db, dictEntry *kde) { |
437 | sds key = sdsdup(dictGetKey(kde)((kde)->key)); |
438 | listAddNodeTail(db->defrag_later, key); |
439 | } |
440 | |
441 | /* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */ |
442 | long scanLaterList(robj *ob, unsigned long *cursor, long long endtime, long long *defragged) { |
443 | quicklist *ql = ob->ptr; |
444 | quicklistNode *node; |
445 | long iterations = 0; |
446 | int bookmark_failed = 0; |
447 | if (ob->type != OBJ_LIST1 || ob->encoding != OBJ_ENCODING_QUICKLIST9) |
448 | return 0; |
449 | |
450 | if (*cursor == 0) { |
451 | /* if cursor is 0, we start new iteration */ |
452 | node = ql->head; |
453 | } else { |
454 | node = quicklistBookmarkFind(ql, "_AD"); |
455 | if (!node) { |
456 | /* if the bookmark was deleted, it means we reached the end. */ |
457 | *cursor = 0; |
458 | return 0; |
459 | } |
460 | node = node->next; |
461 | } |
462 | |
463 | (*cursor)++; |
464 | while (node) { |
465 | (*defragged) += activeDefragQuickListNode(ql, &node); |
466 | server.stat_active_defrag_scanned++; |
467 | if (++iterations > 128 && !bookmark_failed) { |
468 | if (ustime() > endtime) { |
469 | if (!quicklistBookmarkCreate(&ql, "_AD", node)) { |
470 | bookmark_failed = 1; |
471 | } else { |
472 | ob->ptr = ql; /* bookmark creation may have re-allocated the quicklist */ |
473 | return 1; |
474 | } |
475 | } |
476 | iterations = 0; |
477 | } |
478 | node = node->next; |
479 | } |
480 | quicklistBookmarkDelete(ql, "_AD"); |
481 | *cursor = 0; |
482 | return bookmark_failed? 1: 0; |
483 | } |
484 | |
485 | typedef struct { |
486 | zset *zs; |
487 | long defragged; |
488 | } scanLaterZsetData; |
489 | |
490 | void scanLaterZsetCallback(void *privdata, const dictEntry *_de) { |
491 | dictEntry *de = (dictEntry*)_de; |
492 | scanLaterZsetData *data = privdata; |
493 | data->defragged += activeDefragZsetEntry(data->zs, de); |
494 | server.stat_active_defrag_scanned++; |
495 | } |
496 | |
497 | long scanLaterZset(robj *ob, unsigned long *cursor) { |
498 | if (ob->type != OBJ_ZSET3 || ob->encoding != OBJ_ENCODING_SKIPLIST7) |
499 | return 0; |
500 | zset *zs = (zset*)ob->ptr; |
501 | dict *d = zs->dict; |
502 | scanLaterZsetData data = {zs, 0}; |
503 | *cursor = dictScan(d, *cursor, scanLaterZsetCallback, defragDictBucketCallback, &data); |
504 | return data.defragged; |
505 | } |
506 | |
507 | void scanLaterSetCallback(void *privdata, const dictEntry *_de) { |
508 | dictEntry *de = (dictEntry*)_de; |
509 | long *defragged = privdata; |
510 | sds sdsele = dictGetKey(de)((de)->key), newsds; |
511 | if ((newsds = activeDefragSds(sdsele))) |
512 | (*defragged)++, de->key = newsds; |
513 | server.stat_active_defrag_scanned++; |
514 | } |
515 | |
516 | long scanLaterSet(robj *ob, unsigned long *cursor) { |
517 | long defragged = 0; |
518 | if (ob->type != OBJ_SET2 || ob->encoding != OBJ_ENCODING_HT2) |
519 | return 0; |
520 | dict *d = ob->ptr; |
521 | *cursor = dictScan(d, *cursor, scanLaterSetCallback, defragDictBucketCallback, &defragged); |
522 | return defragged; |
523 | } |
524 | |
525 | void scanLaterHashCallback(void *privdata, const dictEntry *_de) { |
526 | dictEntry *de = (dictEntry*)_de; |
527 | long *defragged = privdata; |
528 | sds sdsele = dictGetKey(de)((de)->key), newsds; |
529 | if ((newsds = activeDefragSds(sdsele))) |
530 | (*defragged)++, de->key = newsds; |
531 | sdsele = dictGetVal(de)((de)->v.val); |
532 | if ((newsds = activeDefragSds(sdsele))) |
533 | (*defragged)++, de->v.val = newsds; |
534 | server.stat_active_defrag_scanned++; |
535 | } |
536 | |
537 | long scanLaterHash(robj *ob, unsigned long *cursor) { |
538 | long defragged = 0; |
539 | if (ob->type != OBJ_HASH4 || ob->encoding != OBJ_ENCODING_HT2) |
540 | return 0; |
541 | dict *d = ob->ptr; |
542 | *cursor = dictScan(d, *cursor, scanLaterHashCallback, defragDictBucketCallback, &defragged); |
543 | return defragged; |
544 | } |
545 | |
546 | long defragQuicklist(redisDb *db, dictEntry *kde) { |
547 | robj *ob = dictGetVal(kde)((kde)->v.val); |
548 | long defragged = 0; |
549 | quicklist *ql = ob->ptr, *newql; |
550 | serverAssert(ob->type == OBJ_LIST && ob->encoding == OBJ_ENCODING_QUICKLIST)((ob->type == 1 && ob->encoding == 9)?(void)0 : (_serverAssert("ob->type == OBJ_LIST && ob->encoding == OBJ_ENCODING_QUICKLIST" ,"defrag.c",550),__builtin_unreachable())); |
551 | if ((newql = activeDefragAlloc(ql))) |
552 | defragged++, ob->ptr = ql = newql; |
553 | if (ql->len > server.active_defrag_max_scan_fields) |
554 | defragLater(db, kde); |
555 | else |
556 | defragged += activeDefragQuickListNodes(ql); |
557 | return defragged; |
558 | } |
559 | |
560 | long defragZsetSkiplist(redisDb *db, dictEntry *kde) { |
561 | robj *ob = dictGetVal(kde)((kde)->v.val); |
562 | long defragged = 0; |
563 | zset *zs = (zset*)ob->ptr; |
564 | zset *newzs; |
565 | zskiplist *newzsl; |
566 | dict *newdict; |
567 | dictEntry *de; |
568 | struct zskiplistNode *newheader; |
569 | serverAssert(ob->type == OBJ_ZSET && ob->encoding == OBJ_ENCODING_SKIPLIST)((ob->type == 3 && ob->encoding == 7)?(void)0 : (_serverAssert("ob->type == OBJ_ZSET && ob->encoding == OBJ_ENCODING_SKIPLIST" ,"defrag.c",569),__builtin_unreachable())); |
570 | if ((newzs = activeDefragAlloc(zs))) |
571 | defragged++, ob->ptr = zs = newzs; |
572 | if ((newzsl = activeDefragAlloc(zs->zsl))) |
573 | defragged++, zs->zsl = newzsl; |
574 | if ((newheader = activeDefragAlloc(zs->zsl->header))) |
575 | defragged++, zs->zsl->header = newheader; |
576 | if (dictSize(zs->dict)((zs->dict)->ht[0].used+(zs->dict)->ht[1].used) > server.active_defrag_max_scan_fields) |
577 | defragLater(db, kde); |
578 | else { |
579 | dictIterator *di = dictGetIterator(zs->dict); |
580 | while((de = dictNext(di)) != NULL((void*)0)) { |
581 | defragged += activeDefragZsetEntry(zs, de); |
582 | } |
583 | dictReleaseIterator(di); |
584 | } |
585 | /* handle the dict struct */ |
586 | if ((newdict = activeDefragAlloc(zs->dict))) |
587 | defragged++, zs->dict = newdict; |
588 | /* defrag the dict tables */ |
589 | defragged += dictDefragTables(zs->dict); |
590 | return defragged; |
591 | } |
592 | |
593 | long defragHash(redisDb *db, dictEntry *kde) { |
594 | long defragged = 0; |
595 | robj *ob = dictGetVal(kde)((kde)->v.val); |
596 | dict *d, *newd; |
597 | serverAssert(ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HT)((ob->type == 4 && ob->encoding == 2)?(void)0 : (_serverAssert("ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HT" ,"defrag.c",597),__builtin_unreachable())); |
598 | d = ob->ptr; |
599 | if (dictSize(d)((d)->ht[0].used+(d)->ht[1].used) > server.active_defrag_max_scan_fields) |
600 | defragLater(db, kde); |
601 | else |
602 | defragged += activeDefragSdsDict(d, DEFRAG_SDS_DICT_VAL_IS_SDS1); |
603 | /* handle the dict struct */ |
604 | if ((newd = activeDefragAlloc(ob->ptr))) |
605 | defragged++, ob->ptr = newd; |
606 | /* defrag the dict tables */ |
607 | defragged += dictDefragTables(ob->ptr); |
608 | return defragged; |
609 | } |
610 | |
611 | long defragSet(redisDb *db, dictEntry *kde) { |
612 | long defragged = 0; |
613 | robj *ob = dictGetVal(kde)((kde)->v.val); |
614 | dict *d, *newd; |
615 | serverAssert(ob->type == OBJ_SET && ob->encoding == OBJ_ENCODING_HT)((ob->type == 2 && ob->encoding == 2)?(void)0 : (_serverAssert("ob->type == OBJ_SET && ob->encoding == OBJ_ENCODING_HT" ,"defrag.c",615),__builtin_unreachable())); |
616 | d = ob->ptr; |
617 | if (dictSize(d)((d)->ht[0].used+(d)->ht[1].used) > server.active_defrag_max_scan_fields) |
618 | defragLater(db, kde); |
619 | else |
620 | defragged += activeDefragSdsDict(d, DEFRAG_SDS_DICT_NO_VAL0); |
621 | /* handle the dict struct */ |
622 | if ((newd = activeDefragAlloc(ob->ptr))) |
623 | defragged++, ob->ptr = newd; |
624 | /* defrag the dict tables */ |
625 | defragged += dictDefragTables(ob->ptr); |
626 | return defragged; |
627 | } |
628 | |
629 | /* Defrag callback for radix tree iterator, called for each node, |
630 | * used in order to defrag the nodes allocations. */ |
631 | int defragRaxNode(raxNode **noderef) { |
632 | raxNode *newnode = activeDefragAlloc(*noderef); |
633 | if (newnode) { |
634 | *noderef = newnode; |
635 | return 1; |
636 | } |
637 | return 0; |
638 | } |
639 | |
640 | /* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */ |
641 | int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, long long endtime, long long *defragged) { |
642 | static unsigned char last[sizeof(streamID)]; |
643 | raxIterator ri; |
644 | long iterations = 0; |
645 | if (ob->type != OBJ_STREAM6 || ob->encoding != OBJ_ENCODING_STREAM10) { |
646 | *cursor = 0; |
647 | return 0; |
648 | } |
649 | |
650 | stream *s = ob->ptr; |
651 | raxStart(&ri,s->rax); |
652 | if (*cursor == 0) { |
653 | /* if cursor is 0, we start new iteration */ |
654 | defragRaxNode(&s->rax->head); |
655 | /* assign the iterator node callback before the seek, so that the |
656 | * initial nodes that are processed till the first item are covered */ |
657 | ri.node_cb = defragRaxNode; |
658 | raxSeek(&ri,"^",NULL((void*)0),0); |
659 | } else { |
660 | /* if cursor is non-zero, we seek to the static 'last' */ |
661 | if (!raxSeek(&ri,">", last, sizeof(last))) { |
662 | *cursor = 0; |
663 | raxStop(&ri); |
664 | return 0; |
665 | } |
666 | /* assign the iterator node callback after the seek, so that the |
667 | * initial nodes that are processed till now aren't covered */ |
668 | ri.node_cb = defragRaxNode; |
669 | } |
670 | |
671 | (*cursor)++; |
672 | while (raxNext(&ri)) { |
673 | void *newdata = activeDefragAlloc(ri.data); |
674 | if (newdata) |
675 | raxSetData(ri.node, ri.data=newdata), (*defragged)++; |
676 | server.stat_active_defrag_scanned++; |
677 | if (++iterations > 128) { |
678 | if (ustime() > endtime) { |
679 | serverAssert(ri.key_len==sizeof(last))((ri.key_len==sizeof(last))?(void)0 : (_serverAssert("ri.key_len==sizeof(last)" ,"defrag.c",679),__builtin_unreachable())); |
680 | memcpy(last,ri.key,ri.key_len); |
681 | raxStop(&ri); |
682 | return 1; |
683 | } |
684 | iterations = 0; |
685 | } |
686 | } |
687 | raxStop(&ri); |
688 | *cursor = 0; |
689 | return 0; |
690 | } |
691 | |
692 | /* optional callback used defrag each rax element (not including the element pointer itself) */ |
693 | typedef void *(raxDefragFunction)(raxIterator *ri, void *privdata, long *defragged); |
694 | |
695 | /* defrag radix tree including: |
696 | * 1) rax struct |
697 | * 2) rax nodes |
698 | * 3) rax entry data (only if defrag_data is specified) |
699 | * 4) call a callback per element, and allow the callback to return a new pointer for the element */ |
700 | long defragRadixTree(rax **raxref, int defrag_data, raxDefragFunction *element_cb, void *element_cb_data) { |
701 | long defragged = 0; |
702 | raxIterator ri; |
703 | rax* rax; |
704 | if ((rax = activeDefragAlloc(*raxref))) |
705 | defragged++, *raxref = rax; |
706 | rax = *raxref; |
707 | raxStart(&ri,rax); |
708 | ri.node_cb = defragRaxNode; |
709 | defragRaxNode(&rax->head); |
710 | raxSeek(&ri,"^",NULL((void*)0),0); |
711 | while (raxNext(&ri)) { |
712 | void *newdata = NULL((void*)0); |
713 | if (element_cb) |
714 | newdata = element_cb(&ri, element_cb_data, &defragged); |
715 | if (defrag_data && !newdata) |
716 | newdata = activeDefragAlloc(ri.data); |
717 | if (newdata) |
718 | raxSetData(ri.node, ri.data=newdata), defragged++; |
719 | } |
720 | raxStop(&ri); |
721 | return defragged; |
722 | } |
723 | |
724 | typedef struct { |
725 | streamCG *cg; |
726 | streamConsumer *c; |
727 | } PendingEntryContext; |
728 | |
729 | void* defragStreamConsumerPendingEntry(raxIterator *ri, void *privdata, long *defragged) { |
730 | UNUSED(defragged)((void) defragged); |
731 | PendingEntryContext *ctx = privdata; |
732 | streamNACK *nack = ri->data, *newnack; |
733 | nack->consumer = ctx->c; /* update nack pointer to consumer */ |
734 | newnack = activeDefragAlloc(nack); |
735 | if (newnack) { |
736 | /* update consumer group pointer to the nack */ |
737 | void *prev; |
738 | raxInsert(ctx->cg->pel, ri->key, ri->key_len, newnack, &prev); |
739 | serverAssert(prev==nack)((prev==nack)?(void)0 : (_serverAssert("prev==nack","defrag.c" ,739),__builtin_unreachable())); |
740 | /* note: we don't increment 'defragged' that's done by the caller */ |
741 | } |
742 | return newnack; |
743 | } |
744 | |
745 | void* defragStreamConsumer(raxIterator *ri, void *privdata, long *defragged) { |
746 | streamConsumer *c = ri->data; |
747 | streamCG *cg = privdata; |
748 | void *newc = activeDefragAlloc(c); |
749 | if (newc) { |
750 | /* note: we don't increment 'defragged' that's done by the caller */ |
751 | c = newc; |
752 | } |
753 | sds newsds = activeDefragSds(c->name); |
754 | if (newsds) |
755 | (*defragged)++, c->name = newsds; |
756 | if (c->pel) { |
757 | PendingEntryContext pel_ctx = {cg, c}; |
758 | *defragged += defragRadixTree(&c->pel, 0, defragStreamConsumerPendingEntry, &pel_ctx); |
759 | } |
760 | return newc; /* returns NULL if c was not defragged */ |
761 | } |
762 | |
763 | void* defragStreamConsumerGroup(raxIterator *ri, void *privdata, long *defragged) { |
764 | streamCG *cg = ri->data; |
765 | UNUSED(privdata)((void) privdata); |
766 | if (cg->consumers) |
767 | *defragged += defragRadixTree(&cg->consumers, 0, defragStreamConsumer, cg); |
768 | if (cg->pel) |
769 | *defragged += defragRadixTree(&cg->pel, 0, NULL((void*)0), NULL((void*)0)); |
770 | return NULL((void*)0); |
771 | } |
772 | |
/* Defrag a stream key: the stream struct itself, its entries rax, and (via
 * callbacks) its consumer groups. Streams with more entries than
 * active_defrag_max_scan_fields are scheduled for the incremental
 * defrag-later stage instead of being scanned here. Returns the number of
 * pointers moved. */
long defragStream(redisDb *db, dictEntry *kde) {
    long defragged = 0;
    robj *ob = dictGetVal(kde)((kde)->v.val);
    serverAssert(ob->type == OBJ_STREAM && ob->encoding == OBJ_ENCODING_STREAM)((ob->type == 6 && ob->encoding == 10)?(void)0 : (_serverAssert("ob->type == OBJ_STREAM && ob->encoding == OBJ_ENCODING_STREAM" ,"defrag.c",776),__builtin_unreachable()));
    stream *s = ob->ptr, *news;

    /* handle the main struct */
    if ((news = activeDefragAlloc(s)))
        defragged++, ob->ptr = s = news;

    if (raxSize(s->rax) > server.active_defrag_max_scan_fields) {
        /* Too big to scan in one go: only relocate the rax root now and
         * defer the entries to the defrag-later stage. */
        rax *newrax = activeDefragAlloc(s->rax);
        if (newrax)
            defragged++, s->rax = newrax;
        defragLater(db, kde);
    } else
        defragged += defragRadixTree(&s->rax, 1, NULL((void*)0), NULL((void*)0));

    if (s->cgroups)
        defragged += defragRadixTree(&s->cgroups, 1, defragStreamConsumerGroup, NULL((void*)0));
    return defragged;
}
795 | |
/* Defrag a module key. This is either done immediately or scheduled
 * for later. Returns the number of pointers defragged.
 */
long defragModule(redisDb *db, dictEntry *kde) {
    robj *obj = dictGetVal(kde)((kde)->v.val);
    serverAssert(obj->type == OBJ_MODULE)((obj->type == 5)?(void)0 : (_serverAssert("obj->type == OBJ_MODULE" ,"defrag.c",801),__builtin_unreachable()));
    long defragged = 0;

    /* A zero return from the module means the value wasn't fully handled
     * now; schedule the key for the incremental defrag-later stage. */
    if (!moduleDefragValue(dictGetKey(kde)((kde)->key), obj, &defragged))
        defragLater(db, kde);

    return defragged;
}
809 | |
/* for each key we scan in the main dict, this function will attempt to defrag
 * all the various pointers it has. Returns a stat of how many pointers were
 * moved. */
long defragKey(redisDb *db, dictEntry *de) {
    sds keysds = dictGetKey(de)((de)->key);
    robj *newob, *ob;
    unsigned char *newzl;
    long defragged = 0;
    sds newsds;

    /* Try to defrag the key name. */
    newsds = activeDefragSds(keysds);
    if (newsds)
        defragged++, de->key = newsds;
    if (dictSize(db->expires)((db->expires)->ht[0].used+(db->expires)->ht[1].used )) {
        /* Dirty code:
         * I can't search in db->expires for that key after i already released
         * the pointer it holds it won't be able to do the string compare */
        /* Compute the hash before the key sds may be freed, then patch the
         * expires dict by hash + pointer identity rather than by lookup. */
        uint64_t hash = dictGetHash(db->dict, de->key);
        replaceSatelliteDictKeyPtrAndOrDefragDictEntry(db->expires, keysds, newsds, hash, &defragged);
    }

    /* Try to defrag robj and / or string value. */
    ob = dictGetVal(de)((de)->v.val);
    if ((newob = activeDefragStringOb(ob, &defragged))) {
        de->v.val = newob;
        ob = newob;
    }

    /* Dispatch on object type and encoding: compact in-place encodings
     * (ziplist/intset) are relocated as one buffer; composite encodings
     * delegate to a type-specific helper that may defer work. */
    if (ob->type == OBJ_STRING0) {
        /* Already handled in activeDefragStringOb. */
    } else if (ob->type == OBJ_LIST1) {
        if (ob->encoding == OBJ_ENCODING_QUICKLIST9) {
            defragged += defragQuicklist(db, de);
        } else if (ob->encoding == OBJ_ENCODING_ZIPLIST5) {
            if ((newzl = activeDefragAlloc(ob->ptr)))
                defragged++, ob->ptr = newzl;
        } else {
            serverPanic("Unknown list encoding")_serverPanic("defrag.c",848,"Unknown list encoding"),__builtin_unreachable ();
        }
    } else if (ob->type == OBJ_SET2) {
        if (ob->encoding == OBJ_ENCODING_HT2) {
            defragged += defragSet(db, de);
        } else if (ob->encoding == OBJ_ENCODING_INTSET6) {
            intset *newis, *is = ob->ptr;
            if ((newis = activeDefragAlloc(is)))
                defragged++, ob->ptr = newis;
        } else {
            serverPanic("Unknown set encoding")_serverPanic("defrag.c",858,"Unknown set encoding"),__builtin_unreachable ();
        }
    } else if (ob->type == OBJ_ZSET3) {
        if (ob->encoding == OBJ_ENCODING_ZIPLIST5) {
            if ((newzl = activeDefragAlloc(ob->ptr)))
                defragged++, ob->ptr = newzl;
        } else if (ob->encoding == OBJ_ENCODING_SKIPLIST7) {
            defragged += defragZsetSkiplist(db, de);
        } else {
            serverPanic("Unknown sorted set encoding")_serverPanic("defrag.c",867,"Unknown sorted set encoding"),__builtin_unreachable ();
        }
    } else if (ob->type == OBJ_HASH4) {
        if (ob->encoding == OBJ_ENCODING_ZIPLIST5) {
            if ((newzl = activeDefragAlloc(ob->ptr)))
                defragged++, ob->ptr = newzl;
        } else if (ob->encoding == OBJ_ENCODING_HT2) {
            defragged += defragHash(db, de);
        } else {
            serverPanic("Unknown hash encoding")_serverPanic("defrag.c",876,"Unknown hash encoding"),__builtin_unreachable ();
        }
    } else if (ob->type == OBJ_STREAM6) {
        defragged += defragStream(db, de);
    } else if (ob->type == OBJ_MODULE5) {
        defragged += defragModule(db, de);
    } else {
        serverPanic("Unknown object type")_serverPanic("defrag.c",883,"Unknown object type"),__builtin_unreachable ();
    }
    return defragged;
}
887 | |
888 | /* Defrag scan callback for the main db dictionary. */ |
889 | void defragScanCallback(void *privdata, const dictEntry *de) { |
890 | long defragged = defragKey((redisDb*)privdata, (dictEntry*)de); |
891 | server.stat_active_defrag_hits += defragged; |
892 | if(defragged) |
893 | server.stat_active_defrag_key_hits++; |
894 | else |
895 | server.stat_active_defrag_key_misses++; |
896 | server.stat_active_defrag_scanned++; |
897 | } |
898 | |
899 | /* Defrag scan callback for each hash table bucket, |
900 | * used in order to defrag the dictEntry allocations. */ |
901 | void defragDictBucketCallback(void *privdata, dictEntry **bucketref) { |
902 | UNUSED(privdata)((void) privdata); /* NOTE: this function is also used by both activeDefragCycle and scanLaterHash, etc. don't use privdata */ |
903 | while(*bucketref) { |
904 | dictEntry *de = *bucketref, *newde; |
905 | if ((newde = activeDefragAlloc(de))) { |
906 | *bucketref = newde; |
907 | } |
908 | bucketref = &(*bucketref)->next; |
909 | } |
910 | } |
911 | |
912 | /* Utility function to get the fragmentation ratio from jemalloc. |
913 | * It is critical to do that by comparing only heap maps that belong to |
914 | * jemalloc, and skip ones the jemalloc keeps as spare. Since we use this |
915 | * fragmentation ratio in order to decide if a defrag action should be taken |
916 | * or not, a false detection can cause the defragmenter to waste a lot of CPU |
917 | * without the possibility of getting any results. */ |
918 | float getAllocatorFragmentation(size_t *out_frag_bytes) { |
919 | size_t resident, active, allocated; |
920 | zmalloc_get_allocator_info(&allocated, &active, &resident); |
921 | float frag_pct = ((float)active / allocated)*100 - 100; |
922 | size_t frag_bytes = active - allocated; |
923 | float rss_pct = ((float)resident / allocated)*100 - 100; |
924 | size_t rss_bytes = resident - allocated; |
925 | if(out_frag_bytes) |
926 | *out_frag_bytes = frag_bytes; |
927 | serverLog(LL_DEBUG0, |
928 | "allocated=%zu, active=%zu, resident=%zu, frag=%.0f%% (%.0f%% rss), frag_bytes=%zu (%zu rss)", |
929 | allocated, active, resident, frag_pct, rss_pct, frag_bytes, rss_bytes); |
930 | return frag_pct; |
931 | } |
932 | |
933 | /* We may need to defrag other globals, one small allocation can hold a full allocator run. |
934 | * so although small, it is still important to defrag these */ |
935 | long defragOtherGlobals() { |
936 | long defragged = 0; |
937 | |
938 | /* there are many more pointers to defrag (e.g. client argv, output / aof buffers, etc. |
939 | * but we assume most of these are short lived, we only need to defrag allocations |
940 | * that remain static for a long time */ |
941 | defragged += activeDefragSdsDict(server.lua_scripts, DEFRAG_SDS_DICT_VAL_IS_STROB2); |
942 | defragged += activeDefragSdsListAndDict(server.repl_scriptcache_fifo, server.repl_scriptcache_dict, DEFRAG_SDS_DICT_NO_VAL0); |
943 | defragged += moduleDefragGlobals(); |
944 | return defragged; |
945 | } |
946 | |
/* returns 0 more work may or may not be needed (see non-zero cursor),
 * and 1 if time is up and more work is needed. */
/* Incrementally defrag one big item scheduled by defragLater. 'cursor'
 * carries the resume position between calls; it is reset to 0 when the
 * entry vanished or its type changed since scheduling. */
int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime) {
    if (de) {
        robj *ob = dictGetVal(de)((de)->v.val);
        if (ob->type == OBJ_LIST1) {
            /* Lists and streams honor 'endtime' and can report time-up (1). */
            return scanLaterList(ob, cursor, endtime, &server.stat_active_defrag_hits);
        } else if (ob->type == OBJ_SET2) {
            server.stat_active_defrag_hits += scanLaterSet(ob, cursor);
        } else if (ob->type == OBJ_ZSET3) {
            server.stat_active_defrag_hits += scanLaterZset(ob, cursor);
        } else if (ob->type == OBJ_HASH4) {
            server.stat_active_defrag_hits += scanLaterHash(ob, cursor);
        } else if (ob->type == OBJ_STREAM6) {
            return scanLaterStreamListpacks(ob, cursor, endtime, &server.stat_active_defrag_hits);
        } else if (ob->type == OBJ_MODULE5) {
            return moduleLateDefrag(dictGetKey(de)((de)->key), ob, cursor, endtime, &server.stat_active_defrag_hits);
        } else {
            *cursor = 0; /* object type may have changed since we schedule it for later */
        }
    } else {
        *cursor = 0; /* object may have been deleted already */
    }
    return 0;
}
972 | |
/* static variables serving defragLaterStep to continue scanning a key from where we stopped last time. */
static sds defrag_later_current_key = NULL((void*)0);
static unsigned long defrag_later_cursor = 0;
976 | |
/* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */
/* Drains db->defrag_later (keys too big to process in one scan step),
 * resuming from the static defrag_later_current_key / defrag_later_cursor
 * pair across calls. Periodically checks 'endtime' so a single call never
 * exceeds the cycle's time budget. */
int defragLaterStep(redisDb *db, long long endtime) {
    unsigned int iterations = 0;
    unsigned long long prev_defragged = server.stat_active_defrag_hits;
    unsigned long long prev_scanned = server.stat_active_defrag_scanned;
    long long key_defragged;

    do {
        /* if we're not continuing a scan from the last call or loop, start a new one */
        if (!defrag_later_cursor) {
            listNode *head = listFirst(db->defrag_later)((db->defrag_later)->head);

            /* Move on to next key */
            if (defrag_later_current_key) {
                /* The key we just finished must still be the list head. */
                serverAssert(defrag_later_current_key == head->value)((defrag_later_current_key == head->value)?(void)0 : (_serverAssert ("defrag_later_current_key == head->value","defrag.c",991) ,__builtin_unreachable()));
                listDelNode(db->defrag_later, head);
                defrag_later_cursor = 0;
                defrag_later_current_key = NULL((void*)0);
            }

            /* stop if we reached the last one. */
            head = listFirst(db->defrag_later)((db->defrag_later)->head);
            if (!head)
                return 0;

            /* start a new key */
            defrag_later_current_key = head->value;
            defrag_later_cursor = 0;
        }

        /* each time we enter this function we need to fetch the key from the dict again (if it still exists) */
        dictEntry *de = dictFind(db->dict, defrag_later_current_key);
        key_defragged = server.stat_active_defrag_hits;
        do {
            int quit = 0;
            if (defragLaterItem(de, &defrag_later_cursor, endtime))
                quit = 1; /* time is up, we didn't finish all the work */

            /* Once in 16 scan iterations, 512 pointer reallocations, or 64 fields
             * (if we have a lot of pointers in one hash bucket, or rehashing),
             * check if we reached the time limit. */
            if (quit || (++iterations > 16 ||
                            server.stat_active_defrag_hits - prev_defragged > 512 ||
                            server.stat_active_defrag_scanned - prev_scanned > 64)) {
                if (quit || ustime() > endtime) {
                    /* Update per-key hit/miss stats before bailing out. */
                    if(key_defragged != server.stat_active_defrag_hits)
                        server.stat_active_defrag_key_hits++;
                    else
                        server.stat_active_defrag_key_misses++;
                    return 1;
                }
                iterations = 0;
                prev_defragged = server.stat_active_defrag_hits;
                prev_scanned = server.stat_active_defrag_scanned;
            }
        } while(defrag_later_cursor);
        /* Finished this key within the time budget: record hit/miss. */
        if(key_defragged != server.stat_active_defrag_hits)
            server.stat_active_defrag_key_hits++;
        else
            server.stat_active_defrag_key_misses++;
    } while(1);
}
1039 | |
/* Linear interpolation of x over [x1,x2] mapped onto [y1,y2]. */
#define INTERPOLATE(x, x1, x2, y1, y2)( (y1) + ((x)-(x1)) * ((y2)-(y1)) / ((x2)-(x1)) ) ( (y1) + ((x)-(x1)) * ((y2)-(y1)) / ((x2)-(x1)) )
/* Clamp y into the inclusive range [min, max]. */
#define LIMIT(y, min, max)((y)<(min)? min: ((y)>(max)? max: (y))) ((y)<(min)? min: ((y)>(max)? max: (y)))
1042 | |
/* decide if defrag is needed, and at what CPU effort to invest in it */
/* Reads current allocator fragmentation and maps it linearly onto the
 * configured [cycle_min, cycle_max] CPU-percentage range, storing the
 * result in server.active_defrag_running. Effort may only grow, never
 * shrink, while a scan is in progress. */
void computeDefragCycles() {
    size_t frag_bytes;
    float frag_pct = getAllocatorFragmentation(&frag_bytes);
    /* If we're not already running, and below the threshold, exit. */
    if (!server.active_defrag_running) {
        if(frag_pct < server.active_defrag_threshold_lower || frag_bytes < server.active_defrag_ignore_bytes)
            return;
    }

    /* Calculate the adaptive aggressiveness of the defrag */
    int cpu_pct = INTERPOLATE(frag_pct,( (server.active_defrag_cycle_min) + ((frag_pct)-(server.active_defrag_threshold_lower )) * ((server.active_defrag_cycle_max)-(server.active_defrag_cycle_min )) / ((server.active_defrag_threshold_upper)-(server.active_defrag_threshold_lower )) )
            server.active_defrag_threshold_lower,( (server.active_defrag_cycle_min) + ((frag_pct)-(server.active_defrag_threshold_lower )) * ((server.active_defrag_cycle_max)-(server.active_defrag_cycle_min )) / ((server.active_defrag_threshold_upper)-(server.active_defrag_threshold_lower )) )
            server.active_defrag_threshold_upper,( (server.active_defrag_cycle_min) + ((frag_pct)-(server.active_defrag_threshold_lower )) * ((server.active_defrag_cycle_max)-(server.active_defrag_cycle_min )) / ((server.active_defrag_threshold_upper)-(server.active_defrag_threshold_lower )) )
            server.active_defrag_cycle_min,( (server.active_defrag_cycle_min) + ((frag_pct)-(server.active_defrag_threshold_lower )) * ((server.active_defrag_cycle_max)-(server.active_defrag_cycle_min )) / ((server.active_defrag_threshold_upper)-(server.active_defrag_threshold_lower )) )
            server.active_defrag_cycle_max)( (server.active_defrag_cycle_min) + ((frag_pct)-(server.active_defrag_threshold_lower )) * ((server.active_defrag_cycle_max)-(server.active_defrag_cycle_min )) / ((server.active_defrag_threshold_upper)-(server.active_defrag_threshold_lower )) );
    cpu_pct = LIMIT(cpu_pct,((cpu_pct)<(server.active_defrag_cycle_min)? server.active_defrag_cycle_min : ((cpu_pct)>(server.active_defrag_cycle_max)? server.active_defrag_cycle_max : (cpu_pct)))
            server.active_defrag_cycle_min,((cpu_pct)<(server.active_defrag_cycle_min)? server.active_defrag_cycle_min : ((cpu_pct)>(server.active_defrag_cycle_max)? server.active_defrag_cycle_max : (cpu_pct)))
            server.active_defrag_cycle_max)((cpu_pct)<(server.active_defrag_cycle_min)? server.active_defrag_cycle_min : ((cpu_pct)>(server.active_defrag_cycle_max)? server.active_defrag_cycle_max : (cpu_pct)));
    /* We allow increasing the aggressiveness during a scan, but don't
     * reduce it. */
    if (!server.active_defrag_running ||
        cpu_pct > server.active_defrag_running)
    {
        server.active_defrag_running = cpu_pct;
        serverLog(LL_VERBOSE1,
            "Starting active defrag, frag=%.0f%%, frag_bytes=%zu, cpu=%d%%",
            frag_pct, frag_bytes, cpu_pct);
    }
}
1073 | |
1074 | /* Perform incremental defragmentation work from the serverCron. |
1075 | * This works in a similar way to activeExpireCycle, in the sense that |
1076 | * we do incremental work across calls. */ |
1077 | void activeDefragCycle(void) { |
1078 | static int current_db = -1; |
1079 | static unsigned long cursor = 0; |
1080 | static redisDb *db = NULL((void*)0); |
1081 | static long long start_scan, start_stat; |
1082 | unsigned int iterations = 0; |
1083 | unsigned long long prev_defragged = server.stat_active_defrag_hits; |
1084 | unsigned long long prev_scanned = server.stat_active_defrag_scanned; |
1085 | long long start, timelimit, endtime; |
1086 | mstime_t latency; |
1087 | int quit = 0; |
1088 | |
1089 | if (!server.active_defrag_enabled) { |
1090 | if (server.active_defrag_running) { |
1091 | /* if active defrag was disabled mid-run, start from fresh next time. */ |
1092 | server.active_defrag_running = 0; |
1093 | if (db) |
1094 | listEmpty(db->defrag_later); |
1095 | defrag_later_current_key = NULL((void*)0); |
1096 | defrag_later_cursor = 0; |
1097 | current_db = -1; |
1098 | cursor = 0; |
1099 | db = NULL((void*)0); |
1100 | } |
1101 | return; |
1102 | } |
1103 | |
1104 | if (hasActiveChildProcess()) |
1105 | return; /* Defragging memory while there's a fork will just do damage. */ |
1106 | |
1107 | /* Once a second, check if the fragmentation justfies starting a scan |
1108 | * or making it more aggressive. */ |
1109 | run_with_period(1000)if ((1000 <= 1000/server.hz) || !(server.cronloops%((1000) /(1000/server.hz)))) { |
1110 | computeDefragCycles(); |
1111 | } |
1112 | if (!server.active_defrag_running) |
1113 | return; |
1114 | |
1115 | /* See activeExpireCycle for how timelimit is handled. */ |
1116 | start = ustime(); |
1117 | timelimit = 1000000*server.active_defrag_running/server.hz/100; |
1118 | if (timelimit <= 0) timelimit = 1; |
1119 | endtime = start + timelimit; |
1120 | latencyStartMonitor(latency)if (server.latency_monitor_threshold) { latency = mstime(); } else { latency = 0; }; |
1121 | |
1122 | do { |
1123 | /* if we're not continuing a scan from the last call or loop, start a new one */ |
1124 | if (!cursor) { |
1125 | /* finish any leftovers from previous db before moving to the next one */ |
1126 | if (db && defragLaterStep(db, endtime)) { |
1127 | quit = 1; /* time is up, we didn't finish all the work */ |
Value stored to 'quit' is never read | |
1128 | break; /* this will exit the function and we'll continue on the next cycle */ |
1129 | } |
1130 | |
1131 | /* Move on to next database, and stop if we reached the last one. */ |
1132 | if (++current_db >= server.dbnum) { |
1133 | /* defrag other items not part of the db / keys */ |
1134 | defragOtherGlobals(); |
1135 | |
1136 | long long now = ustime(); |
1137 | size_t frag_bytes; |
1138 | float frag_pct = getAllocatorFragmentation(&frag_bytes); |
1139 | serverLog(LL_VERBOSE1, |
1140 | "Active defrag done in %dms, reallocated=%d, frag=%.0f%%, frag_bytes=%zu", |
1141 | (int)((now - start_scan)/1000), (int)(server.stat_active_defrag_hits - start_stat), frag_pct, frag_bytes); |
1142 | |
1143 | start_scan = now; |
1144 | current_db = -1; |
1145 | cursor = 0; |
1146 | db = NULL((void*)0); |
1147 | server.active_defrag_running = 0; |
1148 | |
1149 | computeDefragCycles(); /* if another scan is needed, start it right away */ |
1150 | if (server.active_defrag_running != 0 && ustime() < endtime) |
1151 | continue; |
1152 | break; |
1153 | } |
1154 | else if (current_db==0) { |
1155 | /* Start a scan from the first database. */ |
1156 | start_scan = ustime(); |
1157 | start_stat = server.stat_active_defrag_hits; |
1158 | } |
1159 | |
1160 | db = &server.db[current_db]; |
1161 | cursor = 0; |
1162 | } |
1163 | |
1164 | do { |
1165 | /* before scanning the next bucket, see if we have big keys left from the previous bucket to scan */ |
1166 | if (defragLaterStep(db, endtime)) { |
1167 | quit = 1; /* time is up, we didn't finish all the work */ |
1168 | break; /* this will exit the function and we'll continue on the next cycle */ |
1169 | } |
1170 | |
1171 | cursor = dictScan(db->dict, cursor, defragScanCallback, defragDictBucketCallback, db); |
1172 | |
1173 | /* Once in 16 scan iterations, 512 pointer reallocations. or 64 keys |
1174 | * (if we have a lot of pointers in one hash bucket or rehasing), |
1175 | * check if we reached the time limit. |
1176 | * But regardless, don't start a new db in this loop, this is because after |
1177 | * the last db we call defragOtherGlobals, which must be done in one cycle */ |
1178 | if (!cursor || (++iterations > 16 || |
1179 | server.stat_active_defrag_hits - prev_defragged > 512 || |
1180 | server.stat_active_defrag_scanned - prev_scanned > 64)) { |
1181 | if (!cursor || ustime() > endtime) { |
1182 | quit = 1; |
1183 | break; |
1184 | } |
1185 | iterations = 0; |
1186 | prev_defragged = server.stat_active_defrag_hits; |
1187 | prev_scanned = server.stat_active_defrag_scanned; |
1188 | } |
1189 | } while(cursor && !quit); |
1190 | } while(!quit); |
1191 | |
1192 | latencyEndMonitor(latency)if (server.latency_monitor_threshold) { latency = mstime() - latency ; }; |
1193 | latencyAddSampleIfNeeded("active-defrag-cycle",latency)if (server.latency_monitor_threshold && (latency) >= server.latency_monitor_threshold) latencyAddSample(("active-defrag-cycle" ),(latency));; |
1194 | } |
1195 | |
1196 | #else /* HAVE_DEFRAG */ |
1197 | |
/* No-op fallback when the allocator provides no defrag support. */
void activeDefragCycle(void) {
    /* Not implemented yet. */
}
1201 | |
/* Fallback: defrag unsupported, never relocate an allocation (NULL means
 * "pointer was not moved" to all callers). */
void *activeDefragAlloc(void *ptr) {
    UNUSED(ptr)((void) ptr);
    return NULL((void*)0);
}
1206 | |
/* Fallback: defrag unsupported, never relocate a string robj (NULL means
 * "object was not moved" to all callers). */
robj *activeDefragStringOb(robj *ob, long *defragged) {
    UNUSED(ob)((void) ob);
    UNUSED(defragged)((void) defragged);
    return NULL((void*)0);
}
1212 | |
1213 | #endif |