package kafka.log.remote;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import kafka.utils.TestUtils;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder;
import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
import org.apache.kafka.storage.internals.log.OffsetResultHolder;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import scala.Option;

/* loaded from: input_file:kafka/log/remote/RemoteLogOffsetReaderTest.class */
class RemoteLogOffsetReaderTest {
    private final MockTime time = new MockTime();
    private final TopicPartition topicPartition = new TopicPartition(RemoteLogReaderTest.TOPIC, 0);
    private Path logDir;
    private LeaderEpochFileCache cache;
    private MockRemoteLogManager rlm;

    /* loaded from: input_file:kafka/log/remote/RemoteLogOffsetReaderTest$MockRemoteLogManager.class */
    private static class MockRemoteLogManager extends RemoteLogManager {
        private final ReadWriteLock lock;

        public MockRemoteLogManager(int i, int i2, String str) throws IOException {
            super(RemoteLogOffsetReaderTest.rlmConfig(i, i2), 1, str, "mock-cluster-id", new MockTime(), topicPartition -> {
                return Optional.empty();
            }, (topicPartition2, l) -> {
            }, new BrokerTopicStats(true), new Metrics());
            this.lock = new ReentrantReadWriteLock();
        }

        public Optional<FileRecords.TimestampAndOffset> findOffsetByTimestamp(TopicPartition topicPartition, long j, long j2, LeaderEpochFileCache leaderEpochFileCache) throws RemoteStorageException {
            this.lock.readLock().lock();
            try {
                Optional<FileRecords.TimestampAndOffset> of = Optional.of(new FileRecords.TimestampAndOffset(100L, 90L, Optional.of(3)));
                this.lock.readLock().unlock();
                return of;
            } catch (Throwable th) {
                this.lock.readLock().unlock();
                throw th;
            }
        }

        void pause() {
            this.lock.writeLock().lock();
        }

        void resume() {
            this.lock.writeLock().unlock();
        }
    }

    RemoteLogOffsetReaderTest() {
    }

    @BeforeEach
    void setUp() throws IOException {
        this.logDir = Files.createTempDirectory("kafka-test", new FileAttribute[0]);
        this.cache = new LeaderEpochFileCache(this.topicPartition, new LeaderEpochCheckpointFile(TestUtils.tempFile(), new LogDirFailureChannel(1)), this.time.scheduler);
        this.rlm = new MockRemoteLogManager(2, 1, this.logDir.toString());
    }

    @AfterEach
    void tearDown() throws IOException {
        this.rlm.close();
        Utils.delete(this.logDir.toFile());
    }

    @Test
    public void testReadRemoteLog() throws Exception {
        AsyncOffsetReadFutureHolder asyncOffsetRead = this.rlm.asyncOffsetRead(this.topicPartition, Long.valueOf(this.time.milliseconds()), 0L, this.cache, Option::empty);
        asyncOffsetRead.taskFuture().get(1L, TimeUnit.SECONDS);
        Assertions.assertTrue(asyncOffsetRead.taskFuture().isDone());
        OffsetResultHolder.FileRecordsOrError fileRecordsOrError = (OffsetResultHolder.FileRecordsOrError) asyncOffsetRead.taskFuture().get();
        Assertions.assertFalse(fileRecordsOrError.hasException());
        Assertions.assertTrue(fileRecordsOrError.hasTimestampAndOffset());
        Assertions.assertEquals(new FileRecords.TimestampAndOffset(100L, 90L, Optional.of(3)), fileRecordsOrError.timestampAndOffset().get());
    }

    @Test
    public void testTaskQueueFullAndCancelTask() throws Exception {
        this.rlm.pause();
        ArrayList<AsyncOffsetReadFutureHolder> arrayList = new ArrayList();
        for (int i = 0; i < 3; i++) {
            arrayList.add(this.rlm.asyncOffsetRead(this.topicPartition, Long.valueOf(this.time.milliseconds()), 0L, this.cache, Option::empty));
        }
        Assertions.assertThrows(TimeoutException.class, () -> {
            ((AsyncOffsetReadFutureHolder) arrayList.get(0)).taskFuture().get(10L, TimeUnit.MILLISECONDS);
        });
        Assertions.assertEquals(0L, arrayList.stream().filter(asyncOffsetReadFutureHolder -> {
            return asyncOffsetReadFutureHolder.taskFuture().isDone();
        }).count());
        Assertions.assertThrows(RejectedExecutionException.class, () -> {
            arrayList.add(this.rlm.asyncOffsetRead(this.topicPartition, Long.valueOf(this.time.milliseconds()), 0L, this.cache, Option::empty));
        });
        ((AsyncOffsetReadFutureHolder) arrayList.get(2)).jobFuture().cancel(false);
        this.rlm.resume();
        for (AsyncOffsetReadFutureHolder asyncOffsetReadFutureHolder2 : arrayList) {
            if (!asyncOffsetReadFutureHolder2.jobFuture().isCancelled()) {
                asyncOffsetReadFutureHolder2.taskFuture().get(1L, TimeUnit.SECONDS);
            }
        }
        Assertions.assertEquals(3, arrayList.size());
        Assertions.assertEquals(2L, arrayList.stream().filter(asyncOffsetReadFutureHolder3 -> {
            return asyncOffsetReadFutureHolder3.taskFuture().isDone();
        }).count());
        Assertions.assertEquals(1L, arrayList.stream().filter(asyncOffsetReadFutureHolder4 -> {
            return !asyncOffsetReadFutureHolder4.taskFuture().isDone();
        }).count());
    }

    @Test
    public void testThrowErrorOnFindOffsetByTimestamp() throws Exception {
        final RemoteStorageException remoteStorageException = new RemoteStorageException("Error");
        MockRemoteLogManager mockRemoteLogManager = new MockRemoteLogManager(2, 1, this.logDir.toString()) { // from class: kafka.log.remote.RemoteLogOffsetReaderTest.1
            @Override // kafka.log.remote.RemoteLogOffsetReaderTest.MockRemoteLogManager
            public Optional<FileRecords.TimestampAndOffset> findOffsetByTimestamp(TopicPartition topicPartition, long j, long j2, LeaderEpochFileCache leaderEpochFileCache) throws RemoteStorageException {
                throw remoteStorageException;
            }
        };
        try {
            AsyncOffsetReadFutureHolder asyncOffsetRead = mockRemoteLogManager.asyncOffsetRead(this.topicPartition, Long.valueOf(this.time.milliseconds()), 0L, this.cache, Option::empty);
            asyncOffsetRead.taskFuture().get(1L, TimeUnit.SECONDS);
            Assertions.assertTrue(asyncOffsetRead.taskFuture().isDone());
            Assertions.assertTrue(((OffsetResultHolder.FileRecordsOrError) asyncOffsetRead.taskFuture().get()).hasException());
            Assertions.assertEquals(remoteStorageException, ((OffsetResultHolder.FileRecordsOrError) asyncOffsetRead.taskFuture().get()).exception().get());
            mockRemoteLogManager.close();
        } catch (Throwable th) {
            try {
                mockRemoteLogManager.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    private static RemoteLogManagerConfig rlmConfig(int i, int i2) {
        Properties properties = new Properties();
        properties.put("remote.log.storage.system.enable", "true");
        properties.put("remote.log.storage.manager.class.name", "org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager");
        properties.put("remote.log.metadata.manager.class.name", "org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager");
        properties.put("remote.log.reader.threads", Integer.valueOf(i));
        properties.put("remote.log.reader.max.pending.tasks", Integer.valueOf(i2));
        return new RemoteLogManagerConfig(new AbstractConfig(RemoteLogManagerConfig.configDef(), properties, false));
    }
}
