第一次使用ThreadPoolTaskExecutor实现线程池的经历,反复修改了多次代码才正常使用

news/2024/5/17 17:22:41 标签: java, synchronized, 线程池, 同步

1、前言

  在一个向第三方平台推送消息的场景中,为了提高程序的执行效率,每次发送消息,都创建一个新的线程来完成发送消息的任务,为了提供线程的使用性能,我选择了ThreadPoolTaskExecutor线程池,结果在使用的过程中,出现了较多的问题,这里记录一下避免以后再出现类似的错误(这些错误是不应该出现的,还是对ThreadPoolTaskExecutor使用不熟悉造成的)。

2、ThreadPoolTaskExecutor用法简介

  ThreadPoolTaskExecutor是Spring Framework提供的一个线程池实现,它继承自ThreadPoolExecutor类,并实现了AsyncTaskExecutor和SchedulingTaskExecutor接口,可以用于异步任务执行和定时任务调度。
  我们通过配置类定义了一个ThreadPoolTaskExecutor的Bean,并设置了核心线程数、最大线程数、队列容量和线程名称前缀等参数。然后在MyService类中注入了这个线程池,并在executeTask方法中使用taskExecutor.execute()来提交一个异步任务。
  通过这种方式,我们可以方便地使用ThreadPoolTaskExecutor来执行异步任务,实现多线程处理和任务调度的功能。

  以下是ThreadPoolTaskExecutor的基本用法示例:

java">import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class ExecutorConfig {
    
    @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(100);
        executor.setQueueCapacity(10);
        executor.setThreadNamePrefix("MyThread-");
        executor.initialize();
        return executor;
    }
}

java">import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

@Component
public class MyService {

    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;

    public void executeTask() {
        taskExecutor.execute(() -> {
            // 执行异步任务逻辑
            // ...
        });
    }
}

3、ThreadPoolTaskExecutor使用过程复现

3.1、最开始的代码——线程泄露

  以下代码是最开始的一版代码,这里出现了一个非常严重的错误,直接造成了线程泄露,或者说是完全错用了线程池,反而造成了更多的线程资源浪费。

java">/**
 * 消息推送
 * 20230609 hsh
 */
public class PushNotificationService {

    private static Logger logger = LoggerFactory.getLogger(PushNotificationService.class);
    /**
     * 线程池
     * @return
     */
    public static Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(1000);
        executor.setThreadNamePrefix("AsyncThread-");
        executor.initialize();
        return executor;
    }
    /**
     * 消息推送
     */
    public static void pushNotification(EventTask eventTask, Map<String,Object> param) {
       Executor executor = getAsyncExecutor();
       executor.execute(new Runnable() {
           @Override
           public void run() {
                if(this.isSend(param)){//判断是否需要推送
                   String msg = "";
                   sendMsg(eventInfo);
                }
           }

           private boolean isSend(Map<String, Object> param) {
                
               return true;
           }
           private String sendMsg(String msg){
               
               return "发送";
           }
       });
    }

}

  上述代码的问题,主要发生在获取线程池的问题上,即每次发送消息都调用了getAsyncExecutor()方法,本意是获取一个线程,结果这里每次都会创建一个线程池,而且线程数至少会有10个,所以每次本来只需要创建一个线程处理消息发送,结果每次都创建了一个线程池,每个线程池至少还有10个线程,结果就出现了线程泄露问题。

3.2、第一次修改——解决线程泄露问题,但是线程不安全,在多线程环境下还是可能会导致创建多个线程池实例

  为了解决最开始出现的线程泄露问题,我把ThreadPoolTaskExecutor 作为对象的一个变量,每次获取的时候,判断是否为空,如果不为空时,就不再创建了。

java">/**
 * 消息推送
 * 20230609 hsh
 */
public class PushNotificationService {

    private static Logger logger = LoggerFactory.getLogger(PushNotificationService.class);
    
	private static ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    /**
     * 线程池
     * @return
     */
    public static Executor getAsyncExecutor() {
    	if(executor == null){
            executor = new ThreadPoolTaskExecutor();
        }
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(1000);
        executor.setThreadNamePrefix("AsyncThread-");
        executor.initialize();
        return executor;
    }
    //省略……

}

  如果多个线程同时调用getAsyncExecutor()方法,还是可能会导致创建多个线程池实例。

3.3、第二次修改——解决线程安全问题,带来了性能问题

  为了解决线程安全问题,我直接使用了synchronized 关键字,代码如下:

java">/**
 * 消息推送
 * 20230609 hsh
 */
public class PushNotificationService {

    private static Logger logger = LoggerFactory.getLogger(PushNotificationService.class);
    
	private static ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    /**
     * 线程池
     * @return
     */
    public static Executor getAsyncExecutor() {
    	 synchronized (executor){
            if(executor == null){
                executor = new ThreadPoolTaskExecutor();
            }
            executor.setCorePoolSize(30);
            executor.setMaxPoolSize(100);
            executor.setQueueCapacity(1000);
            executor.setThreadNamePrefix("AsyncThread-");
            executor.initialize();
            return executor;
        }
    }
    //省略……
}

  为了解决线程安全问题,我直接使用了synchronized 关键字,上述代码虽然可以正常运行,但是带来了非常严重的性能问题,因为synchronized 关键字包含了整个代码块,就相当于在getAsyncExecutor() 方法上使用了synchronized 关键字,该方法就变成了单线程执行了,所以效率非常低。

3.4、第二次修改——解决性能问题,带来了初始化异常

  为了解决性能问题,使用双重检查锁定(double-checked locking)机制来确保线程安全同时保证处理性能,代码如下:

java">/**
 * 消息推送
 * 20230609 hsh
 */
public class PushNotificationService {

    private static Logger logger = LoggerFactory.getLogger(PushNotificationService.class);
    
	private static ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    /**
     * 线程池
     * @return
     */
    public static Executor getAsyncExecutor() {
    	 if (executor == null) {
            synchronized (executor) {
                if (executor == null) {
                    executor = new ThreadPoolTaskExecutor();
                    executor.setCorePoolSize(30);
                    executor.setMaxPoolSize(100);
                    executor.setQueueCapacity(1000);
                    executor.setThreadNamePrefix("AsyncThread-");
                    executor.initialize();
                }
            }
        }
        return executor;
    }
    //省略……
}

  通过双重检查锁定,确实可以实现性能的提升,但是这里忽略了一个细节,就是ThreadPoolTaskExecutor 初始化,因为这里当executor 对象不为空时,直接返回了,没有进行initialize()操作,所以报了“ThreadPoolTaskExecutor not initialized ”错误,因此声明ThreadPoolTaskExecutor executor 对象的时候,不能使用new ThreadPoolTaskExecutor()方法进行定义,同时为了避免指令重排序可能带来的问题,需要将 executor 声明为 volatile 类型,以确保在多线程环境下的可见性和正确的初始化顺序。

java">/**
 * 消息推送
 * 20230609 hsh
 */
public class PushNotificationService {

    private static Logger logger = LoggerFactory.getLogger(PushNotificationService.class);
    
	private static volatile ThreadPoolTaskExecutor executor;
    /**
     * 线程池
     * @return
     */
    public static Executor getAsyncExecutor() {
    	 if (executor == null) {
            synchronized (executor) {
                if (executor == null) {
                    executor = new ThreadPoolTaskExecutor();
                    executor.setCorePoolSize(30);
                    executor.setMaxPoolSize(100);
                    executor.setQueueCapacity(1000);
                    executor.setThreadNamePrefix("AsyncThread-");
                    executor.initialize();
                }
            }
        }
        return executor;
    }
    //省略……
}

4、总结

  1. 知识点1:ThreadPoolTaskExecutor用法
  2. 知识点2:synchronized 关键字
  3. 知识点3:双重检查锁定(double-checked locking)机制
  4. 知识点4:volatile 关键字

http://www.niftyadmin.cn/n/429363.html

相关文章

常见设计模式记录

单例模式 确保某一个类只有一个实例&#xff0c;而且自行实例化并向整个系统提供这 个实例。 //&#xff08;1&#xff09;懒汉式 public class Singleton { /* 持有私有静态实例&#xff0c;防止被引用&#xff0c;此处赋值为null&#xff0c;目的是实现延迟加载 */ private…

转行软件测试5年了,给还在犹豫的女生一点建议

首先你选择的方向是对的&#xff0c;软件测试这个岗位对于女生是相当友好的. 然后再说女生&#xff0c;软件行业&#xff0c;开发大部分都是男生&#xff0c;所以对于女生来说&#xff0c;因为天性&#xff0c;所以很多时候在互联网公司还是非常吃香的&#xff0c;加上女生本身…

公司招人面了一个00后测试,可以说是内卷届的天花板.....

公司前段缺人&#xff0c;也面了不少测试&#xff0c;结果竟然没有一个合适的。一开始瞄准的就是中级的水准&#xff0c;也没指望来大牛&#xff0c;提供的薪资也不低&#xff0c;面试的人很多&#xff0c;但平均水平很让人失望。令我印象最深的是一个00后测试员&#xff0c;他…

Linux系统时间介绍和校时

从timedatectl 可以看到有本地时间、UTC时间、RTC时间和时区信息&#xff0c;如下&#xff1a; Local time: Tue 2022-06-13 14:30:31 CST Universal time: Tue 2022-06-13 06:30:31 UTC RTC time: Tue 2022-06-13 06:30:32 Time zone: Asia/Shanghai (CST, 0800) Local time: …

企业成本发票不足,利润虚高,此类问题该如何解决?

《税筹顾问》专注于园区招商&#xff0c;您的贴身节税小能手&#xff0c;合理合规节税&#xff01; 企业利润很高的情况下&#xff0c;缺成本发票的问题又很严重&#xff0c;那么需要缴纳的企业所得税就会高的吓人了&#xff0c;那么企业利润很容易就超过300万&#xff0c;这样…

计划任务使用介绍

作者:lly 文章目录 前言一、使用说明1.1 发布模型1.2 创建并设置计划任务1.3 开启计划任务1.4 管理计划任务 二、结语 前言 iServer 11i(2023)对于处理自动化服务新增计划任务功能&#xff0c;该功能支持定时触发和监听文件变化触发执行模型&#xff0c;因此计划任务适用于以下…

0基础转行,网路工程和网络安全哪个更有发展前景?

对于初学者而言&#xff0c;初入IT行业最重要的就是选择一个热门且前景好的职业&#xff0c;而网络工程和网络安全作为IT行业的热门职业必然成为很多人的首选&#xff0c;那么网络工程和网络安全哪个发展前景好?小编带大家详细了解一下。 首先&#xff0c;我们对网络工程和网络…

Mysql数据库初体验

Mysql数据库初体验 一、数据库的基本概念1.数据&#xff08;Data&#xff09;2.表3.数据库4.数据库管理系统&#xff08;DBMS)5.数据库系统 二、数据库系统发展史1.第一代数据库2.第二代数据库3.第三代数据库 三、当今主流数据库介绍四、数据库分类1.关系数据库2.关系型 SQL 数…