简介
说明
本文用示例介绍使用阻塞队列来实现顺序消费。
需求
机器需要依次对每部手机执行三个任务:生产、打包、发货;消费者等待收货,并且收到的手机必须与生产顺序一致、状态为已发货。
代码
手机产品
package org.example.a; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; public class Phone { /** * 手机的状态: * PRODUCED: 已生产 * PACKED: 已打包 * DELIVERED: 已发货 * <p>手机的状态只能由PRODUCED->PACKED->DELIVERED转变 */ public enum Status { PRODUCED, PACKED, DELIVERED } // 默认状态为PRODUCED private Status status = Status.PRODUCED; private final int id; public Phone(int id) { this.id = id; } public void pack() { status = Status.PACKED; } public void deliver() { status = Status.DELIVERED; } public Status getStatus() { return status; } public int getId() { return id; } public String toString() { return "Phone id: " + id + ", status: " + status; } }
队列
import java.util.concurrent.LinkedBlockingQueue;

/**
 * A {@link LinkedBlockingQueue} specialised to hold {@link Phone} elements.
 *
 * <p>The class adds no fields or methods; it only gives the element-typed queue a
 * short name. No constructor is declared, so only the implicit no-argument
 * constructor is available.
 */
public class PhoneQueue extends LinkedBlockingQueue<Phone> {
}
任务
生产手机的任务
/** * 生产手机的任务。 */ public class Producer implements Runnable { private PhoneQueue phoneQueue; private int count = 0; private Random random = new Random(47); public Producer(PhoneQueue queue) { this.phoneQueue = queue; } @Override public void run() { try { while (!Thread.interrupted()) { TimeUnit.MILLISECONDS.sleep(300 + random.nextInt(500)); //生产一部手机,这些手机是有序的 Phone phone = new Phone(count++); System.out.println(phone); //放到PhoneQueue中 phoneQueue.put(phone); } } catch (InterruptedException e) { System.out.println("Producer interrupted."); } System.out.println("Producer off."); } }
打包的任务
/** * 打包的任务 */ public class Packer implements Runnable { private PhoneQueue producedQueue; private PhoneQueue packedQueue; public Packer(PhoneQueue producedQueue, PhoneQueue packedQueue) { this.producedQueue = producedQueue; this.packedQueue = packedQueue; } @Override public void run() { try { while (!Thread.interrupted()) { //在取得下一个手机之前会一直阻塞 Phone phone = producedQueue.take(); phone.pack(); System.out.println(phone); packedQueue.put(phone); } } catch (InterruptedException e) { System.out.println("Packer interrupted."); } System.out.println("Packer off."); } }
发货的任务
/** * 发货的任务 */ public class Delivery implements Runnable { private PhoneQueue butteredQueue; private PhoneQueue finishedQueue; public Delivery(PhoneQueue butteredQueue, PhoneQueue finishedQueue) { this.finishedQueue = finishedQueue; this.butteredQueue = butteredQueue; } @Override public void run() { try { while (!Thread.interrupted()) { //在取得下一个手机之前会一直阻塞 Phone phone = butteredQueue.take(); phone.deliver(); System.out.println(phone); finishedQueue.put(phone); } } catch (InterruptedException e) { System.out.println("Deliverer interrupted."); } System.out.println("Deliverer off."); } }
消费者(买手机的人)
/** * 买手机的人,消费者。 */ public class Consumer implements Runnable { private PhoneQueue finishedQueue; private int count = 0; public Consumer(PhoneQueue finishedQueue) { this.finishedQueue = finishedQueue; } @Override public void run() { try { while (!Thread.interrupted()) { //在取得下一个手机之前会一直阻塞 Phone phone = finishedQueue.take(); //验证取得的手机是有序的,而且状态是DELIVERED的 if (phone.getId() != count++ || phone.getStatus() != Phone.Status.DELIVERED) { System.out.println("Error -> " + phone); System.exit(-1); } else { //使用手机 System.out.println(phone + "->Use"); } } } catch (InterruptedException e) { System.out.println("Consumer interrupted."); } System.out.println("Consumer off."); } }
主类
/**
 * Entry point that wires the four tasks into a pipeline of three queues
 * (produced -> packed -> delivered), lets it run for a fixed time, and then
 * interrupts all tasks.
 */
public class Demo {
    /** How long the pipeline is left running before it is shut down. */
    private static final long RUN_SECONDS = 5;

    /**
     * Starts the producer, packer, delivery and consumer tasks on a cached thread pool,
     * sleeps for {@link #RUN_SECONDS} seconds, then calls {@code shutdownNow()}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        PhoneQueue producedQueue = new PhoneQueue();
        PhoneQueue packedQueue = new PhoneQueue();
        PhoneQueue deliveredQueue = new PhoneQueue();

        ExecutorService exec = Executors.newCachedThreadPool();
        try {
            exec.execute(new Producer(producedQueue));
            exec.execute(new Packer(producedQueue, packedQueue));
            exec.execute(new org.example.a.Delivery(packedQueue, deliveredQueue));
            exec.execute(new Consumer(deliveredQueue));

            TimeUnit.SECONDS.sleep(RUN_SECONDS);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever runs this thread; shutdown still happens below.
            Thread.currentThread().interrupt();
        } finally {
            // The tasks loop until interrupted, so they must be stopped on every exit path.
            exec.shutdownNow();
        }
    }
}
执行结果
Phone id: 0, status: PRODUCED Phone id: 0, status: PACKED Phone id: 0, status: DELIVERED Phone id: 0, status: DELIVERED->Use Phone id: 1, status: PRODUCED Phone id: 1, status: PACKED Phone id: 1, status: DELIVERED Phone id: 1, status: DELIVERED->Use Phone id: 2, status: PRODUCED Phone id: 2, status: PACKED Phone id: 2, status: DELIVERED Phone id: 2, status: DELIVERED->Use Phone id: 3, status: PRODUCED Phone id: 3, status: PACKED Phone id: 3, status: DELIVERED Phone id: 3, status: DELIVERED->Use Phone id: 4, status: PRODUCED Phone id: 4, status: PACKED Phone id: 4, status: DELIVERED Phone id: 4, status: DELIVERED->Use Phone id: 5, status: PRODUCED Phone id: 5, status: PACKED Phone id: 5, status: DELIVERED Phone id: 5, status: DELIVERED->Use Phone id: 6, status: PRODUCED Phone id: 6, status: PACKED Phone id: 6, status: DELIVERED Phone id: 6, status: DELIVERED->Use Phone id: 7, status: PRODUCED Phone id: 7, status: PACKED Phone id: 7, status: DELIVERED Phone id: 7, status: DELIVERED->Use Consumer interrupted. Packer interrupted. Producer interrupted. Producer off. Deliverer interrupted. Packer off. Consumer off. Deliverer off.
请先
!