title: Encountering a Small Flood Peak
author: Gamehu
date: 2024-09-26 22:00:11
tags: DFX
---
Resignation series, part 10
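In this series I look back on how I grew at the company over the past few years and keep a record of it here. This post is about a production bug.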
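## Background

This problem is related to a leftover item from the earlier {% post_link 遇见连接超时 %}: the errors again originated from closed database connections. The difference is that last time only my module was affected. This time many task types failed across a large batch of tenants, and so many Feishu alert messages popped up that I went numb.

## Problem

{% codeblock %}
2024-10-23 17:00:10,177 [XxxGenerator8] ERROR [c.r.r.m.s.s.notice.impl.Xxx] Xxx.java:631 - Customer: 2xx319, error processing alert data
org.springframework.jdbc.UncategorizedSQLException:
2024-10-23 17:00:10,176 [XxxGenerator8] ERROR [c.r.r.m.s.s.notice.impl.Xxx] Xxx.java:511 - Customer: 2xx319, error processing integration platform inspection data
2024-10-23 17:00:10,175 [XxxGenerator8] ERROR [c.r.r.m.s.s.notice.impl.Xxx] Xxx.java:547 - Customer: 2xx319, error processing service status data
org.springframework.jdbc.UncategorizedSQLException:
... 35 common frames omitted
2024-10-23 17:00:10,174 [XxxGenerator8] ERROR [c.r.r.m.s.s.notice.impl.Xxx] Xxx.java:582 - Customer: 2xx319, error processing heartbeat data
... 82 common frames omitted
org.springframework.jdbc.UncategorizedSQLException:
### Cause: java.sql.SQLException: Connection is closed
; uncategorized SQLException; SQL state [null]; error code [0]; Connection is closed
	at org.mybatis.spring.MyBatisExceptionTranslator.translateExceptionIfPossible(MyBatisExceptionTranslator.java:93)
	at org.mybatis.spring.SqlSessionTemplate$SqlSessionInterceptor.invoke(SqlSessionTemplate.java:439)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.sql.SQLException: Connection is closed
{% endcodeblock %}

Pinpointing the cause was quick this time, so I won't go through the details. After the incident I identified two clear factors:

1. The earlier, similar error had already revealed a problem with the connection pool configuration.
    1. I checked again and found no slow SQL, so my initial judgment was that the connection pool was the problem.
2. We had just released a strategy feature. Tasks that previously ran on a default fixed schedule can now have their own execution time and period, per task type and per tenant.
    1. I suspected that N customers × N tasks were all firing at the same moment and exhausting the connection pool.

## Remediation

1. Using a backdoor we had left in place, I manually generated the core tasks so production could process normally again.
2. We already knew that introducing ShardingSphere had also pulled in the HikariCP connection pool alongside the existing one. We removed the other pool, kept only HikariCP, and tuned its parameters.
    1. Below are the parameters after a colleague's tuning. The timeouts and pool size were adjusted to match the high-performance PostgreSQL instance we bought on Alibaba Cloud.

{% codeblock %}
master0:
  dataSourceClassName: com.zaxxer.hikari.HikariDataSource
  driverClassName: org.postgresql.Driver
  jdbcUrl: jdbc:postgresql://xx.aliyuncs.com:xx/xx
  username: xxx
  password: xxx
  connectionTimeout: 60000   # ms (60 s) to wait for a connection from the pool
  idleTimeout: 600000        # ms (10 min) before an idle connection may be retired
  maxLifetime: 3600000       # ms (1 h) maximum lifetime of a connection in the pool
  maximumPoolSize: 200
  minimumIdle: 1
  poolName: business-data-master0
{% endcodeblock %}

3. As a temporary measure, add a random offset to each cron expression so that tasks do not all fire in the same minute, for example:

{% codeblock %}
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

// The enclosing method name is illustrative; the original snippet showed only its body
private static List<String> toCronExpressions(List<String> timeList) {
    return timeList.stream()
            .map(time -> {
                String[] timeParts = parseTime(time);
                // TODO temporary fix: add a random 0-3 minute offset to each cron
                int minuteOffset = ThreadLocalRandom.current().nextInt(4); // random value in 0..3
                // Apply the offset to minutes-of-day so that 08:58 + 3 becomes 09:01 rather than 08:01;
                // % (24 * 60) wraps 23:59 + 3 around to 00:02
                int totalMinutes = (Integer.parseInt(timeParts[0]) * 60 + Integer.parseInt(timeParts[1])
                        + minuteOffset) % (24 * 60);
                int hour = totalMinutes / 60;
                int minute = totalMinutes % 60;
                // Quartz-style fields: second minute hour day-of-month month day-of-week
                return "0 " + minute + " " + hour + " * * ?";
            })
            .collect(Collectors.toList());
}

private static String[] parseTime(String time) {
    return time.split(":"); // format is "HH:mm"
}
{% endcodeblock %}

Steps 2 and 3 bought us some breathing room. With that time I started on the long-term fix, which meant redesigning the solution to handle high-concurrency scenarios.

## Solution design 1.0

For how I approach solution design in general, see the earlier {% post_link 遇见多表查询 %}. Here I'll just give the conclusions:

1. Stagger tasks (random delay)
2. Rate-limit tasks (thread pool + queue)
3. Prioritize tasks (core tasks run first)

### UML

![UML class diagram](<4.png>)

### Flowchart

![Flowchart](<2.png>)

### Sequence diagram

![Sequence diagram](<3.png>)

### Key pseudocode

{% codeblock %}
// Task priority definition: a smaller value sorts first
enum TaskPriority {
    HIGH(0), MEDIUM(5), LOW(10);

    private final int value;

    TaskPriority(int value) {
        this.value = value;
    }

    public int getValue() {
        return value;
    }
}

/**
 * manage-biz PowerJob dispatcher
 * Optimized version: peak shaving and queue management
 *
 * @author hht
 * @since 2024-09-10
 */
@Component(value = "manageBizPowerjobDispatcher")
@Slf4j
@RequiredArgsConstructor
public class ManageBizPowerjobDispatcher {

    private final IXxxScheduleService XxxScheduleService;
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /** PowerJob task id for the safety notice */
    public static final String TASK_SAFETY_NOTICE_ID = "generateXxx";
    public static final String SUCCESS = "success";

    // Configuration parameters, injectable from the config file
    @Value("${powerjob.task.max-concurrent:10}")
    private int maxConcurrentTasks;
    @Value("${powerjob.task.queue-capacity:500}")
    private int queueCapacity;
    @Value("${powerjob.task.max-delay-minutes:5}")
    private int maxDelayMinutes;
    @Value("${powerjob.task.worker-threads:20}")
    private int workerThreads;

    // Delayed task definition
    @Data
    private static class DelayedTask implements Delayed {
        private final Runnable task;
        // Absolute execution timestamp in epoch millis, not a relative delay
        private final long executeTime;
        private final String taskId;
        private final String jobParams;

        @Override
        public long getDelay(TimeUnit unit) {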
            return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        // Delayed extends Comparable<Delayed>; DelayQueue orders tasks by remaining delay
        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    // Priority task definition
    @Data
    private static class PriorityTask implements Comparable<PriorityTask> {
        private final Runnable task;
        private final TaskPriority priority;
        private final String taskId;
        private final String jobParams;
        // Initialized here, so Lombok leaves it out of the generated constructor
        private final long createTime = System.currentTimeMillis();

        @Override
        public int compareTo(PriorityTask other) {
            // Order by priority first, then by creation time (FIFO within the same priority)
            int priorityCompare = Integer.compare(priority.getValue(), other.priority.getValue());
            if (priorityCompare != 0) {
                return priorityCompare;
            }
            return Long.compare(createTime, other.createTime);
        }
    }

    /**
     * 1. A dedicated thread that takes tasks from the queues and dispatches them
     * 2. Coordinates the delay queue and the priority queue
     * 3. Limits the number of tasks executing concurrently
     */
    private class TaskDispatcher implements Runnable {
        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // Check the delay queue first; poll() only returns tasks whose delay has expired
                    DelayedTask delayedTask = delayedTaskQueue.poll();
                    if (delayedTask != null) {
                        // Move the expired task into the priority queue
                        submitToPriorityQueue(delayedTask.getTask(), TaskPriority.HIGH,
                                delayedTask.getTaskId(), delayedTask.getJobParams());
                        continue;
                    }

                    // Take a task from the priority queue. A timed poll instead of take() keeps the loop
                    // returning to the delay queue even when no new priority tasks arrive
                    PriorityTask priorityTask = priorityTaskQueue.poll(100, TimeUnit.MILLISECONDS);
                    if (priorityTask != null) {
                        try {
                            // Acquire a permit to cap concurrency
                            taskSemaphore.acquire();
                            // Record that the task has started
                            activeTaskCount.incrementAndGet();
                            taskExecutionCount.computeIfAbsent(priorityTask.getTaskId(), k -> new AtomicInteger(0))
                                    .incrementAndGet();

                            // Hand the task to the worker pool
                            executorService.submit(() -> {
                                try {
                                    log.info("Executing task: {}, params: {}",
                                            priorityTask.getTaskId(), priorityTask.getJobParams());
                                    priorityTask.getTask().run();
                                } catch (Exception e) {
                                    log.error("Task execution failed: {}", priorityTask.getTaskId(), e);
                                } finally {
                                    // Release the permit
                                    taskSemaphore.release();
                                    // Update the counters
                                    activeTaskCount.decrementAndGet();
                                    AtomicInteger counter = taskExecutionCount.get(priorityTask.getTaskId());
                                    if (counter != null) {
                                        counter.decrementAndGet();
                                    }
                                }
                            });
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                } catch (Exception e) {
                    log.error("Task dispatcher error", e);
                }
            }
        }
    }

    // Task queues and executors
    private DelayQueue<DelayedTask> delayedTaskQueue;
    // PriorityBlockingQueue is unbounded: the constructor argument is only its initial capacity
    private PriorityBlockingQueue<PriorityTask> priorityTaskQueue;
    private ExecutorService executorService;
    private ExecutorService dispatcherService;
    private Semaphore taskSemaphore;
    private Random random;

    // Task execution monitoring
    private AtomicLong totalTasksReceived = new AtomicLong(0);
    private AtomicLong totalTasksExecuted = new AtomicLong(0);
    private AtomicInteger activeTaskCount = new AtomicInteger(0);
    private Map<String, AtomicInteger> taskExecutionCount = new ConcurrentHashMap<>();

    @PostConstruct
    public void init() {
        // Initialize the task queues
        delayedTaskQueue = new DelayQueue<>();
        priorityTaskQueue = new PriorityBlockingQueue<>(queueCapacity);

        // Initialize the worker pool
        executorService = Executors.newFixedThreadPool(workerThreads, new ThreadFactory() {
            private final AtomicInteger counter = new AtomicInteger(1);

            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "task-worker-" + counter.getAndIncrement());
                thread.setDaemon(true);
                return thread;
            }
        });

        // Initialize the dispatcher thread
        dispatcherService = Executors.newSingleThreadExecutor(r -> {
            Thread thread = new Thread(r, "task-dispatcher");
            thread.setDaemon(true);
            return thread;
        });

        // Initialize the semaphore
        taskSemaphore = new Semaphore(maxConcurrentTasks);

        // Initialize the random number generator
        random = new Random();

        // Start the dispatcher thread
        dispatcherService.submit(new TaskDispatcher());

        log.info("Task dispatcher initialized, max concurrent tasks: {}, queue capacity: {}, max delay minutes: {}, worker threads: {}",
                maxConcurrentTasks, queueCapacity, maxDelayMinutes, workerThreads);
    }

    @PreDestroy
    public void shutdown() {
        // Stop the dispatcher; tasks still waiting in the in-memory queues are lost at this point
        if (dispatcherService != null) {
            dispatcherService.shutdownNow();
        }

        // Stop the worker pool, waiting up to 30 seconds for running tasks
        if (executorService != null) {
            executorService.shutdown();
            try {
                if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
                    executorService.shutdownNow();
                }
            } catch (InterruptedException e) {
                executorService.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }

        log.info("Task dispatcher shut down, total tasks received: {}, total tasks executed: {}",
                totalTasksReceived.get(), totalTasksExecuted.get());
    }

    /**
     * Submit a task to the delay queue
     */
    private void submitToDelayQueue(Runnable task, String taskId,
            String jobParams) {
        // Random delay between 0 and maxDelayMinutes minutes
        long delayMs = random.nextInt((int) TimeUnit.MINUTES.toMillis(maxDelayMinutes));
        // DelayedTask stores an absolute timestamp, so add the delay to the current time
        DelayedTask delayedTask = new DelayedTask(task, System.currentTimeMillis() + delayMs, taskId, jobParams);
        delayedTaskQueue.offer(delayedTask);
        // totalTasksReceived is already incremented in submitTask, so it is not counted again here
        log.info("Task submitted to delay queue: {}, delay: {}ms", taskId, delayMs);
    }

    /**
     * Submit a task to the priority queue
     */
    private void submitToPriorityQueue(Runnable task, TaskPriority priority, String taskId, String jobParams) {
        PriorityTask priorityTask = new PriorityTask(task, priority, taskId, jobParams);
        priorityTaskQueue.offer(priorityTask);
        log.info("Task submitted to priority queue: {}, priority: {}", taskId, priority);
    }

    /**
     * Map a task type to its priority
     */
    private TaskPriority getTaskPriority(String taskId) {
        switch (taskId) {
            case TASK_SAFETY_NOTICE_ID:
            case TASK_PUSH_SERVICE_STATUS_ID:
                return TaskPriority.HIGH;
            case TASK_TENANT_SERVICE_PHASE_ID:
            case TASK_WEEKLY_SUMMARY:
                return TaskPriority.MEDIUM;
            default:
                return TaskPriority.LOW;
        }
    }

    /**
     * Create the executable task
     */
    private Runnable createExecutableTask(String taskId, String jobParams, TaskContext taskContext) {
        switch (taskId) {
            case TASK_SAFETY_NOTICE_ID:
                return () -> generateXxxTask(taskContext);
            case TASK_OTHER:
                return () -> generateOtherTask(taskContext);
            ...
            default:
                throw new IllegalArgumentException("Unknown task type: " + taskId);
        }
    }

    /**
     * Common task submission method
     */
    private ProcessResult submitTask(String taskId, TaskContext taskContext) {
        try {
            totalTasksReceived.incrementAndGet();

            // Check how many tasks of this type are running; if there are many, route through the delay queue
            int activeCount = taskExecutionCount.computeIfAbsent(taskId, k -> new AtomicInteger(0)).get();
            if (activeCount > maxConcurrentTasks / 2) {
                log.warn("Many tasks of type {} are currently running: {}, using the delay queue to spread the load",
                        taskId, activeCount);
                submitToDelayQueue(createExecutableTask(taskId, taskContext.getJobParams(), taskContext),
                        taskId, taskContext.getJobParams());
            } else {
                // Assign a priority based on the task type
                TaskPriority priority = getTaskPriority(taskId);
                submitToPriorityQueue(createExecutableTask(taskId, taskContext.getJobParams(), taskContext),
                        priority, taskId, taskContext.getJobParams());
            }

            // Success here means "accepted into a queue", not "executed"
            return new ProcessResult(true, formatResponse(SUCCESS, taskId));
        } catch (Exception e) {
            log.error("Task submission failed: {}", taskId, e);
            return new ProcessResult(false, formatResponse(e.getMessage(), taskId));
        }
    }

    // ====== The original PowerJob handlers below now hand their work to the queue system ======

    /**
     * Alert task
     */
    @PowerJobHandler(name = TASK_OTHER)
    public ProcessResult generateOther(TaskContext taskContext) {
        log.info("==================== Dispatch triggered (other task) ======================");
        return submitTask(TASK_OTHER, taskContext);
    }

    // Actual execution, invoked from the queue via createExecutableTask
    private ProcessResult generateOtherTask(TaskContext taskContext) {
        try {
            StrategyJobParams jobParams = new StrategyJobParams();
            if (StringUtils.hasLength(taskContext.getJobParams())) {
                jobParams = OBJECT_MAPPER.readValue(taskContext.getJobParams(), StrategyJobParams.class);
            }
            strategyScheduleService.generateTask(jobParams);
            totalTasksExecuted.incrementAndGet();
            return new ProcessResult(true, formatResponse(SUCCESS, TASK_OTHER));
        } catch (Exception e) {
            log.error("Scheduled execution of [other task] failed", e);
            return new ProcessResult(false, formatResponse(e.getMessage(), TASK_OTHER));
        }
    }

    /**
     * Generate the safety notice
     */
    @PowerJobHandler(name = TASK_SAFETY_NOTICE_ID)
    public ProcessResult generateXxx(TaskContext taskContext) {
        log.info("==================== Dispatch triggered (safety notice) ======================");
        return submitTask(TASK_SAFETY_NOTICE_ID, taskContext);
    }

    // Actual execution, invoked from the queue via createExecutableTask
    private ProcessResult generateXxxTask(TaskContext taskContext) {
        try {
            // Read the scheduling job parameters
            StrategyJobParams jobParams = null;
            if (StringUtils.hasLength(taskContext.getJobParams())) {
                jobParams = OBJECT_MAPPER.readValue(taskContext.getJobParams(), StrategyJobParams.class);
            }
            // Generate the safety notices
            XxxScheduleService.autoGenerateBatch(jobParams);
            totalTasksExecuted.incrementAndGet();
            return new ProcessResult(true, formatResponse(SUCCESS, TASK_SAFETY_NOTICE_ID));
        } catch (Exception e) {
            log.error("Scheduled execution of [safety notice] failed", e);
            return new ProcessResult(false, formatResponse(e.getMessage(), TASK_SAFETY_NOTICE_ID));
        }
    }

    private String formatResponse(String info, String id) {
        // Build the JSON with Jackson so quotes in exception messages are escaped correctly
        return OBJECT_MAPPER.createObjectNode()
                .put("taskId", id)
                .put("info", info)
                .toString();
    }
}
{% endcodeblock %}

## Solution design 2.0

This version mainly addresses abnormal scenarios, for example:

1. If the service restarts unexpectedly, are the queued tasks lost?
2. If acquiring the semaphore blocks, do all tasks pile up?
3. When something goes wrong, how do we find out promptly?

This part is still at the design stage. Whether or not I end up being the one to implement it, I'm recording it here first.

Solution:

1. Replace the in-memory queues with Redis and enable persistence, so tasks can be recovered after a restart.
2. Maintain a separate semaphore for core business tasks.
3. Set a rejection policy: when the queue exceeds a threshold, return an error to PowerJob immediately.
    1. Send an alert at the same time.
4. Adjust the semaphore dynamically as appropriate.
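Items 2 to 4 above, and the blocking concern in scenario 2, can be sketched with plain `java.util.concurrent` types. The following is a minimal, hypothetical sketch, not the project's implementation; the names `AdmissionSketch`, `ResizableSemaphore` and `resizeNormal` are illustrative. It uses a bounded `ArrayBlockingQueue` whose `offer` fails when the queue is full (the point where the caller would return a failure to PowerJob and send an alert), separate permit pools for core and non-core tasks, and a timed `tryAcquire` so the dispatcher never waits on a semaphore indefinitely. Redis persistence (item 1) is left out.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of design 2.0 items 2-4; all names here are illustrative
class AdmissionSketch {

    /** Semaphore whose permit limit can be changed at runtime (item 4). */
    static final class ResizableSemaphore extends Semaphore {
        private int limit;

        ResizableSemaphore(int permits) {
            super(permits);
            this.limit = permits;
        }

        synchronized void resize(int newLimit) {
            int delta = newLimit - limit;
            if (delta > 0) {
                release(delta);          // grow: hand out extra permits
            } else if (delta < 0) {
                reducePermits(-delta);   // shrink: may go negative until running tasks release
            }
            limit = newLimit;
        }
    }

    // Bounded queue: offer() fails instead of growing without limit (item 3)
    private final BlockingQueue<Runnable> queue;
    // Core and non-core tasks draw from separate permit pools (item 2)
    private final ResizableSemaphore corePermits;
    private final ResizableSemaphore normalPermits;

    AdmissionSketch(int queueCapacity, int core, int normal) {
        this.queue = new ArrayBlockingQueue<>(queueCapacity);
        this.corePermits = new ResizableSemaphore(core);
        this.normalPermits = new ResizableSemaphore(normal);
    }

    /** false means rejected: the caller would report failure to PowerJob and raise an alert. */
    boolean submit(Runnable task) {
        return queue.offer(task);
    }

    Runnable poll() {
        return queue.poll();
    }

    /** Waits at most timeoutMs for a permit instead of blocking the dispatcher indefinitely. */
    boolean tryAcquire(boolean core, long timeoutMs) {
        try {
            return pool(core).tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    void release(boolean core) {
        pool(core).release();
    }

    void resizeNormal(int newLimit) {
        normalPermits.resize(newLimit);
    }

    int availableCore() {
        return corePermits.availablePermits();
    }

    int availableNormal() {
        return normalPermits.availablePermits();
    }

    private ResizableSemaphore pool(boolean core) {
        return core ? corePermits : normalPermits;
    }
}
```

Shrinking relies on `Semaphore.reducePermits`, which is `protected`, hence the small subclass. Permits already handed out are not revoked, so the available count can dip below zero until running tasks finish. When `tryAcquire` returns false, the dispatcher can put the task back or retry later instead of stalling every other task behind it.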