package org.eclipse.ditto.services.utils.devops;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.ReceiveTimeout;
import akka.cluster.pubsub.DistributedPubSub;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
import com.typesafe.config.Config;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.model.base.common.HttpStatus;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.model.base.json.Jsonifiable;
import org.eclipse.ditto.model.devops.LoggingFacade;
import org.eclipse.ditto.services.utils.akka.actors.RetrieveConfigBehavior;
import org.eclipse.ditto.services.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.services.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.services.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.services.utils.cluster.JsonValueSourceRef;
import org.eclipse.ditto.services.utils.cluster.MappingStrategies;
import org.eclipse.ditto.signals.base.JsonTypeNotParsableException;
import org.eclipse.ditto.signals.commands.base.Command;
import org.eclipse.ditto.signals.commands.base.CommandResponse;
import org.eclipse.ditto.signals.commands.base.exceptions.GatewayInternalErrorException;
import org.eclipse.ditto.signals.commands.devops.AggregatedDevOpsCommandResponse;
import org.eclipse.ditto.signals.commands.devops.ChangeLogLevel;
import org.eclipse.ditto.signals.commands.devops.ChangeLogLevelResponse;
import org.eclipse.ditto.signals.commands.devops.DevOpsCommand;
import org.eclipse.ditto.signals.commands.devops.DevOpsCommandResponse;
import org.eclipse.ditto.signals.commands.devops.DevOpsErrorResponse;
import org.eclipse.ditto.signals.commands.devops.ExecutePiggybackCommand;
import org.eclipse.ditto.signals.commands.devops.RetrieveLoggerConfig;
import org.eclipse.ditto.signals.commands.devops.RetrieveLoggerConfigResponse;

/* loaded from: input_file:org/eclipse/ditto/services/utils/devops/DevOpsCommandsActor.class */
public final class DevOpsCommandsActor extends AbstractActor implements RetrieveConfigBehavior {
    // Name constant for this actor; not referenced inside this file - presumably used by whoever creates the actor.
    public static final String ACTOR_NAME = "devOpsCommandsActor";
    // Header key read from a command's DittoHeaders to decide whether responses are aggregated before answering.
    public static final String AGGREGATE_HEADER = "aggregate";
    // Log template used by all actors in this file for messages no other handler matched.
    private static final String UNKNOWN_MESSAGE_TEMPLATE = "Unknown message: {}";
    // Header key that may carry the pub/sub topic for a directly published piggyback command.
    private static final String TOPIC_HEADER = "topic";
    // Header key deciding whether a command is published "via group" instead of as a plain publish.
    private static final String IS_GROUP_TOPIC_HEADER = "is-group-topic";
    private final DittoDiagnosticLoggingAdapter logger = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
    // Used to read and change logger configuration when handling RetrieveLoggerConfig / ChangeLogLevel.
    private final LoggingFacade loggingFacade;
    // Service name and instance are put into responses and into the pub/sub topics subscribed to.
    private final String serviceName;
    private final String instance;
    // Distributed pub/sub mediator of the actor system; all DevOpsCommands are published through it.
    private final ActorRef pubSubMediator;
    // Looked up by piggyback command type to find the parser which deserializes the piggybacked JSON.
    private final MappingStrategies serviceMappingStrategy;

    /* loaded from: input_file:org/eclipse/ditto/services/utils/devops/DevOpsCommandsActor$DevOpsCommandResponseCorrelationActor.class */
    private static final class DevOpsCommandResponseCorrelationActor extends AbstractActor {
        private static final Duration DEFAULT_RECEIVE_TIMEOUT = Duration.ofMillis(5000);
        private static final boolean DEFAULT_AGGREGATE = true;
        private final ActorRef devOpsCommandSender;
        private final DevOpsCommand<?> devOpsCommand;
        private final List<CommandResponse<?>> commandResponses = new ArrayList();
        private final boolean aggregateResults;
        private final DittoDiagnosticLoggingAdapter logger;

        private DevOpsCommandResponseCorrelationActor(ActorRef actorRef, DevOpsCommand<?> devOpsCommand) {
            this.devOpsCommandSender = actorRef;
            this.devOpsCommand = devOpsCommand;
            DittoHeaders dittoHeaders = devOpsCommand.getDittoHeaders();
            this.aggregateResults = isAggregateResults(dittoHeaders);
            this.logger = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
            this.logger.setCorrelationId(dittoHeaders);
        }

        private static boolean isAggregateResults(DittoHeaders dittoHeaders) {
            boolean z = DEFAULT_AGGREGATE;
            String str = (String) dittoHeaders.get(DevOpsCommandsActor.AGGREGATE_HEADER);
            if (null != str) {
                z = Boolean.parseBoolean(str);
            }
            return z;
        }

        static Props props(ActorRef actorRef, DevOpsCommand<?> devOpsCommand) {
            return Props.create(DevOpsCommandResponseCorrelationActor.class, new Object[]{actorRef, devOpsCommand});
        }

        public AbstractActor.Receive createReceive() {
            return ReceiveBuilder.create().match(CommandResponse.class, this::handleCommandResponse).match(JsonValueSourceRef.class, this::handleJsonValueSourceRef).match(DittoRuntimeException.class, this::handleDittoRuntimeException).match(ReceiveTimeout.class, receiveTimeout -> {
                this.logger.withCorrelationId(getSelf().path().name()).info("Got ReceiveTimeout, answering with all aggregated DevOpsCommandResponses and stopping ourselves ...");
                sendCommandResponsesAndStop();
            }).matchAny(obj -> {
                this.logger.warning(DevOpsCommandsActor.UNKNOWN_MESSAGE_TEMPLATE, obj);
                unhandled(obj);
            }).build();
        }

        private void handleCommandResponse(CommandResponse<?> commandResponse) {
            if (commandResponse instanceof DevOpsCommandResponse) {
                this.logger.debug("Received DevOpsCommandResponse from service/instance <{}/{}>: {}", ((DevOpsCommandResponse) commandResponse).getServiceName().orElse("?"), ((DevOpsCommandResponse) commandResponse).getInstance().orElse("?"), commandResponse.getType());
            } else {
                this.logger.debug("Received DevOpsCommandResponse from service/instance <?/?>: {}", commandResponse.getType());
            }
            addCommandResponse(commandResponse);
        }

        private void addCommandResponse(CommandResponse<?> commandResponse) {
            this.commandResponses.add(commandResponse);
            if (this.aggregateResults) {
                return;
            }
            this.logger.info("Do not aggregate sent response immediately.");
            sendCommandResponsesAndStop();
        }

        private void sendCommandResponsesAndStop() {
            this.devOpsCommandSender.tell(AggregatedDevOpsCommandResponse.of(this.commandResponses, "devops.responses:" + this.devOpsCommand.getName(), this.commandResponses.isEmpty() ? HttpStatus.REQUEST_TIMEOUT : HttpStatus.OK, this.devOpsCommand.getDittoHeaders()), getSelf());
            stopSelf();
        }

        private void stopSelf() {
            getContext().stop(getSelf());
        }

        private void handleJsonValueSourceRef(JsonValueSourceRef jsonValueSourceRef) {
            this.logger.debug("Received {} from: <{}>", jsonValueSourceRef.getClass().getSimpleName(), getSender());
            this.devOpsCommandSender.tell(jsonValueSourceRef, getSelf());
            stopSelf();
        }

        private void handleDittoRuntimeException(DittoRuntimeException dittoRuntimeException) {
            this.logger.warning("Received DittoRuntimeException from <{}>: <{}>!", getSender(), dittoRuntimeException);
            addCommandResponse(DevOpsErrorResponse.of((String) null, (String) null, dittoRuntimeException.toJson(), dittoRuntimeException.getDittoHeaders()));
        }

        public void preStart() throws Exception {
            getContext().setReceiveTimeout(getReceiveTimeout());
        }

        private Duration getReceiveTimeout() {
            Duration duration = (Duration) this.devOpsCommand.getDittoHeaders().getTimeout().orElse(DEFAULT_RECEIVE_TIMEOUT);
            return duration.compareTo(DEFAULT_RECEIVE_TIMEOUT) > 0 ? duration : DEFAULT_RECEIVE_TIMEOUT;
        }
    }

    /* loaded from: input_file:org/eclipse/ditto/services/utils/devops/DevOpsCommandsActor$DevOpsCommandViaPubSub.class */
    /**
     * Marker wrapper for a {@link DevOpsCommand} which the pub/sub subscriber child forwarded to its parent. The
     * wrapper lets the parent tell such commands apart from commands it matches directly as {@link DevOpsCommand}.
     */
    private static final class DevOpsCommandViaPubSub {

        private final DevOpsCommand<?> wrappedCommand;

        private DevOpsCommandViaPubSub(DevOpsCommand<?> devOpsCommand) {
            this.wrappedCommand = devOpsCommand;
        }
    }

    /* loaded from: input_file:org/eclipse/ditto/services/utils/devops/DevOpsCommandsActor$PubSubSubscriberActor.class */
    private static final class PubSubSubscriberActor extends AbstractActor {
        private final DiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);

        private PubSubSubscriberActor(ActorRef actorRef, String str, String str2, String... strArr) {
            Arrays.stream(strArr).forEach(str3 -> {
                subscribeToDevOpsTopic(actorRef, str3, str, str2);
            });
        }

        static Props props(ActorRef actorRef, String str, String str2, String... strArr) {
            return Props.create(PubSubSubscriberActor.class, new Object[]{actorRef, str, str2, strArr});
        }

        private void subscribeToDevOpsTopic(ActorRef actorRef, String str, String str2, String str3) {
            actorRef.tell(DistPubSubAccess.subscribe(str, getSelf()), getSelf());
            actorRef.tell(DistPubSubAccess.subscribe(String.join(":", str, str2), getSelf()), getSelf());
            actorRef.tell(DistPubSubAccess.subscribeViaGroup(String.join(":", str, str2), str2, getSelf()), getSelf());
            actorRef.tell(DistPubSubAccess.subscribe(String.join(":", str, str2, str3), getSelf()), getSelf());
        }

        public AbstractActor.Receive createReceive() {
            return ReceiveBuilder.create().match(DevOpsCommand.class, devOpsCommand -> {
                getContext().getParent().forward(new DevOpsCommandViaPubSub(devOpsCommand), getContext());
            }).match(DistributedPubSubMediator.SubscribeAck.class, this::handleSubscribeAck).matchAny(obj -> {
                this.log.warning(DevOpsCommandsActor.UNKNOWN_MESSAGE_TEMPLATE, obj);
                unhandled(obj);
            }).build();
        }

        private void handleSubscribeAck(DistributedPubSubMediator.SubscribeAck subscribeAck) {
            this.log.info("Successfully subscribed to distributed pub/sub on topic <{}>.", subscribeAck.subscribe().topic());
        }
    }

    /**
     * Invoked reflectively through {@link #props(LoggingFacade, String, String)}. Resolves the pub/sub mediator and
     * the mapping strategies of the actor system and starts the child actor which subscribes to the DevOps topics.
     *
     * @param loggingFacade facade used to read and change the logger configuration.
     * @param serviceName name of the service this actor runs in.
     * @param instance identifier of the service instance this actor runs in.
     */
    private DevOpsCommandsActor(LoggingFacade loggingFacade, String serviceName, String instance) {
        this.loggingFacade = loggingFacade;
        this.serviceName = serviceName;
        this.instance = instance;

        final AbstractActor.ActorContext actorContext = getContext();
        this.pubSubMediator = DistributedPubSub.get(actorContext.system()).mediator();
        this.serviceMappingStrategy = MappingStrategies.loadMappingStrategies(actorContext.getSystem());

        final Props subscriberProps = PubSubSubscriberActor.props(
                this.pubSubMediator,
                serviceName,
                instance,
                "devops.commands:retrieveLoggerConfig",
                "devops.commands:changeLogLevel",
                "devops.commands:executePiggybackCommand");
        actorContext.actorOf(subscriberProps, "pubSubSubscriber");
    }

    /**
     * Creates the Akka configuration object Props for this actor.
     *
     * @param loggingFacade facade used to read and change the logger configuration.
     * @param str the service name.
     * @param str2 the instance.
     * @return the Props.
     */
    public static Props props(LoggingFacade loggingFacade, String str, String str2) {
        return Props.create(DevOpsCommandsActor.class, loggingFacade, str, str2);
    }

    /**
     * Builds the behavior of this actor: DevOps commands first, then the behavior provided by
     * {@code retrieveConfigBehavior()}, and finally a catch-all which logs and marks the message as unhandled.
     */
    @Override
    public AbstractActor.Receive createReceive() {
        final AbstractActor.Receive devOpsCommandBehavior = ReceiveBuilder.create()
                .match(DevOpsCommand.class, this::handleInitialDevOpsCommand)
                .match(DevOpsCommandViaPubSub.class, this::handleDevOpsCommandViaPubSub)
                .build();
        final AbstractActor.Receive withConfigRetrieval = devOpsCommandBehavior.orElse(retrieveConfigBehavior());
        return withConfigRetrieval.orElse(matchAnyUnhandled());
    }

    /**
     * @return a behavior matching any message: the message is logged as a warning and reported as unhandled.
     */
    private AbstractActor.Receive matchAnyUnhandled() {
        return ReceiveBuilder.create()
                .matchAny(this::warnAndMarkUnhandled)
                .build();
    }

    /** Logs the unknown message and passes it on to {@code unhandled}. */
    private void warnAndMarkUnhandled(Object unknownMessage) {
        this.logger.warning(UNKNOWN_MESSAGE_TEMPLATE, unknownMessage);
        unhandled(unknownMessage);
    }

    /**
     * @return the configuration taken from the settings of the actor system this actor belongs to.
     */
    public Config getConfig() {
        final AbstractActor.ActorContext actorContext = getContext();
        return actorContext.getSystem().settings().config();
    }

    /**
     * Handles a {@link DevOpsCommand} which reached this actor directly (not wrapped in a
     * {@link DevOpsCommandViaPubSub}) by publishing it into the cluster via the pub/sub mediator.
     * <p>
     * A piggyback command addressed to the pub/sub mediator itself is published directly; every other command is
     * published on a topic derived from the command.
     * </p>
     *
     * @param devOpsCommand the command to publish.
     */
    private void handleInitialDevOpsCommand(DevOpsCommand<?> devOpsCommand) {
        this.logger.setCorrelationId(devOpsCommand);
        try {
            final Supplier<ActorRef> responseCorrelationActor = getResponseCorrelationActorSupplier(devOpsCommand);
            if (isExecutePiggybackCommandToPubSubMediator(devOpsCommand)) {
                executeAsPiggybackCommandToPubSubMediator(devOpsCommand, responseCorrelationActor);
            } else {
                executeAsIndirectPiggybackCommandToPubSubMediator(devOpsCommand, responseCorrelationActor);
            }
        } finally {
            // Dispatching may throw (e.g. missing correlation-id); never leave the correlation-id set on the
            // logger of this actor in that case.
            this.logger.discardCorrelationId();
        }
    }

    /**
     * Returns a supplier which, when invoked, starts a {@link DevOpsCommandResponseCorrelationActor} as child of this
     * actor. The child is named after the correlation-id of the command and answers to the sender of the message
     * being processed at the time the supplier is invoked.
     *
     * @param devOpsCommand the command whose responses are to be correlated.
     * @return the lazily evaluated supplier; invoking it throws {@link IllegalArgumentException} if the command has
     * no correlation-id.
     */
    private Supplier<ActorRef> getResponseCorrelationActorSupplier(DevOpsCommand<?> devOpsCommand) {
        return () -> {
            final Props correlationActorProps = DevOpsCommandResponseCorrelationActor.props(getSender(), devOpsCommand);
            final String actorName = getCorrelationIdOrThrow(devOpsCommand);
            return getContext().actorOf(correlationActorProps, actorName);
        };
    }

    /**
     * @param withDittoHeaders the object whose headers are inspected.
     * @return the correlation-id found in the headers.
     * @throws IllegalArgumentException if the headers contain no correlation-id.
     */
    private static String getCorrelationIdOrThrow(WithDittoHeaders withDittoHeaders) {
        final DittoHeaders headers = withDittoHeaders.getDittoHeaders();
        return headers.getCorrelationId()
                .orElseThrow(() -> new IllegalArgumentException("Missing correlation-id for DevOpsCommand!"));
    }

    /**
     * @param devOpsCommand the command to inspect.
     * @return {@code true} only if the command has the piggyback command type and its target actor selection equals
     * the path (without address) of the pub/sub mediator.
     */
    private boolean isExecutePiggybackCommandToPubSubMediator(DevOpsCommand<?> devOpsCommand) {
        if (!"devops.commands:executePiggybackCommand".equals(devOpsCommand.getType())) {
            return false;
        }
        final ExecutePiggybackCommand piggyback = (ExecutePiggybackCommand) devOpsCommand;
        final String mediatorPath = this.pubSubMediator.path().toStringWithoutAddress();
        return Objects.equals(piggyback.getTargetActorSelection(), mediatorPath);
    }

    /**
     * Interprets the command as a direct publication and sends the resulting Publish to the pub/sub mediator, using a
     * freshly started response correlation actor as sender. If no publication can be derived, the error response is
     * sent back to the sender of the command and no correlation actor is started.
     *
     * @param devOpsCommand the command to publish.
     * @param supplier starts the response correlation actor on demand.
     */
    private void executeAsPiggybackCommandToPubSubMediator(DevOpsCommand<?> devOpsCommand, Supplier<ActorRef> supplier) {
        final Consumer<DistributedPubSubMediator.Publish> publishToMediator = publish -> {
            this.logger.info("Publishing <{}> into cluster on topic <{}> with sendOneMessageToEachGroup=<{}>", publish.msg().getClass().getCanonicalName(), publish.topic(), publish.sendOneMessageToEachGroup());
            this.pubSubMediator.tell(publish, supplier.get());
        };
        final Consumer<DevOpsErrorResponse> replyWithError = errorResponse -> {
            this.logger.warning("Dropping publishing command <{}>. Reason: <{}>", devOpsCommand, errorResponse.getDittoRuntimeException());
            getSender().tell(errorResponse, getSelf());
        };
        tryInterpretAsDirectPublication(devOpsCommand, publishToMediator, replyWithError);
    }

    /**
     * Publishes the command itself via the pub/sub mediator. The topic is the command type, extended by
     * {@code :serviceName} if the command names a service and additionally by {@code :instance} if it also names an
     * instance. The response correlation actor obtained from the supplier is used as sender of the publication.
     *
     * @param devOpsCommand the command to publish.
     * @param supplier starts the response correlation actor.
     */
    private void executeAsIndirectPiggybackCommandToPubSubMediator(DevOpsCommand<?> devOpsCommand, Supplier<ActorRef> supplier) {
        String topic = devOpsCommand.getType();
        final Optional<String> targetService = devOpsCommand.getServiceName();
        if (targetService.isPresent()) {
            topic = topic + ":" + targetService.get();
            // The instance only narrows the topic down if a service name was given as well.
            final Optional<String> targetInstance = devOpsCommand.getInstance();
            if (targetInstance.isPresent()) {
                topic = topic + ":" + targetInstance.get();
            }
        }

        final DistributedPubSubMediator.Publish publication;
        if (isGroupTopic(devOpsCommand.getDittoHeaders())) {
            publication = DistPubSubAccess.publishViaGroup(topic, devOpsCommand);
        } else {
            publication = DistPubSubAccess.publish(topic, devOpsCommand);
        }

        this.logger.info("Publishing DevOpsCommand <{}> into cluster on topic <{}> with sendOneMessageToEachGroup=<{}>", devOpsCommand.getType(), publication.topic(), publication.sendOneMessageToEachGroup());
        this.pubSubMediator.tell(publication, supplier.get());
    }

    /**
     * Tries to turn a piggyback command into a Publish message carrying the deserialized piggybacked command.
     * <p>
     * The topic is taken from the {@code topic} header or, if that header is absent, from the type field of the
     * piggybacked JSON. Whether the message is published via group is decided by {@link #isGroupTopic(DittoHeaders)}.
     * </p>
     *
     * @param devOpsCommand the command to interpret.
     * @param onSuccess receives the Publish message if one could be built.
     * @param onError receives an error response if the command is no piggyback command, cannot be deserialized or
     * has no topic.
     */
    private void tryInterpretAsDirectPublication(DevOpsCommand<?> devOpsCommand, Consumer<DistributedPubSubMediator.Publish> onSuccess, Consumer<DevOpsErrorResponse> onError) {
        if (devOpsCommand instanceof ExecutePiggybackCommand) {
            final ExecutePiggybackCommand piggyback = (ExecutePiggybackCommand) devOpsCommand;
            final DittoHeaders headers = piggyback.getDittoHeaders();

            final Consumer<Jsonifiable<?>> publishParsedCommand = parsedCommand -> {
                // The header wins; the type of the piggybacked command is only looked up as a fallback.
                final Optional<String> topic = Optional.ofNullable(headers.get(TOPIC_HEADER))
                        .or(() -> piggyback.getPiggybackCommand().getValue(Command.JsonFields.TYPE));
                if (topic.isPresent()) {
                    final DistributedPubSubMediator.Publish publication = isGroupTopic(headers)
                            ? DistPubSubAccess.publishViaGroup(topic.get(), parsedCommand)
                            : DistPubSubAccess.publish(topic.get(), parsedCommand);
                    onSuccess.accept(publication);
                } else {
                    onError.accept(getErrorResponse(devOpsCommand));
                }
            };
            final Consumer<DittoRuntimeException> reportParseFailure =
                    parseFailure -> onError.accept(getErrorResponse(devOpsCommand, parseFailure.toJson()));

            deserializePiggybackCommand(piggyback, publishParsedCommand, reportParseFailure);
        } else {
            final JsonObject internalError = GatewayInternalErrorException.newBuilder()
                    .dittoHeaders(devOpsCommand.getDittoHeaders())
                    .build()
                    .toJson();
            onError.accept(getErrorResponse(devOpsCommand, internalError));
        }
    }

    /**
     * @param dittoHeaders the headers to inspect.
     * @return {@code false} if the {@code is-group-topic} header is absent or equals {@code "false"} ignoring case;
     * {@code true} for every other header value.
     */
    private static boolean isGroupTopic(DittoHeaders dittoHeaders) {
        final String headerValue = dittoHeaders.get(IS_GROUP_TOPIC_HEADER);
        return headerValue != null && !"false".equalsIgnoreCase(headerValue);
    }

    /**
     * Executes a command which was received via pub/sub by the subscriber child. Supported are
     * {@link ChangeLogLevel}, {@link RetrieveLoggerConfig} and {@link ExecutePiggybackCommand}; any other command
     * type is logged as a warning instead of being dropped silently.
     *
     * @param devOpsCommandViaPubSub the wrapper containing the command to execute.
     */
    private void handleDevOpsCommandViaPubSub(DevOpsCommandViaPubSub devOpsCommandViaPubSub) {
        final DevOpsCommand<?> wrappedCommand = devOpsCommandViaPubSub.wrappedCommand;
        if (wrappedCommand instanceof ChangeLogLevel) {
            handleChangeLogLevel((ChangeLogLevel) wrappedCommand);
        } else if (wrappedCommand instanceof RetrieveLoggerConfig) {
            handleRetrieveLoggerConfig((RetrieveLoggerConfig) wrappedCommand);
        } else if (wrappedCommand instanceof ExecutePiggybackCommand) {
            handleExecutePiggyBack((ExecutePiggybackCommand) wrappedCommand);
        } else {
            // No handler exists for this command type; make that visible in the log.
            this.logger.warning(UNKNOWN_MESSAGE_TEMPLATE, wrappedCommand);
        }
    }

    /**
     * Passes the logger config of the command to the logging facade and answers the sender with a
     * {@link ChangeLogLevelResponse} carrying the result returned by the facade.
     *
     * @param changeLogLevel the command to execute.
     */
    private void handleChangeLogLevel(ChangeLogLevel changeLogLevel) {
        final ActorRef requester = getSender();
        final boolean facadeResult = this.loggingFacade.setLogLevel(changeLogLevel.getLoggerConfig());
        final ChangeLogLevelResponse response = ChangeLogLevelResponse.of(this.serviceName, this.instance, facadeResult, changeLogLevel.getDittoHeaders());
        requester.tell(response, getSelf());
    }

    /**
     * Answers the sender with the logger configuration obtained from the logging facade: the whole configuration if
     * the command asks for all known loggers, else only that of the loggers named in the command.
     *
     * @param retrieveLoggerConfig the command to execute.
     */
    private void handleRetrieveLoggerConfig(RetrieveLoggerConfig retrieveLoggerConfig) {
        final ActorRef requester = getSender();
        final RetrieveLoggerConfigResponse response;
        if (retrieveLoggerConfig.isAllKnownLoggers()) {
            response = RetrieveLoggerConfigResponse.of(this.serviceName, this.instance, this.loggingFacade.getLoggerConfig(), retrieveLoggerConfig.getDittoHeaders());
        } else {
            response = RetrieveLoggerConfigResponse.of(this.serviceName, this.instance, this.loggingFacade.getLoggerConfig(retrieveLoggerConfig.getSpecificLoggers()), retrieveLoggerConfig.getDittoHeaders());
        }
        requester.tell(response, getSelf());
    }

    /**
     * Deserializes the piggybacked command and forwards it to the actor selection named in the command, keeping the
     * original sender. If deserialization fails, the exception is sent back to the sender instead.
     *
     * @param executePiggybackCommand the command to execute.
     */
    private void handleExecutePiggyBack(ExecutePiggybackCommand executePiggybackCommand) {
        final Consumer<Jsonifiable<?>> forwardToTarget = piggybacked -> {
            this.logger.withCorrelationId(executePiggybackCommand).info("Received PiggybackCommand: <{}> - telling to: <{}>", piggybacked, executePiggybackCommand.getTargetActorSelection());
            getContext().actorSelection(executePiggybackCommand.getTargetActorSelection()).forward(piggybacked, getContext());
        };
        final Consumer<DittoRuntimeException> replyWithFailure = failure -> getSender().tell(failure, getSelf());

        deserializePiggybackCommand(executePiggybackCommand, forwardToTarget, replyWithFailure);
    }

    /**
     * Deserializes the JSON piggybacked on the command using the mapping strategy registered for its type.
     *
     * @param executePiggybackCommand the command carrying the piggybacked JSON.
     * @param onSuccess receives the deserialized command.
     * @param onError receives the exception if no mapping strategy exists for the type, or the
     * {@link DittoRuntimeException} thrown while parsing or while running {@code onSuccess}.
     */
    private void deserializePiggybackCommand(ExecutePiggybackCommand executePiggybackCommand, Consumer<Jsonifiable<?>> onSuccess, Consumer<DittoRuntimeException> onError) {
        final JsonObject piggybackJson = executePiggybackCommand.getPiggybackCommand();
        // May be null if the piggybacked JSON has no type field; the lookup below is still attempted with it.
        final String piggybackType = piggybackJson.getValue(Command.JsonFields.TYPE).orElse(null);

        final Runnable reportMissingStrategy = () -> {
            final String message = String.format("ExecutePiggybackCommand with PiggybackCommand <%s> cannot be executed by this service as there is no mapping strategy for it!", piggybackType);
            this.logger.withCorrelationId(executePiggybackCommand).warning(message);
            onError.accept(JsonTypeNotParsableException.fromMessage(message, executePiggybackCommand.getDittoHeaders()));
        };

        this.serviceMappingStrategy.getMappingStrategy(piggybackType).ifPresentOrElse(parser -> {
            try {
                final Jsonifiable<?> parsed = (Jsonifiable<?>) parser.parse(piggybackJson, executePiggybackCommand.getDittoHeaders());
                onSuccess.accept(parsed);
            } catch (final DittoRuntimeException parseFailure) {
                this.logger.withCorrelationId(executePiggybackCommand).warning("Got DittoRuntimeException while parsing PiggybackCommand <{}>: {}!", piggybackType, parseFailure);
                onError.accept(parseFailure);
            }
        }, reportMissingStrategy);
    }

    /**
     * Builds an error response for the given command.
     *
     * @param devOpsCommand provides service name, instance (each {@code null} if absent) and headers.
     * @param jsonObject the JSON describing the error.
     * @return the error response.
     */
    private static DevOpsErrorResponse getErrorResponse(DevOpsCommand<?> devOpsCommand, JsonObject jsonObject) {
        final String targetService = devOpsCommand.getServiceName().orElse(null);
        final String targetInstance = devOpsCommand.getInstance().map(String::valueOf).orElse(null);
        return DevOpsErrorResponse.of(targetService, targetInstance, jsonObject, devOpsCommand.getDittoHeaders());
    }

    /**
     * Builds the error response used when no topic could be determined for a direct publication: status
     * {@code BAD_REQUEST} with a message hinting at the {@code topic} header.
     *
     * @param devOpsCommand provides service name, instance and headers of the response.
     * @return the error response.
     */
    private static DevOpsErrorResponse getErrorResponse(DevOpsCommand<?> devOpsCommand) {
        final JsonObject missingTopicError = JsonFactory.newObjectBuilder()
                .set(DittoRuntimeException.JsonFields.STATUS, HttpStatus.BAD_REQUEST.getCode())
                .set(DittoRuntimeException.JsonFields.MESSAGE, "No topic found for publishing. Did you set the <topic> header?")
                .build();
        return getErrorResponse(devOpsCommand, missingTopicError);
    }
}
