package io.datarouter.client.gcp.pubsub.node;

import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.TopicName;
import io.datarouter.client.gcp.pubsub.GcpPubSubOpFactory;
import io.datarouter.client.gcp.pubsub.GcpPubsubClientType;
import io.datarouter.client.gcp.pubsub.TopicAndSubscriptionName;
import io.datarouter.client.gcp.pubsub.client.GcpPubsubClientManager;
import io.datarouter.model.databean.Databean;
import io.datarouter.model.key.primary.PrimaryKey;
import io.datarouter.model.serialize.fielder.DatabeanFielder;
import io.datarouter.storage.client.ClientId;
import io.datarouter.storage.config.Config;
import io.datarouter.storage.config.properties.EnvironmentName;
import io.datarouter.storage.config.properties.ServiceName;
import io.datarouter.storage.node.NodeParams;
import io.datarouter.storage.node.op.raw.write.QueueStorageWriter;
import io.datarouter.storage.node.type.physical.base.BasePhysicalNode;
import io.datarouter.storage.queue.QueueMessageKey;
import io.datarouter.util.singletonsupplier.SingletonSupplier;
import io.datarouter.util.string.StringTool;
import io.datarouter.util.timer.PhaseTimer;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datarouter/client/gcp/pubsub/node/BaseGcpPubsubNode.class */
public abstract class BaseGcpPubsubNode<PK extends PrimaryKey<PK>, D extends Databean<PK, D>, F extends DatabeanFielder<PK, D>> extends BasePhysicalNode<PK, D, F> implements QueueStorageWriter<PK, D>, GcpPubsubPhysicalNode<PK, D, F> {
    private static final Logger logger = LoggerFactory.getLogger(BaseGcpPubsubNode.class);
    public static final int MAX_SERIALIZED_REQUEST_SIZE = 10000000;
    public static final int MAX_TOPIC_PLUS_MESSAGE_SIZE = 9999970;
    public static final int MAX_MESSAGES_PER_BATCH = 10;
    private final EnvironmentName environmentName;
    private final ServiceName serviceName;
    private final NodeParams<PK, D, F> params;
    private final GcpPubsubClientManager gcpPubsubClientManager;
    private final ClientId clientId;
    private final Supplier<TopicAndSubscriptionName> topicAndSubscription;
    protected final GcpPubSubOpFactory<PK, D, F> gcpPubSubOpFactory;

    public BaseGcpPubsubNode(EnvironmentName environmentName, ServiceName serviceName, NodeParams<PK, D, F> nodeParams, GcpPubsubClientType gcpPubsubClientType, GcpPubsubClientManager gcpPubsubClientManager, ClientId clientId) {
        super(nodeParams, gcpPubsubClientType);
        this.environmentName = environmentName;
        this.serviceName = serviceName;
        this.params = nodeParams;
        this.gcpPubsubClientManager = gcpPubsubClientManager;
        this.clientId = clientId;
        this.topicAndSubscription = SingletonSupplier.of(this::getOrCreateTopic);
        this.gcpPubSubOpFactory = new GcpPubSubOpFactory<>(this, gcpPubsubClientManager, clientId);
    }

    private TopicAndSubscriptionName getOrCreateTopic() {
        String str = (String) this.params.getNamespace().orElse(String.valueOf(this.environmentName.get()) + "-" + this.serviceName.get());
        String tableName = StringTool.isEmpty(str) ? getFieldInfo().getTableName() : String.valueOf(str) + "-" + getFieldInfo().getTableName();
        PhaseTimer phaseTimer = new PhaseTimer();
        TopicName createQueueAndGetName = this.gcpPubsubClientManager.createQueueAndGetName(tableName, this.clientId);
        phaseTimer.add("createTopic");
        Subscription createSubscriptionAndGetName = this.gcpPubsubClientManager.createSubscriptionAndGetName(tableName, this.clientId, createQueueAndGetName);
        phaseTimer.add("createSubscription");
        this.gcpPubsubClientManager.createAndRegisterPublisher(this.clientId, createQueueAndGetName);
        phaseTimer.add("createPublisher");
        SubscriberStub subscriber = this.gcpPubsubClientManager.getSubscriber(this.clientId);
        phaseTimer.add("getSubscriber");
        this.gcpPubsubClientManager.peekOnSubscriptionCreation(subscriber, createSubscriptionAndGetName);
        phaseTimer.add("peekOnSubscriptionCreation");
        logger.warn("nodeName={} {}", getName(), phaseTimer);
        return new TopicAndSubscriptionName(createQueueAndGetName, createSubscriptionAndGetName.getName());
    }

    @Override // io.datarouter.client.gcp.pubsub.node.GcpPubsubPhysicalNode
    public Supplier<TopicAndSubscriptionName> getTopicAndSubscriptionName() {
        return this.topicAndSubscription;
    }

    @Override // io.datarouter.client.gcp.pubsub.node.GcpPubsubPhysicalNode
    public boolean getAgeMonitoringStatusForMetricAlert() {
        return this.params.getAgeMonitoringStatus();
    }

    @Override // io.datarouter.client.gcp.pubsub.node.GcpPubsubPhysicalNode
    public Duration getCustomMessageAgeThreshold() {
        return this.params.getCustomMessageAgeThreshold();
    }

    public void ack(QueueMessageKey queueMessageKey, Config config) {
        ackMulti(List.of(queueMessageKey), config);
    }

    public void ackMulti(Collection<QueueMessageKey> collection, Config config) {
        this.gcpPubSubOpFactory.makeAckMultiOp(collection, config).call();
    }
}
