package com.azure.messaging.servicebus.implementation;

import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.exception.AmqpResponseCode;
import com.azure.core.amqp.exception.SessionErrorContext;
import com.azure.core.amqp.implementation.ExceptionUtil;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.RequestResponseChannel;
import com.azure.core.amqp.implementation.RequestResponseUtils;
import com.azure.core.amqp.implementation.TokenManager;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.ServiceBusErrorSource;
import com.azure.messaging.servicebus.ServiceBusException;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusTransactionContext;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import java.nio.BufferOverflowException;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.message.Message;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/azure/messaging/servicebus/implementation/ManagementChannel.class */
public class ManagementChannel implements ServiceBusManagementNode {
    private final MessageSerializer messageSerializer;
    private final TokenManager tokenManager;
    private final Duration operationTimeout;
    private final Mono<RequestResponseChannel> createChannel;
    private final String fullyQualifiedNamespace;
    private final ClientLogger logger;
    private final String entityPath;
    private volatile boolean isDisposed;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ManagementChannel(Mono<RequestResponseChannel> mono, String str, String str2, TokenManager tokenManager, MessageSerializer messageSerializer, Duration duration) {
        this.createChannel = (Mono) Objects.requireNonNull(mono, "'createChannel' cannot be null.");
        this.fullyQualifiedNamespace = (String) Objects.requireNonNull(str, "'fullyQualifiedNamespace' cannot be null.");
        this.logger = new ClientLogger(String.format("%s<%s>", ManagementChannel.class, str2));
        this.entityPath = (String) Objects.requireNonNull(str2, "'entityPath' cannot be null.");
        this.messageSerializer = (MessageSerializer) Objects.requireNonNull(messageSerializer, "'messageSerializer' cannot be null.");
        this.tokenManager = (TokenManager) Objects.requireNonNull(tokenManager, "'tokenManager' cannot be null.");
        this.operationTimeout = (Duration) Objects.requireNonNull(duration, "'operationTimeout' cannot be null.");
    }

    @Override // com.azure.messaging.servicebus.implementation.ServiceBusManagementNode
    public Mono<Void> cancelScheduledMessages(Iterable<Long> iterable, String str) {
        ArrayList arrayList = new ArrayList();
        iterable.forEach(l -> {
            arrayList.add(l);
        });
        return arrayList.isEmpty() ? Mono.empty() : isAuthorized("com.microsoft:cancel-scheduled-message").then(this.createChannel.flatMap(requestResponseChannel -> {
            Message createManagementMessage = createManagementMessage("com.microsoft:cancel-scheduled-message", str);
            createManagementMessage.setBody(new AmqpValue(Collections.singletonMap(ManagementConstants.SEQUENCE_NUMBERS, (Long[]) arrayList.toArray(new Long[0]))));
            return sendWithVerify(requestResponseChannel, createManagementMessage, null);
        })).then();
    }

    @Override // com.azure.messaging.servicebus.implementation.ServiceBusManagementNode
    public Mono<byte[]> getSessionState(String str, String str2) {
        return str == null ? FluxUtil.monoError(this.logger, new NullPointerException("'sessionId' cannot be null.")) : str.isEmpty() ? FluxUtil.monoError(this.logger, new IllegalArgumentException("'sessionId' cannot be blank.")) : isAuthorized("com.microsoft:get-session-state").then(this.createChannel.flatMap(requestResponseChannel -> {
            Message createManagementMessage = createManagementMessage("com.microsoft:get-session-state", str2);
            HashMap hashMap = new HashMap();
            hashMap.put(ManagementConstants.SESSION_ID, str);
            createManagementMessage.setBody(new AmqpValue(hashMap));
            return sendWithVerify(requestResponseChannel, createManagementMessage, null);
        })).flatMap(message -> {
            Object value = message.getBody().getValue();
            if (!(value instanceof Map)) {
                return FluxUtil.monoError(this.logger, Exceptions.propagate(new AmqpException(false, String.format("Body not expected when renewing session. Id: %s. Value: %s", str, value), getErrorContext())));
            }
            Object obj = ((Map) value).get(ManagementConstants.SESSION_STATE);
            if (obj != null) {
                return Mono.just(((Binary) obj).getArray());
            }
            this.logger.info("sessionId[{}]. Does not have a session state.", new Object[]{str});
            return Mono.empty();
        });
    }

    @Override // com.azure.messaging.servicebus.implementation.ServiceBusManagementNode
    public Mono<ServiceBusReceivedMessage> peek(long j, String str, String str2) {
        return peek(j, str, str2, 1).next();
    }

    @Override // com.azure.messaging.servicebus.implementation.ServiceBusManagementNode
    public Flux<ServiceBusReceivedMessage> peek(long j, String str, String str2, int i) {
        return isAuthorized("com.microsoft:peek-message").thenMany(this.createChannel.flatMap(requestResponseChannel -> {
            Message createManagementMessage = createManagementMessage("com.microsoft:peek-message", str2);
            HashMap hashMap = new HashMap();
            hashMap.put(ManagementConstants.FROM_SEQUENCE_NUMBER, Long.valueOf(j));
            hashMap.put(ManagementConstants.MESSAGE_COUNT_KEY, Integer.valueOf(i));
            if (!CoreUtils.isNullOrEmpty(str)) {
                hashMap.put(ManagementConstants.SESSION_ID, str);
            }
            createManagementMessage.setBody(new AmqpValue(hashMap));
            return sendWithVerify(requestResponseChannel, createManagementMessage, null);
        }).flatMapMany(message -> {
            return Flux.fromIterable(this.messageSerializer.deserializeList(message, ServiceBusReceivedMessage.class));
        }));
    }

    @Override // com.azure.messaging.servicebus.implementation.ServiceBusManagementNode
    public Flux<ServiceBusReceivedMessage> receiveDeferredMessages(ServiceBusReceiveMode serviceBusReceiveMode, String str, String str2, Iterable<Long> iterable) {
        if (iterable == null) {
            return FluxUtil.fluxError(this.logger, new NullPointerException("'sequenceNumbers' cannot be null"));
        }
        ArrayList arrayList = new ArrayList();
        iterable.forEach(l -> {
            arrayList.add(l);
        });
        return arrayList.isEmpty() ? Flux.empty() : isAuthorized("com.microsoft:receive-by-sequence-number").thenMany(this.createChannel.flatMap(requestResponseChannel -> {
            Message createManagementMessage = createManagementMessage("com.microsoft:receive-by-sequence-number", str2);
            HashMap hashMap = new HashMap();
            hashMap.put(ManagementConstants.SEQUENCE_NUMBERS, arrayList.toArray(new Long[0]));
            hashMap.put(ManagementConstants.RECEIVER_SETTLE_MODE, UnsignedInteger.valueOf(serviceBusReceiveMode == ServiceBusReceiveMode.RECEIVE_AND_DELETE ? 0 : 1));
            if (!CoreUtils.isNullOrEmpty(str)) {
                hashMap.put(ManagementConstants.SESSION_ID, str);
            }
            createManagementMessage.setBody(new AmqpValue(hashMap));
            return sendWithVerify(requestResponseChannel, createManagementMessage, null);
        }).flatMapMany(message -> {
            return Flux.fromIterable(this.messageSerializer.deserializeList(message, ServiceBusReceivedMessage.class));
        }));
    }

    private Throwable mapError(Throwable th) {
        return th instanceof AmqpException ? new ServiceBusException(th, ServiceBusErrorSource.MANAGEMENT) : th;
    }

    @Override // com.azure.messaging.servicebus.implementation.ServiceBusManagementNode
    public Mono<OffsetDateTime> renewMessageLock(String str, String str2) {
        return isAuthorized("com.microsoft:peek-message").then(this.createChannel.flatMap(requestResponseChannel -> {
            Message createManagementMessage = createManagementMessage("com.microsoft:renew-lock", str2);
            HashMap hashMap = new HashMap();
            hashMap.put(ManagementConstants.LOCK_TOKENS_KEY, new UUID[]{UUID.fromString(str)});
            createManagementMessage.setBody(new AmqpValue(hashMap));
            return sendWithVerify(requestResponseChannel, createManagementMessage, null);
        }).map(message -> {
            List deserializeList = this.messageSerializer.deserializeList(message, OffsetDateTime.class);
            if (CoreUtils.isNullOrEmpty(deserializeList)) {
                throw this.logger.logExceptionAsError(Exceptions.propagate(new AmqpException(false, String.format("Service bus response empty. Could not renew message with lock token: '%s'.", str), getErrorContext())));
            }
            return (OffsetDateTime) deserializeList.get(0);
        }));
    }

    @Override // com.azure.messaging.servicebus.implementation.ServiceBusManagementNode
    public Mono<OffsetDateTime> renewSessionLock(String str, String str2) {
        return str == null ? FluxUtil.monoError(this.logger, new NullPointerException("'sessionId' cannot be null.")) : str.isEmpty() ? FluxUtil.monoError(this.logger, new IllegalArgumentException("'sessionId' cannot be blank.")) : isAuthorized("com.microsoft:renew-session-lock").then(this.createChannel.flatMap(requestResponseChannel -> {
            Message createManagementMessage = createManagementMessage("com.microsoft:renew-session-lock", str2);
            HashMap hashMap = new HashMap();
            hashMap.put(ManagementConstants.SESSION_ID, str);
            createManagementMessage.setBody(new AmqpValue(hashMap));
            return sendWithVerify(requestResponseChannel, createManagementMessage, null);
        })).map(message -> {
            Object value = message.getBody().getValue();
            if (!(value instanceof Map)) {
                throw this.logger.logExceptionAsError(Exceptions.propagate(new AmqpException(false, String.format("Body not expected when renewing session. Id: %s. Value: %s", str, value), getErrorContext())));
            }
            Object obj = ((Map) value).get(ManagementConstants.EXPIRATION);
            if (obj instanceof Date) {
                return ((Date) obj).toInstant().atOffset(ZoneOffset.UTC);
            }
            throw this.logger.logExceptionAsError(Exceptions.propagate(new AmqpException(false, String.format("Expiration is not of type Date when renewing session. Id: %s. Value: %s", str, obj), getErrorContext())));
        });
    }

    @Override // com.azure.messaging.servicebus.implementation.ServiceBusManagementNode
    public Flux<Long> schedule(List<ServiceBusMessage> list, OffsetDateTime offsetDateTime, int i, String str, ServiceBusTransactionContext serviceBusTransactionContext) {
        return isAuthorized("com.microsoft:schedule-message").thenMany(this.createChannel.flatMap(requestResponseChannel -> {
            LinkedList linkedList = new LinkedList();
            Iterator it = list.iterator();
            while (it.hasNext()) {
                ServiceBusMessage serviceBusMessage = (ServiceBusMessage) it.next();
                serviceBusMessage.setScheduledEnqueueTime(offsetDateTime);
                Message serialize = this.messageSerializer.serialize(serviceBusMessage);
                int min = Math.min(this.messageSerializer.getSize(serialize) + 512, i);
                byte[] bArr = new byte[min];
                try {
                    int encode = serialize.encode(bArr, 0, min);
                    HashMap hashMap = new HashMap();
                    hashMap.put(ManagementConstants.MESSAGE, new Binary(bArr, 0, encode));
                    hashMap.put(ManagementConstants.MESSAGE_ID, serialize.getMessageId());
                    String groupId = serialize.getGroupId();
                    if (!CoreUtils.isNullOrEmpty(groupId)) {
                        hashMap.put(ManagementConstants.SESSION_ID, groupId);
                    }
                    String partitionKey = serviceBusMessage.getPartitionKey();
                    if (!CoreUtils.isNullOrEmpty(partitionKey)) {
                        hashMap.put(ManagementConstants.PARTITION_KEY, partitionKey);
                    }
                    linkedList.add(hashMap);
                } catch (BufferOverflowException e) {
                    return FluxUtil.monoError(this.logger, Exceptions.propagate(new AmqpException(false, AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED, String.format("Error sending. Size of the payload exceeded maximum message size: %s kb", Integer.valueOf(i / 1024)), e, requestResponseChannel.getErrorContext())));
                }
            }
            HashMap hashMap2 = new HashMap();
            hashMap2.put(ManagementConstants.MESSAGES, linkedList);
            Message createManagementMessage = createManagementMessage("com.microsoft:schedule-message", str);
            createManagementMessage.setBody(new AmqpValue(hashMap2));
            TransactionalState transactionalState = null;
            if (serviceBusTransactionContext != null && serviceBusTransactionContext.getTransactionId() != null) {
                transactionalState = new TransactionalState();
                transactionalState.setTxnId(new Binary(serviceBusTransactionContext.getTransactionId().array()));
            }
            return sendWithVerify(requestResponseChannel, createManagementMessage, transactionalState);
        }).flatMapMany(message -> {
            List deserializeList = this.messageSerializer.deserializeList(message, Long.class);
            if (CoreUtils.isNullOrEmpty(deserializeList)) {
                FluxUtil.fluxError(this.logger, new AmqpException(false, String.format("Service Bus response was empty. Could not schedule message()s.", new Object[0]), getErrorContext()));
            }
            return Flux.fromIterable(deserializeList);
        }));
    }

    @Override // com.azure.messaging.servicebus.implementation.ServiceBusManagementNode
    public Mono<Void> setSessionState(String str, byte[] bArr, String str2) {
        return str == null ? FluxUtil.monoError(this.logger, new NullPointerException("'sessionId' cannot be null.")) : str.isEmpty() ? FluxUtil.monoError(this.logger, new IllegalArgumentException("'sessionId' cannot be blank.")) : isAuthorized("com.microsoft:set-session-state").then(this.createChannel.flatMap(requestResponseChannel -> {
            Message createManagementMessage = createManagementMessage("com.microsoft:set-session-state", str2);
            HashMap hashMap = new HashMap();
            hashMap.put(ManagementConstants.SESSION_ID, str);
            hashMap.put(ManagementConstants.SESSION_STATE, bArr == null ? null : new Binary(bArr));
            createManagementMessage.setBody(new AmqpValue(hashMap));
            return sendWithVerify(requestResponseChannel, createManagementMessage, null).then();
        }));
    }

    @Override // com.azure.messaging.servicebus.implementation.ServiceBusManagementNode
    public Mono<Void> updateDisposition(String str, DispositionStatus dispositionStatus, String str2, String str3, Map<String, Object> map, String str4, String str5, ServiceBusTransactionContext serviceBusTransactionContext) {
        UUID[] uuidArr = {UUID.fromString(str)};
        return isAuthorized("com.microsoft:update-disposition").then(this.createChannel.flatMap(requestResponseChannel -> {
            this.logger.verbose("Update disposition of deliveries '{}' to '{}' on entity '{}', session '{}'", new Object[]{Arrays.toString(uuidArr), dispositionStatus, this.entityPath, str4});
            Message createManagementMessage = createManagementMessage("com.microsoft:update-disposition", str5);
            HashMap hashMap = new HashMap();
            hashMap.put(ManagementConstants.LOCK_TOKENS_KEY, uuidArr);
            hashMap.put("disposition-status", dispositionStatus.getValue());
            if (str2 != null) {
                hashMap.put("deadletter-reason", str2);
            }
            if (str3 != null) {
                hashMap.put("deadletter-description", str3);
            }
            if (map != null && map.size() > 0) {
                hashMap.put("properties-to-modify", map);
            }
            if (!CoreUtils.isNullOrEmpty(str4)) {
                hashMap.put(ManagementConstants.SESSION_ID, str4);
            }
            createManagementMessage.setBody(new AmqpValue(hashMap));
            TransactionalState transactionalState = null;
            if (serviceBusTransactionContext != null && serviceBusTransactionContext.getTransactionId() != null) {
                transactionalState = new TransactionalState();
                transactionalState.setTxnId(new Binary(serviceBusTransactionContext.getTransactionId().array()));
            }
            return sendWithVerify(requestResponseChannel, createManagementMessage, transactionalState);
        })).then();
    }

    @Override // com.azure.messaging.servicebus.implementation.ServiceBusManagementNode, java.lang.AutoCloseable
    public void close() {
        if (this.isDisposed) {
            return;
        }
        this.isDisposed = true;
        this.tokenManager.close();
    }

    private Mono<Message> sendWithVerify(RequestResponseChannel requestResponseChannel, Message message, DeliveryState deliveryState) {
        return requestResponseChannel.sendWithAck(message, deliveryState).handle((message2, synchronousSink) -> {
            if (RequestResponseUtils.isSuccessful(message2)) {
                synchronousSink.next(message2);
                return;
            }
            AmqpResponseCode statusCode = RequestResponseUtils.getStatusCode(message2);
            if (statusCode == AmqpResponseCode.NO_CONTENT) {
                synchronousSink.next(message2);
                return;
            }
            String errorCondition = RequestResponseUtils.getErrorCondition(message2);
            if (statusCode == AmqpResponseCode.NOT_FOUND) {
                AmqpErrorCondition fromString = AmqpErrorCondition.fromString(errorCondition);
                if (fromString == AmqpErrorCondition.MESSAGE_NOT_FOUND) {
                    this.logger.info("There was no matching message found.");
                    synchronousSink.next(message2);
                    return;
                } else if (fromString == AmqpErrorCondition.SESSION_NOT_FOUND) {
                    this.logger.info("There was no matching session found.");
                    synchronousSink.next(message2);
                    return;
                }
            }
            String statusDescription = RequestResponseUtils.getStatusDescription(message2);
            Exception exception = ExceptionUtil.toException(errorCondition, statusDescription, requestResponseChannel.getErrorContext());
            this.logger.warning("status[{}] description[{}] condition[{}] Operation not successful.", new Object[]{statusCode, statusDescription, errorCondition});
            synchronousSink.error(exception);
        }).switchIfEmpty(Mono.error(new AmqpException(true, "No response received from management channel.", requestResponseChannel.getErrorContext()))).onErrorMap(this::mapError);
    }

    private Mono<Void> isAuthorized(String str) {
        return this.tokenManager.getAuthorizationResults().onErrorMap(this::mapError).next().handle((amqpResponseCode, synchronousSink) -> {
            if (amqpResponseCode == AmqpResponseCode.ACCEPTED || amqpResponseCode == AmqpResponseCode.OK) {
                synchronousSink.complete();
            } else {
                synchronousSink.error(new ServiceBusException(new AmqpException(false, AmqpErrorCondition.UNAUTHORIZED_ACCESS, String.format("User does not have authorization to perform operation [%s] on entity [%s]. Response: [%s]", str, this.entityPath, amqpResponseCode), getErrorContext()), ServiceBusErrorSource.MANAGEMENT));
            }
        });
    }

    private Message createManagementMessage(String str, String str2) {
        Duration adjustServerTimeout = MessageUtils.adjustServerTimeout(this.operationTimeout);
        HashMap hashMap = new HashMap();
        hashMap.put("operation", str);
        hashMap.put("com.microsoft:server-timeout", Long.valueOf(adjustServerTimeout.toMillis()));
        if (!CoreUtils.isNullOrEmpty(str2)) {
            hashMap.put(ManagementConstants.ASSOCIATED_LINK_NAME_KEY, str2);
        }
        Message message = Proton.message();
        message.setApplicationProperties(new ApplicationProperties(hashMap));
        return message;
    }

    private AmqpErrorContext getErrorContext() {
        return new SessionErrorContext(this.fullyQualifiedNamespace, this.entityPath);
    }
}
