package kafka.server;

import com.yammer.metrics.core.Meter;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Optional;
import java.util.OptionalInt;
import kafka.cluster.Partition;
import kafka.log.LogManager;
import kafka.log.UnifiedLog;
import kafka.security.JaasTestUtils;
import kafka.server.AbstractFetcherThread;
import kafka.server.epoch.util.MockBlockingSender;
import kafka.server.metadata.KRaftMetadataCache;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.FetchSessionHandler;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordValidationStats;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.server.network.BrokerEndPoint;
import org.apache.kafka.storage.internals.log.LogAppendInfo;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import scala.Int$;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.MapOps;
import scala.collection.Seq;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.ScalaRunTime$;

/* compiled from: ReplicaFetcherThreadTest.scala */
@ScalaSignature(bytes = "\u0006\u0005\t\u0015g\u0001\u0002\u0016,\u0001ABQa\u000e\u0001\u0005\u0002aBqa\u000f\u0001C\u0002\u0013%A\b\u0003\u0004I\u0001\u0001\u0006I!\u0010\u0005\b\u0013\u0002\u0011\r\u0011\"\u0003=\u0011\u0019Q\u0005\u0001)A\u0005{!91\n\u0001b\u0001\n\u0013a\u0004B\u0002'\u0001A\u0003%Q\bC\u0004N\u0001\t\u0007I\u0011\u0002(\t\rI\u0003\u0001\u0015!\u0003P\u0011\u001d\u0019\u0006A1A\u0005\n9Ca\u0001\u0016\u0001!\u0002\u0013y\u0005bB+\u0001\u0005\u0004%IA\u0016\u0005\u0007O\u0002\u0001\u000b\u0011B,\t\u000f!\u0004!\u0019!C\u0005S\"1\u0011\u000f\u0001Q\u0001\n)DqA\u001d\u0001C\u0002\u0013%1\u000f\u0003\u0004x\u0001\u0001\u0006I\u0001\u001e\u0005\bq\u0002\u0011\r\u0011\"\u0003z\u0011\u001d\t\t\u0001\u0001Q\u0001\niDq!a\u0001\u0001\t\u0013\t)\u0001C\u0005\u0002,\u0001\t\n\u0011\"\u0003\u0002.!9\u00111\t\u0001\u0005\u0002\u0005\u0015\u0003bBA2\u0001\u0011%\u0011Q\r\u0005\n\u0003\u0003\u0004\u0011\u0013!C\u0005\u0003\u0007Dq!a2\u0001\t\u0003\t)\u0005C\u0004\u0002R\u0002!\t!a5\t\u000f\u0005E\b\u0001\"\u0001\u0002F!9\u0011Q\u001f\u0001\u0005\u0002\u0005\u0015\u0003bBA}\u0001\u0011%\u00111 \u0005\b\u0005\u0007\u0001A\u0011AA#\u0011\u001d\u00119\u0001\u0001C\u0001\u0003\u000bBqAa\u0003\u0001\t\u0003\t)\u0005C\u0004\u0003\u0010\u0001!\t!!\u0012\t\u000f\tM\u0001\u0001\"\u0001\u0002F!9!q\u0003\u0001\u0005\u0002\u0005\u0015\u0003b\u0002B\u000e\u0001\u0011\u0005\u0011Q\t\u0005\b\u0005?\u0001A\u0011\u0001B\u0011\u0011\u001d\u0011Y\u0005\u0001C\u0005\u0005\u001bBqAa\u0013\u0001\t\u0013\u0011y\bC\u0004\u0003\u0018\u0002!IA!'\t\u000f\t}\u0005\u0001\"\u0001\u0003\"\nA\"+\u001a9mS\u000e\fg)\u001a;dQ\u0016\u0014H\u000b\u001b:fC\u0012$Vm\u001d;\u000b\u00051j\u0013AB:feZ,'OC\u0001/\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019\"\u0001A\u0019\u0011\u0005I*T\"A\u001a\u000b\u0003Q\nQa]2bY\u0006L!AN\u001a\u0003\r\u0005s\u0017PU3g\u0003\u0019a\u0014N\\5u}Q\t\u0011\b\u0005\u0002;\u00015\t1&\u0001\u0003ucA\u0004T#A\u001f\u0011\u0005y2U\"A \u000b\u0005\u0001\u000b\u0015AB2p[6|gN\u0003\u0002/\u0005*\u00111\tR\u0001\u0007CB\f7\r[3\u000b\u0003\u0015\u000b1a\u001c:h\u0013\t9uH\u0001\bU_BL7\rU1si&$\u0018n\u001c8\u0002\u000bQ\f\u0004\u000f\r\u0011\u0002\tQ\f\u0004/M\u0001\u0006iF\u0002\u0018\u0007I\u0001\u0005iJ\u0002\u0018'A\u0003ueA\f\u0004%\u0001\u0005u_BL7-\u001332+\u0005y\u0005C\u0001 Q\u0013\t\tvH\u0001\u0003Vk&$\u0017!\u0003;pa&\u001c\u0017\nZ\u0019!\u0003!!x\u000e]5d\u0013\u0012\u0014\u0014!\u0003;pa&\u001c\u0017\n\u001a\u001a!\u0003!!x\u000e]5d\u0013\u0012\u001cX#A,\u0011\takvlT\u0007\u00023*\u0011!lW\u0001\nS6lW\u000f^1cY\u0016T!\u0001X\u001a\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u0002_3\n\u0019Q*\u00199\u0011\u0005\u0001,W\"A1\u000b\u0005\t\u001c\u0017\u0001\u00027b]\u001eT\u0011\u0001Z\u0001\u0005U\u00064\u0018-\u0003\u0002gC\n11\u000b\u001e:j]\u001e\f\u0011\u0002^8qS\u000eLEm\u001d\u0011\u0002\u001d\t\u0014xn[3s\u000b:$\u0007k\\5oiV\t!\u000e\u0005\u0002l_6\tAN\u0003\u0002n]\u00069a.\u001a;x_J\\'B\u0001\u0017B\u0013\t\u0001HN\u0001\bCe>\\WM]#oIB{\u0017N\u001c;\u0002\u001f\t\u0014xn[3s\u000b:$\u0007k\\5oi\u0002\n\u0001CZ1jY\u0016$\u0007+\u0019:uSRLwN\\:\u0016\u0003Q\u0004\"AO;\n\u0005Y\\#\u0001\u0005$bS2,G\rU1si&$\u0018n\u001c8t\u0003E1\u0017-\u001b7fIB\u000b'\u000f^5uS>t7\u000fI\u0001\u000e[\u0016$\u0018\rZ1uC\u000e\u000b7\r[3\u0016\u0003i\u0004\"a\u001f@\u000e\u0003qT!!`\u0016\u0002\u00115,G/\u00193bi\u0006L!a ?\u0003%-\u0013\u0016M\u001a;NKR\fG-\u0019;b\u0007\u0006\u001c\u0007.Z\u0001\u000f[\u0016$\u0018\rZ1uC\u000e\u000b7\r[3!\u0003EIg.\u001b;jC24U\r^2i'R\fG/\u001a\u000b\t\u0003\u000f\ti!a\u0006\u0002\"A\u0019!(!\u0003\n\u0007\u0005-1FA\tJ]&$\u0018.\u00197GKR\u001c\u0007n\u0015;bi\u0016Dq!a\u0004\u0015\u0001\u0004\t\t\"A\u0004u_BL7-\u00133\u0011\tI\n\u0019bT\u0005\u0004\u0003+\u0019$AB(qi&|g\u000eC\u0004\u0002\u001aQ\u0001\r!a\u0007\u0002\u0017\u0019,Go\u00195PM\u001a\u001cX\r\u001e\t\u0004e\u0005u\u0011bAA\u0010g\t!Aj\u001c8h\u0011%\t\u0019\u0003\u0006I\u0001\u0002\u0004\t)#A\u0006mK\u0006$WM]#q_\u000eD\u0007c\u0001\u001a\u0002(%\u0019\u0011\u0011F\u001a\u0003\u0007%sG/A\u000ej]&$\u0018.\u00197GKR\u001c\u0007n\u0015;bi\u0016$C-\u001a4bk2$HeM\u000b\u0003\u0003_QC!!\n\u00022-\u0012\u00111\u0007\t\u0005\u0003k\ty$\u0004\u0002\u00028)!\u0011\u0011HA\u001e\u0003%)hn\u00195fG.,GMC\u0002\u0002>M\n!\"\u00198o_R\fG/[8o\u0013\u0011\t\t%a\u000e\u0003#Ut7\r[3dW\u0016$g+\u0019:jC:\u001cW-A\u0004dY\u0016\fg.\u001e9\u0015\u0005\u0005\u001d\u0003c\u0001\u001a\u0002J%\u0019\u00111J\u001a\u0003\tUs\u0017\u000e\u001e\u0015\u0004-\u0005=\u0003\u0003BA)\u0003?j!!a\u0015\u000b\t\u0005U\u0013qK\u0001\u0004CBL'\u0002BA-\u00037\nqA[;qSR,'OC\u0002\u0002^\u0011\u000bQA[;oSRLA!!\u0019\u0002T\tI\u0011I\u001a;fe\u0016\u000b7\r[\u0001\u001bGJ,\u0017\r^3SKBd\u0017nY1GKR\u001c\u0007.\u001a:UQJ,\u0017\r\u001a\u000b\u0013\u0003O\ni'!\"\u0002\n\u0006M\u0015QSAP\u0003S\u000b\u0019\fE\u0002;\u0003SJ1!a\u001b,\u0005Q\u0011V\r\u001d7jG\u00064U\r^2iKJ$\u0006N]3bI\"9\u0011qN\fA\u0002\u0005E\u0014\u0001\u00028b[\u0016\u0004B!a\u001d\u0002\u0002:!\u0011QOA?!\r\t9hM\u0007\u0003\u0003sR1!a\u001f0\u0003\u0019a$o\\8u}%\u0019\u0011qP\u001a\u0002\rA\u0013X\rZ3g\u0013\r1\u00171\u0011\u0006\u0004\u0003\u007f\u001a\u0004bBAD/\u0001\u0007\u0011QE\u0001\nM\u0016$8\r[3s\u0013\u0012Dq!a#\u0018\u0001\u0004\ti)\u0001\u0007ce>\\WM]\"p]\u001aLw\rE\u0002;\u0003\u001fK1!!%,\u0005-Y\u0015MZ6b\u0007>tg-[4\t\u000bI<\u0002\u0019\u0001;\t\u000f\u0005]u\u00031\u0001\u0002\u001a\u0006Q!/\u001a9mS\u000e\fWj\u001a:\u0011\u0007i\nY*C\u0002\u0002\u001e.\u0012aBU3qY&\u001c\u0017-T1oC\u001e,'\u000fC\u0004\u0002\"^\u0001\r!a)\u0002\u000bE,x\u000e^1\u0011\u0007i\n)+C\u0002\u0002(.\u0012ABU3qY&\u001c\u0017-U;pi\u0006Dq!a+\u0018\u0001\u0004\ti+\u0001\u000emK\u0006$WM]#oIB|\u0017N\u001c;CY>\u001c7.\u001b8h'\u0016tG\rE\u0002;\u0003_K1!!-,\u00051\u0011En\\2lS:<7+\u001a8e\u0011%\t)l\u0006I\u0001\u0002\u0004\t9,A\bnKR\fG-\u0019;b-\u0016\u00148/[8o!\u0011\tI,!0\u000e\u0005\u0005m&B\u0001!o\u0013\u0011\ty,a/\u0003\u001f5+G/\u00193bi\u00064VM]:j_:\fAe\u0019:fCR,'+\u001a9mS\u000e\fg)\u001a;dQ\u0016\u0014H\u000b\u001b:fC\u0012$C-\u001a4bk2$H\u0005O\u000b\u0003\u0003\u000bTC!a.\u00022\u0005A3\u000f[8vY\u0012\u001cVM\u001c3MCR,7\u000f\u001e*fcV,7\u000f\u001e,feNLwN\\:Cs\u0012+g-Y;mi\"\u001a\u0011$a3\u0011\t\u0005E\u0013QZ\u0005\u0005\u0003\u001f\f\u0019F\u0001\u0003UKN$\u0018!F1tg\u0016\u0014H\u000fU1si&$\u0018n\u001c8Ti\u0006$Xm\u001d\u000b\u000b\u0003\u000f\n).a8\u0002j\u00065\bbBAl5\u0001\u0007\u0011\u0011\\\u0001\bM\u0016$8\r[3s!\rQ\u00141\\\u0005\u0004\u0003;\\#!F!cgR\u0014\u0018m\u0019;GKR\u001c\u0007.\u001a:UQJ,\u0017\r\u001a\u0005\b\u0003CT\u0002\u0019AAr\u0003U\u0019\bn\\;mI\n+'+Z1es\u001a{'OR3uG\"\u00042AMAs\u0013\r\t9o\r\u0002\b\u0005>|G.Z1o\u0011\u001d\tYO\u0007a\u0001\u0003G\fQc\u001d5pk2$')\u001a+sk:\u001c\u0017\r^5oO2{w\rC\u0004\u0002pj\u0001\r!a9\u0002\u001fMDw.\u001e7e\u0005\u0016$U\r\\1zK\u0012\fQe\u001d5pk2$\u0007*\u00198eY\u0016,\u0005pY3qi&|gN\u0012:p[\ncwnY6j]\u001e\u001cVM\u001c3)\u0007m\tY-\u0001\u001dtQ>,H\u000e\u001a(pi\u001a+Go\u00195MK\u0006$WM]#q_\u000eDwJ\u001c$jeN$h)\u001a;dQ^KG\u000f\u001b+sk:\u001c\u0017\r^3P]\u001a+Go\u00195)\u0007q\tY-\u0001\u0012wKJLg-\u001f$fi\u000eDG*Z1eKJ,\u0005o\\2i\u001f:4\u0015N]:u\r\u0016$8\r\u001b\u000b\u0007\u0003\u000f\ni0a@\t\u000f\u0005UV\u00041\u0001\u00028\"9!\u0011A\u000fA\u0002\u0005\u0015\u0012aD3q_\u000eDg)\u001a;dQ\u000e{WO\u001c;\u0002\u0003NDw.\u001e7e)J,hnY1uK&3G*Z1eKJ\u0014V\r\u001d7jKN<\u0016\u000e\u001e5ESZ,'oZ5oO\u0016\u0003xn\u00195O_R\\en\\<o)>4u\u000e\u001c7po\u0016\u0014\bf\u0001\u0010\u0002L\u0006iC/Z:u)J,hnY1uK>sg)\u001a;dQ\u0012{Wm\u001d(piV\u0003H-\u0019;f\u0011&<\u0007nV1uKJl\u0017M]6)\u0007}\tY-A\u000fuKN$H*Y4JgV\u0003H-\u0019;fI^CWM\u001c(p%\u0016\u001cwN\u001d3tQ\r\u0001\u00131Z\u0001Ig\"|W\u000f\u001c3DCR\u001c\u0007.\u0012=dKB$\u0018n\u001c8Ge>l'\t\\8dW&twmU3oI^CWM\\*ikR$\u0018N\\4E_^t'+\u001a9mS\u000e\fg)\u001a;dQ\u0016\u0014H\u000b\u001b:fC\u0012D3!IAf\u0003\u0019\u001a\bn\\;mIV\u0003H-\u0019;f%\u0016\f7o]5h]6,g\u000e\u001e\"zi\u0016\u001c\u0018J\\'fiJL7m\u001d\u0015\u0004E\u0005-\u0017AR:i_VdGMT8u+B$\u0017\r^3SK\u0006\u001c8/[4o[\u0016tGOQ=uKNLe.T3ue&\u001c7o\u00165f]:{'+Z1tg&<g.\\3oiNLe\u000e\u0015:pOJ,7o\u001d\u0015\u0004G\u0005-\u0017A\u0004;fgR\u0014U/\u001b7e\r\u0016$8\r\u001b\u0015\u0004I\u0005-\u0017A\f;fgRdunY1m\r\u0016$8\r[\"p[BdW\r^5p]&3\u0007*[4i/\u0006$XM]7be.,\u0006\u000fZ1uK\u0012$B!a\u0012\u0003$!9!QE\u0013A\u0002\u0005\r\u0018\u0001\u00065jO\"<\u0016\r^3s[\u0006\u00148.\u00169eCR,G\rK\u0002&\u0005S\u0001BAa\u000b\u000325\u0011!Q\u0006\u0006\u0005\u0005_\t9&\u0001\u0004qCJ\fWn]\u0005\u0005\u0005g\u0011iCA\tQCJ\fW.\u001a;fe&TX\r\u001a+fgRDs!\nB\u001c\u0005\u0007\u0012)\u0005\u0005\u0003\u0003:\t}RB\u0001B\u001e\u0015\u0011\u0011iD!\f\u0002\u0011A\u0014xN^5eKJLAA!\u0011\u0003<\tYa+\u00197vKN{WO]2f\u0003!\u0011wn\u001c7fC:\u001cH\u0006\u0002B$\u0005\u0013J\u0012!A\r\u0002\u0001\u0005\tc.Z<PM\u001a\u001cX\r\u001e$pe2+\u0017\rZ3s!\u0006\u0014H/\u001b;j_:\u0014Vm];miRA!q\nB;\u0005s\u0012Y\b\u0005\u0003\u0003R\t=d\u0002\u0002B*\u0005SrAA!\u0016\u0003f9!!q\u000bB2\u001d\u0011\u0011IF!\u0019\u000f\t\tm#q\f\b\u0005\u0003o\u0012i&C\u0001F\u0013\t\u0019E)\u0003\u0002/\u0005&\u0011\u0001)Q\u0005\u0004\u0005Oz\u0014aB7fgN\fw-Z\u0005\u0005\u0005W\u0012i'\u0001\u0011PM\u001a\u001cX\r\u001e$pe2+\u0017\rZ3s\u000bB|7\r\u001b*fgB|gn]3ECR\f'b\u0001B4\u007f%!!\u0011\u000fB:\u00059)\u0005o\\2i\u000b:$wJ\u001a4tKRTAAa\u001b\u0003n!1!q\u000f\u0014A\u0002u\n!\u0001\u001e9\t\u000f\u0005\rb\u00051\u0001\u0002&!9!Q\u0010\u0014A\u0002\u0005m\u0011!C3oI>3gm]3u))\u0011yE!!\u0003\u0004\nM%Q\u0013\u0005\u0007\u0005o:\u0003\u0019A\u001f\t\u000f\t\u0015u\u00051\u0001\u0003\b\u0006)QM\u001d:peB!!\u0011\u0012BH\u001b\t\u0011YIC\u0002\u0003\u000e~\n\u0001\u0002\u001d:pi>\u001cw\u000e\\\u0005\u0005\u0005#\u0013YI\u0001\u0004FeJ|'o\u001d\u0005\b\u0003G9\u0003\u0019AA\u0013\u0011\u001d\u0011ih\na\u0001\u00037\ta$Y:tKJ$\bK]8dKN\u001c\b+\u0019:uSRLwN\u001c#bi\u0006<\u0006.\u001a8\u0015\t\u0005\u001d#1\u0014\u0005\b\u0005;C\u0003\u0019AAr\u00035I7OU3bgNLwM\\5oO\u0006!1\u000f^;c)!\t9Ea)\u00034\n]\u0006b\u0002BSS\u0001\u0007!qU\u0001\na\u0006\u0014H/\u001b;j_:\u0004BA!+\u000306\u0011!1\u0016\u0006\u0004\u0005[k\u0013aB2mkN$XM]\u0005\u0005\u0005c\u0013YKA\u0005QCJ$\u0018\u000e^5p]\"9!QW\u0015A\u0002\u0005e\u0015A\u0004:fa2L7-Y'b]\u0006<WM\u001d\u0005\b\u0005sK\u0003\u0019\u0001B^\u0003\rawn\u001a\t\u0005\u0005{\u0013\t-\u0004\u0002\u0003@*\u0019!\u0011X\u0017\n\t\t\r'q\u0018\u0002\u000b+:Lg-[3e\u0019><\u0007")
/* loaded from: input_file:kafka/server/ReplicaFetcherThreadTest.class */
public class ReplicaFetcherThreadTest {
    private final TopicPartition t1p0 = new TopicPartition("topic1", 0);
    private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
    private final TopicPartition t2p1 = new TopicPartition("topic2", 1);
    private final Uuid topicId1 = Uuid.randomUuid();
    private final Uuid topicId2 = Uuid.randomUuid();
    private final Map<String, Uuid> topicIds = (Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("topic1"), topicId1()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("topic2"), topicId2())}));
    private final BrokerEndPoint brokerEndPoint = new BrokerEndPoint(0, JaasTestUtils.SSL_CERTIFICATE_CN, 1000);
    private final FailedPartitions kafka$server$ReplicaFetcherThreadTest$$failedPartitions = new FailedPartitions();
    private final KRaftMetadataCache metadataCache;

    private TopicPartition t1p0() {
        return this.t1p0;
    }

    private TopicPartition t1p1() {
        return this.t1p1;
    }

    private TopicPartition t2p1() {
        return this.t2p1;
    }

    private Uuid topicId1() {
        return this.topicId1;
    }

    private Uuid topicId2() {
        return this.topicId2;
    }

    private Map<String, Uuid> topicIds() {
        return this.topicIds;
    }

    private BrokerEndPoint brokerEndPoint() {
        return this.brokerEndPoint;
    }

    public FailedPartitions kafka$server$ReplicaFetcherThreadTest$$failedPartitions() {
        return this.kafka$server$ReplicaFetcherThreadTest$$failedPartitions;
    }

    private KRaftMetadataCache metadataCache() {
        return this.metadataCache;
    }

    private InitialFetchState initialFetchState(Option<Uuid> option, long j, int i) {
        return new InitialFetchState(option, new BrokerEndPoint(0, JaasTestUtils.SSL_CERTIFICATE_CN, 9092), i, j);
    }

    private int initialFetchState$default$3() {
        return 1;
    }

    @AfterEach
    public void cleanup() {
        TestUtils$.MODULE$.clearYammerMetrics();
    }

    private ReplicaFetcherThread createReplicaFetcherThread(String str, int i, KafkaConfig kafkaConfig, FailedPartitions failedPartitions, ReplicaManager replicaManager, ReplicaQuota replicaQuota, BlockingSend blockingSend, MetadataVersion metadataVersion) {
        LogContext logContext = new LogContext("[ReplicaFetcher replicaId=" + kafkaConfig.brokerId() + ", leaderId=" + blockingSend.brokerEndPoint().id() + ", fetcherId=" + i + "] ");
        return new ReplicaFetcherThread(str, new RemoteLeaderEndPoint(logContext.logPrefix(), blockingSend, new FetchSessionHandler(logContext, blockingSend.brokerEndPoint().id()), kafkaConfig, replicaManager, replicaQuota, () -> {
            return MetadataVersion.MINIMUM_VERSION;
        }, () -> {
            return 1L;
        }), kafkaConfig, failedPartitions, replicaManager, replicaQuota, logContext.logPrefix(), () -> {
            return metadataVersion;
        });
    }

    private MetadataVersion createReplicaFetcherThread$default$8() {
        return MetadataVersion.latestTesting();
    }

    @Test
    public void shouldSendLatestRequestVersionsByDefault() {
        MetadataVersion latestTesting = MetadataVersion.latestTesting();
        Assertions.assertEquals(ApiKeys.FETCH.latestVersion(true), latestTesting.fetchRequestVersion());
        Assertions.assertEquals(ApiKeys.LIST_OFFSETS.latestVersion(true), latestTesting.listOffsetRequestVersion());
    }

    public void assertPartitionStates(AbstractFetcherThread abstractFetcherThread, boolean z, boolean z2, boolean z3) {
        new $colon.colon(t1p0(), new $colon.colon(t1p1(), new $colon.colon(t2p1(), Nil$.MODULE$))).foreach(topicPartition -> {
            $anonfun$assertPartitionStates$1(abstractFetcherThread, z, z2, z3, topicPartition);
            return BoxedUnit.UNIT;
        });
    }

    @Test
    public void shouldHandleExceptionFromBlockingSend() {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(testUtils$.createBrokerConfig(1, true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false));
        BlockingSend blockingSend = (BlockingSend) Mockito.mock(BlockingSend.class);
        Mockito.when(blockingSend.brokerEndPoint()).thenReturn(brokerEndPoint());
        Mockito.when(blockingSend.sendRequest((AbstractRequest.Builder) ArgumentMatchers.any())).thenThrow(new Throwable[]{new NullPointerException()});
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Assertions.assertEquals((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), newOffsetForLeaderPartitionResult(t1p0(), Errors.UNKNOWN_SERVER_ERROR, -1, -1L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), Errors.UNKNOWN_SERVER_ERROR, -1, -1L))})), createReplicaFetcherThread("bob", 0, fromProps, kafka$server$ReplicaFetcherThreadTest$$failedPartitions(), replicaManager, null, blockingSend, MetadataVersion.latestTesting()).leader().fetchEpochEndOffsets((scala.collection.Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(t1p0().partition()).setLeaderEpoch(0)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(t1p1().partition()).setLeaderEpoch(0))}))), "results from leader epoch request should have undefined offset");
        ((BlockingSend) Mockito.verify(blockingSend)).sendRequest((AbstractRequest.Builder) ArgumentMatchers.any());
    }

    @Test
    public void shouldNotFetchLeaderEpochOnFirstFetchWithTruncateOnFetch() {
        verifyFetchLeaderEpochOnFirstFetch(MetadataVersion.latestTesting(), 0);
    }

    private void verifyFetchLeaderEpochOnFirstFetch(MetadataVersion metadataVersion, int i) {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(testUtils$.createBrokerConfig(1, true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false));
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        UnifiedLog unifiedLog = (UnifiedLog) Mockito.mock(UnifiedLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(partition.localLogOrException()).thenReturn(unifiedLog);
        Mockito.when(BoxesRunTime.boxToLong(unifiedLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(unifiedLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(unifiedLog.endOffsetForEpoch(5)).thenReturn(new Some(new OffsetAndEpoch(0L, 5)));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        stub(partition, replicaManager, unifiedLog);
        MockBlockingSender mockBlockingSender = new MockBlockingSender(CollectionConverters$.MODULE$.MapHasAsJava((scala.collection.Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), newOffsetForLeaderPartitionResult(t1p0(), 5, 1L)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), newOffsetForLeaderPartitionResult(t1p1(), 5, 1L))}))).asJava(), brokerEndPoint(), Time.SYSTEM);
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, fromProps, kafka$server$ReplicaFetcherThreadTest$$failedPartitions(), replicaManager, QuotaFactory.UNBOUNDED_QUOTA, mockBlockingSender, metadataVersion);
        createReplicaFetcherThread.addPartitions((scala.collection.Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(new Some<>(topicId1()), 0L, 1)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(new Some<>(topicId1()), 0L, 1))})));
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(i, mockBlockingSender.epochFetchCount());
        Assertions.assertEquals(1, mockBlockingSender.fetchCount());
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(i, mockBlockingSender.epochFetchCount());
        Assertions.assertEquals(2, mockBlockingSender.fetchCount());
        createReplicaFetcherThread.doWork();
        Assertions.assertEquals(i, mockBlockingSender.epochFetchCount());
        Assertions.assertEquals(3, mockBlockingSender.fetchCount());
    }

    @Test
    public void shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower() {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Long.TYPE);
        KafkaConfig$ kafkaConfig$ = KafkaConfig$.MODULE$;
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        final KafkaConfig fromProps = kafkaConfig$.fromProps(testUtils$.createBrokerConfig(1, true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false));
        final ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        UnifiedLog unifiedLog = (UnifiedLog) Mockito.mock(UnifiedLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        final ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        ObjectRef create = ObjectRef.create(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(partition.localLogOrException()).thenReturn(unifiedLog);
        Mockito.when(BoxesRunTime.boxToLong(unifiedLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(115L));
        Mockito.when(unifiedLog.latestEpoch()).thenAnswer(invocationOnMock -> {
            return (Option) create.elem;
        });
        Mockito.when(unifiedLog.endOffsetForEpoch(4)).thenReturn(new Some(new OffsetAndEpoch(149L, 4)));
        Mockito.when(unifiedLog.endOffsetForEpoch(3)).thenReturn(new Some(new OffsetAndEpoch(129L, 2)));
        Mockito.when(unifiedLog.endOffsetForEpoch(2)).thenReturn(new Some(new OffsetAndEpoch(119L, 1)));
        Mockito.when(BoxesRunTime.boxToLong(unifiedLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(Int$.MODULE$.int2long(200)));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.localLogOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(unifiedLog);
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        stub(partition, replicaManager, unifiedLog);
        MockBlockingSender mockBlockingSender = new MockBlockingSender(Collections.emptyMap(), brokerEndPoint(), Time.SYSTEM);
        final LogContext logContext = new LogContext("[ReplicaFetcher replicaId=" + fromProps.brokerId() + ", leaderId=" + brokerEndPoint().id() + ", fetcherId=0] ");
        final RemoteLeaderEndPoint remoteLeaderEndPoint = new RemoteLeaderEndPoint(logContext.logPrefix(), mockBlockingSender, new FetchSessionHandler(logContext, brokerEndPoint().id()), fromProps, replicaManager, replicationQuotaManager, () -> {
            return MetadataVersion.MINIMUM_VERSION;
        }, () -> {
            return 1L;
        });
        ReplicaFetcherThread replicaFetcherThread = new ReplicaFetcherThread(this, remoteLeaderEndPoint, fromProps, replicaManager, replicationQuotaManager, logContext) { // from class: kafka.server.ReplicaFetcherThreadTest$$anon$1
            public Option<LogAppendInfo> processPartitionData(TopicPartition topicPartition, long j, FetchResponseData.PartitionData partitionData) {
                return None$.MODULE$;
            }

            {
                FailedPartitions kafka$server$ReplicaFetcherThreadTest$$failedPartitions = this.kafka$server$ReplicaFetcherThreadTest$$failedPartitions();
                String logPrefix = logContext.logPrefix();
                ReplicaFetcherThreadTest$$anon$1$$anonfun$$lessinit$greater$1 replicaFetcherThreadTest$$anon$1$$anonfun$$lessinit$greater$1 = new ReplicaFetcherThreadTest$$anon$1$$anonfun$$lessinit$greater$1(null);
            }
        };
        replicaFetcherThread.addPartitions((scala.collection.Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(new Some(topicId1()), 200, 1)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), initialFetchState(new Some(topicId1()), 200, 1))})));
        Set set = (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new TopicPartition[]{t1p0(), t1p1()}));
        replicaFetcherThread.doWork();
        Assertions.assertEquals(0, mockBlockingSender.epochFetchCount());
        Assertions.assertEquals(1, mockBlockingSender.fetchCount());
        set.foreach(topicPartition -> {
            $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$4(replicaFetcherThread, topicPartition);
            return BoxedUnit.UNIT;
        });
        mockBlockingSender.setFetchPartitionDataForNextResponse((scala.collection.Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), partitionData$1(t1p0().partition(), new FetchResponseData.EpochEndOffset().setEpoch(4).setEndOffset(140L))), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), partitionData$1(t1p1().partition(), new FetchResponseData.EpochEndOffset().setEpoch(4).setEndOffset(141L)))})));
        mockBlockingSender.setIdsForNextResponse(topicIds());
        create.elem = new Some(BoxesRunTime.boxToInteger(4));
        replicaFetcherThread.doWork();
        Assertions.assertEquals(0, mockBlockingSender.epochFetchCount());
        Assertions.assertEquals(2, mockBlockingSender.fetchCount());
        ((Partition) Mockito.verify(partition, Mockito.times(2))).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.anyBoolean());
        Assertions.assertTrue(CollectionConverters$.MODULE$.ListHasAsScala(forClass.getAllValues()).asScala().contains(BoxesRunTime.boxToInteger(140)), "Expected " + t1p0() + " to truncate to offset 140 (truncation offsets: " + forClass.getAllValues() + ")");
        Assertions.assertTrue(CollectionConverters$.MODULE$.ListHasAsScala(forClass.getAllValues()).asScala().contains(BoxesRunTime.boxToInteger(141)), "Expected " + t1p1() + " to truncate to offset 141 (truncation offsets: " + forClass.getAllValues() + ")");
        set.foreach(topicPartition2 -> {
            $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$5(replicaFetcherThread, topicPartition2);
            return BoxedUnit.UNIT;
        });
        mockBlockingSender.setFetchPartitionDataForNextResponse((scala.collection.Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), partitionData$1(t1p0().partition(), new FetchResponseData.EpochEndOffset().setEpoch(3).setEndOffset(130L))), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), partitionData$1(t1p1().partition(), new FetchResponseData.EpochEndOffset().setEpoch(3).setEndOffset(131L)))})));
        mockBlockingSender.setIdsForNextResponse(topicIds());
        replicaFetcherThread.doWork();
        Assertions.assertEquals(0, mockBlockingSender.epochFetchCount());
        Assertions.assertEquals(3, mockBlockingSender.fetchCount());
        ((Partition) Mockito.verify(partition, Mockito.times(4))).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.anyBoolean());
        Assertions.assertTrue(CollectionConverters$.MODULE$.ListHasAsScala(forClass.getAllValues()).asScala().contains(BoxesRunTime.boxToInteger(129)), "Expected to truncate to offset 129 (truncation offsets: " + forClass.getAllValues() + ")");
        set.foreach(topicPartition3 -> {
            $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$6(replicaFetcherThread, topicPartition3);
            return BoxedUnit.UNIT;
        });
        mockBlockingSender.setFetchPartitionDataForNextResponse((scala.collection.Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), partitionData$1(t1p0().partition(), new FetchResponseData.EpochEndOffset().setEpoch(2).setEndOffset(120L))), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), partitionData$1(t1p1().partition(), new FetchResponseData.EpochEndOffset().setEpoch(2).setEndOffset(121L)))})));
        mockBlockingSender.setIdsForNextResponse(topicIds());
        create.elem = None$.MODULE$;
        replicaFetcherThread.doWork();
        Assertions.assertEquals(0, mockBlockingSender.epochFetchCount());
        Assertions.assertEquals(4, mockBlockingSender.fetchCount());
        ((Partition) Mockito.verify(partition, Mockito.times(6))).truncateTo(BoxesRunTime.unboxToLong(forClass.capture()), ArgumentMatchers.anyBoolean());
        Assertions.assertTrue(CollectionConverters$.MODULE$.ListHasAsScala(forClass.getAllValues()).asScala().contains(BoxesRunTime.boxToInteger(119)), "Expected to truncate to offset 119 (truncation offsets: " + forClass.getAllValues() + ")");
        set.foreach(topicPartition4 -> {
            $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$7(replicaFetcherThread, topicPartition4);
            return BoxedUnit.UNIT;
        });
    }

    @Test
    public void testTruncateOnFetchDoesNotUpdateHighWatermark() {
        KafkaConfig$ kafkaConfig$ = KafkaConfig$.MODULE$;
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        KafkaConfig fromProps = kafkaConfig$.fromProps(testUtils$.createBrokerConfig(1, true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false));
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        UnifiedLog unifiedLog = (UnifiedLog) Mockito.mock(UnifiedLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        ReplicaAlterLogDirsManager replicaAlterLogDirsManager = (ReplicaAlterLogDirsManager) Mockito.mock(ReplicaAlterLogDirsManager.class);
        Mockito.when(BoxesRunTime.boxToLong(unifiedLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(Int$.MODULE$.int2long(130)));
        Mockito.when(unifiedLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(5)));
        Mockito.when(unifiedLog.endOffsetForEpoch(4)).thenReturn(new Some(new OffsetAndEpoch(149L, 4)));
        Mockito.when(BoxesRunTime.boxToLong(unifiedLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(Int$.MODULE$.int2long(150)));
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.replicaAlterLogDirsManager()).thenReturn(replicaAlterLogDirsManager);
        Mockito.when(replicaManager.localLogOrException(t1p0())).thenReturn(unifiedLog);
        Mockito.when(replicaManager.getPartitionOrException(t1p0())).thenReturn(partition);
        Mockito.when(partition.localLogOrException()).thenReturn(unifiedLog);
        Mockito.when(partition.appendRecordsToFollowerOrFutureReplica((MemoryRecords) ArgumentMatchers.any(), BoxesRunTime.unboxToBoolean(ArgumentMatchers.any()))).thenReturn(None$.MODULE$);
        LogContext logContext = new LogContext("[ReplicaFetcher replicaId=" + fromProps.brokerId() + ", leaderId=" + brokerEndPoint().id() + ", fetcherId=0] ");
        MockBlockingSender mockBlockingSender = new MockBlockingSender(Collections.emptyMap(), brokerEndPoint(), Time.SYSTEM);
        ReplicaFetcherThread replicaFetcherThread = new ReplicaFetcherThread("fetcher-thread", new RemoteLeaderEndPoint(logContext.logPrefix(), mockBlockingSender, new FetchSessionHandler(logContext, brokerEndPoint().id()), fromProps, replicaManager, replicationQuotaManager, () -> {
            return MetadataVersion.MINIMUM_VERSION;
        }, () -> {
            return 1L;
        }), fromProps, kafka$server$ReplicaFetcherThreadTest$$failedPartitions(), replicaManager, replicationQuotaManager, logContext.logPrefix(), () -> {
            return MetadataVersion.MINIMUM_VERSION;
        });
        replicaFetcherThread.addPartitions((scala.collection.Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(new Some(topicId1()), 150, 1))})));
        mockBlockingSender.setFetchPartitionDataForNextResponse((scala.collection.Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), new FetchResponseData.PartitionData().setPartitionIndex(t1p0().partition()).setLastStableOffset(0L).setLogStartOffset(0L).setHighWatermark(160L).setDivergingEpoch(new FetchResponseData.EpochEndOffset().setEpoch(4).setEndOffset(140L)))})));
        mockBlockingSender.setIdsForNextResponse(topicIds());
        replicaFetcherThread.doWork();
        Assertions.assertEquals(1, mockBlockingSender.fetchCount());
        ((Partition) Mockito.verify(partition, Mockito.times(1))).truncateTo(140L, false);
        ((UnifiedLog) Mockito.verify(unifiedLog, Mockito.times(0))).maybeUpdateHighWatermark(ArgumentMatchers.anyLong());
    }

    @Test
    public void testLagIsUpdatedWhenNoRecords() {
        KafkaConfig$ kafkaConfig$ = KafkaConfig$.MODULE$;
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        KafkaConfig fromProps = kafkaConfig$.fromProps(testUtils$.createBrokerConfig(1, true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false));
        ReplicationQuotaManager replicationQuotaManager = (ReplicationQuotaManager) Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager) Mockito.mock(LogManager.class);
        UnifiedLog unifiedLog = (UnifiedLog) Mockito.mock(UnifiedLog.class);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(BoxesRunTime.boxToLong(unifiedLog.highWatermark())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(unifiedLog.latestEpoch()).thenReturn(new Some(BoxesRunTime.boxToInteger(2)));
        Mockito.when(unifiedLog.endOffsetForEpoch(0)).thenReturn(new Some(new OffsetAndEpoch(0L, 0)));
        Mockito.when(BoxesRunTime.boxToLong(unifiedLog.logEndOffset())).thenReturn(BoxesRunTime.boxToLong(0L));
        Mockito.when(unifiedLog.maybeUpdateHighWatermark(0L)).thenReturn(None$.MODULE$);
        Mockito.when(replicaManager.metadataCache()).thenReturn(metadataCache());
        Mockito.when(replicaManager.logManager()).thenReturn(logManager);
        Mockito.when(replicaManager.localLogOrException(t1p0())).thenReturn(unifiedLog);
        Mockito.when(replicaManager.getPartitionOrException(t1p0())).thenReturn(partition);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when(partition.localLogOrException()).thenReturn(unifiedLog);
        Mockito.when(partition.appendRecordsToFollowerOrFutureReplica((MemoryRecords) ArgumentMatchers.any(), BoxesRunTime.unboxToBoolean(ArgumentMatchers.any()))).thenReturn(new Some(new LogAppendInfo(-1L, 0L, OptionalInt.empty(), -1L, -1L, -1L, -1L, RecordValidationStats.EMPTY, CompressionType.NONE, -1, -1L)));
        LogContext logContext = new LogContext("[ReplicaFetcher replicaId=" + fromProps.brokerId() + ", leaderId=" + brokerEndPoint().id() + ", fetcherId=0] ");
        MockBlockingSender mockBlockingSender = new MockBlockingSender(Collections.emptyMap(), brokerEndPoint(), Time.SYSTEM);
        ReplicaFetcherThread replicaFetcherThread = new ReplicaFetcherThread("fetcher-thread", new RemoteLeaderEndPoint(logContext.logPrefix(), mockBlockingSender, new FetchSessionHandler(logContext, brokerEndPoint().id()), fromProps, replicaManager, replicationQuotaManager, () -> {
            return MetadataVersion.MINIMUM_VERSION;
        }, () -> {
            return 1L;
        }), fromProps, kafka$server$ReplicaFetcherThreadTest$$failedPartitions(), replicaManager, replicationQuotaManager, logContext.logPrefix(), () -> {
            return MetadataVersion.MINIMUM_VERSION;
        });
        replicaFetcherThread.addPartitions((scala.collection.Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), initialFetchState(new Some(topicId1()), 0L, 1))})));
        Assertions.assertEquals(None$.MODULE$, replicaFetcherThread.fetchState(t1p0()).flatMap(partitionFetchState -> {
            return partitionFetchState.lag();
        }));
        mockBlockingSender.setFetchPartitionDataForNextResponse((scala.collection.Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), new FetchResponseData.PartitionData().setPartitionIndex(t1p0().partition()).setLastStableOffset(0L).setLogStartOffset(0L).setHighWatermark(0L).setRecords(MemoryRecords.EMPTY))})));
        mockBlockingSender.setIdsForNextResponse(topicIds());
        replicaFetcherThread.doWork();
        Assertions.assertEquals(1, mockBlockingSender.fetchCount());
        Assertions.assertEquals(new Some(BoxesRunTime.boxToInteger(0)), replicaFetcherThread.fetchState(t1p0()).flatMap(partitionFetchState2 -> {
            return partitionFetchState2.lag();
        }));
        Assertions.assertEquals(new Some(BoxesRunTime.boxToInteger(2)), replicaFetcherThread.fetchState(t1p0()).flatMap(partitionFetchState3 -> {
            return partitionFetchState3.lastFetchedEpoch();
        }));
    }

    @Test
    public void shouldCatchExceptionFromBlockingSendWhenShuttingDownReplicaFetcherThread() {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(testUtils$.createBrokerConfig(1, true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false));
        BlockingSend blockingSend = (BlockingSend) Mockito.mock(BlockingSend.class);
        Mockito.when(blockingSend.brokerEndPoint()).thenReturn(brokerEndPoint());
        blockingSend.initiateClose();
        Mockito.when(BoxedUnit.UNIT).thenThrow(new Throwable[]{new IllegalArgumentException()});
        blockingSend.close();
        Mockito.when(BoxedUnit.UNIT).thenThrow(new Throwable[]{new IllegalStateException()});
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("bob", 0, fromProps, kafka$server$ReplicaFetcherThreadTest$$failedPartitions(), replicaManager, null, blockingSend, MetadataVersion.latestTesting());
        createReplicaFetcherThread.start();
        createReplicaFetcherThread.initiateShutdown();
        createReplicaFetcherThread.awaitShutdown();
        ((BlockingSend) Mockito.verify(blockingSend)).initiateClose();
        ((BlockingSend) Mockito.verify(blockingSend)).close();
    }

    @Test
    public void shouldUpdateReassignmentBytesInMetrics() {
        assertProcessPartitionDataWhen(true);
    }

    @Test
    public void shouldNotUpdateReassignmentBytesInMetricsWhenNoReassignmentsInProgress() {
        assertProcessPartitionDataWhen(false);
    }

    @Test
    public void testBuildFetch() {
        TopicIdPartition topicIdPartition = new TopicIdPartition(topicId1(), t1p0());
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(topicId1(), t1p1());
        TopicIdPartition topicIdPartition3 = new TopicIdPartition(topicId2(), t2p1());
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(testUtils$.createBrokerConfig(1, true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false));
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        BlockingSend blockingSend = (BlockingSend) Mockito.mock(BlockingSend.class);
        ReplicaQuota replicaQuota = (ReplicaQuota) Mockito.mock(ReplicaQuota.class);
        UnifiedLog unifiedLog = (UnifiedLog) Mockito.mock(UnifiedLog.class);
        Mockito.when(blockingSend.brokerEndPoint()).thenReturn(brokerEndPoint());
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(Mockito.mock(BrokerTopicStats.class));
        Mockito.when(replicaManager.localLogOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(unifiedLog);
        Mockito.when(BoxesRunTime.boxToBoolean(replicaQuota.isThrottled((TopicPartition) ArgumentMatchers.any()))).thenReturn(BoxesRunTime.boxToBoolean(false));
        Mockito.when(BoxesRunTime.boxToLong(unifiedLog.logStartOffset())).thenReturn(BoxesRunTime.boxToLong(0L));
        LogContext logContext = new LogContext("[ReplicaFetcher replicaId=" + fromProps.brokerId() + ", leaderId=" + brokerEndPoint().id() + ", fetcherId=0] ");
        RemoteLeaderEndPoint remoteLeaderEndPoint = new RemoteLeaderEndPoint(logContext.logPrefix(), blockingSend, new FetchSessionHandler(logContext, brokerEndPoint().id()), fromProps, replicaManager, replicaQuota, () -> {
            return MetadataVersion.MINIMUM_VERSION;
        }, () -> {
            return 1L;
        });
        ReplicaFetcherThread replicaFetcherThread = new ReplicaFetcherThread("bob", remoteLeaderEndPoint, fromProps, kafka$server$ReplicaFetcherThreadTest$$failedPartitions(), replicaManager, replicaQuota, logContext.logPrefix(), () -> {
            return MetadataVersion.MINIMUM_VERSION;
        });
        Map map = (Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), new PartitionFetchState(new Some(topicId1()), 150L, None$.MODULE$, 1, None$.MODULE$, Fetching$.MODULE$, None$.MODULE$)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new PartitionFetchState(new Some(topicId1()), 155L, None$.MODULE$, 1, None$.MODULE$, Fetching$.MODULE$, None$.MODULE$)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), new PartitionFetchState(new Some(topicId2()), 160L, None$.MODULE$, 1, None$.MODULE$, Fetching$.MODULE$, None$.MODULE$))}));
        AbstractFetcherThread.ResultWithPartitions buildFetch = replicaFetcherThread.leader().buildFetch(map);
        if (buildFetch == null) {
            throw new MatchError((Object) null);
        }
        Option option = (Option) buildFetch.result();
        Assertions.assertTrue(option.isDefined());
        FetchRequest.Builder fetchRequest = ((AbstractFetcherThread.ReplicaFetch) option.get()).fetchRequest();
        Assertions.assertEquals(CollectionConverters$.MODULE$.MapHasAsJava(map.map(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError((Object) null);
            }
            TopicPartition topicPartition = (TopicPartition) tuple2._1();
            PartitionFetchState partitionFetchState = (PartitionFetchState) tuple2._2();
            return new Tuple2(topicPartition, new FetchRequest.PartitionData((Uuid) partitionFetchState.topicId().get(), partitionFetchState.fetchOffset(), 0L, Predef$.MODULE$.Integer2int(fromProps.replicaFetchMaxBytes()), Optional.of(Predef$.MODULE$.int2Integer(partitionFetchState.currentLeaderEpoch())), Optional.empty()));
        })).asJava(), fetchRequest.fetchData());
        Assertions.assertEquals(0, fetchRequest.replaced().size());
        Assertions.assertEquals(0, fetchRequest.removed().size());
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(topicIdPartition, new FetchResponseData.PartitionData());
        linkedHashMap.put(topicIdPartition2, new FetchResponseData.PartitionData());
        linkedHashMap.put(topicIdPartition3, new FetchResponseData.PartitionData());
        remoteLeaderEndPoint.fetchSessionHandler().handleResponse(FetchResponse.of(Errors.NONE, 0, 123, linkedHashMap), ApiKeys.FETCH.latestVersion());
        Map map2 = (Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), new PartitionFetchState(new Some(topicId1()), 155L, None$.MODULE$, 1, None$.MODULE$, Fetching$.MODULE$, None$.MODULE$)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p1()), new PartitionFetchState(new Some(Uuid.randomUuid()), 160L, None$.MODULE$, 1, None$.MODULE$, Fetching$.MODULE$, None$.MODULE$))}));
        AbstractFetcherThread.ResultWithPartitions buildFetch2 = replicaFetcherThread.leader().buildFetch(map2);
        if (buildFetch2 == null) {
            throw new MatchError((Object) null);
        }
        Option option2 = (Option) buildFetch2.result();
        Map map3 = ((MapOps) map2.drop(1)).map(tuple22 -> {
            if (tuple22 == null) {
                throw new MatchError((Object) null);
            }
            TopicPartition topicPartition = (TopicPartition) tuple22._1();
            PartitionFetchState partitionFetchState = (PartitionFetchState) tuple22._2();
            return new Tuple2(topicPartition, new FetchRequest.PartitionData((Uuid) partitionFetchState.topicId().get(), partitionFetchState.fetchOffset(), 0L, Predef$.MODULE$.Integer2int(fromProps.replicaFetchMaxBytes()), Optional.of(Predef$.MODULE$.int2Integer(partitionFetchState.currentLeaderEpoch())), Optional.empty()));
        });
        Assertions.assertTrue(option2.isDefined());
        FetchRequest.Builder fetchRequest2 = ((AbstractFetcherThread.ReplicaFetch) option2.get()).fetchRequest();
        Assertions.assertEquals(CollectionConverters$.MODULE$.MapHasAsJava(map3).asJava(), fetchRequest2.fetchData());
        Assertions.assertEquals(Collections.singletonList(topicIdPartition3), fetchRequest2.replaced());
        Assertions.assertEquals(Collections.singletonList(topicIdPartition), fetchRequest2.removed());
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testLocalFetchCompletionIfHighWatermarkUpdated(boolean z) {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(testUtils$.createBrokerConfig(1, true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false));
        BlockingSend blockingSend = (BlockingSend) Mockito.mock(BlockingSend.class);
        Mockito.when(blockingSend.brokerEndPoint()).thenReturn(brokerEndPoint());
        Some some = z ? new Some(BoxesRunTime.boxToLong(100L)) : None$.MODULE$;
        UnifiedLog unifiedLog = (UnifiedLog) Mockito.mock(UnifiedLog.class);
        Mockito.when(unifiedLog.maybeUpdateHighWatermark(100L)).thenReturn(some);
        Some some2 = new Some(Mockito.mock(LogAppendInfo.class));
        Partition partition = (Partition) Mockito.mock(Partition.class);
        Mockito.when(partition.localLogOrException()).thenReturn(unifiedLog);
        Mockito.when(partition.appendRecordsToFollowerOrFutureReplica((MemoryRecords) ArgumentMatchers.any(), BoxesRunTime.unboxToBoolean(ArgumentMatchers.any()))).thenReturn(some2);
        Buffer empty = Buffer$.MODULE$.empty();
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(replicaManager.getPartitionOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(partition);
        replicaManager.completeDelayedFetchRequests((Seq) ArgumentMatchers.any());
        Mockito.when(BoxedUnit.UNIT).thenAnswer(invocationOnMock -> {
            return empty.$plus$plus$eq((scala.collection.immutable.Seq) invocationOnMock.getArguments()[0]);
        });
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(new BrokerTopicStats());
        ReplicaFetcherThread createReplicaFetcherThread = createReplicaFetcherThread("replica-fetcher", 0, fromProps, kafka$server$ReplicaFetcherThreadTest$$failedPartitions(), replicaManager, (ReplicaQuota) Mockito.mock(ReplicaQuota.class), blockingSend, MetadataVersion.latestTesting());
        TopicPartition topicPartition = new TopicPartition("testTopic", 0);
        TopicPartition topicPartition2 = new TopicPartition("testTopic", 1);
        FetchResponseData.PartitionData highWatermark = new FetchResponseData.PartitionData().setRecords(MemoryRecords.withRecords((byte) 2, 0L, Compression.NONE, TimestampType.CREATE_TIME, -1L, (short) -1, -1, -1, false, new SimpleRecord[]{new SimpleRecord(1000L, "foo".getBytes(StandardCharsets.UTF_8))})).setHighWatermark(100L);
        createReplicaFetcherThread.processPartitionData(topicPartition, 0L, highWatermark.setPartitionIndex(0));
        createReplicaFetcherThread.processPartitionData(topicPartition2, 0L, highWatermark.setPartitionIndex(1));
        ((ReplicaManager) Mockito.verify(replicaManager, Mockito.times(0))).completeDelayedFetchRequests((Seq) ArgumentMatchers.any());
        createReplicaFetcherThread.doWork();
        if (z) {
            Assertions.assertEquals(new $colon.colon(topicPartition, new $colon.colon(topicPartition2, Nil$.MODULE$)), empty);
            ((ReplicaManager) Mockito.verify(replicaManager, Mockito.times(1))).completeDelayedFetchRequests((Seq) ArgumentMatchers.any());
        } else {
            ((ReplicaManager) Mockito.verify(replicaManager, Mockito.times(0))).completeDelayedFetchRequests((Seq) ArgumentMatchers.any());
        }
        Assertions.assertEquals(Buffer$.MODULE$.empty(), createReplicaFetcherThread.partitionsWithNewHighWatermark());
    }

    private OffsetForLeaderEpochResponseData.EpochEndOffset newOffsetForLeaderPartitionResult(TopicPartition topicPartition, int i, long j) {
        return newOffsetForLeaderPartitionResult(topicPartition, Errors.NONE, i, j);
    }

    private OffsetForLeaderEpochResponseData.EpochEndOffset newOffsetForLeaderPartitionResult(TopicPartition topicPartition, Errors errors, int i, long j) {
        return new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(topicPartition.partition()).setErrorCode(errors.code()).setLeaderEpoch(i).setEndOffset(j);
    }

    private void assertProcessPartitionDataWhen(boolean z) {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        TestUtils$ testUtils$3 = TestUtils$.MODULE$;
        int RandomPort = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$4 = TestUtils$.MODULE$;
        None$ none$ = None$.MODULE$;
        TestUtils$ testUtils$5 = TestUtils$.MODULE$;
        None$ none$2 = None$.MODULE$;
        TestUtils$ testUtils$6 = TestUtils$.MODULE$;
        None$ none$3 = None$.MODULE$;
        TestUtils$ testUtils$7 = TestUtils$.MODULE$;
        TestUtils$ testUtils$8 = TestUtils$.MODULE$;
        int RandomPort2 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$9 = TestUtils$.MODULE$;
        int RandomPort3 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$10 = TestUtils$.MODULE$;
        int RandomPort4 = TestUtils$.MODULE$.RandomPort();
        TestUtils$ testUtils$11 = TestUtils$.MODULE$;
        None$ none$4 = None$.MODULE$;
        TestUtils$ testUtils$12 = TestUtils$.MODULE$;
        TestUtils$ testUtils$13 = TestUtils$.MODULE$;
        TestUtils$ testUtils$14 = TestUtils$.MODULE$;
        TestUtils$ testUtils$15 = TestUtils$.MODULE$;
        TestUtils$ testUtils$16 = TestUtils$.MODULE$;
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(testUtils$.createBrokerConfig(1, true, true, RandomPort, none$, none$2, none$3, true, false, RandomPort2, false, RandomPort3, false, RandomPort4, none$4, 1, false, 1, (short) 1, false));
        BlockingSend blockingSend = (BlockingSend) Mockito.mock(BlockingSend.class);
        Mockito.when(blockingSend.brokerEndPoint()).thenReturn(brokerEndPoint());
        UnifiedLog unifiedLog = (UnifiedLog) Mockito.mock(UnifiedLog.class);
        MemoryRecords withRecords = MemoryRecords.withRecords((byte) 2, 0L, Compression.NONE, TimestampType.CREATE_TIME, -1L, (short) -1, -1, -1, false, new SimpleRecord[]{new SimpleRecord(1000L, "foo".getBytes(StandardCharsets.UTF_8))});
        Mockito.when(unifiedLog.maybeUpdateHighWatermark(0L)).thenReturn(None$.MODULE$);
        Partition partition = (Partition) Mockito.mock(Partition.class);
        Mockito.when(partition.localLogOrException()).thenReturn(unifiedLog);
        Mockito.when(BoxesRunTime.boxToBoolean(partition.isReassigning())).thenReturn(BoxesRunTime.boxToBoolean(z));
        Mockito.when(BoxesRunTime.boxToBoolean(partition.isAddingLocalReplica())).thenReturn(BoxesRunTime.boxToBoolean(z));
        Mockito.when(partition.appendRecordsToFollowerOrFutureReplica(withRecords, false)).thenReturn(None$.MODULE$);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Mockito.when(replicaManager.getPartitionOrException((TopicPartition) ArgumentMatchers.any())).thenReturn(partition);
        BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
        Mockito.when(replicaManager.brokerTopicStats()).thenReturn(brokerTopicStats);
        createReplicaFetcherThread("bob", 0, fromProps, kafka$server$ReplicaFetcherThreadTest$$failedPartitions(), replicaManager, (ReplicaQuota) Mockito.mock(ReplicaQuota.class), blockingSend, MetadataVersion.latestTesting()).processPartitionData(t1p0(), 0L, new FetchResponseData.PartitionData().setPartitionIndex(t1p0().partition()).setLastStableOffset(0L).setLogStartOffset(0L).setRecords(withRecords));
        if (z) {
            Assertions.assertEquals(withRecords.sizeInBytes(), ((Meter) brokerTopicStats.allTopicsStats().reassignmentBytesInPerSec().get()).count());
        } else {
            Assertions.assertEquals(0L, ((Meter) brokerTopicStats.allTopicsStats().reassignmentBytesInPerSec().get()).count());
        }
        Assertions.assertEquals(withRecords.sizeInBytes(), ((Meter) brokerTopicStats.allTopicsStats().replicationBytesInRate().get()).count());
    }

    public void stub(Partition partition, ReplicaManager replicaManager, UnifiedLog unifiedLog) {
        Mockito.when(replicaManager.localLogOrException(t1p0())).thenReturn(unifiedLog);
        Mockito.when(replicaManager.getPartitionOrException(t1p0())).thenReturn(partition);
        Mockito.when(replicaManager.localLogOrException(t1p1())).thenReturn(unifiedLog);
        Mockito.when(replicaManager.getPartitionOrException(t1p1())).thenReturn(partition);
        Mockito.when(replicaManager.localLogOrException(t2p1())).thenReturn(unifiedLog);
        Mockito.when(replicaManager.getPartitionOrException(t2p1())).thenReturn(partition);
    }

    public static final /* synthetic */ void $anonfun$assertPartitionStates$1(AbstractFetcherThread abstractFetcherThread, boolean z, boolean z2, boolean z3, TopicPartition topicPartition) {
        Assertions.assertTrue(abstractFetcherThread.fetchState(topicPartition).isDefined());
        PartitionFetchState partitionFetchState = (PartitionFetchState) abstractFetcherThread.fetchState(topicPartition).get();
        Assertions.assertEquals(BoxesRunTime.boxToBoolean(z), BoxesRunTime.boxToBoolean(partitionFetchState.isReadyForFetch()), "Partition " + topicPartition + " should" + (!z ? " NOT" : "") + " be ready for fetching");
        Assertions.assertEquals(BoxesRunTime.boxToBoolean(z2), BoxesRunTime.boxToBoolean(partitionFetchState.isTruncating()), "Partition " + topicPartition + " should" + (!z2 ? " NOT" : "") + " be truncating its log");
        Assertions.assertEquals(BoxesRunTime.boxToBoolean(z3), BoxesRunTime.boxToBoolean(partitionFetchState.isDelayed()), "Partition " + topicPartition + " should" + (!z3 ? " NOT" : "") + " be delayed");
    }

    public static final /* synthetic */ void $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$4(ReplicaFetcherThread replicaFetcherThread, TopicPartition topicPartition) {
        Assertions.assertEquals(Fetching$.MODULE$, ((PartitionFetchState) replicaFetcherThread.fetchState(topicPartition).get()).state());
    }

    private static final FetchResponseData.PartitionData partitionData$1(int i, FetchResponseData.EpochEndOffset epochEndOffset) {
        return new FetchResponseData.PartitionData().setPartitionIndex(i).setLastStableOffset(0L).setLogStartOffset(0L).setDivergingEpoch(epochEndOffset);
    }

    public static final /* synthetic */ void $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$5(ReplicaFetcherThread replicaFetcherThread, TopicPartition topicPartition) {
        Assertions.assertEquals(Fetching$.MODULE$, ((PartitionFetchState) replicaFetcherThread.fetchState(topicPartition).get()).state());
    }

    public static final /* synthetic */ void $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$6(ReplicaFetcherThread replicaFetcherThread, TopicPartition topicPartition) {
        Assertions.assertEquals(Fetching$.MODULE$, ((PartitionFetchState) replicaFetcherThread.fetchState(topicPartition).get()).state());
    }

    public static final /* synthetic */ void $anonfun$shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower$7(ReplicaFetcherThread replicaFetcherThread, TopicPartition topicPartition) {
        Assertions.assertEquals(Fetching$.MODULE$, ((PartitionFetchState) replicaFetcherThread.fetchState(topicPartition).get()).state());
    }

    public ReplicaFetcherThreadTest() {
        MetadataCache$ metadataCache$ = MetadataCache$.MODULE$;
        this.metadataCache = new KRaftMetadataCache(0, () -> {
            return KRaftVersion.LATEST_PRODUCTION;
        });
    }
}
