[Java] 并发与多线程

[Java] 并发与多线程

Scroll Down

1. 并发与并行

并行:A、B任务同时执行

并发:微观上A、B任务先后执行

以KTV为例子,并行就是有两个麦克风一起唱歌,并发就是你的麦克风使用完再轮到我使用。

2. 线程安全

所谓线程安全都是相对并发场景来说,在多线程高并发的情境下,有可能出现数据丢失或者数据被覆盖。

线程安全的核心是“要么只读,要么加锁”。合理利用好JDK提供的并发包(JUC),可以化腐朽为神奇。

线程的5种状态

  • NEW(新建状态)
  • RUNNABLE(可运行状态)
  • RUNNING(运行状态)
  • BLOCKER(阻塞状态)
  • DEAD(销毁)

创建线程的三种方式

  • 通过继承 Thread 类
  • 实现 Runnable 接口
  • 实现 callable 接口

相对第一种推荐第二种,实现 Runnable 接口使编程更灵活,对外暴露接口少,让使用者专注实现线程的 run() 方法上。

RUNNABLE

调用 start() 之后运行之前的状态。线程的 start() 不能被多次调用,否则会抛出IllegalStackException异常

RUNNING

run() 方法正在运行时线程的状态,线程可能会因为某些因素退出RUNNING, 如时间、异常、锁、调度等

BLOCKED

  • 同步阻塞:锁被其他线程占用
  • 主动阻塞:调用 Thread 的某些方法,主动让出 CPU 使用权,比如sleep(),join()等。
  • 等待阻塞:执行了wait()

DEAD

run()方法执行结束,或因异常退出的状态,此状态不可逆。


保证高并发下的线程安全的考量角度:

  • 数据单线程内可见。单线程总是安全的。限制数据单线程内可见,可以避免数据被其他线程篡改。参考 ThreadLocal。
  • 只读对象。只读对象总是安全的,它允许复制,拒绝写入。例如 String、Integer。一个对象想要拒绝任何写入,必须满足以下条件:使用 final 关键字修饰类,避免被继承;使用private final 关键字避免属性被中途修改;没有任何更新方法;返回值不能可变对象为引用。
  • 线程安全类。某些线程安全类内部有非常明确的线程安全机制。比如 StringBuffer 就是一个线程安全类,它采用synchronize 关键字修饰方法。
  • 同步与锁机制。 如果想要对某个对象执行并发更新操作,但又不属上述三类,需要开发工程师在代码中实现安全的同步机制。虽然这个机制支持的并发场景很有价值,但非常复制且容易出现问题。

3. 线程池

创建线程需要不断分配虚拟机栈、本地方法栈和程序计数器等线程私有的内存空间。在线程销毁时需要回收这些系统资源。频繁的创建和销毁线程会浪费大量的系统资源,增加并发编程风险。这时就需要线程池来协调线程,提高 CPU 及系统资源利用率,实现类似主次线程隔离、定时任务、周期执行任务等。

  • 利用线程池管理并复用线程、控制最大并发数等。
  • 实现任务线程队列缓存策略和拒绝机制。
  • 实现某些与时间相关的功能,如定时任务、周期任务。
  • 隔离线程环境。如交易服务和搜索服务在同一个服务器上,分别开启两个线程池,交易线程的资源损耗明显要大,因此,通过配置独立的线程池,将较慢的交易服务和搜索服务分开,避免各服务线程相互影响。

线程池创建

源码参数分析


    /**
    *源码分析
    */
    public ThreadPoolExecutor(
                    //常驻核心线程数,等于0则任务执行完之后,没有任何请求进入时销毁线程;如果大于0,即使本地任务执行完毕,核心线程也不会被销毁。
                    int corePoolSize, 
                              
                    //表示线程池能够容纳同时执行的最大线程数,必须大于1
                    int maximumPoolSize,
                              
                    //线程池中的线程空闲时间,当空闲时间达到 keepAliveTime 值时,线程会被销毁,知道只剩下corePoolSize个线程为止
                    long keepAliveTime,
                              
                    //时间单位,通常为 Time.SECONDS
                    TimeUnit unit,
                              
                    //缓存队列,当请求的线程数大于maximumPoolSize时,线程进入BlockingQueue阻塞队列  
                    BlockingQueue<Runnable> workQueue,
                              
                    //线程工厂,用来生产一组任务相同的线程。 
                    ThreadFactory threadFactory,
                              
                    //拒绝策略,当超过缓存workQueue的任务上限时,就可以通过该策略处理请求,一种简单的限流保护。
                    RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

线程工厂定义,规范有意义的线程命名

public class UserThreadFactory implements ThreadFactory {

    private final String namePrefix;

    private final AtomicInteger nextId = new AtomicInteger(1);

    // 定义线程组名称,在使用jstack来排查问题时,非常有帮助
    UserThreadFactory(String whatFeatureOfGroup) {
        namePrefix = "From UserThreadFactory's " + whatFeatureOfGroup + " -worker-";
    }


    @Override
    public Thread newThread(Runnable task) {
        String name = namePrefix + nextId.getAndIncrement();
        Thread thread = new Thread(null, task, name, 0L);
        System.out.println(thread.getName());
        return thread;
    }

    // 任务执行体
    class Task implements Runnable {
        private final AtomicLong count = new AtomicLong(0L);
        @Override
        public void run() {
            System.out.println("running_" +count.getAndIncrement());
        }
    }
}

定义拒绝策略,打印当前线程的状态

    public class UserRejectHandler implements RejectedExecutionHandler {

        @Override
        public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
            System.out.println("task rejected. " +executor.toString());
        }
    }

实现线程池

public class UserThreadPool {
    public static void main(String[] args) {

        // 缓存队列设置固定长度为2,为了快速触发 rejectHandler
        BlockingQueue queue = new LinkedBlockingQueue(2);

        // 设置外部任务线程的来源由1号房和2号房混合调用
        UserThreadFactory f1 = new UserThreadFactory("第 1 机房");
        UserThreadFactory f2 = new UserThreadFactory("第 2 机房");

        UserRejectHandler handler = new UserRejectHandler();

        // 核心线程数:1,容纳同时执行线程数:2,空闲时间:60,时间单位:秒,缓存队列:queue,线程工厂:f1,拒绝策略:handler
        ThreadPoolExecutor threadPoolFirst = new ThreadPoolExecutor(1, 2,60 , TimeUnit.SECONDS, queue, f1, handler);
        ThreadPoolExecutor threadPoolSecond = new ThreadPoolExecutor(1, 2,60 , TimeUnit.SECONDS, queue, f2, handler);

        Runnable task = new UserThreadFactory.Task();
        // 创建400个任务线程
        for (int i=0; i <= 400; i++) {
            threadPoolFirst.execute(task);
            threadPoolSecond.execute(task);
        }
        threadPoolFirst.shutdown();
        threadPoolSecond.shutdown();
    }
}

使用线程池需要注意以下几点:

  • 合理设置各类参数,应根据实际业务场景来设置合理的工作线程数
  • 线程资源必须通过线程池提供,不允许在应用中自行显示创建线程。
  • 创建线程或线程池时请指定有意义的线程名称,方便出错时回溯

在此致敬伟大的Java开拓者--Doug Lea。
编程不识Doug Lea,写尽Java也枉然。