package org.apache.iotdb.db.mpp.execution.exchange;

import com.google.common.util.concurrent.ListenableFuture;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.mpp.execution.memory.MemoryPool;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.class */
/**
 * Unit tests for {@link LocalSinkHandle} wired to a {@link LocalSourceHandle} through one shared
 * {@link SharedTsBlockQueue}.
 *
 * <p>The queue is backed by a Mockito spy of a real {@link MemoryPool}, so the tests can verify
 * the exact {@code reserve}/{@code free} calls made while blocks are sent and removed.
 */
public class LocalSinkHandleTest {
    /** First argument of both fragment instance ids and of the verified pool calls. */
    private static final String QUERY_ID = "q0";

    /** Identifier string handed to the queue, both handles, and the verified pool calls. */
    private static final String EXCHANGE_ID = "exchange_0";

    /** Size passed to {@code Utils.createMockTsBlock} for every block sent (1 MiB). */
    private static final long MOCK_TS_BLOCK_SIZE = 1024L * 1024L;

    /** Number of sends the tests expect before {@code isFull()} stops reporting done. */
    private static final int EXPECTED_BLOCK_COUNT = 11;

    /** Retained size the tests expect once the sink has stopped accepting blocks. */
    private static final long EXPECTED_RETAINED_BYTES = EXPECTED_BLOCK_COUNT * MOCK_TS_BLOCK_SIZE;

    /**
     * Sends blocks until the sink reports full, removes them all from the shared queue, and then
     * finishes the sink, checking the sink state and the memory pool / listener interactions at
     * each stage.
     */
    @Test
    public void testSend() {
        Fixture fixture = new Fixture();
        LocalSinkHandle sinkHandle = fixture.sinkHandle;
        assertSinkOpensOnceSourceChecksBlocked(fixture);

        // Fill phase: every send is expected to reserve one block's worth of memory.
        Assert.assertEquals(EXPECTED_BLOCK_COUNT, sendUntilFull(sinkHandle));
        Assert.assertFalse(sinkHandle.isFull().isDone());
        Assert.assertFalse(sinkHandle.isFinished());
        Assert.assertEquals(EXPECTED_RETAINED_BYTES, sinkHandle.getBufferRetainedSizeInBytes());
        verifyBlockReservations(fixture);

        // Drain phase: pull everything straight out of the shared queue.
        int removedBlocks = 0;
        while (!fixture.queue.isEmpty()) {
            fixture.queue.remove();
            removedBlocks++;
        }
        Assert.assertEquals(EXPECTED_BLOCK_COUNT, removedBlocks);
        Assert.assertTrue(sinkHandle.isFull().isDone());
        Assert.assertFalse(sinkHandle.isFinished());
        Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
        Mockito.verify(fixture.memoryPool, Mockito.times(EXPECTED_BLOCK_COUNT))
                .free(
                        QUERY_ID,
                        FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(fixture.sinkInstanceId),
                        EXCHANGE_ID,
                        MOCK_TS_BLOCK_SIZE);

        // Finish phase: with the queue already empty, the sink is expected to finish immediately.
        sinkHandle.setNoMoreTsBlocks();
        Assert.assertTrue(sinkHandle.isFull().isDone());
        Assert.assertTrue(sinkHandle.isFinished());
        Mockito.verify(fixture.sinkListener, Mockito.times(1)).onEndOfBlocks(sinkHandle);
        Mockito.verify(fixture.sinkListener, Mockito.times(1)).onFinish(sinkHandle);
    }

    /**
     * Sends blocks until the sink reports full, then aborts it and checks that the pending
     * {@code isFull()} future completes, the sink is aborted but not finished, and the listener
     * is notified once.
     */
    @Test
    public void testAbort() {
        Fixture fixture = new Fixture();
        LocalSinkHandle sinkHandle = fixture.sinkHandle;
        assertSinkOpensOnceSourceChecksBlocked(fixture);

        Assert.assertEquals(EXPECTED_BLOCK_COUNT, sendUntilFull(sinkHandle));
        // Keep hold of the pending future so its completion can be checked after abort().
        ListenableFuture<?> pendingFull = sinkHandle.isFull();
        Assert.assertFalse(pendingFull.isDone());
        Assert.assertFalse(sinkHandle.isFinished());
        Assert.assertEquals(EXPECTED_RETAINED_BYTES, sinkHandle.getBufferRetainedSizeInBytes());
        verifyBlockReservations(fixture);

        sinkHandle.abort();
        Assert.assertTrue(pendingFull.isDone());
        Assert.assertFalse(sinkHandle.isFinished());
        Assert.assertTrue(sinkHandle.isAborted());
        Mockito.verify(fixture.sinkListener, Mockito.times(1)).onAborted(sinkHandle);
    }

    /**
     * Asserts the opening state shared by both tests: {@code isFull()} is not done on a fresh
     * sink, becomes done after the source handle's {@code isBlocked()} has been called, and the
     * sink is then neither finished nor aborted and retains no bytes.
     */
    private static void assertSinkOpensOnceSourceChecksBlocked(Fixture fixture) {
        LocalSinkHandle sinkHandle = fixture.sinkHandle;
        Assert.assertFalse(sinkHandle.isFull().isDone());
        // Return value is deliberately ignored; only the call's effect on the sink is asserted.
        fixture.sourceHandle.isBlocked();
        Assert.assertTrue(sinkHandle.isFull().isDone());
        Assert.assertFalse(sinkHandle.isFinished());
        Assert.assertFalse(sinkHandle.isAborted());
        Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
    }

    /**
     * Sends mock blocks of {@link #MOCK_TS_BLOCK_SIZE} for as long as {@code isFull()} reports
     * done.
     *
     * @return the number of blocks that were sent
     */
    private static int sendUntilFull(LocalSinkHandle sinkHandle) {
        int sentBlocks = 0;
        while (sinkHandle.isFull().isDone()) {
            sinkHandle.send(Utils.createMockTsBlock(MOCK_TS_BLOCK_SIZE));
            sentBlocks++;
        }
        return sentBlocks;
    }

    /** Verifies that the spied pool saw exactly one {@code reserve} call per block sent. */
    private static void verifyBlockReservations(Fixture fixture) {
        Mockito.verify(fixture.memoryPool, Mockito.times(EXPECTED_BLOCK_COUNT))
                .reserve(
                        QUERY_ID,
                        FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(fixture.sinkInstanceId),
                        EXCHANGE_ID,
                        MOCK_TS_BLOCK_SIZE,
                        Long.MAX_VALUE);
    }

    /**
     * Collaborators shared by both tests, built in the same order for each so the tests start
     * from an identical state.
     */
    private static final class Fixture {
        /** Id passed first to the sink handle and used to create the shared queue. */
        final TFragmentInstanceId sinkInstanceId;

        /** Id passed first to the source handle. */
        final TFragmentInstanceId sourceInstanceId;

        /** Spy around a real pool so that reserve/free calls can be verified. */
        final MemoryPool memoryPool;

        final MPPDataExchangeManager.SinkHandleListener sinkListener;
        final SharedTsBlockQueue queue;
        final LocalSourceHandle sourceHandle;
        final LocalSinkHandle sinkHandle;

        Fixture() {
            sinkInstanceId = new TFragmentInstanceId(QUERY_ID, 0, "0");
            sourceInstanceId = new TFragmentInstanceId(QUERY_ID, 1, "0");

            LocalMemoryManager memoryManager = Mockito.mock(LocalMemoryManager.class);
            // 10 MiB and 5 MiB; presumably the pool capacity and a per-query limit.
            // TODO confirm against the MemoryPool constructor.
            memoryPool = Mockito.spy(new MemoryPool("test", 10L * MOCK_TS_BLOCK_SIZE, 5L * MOCK_TS_BLOCK_SIZE));
            Mockito.when(memoryManager.getQueryPool()).thenReturn(memoryPool);

            sinkListener = Mockito.mock(MPPDataExchangeManager.SinkHandleListener.class);

            queue = new SharedTsBlockQueue(sinkInstanceId, EXCHANGE_ID, memoryManager);
            queue.setMaxBytesCanReserve(Long.MAX_VALUE);

            sourceHandle =
                    new LocalSourceHandle(
                            sourceInstanceId,
                            sinkInstanceId,
                            EXCHANGE_ID,
                            queue,
                            Mockito.mock(MPPDataExchangeManager.SourceHandleListener.class));
            sinkHandle = new LocalSinkHandle(sinkInstanceId, EXCHANGE_ID, sourceInstanceId, queue, sinkListener);
        }
    }
}
