package org.apache.iotdb.db.mpp.execution.exchange;

import com.google.common.util.concurrent.ListenableFuture;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager;
import org.apache.iotdb.db.mpp.execution.exchange.sink.LocalSinkChannel;
import org.apache.iotdb.db.mpp.execution.exchange.source.LocalSourceHandle;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.mpp.execution.memory.MemoryPool;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.testcontainers.shaded.com.google.common.util.concurrent.MoreExecutors;

/* loaded from: input_file:org/apache/iotdb/db/mpp/execution/exchange/LocalSinkChannelTest.class */
public class LocalSinkChannelTest {
    @Test
    public void testSend() {
        TFragmentInstanceId tFragmentInstanceId = new TFragmentInstanceId("q0", 0, "0");
        TFragmentInstanceId tFragmentInstanceId2 = new TFragmentInstanceId("q0", 1, "0");
        LocalMemoryManager localMemoryManager = (LocalMemoryManager) Mockito.mock(LocalMemoryManager.class);
        MemoryPool memoryPool = (MemoryPool) Mockito.spy(new MemoryPool("test", 10485760L, 5242880L));
        Mockito.when(localMemoryManager.getQueryPool()).thenReturn(memoryPool);
        MPPDataExchangeManager.SinkListener sinkListener = (MPPDataExchangeManager.SinkListener) Mockito.mock(MPPDataExchangeManager.SinkListener.class);
        SharedTsBlockQueue sharedTsBlockQueue = new SharedTsBlockQueue(tFragmentInstanceId, "exchange_0", localMemoryManager, MoreExecutors.newDirectExecutorService());
        LocalSinkChannel localSinkChannel = new LocalSinkChannel(tFragmentInstanceId2, sharedTsBlockQueue, sinkListener);
        sharedTsBlockQueue.setMaxBytesCanReserve(Long.MAX_VALUE);
        LocalSourceHandle localSourceHandle = new LocalSourceHandle(tFragmentInstanceId2, "exchange_0", sharedTsBlockQueue, (MPPDataExchangeManager.SourceHandleListener) Mockito.mock(MPPDataExchangeManager.SourceHandleListener.class));
        Assert.assertFalse(localSinkChannel.isFull().isDone());
        localSourceHandle.isBlocked();
        Assert.assertTrue(localSinkChannel.isFull().isDone());
        Assert.assertFalse(localSinkChannel.isFinished());
        Assert.assertFalse(localSinkChannel.isAborted());
        Assert.assertEquals(0L, localSinkChannel.getBufferRetainedSizeInBytes());
        int i = 0;
        while (localSinkChannel.isFull().isDone()) {
            localSinkChannel.send(Utils.createMockTsBlock(1048576L));
            i++;
        }
        Assert.assertEquals(11L, i);
        Assert.assertFalse(localSinkChannel.isFull().isDone());
        Assert.assertFalse(localSinkChannel.isFinished());
        Assert.assertEquals(11534336L, localSinkChannel.getBufferRetainedSizeInBytes());
        ((MemoryPool) Mockito.verify(memoryPool, Mockito.times(11))).reserve("q0", FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(tFragmentInstanceId), "exchange_0", 1048576L, Long.MAX_VALUE);
        int i2 = 0;
        while (!sharedTsBlockQueue.isEmpty()) {
            sharedTsBlockQueue.remove();
            i2++;
        }
        Assert.assertEquals(11L, i2);
        Assert.assertTrue(localSinkChannel.isFull().isDone());
        Assert.assertFalse(localSinkChannel.isFinished());
        Assert.assertEquals(0L, localSinkChannel.getBufferRetainedSizeInBytes());
        ((MemoryPool) Mockito.verify(memoryPool, Mockito.times(11))).free("q0", FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(tFragmentInstanceId), "exchange_0", 1048576L);
        localSinkChannel.setNoMoreTsBlocks();
        Assert.assertTrue(localSinkChannel.isFull().isDone());
        Assert.assertTrue(localSinkChannel.isFinished());
        ((MPPDataExchangeManager.SinkListener) Mockito.verify(sinkListener, Mockito.times(1))).onEndOfBlocks(localSinkChannel);
        ((MPPDataExchangeManager.SinkListener) Mockito.verify(sinkListener, Mockito.times(1))).onFinish(localSinkChannel);
    }

    @Test
    public void testAbort() {
        TFragmentInstanceId tFragmentInstanceId = new TFragmentInstanceId("q0", 0, "0");
        TFragmentInstanceId tFragmentInstanceId2 = new TFragmentInstanceId("q0", 1, "0");
        LocalMemoryManager localMemoryManager = (LocalMemoryManager) Mockito.mock(LocalMemoryManager.class);
        MemoryPool memoryPool = (MemoryPool) Mockito.spy(new MemoryPool("test", 10485760L, 5242880L));
        Mockito.when(localMemoryManager.getQueryPool()).thenReturn(memoryPool);
        MPPDataExchangeManager.SinkListener sinkListener = (MPPDataExchangeManager.SinkListener) Mockito.mock(MPPDataExchangeManager.SinkListener.class);
        SharedTsBlockQueue sharedTsBlockQueue = new SharedTsBlockQueue(tFragmentInstanceId, "exchange_0", localMemoryManager, MoreExecutors.newDirectExecutorService());
        LocalSinkChannel localSinkChannel = new LocalSinkChannel(tFragmentInstanceId2, sharedTsBlockQueue, sinkListener);
        sharedTsBlockQueue.setMaxBytesCanReserve(Long.MAX_VALUE);
        LocalSourceHandle localSourceHandle = new LocalSourceHandle(tFragmentInstanceId2, "exchange_0", sharedTsBlockQueue, (MPPDataExchangeManager.SourceHandleListener) Mockito.mock(MPPDataExchangeManager.SourceHandleListener.class));
        Assert.assertFalse(localSinkChannel.isFull().isDone());
        localSourceHandle.isBlocked();
        Assert.assertTrue(localSinkChannel.isFull().isDone());
        Assert.assertFalse(localSinkChannel.isFinished());
        Assert.assertFalse(localSinkChannel.isAborted());
        Assert.assertEquals(0L, localSinkChannel.getBufferRetainedSizeInBytes());
        int i = 0;
        while (localSinkChannel.isFull().isDone()) {
            localSinkChannel.send(Utils.createMockTsBlock(1048576L));
            i++;
        }
        Assert.assertEquals(11L, i);
        ListenableFuture isFull = localSinkChannel.isFull();
        Assert.assertFalse(isFull.isDone());
        Assert.assertFalse(localSinkChannel.isFinished());
        Assert.assertEquals(11534336L, localSinkChannel.getBufferRetainedSizeInBytes());
        ((MemoryPool) Mockito.verify(memoryPool, Mockito.times(11))).reserve("q0", FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(tFragmentInstanceId), "exchange_0", 1048576L, Long.MAX_VALUE);
        localSinkChannel.abort();
        Assert.assertTrue(isFull.isDone());
        Assert.assertFalse(localSinkChannel.isFinished());
        Assert.assertTrue(localSinkChannel.isAborted());
        ((MPPDataExchangeManager.SinkListener) Mockito.verify(sinkListener, Mockito.times(1))).onAborted(localSinkChannel);
    }
}
