package com.github.sonus21.rqueue.core.impl;

import com.github.sonus21.rqueue.core.EndpointRegistry;
import com.github.sonus21.rqueue.core.ReactiveRqueueMessageEnqueuer;
import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
import com.github.sonus21.rqueue.core.support.RqueueMessageUtils;
import com.github.sonus21.rqueue.listener.QueueDetail;
import com.github.sonus21.rqueue.utils.PriorityUtils;
import com.github.sonus21.rqueue.utils.Validator;
import java.util.Objects;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.MessageConverter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/github/sonus21/rqueue/core/impl/ReactiveRqueueMessageEnqueuerImpl.class */
public class ReactiveRqueueMessageEnqueuerImpl extends BaseMessageSender implements ReactiveRqueueMessageEnqueuer {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(ReactiveRqueueMessageEnqueuerImpl.class);

    /* loaded from: input_file:com/github/sonus21/rqueue/core/impl/ReactiveRqueueMessageEnqueuerImpl$BoolMonoConverter.class */
    private static class BoolMonoConverter implements MonoConverter<Boolean> {
        private BoolMonoConverter(RqueueMessage rqueueMessage) {
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // com.github.sonus21.rqueue.core.impl.ReactiveRqueueMessageEnqueuerImpl.MonoConverter
        public Boolean call(Long l, Boolean bool) {
            return Boolean.TRUE;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/github/sonus21/rqueue/core/impl/ReactiveRqueueMessageEnqueuerImpl$BooleanMonoConverterGenerator.class */
    public static class BooleanMonoConverterGenerator implements MonoConverterGenerator<Boolean> {
        private BooleanMonoConverterGenerator() {
        }

        @Override // com.github.sonus21.rqueue.core.impl.ReactiveRqueueMessageEnqueuerImpl.MonoConverterGenerator
        public MonoConverter<Boolean> create(RqueueMessage rqueueMessage) {
            return new BoolMonoConverter(rqueueMessage);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/github/sonus21/rqueue/core/impl/ReactiveRqueueMessageEnqueuerImpl$MessageBuilder.class */
    public interface MessageBuilder {
        RqueueMessage build(MessageConverter messageConverter, String str, String str2, Object obj, Integer num, Long l, MessageHeaders messageHeaders);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/github/sonus21/rqueue/core/impl/ReactiveRqueueMessageEnqueuerImpl$MonoConverter.class */
    public interface MonoConverter<T> {
        T call(Long l, Boolean bool);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/github/sonus21/rqueue/core/impl/ReactiveRqueueMessageEnqueuerImpl$MonoConverterGenerator.class */
    public interface MonoConverterGenerator<T> {
        MonoConverter<T> create(RqueueMessage rqueueMessage);
    }

    /* loaded from: input_file:com/github/sonus21/rqueue/core/impl/ReactiveRqueueMessageEnqueuerImpl$StrMonoConverter.class */
    private static class StrMonoConverter implements MonoConverter<String> {
        private final RqueueMessage message;

        private StrMonoConverter(RqueueMessage rqueueMessage) {
            this.message = rqueueMessage;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // com.github.sonus21.rqueue.core.impl.ReactiveRqueueMessageEnqueuerImpl.MonoConverter
        public String call(Long l, Boolean bool) {
            return this.message.getId();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/github/sonus21/rqueue/core/impl/ReactiveRqueueMessageEnqueuerImpl$StrMonoConverterGenerator.class */
    public static class StrMonoConverterGenerator implements MonoConverterGenerator<String> {
        private StrMonoConverterGenerator() {
        }

        @Override // com.github.sonus21.rqueue.core.impl.ReactiveRqueueMessageEnqueuerImpl.MonoConverterGenerator
        public MonoConverter<String> create(RqueueMessage rqueueMessage) {
            return new StrMonoConverter(rqueueMessage);
        }
    }

    public ReactiveRqueueMessageEnqueuerImpl(RqueueMessageTemplate rqueueMessageTemplate, MessageConverter messageConverter, MessageHeaders messageHeaders) {
        super(rqueueMessageTemplate, messageConverter, messageHeaders);
    }

    private <T> Mono<T> pushReactiveMessage(MessageBuilder messageBuilder, String str, String str2, Object obj, Integer num, Long l, MonoConverterGenerator<T> monoConverterGenerator) {
        QueueDetail queueDetail = EndpointRegistry.get(str);
        RqueueMessage build = messageBuilder.build(this.messageConverter, str, str2, obj, num, l, this.messageHeaders);
        MonoConverter<T> create = monoConverterGenerator.create(build);
        try {
            Object enqueue = enqueue(queueDetail, build, l, true);
            Mono mono = (Mono) storeMessageMetadata(build, l, true);
            Mono elementAt = enqueue instanceof Flux ? ((Flux) enqueue).elementAt(0) : (Mono) enqueue;
            Objects.requireNonNull(create);
            return elementAt.zipWith(mono, create::call);
        } catch (Exception e) {
            log.error("Queue: {} Message {} could not be pushed {}", new Object[]{str, build, e});
            return Mono.error(e);
        }
    }

    private Mono<String> pushReactiveMessage(String str, Object obj, Integer num, Long l) {
        return pushReactiveMessage(RqueueMessageUtils::buildMessage, str, null, obj, num, l, new StrMonoConverterGenerator());
    }

    private Mono<Boolean> pushReactiveWithMessageId(String str, String str2, Object obj, Integer num, Long l) {
        return pushReactiveMessage(RqueueMessageUtils::buildMessage, str, str2, obj, num, l, new BooleanMonoConverterGenerator());
    }

    private Mono<String> pushReactivePeriodicMessage(String str, Object obj, long j) {
        return pushReactiveMessage((v0, v1, v2, v3, v4, v5, v6) -> {
            return RqueueMessageUtils.buildPeriodicMessage(v0, v1, v2, v3, v4, v5, v6);
        }, str, null, obj, null, Long.valueOf(j), new StrMonoConverterGenerator());
    }

    private Mono<Boolean> pushReactivePeriodicMessageWithMessageId(String str, String str2, Object obj, long j) {
        return pushReactiveMessage((v0, v1, v2, v3, v4, v5, v6) -> {
            return RqueueMessageUtils.buildPeriodicMessage(v0, v1, v2, v3, v4, v5, v6);
        }, str, str2, obj, null, Long.valueOf(j), new BooleanMonoConverterGenerator());
    }

    @Override // com.github.sonus21.rqueue.core.ReactiveRqueueMessageEnqueuer
    public Mono<String> enqueue(String str, Object obj) {
        Validator.validateQueue(str);
        Validator.validateMessage(obj);
        return pushReactiveMessage(str, obj, null, null);
    }

    @Override // com.github.sonus21.rqueue.core.ReactiveRqueueMessageEnqueuer
    public Mono<Boolean> enqueue(String str, String str2, Object obj) {
        Validator.validateQueue(str);
        Validator.validateMessageId(str2);
        Validator.validateMessage(obj);
        return pushReactiveWithMessageId(str, str2, obj, null, null);
    }

    @Override // com.github.sonus21.rqueue.core.ReactiveRqueueMessageEnqueuer
    public Mono<Boolean> enqueueUnique(String str, String str2, Object obj) {
        Validator.validateQueue(str);
        Validator.validateMessageId(str2);
        Validator.validateMessage(obj);
        return pushReactiveWithMessageId(str, str2, obj, null, null);
    }

    @Override // com.github.sonus21.rqueue.core.ReactiveRqueueMessageEnqueuer
    public Mono<String> enqueueWithRetry(String str, Object obj, int i) {
        Validator.validateQueue(str);
        Validator.validateMessage(obj);
        Validator.validateRetryCount(i);
        return pushReactiveMessage(str, obj, Integer.valueOf(i), null);
    }

    @Override // com.github.sonus21.rqueue.core.ReactiveRqueueMessageEnqueuer
    public Mono<Boolean> enqueueWithRetry(String str, String str2, Object obj, int i) {
        Validator.validateQueue(str);
        Validator.validateMessageId(str2);
        Validator.validateMessage(obj);
        Validator.validateRetryCount(i);
        return pushReactiveWithMessageId(str, str2, obj, Integer.valueOf(i), null);
    }

    @Override // com.github.sonus21.rqueue.core.ReactiveRqueueMessageEnqueuer
    public Mono<String> enqueueWithPriority(String str, String str2, Object obj) {
        Validator.validateQueue(str);
        Validator.validatePriority(str2);
        Validator.validateMessage(obj);
        return pushReactiveMessage(PriorityUtils.getQueueNameForPriority(str, str2), obj, null, null);
    }

    @Override // com.github.sonus21.rqueue.core.ReactiveRqueueMessageEnqueuer
    public Mono<Boolean> enqueueWithPriority(String str, String str2, String str3, Object obj) {
        Validator.validateQueue(str);
        Validator.validatePriority(str2);
        Validator.validateMessageId(str3);
        Validator.validateMessage(obj);
        return pushReactiveWithMessageId(PriorityUtils.getQueueNameForPriority(str, str2), str3, obj, null, null);
    }

    @Override // com.github.sonus21.rqueue.core.ReactiveRqueueMessageEnqueuer
    public Mono<String> enqueueIn(String str, Object obj, long j) {
        Validator.validateQueue(str);
        Validator.validateMessage(obj);
        Validator.validateDelay(j);
        return pushReactiveMessage(str, obj, null, Long.valueOf(j));
    }

    @Override // com.github.sonus21.rqueue.core.ReactiveRqueueMessageEnqueuer
    public Mono<Boolean> enqueueIn(String str, String str2, Object obj, long j) {
        Validator.validateQueue(str);
        Validator.validateMessageId(str2);
        Validator.validateMessage(obj);
        Validator.validateDelay(j);
        return pushReactiveWithMessageId(str, str2, obj, null, Long.valueOf(j));
    }

    @Override // com.github.sonus21.rqueue.core.ReactiveRqueueMessageEnqueuer
    public Mono<Boolean> enqueueUniqueIn(String str, String str2, Object obj, long j) {
        Validator.validateQueue(str);
        Validator.validateMessageId(str2);
        Validator.validateMessage(obj);
        Validator.validateDelay(j);
        return pushReactiveWithMessageId(str, str2, obj, null, Long.valueOf(j));
    }

    @Override // com.github.sonus21.rqueue.core.ReactiveRqueueMessageEnqueuer
    public Mono<String> enqueueInWithRetry(String str, Object obj, int i, long j) {
        Validator.validateQueue(str);
        Validator.validateMessage(obj);
        Validator.validateRetryCount(i);
        Validator.validateDelay(j);
        return pushReactiveMessage(str, obj, Integer.valueOf(i), Long.valueOf(j));
    }

    @Override // com.github.sonus21.rqueue.core.ReactiveRqueueMessageEnqueuer
    public Mono<Boolean> enqueueInWithRetry(String str, String str2, Object obj, int i, long j) {
        Validator.validateQueue(str);
        Validator.validateMessageId(str2);
        Validator.validateMessage(obj);
        Validator.validateDelay(i);
        Validator.validateDelay(j);
        return pushReactiveWithMessageId(str, str2, obj, Integer.valueOf(i), Long.valueOf(j));
    }

    @Override // com.github.sonus21.rqueue.core.ReactiveRqueueMessageEnqueuer
    public Mono<String> enqueuePeriodic(String str, Object obj, long j) {
        Validator.validateQueue(str);
        Validator.validateMessage(obj);
        Validator.validatePeriod(j);
        return pushReactivePeriodicMessage(str, obj, j);
    }

    @Override // com.github.sonus21.rqueue.core.ReactiveRqueueMessageEnqueuer
    public Mono<Boolean> enqueuePeriodic(String str, String str2, Object obj, long j) {
        Validator.validateQueue(str);
        Validator.validateMessage(obj);
        Validator.validateMessageId(str2);
        Validator.validatePeriod(j);
        return pushReactivePeriodicMessageWithMessageId(str, str2, obj, j);
    }
}
