package com.azure.messaging.servicebus;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.implementation.MessageLockContainer;
import com.azure.messaging.servicebus.implementation.MessageManagementOperations;
import com.azure.messaging.servicebus.implementation.MessageUtils;
import com.azure.messaging.servicebus.implementation.ServiceBusConstants;
import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLink;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.scheduler.Scheduler;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/azure/messaging/servicebus/UnnamedSessionReceiver.class */
public class UnnamedSessionReceiver implements AutoCloseable {
    // AMQP link that messages are received from and settled on; disposed in close().
    private final ServiceBusReceiveLink receiveLink;
    // When true, the lock-renewal loop is additionally stopped once maxSessionLockRenewDuration has elapsed.
    private final boolean enableSessionLockRenewal;
    // Upper bound on how long the lock-renewal loop runs; only consulted when enableSessionLockRenewal is true.
    private final Duration maxSessionLockRenewDuration;
    // Invoked with the session id to renew the session lock; emits the new lock expiration.
    private final Function<String, Mono<Instant>> renewSessionLock;
    // Background subscriptions created by this receiver (idle timer, session id, lock renewal); disposed in close().
    private final Disposable.Composite subscriptions;
    // Stream handed out by receive(): deserialized link messages followed by whatever cancelReceiveProcessor emits.
    private final Flux<ServiceBusReceivedMessageContext> receivedMessages;
    // Flipped to true by the first close() call so later calls are no-ops.
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    // Last known session lock expiration; null until the link (or setSessionLockedUntil) supplies one.
    private final AtomicReference<Instant> sessionLockedUntil = new AtomicReference<>();
    // Session id reported by the link; null until the link emits it, and set at most once.
    private final AtomicReference<String> sessionId = new AtomicReference<>();
    private final ClientLogger logger = new ClientLogger(UnnamedSessionReceiver.class);
    // Termination signal for the receive stream: completed on idle timeout or when lock renewal finishes,
    // or given an error context when lock renewal fails.
    private final MonoProcessor<ServiceBusReceivedMessageContext> cancelReceiveProcessor = MonoProcessor.create();
    // Receives the lock token (or "" when there is none) of every successfully received message.
    private final DirectProcessor<String> messageReceivedEmitter = DirectProcessor.create();
    // Sink used to push into messageReceivedEmitter; must be declared after the emitter it wraps.
    private final FluxSink<String> messageReceivedSink = this.messageReceivedEmitter.sink(FluxSink.OverflowStrategy.BUFFER);
    // Tracks lock tokens of received messages together with their lockedUntil values.
    private final MessageLockContainer lockContainer = new MessageLockContainer(ServiceBusConstants.OPERATION_TIMEOUT);

    /* loaded from: input_file:com/azure/messaging/servicebus/UnnamedSessionReceiver$SessionMessageManagement.class */
    /**
     * {@link MessageManagementOperations} adapter bound to a single receive link.
     *
     * <p>Dispositions are forwarded to the link. Message lock renewal is not forwarded anywhere: it always reports
     * a lock expiration one minute after the time of the call.</p>
     */
    private static final class SessionMessageManagement implements MessageManagementOperations {
        private final ServiceBusReceiveLink link;

        private SessionMessageManagement(ServiceBusReceiveLink receiveLink) {
            link = receiveLink;
        }

        /**
         * Forwards the disposition update to the underlying link.
         *
         * @param str Lock token of the message being settled.
         * @param deliveryState Delivery state to apply.
         * @return The link's result for the update.
         */
        @Override
        public Mono<Void> updateDisposition(String str, DeliveryState deliveryState) {
            final Mono<Void> result = link.updateDisposition(str, deliveryState);
            return result;
        }

        /**
         * Emits a lock expiration 60 seconds after the time of the call; neither argument is used.
         *
         * @param str Unused.
         * @param str2 Unused.
         * @return A Mono holding the computed expiration.
         */
        @Override
        public Mono<Instant> renewMessageLock(String str, String str2) {
            final Instant expiresAt = Instant.now().plusSeconds(60L);
            return Mono.just(expiresAt);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a receiver bound to a single session link and wires up its background work: the message stream,
     * the optional idle timer, session id tracking, and session lock renewal.
     *
     * @param serviceBusReceiveLink Link that messages are received from.
     * @param messageSerializer Converts link messages into {@link ServiceBusReceivedMessage} instances.
     * @param amqpRetryOptions Supplies the try timeout used as the idle timeout when {@code z} is true.
     * @param i Number of credits added to the link when the message stream is subscribed to (the prefetch).
     * @param z When true, receiving stops once no further message arrives within the try timeout after the most
     *     recently received message. The countdown only starts after a first message has been received.
     * @param scheduler Scheduler that received messages are published on.
     * @param z2 When true, lock renewal is also stopped after {@code duration} has elapsed.
     * @param duration Maximum time to keep renewing the session lock; only used when {@code z2} is true.
     * @param function Renews the session lock for the given session id and emits the new expiration.
     */
    public UnnamedSessionReceiver(ServiceBusReceiveLink serviceBusReceiveLink, MessageSerializer messageSerializer, AmqpRetryOptions amqpRetryOptions, int i, boolean z, Scheduler scheduler, boolean z2, Duration duration, Function<String, Mono<Instant>> function) {
        this.receiveLink = serviceBusReceiveLink;
        this.enableSessionLockRenewal = z2;
        this.maxSessionLockRenewDuration = duration;
        this.renewSessionLock = function;

        // Whenever the link runs out of credits, ask for one more.
        serviceBusReceiveLink.setEmptyCreditListener(() -> 1);

        final Flux<ServiceBusReceivedMessageContext> linkMessages = serviceBusReceiveLink.receive()
            .publishOn(scheduler)
            .doOnSubscribe(subscription -> {
                this.logger.verbose("Adding prefetch to receive link.");
                serviceBusReceiveLink.addCredits(i);
            })
            .takeUntilOther(this.cancelReceiveProcessor)
            .map(amqpMessage -> {
                final ServiceBusReceivedMessage received =
                    messageSerializer.deserialize(amqpMessage, ServiceBusReceivedMessage.class);
                if (CoreUtils.isNullOrEmpty(received.getLockToken())) {
                    this.logger.info("sessionId[{}] message[{}]. There is no lock token.",
                        received.getSessionId(), received.getMessageId());
                } else {
                    this.lockContainer.addOrUpdate(received.getLockToken(), received.getLockedUntil());
                }
                return new ServiceBusReceivedMessageContext(received);
            })
            .onErrorResume(error -> {
                // Surface the failure as a final error context instead of an error signal.
                this.logger.warning("sessionId[{}]. Error occurred. Ending session.", this.sessionId, error);
                return Mono.just(new ServiceBusReceivedMessageContext(getSessionId(), error));
            })
            .doOnNext(context -> {
                if (context.hasError()) {
                    return;
                }
                final ServiceBusReceivedMessage received = context.getMessage();
                final String lockToken = !CoreUtils.isNullOrEmpty(received.getLockToken())
                    ? received.getLockToken()
                    : "";
                this.logger.verbose("Received sessionId[{}] messageId[{}]",
                    context.getSessionId(), received.getMessageId());
                // Tell the idle timer that a message just arrived.
                this.messageReceivedSink.next(lockToken);
            });

        // After the link messages end, relay whatever the cancel processor carries (an error context or nothing).
        this.receivedMessages = Flux.concat(linkMessages, this.cancelReceiveProcessor);
        this.subscriptions = Disposables.composite();

        if (z) {
            // Each received message must RESTART the idle countdown. map + switchOnNext cancels the previous
            // delay when a newer message arrives; the earlier flatMap kept every delay alive, so the first
            // message's timer ended the session even while messages were still flowing.
            final Flux<Mono<Long>> idleTimers =
                this.messageReceivedEmitter.map(lockToken -> Mono.delay(amqpRetryOptions.getTryTimeout()));
            this.subscriptions.add(Flux.switchOnNext(idleTimers)
                .next()
                .subscribe(tick -> {
                    this.logger.info("entityPath[{}]. sessionId[{}]. Did not a receive message within timeout {}.",
                        serviceBusReceiveLink.getEntityPath(), this.sessionId.get(),
                        amqpRetryOptions.getTryTimeout());
                    this.cancelReceiveProcessor.onComplete();
                }));
        }

        this.subscriptions.add(serviceBusReceiveLink.getSessionId().subscribe(id -> {
            if (!this.sessionId.compareAndSet(null, id)) {
                this.logger.warning("Another method set sessionId. Existing: {}. Returned: {}.",
                    this.sessionId.get(), id);
            }
        }));

        this.subscriptions.add(serviceBusReceiveLink.getSessionLockedUntil().subscribe(lockedUntil -> {
            // Only the first expiration seen starts the renewal loop.
            if (this.sessionLockedUntil.compareAndSet(null, lockedUntil)) {
                this.subscriptions.add(getRenewLockOperation(lockedUntil));
            } else {
                this.logger.info("SessionLockedUntil was already set: {}", this.sessionLockedUntil);
            }
        }));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Reports whether the given lock token is currently tracked by this receiver's lock container.
     *
     * @param str Lock token to look up.
     * @return Whatever the lock container reports for the token.
     * @throws NullPointerException if {@code str} is null.
     * @throws IllegalArgumentException if {@code str} is an empty string.
     */
    public boolean containsLockToken(String str) {
        if (str != null && !str.isEmpty()) {
            return lockContainer.contains(str);
        }

        // Invalid input: build the matching exception, then log and throw it through the logger.
        final RuntimeException failure = (str == null)
            ? new NullPointerException("'lockToken' cannot be null.")
            : new IllegalArgumentException("'lockToken' cannot be an empty string.");
        throw logger.logExceptionAsError(failure);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Gets the name of the underlying receive link.
     *
     * @return The link name as reported by the receive link.
     */
    public String getLinkName() {
        final String linkName = receiveLink.getLinkName();
        return linkName;
    }

    /**
     * Gets the session id this receiver is bound to.
     *
     * @return The session id, or {@code null} if it has not been set yet.
     */
    String getSessionId() {
        final String currentSessionId = sessionId.get();
        return currentSessionId;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Gets the stream of messages for this session. The same Flux instance, built in the constructor, is
     * returned on every call.
     *
     * @return The message stream.
     */
    public Flux<ServiceBusReceivedMessageContext> receive() {
        final Flux<ServiceBusReceivedMessageContext> messages = receivedMessages;
        return messages;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Overwrites the recorded session lock expiration.
     *
     * @param instant New lock expiration to record.
     */
    public void setSessionLockedUntil(Instant instant) {
        final AtomicReference<Instant> lockedUntil = sessionLockedUntil;
        lockedUntil.set(instant);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Settles a message by forwarding the disposition to the underlying receive link.
     *
     * @param str Lock token of the message to settle.
     * @param deliveryState Delivery state to apply.
     * @return The link's result for the update.
     */
    public Mono<Void> updateDisposition(String str, DeliveryState deliveryState) {
        final Mono<Void> result = receiveLink.updateDisposition(str, deliveryState);
        return result;
    }

    /**
     * Disposes the receive link and then this receiver's background subscriptions. Only the first call does
     * any work; later calls return immediately.
     */
    @Override
    public void close() {
        final boolean alreadyClosed = isDisposed.getAndSet(true);
        if (!alreadyClosed) {
            receiveLink.dispose();
            subscriptions.dispose();
        }
    }

    /**
     * Starts the session lock renewal loop.
     *
     * <p>A timer fires after the computed renewal interval and calls {@code renewSessionLock}; each successful
     * renewal records the new expiration and re-arms the timer with an interval derived from it. The loop stops
     * when the cancel processor signals, or (when {@code enableSessionLockRenewal} is true) after
     * {@code maxSessionLockRenewDuration}. A renewal failure is pushed to the cancel processor as an error
     * context; normal completion completes the cancel processor.</p>
     *
     * @param instant Lock expiration that the first renewal is scheduled against.
     * @return Handle that cancels the renewal loop when disposed.
     */
    private Disposable getRenewLockOperation(Instant instant) {
        final EmitterProcessor<Duration> renewalIntervals = EmitterProcessor.create();
        final FluxSink<Duration> intervalSink = renewalIntervals.sink(FluxSink.OverflowStrategy.BUFFER);

        // Emit the interval exactly as computed. Previously it was passed through adjustServerTimeout a second
        // time here, which shortened it twice and could turn the clamped zero interval negative.
        intervalSink.next(calculateRenewalInterval(Instant.now(), instant));

        final Publisher<?> stopSignal = this.enableSessionLockRenewal
            ? Flux.<Object>first(this.cancelReceiveProcessor, Mono.delay(this.maxSessionLockRenewDuration))
            : Flux.<Object>first(this.cancelReceiveProcessor);

        // Every interval pushed to the sink replaces the running timer with one using the new period.
        final Flux<Flux<Long>> timers = renewalIntervals.map(period -> Flux.interval(period));

        return Flux.switchOnNext(timers)
            .takeUntilOther(stopSignal)
            .flatMap(tick -> {
                final String currentSessionId = this.sessionId.get();
                this.logger.info("sessionId[{}]. now[{}]. Starting lock renewal.", currentSessionId, Instant.now());
                if (CoreUtils.isNullOrEmpty(currentSessionId)) {
                    return Mono.<Instant>error(
                        new IllegalStateException("Cannot renew session lock without session id."));
                }
                return this.renewSessionLock.apply(currentSessionId);
            })
            .map(nextLockedUntil -> {
                final Instant now = Instant.now();
                final Duration untilExpiration = Duration.between(now, nextLockedUntil);
                this.logger.info("sessionId[{}]. nextExpiration[{}]. Next renewal: [{}]",
                    this.sessionId, nextLockedUntil, untilExpiration);
                // Apply the same negative-interval guards as for the initial interval.
                intervalSink.next(calculateRenewalInterval(now, nextLockedUntil));
                return nextLockedUntil;
            })
            .subscribe(renewedUntil -> {
                this.logger.verbose("lockToken[{}]. lockedUntil[{}]. Lock renewal successful.",
                    this.sessionId, renewedUntil);
                this.sessionLockedUntil.set(renewedUntil);
            }, error -> {
                this.logger.error("Error occurred while renewing lock token.", error);
                this.cancelReceiveProcessor.onNext(
                    new ServiceBusReceivedMessageContext(this.sessionId.get(), error));
            }, () -> {
                this.logger.verbose("Renewing session lock task completed.");
                this.cancelReceiveProcessor.onComplete();
            });
    }

    /**
     * Computes how long to wait before the next lock renewal; never returns a negative duration.
     *
     * @param now Reference point in time.
     * @param lockedUntil Lock expiration to schedule against.
     * @return Zero when the lock has already expired; otherwise the value from
     *     {@code MessageUtils.adjustServerTimeout} (presumably the remaining time minus a safety buffer; confirm
     *     against that helper), or the unadjusted remaining time when the adjusted value would be negative.
     */
    private Duration calculateRenewalInterval(Instant now, Instant lockedUntil) {
        final Duration remaining = Duration.between(now, lockedUntil);
        if (remaining.isNegative()) {
            this.logger.info("Duration was negative. now[{}] lockedUntil[{}]", now, lockedUntil);
            return Duration.ZERO;
        }

        final Duration adjusted = MessageUtils.adjustServerTimeout(remaining);
        if (adjusted.isNegative()) {
            // Log the adjusted value, as the message says; the unadjusted one was logged before.
            this.logger.info("Adjusted duration is negative. Adjusted: {}ms", adjusted.toMillis());
            return remaining;
        }
        return adjusted;
    }
}
