package org.apache.iotdb.db.mpp.execution.exchange;

import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.mpp.execution.memory.MemoryPool;
import org.apache.iotdb.mpp.rpc.thrift.TEndOfDataBlockEvent;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.mpp.rpc.thrift.TNewDataBlockEvent;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus;
import org.apache.thrift.TException;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.class */
public class SinkHandleTest {
    @Test
    public void testOneTimeNotBlockedSend() {
        TEndPoint tEndPoint = new TEndPoint("remote", IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
        TFragmentInstanceId tFragmentInstanceId = new TFragmentInstanceId("q0", 0, "0");
        TFragmentInstanceId tFragmentInstanceId2 = new TFragmentInstanceId("q0", 1, "0");
        LocalMemoryManager localMemoryManager = (LocalMemoryManager) Mockito.mock(LocalMemoryManager.class);
        MemoryPool createMockNonBlockedMemoryPool = Utils.createMockNonBlockedMemoryPool();
        Mockito.when(localMemoryManager.getQueryPool()).thenReturn(createMockNonBlockedMemoryPool);
        IClientManager iClientManager = (IClientManager) Mockito.mock(IClientManager.class);
        SyncDataNodeMPPDataExchangeServiceClient syncDataNodeMPPDataExchangeServiceClient = (SyncDataNodeMPPDataExchangeServiceClient) Mockito.mock(SyncDataNodeMPPDataExchangeServiceClient.class);
        try {
            Mockito.when((SyncDataNodeMPPDataExchangeServiceClient) iClientManager.borrowClient(tEndPoint)).thenReturn(syncDataNodeMPPDataExchangeServiceClient);
            ((SyncDataNodeMPPDataExchangeServiceClient) Mockito.doNothing().when(syncDataNodeMPPDataExchangeServiceClient)).onEndOfDataBlockEvent((TEndOfDataBlockEvent) Mockito.any(TEndOfDataBlockEvent.class));
            ((SyncDataNodeMPPDataExchangeServiceClient) Mockito.doNothing().when(syncDataNodeMPPDataExchangeServiceClient)).onNewDataBlockEvent((TNewDataBlockEvent) Mockito.any(TNewDataBlockEvent.class));
        } catch (TException | IOException e) {
            e.printStackTrace();
            Assert.fail();
        }
        MPPDataExchangeManager.SinkHandleListener sinkHandleListener = (MPPDataExchangeManager.SinkHandleListener) Mockito.mock(MPPDataExchangeManager.SinkHandleListener.class);
        List<TsBlock> createMockTsBlocks = Utils.createMockTsBlocks(1, 1048576L);
        SinkHandle sinkHandle = new SinkHandle(tEndPoint, tFragmentInstanceId, "exchange_0", tFragmentInstanceId2, localMemoryManager, Executors.newSingleThreadExecutor(), Utils.createMockTsBlockSerde(1048576L), sinkHandleListener, iClientManager);
        Assert.assertTrue(sinkHandle.isFull().isDone());
        Assert.assertFalse(sinkHandle.isFinished());
        Assert.assertFalse(sinkHandle.isAborted());
        Assert.assertEquals(TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, sinkHandle.getBufferRetainedSizeInBytes());
        Assert.assertEquals(0L, sinkHandle.getNumOfBufferedTsBlocks());
        sinkHandle.send(createMockTsBlocks.get(0));
        Assert.assertTrue(sinkHandle.isFull().isDone());
        Assert.assertFalse(sinkHandle.isFinished());
        Assert.assertFalse(sinkHandle.isAborted());
        Assert.assertEquals(1048576 + TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, sinkHandle.getBufferRetainedSizeInBytes());
        Assert.assertEquals(1L, sinkHandle.getNumOfBufferedTsBlocks());
        ((MemoryPool) Mockito.verify(createMockNonBlockedMemoryPool, Mockito.timeout(100000L).times(2))).reserve("q0", 1048576L);
        try {
            ((SyncDataNodeMPPDataExchangeServiceClient) Mockito.verify(syncDataNodeMPPDataExchangeServiceClient, Mockito.timeout(10000L).times(1))).onNewDataBlockEvent((TNewDataBlockEvent) Mockito.argThat(tNewDataBlockEvent -> {
                return tFragmentInstanceId.equals(tNewDataBlockEvent.getTargetFragmentInstanceId()) && "exchange_0".equals(tNewDataBlockEvent.getTargetPlanNodeId()) && tFragmentInstanceId2.equals(tNewDataBlockEvent.getSourceFragmentInstanceId()) && tNewDataBlockEvent.getStartSequenceId() == 0 && tNewDataBlockEvent.getBlockSizes().size() == 1;
            }));
        } catch (TException e2) {
            e2.printStackTrace();
            Assert.fail();
        }
        for (int i = 0; i < 1; i++) {
            try {
                sinkHandle.getSerializedTsBlock(i);
            } catch (IOException e3) {
                e3.printStackTrace();
                Assert.fail();
            }
            Assert.assertTrue(sinkHandle.isFull().isDone());
        }
        Assert.assertFalse(sinkHandle.isFinished());
        sinkHandle.setNoMoreTsBlocks();
        Assert.assertTrue(sinkHandle.isFull().isDone());
        Assert.assertFalse(sinkHandle.isFinished());
        Assert.assertFalse(sinkHandle.isAborted());
        ((MPPDataExchangeManager.SinkHandleListener) Mockito.verify(sinkHandleListener, Mockito.timeout(10000L).times(1))).onEndOfBlocks(sinkHandle);
        sinkHandle.acknowledgeTsBlock(0, 1);
        Assert.assertTrue(sinkHandle.isFull().isDone());
        Assert.assertTrue(sinkHandle.isFinished());
        Assert.assertFalse(sinkHandle.isAborted());
        Assert.assertEquals(1048576L, sinkHandle.getBufferRetainedSizeInBytes());
        ((MemoryPool) Mockito.verify(createMockNonBlockedMemoryPool, Mockito.timeout(100000L).times(1))).free("q0", 1048576L);
        ((MPPDataExchangeManager.SinkHandleListener) Mockito.verify(sinkHandleListener, Mockito.timeout(100000L).times(1))).onFinish(sinkHandle);
        try {
            ((SyncDataNodeMPPDataExchangeServiceClient) Mockito.verify(syncDataNodeMPPDataExchangeServiceClient, Mockito.timeout(10000L).times(1))).onEndOfDataBlockEvent((TEndOfDataBlockEvent) Mockito.argThat(tEndOfDataBlockEvent -> {
                return tFragmentInstanceId.equals(tEndOfDataBlockEvent.getTargetFragmentInstanceId()) && "exchange_0".equals(tEndOfDataBlockEvent.getTargetPlanNodeId()) && tFragmentInstanceId2.equals(tEndOfDataBlockEvent.getSourceFragmentInstanceId()) && 0 == tEndOfDataBlockEvent.getLastSequenceId();
            }));
        } catch (TException e4) {
            e4.printStackTrace();
            Assert.fail();
        }
    }

    @Test
    public void testMultiTimesBlockedSend() {
        TEndPoint tEndPoint = new TEndPoint("remote", IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
        TFragmentInstanceId tFragmentInstanceId = new TFragmentInstanceId("q0", 0, "0");
        TFragmentInstanceId tFragmentInstanceId2 = new TFragmentInstanceId("q0", 1, "0");
        LocalMemoryManager localMemoryManager = (LocalMemoryManager) Mockito.mock(LocalMemoryManager.class);
        MemoryPool createMockBlockedMemoryPool = Utils.createMockBlockedMemoryPool("q0", 1, 1048576L);
        Mockito.when(localMemoryManager.getQueryPool()).thenReturn(createMockBlockedMemoryPool);
        MPPDataExchangeManager.SinkHandleListener sinkHandleListener = (MPPDataExchangeManager.SinkHandleListener) Mockito.mock(MPPDataExchangeManager.SinkHandleListener.class);
        List<TsBlock> createMockTsBlocks = Utils.createMockTsBlocks(1, 1048576L);
        IClientManager iClientManager = (IClientManager) Mockito.mock(IClientManager.class);
        SyncDataNodeMPPDataExchangeServiceClient syncDataNodeMPPDataExchangeServiceClient = (SyncDataNodeMPPDataExchangeServiceClient) Mockito.mock(SyncDataNodeMPPDataExchangeServiceClient.class);
        try {
            Mockito.when((SyncDataNodeMPPDataExchangeServiceClient) iClientManager.borrowClient(tEndPoint)).thenReturn(syncDataNodeMPPDataExchangeServiceClient);
            ((SyncDataNodeMPPDataExchangeServiceClient) Mockito.doNothing().when(syncDataNodeMPPDataExchangeServiceClient)).onEndOfDataBlockEvent((TEndOfDataBlockEvent) Mockito.any(TEndOfDataBlockEvent.class));
            ((SyncDataNodeMPPDataExchangeServiceClient) Mockito.doNothing().when(syncDataNodeMPPDataExchangeServiceClient)).onNewDataBlockEvent((TNewDataBlockEvent) Mockito.any(TNewDataBlockEvent.class));
        } catch (TException | IOException e) {
            e.printStackTrace();
            Assert.fail();
        }
        SinkHandle sinkHandle = new SinkHandle(tEndPoint, tFragmentInstanceId, "exchange_0", tFragmentInstanceId2, localMemoryManager, Executors.newSingleThreadExecutor(), Utils.createMockTsBlockSerde(1048576L), sinkHandleListener, iClientManager);
        Assert.assertTrue(sinkHandle.isFull().isDone());
        Assert.assertFalse(sinkHandle.isFinished());
        Assert.assertFalse(sinkHandle.isAborted());
        Assert.assertEquals(TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, sinkHandle.getBufferRetainedSizeInBytes());
        Assert.assertEquals(0L, sinkHandle.getNumOfBufferedTsBlocks());
        sinkHandle.send(createMockTsBlocks.get(0));
        Assert.assertFalse(sinkHandle.isFull().isDone());
        Assert.assertFalse(sinkHandle.isFinished());
        Assert.assertFalse(sinkHandle.isAborted());
        Assert.assertEquals(1048576 + TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, sinkHandle.getBufferRetainedSizeInBytes());
        Assert.assertEquals(1L, sinkHandle.getNumOfBufferedTsBlocks());
        ((MemoryPool) Mockito.verify(createMockBlockedMemoryPool, Mockito.timeout(100000L).times(2))).reserve("q0", 1048576L);
        try {
            ((SyncDataNodeMPPDataExchangeServiceClient) Mockito.verify(syncDataNodeMPPDataExchangeServiceClient, Mockito.timeout(10000L).times(1))).onNewDataBlockEvent((TNewDataBlockEvent) Mockito.argThat(tNewDataBlockEvent -> {
                return tFragmentInstanceId.equals(tNewDataBlockEvent.getTargetFragmentInstanceId()) && "exchange_0".equals(tNewDataBlockEvent.getTargetPlanNodeId()) && tFragmentInstanceId2.equals(tNewDataBlockEvent.getSourceFragmentInstanceId()) && tNewDataBlockEvent.getStartSequenceId() == 0 && tNewDataBlockEvent.getBlockSizes().size() == 1;
            }));
        } catch (TException e2) {
            e2.printStackTrace();
            Assert.fail();
        }
        for (int i = 0; i < 1; i++) {
            try {
                sinkHandle.getSerializedTsBlock(i);
            } catch (IOException e3) {
                e3.printStackTrace();
                Assert.fail();
            }
            Assert.assertFalse(sinkHandle.isFull().isDone());
        }
        Assert.assertFalse(sinkHandle.isFinished());
        sinkHandle.acknowledgeTsBlock(0, 1);
        Assert.assertTrue(sinkHandle.isFull().isDone());
        Assert.assertFalse(sinkHandle.isFinished());
        Assert.assertFalse(sinkHandle.isAborted());
        Assert.assertEquals(TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, sinkHandle.getBufferRetainedSizeInBytes());
        ((MemoryPool) Mockito.verify(createMockBlockedMemoryPool, Mockito.timeout(100000L).times(1))).free("q0", 1048576L);
        sinkHandle.send(createMockTsBlocks.get(0));
        Assert.assertFalse(sinkHandle.isFull().isDone());
        Assert.assertFalse(sinkHandle.isFinished());
        Assert.assertFalse(sinkHandle.isAborted());
        Assert.assertEquals(1048576 + TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, sinkHandle.getBufferRetainedSizeInBytes());
        Assert.assertEquals(1L, sinkHandle.getNumOfBufferedTsBlocks());
        ((MemoryPool) Mockito.verify(createMockBlockedMemoryPool, Mockito.timeout(100000L).times(3))).reserve("q0", 1048576L);
        try {
            ((SyncDataNodeMPPDataExchangeServiceClient) Mockito.verify(syncDataNodeMPPDataExchangeServiceClient, Mockito.timeout(10000L).times(1))).onNewDataBlockEvent((TNewDataBlockEvent) Mockito.argThat(tNewDataBlockEvent2 -> {
                return tFragmentInstanceId.equals(tNewDataBlockEvent2.getTargetFragmentInstanceId()) && "exchange_0".equals(tNewDataBlockEvent2.getTargetPlanNodeId()) && tFragmentInstanceId2.equals(tNewDataBlockEvent2.getSourceFragmentInstanceId()) && tNewDataBlockEvent2.getStartSequenceId() == 1 && tNewDataBlockEvent2.getBlockSizes().size() == 1;
            }));
        } catch (TException e4) {
            e4.printStackTrace();
            Assert.fail();
        }
        sinkHandle.setNoMoreTsBlocks();
        Assert.assertFalse(sinkHandle.isFinished());
        Assert.assertFalse(sinkHandle.isAborted());
        ((MPPDataExchangeManager.SinkHandleListener) Mockito.verify(sinkHandleListener, Mockito.timeout(10000L).times(1))).onEndOfBlocks(sinkHandle);
        try {
            ((SyncDataNodeMPPDataExchangeServiceClient) Mockito.verify(syncDataNodeMPPDataExchangeServiceClient, Mockito.timeout(10000L).times(1))).onEndOfDataBlockEvent((TEndOfDataBlockEvent) Mockito.argThat(tEndOfDataBlockEvent -> {
                return tFragmentInstanceId.equals(tEndOfDataBlockEvent.getTargetFragmentInstanceId()) && "exchange_0".equals(tEndOfDataBlockEvent.getTargetPlanNodeId()) && tFragmentInstanceId2.equals(tEndOfDataBlockEvent.getSourceFragmentInstanceId()) && 1 == tEndOfDataBlockEvent.getLastSequenceId();
            }));
        } catch (TException e5) {
            e5.printStackTrace();
            Assert.fail();
        }
        for (int i2 = 1; i2 < 2; i2++) {
            try {
                sinkHandle.getSerializedTsBlock(i2);
            } catch (IOException e6) {
                e6.printStackTrace();
                Assert.fail();
            }
        }
        Assert.assertFalse(sinkHandle.isFinished());
        sinkHandle.acknowledgeTsBlock(1, 2);
        Assert.assertTrue(sinkHandle.isFinished());
        Assert.assertFalse(sinkHandle.isAborted());
        Assert.assertEquals(TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, sinkHandle.getBufferRetainedSizeInBytes());
        ((MemoryPool) Mockito.verify(createMockBlockedMemoryPool, Mockito.timeout(100000L).times(2))).free("q0", 1048576L);
        ((MPPDataExchangeManager.SinkHandleListener) Mockito.verify(sinkHandleListener, Mockito.timeout(100000L).times(1))).onFinish(sinkHandle);
    }

    @Test
    public void testFailedSend() {
        TEndPoint tEndPoint = new TEndPoint("remote", IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
        TFragmentInstanceId tFragmentInstanceId = new TFragmentInstanceId("q0", 0, "0");
        TFragmentInstanceId tFragmentInstanceId2 = new TFragmentInstanceId("q0", 1, "0");
        LocalMemoryManager localMemoryManager = (LocalMemoryManager) Mockito.mock(LocalMemoryManager.class);
        MemoryPool createMockBlockedMemoryPool = Utils.createMockBlockedMemoryPool("q0", 1, 1048576L);
        Mockito.when(localMemoryManager.getQueryPool()).thenReturn(createMockBlockedMemoryPool);
        MPPDataExchangeManager.SinkHandleListener sinkHandleListener = (MPPDataExchangeManager.SinkHandleListener) Mockito.mock(MPPDataExchangeManager.SinkHandleListener.class);
        List<TsBlock> createMockTsBlocks = Utils.createMockTsBlocks(1, 1048576L);
        IClientManager iClientManager = (IClientManager) Mockito.mock(IClientManager.class);
        SyncDataNodeMPPDataExchangeServiceClient syncDataNodeMPPDataExchangeServiceClient = (SyncDataNodeMPPDataExchangeServiceClient) Mockito.mock(SyncDataNodeMPPDataExchangeServiceClient.class);
        Throwable tException = new TException("Mock exception");
        try {
            Mockito.when((SyncDataNodeMPPDataExchangeServiceClient) iClientManager.borrowClient(tEndPoint)).thenReturn(syncDataNodeMPPDataExchangeServiceClient);
            ((SyncDataNodeMPPDataExchangeServiceClient) Mockito.doThrow(new Throwable[]{tException}).when(syncDataNodeMPPDataExchangeServiceClient)).onEndOfDataBlockEvent((TEndOfDataBlockEvent) Mockito.any(TEndOfDataBlockEvent.class));
            ((SyncDataNodeMPPDataExchangeServiceClient) Mockito.doThrow(new Throwable[]{tException}).when(syncDataNodeMPPDataExchangeServiceClient)).onNewDataBlockEvent((TNewDataBlockEvent) Mockito.any(TNewDataBlockEvent.class));
        } catch (TException | IOException e) {
            e.printStackTrace();
            Assert.fail();
        }
        SinkHandle sinkHandle = new SinkHandle(tEndPoint, tFragmentInstanceId, "exchange_0", tFragmentInstanceId2, localMemoryManager, Executors.newSingleThreadExecutor(), Utils.createMockTsBlockSerde(1048576L), sinkHandleListener, iClientManager);
        sinkHandle.setRetryIntervalInMs(0L);
        Assert.assertTrue(sinkHandle.isFull().isDone());
        Assert.assertFalse(sinkHandle.isFinished());
        Assert.assertFalse(sinkHandle.isAborted());
        Assert.assertEquals(TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, sinkHandle.getBufferRetainedSizeInBytes());
        Assert.assertEquals(0L, sinkHandle.getNumOfBufferedTsBlocks());
        sinkHandle.send(createMockTsBlocks.get(0));
        Assert.assertFalse(sinkHandle.isFull().isDone());
        Assert.assertFalse(sinkHandle.isFinished());
        Assert.assertFalse(sinkHandle.isAborted());
        Assert.assertEquals(1048576 + TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, sinkHandle.getBufferRetainedSizeInBytes());
        Assert.assertEquals(1L, sinkHandle.getNumOfBufferedTsBlocks());
        ((MemoryPool) Mockito.verify(createMockBlockedMemoryPool, Mockito.timeout(100000L).times(2))).reserve("q0", 1048576L);
        try {
            ((SyncDataNodeMPPDataExchangeServiceClient) Mockito.verify(syncDataNodeMPPDataExchangeServiceClient, Mockito.timeout(10000L).times(3))).onNewDataBlockEvent((TNewDataBlockEvent) Mockito.argThat(tNewDataBlockEvent -> {
                return tFragmentInstanceId.equals(tNewDataBlockEvent.getTargetFragmentInstanceId()) && "exchange_0".equals(tNewDataBlockEvent.getTargetPlanNodeId()) && tFragmentInstanceId2.equals(tNewDataBlockEvent.getSourceFragmentInstanceId()) && tNewDataBlockEvent.getStartSequenceId() == 0 && tNewDataBlockEvent.getBlockSizes().size() == 1;
            }));
        } catch (TException e2) {
            e2.printStackTrace();
            Assert.fail();
        }
        ((MPPDataExchangeManager.SinkHandleListener) Mockito.verify(sinkHandleListener, Mockito.timeout(10000L).times(1))).onFailure(sinkHandle, tException);
        sinkHandle.setNoMoreTsBlocks();
        Assert.assertFalse(sinkHandle.isAborted());
        ((MPPDataExchangeManager.SinkHandleListener) Mockito.verify(sinkHandleListener, Mockito.timeout(10000L).times(0))).onEndOfBlocks(sinkHandle);
        sinkHandle.abort();
        Assert.assertTrue(sinkHandle.isAborted());
        ((MPPDataExchangeManager.SinkHandleListener) Mockito.verify(sinkHandleListener, Mockito.timeout(100000L).times(1))).onAborted(sinkHandle);
        ((MPPDataExchangeManager.SinkHandleListener) Mockito.verify(sinkHandleListener, Mockito.timeout(100000L).times(0))).onFinish(sinkHandle);
    }

    @Test
    public void testAbort() {
        TEndPoint tEndPoint = new TEndPoint("remote", IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
        TFragmentInstanceId tFragmentInstanceId = new TFragmentInstanceId("q0", 0, "0");
        TFragmentInstanceId tFragmentInstanceId2 = new TFragmentInstanceId("q0", 1, "0");
        LocalMemoryManager localMemoryManager = (LocalMemoryManager) Mockito.mock(LocalMemoryManager.class);
        MemoryPool memoryPool = (MemoryPool) Mockito.spy(new MemoryPool("test", 1048576L, 1048576L));
        Mockito.when(localMemoryManager.getQueryPool()).thenReturn(memoryPool);
        MPPDataExchangeManager.SinkHandleListener sinkHandleListener = (MPPDataExchangeManager.SinkHandleListener) Mockito.mock(MPPDataExchangeManager.SinkHandleListener.class);
        List<TsBlock> createMockTsBlocks = Utils.createMockTsBlocks(1, 1048576L);
        IClientManager iClientManager = (IClientManager) Mockito.mock(IClientManager.class);
        SyncDataNodeMPPDataExchangeServiceClient syncDataNodeMPPDataExchangeServiceClient = (SyncDataNodeMPPDataExchangeServiceClient) Mockito.mock(SyncDataNodeMPPDataExchangeServiceClient.class);
        try {
            Mockito.when((SyncDataNodeMPPDataExchangeServiceClient) iClientManager.borrowClient(tEndPoint)).thenReturn(syncDataNodeMPPDataExchangeServiceClient);
            ((SyncDataNodeMPPDataExchangeServiceClient) Mockito.doNothing().when(syncDataNodeMPPDataExchangeServiceClient)).onEndOfDataBlockEvent((TEndOfDataBlockEvent) Mockito.any(TEndOfDataBlockEvent.class));
            ((SyncDataNodeMPPDataExchangeServiceClient) Mockito.doNothing().when(syncDataNodeMPPDataExchangeServiceClient)).onNewDataBlockEvent((TNewDataBlockEvent) Mockito.any(TNewDataBlockEvent.class));
        } catch (TException | IOException e) {
            e.printStackTrace();
            Assert.fail();
        }
        SinkHandle sinkHandle = new SinkHandle(tEndPoint, tFragmentInstanceId, "exchange_0", tFragmentInstanceId2, localMemoryManager, Executors.newSingleThreadExecutor(), Utils.createMockTsBlockSerde(1048576L), sinkHandleListener, iClientManager);
        Assert.assertTrue(sinkHandle.isFull().isDone());
        Assert.assertFalse(sinkHandle.isFinished());
        Assert.assertFalse(sinkHandle.isAborted());
        Assert.assertEquals(TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, sinkHandle.getBufferRetainedSizeInBytes());
        Assert.assertEquals(0L, sinkHandle.getNumOfBufferedTsBlocks());
        sinkHandle.send(createMockTsBlocks.get(0));
        ListenableFuture isFull = sinkHandle.isFull();
        Assert.assertFalse(isFull.isDone());
        Assert.assertFalse(isFull.isCancelled());
        Assert.assertFalse(sinkHandle.isFinished());
        Assert.assertFalse(sinkHandle.isAborted());
        Assert.assertEquals(1048576 + TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, sinkHandle.getBufferRetainedSizeInBytes());
        Assert.assertEquals(1L, sinkHandle.getNumOfBufferedTsBlocks());
        sinkHandle.abort();
        Assert.assertTrue(isFull.isDone());
        Assert.assertTrue(isFull.isCancelled());
        Assert.assertFalse(sinkHandle.isFinished());
        Assert.assertTrue(sinkHandle.isAborted());
        Assert.assertEquals(0L, sinkHandle.getBufferRetainedSizeInBytes());
        Assert.assertEquals(0L, sinkHandle.getNumOfBufferedTsBlocks());
        ((MPPDataExchangeManager.SinkHandleListener) Mockito.verify(sinkHandleListener, Mockito.timeout(100000L).times(1))).onAborted(sinkHandle);
        Assert.assertEquals(0L, memoryPool.getQueryMemoryReservedBytes("q0"));
    }
}
