今天运维在执行线上功能时发现服务异常,跟进之后发现Redis出现问题,如下:

[ERROR] [2022-05-26 00:05:01.518] [c.q.d.utils.DataAccessSampleRedisUtil:326][Thread-154] 服务名称:ivdg-data-access-service–> 查询缓存数据出错:Unknown redis exception; nested exception is java.util.concurrent.RejectedExecutionException: event executor terminated
org.springframework.data.redis.RedisSystemException: Unknown redis exception; nested exception is java.util.concurrent.RejectedExecutionException: event executor terminated
at org.springframework.data.redis.FallbackExceptionTranslationStrategy.getFallback(FallbackExceptionTranslationStrategy.java:53)
at org.springframework.data.redis.FallbackExceptionTranslationStrategy.translate(FallbackExceptionTranslationStrategy.java:43)
at org.springframework.data.redis.connection.lettuce.LettuceConnection.convertLettuceAccessException(LettuceConnection.java:273)
at org.springframework.data.redis.connection.lettuce.LettuceScriptingCommands.convertLettuceAccessException(LettuceScriptingCommands.java:236)
at org.springframework.data.redis.connection.lettuce.LettuceScriptingCommands.evalSha(LettuceScriptingCommands.java:195)
at org.springframework.data.redis.connection.DefaultedRedisConnection.evalSha(DefaultedRedisConnection.java:1502)
at org.springframework.data.redis.connection.DefaultStringRedisConnection.evalSha(DefaultStringRedisConnection.java:1757)
at sun.reflect.GeneratedMethodAccessor606.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.data.redis.core.CloseSuppressingInvocationHandler.invoke(CloseSuppressingInvocationHandler.java:61)
at com.sun.proxy.Proxy258.evalSha(UnknownSource)atorg.springframework.data.redis.core.script.DefaultScriptExecutor.eval(DefaultScriptExecutor.java:77)atorg.springframework.data.redis.core.script.DefaultScriptExecutor.lambdaProxy258.evalSha(Unknown Source) at org.springframework.data.redis.core.script.DefaultScriptExecutor.eval(DefaultScriptExecutor.java:77) at org.springframework.data.redis.core.script.DefaultScriptExecutor.lambdaProxy258.evalSha(UnknownSource)atorg.springframework.data.redis.core.script.DefaultScriptExecutor.eval(DefaultScriptExecutor.java:77)atorg.springframework.data.redis.core.script.DefaultScriptExecutor.lambdaexecute0(DefaultScriptExecutor.java:68)atorg.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:228)atorg.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:188)atorg.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:175)atorg.springframework.data.redis.core.script.DefaultScriptExecutor.execute(DefaultScriptExecutor.java:58)atorg.springframework.data.redis.core.script.DefaultScriptExecutor.execute(DefaultScriptExecutor.java:52)atorg.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:350)atcom.qsdi.dataAccess.utils.DataAccessSampleRedisUtil.lambda0(DefaultScriptExecutor.java:68) at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:228) at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:188) at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:175) at org.springframework.data.redis.core.script.DefaultScriptExecutor.execute(DefaultScriptExecutor.java:58) at org.springframework.data.redis.core.script.DefaultScriptExecutor.execute(DefaultScriptExecutor.java:52) at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:350) at com.qsdi.dataAccess.utils.DataAccessSampleRedisUtil.lambda0(DefaultScriptExecutor.java:68)atorg.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:228)atorg.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:188)atorg.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:175)atorg.springframework.data.redis.core.script.DefaultScriptExecutor.execute(DefaultScriptExecutor.java:58)atorg.springframework.data.redis.core.script.DefaultScriptExecutor.execute(DefaultScriptExecutor.java:52)atorg.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:350)atcom.qsdi.dataAccess.utils.DataAccessSampleRedisUtil.lambdascanData7(DataAccessSampleRedisUtil.java:324)atjava.util.stream.Streams7(DataAccessSampleRedisUtil.java:324) at java.util.stream.Streams7(DataAccessSampleRedisUtil.java:324)atjava.util.stream.StreamsRangeIntSpliterator.forEachRemaining(Streams.java:110)
at java.util.stream.IntPipelineHead.forEach(IntPipeline.java:559)atcom.qsdi.dataAccess.utils.DataAccessSampleRedisUtil.scanData(DataAccessSampleRedisUtil.java:318)atcom.qsdi.dataAccess.service.impl.DataSamplingInspectionHourStatisticsServiceImpl.handleHourData(DataSamplingInspectionHourStatisticsServiceImpl.java:69)atcom.qsdi.dataAccess.service.impl.DataSamplingInspectionHourStatisticsServiceImpl.statisticsDataByTime(DataSamplingInspectionHourStatisticsServiceImpl.java:58)atcom.qsdi.dataAccess.job.HourStatisticsByVehicleJobHandler.execute(HourStatisticsByVehicleJobHandler.java:29)atcom.xxl.job.core.thread.JobThread.run(JobThread.java:163)Causedby:java.util.concurrent.RejectedExecutionException:eventexecutorterminatedatio.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:926)atio.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:353)atio.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:346)atio.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:828)atio.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:818)atio.netty.util.concurrent.AbstractScheduledEventExecutor.schedule(AbstractScheduledEventExecutor.java:261)atio.netty.util.concurrent.AbstractScheduledEventExecutor.schedule(AbstractScheduledEventExecutor.java:177)atio.netty.util.concurrent.AbstractEventExecutorGroup.schedule(AbstractEventExecutorGroup.java:50)atio.netty.util.concurrent.AbstractEventExecutorGroup.schedule(AbstractEventExecutorGroup.java:32)atio.lettuce.core.protocol.CommandExpiryWriter.potentiallyExpire(CommandExpiryWriter.java:168)atio.lettuce.core.protocol.CommandExpiryWriter.write(CommandExpiryWriter.java:115)atio.lettuce.core.RedisChannelHandler.dispatch(RedisChannelHandler.java:195)atio.lettuce.core.StatefulRedisConnectionImpl.dispatch(StatefulRedisConnectionImpl.java:176)atio.lettuce.core.AbstractRedisAsyncCommands.dispatch(AbstractRedisAsyncCommands.java:474)atio.lettuce.core.AbstractRedisAsyncCommands.evalsha(AbstractRedisAsyncCommands.java:512)atsun.reflect.GeneratedMethodAccessor608.invoke(UnknownSource)atsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)atjava.lang.reflect.Method.invoke(Method.java:498)atio.lettuce.core.FutureSyncInvocationHandler.handleInvocation(FutureSyncInvocationHandler.java:63)atio.lettuce.core.internal.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:79)atcom.sun.proxy.Head.forEach(IntPipeline.java:559) at com.qsdi.dataAccess.utils.DataAccessSampleRedisUtil.scanData(DataAccessSampleRedisUtil.java:318) at com.qsdi.dataAccess.service.impl.DataSamplingInspectionHourStatisticsServiceImpl.handleHourData(DataSamplingInspectionHourStatisticsServiceImpl.java:69) at com.qsdi.dataAccess.service.impl.DataSamplingInspectionHourStatisticsServiceImpl.statisticsDataByTime(DataSamplingInspectionHourStatisticsServiceImpl.java:58) at com.qsdi.dataAccess.job.HourStatisticsByVehicleJobHandler.execute(HourStatisticsByVehicleJobHandler.java:29) at com.xxl.job.core.thread.JobThread.run(JobThread.java:163) Caused by: java.util.concurrent.RejectedExecutionException: event executor terminated at io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:926) at io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:353) at io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:346) at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:828) at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:818) at io.netty.util.concurrent.AbstractScheduledEventExecutor.schedule(AbstractScheduledEventExecutor.java:261) at io.netty.util.concurrent.AbstractScheduledEventExecutor.schedule(AbstractScheduledEventExecutor.java:177) at io.netty.util.concurrent.AbstractEventExecutorGroup.schedule(AbstractEventExecutorGroup.java:50) at io.netty.util.concurrent.AbstractEventExecutorGroup.schedule(AbstractEventExecutorGroup.java:32) at io.lettuce.core.protocol.CommandExpiryWriter.potentiallyExpire(CommandExpiryWriter.java:168) at io.lettuce.core.protocol.CommandExpiryWriter.write(CommandExpiryWriter.java:115) at io.lettuce.core.RedisChannelHandler.dispatch(RedisChannelHandler.java:195) at io.lettuce.core.StatefulRedisConnectionImpl.dispatch(StatefulRedisConnectionImpl.java:176) at io.lettuce.core.AbstractRedisAsyncCommands.dispatch(AbstractRedisAsyncCommands.java:474) at io.lettuce.core.AbstractRedisAsyncCommands.evalsha(AbstractRedisAsyncCommands.java:512) at sun.reflect.GeneratedMethodAccessor608.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at io.lettuce.core.FutureSyncInvocationHandler.handleInvocation(FutureSyncInvocationHandler.java:63) at io.lettuce.core.internal.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:79) at com.sun.proxy.Head.forEach(IntPipeline.java:559)atcom.qsdi.dataAccess.utils.DataAccessSampleRedisUtil.scanData(DataAccessSampleRedisUtil.java:318)atcom.qsdi.dataAccess.service.impl.DataSamplingInspectionHourStatisticsServiceImpl.handleHourData(DataSamplingInspectionHourStatisticsServiceImpl.java:69)atcom.qsdi.dataAccess.service.impl.DataSamplingInspectionHourStatisticsServiceImpl.statisticsDataByTime(DataSamplingInspectionHourStatisticsServiceImpl.java:58)atcom.qsdi.dataAccess.job.HourStatisticsByVehicleJobHandler.execute(HourStatisticsByVehicleJobHandler.java:29)atcom.xxl.job.core.thread.JobThread.run(JobThread.java:163)Causedby:java.util.concurrent.RejectedExecutionException:eventexecutorterminatedatio.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:926)atio.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:353)atio.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:346)atio.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:828)atio.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:818)atio.netty.util.concurrent.AbstractScheduledEventExecutor.schedule(AbstractScheduledEventExecutor.java:261)atio.netty.util.concurrent.AbstractScheduledEventExecutor.schedule(AbstractScheduledEventExecutor.java:177)atio.netty.util.concurrent.AbstractEventExecutorGroup.schedule(AbstractEventExecutorGroup.java:50)atio.netty.util.concurrent.AbstractEventExecutorGroup.schedule(AbstractEventExecutorGroup.java:32)atio.lettuce.core.protocol.CommandExpiryWriter.potentiallyExpire(CommandExpiryWriter.java:168)atio.lettuce.core.protocol.CommandExpiryWriter.write(CommandExpiryWriter.java:115)atio.lettuce.core.RedisChannelHandler.dispatch(RedisChannelHandler.java:195)atio.lettuce.core.StatefulRedisConnectionImpl.dispatch(StatefulRedisConnectionImpl.java:176)atio.lettuce.core.AbstractRedisAsyncCommands.dispatch(AbstractRedisAsyncCommands.java:474)atio.lettuce.core.AbstractRedisAsyncCommands.evalsha(AbstractRedisAsyncCommands.java:512)atsun.reflect.GeneratedMethodAccessor608.invoke(UnknownSource)atsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)atjava.lang.reflect.Method.invoke(Method.java:498)atio.lettuce.core.FutureSyncInvocationHandler.handleInvocation(FutureSyncInvocationHandler.java:63)atio.lettuce.core.internal.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:79)atcom.sun.proxy.Proxy254.evalsha(Unknown Source)
at org.springframework.data.redis.connection.lettuce.LettuceScriptingCommands.evalSha(LettuceScriptingCommands.java:193)
… 23 common frames omitted

如上代码段所示, 再调用redis执行lua脚本查询时提示异常,具体异常描述是由SingleThreadEventExecutor这个类调用execute方法时出现的异常,源码如下:

private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}

    if (!addTaskWakesUp && immediate) {
        wakeup(inEventLoop);
    }
}

分析这个方法,大意是将当前写事件提交到 SingleThreadEventExecutor 任务队列中。报错是在 addTask 这个方法:

/**
* Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown
* before.
*/
protected void addTask(Runnable task) {
ObjectUtil.checkNotNull(task, “task”);
if (!offerTask(task)) {
reject(task);
}
}

这个方法判断如果没有插入任务队列成功就调用 reject (task) 拒绝任务,reject (task) 里面抛出的异常就是我们看到的最外面的异常。于是看一看 offerTask 方法:

final boolean offerTask(Runnable task) {
if (isShutdown()) {
reject();
}
return taskQueue.offer(task);
}

@Override
public boolean isShutdown() {
    return state >= ST_SHUTDOWN;
}

private static final int ST_NOT_STARTED = 1;
private static final int ST_STARTED = 2;
private static final int ST_SHUTTING_DOWN = 3;
private static final int ST_SHUTDOWN = 4;
private static final int ST_TERMINATED = 5;

这里可以看出 offerTask 的时候校验了当前的 SingleThreadEventExecutor 的状态是否是结束和终止。那么问题来了,是什么导致的 SingleThreadEventExecutor 的状态为终止的勒,通过前面缘由中日志可以看到 SingleThreadEventExecutor 也打印了堆溢出的错误日志,搜索这个日志是哪里打印的,发现:

private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}

            boolean success = false;
            updateLastExecutionTime();
            try {
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                for (;;) {
                    int oldState = state;
                    if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                            SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                        break;
                    }
                }

                // Check if confirmShutdown() was called at the end of the loop.
                if (success && gracefulShutdownStartTime == 0) {
                    if (logger.isErrorEnabled()) {
                        logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
                                "be called before run() implementation terminates.");
                    }
                }

                try {
                    // Run all remaining tasks and shutdown hooks. At this point the event loop
                    // is in ST_SHUTTING_DOWN state still accepting tasks which is needed for
                    // graceful shutdown with quietPeriod.
                    for (;;) {
                        if (confirmShutdown()) {
                            break;
                        }
                    }

                    // Now we want to make sure no more tasks can be added from this point. This is
                    // achieved by switching the state. Any new tasks beyond this point will be rejected.
                    for (;;) {
                        int oldState = state;
                        if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
                            break;
                        }
                    }

                    // We have the final set of tasks in the queue now, no more can be added, run all remaining.
                    // No need to loop here, this is the final pass.
                    confirmShutdown();
                } finally {
                    try {
                        cleanup();
                    } finally {
                        // Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
                        // the future. The user may block on the future and once it unblocks the JVM may terminate
                        // and start unloading classes.
                        // See https://github.com/netty/netty/issues/6596.
                        FastThreadLocal.removeAll();

                        STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                        threadLock.countDown();
                        int numUserTasks = drainTasks();
                        if (numUserTasks > 0 && logger.isWarnEnabled()) {
                            logger.warn("An event executor terminated with " +
                                    "non-empty task queue (" + numUserTasks + ')');
                        }
                        terminationFuture.setSuccess(null);
                    }
                }
            }
        }
    });
}

这个方法,这个方法里面同时在 finally 设置了状态为 ST_TERMINATED,这个方法大意是启动 eventloop 真正的线程来轮询读写事件,这个方法将在首次执行 excute 方法的时候被调用,注意这里的 executor 变量是一个线程池,于是查阅资料才认识到,eventloopgroup 下的每个 eventloop 实际上都会引用到一个线程池,这个线程池线程数目与 eventloop 总大小相同,是 fork/join 框架创建的,真正执行轮询逻辑实际上是这个线程池去提交的。eventloop 的意义实际上只是为了保存任务队列。真正的去轮询任务队列的逻辑是由其代理的线程池去执行的。

这样也就清楚了,整个事故实际很简单,就是先由于内存溢出的错误导致 eventloop 的状态为终止,这个终止状态据我看到的代码貌似是不可逆的,然后当在调用 chanel.write 的时候提交任务到 eventloop 校验状态为终止所以报了拒绝错误。

解决这个 bug 的重要途径是找到内存溢出的原因,这里就不赘述了,因为在网上没有搜到这个错的原因,所以跟了一下代码,在这里做个记录,后续分析一下到底是哪里有内存溢出问题再贴上解决方案

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐