---
title: 遇见小洪峰
author: Gamehu
date: 2024-09-26 22:00:11
---
其实说来这个问题,跟之前的{% post_link 遇见连接超时 %}里的一个遗留项也有一些关系,因为报错的源头也是数据库连接关闭。与上一次仅仅是我那块出问题不同的是,这次是大批量租户的多种任务都失败,飞书告警消息都把我弹麻了。
{% codeblock %}
2024-10-23 17:00:10,177 [XxxGenerator8] ERROR [c.r.r.m.s.s.notice.impl.Xxx] Xxx.java:631 - 客户:2xx319,告警数据处理异常
org.springframework.jdbc.UncategorizedSQLException:
2024-10-23 17:00:10,176 [XxxGenerator8] ERROR [c.r.r.m.s.s.notice.impl.Xxx] Xxx.java:511 - 客户:2xx319,集成平台巡检数据处理异常
2024-10-23 17:00:10,175 [XxxGenerator8] ERROR [c.r.r.m.s.s.notice.impl.Xxx] Xxx.java:547 - 客户:2xx319,服务状态数据处理异常
org.springframework.jdbc.UncategorizedSQLException:
... 35 common frames omitted
2024-10-23 17:00:10,174 [XxxGenerator8] ERROR [c.r.r.m.s.s.notice.impl.Xxx] Xxx.java:582 - 客户:2xx319,心跳数据处理异常
... 82 common frames omitted
org.springframework.jdbc.UncategorizedSQLException:
### Cause: java.sql.SQLException: Connection is closed
; uncategorized SQLException; SQL state [null]; error code [0]; Connection is closed
at org.mybatis.spring.MyBatisExceptionTranslator.translateExceptionIfPossible(MyBatisExceptionTranslator.java:93)
at org.mybatis.spring.SqlSessionTemplate$SqlSessionInterceptor.invoke(SqlSessionTemplate.java:439)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.sql.SQLException: Connection is closed
{% endcodeblock %}
这次定位很快,具体定位的就不再赘述,出了问题后,我想了想有两个明确的因素:
临时解法:先让 cron 表达式带上一定的随机偏移量,比如:
{% codeblock %}
return timeList.stream()
        .map(time -> {
            String[] timeParts = parseTime(time);
            // TODO temporary mitigation: shift every cron by a random 0-3 minutes so that
            // tenants configured with the same time do not all fire in the same second.
            int minuteOffset = ThreadLocalRandom.current().nextInt(4); // 0..3 inclusive
            // Work in minutes-of-day so that an offset crossing the hour boundary carries
            // into the hour (e.g. 10:58 + 3 -> 11:01). Wrapping only the minute would
            // produce 10:01 and fire the job almost an hour early.
            int minutesOfDay = Integer.parseInt(timeParts[0]) * 60
                    + Integer.parseInt(timeParts[1])
                    + minuteOffset;
            int hour = (minutesOfDay / 60) % 24; // 23:58 + 3 rolls over to 00:01
            int minute = minutesOfDay % 60;
            return "0 " + minute + " " + hour + " * * ? ";
        })
        .collect(Collectors.toList());
/**
 * Splits a time string on the colon separator.
 *
 * @param time time text, expected in "HH:mm" form
 * @return the colon-separated parts in their original order (hour part first)
 */
private static String[] parseTime(String time) {
    final String separator = ":";
    String[] parts = time.split(separator);
    return parts;
}
}
{% endcodeblock %}
2、3做完之后,腾出缓冲时间着手长期解了,需要重新做下解法设计,以适配高并发的场景。
具体的解法设计咋做,可看下之前的{% post_link 遇见多表查询 %},这儿就直接给出一些结论:
{% codeblock %}
/**
 * Task priority levels.
 *
 * <p>Each level carries a numeric weight exposed through {@link #getValue()}; levels are
 * compared by that weight, so a smaller number sorts ahead of a larger one.
 *
 * <p>Declared without an access modifier: {@code private} is not permitted on a top-level
 * type, while package-private is valid both at top level and when nested inside a class.
 */
enum TaskPriority {
    HIGH(0),
    MEDIUM(5),
    LOW(10);

    /** Numeric weight of this level; lower means more urgent. */
    private final int value;

    TaskPriority(int value) {
        this.value = value;
    }

    /**
     * @return the numeric weight of this priority level
     */
    public int getValue() {
        return value;
    }
}
/**
* manage-biz Powerjob 调度类
* 优化版本 - 任务削峰与队列管理
*
* @author hht
* @since 2024-09-10
*/
@Component(value = "manageBizPowerjobDispatcher")
@Slf4j
@RequiredArgsConstructor
public class ManageBizPowerjobDispatcher {
// Service invoked by the safety-notice worker (see generateXxxTask).
private final IXxxScheduleService XxxScheduleService;
// Shared JSON mapper used to deserialize the job parameters carried by TaskContext.
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
/** PowerJob task id of the safety-notice job. */
public static final String TASK_SAFETY_NOTICE_ID = "generateXxx";
// Info text placed in the response of a successfully handled task.
public static final String SUCCESS = "success";
// Tunables below are injected from configuration; the value after ':' is the default.
// Number of permits of the semaphore that gates concurrently running tasks.
@Value("${powerjob.task.max-concurrent:10}")
private int maxConcurrentTasks;
// Passed to the PriorityBlockingQueue constructor in init().
// NOTE(review): that constructor argument is an initial capacity, not an upper bound —
// PriorityBlockingQueue is unbounded, so nothing here limits the backlog.
@Value("${powerjob.task.queue-capacity:500}")
private int queueCapacity;
// Upper limit, in minutes, of the random delay chosen by submitToDelayQueue.
@Value("${powerjob.task.max-delay-minutes:5}")
private int maxDelayMinutes;
// Size of the fixed worker thread pool created in init().
@Value("${powerjob.task.worker-threads:20}")
private int workerThreads;
/**
 * A task parked in the delay queue until its execution time is reached.
 *
 * <p>{@link Delayed} extends {@code Comparable<Delayed>}, so {@link #compareTo(Delayed)}
 * must be implemented: without it the class does not compile, and {@code DelayQueue}
 * relies on that ordering to keep the task that expires first at its head.
 */
@Data
private static class DelayedTask implements Delayed {
    /** Work to run once the delay has elapsed. */
    private final Runnable task;
    /**
     * Absolute due time in epoch milliseconds; {@link #getDelay(TimeUnit)} subtracts
     * {@code System.currentTimeMillis()} from it.
     */
    private final long executeTime;
    private final String taskId;
    private final String jobParams;

    /**
     * @param unit unit in which the remaining delay is reported
     * @return time left until {@code executeTime}; zero or negative once the task is due
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Orders tasks by due time, earliest first.
     *
     * @param other the delayed element to compare with
     * @return negative, zero or positive when this task is due before, together with,
     *         or after {@code other}
     */
    @Override
    public int compareTo(Delayed other) {
        if (other instanceof DelayedTask) {
            // Same type: compare the fixed due times directly; this avoids reading the
            // clock twice and getting a skewed result.
            return Long.compare(executeTime, ((DelayedTask) other).executeTime);
        }
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }
}
/**
 * A task waiting in the priority queue.
 *
 * <p>Natural ordering: lower {@code priority} value first; tasks of equal priority are
 * ordered by ascending {@code createTime}.
 */
@Data
private static class PriorityTask implements Comparable<PriorityTask> {
    private final Runnable task;
    private final TaskPriority priority;
    private final String taskId;
    private final String jobParams;
    private final long createTime;

    /**
     * @param other task to compare against
     * @return negative, zero or positive when this task sorts before, equal to, or
     *         after {@code other}
     */
    @Override
    public int compareTo(PriorityTask other) {
        int ownWeight = priority.getValue();
        int otherWeight = other.priority.getValue();
        // Equal weights fall through to the creation-time tie-breaker.
        return ownWeight == otherWeight
                ? Long.compare(createTime, other.createTime)
                : Integer.compare(ownWeight, otherWeight);
    }
}
/**
* 1.单独的线程,负责从队列中获取任务并分发
* 2.协调延迟队列和优先级队列
* 3.控制任务的并发执行数量
*/
private class TaskDispatcher implements Runnable {
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
// 先检查延迟队列
DelayedTask delayedTask = delayedTaskQueue.poll();
if (delayedTask != null) {
// 将任务添加到优先级队列
submitToPriorityQueue(delayedTask.getTask(), TaskPriority.HIGH, delayedTask.getTaskId(), delayedTask.getJobParams());
continue;
}
// 从优先级队列取任务执行
PriorityTask priorityTask = priorityTaskQueue.take();
if (priorityTask != null) {
try {
// 获取信号量,控制并发
taskSemaphore.acquire();
// 记录任务开始执行
activeTaskCount.incrementAndGet();
taskExecutionCount.computeIfAbsent(priorityTask.getTaskId(), k -> new AtomicInteger(0)).incrementAndGet();
// 提交到线程池执行
executorService.submit(() -> {
try {
log.info("执行任务: {}, 参数: {}", priorityTask.getTaskId(), priorityTask.getJobParams());
priorityTask.getTask().run();
} catch (Exception e) {
log.error("任务执行异常: {}", priorityTask.getTaskId(), e);
} finally {
// 释放信号量
taskSemaphore.release();
// 更新计数器
activeTaskCount.decrementAndGet();
AtomicInteger counter = taskExecutionCount.get(priorityTask.getTaskId());
if (counter != null) {
counter.decrementAndGet();
}
}
});
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
log.error("任务分发器异常", e);
}
}
}
}
// Task queues and executors; all of them are created in init().
// Holds tasks that were deferred with a random delay.
private DelayQueue<DelayedTask> delayedTaskQueue;
// Holds tasks ready to run, ordered by PriorityTask.compareTo.
private PriorityBlockingQueue<PriorityTask> priorityTaskQueue;
// Worker pool that runs the tasks.
private ExecutorService executorService;
// Single-thread executor that runs the TaskDispatcher loop.
private ExecutorService dispatcherService;
// Caps the number of concurrently running tasks (maxConcurrentTasks permits).
private Semaphore taskSemaphore;
// Source of the random delay used by submitToDelayQueue.
private Random random;
// Task execution monitoring
// Incremented when a task is submitted.
private AtomicLong totalTasksReceived = new AtomicLong(0);
// Incremented by the worker methods after their service call returns.
private AtomicLong totalTasksExecuted = new AtomicLong(0);
// Number of tasks handed to the worker pool that have not finished yet.
private AtomicInteger activeTaskCount = new AtomicInteger(0);
// Per task id: number of tasks handed to the worker pool that have not finished yet.
private Map<String, AtomicInteger> taskExecutionCount = new ConcurrentHashMap<>();
/**
 * Builds the queues, the worker pool, the dispatcher thread, the concurrency semaphore
 * and the random source, then starts the dispatcher loop.
 */
@PostConstruct
public void init() {
    // Queues
    delayedTaskQueue = new DelayQueue<>();
    priorityTaskQueue = new PriorityBlockingQueue<>(queueCapacity);

    // Worker pool: daemon threads named task-worker-1, task-worker-2, ...
    final AtomicInteger workerSeq = new AtomicInteger(1);
    ThreadFactory workerFactory = runnable -> {
        Thread worker = new Thread(runnable, "task-worker-" + workerSeq.getAndIncrement());
        worker.setDaemon(true);
        return worker;
    };
    executorService = Executors.newFixedThreadPool(workerThreads, workerFactory);

    // Dispatcher: one daemon thread
    ThreadFactory dispatcherFactory = runnable -> {
        Thread dispatcher = new Thread(runnable, "task-dispatcher");
        dispatcher.setDaemon(true);
        return dispatcher;
    };
    dispatcherService = Executors.newSingleThreadExecutor(dispatcherFactory);

    // Concurrency gate and random source
    taskSemaphore = new Semaphore(maxConcurrentTasks);
    random = new Random();

    // Start dispatching only after everything the loop reads has been assigned.
    dispatcherService.submit(new TaskDispatcher());

    log.info("任务调度器初始化完成,最大并发任务数: {}, 队列容量: {}, 最大延迟分钟数: {}, 工作线程数: {}",
            maxConcurrentTasks, queueCapacity, maxDelayMinutes, workerThreads);
}
/**
 * Stops the dispatcher immediately, then gives running workers up to 30 seconds to
 * finish before forcing the worker pool down, and logs the task totals.
 */
@PreDestroy
public void shutdown() {
    // Dispatcher first, so no further tasks are handed to the workers.
    if (dispatcherService != null) {
        dispatcherService.shutdownNow();
    }

    if (executorService != null) {
        executorService.shutdown();
        boolean finishedInTime = false;
        boolean interrupted = false;
        try {
            finishedInTime = executorService.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            interrupted = true;
        }
        if (!finishedInTime) {
            // Timed out or interrupted while waiting: force the pool down.
            executorService.shutdownNow();
        }
        if (interrupted) {
            // Re-assert the interrupt for whoever called us.
            Thread.currentThread().interrupt();
        }
    }

    log.info("任务调度器已关闭,总接收任务数: {}, 总执行任务数: {}",
            totalTasksReceived.get(), totalTasksExecuted.get());
}
/**
 * Puts a task into the delay queue with a random delay in
 * {@code [0, maxDelayMinutes)} minutes.
 *
 * <p>The task is stamped with an absolute due time (now + delay), because
 * {@code DelayedTask.getDelay} subtracts the current clock from {@code executeTime};
 * passing the bare delay would make every task look long overdue and defeat the delay.
 *
 * <p>{@code totalTasksReceived} is not touched here: {@code submitTask} already counts
 * the task before routing it to this method, so counting again would double it.
 *
 * @param task      work to run after the delay
 * @param taskId    task type id, used for logging and per-type accounting
 * @param jobParams raw job parameters, kept for logging
 */
private void submitToDelayQueue(Runnable task, String taskId, String jobParams) {
    long maxDelayMs = TimeUnit.MINUTES.toMillis(maxDelayMinutes);
    // Random.nextInt requires a positive int bound: clamp large values and fall back to
    // "no delay" when the configured maximum is zero or negative.
    int bound = (int) Math.min(maxDelayMs, Integer.MAX_VALUE);
    long delayMs = bound > 0 ? random.nextInt(bound) : 0L;

    long executeTime = System.currentTimeMillis() + delayMs;
    DelayedTask delayedTask = new DelayedTask(task, executeTime, taskId, jobParams);
    delayedTaskQueue.offer(delayedTask);
    log.info("任务已提交到延迟队列: {}, 延迟: {}ms", taskId, delayMs);
}
/**
 * Puts a task into the priority queue.
 *
 * <p>The enqueue timestamp is passed as the task's {@code createTime}:
 * {@code PriorityTask} declares five final fields, and {@code createTime} is the
 * tie-breaker among tasks of equal priority, so it has to be supplied here.
 *
 * @param task      work to run
 * @param priority  priority level that decides the queue position
 * @param taskId    task type id, used for logging and per-type accounting
 * @param jobParams raw job parameters, kept for logging
 */
private void submitToPriorityQueue(Runnable task, TaskPriority priority, String taskId, String jobParams) {
    long createTime = System.currentTimeMillis();
    PriorityTask priorityTask = new PriorityTask(task, priority, taskId, jobParams, createTime);
    priorityTaskQueue.offer(priorityTask);
    log.info("任务已提交到优先级队列: {}, 优先级: {}", taskId, priority);
}
/**
 * Maps a task type id to its priority level.
 *
 * @param taskId task type id; must not be null
 * @return HIGH for the safety-notice and push-service-status tasks, MEDIUM for the
 *         tenant-service-phase and weekly-summary tasks, LOW for everything else
 */
private TaskPriority getTaskPriority(String taskId) {
    if (taskId.equals(TASK_SAFETY_NOTICE_ID) || taskId.equals(TASK_PUSH_SERVICE_STATUS_ID)) {
        return TaskPriority.HIGH;
    }
    if (taskId.equals(TASK_TENANT_SERVICE_PHASE_ID) || taskId.equals(TASK_WEEKLY_SUMMARY)) {
        return TaskPriority.MEDIUM;
    }
    return TaskPriority.LOW;
}
/**
* Builds the Runnable that performs the real work for a task type.
*
* @param taskId task type id that selects the worker method
* @param jobParams raw job parameters; not read by the branches shown here, which take
*                  everything from taskContext
* @param taskContext PowerJob context handed to the selected worker method
* @return a Runnable that invokes the worker method for taskId
* @throws IllegalArgumentException when taskId matches no known task type
*/
private Runnable createExecutableTask(String taskId, String jobParams, TaskContext taskContext) {
switch (taskId) {
case TASK_SAFETY_NOTICE_ID:
// The worker's ProcessResult is discarded: a Runnable has no return value.
return () -> generateXxxTask(taskContext);
case TASK_OTHER:
return () -> generateOtherTask(taskContext);
...
default:
throw new IllegalArgumentException("未知的任务类型: " + taskId);
}
}
/**
 * Common entry point for queueing a task.
 *
 * <p>Counts the task as received, then looks at how many tasks of the same type are
 * currently running: above half of {@code maxConcurrentTasks} the task goes to the
 * delay queue, otherwise it goes straight to the priority queue with the priority of
 * its type.
 *
 * @param taskId      task type id
 * @param taskContext PowerJob context of the triggering job
 * @return a successful result once the task is queued, or a failed result carrying the
 *         exception message when queueing throws
 */
private ProcessResult submitTask(String taskId, TaskContext taskContext) {
    try {
        totalTasksReceived.incrementAndGet();

        AtomicInteger runningOfType = taskExecutionCount.computeIfAbsent(taskId, k -> new AtomicInteger(0));
        int activeCount = runningOfType.get();
        boolean crowded = activeCount > maxConcurrentTasks / 2;

        if (!crowded) {
            // Normal path: queue with the priority assigned to this task type.
            TaskPriority priority = getTaskPriority(taskId);
            Runnable immediateJob = createExecutableTask(taskId, taskContext.getJobParams(), taskContext);
            submitToPriorityQueue(immediateJob, priority, taskId, taskContext.getJobParams());
            return new ProcessResult(true, formatResponse(SUCCESS, taskId));
        }

        // Many tasks of this type are already running: spread the load via the delay queue.
        log.warn("当前任务类型 {} 正在执行的数量较多: {}, 将使用延迟队列分散负载", taskId, activeCount);
        Runnable deferredJob = createExecutableTask(taskId, taskContext.getJobParams(), taskContext);
        submitToDelayQueue(deferredJob, taskId, taskContext.getJobParams());
        return new ProcessResult(true, formatResponse(SUCCESS, taskId));
    } catch (Exception e) {
        log.error("提交任务异常: {}", taskId, e);
        return new ProcessResult(false, formatResponse(e.getMessage(), taskId));
    }
}
// ====== 以下是原始的PowerJob任务处理方法,改为使用队列系统 ======
/**
 * PowerJob entry point for the {@code TASK_OTHER} job: queues the work instead of
 * running it inline.
 *
 * <p>Named {@code generateOther} so that it no longer shares a signature with the
 * private worker {@code generateOtherTask(TaskContext)}; two methods with the same name
 * and parameter list cannot coexist in one class. This mirrors the
 * {@code generateXxx} / {@code generateXxxTask} pair.
 * NOTE(review): the handler is presumably looked up through the
 * {@code @PowerJobHandler} name rather than the Java method name — confirm before
 * relying on the rename.
 *
 * @param taskContext PowerJob context of the triggering job
 * @return the outcome of queueing the task
 */
@PowerJobHandler(name = TASK_OTHER)
public ProcessResult generateOther(TaskContext taskContext) {
    log.info("==================== 调度触发(其它任务) ======================");
    return submitTask(TASK_OTHER, taskContext);
}
/**
 * Worker for the {@code TASK_OTHER} job: parses the job parameters (an empty
 * {@code StrategyJobParams} when none are given) and calls
 * {@code strategyScheduleService.generateTask}.
 *
 * @param taskContext PowerJob context carrying the raw job parameters
 * @return a successful result after the service call returns, or a failed result
 *         carrying the exception message
 */
private ProcessResult generateOtherTask(TaskContext taskContext) {
    try {
        String rawParams = taskContext.getJobParams();
        StrategyJobParams jobParams = StringUtils.hasLength(rawParams)
                ? OBJECT_MAPPER.readValue(rawParams, StrategyJobParams.class)
                : new StrategyJobParams();

        strategyScheduleService.generateTask(jobParams);
        totalTasksExecuted.incrementAndGet();

        String body = formatResponse(SUCCESS, TASK_OTHER);
        return new ProcessResult(true, body);
    } catch (Exception e) {
        log.error("调度执行【其它任务】异常", e);
        String body = formatResponse(e.getMessage(), TASK_OTHER);
        return new ProcessResult(false, body);
    }
}
/**
 * PowerJob entry point for the safety-notice job: logs the trigger and queues the work
 * through {@code submitTask}.
 *
 * @param taskContext PowerJob context of the triggering job
 * @return the outcome of queueing the task
 */
@PowerJobHandler(name = TASK_SAFETY_NOTICE_ID)
public ProcessResult generateXxx(TaskContext taskContext) {
    log.info("==================== 调度触发(平安通告) ======================");
    ProcessResult submission = submitTask(TASK_SAFETY_NOTICE_ID, taskContext);
    return submission;
}
/**
 * Worker for the safety-notice job: parses the job parameters ({@code null} when none
 * are given) and calls {@code XxxScheduleService.autoGenerateBatch}.
 *
 * <p>Reports success through the shared {@code SUCCESS} constant, the same way the
 * other handlers in this class do, instead of repeating the literal.
 *
 * @param taskContext PowerJob context carrying the raw job parameters
 * @return a successful result after the service call returns, or a failed result
 *         carrying the exception message
 */
private ProcessResult generateXxxTask(TaskContext taskContext) {
    try {
        // Parameters of the scheduled job; stays null when the job carries none.
        StrategyJobParams jobParams = null;
        if (StringUtils.hasLength(taskContext.getJobParams())) {
            jobParams = OBJECT_MAPPER.readValue(taskContext.getJobParams(), StrategyJobParams.class);
        }
        // Generate the safety notices.
        XxxScheduleService.autoGenerateBatch(jobParams);
        totalTasksExecuted.incrementAndGet();
        return new ProcessResult(true, formatResponse(SUCCESS, TASK_SAFETY_NOTICE_ID));
    } catch (Exception e) {
        log.error("调度执行【平安通告】异常", e);
        return new ProcessResult(false, formatResponse(e.getMessage(), TASK_SAFETY_NOTICE_ID));
    }
}
/**
 * Renders the handler response as a small JSON object with the keys {@code taskId} and
 * {@code info}.
 *
 * <p>Both values are escaped first. {@code info} is frequently an exception message,
 * which may contain double quotes, backslashes or line breaks; inserted verbatim these
 * would yield invalid JSON.
 *
 * @param info result text or error message; {@code null} is rendered as the text "null"
 * @param id   task type id
 * @return the JSON text
 */
private String formatResponse(String info, String id) {
    return String.format("{\"taskId\": \"%s\", \"info\": \"%s\"}", escapeJson(id), escapeJson(info));
}

/** Escapes text for use inside a JSON string literal (quotes, backslashes, control characters). */
private static String escapeJson(String raw) {
    String text = String.valueOf(raw);
    StringBuilder escaped = new StringBuilder(text.length() + 8);
    for (int i = 0; i < text.length(); i++) {
        char c = text.charAt(i);
        switch (c) {
            case '"':
                escaped.append("\\\"");
                break;
            case '\\':
                escaped.append("\\\\");
                break;
            case '\n':
                escaped.append("\\n");
                break;
            case '\r':
                escaped.append("\\r");
                break;
            case '\t':
                escaped.append("\\t");
                break;
            default:
                if (c < 0x20) {
                    // Remaining control characters have no short form in JSON.
                    escaped.append(String.format("\\u%04x", (int) c));
                } else {
                    escaped.append(c);
                }
        }
    }
    return escaped.toString();
}
}
{% endcodeblock %}
主要是解决一些异常场景,比如:
这一步还停留在设计阶段,也可能是我设计并落地,也先做个记录。
解法: