package org.apache.iotdb.db.pipe.extractor;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor;
import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionHybridExtractor;
import org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener;
import org.apache.iotdb.db.wal.utils.WALEntryHandler;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.class */
/**
 * Exercises realtime pipe extractors against events published through
 * {@link PipeInsertionDataNodeListener}.
 *
 * <p>Four hybrid extractors are configured, one per combination of the two data regions and the
 * two path patterns. Writer tasks publish insert-node and tsfile events for both regions while
 * listener tasks drain each started extractor and check the total weight of the events received.
 */
public class PipeRealtimeExtractTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeRealtimeExtractTest.class);

    private static final String DATA_REGION_1 = "1";
    private static final String DATA_REGION_2 = "2";
    private static final String PATTERN_1 = "root.sg.d";
    private static final String PATTERN_2 = "root.sg.d.a";
    private static final String[] DEVICE = {"root", "sg", "d"};

    /** Number of tsfiles each writer task produces per round. */
    private static final int WRITE_NUM = 10;

    /** Upper bound, in minutes, for waiting on a single listener task. */
    private static final long LISTEN_TIMEOUT_MINUTES = 10L;

    /** Weighs a {@link TabletInsertionEvent} as 1 and any other event as 2. */
    private static final Function<Event, Integer> TABLET_ONE_OTHERWISE_TWO =
            event -> event instanceof TabletInsertionEvent ? 1 : 2;

    /** Weighs every event as 1. */
    private static final Function<Event, Integer> ALWAYS_ONE = event -> 1;

    /** Cleared to make listener loops stop before they reach their expected weight. */
    private final AtomicBoolean alive = new AtomicBoolean();

    private File tmpDir;
    private File tsFileDir;
    private ExecutorService writeService;
    private ExecutorService listenerService;

    /** Creates the worker pools and a fresh temporary directory tree for the tsfiles. */
    @Before
    public void setUp() throws IOException {
        this.writeService = Executors.newFixedThreadPool(2);
        this.listenerService = Executors.newFixedThreadPool(4);
        this.tmpDir = Files.createTempDirectory("pipeRealtimeExtractor").toFile();
        this.tsFileDir = new File(this.tmpDir.getPath() + File.separator + "sequence" + File.separator + "root.sg");
    }

    /** Stops the listener loops and worker pools, then removes the temporary directory. */
    @After
    public void tearDown() {
        // Let listener loops exit on their own even if a test failed before they finished.
        this.alive.set(false);
        this.writeService.shutdownNow();
        this.listenerService.shutdownNow();
        FileUtils.deleteDirectory(this.tmpDir);
    }

    /**
     * Runs two write/listen rounds: the first with only the two extractors configured for data
     * region 1 started, the second with all four extractors started.
     */
    @Test
    public void testRealtimeExtractProcess() {
        // try-with-resources closes every extractor exactly once, in reverse declaration order.
        try (PipeRealtimeDataRegionHybridExtractor extractor1 = new PipeRealtimeDataRegionHybridExtractor();
                PipeRealtimeDataRegionHybridExtractor extractor2 = new PipeRealtimeDataRegionHybridExtractor();
                PipeRealtimeDataRegionHybridExtractor extractor3 = new PipeRealtimeDataRegionHybridExtractor();
                PipeRealtimeDataRegionHybridExtractor extractor4 = new PipeRealtimeDataRegionHybridExtractor()) {
            customize(extractor1, PATTERN_1, DATA_REGION_1);
            customize(extractor2, PATTERN_2, DATA_REGION_1);
            customize(extractor3, PATTERN_1, DATA_REGION_2);
            customize(extractor4, PATTERN_2, DATA_REGION_2);

            // Round 1: only the extractors of data region 1 are started.
            extractor1.start();
            extractor2.start();
            List<Future<?>> writeFutures = Arrays.asList(
                    write2DataRegion(WRITE_NUM, DATA_REGION_1, 0),
                    write2DataRegion(WRITE_NUM, DATA_REGION_2, 0));
            this.alive.set(true);
            List<Future<?>> listenFutures = Arrays.asList(
                    listen(extractor1, TABLET_ONE_OTHERWISE_TWO, WRITE_NUM << 1),
                    listen(extractor2, ALWAYS_ONE, WRITE_NUM));
            awaitListeners(listenFutures);
            awaitWrites(writeFutures);

            // Round 2: the extractors of data region 2 are started as well.
            extractor3.start();
            extractor4.start();
            writeFutures = Arrays.asList(
                    write2DataRegion(WRITE_NUM, DATA_REGION_1, WRITE_NUM),
                    write2DataRegion(WRITE_NUM, DATA_REGION_2, WRITE_NUM));
            this.alive.set(true);
            listenFutures = Arrays.asList(
                    listen(extractor1, TABLET_ONE_OTHERWISE_TWO, WRITE_NUM << 1),
                    listen(extractor2, ALWAYS_ONE, WRITE_NUM),
                    listen(extractor3, TABLET_ONE_OTHERWISE_TWO, WRITE_NUM << 1),
                    listen(extractor4, ALWAYS_ONE, WRITE_NUM));
            awaitListeners(listenFutures);
            awaitWrites(writeFutures);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Configures {@code extractor} with the given path pattern and data region.
     *
     * @param extractor the extractor to configure
     * @param pattern value stored under the {@code "extractor.pattern"} parameter key
     * @param dataRegionId decimal data region id, parsed to an int for the runtime environment
     * @throws Exception if the extractor rejects the configuration
     */
    private static void customize(
            PipeRealtimeDataRegionHybridExtractor extractor, String pattern, String dataRegionId) throws Exception {
        HashMap<String, String> attributes = new HashMap<>();
        attributes.put("extractor.pattern", pattern);
        extractor.customize(
                new PipeParameters(attributes),
                new PipeTaskRuntimeConfiguration(
                        new PipeTaskExtractorRuntimeEnvironment(
                                "1", 1L, Integer.parseInt(dataRegionId), (PipeTaskMeta) null)));
    }

    /**
     * Waits for every listener task. On a timeout the listener loops are told to stop and the
     * test is failed.
     *
     * @param listenFutures futures returned by {@link #listen}
     * @throws ExecutionException if a listener task threw, including a failed assertion
     * @throws InterruptedException if the waiting thread is interrupted
     */
    private void awaitListeners(List<Future<?>> listenFutures) throws ExecutionException, InterruptedException {
        try {
            for (Future<?> listenFuture : listenFutures) {
                listenFuture.get(LISTEN_TIMEOUT_MINUTES, TimeUnit.MINUTES);
            }
        } catch (TimeoutException e) {
            LOGGER.warn("Time out when listening extractor", e);
            this.alive.set(false);
            Assert.fail("Timed out waiting for an extractor listener");
        }
    }

    /**
     * Waits for every writer task to finish.
     *
     * @param writeFutures futures returned by {@link #write2DataRegion}
     * @throws ExecutionException if a writer task threw
     * @throws InterruptedException if the waiting thread is interrupted
     */
    private static void awaitWrites(List<Future<?>> writeFutures) throws ExecutionException, InterruptedException {
        for (Future<?> writeFuture : writeFutures) {
            writeFuture.get();
        }
    }

    /**
     * Submits a task that, for each index in {@code [startNum, startNum + writeNum)}, creates an
     * empty tsfile and publishes two insert-node events (measurements {@code "a"} and
     * {@code "b"}) followed by one tsfile event for the given data region.
     *
     * @param writeNum number of tsfiles to create
     * @param dataRegionId data region the events are published for
     * @param startNum first index, used in both the tsfile name and the plan node id
     * @return a future that completes when all events have been published
     */
    private Future<?> write2DataRegion(int writeNum, String dataRegionId, int startNum) {
        File dataRegionDir =
                new File(this.tsFileDir.getPath() + File.separator + dataRegionId + File.separator + "0");
        // mkdirs() returns false when the directory already exists (second round), so also
        // accept an existing directory before giving up.
        if (!dataRegionDir.mkdirs() && !dataRegionDir.isDirectory()) {
            throw new IllegalStateException("Cannot create directory " + dataRegionDir);
        }
        return this.writeService.submit(() -> {
            for (int index = startNum; index < startNum + writeNum; index++) {
                File tsFile = new File(dataRegionDir, String.format("%s-%s-0-0.tsfile", index, index));
                try {
                    tsFile.createNewFile();
                    TsFileResource resource = new TsFileResource(tsFile);
                    resource.updateStartTime(String.join(".", DEVICE), 0L);
                    PipeInsertionDataNodeListener.getInstance().listenToInsertNode(
                            dataRegionId, Mockito.mock(WALEntryHandler.class), newInsertRowNode(index, "a"), resource);
                    PipeInsertionDataNodeListener.getInstance().listenToInsertNode(
                            dataRegionId, Mockito.mock(WALEntryHandler.class), newInsertRowNode(index, "b"), resource);
                    PipeInsertionDataNodeListener.getInstance().listenToTsFile(dataRegionId, resource);
                } catch (IOException e) {
                    // Surfaces through the returned future; no separate stack-trace print needed.
                    throw new RuntimeException("Failed to write test data for " + tsFile, e);
                }
            }
        });
    }

    /**
     * Builds an insert-row node for the test device with a single measurement and no values.
     *
     * @param index used as the plan node id
     * @param measurement the only measurement name of the node
     */
    private static InsertRowNode newInsertRowNode(int index, String measurement) {
        return new InsertRowNode(
                new PlanNodeId(String.valueOf(index)),
                new PartialPath(DEVICE),
                false,
                new String[] {measurement},
                (TSDataType[]) null,
                0L,
                (Object[]) null,
                false);
    }

    /**
     * Submits a task that polls {@code extractor} until the accumulated event weight reaches
     * {@code expectNum} or {@link #alive} is cleared, then asserts the accumulated weight equals
     * {@code expectNum}.
     *
     * <p>The assertion is evaluated once, after polling ends. Checking it on every iteration
     * would fail as soon as the first poll returned, before the expected weight could be reached.
     *
     * @param extractor the extractor to poll
     * @param weight maps each non-null event to the amount it adds to the running total
     * @param expectNum the total weight the extractor is expected to deliver
     * @return a future that fails if polling throws or the final total differs from
     *     {@code expectNum}
     */
    private Future<?> listen(PipeRealtimeDataRegionExtractor extractor, Function<Event, Integer> weight, int expectNum) {
        return this.listenerService.submit(() -> {
            int eventNum = 0;
            while (this.alive.get() && eventNum < expectNum) {
                Event event;
                try {
                    event = extractor.supply();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
                if (event != null) {
                    eventNum += weight.apply(event);
                }
            }
            Assert.assertEquals(expectNum, eventNum);
        });
    }
}
