package org.apache.iotdb.db.sync.pipedata;

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.sync.utils.SyncPathUtil;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.sync.pipedata.queue.BufferedPipeDataQueue;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/sync/pipedata/BufferedPipeDataQueueTest.class */
public class BufferedPipeDataQueueTest {
    private static final Logger logger = LoggerFactory.getLogger(BufferedPipeDataQueueTest.class);
    File pipeLogDir = new File(SyncPathUtil.getReceiverPipeLogDir("pipe", "192.168.0.11", System.currentTimeMillis()));

    @Before
    public void setUp() throws Exception {
        if (this.pipeLogDir.exists()) {
            return;
        }
        this.pipeLogDir.mkdirs();
    }

    @After
    public void tearDown() throws IOException, StorageEngineException {
        FileUtils.deleteDirectory(this.pipeLogDir);
    }

    @Test
    public void testRecoveryAndClear() {
        try {
            DataOutputStream dataOutputStream = new DataOutputStream(new FileOutputStream(new File(this.pipeLogDir, "commit.log"), true));
            dataOutputStream.writeLong(1L);
            dataOutputStream.close();
            DataOutputStream dataOutputStream2 = new DataOutputStream(new FileOutputStream(new File(this.pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(0L)), false));
            for (int i = 0; i < 4; i++) {
                new TsFilePipeData("", i).serialize(dataOutputStream2);
            }
            dataOutputStream2.close();
            DataOutputStream dataOutputStream3 = new DataOutputStream(new FileOutputStream(new File(this.pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(4L)), false));
            for (int i2 = 4; i2 < 11; i2++) {
                new TsFilePipeData("", i2).serialize(dataOutputStream3);
            }
            dataOutputStream3.close();
            new DataOutputStream(new FileOutputStream(new File(this.pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(11L)), false)).close();
            BufferedPipeDataQueue bufferedPipeDataQueue = new BufferedPipeDataQueue(this.pipeLogDir.getPath());
            Assert.assertEquals(1L, bufferedPipeDataQueue.getCommitSerialNumber());
            Assert.assertEquals(10L, bufferedPipeDataQueue.getLastMaxSerialNumber());
            bufferedPipeDataQueue.clear();
            Assert.assertFalse(this.pipeLogDir.exists());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testTake() {
        BufferedPipeDataQueue bufferedPipeDataQueue = new BufferedPipeDataQueue(this.pipeLogDir.getPath());
        ArrayList arrayList = new ArrayList();
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        newSingleThreadExecutor.execute(() -> {
            try {
                arrayList.add(bufferedPipeDataQueue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        try {
            Thread.sleep(3000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        newSingleThreadExecutor.shutdownNow();
        Assert.assertEquals(0L, arrayList.size());
    }

    @Test
    public void testTakeAndOffer() {
        BufferedPipeDataQueue bufferedPipeDataQueue = new BufferedPipeDataQueue(this.pipeLogDir.getPath());
        try {
            ArrayList arrayList = new ArrayList();
            ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
            newSingleThreadExecutor.execute(() -> {
                try {
                    arrayList.add(bufferedPipeDataQueue.take());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            bufferedPipeDataQueue.offer(new TsFilePipeData("", 0L));
            try {
                Thread.sleep(3000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            newSingleThreadExecutor.shutdownNow();
            try {
                Thread.sleep(500L);
            } catch (InterruptedException e2) {
                e2.printStackTrace();
                Assert.fail();
            }
            Assert.assertEquals(1L, arrayList.size());
            bufferedPipeDataQueue.clear();
        } catch (Throwable th) {
            bufferedPipeDataQueue.clear();
            throw th;
        }
    }

    @Test
    public void testOfferNewPipe() {
        BufferedPipeDataQueue bufferedPipeDataQueue = new BufferedPipeDataQueue(this.pipeLogDir.getPath());
        try {
            TsFilePipeData tsFilePipeData = new TsFilePipeData("fakePath", 1L);
            bufferedPipeDataQueue.offer(tsFilePipeData);
            ArrayList arrayList = new ArrayList();
            ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
            newSingleThreadExecutor.execute(() -> {
                try {
                    arrayList.add(bufferedPipeDataQueue.take());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            try {
                Thread.sleep(3000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            newSingleThreadExecutor.shutdownNow();
            try {
                Thread.sleep(500L);
            } catch (InterruptedException e2) {
                e2.printStackTrace();
                Assert.fail();
            }
            Assert.assertEquals(1L, arrayList.size());
            Assert.assertEquals(tsFilePipeData, arrayList.get(0));
            bufferedPipeDataQueue.clear();
        } catch (Throwable th) {
            bufferedPipeDataQueue.clear();
            throw th;
        }
    }

    /* JADX WARN: Finally extract failed */
    @Test
    public void testOfferAfterRecoveryWithEmptyPipeLog() {
        try {
            DataOutputStream dataOutputStream = new DataOutputStream(new FileOutputStream(new File(this.pipeLogDir, "commit.log"), true));
            dataOutputStream.writeLong(1L);
            dataOutputStream.close();
            ArrayList arrayList = new ArrayList();
            DataOutputStream dataOutputStream2 = new DataOutputStream(new FileOutputStream(new File(this.pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(0L)), false));
            for (int i = 0; i < 4; i++) {
                TsFilePipeData tsFilePipeData = new TsFilePipeData("fake" + i, i);
                arrayList.add(tsFilePipeData);
                tsFilePipeData.serialize(dataOutputStream2);
            }
            dataOutputStream2.close();
            DataOutputStream dataOutputStream3 = new DataOutputStream(new FileOutputStream(new File(this.pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(4L)), false));
            for (int i2 = 4; i2 < 8; i2++) {
                DeletionPipeData deletionPipeData = new DeletionPipeData(new Deletion(new PartialPath("fake" + i2), 0L, 99L), i2);
                arrayList.add(deletionPipeData);
                deletionPipeData.serialize(dataOutputStream3);
            }
            for (int i3 = 8; i3 < 11; i3++) {
                DeletionPipeData deletionPipeData2 = new DeletionPipeData(new Deletion(new PartialPath("fake" + i3), 0L, 99L), i3);
                arrayList.add(deletionPipeData2);
                deletionPipeData2.serialize(dataOutputStream3);
            }
            dataOutputStream3.close();
            new DataOutputStream(new FileOutputStream(new File(this.pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(11L)), false)).close();
            BufferedPipeDataQueue bufferedPipeDataQueue = new BufferedPipeDataQueue(this.pipeLogDir.getPath());
            try {
                Assert.assertEquals(1L, bufferedPipeDataQueue.getCommitSerialNumber());
                Assert.assertEquals(10L, bufferedPipeDataQueue.getLastMaxSerialNumber());
                TsFilePipeData tsFilePipeData2 = new TsFilePipeData("fake11", 11L);
                arrayList.add(tsFilePipeData2);
                bufferedPipeDataQueue.offer(tsFilePipeData2);
                ArrayList arrayList2 = new ArrayList();
                ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
                newSingleThreadExecutor.execute(() -> {
                    while (true) {
                        try {
                            arrayList2.add(bufferedPipeDataQueue.take());
                            bufferedPipeDataQueue.commit();
                        } catch (InterruptedException e) {
                            return;
                        }
                    }
                });
                try {
                    Thread.sleep(3000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                newSingleThreadExecutor.shutdownNow();
                try {
                    Thread.sleep(500L);
                } catch (InterruptedException e2) {
                    e2.printStackTrace();
                    Assert.fail();
                }
                Assert.assertEquals(10L, arrayList2.size());
                for (int i4 = 0; i4 < 10; i4++) {
                    Assert.assertEquals(arrayList.get(i4 + 2), arrayList2.get(i4));
                }
                bufferedPipeDataQueue.clear();
            } catch (Throwable th) {
                bufferedPipeDataQueue.clear();
                throw th;
            }
        } catch (Exception e3) {
            e3.printStackTrace();
            Assert.fail();
        }
    }

    /* JADX WARN: Finally extract failed */
    @Test
    public void testRecoveryWithEmptyPipeLog() {
        try {
            DataOutputStream dataOutputStream = new DataOutputStream(new FileOutputStream(new File(this.pipeLogDir, "commit.log"), true));
            dataOutputStream.writeLong(1L);
            dataOutputStream.close();
            ArrayList arrayList = new ArrayList();
            DataOutputStream dataOutputStream2 = new DataOutputStream(new FileOutputStream(new File(this.pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(0L)), false));
            for (int i = 0; i < 4; i++) {
                TsFilePipeData tsFilePipeData = new TsFilePipeData("fake" + i, i);
                arrayList.add(tsFilePipeData);
                tsFilePipeData.serialize(dataOutputStream2);
            }
            dataOutputStream2.close();
            DataOutputStream dataOutputStream3 = new DataOutputStream(new FileOutputStream(new File(this.pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(4L)), false));
            for (int i2 = 4; i2 < 8; i2++) {
                DeletionPipeData deletionPipeData = new DeletionPipeData(new Deletion(new PartialPath("fake" + i2), 0L, 99L), i2);
                arrayList.add(deletionPipeData);
                deletionPipeData.serialize(dataOutputStream3);
            }
            for (int i3 = 8; i3 < 11; i3++) {
                DeletionPipeData deletionPipeData2 = new DeletionPipeData(new Deletion(new PartialPath("fake" + i3), 0L, 99L), i3);
                arrayList.add(deletionPipeData2);
                deletionPipeData2.serialize(dataOutputStream3);
            }
            dataOutputStream3.close();
            new DataOutputStream(new FileOutputStream(new File(this.pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(11L)), false)).close();
            BufferedPipeDataQueue bufferedPipeDataQueue = new BufferedPipeDataQueue(this.pipeLogDir.getPath());
            try {
                Assert.assertEquals(1L, bufferedPipeDataQueue.getCommitSerialNumber());
                Assert.assertEquals(10L, bufferedPipeDataQueue.getLastMaxSerialNumber());
                ArrayList arrayList2 = new ArrayList();
                ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
                newSingleThreadExecutor.execute(() -> {
                    while (true) {
                        try {
                            arrayList2.add(bufferedPipeDataQueue.take());
                            bufferedPipeDataQueue.commit();
                        } catch (InterruptedException e) {
                            return;
                        }
                    }
                });
                try {
                    Thread.sleep(3000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                newSingleThreadExecutor.shutdownNow();
                try {
                    Thread.sleep(500L);
                } catch (InterruptedException e2) {
                    e2.printStackTrace();
                    Assert.fail();
                }
                Assert.assertEquals(9L, arrayList2.size());
                for (int i4 = 0; i4 < 9; i4++) {
                    Assert.assertEquals(arrayList.get(i4 + 2), arrayList2.get(i4));
                }
                bufferedPipeDataQueue.clear();
            } catch (Throwable th) {
                bufferedPipeDataQueue.clear();
                throw th;
            }
        } catch (Exception e3) {
            e3.printStackTrace();
            Assert.fail();
        }
    }

    /* JADX WARN: Finally extract failed */
    @Test
    public void testRecoveryWithoutEmptyPipeLog() {
        try {
            DataOutputStream dataOutputStream = new DataOutputStream(new FileOutputStream(new File(this.pipeLogDir, "commit.log"), true));
            dataOutputStream.writeLong(1L);
            dataOutputStream.close();
            ArrayList arrayList = new ArrayList();
            DataOutputStream dataOutputStream2 = new DataOutputStream(new FileOutputStream(new File(this.pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(0L)), false));
            for (int i = 0; i < 4; i++) {
                TsFilePipeData tsFilePipeData = new TsFilePipeData("fake" + i, i);
                arrayList.add(tsFilePipeData);
                tsFilePipeData.serialize(dataOutputStream2);
            }
            dataOutputStream2.close();
            DataOutputStream dataOutputStream3 = new DataOutputStream(new FileOutputStream(new File(this.pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(4L)), false));
            for (int i2 = 4; i2 < 8; i2++) {
                DeletionPipeData deletionPipeData = new DeletionPipeData(new Deletion(new PartialPath("fake" + i2), 0L, 99L), i2);
                arrayList.add(deletionPipeData);
                deletionPipeData.serialize(dataOutputStream3);
            }
            for (int i3 = 8; i3 < 11; i3++) {
                DeletionPipeData deletionPipeData2 = new DeletionPipeData(new Deletion(new PartialPath("fake" + i3), 0L, 99L), i3);
                arrayList.add(deletionPipeData2);
                deletionPipeData2.serialize(dataOutputStream3);
            }
            dataOutputStream3.close();
            BufferedPipeDataQueue bufferedPipeDataQueue = new BufferedPipeDataQueue(this.pipeLogDir.getPath());
            try {
                Assert.assertEquals(1L, bufferedPipeDataQueue.getCommitSerialNumber());
                Assert.assertEquals(10L, bufferedPipeDataQueue.getLastMaxSerialNumber());
                ArrayList arrayList2 = new ArrayList();
                ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
                newSingleThreadExecutor.execute(() -> {
                    while (true) {
                        try {
                            arrayList2.add(bufferedPipeDataQueue.take());
                            bufferedPipeDataQueue.commit();
                        } catch (InterruptedException e) {
                            return;
                        }
                    }
                });
                try {
                    Thread.sleep(3000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                newSingleThreadExecutor.shutdownNow();
                try {
                    Thread.sleep(500L);
                } catch (InterruptedException e2) {
                    e2.printStackTrace();
                    Assert.fail();
                }
                Assert.assertEquals(9L, arrayList2.size());
                for (int i4 = 0; i4 < 9; i4++) {
                    Assert.assertEquals(arrayList.get(i4 + 2), arrayList2.get(i4));
                }
                bufferedPipeDataQueue.clear();
            } catch (Throwable th) {
                bufferedPipeDataQueue.clear();
                throw th;
            }
        } catch (Exception e3) {
            e3.printStackTrace();
            Assert.fail();
        }
    }

    /* JADX WARN: Finally extract failed */
    @Test
    public void testOfferWhileTaking() {
        try {
            DataOutputStream dataOutputStream = new DataOutputStream(new FileOutputStream(new File(this.pipeLogDir, "commit.log"), true));
            dataOutputStream.writeLong(1L);
            dataOutputStream.close();
            ArrayList arrayList = new ArrayList();
            DataOutputStream dataOutputStream2 = new DataOutputStream(new FileOutputStream(new File(this.pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(0L)), false));
            for (int i = 0; i < 4; i++) {
                TsFilePipeData tsFilePipeData = new TsFilePipeData("fake" + i, i);
                arrayList.add(tsFilePipeData);
                tsFilePipeData.serialize(dataOutputStream2);
            }
            dataOutputStream2.close();
            DataOutputStream dataOutputStream3 = new DataOutputStream(new FileOutputStream(new File(this.pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(4L)), false));
            for (int i2 = 4; i2 < 8; i2++) {
                DeletionPipeData deletionPipeData = new DeletionPipeData(new Deletion(new PartialPath("fake" + i2), 0L, 99L), i2);
                arrayList.add(deletionPipeData);
                deletionPipeData.serialize(dataOutputStream3);
            }
            for (int i3 = 8; i3 < 11; i3++) {
                DeletionPipeData deletionPipeData2 = new DeletionPipeData(new Deletion(new PartialPath("fake" + i3), 0L, 99L), i3);
                arrayList.add(deletionPipeData2);
                deletionPipeData2.serialize(dataOutputStream3);
            }
            dataOutputStream3.close();
            BufferedPipeDataQueue bufferedPipeDataQueue = new BufferedPipeDataQueue(this.pipeLogDir.getPath());
            try {
                Assert.assertEquals(1L, bufferedPipeDataQueue.getCommitSerialNumber());
                Assert.assertEquals(10L, bufferedPipeDataQueue.getLastMaxSerialNumber());
                ArrayList arrayList2 = new ArrayList();
                ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
                newSingleThreadExecutor.execute(() -> {
                    while (true) {
                        try {
                            arrayList2.add(bufferedPipeDataQueue.take());
                            bufferedPipeDataQueue.commit();
                        } catch (InterruptedException e) {
                            return;
                        } catch (Exception e2) {
                            e2.printStackTrace();
                            return;
                        }
                    }
                });
                for (int i4 = 11; i4 < 20; i4++) {
                    bufferedPipeDataQueue.offer(new DeletionPipeData(new Deletion(new PartialPath("fake" + i4), 0L, 0L), i4));
                }
                try {
                    Thread.sleep(3000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                newSingleThreadExecutor.shutdownNow();
                try {
                    Thread.sleep(500L);
                } catch (InterruptedException e2) {
                    e2.printStackTrace();
                    Assert.fail();
                }
                Assert.assertEquals(18L, arrayList2.size());
                for (int i5 = 0; i5 < 9; i5++) {
                    Assert.assertEquals(arrayList.get(i5 + 2), arrayList2.get(i5));
                }
                bufferedPipeDataQueue.clear();
            } catch (Throwable th) {
                bufferedPipeDataQueue.clear();
                throw th;
            }
        } catch (Exception e3) {
            e3.printStackTrace();
            Assert.fail();
        }
    }

    /* JADX WARN: Finally extract failed */
    @Test
    public void testOfferWhileTakingWithDiscontinuousSerialNumber() {
        try {
            DataOutputStream dataOutputStream = new DataOutputStream(new FileOutputStream(new File(this.pipeLogDir, "commit.log"), true));
            dataOutputStream.writeLong(1L);
            dataOutputStream.close();
            ArrayList arrayList = new ArrayList();
            DataOutputStream dataOutputStream2 = new DataOutputStream(new FileOutputStream(new File(this.pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(0L)), false));
            TsFilePipeData tsFilePipeData = new TsFilePipeData("fake3", 3L);
            arrayList.add(tsFilePipeData);
            tsFilePipeData.serialize(dataOutputStream2);
            dataOutputStream2.close();
            DataOutputStream dataOutputStream3 = new DataOutputStream(new FileOutputStream(new File(this.pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(4L)), false));
            for (int i = 4; i < 8; i++) {
                DeletionPipeData deletionPipeData = new DeletionPipeData(new Deletion(new PartialPath("fake" + i), 0L, 99L), i);
                arrayList.add(deletionPipeData);
                deletionPipeData.serialize(dataOutputStream3);
            }
            DeletionPipeData deletionPipeData2 = new DeletionPipeData(new Deletion(new PartialPath("fake10"), 0L, 99L), 10L);
            arrayList.add(deletionPipeData2);
            deletionPipeData2.serialize(dataOutputStream3);
            dataOutputStream3.close();
            BufferedPipeDataQueue bufferedPipeDataQueue = new BufferedPipeDataQueue(this.pipeLogDir.getPath());
            try {
                Assert.assertEquals(1L, bufferedPipeDataQueue.getCommitSerialNumber());
                Assert.assertEquals(10L, bufferedPipeDataQueue.getLastMaxSerialNumber());
                ArrayList arrayList2 = new ArrayList();
                ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
                newSingleThreadExecutor.execute(() -> {
                    while (true) {
                        try {
                            PipeData take = bufferedPipeDataQueue.take();
                            logger.info(String.format("PipeData: %s", take));
                            arrayList2.add(take);
                            bufferedPipeDataQueue.commit();
                        } catch (InterruptedException e) {
                            return;
                        } catch (Exception e2) {
                            e2.printStackTrace();
                            return;
                        }
                    }
                });
                for (int i2 = 16; i2 < 20; i2++) {
                    if (!bufferedPipeDataQueue.offer(new DeletionPipeData(new Deletion(new PartialPath("fake" + i2), 0L, 0L), i2))) {
                        logger.info(String.format("Can not offer serialize number %d", Integer.valueOf(i2)));
                    }
                }
                try {
                    Thread.sleep(3000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                newSingleThreadExecutor.shutdownNow();
                try {
                    Thread.sleep(500L);
                } catch (InterruptedException e2) {
                    e2.printStackTrace();
                    Assert.fail();
                }
                logger.info(String.format("PipeDataTakeList: %s", arrayList2));
                Assert.assertEquals(10L, arrayList2.size());
                for (int i3 = 0; i3 < 6; i3++) {
                    Assert.assertEquals(arrayList.get(i3), arrayList2.get(i3));
                }
                bufferedPipeDataQueue.clear();
            } catch (Throwable th) {
                bufferedPipeDataQueue.clear();
                throw th;
            }
        } catch (Exception e3) {
            e3.printStackTrace();
            Assert.fail();
        }
    }
}
