Reading Redis from the Source: All About Active Expiration
Preface
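A while ago, while scrolling through Maimai, I came across this interview question: why does the expiration of a large number of keys increase the latency of the whole Redis service?
If you were the candidate, how would you answer?
If all you know is that Redis uses a single-threaded reactor model, you might answer: because Redis is single-threaded, handling the key deletions takes too long, other events queue up behind it, and latency goes up.
Would that answer satisfy the interviewer? Probably not quite. They might keep asking: When does Redis actively process expired keys? What is the processing logic? In the worst case, how much latency can it add? How can you tell that latency spikes are caused by expiring keys? How do you fix the problem?
Likewise, the answers I saw under that Maimai post did not feel complete to me, so I dug into the Redis source code with these questions in mind.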
Source Code Analysis
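Here we focus on the active expiration code; lazy deletion, i.e. deleting an expired key when a command happens to access it, is left out for now.
Since the topic is expiration, let's open the activeExpireCycle function in expire.c
and see what it actually does.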
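The comment on the active expiration function
When reading source code, function comments matter a lot: they convey the developer's intent and help us understand the purpose of the code faster.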
/* Try to expire a few timed out keys. The algorithm used is adaptive and
* will use few CPU cycles if there are few expiring keys, otherwise
* it will get more aggressive to avoid that too much memory is used by
* keys that can be removed from the keyspace.
*
* Every expire cycle tests multiple databases: the next call will start
* again from the next db. No more than CRON_DBS_PER_CALL databases are
* tested at every iteration.
*
* The function can perform more or less work, depending on the "type"
* argument. It can execute a "fast cycle" or a "slow cycle". The slow
* cycle is the main way we collect expired cycles: this happens with
* the "server.hz" frequency (usually 10 hertz).
*
* However the slow cycle can exit for timeout, since it used too much time.
* For this reason the function is also invoked to perform a fast cycle
* at every event loop cycle, in the beforeSleep() function. The fast cycle
* will try to perform less work, but will do it much more often.
*
* The following are the details of the two expire cycles and their stop
* conditions:
*
* If type is ACTIVE_EXPIRE_CYCLE_FAST the function will try to run a
* "fast" expire cycle that takes no longer than ACTIVE_EXPIRE_CYCLE_FAST_DURATION
* microseconds, and is not repeated again before the same amount of time.
* The cycle will also refuse to run at all if the latest slow cycle did not
* terminate because of a time limit condition.
*
* If type is ACTIVE_EXPIRE_CYCLE_SLOW, that normal expire cycle is
* executed, where the time limit is a percentage of the REDIS_HZ period
* as specified by the ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC define. In the
* fast cycle, the check of every database is interrupted once the number
* of already expired keys in the database is estimated to be lower than
* a given percentage, in order to avoid doing too much work to gain too
* little memory.
*
* The configured expire "effort" will modify the baseline parameters in
* order to do more work in both the fast and slow expire cycles.
*/
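The comment is long; roughly, it says:
- The algorithm is adaptive: when few keys are expiring it uses few CPU cycles; otherwise it becomes more aggressive, so that keys which could be removed do not occupy too much memory.
- Every expire cycle tests multiple DBs, starting from the DB after the one where the previous call stopped, and no more than CRON_DBS_PER_CALL DBs are tested per call.
  - Note: #define CRON_DBS_PER_CALL 16
- The function has two modes, a slow cycle and a fast cycle, selected by the type argument.
  - The slow cycle is the main workhorse and removes most of the expired keys. It runs server.hz times per second, and server.hz is usually 10.
    - Note: server.hz is a Redis configuration parameter giving the number of times per second the internal timer tasks run. The default is 10, so you can think of it as once every 100 ms.
  - The slow cycle has a time limit and exits when it runs too long.
  - That is why there is also a fast cycle, called from beforeSleep in the event loop, to supplement it: each call does less work, but calls happen much more often.
- In fast mode, the function runs a fast expire cycle that lasts no longer than ACTIVE_EXPIRE_CYCLE_FAST_DURATION microseconds.
  - Note: a fast cycle is skipped if the previous cycle did not exit because of its time limit (unless the estimated share of stale keys is high), and it is never repeated within twice the fast-cycle duration of the previous fast cycle.
  - The scan of a DB stops once the share of expired keys found in it drops below a threshold. (The comment mentions this under the fast cycle; in the code below, the same per-DB stop condition applies in both modes.)
- In slow mode, the time limit is a percentage of one REDIS_HZ period, set by ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC.
- The effort parameter adjusts these baseline parameters so that both the fast and the slow cycle expire keys more aggressively.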
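The effort-dependent baselines mentioned above are easy to check by hand. Below is a minimal sketch that repeats the arithmetic found at the top of activeExpireCycle() in the source listed below. The struct expire_params, the function scale_by_effort, and the shortened macro names are hypothetical, introduced only for illustration.

```c
#include <assert.h>

/* Baseline values as defined in expire.c (shortened, hypothetical names). */
#define KEYS_PER_LOOP     20    /* keys sampled per DB iteration */
#define FAST_DURATION_US  1000  /* fast cycle time limit, microseconds */
#define SLOW_TIME_PERC    25    /* max % of CPU time for the slow cycle */
#define ACCEPTABLE_STALE  10    /* % of stale keys that triggers extra work */

/* Hypothetical struct grouping the four derived parameters. */
typedef struct {
    unsigned long keys_per_loop;
    unsigned long fast_duration_us;
    unsigned long slow_time_perc;
    unsigned long acceptable_stale;
} expire_params;

/* Same arithmetic as activeExpireCycle(): active_expire_effort (1..10)
 * is first rescaled to 0..9, then each baseline is adjusted. */
static expire_params scale_by_effort(int active_expire_effort) {
    assert(active_expire_effort >= 1 && active_expire_effort <= 10);
    unsigned long effort = (unsigned long)active_expire_effort - 1;
    expire_params p;
    p.keys_per_loop    = KEYS_PER_LOOP + KEYS_PER_LOOP / 4 * effort;
    p.fast_duration_us = FAST_DURATION_US + FAST_DURATION_US / 4 * effort;
    p.slow_time_perc   = SLOW_TIME_PERC + 2 * effort;
    p.acceptable_stale = ACCEPTABLE_STALE - effort;
    return p;
}
```

At the maximum effort of 10, each DB iteration samples 65 keys, a fast cycle may run for 3250 µs, the slow cycle may use 43% of each period, and a DB keeps being rescanned while more than about 1% of the sampled keys are expired.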
The active expiration function's source
#define ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP 20 /* Keys for each DB loop (keys sampled per DB iteration). */
#define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds. Time limit of one fast cycle. */
#define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 /* Max % of CPU to use. */
#define ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE 10 /* % of stale keys after which
we do extra efforts. */
void activeExpireCycle(int type) {
/* Adjust the running parameters according to the configured expire
* effort. The default effort is 1, and the maximum configurable effort
* is 10. */
unsigned long
effort = server.active_expire_effort-1, /* Rescale from 0 to 9 (1..10 becomes 0..9). */
// Scale the baseline macros defined above by effort
config_keys_per_loop = ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP +
ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP/4*effort,
config_cycle_fast_duration = ACTIVE_EXPIRE_CYCLE_FAST_DURATION +
ACTIVE_EXPIRE_CYCLE_FAST_DURATION/4*effort,
config_cycle_slow_time_perc = ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC +
2*effort,
config_cycle_acceptable_stale = ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE-
effort;
/* This function has some global state in order to continue the work
* incrementally across calls. */
static unsigned int current_db = 0; /* Next DB to test. */
static int timelimit_exit = 0; /* Time limit hit in previous call? */
static long long last_fast_cycle = 0; /* When last fast cycle ran. */
int j, iteration = 0;
int dbs_per_call = CRON_DBS_PER_CALL;
long long start = ustime(), timelimit, elapsed;
/* When clients are paused the dataset should be static not just from the
* POV of clients not being able to write, but also from the POV of
* expires and evictions of keys not being performed. */
if (checkClientPauseTimeoutAndReturnIfPaused()) return;
// Extra checks for fast mode
if (type == ACTIVE_EXPIRE_CYCLE_FAST) {
/* Don't start a fast cycle if the previous cycle did not exit
* for time limit, unless the percentage of estimated stale keys is
* too high. Also never repeat a fast cycle for the same period
* as the fast cycle total duration itself. */
// Skip the fast cycle unless the previous cycle hit its time limit or the estimated stale-key percentage reached the threshold
if (!timelimit_exit &&
server.stat_expired_stale_perc < config_cycle_acceptable_stale)
return;
// Also skip if less than twice the fast-cycle duration has passed since the last fast cycle
if (start < last_fast_cycle + (long long)config_cycle_fast_duration*2)
return;
last_fast_cycle = start;
}
/* We usually should test CRON_DBS_PER_CALL per iteration, with
* two exceptions:
*
* 1) Don't test more DBs than we have.
* 2) If last time we hit the time limit, we want to scan all DBs
* in this iteration, as there is work to do in some DB and we don't want
* expired keys to use memory for too much time.
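Number of DBs to scan: CRON_DBS_PER_CALL by default, but server.dbnum if there are fewer DBs than that or if the previous call hit its time limit.
A timeout last time suggests that many keys are expiring, so reclaim more aggressively to relieve memory pressure. */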
if (dbs_per_call > server.dbnum || timelimit_exit)
dbs_per_call = server.dbnum;
/* We can use at max 'config_cycle_slow_time_perc' percentage of CPU
* time per iteration. Since this function gets called with a frequency of
* server.hz times per second, the following is the max amount of
* microseconds we can spend in this function.
Cap the CPU share spent on expiration through a per-call time limit. */
timelimit = config_cycle_slow_time_perc*1000000/server.hz/100;
timelimit_exit = 0;
if (timelimit <= 0) timelimit = 1;
if (type == ACTIVE_EXPIRE_CYCLE_FAST)
timelimit = config_cycle_fast_duration; /* in microseconds. */
/* Accumulate some global stats as we expire keys, to have some idea
* about the number of keys that are already logically expired, but still
* existing inside the database. Sampled and expired counts are used to estimate the stale-key ratio. */
long total_sampled = 0;
long total_expired = 0;
/* Sanity: There can't be any pending commands to propagate when
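* we're in cron. The flags below make the DELs generated in this cycle accumulate so that propagatePendingCommands() propagates them together at the end. */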
serverAssert(server.also_propagate.numops == 0);
server.core_propagates = 1;
server.propagate_no_multi = 1;
// Main loop: visit up to dbs_per_call DBs while the time limit has not been hit
for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) {
/* Expired and checked in a single loop. */
unsigned long expired, sampled;
redisDb *db = server.db+(current_db % server.dbnum);
/* Increment the DB now so we are sure if we run out of time
* in the current DB we'll restart from the next. This allows to
* distribute the time evenly across DBs. */
current_db++;
/* Continue to expire if at the end of the cycle there are still
* a big percentage of keys to expire, compared to the number of keys
* we scanned. The percentage, stored in config_cycle_acceptable_stale
* is not fixed, but depends on the Redis configured "expire effort".
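Expire keys and compute the share of expired keys among those sampled; while it exceeds the threshold computed above, keep working on this DB until the time limit is hit or the share drops. */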
do {
unsigned long num, slots;
long long now, ttl_sum;
int ttl_samples;
iteration++;
/* If there is nothing to expire try next DB ASAP. */
if ((num = dictSize(db->expires)) == 0) {
db->avg_ttl = 0;
break;
}
slots = dictSlots(db->expires);
now = mstime();
/* When there are less than 1% filled slots, sampling the key
* space is expensive, so stop here waiting for better times...
* The dictionary will be resized asap. */
if (slots > DICT_HT_INITIAL_SIZE &&
(num*100/slots < 1)) break;
/* The main collection cycle. Sample random keys among keys
* with an expire set, checking for expired ones. */
expired = 0;
sampled = 0;
ttl_sum = 0;
ttl_samples = 0;
if (num > config_keys_per_loop)
num = config_keys_per_loop;
/* Here we access the low level representation of the hash table
* for speed concerns: this makes this code coupled with dict.c,
* but it hardly changed in ten years.
*
* Note that certain places of the hash table may be empty,
* so we want also a stop condition about the number of
* buckets that we scanned. However scanning for free buckets
* is very fast: we are in the cache line scanning a sequential
* array of NULL pointers, so we can scan a lot more buckets
* than keys in the same time. */
long max_buckets = num*20;
long checked_buckets = 0;
// Walk buckets until num keys have been sampled or max_buckets buckets have been checked
while (sampled < num && checked_buckets < max_buckets) {
for (int table = 0; table < 2; table++) {
if (table == 1 && !dictIsRehashing(db->expires)) break;
unsigned long idx = db->expires_cursor;
idx &= DICTHT_SIZE_MASK(db->expires->ht_size_exp[table]);
dictEntry *de = db->expires->ht_table[table][idx];
long long ttl;
/* Scan the current bucket of the current table. */
checked_buckets++;
while(de) {
/* Get the next entry now since this entry may get
* deleted. */
dictEntry *e = de;
de = de->next;
ttl = dictGetSignedIntegerVal(e)-now;
// Try to expire this key; count it if it was actually expired
if (activeExpireCycleTryExpire(db,e,now)) expired++;
if (ttl > 0) {
/* We want the average TTL of keys yet
* not expired. */
ttl_sum += ttl;
ttl_samples++;
}
sampled++;
}
}
db->expires_cursor++;
}
total_expired += expired;
total_sampled += sampled;
/* Update the average TTL stats for this database. */
if (ttl_samples) {
long long avg_ttl = ttl_sum/ttl_samples;
/* Do a simple running average with a few samples.
* We just use the current estimate with a weight of 2%
* and the previous estimate with a weight of 98%. */
if (db->avg_ttl == 0) db->avg_ttl = avg_ttl;
db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50);
}
/* We can't block forever here even if there are many keys to
* expire. So after a given amount of milliseconds return to the
* caller waiting for the other active expire cycle. */
if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */
elapsed = ustime()-start;
if (elapsed > timelimit) {
timelimit_exit = 1;
server.stat_expired_time_cap_reached_count++;
break;
}
}
/* We don't repeat the cycle for the current database if there are
* an acceptable amount of stale keys (logically expired but yet
* not reclaimed). Repeat on this DB if nothing was sampled or the expired share exceeds the threshold. */
} while (sampled == 0 ||
(expired*100/sampled) > config_cycle_acceptable_stale);
}
serverAssert(server.core_propagates); /* This function should not be re-entrant */
/* Propagate all DELs, then reset the propagation flags. */
propagatePendingCommands();
server.core_propagates = 0;
server.propagate_no_multi = 0;
// Record the time spent
elapsed = ustime()-start;
server.stat_expire_cycle_time_used += elapsed;
latencyAddSampleIfNeeded("expire-cycle",elapsed/1000);
/* Update our estimate of keys existing but yet to be expired.
* Running average with this sample accounting for 5%. */
double current_perc;
if (total_sampled) {
current_perc = (double)total_expired/total_sampled;
} else
current_perc = 0;
server.stat_expired_stale_perc = (current_perc*0.05)+
(server.stat_expired_stale_perc*0.95);
}
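Two of the statistics updated above are moving averages. The sketch below isolates them so the weights are easier to see: stat_expired_stale_perc gives the newest sample a 5% weight, and avg_ttl gives it 2%. The function names are hypothetical; the arithmetic follows the quoted source, including dividing by 50 before multiplying.

```c
#include <assert.h>

/* EMA used for server.stat_expired_stale_perc: the newest sample weighs 5%.
 * The value is a fraction between 0 and 1, not a percentage. */
static double update_stale_perc(double prev, long total_expired, long total_sampled) {
    assert(total_expired <= total_sampled);
    double current = total_sampled ? (double)total_expired / total_sampled : 0;
    return current * 0.05 + prev * 0.95;
}

/* Running average used for db->avg_ttl (ms): the newest sample weighs 2%.
 * As in the source, each term is divided by 50 before multiplying. */
static long long update_avg_ttl(long long prev, long long sample_avg) {
    if (prev == 0) prev = sample_avg;  /* no estimate yet: seed with the sample */
    return (prev / 50) * 49 + (sample_avg / 50);
}
```

Because both averages move slowly, a single burst of expiring keys nudges the estimates only a little; it takes several cycles of high stale ratios before stat_expired_stale_perc rises noticeably.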
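Callers
Fast-cycle caller
This call is in the beforeSleep function in server.c. As you can see, when active expiration is enabled and the node is not a replica (server.masterhost == NULL), every call to beforeSleep attempts a fast cycle; activeExpireCycle itself returns early when a fast cycle is not needed.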
void beforeSleep(struct aeEventLoop *eventLoop) {
/* ...N lines omitted... */
/* Run a fast expire cycle (the called function will return
* ASAP if a fast cycle is not needed). */
if (server.active_expire_enabled && server.masterhost == NULL)
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
/* ...N lines omitted... */
}
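The early-return checks at the start of activeExpireCycle() (quoted earlier) decide whether this per-iteration call actually does any work. Below is a minimal sketch of just that gate; the struct fast_gate, standing in for the function's static variables, and fast_cycle_allowed are hypothetical names, and the rest of the cycle (such as resetting timelimit_exit) is left out.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for the static variables of activeExpireCycle(). */
typedef struct {
    int timelimit_exit;         /* did the previous cycle hit its time limit? */
    long long last_fast_cycle;  /* start time (us) of the last fast cycle */
} fast_gate;

/* Returns 1 if a fast cycle may start at start_us (and records it),
 * 0 if it should return early, following the two checks in the source. */
static int fast_cycle_allowed(fast_gate *g, long long start_us,
                              double stale_perc,
                              unsigned long acceptable_stale,
                              unsigned long fast_duration_us) {
    assert(g != NULL);
    /* Previous cycle finished in time and the stale estimate is low: skip. */
    if (!g->timelimit_exit && stale_perc < acceptable_stale)
        return 0;
    /* Too soon after the last fast cycle: skip. */
    if (start_us < g->last_fast_cycle + (long long)fast_duration_us * 2)
        return 0;
    g->last_fast_cycle = start_us;
    return 1;
}
```

One detail worth noticing in the quoted source: stat_expired_stale_perc is computed at the end of the function as a fraction between 0 and 1, while config_cycle_acceptable_stale is a percentage (10 by default), so as written the stale-key condition is almost always satisfied and the gate is effectively driven by timelimit_exit. The sketch keeps the comparison exactly as quoted.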
So where is beforeSleep called from?
In the initServer function, also in server.c, we find a call to aeSetBeforeSleepProc from ae.c:
aeSetBeforeSleepProc(server.el,beforeSleep);
This function assigns beforeSleep to the beforesleep pointer of the aeEventLoop struct:
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) {
eventLoop->beforesleep = beforesleep;
}
typedef struct aeEventLoop {
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
long long timeEventNextId;
aeFileEvent *events; /* Registered events */
aeFiredEvent *fired; /* Fired events */
aeTimeEvent *timeEventHead;
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
aeBeforeSleepProc *aftersleep;
int flags;
} aeEventLoop;
Then, in the event-processing function aeProcessEvents, we find this call:
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
/* ...N lines omitted... */
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
eventLoop->beforesleep(eventLoop);
/* ...N lines omitted... */
/* Check time events */
if (flags & AE_TIME_EVENTS) // Run time events (discussed later)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}
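Where is aeProcessEvents called from? Essentially everything goes through it. As is well known, Redis uses a single-threaded model: almost all work is turned into file events and time events, and aeProcessEvents is the entry point for processing all of them.
It is mainly called from these two places: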
The main event loop
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
aeProcessEvents(eventLoop, AE_ALL_EVENTS|
AE_CALL_BEFORE_SLEEP|
AE_CALL_AFTER_SLEEP);
}
}
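The chain above (initServer registers beforeSleep, aeMain loops, aeProcessEvents calls the hook) can be reduced to a few lines. The following is a toy model, not Redis code: toyEventLoop, toySetBeforeSleepProc, toyProcessEvents, toyMain and toyBeforeSleep are hypothetical names. The real aeProcessEvents only calls the hook when AE_CALL_BEFORE_SLEEP is set, and then polls for file and time events, which the toy omits.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, stripped-down model of aeEventLoop. */
typedef struct toyEventLoop toyEventLoop;
typedef void toyBeforeSleepProc(toyEventLoop *el);

struct toyEventLoop {
    int stop;
    toyBeforeSleepProc *beforesleep;  /* hook called once per iteration */
    int iterations;                   /* loop iterations completed */
    int max_iterations;               /* demo-only stop condition */
};

/* Same idea as aeSetBeforeSleepProc(): just store the pointer. */
static void toySetBeforeSleepProc(toyEventLoop *el, toyBeforeSleepProc *p) {
    el->beforesleep = p;
}

/* One iteration: call the hook first; real Redis would then wait for
 * and handle file events and time events. */
static void toyProcessEvents(toyEventLoop *el) {
    if (el->beforesleep != NULL) el->beforesleep(el);
    el->iterations++;
    if (el->iterations >= el->max_iterations) el->stop = 1;
}

static void toyMain(toyEventLoop *el) {
    el->stop = 0;
    while (!el->stop) toyProcessEvents(el);
}

/* Stand-in for beforeSleep(): counts attempted fast expire cycles. */
static int fast_cycle_attempts = 0;
static void toyBeforeSleep(toyEventLoop *el) {
    (void)el;
    fast_cycle_attempts++;
}
```

Because the hook runs once per loop iteration, a fast expire cycle is attempted as often as the loop turns, which is why activeExpireCycle() needs its own early-return checks.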
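Processing events while blocked
processEventsWhileBlocked in networking.c is called during AOF and RDB loading, busy scripts, modules, and other places; interested readers can explore further.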
/* This function is called by Redis in order to process a few events from
* time to time while blocked into some not interruptible operation.
* This allows to reply to clients with the -LOADING error while loading the
* data set at startup or after a full resynchronization with the master
* and so forth.
*
* It calls the event loop in order to process a few events. Specifically we
* try to call the event loop 4 times as long as we receive acknowledge that
* some event was processed, in order to go forward with the accept, read,
* write, close sequence needed to serve a client.
*
* The function returns the total number of events processed. */
void processEventsWhileBlocked(void) {
int iterations = 4; /* See the function top-comment. */
/* Update our cached time since it is used to create and update the last
* interaction time with clients and for other important things. */
updateCachedTime(0);
/* Note: when we are processing events while blocked (for instance during
* busy Lua scripts), we set a global flag. When such flag is set, we
* avoid handling the read part of clients using threaded I/O.
* See https://github.com/redis/redis/issues/6988 for more info.
* Note that there could be cases of nested calls to this function,
* specifically on a busy script during async_loading rdb, and scripts
* that came from AOF. */
ProcessingEventsWhileBlocked++;
while (iterations--) {
long long startval = server.events_processed_while_blocked;
long long ae_events = aeProcessEvents(server.el,
AE_FILE_EVENTS|AE_DONT_WAIT|
AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP);
/* Note that server.events_processed_while_blocked will also get
* incremented by callbacks called by the event loop handlers. */
server.events_processed_while_blocked += ae_events;
long long events = server.events_processed_while_blocked - startval;
if (!events) break;
}
whileBlockedCron();
ProcessingEventsWhileBlocked--;
serverAssert(ProcessingEventsWhileBlocked >= 0);
}
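Slow-cycle caller
The corresponding call is in the databasesCron function in server.c.
When the current node is a master, activeExpireCycle is called in slow mode (replicas call expireSlaveKeys instead):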
void databasesCron(void) {
/* Expire keys by random sampling. Not required for slaves
* as master will synthesize DELs for us. */
if (server.active_expire_enabled) {
if (iAmMaster()) {
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
} else {
expireSlaveKeys();
}
}
/* ...rest omitted... */
}
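Likewise, we find its caller, serverCron.
Its comment tells us that this cron function runs server.hz times per second, i.e. 10 times per second by default, once every 100 ms.
- Note: server.hz is a configuration option; the default is 10.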
/* This is our timer interrupt, called server.hz times per second.
* Here is where we do a number of things that need to be done asynchronously.
* For instance:
*
* - Active expired keys collection (it is also performed in a lazy way on
* lookup).
* - Software watchdog.
* - Update some statistic.
* - Incremental rehashing of the DBs hash tables.
* - Triggering BGSAVE / AOF rewrite, and handling of terminated children.
* - Clients timeout of different kinds.
* - Replication reconnection.
* - Many more...
*
* Everything directly called here will be called server.hz times per second,
* so in order to throttle execution of things we want to do less frequently
* a macro is used: run_with_period(milliseconds) { .... }
*/
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
/* ...N lines omitted... */
/* Handle background operations on Redis databases. */
databasesCron();
/* ...N lines omitted... */
return 1000/server.hz; // Delay in ms until the next run, i.e. server.hz runs per second (see processTimeEvents below)
}
Then, in initServer, we find where the time event is registered:
/* Create the timer callback, this is our way to process many background
* operations incrementally, like clients timeout, eviction of unaccessed
* expired keys and so forth. */
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}
How does Redis control the interval between runs of a time event? Let's go back to ae.c and look at processTimeEvents:
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te;
long long maxId;
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
monotime now = getMonotonicUs();
while(te) {
long long id;
/* Remove events scheduled for deletion. */
if (te->id == AE_DELETED_EVENT_ID) {
aeTimeEvent *next = te->next;
/* If a reference exists for this timer event,
* don't free it. This is currently incremented
* for recursive timerProc calls */
if (te->refcount) {
te = next;
continue;
}
if (te->prev)
te->prev->next = te->next;
else
eventLoop->timeEventHead = te->next;
if (te->next)
te->next->prev = te->prev;
if (te->finalizerProc) {
te->finalizerProc(eventLoop, te->clientData);
now = getMonotonicUs();
}
zfree(te);
te = next;
continue;
}
/* Make sure we don't process time events created by time events in
* this iteration. Note that this check is currently useless: we always
* add new timers on the head, however if we change the implementation
* detail, this check may be useful again: we keep it here for future
* defense. */
if (te->id > maxId) {
te = te->next;
continue;
}
if (te->when <= now) { // The scheduled run time has been reached
int retval;
id = te->id;
te->refcount++;
retval = te->timeProc(eventLoop, id, te->clientData); // Return value of the event callback
te->refcount--;
processed++;
now = getMonotonicUs();
if (retval != AE_NOMORE) {
te->when = now + retval * 1000; // Next run time from the callback's return value (ms converted to us)
} else {
te->id = AE_DELETED_EVENT_ID;
}
}
te = te->next;
}
return processed;
}
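The rescheduling rule in processTimeEvents() is what turns serverCron's return value of 1000/server.hz into a fixed rate. Below is a simplified single-timer sketch that polls a simulated microsecond clock once per millisecond; toyTimeEvent, toyProcessTimeEvent, toyCron, toyCronState and TOY_NOMORE are hypothetical stand-ins for aeTimeEvent, processTimeEvents, serverCron and AE_NOMORE.

```c
#include <assert.h>

#define TOY_NOMORE -1  /* hypothetical stand-in for AE_NOMORE */

/* A single hypothetical time event; `when` is in microseconds. */
typedef struct {
    long long when;
    int deleted;
    int (*proc)(void *data);  /* returns ms until next run, or TOY_NOMORE */
    void *data;
} toyTimeEvent;

/* Same rule as processTimeEvents(): run if due, then schedule the
 * next run at now + retval * 1000 (retval is in milliseconds). */
static int toyProcessTimeEvent(toyTimeEvent *te, long long now_us) {
    if (te->deleted || te->when > now_us) return 0;
    int retval = te->proc(te->data);
    if (retval != TOY_NOMORE)
        te->when = now_us + (long long)retval * 1000;
    else
        te->deleted = 1;
    return 1;
}

/* Stand-in for serverCron(): counts calls, asks to rerun in 1000/hz ms. */
typedef struct { int hz; int calls; } toyCronState;

static int toyCron(void *data) {
    toyCronState *s = data;
    s->calls++;
    return 1000 / s->hz;
}
```

With hz = 10 the callback runs 10 times in one simulated second, once every 100 ms, matching the serverCron comment. In real Redis, now is re-read after the callback returns, so the next run is measured from the end of the callback and a slow callback pushes later runs back slightly.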
Summary
That concludes the walkthrough of Redis's active expiration code. Let's now go back and answer the questions from the beginning.
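Q: When does Redis actively process expired keys?
A: 1. Every event loop iteration calls beforeSleep, which attempts a fast cycle; 2. the internal time event runs a slow cycle server.hz times per second, i.e. every 100 ms by default.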
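Q: What is the processing logic?
A: Normally it scans at most 16 DBs per call. For each DB it samples 20 keys that have an expire set, deletes the expired ones, and computes the share of expired keys among those sampled. If the share exceeds the threshold (10% by default), it keeps sampling that DB until the time limit is hit or the share falls below the threshold.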
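The loop described in this answer can be simulated without a real hash table. This sketch is a toy under stated assumptions, not Redis code: a dense array of expire timestamps replaces db->expires and a cursor replaces db->expires_cursor. It keeps the sampling limit and the do/while stop condition but drops the time limit, the bucket cap and rehashing. toyDb and toyExpireDb are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical toy "expires dict": absolute expire times in ms;
 * a slot holding 0 means the key has been deleted. */
typedef struct {
    long long *expire_at;
    size_t size;    /* number of slots */
    size_t live;    /* keys still present */
    size_t cursor;  /* like db->expires_cursor */
} toyDb;

/* Sample up to keys_per_loop keys, delete the expired ones, and repeat
 * while the expired share stays above acceptable_stale percent.
 * Returns the number of sampling rounds performed. */
static int toyExpireDb(toyDb *db, long long now_ms,
                       unsigned long keys_per_loop,
                       unsigned long acceptable_stale) {
    int rounds = 0;
    unsigned long expired = 0, sampled = 0;
    do {
        if (db->live == 0) break;  /* nothing to expire */
        unsigned long num = (unsigned long)db->live < keys_per_loop
                                ? (unsigned long)db->live : keys_per_loop;
        expired = sampled = 0;
        while (sampled < num) {
            long long *slot = &db->expire_at[db->cursor % db->size];
            db->cursor++;
            if (*slot == 0) continue;  /* empty slot: skip it */
            if (now_ms > *slot) {      /* already expired: delete it */
                *slot = 0;
                db->live--;
                expired++;
            }
            sampled++;
        }
        rounds++;
    } while (sampled == 0 || expired * 100 / sampled > acceptable_stale);
    return rounds;
}
```

With every key already expired, the DB is emptied in five rounds of 20; when only the first 20 keys are expired, the second round finds none and the loop stops, leaving the other 80 keys untouched.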
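Q: In the worst case, how much latency can be added?
A: With the default parameters, a fast cycle runs for at most about 1 ms and starts at most once every 2 ms, so it adds up to about 1 ms to an event loop iteration; the slow cycle can use up to 25 ms out of every 100 ms. These bounds are approximate: the elapsed time is only checked once every 16 iterations, so a cycle can slightly overrun its limit.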
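The 25 ms figure comes from the time limit formula in the source: config_cycle_slow_time_perc percent of one 1/server.hz period, in microseconds. A small helper reproducing it, where slow_timelimit_us is a hypothetical name:

```c
#include <assert.h>

/* Per-call time limit of the slow cycle, same formula as the quoted
 * source: time_perc percent of a 1/hz-second period, in microseconds. */
static long long slow_timelimit_us(long long time_perc, int hz) {
    assert(hz > 0);
    long long timelimit = time_perc * 1000000 / hz / 100;
    if (timelimit <= 0) timelimit = 1;  /* never allow a zero limit */
    return timelimit;
}
```

Raising hz to 100 keeps the same 25% share but cuts each slow cycle to about 2.5 ms, spreading the work over shorter pauses; raising active-expire-effort to 10 raises the share to 43%, about 43 ms per 100 ms period.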
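Q: How can you tell whether latency spikes are caused by expiring keys?
A: Time spent in active expiration is accumulated in stat_expire_cycle_time_used (reported by INFO stats as expire_cycle_cpu_milliseconds), and stat_expired_time_cap_reached_count counts cycles that hit their time limit. When the latency monitor is enabled, latencyAddSampleIfNeeded also records slow cycles as the "expire-cycle" latency event. These can be used to confirm the cause.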
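Q: How do you fix it?
A: Avoid having a large number of keys expire at the same time, for example by randomizing each key's expiration time within a certain range.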
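One common way to apply this advice on the client side is to add a random offset to each key's TTL. Below is a minimal sketch; ttl_with_jitter is a hypothetical helper, and rand() is used only for brevity.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical helper: pick a TTL uniformly-ish in [base_s, base_s + spread_s]
 * so keys written together do not all expire in the same instant. */
static long long ttl_with_jitter(long long base_s, long long spread_s) {
    assert(base_s > 0);  /* a non-positive TTL would delete the key */
    if (spread_s <= 0) return base_s;
    return base_s + rand() % (spread_s + 1);
}
```

The result would then be passed as the TTL in seconds, e.g. to EXPIRE or SET ... EX. Note that rand() only guarantees values up to RAND_MAX (at least 32767), so a spread larger than that needs a better random source.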