从源码看Redis —— 关于主动过期那点事

前言

前段时间刷脉脉的时候,刷到了这样一个面试题:为什么大量Key过期的时候,会影响整个redis服务的延迟

如果你是面试者,你会怎么回答?

如果你只知道redis是一个单线程reactor模型,你可能会回答:因为redis是单线程模型,所以是因为处理删除key的事件耗时过长而导致其他事件排队引起延迟增加

这个回答能让面试官满意吗?可能还不够,他也许会继续问你:那redis主动处理过期key的时机是什么时候?处理的逻辑是什么?最坏情况下会增加多少延迟?如何判断是因为过期key而导致的延迟抖动?如何解决这个问题?

同样的,当时看到脉脉帖子上的回答,我也觉得不够完整,所以索性带着这些问题翻了下redis的源码。

源码解析

这里我们主要看主动过期部分的源码,对于惰性删除、命令调用到过期key引发的删除这些暂时不讨论

既然是看过期,我们翻到expire.c的activeExpireCycle函数来看下它到底做了什么

主动过期函数注释

看源码,函数的注释是非常重要,它能够帮忙传递开发者的想法,帮助我们更快的了解其用意

/* Try to expire a few timed out keys. The algorithm used is adaptive and
 * will use few CPU cycles if there are few expiring keys, otherwise
 * it will get more aggressive to avoid that too much memory is used by
 * keys that can be removed from the keyspace.
 *
 * Every expire cycle tests multiple databases: the next call will start
 * again from the next db. No more than CRON_DBS_PER_CALL databases are
 * tested at every iteration.
 *
 * The function can perform more or less work, depending on the "type"
 * argument. It can execute a "fast cycle" or a "slow cycle". The slow
 * cycle is the main way we collect expired cycles: this happens with
 * the "server.hz" frequency (usually 10 hertz).
 *
 * However the slow cycle can exit for timeout, since it used too much time.
 * For this reason the function is also invoked to perform a fast cycle
 * at every event loop cycle, in the beforeSleep() function. The fast cycle
 * will try to perform less work, but will do it much more often.
 *
 * The following are the details of the two expire cycles and their stop
 * conditions:
 *
 * If type is ACTIVE_EXPIRE_CYCLE_FAST the function will try to run a
 * "fast" expire cycle that takes no longer than ACTIVE_EXPIRE_CYCLE_FAST_DURATION
 * microseconds, and is not repeated again before the same amount of time.
 * The cycle will also refuse to run at all if the latest slow cycle did not
 * terminate because of a time limit condition.
 *
 * If type is ACTIVE_EXPIRE_CYCLE_SLOW, that normal expire cycle is
 * executed, where the time limit is a percentage of the REDIS_HZ period
 * as specified by the ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC define. In the
 * fast cycle, the check of every database is interrupted once the number
 * of already expired keys in the database is estimated to be lower than
 * a given percentage, in order to avoid doing too much work to gain too
 * little memory.
 *
 * The configured expire "effort" will modify the baseline parameters in
 * order to do more work in both the fast and slow expire cycles.
 */

这段注释很长,大概的大意是

  • 它拥有一个自适应的算法,在过期key比较少的时候将使用很少的CPU周期,否则会进行激进的删除,从而减少内存使用
  • 每次过期循环都会遍历多个DB,且从会从上次遍历的下一个DB开始,每次循环变量不超过CRON_DBS_PER_CALL个DB
    • 注:#define CRON_DBS_PER_CALL 16
  • 函数拥有2个模式,即慢循环和快循环,根据传入的type来执行不同的模式
  • 慢循环是过期key的删除主力,也就是说慢循环模式会删除更多的过期key,慢循环是在每次server.hz执行的,server.hz通常为10
    • 注:server.hz为redis参数,含义为内部定时任务每秒执行的次数,默认为10,可以认为是100ms执行一次
  • 慢循环拥有超时控制,超过执行时间会退出
  • 因此有个快循环模式,在事件循环的beforeSleep处被调用,用于补充删除,删除更少但是调用更频繁
  • 如果为快速循环模式,将执行一个不超过ACTIVE_EXPIRE_CYCLE_FAST_DURATION时间的快速过期循环
    • 注:如果同时有慢速过期循环执行或者在和上一次循环在同时间段,则不会执行
    • 如果执行中遇到DB过期Key的百分比较低,则会打断循环
  • 如果为慢速循环模式,时间限制是由ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC的值决定的一个REDIS_HZ周期的百分比
  • effort 参数用于调整一些基线参数,以便更加激进的进行过期回收

主动过期函数源码

#define ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP 20 /* Keys for each DB loop.  每个DB遍历的key数量*/
#define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds. 快速循环限制,单位微妙 */
#define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 /* Max % of CPU to use. 最大CPU使用百分比*/
#define ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE 10 /* % of stale keys after which
                                                   we do extra efforts. 触发额外回收的过期key占百分比*/

void activeExpireCycle(int type) {
    /* Adjust the running parameters according to the configured expire
     * effort. The default effort is 1, and the maximum configurable effort
     * is 10. effort参数 默认1 最大10 */
    unsigned long
    effort = server.active_expire_effort-1, /* Rescale from 0 to 9. 1-10转为0-9 */
    // 根据effort计算上面的一些参数实际值
    config_keys_per_loop = ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP +
                           ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP/4*effort,
    config_cycle_fast_duration = ACTIVE_EXPIRE_CYCLE_FAST_DURATION +
                                 ACTIVE_EXPIRE_CYCLE_FAST_DURATION/4*effort,
    config_cycle_slow_time_perc = ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC +
                                  2*effort,
    config_cycle_acceptable_stale = ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE-
                                    effort;

    /* This function has some global state in order to continue the work
     * incrementally across calls. 全局状态*/
    static unsigned int current_db = 0; /* Next DB to test. 下一个DB*/
    static int timelimit_exit = 0;      /* Time limit hit in previous call? 上次是否超时*/
    static long long last_fast_cycle = 0; /* When last fast cycle ran. 最后一个快速周期*/

    int j, iteration = 0;
    int dbs_per_call = CRON_DBS_PER_CALL;
    long long start = ustime(), timelimit, elapsed;

    /* When clients are paused the dataset should be static not just from the
     * POV of clients not being able to write, but also from the POV of
     * expires and evictions of keys not being performed. 当client暂停时,回收也不执行*/
    if (checkClientPauseTimeoutAndReturnIfPaused()) return;
    
    // 快速回收模式额外判断
    if (type == ACTIVE_EXPIRE_CYCLE_FAST) {
        /* Don't start a fast cycle if the previous cycle did not exit
         * for time limit, unless the percentage of estimated stale keys is
         * too high. Also never repeat a fast cycle for the same period
         * as the fast cycle total duration itself. */
        // 如果上次超时了,这次就不执行快速循环,当然如果过期key占比过高则继续执行
        if (!timelimit_exit &&
            server.stat_expired_stale_perc < config_cycle_acceptable_stale)
            return;
        // 在上次执行的时间段内
        if (start < last_fast_cycle + (long long)config_cycle_fast_duration*2)
            return;

        last_fast_cycle = start;
    }

    /* We usually should test CRON_DBS_PER_CALL per iteration, with
     * two exceptions:
     *
     * 1) Don't test more DBs than we have.
     * 2) If last time we hit the time limit, we want to scan all DBs
     * in this iteration, as there is work to do in some DB and we don't want
     * expired keys to use memory for too much time. 
     循环的DB数,默认为CRON_DBS_PER_CALL设置值,如果实际DB数少于CRON_DBS_PER_CALL或者上次执行超时了,则根据实际DB数遍历;
     因为如果上次超时了则认为有大量key过期,为了减轻内存压力,则会更加激进的回收 */
    if (dbs_per_call > server.dbnum || timelimit_exit)
        dbs_per_call = server.dbnum;

    /* We can use at max 'config_cycle_slow_time_perc' percentage of CPU
     * time per iteration. Since this function gets called with a frequency of
     * server.hz times per second, the following is the max amount of
     * microseconds we can spend in this function. 
     根据回收执行时间占比来限制CPU使用率 */
    timelimit = config_cycle_slow_time_perc*1000000/server.hz/100;
    timelimit_exit = 0;
    if (timelimit <= 0) timelimit = 1;

    if (type == ACTIVE_EXPIRE_CYCLE_FAST)
        timelimit = config_cycle_fast_duration; /* in microseconds. */

    /* Accumulate some global stats as we expire keys, to have some idea
     * about the number of keys that are already logically expired, but still
     * existing inside the database. 统计数据,记录抽样数和过期数,用于计算过期key占比*/
    long total_sampled = 0;
    long total_expired = 0;

    /* Sanity: There can't be any pending commands to propagate when
     * we're in cron 此时不能在进行主从数据同步等操作,所以需要设置标志位*/
    serverAssert(server.also_propagate.numops == 0);
    server.core_propagates = 1;
    server.propagate_no_multi = 1;
    // 回收循环,只要未超时,遍历dbs_per_call次
    for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) {
        /* Expired and checked in a single loop. */
        unsigned long expired, sampled;

        redisDb *db = server.db+(current_db % server.dbnum);

        /* Increment the DB now so we are sure if we run out of time
         * in the current DB we'll restart from the next. This allows to
         * distribute the time evenly across DBs. */
        current_db++;

        /* Continue to expire if at the end of the cycle there are still
         * a big percentage of keys to expire, compared to the number of keys
         * we scanned. The percentage, stored in config_cycle_acceptable_stale
         * is not fixed, but depends on the Redis configured "expire effort". 
         过期并计算过期key占比,如果超出前面计算的标定值,则会一直执行到超时或者过期key占比下降*/
        do {
            unsigned long num, slots;
            long long now, ttl_sum;
            int ttl_samples;
            iteration++;

            /* If there is nothing to expire try next DB ASAP. 目标db过期字典没有数据,终止遍历*/
            if ((num = dictSize(db->expires)) == 0) {
                db->avg_ttl = 0;
                break;
            }
            slots = dictSlots(db->expires);
            now = mstime();

            /* When there are less than 1% filled slots, sampling the key
             * space is expensive, so stop here waiting for better times...
             * The dictionary will be resized asap. 字典使用率过低的情况下也终止遍历*/
            if (slots > DICT_HT_INITIAL_SIZE &&
                (num*100/slots < 1)) break;

            /* The main collection cycle. Sample random keys among keys
             * with an expire set, checking for expired ones. */
            expired = 0;
            sampled = 0;
            ttl_sum = 0;
            ttl_samples = 0;

            if (num > config_keys_per_loop)
                num = config_keys_per_loop;

            /* Here we access the low level representation of the hash table
             * for speed concerns: this makes this code coupled with dict.c,
             * but it hardly changed in ten years.
             *
             * Note that certain places of the hash table may be empty,
             * so we want also a stop condition about the number of
             * buckets that we scanned. However scanning for free buckets
             * is very fast: we are in the cache line scanning a sequential
             * array of NULL pointers, so we can scan a lot more buckets
             * than keys in the same time. */
            long max_buckets = num*20;
            long checked_buckets = 0;
            // 遍历所有的bucket,直到采样的key数达到标定值或者遍历完成
            while (sampled < num && checked_buckets < max_buckets) {
                for (int table = 0; table < 2; table++) {
                    if (table == 1 && !dictIsRehashing(db->expires)) break;

                    unsigned long idx = db->expires_cursor;
                    idx &= DICTHT_SIZE_MASK(db->expires->ht_size_exp[table]);
                    dictEntry *de = db->expires->ht_table[table][idx];
                    long long ttl;

                    /* Scan the current bucket of the current table. */
                    checked_buckets++;
                    while(de) {
                        /* Get the next entry now since this entry may get
                         * deleted. */
                        dictEntry *e = de;
                        de = de->next;

                        ttl = dictGetSignedIntegerVal(e)-now;
                        // 尝试过期当前key,成功就计数
                        if (activeExpireCycleTryExpire(db,e,now)) expired++;
                        if (ttl > 0) {
                            /* We want the average TTL of keys yet
                             * not expired. */
                            ttl_sum += ttl;
                            ttl_samples++;
                        }
                        sampled++;
                    }
                }
                db->expires_cursor++;
            }
            total_expired += expired;
            total_sampled += sampled;

            /* Update the average TTL stats for this database.辅助计算DB的平均ttl值,更新回DB */
            if (ttl_samples) {
                long long avg_ttl = ttl_sum/ttl_samples;

                /* Do a simple running average with a few samples.
                 * We just use the current estimate with a weight of 2%
                 * and the previous estimate with a weight of 98%. */
                if (db->avg_ttl == 0) db->avg_ttl = avg_ttl;
                db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50);
            }

            /* We can't block forever here even if there are many keys to
             * expire. So after a given amount of milliseconds return to the
             * caller waiting for the other active expire cycle. 超时限制*/
            if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */
                elapsed = ustime()-start;
                if (elapsed > timelimit) {
                    timelimit_exit = 1;
                    server.stat_expired_time_cap_reached_count++;
                    break;
                }
            }
            /* We don't repeat the cycle for the current database if there are
             * an acceptable amount of stale keys (logically expired but yet
             * not reclaimed). 过期key超过占比或者没有遍历数据的情况下,重复遍历DB */
        } while (sampled == 0 ||
                 (expired*100/sampled) > config_cycle_acceptable_stale);
    }

    serverAssert(server.core_propagates); /* This function should not be re-entrant */

    /* Propagate all DELs 传播同步删除的结果,并回复标志位*/
    propagatePendingCommands();

    server.core_propagates = 0;
    server.propagate_no_multi = 0;
    // 执行时间统计
    elapsed = ustime()-start;
    server.stat_expire_cycle_time_used += elapsed;
    latencyAddSampleIfNeeded("expire-cycle",elapsed/1000);

    /* Update our estimate of keys existing but yet to be expired.
     * Running average with this sample accounting for 5%. 根据过期抽样结果占比的5%来更新一个未过期key的预估值*/
    double current_perc;
    if (total_sampled) {
        current_perc = (double)total_expired/total_sampled;
    } else
        current_perc = 0;
    server.stat_expired_stale_perc = (current_perc*0.05)+
                                     (server.stat_expired_stale_perc*0.95);
}

调用

Fast调用

该调用在server.cbeforeSleep函数下,可以看到

当启用主动过期,且不是从节点的情况下,每次调用beforeSleep都会调用一次Fast回收

void beforeSleep(struct aeEventLoop *eventLoop) {
    /*省略N行*/
    /* Run a fast expire cycle (the called function will return
     * ASAP if a fast cycle is not needed). */
    if (server.active_expire_enabled && server.masterhost == NULL)
        activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
    /*省略N行*/
}

那么beforeSleep是哪里调用的呢

同样的在server.c的initServer函数下可以找到一个对ae.caeSetBeforeSleepProc函数的一个调用

aeSetBeforeSleepProc(server.el,beforeSleep);

该函数将beforeSleep赋值给事件循环aeEventLoop结构体的beforesleep指针

void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) {
    eventLoop->beforesleep = beforesleep;
}

typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    aeFileEvent *events; /* Registered events */
    aeFiredEvent *fired; /* Fired events */
    aeTimeEvent *timeEventHead;
    int stop;
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
    aeBeforeSleepProc *aftersleep;
    int flags;
} aeEventLoop;

然后我们在aeProcessEvents事件处理函数中找到这样的调用

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    /*省略N行*/
        if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
            eventLoop->beforesleep(eventLoop);
    /*省略N行*/
    
    /* Check time events */
    if (flags & AE_TIME_EVENTS)    // 执行时间事件 之后会有相关
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

aeProcessEvents是哪里调用呢?答案是基本所有的操作都会调用到哦,总所周知redis是单线程模型,基本所有的操作都是转化为文件事件和时间实现来执行的,而aeProcessEvents就是所有事件的处理入口

主要的调用处有这2个地方

主事件循环

ae.c的aeMain函数

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}

阻塞处理

networking.c的processEventsWhileBlocked函数 ,在aofrdb、脚本中断、module等地方都会调用到,有兴趣的读者可以进一步探究

/* This function is called by Redis in order to process a few events from
 * time to time while blocked into some not interruptible operation.
 * This allows to reply to clients with the -LOADING error while loading the
 * data set at startup or after a full resynchronization with the master
 * and so forth.
 *
 * It calls the event loop in order to process a few events. Specifically we
 * try to call the event loop 4 times as long as we receive acknowledge that
 * some event was processed, in order to go forward with the accept, read,
 * write, close sequence needed to serve a client.
 *
 * The function returns the total number of events processed. */
void processEventsWhileBlocked(void) {
    int iterations = 4; /* See the function top-comment. */

    /* Update our cached time since it is used to create and update the last
     * interaction time with clients and for other important things. */
    updateCachedTime(0);

    /* Note: when we are processing events while blocked (for instance during
     * busy Lua scripts), we set a global flag. When such flag is set, we
     * avoid handling the read part of clients using threaded I/O.
     * See https://github.com/redis/redis/issues/6988 for more info.
     * Note that there could be cases of nested calls to this function,
     * specifically on a busy script during async_loading rdb, and scripts
     * that came from AOF. */
    ProcessingEventsWhileBlocked++;
    while (iterations--) {
        long long startval = server.events_processed_while_blocked;
        long long ae_events = aeProcessEvents(server.el,
            AE_FILE_EVENTS|AE_DONT_WAIT|
            AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP);
        /* Note that server.events_processed_while_blocked will also get
         * incremented by callbacks called by the event loop handlers. */
        server.events_processed_while_blocked += ae_events;
        long long events = server.events_processed_while_blocked - startval;
        if (!events) break;
    }

    whileBlockedCron();

    ProcessingEventsWhileBlocked--;
    serverAssert(ProcessingEventsWhileBlocked >= 0);
}

Slow调用

我们可以在server.c的databasesCron函数找到对应的调用

当前节点为主节点的时候,会以Slow模式调用

void databasesCron(void) {
    /* Expire keys by random sampling. Not required for slaves
     * as master will synthesize DELs for us. */
    if (server.active_expire_enabled) {
        if (iAmMaster()) {
            activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
        } else {
            expireSlaveKeys();
        }
    }
    /*以下省略*/
}

同样的,我们找到调用方serverCron

这里注释告诉我们,这里的cron函数每秒执行server.hz,即默认每秒执行10次,100ms执行一次

  • 注:server.hz是一个配置,默认为10
/* This is our timer interrupt, called server.hz times per second.
 * Here is where we do a number of things that need to be done asynchronously.
 * For instance:
 *
 * - Active expired keys collection (it is also performed in a lazy way on
 *   lookup).
 * - Software watchdog.
 * - Update some statistic.
 * - Incremental rehashing of the DBs hash tables.
 * - Triggering BGSAVE / AOF rewrite, and handling of terminated children.
 * - Clients timeout of different kinds.
 * - Replication reconnection.
 * - Many more...
 *
 * Everything directly called here will be called server.hz times per second,
 * so in order to throttle execution of things we want to do less frequently
 * a macro is used: run_with_period(milliseconds) { .... }
 */
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    /*省略N行*/
    /* Handle background operations on Redis databases. */
    databasesCron();
    /*省略N行*/
    return 1000/server.hz;    // 控制执行频率,即每秒server.hz次,具体使用见下方
}

然后在initServer函数下,可以找到时间事件的注册点

    /* Create the timer callback, this is our way to process many background
     * operations incrementally, like clients timeout, eviction of unaccessed
     * expired keys and so forth. */
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create event loop timers.");
        exit(1);
    }

那么redis是怎么控制时间事件的执行间隔的呢

我们回到ae.c来看processTimeEvents函数

static int processTimeEvents(aeEventLoop *eventLoop) {
    int processed = 0;
    aeTimeEvent *te;
    long long maxId;

    te = eventLoop->timeEventHead;
    maxId = eventLoop->timeEventNextId-1;
    monotime now = getMonotonicUs();
    while(te) {
        long long id;

        /* Remove events scheduled for deletion. */
        if (te->id == AE_DELETED_EVENT_ID) {
            aeTimeEvent *next = te->next;
            /* If a reference exists for this timer event,
             * don't free it. This is currently incremented
             * for recursive timerProc calls */
            if (te->refcount) {
                te = next;
                continue;
            }
            if (te->prev)
                te->prev->next = te->next;
            else
                eventLoop->timeEventHead = te->next;
            if (te->next)
                te->next->prev = te->prev;
            if (te->finalizerProc) {
                te->finalizerProc(eventLoop, te->clientData);
                now = getMonotonicUs();
            }
            zfree(te);
            te = next;
            continue;
        }

        /* Make sure we don't process time events created by time events in
         * this iteration. Note that this check is currently useless: we always
         * add new timers on the head, however if we change the implementation
         * detail, this check may be useful again: we keep it here for future
         * defense. */
        if (te->id > maxId) {
            te = te->next;
            continue;
        }

        if (te->when <= now) {    // 如果预期执行时间<=当前时间
            int retval;

            id = te->id;
            te->refcount++;
            retval = te->timeProc(eventLoop, id, te->clientData);    // 事件回调返回值
            te->refcount--;
            processed++;
            now = getMonotonicUs();
            if (retval != AE_NOMORE) {
                te->when = now + retval * 1000;    // 根据事件回调返回的值来计算下一次执行时间
            } else {
                te->id = AE_DELETED_EVENT_ID;
            }
        }
        te = te->next;
    }
    return processed;
}

总结

整个redis主动过期源码的流程解读就到这里,现在我们回头来解答前面提到的那几个问题

Q:redis主动处理过期key的时机是什么时候?

A:1. 每次执行事件的时候都会尝试调用快速循环模式;2. 内部时间事件每秒执行server.hz次慢速循环模式,即默认每100ms执行一次

Q: 主动处理的逻辑是什么?

A:通常情况下循环遍历不超过16个DB,每个DB取20个过期key进行处理,并根据遍历和过期的情况,计算过期key占比,如果占比超过阈值,则持续回收至超时或者占比低于阈值。

Q: 最坏情况下会增加多少延迟?

A:在默认的参数下,由于快速循环的存在,每次事件最多会增加1ms的延迟;由于慢循环的存在,每100ms最多会增加25ms的延迟

Q:如何判断是因为过期key而导致的延迟抖动?

A:主动过期操作执行时间会统计至stat_expire_cycle_time_used可以由此进行判定

Q:如何解决这个问题?

A:避免大量key同时过期,如将key的过期时间在一定范围内进行随机化处理等。