package dev.snowdrop.vertx.kafka;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.converters.multi.MultiReactorConverters;
import io.smallrye.mutiny.converters.uni.UniReactorConverters;
import io.vertx.core.streams.ReadStream;
import io.vertx.kafka.client.common.TopicPartition;
import java.util.Collection;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:BOOT-INF/lib/vertx-spring-boot-starter-kafka-1.1.6.jar:dev/snowdrop/vertx/kafka/SnowdropKafkaConsumer.class */
final class SnowdropKafkaConsumer<K, V> implements KafkaConsumer<K, V> {
    private final io.vertx.mutiny.kafka.client.consumer.KafkaConsumer<K, V> delegate;

    /* JADX INFO: Access modifiers changed from: package-private */
    public SnowdropKafkaConsumer(io.vertx.mutiny.kafka.client.consumer.KafkaConsumer<K, V> kafkaConsumer) {
        this.delegate = kafkaConsumer;
    }

    @Override // dev.snowdrop.vertx.kafka.KafkaConsumer
    public Mono<Void> subscribe(String str) {
        if (StringUtils.isEmpty(str)) {
            throw new IllegalArgumentException("Topic cannot be empty");
        }
        return (Mono) this.delegate.subscribe(str).convert().with(UniReactorConverters.toMono());
    }

    @Override // dev.snowdrop.vertx.kafka.KafkaConsumer
    public Mono<Void> subscribe(Collection<String> collection) {
        Objects.requireNonNull(collection, "Topics cannot be null");
        return (Mono) this.delegate.subscribe(new HashSet(collection)).convert().with(UniReactorConverters.toMono());
    }

    @Override // dev.snowdrop.vertx.kafka.KafkaConsumer
    public Mono<Void> assign(Partition partition) {
        Objects.requireNonNull(partition, "Partition cannot be null");
        return (Mono) this.delegate.assign(toVertxTopicPartition(partition)).convert().with(UniReactorConverters.toMono());
    }

    @Override // dev.snowdrop.vertx.kafka.KafkaConsumer
    public Mono<Void> assign(Collection<Partition> collection) {
        Objects.requireNonNull(collection, "Partitions cannot be null");
        return (Mono) this.delegate.assign((Set<TopicPartition>) collection.stream().map(this::toVertxTopicPartition).collect(Collectors.toSet())).convert().with(UniReactorConverters.toMono());
    }

    @Override // dev.snowdrop.vertx.kafka.KafkaConsumer
    public Mono<Void> unsubscribe() {
        return (Mono) this.delegate.unsubscribe().convert().with(UniReactorConverters.toMono());
    }

    @Override // dev.snowdrop.vertx.kafka.KafkaConsumer
    public Flux<String> subscriptions() {
        return UniReactorConverters.toMono().apply((Uni) this.delegate.subscription()).flatMapMany((v0) -> {
            return Flux.fromIterable(v0);
        });
    }

    @Override // dev.snowdrop.vertx.kafka.KafkaConsumer
    public Flux<Partition> assignments() {
        return ((Mono) this.delegate.assignment().convert().with(UniReactorConverters.toMono())).flatMapMany((v0) -> {
            return Flux.fromIterable(v0);
        }).map(SnowdropPartition::new);
    }

    @Override // dev.snowdrop.vertx.kafka.KafkaConsumer
    public Flux<PartitionInfo> partitionsFor(String str) {
        if (StringUtils.isEmpty(str)) {
            throw new IllegalArgumentException("Topic cannot be empty");
        }
        return ((Mono) this.delegate.partitionsFor(str).convert().with(UniReactorConverters.toMono())).flatMapMany((v0) -> {
            return Flux.fromIterable(v0);
        }).map(SnowdropPartitionInfo::new);
    }

    @Override // dev.snowdrop.vertx.kafka.KafkaConsumer
    public void partitionsRevokedHandler(Consumer<Set<Partition>> consumer) {
        Objects.requireNonNull(consumer, "Handler cannot be null");
        this.delegate.partitionsRevokedHandler(set -> {
            consumer.accept((Set) set.stream().map(SnowdropPartition::new).collect(Collectors.toSet()));
        });
    }

    @Override // dev.snowdrop.vertx.kafka.KafkaConsumer
    public void partitionsAssignedHandler(Consumer<Set<Partition>> consumer) {
        Objects.requireNonNull(consumer, "Handler cannot be null");
        this.delegate.partitionsAssignedHandler(set -> {
            consumer.accept((Set) set.stream().map(SnowdropPartition::new).collect(Collectors.toSet()));
        });
    }

    @Override // dev.snowdrop.vertx.kafka.KafkaConsumer
    public Mono<Void> seek(Partition partition, long j) {
        Objects.requireNonNull(partition, "Partition cannot be null");
        if (j < 0) {
            throw new IllegalArgumentException("Offset cannot be negative");
        }
        return (Mono) this.delegate.seek(toVertxTopicPartition(partition), j).convert().with(UniReactorConverters.toMono());
    }

    @Override // dev.snowdrop.vertx.kafka.KafkaConsumer
    public Mono<Void> seekToBeginning(Partition partition) {
        Objects.requireNonNull(partition, "Partition cannot be null");
        return (Mono) this.delegate.seekToBeginning(toVertxTopicPartition(partition)).convert().with(UniReactorConverters.toMono());
    }

    @Override // dev.snowdrop.vertx.kafka.KafkaConsumer
    public Mono<Void> seekToBeginning(Collection<Partition> collection) {
        Objects.requireNonNull(collection, "Partitions cannot be null");
        return (Mono) this.delegate.seekToBeginning((Set<TopicPartition>) collection.stream().map(this::toVertxTopicPartition).collect(Collectors.toSet())).convert().with(UniReactorConverters.toMono());
    }

    @Override // dev.snowdrop.vertx.kafka.KafkaConsumer
    public Mono<Void> seekToEnd(Partition partition) {
        Objects.requireNonNull(partition, "Partition cannot be null");
        return (Mono) this.delegate.seekToEnd(toVertxTopicPartition(partition)).convert().with(UniReactorConverters.toMono());
    }

    @Override // dev.snowdrop.vertx.kafka.KafkaConsumer
    public Mono<Void> seekToEnd(Collection<Partition> collection) {
        Objects.requireNonNull(collection, "Partitions cannot be null");
        return (Mono) this.delegate.seekToEnd((Set<TopicPartition>) collection.stream().map(this::toVertxTopicPartition).collect(Collectors.toSet())).convert().with(UniReactorConverters.toMono());
    }

    @Override // dev.snowdrop.vertx.kafka.KafkaConsumer
    public Mono<Long> position(Partition partition) {
        Objects.requireNonNull(partition, "Partition cannot be null");
        return (Mono) this.delegate.position(toVertxTopicPartition(partition)).convert().with(UniReactorConverters.toMono());
    }

    @Override // dev.snowdrop.vertx.kafka.KafkaConsumer
    public Mono<Long> committed(Partition partition) {
        Objects.requireNonNull(partition, "Partition cannot be null");
        return ((Mono) this.delegate.committed(toVertxTopicPartition(partition)).convert().with(UniReactorConverters.toMono())).map((v0) -> {
            return v0.getOffset();
        });
    }

    @Override // dev.snowdrop.vertx.kafka.KafkaConsumer
    public Mono<Long> beginningOffset(Partition partition) {
        Objects.requireNonNull(partition, "Partition cannot be null");
        return (Mono) this.delegate.beginningOffsets(toVertxTopicPartition(partition)).convert().with(UniReactorConverters.toMono());
    }

    @Override // dev.snowdrop.vertx.kafka.KafkaConsumer
    public Mono<Long> endOffset(Partition partition) {
        Objects.requireNonNull(partition, "Partition cannot be null");
        return (Mono) this.delegate.endOffsets(toVertxTopicPartition(partition)).convert().with(UniReactorConverters.toMono());
    }

    @Override // dev.snowdrop.vertx.kafka.KafkaConsumer
    public Mono<Long> timeOffset(Partition partition, long j) {
        Objects.requireNonNull(partition, "Partition cannot be null");
        if (j < 0) {
            throw new IllegalArgumentException("Timestamp cannot be negative");
        }
        return ((Mono) this.delegate.offsetsForTimes(toVertxTopicPartition(partition), Long.valueOf(j)).convert().with(UniReactorConverters.toMono())).map((v0) -> {
            return v0.getOffset();
        });
    }

    @Override // dev.snowdrop.vertx.kafka.KafkaConsumer
    public Mono<Void> commit() {
        return (Mono) this.delegate.commit().convert().with(UniReactorConverters.toMono());
    }

    @Override // dev.snowdrop.vertx.kafka.KafkaConsumer
    public Mono<Void> close() {
        return (Mono) this.delegate.close().convert().with(UniReactorConverters.toMono());
    }

    @Override // dev.snowdrop.vertx.kafka.KafkaConsumer
    public <T> Mono<T> doOnVertxConsumer(Function<io.vertx.kafka.client.consumer.KafkaConsumer<K, V>, T> function) {
        Objects.requireNonNull(function, "Function cannot be null");
        return Mono.create(monoSink -> {
            try {
                monoSink.success(function.apply(this.delegate.mo1445getDelegate()));
            } catch (Throwable th) {
                monoSink.error(th);
            }
        });
    }

    @Override // dev.snowdrop.vertx.streams.ReadStream
    public Mono<ConsumerRecord<K, V>> mono() {
        return ((Mono) this.delegate.toMulti().convert().with(MultiReactorConverters.toMono())).map(SnowdropConsumerRecord::new);
    }

    @Override // dev.snowdrop.vertx.streams.ReadStream
    public Flux<ConsumerRecord<K, V>> flux() {
        return ((Flux) this.delegate.toMulti().convert().with(MultiReactorConverters.toFlux())).map(SnowdropConsumerRecord::new);
    }

    @Override // dev.snowdrop.vertx.streams.ReadStream
    public ReadStream vertxReadStream() {
        return this.delegate.mo1445getDelegate();
    }

    private TopicPartition toVertxTopicPartition(Partition partition) {
        return new TopicPartition(partition.topic(), partition.partition());
    }
}
