package org.apache.kafka.connect.file;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/connect/file/FileStreamSourceTaskTest.class */
public class FileStreamSourceTaskTest {
    private static final String TOPIC = "test";
    private File tempFile;
    private Map<String, String> config;
    private OffsetStorageReader offsetStorageReader;
    private SourceTaskContext context;
    private FileStreamSourceTask task;

    @BeforeEach
    public void setup() throws IOException {
        this.tempFile = File.createTempFile("file-stream-source-task-test", null);
        this.config = new HashMap();
        this.config.put("file", this.tempFile.getAbsolutePath());
        this.config.put("topic", TOPIC);
        this.config.put("batch.size", String.valueOf(2000));
        this.task = new FileStreamSourceTask(2);
        this.offsetStorageReader = (OffsetStorageReader) Mockito.mock(OffsetStorageReader.class);
        this.context = (SourceTaskContext) Mockito.mock(SourceTaskContext.class);
        this.task.initialize(this.context);
    }

    @AfterEach
    public void teardown() {
        this.tempFile.delete();
    }

    @Test
    public void testNormalLifecycle() throws InterruptedException, IOException {
        expectOffsetLookupReturnNone();
        this.task.start(this.config);
        OutputStream newOutputStream = Files.newOutputStream(this.tempFile.toPath(), new OpenOption[0]);
        Assertions.assertNull(this.task.poll());
        newOutputStream.write("partial line".getBytes());
        newOutputStream.flush();
        Assertions.assertNull(this.task.poll());
        newOutputStream.write(" finished\n".getBytes());
        newOutputStream.flush();
        List poll = this.task.poll();
        Assertions.assertEquals(1, poll.size());
        Assertions.assertEquals(TOPIC, ((SourceRecord) poll.get(0)).topic());
        Assertions.assertEquals("partial line finished", ((SourceRecord) poll.get(0)).value());
        Assertions.assertEquals(Collections.singletonMap("filename", this.tempFile.getAbsolutePath()), ((SourceRecord) poll.get(0)).sourcePartition());
        Assertions.assertEquals(Collections.singletonMap("position", 22L), ((SourceRecord) poll.get(0)).sourceOffset());
        Assertions.assertNull(this.task.poll());
        newOutputStream.write("line1\rline2\r\nline3\nline4\n\r".getBytes());
        newOutputStream.flush();
        List poll2 = this.task.poll();
        Assertions.assertEquals(4, poll2.size());
        Assertions.assertEquals("line1", ((SourceRecord) poll2.get(0)).value());
        Assertions.assertEquals(Collections.singletonMap("filename", this.tempFile.getAbsolutePath()), ((SourceRecord) poll2.get(0)).sourcePartition());
        Assertions.assertEquals(Collections.singletonMap("position", 28L), ((SourceRecord) poll2.get(0)).sourceOffset());
        Assertions.assertEquals("line2", ((SourceRecord) poll2.get(1)).value());
        Assertions.assertEquals(Collections.singletonMap("filename", this.tempFile.getAbsolutePath()), ((SourceRecord) poll2.get(1)).sourcePartition());
        Assertions.assertEquals(Collections.singletonMap("position", 35L), ((SourceRecord) poll2.get(1)).sourceOffset());
        Assertions.assertEquals("line3", ((SourceRecord) poll2.get(2)).value());
        Assertions.assertEquals(Collections.singletonMap("filename", this.tempFile.getAbsolutePath()), ((SourceRecord) poll2.get(2)).sourcePartition());
        Assertions.assertEquals(Collections.singletonMap("position", 41L), ((SourceRecord) poll2.get(2)).sourceOffset());
        Assertions.assertEquals("line4", ((SourceRecord) poll2.get(3)).value());
        Assertions.assertEquals(Collections.singletonMap("filename", this.tempFile.getAbsolutePath()), ((SourceRecord) poll2.get(3)).sourcePartition());
        Assertions.assertEquals(Collections.singletonMap("position", 47L), ((SourceRecord) poll2.get(3)).sourceOffset());
        newOutputStream.write("subsequent text".getBytes());
        newOutputStream.flush();
        List poll3 = this.task.poll();
        Assertions.assertEquals(1, poll3.size());
        Assertions.assertEquals("", ((SourceRecord) poll3.get(0)).value());
        Assertions.assertEquals(Collections.singletonMap("filename", this.tempFile.getAbsolutePath()), ((SourceRecord) poll3.get(0)).sourcePartition());
        Assertions.assertEquals(Collections.singletonMap("position", 48L), ((SourceRecord) poll3.get(0)).sourceOffset());
        newOutputStream.close();
        this.task.stop();
        verifyAll();
    }

    @Test
    public void testBatchSize() throws IOException, InterruptedException {
        expectOffsetLookupReturnNone();
        this.config.put("batch.size", "5000");
        this.task.start(this.config);
        OutputStream newOutputStream = Files.newOutputStream(this.tempFile.toPath(), new OpenOption[0]);
        writeTimesAndFlush(newOutputStream, 10000, "Neque porro quisquam est qui dolorem ipsum quia dolor sit amet, consectetur, adipisci velit...\n".getBytes());
        Assertions.assertEquals(2, this.task.bufferSize());
        Assertions.assertEquals(5000, this.task.poll().size());
        Assertions.assertEquals(128, this.task.bufferSize());
        Assertions.assertEquals(5000, this.task.poll().size());
        Assertions.assertEquals(128, this.task.bufferSize());
        newOutputStream.close();
        this.task.stop();
        verifyAll();
    }

    @Test
    public void testBufferResize() throws IOException, InterruptedException {
        expectOffsetLookupReturnNone();
        this.config.put("batch.size", Integer.toString(1000));
        this.task.start(this.config);
        OutputStream newOutputStream = Files.newOutputStream(this.tempFile.toPath(), new OpenOption[0]);
        Assertions.assertEquals(2, this.task.bufferSize());
        writeAndAssertBufferSize(1000, newOutputStream, "1\n".getBytes(), 2);
        writeAndAssertBufferSize(1000, newOutputStream, "3 \n".getBytes(), 4);
        writeAndAssertBufferSize(1000, newOutputStream, "7     \n".getBytes(), 8);
        writeAndAssertBufferSize(1000, newOutputStream, "8      \n".getBytes(), 8);
        writeAndAssertBufferSize(1000, newOutputStream, "9       \n".getBytes(), 16);
        byte[] bArr = new byte[1025];
        Arrays.fill(bArr, (byte) 42);
        bArr[bArr.length - 1] = 10;
        writeAndAssertBufferSize(1000, newOutputStream, bArr, 2048);
        writeAndAssertBufferSize(1000, newOutputStream, "9       \n".getBytes(), 2048);
        newOutputStream.close();
        this.task.stop();
        verifyAll();
    }

    private void writeAndAssertBufferSize(int i, OutputStream outputStream, byte[] bArr, int i2) throws IOException, InterruptedException {
        writeTimesAndFlush(outputStream, i, bArr);
        List poll = this.task.poll();
        Assertions.assertEquals(i, poll.size());
        String str = new String(bArr, 0, bArr.length - 1);
        Iterator it = poll.iterator();
        while (it.hasNext()) {
            Assertions.assertEquals(str, ((SourceRecord) it.next()).value());
        }
        Assertions.assertEquals(i2, this.task.bufferSize());
    }

    private void writeTimesAndFlush(OutputStream outputStream, int i, byte[] bArr) throws IOException {
        for (int i2 = 0; i2 < i; i2++) {
            outputStream.write(bArr);
        }
        outputStream.flush();
    }

    @Test
    public void testUsingSystemInputSourceOnMissingFile() throws InterruptedException {
        System.setIn(new ByteArrayInputStream("line\n".getBytes()));
        this.config.remove("file");
        this.task.start(this.config);
        List poll = this.task.poll();
        Assertions.assertEquals(1, poll.size());
        Assertions.assertEquals(TOPIC, ((SourceRecord) poll.get(0)).topic());
        Assertions.assertEquals("line", ((SourceRecord) poll.get(0)).value());
        this.task.stop();
    }

    @Test
    public void testInvalidFile() throws InterruptedException {
        this.config.put("file", "bogusfilename");
        this.task.start(this.config);
        for (int i = 0; i < 3; i++) {
            Assertions.assertNull(this.task.poll());
        }
    }

    private void expectOffsetLookupReturnNone() {
        Mockito.when(this.context.offsetStorageReader()).thenReturn(this.offsetStorageReader);
        Mockito.when(this.offsetStorageReader.offset(ArgumentMatchers.anyMap())).thenReturn((Object) null);
    }

    private void verifyAll() {
        ((SourceTaskContext) Mockito.verify(this.context)).offsetStorageReader();
        ((OffsetStorageReader) Mockito.verify(this.offsetStorageReader)).offset(ArgumentMatchers.anyMap());
    }
}
