package org.apache.iotdb.db.queryengine.execution.exchange;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager;
import org.apache.iotdb.db.queryengine.execution.exchange.source.LocalSourceHandle;
import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/iotdb/db/queryengine/execution/exchange/LocalSourceHandleTest.class */
/**
 * Unit tests for {@link LocalSourceHandle} wired to a real {@link SharedTsBlockQueue}, with the
 * memory manager and the source-handle listener replaced by Mockito mocks.
 */
public class LocalSourceHandleTest {
    /** Query id shared by every fragment instance created in these tests. */
    private static final String QUERY_ID = "q0";

    /** Plan node id passed to both the shared queue and the source handle. */
    private static final String LOCAL_PLAN_NODE_ID = "exchange_0";

    /** Size, in bytes, requested for the mock TsBlock pushed into the queue (1 MiB). */
    private static final long MOCK_TS_BLOCK_SIZE_IN_BYTES = 1024L * 1024L;

    /**
     * Checks the state reported by the handle before data arrives, after one block has been
     * queued and the queue has been marked as complete, and after that block has been received.
     * Also verifies that the listener's {@code onFinished} callback is invoked exactly once.
     */
    @Test
    public void testReceive() {
        TFragmentInstanceId localFragmentInstanceId = newLocalFragmentInstanceId();
        LocalMemoryManager memoryManager = mockLocalMemoryManager();
        MPPDataExchangeManager.SourceHandleListener listener = mockSourceHandleListener();
        SharedTsBlockQueue queue = newSharedQueue(localFragmentInstanceId, memoryManager);
        LocalSourceHandle sourceHandle =
                new LocalSourceHandle(localFragmentInstanceId, LOCAL_PLAN_NODE_ID, queue, listener);

        // Nothing has been queued yet: the handle is blocked and holds no memory.
        Assert.assertFalse(sourceHandle.isBlocked().isDone());
        Assert.assertFalse(sourceHandle.isAborted());
        Assert.assertFalse(sourceHandle.isFinished());
        Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());

        // Queue a single block and signal that no further blocks will follow.
        queue.add(Utils.createMockTsBlock(MOCK_TS_BLOCK_SIZE_IN_BYTES));
        queue.setNoMoreTsBlocks(true);

        Assert.assertTrue(sourceHandle.isBlocked().isDone());
        Assert.assertFalse(sourceHandle.isAborted());
        Assert.assertFalse(sourceHandle.isFinished());
        Assert.assertEquals(MOCK_TS_BLOCK_SIZE_IN_BYTES, sourceHandle.getBufferRetainedSizeInBytes());

        // Asking again must still report an unblocked handle right before the receive call.
        Assert.assertTrue(sourceHandle.isBlocked().isDone());
        sourceHandle.receive();

        // The only block was consumed and no more are coming, so the handle is finished.
        Assert.assertTrue(sourceHandle.isBlocked().isDone());
        Assert.assertFalse(sourceHandle.isAborted());
        Assert.assertTrue(sourceHandle.isFinished());
        Mockito.verify(listener, Mockito.times(1)).onFinished(sourceHandle);
    }

    /**
     * Checks that aborting a handle completes a previously obtained blocked future, flags the
     * handle as aborted but not finished, and invokes the listener's {@code onAborted} callback
     * exactly once.
     */
    @Test
    public void testAbort() {
        TFragmentInstanceId localFragmentInstanceId = newLocalFragmentInstanceId();
        LocalMemoryManager memoryManager = mockLocalMemoryManager();
        MPPDataExchangeManager.SourceHandleListener listener = mockSourceHandleListener();
        SharedTsBlockQueue queue = newSharedQueue(localFragmentInstanceId, memoryManager);
        LocalSourceHandle sourceHandle =
                new LocalSourceHandle(localFragmentInstanceId, LOCAL_PLAN_NODE_ID, queue, listener);

        // Keep the future obtained before the abort so its completion can be observed afterwards.
        ListenableFuture<?> blockedBeforeAbort = sourceHandle.isBlocked();
        Assert.assertFalse(blockedBeforeAbort.isDone());
        Assert.assertFalse(sourceHandle.isAborted());
        Assert.assertFalse(sourceHandle.isFinished());
        Assert.assertEquals(0L, sourceHandle.getBufferRetainedSizeInBytes());

        sourceHandle.abort();

        Assert.assertTrue(blockedBeforeAbort.isDone());
        Assert.assertTrue(sourceHandle.isAborted());
        Assert.assertFalse(sourceHandle.isFinished());
        Mockito.verify(listener, Mockito.times(1)).onAborted(sourceHandle);
    }

    /** Creates the fragment instance id used for the local (consuming) side of the exchange. */
    private static TFragmentInstanceId newLocalFragmentInstanceId() {
        return new TFragmentInstanceId(QUERY_ID, 0, "0");
    }

    /**
     * Creates a mocked {@link LocalMemoryManager} whose query pool is the pool returned by
     * {@code Utils.createMockNonBlockedMemoryPool()}.
     */
    private static LocalMemoryManager mockLocalMemoryManager() {
        LocalMemoryManager memoryManager = Mockito.mock(LocalMemoryManager.class);
        Mockito.when(memoryManager.getQueryPool()).thenReturn(Utils.createMockNonBlockedMemoryPool());
        return memoryManager;
    }

    /** Creates a mocked listener so that callback invocations can be verified. */
    private static MPPDataExchangeManager.SourceHandleListener mockSourceHandleListener() {
        return Mockito.mock(MPPDataExchangeManager.SourceHandleListener.class);
    }

    /**
     * Creates the shared queue feeding the handle under test, using a direct executor service
     * from {@link MoreExecutors}.
     */
    private static SharedTsBlockQueue newSharedQueue(
            TFragmentInstanceId localFragmentInstanceId, LocalMemoryManager memoryManager) {
        return new SharedTsBlockQueue(
                localFragmentInstanceId,
                LOCAL_PLAN_NODE_ID,
                memoryManager,
                MoreExecutors.newDirectExecutorService());
    }
}
