Java-线程

线程

Java语言内置了多线程支持。当Java程序启动的时候,实际上是启动了一个JVM进程,然后,JVM启动主线程来执行 main() 方法。

创建线程

总体有两类方法

继承 Thread

方法一:从Thread派生一个自定义类,然后覆写run()方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// 多线程
public class Main {
    public static void main(String[] args) {
        Thread t = new MyThread();
        t.start(); // 启动新线程
    }
}

class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("start new thread!");
    }
}

执行上述代码,注意到start()方法会在内部自动调用实例的run()方法。

实现 Runnable 接口

方法二:创建Thread实例时,传入一个Runnable实例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// 多线程
public class Main {
    public static void main(String[] args) {
        Thread t = new Thread(new MyRunnable());
        t.start(); // 启动新线程
    }
}

class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println("start new thread!");
    }
}

或者用Java 8引入的lambda语法进一步简写为:

1
2
3
4
5
6
7
8
9
// 多线程
public class Main {
    public static void main(String[] args) {
        Thread t = new Thread(() -> {
            System.out.println("start new thread!");
        });
        t.start(); // 启动新线程
    }
}

线程的优先级

可以对线程设定优先级,设定优先级的方法是:

1
Thread.setPriority(int n) // 1~10, 默认值5

JVM自动把1(低)~10(高)的优先级映射到操作系统实际优先级上(不同操作系统有不同的优先级数量)。优先级高的线程被操作系统调度的优先级较高,操作系统对高优先级线程可能调度更频繁,但我们决不能通过设置优先级来确保高优先级的线程一定会先执行。

线程的状态

Java线程的状态有以下几种:

  • New:新创建的线程,尚未执行;

  • Runnable:运行中的线程,正在执行run()方法的Java代码;

  • Blocked:运行中的线程,因为某些操作被阻塞而挂起;

  • Waiting:运行中的线程,因为某些操作在等待中;

  • Timed Waiting:运行中的线程,因为执行sleep()方法正在计时等待;

  • Terminated:线程已终止,因为run()方法执行完毕。

  • 等待执行完毕:可以通过 t.join() 等待 t 线程结束后再继续运行

  • 中断执行:在其他线程中对目标线程调用interrupt()方法

守护线程

守护线程是指为其他线程服务的线程。在JVM中,所有非守护线程都执行完毕后,无论有没有守护线程,虚拟机都会自动退出。

  • 可以用于执行定时任务等
  • 注意守护线程不能持有任何需要关闭的资源,例如打开文件等,因为虚拟机退出时,守护线程没有任何机会来关闭文件,这会导致数据丢失。

创建守护线程需要在调用 start() 方法前,调用 setDaemon(true) 把该线程标记为守护线程:

1
2
3
Thread t = new MyThread();
t.setDaemon(true);
t.start();

线程池

创建线程需要操作系统资源(线程资源,栈空间等),频繁创建和销毁大量线程需要消耗大量时间 线程池维护一定数量的线程,能接收大量小任务并分发到这些线程上进行处理。

Java标准库提供了ExecutorService接口表示线程池,它的典型用法如下:

1
2
3
4
5
6
// 创建固定大小的线程池:
ExecutorService executor = Executors.newFixedThreadPool(3);
// 提交任务:
executor.submit(task1);
// 关闭线程池: 等待正在执行的任务先完成,然后再关闭
executor.shutdown();

三类关闭线程池的方法

  • shutdown()方法等待正在执行的任务先完成,然后再关闭
  • shutdownNow() 会立刻停止正在执行的任务
  • awaitTermination()则会等待指定的时间让线程池关闭。

限制线程池的线程数量

因为 ExecutorService 只是接口,Java 标准库提供的几个常用实现类有:

  • FixedThreadPool:线程数固定的线程池;
  • CachedThreadPool:线程数根据任务动态调整的线程池;
  • SingleThreadExecutor:仅单线程执行的线程池。

也可以通过 ThreadPoolExecutor 自定义:

1
2
3
4
5
6
int min = 4;
int max = 10;
ExecutorService es = new ThreadPoolExecutor(
        min, max,
        60L, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>());

ScheduledThreadPool

放入 ScheduledThreadPool 的任务可以定期反复执行。

创建一个ScheduledThreadPool仍然是通过Executors类:

1
ScheduledExecutorService ses = Executors.newScheduledThreadPool(4);

我们可以提交一次性任务,它会在指定延迟后只执行一次:

1
2
// 1秒后执行一次性任务:
ses.schedule(new Task("one-time"), 1, TimeUnit.SECONDS);

如果任务以固定的每3秒执行,我们可以这样写:

1
2
// 2秒后开始执行定时任务,每3秒执行:
ses.scheduleAtFixedRate(new Task("fixed-rate"), 2, 3, TimeUnit.SECONDS);

如果任务结束后间隔 3 秒重复执行,我们可以这样写:

1
2
// 2秒后开始执行定时任务,以3秒为间隔执行:
ses.scheduleWithFixedDelay(new Task("fixed-delay"), 2, 3, TimeUnit.SECONDS);

ForkJoinPool

Fork/Join任务的原理:判断一个任务是否足够小,如果是,直接计算,否则,就分拆成几个小任务分别计算。这个过程可以反复“裂变”成一系列小任务。

定义一个 Fork/Join 任务:核心代码 SumTask 继承自 RecursiveTask ,在 compute() 方法中,关键是如何“分裂”出子任务并且提交子任务:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
class SumTask extends RecursiveTask<Long> {
    protected Long compute() {
    	if(/*任务足够小*/){
    		return ...;
    	}
        // “分裂”子任务:
        SumTask subtask1 = new SumTask(...);
        SumTask subtask2 = new SumTask(...);
        // invokeAll会并行运行两个子任务:
        invokeAll(subtask1, subtask2);
        // 获得子任务的结果:
        Long subresult1 = subtask1.join();
        Long subresult2 = subtask2.join();
        // 汇总结果:
        return subresult1 + subresult2;
    }
}

提交任务

1
2
3
// fork/join:
        ForkJoinTask<Long> task = new SumTask(array, 0, array.length);
        Long result = ForkJoinPool.commonPool().invoke(task);

Future

Callable 和 Future

Runnable 的方法没有返回值 Callable 接口的方法有一个泛型返回值:

1
2
3
4
5
class Task implements Callable<String> {
    public String call() throws Exception {
        return longTimeCalculation(); 
    }
}

ExecutorService.submit() 方法,可以看到,它返回了一个 Future 类型,一个 Future 类型的实例代表一个未来能获取结果的对象,提供如下方法

  • get() :获取结果(可能会等待)
    • 如果异步任务已经完成,我们就直接获得结果
    • 如果异步任务还没有完成,那么 get() 会阻塞,直到任务完成后才返回结果
  • get(long timeout, TimeUnit unit):获取结果,但只等待指定的时间;
  • cancel(boolean mayInterruptIfRunning):取消当前任务;
  • isDone():判断任务是否已完成。

CompletableFuture

将 Future 转为 CompletableFuture:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
CompletableFuture<T> completableFuture = new CompletableFuture<>();
executorService.submit(() -> {
	try {
		T result = future.get();
		completableFuture.complete(result);
	} catch (Exception e) {
		completableFuture.completeExceptionally(e);
	}
});
return completableFuture;

当异步任务完成或者发生异常时,自动调用回调对象的回调方法

创建一个 CompletableFuture 是通过 CompletableFuture.supplyAsync() 实现的,它需要一个实现了 Supplier 接口的对象:

1
2
3
public interface Supplier<T> {
    T get();
}

串行执行:CompletableFuture 还有 thenApplyAsync 用于成功后执行下一个任务,thenAccept 获取结果

1
2
3
4
5
6
7
8
// cfQuery成功后继续执行下一个任务:
CompletableFuture<Double> cfFetch = cfQuery.thenApplyAsync((code) -> {
	return fetchPrice(code);
});
// cfFetch成功后打印结果:
cfFetch.thenAccept((result) -> {
	System.out.println("price: " + result);
});

组合:

  • anyOf() 可以实现“任意个 CompletableFuture 只要一个成功”
  • allOf()可以实现“所有CompletableFuture都必须成功”
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// 两个CompletableFuture执行异步查询:
CompletableFuture<Double> cfFetchFromSina = cfQuery.thenApplyAsync((code) -> {
	return fetchPrice((String) code, "https://finance.sina.com.cn/price/");
});
CompletableFuture<Double> cfFetchFrom163 = cfQuery.thenApplyAsync((code) -> {
	return fetchPrice((String) code, "https://money.163.com/price/");
});

// 用anyOf合并为一个新的CompletableFuture:
CompletableFuture<Object> cfFetch = CompletableFuture.anyOf(cfFetchFromSina, cfFetchFrom163);

完整案例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import java.util.concurrent.CompletableFuture;

public class Main {
    public static void main(String[] args) throws Exception {
        // 创建异步执行任务:
        CompletableFuture<Double> cf = CompletableFuture.supplyAsync(Main::fetchPrice);
        // 如果执行成功:
        cf.thenAccept((result) -> {
            System.out.println("price: " + result);
        });
        // 如果执行异常:
        cf.exceptionally((e) -> {
            e.printStackTrace();
            return null;
        });
        // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
        Thread.sleep(200);
    }

    static Double fetchPrice() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
        }
        if (Math.random() < 0.3) {
            throw new RuntimeException("fetch price failed!");
        }
        return 5 + Math.random() * 20;
    }
}

线程同步

线程安全和原子操作

线程安全:如果一个类被设计为允许多线程正确访问,我们就说这个类就是“线程安全”的(thread-safe)

  • 不可变对象也是线程安全的,因为它们只能读不能写
  • 类似Math这些只提供静态方法,没有成员变量的类,也是线程安全的。

JVM规范定义了几种原子操作

  • 基本类型(longdouble除外)赋值,例如:int n = m
  • 引用类型赋值,例如:List<String> list = anotherList

线程安全容器

interfacenon-thread-safethread-safe
ListArrayListCopyOnWriteArrayList
MapHashMapConcurrentHashMap
SetHashSet / TreeSetCopyOnWriteArraySet
QueueArrayDeque / LinkedListArrayBlockingQueue / LinkedBlockingQueue
DequeArrayDeque / LinkedListLinkedBlockingDeque

synchronized

Java synchronized 的锁是可重入的:对同一个线程,可以在获取到锁以后继续获取同一个锁

当成函数使用

语法是:

1
2
3
synchronized(Object) { // 获取锁
    ...
} // 释放锁

例如:实现一个线程安全的计数器(允许并发访问)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
public class Counter {
    private int count = 0;

    public void add(int n) {
        synchronized(this) {
            count += n;
        }
    }

    public int get() {
        return count;
    }
}

修饰方法

对于实例方法,用 synchronized 修饰这个方法等同于锁住 this 对象。下面两种写法是等价的:

1
2
3
public synchronized void add(int n) { // 锁住this
    count += n;
} // 解锁
1
2
3
4
5
public void add(int n) {
    synchronized(this) { // 锁住this
        count += n;
    } // 解锁
}

对 static方法添加 synchronized ,锁住的是该类的 Class 实例。(对于 static 方法,是没有 this 实例的,因为 static 方法是针对类而不是实例。但是我们注意到任何一个类都有一个由JVM自动创建的 Class 实例)

以下两者等价

1
2
3
4
5
6
7
8
public synchronized static void test(int n) {
    ...
}
public static void test(int n) {
	synchronized(Counter.class) {
		...
	}
}

java.util.concurrent

ReentrantLock 重入锁

我们知道Java语言直接提供了synchronized关键字用于加锁,但这种锁一是很重,二是获取时必须一直等待,没有额外的尝试机制。

java.util.concurrent.locks 包提供的 ReentrantLock 用于替代 synchronized 加锁,例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
public class Counter {
    private final Lock lock = new ReentrantLock();
    private int count;

    public void add(int n) {
        lock.lock();
        try {
            count += n;
        } finally {
            lock.unlock();
        }
    }
}

因为synchronized是Java语言层面提供的语法,所以我们不需要考虑异常,而ReentrantLock是Java代码实现的锁,我们就必须先获取锁,然后在finally中正确释放锁。

条件锁

可以通过以下方法由某个锁获取条件锁

1
2
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();

Condition提供的await()signal()signalAll()原理和synchronized锁对象的wait()notify()notifyAll()是一致的,并且其行为也是一样的:

  • await() 会释放当前锁,进入等待状态;和 tryLock() 类似, await() 可以在等待指定时间后,如果还没有被其他线程通过 signal()signalAll() 唤醒,可以自己醒来
  • signal()会唤醒某个等待线程;
  • signalAll()会唤醒所有等待线程;
  • 唤醒线程从await()返回后需要重新获得锁。
1
2
3
4
5
if (condition.await(1, TimeUnit.SECOND)) {
    // 被其他线程唤醒
} else {
    // 指定时间内没有被其他线程唤醒
}

例如创建一个任务队列

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class TaskQueue {
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private Queue<String> queue = new LinkedList<>();

    public void addTask(String s) {
        lock.lock();
        try {
            queue.add(s);
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public String getTask() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                condition.await();
            }
            return queue.remove();
        } finally {
            lock.unlock();
        }
    }
}

读写锁

允许多个线程同时读,但只要有一个线程在写,其他线程就必须等待

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class Counter {
    private final ReadWriteLock rwlock = new ReentrantReadWriteLock();
    // 注意: 一对读锁和写锁必须从同一个rwlock获取:
    private final Lock rlock = rwlock.readLock();
    private final Lock wlock = rwlock.writeLock();
    private int[] counts = new int[10];

    public void inc(int index) {
        wlock.lock(); // 加写锁
        try {
            counts[index] += 1;
        } finally {
            wlock.unlock(); // 释放写锁
        }
    }

    public int[] get() {
        rlock.lock(); // 加读锁
        try {
            return Arrays.copyOf(counts, counts.length);
        } finally {
            rlock.unlock(); // 释放读锁
        }
    }
}

版本锁

读写锁偏向于读者:如果有线程正在读,写线程需要等待读线程释放锁后才能获取写锁,即读的过程中不允许写

版本锁 StampedLock 通过维护一个版本,允许在读的过程中写。

  • StampedLock 可以提高并发率,代价是逻辑更为复杂
  • StampedLock 是不可重入锁,不能在一个线程中反复获取同一个锁
  • StampedLock还提供了更复杂的将悲观读锁升级为写锁的功能,它主要使用在if-then-update的场景:即先读,如果读的数据满足条件,就返回,如果读的数据不满足条件,再尝试写。

写的过程和读写锁一样,但是读的过程比较复杂:

  • 创建一个 StampedLock 对象
  • 尝试获取一个乐观读锁:stampedLock.tryOptimisticRead();
  • 检查获取乐观读锁后是否有其他写锁发生:stampedLock.validate(stamp)
  • 若有新的写,需要获取悲观读锁并重新读。由于写入的概率不高,程序在绝大部分情况下可以通过乐观读锁获取数据,极少数情况下使用悲观读锁获取数据。 乐观:乐观地估计读的过程中大概率不会有写入 悲观:读的过程中拒绝有写入

版本锁的读取过程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
long stamp = stampedLock.tryOptimisticRead(); // 获得一个乐观读锁
// Read...
if (!stampedLock.validate(stamp)) { // 检查乐观读锁后是否有其他写锁发生
    stamp = stampedLock.readLock(); // 获取一个悲观读锁
    try {
        // Re-read
    } finally {
        stampedLock.unlockRead(stamp); // 释放悲观读锁
    }
}

信号量

Semaphore 本质上就是一个信号计数器,用于限制同一时间的最大访问数量。

  • acquire 加锁,release 解锁
  • tryAcquire 可以指定最大等待时间
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
public class AccessLimitControl {
    // 任意时刻仅允许最多3个线程获取许可:
    final Semaphore semaphore = new Semaphore(3);

    public String access() throws Exception {
        // 如果超过了许可数量,其他线程将在此等待:
        semaphore.acquire();
        try {
            // TODO:
            return UUID.randomUUID().toString();
        } finally {
            semaphore.release();
        }
    }
}

原子操作

Java的 java.util.concurrent 包除了提供底层锁、并发集合外,还提供了一组原子操作的封装类,它们位于 java.util.concurrent.atomic 包。

例如 AtomicInteger 提供的主要操作有:

  • 增加值并返回新值:int addAndGet(int delta)
  • 加1后返回新值:int incrementAndGet()
  • 获取当前值:int get()
  • 用CAS方式设置:int compareAndSet(int expect, int update)

线程间通信

线程间的共享变量

在Java虚拟机中,变量的值保存在主内存中,但是,当线程访问变量时,它会先获取一个副本,并保存在自己的工作内存中。 如果线程修改了变量的值,虚拟机会在某个时刻把修改后的值回写到主内存,但是,这个时间是不确定的!(在x86的架构下,JVM回写主内存的速度非常快,但是,换成ARM的架构,会有显著的延迟)

线程间共享的变量应采用关键字 volatile 声明, volatile 关键字的目的是告诉虚拟机:

  • 每次访问变量时,总是获取主内存的最新值;
  • 每次修改变量后,立刻回写到主内存。

Wait 和 Notify

  • wait() 方法必须在当前获取的锁对象上调用,例如获取的是 this 锁,则调用 this.wait()
  • 必须在 synchronized 块中才能调用 wait() 方法, wait() 调用时,会释放线程获得的锁, wait() 返回时,线程又会重新试图获得锁
  • 在相同的锁对象上调用 notify() 方法,可以让等待的线程被重新唤醒
  • 使用notifyAll()将唤醒所有当前正在this锁等待的线程,而notify()只会唤醒其中一个(具体哪个依赖操作系统,有一定的随机性)

例如一个任务队列

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
class TaskQueue {
    Queue<String> queue = new LinkedList<>();

    public synchronized void addTask(String s) {
        this.queue.add(s);
        this.notifyAll();
    }

    public synchronized String getTask() throws InterruptedException {
    // 必须是while而非if
    // wait()方法返回时需要重新获得this锁。假设当前有3个线程被唤醒,唤醒后,首先要等待执行addTask()的线程结束此方法后,才能释放this锁,随后,这3个线程中只能有一个获取到this锁,剩下两个将继续等待
        while (queue.isEmpty()) {
            this.wait();
        }
        return queue.remove();
    }
}

线程上下文

Java标准库提供了一个特殊的 ThreadLocal ,它可以在一个线程中传递同一个对象。

ThreadLocal 实例通常总是以静态字段初始化如下:

1
static ThreadLocal<User> threadLocalUser = new ThreadLocal<>();
1
2
3
4
5
6
7
8
void processUser(user) {
    try {
        threadLocalUser.set(user);
        // ...线程中的所有方法都可以随时获取到该`User`实例:
    } finally {
        threadLocalUser.remove();
    }
}