package org.apache.iotdb.db.queryengine.execution.exchange;

import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelIndex;
import org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelLocation;
import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISinkHandle;
import org.apache.iotdb.db.queryengine.execution.exchange.sink.ShuffleSinkHandle;
import org.apache.iotdb.db.queryengine.execution.exchange.source.LocalSourceHandle;
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.queryengine.execution.memory.MemoryPool;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManagerTest.class */
/**
 * Tests for {@link MPPDataExchangeManager}.
 *
 * <p>Both tests create a shuffle sink handle and a local source handle through the same manager,
 * using matching fragment-instance and plan-node ids, and assert that the two handles end up
 * referring to the same shared TsBlock queue. The tests differ only in which handle is created
 * first.
 */
public class MPPDataExchangeManagerTest {
    /** Plan node id used for the source (exchange) side in both tests. */
    private static final String SOURCE_PLAN_NODE_ID = "exchange_0";

    /** Plan node id used for the sink side in both tests. */
    private static final String SINK_PLAN_NODE_ID = "shuffleSink_0";

    /**
     * Creates the sink handle first, then the source handle, and checks that both share one queue.
     */
    @Test
    public void testCreateLocalSinkHandle() {
        TFragmentInstanceId sinkInstanceId = new TFragmentInstanceId("q0", 1, "0");
        TFragmentInstanceId sourceInstanceId = new TFragmentInstanceId("q0", 0, "0");
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            MPPDataExchangeManager manager = newManager(executor);

            ISinkHandle sinkHandle = createPlainShuffleSink(manager, sourceInstanceId, sinkInstanceId);
            Assert.assertTrue(sinkHandle instanceof ShuffleSinkHandle);

            LocalSourceHandle sourceHandle = manager.createLocalSourceHandleForFragment(sourceInstanceId, SOURCE_PLAN_NODE_ID, SINK_PLAN_NODE_ID, sinkInstanceId, 0, th -> {
            });
            Assert.assertTrue(sourceHandle instanceof LocalSourceHandle);

            Assert.assertEquals(sinkHandle.getChannel(0).getSharedTsBlockQueue(), sourceHandle.getSharedTsBlockQueue());
        } finally {
            // The executor is owned by this test; stop it so its worker thread cannot outlive the test.
            executor.shutdownNow();
        }
    }

    /**
     * Creates the source handle first, then the sink handle, and checks that both share one queue.
     */
    @Test
    public void testCreateLocalSourceHandle() {
        TFragmentInstanceId sourceInstanceId = new TFragmentInstanceId("q0", 1, "0");
        TFragmentInstanceId sinkInstanceId = new TFragmentInstanceId("q0", 0, "0");
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            MPPDataExchangeManager manager = newManager(executor);

            LocalSourceHandle sourceHandle = manager.createLocalSourceHandleForFragment(sourceInstanceId, SOURCE_PLAN_NODE_ID, SINK_PLAN_NODE_ID, sinkInstanceId, 0, th -> {
            });
            Assert.assertTrue(sourceHandle instanceof LocalSourceHandle);

            ISinkHandle sinkHandle = createPlainShuffleSink(manager, sourceInstanceId, sinkInstanceId);
            Assert.assertTrue(sinkHandle instanceof ShuffleSinkHandle);

            Assert.assertEquals(sinkHandle.getChannel(0).getSharedTsBlockQueue(), sourceHandle.getSharedTsBlockQueue());
        } finally {
            // The executor is owned by this test; stop it so its worker thread cannot outlive the test.
            executor.shutdownNow();
        }
    }

    /**
     * Builds a manager backed by a mocked {@link LocalMemoryManager} whose query pool is a spied
     * {@link MemoryPool} named "test" constructed with the values 10240 and 5120.
     *
     * @param executor executor handed to the manager; the caller remains responsible for shutting it
     *     down
     * @return a fresh manager instance
     */
    private static MPPDataExchangeManager newManager(ExecutorService executor) {
        LocalMemoryManager memoryManager = Mockito.mock(LocalMemoryManager.class);
        Mockito.when(memoryManager.getQueryPool()).thenReturn(Mockito.spy(new MemoryPool("test", 10240L, 5120L)));
        // NOTE(review): the client manager created here is never closed; confirm whether it holds
        // resources that need explicit release.
        return new MPPDataExchangeManager(memoryManager, new TsBlockSerdeFactory(), executor, new IClientManager.Factory().createClientManager(new ClientPoolFactory.SyncDataNodeMPPDataExchangeServiceClientPoolFactory()));
    }

    /**
     * Creates a shuffle sink handle with a single downstream channel and the PLAIN strategy.
     *
     * <p>The downstream endpoint is this node's configured internal address and MPP data exchange
     * port (presumably so the manager treats the downstream as local; verify against
     * {@code createShuffleSinkHandle}).
     *
     * @param manager manager under test
     * @param downstreamInstanceId fragment instance that consumes from the sink
     * @param sinkInstanceId fragment instance that owns the sink
     * @return the sink handle returned by the manager
     */
    private static ISinkHandle createPlainShuffleSink(MPPDataExchangeManager manager, TFragmentInstanceId downstreamInstanceId, TFragmentInstanceId sinkInstanceId) {
        FragmentInstanceContext instanceContext = Mockito.mock(FragmentInstanceContext.class);
        TEndPoint localEndPoint = new TEndPoint(IoTDBDescriptor.getInstance().getConfig().getInternalAddress(), IoTDBDescriptor.getInstance().getConfig().getMppDataExchangePort());
        DownStreamChannelLocation downstream = new DownStreamChannelLocation(localEndPoint, downstreamInstanceId, SOURCE_PLAN_NODE_ID);
        return manager.createShuffleSinkHandle(Collections.singletonList(downstream), new DownStreamChannelIndex(0), ShuffleSinkHandle.ShuffleStrategyEnum.PLAIN, sinkInstanceId, SINK_PLAN_NODE_ID, instanceContext);
    }
}
