package io.kroxylicious.proxy.internal;

import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import io.kroxylicious.proxy.filter.FilterAndInvoker;
import io.kroxylicious.proxy.filter.NetFilter;
import io.kroxylicious.proxy.frame.DecodedRequestFrame;
import io.kroxylicious.proxy.frame.RequestFrame;
import io.kroxylicious.proxy.internal.ProxyChannelState;
import io.kroxylicious.proxy.internal.codec.FrameOversizedException;
import io.kroxylicious.proxy.model.VirtualCluster;
import io.kroxylicious.proxy.service.HostPort;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.haproxy.HAProxyMessage;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.message.ApiVersionsRequestData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/kroxylicious/proxy/internal/ProxyChannelStateMachine.class */
/**
 * Holds the current {@link ProxyChannelState} for one frontend/backend handler pair and
 * notifies those handlers as the state changes.
 *
 * <p>Transitions implemented by this class:
 * <ul>
 *   <li>{@code Startup} &rarr; {@code ClientActive} ({@link #onClientActive})</li>
 *   <li>{@code ClientActive} &rarr; {@code HaProxy} | {@code ApiVersions} | {@code SelectingServer}</li>
 *   <li>{@code HaProxy} &rarr; {@code ApiVersions} | {@code SelectingServer}</li>
 *   <li>{@code ApiVersions} &rarr; {@code SelectingServer}</li>
 *   <li>{@code SelectingServer} &rarr; {@code Connecting} ({@link #onNetFilterInitiateConnect})</li>
 *   <li>{@code Connecting} &rarr; {@code Forwarding} ({@link #onServerActive})</li>
 *   <li>any state &rarr; {@code Closed} (inactive channel, exception, or illegal event)</li>
 * </ul>
 *
 * <p>Note: this source was produced by a decompiler, which reported widening several
 * package-private methods to {@code public}. The widened visibility is retained so that
 * existing callers keep compiling.
 */
public class ProxyChannelStateMachine {
    private static final String DUPLICATE_INITIATE_CONNECT_ERROR = "NetFilter called NetFilterContext.initiateConnect() more than once";
    private static final Logger LOGGER = LoggerFactory.getLogger(ProxyChannelStateMachine.class);

    /** True while backpressure has been applied to the backend handler (see {@link #onClientUnwritable()}). */
    boolean serverReadsBlocked;

    /** True while backpressure has been applied to the frontend handler (see {@link #onServerUnwritable()}). */
    boolean clientReadsBlocked;

    /** Created when entering {@code Connecting}; {@code null} before that unless set via {@link #forceState}. */
    @Nullable
    private KafkaProxyBackendHandler backendHandler;

    @NonNull
    private ProxyChannelState state = ProxyChannelState.Startup.STARTING_STATE;

    // NOTE(review): annotated @NonNull yet initialised to null; it is only assigned by
    // onClientActive(..) or forceState(..). Methods that may run earlier guard with requireNonNull.
    @NonNull
    private KafkaProxyFrontendHandler frontendHandler = null;

    /**
     * @return the current state object
     */
    ProxyChannelState state() {
        return this.state;
    }

    /**
     * Overwrites the state and both handlers without running any transition logic or
     * handler callbacks. Presumably intended for tests — TODO confirm against callers.
     *
     * @param proxyChannelState the state to install
     * @param kafkaProxyFrontendHandler the frontend handler to install
     * @param kafkaProxyBackendHandler the backend handler to install, may be {@code null}
     */
    void forceState(@NonNull ProxyChannelState proxyChannelState, @NonNull KafkaProxyFrontendHandler kafkaProxyFrontendHandler, @Nullable KafkaProxyBackendHandler kafkaProxyBackendHandler) {
        LOGGER.info("Forcing state to {} with {} and {}", proxyChannelState, kafkaProxyFrontendHandler, kafkaProxyBackendHandler);
        this.state = proxyChannelState;
        this.frontendHandler = kafkaProxyFrontendHandler;
        this.backendHandler = kafkaProxyBackendHandler;
    }

    @Override
    public String toString() {
        return "StateHolder{state=" + this.state
                + ", serverReadsBlocked=" + this.serverReadsBlocked
                + ", clientReadsBlocked=" + this.clientReadsBlocked
                + ", frontendHandler=" + this.frontendHandler
                + ", backendHandler=" + this.backendHandler
                + "}";
    }

    /**
     * @return the simple class name of the current state object
     */
    public String currentState() {
        return state().getClass().getSimpleName();
    }

    /**
     * Applies backpressure to the backend handler, unless it is already applied.
     *
     * @throws NullPointerException if no backend handler has been set yet
     */
    public void onClientUnwritable() {
        if (!this.serverReadsBlocked) {
            this.serverReadsBlocked = true;
            Objects.requireNonNull(this.backendHandler).applyBackpressure();
        }
    }

    /**
     * Relieves backpressure on the backend handler if it was previously applied.
     *
     * @throws NullPointerException if backpressure is flagged but no backend handler is set
     */
    public void onClientWritable() {
        if (this.serverReadsBlocked) {
            this.serverReadsBlocked = false;
            Objects.requireNonNull(this.backendHandler).relieveBackpressure();
        }
    }

    /**
     * Applies backpressure to the frontend handler, unless it is already applied.
     */
    public void onServerUnwritable() {
        if (!this.clientReadsBlocked) {
            this.clientReadsBlocked = true;
            this.frontendHandler.applyBackpressure();
        }
    }

    /**
     * Relieves backpressure on the frontend handler if it was previously applied.
     */
    public void onServerWritable() {
        if (this.clientReadsBlocked) {
            this.clientReadsBlocked = false;
            this.frontendHandler.relieveBackpressure();
        }
    }

    /**
     * Records the frontend handler and moves from the starting state to {@code ClientActive}.
     * If the machine is not in the starting state this is treated as an illegal event.
     *
     * @param kafkaProxyFrontendHandler the handler for the newly active client channel
     */
    public void onClientActive(@NonNull KafkaProxyFrontendHandler kafkaProxyFrontendHandler) {
        if (ProxyChannelState.Startup.STARTING_STATE.equals(this.state)) {
            this.frontendHandler = kafkaProxyFrontendHandler;
            toClientActive(ProxyChannelState.Startup.STARTING_STATE.toClientActive(), kafkaProxyFrontendHandler);
        } else {
            illegalState("Client activation while not in the start state");
        }
    }

    /**
     * Moves from {@code SelectingServer} to {@code Connecting} towards {@code hostPort}.
     * In any other state the call is reported as a duplicate {@code initiateConnect()}.
     *
     * @param hostPort the remote endpoint to connect to
     * @param list the filters handed on to the frontend handler
     * @param virtualCluster passed to the newly created backend handler
     * @param netFilter only used in the error message when the call is illegal
     */
    public void onNetFilterInitiateConnect(@NonNull HostPort hostPort, @NonNull List<FilterAndInvoker> list, VirtualCluster virtualCluster, NetFilter netFilter) {
        ProxyChannelState current = this.state;
        if (current instanceof ProxyChannelState.SelectingServer) {
            ProxyChannelState.SelectingServer selecting = (ProxyChannelState.SelectingServer) current;
            toConnecting(selecting.toConnecting(hostPort), list, virtualCluster);
        } else {
            illegalState(DUPLICATE_INITIATE_CONNECT_ERROR + " : netFilter='" + netFilter + "'");
        }
    }

    /**
     * Moves from {@code Connecting} to {@code Forwarding}; illegal in any other state.
     */
    public void onServerActive() {
        ProxyChannelState current = state();
        if (current instanceof ProxyChannelState.Connecting) {
            toForwarding(((ProxyChannelState.Connecting) current).toForwarding());
        } else {
            illegalState("Server became active while not in the connecting state");
        }
    }

    /**
     * Logs an unexpected event at error level and closes without an error for the client.
     * Does nothing when the machine is already {@code Closed}.
     *
     * @param str description of the unexpected event
     */
    public void illegalState(@NonNull String str) {
        if (this.state instanceof ProxyChannelState.Closed) {
            return;
        }
        LOGGER.error("Unexpected event while in {} message: {}, closing channels with no client response.", this.state, str);
        toClosed(null);
    }

    /**
     * Hands a message received from the server to the frontend handler.
     *
     * @param obj the message to forward to the client
     * @throws NullPointerException if no frontend handler is set
     */
    public void messageFromServer(Object obj) {
        Objects.requireNonNull(this.frontendHandler).forwardToClient(obj);
    }

    /**
     * Asks the frontend handler to flush towards the client.
     *
     * @throws NullPointerException if no frontend handler is set
     */
    public void serverReadComplete() {
        Objects.requireNonNull(this.frontendHandler).flushToClient();
    }

    /**
     * Hands a message received from the client to the backend handler.
     *
     * @param obj the message to forward to the server
     * @throws NullPointerException if no backend handler is set
     */
    public void messageFromClient(Object obj) {
        Objects.requireNonNull(this.backendHandler).forwardToServer(obj);
    }

    /**
     * Asks the backend handler to flush towards the server, but only while {@code Forwarding}.
     *
     * @throws NullPointerException if forwarding but no backend handler is set
     */
    public void clientReadComplete() {
        if (this.state instanceof ProxyChannelState.Forwarding) {
            Objects.requireNonNull(this.backendHandler).flushToServer();
        }
    }

    /**
     * Handles a message read from the client. While {@code Forwarding} it goes straight to the
     * backend; otherwise it is buffered and may trigger a state transition. A message that is
     * not acceptable in the current state is treated as an illegal event.
     *
     * @param saslDecodePredicate consulted to decide between the {@code ApiVersions} and
     *                            {@code SelectingServer} transitions
     * @param obj the message read from the client
     * @throws NullPointerException if no frontend handler is set
     */
    public void onClientRequest(@NonNull SaslDecodePredicate saslDecodePredicate, Object obj) {
        Objects.requireNonNull(this.frontendHandler);
        if (state() instanceof ProxyChannelState.Forwarding) {
            messageFromClient(obj);
            return;
        }
        if (!onClientRequestBeforeForwarding(saslDecodePredicate, obj)) {
            String description = obj == null ? "null" : "message class=" + obj.getClass();
            illegalState("Unexpected message received: " + description);
        }
    }

    /**
     * Treats the call as an illegal event unless the machine is {@code Connecting}.
     *
     * @param str the message to report when the assertion fails
     */
    public void assertIsConnecting(String str) {
        if (!(this.state instanceof ProxyChannelState.Connecting)) {
            illegalState(str);
        }
    }

    /**
     * Returns the current state if it is {@code SelectingServer}; otherwise reports an illegal
     * event (which closes the machine) and throws.
     *
     * @param str the message to report when the machine is in another state
     * @return the current {@code SelectingServer} state
     * @throws IllegalStateException if the machine is not in {@code SelectingServer}
     */
    public ProxyChannelState.SelectingServer enforceInSelectingServer(String str) {
        ProxyChannelState current = this.state;
        if (current instanceof ProxyChannelState.SelectingServer) {
            return (ProxyChannelState.SelectingServer) current;
        }
        // Capture the offending state name first: illegalState(..) moves the machine to Closed,
        // so reading currentState() afterwards would always report "Closed".
        String offendingState = currentState();
        illegalState(str);
        throw new IllegalStateException("State required to be " + ProxyChannelState.SelectingServer.class.getSimpleName() + " but was " + offendingState + ":" + str);
    }

    /**
     * Closes the machine because the server channel went inactive.
     */
    public void onServerInactive() {
        toClosed(null);
    }

    /**
     * Closes the machine because the client channel went inactive.
     */
    public void onClientInactive() {
        toClosed(null);
    }

    /**
     * Logs an exception raised on the server channel and closes, passing the cause on to the
     * frontend handler.
     *
     * @param th the exception; may be {@code null}
     */
    public void onServerException(Throwable th) {
        warnChannelException("Exception from the server channel: {}. Increase log level to DEBUG for stacktrace", th);
        toClosed(th);
    }

    /**
     * Logs an exception raised on the client channel and closes with a Kafka error chosen from
     * the cause: {@code INVALID_REQUEST} when a {@link DecoderException} wraps a
     * {@link FrameOversizedException}, {@code UNKNOWN_SERVER_ERROR} otherwise.
     *
     * @param th the exception; may be {@code null}
     * @param z when {@code false} the oversized-frame hint additionally mentions an unexpected
     *          TLS handshake (presumably "TLS is enabled on this listener" — TODO confirm)
     */
    public void onClientException(Throwable th, boolean z) {
        FrameOversizedException oversized = findOversizedFrame(th);
        ApiException errorForClient;
        if (oversized != null) {
            String tlsHint = z ? "" : " or an unexpected TLS handshake";
            LOGGER.warn("Received over-sized frame from the client, max frame size bytes {}, received frame size bytes {} (hint: are we decoding a Kafka frame, or something unexpected like an HTTP request{}?)",
                    oversized.getMaxFrameSizeBytes(), oversized.getReceivedFrameSizeBytes(), tlsHint);
            errorForClient = Errors.INVALID_REQUEST.exception();
        } else {
            warnChannelException("Exception from the client channel: {}. Increase log level to DEBUG for stacktrace", th);
            errorForClient = Errors.UNKNOWN_SERVER_ERROR.exception();
        }
        // Exactly one close, carrying the error that matches the branch taken above.
        toClosed(errorForClient);
    }

    /** Returns the wrapped {@link FrameOversizedException} if {@code th} is a DecoderException caused by one, else null. */
    @Nullable
    private static FrameOversizedException findOversizedFrame(@Nullable Throwable th) {
        if (th instanceof DecoderException && th.getCause() instanceof FrameOversizedException) {
            return (FrameOversizedException) th.getCause();
        }
        return null;
    }

    /** Logs {@code th}'s message at warn level using {@code format}; the stack trace is attached only when debug is enabled. */
    private static void warnChannelException(String format, @Nullable Throwable th) {
        LOGGER.atWarn()
                .setCause(LOGGER.isDebugEnabled() ? th : null)
                .addArgument(th != null ? th.getMessage() : "")
                .log(format);
    }

    /** Enters {@code ClientActive} and notifies the frontend handler. */
    private void toClientActive(@NonNull ProxyChannelState.ClientActive clientActive, @NonNull KafkaProxyFrontendHandler kafkaProxyFrontendHandler) {
        setState(clientActive);
        kafkaProxyFrontendHandler.inClientActive();
    }

    /** Enters {@code Connecting}, creates the backend handler and notifies the frontend handler. */
    private void toConnecting(ProxyChannelState.Connecting connecting, @NonNull List<FilterAndInvoker> list, VirtualCluster virtualCluster) {
        setState(connecting);
        this.backendHandler = new KafkaProxyBackendHandler(this, virtualCluster);
        this.frontendHandler.inConnecting(connecting.remote(), list, this.backendHandler);
    }

    /** Enters {@code Forwarding} and notifies the frontend handler. */
    private void toForwarding(ProxyChannelState.Forwarding forwarding) {
        setState(forwarding);
        Objects.requireNonNull(this.frontendHandler).inForwarding();
    }

    /**
     * Buffers a client message that arrived before {@code Forwarding} and applies the
     * transition, if any, that the message triggers in the current state.
     *
     * @return {@code true} if the message was acceptable in the current state
     */
    private boolean onClientRequestBeforeForwarding(@NonNull SaslDecodePredicate saslDecodePredicate, Object obj) {
        // Buffer first, regardless of whether the message turns out to be acceptable.
        this.frontendHandler.bufferMsg(obj);
        ProxyChannelState current = state();
        if (current instanceof ProxyChannelState.ClientActive) {
            return onClientRequestInClientActiveState(saslDecodePredicate, obj, (ProxyChannelState.ClientActive) current);
        }
        if (current instanceof ProxyChannelState.HaProxy) {
            return onClientRequestInHaProxyState(saslDecodePredicate, obj, (ProxyChannelState.HaProxy) current);
        }
        if (current instanceof ProxyChannelState.ApiVersions) {
            return onClientRequestInApiVersionsState(saslDecodePredicate, obj, (ProxyChannelState.ApiVersions) current);
        }
        // While selecting a server or connecting, request frames are accepted (buffered above)
        // without any state change; every other state/message combination is rejected.
        boolean awaitingServer = current instanceof ProxyChannelState.SelectingServer
                || current instanceof ProxyChannelState.Connecting;
        return awaitingServer && obj instanceof RequestFrame;
    }

    /** In {@code ApiVersions}: any request frame moves the machine on to {@code SelectingServer}. */
    private boolean onClientRequestInApiVersionsState(@NonNull SaslDecodePredicate saslDecodePredicate, Object obj, ProxyChannelState.ApiVersions apiVersions) {
        if (obj instanceof RequestFrame) {
            toSelectingServer(apiVersions.toSelectingServer());
            return true;
        }
        return false;
    }

    /** In {@code HaProxy}: delegates to the shared ApiVersions/SelectingServer transition logic. */
    private boolean onClientRequestInHaProxyState(@NonNull SaslDecodePredicate saslDecodePredicate, Object obj, ProxyChannelState.HaProxy haProxy) {
        return transitionClientRequest(saslDecodePredicate, obj, haProxy::toApiVersions, haProxy::toSelectingServer);
    }

    /**
     * Shared transition logic for states that may be followed by {@code ApiVersions} or
     * {@code SelectingServer}.
     *
     * @param function builds the {@code ApiVersions} state from the decoded ApiVersions request
     * @param function2 builds the {@code SelectingServer} state; receives {@code null} when the
     *                  message is a request frame other than a decoded ApiVersions request
     * @return {@code true} if a transition happened, {@code false} if {@code obj} is not a request frame
     */
    private boolean transitionClientRequest(@NonNull SaslDecodePredicate saslDecodePredicate, Object obj, Function<DecodedRequestFrame<ApiVersionsRequestData>, ProxyChannelState.ApiVersions> function, Function<DecodedRequestFrame<ApiVersionsRequestData>, ProxyChannelState.SelectingServer> function2) {
        if (isMessageApiVersionsRequest(obj)) {
            // The cast relies on the API key check above having identified an ApiVersions request.
            @SuppressWarnings("unchecked")
            DecodedRequestFrame<ApiVersionsRequestData> apiVersionsFrame = (DecodedRequestFrame<ApiVersionsRequestData>) obj;
            if (saslDecodePredicate.isAuthenticationOffloadEnabled()) {
                toApiVersions(function.apply(apiVersionsFrame), apiVersionsFrame);
            } else {
                toSelectingServer(function2.apply(apiVersionsFrame));
            }
            return true;
        }
        if (obj instanceof RequestFrame) {
            toSelectingServer(function2.apply(null));
            return true;
        }
        return false;
    }

    /** In {@code ClientActive}: an HAProxy message enters {@code HaProxy}; anything else uses the shared transition logic. */
    private boolean onClientRequestInClientActiveState(@NonNull SaslDecodePredicate saslDecodePredicate, Object obj, ProxyChannelState.ClientActive clientActive) {
        if (obj instanceof HAProxyMessage) {
            toHaProxy(clientActive.toHaProxy((HAProxyMessage) obj));
            return true;
        }
        return transitionClientRequest(saslDecodePredicate, obj, clientActive::toApiVersions, clientActive::toSelectingServer);
    }

    /** Enters {@code HaProxy}; no handler callback is made. */
    private void toHaProxy(ProxyChannelState.HaProxy haProxy) {
        setState(haProxy);
    }

    /** Enters {@code ApiVersions} and passes the triggering request to the frontend handler. */
    private void toApiVersions(ProxyChannelState.ApiVersions apiVersions, DecodedRequestFrame<ApiVersionsRequestData> decodedRequestFrame) {
        setState(apiVersions);
        Objects.requireNonNull(this.frontendHandler).inApiVersions(decodedRequestFrame);
    }

    /** Enters {@code SelectingServer} and notifies the frontend handler. */
    private void toSelectingServer(ProxyChannelState.SelectingServer selectingServer) {
        setState(selectingServer);
        Objects.requireNonNull(this.frontendHandler).inSelectingServer();
    }

    /**
     * Enters {@code Closed} and notifies the backend handler (if any) and then the frontend
     * handler. Idempotent: a second call while already closed does nothing.
     *
     * @param th passed to the frontend handler's {@code inClosed}; may be {@code null}
     */
    private void toClosed(@Nullable Throwable th) {
        if (this.state instanceof ProxyChannelState.Closed) {
            return;
        }
        setState(new ProxyChannelState.Closed());
        if (this.backendHandler != null) {
            this.backendHandler.inClosed();
        }
        Objects.requireNonNull(this.frontendHandler).inClosed(th);
    }

    /** Records the new state, tracing the transition. */
    private void setState(@NonNull ProxyChannelState proxyChannelState) {
        LOGGER.trace("{} transitioning to {}", this, proxyChannelState);
        this.state = proxyChannelState;
    }

    /** @return {@code true} if {@code obj} is a decoded request frame whose API key is {@code API_VERSIONS} */
    private static boolean isMessageApiVersionsRequest(Object obj) {
        return obj instanceof DecodedRequestFrame
                && ((DecodedRequestFrame<?>) obj).apiKey() == ApiKeys.API_VERSIONS;
    }
}
