package kafka.admin;

import java.util.Collections;
import java.util.HashMap;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.FenceProducersOptions;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.junit.jupiter.api.Assertions;

/**
 * Integration tests for {@link Admin#fenceProducers} run against a test cluster supplied
 * through {@link ClusterInstance}.
 *
 * <p>Each test uses a transactional producer with the id {@link #TXN_ID}, fences it through
 * the admin client and then asserts which exceptions the producer reports afterwards.
 * The cluster is started with topic auto-creation disabled and a single-partition,
 * single-replica transaction state log (see the server properties below).
 */
@ClusterTestDefaults(serverProperties = {
    @ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"),
    @ClusterConfigProperty(key = "transaction.state.log.num.partitions", value = "1"),
    @ClusterConfigProperty(key = "transaction.state.log.replication.factor", value = "1"),
    @ClusterConfigProperty(key = "transaction.state.log.min.isr", value = "1"),
    @ClusterConfigProperty(key = "transaction.abort.timed.out.transaction.cleanup.interval.ms", value = "2000")
})
public class AdminFenceProducersTest {
    private static final String TOPIC_NAME = "mytopic";
    private static final String TXN_ID = "mytxnid";
    // NOTE(review): presumably a port on which no broker is listening, so that admin
    // requests cannot complete -- confirm against the test environment.
    private static final String INCORRECT_BROKER_PORT = "225";
    // Record with a null key and a one-byte value, reused by every send in this class.
    private static final ProducerRecord<byte[], byte[]> RECORD =
        new ProducerRecord<>(TOPIC_NAME, null, new byte[1]);

    private final ClusterInstance clusterInstance;

    /**
     * @param clusterInstance the cluster the tests run against
     */
    AdminFenceProducersTest(ClusterInstance clusterInstance) {
        this.clusterInstance = clusterInstance;
    }

    /**
     * Builds a transactional byte-array producer bound to the test cluster.
     *
     * @return a new producer using {@link #TXN_ID} as its transactional id and a
     *         transaction timeout of 2000 ms; the caller is responsible for closing it
     */
    private KafkaProducer<byte[], byte[]> createProducer() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", clusterInstance.bootstrapServers());
        properties.put("transactional.id", TXN_ID);
        properties.put("transaction.timeout.ms", "2000");
        properties.put("key.serializer", ByteArraySerializer.class.getName());
        properties.put("value.serializer", ByteArraySerializer.class.getName());
        return new KafkaProducer<>(properties);
    }

    /**
     * Commits one transaction, fences the producer, and then expects both a subsequent
     * send and the following commit to fail with {@link InvalidProducerEpochException}.
     *
     * @throws Exception if topic creation or any of the setup calls fails
     */
    @ClusterTest
    void testFenceAfterProducerCommit() throws Exception {
        clusterInstance.createTopic(TOPIC_NAME, 1, (short) 1);

        // try-with-resources guarantees that both clients are closed even when an
        // assertion fails part-way through the test.
        try (KafkaProducer<byte[], byte[]> producer = createProducer();
             Admin adminClient = clusterInstance.admin()) {
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(RECORD).get();
            producer.commitTransaction();

            adminClient.fenceProducers(Collections.singletonList(TXN_ID)).all().get();

            producer.beginTransaction();
            ExecutionException exceptionDuringSend = Assertions.assertThrows(
                ExecutionException.class,
                () -> producer.send(RECORD).get(),
                "expected InvalidProducerEpochException");
            Assertions.assertInstanceOf(InvalidProducerEpochException.class, exceptionDuringSend.getCause());

            Assertions.assertThrows(InvalidProducerEpochException.class, producer::commitTransaction);
        }
    }

    /**
     * Points an admin client at {@code localhost:}{@link #INCORRECT_BROKER_PORT} and expects
     * {@code fenceProducers} with a timeout of 0 ms to fail with an
     * {@link ExecutionException} caused by a {@link TimeoutException}.
     */
    @ClusterTest
    void testFenceProducerTimeoutMs() {
        HashMap<String, Object> adminConfig = new HashMap<>();
        adminConfig.put("bootstrap.servers", "localhost:" + INCORRECT_BROKER_PORT);

        try (Admin adminClient = clusterInstance.admin(adminConfig)) {
            ExecutionException exception = Assertions.assertThrows(
                ExecutionException.class,
                () -> adminClient
                    .fenceProducers(Collections.singletonList(TXN_ID), new FenceProducersOptions().timeoutMs(0))
                    .all()
                    .get());
            Assertions.assertInstanceOf(TimeoutException.class, exception.getCause());
        }
    }

    /**
     * Fences the producer while a transaction is still open, then expects both the next
     * send and the commit to fail with either {@link ProducerFencedException} or
     * {@link InvalidProducerEpochException}.
     *
     * @throws Exception if topic creation or any of the setup calls fails
     */
    @ClusterTest
    void testFenceBeforeProducerCommit() throws Exception {
        clusterInstance.createTopic(TOPIC_NAME, 1, (short) 1);

        // try-with-resources guarantees that both clients are closed even when an
        // assertion fails part-way through the test.
        try (KafkaProducer<byte[], byte[]> producer = createProducer();
             Admin adminClient = clusterInstance.admin()) {
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(RECORD).get();

            adminClient.fenceProducers(Collections.singletonList(TXN_ID)).all().get();

            ExecutionException exceptionDuringSend = Assertions.assertThrows(
                ExecutionException.class,
                () -> producer.send(RECORD).get(),
                "expected ProducerFencedException");
            Throwable sendFailure = exceptionDuringSend.getCause();
            // Either exception type is accepted for the failed send.
            Assertions.assertTrue(
                sendFailure instanceof ProducerFencedException
                    || sendFailure instanceof InvalidProducerEpochException);

            ApiException exceptionDuringCommit = Assertions.assertThrows(
                ApiException.class,
                producer::commitTransaction,
                "Expected Exception");
            // The commit is likewise allowed to fail with either exception type.
            Assertions.assertTrue(
                exceptionDuringCommit instanceof ProducerFencedException
                    || exceptionDuringCommit instanceof InvalidProducerEpochException);
        }
    }
}
