package kafka.api;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import kafka.log.LogConfig$;
import kafka.server.Defaults$;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.producer.BufferExhaustedException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.InvalidTimestampException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import scala.MatchError;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichLong$;

/* compiled from: PlaintextProducerSendTest.scala */
@ScalaSignature(bytes = "\u0006\u0001i2AAC\u0006\u0001!!)Q\u0003\u0001C\u0001-!)\u0001\u0004\u0001C\u00013!)A\u0006\u0001C\u00013!)a\u0006\u0001C\u00013!)\u0001\u0007\u0001C\u00013!)!\u0007\u0001C\u00013!)A\u0007\u0001C\u00013!)a\u0007\u0001C\u00013!)\u0001\b\u0001C\u00013\tI\u0002\u000b\\1j]R,\u0007\u0010\u001e)s_\u0012,8-\u001a:TK:$G+Z:u\u0015\taQ\"A\u0002ba&T\u0011AD\u0001\u0006W\u000647.Y\u0002\u0001'\t\u0001\u0011\u0003\u0005\u0002\u0013'5\t1\"\u0003\u0002\u0015\u0017\t!\")Y:f!J|G-^2feN+g\u000e\u001a+fgR\fa\u0001P5oSRtD#A\f\u0011\u0005I\u0001\u0011a\u0005;fgR<&o\u001c8h'\u0016\u0014\u0018.\u00197ju\u0016\u0014H#\u0001\u000e\u0011\u0005mqR\"\u0001\u000f\u000b\u0003u\tQa]2bY\u0006L!a\b\u000f\u0003\tUs\u0017\u000e\u001e\u0015\u0003\u0005\u0005\u0002\"A\t\u0016\u000e\u0003\rR!\u0001\u0004\u0013\u000b\u0005\u00152\u0013a\u00026va&$XM\u001d\u0006\u0003O!\nQA[;oSRT\u0011!K\u0001\u0004_J<\u0017BA\u0016$\u0005\u0011!Vm\u001d;\u0002#Q,7\u000f\u001e\"bi\u000eD7+\u001b>f5\u0016\u0014x\u000e\u000b\u0002\u0004C\u0005QC/Z:u'\u0016tGmQ8naJ,7o]3e\u001b\u0016\u001c8/Y4f/&$\b\u000eT8h\u0003B\u0004XM\u001c3US6,\u0007F\u0001\u0003\"\u00035\"Xm\u001d;TK:$gj\u001c8D_6\u0004(/Z:tK\u0012lUm]:bO\u0016<\u0016\u000e\u001e5M_\u001e\f\u0005\u000f]3oIRKW.\u001a\u0015\u0003\u000b\u0005\n1\u0003^3ti\u0006+Ho\\\"sK\u0006$X\rV8qS\u000eD#AB\u0011\u0002;Q,7\u000f^*f]\u0012<\u0016\u000e\u001e5J]Z\fG.\u001b3De\u0016\fG/\u001a+j[\u0016D#aB\u0011\u0002/Q,7\u000f\u001e(p]\ncwnY6j]\u001e\u0004&o\u001c3vG\u0016\u0014\bF\u0001\u0005\"\u00039\"Xm\u001d;TK:$'+Z2pe\u0012\u0014\u0015\r^2i/&$\b.T1y%\u0016\fX/Z:u'&TX-\u00118e\u0011&<\u0007.\u001a:)\u0005%\t\u0003")
/* loaded from: input_file:kafka/api/PlaintextProducerSendTest.class */
public class PlaintextProducerSendTest extends BaseProducerSendTest {
    @Test
    public void testWrongSerializer() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", brokerList());
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<byte[], byte[]> registerProducer = registerProducer(new KafkaProducer<>(properties));
        ProducerRecord producerRecord = new ProducerRecord(topic(), Predef$.MODULE$.int2Integer(0), "key".getBytes(), "value".getBytes());
        Assertions.assertThrows(SerializationException.class, () -> {
            registerProducer.send(producerRecord);
        });
    }

    @Test
    public void testBatchSizeZero() {
        sendAndVerify(createProducer(brokerList(), Integer.MAX_VALUE, Integer.MAX_VALUE, 0, createProducer$default$5(), createProducer$default$6(), createProducer$default$7()), sendAndVerify$default$2(), sendAndVerify$default$3());
    }

    @Test
    public void testSendCompressedMessageWithLogAppendTime() {
        sendAndVerifyTimestamp(createProducer(brokerList(), Integer.MAX_VALUE, Integer.MAX_VALUE, createProducer$default$4(), "gzip", createProducer$default$6(), createProducer$default$7()), TimestampType.LOG_APPEND_TIME);
    }

    @Test
    public void testSendNonCompressedMessageWithLogAppendTime() {
        sendAndVerifyTimestamp(createProducer(brokerList(), Integer.MAX_VALUE, Integer.MAX_VALUE, createProducer$default$4(), createProducer$default$5(), createProducer$default$6(), createProducer$default$7()), TimestampType.LOG_APPEND_TIME);
    }

    @Test
    public void testAutoCreateTopic() {
        KafkaProducer<byte[], byte[]> createProducer = createProducer(brokerList(), createProducer$default$2(), createProducer$default$3(), createProducer$default$4(), createProducer$default$5(), createProducer$default$6(), createProducer$default$7());
        try {
            Assertions.assertEquals(0L, ((RecordMetadata) createProducer.send(new ProducerRecord(topic(), (Integer) null, "key".getBytes(), "value".getBytes())).get()).offset(), "Should have offset 0");
            TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), topic(), 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        } finally {
            createProducer.close();
        }
    }

    @Test
    public void testSendWithInvalidCreateTime() {
        Properties properties = new Properties();
        properties.setProperty(LogConfig$.MODULE$.MessageTimestampDifferenceMaxMsProp(), "1000");
        createTopic(topic(), 1, 2, properties);
        KafkaProducer<byte[], byte[]> createProducer = createProducer(brokerList(), createProducer$default$2(), createProducer$default$3(), createProducer$default$4(), createProducer$default$5(), createProducer$default$6(), createProducer$default$7());
        try {
            Throwable cause = Assertions.assertThrows(ExecutionException.class, () -> {
                createProducer.send(new ProducerRecord(this.topic(), Predef$.MODULE$.int2Integer(0), Predef$.MODULE$.long2Long(System.currentTimeMillis() - 1001), "key".getBytes(), "value".getBytes())).get();
            }).getCause();
            Assertions.assertTrue(cause instanceof InvalidTimestampException);
            Assertions.assertEquals("One or more records have been rejected due to invalid timestamp", cause.getMessage());
            createProducer.close();
            createProducer = createProducer(brokerList(), createProducer$default$2(), createProducer$default$3(), createProducer$default$4(), "gzip", createProducer$default$6(), createProducer$default$7());
            try {
                Throwable cause2 = Assertions.assertThrows(ExecutionException.class, () -> {
                    createProducer.send(new ProducerRecord(this.topic(), Predef$.MODULE$.int2Integer(0), Predef$.MODULE$.long2Long(System.currentTimeMillis() - 1001), "key".getBytes(), "value".getBytes())).get();
                }).getCause();
                Assertions.assertTrue(cause2 instanceof InvalidTimestampException);
                Assertions.assertEquals("One or more records have been rejected due to invalid timestamp", cause2.getMessage());
            } finally {
            }
        } finally {
        }
    }

    @Test
    public void testNonBlockingProducer() {
        KafkaProducer<byte[], byte[]> createProducer = createProducer(brokerList(), createProducer$default$2(), createProducer$default$3(), createProducer$default$4(), createProducer$default$5(), 0L, createProducer$default$7());
        verifyMetadataNotAvailable$1(send$1(createProducer));
        verifySendSuccess$1(sendUntilQueued$1(createProducer));
        KafkaProducer<byte[], byte[]> createProducer2 = createProducer(brokerList(), 15000, createProducer$default$3(), 1100, createProducer$default$5(), 0L, 1500L);
        Future sendUntilQueued$1 = sendUntilQueued$1(createProducer2);
        verifyBufferExhausted$1(send$1(createProducer2));
        verifySendSuccess$1(sendUntilQueued$1);
    }

    @Test
    public void testSendRecordBatchWithMaxRequestSizeAndHigher() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", brokerList());
        KafkaProducer<byte[], byte[]> registerProducer = registerProducer(new KafkaProducer<>(properties, new ByteArraySerializer(), new ByteArraySerializer()));
        int MessageMaxBytes = Defaults$.MODULE$.MessageMaxBytes() - (((94 + 1) + 1) + 3);
        ProducerRecord producerRecord = new ProducerRecord(topic(), new byte[0], new byte[MessageMaxBytes]);
        Assertions.assertEquals(((byte[]) producerRecord.value()).length, ((RecordMetadata) registerProducer.send(producerRecord).get()).serializedValueSize());
        ProducerRecord producerRecord2 = new ProducerRecord(topic(), new byte[0], new byte[MessageMaxBytes + 1]);
        Assertions.assertEquals(RecordTooLargeException.class, Assertions.assertThrows(ExecutionException.class, () -> {
            registerProducer.send(producerRecord2).get();
        }).getCause().getClass());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final Future send$1(KafkaProducer kafkaProducer) {
        return kafkaProducer.send(new ProducerRecord(topic(), Predef$.MODULE$.int2Integer(0), "key".getBytes(), new byte[1000]));
    }

    public static final /* synthetic */ boolean $anonfun$testNonBlockingProducer$2(Future future) {
        if (!future.isDone()) {
            return true;
        }
        try {
            future.get();
            return true;
        } catch (ExecutionException unused) {
            return false;
        }
    }

    private final Future sendUntilQueued$1(KafkaProducer kafkaProducer) {
        Tuple2 $minus$greater$extension;
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long computeUntilTrue$default$2 = TestUtils$.MODULE$.computeUntilTrue$default$2();
        long computeUntilTrue$default$3 = TestUtils$.MODULE$.computeUntilTrue$default$3();
        if (testUtils$ == null) {
            throw null;
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            Future send$1 = send$1(kafkaProducer);
            if ($anonfun$testNonBlockingProducer$2(send$1)) {
                $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(send$1), BoxesRunTime.boxToBoolean(true));
                break;
            }
            if (System.currentTimeMillis() > currentTimeMillis + computeUntilTrue$default$2) {
                $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(send$1), BoxesRunTime.boxToBoolean(false));
                break;
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(computeUntilTrue$default$2), computeUntilTrue$default$3));
        }
        if ($minus$greater$extension != null) {
            return (Future) $minus$greater$extension._1();
        }
        throw new MatchError((Object) null);
    }

    private final void verifySendSuccess$1(Future future) {
        RecordMetadata recordMetadata = (RecordMetadata) future.get(30L, TimeUnit.SECONDS);
        Assertions.assertEquals(topic(), recordMetadata.topic());
        Assertions.assertEquals(0, recordMetadata.partition());
        Assertions.assertTrue(recordMetadata.offset() >= 0, new StringBuilder(15).append("Invalid offset ").append(recordMetadata).toString());
    }

    private static final void verifyMetadataNotAvailable$1(Future future) {
        Assertions.assertTrue(future.isDone());
        Assertions.assertEquals(TimeoutException.class, Assertions.assertThrows(ExecutionException.class, () -> {
            future.get();
        }).getCause().getClass());
    }

    private static final void verifyBufferExhausted$1(Future future) {
        Assertions.assertTrue(future.isDone());
        Assertions.assertEquals(BufferExhaustedException.class, Assertions.assertThrows(ExecutionException.class, () -> {
            future.get();
        }).getCause().getClass());
    }

    public static final /* synthetic */ Object $anonfun$testNonBlockingProducer$2$adapted(Future future) {
        return BoxesRunTime.boxToBoolean($anonfun$testNonBlockingProducer$2(future));
    }
}
