package org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog;

import java.io.ByteArrayInputStream;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.functions.runtime.shaded.org.apache.commons.compress.compressors.bzip2.BZip2Constants;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.api.DistributedLogManager;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.exceptions.BKTransmitException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.exceptions.EndOfStreamException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.exceptions.WriteException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.util.FailpointUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog.util.Utils;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Assert;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Rule;
import org.apache.pulsar.functions.runtime.shaded.org.junit.Test;
import org.apache.pulsar.functions.runtime.shaded.org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/runtime/shaded/org/apache/distributedlog/TestAppendOnlyStreamWriter.class */
public class TestAppendOnlyStreamWriter extends TestDistributedLogBase {
    /** Logger shared by the tests and helpers in this class. */
    static final Logger LOG = LoggerFactory.getLogger(TestAppendOnlyStreamWriter.class);

    /**
     * JUnit rule exposing the name of the currently running test method; each test
     * passes that name to {@code createNewDLM} so that it works on its own log.
     */
    @Rule
    public TestName testNames = new TestName();

    /**
     * Appends two payloads (11 and 40 repetitions of "abc") through one manager and
     * reads them back through a second manager in three chunks, checking that the
     * reassembled bytes equal 51 repetitions of "abc".
     *
     * @throws Exception if any log operation fails
     */
    @Test(timeout = 60000)
    public void testBasicReadAndWriteBehavior() throws Exception {
        String streamName = this.testNames.getMethodName();
        BKDistributedLogManager writerDlm = createNewDLM(conf, streamName);
        BKDistributedLogManager readerDlm = createNewDLM(conf, streamName);
        byte[] expected = DLMTestUtil.repeatString("abc", 51).getBytes();

        AppendOnlyStreamWriter writer = writerDlm.getAppendOnlyStreamWriter();
        writer.write(DLMTestUtil.repeatString("abc", 11).getBytes());
        writer.write(DLMTestUtil.repeatString("abc", 40).getBytes());
        writer.force(false);
        writer.close();

        // Chunk sizes deliberately do not line up with the two appended payloads.
        final int firstChunk = 23;
        final int secondChunk = 31;
        final int remainder = expected.length - firstChunk - secondChunk;

        AppendOnlyStreamReader reader = readerDlm.getAppendOnlyStreamReader();
        byte[] actual = new byte[expected.length];
        Assert.assertEquals(firstChunk, reader.read(actual, 0, firstChunk));
        Assert.assertEquals(secondChunk, reader.read(actual, firstChunk, secondChunk));

        // Ask for more than is left: only the remaining bytes must come back.
        byte[] tail = new byte[expected.length];
        Assert.assertEquals(remainder, reader.read(tail, 0, expected.length));

        // Copy the tail into place behind the first two chunks.
        int copied = new ByteArrayInputStream(tail).read(actual, firstChunk + secondChunk, remainder);
        Assert.assertEquals(remainder, copied);
        Assert.assertArrayEquals(expected, actual);

        reader.close();
        readerDlm.close();
        writerDlm.close();
    }

    /**
     * With immediate flush disabled and the periodic flush interval set to
     * {@code Integer.MAX_VALUE}, checks that the future returned by {@code write}
     * is still pending one second later and completes within five seconds of an
     * explicit {@code force(false)}. The data is then read back through a second
     * manager.
     *
     * @throws Exception if any log operation fails
     */
    @Test(timeout = 60000)
    public void testWriteFutureDoesNotCompleteUntilWritePersisted() throws Exception {
        String streamName = this.testNames.getMethodName();

        DistributedLogConfiguration noAutoFlushConf = new DistributedLogConfiguration();
        noAutoFlushConf.setPeriodicFlushFrequencyMilliSeconds(Integer.MAX_VALUE);
        noAutoFlushConf.setImmediateFlushEnabled(false);

        BKDistributedLogManager writerDlm = createNewDLM(noAutoFlushConf, streamName);
        BKDistributedLogManager readerDlm = createNewDLM(noAutoFlushConf, streamName);
        // Only the length of this array is used, to size the read buffer below.
        byte[] sizingBytes = DLMTestUtil.repeatString("abc", 51).getBytes();

        AppendOnlyStreamWriter writer = writerDlm.getAppendOnlyStreamWriter();
        byte[] payload = DLMTestUtil.repeatString("abc", 11).getBytes();
        CompletableFuture<DLSN> pendingWrite = writer.write(payload);

        // Nothing has flushed the buffer yet, so the write must still be outstanding.
        Thread.sleep(1000L);
        Assert.assertFalse(pendingWrite.isDone());

        writer.force(false);
        Utils.ioResult(pendingWrite, 5L, TimeUnit.SECONDS);
        writer.close();
        writerDlm.close();

        AppendOnlyStreamReader reader = readerDlm.getAppendOnlyStreamReader();
        byte[] readBuffer = new byte[sizingBytes.length];
        int bytesRead = reader.read(readBuffer, 0, 31);
        Assert.assertEquals(31L, bytesRead);
        reader.close();
        readerDlm.close();
    }

    /**
     * Checks that the writer's {@code position()} stays at zero while a write is
     * still pending and equals the payload length after {@code force(false)}; a
     * reader on a second manager must then report the same position after reading
     * the payload.
     *
     * @throws Exception if any log operation fails
     */
    @Test(timeout = 60000)
    public void testPositionUpdatesOnlyAfterWriteCompletion() throws Exception {
        String streamName = this.testNames.getMethodName();

        DistributedLogConfiguration slowFlushConf = new DistributedLogConfiguration();
        slowFlushConf.setPeriodicFlushFrequencyMilliSeconds(10000);
        slowFlushConf.setImmediateFlushEnabled(false);

        BKDistributedLogManager writerDlm = createNewDLM(slowFlushConf, streamName);
        BKDistributedLogManager readerDlm = createNewDLM(slowFlushConf, streamName);
        byte[] payload = DLMTestUtil.repeatString("abc", 11).getBytes();

        AppendOnlyStreamWriter writer = writerDlm.getAppendOnlyStreamWriter();
        CompletableFuture<DLSN> pendingWrite = writer.write(payload);

        // Well inside the 10 s periodic flush window: the write is not expected to be done.
        Thread.sleep(100L);
        Assert.assertFalse(pendingWrite.isDone());
        Assert.assertEquals(0L, writer.position());

        writer.force(false);
        long positionAfterForce = writer.position();
        Assert.assertEquals(payload.length, positionAfterForce);
        writer.close();
        writerDlm.close();

        AppendOnlyStreamReader reader = readerDlm.getAppendOnlyStreamReader();
        byte[] readBuffer = new byte[payload.length];
        int bytesRead = reader.read(readBuffer, 0, payload.length);
        Assert.assertEquals(payload.length, bytesRead);
        Assert.assertEquals(payload.length, reader.position());
        reader.close();
        readerDlm.close();
    }

    /**
     * Checks that {@code position()} is still zero immediately after a write when
     * immediate flush is disabled, the periodic flush interval is 100 seconds and
     * the output buffer (1 MiB) is far larger than the payload.
     *
     * @throws Exception if any log operation fails
     */
    @Test(timeout = 60000)
    public void testPositionDoesntUpdateBeforeWriteCompletion() throws Exception {
        String streamName = this.testNames.getMethodName();

        DistributedLogConfiguration bufferedConf = new DistributedLogConfiguration();
        // Explicit 100 s period. This previously borrowed the numerically equal but
        // unrelated BZip2Constants.BASEBLOCKSIZE constant.
        bufferedConf.setPeriodicFlushFrequencyMilliSeconds(100 * 1000);
        bufferedConf.setImmediateFlushEnabled(false);
        bufferedConf.setOutputBufferSize(1024 * 1024);

        BKDistributedLogManager dlm = createNewDLM(bufferedConf, streamName);
        byte[] payload = DLMTestUtil.repeatString("abc", 11).getBytes();

        AppendOnlyStreamWriter writer = dlm.getAppendOnlyStreamWriter();
        Assert.assertEquals(0L, writer.position());

        Thread.sleep(500L);
        writer.write(payload);
        // The write has only been handed to the writer; position must not have advanced.
        Assert.assertEquals(0L, writer.position());

        writer.close();
        dlm.close();
    }

    /**
     * Checks that, without an explicit {@code force}, {@code position()} reports
     * 33 once the write future has completed (periodic flush interval of one
     * second, immediate flush disabled).
     *
     * @throws Exception if any log operation fails
     */
    @Test(timeout = 60000)
    public void testPositionUpdatesOnlyAfterWriteCompletionWithoutFsync() throws Exception {
        String streamName = this.testNames.getMethodName();

        DistributedLogConfiguration periodicFlushConf = new DistributedLogConfiguration();
        periodicFlushConf.setPeriodicFlushFrequencyMilliSeconds(1000);
        periodicFlushConf.setImmediateFlushEnabled(false);
        periodicFlushConf.setOutputBufferSize(1048576);

        BKDistributedLogManager dlm = createNewDLM(periodicFlushConf, streamName);
        byte[] payload = DLMTestUtil.repeatString("abc", 11).getBytes();

        AppendOnlyStreamWriter writer = dlm.getAppendOnlyStreamWriter();
        long initialPosition = writer.position();
        Assert.assertEquals(0L, initialPosition);

        // Block until the write future completes; no force() is issued in this test.
        CompletableFuture<DLSN> writeFuture = writer.write(payload);
        Utils.ioResult(writeFuture);
        Thread.sleep(100L);

        long positionAfterWrite = writer.position();
        Assert.assertEquals(33L, positionAfterWrite);

        writer.close();
        dlm.close();
    }

    /**
     * Fetches the log through the writer metadata store (the meaning of the two
     * {@code true} flags is not visible here, presumably create-if-missing) and
     * then checks that a 1025-byte write, one byte more than the configured
     * 1024-byte output buffer, completes with immediate flush enabled.
     *
     * @throws Exception if any log operation fails
     */
    @Test(timeout = 60000)
    public void testWriterStartsAtTxidZeroForEmptyStream() throws Exception {
        String streamName = this.testNames.getMethodName();

        DistributedLogConfiguration immediateFlushConf = new DistributedLogConfiguration();
        immediateFlushConf.setImmediateFlushEnabled(true);
        immediateFlushConf.setOutputBufferSize(1024);

        BKDistributedLogManager dlm = createNewDLM(immediateFlushConf, streamName);
        String logPath = "/" + streamName;
        Utils.ioResult(dlm.getWriterMetadataStore().getLog(createDLMURI(logPath), streamName, true, true));

        AppendOnlyStreamWriter writer = dlm.getAppendOnlyStreamWriter();
        byte[] payload = DLMTestUtil.repeatString("a", 1025).getBytes();
        Utils.ioResult(writer.write(payload));

        writer.close();
        dlm.close();
    }

    /**
     * Runs the failed-transmit scenario with log segment sequence number
     * validation disabled and expects 105 bytes to be readable afterwards
     * (5-byte writes, 10 writes per section).
     *
     * @throws Exception if any log operation fails
     */
    @Test(timeout = 60000)
    public void testOffsetGapAfterSegmentWriterFailure() throws Exception {
        String streamName = this.testNames.getMethodName();

        DistributedLogConfiguration gapConf = new DistributedLogConfiguration();
        gapConf.setImmediateFlushEnabled(false);
        gapConf.setPeriodicFlushFrequencyMilliSeconds(60000);
        gapConf.setOutputBufferSize(1048576);
        gapConf.setLogSegmentSequenceNumberValidationEnabled(false);

        final int writeLen = 5;
        final int sectionWrites = 10;
        long bytesRead =
                writeRecordsAndReadThemBackAfterInjectingAFailedTransmit(gapConf, streamName, writeLen, sectionWrites);
        Assert.assertEquals(105L, bytesRead);
    }

    /**
     * Runs the failed-transmit scenario with rolling on log segment error
     * disabled. The scenario is expected to fail with a
     * {@link BKTransmitException}; afterwards a fresh manager must be able to
     * read exactly as many bytes as its reported last transaction id.
     *
     * @throws Exception if any log operation fails unexpectedly
     */
    @Test(timeout = 60000)
    public void testNoOffsetGapAfterSegmentWriterFailure() throws Exception {
        String streamName = this.testNames.getMethodName();

        DistributedLogConfiguration noRollConf = new DistributedLogConfiguration();
        noRollConf.setImmediateFlushEnabled(false);
        noRollConf.setPeriodicFlushFrequencyMilliSeconds(60000);
        noRollConf.setOutputBufferSize(1048576);
        noRollConf.setDisableRollingOnLogSegmentError(true);

        try {
            writeRecordsAndReadThemBackAfterInjectingAFailedTransmit(noRollConf, streamName, 5, 10);
            Assert.fail("should have thrown");
        } catch (BKTransmitException expected) {
            // Expected outcome of the scenario under this configuration; nothing to do.
        }

        BKDistributedLogManager dlm = createNewDLM(noRollConf, streamName);
        try {
            long lastTxId = dlm.getLastTxId();
            Assert.assertEquals(lastTxId, read(dlm, lastTxId));
        } finally {
            // Previously this manager was never closed.
            dlm.close();
        }
    }

    /**
     * Shared scenario for the segment-writer-failure tests.
     *
     * <ol>
     *   <li>Writes {@code i2} records of {@code i} bytes, forces them, and checks
     *       that {@code i2 * i} bytes can be read back.</li>
     *   <li>Writes a second section, arms the {@code FP_TransmitFailGetBuffer}
     *       failpoint and expects {@code force} to fail with a
     *       {@link WriteException}.</li>
     *   <li>Writes one more record plus a third section, forces, marks end of
     *       stream and closes the writer.</li>
     *   <li>Asserts the manager's last transaction id and reads the log back.</li>
     * </ol>
     *
     * @param distributedLogConfiguration configuration used to create the manager
     * @param str name of the log (also used to build its URI)
     * @param i length in bytes of each record written
     * @param i2 number of records written per section
     * @return number of bytes the final {@code read} call managed to read
     * @throws Exception if any log operation fails unexpectedly
     */
    long writeRecordsAndReadThemBackAfterInjectingAFailedTransmit(DistributedLogConfiguration distributedLogConfiguration, String str, int i, int i2) throws Exception {
        final int writeLen = i;
        final int sectionWrites = i2;

        BKDistributedLogManager dlm = createNewDLM(distributedLogConfiguration, str);
        Utils.ioResult(dlm.getWriterMetadataStore().getLog(createDLMURI("/" + str), str, true, true));

        AppendOnlyStreamWriter writer = dlm.getAppendOnlyStreamWriter();
        byte[] record = DLMTestUtil.repeatString("A", writeLen).getBytes();

        // Section 1: written and forced normally, then verified by reading it back.
        for (int n = 0; n < sectionWrites; n++) {
            writer.write(record);
        }
        writer.force(false);
        long firstSectionBytes = sectionWrites * writeLen;
        Assert.assertEquals(firstSectionBytes, read(dlm, firstSectionBytes));

        // Section 2: written, then the force is made to fail via the failpoint.
        // The finally guarantees the failpoint is cleared on every exit path, including
        // an AssertionError from fail() or an exception other than WriteException,
        // so it cannot leak into later tests.
        try {
            for (int n = 0; n < sectionWrites; n++) {
                writer.write(record);
            }
            FailpointUtils.setFailpoint(FailpointUtils.FailPointName.FP_TransmitFailGetBuffer, FailpointUtils.FailPointActions.FailPointAction_Throw);
            try {
                writer.force(false);
                Assert.fail("should have thown ⊙﹏⊙");
            } catch (WriteException expected) {
                // Expected: the injected failpoint makes the force fail.
            }
        } finally {
            FailpointUtils.removeFailpoint(FailpointUtils.FailPointName.FP_TransmitFailGetBuffer);
        }

        // Section 3: one extra record followed by a full section, then end of stream.
        writer.write(record);
        for (int n = 0; n < sectionWrites; n++) {
            writer.write(record);
        }
        writer.force(false);
        writer.markEndOfStream();
        writer.close();

        long lastTxId = dlm.getLastTxId();
        // NOTE(review): the "+ 5" is a literal; it matches the extra record only when i == 5,
        // which is what both callers in this file pass.
        Assert.assertEquals((3 * sectionWrites * writeLen) + 5, lastTxId);

        long bytesRead = read(dlm, lastTxId);
        dlm.close();
        return bytesRead;
    }

    /**
     * Reads the log one byte at a time until {@code j} bytes have been read or the
     * reader signals end of stream.
     *
     * <p>The reader is opened once, used for the whole loop and closed exactly
     * once in the {@code finally}. An {@link EndOfStreamException} ends the loop
     * and is logged rather than propagated, so the caller receives the number of
     * bytes read up to that point.
     *
     * @param distributedLogManager manager from which the reader is obtained
     * @param j number of bytes to try to read
     * @return number of bytes actually read
     * @throws Exception if opening, reading or closing the reader fails for any
     *     reason other than end of stream
     */
    long read(DistributedLogManager distributedLogManager, long j) throws Exception {
        AppendOnlyStreamReader reader = distributedLogManager.getAppendOnlyStreamReader();
        byte[] oneByte = new byte[1];
        long offset = 0;
        try {
            while (offset < j) {
                offset += reader.read(oneByte, 0, 1);
            }
        } catch (EndOfStreamException e) {
            // End of stream stops the read loop; the partial count is the result.
            LOG.info("Caught ex", e);
        } finally {
            reader.close();
        }
        return offset;
    }
}
