package kafka.coordinator.transaction;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
import kafka.log.remote.RemoteLogReaderTest;
import kafka.network.SocketServer;
import kafka.server.IntegrationTestUtils$;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TransactionState;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.message.InitProducerIdRequestData;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterFeature;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.ClusterTests;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.server.common.Feature;
import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Assertions;
import scala.Array$;
import scala.Predef$;
import scala.collection.SeqOps;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;

/* compiled from: ProducerIntegrationTest.scala */
@ClusterTestDefaults(types = {Type.KRAFT}, serverProperties = {@ClusterConfigProperty(key = "transaction.state.log.replication.factor", value = "1"), @ClusterConfigProperty(key = "transaction.state.log.num.partitions", value = "1"), @ClusterConfigProperty(key = "transaction.state.log.min.isr", value = "1"), @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1")})
@ScalaSignature(bytes = "\u0006\u0005\u0005\rd\u0001\u0002\u0005\n\u0001AAQa\u0006\u0001\u0005\u0002aAQa\u0007\u0001\u0005\u0002qAQ!\u0012\u0001\u0005\u0002\u0019CQA\u0019\u0001\u0005\u0002\rDQ!\u001c\u0001\u0005\u00029DQ\u0001\u001f\u0001\u0005\neDQa\u001f\u0001\u0005\nq\u0014q\u0003\u0015:pIV\u001cWM]%oi\u0016<'/\u0019;j_:$Vm\u001d;\u000b\u0005)Y\u0011a\u0003;sC:\u001c\u0018m\u0019;j_:T!\u0001D\u0007\u0002\u0017\r|wN\u001d3j]\u0006$xN\u001d\u0006\u0002\u001d\u0005)1.\u00194lC\u000e\u00011C\u0001\u0001\u0012!\t\u0011R#D\u0001\u0014\u0015\u0005!\u0012!B:dC2\f\u0017B\u0001\f\u0014\u0005\u0019\te.\u001f*fM\u00061A(\u001b8jiz\"\u0012!\u0007\t\u00035\u0001i\u0011!C\u0001\u0016i\u0016\u001cH/\u00168jcV,\u0007K]8ek\u000e,'/\u00133t)\ti\u0002\u0005\u0005\u0002\u0013=%\u0011qd\u0005\u0002\u0005+:LG\u000fC\u0003\"\u0005\u0001\u0007!%A\bdYV\u001cH/\u001a:J]N$\u0018M\\2f!\t\u0019S&D\u0001%\u0015\t)c%\u0001\u0003uKN$(BA\u0014)\u0003\u0019\u0019w.\\7p]*\u0011a\"\u000b\u0006\u0003U-\na!\u00199bG\",'\"\u0001\u0017\u0002\u0007=\u0014x-\u0003\u0002/I\ty1\t\\;ti\u0016\u0014\u0018J\\:uC:\u001cW\r\u000b\u0003\u0003aY:\u0004CA\u00195\u001b\u0005\u0011$BA\u001a%\u0003\r\t\u0007/[\u0005\u0003kI\u0012Ab\u00117vgR,'\u000fV3tiN\fQA^1mk\u0016d\u0013\u0001O\u0016\u0004sqj\u0004CA\u0019;\u0013\tY$GA\u0006DYV\u001cH/\u001a:UKN$\u0018aD7fi\u0006$\u0017\r^1WKJ\u001c\u0018n\u001c8%\u0003yJ!a\u0010!\u0002\u0017%\u0013\u0005kX\u001a`g}Kek\r\u0006\u0003\u0003\n\u000bq\"T3uC\u0012\fG/\u0019,feNLwN\u001c\u0006\u0003O\rS!\u0001\u0012\u0015\u0002\rM,'O^3s\u0003\u0005\"Xm\u001d;Ue\u0006t7/Y2uS>tw+\u001b;i\u0003:$w+\u001b;i_V$8+\u001a8e)\tir\tC\u0003I\u0007\u0001\u0007!%A\u0004dYV\u001cH/\u001a:)\t\r\u0001dG\u0013\u0017\u0004\u0017js6fA\u001dM\u001b\u0006Aa-Z1ukJ,7\u000fL\u0001OW\u0015y%k\u0015-Z!\t\t\u0004+\u0003\u0002Re\tq1\t\\;ti\u0016\u0014h)Z1ukJ,\u0017a\u00024fCR,(/\u001a\u0013\u0002)&\u0011QKV\u0001\u0014)J\u000bejU!D)&{ej\u0018,F%NKuJ\u0014\u0006\u0003/\n\u000bqAR3biV\u0014X-A\u0004wKJ\u001c\u0018n\
u001c8\u001c\u0003\u0001Y3!\u000f'\\Y\u0005a6&B(S'bk6$A\u0001,\u0007ebu\fL\u0001aW\u0015y%k\u0015-b7\u0005\u0011\u0011A\r;fgR$&/\u00198tC\u000e$\u0018n\u001c8XSRD\u0017J\u001c<bY&$7+\u001a8e\u0003:$WI\u001c3Uq:\u0014V-];fgR\u001cVM\u001c;\u0015\u0005u!\u0007\"\u0002%\u0005\u0001\u0004\u0011\u0003\u0006\u0002\u00031m\u0019d3aZ5lW\rID\n\u001b\u0017\u0002\u001d.\u001a\u0011\b\u00146-\u0003q[3!\u000f'mY\u0005\u0001\u0017!\b;fgR$&/\u00198tC\u000e$\u0018n\u001c8XSRD7+\u001a8e\u001f\u001a47/\u001a;\u0015\u0005uy\u0007\"\u0002%\u0006\u0001\u0004\u0011\u0003\u0006B\u00031mEd3A\u001d;wW\rIDj\u001d\u0017\u0002\u001d.\u001a\u0011\bT;-\u0003q[3!\u000f'xY\u0005\u0001\u0017a\u0004<fe&4\u00170\u00168jcV,\u0017\nZ:\u0015\u0005uQ\b\"B\u0011\u0007\u0001\u0004\u0011\u0013A\u00048fqR\u0004&o\u001c3vG\u0016\u0014\u0018\n\u001a\u000b\u0006{\u0006\u0005\u0011\u0011\u0003\t\u0003%yL!a`\n\u0003\t1{gn\u001a\u0005\b\u0003\u00079\u0001\u0019AA\u0003\u0003\u0019\u0011'o\\6feB!\u0011qAA\u0007\u001b\t\tIAC\u0002\u0002\f5\tqA\\3uo>\u00148.\u0003\u0003\u0002\u0010\u0005%!\u0001D*pG.,GoU3sm\u0016\u0014\bbBA\n\u000f\u0001\u0007\u0011QC\u0001\tY&\u001cH/\u001a8feB!\u0011qCA\u000e\u001b\t\tIBC\u0002\u0002\f\u0019JA!!\b\u0002\u001a\taA*[:uK:,'OT1nK\"Z\u0001!!\t\u0002(\u0005%\u0012QGA\u001c!\r\t\u00141E\u0005\u0004\u0003K\u0011$aE\"mkN$XM\u001d+fgR$UMZ1vYR\u001c\u0018!\u0002;za\u0016\u001cHFAA\u0016I\t\ti#\u0003\u0003\u00020\u0005E\u0012!B&S\u0003\u001a#&bAA\u001ae\u0005!A+\u001f9f\u0003A\u0019XM\u001d<feB\u0013x\u000e]3si&,7\u000f\f\u0006\u0002:\u0005-\u0013\u0011KA,\u0003;Z\u0013\"a\u000f\u0002B\u0005\rc'a\u0012\u0011\u0007E\ni$C\u0002\u0002@I\u0012Qc\u00117vgR,'oQ8oM&<\u0007K]8qKJ$\u00180A\u0002lKf\f#!!\u0012\u0002QQ\u0014\u0018M\\:bGRLwN\u001c\u0018ti\u0006$XM\f7pO:\u0012X\r\u001d7jG\u0006$\u0018n\u001c8/M\u0006\u001cGo\u001c:\"\u0005\u0005%\u0013!A\u0019,\u0013\u0005m\u0012\u0011IA'm\u0005\u001d\u0013EAA(\u0003\u0011\"(/\u00198tC\u000e$\u0018n\u001c8/gR\fG/\u001a\u0018m_\u001etc.^7/a\u0006\u0014H/\u0
01b;j_:\u001c8&CA\u001e\u0003\u0003\n\u0019FNA$C\t\t)&A\u000fue\u0006t7/Y2uS>tgf\u001d;bi\u0016tCn\\4/[&tg&[:sW%\tY$!\u0011\u0002ZY\n9%\t\u0002\u0002\\\u0005\u0001sN\u001a4tKR\u001ch\u0006^8qS\u000et#/\u001a9mS\u000e\fG/[8o]\u0019\f7\r^8sW%\tY$!\u0011\u0002`Y\n9%\t\u0002\u0002b\u0005arN\u001a4tKR\u001ch\u0006^8qS\u000etc.^7/a\u0006\u0014H/\u001b;j_:\u001c\b")
/* loaded from: input_file:kafka/coordinator/transaction/ProducerIntegrationTest.class */
public class ProducerIntegrationTest {
    /**
     * Runs the producer-ID uniqueness check against a cluster configured with
     * metadata version {@code IBP_3_3_IV3}.
     *
     * @param clusterInstance the cluster under test
     */
    @ClusterTests({
        @ClusterTest(metadataVersion = MetadataVersion.IBP_3_3_IV3)
    })
    public void testUniqueProducerIds(ClusterInstance clusterInstance) {
        // All of the work is done by the shared verification routine.
        this.verifyUniqueIds(clusterInstance);
    }

    /**
     * Commits two transactions with the same transactional producer: the first contains a
     * single record, the second contains no records at all. The test passes when neither
     * commit throws. It is run once for each of transaction versions 0, 1 and 2.
     *
     * @param clusterInstance the cluster under test
     */
    @ClusterTests({
        @ClusterTest(features = {@ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 0)}),
        @ClusterTest(features = {@ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 1)}),
        @ClusterTest(features = {@ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 2)})
    })
    public void testTransactionWithAndWithoutSend(ClusterInstance clusterInstance) {
        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put("transactional.id", "foobar");
        // NOTE(review): RemoteLogReaderTest.TOPIC belongs to an unrelated test and is used here
        // as both the client id and the topic name - presumably the decompiler substituted it
        // for a string literal; confirm against the Scala source.
        producerProps.put("client.id", RemoteLogReaderTest.TOPIC);
        producerProps.put("enable.idempotence", "true");

        // try-with-resources closes the producer exactly once on both the success and the
        // failure path, and keeps the original exception if close() fails as well.
        try (Producer<byte[], byte[]> producer = clusterInstance.producer(producerProps)) {
            producer.initTransactions();

            // First transaction: one record.
            producer.beginTransaction();
            producer.send(new ProducerRecord<>(
                RemoteLogReaderTest.TOPIC,
                "key".getBytes(StandardCharsets.UTF_8),
                "value".getBytes(StandardCharsets.UTF_8)));
            producer.commitTransaction();

            // Second transaction: nothing is sent before committing.
            producer.beginTransaction();
            producer.commitTransaction();
        }
    }

    /**
     * Sends an oversized record inside a transaction and then aborts the transaction.
     *
     * <p>The topic {@code foobar} is created with {@code max.message.bytes=100}. Sending a
     * record with a 100-byte key and a 100-byte value is expected to fail with an
     * {@link ExecutionException} whose cause is a {@link RecordTooLargeException}; the
     * transaction is then aborted. It is run once for each of transaction versions 0, 1 and 2.
     *
     * @param clusterInstance the cluster under test
     */
    @ClusterTests({
        @ClusterTest(features = {@ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 0)}),
        @ClusterTest(features = {@ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 1)}),
        @ClusterTest(features = {@ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 2)})
    })
    public void testTransactionWithInvalidSendAndEndTxnRequestSent(ClusterInstance clusterInstance) {
        NewTopic topic = new NewTopic("foobar", 1, (short) 1)
            .configs(Collections.singletonMap("max.message.bytes", "100"));

        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put("transactional.id", "test-txn");
        // NOTE(review): RemoteLogReaderTest.TOPIC is an unrelated test's constant - presumably
        // the decompiler substituted it for a string literal; confirm against the Scala source.
        producerProps.put("client.id", RemoteLogReaderTest.TOPIC);
        producerProps.put("enable.idempotence", "true");

        // Both clients are declared as resources so that a failure while creating the producer
        // still closes the admin client, and a failing close() of one cannot skip the other.
        try (Admin admin = clusterInstance.admin();
             Producer<byte[], byte[]> producer = clusterInstance.producer(producerProps)) {
            // The creation result is not awaited, exactly as before.
            admin.createTopics(List.of(topic));

            producer.initTransactions();
            producer.beginTransaction();

            // A freshly allocated byte[] is zero-filled, so key and value are 100 zero bytes each.
            ExecutionException sendFailure = Assertions.assertThrows(
                ExecutionException.class,
                () -> producer.send(new ProducerRecord<>(topic.name(), new byte[100], new byte[100])).get());
            Assertions.assertInstanceOf(RecordTooLargeException.class, sendFailure.getCause());

            producer.abortTransaction();
        }
    }

    /**
     * Commits consumer offsets as part of a transaction and checks the resulting
     * transaction state.
     *
     * <p>Steps:
     * <ol>
     *   <li>write five records to {@code my-input-topic} with a producer created from the
     *       cluster's default settings;</li>
     *   <li>with a transactional producer ({@code transactional.id=foobar}), begin a
     *       transaction, poll until a single poll returns all five records, send the offset
     *       following the last polled record to the transaction, and commit;</li>
     *   <li>wait (up to 15 seconds) until the admin client lists transactional id
     *       {@code foobar} in state {@code COMPLETE_COMMIT}.</li>
     * </ol>
     * It is run once for each of transaction versions 0, 1 and 2.
     *
     * @param clusterInstance the cluster under test
     */
    @ClusterTests({
        @ClusterTest(features = {@ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 0)}),
        @ClusterTest(features = {@ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 1)}),
        @ClusterTest(features = {@ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 2)})
    })
    public void testTransactionWithSendOffset(ClusterInstance clusterInstance) {
        final String inputTopic = "my-input-topic";
        final String transactionalId = "foobar";
        final int recordCount = 5;

        // Step 1: seed the input topic, waiting for every send to complete.
        try (Producer<byte[], byte[]> seedProducer = clusterInstance.producer()) {
            for (int i = 0; i < recordCount; i++) {
                seedProducer.send(new ProducerRecord<>(
                    inputTopic,
                    ("key-" + i).getBytes(StandardCharsets.UTF_8),
                    ("value-" + i).getBytes(StandardCharsets.UTF_8))).get();
            }
        }

        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put("transactional.id", transactionalId);
        // NOTE(review): RemoteLogReaderTest.TOPIC is an unrelated test's constant - presumably
        // the decompiler substituted it for a string literal; confirm against the Scala source.
        producerProps.put("client.id", RemoteLogReaderTest.TOPIC);
        producerProps.put("enable.idempotence", "true");

        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put("group.id", "test-consumer-group");
        consumerProps.put("auto.offset.reset", "earliest");

        // The nesting keeps the original shutdown order: the consumer is closed before the
        // admin check starts, the transactional producer only after it has finished. Each
        // client is closed exactly once, whichever step fails.
        try (Producer<byte[], byte[]> producer = clusterInstance.producer(producerProps)) {
            // Step 2: consume the seeded records and commit their offset transactionally.
            try (Consumer<byte[], byte[]> consumer = clusterInstance.consumer(consumerProps)) {
                producer.initTransactions();
                producer.beginTransaction();
                consumer.subscribe(List.of(inputTopic));

                // Holds the result of the most recent poll; only a poll that returns all
                // records at once satisfies the condition.
                AtomicReference<ConsumerRecords<byte[], byte[]>> polled = new AtomicReference<>();
                TestUtils.waitForCondition(() -> {
                    polled.set(consumer.poll(Duration.ZERO));
                    return polled.get().count() == recordCount;
                }, 15000L, "poll records size not match");

                // Keep the last record in iteration order.
                ConsumerRecord<byte[], byte[]> lastRecord = null;
                for (ConsumerRecord<byte[], byte[]> polledRecord : polled.get()) {
                    lastRecord = polledRecord;
                }

                TopicPartition partition = new TopicPartition(lastRecord.topic(), lastRecord.partition());
                // The committed offset is the position of the next record to read.
                OffsetAndMetadata nextOffset = new OffsetAndMetadata(lastRecord.offset() + 1);
                producer.sendOffsetsToTransaction(
                    Collections.singletonMap(partition, nextOffset), consumer.groupMetadata());
                producer.commitTransaction();
            }

            // Step 3: the transaction must eventually be listed as completely committed.
            try (Admin admin = clusterInstance.admin()) {
                TestUtils.waitForCondition(
                    () -> admin.listTransactions().all().get().stream()
                        .anyMatch(listing -> transactionalId.equals(listing.transactionalId())
                            && listing.state() == TransactionState.COMPLETE_COMMIT),
                    15000L, "transaction is not in COMPLETE_COMMIT state");
            }
        }
    }

    /**
     * Requests 1001 producer IDs from every broker socket server of the cluster and asserts
     * that the expected number of IDs came back and that no ID was handed out twice.
     *
     * @param clusterInstance the cluster under test
     */
    private void verifyUniqueIds(ClusterInstance clusterInstance) {
        final int requestsPerBroker = 1001;

        // NOTE(review): the inner stream is marked parallel, but it is consumed through
        // flatMap of a sequential outer stream - presumably the requests are therefore
        // issued one after another; confirm whether real concurrency was intended.
        List<Long> producerIds = clusterInstance.brokerSocketServers().stream()
            .flatMap(broker -> IntStream.range(0, requestsPerBroker)
                .parallel()
                .mapToObj(ignored -> nextProducerId(broker, clusterInstance.clientListener())))
            .collect(Collectors.toList());

        int expectedCount = requestsPerBroker * clusterInstance.brokerIds().size();
        Assertions.assertEquals(expectedCount, producerIds.size(), "Expected exactly " + expectedCount + " IDs");

        long distinctCount = producerIds.stream().distinct().count();
        Assertions.assertEquals(expectedCount, distinctCount, "Found duplicate producer IDs");
    }

    private long nextProducerId(SocketServer socketServer, ListenerName listenerName) {
        Deadline fromNow = new package.DurationInt(package$.MODULE$.DurationInt(5)).seconds().fromNow();
        boolean z = true;
        InitProducerIdResponse initProducerIdResponse = null;
        while (z && fromNow.hasTimeLeft()) {
            initProducerIdResponse = (InitProducerIdResponse) IntegrationTestUtils$.MODULE$.connectAndReceive((InitProducerIdRequest) new InitProducerIdRequest.Builder(new InitProducerIdRequestData().setProducerEpoch((short) -1).setProducerId(-1L).setTransactionalId((String) null).setTransactionTimeoutMs(10)).build(), socketServer, listenerName, ClassTag$.MODULE$.apply(InitProducerIdResponse.class));
            z = initProducerIdResponse.data().errorCode() == Errors.COORDINATOR_LOAD_IN_PROGRESS.code();
        }
        Assertions.assertTrue(fromNow.hasTimeLeft());
        Assertions.assertEquals(Errors.NONE.code(), initProducerIdResponse.data().errorCode());
        return initProducerIdResponse.data().producerId();
    }

    /**
     * Sends one record with key {@code "key-<i>"} and value {@code "value-<i>"} using the
     * producer stored in {@code objectRef} and blocks until the send has completed.
     * Marked synthetic, i.e. presumably a compiler-generated lambda body.
     *
     * @param objectRef holder whose {@code elem} is the producer to send with
     * @param str       the topic to send to
     * @param i         the index appended to the key and value
     * @return the metadata of the sent record
     */
    public static final /* synthetic */ RecordMetadata $anonfun$testTransactionWithSendOffset$1(ObjectRef objectRef, String str, int i) {
        Producer sender = (Producer) objectRef.elem;
        byte[] keyBytes = ("key-" + i).getBytes();
        byte[] valueBytes = ("value-" + i).getBytes();
        ProducerRecord outgoing = new ProducerRecord(str, keyBytes, valueBytes);
        return (RecordMetadata) sender.send(outgoing).get();
    }

    /**
     * Requests one producer ID from {@code socketServer} through the cluster's client
     * listener. Marked synthetic, i.e. presumably a compiler-generated lambda body.
     *
     * @param producerIntegrationTest the test instance whose {@code nextProducerId} is invoked
     * @param socketServer            the broker socket server to query
     * @param clusterInstance         the cluster providing the client listener
     * @param i                       the request index; not used
     * @return the producer ID returned by the broker
     */
    public static final /* synthetic */ long $anonfun$verifyUniqueIds$2(ProducerIntegrationTest producerIntegrationTest, SocketServer socketServer, ClusterInstance clusterInstance, int i) {
        ListenerName clientListener = clusterInstance.clientListener();
        return producerIntegrationTest.nextProducerId(socketServer, clientListener);
    }
}
