package org.apache.iotdb.db.mpp.execution;

import io.airlift.units.Duration;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.driver.DataDriver;
import org.apache.iotdb.db.mpp.execution.driver.DataDriverContext;
import org.apache.iotdb.db.mpp.execution.driver.IDriver;
import org.apache.iotdb.db.mpp.execution.exchange.StubSink;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.join.RowBasedTimeJoinOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.execution.schedule.DriverTaskThread;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.SeriesScanOptions;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/iotdb/db/mpp/execution/DataDriverTest.class */
/**
 * Runs a {@link DataDriver} over the operator pipeline
 * {@code LimitOperator(250) -> RowBasedTimeJoinOperator -> 2 x SeriesScanOperator}
 * (sensor0 and sensor1 of {@code root.DataDriverTest.device0}) and checks both the
 * fragment instance state transitions and the rows delivered to a {@link StubSink}.
 *
 * <p>The backing TsFile resources are created and removed by {@link SeriesReaderTestUtil}.
 */
public class DataDriverTest {
    private static final String DATA_DRIVER_TEST_SG = "root.DataDriverTest";
    private static final String DEVICE_PATH = DATA_DRIVER_TEST_SG + ".device0";
    /** Row limit applied by the root {@link LimitOperator}; also the expected output row count. */
    private static final long ROW_LIMIT = 250L;

    private final List<String> deviceIds = new ArrayList<>();
    private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
    private final List<TsFileResource> seqResources = new ArrayList<>();
    private final List<TsFileResource> unSeqResources = new ArrayList<>();

    /** Populates the schema, device and resource lists through {@link SeriesReaderTestUtil}. */
    @Before
    public void setUp() throws MetadataException, IOException, WriteProcessException {
        SeriesReaderTestUtil.setUp(this.measurementSchemas, this.deviceIds, this.seqResources, this.unSeqResources, DATA_DRIVER_TEST_SG);
    }

    /** Releases the resources registered in {@link #setUp()}. */
    @After
    public void tearDown() throws IOException {
        SeriesReaderTestUtil.tearDown(this.seqResources, this.unSeqResources);
    }

    /**
     * Drives the pipeline to completion and verifies that:
     * <ul>
     *   <li>the state machine reports {@code RUNNING} while the driver is unfinished and
     *       {@code FLUSHING} once it has finished;</li>
     *   <li>every {@code processFor} call returns an already completed future;</li>
     *   <li>the sink receives exactly {@link #ROW_LIMIT} rows with consecutive timestamps
     *       starting at 0 and the expected values in both columns.</li>
     * </ul>
     */
    @Test
    public void batchTest() {
        ExecutorService instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
        try {
            MeasurementPath sensor0Path = new MeasurementPath(DEVICE_PATH + ".sensor0", TSDataType.INT32);
            HashSet<String> allSensors = new HashSet<>();
            allSensors.add("sensor0");
            allSensors.add("sensor1");

            FragmentInstanceId instanceId = new FragmentInstanceId(new PlanFragmentId(new QueryId("stub_query"), 0), "stub-instance");
            FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
            DataRegion dataRegion = Mockito.mock(DataRegion.class);
            FragmentInstanceContext instanceContext = FragmentInstanceContext.createFragmentInstanceContext(instanceId, stateMachine);
            instanceContext.setDataRegion(dataRegion);

            // Operator contexts are registered in order, so list index k holds operator id k + 1.
            DataDriverContext driverContext = new DataDriverContext(instanceContext, 0);
            PlanNodeId scanNodeId0 = new PlanNodeId("1");
            driverContext.addOperatorContext(1, scanNodeId0, SeriesScanOperator.class.getSimpleName());
            PlanNodeId scanNodeId1 = new PlanNodeId("2");
            driverContext.addOperatorContext(2, scanNodeId1, SeriesScanOperator.class.getSimpleName());
            driverContext.addOperatorContext(3, new PlanNodeId("3"), RowBasedTimeJoinOperator.class.getSimpleName());
            driverContext.addOperatorContext(4, new PlanNodeId("4"), LimitOperator.class.getSimpleName());

            SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder();
            scanOptionsBuilder.withAllSensors(allSensors);

            SeriesScanOperator sensor0Scan = new SeriesScanOperator((OperatorContext) driverContext.getOperatorContexts().get(0), scanNodeId0, sensor0Path, Ordering.ASC, scanOptionsBuilder.build());
            driverContext.addSourceOperator(sensor0Scan);
            driverContext.addPath(sensor0Path);
            sensor0Scan.getOperatorContext().setMaxRunTime(new Duration(500.0d, TimeUnit.MILLISECONDS));

            MeasurementPath sensor1Path = new MeasurementPath(DEVICE_PATH + ".sensor1", TSDataType.INT32);
            SeriesScanOperator sensor1Scan = new SeriesScanOperator((OperatorContext) driverContext.getOperatorContexts().get(1), scanNodeId1, sensor1Path, Ordering.ASC, scanOptionsBuilder.build());
            driverContext.addSourceOperator(sensor1Scan);
            driverContext.addPath(sensor1Path);
            sensor1Scan.getOperatorContext().setMaxRunTime(new Duration(500.0d, TimeUnit.MILLISECONDS));

            RowBasedTimeJoinOperator timeJoinOperator = new RowBasedTimeJoinOperator(
                    (OperatorContext) driverContext.getOperatorContexts().get(2),
                    Arrays.asList(sensor0Scan, sensor1Scan),
                    Ordering.ASC,
                    Arrays.asList(TSDataType.INT32, TSDataType.INT32),
                    Arrays.asList(
                            new SingleColumnMerger(new InputLocation(0, 0), new AscTimeComparator()),
                            new SingleColumnMerger(new InputLocation(1, 0), new AscTimeComparator())),
                    new AscTimeComparator());
            LimitOperator limitOperator = new LimitOperator((OperatorContext) driverContext.getOperatorContexts().get(3), ROW_LIMIT, timeJoinOperator);

            // The mocked region hands back the files prepared in setUp() for exactly this query.
            Mockito.when(dataRegion.query(driverContext.getPaths(), DEVICE_PATH, instanceContext, (Filter) null))
                    .thenReturn(new QueryDataSource(this.seqResources, this.unSeqResources));
            instanceContext.initQueryDataSource(driverContext.getPaths());
            instanceContext.initializeNumOfDrivers(1);

            StubSink sink = new StubSink(instanceContext);
            driverContext.setSink(sink);

            IDriver driver = null;
            try {
                driver = new DataDriver(limitOperator, driverContext);
                Assert.assertEquals(instanceContext.getId(), driver.getDriverTaskId().getFragmentInstanceId());
                Assert.assertFalse(driver.isFinished());
                while (!driver.isFinished()) {
                    Assert.assertEquals(FragmentInstanceState.RUNNING, stateMachine.getState());
                    Assert.assertTrue(driver.processFor(DriverTaskThread.EXECUTION_TIME_SLICE).isDone());
                }
                Assert.assertEquals(FragmentInstanceState.FLUSHING, stateMachine.getState());

                int rowCount = 0;
                for (TsBlock tsBlock : sink.getTsBlocks()) {
                    rowCount = assertBlockContents(tsBlock, rowCount);
                }
                Assert.assertEquals(ROW_LIMIT, rowCount);
            } finally {
                // Close the driver before the executor is shut down, even when an assertion fails.
                if (driver != null) {
                    driver.close();
                }
            }
        } catch (IllegalPathException | QueryProcessException e) {
            // Fail with the original exception attached so the report shows the root cause.
            throw new AssertionError("Unexpected exception while building or running the driver pipeline", e);
        } finally {
            instanceNotificationExecutor.shutdown();
        }
    }

    /**
     * Checks one output block: two INT32 value columns, timestamps continuing consecutively from
     * {@code firstExpectedTime}, and both columns carrying {@link #expectedValueAt(int)}.
     *
     * @param tsBlock block taken from the sink
     * @param firstExpectedTime timestamp expected at position 0 of the block
     * @return the timestamp expected at position 0 of the next block
     */
    private static int assertBlockContents(TsBlock tsBlock, int firstExpectedTime) {
        Assert.assertEquals(2L, tsBlock.getValueColumnCount());
        Assert.assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
        Assert.assertTrue(tsBlock.getColumn(1) instanceof IntColumn);

        int expectedTime = firstExpectedTime;
        for (int position = 0; position < tsBlock.getPositionCount(); position++) {
            Assert.assertEquals(expectedTime, tsBlock.getTimeByIndex(position));
            int expectedValue = expectedValueAt(expectedTime);
            Assert.assertEquals(expectedValue, tsBlock.getColumn(0).getInt(position));
            Assert.assertEquals(expectedValue, tsBlock.getColumn(1).getInt(position));
            expectedTime++;
        }
        return expectedTime;
    }

    /**
     * Returns the value both sensors are expected to hold at {@code time}.
     *
     * <p>NOTE(review): the offsets presumably reflect which overlapping file written by
     * SeriesReaderTestUtil wins for each time range; confirm against that utility.
     */
    private static int expectedValueAt(int time) {
        if (time < 200) {
            return 20000 + time;
        }
        if (time < 260 || (time >= 300 && time < 380) || time >= 400) {
            return 10000 + time;
        }
        return time;
    }
}
