package dev.snowdrop.vertx.kafka;

import io.smallrye.mutiny.converters.uni.UniReactorConverters;
import io.vertx.core.streams.WriteStream;
import io.vertx.mutiny.core.buffer.Buffer;
import io.vertx.mutiny.kafka.client.producer.KafkaHeader;
import io.vertx.mutiny.kafka.client.producer.KafkaProducerRecord;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:BOOT-INF/lib/vertx-spring-boot-starter-kafka-1.1.6.jar:dev/snowdrop/vertx/kafka/SnowdropKafkaProducer.class */
/**
 * Reactor-facing {@link KafkaProducer} implementation that forwards every operation to a Mutiny
 * Vert.x Kafka producer and adapts the results to {@link Mono} / {@link Flux}.
 *
 * <p>Handler-style methods return {@code KafkaProducer<K, V>} (a covariant return type), so the
 * compiler synthesizes the bridge methods required by the stream interface.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
final class SnowdropKafkaProducer<K, V> implements KafkaProducer<K, V> {

    /** Mutiny producer that performs the actual work. */
    private final io.vertx.mutiny.kafka.client.producer.KafkaProducer<K, V> delegate;

    /**
     * Wraps the given Mutiny producer.
     *
     * @param kafkaProducer producer to delegate to; stored as-is, no validation is performed here
     */
    public SnowdropKafkaProducer(io.vertx.mutiny.kafka.client.producer.KafkaProducer<K, V> kafkaProducer) {
        this.delegate = kafkaProducer;
    }

    /**
     * Sends a record through the delegate and wraps the resulting metadata.
     *
     * @param producerRecord record to send
     * @return a {@link Mono} emitting the metadata wrapped in {@code SnowdropRecordMetadata}
     * @throws NullPointerException if {@code producerRecord} is {@code null}
     */
    @Override // dev.snowdrop.vertx.kafka.KafkaProducer
    public Mono<RecordMetadata> send(ProducerRecord<K, V> producerRecord) {
        Objects.requireNonNull(producerRecord, "Record cannot be null");
        return this.delegate.send(toMutinyProducerRecord(producerRecord))
                .convert()
                .with(UniReactorConverters.toMono())
                .map(SnowdropRecordMetadata::new);
    }

    /**
     * Looks up the partitions of a topic through the delegate.
     *
     * @param str topic name
     * @return a {@link Flux} emitting one {@code SnowdropPartitionInfo} per partition reported by the delegate
     * @throws IllegalArgumentException if the topic is rejected by {@code StringUtils.isEmpty}
     */
    @Override // dev.snowdrop.vertx.kafka.KafkaProducer
    public Flux<PartitionInfo> partitionsFor(String str) {
        if (StringUtils.isEmpty(str)) {
            throw new IllegalArgumentException("Topic cannot be empty");
        }
        return this.delegate.partitionsFor(str)
                .convert()
                .with(UniReactorConverters.toMono())
                .flatMapMany(Flux::fromIterable)
                .map(SnowdropPartitionInfo::new);
    }

    /**
     * Flushes the delegate.
     *
     * @return a {@link Mono} that completes when the delegate invokes the flush callback, or fails
     *         with whatever the delegate's {@code flush} call throws
     */
    @Override // dev.snowdrop.vertx.kafka.KafkaProducer
    public Mono<Void> flush() {
        return Mono.create(sink -> {
            try {
                // The callback argument carries no information; only its invocation matters.
                this.delegate.flush(ignored -> sink.success());
            } catch (Throwable failure) {
                sink.error(failure);
            }
        });
    }

    /**
     * Closes the delegate.
     *
     * @return a {@link Mono} mirroring the outcome of the delegate's {@code close()}
     */
    @Override // dev.snowdrop.vertx.kafka.KafkaProducer
    public Mono<Void> close() {
        return this.delegate.close()
                .convert()
                .with(UniReactorConverters.toMono());
    }

    /**
     * Closes the delegate with a timeout.
     *
     * @param j timeout value forwarded unchanged to the delegate's {@code close(long)}
     * @return a {@link Mono} mirroring the outcome of the delegate's {@code close(long)}
     */
    @Override // dev.snowdrop.vertx.kafka.KafkaProducer
    public Mono<Void> close(long j) {
        return this.delegate.close(j)
                .convert()
                .with(UniReactorConverters.toMono());
    }

    /**
     * Applies a function to the underlying (non-Mutiny) Vert.x producer each time the returned
     * {@link Mono} is subscribed.
     *
     * @param function function to apply to the Vert.x producer
     * @param <T>      result type of the function
     * @return a {@link Mono} succeeding with the function result, or failing with anything the function throws
     * @throws NullPointerException if {@code function} is {@code null}
     */
    @Override // dev.snowdrop.vertx.kafka.KafkaProducer
    public <T> Mono<T> doOnVertxProducer(Function<io.vertx.kafka.client.producer.KafkaProducer<K, V>, T> function) {
        Objects.requireNonNull(function, "Function cannot be null");
        return Mono.create(sink -> {
            try {
                sink.success(function.apply(this.delegate.mo1445getDelegate()));
            } catch (Throwable failure) {
                sink.error(failure);
            }
        });
    }

    /**
     * Registers an exception handler on the delegate.
     *
     * @param consumer handler passed straight to the delegate
     * @return this producer, for chaining
     */
    @Override // dev.snowdrop.vertx.kafka.KafkaProducer, dev.snowdrop.vertx.streams.WriteStream
    public KafkaProducer<K, V> exceptionHandler(Consumer<Throwable> consumer) {
        this.delegate.exceptionHandler(consumer);
        return this;
    }

    /**
     * Registers a drain handler on the delegate.
     *
     * @param consumer handler passed straight to the delegate
     * @return this producer, for chaining
     */
    @Override // dev.snowdrop.vertx.kafka.KafkaProducer, dev.snowdrop.vertx.streams.WriteStream
    public KafkaProducer<K, V> drainHandler(Consumer<Void> consumer) {
        this.delegate.drainHandler(consumer);
        return this;
    }

    /**
     * Sets the maximum write queue size on the delegate.
     *
     * @param i queue size forwarded unchanged to the delegate
     * @return this producer, for chaining
     */
    @Override // dev.snowdrop.vertx.kafka.KafkaProducer, dev.snowdrop.vertx.streams.WriteStream
    public KafkaProducer<K, V> setWriteQueueMaxSize(int i) {
        this.delegate.setWriteQueueMaxSize2(i);
        return this;
    }

    /**
     * Reports whether the delegate's write queue is full.
     *
     * @return the value returned by the delegate's {@code writeQueueFull()}
     */
    @Override // dev.snowdrop.vertx.streams.WriteStream
    public boolean writeQueueFull() {
        return this.delegate.writeQueueFull();
    }

    /**
     * Writes a record through the delegate's stream API.
     *
     * @param producerRecord record to write
     * @return a {@link Mono} mirroring the outcome of the delegate's {@code write}
     * @throws NullPointerException if {@code producerRecord} is {@code null}
     */
    @Override // dev.snowdrop.vertx.streams.WriteStream
    public Mono<Void> write(ProducerRecord<K, V> producerRecord) {
        Objects.requireNonNull(producerRecord, "Record cannot be null");
        return this.delegate.write(toMutinyProducerRecord(producerRecord))
                .convert()
                .with(UniReactorConverters.toMono());
    }

    /**
     * Ends the delegate stream.
     *
     * @return a {@link Mono} mirroring the outcome of the delegate's {@code end()}
     */
    @Override // dev.snowdrop.vertx.streams.WriteStream
    public Mono<Void> end() {
        return this.delegate.end()
                .convert()
                .with(UniReactorConverters.toMono());
    }

    /**
     * Exposes the underlying Vert.x write stream.
     *
     * @return the object returned by the Mutiny delegate's own delegate accessor
     */
    @Override // dev.snowdrop.vertx.streams.WriteStream
    public WriteStream vertxWriteStream() {
        return this.delegate.mo1445getDelegate();
    }

    /**
     * Builds a Mutiny producer record carrying the same topic, key, value, timestamp, partition
     * and headers as the given record.
     */
    private KafkaProducerRecord<K, V> toMutinyProducerRecord(ProducerRecord<K, V> producerRecord) {
        List<KafkaHeader> mutinyHeaders = producerRecord.headers()
                .stream()
                .map(this::toMutinyHeader)
                .collect(Collectors.toList());

        return KafkaProducerRecord
                .create(
                        producerRecord.topic(),
                        producerRecord.key(),
                        producerRecord.value(),
                        producerRecord.timestamp(),
                        producerRecord.partition())
                .addHeaders(mutinyHeaders);
    }

    /**
     * Converts a header into its Mutiny counterpart, copying exactly the readable bytes of the value.
     */
    private KafkaHeader toMutinyHeader(Header header) {
        // ByteBuffer.array() exposes the whole backing array (ignoring position, limit and array
        // offset) and throws for direct or read-only buffers, so copy the remaining bytes instead.
        // duplicate() keeps the position of the buffer handed out by the header untouched.
        // NOTE(review): assumes asByteBuffer() yields a java.nio.ByteBuffer, as the previous
        // .array() call implied - confirm against the Header value type.
        java.nio.ByteBuffer view = header.value().asByteBuffer().duplicate();
        byte[] payload = new byte[view.remaining()];
        view.get(payload);
        return KafkaHeader.header(header.key(), Buffer.buffer(payload));
    }
}
