多线程与锁


多线程与锁,介绍JVM内存模型,多线程以及线程同步(锁),还有线程池等知识。此文章大多数内容来源于网络,详见引用章节。

内存模型

内存模型概念

模型类别

  • 顺序一致性模型

    要求对某处理机缩写的值立即进行传播,确保该值已被所有处理机接受后才能继续其他指令的执行。

  • 释放一致性模型

    允许将某处理机所写的值延迟到释放锁时进行传播。

内存模型功能

  • Describes execution trace of a program
  • Describes possible behaviors of a program
  • Determines what values can be read at every point in the program

内存模型特征

  • Visibility 可见性:多核、多线程间数据的共享
  • Ordering 有序性:对内存的操作应该是有序的

JVM内存模型

理论基础

  • 定义了Java线程和内存交互的规则
  • 保证多线程程序结果的可预测,语义一致性
  • Heap Memory:用来在线程间共享内存
    • instance fields
    • static fileds
    • array elements
  • 线程本地变量(局部变量,方法参数等)在堆栈中,不受JMM(Java Memory Model)影响
  • JMM如何体现可见性?

    在JMM中,通过并发线程修改变量值,必须将线程变量同步回主存后,其他线程才能访问到。

  • JMM怎么体现有序性?

    通过Java提供的同步机制或者volatile关键字,来保证内存的访问顺序。

有序性、可见性

  • 程序顺序

    程序声明的顺序

  • 执行顺序

    JMM不保证线程对变量操作发生的顺序和被其他线程看到的是同样的顺序。JMM容许线程以写入变量时所不相同的次序把变量存入主存。

    • 线程内部本身遵循程序顺序,从线程外看到的是执行顺序
    • 编译器和处理器可能会为了性能优化,进行指令重排序
    • 程序执行为了优化也可能重新排序

Happens-Before Memory Model

  • 类似释放一致性模型
  • Partial odering(Happens-Before)

    如果B能够看到A的动作产生的结果,我们说A Happens-Before B,IMM定义了一些这样的规则:

    • Program order rule:Each action in a thread Happens-Before every action in that thread that comes later in the program order.
    • Monitor lock rule:An unlock on a monitor lock Happens-Before every subsequent lock on that same monitor lock.
    • Volatile variable rule:A write to a volatile field Happens-Before every subsequent read of that same filed.
    • ……

      The rules for Happens-Before

  • 带锁的线程和内存交互行为
    • 获取对象监视器的锁(lock)
    • 清空工作内存数据,从主内存复制变量到当前工作内存,即同步数据(read and load)
    • 执行代码,改变共享变量的值(use and assign)
    • 将工作内存数据刷回竹村(store and write)
    • 释放对象监视器的锁(unlock)
      带锁的线程和内存交互行为

JMM相关术语

  • Thread working copy memory(工作内存)
    • 在JVM规范中这是一个抽象的概念,对应的可能会是寄存器,CPU缓存,编译以及执行优化等。
    • 一个新产生的Thread有一个空的working memory。
    • 线程间无法相互直接访问,变量传递均需要通过主存来完成。
  • The main memory(主存)

    Java堆内存

  • Thread’s execution engine

    保证线程的正确执行顺序
    Java内存模型

  • JVM定义了6个原子操作来读写工作内存和主内存的数据以及lock、unlock实现更大范围的原子性:
    • lock(锁定):作用于主内存的变量,把一个变量标识为一条线程独占状态。
    • unlock(解锁):作用于主内存变量,把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定。
    • read(读取):作用于主内存变量,把一个变量值从主内存传输到线程的工作内存中,以便随后的load动作使用
    • load(载入):作用于工作内存的变量,它把read操作从主内存中得到的变量值放入工作内存的变量副本中。
    • use(使用):作用于工作内存的变量,把工作内存中的一个变量值传递给执行引擎,每当虚拟机遇到一个需要使用变量的值的字节码指令时将会执行这个操作。
    • assign(赋值):作用于工作内存的变量,它把一个从执行引擎接收到的值赋值给工作内存的变量,每当虚拟机遇到一个给变量赋值的字节码指令时执行这个操作。
    • store(存储):作用于工作内存的变量,把工作内存中的一个变量的值传送到主内存中,以便随后的write的操作。
    • write(写入):作用于主内存的变量,它把store操作从工作内存中一个变量的值传送到主内存的变量中
      如果要把一个变量从主内存中复制到工作内存,就需要按顺寻地执行read和load操作,如果把变量从工作内存中同步回主内存中,就要按顺序地执行store和write操作。Java内存模型只要求上述操作必须按顺序执行,而没有保证必须是连续执行。也就是read和load之间,store和write之间是可以插入其他指令的,如对主内存中的变量a、b进行访问时,可能的顺序是read a,read b,load b, load a。Java内存模型还规定了在执行上述八种基本操作时,必须满足如下规则:
    • 不允许read和load、store和write操作之一单独出现
    • 不允许一个线程丢弃它的最近assign的操作,即变量在工作内存中改变了之后必须同步到主内存中。
    • 不允许一个线程无原因地(没有发生过任何assign操作)把数据从工作内存同步回主内存中。
    • 一个新的变量只能在主内存中诞生,不允许在工作内存中直接使用一个未被初始化(load或assign)的变量。即就是对一个变量实施use和store操作之前,必须先执行过了assign和load操作。
    • 一个变量在同一时刻只允许一条线程对其进行lock操作,lock和unlock必须成对出现
    • 如果对一个变量执行lock操作,将会清空工作内存中此变量的值,在执行引擎使用这个变量前需要重新执行load或assign操作初始化变量的值
    • 如果一个变量事先没有被lock操作锁定,则不允许对它执行unlock操作;也不允许去unlock一个被其他线程锁定的变量。
    • 对一个变量执行unlock操作之前,必须先把此变量同步到主内存中(执行store和write操作)。

线程状态

从线程创建到线程死亡的生命周期,一共6个状态。具体可参考java.lang.Thread.State.

  • 新建状态NEW:新建线程,并且未调用start()方法之前。
  • 就绪(可运行)状态RUNNABLENEW调用start(),RUNNING调用yield()后,进入就绪状态,等待CPU资源调度。
  • 运行状态RUNNING(JDK没有定义该状态):线程正在执行
  • 阻塞状态BLOCKED:这个状态下, 是在多个线程有同步操作的场景, 比如正在等待另一个线程的synchronized 块的执行释放, 或者可重入的 synchronized块里别人调用wait() 方法, 也就是这里是线程在等待进入临界区。
  • 无限等待状态WAITING:调用了Object.wait(),join(),LockSupport.park()并且没有设置timeout时间,进入无限等待状态。Object.wait()后只能被Object.notify(),Object.notifyAll()唤醒。join()则需要等待指定的线程进入TERMINATED状态。
  • 限时等待TIMED_WAITING:调用Thread.sleep(long),Object.wait(long),Thread.join(long),LockSupport.parkNanos,LockSupport.parkUntil方法,指定了timeout时间,则进入限时等待状态。
  • 死亡状态TERMINATED:线程结束,正常运行结束或者抛出异常。
    此老外的图描述有误:threadX.join(long)是进入TIMED_WAITING状态的
    线程状态
    线程状态

顺序性/可见性/原子性

顺序性

如果在本线程内观察,所有操作都是有序的;如果在一个线程中观察另一个线程,所有操作都是无序的。前半句是指“线程内表现为串行语义”,后半句是指“指令重排序”现象和“工作内存中主内存同步延迟”现象。

可见性

可见性就是指当一个线程修改了线程共享变量的值,其它线程能够立即得知这个修改

原子性

原子性是指在一个操作中就是cpu不可以在中途暂停然后再调度,既不被中断操作,要不执行完成,要不就不执行。

线程上下文切换

什么是线程上下文切换?

CPU给每个线程分配一定的CPU时间,这个时间片很短,一般是几十ms。CPU通过不停切换线程执行程序。
CPU通过时间片分配算法来循环执行任务,当前任务执行一个时间片后会切换到下一个任务。但是,在切换前会保存上一个任务的状态,以便下次切换回这个任务时,可以再次加载这个任务的状态,从任务保存到再加载的过程就是一次上下文切换。

如何减少线程上下文切换?

上下文切换又分为2种:

  • 让步式上下文切换

    执行线程主动释放CPU,与锁竞争严重程度成正比,可通过减少锁竞争来避免.

    • 无锁并发编程/降低锁粒度:多线程竞争时,会引起上下文切换,所以多线程处理数据时,可以用一些办法来避免使用锁,如将数据的ID按照Hash取模分段,不同的线程处理不同段的数据。
    • CAS算法(or其他锁无关算法):Java的Atomic包使用CAS算法来更新数据,而不需要加锁。java.util.concurrent下的并发集合大多使用了CAS。
  • 抢占式上下文切换

    后者是指线程因分配的时间片用尽而被迫放弃CPU或者被其他优先级更高的线程所抢占,一般由于线程数大于CPU可用核心数引起,可通过调整线程数,适当减少线程数来避免。

    • 使用最少线程:避免创建不需要的线程,防止大量线程处于等待状态。
    • 协程(coroutine):在单线程里实现多任务的调度,并在单线程里维持多个任务间的切换。

JDK线程调度相关类

FutureTask

1
2
3
4
5
6
7
8
9
10
ExecutorService executor = Executors.newSingleThreadExecutor();
Future future = executor.submit(new Callable() { //Future是FutureTask的父接口,此处返回的是FutureTask
@Override
public String call() throws Exception {
System.out.println("call back when the thread ends!");
return "call back result";
}
});
System.out.println(future.get()); //future.get()方法是阻塞的
executor.shutdown();

CyclicBarrier

1
2
3
4
CyclicBarrier cb = new CyclicBarrier(int parties);
CyclicBarrier cb = new CyclicBarrier(int parties,Runnable barrierAction);
cb.await() //parties计数-1
cb.await(long timeout, TimeUnit unit) //等待timeout时间直至触发Barrier。如果超时则自行执行下去。

参数解析:

  • parties: parties意思是必须有“parties”个线程执行了await方法,所有线程才会执行await之后的代码。每await一次,count计数-1。循环迭代的时候count计数会被重置为parties。
  • barrierAction: the command to execute when the barrier is tripped.唤醒所有线程前执行的屏障动作。
    过程和代码如图:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    final CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
    Thread a = new Thread(new Runnable() {
    @Override
    public void run() {
    System.out.println("aStart");
    try {
    cyclicBarrier.await();
    } catch (InterruptedException e) {
    e.printStackTrace();
    } catch (BrokenBarrierException e) {
    e.printStackTrace();
    }
    System.out.println("aEnd");
    }
    });
    Thread b = new Thread(new Runnable() {
    @Override
    public void run() {
    System.out.println("bStart");
    try {
    cyclicBarrier.await();
    } catch (InterruptedException e) {
    e.printStackTrace();
    } catch (BrokenBarrierException e) {
    e.printStackTrace();
    }
    System.out.println("bEnd");
    }
    });
    Thread c = new Thread(new Runnable() {
    @Override
    public void run() {
    System.out.println("cStart");
    try {
    cyclicBarrier.await();
    } catch (InterruptedException e) {
    e.printStackTrace();
    } catch (BrokenBarrierException e) {
    e.printStackTrace();
    }
    System.out.println("cEnd");
    }
    });
    a.start();
    b.start();
    c.start();

CyclicBarrier

注意:如果在屏障等待的某个线程抛出InterruptedException,则其他所有线程都会抛出BrokenBarrierException。此时Cyclicbarrier处于Broken状态(被损坏)。
cb.isBroken()能知道CyclicBarrier是否已被损坏。cb.reset()可以重置CyclicBarrier状态。

CountDownLatch

可用于控制线程的执行顺序。

1
2
3
4
CountDownLatch cdh = CountDownLatch(int count)
cdl.countDown() //计数器-1,但**不会阻塞线程**,这区别于await()方法
cdh.await() //一直等到count计数为0,除非被其他线程打断。
cdh.await(long timeout, TimeUnit unit) //等待timeout时间直至count计数为0则返回true;如果超时则返回false。

参数解析:

  • count:count表示必须有“count”个线程执行了cdl.countDown()方法,使得计数器为0,主线程往下执行。
    CountDownLatch类似于join,但需要知道线程数量(用于倒计数):
    ![CountDownLatch](/resources/img/jvm/CountDownLatch.png)
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    final CountDownLatch countDownLatch = new CountDownLatch(3);
    Thread a = new Thread(new Runnable() {
    @Override
    public void run() {
    System.out.println("aEnd");
    countDownLatch.countDown();
    }
    });
    Thread b = new Thread(new Runnable() {
    @Override
    public void run() {
    System.out.println("bEnd");
    countDownLatch.countDown();
    }
    });
    Thread c = new Thread(new Runnable() {
    @Override
    public void run() {
    System.out.println("cEnd");
    countDownLatch.countDown();
    }
    });
    a.start();
    b.start();
    c.start();
    countDownLatch.await(); //此处阻塞,等待countDown为0
    System.out.println("Main Thread goes on!");

join

1
thread.join(long millis) //将thread join进主线程,并且主线程只等待millis毫秒。超时后抛出IllegalArgumentException异常。

类似与上面countDownLatch,但是不需要知道线程数量,也不需要计数器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Thread a = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("aEnd");
}
});
Thread b = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("bEnd");
}
});
Thread c = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("cEnd");
}
});
a.start();
b.start();
c.start();
a.join();
b.join();
c.join();
System.out.println("Main Thread goes on!");

CountDownLatch和join还有一个明显的区别是:join进主线程的子线程必须执行完毕,主线程才能继续执行。而CountDownLatch则可以灵活地通过调用countDown()方法,让计数器为0边执行主线程,且countDown()方法无阻塞,子线程也可以继续执行。

Semaphore(信号量)

1
2
3
4
5
6
7
8
9
10
11
12
Semaphore(int permits表示许可数目,即同时允许) //permits表示许可数目,即同时允许permits个线程获取许可进行并发运行。
Semaphore(int permits, boolean fair) //fair表示是否公平,true表示公平竞争,即等待时间越久的线程越先获得许可。
semaphore.acquire(); //获取许可
semaphore.release(); //释放许可
semaphore.acquire(int permits); //获取permits个许可
semaphore.release(int permits); //释放permits个许可
semaphore.acquireUninterruptibly(int permits);//类似acquire(),但是被打断时,线程会继续申请许可,当申请到许可后,会标记线程状态为中断。
semaphore.acquireUninterruptibly();//
semaphore.tryAcquire();//尝试获取许可,非阻塞
semaphore.tryAcquire(int permits);//尝试获取permits个许可,非阻塞
semaphore.tryAcquire(long timeout, TimeUnit unit);//timeout时间内获取1个许可
semaphore.tryAcquire(int permits, long timeout, TimeUnit unit);//timeout时间内获取permits个许可

Semaphore比较简单,看代码一目了然,主要用于控制线程并发数量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
final Semaphore semaphore = new Semaphore(2);
Thread a = new Thread(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("aEnd");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release();
}
}
});
Thread b = new Thread(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("bEnd");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release();
}
}
});
Thread c = new Thread(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("cEnd");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release();
}
}
});
a.start();
b.start();
c.start();

Phaser

CyclicBarrierCountDownLatch无法满足时,再研究下。

Fork&Join

类似于MR,只不过一个是多进程(分布式),一个是多线程。

Sleep/wait

Thread.sleep(long millis)睡眠,让出CPU时间,但是不释放锁。millis时间后自动执行
Object.wait(long timeout)挂起,让出CPU时间,并且释放锁。timeout时间后抛出异常;timeout设置为0永不超时,需要其他线程使用notify/notifyAll唤醒。

线程池

JDK线程池

JDK线程池参数配置

  • corePoolSize:核心运行的poolSize,也就是当超过这个范围的时候,就需要将新的Thread放入到等待队列中了;
  • maximumPoolSize:一般你用不到,当大于了这个值就会将Thread由一个丢弃处理机制来处理,但是当你发生:newFixedThreadPool的时候,corePoolSize和maximumPoolSize是一样的,而corePoolSize是先执行的,所以他会先被放入等待队列,而不会执行到下面的丢弃处理中,看了后面的代码你就知道了。
  • workQueue:等待队列,当达到corePoolSize的时候,就向该等待队列放入线程信息(默认为一个LinkedBlockingQueue),运行中的队列属性为:workers,为一个HashSet;内部被包装了一层,后面会看到这部分代码。
  • keepAliveTime:默认都是0,当线程没有任务处理后,保持多长时间,cachedPoolSize是默认60s,不推荐使用。
  • threadFactory:是构造Thread的方法,你可以自己去包装和传递,主要实现newThread方法即可;
  • handler:也就是参数maximumPoolSize达到后丢弃处理的方法,java提供了5种丢弃处理的方法,当然你也可以自己弄,主要是要实现接口:RejectedExecutionHandler中的方法:

java默认的是使用:AbortPolicy,他的作用是当出现这中情况的时候会抛出一个异常;其余的还包含:

​ 1、CallerRunsPolicy:如果发现线程池还在运行,就直接运行这个线程

​ 2、DiscardOldestPolicy:在线程池的等待队列中,将头取出一个抛弃,然后将当前线程放进去。

​ 3、DiscardPolicy:什么也不做

​ 4、AbortPolicy:java默认,抛出一个异常:RejectedExecutionException。

线程池通常调用2个方法,submit方法或execute方法。其区别在于:submit传入Callable,并且有返回值Future;execute传入Runnable,没有返回值。

Executors.newCachedThreadPool

适用于大量短生命周期的线程。

  • 可配置参数
    corePoolSize: 0 线程池里面保持的线程,即使是空闲线程。通过参数allowCoreThreadTimeOut控制过期时间。
    maximumPoolSize:Integer.MAX_VALUE 线程池最大的线程数量限制
    keepAliveTime:60 如果线程池线程数大于corePoolSize,超过keepAliveTime的空闲线程将会被释放
    unit: TimeUnit.SECONDS keepAliveTime的时间单位
    workQueue: [同步队列]new SynchronousQueue() 保存未执行的任务。任务:仅限于被方法execute提交的Runnable Task。

    Executors.newFixedThreadPool

    线程数固定的,且线程关闭只能是因为异常或者调用了shutdown方法。有线程被关闭时,被阻塞的线程会加入线程池。
  • 可配置参数
    • corePoolSize:自定义,线程数
    • maximumPoolSize:同corePoolSize
    • keepAliveTime:0
    • unit:TimeUnit.MILLISECONDS
    • workQueue :new LinkedBlockingQueue()

Executors.newSingleThreadExecutor

特殊的newFixedThreadPool ,因为线程数是1

  • 可配置参数
    corePoolSize:1
    maximumPoolSize:1
    keepAliveTime:0
    unit:TimeUnit.MILLISECONDS
    workQueue :new LinkedBlockingQueue()

Executors.newScheduledThreadPool

Executors.newSingleThreadScheduledExecutor

Spring线程池(TaskExecutor实现类)

org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor

它不支持任何对java.util.concurrent包的替换或者下行移植。Doug Lea和Dawid Kurzyniec对java.util.concurrent的实现都采用了不同的包结构,导致它们无法正确运行。 这个实现只能在Java 5环境中使用,但是却是这个环境中最常用的。它暴露的bean properties可以用来配置一个java.util.concurrent.ThreadPoolExecutor,把它包装到一个TaskExecutor中。如果你需要更加先进的类,比如ScheduledThreadPoolExecutor,我们建议你使用ConcurrentTaskExecutor来替代。

  • 可配置参数
    • corePoolSize:核心线程数,线程池维护线程的最少数量。默认1,运行时可修改,比如通过JMX。
    • keepAliveSeconds:线程池维护线程所允许的空闲时间。默认60
    • maxPoolSize:线程池(ThreadPoolExecutor)维护线程的最大数量
    • queueCapacity:线程池所使用的缓冲队列BlockingQueue大小,默认是Integer.MAX_VALUE。配置为正数,使用LinkedBlockingQueue;负数则使用SynchronousQueue
    • allowCoreThreadTimeOut:Specify whether to allow core threads to time out. This enables dynamic growing and shrinking even in combination with a non-zero queue (since the max pool size will only grow once the queue is full).默认false。
    • rejectedExecutionHandler:拒绝多线程任务的策略
  • execute(Runnable)方法 执行过程
    • 如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
    • 如果此时线程池中的数量等于corePoolSize,但是缓冲队列workQueue未满,那么任务被放入缓冲队列。
    • 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
    • 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。也就是:处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程 maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
    • 当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。
  • Reject策略预定义
    • ThreadPoolExecutor.AbortPolicy策略,是默认的策略,处理程序遭到拒绝将抛出运行时 RejectedExecutionException。
    • ThreadPoolExecutor.CallerRunsPolicy策略 ,调用者的线程会执行该任务,如果执行器已关闭,则丢弃.
    • ThreadPoolExecutor.DiscardPolicy策略,不能执行的任务将被丢弃.
    • ThreadPoolExecutor.DiscardOldestPolicy策略,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程).

org.springframework.core.task.SimpleAsyncTaskExecutor

这个实现不重用任何线程,或者说它每次调用都启动一个新线程。但是,它还是支持对并发总数设限,当超过线程并发总数限制时,阻塞新的调用,直到有位置被释放。本质上不能算是一个池。

org.springframework.core.task.SyncTaskExecutor

这个实现不会异步执行。相反,每次调用都在发起调用的线程中执行。它的主要用处是在不需要多线程的时候,比如简单的test case。

org.springframework.scheduling.concurrent.ConcurrentTaskExecutor

这个实现是对Java 5 java.util.concurrent.Executor类的包装。有另一个备选, ThreadPoolTaskExecutor类,它暴露了Executor的配置参数作为bean属性。很少需要使用ConcurrentTaskExecutor, 但是如果ThreadPoolTaskExecutor不满足需求,ConcurrentTaskExecutor是另外一个备选。

SimpleThreadPoolTaskExecutor

这个实现实际上是Quartz的SimpleThreadPool类的子类,它会监听Spring的生命周期回调。当你有线程池,需要在Quartz和非Quartz组件中共用时,这是它的典型用处。

TimerTaskExecutor

这个实现使用一个TimerTask作为其背后的实现。它和SyncTaskExecutor的不同在于,方法调用是在一个独立的线程中进行的,虽然在那个线程中是同步的。

WorkManagerTaskExecutor

这个实现使用了CommonJ WorkManager作为其底层实现,是在Spring context中配置CommonJ WorkManager应用的最重要的类。和SimpleThreadPoolTaskExecutor类似,这个类实现了WorkManager接口,因此可以直接作为WorkManager使用。

线程池优化

线程池优化关键点

  • 尽量减少线程切换和管理的开支

    要求线程数尽量少,这样可以减少线程切换和管理的开支

  • 最大化利用CPU

    要求尽量多的线程,以保证CPU资源最大化的利用。

合理的配置线程池

分析角度:

  1. 任务的性质:CPU密集型任务,IO密集型任务和混合型任务。
  2. 任务的优先级:高,中和低。
  3. 任务的执行时间:长,中和短。
  4. 任务的依赖性:是否依赖其他系统资源,如数据库连接。
  • 任务性质不同的任务可以用不同规模的线程池分开处理。

  • CPU密集型任务配置尽可能小的线程,如配置Ncpu+1个线程的线程池。

  • IO密集型任务则由于线程并不是一直在执行任务,则配置尽可能多的线程,如2*Ncpu。

  • 混合型的任务,如果可以拆分,则将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,如果这两个任务执行时间相差太大,则没必要进行分解。

  • 我们可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。

  • 优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先得到执行,需要注意的是如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。

  • 执行时间不同的任务可以交给不同规模的线程池来处理,或者也可以使用优先级队列,让执行时间短的任务先执行。

  • 依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,如果等待的时间越长CPU空闲时间就越长,那么线程数应该设置越大,这样才能更好的利用CPU。

    建议使用有界队列,有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点,比如几千。有一次我们组使用的后台任务线程池的队列和线程池全满了,不断的抛出抛弃任务的异常,通过排查发现是数据库出现了问题,导致执行SQL变得非常缓慢,因为后台任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞住,任务积压在线程池里。如果当时我们设置成无界队列,线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。当然我们的系统所有的任务是用的单独的服务器部署的,而我们使用不同规模的线程池跑不同类型的任务,但是出现这样问题时也会影响到其他任务

线程池的监控

通过线程池提供的参数进行监控。线程池里有一些属性在监控线程池的时候可以使用。

  • taskCount:线程池需要执行的任务数量。
  • completedTaskCount:线程池在运行过程中已完成的任务数量。小于或等于taskCount。
  • largestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过。如等于线程池的最大大小,则表示线程池曾经满了。
  • getPoolSize:线程池的线程数量。如果线程池不销毁的话,池里的线程不会自动销毁,所以这个大小只增不+ getActiveCount:获取活动的线程数。

通过扩展线程池进行监控。通过继承线程池并重写线程池的beforeExecute,afterExecute和terminated方法,我们可以在任务执行前,执行后和线程池关闭前干一些事情。如监控任务的平均执行时间,最大执行时间和最小执行时间等。这几个方法在线程池里是空方法。如:

1
protected void beforeExecute(Thread t, Runnable r) { }

死锁与活锁

  • 死锁
    • 互斥条件:一个资源每次只能被一个线程使用
    • 请求与保持条件:一个线程因请求资源而阻塞时,保持已获得的资源不释放
    • 不剥夺条件:线程获得的资源不能被强行剥夺
    • 循环等待条件:若干线程(>=2)形成一种头尾相接的循环的循环等待资源关系。
  • 活锁
    活锁指的是任务或者执行者没有被阻塞,由于某些条件没有满足,导致一直重复尝试,失败,尝试,失败。 活锁和死锁的区别在于,处于活锁的实体是在不断的改变状态,所谓的“活”, 而处于死锁的实体表现为等待;活锁有可能自行解开,死锁则不能。

阻塞锁

阻塞时不占用CPU时间,通过notify等方法唤醒。线程竞争锁时需要切换线程上下文。

自旋锁

线程循环竞争锁,线程状态无须改变,所以少量线程竞争时响应快。但是如果存在大量线程自旋,则会占用大量的CPU资源。

  • 公平自旋锁:所有线程并发自旋。
  • 非公平自旋锁:线程入队列排队,顺序出队竞争自旋锁。

    阻塞锁和自旋锁相当于APP通知的“推”和“拉”。

乐观锁

每次去读取数据都会乐观地认为其他线程不会修改数据,所以读时不上锁,只有在修改数据时会判断此期间是否有线程修改了该数据。
这里可以用版本号等机制实现。如果写冲突可以返回错误给用户处理。
乐观锁适用于多读数据,提高吞吐量。
举例:MySQL的版本号机制乐观锁(类似于SVN)
乐观锁(版本号机制)

悲观锁

每次去读取数据都会悲观地认为其他线程会修改数据,所以每次读数据都会上锁。
传统的RMDB默认是悲观锁:行锁,表锁,读锁,写锁等。

可重入锁(递归锁)

Java的实现是ReentrantLock和synchronized
可重入锁可避免同一线程的死锁。
可重入锁同一线程(一般在递归函数),可多次获得锁。锁内部都有1个状态计数器,每次重入状态计数+1;退出则-1。

锁无关算法(CAS)

CAS概述

CAS即Compare And Swap,是一种锁无关算法。具体例子可以看java.util.concurrent下的并发集合以及原子类。

  • wait-free 等待无关
  • lock-free 锁无关:没有线程阻塞的情况下实现变量同步。
  • lock-based 基于锁

CAS必须是一个原子操作,用伪代码来表示:

1
2
3
4
5
6
7
8
template<class T>
bool CAS(T* addr,T expected,T value){
if(*addr==expected){
*addr = value;
return true;
}
return false;
}

CAS优点

  • CAS突出的是“无阻塞”,避免死锁,但是可能会造成活锁。
  • 阻塞/非阻塞:强调的是资源问题,阻塞当前线程会挂起,竞争锁还会造成线程的上下文切换。非阻塞表示不影响当前线程的执行,并且没有线程的上下文切换,即使正在自旋CAS的线程挂了,也不会阻塞其它线程自旋CAS。
  • 同步/异步:强调的是等待时间,同步表示必须等候方法处理完成并且返回;异步则是立刻返回结果,无须等待。

    CAS缺点

  • ABA问题:因为CAS需要在操作前检查下值有没有发生变化,如果没有则更新。但是如果一个值开始的时候是A,变成了B,又变成了A,那么使用CAS进行检查的时候会发现它的值没有发生变化,但是事实却不是如此。ABA问题的解决思路是使用版本号(乐观锁),如A-B-A变成1A-2B-3A
  • 循环时间长开销大:自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。比如CAS实现的并发队列在队列为空时出队,结果队列长时间为空,则这段时间每个线程都在CAS自旋。
  • 只能保证一个共享变量的原子操作:对一个共享变成可以使用CAS进行原子操作,但是多个共享变量的原子操作就无法使用CAS,这个时候只能使用锁。 
  • 以JDK的并发类举个例子(基于JDK 7)
    • 原子操作的++i代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//利用AtomicInteger实现++i
AtomicInteger atomicInteger = new AtomicInteger(0);
atomicInteger.incrementAndGet();
//++i内部实现
public final int getAndIncrement() {
for (;;) { //for循环CAS自旋
int current = get(); //获取当前值
int next = current + 1; //当前值+1,next为CAS应该Set进去的值
if (compareAndSet(current, next)) //CAS操作:如果当前值等于期望值,则设置当前值为next,返回当前值。否则循环获取当前值做CAS操作。
return current;
}
}
//CAS内部实现
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);//原子CAS,操作系统orCPU指令直接支持
}
  • ConcurrentLinkedQueue的出队和入队
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
//入队
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {//t,p都是tail节点
Node<E> q = p.next;//获取tail节点的next q
if (q == null) {//单线程下tail节点的next肯定是null多线程需要判断。
// p is last node 如果为null,证明此时此刻q的确是最后一个节点(null节点)
if (p.casNext(null, newNode)) { //通过CAS(这里是compareAndSwapObject,不是Set)去设置null为newNode(申请入队元素)。如果失败,证明其他线程先一步入队了
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
if (p != t) // hop two nodes at a time 如果此时tail节点不相等了,则更新tail节点
casTail(t, newNode); // Failure is OK. compareAndSwapObject失败也无所谓,证明有其他线程在之前也casNext成功,并更新了tail节点。
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q) //p指向tail或者head
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head; //TODO:这一陀表达式再研究
else //p指向tail或者q(tail.next)
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;//TODO:这一陀表达式再研究
}
}
//出队
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null && p.casItem(item, null)) { //compareAndSwapObject如果队列不为空且CAS出队首元素,并将队首设置为null。如果不成功,证明队首元素被其他线程出队了。
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}

分布式锁

  • 层次锁:具体参看本人blog:分布式令牌桶设计实现(流控)

synchronized

作用

  • Low level locking
  • 每个对象都有一个相关的lock对象(监视器)
  • Java语言没有提供分离的lock和unlock操作,但是在JVM提供了2个单独的指令monitorenter和monitorext来实现
  • 特性
    • Atomicity(原子性):Locking to obtain mutual exclusion
    • Visibility(可见性):Ensuring that changes to object fields made in one thread are seen in other threads(memory)
    • Ordering(顺序性):Ensuring that you aren’t surprised by the order in which statements are executed
    • Blocking:Cant’t interrupt

      synchronized原理

      synchronized的实现依赖于lock-free队列,基本思想是先自旋后阻塞。先自旋可以减少线程的上下文切换,获得更改的吞吐量,但代价是造成不公平锁。
      JVM中,锁有个名字叫“对象监视器”。
  • Contention List:所有请求锁的线程将被首先放置到该竞争队列
  • Entry List:Contention List中那些有资格成为候选人的线程被移到Entry List
  • Wait Set:那些调用wait方法被阻塞的线程被放置到Wait Set
  • OnDeck:任何时刻最多只能有一个线程正在竞争锁,该线程称为OnDeck
  • Owner:获得锁的线程称为Owner
  • !Owner:释放锁的线程
    在1会先自旋,无法获得锁才会进入ContentionList。ContentionList与EntryList同属等待队列。但是EntryList优先级较高。
    • EntryList优先级高,WaitSet的线程被notify唤醒会直接进入EntryList。
    • ContentionList是LIFO队列,EntryLis的存在减轻了队首竞争。(疑问?)
      synchronized原理

synchronized内存模型语义分析

  • synchronized内存操作

    通过对象引用找到同步对象,然后获取对象上的监视器锁

    • 当线程进入synchronized块之后:
      • a.清洗thread’s working memory
      • b.变量复制:对块内的变量执行assign原子操作
      • c.变量复制:对use变量执行read->load原子操作
    • 当线程退出synchronized块之前,对它在iworkingmemory中所有的assigned values执行store->write原子操作,写回mian memory
  • synchronized不足与发展
    • 不能跨越多个对象
      • 当在等待锁对象的时候不能中途放弃,知道成功
      • 等待没有超时限制
      • 不能中断阻塞
      • JDK5 提供了更灵活的锁机制:Lock和Condition
  • synchronized优化技术
    • 锁省略:锁对象的引用时,线程本地对象(线程的堆栈内的对象)。意思就是操作线程内局部变量不需要加锁。
      • 锁粗化:锁粗化就是把使用同一锁对象的相邻同步块合并的过程(减少线程上下文的切换)
      • 自适应锁优化技术:实现阻塞有2种技术,即让操作系统暂挂线程,直到线程被唤醒,或使用旋转(spin)锁。(指的分别是阻塞锁和自旋锁)
      • Hotspot可以对持有时间短的锁使用自旋锁,而对持有时间长的锁使用阻塞锁

        Condition

        Condition是一个接口,位于java.util.concurrent.locks.Condition
        1
        2
        3
        4
        5
        6
        7
        8
        9
        ReentrantLock reentrantLock = new ReentrantLock(true);
        Condition condition = reentrantLock.newCondition();
        condition.await(); //阻塞释放CPU资源和对象锁
        condition.awaitUninterruptibly();//无视中断阻塞,但中断后进入此方法,线程状态会被设置为`interrupted status`
        condition.awaitNanos(long nanosTimeout);//阻塞,返回`超时时间-等待时间`。超时时间为nanosTimeout,超时后抛出InterruptedException。
        condition.await(long time, TimeUnit unit);//同上,区别是时间单位
        condition.awaitUntil(Date deadline);//阻塞到某时刻
        condition.signal();//唤醒被该Condition对象阻塞的线程中的1个
        condition.signalAll();//唤醒所有被该Condition对象阻塞的线程

ReentrantLock

synchronized与ReentrantLock都是可重入锁,Synchronized在编译期,会在同步块的前后分别形成monitorenter和monitorexit这个两个字节码指令。(进入和退出对象监视器,分别代表lock和unclock)。
synchronized是非公平锁,在无法获取到锁的时候,会先进行自旋才会进入竞争锁队列。且synchronized不可中断(没有相关API)
ReentrantLock则可提供公平锁和非公平锁的选择,且可被中断(指持有锁的线程不释放锁时,正在等待的线程可以选择放弃等待),灵活控制锁的使用,减少死锁。
synchronized的作用是某个代码块,单个类,单个对象;
而ReentrantLock则可同时锁定多个Condition对象。

ReentrantReadWriteLock

ReentrantReadWriteLock 读锁之间不互斥,读写锁互斥,写锁之间互斥。用于读多少写的并发资源访问情况。

CopyOnWrite类集合:读不加锁,写时复制。让读性能更高。优于ReentrantReadWriteLock,但是消耗更多资源。

volatile

volatile变量的内存模型分析

  • 旧的内存模型

    保证读写volatile都直接发生在main memory中,线程的working memory不进行缓存(保证了可见性,但仍参与指令重排)

  • 新的内存模型

    如果当线程A写入volatile变量V而线程B读取V时,那么在写入V时,A可见的所有变量值现在都可以保证对B是可见的。结果就是作用更大的volatile语义,代价是访问volatile字段时会对性能产生一点点的影响。(A volatile var write Happens-Before read of the var)(保证了可见性和有序性)

  • 注意volatile变量不能保证原子性
  • 使用java.util.concurrent.atomic部分情景可以取代volatile,atomic的底层实现是CAS,而且保证了运算原子性。

引用

Java多线程中提到的原子性和可见性、有序性
Java内存模型
RE:关于JMM模型中工作内存、主内存和几个操作的理解
java 线程的几种状态
java内存模型与并发技术-阿里yangjs
多线程上下文切换
自己对多线程的一点思考
SPRING中的线程池ThreadPoolTaskExecutor
Spring中线程池的应用
Java多线程系列–“JUC集合”10之 ConcurrentLinkedQueue
java多线程-概念&创建启动&中断&守护线程&优先级&线程状态(多线程编程之一)
Java多线程编程:Callable、Future和FutureTask浅析(多线程编程之四)
JVM底层又是如何实现synchronized的
Java线程池架构原理和源码解析(ThreadPoolExecutor)