package org.apache.iotdb.db.queryengine.execution.exchange;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.Validate;
import org.apache.iotdb.commons.memory.MemoryManager;
import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.queryengine.execution.memory.MemoryPool;
import org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.class */
/**
 * Concurrency test for {@link SharedTsBlockQueue}: one sender task and one receiver task exchange
 * mock TsBlocks through a single queue while sharing a two-thread executor.
 */
public class SharedTsBlockQueueTest {

    /**
     * Consumer side of the test. Each run removes TsBlocks from the queue for as long as
     * {@code queue.isBlocked()} reports a completed future and blocks remain to be received, then
     * re-registers a fresh {@code ReceiveTask} as a listener on the last future it observed.
     */
    private static class ReceiveTask implements Runnable {
        private final SharedTsBlockQueue queue;
        private final AtomicReference<Integer> numOfTsBlocksToReceive;
        private final AtomicReference<Integer> numOfTimesBlocked;
        private final ExecutorService executor;

        /**
         * @param queue queue to drain; must not be null
         * @param numOfTsBlocksToReceive shared countdown of blocks still to be removed
         * @param numOfTimesBlocked shared counter, incremented every time this task hands itself
         *     over to a future listener (also when that future is already complete)
         * @param executor executor on which the follow-up task is run
         */
        public ReceiveTask(
                SharedTsBlockQueue queue,
                AtomicReference<Integer> numOfTsBlocksToReceive,
                AtomicReference<Integer> numOfTimesBlocked,
                ExecutorService executor) {
            this.queue = Validate.notNull(queue);
            this.numOfTsBlocksToReceive = Validate.notNull(numOfTsBlocksToReceive);
            this.numOfTimesBlocked = Validate.notNull(numOfTimesBlocked);
            this.executor = Validate.notNull(executor);
        }

        @Override
        public void run() {
            ListenableFuture<?> blocked = null;
            // Keep draining until either everything has been received or the queue reports
            // that the reader has to wait.
            while (numOfTsBlocksToReceive.get() > 0) {
                synchronized (queue) {
                    blocked = queue.isBlocked();
                    if (!blocked.isDone()) {
                        break;
                    }
                    queue.remove();
                    numOfTsBlocksToReceive.updateAndGet(remaining -> remaining - 1);
                }
            }
            // blocked stays null only when there was nothing left to receive on entry; in that
            // case the chain of receive tasks ends here.
            if (blocked != null) {
                numOfTimesBlocked.updateAndGet(count -> count + 1);
                blocked.addListener(
                        new ReceiveTask(queue, numOfTsBlocksToReceive, numOfTimesBlocked, executor),
                        executor);
            }
        }
    }

    /**
     * Producer side of the test. Each run adds mock TsBlocks until the future returned by
     * {@code queue.add} is not done or no blocks remain to be sent, then re-registers a fresh
     * {@code SendTask} as a listener on that future. A run that starts with nothing left to send
     * marks the queue as having no more TsBlocks.
     */
    private static class SendTask implements Runnable {
        private final SharedTsBlockQueue queue;
        private final long mockTsBlockSize;
        private final AtomicReference<Integer> numOfTsBlocksToSend;
        private final AtomicReference<Integer> numOfTimesBlocked;
        private final ExecutorService executor;

        /**
         * @param queue queue to fill; must not be null
         * @param mockTsBlockSize size passed to {@code Utils.createMockTsBlock}; must be positive
         * @param numOfTsBlocksToSend shared countdown of blocks still to be added
         * @param numOfTimesBlocked shared counter, incremented every time this task hands itself
         *     over to a future listener (also when that future is already complete)
         * @param executor executor on which the follow-up task is run
         */
        public SendTask(
                SharedTsBlockQueue queue,
                long mockTsBlockSize,
                AtomicReference<Integer> numOfTsBlocksToSend,
                AtomicReference<Integer> numOfTimesBlocked,
                ExecutorService executor) {
            this.queue = Validate.notNull(queue);
            Validate.isTrue(mockTsBlockSize > 0);
            this.mockTsBlockSize = mockTsBlockSize;
            this.numOfTsBlocksToSend = Validate.notNull(numOfTsBlocksToSend);
            this.numOfTimesBlocked = Validate.notNull(numOfTimesBlocked);
            this.executor = Validate.notNull(executor);
        }

        @Override
        public void run() {
            ListenableFuture<?> blockedOnAdd = null;
            while (numOfTsBlocksToSend.get() > 0) {
                synchronized (queue) {
                    blockedOnAdd = queue.add(Utils.createMockTsBlock(mockTsBlockSize));
                }
                numOfTsBlocksToSend.updateAndGet(remaining -> remaining - 1);
                if (!blockedOnAdd.isDone()) {
                    break;
                }
            }
            if (blockedOnAdd != null) {
                // At least one block was added in this run: continue (or finish) in a follow-up
                // task once the returned future completes.
                numOfTimesBlocked.updateAndGet(count -> count + 1);
                blockedOnAdd.addListener(
                        new SendTask(
                                queue, mockTsBlockSize, numOfTsBlocksToSend, numOfTimesBlocked, executor),
                        executor);
            } else {
                // Nothing was left to send on entry: signal the end of the stream.
                synchronized (queue) {
                    queue.setNoMoreTsBlocks(true);
                }
            }
        }
    }

    /**
     * Runs one {@link SendTask} and one {@link ReceiveTask} against the same queue and polls until
     * both the send and the receive countdown have reached zero. The JUnit timeout fails the test
     * if the transfer does not finish within 15 seconds.
     */
    @Test(timeout = 15000)
    public void concurrencyTest() {
        final long mockTsBlockSize = 1024L * 1024L;

        LocalMemoryManager localMemoryManager = Mockito.mock(LocalMemoryManager.class);
        MemoryManager memoryManager = Mockito.spy(new MemoryManager(10 * mockTsBlockSize));
        MemoryPool queryPool = Mockito.spy(new MemoryPool("test", memoryManager, 5 * mockTsBlockSize));
        Mockito.when(localMemoryManager.getQueryPool()).thenReturn(queryPool);

        SharedTsBlockQueue queue =
                new SharedTsBlockQueue(
                        new TFragmentInstanceId("q0", 0, "0"),
                        "test",
                        localMemoryManager,
                        MoreExecutors.newDirectExecutorService());
        queue.getCanAddTsBlock().set(null);
        queue.setMaxBytesCanReserve(Long.MAX_VALUE);

        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            AtomicReference<Integer> numOfTimesSenderBlocked = new AtomicReference<>(0);
            AtomicReference<Integer> numOfTimesReceiverBlocked = new AtomicReference<>(0);
            AtomicReference<Integer> numOfTsBlocksToSend =
                    new AtomicReference<>(StatementTestUtils.TEST_SERIES_SLOT_NUM);
            AtomicReference<Integer> numOfTsBlocksToReceive =
                    new AtomicReference<>(StatementTestUtils.TEST_SERIES_SLOT_NUM);

            executor.submit(
                    new SendTask(
                            queue, mockTsBlockSize, numOfTsBlocksToSend, numOfTimesSenderBlocked, executor));
            executor.submit(
                    new ReceiveTask(queue, numOfTsBlocksToReceive, numOfTimesReceiverBlocked, executor));

            // Wait for BOTH sides: the transfer is complete only when every block has been sent
            // and every block has been received.
            while (numOfTsBlocksToSend.get() != 0 || numOfTsBlocksToReceive.get() != 0) {
                System.out.println(
                        String.format(
                                "Sender %d: %d, Receiver %d: %d",
                                numOfTimesSenderBlocked.get(),
                                numOfTsBlocksToSend.get(),
                                numOfTimesReceiverBlocked.get(),
                                numOfTsBlocksToReceive.get()));
                try {
                    Thread.sleep(10L);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag before failing so the runner can still see it.
                    Thread.currentThread().interrupt();
                    Assert.fail(e.getMessage());
                }
            }
        } finally {
            // Do not leak the pool's worker threads past the end of the test. A trailing
            // follow-up task that is registered after this point is rejected by the executor.
            executor.shutdownNow();
        }
    }
}
