Skip to content

Java 多线程

多线程是Java编程中的重要概念,它允许程序同时执行多个任务,提高程序的执行效率和响应性。Java提供了丰富的API来支持多线程编程。

线程创建方式

在Java中,有多种方式可以创建和启动线程,每种方式都有其特点和适用场景。

继承Thread类

通过继承Thread类并重写run()方法来创建线程是最直接的方式。

java
// 通过继承Thread类创建线程
class MyThread extends Thread {
    private String threadName;
    
    public MyThread(String name) {
        this.threadName = name;
    }
    
    @Override
    public void run() {
        System.out.println(threadName + " 线程开始执行");
        
        // 模拟一些工作
        for (int i = 1; i <= 5; i++) {
            System.out.println(threadName + " 执行任务 " + i);
            try {
                // 模拟耗时操作
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                System.out.println(threadName + " 被中断");
                return;
            }
        }
        
        System.out.println(threadName + " 线程执行完毕");
    }
}

// 继承Thread类示例
public class ThreadExample {
    public static void main(String[] args) {
        System.out.println("=== 继承Thread类创建线程 ===");
        
        // 创建线程对象
        MyThread thread1 = new MyThread("线程1");
        MyThread thread2 = new MyThread("线程2");
        
        // 启动线程
        thread1.start();
        thread2.start();
        
        // 主线程继续执行
        System.out.println("主线程继续执行...");
        
        // 等待子线程执行完毕
        try {
            thread1.join();
            thread2.join();
        } catch (InterruptedException e) {
            System.err.println("主线程被中断: " + e.getMessage());
        }
        
        System.out.println("所有线程执行完毕");
    }
}

实现Runnable接口

实现Runnable接口是更推荐的创建线程方式,因为它避免了Java单继承的限制。

java
// 通过实现Runnable接口创建线程
class MyRunnable implements Runnable {
    private String taskName;
    private int iterations;
    
    public MyRunnable(String taskName, int iterations) {
        this.taskName = taskName;
        this.iterations = iterations;
    }
    
    @Override
    public void run() {
        System.out.println(taskName + " 任务开始执行");
        
        for (int i = 1; i <= iterations; i++) {
            System.out.println(taskName + " 执行第 " + i + " 次任务");
            
            // 模拟工作负载
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                System.out.println(taskName + " 被中断");
                Thread.currentThread().interrupt(); // 恢复中断状态
                return;
            }
        }
        
        System.out.println(taskName + " 任务执行完毕");
    }
}

// 实现Runnable接口示例
public class RunnableExample {
    public static void main(String[] args) {
        System.out.println("=== 实现Runnable接口创建线程 ===");
        
        // 创建Runnable任务
        MyRunnable task1 = new MyRunnable("任务1", 3);
        MyRunnable task2 = new MyRunnable("任务2", 5);
        
        // 创建线程对象并传入Runnable任务
        Thread thread1 = new Thread(task1);
        Thread thread2 = new Thread(task2);
        
        // 设置线程优先级
        thread1.setPriority(Thread.MIN_PRIORITY);
        thread2.setPriority(Thread.MAX_PRIORITY);
        
        // 启动线程
        thread1.start();
        thread2.start();
        
        // 使用Lambda表达式创建线程(Java 8+)
        Thread lambdaThread = new Thread(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println(threadName + " Lambda线程开始执行");
            
            for (int i = 1; i <= 3; i++) {
                System.out.println(threadName + " Lambda任务 " + i);
                try {
                    Thread.sleep(300);
                } catch (InterruptedException e) {
                    System.out.println(threadName + " Lambda线程被中断");
                    return;
                }
            }
            
            System.out.println(threadName + " Lambda线程执行完毕");
        }, "LambdaThread");
        
        lambdaThread.start();
        
        // 主线程工作
        System.out.println("主线程执行其他任务...");
        
        // 等待所有线程完成
        try {
            thread1.join();
            thread2.join();
            lambdaThread.join();
        } catch (InterruptedException e) {
            System.err.println("主线程被中断: " + e.getMessage());
        }
        
        System.out.println("所有线程执行完毕");
    }
}

实现Callable接口

Callable接口与Runnable类似,但可以返回结果并抛出异常。

java
import java.util.concurrent.*;

// 通过实现Callable接口创建可返回结果的线程
class MyCallable implements Callable<String> {
    private String taskName;
    private int duration;
    
    public MyCallable(String taskName, int duration) {
        this.taskName = taskName;
        this.duration = duration;
    }
    
    @Override
    public String call() throws Exception {
        System.out.println(taskName + " 开始执行");
        
        // 模拟耗时任务
        Thread.sleep(duration * 1000);
        
        // 返回结果
        String result = taskName + " 执行完成,耗时 " + duration + " 秒";
        System.out.println(result);
        
        return result;
    }
}

// Callable接口示例
public class CallableExample {
    public static void main(String[] args) {
        System.out.println("=== 实现Callable接口创建线程 ===");
        
        // 创建ExecutorService
        ExecutorService executor = Executors.newFixedThreadPool(3);
        
        // 创建Callable任务
        MyCallable task1 = new MyCallable("任务1", 2);
        MyCallable task2 = new MyCallable("任务2", 3);
        MyCallable task3 = new MyCallable("任务3", 1);
        
        // 提交任务并获取Future对象
        Future<String> future1 = executor.submit(task1);
        Future<String> future2 = executor.submit(task2);
        Future<String> future3 = executor.submit(task3);
        
        // 处理结果
        try {
            // 获取结果(阻塞等待)
            String result1 = future1.get();
            System.out.println("获取到结果: " + result1);
            
            // 带超时的获取结果
            String result2 = future2.get(5, TimeUnit.SECONDS);
            System.out.println("获取到结果: " + result2);
            
            // 检查任务是否完成
            if (future3.isDone()) {
                String result3 = future3.get();
                System.out.println("获取到结果: " + result3);
            }
            
        } catch (InterruptedException e) {
            System.err.println("主线程被中断: " + e.getMessage());
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            System.err.println("任务执行异常: " + e.getMessage());
        } catch (TimeoutException e) {
            System.err.println("任务执行超时: " + e.getMessage());
            // 取消超时的任务
            future2.cancel(true);
        }
        
        // 关闭ExecutorService
        executor.shutdown();
        
        // 等待所有任务完成
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        
        System.out.println("所有任务执行完毕");
    }
}

线程生命周期与状态转换

Java线程有六种状态,它们之间可以相互转换。

java
// 线程状态演示
public class ThreadStateExample {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("=== 线程生命周期与状态转换 ===");
        
        // 1. 新建状态 (NEW)
        Thread newThread = new Thread(() -> {
            System.out.println("线程执行中...");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                System.out.println("线程被中断");
            }
        });
        
        System.out.println("1. 新建状态: " + newThread.getState()); // NEW
        
        // 2. 就绪状态 (RUNNABLE)
        newThread.start();
        Thread.sleep(100); // 给线程一些时间启动
        System.out.println("2. 就绪/运行状态: " + newThread.getState()); // RUNNABLE
        
        // 3. 阻塞状态演示
        Thread blockingThread = new Thread(() -> {
            synchronized (ThreadStateExample.class) {
                System.out.println("进入同步块");
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    System.out.println("同步块中的线程被中断");
                }
            }
        });
        
        // 先启动一个线程占用锁
        synchronized (ThreadStateExample.class) {
            blockingThread.start();
            Thread.sleep(100);
            System.out.println("3. 阻塞状态: " + blockingThread.getState()); // BLOCKED
            Thread.sleep(3000); // 让第一个线程释放锁
        }
        
        Thread.sleep(100);
        System.out.println("4. 锁释放后: " + blockingThread.getState()); // TERMINATED or RUNNABLE
        
        // 4. 等待状态演示
        Object lock = new Object();
        Thread waitingThread = new Thread(() -> {
            synchronized (lock) {
                try {
                    System.out.println("线程进入等待状态");
                    lock.wait(); // 等待通知
                    System.out.println("线程被唤醒");
                } catch (InterruptedException e) {
                    System.out.println("等待中的线程被中断");
                }
            }
        });
        
        waitingThread.start();
        Thread.sleep(100);
        System.out.println("5. 等待状态: " + waitingThread.getState()); // WAITING
        
        // 通知等待的线程
        synchronized (lock) {
            lock.notify();
        }
        
        waitingThread.join();
        System.out.println("6. 等待后状态: " + waitingThread.getState()); // TERMINATED
        
        // 5. 超时等待状态演示
        Thread timedWaitingThread = new Thread(() -> {
            try {
                System.out.println("线程进入超时等待状态");
                Thread.sleep(2000);
                System.out.println("超时等待结束");
            } catch (InterruptedException e) {
                System.out.println("超时等待中的线程被中断");
            }
        });
        
        timedWaitingThread.start();
        Thread.sleep(100);
        System.out.println("7. 超时等待状态: " + timedWaitingThread.getState()); // TIMED_WAITING
        
        timedWaitingThread.join();
        System.out.println("8. 终止状态: " + timedWaitingThread.getState()); // TERMINATED
        
        // 线程状态转换图示
        printThreadStateDiagram();
    }
    
    private static void printThreadStateDiagram() {
        System.out.println("\n=== 线程状态转换图 ===");
        System.out.println("NEW (新建) --start()--> RUNNABLE (就绪/运行)");
        System.out.println("RUNNABLE --synchronized--> BLOCKED (阻塞)");
        System.out.println("RUNNABLE --wait()/join()--> WAITING (无限期等待)");
        System.out.println("RUNNABLE --sleep()/wait(timeout)--> TIMED_WAITING (限期等待)");
        System.out.println("RUNNABLE --任务完成--> TERMINATED (终止)");
        System.out.println("BLOCKED --获得锁--> RUNNABLE");
        System.out.println("WAITING --notify()/notifyAll()--> RUNNABLE");
        System.out.println("TIMED_WAITING --超时/notify()--> RUNNABLE");
    }
}

// 线程优先级和守护线程示例
class ThreadPriorityExample {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("=== 线程优先级和守护线程 ===");
        
        // 线程优先级演示
        System.out.println("主线程优先级: " + Thread.currentThread().getPriority());
        System.out.println("最大优先级: " + Thread.MAX_PRIORITY);
        System.out.println("最小优先级: " + Thread.MIN_PRIORITY);
        System.out.println("默认优先级: " + Thread.NORM_PRIORITY);
        
        // 创建不同优先级的线程
        Thread lowPriorityThread = new Thread(new CounterTask("低优先级线程"), "LowPriority");
        Thread normPriorityThread = new Thread(new CounterTask("普通优先级线程"), "NormPriority");
        Thread highPriorityThread = new Thread(new CounterTask("高优先级线程"), "HighPriority");
        
        // 设置优先级
        lowPriorityThread.setPriority(Thread.MIN_PRIORITY);
        normPriorityThread.setPriority(Thread.NORM_PRIORITY);
        highPriorityThread.setPriority(Thread.MAX_PRIORITY);
        
        // 启动线程
        lowPriorityThread.start();
        normPriorityThread.start();
        highPriorityThread.start();
        
        // 让线程运行一段时间
        Thread.sleep(3000);
        
        // 中断线程
        lowPriorityThread.interrupt();
        normPriorityThread.interrupt();
        highPriorityThread.interrupt();
        
        // 等待线程结束
        lowPriorityThread.join();
        normPriorityThread.join();
        highPriorityThread.join();
        
        System.out.println("\n--- 守护线程演示 ---");
        
        // 创建守护线程
        Thread daemonThread = new Thread(() -> {
            while (true) {
                System.out.println("守护线程正在运行...");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    System.out.println("守护线程被中断");
                    break;
                }
            }
        }, "DaemonThread");
        
        // 设置为守护线程
        daemonThread.setDaemon(true);
        daemonThread.start();
        
        // 主线程执行其他任务
        System.out.println("主线程执行任务...");
        Thread.sleep(5000);
        System.out.println("主线程任务完成,程序即将退出");
        // 当主线程(用户线程)结束时,守护线程会自动结束
    }
}

// 计数任务类
class CounterTask implements Runnable {
    private String threadName;
    private volatile long count = 0;
    
    public CounterTask(String threadName) {
        this.threadName = threadName;
    }
    
    @Override
    public void run() {
        System.out.println(threadName + " 开始执行,优先级: " + Thread.currentThread().getPriority());
        
        while (!Thread.currentThread().isInterrupted()) {
            count++;
            // 模拟一些工作
            if (count % 1000000 == 0) {
                System.out.println(threadName + " 计数: " + count);
            }
        }
        
        System.out.println(threadName + " 结束,最终计数: " + count);
    }
}

同步机制

在多线程环境中,同步机制用于控制多个线程对共享资源的访问,防止数据不一致。

synchronized 关键字

synchronized关键字是最常用的同步机制,它可以修饰方法或代码块。

java
// 线程安全的计数器类
class SynchronizedCounter {
    private int count = 0;
    
    // 同步方法 - 锁定的是this对象
    public synchronized void increment() {
        count++;
        System.out.println(Thread.currentThread().getName() + " 增加计数: " + count);
    }
    
    // 同步方法
    public synchronized void decrement() {
        count--;
        System.out.println(Thread.currentThread().getName() + " 减少计数: " + count);
    }
    
    // 同步方法
    public synchronized int getCount() {
        return count;
    }
    
    // 静态同步方法 - 锁定的是类对象
    public static synchronized void staticMethod() {
        System.out.println("静态同步方法被调用");
    }
}

// 同步代码块示例
class SynchronizedBlockExample {
    private int value = 0;
    private final Object lock = new Object(); // 专用锁对象
    private static final Object staticLock = new Object(); // 静态锁对象
    
    // 同步代码块 - 锁定特定对象
    public void increment() {
        synchronized (lock) {
            value++;
            System.out.println(Thread.currentThread().getName() + " 增加值: " + value);
            try {
                Thread.sleep(100); // 模拟耗时操作
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
    
    // 同步代码块 - 锁定this对象
    public void decrement() {
        synchronized (this) {
            value--;
            System.out.println(Thread.currentThread().getName() + " 减少值: " + value);
        }
    }
    
    // 同步代码块 - 锁定类对象
    public void staticOperation() {
        synchronized (staticLock) {
            System.out.println(Thread.currentThread().getName() + " 执行静态操作");
        }
    }
    
    public int getValue() {
        synchronized (lock) {
            return value;
        }
    }
}

// synchronized关键字示例
public class SynchronizedExample {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("=== synchronized关键字示例 ===");
        
        // 1. 同步方法演示
        synchronizedMethodDemo();
        
        // 2. 同步代码块演示
        synchronizedBlockDemo();
        
        // 3. 死锁演示
        deadlockDemo();
    }
    
    // 同步方法演示
    public static void synchronizedMethodDemo() throws InterruptedException {
        System.out.println("\n--- 同步方法演示 ---");
        
        SynchronizedCounter counter = new SynchronizedCounter();
        
        // 创建多个线程同时访问共享资源
        Thread[] threads = new Thread[5];
        
        for (int i = 0; i < threads.length; i++) {
            final int threadNum = i;
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 3; j++) {
                    counter.increment();
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    counter.decrement();
                }
            }, "Thread-" + threadNum);
        }
        
        // 启动所有线程
        for (Thread thread : threads) {
            thread.start();
        }
        
        // 等待所有线程完成
        for (Thread thread : threads) {
            thread.join();
        }
        
        System.out.println("最终计数: " + counter.getCount());
    }
    
    // 同步代码块演示
    public static void synchronizedBlockDemo() throws InterruptedException {
        System.out.println("\n--- 同步代码块演示 ---");
        
        SynchronizedBlockExample example = new SynchronizedBlockExample();
        
        // 创建多个线程
        Thread[] threads = new Thread[3];
        
        for (int i = 0; i < threads.length; i++) {
            final int threadNum = i;
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 5; j++) {
                    example.increment();
                    example.decrement();
                    example.staticOperation();
                }
            }, "Worker-" + threadNum);
        }
        
        // 启动线程
        for (Thread thread : threads) {
            thread.start();
        }
        
        // 等待完成
        for (Thread thread : threads) {
            thread.join();
        }
        
        System.out.println("最终值: " + example.getValue());
    }
    
    // 死锁演示
    public static void deadlockDemo() throws InterruptedException {
        System.out.println("\n--- 死锁演示 ---");
        
        // 创建两个锁对象
        final Object lock1 = new Object();
        final Object lock2 = new Object();
        
        // 线程1:先获取lock1,再获取lock2
        Thread thread1 = new Thread(() -> {
            synchronized (lock1) {
                System.out.println("线程1 获取了 lock1");
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                
                synchronized (lock2) {
                    System.out.println("线程1 获取了 lock2");
                }
            }
        }, "Thread1");
        
        // 线程2:先获取lock2,再获取lock1
        Thread thread2 = new Thread(() -> {
            synchronized (lock2) {
                System.out.println("线程2 获取了 lock2");
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                
                synchronized (lock1) {
                    System.out.println("线程2 获取了 lock1");
                }
            }
        }, "Thread2");
        
        // 启动线程
        thread1.start();
        thread2.start();
        
        // 等待一段时间观察死锁
        Thread.sleep(2000);
        
        System.out.println("线程1状态: " + thread1.getState());
        System.out.println("线程2状态: " + thread2.getState());
        
        // 中断线程以解除死锁
        thread1.interrupt();
        thread2.interrupt();
        
        thread1.join();
        thread2.join();
        
        System.out.println("死锁演示完成");
    }
}

// 线程安全的单例模式
class ThreadSafeSingleton {
    private static volatile ThreadSafeSingleton instance;
    
    private ThreadSafeSingleton() {
        // 私有构造方法
    }
    
    // 双重检查锁定实现
    public static ThreadSafeSingleton getInstance() {
        if (instance == null) {
            synchronized (ThreadSafeSingleton.class) {
                if (instance == null) {
                    instance = new ThreadSafeSingleton();
                }
            }
        }
        return instance;
    }
    
    public void doSomething() {
        System.out.println("单例对象执行操作: " + Thread.currentThread().getName());
    }
}

Lock 接口

Lock接口提供了比synchronized更灵活的锁定操作。

java
import java.util.concurrent.locks.*;

// 使用Lock接口的计数器
class LockCounter {
    private int count = 0;
    private final Lock lock = new ReentrantLock();
    
    public void increment() {
        lock.lock(); // 获取锁
        try {
            count++;
            System.out.println(Thread.currentThread().getName() + " 增加计数: " + count);
        } finally {
            lock.unlock(); // 释放锁(必须在finally块中释放)
        }
    }
    
    public void decrement() {
        lock.lock();
        try {
            count--;
            System.out.println(Thread.currentThread().getName() + " 减少计数: " + count);
        } finally {
            lock.unlock();
        }
    }
    
    public int getCount() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
}

// 读写锁示例
class ReadWriteLockExample {
    private String data = "初始数据";
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Lock readLock = rwLock.readLock();
    private final Lock writeLock = rwLock.writeLock();
    
    // 读操作
    public String readData() {
        readLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + " 读取数据: " + data);
            Thread.sleep(1000); // 模拟读取耗时
            return data;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            readLock.unlock();
        }
    }
    
    // 写操作
    public void writeData(String newData) {
        writeLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + " 写入数据: " + newData);
            Thread.sleep(2000); // 模拟写入耗时
            data = newData;
            System.out.println(Thread.currentThread().getName() + " 写入完成");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            writeLock.unlock();
        }
    }
}

// 条件变量示例
class ConditionExample {
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private boolean flag = false;
    
    public void waitForFlag() {
        lock.lock();
        try {
            while (!flag) {
                System.out.println(Thread.currentThread().getName() + " 等待标志变为true");
                condition.await(); // 等待条件满足
            }
            System.out.println(Thread.currentThread().getName() + " 标志已变为true,继续执行");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }
    
    public void setFlag() {
        lock.lock();
        try {
            flag = true;
            System.out.println(Thread.currentThread().getName() + " 设置标志为true");
            condition.signalAll(); // 唤醒所有等待的线程
        } finally {
            lock.unlock();
        }
    }
}

// Lock接口示例
public class LockExample {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("=== Lock接口示例 ===");
        
        // 1. 基本Lock使用
        basicLockUsage();
        
        // 2. 读写锁使用
        readWriteLockUsage();
        
        // 3. 条件变量使用
        conditionVariableUsage();
        
        // 4. 公平锁和非公平锁
        fairLockDemo();
    }
    
    // 基本Lock使用
    public static void basicLockUsage() throws InterruptedException {
        System.out.println("\n--- 基本Lock使用 ---");
        
        LockCounter counter = new LockCounter();
        
        // 创建多个线程
        Thread[] threads = new Thread[5];
        
        for (int i = 0; i < threads.length; i++) {
            final int threadNum = i;
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 3; j++) {
                    counter.increment();
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    counter.decrement();
                }
            }, "LockThread-" + threadNum);
        }
        
        // 启动线程
        for (Thread thread : threads) {
            thread.start();
        }
        
        // 等待完成
        for (Thread thread : threads) {
            thread.join();
        }
        
        System.out.println("最终计数: " + counter.getCount());
    }
    
    // 读写锁使用
    public static void readWriteLockUsage() throws InterruptedException {
        System.out.println("\n--- 读写锁使用 ---");
        
        ReadWriteLockExample example = new ReadWriteLockExample();
        
        // 创建读线程
        Thread[] readThreads = new Thread[3];
        for (int i = 0; i < readThreads.length; i++) {
            final int threadNum = i;
            readThreads[i] = new Thread(() -> {
                for (int j = 0; j < 2; j++) {
                    example.readData();
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }, "Reader-" + threadNum);
        }
        
        // 创建写线程
        Thread writeThread = new Thread(() -> {
            try {
                Thread.sleep(1000); // 等待读线程开始
                example.writeData("更新后的数据");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Writer");
        
        // 启动所有线程
        for (Thread thread : readThreads) {
            thread.start();
        }
        writeThread.start();
        
        // 等待完成
        for (Thread thread : readThreads) {
            thread.join();
        }
        writeThread.join();
    }
    
    // 条件变量使用
    public static void conditionVariableUsage() throws InterruptedException {
        System.out.println("\n--- 条件变量使用 ---");
        
        ConditionExample example = new ConditionExample();
        
        // 创建等待线程
        Thread[] waitingThreads = new Thread[3];
        for (int i = 0; i < waitingThreads.length; i++) {
            final int threadNum = i;
            waitingThreads[i] = new Thread(() -> {
                example.waitForFlag();
            }, "Waiter-" + threadNum);
        }
        
        // 创建设置标志的线程
        Thread setterThread = new Thread(() -> {
            try {
                Thread.sleep(2000); // 等待一会儿再设置标志
                example.setFlag();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Setter");
        
        // 启动所有线程
        for (Thread thread : waitingThreads) {
            thread.start();
        }
        setterThread.start();
        
        // 等待完成
        for (Thread thread : waitingThreads) {
            thread.join();
        }
        setterThread.join();
        
        System.out.println("条件变量示例完成");
    }
    
    // 公平锁和非公平锁演示
    public static void fairLockDemo() throws InterruptedException {
        System.out.println("\n--- 公平锁和非公平锁演示 ---");
        
        // 非公平锁(默认)
        Lock unfairLock = new ReentrantLock();
        System.out.println("非公平锁测试:");
        testLockFairness(unfairLock, false);
        
        // 公平锁
        Lock fairLock = new ReentrantLock(true);
        System.out.println("\n公平锁测试:");
        testLockFairness(fairLock, true);
    }
    
    private static void testLockFairness(Lock lock, boolean isFair) throws InterruptedException {
        Thread[] threads = new Thread[5];
        
        for (int i = 0; i < threads.length; i++) {
            final int threadNum = i;
            threads[i] = new Thread(() -> {
                lock.lock();
                try {
                    System.out.println(Thread.currentThread().getName() + " 获取到锁");
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    lock.unlock();
                }
            }, "FairTest-" + threadNum);
        }
        
        // 启动线程
        for (Thread thread : threads) {
            thread.start();
            Thread.sleep(10); // 短暂间隔,让线程按顺序请求锁
        }
        
        // 等待完成
        for (Thread thread : threads) {
            thread.join();
        }
    }
}

线程池

线程池是一种管理线程的机制,它可以有效地管理和复用线程,避免频繁创建和销毁线程带来的开销。

java
import java.util.concurrent.*;
import java.util.List;
import java.util.ArrayList;

// 线程池基础示例
public class ThreadPoolExample {
    public static void main(String[] args) {
        System.out.println("=== 线程池示例 ===");
        
        // 1. 固定大小线程池
        fixedThreadPoolExample();
        
        // 2. 缓存线程池
        cachedThreadPoolExample();
        
        // 3. 单线程池
        singleThreadPoolExample();
        
        // 4. 定时线程池
        scheduledThreadPoolExample();
        
        // 5. 自定义线程池
        customThreadPoolExample();
    }
    
    // 固定大小线程池
    public static void fixedThreadPoolExample() {
        System.out.println("\n--- 固定大小线程池 ---");
        
        // 创建固定大小为3的线程池
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
        
        // 提交任务
        for (int i = 1; i <= 10; i++) {
            final int taskId = i;
            fixedThreadPool.submit(() -> {
                System.out.println("任务 " + taskId + " 由 " + 
                                 Thread.currentThread().getName() + " 执行");
                try {
                    Thread.sleep(2000); // 模拟任务执行
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("任务 " + taskId + " 执行完成");
            });
        }
        
        // 关闭线程池
        fixedThreadPool.shutdown();
        
        try {
            // 等待所有任务完成
            if (!fixedThreadPool.awaitTermination(30, TimeUnit.SECONDS)) {
                System.out.println("强制关闭未完成的任务");
                fixedThreadPool.shutdownNow();
            }
        } catch (InterruptedException e) {
            fixedThreadPool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        
        System.out.println("固定大小线程池示例完成");
    }
    
    // 缓存线程池
    public static void cachedThreadPoolExample() {
        System.out.println("\n--- 缓存线程池 ---");
        
        // 创建缓存线程池
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        
        // 提交大量短时间任务
        for (int i = 1; i <= 20; i++) {
            final int taskId = i;
            cachedThreadPool.submit(() -> {
                System.out.println("缓存任务 " + taskId + " 由 " + 
                                 Thread.currentThread().getName() + " 执行");
                try {
                    Thread.sleep(1000); // 短时间任务
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("缓存任务 " + taskId + " 执行完成");
            });
        }
        
        // 关闭线程池
        cachedThreadPool.shutdown();
        
        try {
            if (!cachedThreadPool.awaitTermination(10, TimeUnit.SECONDS)) {
                cachedThreadPool.shutdownNow();
            }
        } catch (InterruptedException e) {
            cachedThreadPool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        
        System.out.println("缓存线程池示例完成");
    }
    
    // 单线程池
    public static void singleThreadPoolExample() {
        System.out.println("\n--- 单线程池 ---");
        
        // 创建单线程池
        ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
        
        // 提交任务(会按顺序执行)
        List<Future<String>> futures = new ArrayList<>();
        
        for (int i = 1; i <= 5; i++) {
            final int taskId = i;
            Future<String> future = singleThreadPool.submit(() -> {
                System.out.println("单线程任务 " + taskId + " 开始执行");
                Thread.sleep(1000);
                String result = "任务 " + taskId + " 执行结果";
                System.out.println("单线程任务 " + taskId + " 执行完成");
                return result;
            });
            futures.add(future);
        }
        
        // 获取结果
        for (Future<String> future : futures) {
            try {
                String result = future.get();
                System.out.println("获取到结果: " + result);
            } catch (InterruptedException | ExecutionException e) {
                System.err.println("获取结果时发生错误: " + e.getMessage());
            }
        }
        
        // 关闭线程池
        singleThreadPool.shutdown();
        
        try {
            if (!singleThreadPool.awaitTermination(10, TimeUnit.SECONDS)) {
                singleThreadPool.shutdownNow();
            }
        } catch (InterruptedException e) {
            singleThreadPool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        
        System.out.println("单线程池示例完成");
    }
    
    // 定时线程池
    public static void scheduledThreadPoolExample() {
        System.out.println("\n--- 定时线程池 ---");
        
        // 创建定时线程池
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);
        
        // 延迟执行任务
        scheduledThreadPool.schedule(() -> {
            System.out.println("延迟3秒执行的任务,由 " + Thread.currentThread().getName() + " 执行");
        }, 3, TimeUnit.SECONDS);
        
        // 周期性执行任务
        ScheduledFuture<?> scheduledFuture = scheduledThreadPool.scheduleAtFixedRate(() -> {
            System.out.println("周期性任务执行,时间: " + System.currentTimeMillis());
        }, 1, 2, TimeUnit.SECONDS); // 1秒后开始,每2秒执行一次
        
        // 运行5秒后取消周期性任务
        scheduledThreadPool.schedule(() -> {
            System.out.println("取消周期性任务");
            scheduledFuture.cancel(false);
        }, 5, TimeUnit.SECONDS);
        
        // 运行10秒后关闭线程池
        scheduledThreadPool.schedule(() -> {
            System.out.println("关闭定时线程池");
            scheduledThreadPool.shutdown();
        }, 10, TimeUnit.SECONDS);
        
        System.out.println("定时线程池示例启动,等待任务执行...");
    }
    
    // 自定义线程池
    public static void customThreadPoolExample() {
        System.out.println("\n--- 自定义线程池 ---");
        
        // 自定义线程池配置
        ThreadPoolExecutor customThreadPool = new ThreadPoolExecutor(
            2,                          // 核心线程数
            4,                          // 最大线程数
            60L,                        // 空闲线程存活时间
            TimeUnit.SECONDS,           // 时间单位
            new LinkedBlockingQueue<>(10), // 工作队列
            new ThreadFactory() {       // 线程工厂
                private int counter = 0;
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r, "CustomThread-" + counter++);
                    thread.setDaemon(false);
                    return thread;
                }
            },
            new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
        );
        
        // 提交任务
        for (int i = 1; i <= 20; i++) {
            final int taskId = i;
            customThreadPool.submit(() -> {
                System.out.println("自定义任务 " + taskId + " 由 " + 
                                 Thread.currentThread().getName() + " 执行");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("自定义任务 " + taskId + " 执行完成");
            });
        }
        
        // 监控线程池状态
        monitorThreadPool(customThreadPool);
        
        // 关闭线程池
        customThreadPool.shutdown();
        
        try {
            if (!customThreadPool.awaitTermination(30, TimeUnit.SECONDS)) {
                System.out.println("强制关闭线程池");
                customThreadPool.shutdownNow();
            }
        } catch (InterruptedException e) {
            customThreadPool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        
        System.out.println("自定义线程池示例完成");
    }
    
    // 监控线程池状态
    private static void monitorThreadPool(ThreadPoolExecutor executor) {
        new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    System.out.printf("线程池状态 - 活跃线程数: %d, 任务总数: %d, 已完成任务数: %d%n",
                                    executor.getActiveCount(),
                                    executor.getTaskCount(),
                                    executor.getCompletedTaskCount());
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Monitor").start();
    }
}

// 线程池高级特性示例
class AdvancedThreadPoolExample {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("=== 线程池高级特性示例 ===");
        
        // 1. Future和Callable使用
        futureAndCallableExample();
        
        // 2. CompletableFuture异步编程
        completableFutureExample();
        
        // 3. 线程池拒绝策略
        rejectionPolicyExample();
        
        // 4. 线程池钩子方法
        threadPoolHookExample();
    }
    
    // Future和Callable使用
    public static void futureAndCallableExample() throws InterruptedException {
        System.out.println("\n--- Future和Callable使用 ---");
        
        ExecutorService executor = Executors.newFixedThreadPool(3);
        
        // 创建Callable任务
        List<Future<Integer>> futures = new ArrayList<>();
        
        for (int i = 1; i <= 5; i++) {
            final int taskId = i;
            Callable<Integer> task = () -> {
                System.out.println("计算任务 " + taskId + " 开始");
                Thread.sleep(2000); // 模拟计算
                int result = taskId * taskId;
                System.out.println("计算任务 " + taskId + " 完成,结果: " + result);
                return result;
            };
            
            Future<Integer> future = executor.submit(task);
            futures.add(future);
        }
        
        // 获取结果
        System.out.println("获取计算结果:");
        for (int i = 0; i < futures.size(); i++) {
            try {
                Integer result = futures.get(i).get(3, TimeUnit.SECONDS);
                System.out.println("任务 " + (i + 1) + " 结果: " + result);
            } catch (ExecutionException e) {
                System.err.println("任务 " + (i + 1) + " 执行异常: " + e.getMessage());
            } catch (TimeoutException e) {
                System.err.println("任务 " + (i + 1) + " 执行超时");
                futures.get(i).cancel(true);
            }
        }
        
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("Future和Callable示例完成");
    }
    
    // CompletableFuture异步编程
    public static void completableFutureExample() {
        System.out.println("\n--- CompletableFuture异步编程 ---");
        
        // 异步执行任务
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("异步任务1执行,线程: " + Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "任务1结果";
        });
        
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("异步任务2执行,线程: " + Thread.currentThread().getName());
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "任务2结果";
        });
        
        // 组合两个异步任务
        CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> {
            System.out.println("组合任务执行,线程: " + Thread.currentThread().getName());
            return result1 + " + " + result2 + " = 组合结果";
        });
        
        // 处理结果
        combinedFuture.thenAccept(result -> {
            System.out.println("最终结果: " + result);
        }).join(); // 等待完成
        
        System.out.println("CompletableFuture示例完成");
    }
    
    // 线程池拒绝策略
    public static void rejectionPolicyExample() {
        System.out.println("\n--- 线程池拒绝策略 ---");
        
        // 创建小容量线程池来演示拒绝策略
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 1, 60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1), // 很小的队列
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "RejectionTestThread");
                }
            },
            new RejectedExecutionHandler() { // 自定义拒绝策略
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    System.out.println("任务被拒绝: " + r.toString());
                    // 可以选择其他处理方式,如记录日志、放入数据库等
                }
            }
        );
        
        // 提交大量任务
        for (int i = 1; i <= 5; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("执行任务 " + taskId);
                try {
                    Thread.sleep(3000); // 长时间任务
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("任务 " + taskId + " 完成");
            });
        }
        
        // 关闭线程池
        executor.shutdown();
        
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        
        System.out.println("拒绝策略示例完成");
    }
    
    // 线程池钩子方法
    public static void threadPoolHookExample() {
        System.out.println("\n--- 线程池钩子方法 ---");
        
        // 自定义ThreadPoolExecutor,重写钩子方法
        ThreadPoolExecutor customExecutor = new ThreadPoolExecutor(
            2, 4, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(10)
        ) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("线程 " + t.getName() + " 开始执行任务: " + r.toString());
                super.beforeExecute(t, r);
            }
            
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                if (t != null) {
                    System.err.println("任务执行异常: " + t.getMessage());
                } else {
                    System.out.println("任务执行完成: " + r.toString());
                }
                super.afterExecute(r, t);
            }
            
            @Override
            protected void terminated() {
                System.out.println("线程池已终止");
                super.terminated();
            }
        };
        
        // 提交任务
        for (int i = 1; i <= 3; i++) {
            final int taskId = i;
            customExecutor.submit(() -> {
                System.out.println("自定义线程池任务 " + taskId + " 执行");
                if (taskId == 2) {
                    throw new RuntimeException("模拟任务异常");
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("自定义线程池任务 " + taskId + " 完成");
            });
        }
        
        // 关闭线程池
        customExecutor.shutdown();
        
        try {
            if (!customExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                customExecutor.shutdownNow();
            }
        } catch (InterruptedException e) {
            customExecutor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        
        System.out.println("线程池钩子方法示例完成");
    }
}

通过本章节的学习,您已经掌握了Java多线程编程的核心概念和实践技巧,包括线程创建方式、线程生命周期管理、同步机制以及线程池的使用。这些知识对于开发高性能、高并发的Java应用程序至关重要。