package com.datarobot.mlops.common.spooler;

import com.datarobot.mlops.common.config.MappedConfig;
import com.datarobot.mlops.common.constants.ConfigConstants;
import com.datarobot.mlops.common.enums.DataFormat;
import com.datarobot.mlops.common.enums.SpoolerType;
import com.datarobot.mlops.common.exceptions.DRCommonException;
import com.datarobot.mlops.common.exceptions.DRQueueException;
import com.datarobot.mlops.common.records.Record;
import com.datarobot.mlops.common.spooler.RecordSpooler;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.Topic;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.threeten.bp.Duration;

/* loaded from: input_file:com/datarobot/mlops/common/spooler/PubSubSpooler.class */
/**
 * {@link RecordSpooler} implementation that moves records through a Google Cloud Pub/Sub topic.
 *
 * <p>Depending on the configured {@link RecordSpooler.Action}, {@link #open()} creates a
 * {@link Publisher} for the topic (enqueue side), a {@link SubscriberStub} with a prepared
 * {@link PullRequest} for the subscription (dequeue side), or both. Records are serialized
 * to JSON; no other data format is accepted by {@link #verifyConfig()}.
 */
public class PubSubSpooler extends RecordSpooler {
    private static final Logger logger = LoggerFactory.getLogger(PubSubSpooler.class);

    /** Maximum number of messages requested from Pub/Sub in a single pull. */
    public static final int MAX_RECORDS_TO_DEQUEUE = 10;

    /** Size limit reported by this spooler and used as the subscriber's max inbound message size. */
    private static final int MESSAGE_SIZE_LIMIT_IN_BYTE = 10485760;

    private static final String DEFAULT_SPOOLER_DATA_FORMAT = "JSON";

    /** Acknowledgement deadline, in milliseconds, used when none is configured. */
    private static final int DEFAULT_PUBSUB_ACK_DEADLINE_MS = 10000;

    /** Subscription name used when none is configured. */
    private static final String DEFAULT_SUBSCRIPTION_NAME = "TrackingAgentSubscription";

    private static final int PUBLISHER_SHUTDOWN_ATTEMPTS = 15;
    private static final long PUBLISHER_SHUTDOWN_WAIT_SECONDS = 10L;
    private static final long SUBSCRIBER_SHUTDOWN_WAIT_SECONDS = 2L;

    private RecordSpooler.Action action;
    private String projectId;
    private String topicName;
    private Topic topic;
    private Publisher publisher;
    private PubSubHelper pubSubHelper;
    private String subscriptionName;
    private SubscriberStub subscriber;
    private PullRequest pullRequest;

    /** Acknowledgement deadline in seconds, as passed to {@code setAckDeadlineSeconds}. */
    private int ackDeadline;

    private Subscription subscription;

    /**
     * Creates an unopened spooler; no Pub/Sub resources are created until {@link #open()}.
     *
     * @param mappedConfig configuration handed to the {@link RecordSpooler} base class
     */
    public PubSubSpooler(MappedConfig mappedConfig) {
        super(mappedConfig);
    }

    /** @return {@link SpoolerType#PUBSUB} */
    @Override
    public SpoolerType getType() {
        return SpoolerType.PUBSUB;
    }

    /** @return a new mutable list with the action, project id and topic name config keys */
    @Override
    public List<String> getRequiredConfigKeys() {
        List<String> keys = new ArrayList<>();
        keys.add(ConfigConstants.SPOOLER_ACTION_STR);
        keys.add(ConfigConstants.PUBSUB_PROJECT_ID_STR);
        keys.add(ConfigConstants.PUBSUB_TOPIC_NAME_STR);
        return keys;
    }

    /** @return a new mutable list with the subscription name and ack deadline config keys */
    @Override
    public List<String> getOptionalConfigKeys() {
        List<String> keys = new ArrayList<>();
        keys.add(ConfigConstants.PUBSUB_SUBSCRIPTION_NAME_STR);
        keys.add(ConfigConstants.PUBSUB_ACK_DEADLINE_STR);
        return keys;
    }

    /**
     * Validates the configuration and caches the project id, topic name and action.
     *
     * @throws DRCommonException if a required key is missing, the data format is not JSON,
     *     or the configured action is not a valid {@link RecordSpooler.Action} name
     */
    @Override
    public void verifyConfig() throws DRCommonException {
        List<String> missingKeys = FindMissingConfigKeys();
        if (!missingKeys.isEmpty()) {
            throw new DRCommonException("Missing required configuration for: " + missingKeys);
        }
        DataFormat dataFormat = DataFormat.fromString(
                this.config.getValueWithDefault(ConfigConstants.MLOPS_SPOOLER_DATA_FORMAT, DEFAULT_SPOOLER_DATA_FORMAT));
        if (dataFormat != DataFormat.JSON) {
            throw new DRCommonException("Data Format: '" + dataFormat + "' is not supported for the PubSub spooler");
        }
        this.projectId = this.config.getStringValue(ConfigConstants.PUBSUB_PROJECT_ID_STR);
        this.topicName = this.config.getStringValue(ConfigConstants.PUBSUB_TOPIC_NAME_STR);

        String actionName = this.config.getStringValue(ConfigConstants.SPOOLER_ACTION_STR);
        try {
            this.action = RecordSpooler.Action.valueOf(actionName);
        } catch (IllegalArgumentException e) {
            // Report a bad action through the declared exception type instead of leaking
            // the unchecked exception thrown by Enum.valueOf.
            throw new DRCommonException(
                    "Invalid value '" + actionName + "' for " + ConfigConstants.SPOOLER_ACTION_STR);
        }
    }

    /** @return the message size limit in bytes (10485760) */
    @Override
    public int getMessageByteSizeLimit() {
        return MESSAGE_SIZE_LIMIT_IN_BYTE;
    }

    /**
     * Verifies the configuration and creates the Pub/Sub resources required by the action.
     *
     * <p>On any failure the spooler is closed before the exception is propagated.
     *
     * @throws DRCommonException if the configuration is invalid, the topic or subscription
     *     cannot be found, the ack deadline is not positive, or a client cannot be created
     */
    @Override
    public void open() throws DRCommonException {
        verifyConfig();
        try {
            this.pubSubHelper = new PubSubHelper(this.projectId);
            this.topic = this.pubSubHelper.getTopic(this.topicName);
            if (this.topic == null) {
                throw new DRCommonException("Failed to find PubSub spooler topic: '" + this.topicName + "' in project: '" + this.projectId + "' Please verify the project id and topic name.");
            }

            boolean enqueues = this.action == RecordSpooler.Action.ENQUEUE
                    || this.action == RecordSpooler.Action.ENQUEUE_DEQUEUE;
            boolean dequeues = this.action == RecordSpooler.Action.DEQUEUE
                    || this.action == RecordSpooler.Action.ENQUEUE_DEQUEUE;

            if (enqueues) {
                this.publisher = Publisher.newBuilder(this.topic.getName()).build();
            }
            if (dequeues) {
                this.subscriptionName = this.config.getValueWithDefault(
                        ConfigConstants.PUBSUB_SUBSCRIPTION_NAME_STR, DEFAULT_SUBSCRIPTION_NAME);

                int ackDeadlineMs = this.config.getValueWithDefault(
                        ConfigConstants.PUBSUB_ACK_DEADLINE_STR, DEFAULT_PUBSUB_ACK_DEADLINE_MS);
                if (ackDeadlineMs <= 0) {
                    throw new DRCommonException("MLOPS_PUBSUB_ACKNOWLEDGEMENT_DEADLINE must be > 0.");
                }
                // Integer division would turn any sub-second value into 0 seconds; keep the
                // deadline at one second or more so it never collapses to zero.
                this.ackDeadline = Math.max(1, ackDeadlineMs / 1000);

                this.subscription = this.pubSubHelper.getSubscription(this.subscriptionName);
                if (this.subscription == null) {
                    // Report the name that was actually looked up (possibly the default).
                    throw new DRCommonException("Cannot find the provided subscription '" + this.subscriptionName + "'. Please check the value of " + ConfigConstants.PUBSUB_SUBSCRIPTION_NAME_STR);
                }
                logger.info("Using subscription: {}", this.subscriptionName);

                SubscriberStubSettings.Builder settingsBuilder = SubscriberStubSettings.newBuilder();
                // NOTE(review): the 30s total timeout is applied to the createSubscription
                // call settings, as in the original code — confirm that is the intended RPC.
                settingsBuilder.createSubscriptionSettings().setRetrySettings(
                        settingsBuilder.createSubscriptionSettings().getRetrySettings().toBuilder()
                                .setTotalTimeout(Duration.ofSeconds(30L))
                                .build());
                settingsBuilder.setTransportChannelProvider(
                        SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
                                .setMaxInboundMessageSize(MESSAGE_SIZE_LIMIT_IN_BYTE)
                                .build());
                this.subscriber = GrpcSubscriberStub.create(settingsBuilder.build());
                this.pullRequest = PullRequest.newBuilder()
                        .setMaxMessages(MAX_RECORDS_TO_DEQUEUE)
                        .setSubscription(this.subscription.getName())
                        .build();
            }
        } catch (DRCommonException e) {
            // Single cleanup point for every validation failure raised above.
            close();
            throw e;
        } catch (Exception e) {
            String message = "Failed to open PubSub spooler. Please verify the projectId '" + this.projectId + "' as set in " + ConfigConstants.PUBSUB_PROJECT_ID_STR + ". Details: " + e.getMessage();
            logger.error(message, e);
            close();
            throw new DRCommonException(message);
        }
    }

    /**
     * Publishes each record as a JSON message and waits for every publish to complete.
     *
     * <p>Records that fail to serialize are logged and skipped. The first publish failure
     * aborts the call: the failing record and all records not yet attempted are attached to
     * the thrown exception.
     *
     * @param collection records to publish
     * @return number of records in {@code collection} minus those skipped for serialization errors
     * @throws DRQueueException if publishing a record fails or the thread is interrupted
     */
    @Override
    public int enqueue(Collection<Record> collection) throws DRQueueException {
        int skipped = 0;
        Iterator<Record> remaining = collection.iterator();
        while (remaining.hasNext()) {
            Record current = remaining.next();

            // Only serialization is guarded by the DRCommonException handler, so a publish
            // failure raised below can never be mistaken for a serialization error.
            String json;
            try {
                json = current.toJson();
            } catch (DRCommonException e) {
                logger.error("Failed to serialize data record, Error: " + e.getMessage());
                skipped++;
                continue;
            }

            ByteString payload = ByteString.copyFromUtf8(json);
            try {
                logger.debug("Sending message: size {}", payload.size());
                String messageId = this.publisher.publish(
                        PubsubMessage.newBuilder().setData(payload).build()).get();
                logger.debug("Published message ID: {}", messageId);
            } catch (InterruptedException | ExecutionException e) {
                if (e instanceof InterruptedException) {
                    // Preserve the interrupt for callers further up the stack.
                    Thread.currentThread().interrupt();
                }
                logger.error("Failed to send messages to PubSub queue, Error: " + e.getMessage());
                List<Record> failed = new ArrayList<>();
                failed.add(current);
                remaining.forEachRemaining(failed::add);
                throw new DRQueueException("Failed to enqueue records", failed);
            }
        }
        return collection.size() - skipped;
    }

    /**
     * Pulls up to {@link #MAX_RECORDS_TO_DEQUEUE} messages and converts them to records.
     *
     * <p>Each parsed record is registered via {@code addPendingRecord}. When
     * {@code enableDequeueAckRecord} is set, the ack deadline of the parsed messages is
     * modified to the configured value; otherwise they are acknowledged immediately.
     * Messages that cannot be parsed are logged and are neither returned nor acknowledged.
     *
     * @return the records parsed from the pulled messages (possibly empty)
     * @throws DRQueueException if the pull, deadline modification or acknowledgement fails;
     *     the exception carries the records parsed so far
     */
    @Override
    public Collection<Record> dequeue() throws DRQueueException {
        List<Record> records = new ArrayList<>();
        try {
            PullResponse response = this.subscriber.pullCallable().call(this.pullRequest);
            List<String> ackIds = new ArrayList<>();
            for (ReceivedMessage received : response.getReceivedMessagesList()) {
                try {
                    String msgJson = received.getMessage().getData().toStringUtf8();
                    logger.debug("msgJson: {}", msgJson);
                    Record parsed = Record.fromJson(msgJson);
                    records.add(parsed);
                    ackIds.add(received.getAckId());
                    addPendingRecord(parsed.getId(), received.getAckId());
                } catch (Exception e) {
                    // NOTE(review): the message is left unacknowledged here, so Pub/Sub will
                    // presumably redeliver it — confirm this is the desired handling.
                    logger.error("Badly formatted message. " + e.getMessage());
                }
            }
            if (!ackIds.isEmpty()) {
                if (this.enableDequeueAckRecord) {
                    this.subscriber.modifyAckDeadlineCallable().call(
                            ModifyAckDeadlineRequest.newBuilder()
                                    .setSubscription(this.subscription.getName())
                                    .addAllAckIds(ackIds)
                                    .setAckDeadlineSeconds(this.ackDeadline)
                                    .build());
                } else {
                    subscriberAckRecords(ackIds);
                }
            }
            return records;
        } catch (Exception e) {
            String message = "Failed to receive messages from PubSub Error: " + e.getMessage();
            logger.error(message, e);
            throw new DRQueueException(message, records);
        }
    }

    /**
     * Acknowledges previously dequeued records; a no-op unless {@code enableDequeueAckRecord} is set.
     *
     * <p>Each id is removed from {@code recordsPendingAck}; ids with no pending entry are ignored.
     *
     * @param collection ids of the records to acknowledge
     * @throws DRQueueException if looking up or sending the acknowledgement fails
     */
    @Override
    public void ackRecords(Collection<String> collection) throws DRQueueException {
        if (!this.enableDequeueAckRecord) {
            return;
        }
        try {
            List<String> ackIds = collection.stream()
                    .map(recordId -> (String) this.recordsPendingAck.remove(recordId))
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList());
            // Same guard as dequeue(): never send an acknowledge request without ack ids.
            if (!ackIds.isEmpty()) {
                subscriberAckRecords(ackIds);
            }
        } catch (Exception e) {
            logger.error("Failed to send ack", e);
            throw new DRQueueException(collection, "Failed to send ack");
        }
    }

    /** Sends one acknowledge request for the given ack ids on the current subscription. */
    private void subscriberAckRecords(List<String> list) {
        this.subscriber.acknowledgeCallable().call(
                AcknowledgeRequest.newBuilder()
                        .setSubscription(this.subscription.getName())
                        .addAllAckIds(list)
                        .build());
    }

    /**
     * Shuts down the publisher, subscriber and helper, waiting for the clients to terminate.
     *
     * <p>The publisher reference is kept if it does not terminate within the retry budget.
     * The helper is shut down even when stopping the clients throws.
     */
    private void shutdown() throws Exception {
        try {
            // Initiate both shutdowns first so the waits below overlap.
            if (this.publisher != null) {
                this.publisher.shutdown();
            }
            if (this.subscriber != null) {
                this.subscriber.shutdownNow();
            }
            if (this.publisher != null) {
                boolean terminated = false;
                for (int attempt = 0; attempt < PUBLISHER_SHUTDOWN_ATTEMPTS && !terminated; attempt++) {
                    terminated = this.publisher.awaitTermination(PUBLISHER_SHUTDOWN_WAIT_SECONDS, TimeUnit.SECONDS);
                }
                if (terminated) {
                    logger.info("Shutdown complete.");
                    this.publisher = null;
                } else {
                    logger.error("Shutdown not complete after 15 tries. Giving up.");
                }
            }
            if (this.subscriber != null) {
                this.subscriber.awaitTermination(SUBSCRIBER_SHUTDOWN_WAIT_SECONDS, TimeUnit.SECONDS);
                this.subscriber = null;
            }
        } finally {
            if (this.pubSubHelper != null) {
                this.pubSubHelper.shutdown();
                this.pubSubHelper = null;
            }
        }
    }

    /** Releases all Pub/Sub resources; shutdown errors are logged rather than thrown. */
    @Override
    public void close() {
        try {
            shutdown();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Do not lose the interrupt just because close() cannot throw.
                Thread.currentThread().interrupt();
            }
            logger.error("Error during shutdown: " + e.getMessage(), e);
        }
    }

    /** @return always {@code false} */
    @Override
    public boolean needsRetry() {
        return false;
    }
}
