Semaphore:如何快速实现一个限流器?
信号量模型
- 信号量模型还是很简单的,可以简单概括为:一个计数器,一个等待队列,三个方法。
- 在信号量模型里,计数器和等待队列对外是透明的,所以只能通过信号量模型提供的三个方法来访问它们,这三个方法分别是:init()、down() 和 up()。
- init():设置计数器的初始值。
- down():计数器的值减 1;如果此时计数器的值小于 0,则当前线程将被阻塞,否则当前线程可以继续执行。
- up():计数器的值加 1;如果此时计数器的值小于或者等于 0,则唤醒等待队列中的一个线程,并将其从等待队列中移除。
- 在 Java SDK 里面,信号量模型是由 java.util.concurrent.Semaphore 实现的,Semaphore 这个类能够保证这三个方法都是原子操作。
- 在 Java SDK 并发包里,down() 和 up() 对应的则是 acquire() 和 release()。
如何使用信号量
快速实现一个限流器
- 实现一个互斥锁,仅仅是 Semaphore 的部分功能,Semaphore 还有⼀个功能是 Lock 不容易实现的,那就是:Semaphore 可以允许多个线程访问一个临界区。
- 比较常见的需求就是我们工作中遇到的各种池化资源,例如连接池、对象池、线程池等等。
- 其中,数据库连接池,在同一时刻,一定是允许多个线程同时使用连接池的,当然,每个连接在被释放前,是不允许其他线程使用的。
- 所谓对象池,指的是一次性创建出 N 个对象,之后所有的线程重复利用这 N 个对象,当然对象在被释放前,也是不允许其他线程使用的。
- 对象池,可以用 List 保存实例对象,这个很简单。
- 但关键是限流器的设计,这里的限流,指的是不允许多于 N 个线程同时进入临界区。
-
那如何快速实现⼀个这样的限流器呢?如果我们把计数器的值设置成对象池里对象的个数 N,就能完美解决对象池的限流问题了。
import java.util.List; import java.util.Vector; import java.util.concurrent.Semaphore; import java.util.function.Function; public class ObjPool<T, R> { final List<T> pool; // ⽤信号量实现限流器 final Semaphore semaphore; // 构造函数 ObjPool(int size, T t) { pool = new Vector<T>(){}; for (int i = 0; i < size; i++) { pool.add(t); } semaphore = new Semaphore(size); } // 利⽤对象池的对象,调⽤ func // function 的作用是转换,将一个值转为另外一个值 R exec(Function<T, R> function) throws InterruptedException { T t = null; semaphore.acquire(); try { t = pool.remove(0); return function.apply(t); } finally { pool.add(t); semaphore.release(); } } public static void main(String[] args) throws InterruptedException { // 创建对象池 ObjPool<Long, String> objPool = new ObjPool<Long, String>(10, 2L); // 通过对象池获取 t,之后执⾏ objPool.exec(t -> { System.out.println(t); return t.toString(); }); } }
- 我们用一个 List来保存对象实例,用 Semaphore 实现限流器。
- 关键的代码是 ObjPool 里面的 exec() 方法,这个方法里面实现了限流的功能。
- 在这个方法里面,我们首先调用 acquire() 方法(与之匹配的是在 finally 里面调用 release() 方法),假设对象池的大小是 10,信号量的计数器初始化为 10,那么前 10 个线程调用 acquire() 方法,都能继续执行,而其他线程则会阻塞在 acquire() 方法上。
- 我们为每个线程分配了一个对象 t(这个分配工作是通过 pool.remove(0) 实现的),分配完之后会执行一个回调函数 func,而函数的参数正是前面分配的对象 t ;执行完回调函数之后,它们就会释放对象(这个释放工作是通过 pool.add(t) 实现的),同时调用 release() 方法来更新信号量的计数器。
- 如果此时信号量里计数器的值小于等于 0,那么说明有线程在等待,此时会自动唤醒等待的线程。
文章来源地址https://www.toymoban.com/news/detail-472092.html
文章来源:https://www.toymoban.com/news/detail-472092.html
到了这里,关于《Java并发编程实战》课程笔记(九)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!