分享一个生产者-消费者的真实场景

科技资讯 投稿 7800 0 评论

分享一个生产者-消费者的真实场景

0.背景

数据导入的特点是不定时,但量大。每次导入的数据量在几亿到几十亿上百亿之间。
如果使用dataset.write的方式写入,spark内部也是使用的sql connection以jdbc的方式进行写入。在这样的数据量之下,会非常慢,慢到完全无法接受。

语法为:

COPY table FROM '/mnt/g/file.csv' WITH CSV HEADER;

这样效率高了很多。

那么,现在就是使用spark读取hive,经过处理,再dataset.repartion(num重分区,将数据写入HDFS形成num个文件。再将这些小文件多线程批量copy到tbds。

把文件抽象成生产者,数据库连接抽象成消费者。生产者源源不断生产,消费者能力有限跟不上生产者的速率,就需要阻塞在消费端。

1.实现方式

1.1 方式1 线程池自带阻塞队列

我们批量写入是通过多线程来的,实现一个线程池的其中之一方法是通过Executors,并指定一个带线程数的参数。
这样的方式在线上7*24小时运行的业务系统中是绝对不推荐使用的,但在一些大数据平台的定时任务也不是完全禁止,看自身情况。

ThreadPoolExecutor来构建线程池,核心线程和最大线程相同,且阻塞队列默认为LinkedBlockingQueue,这个阻塞队列
没有设置长度,那么它的最大长度为Integer.MAX_VALUE
这样就可能造成内存的无限增长,内存耗尽导致OOM。

同时,刚好可以利用线程池的阻塞队列来构建消费者-生产者。

public static void main(String[] args throws Exception {
        List<File> fileList = cn.hutool.core.io.FileUtil.loopFiles(new File("测试路径";
        ExecutorService executorService = Executors.newFixedThreadPool(10;
        LongAdder longAdder = new LongAdder(;
        for(File file : fileList{
            try {
                executorService.execute(new TestRun(fileList, longAdder;
            } catch (Exception exception {
                exception.printStackTrace(;
            }
        }
        executorService.shutdown(;
    }

    public static class TestRun implements Runnable{
        private List<File> fileList;
        LongAdder longAdder;

        public TestRun(List<File> fileList, LongAdder longAdder {
            this.fileList = fileList;
            this.longAdder = longAdder;
        }

        @SneakyThrows
        @Override
        public void run( {
            try {
                // 可通过连接池
                longAdder.increment(;
                ConnectionUtils.getConnection(;
                System.out.println(Thread.currentThread( + "第"+ longAdder.longValue( + "/"+ fileList.size( +"个文件获取连接正在入库";
                Random random = new Random(;
                Thread.sleep(random.nextInt(1000;
                System.out.println(Thread.currentThread( + "第"+ longAdder.longValue( + "/"+ fileList.size( +"个文件完成入库归还连接";
            } finally {
            }
        }
    }

运行输出:

数据库驱动加载成功
数据库驱动加载成功
数据库驱动加载成功
数据库驱动加载成功
数据库驱动加载成功
数据库驱动加载成功
数据库驱动加载成功
数据库驱动加载成功
数据库驱动加载成功
数据库驱动加载成功
Thread[pool-1-thread-5,5,main]第10/33个文件获取连接正在入库
Thread[pool-1-thread-9,5,main]第10/33个文件获取连接正在入库
Thread[pool-1-thread-1,5,main]第10/33个文件获取连接正在入库
Thread[pool-1-thread-2,5,main]第10/33个文件获取连接正在入库
Thread[pool-1-thread-7,5,main]第10/33个文件获取连接正在入库
Thread[pool-1-thread-10,5,main]第10/33个文件获取连接正在入库
Thread[pool-1-thread-6,5,main]第10/33个文件获取连接正在入库
Thread[pool-1-thread-8,5,main]第10/33个文件获取连接正在入库
Thread[pool-1-thread-4,5,main]第10/33个文件获取连接正在入库
Thread[pool-1-thread-3,5,main]第10/33个文件获取连接正在入库
Thread[pool-1-thread-1,5,main]第10/33个文件完成入库归还连接
数据库驱动加载成功
Thread[pool-1-thread-1,5,main]第11/33个文件获取连接正在入库
Thread[pool-1-thread-4,5,main]第11/33个文件完成入库归还连接
数据库驱动加载成功
.
.
.
数据库驱动加载成功
Thread[pool-1-thread-3,5,main]第33/33个文件获取连接正在入库
Thread[pool-1-thread-9,5,main]第33/33个文件完成入库归还连接
Thread[pool-1-thread-8,5,main]第33/33个文件完成入库归还连接
Thread[pool-1-thread-6,5,main]第33/33个文件完成入库归还连接
Thread[pool-1-thread-7,5,main]第33/33个文件完成入库归还连接
Thread[pool-1-thread-10,5,main]第33/33个文件完成入库归还连接
Thread[pool-1-thread-5,5,main]第33/33个文件完成入库归还连接
Thread[pool-1-thread-4,5,main]第33/33个文件完成入库归还连接
Thread[pool-1-thread-3,5,main]第33/33个文件完成入库归还连接
Thread[pool-1-thread-2,5,main]第33/33个文件完成入库归还连接
Thread[pool-1-thread-1,5,main]第33/33个文件完成入库归还连接


这里的longAdder只是为了方便观看,并没有严格按线程递增。
我们模拟33个文件,线程池的核心大小为10,可以看到最大只有10个文件在同时执行,只有当其中文件入库完毕,新的文件才能执行。达到了我们想要的效果。

1.2 方式2 使用阻塞队列+CountDownLatch

它是一种同步辅助工具,允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。

CountDownLatch是一种通用的同步工具,可用于多种目的。用计数1初始化的CountDownLatch用作简单的开/关锁存器或门:所有调用的线程都在门处等待,直到调用countDown的线程打开它。初始化为N的CountDownLatch可以用来让一个线程等待,直到N个线程完成了一些操作,或者一些操作已经完成了N次。

@Slf4j
public class ConnectionQueue {

    LinkedBlockingQueue<Connection> connections = null;

    private int size = 10;

    public ConnectionQueue(int size throws Exception{
        new ConnectionQueue(null, size;
    }

    public ConnectionQueue(LinkedBlockingQueue<Connection> connections, int size throws IllegalArgumentException{
        if (size <= 0 || size > 100 {
            throw new IllegalArgumentException("size 长度必须适宜,在1-100之间";
        }
        this.connections = connections;
        this.size = size;
    }

    /**
     * 初始化数据库连接
     */
    public void init({
        if (connections == null {
            connections = new LinkedBlockingQueue<>(size;
        }
        for (int i = 0; i < size; i++ {
            connections.add(ConnectionUtils.getConnection(;
        }
    }

    /**
     * 获取一个数据库连接,如果没有空闲连接将阻塞直到拿到连接
     * @return
     * @throws InterruptedException
     */
    public Connection get( throws InterruptedException {
        return connections.take(;
    }

    public Connection poll( throws InterruptedException {
        return connections.poll(;
    }


    /**
     * 归还空闲连接
     * @param connection
     */
    public void put(Connection connection{
        connections.add(connection;
    }

    public int size({
        return connections.size(;
    }

    /**
     * 销毁
     */
    public void destroy( {
        Iterator<Connection> it = connections.iterator(;
        while (it.hasNext( {
            Connection conn = it.next(;
            if (conn != null {
                try {
                    conn.close(;
                    log.info("关闭连接 " + conn;
                } catch (SQLException e {
                    log.error("关闭连接失败", e;
                }
            } else {
                log.info("conn = {}为空", conn;
            }
        }
        if (connections != null {
            connections.clear(;
        }
    }
}

同时使用CountDownLatch进行计数,await(直到所有线程都执行完毕,再进行资源销毁和其它业务操作。

public static void main(String[] args throws Exception {
        List<File> fileList = cn.hutool.core.io.FileUtil.loopFiles(new File("测试路径";
        ConnectionQueue connectionQueue = new ConnectionQueue(10;
        connectionQueue.init(;
        ExecutorService executorService = new ThreadPoolExecutor(10,
                10,
                1,
                TimeUnit.MINUTES,
                new LinkedBlockingQueue<>(10,
                 (r, executor -> {
                     if (r instanceof Test.TestRun {
                         ((TestRun r.getCountDownLatch(.countDown(;
                     }
                     System.out.println(Thread.currentThread( +" reject countdown";
                 }
                ;
        CountDownLatch countDownLatch = new CountDownLatch(fileList.size(;
        for(File file : fileList{
            try {
                Connection conn = connectionQueue.get(;
                executorService.execute(new TestRun(countDownLatch, connectionQueue, fileList, conn;
            } catch (Exception exception {
                exception.printStackTrace(;
            }
        }

        countDownLatch.await(;
        executorService.shutdown(;
        connectionQueue.destroy(;
    }

    public static class TestRun implements Runnable{
        private CountDownLatch countDownLatch;
        private ConnectionQueue connectionQueue;
        private Connection connection;
        private List<File> fileList;

        public TestRun(CountDownLatch countDownLatch, ConnectionQueue connectionQueue, List<File> fileList, Connection connection {
            this.countDownLatch = countDownLatch;
            this.connectionQueue = connectionQueue;
            this.fileList = fileList;
            this.connection = connection;
        }

        public CountDownLatch getCountDownLatch( {
            return countDownLatch;
        }

        public void setCountDownLatch(CountDownLatch countDownLatch {
            this.countDownLatch = countDownLatch;
        }

        @SneakyThrows
        @Override
        public void run( {
            try {
                System.out.println(Thread.currentThread( + "第"+ countDownLatch.getCount( + "/"+ fileList.size( +"个文件获取连接正在入库";
                Random random = new Random(;
                Thread.sleep(random.nextInt(1000;
                System.out.println(Thread.currentThread( + "第"+ countDownLatch.getCount( + "/"+ fileList.size( +"个文件完成入库归还连接";
            } finally {
                connectionQueue.put(connection;
                countDownLatch.countDown(;
            }
        }
    }

执行结果:

数据库驱动加载成功
数据库驱动加载成功
数据库驱动加载成功
数据库驱动加载成功
数据库驱动加载成功
数据库驱动加载成功
数据库驱动加载成功
数据库驱动加载成功
数据库驱动加载成功
数据库驱动加载成功
Thread[pool-1-thread-1,5,main]第33/33个文件获取连接正在入库
Thread[pool-1-thread-4,5,main]第33/33个文件获取连接正在入库
Thread[pool-1-thread-3,5,main]第33/33个文件获取连接正在入库
Thread[pool-1-thread-2,5,main]第33/33个文件获取连接正在入库
Thread[pool-1-thread-10,5,main]第33/33个文件获取连接正在入库
Thread[pool-1-thread-6,5,main]第33/33个文件获取连接正在入库
Thread[pool-1-thread-7,5,main]第33/33个文件获取连接正在入库
Thread[pool-1-thread-8,5,main]第33/33个文件获取连接正在入库
Thread[pool-1-thread-9,5,main]第33/33个文件获取连接正在入库
Thread[pool-1-thread-5,5,main]第33/33个文件获取连接正在入库
Thread[pool-1-thread-4,5,main]第33/33个文件完成入库归还连接
Thread[pool-1-thread-4,5,main]第32/33个文件获取连接正在入库
Thread[pool-1-thread-8,5,main]第32/33个文件完成入库归还连接
Thread[pool-1-thread-8,5,main]第31/33个文件获取连接正在入库
Thread[pool-1-thread-8,5,main]第31/33个文件完成入库归还连接
Thread[pool-1-thread-8,5,main]第30/33个文件获取连接正在入库
Thread[pool-1-thread-4,5,main]第30/33个文件完成入库归还连接
...
Thread[pool-1-thread-2,5,main]第10/33个文件获取连接正在入库
Thread[pool-1-thread-5,5,main]第10/33个文件完成入库归还连接
Thread[pool-1-thread-4,5,main]第9/33个文件完成入库归还连接
Thread[pool-1-thread-9,5,main]第8/33个文件完成入库归还连接
Thread[pool-1-thread-2,5,main]第7/33个文件完成入库归还连接
Thread[pool-1-thread-6,5,main]第6/33个文件完成入库归还连接
Thread[pool-1-thread-7,5,main]第5/33个文件完成入库归还连接
Thread[pool-1-thread-10,5,main]第4/33个文件完成入库归还连接
Thread[pool-1-thread-3,5,main]第3/33个文件完成入库归还连接
Thread[pool-1-thread-1,5,main]第2/33个文件完成入库归还连接
Thread[pool-1-thread-8,5,main]第1/33个文件完成入库归还连接

1.2.1 如果线程池触发reject会发生什么?

需要注意的是,这里要考虑到线程池的拒绝策略。

AbortPolicy 默认策略,抛出异常
CallerRunsPolicy  从名字上可以看出,调用者执行
DiscardOldestPolicy 丢弃最老的任务,再尝试执行
DiscardPolicy  直接丢弃不做任何操作

ThreadPoolExecutor默认拒绝策略为AbortPolicy,就是抛出一个异常,那么这时候就执行不到后面的countdown
所以需要重写策略,在线程池队列已满拒绝新进任务的时候执行countdown,避免countDownLatch.await(永远等待。

1.3 方式3 使用Semaphore

synchronized 关键字和 Lock 锁实现了资源的并发访问控制,在同一时刻只允许一个线程进入临界区访问资源 (读锁除外。但考虑到另外一种场景,共享资源在同一时刻可以提供给多个线程访问,如厕所有多个坑位,可以同时提供给多人使用。这种场景下,就可以使用Semaphore信号量来实现。

当信号量许可>1,意味可以访问资源,如果信号量许可<=0,线程进入休眠。
当信号量许可=1,约等于synchronizedlock的效果。

在我们的场景下,共享资源就是数据库连接池N个,M个文件需要拿到连接池进行入库操作,但连接池数量N有限,远小于文件数M,所以需要对连接池的访问并发度进行控制。

Semaphore semaphore = new Semaphore(10;
允许线程池最多10个任务并行执行,只有当其它任务执行完毕归还permit,新的任务拿到permit才能开始执行。
public static void main(String[] args throws Exception {
        List<File> fileList = FileUtil.loopFiles(new File("测试路径";
        Semaphore semaphore = new Semaphore(10;

        Random random = new Random(;
        ExecutorService executorService = new ThreadPoolExecutor(10,
                10,
                1,
                TimeUnit.MINUTES,
                new LinkedBlockingQueue<>(10;
        AtomicInteger count = new AtomicInteger(1;
        for (File file : fileList {
            semaphore.acquire(;
                executorService.execute(( -> {
                try {
                    int subCount = count.getAndIncrement(;
                    System.out.println(Thread.currentThread( + "第" + subCount + "/" + fileList.size( + "个文件获取连接正在入库";
                    // 模拟入库操作
                    int time = random.nextInt(1000;
                    Thread.sleep(time;
                    System.out.println(Thread.currentThread( + "第" + subCount + "/" + fileList.size( + "个文件完成入库归还连接";
                } catch (Exception e {
                    e.printStackTrace(;
                } finally {
                    semaphore.release(;
                }
            };
        }

        System.out.println("shutdown";
        executorService.shutdown(;
    }

因为我们的大数据框架本身有获取连接池的轮子,这里省略了从连接池获取连接的操作。

Thread[pool-1-thread-1,5,main]第1/33个文件获取连接正在入库
Thread[pool-1-thread-3,5,main]第3/33个文件获取连接正在入库
Thread[pool-1-thread-4,5,main]第2/33个文件获取连接正在入库
Thread[pool-1-thread-10,5,main]第5/33个文件获取连接正在入库
Thread[pool-1-thread-9,5,main]第4/33个文件获取连接正在入库
Thread[pool-1-thread-8,5,main]第8/33个文件获取连接正在入库
Thread[pool-1-thread-2,5,main]第9/33个文件获取连接正在入库
Thread[pool-1-thread-7,5,main]第7/33个文件获取连接正在入库
Thread[pool-1-thread-6,5,main]第6/33个文件获取连接正在入库
Thread[pool-1-thread-5,5,main]第10/33个文件获取连接正在入库
Thread[pool-1-thread-5,5,main]第10/33个文件完成入库归还连接
Thread[pool-1-thread-5,5,main]第11/33个文件获取连接正在入库
Thread[pool-1-thread-3,5,main]第3/33个文件完成入库归还连接
...
Thread[pool-1-thread-2,5,main]第23/33个文件完成入库归还连接
shutdown
Thread[pool-1-thread-2,5,main]第33/33个文件获取连接正在入库
Thread[pool-1-thread-4,5,main]第24/33个文件完成入库归还连接
Thread[pool-1-thread-5,5,main]第32/33个文件完成入库归还连接
Thread[pool-1-thread-1,5,main]第30/33个文件完成入库归还连接
Thread[pool-1-thread-9,5,main]第26/33个文件完成入库归还连接
Thread[pool-1-thread-3,5,main]第19/33个文件完成入库归还连接
Thread[pool-1-thread-2,5,main]第33/33个文件完成入库归还连接
Thread[pool-1-thread-8,5,main]第22/33个文件完成入库归还连接
Thread[pool-1-thread-6,5,main]第27/33个文件完成入库归还连接
Thread[pool-1-thread-10,5,main]第31/33个文件完成入库归还连接
Thread[pool-1-thread-7,5,main]第28/33个文件完成入库归还连接

1.3.1 如果引发了默认线程池拒绝策略,Semaphore会有问题吗?

我们知道CountDownLatch由于线程池拒绝策略,没有执行到countdown(会导致程序一直阻塞。那么Semaphore会有相应的问题吗?

acquire(,但没执行release(
写一个测试例子:

public static void main(String[] args throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(20;
        Semaphore semaphore = new Semaphore(10;
        ExecutorService executorService = new ThreadPoolExecutor(5,
                5,
                1,
                TimeUnit.MINUTES,
                new LinkedBlockingQueue<>(1, (r, executor -> {
                    Random random = new Random(;
                    try {
                        Thread.sleep(random.nextInt(1000;
                    } catch (InterruptedException e {
                        e.printStackTrace(;
                    }
                    if (r instanceof TestRun {
                        ((TestRun r.getCountDownLatch(.countDown(;
//                                    ((TestRun r.getSemaphore(.release(;
                    }
                    System.out.println(Thread.currentThread( + " reject countdown " + semaphore.availablePermits(;
        };


        for (int i = 0; i < 30; i++ {
            semaphore.acquire(;
            Thread.sleep(100;
            executorService.execute(new TestRun(countDownLatch, semaphore;
        }

//        countDownLatch.await(;
        System.out.println("完成";
        executorService.shutdown(;
    }

    public static class TestRun implements Runnable {
        private CountDownLatch countDownLatch;
        private Semaphore semaphore;

        public TestRun(CountDownLatch countDownLatch, Semaphore semaphore {
            this.countDownLatch = countDownLatch;
            this.semaphore = semaphore;
        }

        public CountDownLatch getCountDownLatch( {
            return countDownLatch;
        }

        public void setCountDownLatch(CountDownLatch countDownLatch {
            this.countDownLatch = countDownLatch;
        }

        public Semaphore getSemaphore( {
            return semaphore;
        }

        public void setSemaphore(Semaphore semaphore {
            this.semaphore = semaphore;
        }

        @SneakyThrows
        @Override
        public void run( {
//            semaphore.acquire(;
            Random random = new Random(;
            Thread.sleep(random.nextInt(1000;
            countDownLatch.countDown(;
            semaphore.release(;
            System.out.println(Thread.currentThread( + " start" + " semaphore = " + semaphore.availablePermits(;
            System.out.println(Thread.currentThread( + " countdown";
        }
    }

执行日志:

Thread[pool-1-thread-1,5,main] start semaphore = 8
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-3,5,main] start semaphore = 5
Thread[pool-1-thread-3,5,main] countdown
Thread[pool-1-thread-2,5,main] start semaphore = 4
Thread[pool-1-thread-2,5,main] countdown
Thread[pool-1-thread-2,5,main] start semaphore = 5
Thread[pool-1-thread-2,5,main] countdown
Thread[pool-1-thread-5,5,main] start semaphore = 6
Thread[pool-1-thread-5,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 7
Thread[pool-1-thread-1,5,main] countdown
Thread[main,5,main] reject countdown 7
Thread[pool-1-thread-4,5,main] start semaphore = 5
Thread[pool-1-thread-4,5,main] countdown
Thread[pool-1-thread-3,5,main] start semaphore = 5
Thread[pool-1-thread-3,5,main] countdown
Thread[pool-1-thread-4,5,main] start semaphore = 4
Thread[pool-1-thread-4,5,main] countdown
Thread[pool-1-thread-5,5,main] start semaphore = 3
Thread[pool-1-thread-5,5,main] countdown
Thread[pool-1-thread-2,5,main] start semaphore = 3
Thread[pool-1-thread-2,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 4
Thread[pool-1-thread-1,5,main] countdown
Thread[main,5,main] reject countdown 4
Thread[pool-1-thread-4,5,main] start semaphore = 4
Thread[pool-1-thread-4,5,main] countdown
Thread[pool-1-thread-3,5,main] start semaphore = 4
Thread[pool-1-thread-3,5,main] countdown
Thread[pool-1-thread-5,5,main] start semaphore = 3
Thread[pool-1-thread-5,5,main] countdown
Thread[pool-1-thread-4,5,main] start semaphore = 3
Thread[pool-1-thread-4,5,main] countdown
Thread[pool-1-thread-2,5,main] start semaphore = 2
Thread[pool-1-thread-2,5,main] countdown
Thread[pool-1-thread-3,5,main] start semaphore = 2
Thread[pool-1-thread-3,5,main] countdown
Thread[pool-1-thread-3,5,main] start semaphore = 2
Thread[pool-1-thread-3,5,main] countdown
Thread[pool-1-thread-2,5,main] start semaphore = 3
Thread[pool-1-thread-2,5,main] countdown
Thread[pool-1-thread-4,5,main] start semaphore = 4
Thread[pool-1-thread-4,5,main] countdown
Thread[pool-1-thread-5,5,main] start semaphore = 5
Thread[pool-1-thread-5,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 6
Thread[pool-1-thread-1,5,main] countdown
Thread[main,5,main] reject countdown 6
完成
Thread[pool-1-thread-5,5,main] start semaphore = 4
Thread[pool-1-thread-5,5,main] countdown
Thread[pool-1-thread-2,5,main] start semaphore = 5
Thread[pool-1-thread-2,5,main] countdown
Thread[pool-1-thread-4,5,main] start semaphore = 6
Thread[pool-1-thread-4,5,main] countdown
Thread[pool-1-thread-3,5,main] start semaphore = 7
Thread[pool-1-thread-3,5,main] countdown

可以看到执行了3次reject,最后semaphore值为7,正常应该为初始值10。
首先程序能够正常执行完毕,然后并发度下降了。
如果极端情况下,触发拒绝策略增多,semaphore的值降为1,这里semaphore就变成了lock或者synchronized,多线程就失去了效果变成了单线程串行执行。

CallerRunsPolicy源码可知,这里的r即为调用者线程,在这里就是main线程。我们在main线程执行了acquire(,那么我们只需要重写拒绝策略,在这里执行release(就可保证并发度与初始值保持一致。

1.3.2 如果初始化的时候就为0

Semaphore semaphore = new Semaphore(0;

那么程序会永远阻塞不执行,因为没有可用的permit。

1.3.3 如果reject次数大于等于初始化长度

Semaphore semaphore = new Semaphore(10;
同时,线程池拒绝次数>= 10,理论上,这个时候Semaphore就会出现0或负数。
线程就会阻塞。

我模拟了很多次都没出现阻塞的情况。
把线程池大小调整为1,将Semaphore大小设置为>1,这里为4。

public static void main(String[] args throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(20;
        Semaphore semaphore = new Semaphore(4;
        ExecutorService executorService = new ThreadPoolExecutor(1,
                1,
                1,
                TimeUnit.MINUTES,
                new LinkedBlockingQueue<>(1, (r, executor -> {
                    Random random = new Random(;
                    try {
                        Thread.sleep(random.nextInt(1000;
                    } catch (InterruptedException e {
                        e.printStackTrace(;
                    }
                    if (r instanceof TestRun {
                        ((TestRun r.getCountDownLatch(.countDown(;
        //                            ((TestRun r.getSemaphore(.acquire(;
//                                    ((TestRun r.getSemaphore(.release(;
                    }
                    System.out.println(Thread.currentThread( + " reject countdown " + semaphore.availablePermits(;
        };


        for (int i = 0; i < 30; i++ {
            semaphore.acquire(;
//            Thread.sleep(100;
            executorService.execute(new TestRun(countDownLatch, semaphore;
        }

//        countDownLatch.await(;
        System.out.println("完成";
        executorService.shutdown(;
    }

    public static class TestRun implements Runnable {
        private CountDownLatch countDownLatch;
        private Semaphore semaphore;

        public TestRun(CountDownLatch countDownLatch, Semaphore semaphore {
            this.countDownLatch = countDownLatch;
            this.semaphore = semaphore;
        }

        public CountDownLatch getCountDownLatch( {
            return countDownLatch;
        }

        public void setCountDownLatch(CountDownLatch countDownLatch {
            this.countDownLatch = countDownLatch;
        }

        public Semaphore getSemaphore( {
            return semaphore;
        }

        public void setSemaphore(Semaphore semaphore {
            this.semaphore = semaphore;
        }

        @SneakyThrows
        @Override
        public void run( {
//            semaphore.acquire(;
            Random random = new Random(;
            Thread.sleep(random.nextInt(1000;
            countDownLatch.countDown(;
            semaphore.release(;
            System.out.println(Thread.currentThread( + " start" + " semaphore = " + semaphore.availablePermits(;
            System.out.println(Thread.currentThread( + " countdown";
        }
    }

执行结果:

Thread[pool-1-thread-1,5,main] start semaphore = 2
Thread[pool-1-thread-1,5,main] countdown
Thread[main,5,main] reject countdown 2
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[main,5,main] reject countdown 1
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[main,5,main] reject countdown 0
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 0
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown
完成
Thread[pool-1-thread-1,5,main] start semaphore = 1
Thread[pool-1-thread-1,5,main] countdown

最后semaphore = 1.
当我将semaphore初始化值调整为3,5,2,最后semaphore的值总是为1。
线程池触发拒绝次数总是为semaphore初始化值-1

所以,结论是只要semaphore的初始值大于0,就不用担心程序会一直阻塞不执行。
同时,线程池触发拒绝策略,如果没有重写拒绝策略执行semaphore.release(,就会将并发度降低。

2. 总结

2.这里使用信号量最为简单便捷。
3.不管使用的是coundownlatch还是信号量,都要注意线程池拒绝的情况。
如果countdownlatch因为线程池拒绝策略没有执行countdown会导致await一直等待阻塞;
如果信号量因为线程池拒绝策略没有执行release,导致没有足够的permit,不会导致程序阻塞,但会降低并发 度。

编程笔记 » 分享一个生产者-消费者的真实场景

赞同 (39) or 分享 (0)
游客 发表我的评论   换个身份
取消评论

表情
(0)个小伙伴在吐槽