package org.apache.kafka.connect.file.integration;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.file.FileStreamSinkConnector;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

/**
 * Integration tests for {@link FileStreamSinkConnector}, run against an embedded Connect cluster.
 *
 * <p>Before every test the cluster is started, {@link #TOPIC} is created and {@link #NUM_MESSAGES}
 * records with values {@code "Message 0"}, {@code "Message 1"}, ... are produced to it. Each test
 * then configures a sink connector that writes to a file in a fresh temporary directory and checks
 * the lines that end up in that file.
 */
@Tag("integration")
public class FileStreamSinkConnectorIntegrationTest {
    private static final String CONNECTOR_NAME = "test-connector";
    private static final String TOPIC = "test-topic";
    private static final String MESSAGE_PREFIX = "Message ";
    private static final int NUM_MESSAGES = 5;
    private static final String FILE_NAME = "test-file";
    private final EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder().build();

    /** Starts the embedded cluster, creates {@link #TOPIC} and seeds it with {@link #NUM_MESSAGES} records. */
    @BeforeEach
    public void setup() {
        this.connect.start();
        this.connect.kafka().createTopic(TOPIC);
        produceMessagesToTopic(TOPIC, NUM_MESSAGES);
    }

    /** Stops the embedded cluster after every test. */
    @AfterEach
    public void tearDown() {
        this.connect.stop();
    }

    /** A single-task sink must write every seeded message to the file, in produce order. */
    @Test
    public void testSimpleSink() throws Exception {
        Path sinkFile = TestUtils.tempDirectory().toPath().resolve(FILE_NAME);
        this.connect.configureConnector(CONNECTOR_NAME, baseConnectorConfigs(TOPIC, sinkFile.toString()));
        this.connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1, "Connector and task did not start in time");

        verifyLinesInFile(sinkFile, NUM_MESSAGES, true);
    }

    /**
     * Stops the connector, moves its offset for partition 0 of {@link #TOPIC} back to
     * {@code NUM_MESSAGES - 1}, resumes it and expects exactly one additional line in the file.
     */
    @Test
    public void testAlterOffsets() throws Exception {
        Path sinkFile = TestUtils.tempDirectory().toPath().resolve(FILE_NAME);
        this.connect.configureConnector(CONNECTOR_NAME, baseConnectorConfigs(TOPIC, sinkFile.toString()));
        this.connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1, "Connector and task did not start in time");
        verifyLinesInFile(sinkFile, NUM_MESSAGES, true);

        this.connect.stopConnector(CONNECTOR_NAME);
        this.connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time");

        // Rewind to the offset of the last seeded record so that only that record is consumed again.
        this.connect.alterSinkConnectorOffset(CONNECTOR_NAME, new TopicPartition(TOPIC, 0), (long) (NUM_MESSAGES - 1));

        this.connect.resumeConnector(CONNECTOR_NAME);
        this.connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1, "Connector and task did not resume in time");

        // Original lines plus the single re-processed record; order is no longer strictly increasing.
        verifyLinesInFile(sinkFile, NUM_MESSAGES + 1, false);
    }

    /**
     * Stops the connector, resets its offsets, resumes it and expects every seeded message to be
     * written a second time.
     */
    @Test
    public void testResetOffsets() throws Exception {
        Path sinkFile = TestUtils.tempDirectory().toPath().resolve(FILE_NAME);
        this.connect.configureConnector(CONNECTOR_NAME, baseConnectorConfigs(TOPIC, sinkFile.toString()));
        this.connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1, "Connector and task did not start in time");
        verifyLinesInFile(sinkFile, NUM_MESSAGES, true);

        this.connect.stopConnector(CONNECTOR_NAME);
        this.connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time");

        this.connect.resetConnectorOffsets(CONNECTOR_NAME);

        this.connect.resumeConnector(CONNECTOR_NAME);
        this.connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1, "Connector and task did not resume in time");

        // Every seeded record is expected twice: once before the reset and once after it.
        verifyLinesInFile(sinkFile, 2 * NUM_MESSAGES, false);
    }

    /**
     * Two topics consumed by a connector with {@code tasks.max=2}, all writing to the same file;
     * the file must end up with the messages of both topics.
     */
    @Test
    public void testSinkMultipleTopicsWithMultipleTasks() throws Exception {
        String secondTopic = "test-topic-2";
        this.connect.kafka().createTopic(secondTopic);
        produceMessagesToTopic(secondTopic, NUM_MESSAGES);

        Path sinkFile = TestUtils.tempDirectory().toPath().resolve(FILE_NAME);
        Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC + "," + secondTopic, sinkFile.toString());
        connectorConfigs.put("tasks.max", "2");
        this.connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
        this.connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 2, "Connector and task did not start in time");

        // Lines from the two topics may interleave, so only the prefix and the total are checked.
        verifyLinesInFile(sinkFile, 2 * NUM_MESSAGES, false);
    }

    /**
     * Produces {@code numMessages} records to {@code topic}; the i-th record (0-based) has the
     * value {@code MESSAGE_PREFIX + i}.
     *
     * @param topic       topic to produce to
     * @param numMessages number of records to produce
     */
    private void produceMessagesToTopic(String topic, int numMessages) {
        for (int i = 0; i < numMessages; i++) {
            this.connect.kafka().produce(topic, MESSAGE_PREFIX + i);
        }
    }

    /**
     * Builds the minimal, mutable configuration for a {@link FileStreamSinkConnector}.
     *
     * @param topics   value for the {@code topics} property (comma-separated topic names)
     * @param filePath value for the {@code file} property, i.e. the file the sink writes to
     * @return a new mutable map that callers may extend with further properties
     */
    private Map<String, String> baseConnectorConfigs(String topics, String filePath) {
        Map<String, String> connectorConfigs = new HashMap<>();
        connectorConfigs.put("connector.class", FileStreamSinkConnector.class.getName());
        connectorConfigs.put("topics", topics);
        connectorConfigs.put("file", filePath);
        return connectorConfigs;
    }

    /**
     * Waits until at least {@code numLines} lines have been read from {@code filePath}, validating
     * each line as it is read, and then asserts that the file holds exactly {@code numLines} lines.
     *
     * @param filePath        file written by the sink connector; it must already exist
     * @param numLines        exact number of lines the file is expected to contain
     * @param verifyLinearity if {@code true}, the n-th line (0-based) must equal
     *                        {@code MESSAGE_PREFIX + n}; otherwise each line only has to start
     *                        with {@link #MESSAGE_PREFIX}
     * @throws Exception if the file cannot be read or the wait is interrupted
     */
    private void verifyLinesInFile(Path filePath, int numLines, boolean verifyLinearity) throws Exception {
        // Decode explicitly as UTF-8 (the charset Files.readAllLines uses below) instead of
        // depending on the platform default charset.
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(Files.newInputStream(filePath), StandardCharsets.UTF_8))) {
            AtomicInteger linesRead = new AtomicInteger(0);
            TestUtils.waitForCondition(() -> {
                // The same reader is polled on every attempt, so each attempt only sees the
                // lines appended since the previous one and the counter keeps accumulating.
                reader.lines().forEach(line -> {
                    if (verifyLinearity) {
                        Assertions.assertEquals(MESSAGE_PREFIX + linesRead.get(), line);
                    } else {
                        Assertions.assertTrue(line.startsWith(MESSAGE_PREFIX));
                    }
                    linesRead.getAndIncrement();
                });
                return linesRead.get() >= numLines;
            }, "Expected to read " + numLines + " lines from the file");
        }

        // The wait above only proves "at least numLines"; make sure there are no extra lines.
        Assertions.assertEquals(numLines, Files.readAllLines(filePath).size());
    }
}
