package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.NavigableMap;
import java.util.OptionalLong;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.shaded.org.jets3t.service.security.EncryptionUtil;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.metrics2.sink.ganglia.AbstractGangliaSink;
import org.glassfish.hk2.utilities.BuilderHelper;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;

@Category({ReplicationTests.class, LargeTests.class})
/* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.class */
public class TestWALEntryStream {
    // Shared fixtures assigned once by setUpBeforeClass().
    private static HBaseTestingUtility TEST_UTIL;
    private static Configuration CONF;
    private static FileSystem fs;
    private static MiniDFSCluster cluster;
    // WAL under test; (re)created by setUp() and closed by tearDown().
    private WAL log;
    // Paths added by PathWatcher.preLogRoll; passed to the streams and readers under test.
    PriorityBlockingQueue<Path> walQueue;
    private PathWatcher pathWatcher;

    @Rule
    public TestName tn = new TestName();
    private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();

    @ClassRule
    public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestWALEntryStream.class);
    private static final TableName tableName = TableName.valueOf("tablename");
    private static final byte[] family = Bytes.toBytes("column");
    // Plain literal: the test data must not depend on a constant that belongs to an unrelated
    // dependency-injection library and merely happens to carry the same value.
    private static final byte[] qualifier = Bytes.toBytes("qualifier");
    private static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName).setStartKey(HConstants.EMPTY_START_ROW).setEndKey(HConstants.LAST_ROW).build();
    // Must stay after 'family': getScopes() reads it during static initialisation.
    private static final NavigableMap<byte[], Integer> scopes = getScopes();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream$PathWatcher.class */
    /**
     * WAL listener that records every path reported through {@link #preLogRoll(Path, Path)}:
     * the second argument is appended to the enclosing test's {@code walQueue} and remembered
     * in {@link #currentPath}.
     */
    public class PathWatcher implements WALActionsListener {
        /** Last value seen as the second argument of preLogRoll; null until the first call. */
        Path currentPath;

        PathWatcher() {
        }

        /**
         * Queues {@code path2} and remembers it as the current path.
         *
         * @param path  first path supplied by the WAL (unused here)
         * @param path2 second path supplied by the WAL (presumably the newly created log file;
         *              confirm against WALActionsListener)
         */
        @Override
        public void preLogRoll(Path path, Path path2) throws IOException {
            walQueue.add(path2);
            currentPath = path2;
        }
    }

    /**
     * Builds the scope map used for every WAL key written by this test: a single entry mapping
     * the test column family to {@code 1}, ordered by {@code Bytes.BYTES_COMPARATOR} so that
     * byte[] keys are compared by content.
     *
     * @return a new mutable map containing only the test family
     */
    private static NavigableMap<byte[], Integer> getScopes() {
        // Parameterized instead of a raw TreeMap so put/return are type-checked.
        NavigableMap<byte[], Integer> familyScopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
        familyScopes.put(family, 1);
        return familyScopes;
    }

    /**
     * Class-level setup: creates the testing utility, captures its configuration, starts a
     * mini DFS cluster (argument 3) and caches the cluster and its file system.
     */
    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        HBaseTestingUtility util = new HBaseTestingUtility();
        TEST_UTIL = util;
        CONF = util.getConfiguration();
        util.startMiniDFSCluster(3);
        MiniDFSCluster dfsCluster = util.getDFSCluster();
        cluster = dfsCluster;
        fs = dfsCluster.getFileSystem();
    }

    /**
     * Class-level teardown: asks the shared testing utility to shut down its mini cluster.
     */
    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        TEST_UTIL.shutdownMiniCluster();
    }

    /**
     * Per-test setup: starts with an empty path queue and a fresh {@link PathWatcher}, then
     * creates a WAL factory named after the running test method, registers the watcher on its
     * provider and obtains the WAL for the test region.
     */
    @Before
    public void setUp() throws Exception {
        walQueue = new PriorityBlockingQueue<>();
        pathWatcher = new PathWatcher();
        String factoryId = tn.getMethodName();
        WALFactory factory = new WALFactory(CONF, factoryId);
        // Register the listener before the WAL is obtained, as in the original ordering.
        factory.getWALProvider().addWALActionsListener(pathWatcher);
        log = factory.getWAL(info);
    }

    /** Per-test teardown: closes the WAL created by {@link #setUp()}. */
    @After
    public void tearDown() throws Exception {
        log.close();
    }

    /**
     * For every combination of entry count (1500, 60000), cells per entry (1, 100) and WAL
     * compression (off, on): appends that many entries, rolls the writer, and verifies that a
     * stream started at position 0 yields exactly the appended number of non-null entries.
     * The WAL is closed and {@link #setUp()} re-run after each combination.
     */
    @Test
    public void testDifferentCounts() throws Exception {
        int[] entryCounts = {1500, 60000};
        int[] cellsPerEntry = {1, 100};
        boolean[] compressionSettings = {false, true};
        for (int entryCount : entryCounts) {
            for (int cellCount : cellsPerEntry) {
                for (boolean compression : compressionSettings) {
                    TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION, compression);
                    this.mvcc.advanceTo(1L);
                    for (int written = 0; written < entryCount; written++) {
                        appendToLogAndSync(cellCount);
                    }
                    this.log.rollWriter();

                    // try-with-resources guarantees the stream is closed even when an assertion fails.
                    try (WALEntryStream entryStream =
                            new WALEntryStream(this.walQueue, CONF, 0L, this.log, null, new MetricsSource("1"))) {
                        int read = 0;
                        while (entryStream.hasNext()) {
                            Assert.assertNotNull(entryStream.next());
                            read++;
                        }
                        Assert.assertEquals(entryCount, read);
                        Assert.assertFalse(entryStream.hasNext());
                    }

                    // Reset the fixture so the next combination starts from a new WAL and queue.
                    this.log.close();
                    setUp();
                }
            }
        }
    }

    /**
     * Exercises streaming across successive appends and a log roll:
     * <ol>
     *   <li>one entry is read from position 0, after which peek/next return null;</li>
     *   <li>a second stream resumed at the recorded position reads the next appended entry;</li>
     *   <li>after an append, a roll and another append, a third stream resumed at the newer
     *       position reads both entries and then reports no more data.</li>
     * </ol>
     * Each stream is closed exactly once, including when an assertion fails.
     */
    @Test
    public void testAppendsWithRolls() throws Exception {
        appendToLogAndSync();
        long firstPosition;
        try (WALEntryStream entryStream =
                new WALEntryStream(this.walQueue, CONF, 0L, this.log, null, new MetricsSource("1"))) {
            Assert.assertTrue(entryStream.hasNext());
            WAL.Entry peeked = entryStream.peek();
            Assert.assertSame(peeked, entryStream.next());
            Assert.assertNotNull(peeked);
            Assert.assertFalse(entryStream.hasNext());
            Assert.assertNull(entryStream.peek());
            Assert.assertNull(entryStream.next());
            firstPosition = entryStream.getPosition();
        }

        appendToLogAndSync();
        long secondPosition;
        try (WALEntryStream entryStream =
                new WALEntryStream(this.walQueue, CONF, firstPosition, this.log, null, new MetricsSource("1"))) {
            WAL.Entry entry = entryStream.next();
            // Reading an entry must move the stream past the position it was opened at.
            Assert.assertNotEquals(firstPosition, entryStream.getPosition());
            Assert.assertNotNull(entry);
            secondPosition = entryStream.getPosition();
        }

        appendToLogAndSync();
        this.log.rollWriter();
        appendToLogAndSync();
        try (WALEntryStream entryStream =
                new WALEntryStream(this.walQueue, CONF, secondPosition, this.log, null, new MetricsSource("1"))) {
            WAL.Entry beforeRoll = entryStream.next();
            Assert.assertNotEquals(secondPosition, entryStream.getPosition());
            Assert.assertNotNull(beforeRoll);

            WAL.Entry afterRoll = entryStream.next();
            Assert.assertNotEquals(secondPosition, entryStream.getPosition());
            Assert.assertNotNull(afterRoll);
            Assert.assertFalse(entryStream.hasNext());
        }
    }

    /**
     * Rolls the log while a stream is open: rows "1" and "2" are written before the stream is
     * created, row "3" is appended, the writer is rolled, and row "4" is appended. The stream
     * must return the four rows in order; the path queue holds two entries after row "2" is
     * read and one entry once row "4" has been read.
     */
    @Test
    public void testLogrollWhileStreaming() throws Exception {
        appendToLog("1");
        appendToLog("2");
        try (WALEntryStream entryStream =
                new WALEntryStream(this.walQueue, CONF, 0L, this.log, null, new MetricsSource("1"))) {
            Assert.assertEquals("1", getRow(entryStream.next()));

            appendToLog("3");
            this.log.rollWriter();
            appendToLog("4");

            Assert.assertEquals("2", getRow(entryStream.next()));
            Assert.assertEquals(2L, this.walQueue.size());
            Assert.assertEquals("3", getRow(entryStream.next()));
            Assert.assertEquals("4", getRow(entryStream.next()));
            Assert.assertEquals(1L, this.walQueue.size());
            Assert.assertFalse(entryStream.hasNext());
        }
    }

    /**
     * Appends rows "2" and "3" after the stream has already consumed row "1". The test asserts
     * that {@code hasNext()} is false before {@code reset()} is called, and that after the
     * reset the stream returns "2" and "3" and then has nothing further.
     */
    @Test
    public void testNewEntriesWhileStreaming() throws Exception {
        appendToLog("1");
        try (WALEntryStream entryStream =
                new WALEntryStream(this.walQueue, CONF, 0L, this.log, null, new MetricsSource("1"))) {
            entryStream.next();

            appendToLog("2");
            appendToLog("3");
            Assert.assertFalse(entryStream.hasNext());

            entryStream.reset();
            Assert.assertEquals("2", getRow(entryStream.next()));
            Assert.assertEquals("3", getRow(entryStream.next()));
            Assert.assertFalse(entryStream.hasNext());
        }
    }

    /**
     * Reads row "1" with a first stream, appends rows "2" and "3", records the first stream's
     * position and closes it. A second stream opened at that position must return "2" and "3",
     * report no more entries, and leave exactly one path in the queue.
     * Both streams are closed even if an assertion fails.
     */
    @Test
    public void testResumeStreamingFromPosition() throws Exception {
        appendToLog("1");
        long lastPosition;
        try (WALEntryStream entryStream =
                new WALEntryStream(this.walQueue, CONF, 0L, this.log, null, new MetricsSource("1"))) {
            entryStream.next();
            appendToLog("2");
            appendToLog("3");
            lastPosition = entryStream.getPosition();
        }

        try (WALEntryStream entryStream =
                new WALEntryStream(this.walQueue, CONF, lastPosition, this.log, null, new MetricsSource("1"))) {
            Assert.assertEquals("2", getRow(entryStream.next()));
            Assert.assertEquals("3", getRow(entryStream.next()));
            Assert.assertFalse(entryStream.hasNext());
            Assert.assertEquals(1L, this.walQueue.size());
        }
    }

    /**
     * Writes three entries, reads one with a first stream and records its position, then
     * verifies that a second stream opened at that position returns exactly the two remaining
     * entries. Both streams are closed even if an assertion fails.
     */
    @Test
    public void testPosition() throws Exception {
        appendEntriesToLogAndSync(3);
        long lastPosition;
        try (WALEntryStream entryStream =
                new WALEntryStream(this.walQueue, CONF, 0L, this.log, null, new MetricsSource("1"))) {
            entryStream.next();
            lastPosition = entryStream.getPosition();
        }

        try (WALEntryStream entryStream =
                new WALEntryStream(this.walQueue, CONF, lastPosition, this.log, null, new MetricsSource("1"))) {
            Assert.assertNotNull(entryStream.next());
            Assert.assertNotNull(entryStream.next());
            Assert.assertFalse(entryStream.hasNext());
        }
    }

    /**
     * A stream created before anything has been appended must report that it has no entries.
     */
    @Test
    public void testEmptyStream() throws Exception {
        try (WALEntryStream entryStream =
                new WALEntryStream(this.walQueue, CONF, 0L, this.log, null, new MetricsSource("1"))) {
            Assert.assertFalse(entryStream.hasNext());
        }
    }

    /**
     * Creates a Mockito mock of {@code ReplicationSource} wired with: a mocked source manager
     * whose total-buffer-used counter is a fresh {@code AtomicLong(0)}, a new
     * {@code MetricsSource("1")}, this test's WAL as the file-length provider, a mocked
     * {@code Server}, and the requested "recovered" flag.
     *
     * @param z             value returned by the mock's {@code isRecovered()}
     * @param configuration not used by this method
     * @return the configured mock
     */
    private ReplicationSource mockReplicationSource(boolean z, Configuration configuration) {
        ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
        Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong(0L));
        Server mockedServer = Mockito.mock(Server.class);

        ReplicationSource source = Mockito.mock(ReplicationSource.class);
        Mockito.when(source.getSourceManager()).thenReturn(manager);
        Mockito.when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
        Mockito.when(source.getWALFileLengthProvider()).thenReturn(log);
        Mockito.when(source.getServer()).thenReturn(mockedServer);
        Mockito.when(source.isRecovered()).thenReturn(z);
        return source;
    }

    /**
     * Builds and starts a {@code ReplicationSourceWALReader} over this test's path queue,
     * starting at position 0 with the pass-through filter, backed by a mocked source whose
     * peer is always reported as enabled.
     *
     * @param z             forwarded to {@link #mockReplicationSource(boolean, Configuration)}
     *                      as the "recovered" flag
     * @param configuration configuration handed to the reader
     * @return the reader, already started
     */
    private ReplicationSourceWALReader createReader(boolean z, Configuration configuration) {
        ReplicationSource source = mockReplicationSource(z, configuration);
        Mockito.when(source.isPeerEnabled()).thenReturn(true);
        WALEntryFilter filter = getDummyFilter();
        ReplicationSourceWALReader reader =
                new ReplicationSourceWALReader(fs, configuration, walQueue, 0L, filter, source);
        reader.start();
        return reader;
    }

    /**
     * Writes three entries and uses a stream only to learn the position after the third one.
     * A reader created afterwards must deliver one batch with those three entries whose last
     * path and position match the head of the queue and the recorded position, and then a
     * second batch containing the single row "foo" appended later.
     */
    @Test
    public void testReplicationSourceWALReader() throws Exception {
        appendEntriesToLogAndSync(3);

        // The stream is only needed to obtain the expected position, so its scope ends here.
        long position;
        try (WALEntryStream entryStream =
                new WALEntryStream(this.walQueue, CONF, 0L, this.log, null, new MetricsSource("1"))) {
            entryStream.next();
            entryStream.next();
            entryStream.next();
            position = entryStream.getPosition();
        }

        Path walPath = this.walQueue.peek();
        ReplicationSourceWALReader reader = createReader(false, CONF);

        WALEntryBatch firstBatch = reader.take();
        Assert.assertNotNull(firstBatch);
        Assert.assertEquals(3L, firstBatch.getWalEntries().size());
        Assert.assertEquals(position, firstBatch.getLastWalPosition());
        Assert.assertEquals(walPath, firstBatch.getLastWalPath());
        Assert.assertEquals(3L, firstBatch.getNbRowKeys());

        appendToLog("foo");
        WALEntryBatch secondBatch = reader.take();
        Assert.assertEquals(1L, secondBatch.getNbEntries());
        Assert.assertEquals("foo", getRow(secondBatch.getWalEntries().get(0)));
    }

    /**
     * Reader in "recovered" mode with a batch capacity of 10 entries: ten entries are written,
     * the log is rolled, five more are written and the WAL is shut down. Expected batches, in
     * order: ten entries from the first path (not end-of-file), an empty end-of-file batch for
     * the first path, five entries from the next path (not end-of-file), and finally the
     * {@code WALEntryBatch.NO_MORE_DATA} sentinel.
     */
    @Test
    public void testReplicationSourceWALReaderRecovered() throws Exception {
        appendEntriesToLogAndSync(10);
        Path firstWal = walQueue.peek();
        log.rollWriter();
        appendEntriesToLogAndSync(5);
        log.shutdown();

        Configuration readerConf = new Configuration(CONF);
        readerConf.setInt("replication.source.nb.capacity", 10);
        ReplicationSourceWALReader reader = createReader(true, readerConf);

        WALEntryBatch fullBatch = reader.take();
        Assert.assertEquals(firstWal, fullBatch.getLastWalPath());
        Assert.assertEquals(10L, fullBatch.getNbEntries());
        Assert.assertFalse(fullBatch.isEndOfFile());

        WALEntryBatch endOfFirstWal = reader.take();
        Assert.assertEquals(firstWal, endOfFirstWal.getLastWalPath());
        Assert.assertEquals(0L, endOfFirstWal.getNbEntries());
        Assert.assertTrue(endOfFirstWal.isEndOfFile());

        // Peek only after the end-of-file batch was taken, exactly as the original ordering does.
        Path secondWal = walQueue.peek();
        WALEntryBatch tailBatch = reader.take();
        Assert.assertEquals(secondWal, tailBatch.getLastWalPath());
        Assert.assertEquals(5L, tailBatch.getNbEntries());
        Assert.assertFalse(tailBatch.isEndOfFile());

        Assert.assertSame(WALEntryBatch.NO_MORE_DATA, reader.take());
    }

    /**
     * Checks the positions and end-of-file flags reported across rolled logs. One entry is
     * written, the log is rolled and twenty more are written; the test then waits (up to 5 s)
     * for the first file to report a non-zero length. Expected batches: one entry from the
     * first path, flagged end-of-file, with a position not beyond the file length; twenty
     * entries from the second path; then, after another roll and ten more appends, an empty
     * end-of-file batch for the second path followed by ten entries from the third path.
     */
    @Test
    public void testReplicationSourceWALReaderWrongPosition() throws Exception {
        appendEntriesToLogAndSync(1);
        final Path firstWal = walQueue.peek();
        log.rollWriter();
        appendEntriesToLogAndSync(20);

        TEST_UTIL.waitFor(5000L, new Waiter.ExplainingPredicate<Exception>() {
            @Override
            public boolean evaluate() throws Exception {
                long currentLength = fs.getFileStatus(firstWal).getLen();
                return currentLength > 0;
            }

            @Override
            public String explainFailure() throws Exception {
                return firstWal + " has not been closed yet";
            }
        });
        long firstWalLength = fs.getFileStatus(firstWal).getLen();

        ReplicationSourceWALReader reader = createReader(false, CONF);

        WALEntryBatch firstBatch = reader.take();
        Assert.assertEquals(firstWal, firstBatch.getLastWalPath());
        Assert.assertTrue("Position " + firstBatch.getLastWalPosition() + " is out of range, file length is " + firstWalLength, firstBatch.getLastWalPosition() <= firstWalLength);
        Assert.assertEquals(1L, firstBatch.getNbEntries());
        Assert.assertTrue(firstBatch.isEndOfFile());

        Path secondWal = walQueue.peek();
        WALEntryBatch secondBatch = reader.take();
        Assert.assertEquals(secondWal, secondBatch.getLastWalPath());
        Assert.assertEquals(20L, secondBatch.getNbEntries());
        Assert.assertFalse(secondBatch.isEndOfFile());

        log.rollWriter();
        appendEntriesToLogAndSync(10);

        WALEntryBatch endOfSecondWal = reader.take();
        Assert.assertEquals(secondWal, endOfSecondWal.getLastWalPath());
        Assert.assertEquals(0L, endOfSecondWal.getNbEntries());
        Assert.assertTrue(endOfSecondWal.isEndOfFile());

        Path thirdWal = walQueue.peek();
        WALEntryBatch thirdBatch = reader.take();
        Assert.assertEquals(thirdWal, thirdBatch.getLastWalPath());
        Assert.assertEquals(10L, thirdBatch.getNbEntries());
        Assert.assertFalse(thirdBatch.isEndOfFile());
    }

    /**
     * Verifies that the reader delivers nothing while the peer is reported as disabled.
     * Three entries are written and their end position recorded. The mocked source answers
     * {@code isPeerEnabled()} from a flag that starts out false and counts every call.
     * After the flag has been polled at least five times the pending {@code take()} must still
     * be incomplete; once the flag is set to true, the batch with the three entries arrives.
     */
    @Test
    public void testReplicationSourceWALReaderDisabled() throws IOException, InterruptedException, ExecutionException {
        appendEntriesToLogAndSync(3);

        // The stream is only needed to obtain the expected position, so its scope ends here.
        long position;
        try (WALEntryStream entryStream =
                new WALEntryStream(this.walQueue, CONF, 0L, this.log, null, new MetricsSource("1"))) {
            entryStream.next();
            entryStream.next();
            entryStream.next();
            position = entryStream.getPosition();
        }

        Path walPath = this.walQueue.peek();
        ReplicationSource source = mockReplicationSource(false, CONF);
        AtomicInteger enabledChecks = new AtomicInteger(0);
        AtomicBoolean peerEnabled = new AtomicBoolean(false);
        Mockito.when(source.isPeerEnabled()).then(invocation -> {
            enabledChecks.incrementAndGet();
            return peerEnabled.get();
        });

        ReplicationSourceWALReader reader =
                new ReplicationSourceWALReader(fs, CONF, this.walQueue, 0L, getDummyFilter(), source);
        reader.start();

        // take() blocks, so run it on the common pool and observe it from this thread.
        ForkJoinTask<WALEntryBatch> pendingBatch = ForkJoinPool.commonPool().submit(() -> {
            return reader.take();
        });

        TEST_UTIL.waitFor(30000L, () -> {
            return enabledChecks.get() >= 5;
        });
        Assert.assertFalse(pendingBatch.isDone());

        peerEnabled.set(true);
        WALEntryBatch batch = pendingBatch.get();
        Assert.assertNotNull(batch);
        Assert.assertEquals(3L, batch.getWalEntries().size());
        Assert.assertEquals(position, batch.getLastWalPosition());
        Assert.assertEquals(walPath, batch.getLastWalPath());
        Assert.assertEquals(3L, batch.getNbRowKeys());
    }

    /**
     * Returns the row key of the first cell in the entry's edit, decoded with
     * {@code Bytes.toString}.
     *
     * @param entry WAL entry whose edit contains at least one cell
     * @return the first cell's row as a string
     */
    private String getRow(WAL.Entry entry) {
        Cell firstCell = entry.getEdit().getCells().get(0);
        byte[] rowArray = firstCell.getRowArray();
        int rowOffset = firstCell.getRowOffset();
        int rowLength = firstCell.getRowLength();
        return Bytes.toString(rowArray, rowOffset, rowLength);
    }

    /**
     * Appends a single-cell edit whose row key is {@code str} and syncs the WAL up to the
     * value returned by the append.
     *
     * @param str row key for the appended cell
     */
    private void appendToLog(String str) throws IOException {
        WALKeyImpl key = new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc, scopes);
        WALEdit edit = getWALEdit(str);
        long txid = log.appendData(info, key, edit);
        log.sync(txid);
    }

    /**
     * Appends {@code i} single-cell entries and then issues one sync using the value returned
     * by the last append (or -1 when nothing was appended).
     *
     * @param i number of entries to append
     */
    private void appendEntriesToLogAndSync(int i) throws IOException {
        long lastTxid = -1;
        for (int remaining = i; remaining > 0; remaining--) {
            lastTxid = appendToLog(1);
        }
        log.sync(lastTxid);
    }

    /** Appends one entry containing a single cell and syncs it; shorthand for {@code appendToLogAndSync(1)}. */
    private void appendToLogAndSync() throws IOException {
        appendToLogAndSync(1);
    }

    /**
     * Appends one entry holding {@code i} cells and syncs the WAL up to the value returned by
     * the append.
     *
     * @param i number of cells in the appended edit
     */
    private void appendToLogAndSync(int i) throws IOException {
        long txid = appendToLog(i);
        log.sync(txid);
    }

    /**
     * Appends one entry holding {@code i} cells to the WAL without syncing.
     *
     * @param i number of cells in the appended edit
     * @return the value returned by {@code WAL.appendData}, which callers pass to {@code sync}
     */
    private long appendToLog(int i) throws IOException {
        WALKeyImpl key = new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc, scopes);
        WALEdit edits = getWALEdits(i);
        return log.appendData(info, key, edits);
    }

    /**
     * Builds an edit with {@code i} cells. Each cell uses the current time (as bytes) for its
     * row key, the test family and qualifier, a timestamp read from the clock again, and the
     * qualifier bytes as its value.
     *
     * @param i number of cells to add
     * @return the populated edit (empty when {@code i <= 0})
     */
    private WALEdit getWALEdits(int i) {
        WALEdit edit = new WALEdit();
        int added = 0;
        while (added < i) {
            // Row key first, timestamp second: same clock-read order as a single constructor call.
            byte[] row = Bytes.toBytes(System.currentTimeMillis());
            long timestamp = System.currentTimeMillis();
            edit.add(new KeyValue(row, family, qualifier, timestamp, qualifier));
            added++;
        }
        return edit;
    }

    /**
     * Builds an edit with exactly one cell whose row key is {@code str}; family, qualifier and
     * value come from the test constants and the timestamp is the current time.
     *
     * @param str row key for the cell
     * @return the single-cell edit
     */
    private WALEdit getWALEdit(String str) {
        byte[] row = Bytes.toBytes(str);
        KeyValue cell = new KeyValue(row, family, qualifier, System.currentTimeMillis(), qualifier);
        WALEdit edit = new WALEdit();
        edit.add(cell);
        return edit;
    }

    /**
     * Returns a filter that lets every entry through unchanged.
     *
     * @return a new pass-through {@code WALEntryFilter}
     */
    private WALEntryFilter getDummyFilter() {
        final WALEntryFilter passThrough = new WALEntryFilter() {
            @Override
            public WAL.Entry filter(WAL.Entry entry) {
                return entry;
            }
        };
        return passThrough;
    }

    /**
     * Verifies that the stream does not read past the length reported by its file-length
     * provider. Two rows are written; the provider initially reports one byte less than the
     * length the WAL reports for the file. The stream must return only the first entry, still
     * have nothing more after a one-second pause and a reset, and return the second entry once
     * the provider reports the full length and the stream is reset again.
     */
    @Test
    public void testReadBeyondCommittedLength() throws IOException, InterruptedException {
        appendToLog("1");
        appendToLog("2");
        long fullLength = this.log.getLogFileSizeIfBeingWritten(this.walQueue.peek()).getAsLong();
        // One byte short of the full length, so the second entry is not completely "committed".
        AtomicLong reportedLength = new AtomicLong(fullLength - 1);

        try (WALEntryStream entryStream = new WALEntryStream(this.walQueue, CONF, 0L,
                path -> OptionalLong.of(reportedLength.get()), null, new MetricsSource("1"))) {
            Assert.assertTrue(entryStream.hasNext());
            Assert.assertNotNull(entryStream.next());
            Assert.assertFalse(entryStream.hasNext());

            Thread.sleep(1000L);
            entryStream.reset();
            Assert.assertFalse(entryStream.hasNext());

            reportedLength.set(fullLength);
            entryStream.reset();
            Assert.assertTrue(entryStream.hasNext());
            Assert.assertNotNull(entryStream.next());
            Assert.assertFalse(entryStream.hasNext());
        }
    }
}
