
---
title: Encountering a Small Flood Peak
author: Gamehu
date: 2024-09-26 22:00:11
tags: DFX
---

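Leaving-the-company series, part 10
In this series, written as I leave the company, I look back on how I grew here over the past few years and keep a record of it. This post is about a production bug.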

Background

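This problem is actually related to a leftover item from my earlier post {% post_link 遇见连接超时 %} (Encountering a Connection Timeout): the error again originated from a closed database connection. The difference is that last time only my own module was affected, whereas this time many task types failed across a large number of tenants, and the Feishu alert messages kept popping up until I went numb.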

The problem

{% codeblock %}
2024-10-23 17:00:10,177 [XxxGenerator8] ERROR [c.r.r.m.s.s.notice.impl.Xxx] Xxx.java:631 - Customer: 2xx319, error processing alert data
org.springframework.jdbc.UncategorizedSQLException:
2024-10-23 17:00:10,176 [XxxGenerator8] ERROR [c.r.r.m.s.s.notice.impl.Xxx] Xxx.java:511 - Customer: 2xx319, error processing integration-platform inspection data
2024-10-23 17:00:10,175 [XxxGenerator8] ERROR [c.r.r.m.s.s.notice.impl.Xxx] Xxx.java:547 - Customer: 2xx319, error processing service-status data
org.springframework.jdbc.UncategorizedSQLException:
... 35 common frames omitted
2024-10-23 17:00:10,174 [XxxGenerator8] ERROR [c.r.r.m.s.s.notice.impl.Xxx] Xxx.java:582 - Customer: 2xx319, error processing heartbeat data
... 82 common frames omitted
org.springframework.jdbc.UncategorizedSQLException:
### Cause: java.sql.SQLException: Connection is closed
; uncategorized SQLException; SQL state [null]; error code [0]; Connection is closed
    at org.mybatis.spring.MyBatisExceptionTranslator.translateExceptionIfPossible(MyBatisExceptionTranslator.java:93)
    at org.mybatis.spring.SqlSessionTemplate$SqlSessionInterceptor.invoke(SqlSessionTemplate.java:439)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.sql.SQLException: Connection is closed
{% endcodeblock %}

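This time the root cause was found quickly, so I won't go over the investigation again. After the incident, I identified two clear factors:

  1. The similar error last time had already revealed that the connection-pool configuration was flawed.
    1. A fresh check showed no slow SQL at the moment, so the initial judgment was a connection-pool problem.
  2. We had just launched the strategy feature, which lets tasks that used to run on a default schedule be given their own execution time and period per task type and per tenant.
    1. The suspicion was that N customers × N tasks were all firing at the same moment and exhausting the connection pool.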
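The second suspicion can be sanity-checked with simple arithmetic: if every task that fires in the same minute needs at least one database connection, the busiest minute is a lower bound on how many connections the pool must supply at once. The sketch below groups per-tenant schedules by `HH:mm` and reports that peak. The `Schedule` record, the sample data, the pool size of 4 and the "one connection per task" assumption are all illustrative, not taken from the production code.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TriggerPeak {
    // Hypothetical shape of one scheduled trigger: a tenant's task type and its daily time "HH:mm"
    record Schedule(String tenantId, String taskType, String time) {}

    // Count how many triggers fire in each minute of the day (TreeMap keeps minutes sorted)
    static Map<String, Integer> triggersPerMinute(List<Schedule> schedules) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Schedule s : schedules) {
            counts.merge(s.time(), 1, Integer::sum);
        }
        return counts;
    }

    // Largest number of triggers that share a single minute
    static int peak(List<Schedule> schedules) {
        return triggersPerMinute(schedules).values().stream()
                .mapToInt(Integer::intValue)
                .max()
                .orElse(0);
    }

    public static void main(String[] args) {
        // Illustrative data only: three tenants, two task types, mostly left on the same time
        List<Schedule> schedules = List.of(
                new Schedule("t1", "heartbeat", "17:00"), new Schedule("t1", "alert", "17:00"),
                new Schedule("t2", "heartbeat", "17:00"), new Schedule("t2", "alert", "17:00"),
                new Schedule("t3", "heartbeat", "17:00"), new Schedule("t3", "alert", "17:05"));
        int maximumPoolSize = 4; // hypothetical pool size for the example
        int peak = peak(schedules);
        System.out.println(triggersPerMinute(schedules));
        System.out.println("peak " + peak + " vs pool " + maximumPoolSize
                + (peak > maximumPoolSize ? ": more tasks than connections at once" : ": fits"));
    }
}
```

With this sample data the busiest minute is 17:00 with 5 triggers, one more than the hypothetical pool of 4; with real tenant counts the same calculation grows as tenants × task types.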

Handling

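  1. Using a backdoor we had left in place, I manually generated the core tasks so that production could process data normally again.
  2. We already knew that introducing ShardingSphere had also brought in the HikariCP connection pool, so we now keep HikariCP as the only connection pool and have tuned its parameters.
    1. Below are the parameters after a colleague's tuning; the timeouts and the pool size were adjusted to match the high-performance PostgreSQL instance we bought on Alibaba Cloud.

{% codeblock %}
master0:
  dataSourceClassName: com.zaxxer.hikari.HikariDataSource
  driverClassName: org.postgresql.Driver
  jdbcUrl: jdbc:postgresql://xx.aliyuncs.com:xx/xx
  username: xxx
  password: xxx
  connectionTimeout: 60000   # ms: max time a caller waits for a connection from the pool (60 s)
  idleTimeout: 600000        # ms: idle connections above minimumIdle are retired after 10 min
  maxLifetime: 3600000       # ms: max lifetime of a pooled connection (60 min)
  maximumPoolSize: 200
  minimumIdle: 1
  poolName: business-data-master0
{% endcodeblock %}
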
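  3. As a stopgap, give each cron expression some offset, for example:

{% codeblock %}
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

// Enclosing class name is illustrative
public class CronTimeUtil {

    public static List<String> toCronList(List<String> timeList) {
        return timeList.stream()
            .map(time -> {
                String[] timeParts = parseTime(time);
                // TODO stopgap: add a random offset (0-3 minutes) to each cron
                int minuteOffset = ThreadLocalRandom.current().nextInt(4); // random number in [0, 3]
                // Add the offset to minutes-of-day so that 10:58 + 3 becomes 11:01 instead of
                // wrapping back to 10:01; % (24 * 60) keeps the result inside one day
                int totalMinutes = (Integer.parseInt(timeParts[0]) * 60
                        + Integer.parseInt(timeParts[1]) + minuteOffset) % (24 * 60);
                int hour = totalMinutes / 60;
                int minute = totalMinutes % 60;
                // Quartz-style cron: second minute hour day-of-month month day-of-week
                return "0 " + minute + " " + hour + " * * ?";
            })
            .collect(Collectors.toList());
    }

    private static String[] parseTime(String time) {
        return time.split(":"); // expected format "HH:mm"
    }
}
{% endcodeblock %}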
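One side effect of `ThreadLocalRandom` in the stopgap is that the offset changes every time the cron list is regenerated, so a tenant's run time can drift from one save to the next. A common alternative, sketched here as an assumption rather than what was deployed, derives the offset from a stable key such as tenant id plus task type: the same schedule always lands on the same minute, while different tenants still spread across the window. `StableCronOffset`, `stableOffsetMinutes` and the `"tenantId:taskType"` key format are hypothetical.

```java
public class StableCronOffset {
    // Map a stable key (e.g. "tenantId:taskType") to an offset in [0, windowMinutes).
    // String.hashCode is fully specified by the Java API, so the result is identical on every JVM and restart;
    // floorMod keeps it non-negative even when hashCode is negative
    static int stableOffsetMinutes(String key, int windowMinutes) {
        return Math.floorMod(key.hashCode(), windowMinutes);
    }

    // Quartz-style daily cron for "HH:mm" shifted by the key's offset, carrying into the hour and past midnight
    static String toCron(String time, String key, int windowMinutes) {
        String[] parts = time.split(":");
        int totalMinutes = (Integer.parseInt(parts[0]) * 60 + Integer.parseInt(parts[1])
                + stableOffsetMinutes(key, windowMinutes)) % (24 * 60);
        return "0 " + (totalMinutes % 60) + " " + (totalMinutes / 60) + " * * ?";
    }

    public static void main(String[] args) {
        for (String tenant : new String[] {"tenant-1", "tenant-2", "tenant-3"}) {
            // The same tenant and task type always land on the same minute
            System.out.println(tenant + " -> " + toCron("16:58", tenant + ":heartbeat", 4));
        }
    }
}
```

The trade-off is that a hash only spreads load statistically; if many tenants happen to hash to the same slot, the queue-based staggering of the later design is still needed.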

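Once steps 2 and 3 were done and had bought us some breathing room, I started on the long-term fix, which meant redesigning the solution to cope with high-concurrency scenarios.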

Solution design 1.0

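For how I approach solution design in general, see my earlier post {% post_link 遇见多表查询 %} (Encountering Multi-Table Queries); here I'll go straight to the conclusions: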

  1. Task staggering (random delay)
  2. Task rate limiting (thread pool + queue)
  3. Task priority (core tasks run first)
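Points 2 and 3 come down to a queue ordered by "priority first, then arrival order". A minimal sketch with the JDK's `PriorityBlockingQueue`, assuming a hypothetical `Job` record and a monotonic sequence number as the tie-breaker: the queue's Javadoc makes no ordering guarantee among equal elements, and a millisecond timestamp can tie for tasks submitted in the same millisecond. The task names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class PriorityOrdering {
    // Hypothetical job: a smaller priority value runs first, seq breaks ties in arrival order
    record Job(String name, int priority, long seq) implements Comparable<Job> {
        @Override
        public int compareTo(Job other) {
            int byPriority = Integer.compare(priority, other.priority);
            return byPriority != 0 ? byPriority : Long.compare(seq, other.seq);
        }
    }

    private final PriorityBlockingQueue<Job> queue = new PriorityBlockingQueue<>();
    private final AtomicLong sequence = new AtomicLong();

    void submit(String name, int priority) {
        queue.offer(new Job(name, priority, sequence.getAndIncrement()));
    }

    // Drain the queue in the order a dispatcher would take jobs
    List<String> drain() {
        List<String> order = new ArrayList<>();
        Job job;
        while ((job = queue.poll()) != null) {
            order.add(job.name());
        }
        return order;
    }

    public static void main(String[] args) {
        PriorityOrdering q = new PriorityOrdering();
        q.submit("weekly-summary", 5);
        q.submit("heartbeat", 10);
        q.submit("safety-notice", 0);
        q.submit("service-status", 0);
        System.out.println(q.drain()); // [safety-notice, service-status, weekly-summary, heartbeat]
    }
}
```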

[Figure: UML diagram]

[Figure: flowchart]

[Figure: sequence diagram]

Key code (simplified)

{% codeblock %}

// JDK imports; Spring, Lombok, Jackson and PowerJob imports are omitted for brevity
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
 * manage-biz PowerJob dispatcher
 * Optimized version: peak shaving and queue management
 *
 * @author hht
 * @since 2024-09-10
 */
@Component(value = "manageBizPowerjobDispatcher")
@Slf4j
@RequiredArgsConstructor
public class ManageBizPowerjobDispatcher {

    // Task priority: a smaller value runs first (nested, because a top-level enum cannot be private)
    private enum TaskPriority {
        HIGH(0),
        MEDIUM(5),
        LOW(10);

        private final int value;

        TaskPriority(int value) {
            this.value = value;
        }

        public int getValue() {
            return value;
        }
    }

    private final IXxxScheduleService xxxScheduleService;
    // strategyScheduleService (used by executeOtherTask) is injected the same way; its declaration is omitted
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /** PowerJob task id of the safety notice */
    public static final String TASK_SAFETY_NOTICE_ID = "generateXxx";
    // TASK_OTHER, TASK_PUSH_SERVICE_STATUS_ID, TASK_TENANT_SERVICE_PHASE_ID and TASK_WEEKLY_SUMMARY
    // are declared the same way; their values are omitted here

    public static final String SUCCESS = "success";

    // Tunables, injectable from the configuration file
    @Value("${powerjob.task.max-concurrent:10}")
    private int maxConcurrentTasks;

    // Note: PriorityBlockingQueue is unbounded, so this is only its initial capacity, not a hard limit
    @Value("${powerjob.task.queue-capacity:500}")
    private int queueCapacity;

    @Value("${powerjob.task.max-delay-minutes:5}")
    private int maxDelayMinutes;

    @Value("${powerjob.task.worker-threads:20}")
    private int workerThreads;

    // Delayed task: becomes available once executeTime (absolute epoch millis) has passed
    @Data
    private static class DelayedTask implements Delayed {
        private final Runnable task;
        private final TaskPriority priority;
        private final String taskId;
        private final String jobParams;
        private final long executeTime;

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        // Delayed extends Comparable<Delayed>, and DelayQueue relies on this ordering
        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    // Priority task
    @Data
    private static class PriorityTask implements Comparable<PriorityTask> {
        private final Runnable task;
        private final TaskPriority priority;
        private final String taskId;
        private final String jobParams;
        // Monotonic sequence number: keeps FIFO order within one priority (millisecond timestamps can tie)
        private final long sequence;

        @Override
        public int compareTo(PriorityTask other) {
            // Order by priority first, then by submission order
            int priorityCompare = Integer.compare(priority.getValue(), other.priority.getValue());
            if (priorityCompare != 0) {
                return priorityCompare;
            }
            return Long.compare(sequence, other.sequence);
        }
    }

    /**
     * 1. Runs on its own thread, taking tasks from the queues and dispatching them
     * 2. Moves due tasks from the delay queue into the priority queue
     * 3. Limits how many tasks run concurrently
     */
    private class TaskDispatcher implements Runnable {
        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // Move every delayed task that is due into the priority queue, keeping its own priority
                    DelayedTask delayedTask;
                    while ((delayedTask = delayedTaskQueue.poll()) != null) {
                        submitToPriorityQueue(delayedTask.getTask(), delayedTask.getPriority(),
                                delayedTask.getTaskId(), delayedTask.getJobParams());
                    }

                    // Wait with a timeout instead of take(): a blocking take() on an empty priority queue
                    // would keep the loop from ever draining delayed tasks that become due later
                    PriorityTask priorityTask = priorityTaskQueue.poll(200, TimeUnit.MILLISECONDS);
                    if (priorityTask == null) {
                        continue;
                    }

                    // Acquire a permit to cap concurrency (blocks while maxConcurrentTasks tasks are running)
                    taskSemaphore.acquire();

                    // Record that the task has started
                    activeTaskCount.incrementAndGet();
                    AtomicInteger counter = taskExecutionCount.computeIfAbsent(
                            priorityTask.getTaskId(), k -> new AtomicInteger(0));
                    counter.incrementAndGet();

                    try {
                        // Hand the task to the worker pool
                        executorService.submit(() -> {
                            try {
                                log.info("Executing task: {}, params: {}", priorityTask.getTaskId(), priorityTask.getJobParams());
                                priorityTask.getTask().run();
                            } catch (Exception e) {
                                log.error("Task execution failed: {}", priorityTask.getTaskId(), e);
                            } finally {
                                // Release the permit and update the counters
                                taskSemaphore.release();
                                activeTaskCount.decrementAndGet();
                                counter.decrementAndGet();
                            }
                        });
                    } catch (RejectedExecutionException e) {
                        // The pool is shutting down: undo the bookkeeping done above
                        taskSemaphore.release();
                        activeTaskCount.decrementAndGet();
                        counter.decrementAndGet();
                        log.warn("Task rejected by the worker pool: {}", priorityTask.getTaskId());
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                } catch (Exception e) {
                    log.error("Task dispatcher error", e);
                }
            }
        }
    }

    // Task queues and executors
    private DelayQueue<DelayedTask> delayedTaskQueue;
    private PriorityBlockingQueue<PriorityTask> priorityTaskQueue;
    private ExecutorService executorService;
    private ExecutorService dispatcherService;
    private Semaphore taskSemaphore;

    // Task execution monitoring (initialized final fields are not part of the Lombok constructor)
    private final AtomicLong totalTasksReceived = new AtomicLong(0);
    private final AtomicLong totalTasksExecuted = new AtomicLong(0);
    private final AtomicInteger activeTaskCount = new AtomicInteger(0);
    private final Map<String, AtomicInteger> taskExecutionCount = new ConcurrentHashMap<>();
    private final AtomicLong sequenceGenerator = new AtomicLong(0);

    @PostConstruct
    public void init() {
        // Initialize the task queues
        delayedTaskQueue = new DelayQueue<>();
        priorityTaskQueue = new PriorityBlockingQueue<>(queueCapacity);

        // Initialize the worker pool
        executorService = Executors.newFixedThreadPool(workerThreads, new ThreadFactory() {
            private final AtomicInteger counter = new AtomicInteger(1);

            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "task-worker-" + counter.getAndIncrement());
                thread.setDaemon(true);
                return thread;
            }
        });

        // Initialize the dispatcher thread
        dispatcherService = Executors.newSingleThreadExecutor(r -> {
            Thread thread = new Thread(r, "task-dispatcher");
            thread.setDaemon(true);
            return thread;
        });

        // Initialize the semaphore
        taskSemaphore = new Semaphore(maxConcurrentTasks);

        // Start the dispatcher
        dispatcherService.submit(new TaskDispatcher());

        log.info("Task dispatcher initialized, max concurrent tasks: {}, initial queue capacity: {}, max delay (minutes): {}, worker threads: {}",
                maxConcurrentTasks, queueCapacity, maxDelayMinutes, workerThreads);
    }

    @PreDestroy
    public void shutdown() {
        // Stop the dispatcher; tasks still waiting in the in-memory queues are lost here
        if (dispatcherService != null) {
            dispatcherService.shutdownNow();
        }

        // Stop the worker pool, giving running tasks up to 30 s to finish
        if (executorService != null) {
            executorService.shutdown();
            try {
                if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
                    executorService.shutdownNow();
                }
            } catch (InterruptedException e) {
                executorService.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }

        log.info("Task dispatcher stopped, total tasks received: {}, total tasks executed: {}",
                totalTasksReceived.get(), totalTasksExecuted.get());
    }

    /**
     * Submit a task to the delay queue with a random delay in [0, maxDelayMinutes) minutes
     */
    private void submitToDelayQueue(Runnable task, TaskPriority priority, String taskId, String jobParams) {
        long maxDelayMs = TimeUnit.MINUTES.toMillis(maxDelayMinutes);
        long delayMs = maxDelayMs > 0 ? ThreadLocalRandom.current().nextLong(maxDelayMs) : 0L;
        // DelayedTask expects an absolute timestamp, not the relative delay
        delayedTaskQueue.offer(new DelayedTask(task, priority, taskId, jobParams, System.currentTimeMillis() + delayMs));

        log.info("Task submitted to the delay queue: {}, delay: {}ms", taskId, delayMs);
    }

    /**
     * Submit a task to the priority queue
     */
    private void submitToPriorityQueue(Runnable task, TaskPriority priority, String taskId, String jobParams) {
        priorityTaskQueue.offer(new PriorityTask(task, priority, taskId, jobParams, sequenceGenerator.getAndIncrement()));

        log.info("Task submitted to the priority queue: {}, priority: {}", taskId, priority);
    }

    /**
     * Map a task type to its priority
     */
    private TaskPriority getTaskPriority(String taskId) {
        switch (taskId) {
            case TASK_SAFETY_NOTICE_ID:
            case TASK_PUSH_SERVICE_STATUS_ID:
                return TaskPriority.HIGH;
            case TASK_TENANT_SERVICE_PHASE_ID:
            case TASK_WEEKLY_SUMMARY:
                return TaskPriority.MEDIUM;
            default:
                return TaskPriority.LOW;
        }
    }

    /**
     * Build the Runnable that does the actual work for a task type
     */
    private Runnable createExecutableTask(String taskId, TaskContext taskContext) {
        switch (taskId) {
            case TASK_SAFETY_NOTICE_ID:
                return () -> generateXxxTask(taskContext);
            case TASK_OTHER:
                return () -> executeOtherTask(taskContext);
            // ... other task types
            default:
                throw new IllegalArgumentException("Unknown task type: " + taskId);
        }
    }

    /**
     * Common submission path used by every PowerJob handler
     */
    private ProcessResult submitTask(String taskId, TaskContext taskContext) {
        try {
            totalTasksReceived.incrementAndGet();
            Runnable task = createExecutableTask(taskId, taskContext);
            // Assign a priority according to the task type
            TaskPriority priority = getTaskPriority(taskId);

            // If many tasks of this type are already running, spread the load through the delay queue
            int activeCount = taskExecutionCount.computeIfAbsent(taskId, k -> new AtomicInteger(0)).get();
            if (activeCount > maxConcurrentTasks / 2) {
                log.warn("Many tasks of type {} are running: {}, using the delay queue to spread the load", taskId, activeCount);
                submitToDelayQueue(task, priority, taskId, taskContext.getJobParams());
            } else {
                submitToPriorityQueue(task, priority, taskId, taskContext.getJobParams());
            }

            return new ProcessResult(true, formatResponse(SUCCESS, taskId));
        } catch (Exception e) {
            log.error("Failed to submit task: {}", taskId, e);
            return new ProcessResult(false, formatResponse(e.getMessage(), taskId));
        }
    }

    // ====== The original PowerJob handlers, now routed through the queues ======

    /**
     * Alert task
     */
    @PowerJobHandler(name = TASK_OTHER)
    public ProcessResult generateOtherTask(TaskContext taskContext) {
        log.info("==================== Schedule triggered (other task) ======================");
        return submitTask(TASK_OTHER, taskContext);
    }

    // Renamed from a second generateOtherTask(TaskContext): two methods with the same signature do not compile,
    // and the queued Runnable must call the real work, not the handler that would enqueue it again
    private ProcessResult executeOtherTask(TaskContext taskContext) {
        try {
            StrategyJobParams jobParams = new StrategyJobParams();
            if (StringUtils.hasLength(taskContext.getJobParams())) {
                jobParams = OBJECT_MAPPER.readValue(taskContext.getJobParams(), StrategyJobParams.class);
            }
            strategyScheduleService.generateTask(jobParams);
            totalTasksExecuted.incrementAndGet();
            return new ProcessResult(true, formatResponse(SUCCESS, TASK_OTHER));
        } catch (Exception e) {
            log.error("Scheduled execution of [other task] failed", e);
            return new ProcessResult(false, formatResponse(e.getMessage(), TASK_OTHER));
        }
    }

    /**
     * Generate the safety notice
     */
    @PowerJobHandler(name = TASK_SAFETY_NOTICE_ID)
    public ProcessResult generateXxx(TaskContext taskContext) {
        log.info("==================== Schedule triggered (safety notice) ======================");
        return submitTask(TASK_SAFETY_NOTICE_ID, taskContext);
    }

    private ProcessResult generateXxxTask(TaskContext taskContext) {
        try {
            // Read the scheduling parameters
            StrategyJobParams jobParams = null;
            if (StringUtils.hasLength(taskContext.getJobParams())) {
                jobParams = OBJECT_MAPPER.readValue(taskContext.getJobParams(), StrategyJobParams.class);
            }
            // Generate the safety notices
            xxxScheduleService.autoGenerateBatch(jobParams);
            totalTasksExecuted.incrementAndGet();
            return new ProcessResult(true, formatResponse(SUCCESS, TASK_SAFETY_NOTICE_ID));
        } catch (Exception e) {
            log.error("Scheduled execution of [safety notice] failed", e);
            return new ProcessResult(false, formatResponse(e.getMessage(), TASK_SAFETY_NOTICE_ID));
        }
    }

    private String formatResponse(String info, String id) {
        return String.format("{\"taskId\": \"%s\", \"info\": \"%s\"}", id, info);
    }
}

{% endcodeblock %}
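The class above depends on Spring, Lombok, Jackson and PowerJob, so it is hard to run in isolation. To experiment with just the dispatch loop, here is a stripped-down, JDK-only sketch of the same idea: a `DelayQueue` for staggered tasks, a `PriorityBlockingQueue` for ordering, a timed `poll` so that delayed tasks are still promoted when nothing is ready, and a `Semaphore` capping concurrency. `MiniDispatcher` and its methods are hypothetical names, and monitoring and the PowerJob handlers are deliberately left out.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class MiniDispatcher {
    // Ready to run: a smaller priority runs first, seq keeps FIFO order among equal priorities
    record Ready(Runnable task, int priority, long seq) implements Comparable<Ready> {
        public int compareTo(Ready o) {
            int c = Integer.compare(priority, o.priority);
            return c != 0 ? c : Long.compare(seq, o.seq);
        }
    }

    // Waiting in the DelayQueue until dueAtMillis (absolute time)
    record Pending(Runnable task, int priority, long dueAtMillis) implements Delayed {
        public long getDelay(TimeUnit unit) {
            return unit.convert(dueAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        public int compareTo(Delayed o) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    private final DelayQueue<Pending> delayed = new DelayQueue<>();
    private final PriorityBlockingQueue<Ready> ready = new PriorityBlockingQueue<>();
    private final AtomicLong seq = new AtomicLong();
    private final Semaphore permits;
    private final ExecutorService workers;
    private final Thread dispatcher;

    MiniDispatcher(int maxConcurrent) {
        permits = new Semaphore(maxConcurrent);
        workers = Executors.newFixedThreadPool(maxConcurrent);
        dispatcher = new Thread(this::loop, "mini-dispatcher");
        dispatcher.setDaemon(true);
        dispatcher.start();
    }

    void submit(Runnable task, int priority, long delayMillis) {
        if (delayMillis > 0) {
            delayed.offer(new Pending(task, priority, System.currentTimeMillis() + delayMillis));
        } else {
            ready.offer(new Ready(task, priority, seq.getAndIncrement()));
        }
    }

    private void loop() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                // Promote every delayed task that is due
                Pending p;
                while ((p = delayed.poll()) != null) {
                    ready.offer(new Ready(p.task(), p.priority(), seq.getAndIncrement()));
                }
                // Timed poll so newly due delayed tasks are noticed even when nothing is ready
                Ready r = ready.poll(50, TimeUnit.MILLISECONDS);
                if (r == null) {
                    continue;
                }
                permits.acquire(); // blocks while maxConcurrent tasks are running
                workers.execute(() -> {
                    try {
                        r.task().run();
                    } finally {
                        permits.release();
                    }
                });
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // stop on shutdown
        }
    }

    void shutdown() throws InterruptedException {
        dispatcher.interrupt();
        dispatcher.join(); // nothing is handed to the pool after this point
        workers.shutdown();
        workers.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        MiniDispatcher d = new MiniDispatcher(2);
        CountDownLatch done = new CountDownLatch(3);
        d.submit(() -> { System.out.println("low priority"); done.countDown(); }, 10, 0);
        d.submit(() -> { System.out.println("high priority, staggered by 100 ms"); done.countDown(); }, 0, 100);
        d.submit(() -> { System.out.println("medium priority"); done.countDown(); }, 5, 0);
        done.await(5, TimeUnit.SECONDS);
        d.shutdown();
    }
}
```

Because the permit is taken on the dispatcher thread, a saturated pool stalls the whole loop; that is exactly the "semaphore acquisition blocks" risk that design 2.0 sets out to handle.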

Solution design 2.0

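This version mainly addresses failure scenarios, for example:

  1. If the service restarts unexpectedly, are the queued tasks lost?
  2. If acquiring the semaphore blocks, do all tasks pile up?
  3. When something goes wrong, how do we find out promptly?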

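This step is still at the design stage, and it is not yet settled whether I will be the one to design and ship it, so I'm recording it here for now.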

Solution:

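  1. Replace the in-memory queues with Redis and enable persistence, so that tasks can be recovered after a restart.
  2. Maintain a separate semaphore for core business tasks.
  3. Add a rejection policy: once the queue exceeds a threshold, return an error to PowerJob immediately.
    1. Send an alert at the same time.
  4. Adjust the number of semaphore permits dynamically where appropriate.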
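Points 2 to 4 can be prototyped with the JDK alone before Redis enters the picture. The sketch below is an assumption about how they might fit together, not the planned implementation. Core and non-core tasks get separate semaphores (a bulkhead), and dispatch uses `tryAcquire` with a timeout so that a saturated pool fails fast instead of blocking (scenario 2 above). A queue-depth threshold rejects new work and fires an alert callback, and permits can be resized at runtime through `Semaphore`'s protected `reducePermits`. `TaskAdmission`, `ResizableSemaphore` and the `alert` hook are hypothetical names.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class TaskAdmission {
    // Semaphore whose size can change at runtime; Semaphore.reducePermits is protected, hence the subclass
    static class ResizableSemaphore extends Semaphore {
        private int limit;

        ResizableSemaphore(int permits) {
            super(permits);
            this.limit = permits;
        }

        synchronized void resize(int newLimit) {
            int delta = newLimit - limit;
            if (delta > 0) {
                release(delta);        // grow: extra permits are available immediately
            } else if (delta < 0) {
                reducePermits(-delta); // shrink: takes effect as running tasks release their permits
            }
            limit = newLimit;
        }
    }

    private final ResizableSemaphore corePermits;   // bulkhead reserved for core tasks
    private final ResizableSemaphore normalPermits; // everything else
    private final AtomicInteger queued = new AtomicInteger();
    private final int queueThreshold;
    private final Consumer<String> alert;           // hypothetical hook, e.g. posting to an IM channel

    TaskAdmission(int corePermits, int normalPermits, int queueThreshold, Consumer<String> alert) {
        this.corePermits = new ResizableSemaphore(corePermits);
        this.normalPermits = new ResizableSemaphore(normalPermits);
        this.queueThreshold = queueThreshold;
        this.alert = alert;
    }

    // Admission: refuse new work once the backlog would exceed the threshold, and raise an alert
    boolean offer(String taskId) {
        if (queued.incrementAndGet() > queueThreshold) {
            queued.decrementAndGet();
            alert.accept("queue above threshold " + queueThreshold + ", rejected " + taskId);
            return false; // the PowerJob handler would return a failed ProcessResult here
        }
        return true;
    }

    // Dispatch: wait at most timeoutMs for a permit from the task's own bulkhead instead of blocking forever
    boolean tryStart(boolean core, long timeoutMs) throws InterruptedException {
        Semaphore permits = core ? corePermits : normalPermits;
        if (!permits.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS)) {
            return false; // still queued; the dispatcher can retry later or serve the other bulkhead
        }
        queued.decrementAndGet();
        return true;
    }

    void finish(boolean core) {
        (core ? corePermits : normalPermits).release();
    }

    void resizeNormal(int newLimit) {
        normalPermits.resize(newLimit);
    }

    public static void main(String[] args) throws InterruptedException {
        TaskAdmission admission = new TaskAdmission(2, 1, 3, msg -> System.out.println("ALERT: " + msg));
        for (int i = 0; i < 4; i++) {
            System.out.println("task-" + i + " accepted: " + admission.offer("task-" + i));
        }
        System.out.println("core start: " + admission.tryStart(true, 10));
        System.out.println("normal start: " + admission.tryStart(false, 10));
        System.out.println("normal start while full: " + admission.tryStart(false, 10));
        admission.resizeNormal(2);
        System.out.println("normal start after resize: " + admission.tryStart(false, 10));
    }
}
```

In the demo, the fourth task is rejected with an alert, the second non-core start times out because its bulkhead is full even though a core permit is still free, and resizing the non-core bulkhead to 2 lets the next start succeed. Persisting the queue in Redis (point 1) would replace the in-memory counter, which this sketch does not attempt.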