package io.kroxylicious.proxy;

import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import io.kroxylicious.proxy.bootstrap.FilterChainFactory;
import io.kroxylicious.proxy.config.Configuration;
import io.kroxylicious.proxy.config.IllegalConfigurationException;
import io.kroxylicious.proxy.config.MicrometerDefinition;
import io.kroxylicious.proxy.config.PluginFactoryRegistry;
import io.kroxylicious.proxy.config.admin.AdminHttpConfiguration;
import io.kroxylicious.proxy.internal.ApiVersionsServiceImpl;
import io.kroxylicious.proxy.internal.KafkaProxyInitializer;
import io.kroxylicious.proxy.internal.MeterRegistries;
import io.kroxylicious.proxy.internal.PortConflictDetector;
import io.kroxylicious.proxy.internal.admin.AdminHttpInitializer;
import io.kroxylicious.proxy.internal.config.Features;
import io.kroxylicious.proxy.internal.net.DefaultNetworkBindingOperationProcessor;
import io.kroxylicious.proxy.internal.net.EndpointRegistry;
import io.kroxylicious.proxy.internal.net.NetworkBindingOperationProcessor;
import io.kroxylicious.proxy.internal.util.Metrics;
import io.kroxylicious.proxy.model.VirtualCluster;
import io.kroxylicious.proxy.service.HostPort;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.kqueue.KQueue;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.kqueue.KQueueServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.incubator.channel.uring.IOUring;
import io.netty.incubator.channel.uring.IOUringEventLoopGroup;
import io.netty.incubator.channel.uring.IOUringServerSocketChannel;
import io.netty.util.concurrent.Future;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.kafka.common.protocol.ApiKeys;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/kroxylicious/proxy/KafkaProxy.class */
public final class KafkaProxy implements AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProxy.class);
    private static final Logger STARTUP_SHUTDOWN_LOGGER = LoggerFactory.getLogger("io.kroxylicious.proxy.StartupShutdownLogger");

    @NonNull
    private final Configuration config;

    @Nullable
    private final AdminHttpConfiguration adminHttpConfig;

    @NonNull
    private final List<MicrometerDefinition> micrometerConfig;

    @NonNull
    private final List<VirtualCluster> virtualClusters;
    private final AtomicBoolean running = new AtomicBoolean();
    private final CompletableFuture<Void> shutdown = new CompletableFuture<>();
    private final NetworkBindingOperationProcessor bindingOperationProcessor = new DefaultNetworkBindingOperationProcessor();
    private final EndpointRegistry endpointRegistry = new EndpointRegistry(this.bindingOperationProcessor);

    @NonNull
    private final PluginFactoryRegistry pfr;

    @Nullable
    private MeterRegistries meterRegistries;

    @Nullable
    private FilterChainFactory filterChainFactory;

    @Nullable
    private EventGroupConfig adminEventGroup;

    @Nullable
    private EventGroupConfig serverEventGroup;

    @Nullable
    private Channel metricsChannel;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/kroxylicious/proxy/KafkaProxy$EventGroupConfig.class */
    public static final class EventGroupConfig extends Record {
        private final String name;
        private final EventLoopGroup bossGroup;
        private final EventLoopGroup workerGroup;
        private final Class<? extends ServerChannel> clazz;

        private EventGroupConfig(String str, EventLoopGroup eventLoopGroup, EventLoopGroup eventLoopGroup2, Class<? extends ServerChannel> cls) {
            this.name = str;
            this.bossGroup = eventLoopGroup;
            this.workerGroup = eventLoopGroup2;
            this.clazz = cls;
        }

        public List<Future<?>> shutdownGracefully() {
            return List.of(this.bossGroup.shutdownGracefully(), this.workerGroup.shutdownGracefully());
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, EventGroupConfig.class), EventGroupConfig.class, "name;bossGroup;workerGroup;clazz", "FIELD:Lio/kroxylicious/proxy/KafkaProxy$EventGroupConfig;->name:Ljava/lang/String;", "FIELD:Lio/kroxylicious/proxy/KafkaProxy$EventGroupConfig;->bossGroup:Lio/netty/channel/EventLoopGroup;", "FIELD:Lio/kroxylicious/proxy/KafkaProxy$EventGroupConfig;->workerGroup:Lio/netty/channel/EventLoopGroup;", "FIELD:Lio/kroxylicious/proxy/KafkaProxy$EventGroupConfig;->clazz:Ljava/lang/Class;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, EventGroupConfig.class), EventGroupConfig.class, "name;bossGroup;workerGroup;clazz", "FIELD:Lio/kroxylicious/proxy/KafkaProxy$EventGroupConfig;->name:Ljava/lang/String;", "FIELD:Lio/kroxylicious/proxy/KafkaProxy$EventGroupConfig;->bossGroup:Lio/netty/channel/EventLoopGroup;", "FIELD:Lio/kroxylicious/proxy/KafkaProxy$EventGroupConfig;->workerGroup:Lio/netty/channel/EventLoopGroup;", "FIELD:Lio/kroxylicious/proxy/KafkaProxy$EventGroupConfig;->clazz:Ljava/lang/Class;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, EventGroupConfig.class, Object.class), EventGroupConfig.class, "name;bossGroup;workerGroup;clazz", "FIELD:Lio/kroxylicious/proxy/KafkaProxy$EventGroupConfig;->name:Ljava/lang/String;", "FIELD:Lio/kroxylicious/proxy/KafkaProxy$EventGroupConfig;->bossGroup:Lio/netty/channel/EventLoopGroup;", "FIELD:Lio/kroxylicious/proxy/KafkaProxy$EventGroupConfig;->workerGroup:Lio/netty/channel/EventLoopGroup;", "FIELD:Lio/kroxylicious/proxy/KafkaProxy$EventGroupConfig;->clazz:Ljava/lang/Class;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public String name() {
            return this.name;
        }

        public EventLoopGroup bossGroup() {
            return this.bossGroup;
        }

        public EventLoopGroup workerGroup() {
            return this.workerGroup;
        }

        public Class<? extends ServerChannel> clazz() {
            return this.clazz;
        }
    }

    public KafkaProxy(@NonNull PluginFactoryRegistry pluginFactoryRegistry, @NonNull Configuration configuration, @NonNull Features features) {
        this.pfr = (PluginFactoryRegistry) Objects.requireNonNull(pluginFactoryRegistry);
        this.config = validate((Configuration) Objects.requireNonNull(configuration), (Features) Objects.requireNonNull(features));
        this.virtualClusters = configuration.virtualClusterModel();
        this.adminHttpConfig = configuration.adminHttpConfig();
        this.micrometerConfig = configuration.getMicrometer();
    }

    static Configuration validate(Configuration configuration, Features features) {
        List<String> supports = features.supports(configuration);
        if (supports.isEmpty()) {
            return configuration;
        }
        String str = "invalid configuration: " + String.join(",", supports);
        LOGGER.error(str);
        throw new IllegalConfigurationException(str);
    }

    public KafkaProxy startup() throws InterruptedException {
        if (this.running.getAndSet(true)) {
            throw new IllegalStateException("This proxy is already running");
        }
        try {
            STARTUP_SHUTDOWN_LOGGER.info("Kroxylicious is starting");
            new PortConflictDetector().validate(this.virtualClusters, Optional.ofNullable(shouldBindAdminEndpoint() ? new HostPort(this.adminHttpConfig.host(), this.adminHttpConfig.port().intValue()) : null));
            int availableProcessors = Runtime.getRuntime().availableProcessors();
            this.meterRegistries = new MeterRegistries(this.micrometerConfig);
            this.adminEventGroup = buildNettyEventGroups("admin", availableProcessors, this.config.isUseIoUring());
            this.serverEventGroup = buildNettyEventGroups("server", availableProcessors, this.config.isUseIoUring());
            maybeStartMetricsListener(this.adminEventGroup, this.meterRegistries);
            ApiVersionsServiceImpl apiVersionsServiceImpl = new ApiVersionsServiceImpl(getApiKeyMaxVersionOverride(this.config));
            this.filterChainFactory = new FilterChainFactory(this.pfr, this.config.filters());
            this.bindingOperationProcessor.start(buildServerBootstrap(this.serverEventGroup, new KafkaProxyInitializer(this.filterChainFactory, this.pfr, false, this.endpointRegistry, this.endpointRegistry, false, Map.of(), apiVersionsServiceImpl)), buildServerBootstrap(this.serverEventGroup, new KafkaProxyInitializer(this.filterChainFactory, this.pfr, true, this.endpointRegistry, this.endpointRegistry, false, Map.of(), apiVersionsServiceImpl)));
            CompletableFuture.allOf((CompletableFuture[]) this.virtualClusters.stream().map(virtualCluster -> {
                return this.endpointRegistry.registerVirtualCluster(virtualCluster).toCompletableFuture();
            }).toArray(i -> {
                return new CompletableFuture[i];
            })).join();
            Metrics.inboundDownstreamMessagesCounter();
            Metrics.inboundDownstreamDecodedMessagesCounter();
            return this;
        } catch (InterruptedException | RuntimeException e) {
            shutdown();
            throw e;
        }
    }

    private Map<ApiKeys, Short> getApiKeyMaxVersionOverride(Configuration configuration) {
        Optional<U> map = configuration.development().map(map2 -> {
            return map2.get("apiKeyIdMaxVersionOverride");
        });
        Class<Map> cls = Map.class;
        Objects.requireNonNull(Map.class);
        Optional filter = map.filter(cls::isInstance);
        Class<Map> cls2 = Map.class;
        Objects.requireNonNull(Map.class);
        return (Map) ((Map) filter.map(cls2::cast).orElse(Map.of())).entrySet().stream().collect(Collectors.toMap(entry -> {
            return ApiKeys.valueOf((String) entry.getKey());
        }, entry2 -> {
            return Short.valueOf(((Number) entry2.getValue()).shortValue());
        }));
    }

    private ServerBootstrap buildServerBootstrap(EventGroupConfig eventGroupConfig, KafkaProxyInitializer kafkaProxyInitializer) {
        return new ServerBootstrap().group(eventGroupConfig.bossGroup(), eventGroupConfig.workerGroup()).channel(eventGroupConfig.clazz()).option(ChannelOption.SO_REUSEADDR, true).childHandler(kafkaProxyInitializer).childOption(ChannelOption.TCP_NODELAY, true);
    }

    private EventGroupConfig buildNettyEventGroups(String str, int i, boolean z) {
        IOUringEventLoopGroup nioEventLoopGroup;
        IOUringEventLoopGroup nioEventLoopGroup2;
        Class cls;
        if (z) {
            if (!IOUring.isAvailable()) {
                throw new IllegalStateException("io_uring not available due to: " + String.valueOf(IOUring.unavailabilityCause()));
            }
            nioEventLoopGroup = new IOUringEventLoopGroup(1);
            nioEventLoopGroup2 = new IOUringEventLoopGroup(i);
            cls = IOUringServerSocketChannel.class;
        } else if (Epoll.isAvailable()) {
            nioEventLoopGroup = new EpollEventLoopGroup(1);
            nioEventLoopGroup2 = new EpollEventLoopGroup(i);
            cls = EpollServerSocketChannel.class;
        } else if (KQueue.isAvailable()) {
            nioEventLoopGroup = new KQueueEventLoopGroup(1);
            nioEventLoopGroup2 = new KQueueEventLoopGroup(i);
            cls = KQueueServerSocketChannel.class;
        } else {
            nioEventLoopGroup = new NioEventLoopGroup(1);
            nioEventLoopGroup2 = new NioEventLoopGroup(i);
            cls = NioServerSocketChannel.class;
        }
        return new EventGroupConfig(str, nioEventLoopGroup, nioEventLoopGroup2, cls);
    }

    private void maybeStartMetricsListener(EventGroupConfig eventGroupConfig, MeterRegistries meterRegistries) throws InterruptedException {
        if (shouldBindAdminEndpoint()) {
            ServerBootstrap childHandler = new ServerBootstrap().group(eventGroupConfig.bossGroup(), eventGroupConfig.workerGroup()).option(ChannelOption.SO_REUSEADDR, true).channel(eventGroupConfig.clazz()).childHandler(new AdminHttpInitializer(meterRegistries, this.adminHttpConfig));
            LOGGER.info("Binding metrics endpoint: {}:{}", this.adminHttpConfig.host(), this.adminHttpConfig.port());
            this.metricsChannel = childHandler.bind(this.adminHttpConfig.host(), this.adminHttpConfig.port().intValue()).sync().channel();
        }
    }

    private boolean shouldBindAdminEndpoint() {
        return this.adminHttpConfig != null && this.adminHttpConfig.endpoints().maybePrometheus().isPresent();
    }

    public void block() throws Exception {
        if (!this.running.get()) {
            throw new IllegalStateException("This proxy is not running");
        }
        this.shutdown.join();
    }

    public void shutdown() throws InterruptedException {
        if (!this.running.getAndSet(false)) {
            throw new IllegalStateException("This proxy is not running");
        }
        try {
            STARTUP_SHUTDOWN_LOGGER.info("Shutting down");
            this.endpointRegistry.shutdown().handle((r5, th) -> {
                this.bindingOperationProcessor.close();
                ArrayList arrayList = new ArrayList();
                if (this.serverEventGroup != null) {
                    arrayList.addAll(this.serverEventGroup.shutdownGracefully());
                }
                if (this.adminEventGroup != null) {
                    arrayList.addAll(this.adminEventGroup.shutdownGracefully());
                }
                arrayList.forEach((v0) -> {
                    v0.syncUninterruptibly();
                });
                if (this.filterChainFactory != null) {
                    this.filterChainFactory.close();
                }
                if (th == null) {
                    return null;
                }
                if (th instanceof RuntimeException) {
                    throw ((RuntimeException) th);
                }
                throw new RuntimeException(th);
            }).toCompletableFuture().join();
            if (this.meterRegistries != null) {
                this.meterRegistries.close();
            }
        } finally {
            this.adminEventGroup = null;
            this.serverEventGroup = null;
            this.metricsChannel = null;
            this.meterRegistries = null;
            this.filterChainFactory = null;
            this.shutdown.complete(null);
            LOGGER.info("Shut down completed.");
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        if (this.running.get()) {
            shutdown();
        }
    }
}
