两阶段终止
两阶段终止同步模式之保护性暂停
同步模式之保护性暂停异步模式之生产者消费者
要点
- 与前面的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应
- 消费队列可以用来平衡生产和消费的线程资源
- 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
- 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
- JDK 中各种阻塞队列,采用的就是这种模式
class Message { private int id; private Object message; public Message(int id, Object message) { this.id = id; this.message = message; } public int getId() { return id; } public Object getMessage() { return message; } }
@Slf4j(topic = "c.MessageQueue") class MessageQueue { private LinkedList<Message> queue; private int capacity; public MessageQueue(int capacity) { this.capacity = capacity; queue = new LinkedList<>(); } public Message take() { synchronized (queue) { while (queue.isEmpty()) { log.debug("没货了, wait"); try { queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } Message message = queue.removeFirst(); queue.notifyAll(); return message; } } public void put(Message message) { synchronized (queue) { while (queue.size() == capacity) { log.debug("库存已达上限, wait"); try { queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } queue.addLast(message); queue.notifyAll(); } } }
@Slf4j(topic = "c.TestProducerConsumer") public class TestProducerConsumer { public static void main(String[] args) { MessageQueue messageQueue = new MessageQueue(2); for (int i = 0; i < 4; i++) { int id = i; new Thread(() -> { try { log.debug("download..."); List<String> response = Downloader.download(); log.debug("try put message({})", id); messageQueue.put(new Message(id, response)); } catch (IOException e) { e.printStackTrace(); } }, "生产者" + i).start(); } new Thread(() -> { while (true) { Message message = messageQueue.take(); List<String> response = (List<String>) message.getMessage(); log.debug("take message({}): [{}] lines", message.getId(), response.size()); } }, "消费者").start(); } }
10:48:38.070 [生产者3] c.TestProducerConsumer - download... 10:48:38.070 [生产者0] c.TestProducerConsumer - download... 10:48:38.070 [消费者] c.MessageQueue - 没货了, wait 10:48:38.070 [生产者1] c.TestProducerConsumer - download... 10:48:38.070 [生产者2] c.TestProducerConsumer - download... 10:48:41.236 [生产者1] c.TestProducerConsumer - try put message(1) 10:48:41.237 [生产者2] c.TestProducerConsumer - try put message(2) 10:48:41.236 [生产者0] c.TestProducerConsumer - try put message(0) 10:48:41.237 [生产者3] c.TestProducerConsumer - try put message(3) 10:48:41.239 [生产者2] c.MessageQueue - 库存已达上限, wait 10:48:41.240 [生产者1] c.MessageQueue - 库存已达上限, wait 10:48:41.240 [消费者] c.TestProducerConsumer - take message(0): [3] lines 10:48:41.240 [生产者2] c.MessageQueue - 库存已达上限, wait 10:48:41.240 [消费者] c.TestProducerConsumer - take message(3): [3] lines 10:48:41.240 [消费者] c.TestProducerConsumer - take message(1): [3] lines 10:48:41.240 [消费者] c.TestProducerConsumer - take message(2): [3] lines 10:48:41.240 [消费者] c.MessageQueue - 没货了, wait
同步模式之顺序控制
同步模式之顺序控制同步模式之 Balking
Balking (犹豫)模式用在一个线程发现另一个线程或本线程已经做了某一件相同的事,那么本线程就无需再做了,直接结束返回
@Service @Slf4j public class MonitorService { private volatile boolean stop; private volatile boolean starting; private Thread monitorThread; public void start() { // 缩小同步范围,提升性能 synchronized (this) { log.info("该监控线程已启动?({})", starting); if (starting) { return; } starting = true; } // 由于之前的 balking 模式,以下代码只可能被一个线程执行,因此无需互斥 monitorThread = new Thread(() -> { while (!stop) { report(); sleep(2); } // 这里的监控线程只可能启动一个,因此只需要用 volatile 保证 starting 的可见性 log.info("监控线程已停止..."); starting = false; }); stop = false; log.info("监控线程已启动..."); monitorThread.start(); } private void report() { Info info = new Info(); info.setTotal(Runtime.getRuntime().totalMemory()); info.setFree(Runtime.getRuntime().freeMemory()); info.setMax(Runtime.getRuntime().maxMemory()); info.setTime(System.currentTimeMillis()); MonitorController.QUEUE.offer(info); } private void sleep(long seconds) { try { TimeUnit.SECONDS.sleep(seconds); } catch (InterruptedException e) { } } public synchronized void stop() { stop = true; // 不加打断需要等到下一次 sleep 结束才能退出循环,这里是为了更快结束 monitorThread.interrupt(); } }
@RestController public class MonitorController { public static ArrayBlockingQueue<Info> QUEUE = new ArrayBlockingQueue(30); @Autowired private MonitorService monitorService; @GetMapping("/info") public List<Info> info() { ArrayList<Info> infos = new ArrayList<>(); QUEUE.drainTo(infos); return infos; } @GetMapping("/start") public void start() { monitorService.start(); } @GetMapping("/stop") public void stop() { monitorService.stop(); } }
当前端页面多次点击按钮调用 start 时输出
[http-nio-8080-exec-1] cn.itcast.monitor.service.MonitorService - 该监控线程已启动?(false) [http-nio-8080-exec-1] cn.itcast.monitor.service.MonitorService - 监控线程已启动... [http-nio-8080-exec-2] cn.itcast.monitor.service.MonitorService - 该监控线程已启动?(true) [http-nio-8080-exec-3] cn.itcast.monitor.service.MonitorService - 该监控线程已启动?(true) [http-nio-8080-exec-4] cn.itcast.monitor.service.MonitorService - 该监控线程已启动?(true)
它还经常用来实现线程安全的单例
public final class Singleton { private Singleton() { } private static Singleton INSTANCE = null; public static synchronized Singleton getInstance() { if (INSTANCE != null) { return INSTANCE; } INSTANCE = new Singleton(); return INSTANCE; } }
页面
<!DOCTYPE html> <html> <head> <meta charset="UTF-8"> <title>监控程序</title> <script type="text/javascript" src="/js/echarts.min.js"></script> <script type="text/javascript" src="/js/jquery.min.js"></script> </head> <body style="height: 500px; margin: 0"> <div id="container" style="height: 100%"></div> <div> <input type="button" value="开始" id="start"> <input type="button" value="停止" id="stop"> </div> <script type="text/javascript"> $("#start").click(function(){ $.get("start"); }); $("#stop").click(function(){ $.get("stop"); }); const dom = document.getElementById("container"); const myChart = echarts.init(dom); option = null; let data = []; option = { title: { text: '动态数据 + 时间坐标轴' }, xAxis: { type: 'time', splitLine: { show: false } }, yAxis: { type: 'value', boundaryGap: [0, '100%'], splitLine: { show: false } }, series: [{ name: '总内存', type: 'line', showSymbol: true, areaStyle: {color: 'rgba(0, 200, 40, 0.7)'}, hoverAnimation: true, data: [] }, { name: '已使用', type: 'line', showSymbol: true, areaStyle: {color: 'rgba(0, 0, 200, 0.7)'}, hoverAnimation: true, data: [] }] }; setInterval(function () { $.get("info").done(function (infos) { while (data.length >= 10) { data.shift(); } data.push(...infos); let max = data.map(d => [d.time, d.max]); let total = data.map(d => [d.time, d.total]); let used = data.map(d => [d.time, d.total - d.free]); myChart.setOption({ series: [{ data: total }, { data: used }] }); }); }, 2000); myChart.setOption(option, true); </script> </body> </html>
对比一下保护性暂停模式:保护性暂停模式用在一个线程等待另一个线程的执行结果,当条件不满足时线程等待。
异步模式之工作线程
让有限的工作线程(Worker Thread)来轮流异步处理无限多的任务。也可以将其归类为分工模式,它的典型实现就是线程池,也体现了经典设计模式中的享元模式。
例如,海底捞的服务员(线程),轮流处理每位客人的点餐(任务),如果为每位客人都配一名专属的服务员,那么成本就太高了(对比另一种多线程设计模式:Thread-Per-Message)注意,不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率
例如,如果一个餐馆的工人既要招呼客人(任务类型A),又要到后厨做菜(任务类型B)显然效率不咋地,分成服务员(线程池A)与厨师(线程池B)更为合理,当然你能想到更细致的分工
饥饿
固定大小线程池会有饥饿现象
两个工人是同一个线程池中的两个线程
他们要做的事情是:为客人点餐和到后厨做菜,这是两个阶段的工作
客人点餐:必须先点完餐,等菜做好,上菜,在此期间处理点餐的工人必须等待
后厨做菜:没啥说的,做就是了比如工人A 处理了点餐任务,接下来它要等着 工人B 把菜做好,然后上菜,他俩也配合的蛮好
但现在同时来了两个客人,这个时候工人A 和工人B 都去处理点餐了,这时没人做饭了,饥饿
@Slf4j(topic = "c.TestStarvation") public class TestStarvation { static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅"); static Random RANDOM = new Random(); static String cooking() { return MENU.get(RANDOM.nextInt(MENU.size())); } public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(2); executorService .execute(() -> { log.debug("处理点餐..."); Future<String> f = executorService .submit(() -> { log.debug("做菜"); return cooking(); }); try { log.debug("上菜: {}", f.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }); executorService .execute(() -> { log.debug("处理点餐..."); Future<String> f = executorService .submit(() -> { log.debug("做菜"); return cooking(); }); try { log.debug("上菜: {}", f.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }); } }
可能的输出
17:08:41.339 c.TestDeadLock [pool-1-thread-2] - 处理点餐... 17:08:41.339 c.TestDeadLock [pool-1-thread-1] - 处理点餐...
解决方法可以增加线程池的大小,不过不是根本解决方案,还是前面提到的,不同的任务类型,采用不同的线程池,例如:
@Slf4j(topic = "c.TestStarvation") public class TestStarvation { static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅"); static Random RANDOM = new Random(); static String cooking() { return MENU.get(RANDOM.nextInt(MENU.size())); } public static void main(String[] args) { ExecutorService waiterPool = Executors.newFixedThreadPool(1); ExecutorService cookPool = Executors.newFixedThreadPool(1); waiterPool.execute(() -> { log.debug("处理点餐..."); Future<String> f = cookPool.submit(() -> { log.debug("做菜"); return cooking(); }); try { log.debug("上菜: {}", f.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }); waiterPool.execute(() -> { log.debug("处理点餐..."); Future<String> f = cookPool.submit(() -> { log.debug("做菜"); return cooking(); }); try { log.debug("上菜: {}", f.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }); } }