package org.springframework.cloud.gcp.pubsub.reactive;

import com.google.api.gax.rpc.DeadlineExceededException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.gcp.pubsub.core.subscriber.PubSubSubscriberOperations;
import org.springframework.cloud.gcp.pubsub.support.AcknowledgeablePubsubMessage;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Scheduler;

/* loaded from: input_file:BOOT-INF/lib/spring-cloud-gcp-pubsub-1.2.1.RELEASE.jar:org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactory.class */
public final class PubSubReactiveFactory {
    private static final Log LOGGER = LogFactory.getLog((Class<?>) PubSubReactiveFactory.class);
    private final PubSubSubscriberOperations subscriberOperations;
    private final Scheduler scheduler;

    /* loaded from: input_file:BOOT-INF/lib/spring-cloud-gcp-pubsub-1.2.1.RELEASE.jar:org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactory$BlockingLimitedDemandPullTask.class */
    private class BlockingLimitedDemandPullTask extends PubSubPullTask {
        private final long initialDemand;

        BlockingLimitedDemandPullTask(String str, long j, FluxSink<AcknowledgeablePubsubMessage> fluxSink) {
            super(str, fluxSink);
            this.initialDemand = j;
        }

        @Override // java.lang.Runnable
        public void run() {
            int i;
            long j = this.initialDemand;
            while (j > 0 && !this.sink.isCancelled()) {
                if (j > 2147483647L) {
                    i = Integer.MAX_VALUE;
                } else {
                    try {
                        i = (int) j;
                    } catch (DeadlineExceededException e) {
                        if (PubSubReactiveFactory.LOGGER.isTraceEnabled()) {
                            PubSubReactiveFactory.LOGGER.trace("Blocking pull timed out due to empty subscription " + this.subscriptionName + "; retrying.");
                        }
                    }
                }
                j -= pullToSink(i, true);
            }
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/spring-cloud-gcp-pubsub-1.2.1.RELEASE.jar:org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactory$NonBlockingUnlimitedDemandPullTask.class */
    private class NonBlockingUnlimitedDemandPullTask extends PubSubPullTask {
        NonBlockingUnlimitedDemandPullTask(String str, FluxSink<AcknowledgeablePubsubMessage> fluxSink) {
            super(str, fluxSink);
        }

        @Override // java.lang.Runnable
        public void run() {
            pullToSink(Integer.MAX_VALUE, false);
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/spring-cloud-gcp-pubsub-1.2.1.RELEASE.jar:org/springframework/cloud/gcp/pubsub/reactive/PubSubReactiveFactory$PubSubPullTask.class */
    private abstract class PubSubPullTask implements Runnable {
        protected final String subscriptionName;
        protected final FluxSink<AcknowledgeablePubsubMessage> sink;

        PubSubPullTask(String str, FluxSink<AcknowledgeablePubsubMessage> fluxSink) {
            this.subscriptionName = str;
            this.sink = fluxSink;
        }

        protected int pullToSink(int i, boolean z) {
            List<AcknowledgeablePubsubMessage> pull = PubSubReactiveFactory.this.subscriberOperations.pull(this.subscriptionName, Integer.valueOf(i), Boolean.valueOf(!z));
            if (!this.sink.isCancelled()) {
                FluxSink<AcknowledgeablePubsubMessage> fluxSink = this.sink;
                Objects.requireNonNull(fluxSink);
                pull.forEach((v1) -> {
                    r1.next(v1);
                });
            }
            return pull.size();
        }
    }

    public PubSubReactiveFactory(PubSubSubscriberOperations pubSubSubscriberOperations, Scheduler scheduler) {
        Assert.notNull(pubSubSubscriberOperations, "subscriberOperations cannot be null.");
        Assert.notNull(scheduler, "scheduler cannot be null.");
        this.subscriberOperations = pubSubSubscriberOperations;
        this.scheduler = scheduler;
    }

    public Flux<AcknowledgeablePubsubMessage> poll(String str, long j) {
        return Flux.create(fluxSink -> {
            Scheduler.Worker createWorker = this.scheduler.createWorker();
            fluxSink.onRequest(j2 -> {
                if (j2 == Long.MAX_VALUE) {
                    createWorker.schedulePeriodically(new NonBlockingUnlimitedDemandPullTask(str, fluxSink), 0L, j, TimeUnit.MILLISECONDS);
                } else {
                    createWorker.schedule(new BlockingLimitedDemandPullTask(str, j2, fluxSink));
                }
            });
            fluxSink.onCancel(createWorker);
        });
    }
}
