package kafka.api;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import kafka.api.AbstractConsumerTest;
import kafka.api.BaseConsumerTest;
import kafka.server.KafkaBroker;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.test.api.Flaky;
import org.apache.kafka.server.quota.QuotaType;
import org.apache.kafka.test.MockConsumerInterceptor;
import org.apache.kafka.test.MockProducerInterceptor;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.Set;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayBuffer;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;

/* compiled from: PlaintextConsumerTest.scala */
@Timeout(600)
@ScalaSignature(bytes = "\u0006\u0005\t\rh\u0001B\u0012%\u0001%BQA\f\u0001\u0005\u0002=BQ!\r\u0001\u0005\u0002IBQa\u0019\u0001\u0005\n\u0011DQ! \u0001\u0005\u0002yDq!!\u0003\u0001\t\u0003\tY\u0001C\u0004\u0002\u0018\u0001!\t!!\u0007\t\u000f\u0005\u0015\u0002\u0001\"\u0001\u0002(!9\u00111\u0007\u0001\u0005\u0002\u0005U\u0002bBA!\u0001\u0011\u0005\u00111\t\u0005\b\u0003\u001f\u0002A\u0011AA)\u0011\u001d\ti\u0006\u0001C\u0005\u0003?Bq!a\u001e\u0001\t\u0003\tI\bC\u0004\u0002\u0006\u0002!\t!a\"\t\u000f\u0005M\u0005\u0001\"\u0001\u0002\u0016\"9\u0011\u0011\u0015\u0001\u0005\u0002\u0005\r\u0006bBAX\u0001\u0011\u0005\u0011\u0011\u0017\u0005\b\u0003{\u0003A\u0011AA`\u0011\u001d\tY\r\u0001C\u0001\u0003\u001bDq!!7\u0001\t\u0003\tY\u000eC\u0004\u0002h\u0002!\t!!;\t\u000f\u0005U\b\u0001\"\u0001\u0002x\"9!1\u0001\u0001\u0005\u0002\t\u0015\u0001b\u0002B\t\u0001\u0011\u0005!1\u0003\u0005\b\u0005?\u0001A\u0011\u0001B\u0011\u0011\u001d\u0011i\u0003\u0001C\u0001\u0005_AqAa\u000f\u0001\t\u0003\u0011i\u0004C\u0004\u0003J\u0001!\tAa\u0013\t\u000f\t]\u0003\u0001\"\u0001\u0003Z!9!Q\r\u0001\u0005\u0002\t\u001d\u0004b\u0002B:\u0001\u0011\u0005!Q\u000f\u0005\b\u0005\u0003\u0003A\u0011\u0001BB\u0011\u001d\u0011i\n\u0001C\u0001\u0005?CqA!,\u0001\t\u0003\u0011y\u000bC\u0004\u0003>\u0002!\tAa0\u0003+Ac\u0017-\u001b8uKb$8i\u001c8tk6,'\u000fV3ti*\u0011QEJ\u0001\u0004CBL'\"A\u0014\u0002\u000b-\fgm[1\u0004\u0001M\u0011\u0001A\u000b\t\u0003W1j\u0011\u0001J\u0005\u0003[\u0011\u0012\u0001CQ1tK\u000e{gn];nKJ$Vm\u001d;\u0002\rqJg.\u001b;?)\u0005\u0001\u0004CA\u0016\u0001\u0003-!Xm\u001d;IK\u0006$WM]:\u0015\u0007MJd\t\u0005\u00025o5\tQGC\u00017\u0003\u0015\u00198-\u00197b\u0013\tATG\u0001\u0003V]&$\b\"\u0002\u001e\u0003\u0001\u0004Y\u0014AB9v_J,X\u000e\u0005\u0002=\u0007:\u0011Q(\u0011\t\u0003}Uj\u0011a\u0010\u0006\u0003\u0001\"\na\u0001\u0010:p_Rt\u0014B\u0001\"6\u0003\u0019\u0001&/\u001a3fM&\u0011A)\u0012\u0002\u0007'R\u0014\u0018N\\4\u000b\u0005\t+\u0004\"B$\u0003\u0001\u0004Y\u0014!D4s_V\u0004\bK]8u_\u000e|G\u000e\u000b\u0003\u0003\u0013V3\u0006C\u0001&T\u001b\u0005Y%B\u0001'N\u0003\u0019\u0001\u0018M]1ng*\u0011ajT\u0001\bUV\u0004\u0018\u000e^3s\u0015\t\u0001\u0016+A\u0003kk:LGOC\u0001S\u0003\ry'oZ\u0005\u0003).\u0013\u0011\u0003U1sC6,G/\u001a:ju\u0016$G+Z:u\u0003\u0011q\u0017-\\3\"\u0003]\u000b!f\u001f3jgBd\u0017-\u001f(b[\u0016lh&];peVlWh\u001f\u0019~]\u001d\u0014x.\u001e9Qe>$xnY8m{m\fT\u0010\u000b\u0003\u00033~\u0003\u0007C\u0001.^\u001b\u0005Y&B\u0001/L\u0003!\u0001(o\u001c<jI\u0016\u0014\u0018B\u00010\\\u00051iU\r\u001e5pIN{WO]2f\u0003\u00151\u0018\r\\;fY\u0005\t\u0017%\u00012\u0002U\u001d,G\u000fV3tiF+xN];n\u0003:$wI]8vaB\u0013x\u000e^8d_2\u0004\u0016M]1nKR,'o]!mY\u0006yB/Z:u\u0011\u0016\fG-\u001a:t'\u0016\u0014\u0018.\u00197ju\u0016$Um]3sS\u0006d\u0017N_3\u0015\u0007M*\u0007\u0010C\u0003g\u0007\u0001\u0007q-\u0001\u0006tKJL\u0017\r\\5{KJ\u00042\u0001\u001b9s\u001b\u0005I'B\u00016l\u00035\u0019XM]5bY&T\u0018\r^5p]*\u0011A.\\\u0001\u0007G>lWn\u001c8\u000b\u0005\u001dr'BA8R\u0003\u0019\t\u0007/Y2iK&\u0011\u0011/\u001b\u0002\u000b'\u0016\u0014\u0018.\u00197ju\u0016\u0014\bc\u0001\u001btk&\u0011A/\u000e\u0002\u0006\u0003J\u0014\u0018-\u001f\t\u0003iYL!a^\u001b\u0003\t\tKH/\u001a\u0005\u0006s\u000e\u0001\rA_\u0001\rI\u0016\u001cXM]5bY&TXM\u001d\t\u0004Qn\u0014\u0018B\u0001?j\u00051!Um]3sS\u0006d\u0017N_3s\u0003\u0005\"Xm\u001d;IK\u0006$WM]:TKJL\u0017\r\\5{KJ$Um]3sS\u0006d\u0017N_3s)\u0011\u0019t0!\u0001\t\u000bi\"\u0001\u0019A\u001e\t\u000b\u001d#\u0001\u0019A\u001e)\t\u0011IUK\u0016\u0015\u0006\te{\u0016q\u0001\u0017\u0002C\u0006\u0019B/Z:u\u0003V$xn\u00144gg\u0016$(+Z:fiR)1'!\u0004\u0002\u0010!)!(\u0002a\u0001w!)q)\u0002a\u0001w!\"Q!S+WQ\u0015)\u0011lXA\u000bY\u0005\t\u0017\u0001\u0006;fgR<%o\\;q\u0007>t7/^7qi&|g\u000eF\u00034\u00037\ti\u0002C\u0003;\r\u0001\u00071\bC\u0003H\r\u0001\u00071\b\u000b\u0003\u0007\u0013V3\u0006&\u0002\u0004Z?\u0006\rB&A1\u0002#Q,7\u000f\u001e)beRLG/[8og\u001a{'\u000fF\u00034\u0003S\tY\u0003C\u0003;\u000f\u0001\u00071\bC\u0003H\u000f\u0001\u00071\b\u000b\u0003\b\u0013V3\u0006&B\u0004Z?\u0006EB&A1\u00027Q,7\u000f\u001e)beRLG/[8og\u001a{'/Q;u_\u000e\u0013X-\u0019;f)\u0015\u0019\u0014qGA\u001d\u0011\u0015Q\u0004\u00021\u0001<\u0011\u00159\u0005\u00021\u0001<Q\u0011A\u0011*\u0016,)\u000b!Iv,a\u0010-\u0003\u0005\fQ\u0004^3tiB\u000b'\u000f^5uS>t7OR8s\u0013:4\u0018\r\\5e)>\u0004\u0018n\u0019\u000b\u0006g\u0005\u0015\u0013q\t\u0005\u0006u%\u0001\ra\u000f\u0005\u0006\u000f&\u0001\ra\u000f\u0015\u0005\u0013%+f\u000bK\u0003\n3~\u000bi\u0005L\u0001b\u0003!!Xm\u001d;TK\u0016\\G#B\u001a\u0002T\u0005U\u0003\"\u0002\u001e\u000b\u0001\u0004Y\u0004\"B$\u000b\u0001\u0004Y\u0004\u0006\u0002\u0006J+ZCSAC-`\u00037b\u0013!Y\u0001\u0017g\u0016tGmQ8naJ,7o]3e\u001b\u0016\u001c8/Y4fgR)1'!\u0019\u0002l!9\u00111M\u0006A\u0002\u0005\u0015\u0014A\u00038v[J+7m\u001c:egB\u0019A'a\u001a\n\u0007\u0005%TGA\u0002J]RDq!!\u001c\f\u0001\u0004\ty'\u0001\u0002uaB!\u0011\u0011OA:\u001b\u0005Y\u0017bAA;W\nqAk\u001c9jGB\u000b'\u000f^5uS>t\u0017a\u0007;fgR\u0004\u0016M\u001d;ji&|g\u000eU1vg\u0016\fe\u000e\u001a*fgVlW\rF\u00034\u0003w\ni\bC\u0003;\u0019\u0001\u00071\bC\u0003H\u0019\u0001\u00071\b\u000b\u0003\r\u0013V3\u0006&\u0002\u0007Z?\u0006\rE&A1\u0002!Q,7\u000f^%oi\u0016\u00148-\u001a9u_J\u001cH#B\u001a\u0002\n\u0006-\u0005\"\u0002\u001e\u000e\u0001\u0004Y\u0004\"B$\u000e\u0001\u0004Y\u0004\u0006B\u0007J+ZCS!D-`\u0003#c\u0013!Y\u0001\"i\u0016\u001cH/\u00138uKJ\u001cW\r\u001d;peN<\u0016\u000e\u001e5Xe>twmS3z-\u0006dW/\u001a\u000b\u0006g\u0005]\u0015\u0011\u0014\u0005\u0006u9\u0001\ra\u000f\u0005\u0006\u000f:\u0001\ra\u000f\u0015\u0005\u001d%+f\u000bK\u0003\u000f3~\u000by\nL\u0001b\u0003\u0005\"Xm\u001d;D_:\u001cX/\\3NKN\u001c\u0018mZ3t/&$\bn\u0011:fCR,G+[7f)\u0015\u0019\u0014QUAT\u0011\u0015Qt\u00021\u0001<\u0011\u00159u\u00021\u0001<Q\u0011y\u0011*\u0016,)\u000b=Iv,!,-\u0003\u0005\fA\u0005^3ti\u000e{gn];nK6+7o]1hKN<\u0016\u000e\u001e5M_\u001e\f\u0005\u000f]3oIRKW.\u001a\u000b\u0006g\u0005M\u0016Q\u0017\u0005\u0006uA\u0001\ra\u000f\u0005\u0006\u000fB\u0001\ra\u000f\u0015\u0005!%+f\u000bK\u0003\u00113~\u000bY\fL\u0001b\u00039!Xm\u001d;MSN$Hk\u001c9jGN$RaMAa\u0003\u0007DQAO\tA\u0002mBQaR\tA\u0002mBC!E%V-\"*\u0011#W0\u0002J2\n\u0011-A\u0013uKN$\b+Y;tKN#\u0018\r^3O_R\u0004&/Z:feZ,GMQ=SK\n\fG.\u00198dKR)1'a4\u0002R\")!H\u0005a\u0001w!)qI\u0005a\u0001w!\"!#S+WQ\u0015\u0011\u0012lXAlY\u0005\t\u0017a\f;fgR\u0004VM\u001d)beRLG/[8o\u0019\u0016\fG-T3ue&\u001c7o\u00117fC:,\u0006oV5uQN+(m]2sS\n,G#B\u001a\u0002^\u0006}\u0007\"\u0002\u001e\u0014\u0001\u0004Y\u0004\"B$\u0014\u0001\u0004Y\u0004\u0006B\nJ+ZCSaE-`\u0003Kd\u0013!Y\u0001/i\u0016\u001cH\u000fU3s!\u0006\u0014H/\u001b;j_:d\u0015mZ'fiJL7m]\"mK\u0006tW\u000b],ji\"\u001cVOY:de&\u0014W\rF\u00034\u0003W\fi\u000fC\u0003;)\u0001\u00071\bC\u0003H)\u0001\u00071\b\u000b\u0003\u0015\u0013V3\u0006&\u0002\u000bZ?\u0006MH&A1\u0002YQ,7\u000f\u001e)feB\u000b'\u000f^5uS>tG*Z1e\u001b\u0016$(/[2t\u00072,\u0017M\\+q/&$\b.Q:tS\u001etG#B\u001a\u0002z\u0006m\b\"\u0002\u001e\u0016\u0001\u0004Y\u0004\"B$\u0016\u0001\u0004Y\u0004\u0006B\u000bJ+ZCS!F-`\u0005\u0003a\u0013!Y\u0001,i\u0016\u001cH\u000fU3s!\u0006\u0014H/\u001b;j_:d\u0015mZ'fiJL7m]\"mK\u0006tW\u000b],ji\"\f5o]5h]R)1Ga\u0002\u0003\n!)!H\u0006a\u0001w!)qI\u0006a\u0001w!\"a#S+WQ\u00151\u0012l\u0018B\bY\u0005\t\u0017a\u000b;fgR\u0004VM\u001d)beRLG/[8o\u0019\u0006<W*\u001a;sS\u000e\u001cx\u000b[3o%\u0016\fGmQ8n[&$H/\u001a3\u0015\u000bM\u0012)Ba\u0006\t\u000bi:\u0002\u0019A\u001e\t\u000b\u001d;\u0002\u0019A\u001e)\t]IUK\u0016\u0015\u0006/e{&Q\u0004\u0017\u0002C\u0006qC/Z:u#V|G/Y'fiJL7m\u001d(pi\u000e\u0013X-\u0019;fI&3gj\\)v_R\f7oQ8oM&<WO]3e)\u0015\u0019$1\u0005B\u0013\u0011\u0015Q\u0004\u00041\u0001<\u0011\u00159\u0005\u00041\u0001<Q\u0011A\u0012*\u0016,)\u000baIvLa\u000b-\u0003\u0005\fA\u0004^3ti\u000e{gn];nS:<w+\u001b;i\u001dVdGn\u0012:pkBLE\rF\u00034\u0005c\u0011\u0019\u0004C\u0003;3\u0001\u00071\bC\u0003H3\u0001\u00071\b\u000b\u0003\u001a\u0013V3\u0006&B\rZ?\neB&A1\u0002OQ,7\u000f\u001e(vY2<%o\\;q\u0013\u0012tu\u000e^*vaB|'\u000f^3e\u0013\u001a\u001cu.\\7jiRLgn\u001a\u000b\u0006g\t}\"\u0011\t\u0005\u0006ui\u0001\ra\u000f\u0005\u0006\u000fj\u0001\ra\u000f\u0015\u00055%+f\u000bK\u0003\u001b3~\u00139\u0005L\u0001b\u0003a\"Xm\u001d;Ti\u0006$\u0018nY\"p]N,X.\u001a:EKR,7\r^:OK^\u0004\u0016M\u001d;ji&|gn\u0011:fCR,G-\u00114uKJ\u0014Vm\u001d;beR$Ra\rB'\u0005\u001fBQAO\u000eA\u0002mBQaR\u000eA\u0002mBCaG%V-\"*1$W0\u0003V1\n\u0011-\u0001\buKN$XI\u001c3PM\u001a\u001cX\r^:\u0015\u000bM\u0012YF!\u0018\t\u000bib\u0002\u0019A\u001e\t\u000b\u001dc\u0002\u0019A\u001e)\tqIUK\u0016\u0015\u00069e{&1\r\u0017\u0002C\u0006\tD/Z:u'\u0016,7\u000e\u00165s_^\u001c\u0018\n\u001c7fO\u0006d7\u000b^1uK&3\u0007+\u0019:uSRLwN\\:O_R\f5o]5h]\u0016$G#B\u001a\u0003j\t-\u0004\"\u0002\u001e\u001e\u0001\u0004Y\u0004\"B$\u001e\u0001\u0004Y\u0004\u0006B\u000fJ+ZCS!H-`\u0005cb\u0013!Y\u0001\u0018i\u0016\u001cHOR3uG\"|eMZ:fiN4uN\u001d+j[\u0016$Ra\rB<\u0005sBQA\u000f\u0010A\u0002mBQa\u0012\u0010A\u0002mBCAH%V-\"*a$W0\u0003��1\n\u0011-A\u000euKN$\bk\\:ji&|gNU3ta\u0016\u001cGo\u001d+j[\u0016|W\u000f\u001e\u000b\u0006g\t\u0015%q\u0011\u0005\u0006u}\u0001\ra\u000f\u0005\u0006\u000f~\u0001\ra\u000f\u0015\u0005?%+f\u000bK\u0003 3~\u0013i\tL\u0001bQ\u0019y\"\u0011S0\u0003\u001cB!!1\u0013BL\u001b\t\u0011)J\u0003\u0002&\u001b&!!\u0011\u0014BK\u0005\u001d!\u0016.\\3pkRt\u0012aD\u0001\u001bi\u0016\u001cH\u000fU8tSRLwN\u001c*fgB,7\r^:XC.,W\u000f\u001d\u000b\u0006g\t\u0005&1\u0015\u0005\u0006u\u0001\u0002\ra\u000f\u0005\u0006\u000f\u0002\u0002\ra\u000f\u0015\u0005A%+f\u000bK\u0003!3~\u0013I\u000bL\u0001bQ\u0019\u0001#\u0011S0\u0003\u001c\u0006iC/Z:u!>\u001c\u0018\u000e^5p]^KG\u000f[#se>\u00148i\u001c8oK\u000e$\u0018n\u001c8SKN\u0004Xm\u0019;t/\u0006\\W-\u001e9\u0015\u000bM\u0012\tLa-\t\u000bi\n\u0003\u0019A\u001e\t\u000b\u001d\u000b\u0003\u0019A\u001e)\t\u0005JUK\u0016\u0015\u0006Ce{&\u0011\u0018\u0017\u0002C\"2\u0011E!%`\u00057\u000bq\u0004^3ti\u000ecwn]3MK\u00064Xm]$s_V\u0004xJ\\%oi\u0016\u0014(/\u001e9u)\u0015\u0019$\u0011\u0019Bb\u0011\u0015Q$\u00051\u0001<\u0011\u00159%\u00051\u0001<Q\u0019\u0011#qY0\u0003VB!!\u0011\u001aBi\u001b\t\u0011YMC\u0002&\u0005\u001bT1Aa4l\u0003\u0011!Xm\u001d;\n\t\tM'1\u001a\u0002\u0006\r2\f7._\u0011\u0003\u0005/\f1bS!G\u0017\u0006k\u0013\u0007\u000f\u00194c!\"!%S+WQ\u0015\u0011\u0013l\u0018BoY\u0005\t\u0007F\u0002\u0001\u0003\u0012~\u0013\tO\b\u0002\u00031\u0002")
/* loaded from: input_file:kafka/api/PlaintextConsumerTest.class */
public class PlaintextConsumerTest extends BaseConsumerTest {
    @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
    public void testHeaders(String str, String str2) {
        ProducerRecord producerRecord = new ProducerRecord(tp().topic(), Predef$.MODULE$.int2Integer(tp().partition()), (Long) null, "key".getBytes(), "value".getBytes());
        producerRecord.headers().add("headerKey", "headerValue".getBytes());
        createProducer(createProducer$default$1(), createProducer$default$2(), createProducer$default$3()).send(producerRecord);
        Consumer createConsumer = createConsumer(createConsumer$default$1(), createConsumer$default$2(), createConsumer$default$3(), createConsumer$default$4());
        Assertions.assertEquals(0, createConsumer.assignment().size());
        createConsumer.assign(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(tp(), Nil$.MODULE$)).asJava());
        Assertions.assertEquals(1, createConsumer.assignment().size());
        createConsumer.seek(tp(), 0L);
        ArrayBuffer consumeRecords = consumeRecords(createConsumer, 1, consumeRecords$default$3());
        Assertions.assertEquals(1, consumeRecords.size());
        RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), 1).foreach$mVc$sp(i -> {
            Header lastHeader = ((ConsumerRecord) consumeRecords.apply(i)).headers().lastHeader("headerKey");
            Assertions.assertEquals("headerValue", lastHeader == null ? null : new String(lastHeader.value()));
        });
    }

    private void testHeadersSerializeDeserialize(Serializer<byte[]> serializer, Deserializer<byte[]> deserializer) {
        createProducer(new ByteArraySerializer(), serializer, createProducer$default$3()).send(new ProducerRecord(tp().topic(), Predef$.MODULE$.int2Integer(tp().partition()), (Long) null, "key".getBytes(), "value".getBytes()));
        Consumer createConsumer = createConsumer(new ByteArrayDeserializer(), deserializer, createConsumer$default$3(), createConsumer$default$4());
        Assertions.assertEquals(0, createConsumer.assignment().size());
        createConsumer.assign(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(tp(), Nil$.MODULE$)).asJava());
        Assertions.assertEquals(1, createConsumer.assignment().size());
        createConsumer.seek(tp(), 0L);
        Assertions.assertEquals(1, consumeRecords(createConsumer, 1, consumeRecords$default$3()).size());
    }

    @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
    public void testHeadersSerializerDeserializer(String str, String str2) {
        testHeadersSerializeDeserialize(new BaseConsumerTest.SerializerImpl(), new BaseConsumerTest.DeserializerImpl());
    }

    @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
    public void testAutoOffsetReset(String str, String str2) {
        KafkaProducer<byte[], byte[]> createProducer = createProducer(createProducer$default$1(), createProducer$default$2(), createProducer$default$3());
        long currentTimeMillis = System.currentTimeMillis();
        sendRecords(createProducer, 1, tp(), currentTimeMillis, sendRecords$default$5());
        Consumer<byte[], byte[]> createConsumer = createConsumer(createConsumer$default$1(), createConsumer$default$2(), createConsumer$default$3(), createConsumer$default$4());
        createConsumer.assign(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(tp(), Nil$.MODULE$)).asJava());
        consumeAndVerifyRecords(createConsumer, 1, 0, consumeAndVerifyRecords$default$4(), currentTimeMillis, consumeAndVerifyRecords$default$6(), consumeAndVerifyRecords$default$7(), consumeAndVerifyRecords$default$8(), consumeAndVerifyRecords$default$9());
    }

    @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
    public void testGroupConsumption(String str, String str2) {
        KafkaProducer<byte[], byte[]> createProducer = createProducer(createProducer$default$1(), createProducer$default$2(), createProducer$default$3());
        long currentTimeMillis = System.currentTimeMillis();
        sendRecords(createProducer, 10, tp(), currentTimeMillis, sendRecords$default$5());
        Consumer<byte[], byte[]> createConsumer = createConsumer(createConsumer$default$1(), createConsumer$default$2(), createConsumer$default$3(), createConsumer$default$4());
        createConsumer.subscribe(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(topic(), Nil$.MODULE$)).asJava());
        consumeAndVerifyRecords(createConsumer, 1, 0, consumeAndVerifyRecords$default$4(), currentTimeMillis, consumeAndVerifyRecords$default$6(), consumeAndVerifyRecords$default$7(), consumeAndVerifyRecords$default$8(), consumeAndVerifyRecords$default$9());
    }

    @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
    public void testPartitionsFor(String str, String str2) {
        createTopic("part-test", 2, createTopic$default$3(), createTopic$default$4(), createTopic$default$5(), createTopic$default$6());
        List partitionsFor = createConsumer(createConsumer$default$1(), createConsumer$default$2(), createConsumer$default$3(), createConsumer$default$4()).partitionsFor("part-test");
        Assertions.assertNotNull(partitionsFor);
        Assertions.assertEquals(2, partitionsFor.size());
    }

    @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
    public void testPartitionsForAutoCreate(String str, String str2) {
        Consumer createConsumer = createConsumer(createConsumer$default$1(), createConsumer$default$2(), createConsumer$default$3(), createConsumer$default$4());
        createConsumer.partitionsFor("non-exist-topic");
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$testPartitionsForAutoCreate$1(createConsumer)) {
            if (System.currentTimeMillis() > currentTimeMillis + 15000) {
                Assertions.fail("Timed out while awaiting non empty partitions.");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(15000L), 100L));
        }
    }

    @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
    public void testPartitionsForInvalidTopic(String str, String str2) {
        Consumer createConsumer = createConsumer(createConsumer$default$1(), createConsumer$default$2(), createConsumer$default$3(), createConsumer$default$4());
        Assertions.assertThrows(InvalidTopicException.class, () -> {
            createConsumer.partitionsFor(";3# ads,{234");
        });
    }

    @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
    public void testSeek(String str, String str2) {
        Consumer<byte[], byte[]> createConsumer = createConsumer(createConsumer$default$1(), createConsumer$default$2(), createConsumer$default$3(), createConsumer$default$4());
        long j = 50 / 2;
        sendRecords(createProducer(createProducer$default$1(), createProducer$default$2(), createProducer$default$3()), (int) 50, tp(), 0, sendRecords$default$5());
        createConsumer.assign(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(tp(), Nil$.MODULE$)).asJava());
        createConsumer.seekToEnd(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(tp(), Nil$.MODULE$)).asJava());
        Assertions.assertEquals(50L, createConsumer.position(tp()));
        Assertions.assertTrue(createConsumer.poll(Duration.ofMillis(50L)).isEmpty());
        createConsumer.seekToBeginning(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(tp(), Nil$.MODULE$)).asJava());
        Assertions.assertEquals(0L, createConsumer.position(tp()));
        consumeAndVerifyRecords(createConsumer, 1, 0, consumeAndVerifyRecords$default$4(), 0, consumeAndVerifyRecords$default$6(), consumeAndVerifyRecords$default$7(), consumeAndVerifyRecords$default$8(), consumeAndVerifyRecords$default$9());
        createConsumer.seek(tp(), j);
        Assertions.assertEquals(j, createConsumer.position(tp()));
        consumeAndVerifyRecords(createConsumer, 1, (int) j, (int) j, j, consumeAndVerifyRecords$default$6(), consumeAndVerifyRecords$default$7(), consumeAndVerifyRecords$default$8(), consumeAndVerifyRecords$default$9());
        sendCompressedMessages((int) 50, tp2());
        createConsumer.assign(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(tp2(), Nil$.MODULE$)).asJava());
        createConsumer.seekToEnd(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(tp2(), Nil$.MODULE$)).asJava());
        Assertions.assertEquals(50L, createConsumer.position(tp2()));
        Assertions.assertTrue(createConsumer.poll(Duration.ofMillis(50L)).isEmpty());
        createConsumer.seekToBeginning(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(tp2(), Nil$.MODULE$)).asJava());
        Assertions.assertEquals(0L, createConsumer.position(tp2()));
        consumeAndVerifyRecords(createConsumer, 1, 0, consumeAndVerifyRecords$default$4(), consumeAndVerifyRecords$default$5(), consumeAndVerifyRecords$default$6(), tp2(), consumeAndVerifyRecords$default$8(), consumeAndVerifyRecords$default$9());
        createConsumer.seek(tp2(), j);
        Assertions.assertEquals(j, createConsumer.position(tp2()));
        consumeAndVerifyRecords(createConsumer, 1, (int) j, (int) j, j, consumeAndVerifyRecords$default$6(), tp2(), consumeAndVerifyRecords$default$8(), consumeAndVerifyRecords$default$9());
    }

    private void sendCompressedMessages(int i, TopicPartition topicPartition) {
        Properties properties = new Properties();
        properties.setProperty("compression.type", CompressionType.GZIP.name);
        properties.setProperty("linger.ms", Integer.toString(Integer.MAX_VALUE));
        KafkaProducer createProducer = createProducer(createProducer$default$1(), createProducer$default$2(), properties);
        RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), i).foreach(obj -> {
            return $anonfun$sendCompressedMessages$1(createProducer, topicPartition, BoxesRunTime.unboxToInt(obj));
        });
        createProducer.close();
    }

    @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
    public void testPartitionPauseAndResume(String str, String str2) {
        List asJava = CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(tp(), Nil$.MODULE$)).asJava();
        KafkaProducer<byte[], byte[]> createProducer = createProducer(createProducer$default$1(), createProducer$default$2(), createProducer$default$3());
        long currentTimeMillis = System.currentTimeMillis();
        sendRecords(createProducer, 5, tp(), currentTimeMillis, sendRecords$default$5());
        Consumer<byte[], byte[]> createConsumer = createConsumer(createConsumer$default$1(), createConsumer$default$2(), createConsumer$default$3(), createConsumer$default$4());
        createConsumer.assign(asJava);
        consumeAndVerifyRecords(createConsumer, 5, 0, consumeAndVerifyRecords$default$4(), currentTimeMillis, consumeAndVerifyRecords$default$6(), consumeAndVerifyRecords$default$7(), consumeAndVerifyRecords$default$8(), consumeAndVerifyRecords$default$9());
        createConsumer.pause(asJava);
        long currentTimeMillis2 = System.currentTimeMillis();
        sendRecords(createProducer, 5, tp(), currentTimeMillis2, sendRecords$default$5());
        Assertions.assertTrue(createConsumer.poll(Duration.ofMillis(100L)).isEmpty());
        createConsumer.resume(asJava);
        consumeAndVerifyRecords(createConsumer, 5, 5, consumeAndVerifyRecords$default$4(), currentTimeMillis2, consumeAndVerifyRecords$default$6(), consumeAndVerifyRecords$default$7(), consumeAndVerifyRecords$default$8(), consumeAndVerifyRecords$default$9());
    }

    @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
    public void testInterceptors(String str, String str2) {
        String str3 = "mock";
        MockConsumerInterceptor.resetCounters();
        MockProducerInterceptor.resetCounters();
        Properties properties = new Properties();
        properties.put("interceptor.classes", MockProducerInterceptor.class.getName());
        properties.put("mock.interceptor.append", "mock");
        KafkaProducer createProducer = createProducer(new StringSerializer(), new StringSerializer(), properties);
        RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), 10).map(obj -> {
            return $anonfun$testInterceptors$1(this, createProducer, BoxesRunTime.unboxToInt(obj));
        }).foreach(future -> {
            return (RecordMetadata) future.get();
        });
        Assertions.assertEquals(10, MockProducerInterceptor.ONSEND_COUNT.intValue());
        Assertions.assertEquals(10, MockProducerInterceptor.ON_SUCCESS_COUNT.intValue());
        Assertions.assertThrows(Throwable.class, () -> {
            createProducer.send((ProducerRecord) null);
        }, () -> {
            return "Should not allow sending a null record";
        });
        Assertions.assertEquals(1, MockProducerInterceptor.ON_ERROR_COUNT.intValue(), "Interceptor should be notified about exception");
        Assertions.assertEquals(0, MockProducerInterceptor.ON_ERROR_WITH_METADATA_COUNT.intValue(), "Interceptor should not receive metadata with an exception when record is null");
        consumerConfig().setProperty("interceptor.classes", "org.apache.kafka.test.MockConsumerInterceptor");
        Consumer createConsumer = createConsumer(new StringDeserializer(), new StringDeserializer(), createConsumer$default$3(), createConsumer$default$4());
        createConsumer.assign(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(tp(), Nil$.MODULE$)).asJava());
        createConsumer.seek(tp(), 0L);
        ArrayBuffer consumeRecords = consumeRecords(createConsumer, 10, consumeRecords$default$3());
        RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), 10).foreach$mVc$sp(i -> {
            ConsumerRecord consumerRecord = (ConsumerRecord) consumeRecords.apply(i);
            Assertions.assertEquals("key " + i, new String((String) consumerRecord.key()));
            Assertions.assertEquals(("value " + i + str3).toUpperCase(Locale.ROOT), new String((String) consumerRecord.value()));
        });
        int intValue = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue();
        createConsumer.commitSync(CollectionConverters$.MODULE$.MapHasAsJava((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2(tp(), new OffsetAndMetadata(2L))}))).asJava());
        Assertions.assertEquals(2L, ((OffsetAndMetadata) createConsumer.committed(CollectionConverters$.MODULE$.SetHasAsJava((Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new TopicPartition[]{tp()}))).asJava()).get(tp())).offset());
        Assertions.assertEquals(intValue + 1, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue());
        sendAndAwaitAsyncCommit(createConsumer, new Some(Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(tp()), new OffsetAndMetadata(5L))}))));
        Assertions.assertEquals(5L, ((OffsetAndMetadata) createConsumer.committed(CollectionConverters$.MODULE$.SetHasAsJava((Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new TopicPartition[]{tp()}))).asJava()).get(tp())).offset());
        Assertions.assertEquals(intValue + 2, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue());
        createConsumer.close();
        createProducer.close();
        MockConsumerInterceptor.resetCounters();
        MockProducerInterceptor.resetCounters();
    }

    @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
    public void testInterceptorsWithWrongKeyValue(String str, String str2) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers(bootstrapServers$default$1()));
        properties.put("interceptor.classes", "org.apache.kafka.test.MockProducerInterceptor");
        properties.put("mock.interceptor.append", "mock");
        createProducer(createProducer$default$1(), createProducer$default$2(), createProducer$default$3()).send(new ProducerRecord(tp().topic(), Predef$.MODULE$.int2Integer(tp().partition()), "key".getBytes(), "value will not be modified".getBytes()));
        consumerConfig().setProperty("interceptor.classes", "org.apache.kafka.test.MockConsumerInterceptor");
        Consumer createConsumer = createConsumer(createConsumer$default$1(), createConsumer$default$2(), createConsumer$default$3(), createConsumer$default$4());
        createConsumer.assign(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(tp(), Nil$.MODULE$)).asJava());
        createConsumer.seek(tp(), 0L);
        Assertions.assertEquals("value will not be modified", new String((byte[]) ((ConsumerRecord) consumeRecords(createConsumer, 1, consumeRecords$default$3()).head()).value()));
    }

    @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
    public void testConsumeMessagesWithCreateTime(String str, String str2) {
        KafkaProducer<byte[], byte[]> createProducer = createProducer(createProducer$default$1(), createProducer$default$2(), createProducer$default$3());
        long currentTimeMillis = System.currentTimeMillis();
        sendRecords(createProducer, 50, tp(), currentTimeMillis, sendRecords$default$5());
        Consumer<byte[], byte[]> createConsumer = createConsumer(createConsumer$default$1(), createConsumer$default$2(), createConsumer$default$3(), createConsumer$default$4());
        createConsumer.assign(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(tp(), Nil$.MODULE$)).asJava());
        consumeAndVerifyRecords(createConsumer, 50, 0, consumeAndVerifyRecords$default$4(), currentTimeMillis, consumeAndVerifyRecords$default$6(), consumeAndVerifyRecords$default$7(), consumeAndVerifyRecords$default$8(), consumeAndVerifyRecords$default$9());
        sendCompressedMessages(50, tp2());
        createConsumer.assign(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(tp2(), Nil$.MODULE$)).asJava());
        consumeAndVerifyRecords(createConsumer, 50, 0, consumeAndVerifyRecords$default$4(), consumeAndVerifyRecords$default$5(), consumeAndVerifyRecords$default$6(), tp2(), consumeAndVerifyRecords$default$8(), consumeAndVerifyRecords$default$9());
    }

    @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
    public void testConsumeMessagesWithLogAppendTime(String str, String str2) {
        Properties properties = new Properties();
        properties.setProperty("message.timestamp.type", "LogAppendTime");
        createTopic("testConsumeMessagesWithLogAppendTime", 2, 2, properties, createTopic$default$5(), createTopic$default$6());
        long currentTimeMillis = System.currentTimeMillis();
        TopicPartition topicPartition = new TopicPartition("testConsumeMessagesWithLogAppendTime", 0);
        sendRecords(createProducer(createProducer$default$1(), createProducer$default$2(), createProducer$default$3()), 50, topicPartition, sendRecords$default$4(), sendRecords$default$5());
        Consumer<byte[], byte[]> createConsumer = createConsumer(createConsumer$default$1(), createConsumer$default$2(), createConsumer$default$3(), createConsumer$default$4());
        createConsumer.assign(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(topicPartition, Nil$.MODULE$)).asJava());
        TimestampType timestampType = TimestampType.LOG_APPEND_TIME;
        consumeAndVerifyRecords(createConsumer, 50, 0, consumeAndVerifyRecords$default$4(), currentTimeMillis, TimestampType.LOG_APPEND_TIME, topicPartition, consumeAndVerifyRecords$default$8(), consumeAndVerifyRecords$default$9());
        TopicPartition topicPartition2 = new TopicPartition("testConsumeMessagesWithLogAppendTime", 1);
        sendCompressedMessages(50, topicPartition2);
        createConsumer.assign(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(topicPartition2, Nil$.MODULE$)).asJava());
        TimestampType timestampType2 = TimestampType.LOG_APPEND_TIME;
        consumeAndVerifyRecords(createConsumer, 50, 0, consumeAndVerifyRecords$default$4(), currentTimeMillis, TimestampType.LOG_APPEND_TIME, topicPartition2, consumeAndVerifyRecords$default$8(), consumeAndVerifyRecords$default$9());
    }

    @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
    public void testListTopics(String str, String str2) {
        createTopic("part-test-topic-1", 2, createTopic$default$3(), createTopic$default$4(), createTopic$default$5(), createTopic$default$6());
        createTopic("part-test-topic-2", 2, createTopic$default$3(), createTopic$default$4(), createTopic$default$5(), createTopic$default$6());
        createTopic("part-test-topic-3", 2, createTopic$default$3(), createTopic$default$4(), createTopic$default$5(), createTopic$default$6());
        java.util.Map listTopics = createConsumer(createConsumer$default$1(), createConsumer$default$2(), createConsumer$default$3(), createConsumer$default$4()).listTopics();
        Assertions.assertNotNull(listTopics);
        Assertions.assertEquals(5, listTopics.size());
        Assertions.assertEquals(5, listTopics.keySet().size());
        Assertions.assertEquals(2, ((List) listTopics.get("part-test-topic-1")).size());
        Assertions.assertEquals(2, ((List) listTopics.get("part-test-topic-2")).size());
        Assertions.assertEquals(2, ((List) listTopics.get("part-test-topic-3")).size());
    }

    @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
    public void testPauseStateNotPreservedByRebalance(String str, String str2) {
        if (str2.equals(GroupProtocol.CLASSIC.name)) {
            consumerConfig().setProperty("session.timeout.ms", "100");
            consumerConfig().setProperty("heartbeat.interval.ms", "30");
        }
        Consumer<byte[], byte[]> createConsumer = createConsumer(createConsumer$default$1(), createConsumer$default$2(), createConsumer$default$3(), createConsumer$default$4());
        KafkaProducer<byte[], byte[]> createProducer = createProducer(createProducer$default$1(), createProducer$default$2(), createProducer$default$3());
        long currentTimeMillis = System.currentTimeMillis();
        sendRecords(createProducer, 5, tp(), currentTimeMillis, sendRecords$default$5());
        createConsumer.subscribe(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(topic(), Nil$.MODULE$)).asJava());
        consumeAndVerifyRecords(createConsumer, 5, 0, consumeAndVerifyRecords$default$4(), currentTimeMillis, consumeAndVerifyRecords$default$6(), consumeAndVerifyRecords$default$7(), consumeAndVerifyRecords$default$8(), consumeAndVerifyRecords$default$9());
        createConsumer.pause(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(tp(), Nil$.MODULE$)).asJava());
        createConsumer.subscribe(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon("topic2", Nil$.MODULE$)).asJava());
        consumeAndVerifyRecords(createConsumer, 0, 5, consumeAndVerifyRecords$default$4(), currentTimeMillis, consumeAndVerifyRecords$default$6(), consumeAndVerifyRecords$default$7(), consumeAndVerifyRecords$default$8(), consumeAndVerifyRecords$default$9());
    }

    @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
    public void testPerPartitionLeadMetricsCleanUpWithSubscribe(String str, String str2) {
        createTopic("topic2", 2, brokerCount(), createTopic$default$4(), createTopic$default$5(), createTopic$default$6());
        sendRecords(createProducer(createProducer$default$1(), createProducer$default$2(), createProducer$default$3()), 1000, tp(), sendRecords$default$4(), sendRecords$default$5());
        consumerConfig().setProperty("group.id", "testPerPartitionLeadMetricsCleanUpWithSubscribe");
        consumerConfig().setProperty("client.id", "testPerPartitionLeadMetricsCleanUpWithSubscribe");
        Consumer<?, ?> createConsumer = createConsumer(createConsumer$default$1(), createConsumer$default$2(), createConsumer$default$3(), createConsumer$default$4());
        AbstractConsumerTest.TestConsumerReassignmentListener testConsumerReassignmentListener = new AbstractConsumerTest.TestConsumerReassignmentListener(this);
        createConsumer.subscribe(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(topic(), new $colon.colon("topic2", Nil$.MODULE$))).asJava(), testConsumerReassignmentListener);
        ConsumerRecords awaitNonEmptyRecords = awaitNonEmptyRecords(createConsumer, tp(), awaitNonEmptyRecords$default$3());
        Assertions.assertEquals(1, testConsumerReassignmentListener.callsToAssigned(), "should be assigned once");
        HashMap hashMap = new HashMap();
        hashMap.put("client-id", "testPerPartitionLeadMetricsCleanUpWithSubscribe");
        hashMap.put("topic", tp().topic());
        hashMap.put("partition", String.valueOf(tp().partition()));
        HashMap hashMap2 = new HashMap();
        hashMap2.put("client-id", "testPerPartitionLeadMetricsCleanUpWithSubscribe");
        hashMap2.put("topic", tp2().topic());
        hashMap2.put("partition", String.valueOf(tp2().partition()));
        Metric metric = (Metric) createConsumer.metrics().get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", hashMap));
        Assertions.assertNotNull(metric);
        Assertions.assertEquals(BoxesRunTime.boxToDouble(awaitNonEmptyRecords.count()), metric.metricValue(), "The lead should be " + awaitNonEmptyRecords.count());
        createConsumer.subscribe(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon("topic2", Nil$.MODULE$)).asJava(), testConsumerReassignmentListener);
        awaitRebalance(createConsumer, testConsumerReassignmentListener);
        Assertions.assertNull(createConsumer.metrics().get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", hashMap)));
        Assertions.assertNull(createConsumer.metrics().get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", hashMap2)));
    }

    @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
    public void testPerPartitionLagMetricsCleanUpWithSubscribe(String str, String str2) {
        createTopic("topic2", 2, brokerCount(), createTopic$default$4(), createTopic$default$5(), createTopic$default$6());
        sendRecords(createProducer(createProducer$default$1(), createProducer$default$2(), createProducer$default$3()), 1000, tp(), sendRecords$default$4(), sendRecords$default$5());
        consumerConfig().setProperty("group.id", "testPerPartitionLagMetricsCleanUpWithSubscribe");
        consumerConfig().setProperty("client.id", "testPerPartitionLagMetricsCleanUpWithSubscribe");
        Consumer<?, ?> createConsumer = createConsumer(createConsumer$default$1(), createConsumer$default$2(), createConsumer$default$3(), createConsumer$default$4());
        AbstractConsumerTest.TestConsumerReassignmentListener testConsumerReassignmentListener = new AbstractConsumerTest.TestConsumerReassignmentListener(this);
        createConsumer.subscribe(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(topic(), new $colon.colon("topic2", Nil$.MODULE$))).asJava(), testConsumerReassignmentListener);
        ConsumerRecords awaitNonEmptyRecords = awaitNonEmptyRecords(createConsumer, tp(), awaitNonEmptyRecords$default$3());
        Assertions.assertEquals(1, testConsumerReassignmentListener.callsToAssigned(), "should be assigned once");
        HashMap hashMap = new HashMap();
        hashMap.put("client-id", "testPerPartitionLagMetricsCleanUpWithSubscribe");
        hashMap.put("topic", tp().topic());
        hashMap.put("partition", String.valueOf(tp().partition()));
        HashMap hashMap2 = new HashMap();
        hashMap2.put("client-id", "testPerPartitionLagMetricsCleanUpWithSubscribe");
        hashMap2.put("topic", tp2().topic());
        hashMap2.put("partition", String.valueOf(tp2().partition()));
        Metric metric = (Metric) createConsumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", hashMap));
        Assertions.assertNotNull(metric);
        int count = 1000 - awaitNonEmptyRecords.count();
        Assertions.assertEquals(count, BoxesRunTime.unboxToDouble(metric.metricValue()), epsilon(), "The lag should be " + count);
        createConsumer.subscribe(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon("topic2", Nil$.MODULE$)).asJava(), testConsumerReassignmentListener);
        awaitRebalance(createConsumer, testConsumerReassignmentListener);
        Assertions.assertNull(createConsumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", hashMap)));
        Assertions.assertNull(createConsumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", hashMap2)));
    }

    @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
    public void testPerPartitionLeadMetricsCleanUpWithAssign(String str, String str2) {
        KafkaProducer<byte[], byte[]> createProducer = createProducer(createProducer$default$1(), createProducer$default$2(), createProducer$default$3());
        sendRecords(createProducer, 1000, tp(), sendRecords$default$4(), sendRecords$default$5());
        sendRecords(createProducer, 1000, tp2(), sendRecords$default$4(), sendRecords$default$5());
        consumerConfig().setProperty("group.id", "testPerPartitionLeadMetricsCleanUpWithAssign");
        consumerConfig().setProperty("client.id", "testPerPartitionLeadMetricsCleanUpWithAssign");
        Consumer createConsumer = createConsumer(createConsumer$default$1(), createConsumer$default$2(), createConsumer$default$3(), createConsumer$default$4());
        createConsumer.assign(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(tp(), Nil$.MODULE$)).asJava());
        ConsumerRecords awaitNonEmptyRecords = awaitNonEmptyRecords(createConsumer, tp(), awaitNonEmptyRecords$default$3());
        HashMap hashMap = new HashMap();
        hashMap.put("client-id", "testPerPartitionLeadMetricsCleanUpWithAssign");
        hashMap.put("topic", tp().topic());
        hashMap.put("partition", String.valueOf(tp().partition()));
        Metric metric = (Metric) createConsumer.metrics().get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", hashMap));
        Assertions.assertNotNull(metric);
        Assertions.assertEquals(BoxesRunTime.boxToDouble(awaitNonEmptyRecords.count()), metric.metricValue(), "The lead should be " + awaitNonEmptyRecords.count());
        createConsumer.assign(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(tp2(), Nil$.MODULE$)).asJava());
        awaitNonEmptyRecords(createConsumer, tp2(), awaitNonEmptyRecords$default$3());
        Assertions.assertNull(createConsumer.metrics().get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", hashMap)));
    }

    @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
    public void testPerPartitionLagMetricsCleanUpWithAssign(String str, String str2) {
        KafkaProducer<byte[], byte[]> createProducer = createProducer(createProducer$default$1(), createProducer$default$2(), createProducer$default$3());
        sendRecords(createProducer, 1000, tp(), sendRecords$default$4(), sendRecords$default$5());
        sendRecords(createProducer, 1000, tp2(), sendRecords$default$4(), sendRecords$default$5());
        consumerConfig().setProperty("group.id", "testPerPartitionLagMetricsCleanUpWithAssign");
        consumerConfig().setProperty("client.id", "testPerPartitionLagMetricsCleanUpWithAssign");
        Consumer createConsumer = createConsumer(createConsumer$default$1(), createConsumer$default$2(), createConsumer$default$3(), createConsumer$default$4());
        createConsumer.assign(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(tp(), Nil$.MODULE$)).asJava());
        ConsumerRecords awaitNonEmptyRecords = awaitNonEmptyRecords(createConsumer, tp(), awaitNonEmptyRecords$default$3());
        HashMap hashMap = new HashMap();
        hashMap.put("client-id", "testPerPartitionLagMetricsCleanUpWithAssign");
        hashMap.put("topic", tp().topic());
        hashMap.put("partition", String.valueOf(tp().partition()));
        Metric metric = (Metric) createConsumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", hashMap));
        Assertions.assertNotNull(metric);
        int count = 1000 - awaitNonEmptyRecords.count();
        Assertions.assertEquals(count, BoxesRunTime.unboxToDouble(metric.metricValue()), epsilon(), "The lag should be " + count);
        createConsumer.assign(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(tp2(), Nil$.MODULE$)).asJava());
        awaitNonEmptyRecords(createConsumer, tp2(), awaitNonEmptyRecords$default$3());
        Assertions.assertNull(createConsumer.metrics().get(new MetricName(tp().toString() + ".records-lag", "consumer-fetch-manager-metrics", "", hashMap)));
        Assertions.assertNull(createConsumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", hashMap)));
    }

    @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
    public void testPerPartitionLagMetricsWhenReadCommitted(String str, String str2) {
        KafkaProducer<byte[], byte[]> createProducer = createProducer(createProducer$default$1(), createProducer$default$2(), createProducer$default$3());
        sendRecords(createProducer, 1000, tp(), sendRecords$default$4(), sendRecords$default$5());
        sendRecords(createProducer, 1000, tp2(), sendRecords$default$4(), sendRecords$default$5());
        consumerConfig().setProperty("isolation.level", "read_committed");
        consumerConfig().setProperty("group.id", "testPerPartitionLagMetricsCleanUpWithAssign");
        consumerConfig().setProperty("client.id", "testPerPartitionLagMetricsCleanUpWithAssign");
        Consumer createConsumer = createConsumer(createConsumer$default$1(), createConsumer$default$2(), createConsumer$default$3(), createConsumer$default$4());
        createConsumer.assign(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(tp(), Nil$.MODULE$)).asJava());
        awaitNonEmptyRecords(createConsumer, tp(), awaitNonEmptyRecords$default$3());
        HashMap hashMap = new HashMap();
        hashMap.put("client-id", "testPerPartitionLagMetricsCleanUpWithAssign");
        hashMap.put("topic", tp().topic());
        hashMap.put("partition", String.valueOf(tp().partition()));
        Assertions.assertNotNull((Metric) createConsumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", hashMap)));
    }

    @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
    public void testQuotaMetricsNotCreatedIfNoQuotasConfigured(String str, String str2) {
        KafkaProducer<byte[], byte[]> createProducer = createProducer(createProducer$default$1(), createProducer$default$2(), createProducer$default$3());
        long currentTimeMillis = System.currentTimeMillis();
        sendRecords(createProducer, 1000, tp(), currentTimeMillis, sendRecords$default$5());
        Consumer<byte[], byte[]> createConsumer = createConsumer(createConsumer$default$1(), createConsumer$default$2(), createConsumer$default$3(), createConsumer$default$4());
        createConsumer.assign(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(tp(), Nil$.MODULE$)).asJava());
        createConsumer.seek(tp(), 0L);
        consumeAndVerifyRecords(createConsumer, 1000, 0, consumeAndVerifyRecords$default$4(), currentTimeMillis, consumeAndVerifyRecords$default$6(), consumeAndVerifyRecords$default$7(), consumeAndVerifyRecords$default$8(), consumeAndVerifyRecords$default$9());
        brokers().foreach(kafkaBroker -> {
            $anonfun$testQuotaMetricsNotCreatedIfNoQuotasConfigured$1(this, kafkaBroker);
            return BoxedUnit.UNIT;
        });
        brokers().foreach(kafkaBroker2 -> {
            $anonfun$testQuotaMetricsNotCreatedIfNoQuotasConfigured$2(this, kafkaBroker2);
            return BoxedUnit.UNIT;
        });
        brokers().foreach(kafkaBroker3 -> {
            $anonfun$testQuotaMetricsNotCreatedIfNoQuotasConfigured$3(this, kafkaBroker3);
            return BoxedUnit.UNIT;
        });
        brokers().foreach(kafkaBroker4 -> {
            $anonfun$testQuotaMetricsNotCreatedIfNoQuotasConfigured$4(this, kafkaBroker4);
            return BoxedUnit.UNIT;
        });
        brokers().foreach(kafkaBroker5 -> {
            $anonfun$testQuotaMetricsNotCreatedIfNoQuotasConfigured$5(this, kafkaBroker5);
            return BoxedUnit.UNIT;
        });
        brokers().foreach(kafkaBroker6 -> {
            $anonfun$testQuotaMetricsNotCreatedIfNoQuotasConfigured$6(this, kafkaBroker6);
            return BoxedUnit.UNIT;
        });
        brokers().foreach(kafkaBroker7 -> {
            $anonfun$testQuotaMetricsNotCreatedIfNoQuotasConfigured$7(this, kafkaBroker7);
            return BoxedUnit.UNIT;
        });
        brokers().foreach(kafkaBroker8 -> {
            $anonfun$testQuotaMetricsNotCreatedIfNoQuotasConfigured$8(this, kafkaBroker8);
            return BoxedUnit.UNIT;
        });
    }

    @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
    public void testConsumingWithNullGroupId(String str, String str2) {
        TopicPartition topicPartition = new TopicPartition("test_topic", 0);
        createTopic("test_topic", createTopic$default$2(), createTopic$default$3(), createTopic$default$4(), createTopic$default$5(), createTopic$default$6());
        KafkaProducer createProducer = createProducer(createProducer$default$1(), createProducer$default$2(), createProducer$default$3());
        createProducer.send(new ProducerRecord("test_topic", Predef$.MODULE$.int2Integer(0), "k1".getBytes(), "v1".getBytes())).get();
        createProducer.send(new ProducerRecord("test_topic", Predef$.MODULE$.int2Integer(0), "k2".getBytes(), "v2".getBytes())).get();
        createProducer.send(new ProducerRecord("test_topic", Predef$.MODULE$.int2Integer(0), "k3".getBytes(), "v3".getBytes())).get();
        createProducer.close();
        Properties properties = new Properties(consumerConfig());
        properties.put("auto.offset.reset", "earliest");
        properties.put("client.id", "consumer1");
        Consumer createConsumer = createConsumer(createConsumer$default$1(), createConsumer$default$2(), properties, new $colon.colon("group.id", Nil$.MODULE$));
        Properties properties2 = new Properties(consumerConfig());
        properties2.put("auto.offset.reset", "latest");
        properties2.put("client.id", "consumer2");
        Consumer createConsumer2 = createConsumer(createConsumer$default$1(), createConsumer$default$2(), properties2, new $colon.colon("group.id", Nil$.MODULE$));
        Properties properties3 = new Properties(consumerConfig());
        properties3.put("client.id", "consumer3");
        Consumer createConsumer3 = createConsumer(createConsumer$default$1(), createConsumer$default$2(), properties3, new $colon.colon("group.id", Nil$.MODULE$));
        createConsumer.assign(Arrays.asList(topicPartition));
        createConsumer2.assign(Arrays.asList(topicPartition));
        createConsumer3.assign(Arrays.asList(topicPartition));
        createConsumer3.seek(topicPartition, 1L);
        int count = createConsumer.poll(Duration.ofMillis(5000L)).count();
        Assertions.assertThrows(InvalidGroupIdException.class, () -> {
            createConsumer.commitSync();
        });
        Assertions.assertThrows(InvalidGroupIdException.class, () -> {
            createConsumer2.committed(CollectionConverters$.MODULE$.SetHasAsJava((Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new TopicPartition[]{topicPartition}))).asJava());
        });
        int count2 = createConsumer2.poll(Duration.ofMillis(5000L)).count();
        int count3 = createConsumer3.poll(Duration.ofMillis(5000L)).count();
        createConsumer.unsubscribe();
        createConsumer2.unsubscribe();
        createConsumer3.unsubscribe();
        Assertions.assertTrue(createConsumer.assignment().isEmpty());
        Assertions.assertTrue(createConsumer2.assignment().isEmpty());
        Assertions.assertTrue(createConsumer3.assignment().isEmpty());
        createConsumer.close();
        createConsumer2.close();
        createConsumer3.close();
        Assertions.assertEquals(3, count, "Expected consumer1 to consume from earliest offset");
        Assertions.assertEquals(0, count2, "Expected consumer2 to consume from latest offset");
        Assertions.assertEquals(2, count3, "Expected consumer3 to consume from offset 1");
    }

    @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
    public void testNullGroupIdNotSupportedIfCommitting(String str, String str2) {
        Properties properties = new Properties(consumerConfig());
        properties.put("auto.offset.reset", "earliest");
        properties.put("client.id", "consumer1");
        Consumer createConsumer = createConsumer(createConsumer$default$1(), createConsumer$default$2(), properties, new $colon.colon("group.id", Nil$.MODULE$));
        createConsumer.assign(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(tp(), Nil$.MODULE$)).asJava());
        Assertions.assertThrows(InvalidGroupIdException.class, () -> {
            createConsumer.commitSync();
        });
    }

    @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
    public void testStaticConsumerDetectsNewPartitionCreatedAfterRestart(String str, String str2) {
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        TopicPartition topicPartition2 = new TopicPartition("foo", 1);
        Admin createAdminClient = createAdminClient(createAdminClient$default$1(), createAdminClient$default$2());
        createAdminClient.createTopics(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(new NewTopic("foo", 1, (short) 1), Nil$.MODULE$)).asJava()).all().get();
        Properties properties = new Properties();
        properties.put("group.id", "my-group-id");
        properties.put("group.instance.id", "my-instance-id");
        Consumer<?, ?> createConsumer = createConsumer(createConsumer$default$1(), createConsumer$default$2(), properties, createConsumer$default$4());
        createConsumer.subscribe(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon("foo", Nil$.MODULE$)).asJava());
        awaitAssignment(createConsumer, (scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new TopicPartition[]{topicPartition})));
        createConsumer.close();
        Consumer<?, ?> createConsumer2 = createConsumer(createConsumer$default$1(), createConsumer$default$2(), properties, createConsumer$default$4());
        createConsumer2.subscribe(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon("foo", Nil$.MODULE$)).asJava());
        awaitAssignment(createConsumer2, (scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new TopicPartition[]{topicPartition})));
        createAdminClient.createPartitions(CollectionConverters$.MODULE$.MapHasAsJava((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("foo"), NewPartitions.increaseTo(2))}))).asJava()).all().get();
        awaitAssignment(createConsumer2, (scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new TopicPartition[]{topicPartition, topicPartition2})));
        createConsumer2.close();
    }

    @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
    public void testEndOffsets(String str, String str2) {
        KafkaProducer createProducer = createProducer(createProducer$default$1(), createProducer$default$2(), createProducer$default$3());
        long currentTimeMillis = System.currentTimeMillis();
        RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), 10000).map(obj -> {
            return $anonfun$testEndOffsets$1(this, currentTimeMillis, createProducer, BoxesRunTime.unboxToInt(obj));
        });
        createProducer.flush();
        Consumer<?, ?> createConsumer = createConsumer(createConsumer$default$1(), createConsumer$default$2(), createConsumer$default$3(), createConsumer$default$4());
        createConsumer.subscribe(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(topic(), Nil$.MODULE$)).asJava());
        awaitAssignment(createConsumer, (scala.collection.immutable.Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new TopicPartition[]{tp(), tp2()})));
        Assertions.assertEquals(10000, (Long) createConsumer.endOffsets(CollectionConverters$.MODULE$.SetHasAsJava((Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new TopicPartition[]{tp()}))).asJava()).get(tp()));
    }

    @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
    public void testSeekThrowsIllegalStateIfPartitionsNotAssigned(String str, String str2) {
        TopicPartition topicPartition = new TopicPartition(topic(), 0);
        Consumer createConsumer = createConsumer(createConsumer$default$1(), createConsumer$default$2(), consumerConfig(), createConsumer$default$4());
        Assertions.assertEquals("No current assignment for partition " + topicPartition, ((Exception) Assertions.assertThrows(IllegalStateException.class, () -> {
            createConsumer.seekToEnd(Collections.singletonList(topicPartition));
        })).getMessage());
    }

    @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
    @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
    public void testFetchOffsetsForTime(String str, String str2) {
        KafkaProducer createProducer = createProducer(createProducer$default$1(), createProducer$default$2(), createProducer$default$3());
        HashMap hashMap = new HashMap();
        IntRef create = IntRef.create(0);
        RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), 2).foreach$mVc$sp(i -> {
            TopicPartition topicPartition = new TopicPartition(this.topic(), i);
            this.sendRecords(createProducer, 100, topicPartition, 0L, this.sendRecords$default$5());
            hashMap.put(topicPartition, Predef$.MODULE$.long2Long(create.elem * 20));
            create.elem++;
        });
        Consumer createConsumer = createConsumer(createConsumer$default$1(), createConsumer$default$2(), createConsumer$default$3(), createConsumer$default$4());
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            createConsumer.offsetsForTimes(Collections.singletonMap(new TopicPartition(this.topic(), 0), Predef$.MODULE$.long2Long(-1L)));
        });
        java.util.Map offsetsForTimes = createConsumer.offsetsForTimes(hashMap);
        OffsetAndTimestamp offsetAndTimestamp = (OffsetAndTimestamp) offsetsForTimes.get(new TopicPartition(topic(), 0));
        Assertions.assertEquals(0L, offsetAndTimestamp.offset());
        Assertions.assertEquals(0L, offsetAndTimestamp.timestamp());
        Assertions.assertEquals(Optional.of(BoxesRunTime.boxToInteger(0)), offsetAndTimestamp.leaderEpoch());
        OffsetAndTimestamp offsetAndTimestamp2 = (OffsetAndTimestamp) offsetsForTimes.get(new TopicPartition(topic(), 1));
        Assertions.assertEquals(20L, offsetAndTimestamp2.offset());
        Assertions.assertEquals(20L, offsetAndTimestamp2.timestamp());
        Assertions.assertEquals(Optional.of(BoxesRunTime.boxToInteger(0)), offsetAndTimestamp2.leaderEpoch());
    }

    @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
    @Timeout(15)
    @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
    public void testPositionRespectsTimeout(String str, String str2) {
        TopicPartition topicPartition = new TopicPartition(topic(), 15);
        Consumer createConsumer = createConsumer(createConsumer$default$1(), createConsumer$default$2(), createConsumer$default$3(), createConsumer$default$4());
        createConsumer.assign(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(topicPartition, Nil$.MODULE$)).asJava());
        Assertions.assertThrows(TimeoutException.class, () -> {
            createConsumer.position(topicPartition, Duration.ofSeconds(3L));
        });
    }

    @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
    @Timeout(15)
    @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
    public void testPositionRespectsWakeup(String str, String str2) {
        TopicPartition topicPartition = new TopicPartition(topic(), 15);
        Consumer createConsumer = createConsumer(createConsumer$default$1(), createConsumer$default$2(), createConsumer$default$3(), createConsumer$default$4());
        createConsumer.assign(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(topicPartition, Nil$.MODULE$)).asJava());
        CompletableFuture.runAsync(() -> {
            TimeUnit.SECONDS.sleep(1L);
            createConsumer.wakeup();
        });
        Assertions.assertThrows(WakeupException.class, () -> {
            createConsumer.position(topicPartition, Duration.ofSeconds(3L));
        });
    }

    @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
    @Timeout(15)
    @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
    public void testPositionWithErrorConnectionRespectsWakeup(String str, String str2) {
        TopicPartition topicPartition = new TopicPartition(topic(), 15);
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:12345");
        Consumer createConsumer = createConsumer(createConsumer$default$1(), createConsumer$default$2(), properties, createConsumer$default$4());
        createConsumer.assign(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(topicPartition, Nil$.MODULE$)).asJava());
        CompletableFuture.runAsync(() -> {
            TimeUnit.SECONDS.sleep(1L);
            createConsumer.wakeup();
        });
        Assertions.assertThrows(WakeupException.class, () -> {
            createConsumer.position(topicPartition, Duration.ofSeconds(100L));
        });
    }

    @MethodSource({"getTestQuorumAndGroupProtocolParametersAll"})
    @Flaky("KAFKA-18031")
    @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}")
    public void testCloseLeavesGroupOnInterrupt(String str, String str2) {
        Admin createAdminClient = createAdminClient(createAdminClient$default$1(), createAdminClient$default$2());
        Consumer<?, ?> createConsumer = createConsumer(createConsumer$default$1(), createConsumer$default$2(), createConsumer$default$3(), createConsumer$default$4());
        AbstractConsumerTest.TestConsumerReassignmentListener testConsumerReassignmentListener = new AbstractConsumerTest.TestConsumerReassignmentListener(this);
        createConsumer.subscribe(CollectionConverters$.MODULE$.SeqHasAsJava(new $colon.colon(topic(), Nil$.MODULE$)).asJava(), testConsumerReassignmentListener);
        awaitRebalance(createConsumer, testConsumerReassignmentListener);
        Assertions.assertEquals(1, testConsumerReassignmentListener.callsToAssigned());
        Assertions.assertEquals(0, testConsumerReassignmentListener.callsToRevoked());
        try {
            Thread.currentThread().interrupt();
            Assertions.assertThrows(InterruptException.class, () -> {
                createConsumer.close();
            });
            Thread.interrupted();
            Assertions.assertEquals(1, testConsumerReassignmentListener.callsToAssigned());
            Assertions.assertEquals(1, testConsumerReassignmentListener.callsToRevoked());
            ConsumerConfig consumerConfig = new ConsumerConfig(consumerConfig());
            int Integer2int = Predef$.MODULE$.Integer2int(consumerConfig.getInt("session.timeout.ms")) / 2;
            TestUtils$ testUtils$ = TestUtils$.MODULE$;
            long j = Integer2int;
            TestUtils$ testUtils$2 = TestUtils$.MODULE$;
            long currentTimeMillis = System.currentTimeMillis();
            while (!$anonfun$testCloseLeavesGroupOnInterrupt$2(consumerConfig, createAdminClient)) {
                if (System.currentTimeMillis() > currentTimeMillis + j) {
                    Assertions.fail($anonfun$testCloseLeavesGroupOnInterrupt$3(Integer2int));
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(j), 100L));
            }
        } catch (Throwable th) {
            Thread.interrupted();
            throw th;
        }
    }

    public static final /* synthetic */ boolean $anonfun$testPartitionsForAutoCreate$1(Consumer consumer) {
        return !consumer.partitionsFor("non-exist-topic").isEmpty();
    }

    public static final /* synthetic */ String $anonfun$testPartitionsForAutoCreate$2() {
        return "Timed out while awaiting non empty partitions.";
    }

    public static final /* synthetic */ Future $anonfun$sendCompressedMessages$1(KafkaProducer kafkaProducer, TopicPartition topicPartition, int i) {
        return kafkaProducer.send(new ProducerRecord(topicPartition.topic(), Predef$.MODULE$.int2Integer(topicPartition.partition()), Predef$.MODULE$.long2Long(i), ("key " + i).getBytes(), ("value " + i).getBytes()));
    }

    public static final /* synthetic */ Future $anonfun$testInterceptors$1(PlaintextConsumerTest plaintextConsumerTest, KafkaProducer kafkaProducer, int i) {
        return kafkaProducer.send(new ProducerRecord(plaintextConsumerTest.tp().topic(), Predef$.MODULE$.int2Integer(plaintextConsumerTest.tp().partition()), "key " + i, "value " + i));
    }

    private static final void assertNoMetric$1(KafkaBroker kafkaBroker, String str, QuotaType quotaType, String str2) {
        MetricName metricName = kafkaBroker.metrics().metricName("throttle-time", quotaType.toString(), "", new String[]{"user", "", "client-id", str2});
        Assertions.assertNull(kafkaBroker.metrics().metric(metricName), "Metric should not have been created " + metricName);
    }

    public static final /* synthetic */ void $anonfun$testQuotaMetricsNotCreatedIfNoQuotasConfigured$1(PlaintextConsumerTest plaintextConsumerTest, KafkaBroker kafkaBroker) {
        assertNoMetric$1(kafkaBroker, "byte-rate", QuotaType.PRODUCE, plaintextConsumerTest.producerClientId());
    }

    public static final /* synthetic */ void $anonfun$testQuotaMetricsNotCreatedIfNoQuotasConfigured$2(PlaintextConsumerTest plaintextConsumerTest, KafkaBroker kafkaBroker) {
        assertNoMetric$1(kafkaBroker, "throttle-time", QuotaType.PRODUCE, plaintextConsumerTest.producerClientId());
    }

    public static final /* synthetic */ void $anonfun$testQuotaMetricsNotCreatedIfNoQuotasConfigured$3(PlaintextConsumerTest plaintextConsumerTest, KafkaBroker kafkaBroker) {
        assertNoMetric$1(kafkaBroker, "byte-rate", QuotaType.FETCH, plaintextConsumerTest.consumerClientId());
    }

    public static final /* synthetic */ void $anonfun$testQuotaMetricsNotCreatedIfNoQuotasConfigured$4(PlaintextConsumerTest plaintextConsumerTest, KafkaBroker kafkaBroker) {
        assertNoMetric$1(kafkaBroker, "throttle-time", QuotaType.FETCH, plaintextConsumerTest.consumerClientId());
    }

    public static final /* synthetic */ void $anonfun$testQuotaMetricsNotCreatedIfNoQuotasConfigured$5(PlaintextConsumerTest plaintextConsumerTest, KafkaBroker kafkaBroker) {
        assertNoMetric$1(kafkaBroker, "request-time", QuotaType.REQUEST, plaintextConsumerTest.producerClientId());
    }

    public static final /* synthetic */ void $anonfun$testQuotaMetricsNotCreatedIfNoQuotasConfigured$6(PlaintextConsumerTest plaintextConsumerTest, KafkaBroker kafkaBroker) {
        assertNoMetric$1(kafkaBroker, "throttle-time", QuotaType.REQUEST, plaintextConsumerTest.producerClientId());
    }

    public static final /* synthetic */ void $anonfun$testQuotaMetricsNotCreatedIfNoQuotasConfigured$7(PlaintextConsumerTest plaintextConsumerTest, KafkaBroker kafkaBroker) {
        assertNoMetric$1(kafkaBroker, "request-time", QuotaType.REQUEST, plaintextConsumerTest.consumerClientId());
    }

    public static final /* synthetic */ void $anonfun$testQuotaMetricsNotCreatedIfNoQuotasConfigured$8(PlaintextConsumerTest plaintextConsumerTest, KafkaBroker kafkaBroker) {
        assertNoMetric$1(kafkaBroker, "throttle-time", QuotaType.REQUEST, plaintextConsumerTest.consumerClientId());
    }

    public static final /* synthetic */ ProducerRecord $anonfun$testEndOffsets$1(PlaintextConsumerTest plaintextConsumerTest, long j, KafkaProducer kafkaProducer, int i) {
        ProducerRecord producerRecord = new ProducerRecord(plaintextConsumerTest.tp().topic(), Predef$.MODULE$.int2Integer(plaintextConsumerTest.tp().partition()), Predef$.MODULE$.long2Long(j + i), ("key " + i).getBytes(), ("value " + i).getBytes());
        kafkaProducer.send(producerRecord);
        return producerRecord;
    }

    public static final /* synthetic */ boolean $anonfun$testCloseLeavesGroupOnInterrupt$2(ConsumerConfig consumerConfig, Admin admin) {
        try {
            String string = consumerConfig.getString("group.id");
            return ((ConsumerGroupDescription) ((KafkaFuture) admin.describeConsumerGroups(Collections.singletonList(string)).describedGroups().get(string)).get()).members().isEmpty();
        } catch (Throwable th) {
            if (th instanceof ExecutionException ? true : th instanceof InterruptedException) {
                return false;
            }
            throw th;
        }
    }

    public static final /* synthetic */ String $anonfun$testCloseLeavesGroupOnInterrupt$3(int i) {
        return "Consumer did not leave the consumer group within " + i + " ms of close";
    }
}
