package org.apache.hadoop.contrib.bkjournal;

import java.io.IOException;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.util.LocalBookKeeper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil;
import org.apache.hadoop.hdfs.server.namenode.JournalManager;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:test-classes/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.class */
public class TestBookKeeperJournalManager {
    /** Number of transactions the multi-segment tests write into each log segment. */
    private static final long DEFAULT_SEGMENT_SIZE = 1000;
    /** ZooKeeper connect string used for the local ensemble and for test clients. */
    private static final String zkEnsemble = "localhost:2181";
    /** Background thread that runs {@code LocalBookKeeper.main}; started once per class. */
    private static Thread bkthread;
    /** Per-test ZooKeeper client; opened in {@link #setup()} and closed in {@link #teardown()}. */
    private ZooKeeper zkc;
    static final Log LOG = LogFactory.getLog(TestBookKeeperJournalManager.class);
    /** Configuration handed to every {@code BookKeeperJournalManager} created by the tests. */
    protected static Configuration conf = new Configuration();

    /**
     * Opens a ZooKeeper client and blocks until its session is established.
     *
     * <p>The client is closed again if the session is not established in time
     * (or the wait is interrupted), so a failed connect does not leak a client.
     *
     * @param ensemble ZooKeeper connect string to connect to
     * @return a client whose watcher has observed {@code SyncConnected}
     * @throws IOException if the session is not established within 3 seconds
     * @throws KeeperException kept in the signature for existing callers
     * @throws InterruptedException if interrupted while waiting for the connection
     */
    private static ZooKeeper connectZooKeeper(String ensemble) throws IOException, KeeperException, InterruptedException {
        final CountDownLatch connected = new CountDownLatch(1);
        ZooKeeper client = new ZooKeeper(ensemble, 3600, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                    connected.countDown();
                }
            }
        });
        boolean isConnected = false;
        try {
            isConnected = connected.await(3L, TimeUnit.SECONDS);
        } finally {
            if (!isConnected) {
                // Timed out or interrupted: release the half-open client.
                client.close();
            }
        }
        if (!isConnected) {
            throw new IOException("Zookeeper took too long to connect");
        }
        return client;
    }

    /**
     * Starts a local ZooKeeper/BookKeeper cluster on a background thread and
     * waits until the expected number of bookies is listed under
     * {@code /ledgers/available}.
     *
     * @throws Exception if the server does not come up within 10 seconds, or
     *         if the bookies have not all appeared after 10 polls
     */
    @BeforeClass
    public static void setupBookkeeper() throws Exception {
        final int numBookies = 5;
        bkthread = new Thread() {
            @Override
            public void run() {
                try {
                    String[] args = {String.valueOf(numBookies)};
                    TestBookKeeperJournalManager.LOG.info("Starting bk");
                    LocalBookKeeper.main(args);
                } catch (InterruptedException e) {
                    // Interrupted by teardownBookkeeper(); exit quietly.
                } catch (Exception e) {
                    TestBookKeeperJournalManager.LOG.error("Error starting local bk", e);
                }
            }
        };
        bkthread.start();
        if (!LocalBookKeeper.waitForServerUp(zkEnsemble, 10000L)) {
            throw new Exception("Error starting zookeeper/bookkeeper");
        }

        ZooKeeper client = connectZooKeeper(zkEnsemble);
        try {
            boolean allBookiesUp = false;
            for (int attempt = 0; attempt < 10 && !allBookiesUp; attempt++) {
                try {
                    allBookiesUp = client.getChildren("/ledgers/available", false).size() == numBookies;
                } catch (KeeperException e) {
                    // Lookup failed on this poll (e.g. the znode is not there yet); retry.
                }
                if (!allBookiesUp) {
                    // Poll once per second.
                    Thread.sleep(1000L);
                }
            }
            if (!allBookiesUp) {
                throw new IOException("Not enough bookies started");
            }
        } finally {
            // Close the probe client only after polling has finished.
            client.close();
        }
    }

    /**
     * Connects a new ZooKeeper client for the test about to run.
     *
     * @throws Exception if the connection cannot be established
     */
    @Before
    public void setup() throws Exception {
        ZooKeeper client = connectZooKeeper(zkEnsemble);
        this.zkc = client;
    }

    /**
     * Closes the per-test ZooKeeper client, if one was opened.
     *
     * <p>JUnit runs {@code @After} methods even when {@code @Before} failed,
     * so the client may be null here; the guard keeps a NullPointerException
     * from being added on top of the original setup failure.
     *
     * @throws Exception if closing the client fails
     */
    @After
    public void teardown() throws Exception {
        if (this.zkc != null) {
            this.zkc.close();
            this.zkc = null;
        }
    }

    /**
     * Interrupts the background BookKeeper thread, if it was started, and
     * waits for it to finish.
     *
     * @throws Exception if interrupted while joining the thread
     */
    @AfterClass
    public static void teardownBookkeeper() throws Exception {
        final Thread bookieThread = bkthread;
        if (bookieThread == null) {
            return;
        }
        bookieThread.interrupt();
        bookieThread.join();
    }

    /**
     * Writes transactions 1..100 into one segment, finalizes it, and checks
     * that the finalized znode exists and the in-progress znode does not.
     */
    @Test
    public void testSimpleWrite() throws Exception {
        URI journalUri = URI.create("bookkeeper://localhost:2181/hdfsjournal-simplewrite");
        BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, journalUri);
        EditLogOutputStream out = bkjm.startLogSegment(1L);
        for (long txId = 1; txId <= 100; txId++) {
            FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
            op.setTransactionId(txId);
            out.write(op);
        }
        out.close();
        bkjm.finalizeLogSegment(1L, 100L);

        Assert.assertNotNull(this.zkc.exists(bkjm.finalizedLedgerZNode(1L, 100L), false));
        Assert.assertNull(this.zkc.exists(bkjm.inprogressZNode(), false));
    }

    /**
     * Writes and finalizes a segment holding transactions 1..100 and checks
     * that the journal manager reports 100 transactions starting from txid 1.
     */
    @Test
    public void testNumberOfTransactions() throws Exception {
        URI journalUri = URI.create("bookkeeper://localhost:2181/hdfsjournal-txncount");
        BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, journalUri);
        EditLogOutputStream out = bkjm.startLogSegment(1L);
        for (long txId = 1; txId <= 100; txId++) {
            FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
            op.setTransactionId(txId);
            out.write(op);
        }
        out.close();
        bkjm.finalizeLogSegment(1L, 100L);

        long counted = bkjm.getNumberOfTransactions(1L, true);
        Assert.assertEquals(100L, counted);
    }

    /* JADX WARN: Type inference failed for: r0v31, types: [org.apache.hadoop.hdfs.server.namenode.FSEditLogOp, long] */
    @Test
    public void testNumberOfTransactionsWithGaps() throws Exception {
        BookKeeperJournalManager bookKeeperJournalManager = new BookKeeperJournalManager(conf, URI.create("bookkeeper://localhost:2181/hdfsjournal-gaps"));
        long j = 1;
        long j2 = 0;
        while (true) {
            long j3 = j2;
            if (j3 >= 3) {
                break;
            }
            long j4 = j;
            EditLogOutputStream startLogSegment = bookKeeperJournalManager.startLogSegment(j4);
            long j5 = 1;
            while (true) {
                long j6 = j5;
                if (j6 <= DEFAULT_SEGMENT_SIZE) {
                    ?? noOpInstance = FSEditLogTestUtil.getNoOpInstance();
                    j++;
                    noOpInstance.setTransactionId((long) noOpInstance);
                    startLogSegment.write((FSEditLogOp) noOpInstance);
                    j5 = j6 + 1;
                }
            }
            startLogSegment.close();
            bookKeeperJournalManager.finalizeLogSegment(j4, j - 1);
            Assert.assertNotNull(this.zkc.exists(bookKeeperJournalManager.finalizedLedgerZNode(j4, j - 1), false));
            j2 = j3 + 1;
        }
        this.zkc.delete(bookKeeperJournalManager.finalizedLedgerZNode(1001L, 2000L), -1);
        Assert.assertEquals(DEFAULT_SEGMENT_SIZE, bookKeeperJournalManager.getNumberOfTransactions(1L, true));
        try {
            bookKeeperJournalManager.getNumberOfTransactions(1001L, true);
            Assert.fail("Should have thrown corruption exception by this point");
        } catch (JournalManager.CorruptionException e) {
        }
        Assert.assertEquals(DEFAULT_SEGMENT_SIZE, bookKeeperJournalManager.getNumberOfTransactions(2001L, true));
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v40, types: [org.apache.hadoop.hdfs.server.namenode.FSEditLogOp, long] */
    @Test
    public void testNumberOfTransactionsWithInprogressAtEnd() throws Exception {
        Configuration configuration = conf;
        URI create = URI.create("bookkeeper://localhost:2181/hdfsjournal-inprogressAtEnd");
        BookKeeperJournalManager bookKeeperJournalManager = new BookKeeperJournalManager(configuration, create);
        long j = 1;
        long j2 = 0;
        long j3 = create;
        while (true) {
            long j4 = j2;
            if (j4 >= 3) {
                break;
            }
            long j5 = j;
            EditLogOutputStream startLogSegment = bookKeeperJournalManager.startLogSegment(j5);
            long j6 = 1;
            while (true) {
                long j7 = j6;
                if (j7 <= DEFAULT_SEGMENT_SIZE) {
                    ?? noOpInstance = FSEditLogTestUtil.getNoOpInstance();
                    j++;
                    noOpInstance.setTransactionId((long) noOpInstance);
                    startLogSegment.write((FSEditLogOp) noOpInstance);
                    j6 = j7 + 1;
                }
            }
            startLogSegment.close();
            bookKeeperJournalManager.finalizeLogSegment(j5, j - 1);
            long j8 = j - 1;
            Assert.assertNotNull(this.zkc.exists(bookKeeperJournalManager.finalizedLedgerZNode(j5, j8), false));
            j2 = j4 + 1;
            j3 = j8;
        }
        EditLogOutputStream startLogSegment2 = bookKeeperJournalManager.startLogSegment(j);
        long j9 = 1;
        long j10 = j3;
        while (true) {
            long j11 = j9;
            if (j11 > 500) {
                startLogSegment2.setReadyToFlush();
                startLogSegment2.flush();
                startLogSegment2.abort();
                startLogSegment2.close();
                Assert.assertEquals(j - 1, bookKeeperJournalManager.getNumberOfTransactions(1L, true));
                return;
            }
            FSEditLogOp noOpInstance2 = FSEditLogTestUtil.getNoOpInstance();
            long j12 = j10;
            j10 = 1;
            j++;
            noOpInstance2.setTransactionId(j12);
            startLogSegment2.write(noOpInstance2);
            j9 = j11 + 1;
        }
    }

    /* JADX WARN: Type inference failed for: r0v49, types: [org.apache.hadoop.hdfs.server.namenode.FSEditLogOp, long] */
    @Test
    public void testWriteRestartFrom1() throws Exception {
        BookKeeperJournalManager bookKeeperJournalManager = new BookKeeperJournalManager(conf, URI.create("bookkeeper://localhost:2181/hdfsjournal-restartFrom1"));
        long j = 1;
        EditLogOutputStream startLogSegment = bookKeeperJournalManager.startLogSegment(1L);
        long j2 = 1;
        while (true) {
            long j3 = j2;
            if (j3 > DEFAULT_SEGMENT_SIZE) {
                break;
            }
            ?? noOpInstance = FSEditLogTestUtil.getNoOpInstance();
            j++;
            noOpInstance.setTransactionId((long) noOpInstance);
            startLogSegment.write((FSEditLogOp) noOpInstance);
            j2 = j3 + 1;
        }
        startLogSegment.close();
        bookKeeperJournalManager.finalizeLogSegment(1L, j - 1);
        try {
            bookKeeperJournalManager.startLogSegment(1L);
            Assert.fail("Shouldn't be able to start another journal from 1 when one already exists");
        } catch (Exception e) {
            LOG.info("Caught exception as expected", e);
        }
        try {
            bookKeeperJournalManager.startLogSegment(DEFAULT_SEGMENT_SIZE);
            Assert.fail("Shouldn't be able to start another journal from " + DEFAULT_SEGMENT_SIZE + " when one already exists");
        } catch (IOException e2) {
            LOG.info("Caught exception as expected", e2);
        }
        long j4 = 1001;
        EditLogOutputStream startLogSegment2 = bookKeeperJournalManager.startLogSegment(1001L);
        Assert.assertNotNull(startLogSegment2);
        long j5 = 1;
        while (true) {
            long j6 = j5;
            if (j6 > DEFAULT_SEGMENT_SIZE) {
                startLogSegment2.close();
                bookKeeperJournalManager.finalizeLogSegment(1001L, j4 - 1);
                Assert.assertNotNull(bookKeeperJournalManager.startLogSegment(4000L));
                return;
            } else {
                FSEditLogOp noOpInstance2 = FSEditLogTestUtil.getNoOpInstance();
                long j7 = j4;
                j4 = j7 + 1;
                noOpInstance2.setTransactionId(j7);
                startLogSegment2.write(noOpInstance2);
                j5 = j6 + 1;
            }
        }
    }

    /**
     * Creates two journal managers on the same journal URI and checks that
     * the second one cannot start a segment at txid 1 after the first has.
     */
    @Test
    public void testTwoWriters() throws Exception {
        final String journal = "bookkeeper://localhost:2181/hdfsjournal-dualWriter";
        BookKeeperJournalManager firstWriter = new BookKeeperJournalManager(conf, URI.create(journal));
        BookKeeperJournalManager secondWriter = new BookKeeperJournalManager(conf, URI.create(journal));

        firstWriter.startLogSegment(1L);
        try {
            secondWriter.startLogSegment(1L);
            Assert.fail("Shouldn't have been able to open the second writer");
        } catch (IOException expected) {
            LOG.info("Caught exception as expected", expected);
        }
    }

    /**
     * Writes and finalizes a segment of 10000 transactions, then reads it
     * back from txid 1 and checks that the stream yields the same number of
     * transactions. The input stream is closed exactly once, whether or not
     * the assertion passes.
     */
    @Test
    public void testSimpleRead() throws Exception {
        final long numTransactions = 10000;
        BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, URI.create("bookkeeper://localhost:2181/hdfsjournal-simpleread"));
        EditLogOutputStream out = bkjm.startLogSegment(1L);
        for (long txId = 1; txId <= numTransactions; txId++) {
            FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
            op.setTransactionId(txId);
            out.write(op);
        }
        out.close();
        bkjm.finalizeLogSegment(1L, numTransactions);

        EditLogInputStream in = bkjm.getInputStream(1L, true);
        try {
            Assert.assertEquals(numTransactions, FSEditLogTestUtil.countTransactionsInStream(in));
        } finally {
            in.close();
        }
    }

    /**
     * Writes transactions 1..100, flushes and aborts the stream without
     * finalizing, and checks that only the in-progress znode exists. After
     * {@code recoverUnfinalizedSegments()} the finalized znode for 1..100
     * must exist and the in-progress znode must be gone.
     */
    @Test
    public void testSimpleRecovery() throws Exception {
        URI journalUri = URI.create("bookkeeper://localhost:2181/hdfsjournal-simplerecovery");
        BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, journalUri);
        EditLogOutputStream out = bkjm.startLogSegment(1L);
        for (long txId = 1; txId <= 100; txId++) {
            FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
            op.setTransactionId(txId);
            out.write(op);
        }
        out.setReadyToFlush();
        out.flush();
        out.abort();
        out.close();

        // Before recovery: segment is still in progress, nothing finalized.
        Assert.assertNull(this.zkc.exists(bkjm.finalizedLedgerZNode(1L, 100L), false));
        Assert.assertNotNull(this.zkc.exists(bkjm.inprogressZNode(), false));

        bkjm.recoverUnfinalizedSegments();

        // After recovery: segment is finalized and the in-progress marker removed.
        Assert.assertNotNull(this.zkc.exists(bkjm.finalizedLedgerZNode(1L, 100L), false));
        Assert.assertNull(this.zkc.exists(bkjm.inprogressZNode(), false));
    }
}
