package io.axoniq.axonserver.connector.command.impl;

import io.axoniq.axonserver.connector.AxonServerException;
import io.axoniq.axonserver.connector.ErrorCategory;
import io.axoniq.axonserver.connector.InstructionHandler;
import io.axoniq.axonserver.connector.Registration;
import io.axoniq.axonserver.connector.ReplyChannel;
import io.axoniq.axonserver.connector.command.CommandChannel;
import io.axoniq.axonserver.connector.impl.AbstractAxonServerChannel;
import io.axoniq.axonserver.connector.impl.AbstractIncomingInstructionStream;
import io.axoniq.axonserver.connector.impl.AsyncRegistration;
import io.axoniq.axonserver.connector.impl.AxonServerManagedChannel;
import io.axoniq.axonserver.connector.impl.ObjectUtils;
import io.axoniq.axonserver.grpc.ErrorMessage;
import io.axoniq.axonserver.grpc.FlowControl;
import io.axoniq.axonserver.grpc.InstructionAck;
import io.axoniq.axonserver.grpc.MetaDataValue;
import io.axoniq.axonserver.grpc.ProcessingInstruction;
import io.axoniq.axonserver.grpc.ProcessingKey;
import io.axoniq.axonserver.grpc.command.Command;
import io.axoniq.axonserver.grpc.command.CommandProviderInbound;
import io.axoniq.axonserver.grpc.command.CommandProviderOutbound;
import io.axoniq.axonserver.grpc.command.CommandResponse;
import io.axoniq.axonserver.grpc.command.CommandServiceGrpc;
import io.axoniq.axonserver.grpc.command.CommandSubscription;
import io.axoniq.axonserver.grpc.control.ClientIdentification;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.StreamObserver;
import io.netty.util.internal.OutOfDirectMemoryError;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/axoniq/axonserver/connector/command/impl/CommandChannelImpl.class */
public class CommandChannelImpl extends AbstractAxonServerChannel<CommandProviderOutbound> implements CommandChannel {
    // Class-wide SLF4J logger.
    private static final Logger logger = LoggerFactory.getLogger(CommandChannelImpl.class);
    // Currently registered outbound stream; holds null when none is registered (it is reset to null on reconnect and disconnect).
    private final AtomicReference<CallStreamObserver<CommandProviderOutbound>> outboundCommandStream;
    // Supplies the client id and component name placed in outgoing subscriptions and commands.
    private final ClientIdentification clientIdentification;
    // Registered command handlers, keyed by command name.
    private final ConcurrentMap<String, CommandHandler> commandHandlers;
    // Dispatch table for incoming instructions, keyed by the request case of the inbound message.
    private final ConcurrentMap<CommandProviderInbound.RequestCase, InstructionHandler<CommandProviderInbound, CommandProviderOutbound>> handlers;
    // Passed to every new IncomingCommandStream; presumably the initial number of flow-control permits - confirm against AbstractIncomingInstructionStream.
    private final int permits;
    // Passed to every new IncomingCommandStream; presumably the permit refill batch size - confirm against AbstractIncomingInstructionStream.
    private final int permitsBatch;
    // gRPC stub used both to open the command provider stream and to dispatch commands.
    private final CommandServiceGrpc.CommandServiceStub commandServiceStub;
    // Fallback used for incoming commands without a registered handler; its future fails with NO_HANDLER_FOR_COMMAND.
    private final CommandHandler noCommandHandler;
    // Context name; within this class it is only used in log messages.
    private final String context;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/axoniq/axonserver/connector/command/impl/CommandChannelImpl$CommandHandler.class */
    /**
     * Pairs a command handling function with the load factor it was registered with.
     * <p>
     * Equality and hash code are based on the handling function only; the load factor is ignored.
     */
    public static class CommandHandler {
        private final Function<Command, CompletableFuture<CommandResponse>> handler;
        private final int loadFactor;

        /**
         * @param function the function invoked for each incoming command
         * @param i        the load factor to report when subscribing this handler
         */
        public CommandHandler(Function<Command, CompletableFuture<CommandResponse>> function, int i) {
            this.handler = function;
            this.loadFactor = i;
        }

        /**
         * @return the function that handles commands
         */
        public Function<Command, CompletableFuture<CommandResponse>> getHandler() {
            return handler;
        }

        /**
         * @return the load factor given at construction
         */
        public int getLoadFactor() {
            return loadFactor;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj != null && obj.getClass() == getClass()) {
                CommandHandler other = (CommandHandler) obj;
                return handler.equals(other.handler);
            }
            return false;
        }

        @Override
        public int hashCode() {
            // Same value as Objects.hash(handler): 31 * 1 + hash of the single element.
            return 31 + Objects.hashCode(handler);
        }
    }

    /* loaded from: input_file:io/axoniq/axonserver/connector/command/impl/CommandChannelImpl$CommandResponseHandler.class */
    private static class CommandResponseHandler implements StreamObserver<CommandResponse> {
        private final String clientId;
        private final CompletableFuture<CommandResponse> response;

        public CommandResponseHandler(String str, CompletableFuture<CommandResponse> completableFuture) {
            this.clientId = str;
            this.response = completableFuture;
        }

        public void onNext(CommandResponse commandResponse) {
            if (this.response.isDone()) {
                return;
            }
            this.response.complete(commandResponse);
        }

        public void onError(Throwable th) {
            if (this.response.isDone()) {
                return;
            }
            this.response.completeExceptionally(new AxonServerException(ErrorCategory.COMMAND_DISPATCH_ERROR, "Received exception while dispatching command", this.clientId, th));
        }

        public void onCompleted() {
            if (this.response.isDone()) {
                return;
            }
            this.response.completeExceptionally(new AxonServerException(ErrorCategory.COMMAND_DISPATCH_ERROR, "Reply completed without result", this.clientId));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/axoniq/axonserver/connector/command/impl/CommandChannelImpl$IncomingCommandStream.class */
    public class IncomingCommandStream extends AbstractIncomingInstructionStream<CommandProviderInbound, CommandProviderOutbound> {
        public IncomingCommandStream(String str, int i, int i2, Consumer<Throwable> consumer, Consumer<CallStreamObserver<CommandProviderOutbound>> consumer2) {
            super(str, i, i2, consumer, consumer2);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // io.axoniq.axonserver.connector.impl.FlowControlledStream
        public CommandProviderOutbound buildFlowControlMessage(FlowControl flowControl) {
            return CommandProviderOutbound.newBuilder().setFlowControl(flowControl).m498build();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // io.axoniq.axonserver.connector.impl.AbstractIncomingInstructionStream
        public CommandProviderOutbound buildAckMessage(InstructionAck instructionAck) {
            return CommandProviderOutbound.newBuilder().setAck(instructionAck).m498build();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // io.axoniq.axonserver.connector.impl.AbstractIncomingInstructionStream
        public String getInstructionId(CommandProviderInbound commandProviderInbound) {
            return commandProviderInbound.getInstructionId();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // io.axoniq.axonserver.connector.impl.AbstractIncomingInstructionStream
        public InstructionHandler<CommandProviderInbound, CommandProviderOutbound> getHandler(CommandProviderInbound commandProviderInbound) {
            return (InstructionHandler) CommandChannelImpl.this.handlers.get(commandProviderInbound.getRequestCase());
        }

        @Override // io.axoniq.axonserver.connector.impl.AbstractIncomingInstructionStream
        protected boolean unregisterOutboundStream(CallStreamObserver<CommandProviderOutbound> callStreamObserver) {
            return CommandChannelImpl.this.outboundCommandStream.compareAndSet(callStreamObserver, null);
        }
    }

    /**
     * Creates the command channel and registers the handlers for incoming COMMAND and ACK messages.
     * No stream is opened here; that happens on {@code connect()} or on the first handler registration.
     *
     * @param clientIdentification      identification used in outgoing subscriptions and commands
     * @param str                       the context name, used in log messages
     * @param i                         the permits value passed to each incoming command stream
     * @param i2                        the permits batch value passed to each incoming command stream
     * @param scheduledExecutorService  executor handed to the superclass
     * @param axonServerManagedChannel  channel handed to the superclass and used to create the gRPC stub
     */
    public CommandChannelImpl(ClientIdentification clientIdentification, String str, int i, int i2, ScheduledExecutorService scheduledExecutorService, AxonServerManagedChannel axonServerManagedChannel) {
        super(clientIdentification, scheduledExecutorService, axonServerManagedChannel);

        // Plain configuration values.
        this.clientIdentification = clientIdentification;
        this.context = str;
        this.permits = i;
        this.permitsBatch = i2;

        // Mutable state, initially empty.
        this.outboundCommandStream = new AtomicReference<>();
        this.commandHandlers = new ConcurrentHashMap<>();
        this.handlers = new ConcurrentHashMap<>();

        // Fallback handler: ignores the command and reports that no handler exists.
        this.noCommandHandler = new CommandHandler(unhandled -> noHandlerForCommand(), 0);
        this.commandServiceStub = CommandServiceGrpc.newStub(axonServerManagedChannel);

        this.handlers.put(CommandProviderInbound.RequestCase.COMMAND, this::handleIncomingCommand);
        this.handlers.put(CommandProviderInbound.RequestCase.ACK, this::handleAck);
    }

    /**
     * Handles a command received on the incoming stream.
     * <p>
     * The instruction is acknowledged when a handler is registered for the command name and
     * negatively acknowledged otherwise, in which case the "no handler" fallback produces the
     * result. The handler's result (or an error response built from its failure) is tagged with
     * the command's message identifier and sent on the reply channel.
     * <p>
     * The reply channel is always completed once the handler's future is done, even when building
     * or sending the response fails; such a failure is logged instead of being silently dropped.
     *
     * @param commandProviderInbound the inbound message carrying the command
     * @param replyChannel           the channel on which to acknowledge and reply
     */
    private void handleIncomingCommand(CommandProviderInbound commandProviderInbound, ReplyChannel<CommandProviderOutbound> replyChannel) {
        Command command = commandProviderInbound.getCommand();
        CommandHandler commandHandler = this.commandHandlers.get(command.getName());
        if (commandHandler != null) {
            replyChannel.sendAck();
        } else {
            replyChannel.sendNack();
            commandHandler = this.noCommandHandler;
        }
        commandHandler.getHandler()
                .apply(command)
                .exceptionally(CommandChannelImpl::buildExecutionErrorResponse)
                .thenApply(CommandResponse::newBuilder)
                .thenApply(builder -> builder.setRequestIdentifier(command.getMessageIdentifier()))
                .thenAccept(builder -> replyChannel.send(CommandProviderOutbound.newBuilder().setCommandResponse(builder).m498build()))
                .whenComplete((ignored, failure) -> {
                    if (failure != null) {
                        logger.warn("Failed to send response for command '{}' in context '{}'", command.getName(), this.context, failure);
                    }
                    // Complete unconditionally so the instruction is never left pending after a failure.
                    replyChannel.complete();
                });
    }

    /**
     * Builds a COMMAND_EXECUTION_ERROR response describing the given failure.
     * Falls back to the exception's class name when it has no message, because the
     * message builder does not accept null.
     */
    private static CommandResponse buildExecutionErrorResponse(Throwable failure) {
        String message = failure.getMessage();
        if (message == null) {
            message = failure.getClass().getName();
        }
        return CommandResponse.newBuilder()
                .setErrorCode(ErrorCategory.COMMAND_EXECUTION_ERROR.errorCode())
                .setErrorMessage(ErrorMessage.newBuilder().setMessage(message).m64build())
                .m546build();
    }

    /**
     * Creates a future that is already failed with a NO_HANDLER_FOR_COMMAND exception
     * carrying this client's id.
     *
     * @return an exceptionally completed future
     */
    private CompletableFuture<CommandResponse> noHandlerForCommand() {
        AxonServerException noHandler = new AxonServerException(ErrorCategory.NO_HANDLER_FOR_COMMAND, "No handler for command", this.clientIdentification.getClientId());
        CompletableFuture<CommandResponse> failed = new CompletableFuture<>();
        failed.completeExceptionally(noHandler);
        return failed;
    }

    /**
     * Handles an acknowledgement received on the incoming stream by passing it to
     * {@code processAck} and then completing the reply channel.
     *
     * @param commandProviderInbound the inbound message carrying the acknowledgement
     * @param replyChannel           the reply channel for this instruction
     */
    private void handleAck(CommandProviderInbound commandProviderInbound, ReplyChannel<CommandProviderOutbound> replyChannel) {
        InstructionAck acknowledgement = commandProviderInbound.getAck();
        processAck(acknowledgement);
        replyChannel.complete();
    }

    @Override // io.axoniq.axonserver.connector.impl.AbstractAxonServerChannel
    /**
     * Opens the command stream, but only when at least one command handler is registered.
     */
    public void connect() {
        boolean hasHandlers = !this.commandHandlers.isEmpty();
        if (hasHandlers) {
            doCreateCommandStream();
        }
    }

    /**
     * Opens a new command provider stream unless an outbound stream is already registered.
     * <p>
     * After opening the stream, a subscription (with an empty instruction id) is sent for every
     * registered command handler, and flow control is enabled on the incoming stream.
     * The method is synchronized so that concurrent callers do not open the stream twice.
     */
    private synchronized void doCreateCommandStream() {
        if (this.outboundCommandStream.get() != null) {
            logger.debug("CommandChannel for context '{}' is already connected", this.context);
            return;
        }
        // Declared with its concrete type: getInstructionsForPlatform() and enableFlowControl()
        // are not part of the StreamObserver interface.
        IncomingCommandStream incomingCommandStream = new IncomingCommandStream(this.clientIdentification.getClientId(), this.permits, this.permitsBatch, this::scheduleReconnect, this::registerOutboundStream);
        this.commandServiceStub.openStream(incomingCommandStream);
        StreamObserver<CommandProviderOutbound> instructionsForPlatform = incomingCommandStream.getInstructionsForPlatform();
        this.commandHandlers.forEach((commandName, commandHandler) ->
                instructionsForPlatform.onNext(buildSubscribeMessage(commandName, "", commandHandler.getLoadFactor())));
        logger.info("CommandChannel for context '{}' connected, {} command handlers registered", this.context, this.commandHandlers.size());
        incomingCommandStream.enableFlowControl();
    }

    /**
     * Stores the given stream as the current outbound stream and signals completion on the stream
     * it replaces, via {@code ObjectUtils.silently} (presumably tolerating a null previous stream
     * and suppressing exceptions from the action - confirm against ObjectUtils).
     *
     * @param callStreamObserver the new outbound stream
     */
    private void registerOutboundStream(CallStreamObserver<CommandProviderOutbound> callStreamObserver) {
        CallStreamObserver<CommandProviderOutbound> replaced = this.outboundCommandStream.getAndSet(callStreamObserver);
        ObjectUtils.silently(replaced, StreamObserver::onCompleted);
    }

    @Override // io.axoniq.axonserver.connector.impl.AbstractAxonServerChannel
    /**
     * Unsubscribes all registered command handlers on the current outbound stream, detaches that
     * stream from this channel and schedules an immediate reconnect.
     * <p>
     * The detached stream is completed once all unsubscribe instructions have finished, whether
     * they succeeded or failed, so that a failed unsubscribe does not leave the old stream open.
     */
    public void reconnect() {
        CompletableFuture<Void> unsubscribed = this.commandHandlers.keySet().stream()
                .map(this::sendUnsubscribe)
                .reduce((first, second) -> CompletableFuture.allOf(first, second))
                .orElseGet(() -> CompletableFuture.completedFuture(null));
        StreamObserver<CommandProviderOutbound> previousOutbound = this.outboundCommandStream.getAndSet(null);
        // whenComplete rather than thenRun: the old stream must be closed on failure as well.
        unsubscribed.whenComplete((ignored, failure) ->
                ObjectUtils.doIfNotNull(previousOutbound, stream -> stream.onCompleted()));
        scheduleImmediateReconnect();
    }

    @Override // io.axoniq.axonserver.connector.impl.AbstractAxonServerChannel
    /**
     * Sends an unsubscribe instruction for every registered command, forgets all handlers and
     * completes the current outbound stream, if there is one.
     */
    public void disconnect() {
        for (String commandName : this.commandHandlers.keySet()) {
            sendUnsubscribe(commandName);
        }
        this.commandHandlers.clear();
        CallStreamObserver<CommandProviderOutbound> previous = this.outboundCommandStream.getAndSet(null);
        ObjectUtils.doIfNotNull(previous, StreamObserver::onCompleted);
    }

    @Override // io.axoniq.axonserver.connector.impl.AbstractAxonServerChannel
    /**
     * Reports whether this channel is ready.
     *
     * @return {@code true} when an outbound stream is registered, or when no command handlers
     * are registered (in which case no stream is needed)
     */
    public boolean isReady() {
        if (this.outboundCommandStream.get() != null) {
            return true;
        }
        return this.commandHandlers.isEmpty();
    }

    @Override // io.axoniq.axonserver.connector.command.CommandChannel
    /**
     * Registers the given function as handler for each of the given command names and sends a
     * subscribe instruction per name on the current outbound stream. When no handlers were
     * registered before, the command stream is opened first.
     *
     * @param function the function handling incoming commands
     * @param i        the load factor reported with each subscription
     * @param strArr   the names of the commands to handle
     * @return a registration tied to the outcome of all subscribe instructions; cancelling it
     * unsubscribes the names that are still mapped to this handler
     */
    public Registration registerCommandHandler(Function<Command, CompletableFuture<CommandResponse>> function, int i, String... strArr) {
        if (this.commandHandlers.isEmpty()) {
            doCreateCommandStream();
        }
        CommandHandler registered = new CommandHandler(function, i);
        CompletableFuture<Void> subscribed = CompletableFuture.completedFuture(null);
        for (int index = 0; index < strArr.length; index++) {
            String commandName = strArr[index];
            this.commandHandlers.put(commandName, registered);
            logger.info("Registered handler for command '{}' in context '{}'", commandName, this.context);
            CommandProviderOutbound subscribeMessage = buildSubscribeMessage(commandName, UUID.randomUUID().toString(), i);
            CompletableFuture<Void> acknowledged = sendInstruction(subscribeMessage, CommandProviderOutbound::getInstructionId, this.outboundCommandStream.get());
            subscribed = CompletableFuture.allOf(subscribed, acknowledged);
        }
        return new AsyncRegistration(subscribed, () -> unsubscribe(registered, strArr));
    }

    /**
     * Unsubscribes the given handler from the given command names. Names currently mapped to a
     * different handler instance are skipped. A mapping is removed only after its unsubscribe
     * instruction has completed normally.
     *
     * @param commandHandler the handler instance to unsubscribe
     * @param strArr         the command names to unsubscribe it from
     * @return a future combining all unsubscribe instructions that were sent
     */
    private CompletableFuture<Void> unsubscribe(CommandHandler commandHandler, String... strArr) {
        CompletableFuture<Void> pending = CompletableFuture.completedFuture(null);
        for (String commandName : strArr) {
            // Identity comparison: only the exact instance that was registered is removed.
            if (this.commandHandlers.get(commandName) != commandHandler) {
                continue;
            }
            logger.info("Unregistered handler for command '{}' in context '{}'", commandName, this.context);
            CompletableFuture<Void> removed = sendUnsubscribe(commandName)
                    .thenRun(() -> this.commandHandlers.remove(commandName, commandHandler));
            pending = CompletableFuture.allOf(pending, removed);
        }
        return pending;
    }

    /**
     * Sends an unsubscribe instruction for the given command name on the current outbound stream.
     * A fresh random UUID is used both as instruction id and as message id.
     *
     * @param str the name of the command to unsubscribe
     * @return the future returned by {@code sendInstruction} for this instruction
     */
    private CompletableFuture<Void> sendUnsubscribe(String str) {
        String instructionId = UUID.randomUUID().toString();
        CommandSubscription subscription = CommandSubscription.newBuilder()
                .setMessageId(instructionId)
                .setCommand(str)
                .setClientId(this.clientIdentification.getClientId())
                .setComponentName(this.clientIdentification.getComponentName())
                .m597build();
        CommandProviderOutbound instruction = CommandProviderOutbound.newBuilder()
                .setInstructionId(instructionId)
                .setUnsubscribe(subscription)
                .m498build();
        return sendInstruction(instruction, CommandProviderOutbound::getInstructionId, this.outboundCommandStream.get());
    }

    /**
     * Builds a subscribe instruction for the given command.
     *
     * @param str  the name of the command to subscribe to
     * @param str2 the identifier used as both instruction id and message id
     * @param i    the load factor of the handler
     * @return the outbound message carrying the subscription
     */
    private CommandProviderOutbound buildSubscribeMessage(String str, String str2, int i) {
        CommandSubscription.Builder subscription = CommandSubscription.newBuilder()
                .setMessageId(str2)
                .setCommand(str)
                .setClientId(this.clientIdentification.getClientId())
                .setComponentName(this.clientIdentification.getComponentName())
                .setLoadFactor(i);
        return CommandProviderOutbound.newBuilder()
                .setInstructionId(str2)
                .setSubscribe(subscription)
                .m498build();
    }

    @Override // io.axoniq.axonserver.connector.command.CommandChannel
    /**
     * Dispatches the given command and returns a future for its response.
     * <p>
     * The outgoing copy gets this client's id and component name, a random message identifier when
     * the command has an empty one, and a ROUTING_KEY processing instruction equal to the message
     * identifier when the command does not already carry one. Failures raised while dispatching
     * are reported through the returned future as COMMAND_DISPATCH_ERROR.
     *
     * @param command the command to dispatch
     * @return a future completing with the command's response, or exceptionally on failure
     */
    public CompletableFuture<CommandResponse> sendCommand(Command command) {
        boolean hasRoutingKey = false;
        for (ProcessingInstruction instruction : command.getProcessingInstructionsList()) {
            if (instruction.getKey() == ProcessingKey.ROUTING_KEY) {
                hasRoutingKey = true;
                break;
            }
        }

        String messageIdentifier = command.getMessageIdentifier();
        if ("".equals(messageIdentifier)) {
            messageIdentifier = UUID.randomUUID().toString();
        }
        Command.Builder outgoing = Command.newBuilder(command)
                .setMessageIdentifier(messageIdentifier)
                .setClientId(this.clientIdentification.getClientId())
                .setComponentName(this.clientIdentification.getComponentName());
        if (!hasRoutingKey) {
            // Without an explicit routing key, the message identifier is used as the routing key.
            MetaDataValue.Builder routingValue = MetaDataValue.newBuilder().setTextValue(outgoing.getMessageIdentifier());
            outgoing.addProcessingInstructions(ProcessingInstruction.newBuilder().setKey(ProcessingKey.ROUTING_KEY).setValue(routingValue));
        }

        CompletableFuture<CommandResponse> result = new CompletableFuture<>();
        try {
            CommandResponseHandler responseHandler = new CommandResponseHandler(this.clientIdentification.getClientId(), result);
            this.commandServiceStub.dispatch(outgoing.m399build(), responseHandler);
        } catch (OutOfDirectMemoryError bufferFailure) {
            result.completeExceptionally(new AxonServerException(ErrorCategory.COMMAND_DISPATCH_ERROR, "Unable to buffer message for dispatching", this.clientIdentification.getClientId(), bufferFailure));
        } catch (Exception dispatchFailure) {
            result.completeExceptionally(new AxonServerException(ErrorCategory.COMMAND_DISPATCH_ERROR, "An error occurred while attempting to dispatch a message", this.clientIdentification.getClientId(), dispatchFailure));
        }
        return result;
    }

    @Override // io.axoniq.axonserver.connector.command.CommandChannel
    /**
     * Sends an unsubscribe instruction for every registered command. The handlers themselves
     * stay registered in this channel.
     *
     * @return a future combining all unsubscribe instructions, or an already completed future
     * when no handlers are registered
     */
    public CompletableFuture<Void> prepareDisconnect() {
        CompletableFuture<Void> combined = null;
        for (String commandName : this.commandHandlers.keySet()) {
            CompletableFuture<Void> unsubscribed = sendUnsubscribe(commandName);
            // Left fold: the first future is taken as-is, each later one is joined with allOf.
            combined = combined == null ? unsubscribed : CompletableFuture.allOf(combined, unsubscribed);
        }
        if (combined == null) {
            return CompletableFuture.completedFuture(null);
        }
        return combined;
    }
}
