分布式消息队列有activeMQ,kafka,RabbitMQ等,消息队列使程序之间解耦,提升程序响应效率。
场景模拟:就是用户注册的时候,在注册成 功以后发放积分。这个场景在一般来说,我们会这么去实 现 但是实际上,我们需要考虑两个问题 1. 性能,在注册这个环节里面,假如添加用户需要花费1秒 钟,增加积分需要花费 1 秒钟,那么整个注册结果的返回就可能需要大于2秒,虽然影响不是很大,但是在量 比较大的时候,我们也需要做一些优化 ; 2. 耦合,添加用户和增加积分,可以认为是两个领域,也 就是说,增加积分并不是注册必须要具备的功能,但是 一旦增加积分这个逻辑出现异常,就会导致注册失败。 这种耦合在程序设计的时候是一定要规避的 因此我们可以通过异步的方式来实现
package com.pattern.blockquue; /** * Created by chenli on 2019/10/30. */ public class UserServiceDemo { public boolean register(){ User user=new User(); user.setName("demo"); addUser(user); addPoint(user); return true; } private void addPoint(User user) { System.out.println("添加1000积分成功"+user.getName()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } private void addUser(User user) { System.out.println("添加用户成功"+user.getName()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { new UserServiceDemo().register(); } }修改后
package com.pattern.blockquue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * Created by chenli on 2019/10/30. */ public class UserService { private final ExecutorService executorService= Executors.newSingleThreadExecutor(); private volatile boolean isRunning = true; ArrayBlockingQueue arrayBlockingQueue=new ArrayBlockingQueue(10); { init(); } public void init(){ executorService.execute(()->{ while (isRunning){ try { User user=(User)arrayBlockingQueue.take(); addPoint(user); } catch (InterruptedException e) { e.printStackTrace(); } } }); } public boolean register(){ User user=new User(); user.setName("demo"); addUser(user); arrayBlockingQueue.add(user); return true; } private void addPoint(User user) { System.out.println("添加1000积分成功"+user.getName()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } private void addUser(User user) { System.out.println("添加用户成功"+user.getName()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { new UserService().register(); } }在这个案例中,我们使用了 ArrayBlockingQueue 基于数 组的阻塞队列,来优化代码的执行逻辑。
阻塞队列这块的应用场景,比较多的仍然是对于生产者消 费者场景的应用,但是由于分布式架构的普及,是的大家 更多的关注在分布式消息队列上。所以其实如果把阻塞队 列比作成分布式消息队列的话,那么所谓的生产者和消费 者其实就是基于阻塞队列的解耦。 另外,阻塞队列是一个 fifo 的队列;
1.阻塞队列的插入操作 add(e):添加元素到队列中,如果队满,继续插入会报元素错误,IllegalStateException; offer(e):添加元素到队列中,同时返回该元素是否插入成功true/false; put(e):当队列元素满了后再插入,则出现阻塞状态; 2.移除操作 remove():当队列为空时,调用 remove 会返回 false, 如果元素移除成功,则返回true poll(): 当队列中存在元素,则从队列中取出一个元素, 如果队列为空,则直接返回null ; take():基于阻塞的方式获取队列中的元素,如果队列为 空,则take方法会一直阻塞,直到队列中有新的数据可 以消费;
构造方法 ArrayBlockingQueue提供了三个构造方法,分别如下。 capacity: 表示数组的长度,也就是队列的长度 。 fair:表示是否为公平的阻塞队列,默认情况下构造的是非 公平的阻塞队列,它提供了接收一个几个 作为数据初始化的方法 ;
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); //重入锁,出 队和入队持有这一把锁 notEmpty = lock.newCondition(); //初始化非空 等待队列 notFull = lock.newCondition(); //初始化非满 等待队列 }1.add方法 从父类的add方法可以看到,如判断队列是否满了,如果满了直接抛异常
public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }2.offer方法 add方法最终还是调用offer方法来添加数据,返回一个添加成功或者失败的布尔值反馈; 主要做了:
判断添加的数据是否为空添加重入锁判断队列长度,如果队列长度等于数组长度,表示满了 直接返回false否则,直接调用enqueue将元素添加到队列中 public boolean offer(E e) { checkNotNull(e); //对请求数据做判断 final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); } }enqueue方法
private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; //通过 putIndex 对数据赋 值 if (++putIndex == items.length) // 当 putIndex 等于数组长度时,将 putIndex 重置为 0 putIndex = 0; count++;//记录队列元素的个数 notEmpty.signal();//唤醒处于等待状态下的线程,表 示当前队列中的元素不为空,如果存在消费者线程阻塞,就可以 开始取出元素 }当putIndex等于队列长度时,则需要重置putIndex=0;因为ArrayBlockingQueue是一个FIFO的队列,队列添加 元素时,是从队尾获取putIndex来存储元素,当putIndex 等于数组长度时,下次就需要从数组头部开始添加了。 put()方法
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); //这个也是获得锁,但 是和 lock 的区别是,这个方法优先允许在等待时由其他线程调 用等待线程的 interrupt 方法来中断等待直接返回。而 lock 方法是尝试获得锁成功后才响应中断 try { while (count == items.length) notFull.await();//队列满了的情况下,当前 线程将会被 notFull 条件对象挂起加到等待队列中 enqueue(e); } finally { lock.unlock(); } }take方法 take方法是阻塞获取队列中的元素,有就删除,没有就阻塞;阻塞是可以阻断的,如果队列中没有数据,加入notEmpty队列等待(有数据就直接取走,没有就等待),如果有新的线程put的数据,put会唤醒take线程;
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); //如果队列为空的情况 下,直接通过 await 方法阻塞 return dequeue(); } finally { lock.unlock(); } }dequeue 方法 这个是出队列的方法,主要是删除队列头部的元素并发返 回给客户端 takeIndex,是用来记录拿数据的索引值
private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; //默认获取 0 位置 的元素 items[takeIndex] = null;//将该位置的元素设置为 空 if (++takeIndex == items.length)//这里的作用 也是一样,如果拿到数组的最大值,那么重置为 0,继续从头部 位置开始获取数据 takeIndex = 0; count--;//记录 元素个数递减 if (itrs != null) itrs.elementDequeued();//同时更新迭代器中 的元素数据 notFull.signal();//触发 因为队列满了以后导致的 被阻塞的线程 return x; }