package io.quarkiverse.reactive.messaging.nats.jetstream.client;

import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.api.PurgeResponse;
import io.nats.client.api.StreamInfo;
import io.nats.client.api.StreamInfoOptions;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.Consumer;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.PurgeResult;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.StreamResult;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.StreamSetupConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.StreamState;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.StreamStatus;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.StreamConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.StreamConfigurationFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.ConsumerMapper;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.StreamStateMapper;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniCreate;
import io.smallrye.mutiny.groups.UniOnItem;
import io.smallrye.mutiny.unchecked.Unchecked;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.Vertx;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.time.ZonedDateTime;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import lombok.Generated;
import org.jboss.logging.Logger;

/* loaded from: input_file:io/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultStreamManagement.class */
class DefaultStreamManagement implements StreamManagement {

    @Generated
    private static final Logger log = Logger.getLogger(DefaultStreamManagement.class);
    private final io.nats.client.Connection connection;
    private final StreamStateMapper streamStateMapper;
    private final ConsumerMapper consumerMapper;
    private final Vertx vertx;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultStreamManagement$StreamInfoTuple.class */
    public static final class StreamInfoTuple extends Record {
        private final StreamInfo streamInfo;
        private final JetStreamManagement jetStreamManagement;

        @Generated
        /* loaded from: input_file:io/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultStreamManagement$StreamInfoTuple$StreamInfoTupleBuilder.class */
        public static class StreamInfoTupleBuilder {

            @Generated
            private StreamInfo streamInfo;

            @Generated
            private JetStreamManagement jetStreamManagement;

            @Generated
            StreamInfoTupleBuilder() {
            }

            @Generated
            public StreamInfoTupleBuilder streamInfo(StreamInfo streamInfo) {
                this.streamInfo = streamInfo;
                return this;
            }

            @Generated
            public StreamInfoTupleBuilder jetStreamManagement(JetStreamManagement jetStreamManagement) {
                this.jetStreamManagement = jetStreamManagement;
                return this;
            }

            @Generated
            public StreamInfoTuple build() {
                return new StreamInfoTuple(this.streamInfo, this.jetStreamManagement);
            }

            @Generated
            public String toString() {
                return "DefaultStreamManagement.StreamInfoTuple.StreamInfoTupleBuilder(streamInfo=" + this.streamInfo + ", jetStreamManagement=" + this.jetStreamManagement + ")";
            }
        }

        private StreamInfoTuple(StreamInfo streamInfo, JetStreamManagement jetStreamManagement) {
            this.streamInfo = streamInfo;
            this.jetStreamManagement = jetStreamManagement;
        }

        @Generated
        public static StreamInfoTupleBuilder builder() {
            return new StreamInfoTupleBuilder();
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, StreamInfoTuple.class), StreamInfoTuple.class, "streamInfo;jetStreamManagement", "FIELD:Lio/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultStreamManagement$StreamInfoTuple;->streamInfo:Lio/nats/client/api/StreamInfo;", "FIELD:Lio/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultStreamManagement$StreamInfoTuple;->jetStreamManagement:Lio/nats/client/JetStreamManagement;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, StreamInfoTuple.class), StreamInfoTuple.class, "streamInfo;jetStreamManagement", "FIELD:Lio/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultStreamManagement$StreamInfoTuple;->streamInfo:Lio/nats/client/api/StreamInfo;", "FIELD:Lio/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultStreamManagement$StreamInfoTuple;->jetStreamManagement:Lio/nats/client/JetStreamManagement;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, StreamInfoTuple.class, Object.class), StreamInfoTuple.class, "streamInfo;jetStreamManagement", "FIELD:Lio/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultStreamManagement$StreamInfoTuple;->streamInfo:Lio/nats/client/api/StreamInfo;", "FIELD:Lio/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultStreamManagement$StreamInfoTuple;->jetStreamManagement:Lio/nats/client/JetStreamManagement;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public StreamInfo streamInfo() {
            return this.streamInfo;
        }

        public JetStreamManagement jetStreamManagement() {
            return this.jetStreamManagement;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultStreamManagement$StreamSetupConfigurationTuple.class */
    public static final class StreamSetupConfigurationTuple extends Record {
        private final StreamSetupConfiguration configuration;
        private final JetStreamManagement jetStreamManagement;

        @Generated
        /* loaded from: input_file:io/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultStreamManagement$StreamSetupConfigurationTuple$StreamSetupConfigurationTupleBuilder.class */
        public static class StreamSetupConfigurationTupleBuilder {

            @Generated
            private StreamSetupConfiguration configuration;

            @Generated
            private JetStreamManagement jetStreamManagement;

            @Generated
            StreamSetupConfigurationTupleBuilder() {
            }

            @Generated
            public StreamSetupConfigurationTupleBuilder configuration(StreamSetupConfiguration streamSetupConfiguration) {
                this.configuration = streamSetupConfiguration;
                return this;
            }

            @Generated
            public StreamSetupConfigurationTupleBuilder jetStreamManagement(JetStreamManagement jetStreamManagement) {
                this.jetStreamManagement = jetStreamManagement;
                return this;
            }

            @Generated
            public StreamSetupConfigurationTuple build() {
                return new StreamSetupConfigurationTuple(this.configuration, this.jetStreamManagement);
            }

            @Generated
            public String toString() {
                return "DefaultStreamManagement.StreamSetupConfigurationTuple.StreamSetupConfigurationTupleBuilder(configuration=" + this.configuration + ", jetStreamManagement=" + this.jetStreamManagement + ")";
            }
        }

        private StreamSetupConfigurationTuple(StreamSetupConfiguration streamSetupConfiguration, JetStreamManagement jetStreamManagement) {
            this.configuration = streamSetupConfiguration;
            this.jetStreamManagement = jetStreamManagement;
        }

        @Generated
        public static StreamSetupConfigurationTupleBuilder builder() {
            return new StreamSetupConfigurationTupleBuilder();
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, StreamSetupConfigurationTuple.class), StreamSetupConfigurationTuple.class, "configuration;jetStreamManagement", "FIELD:Lio/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultStreamManagement$StreamSetupConfigurationTuple;->configuration:Lio/quarkiverse/reactive/messaging/nats/jetstream/client/api/StreamSetupConfiguration;", "FIELD:Lio/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultStreamManagement$StreamSetupConfigurationTuple;->jetStreamManagement:Lio/nats/client/JetStreamManagement;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, StreamSetupConfigurationTuple.class), StreamSetupConfigurationTuple.class, "configuration;jetStreamManagement", "FIELD:Lio/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultStreamManagement$StreamSetupConfigurationTuple;->configuration:Lio/quarkiverse/reactive/messaging/nats/jetstream/client/api/StreamSetupConfiguration;", "FIELD:Lio/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultStreamManagement$StreamSetupConfigurationTuple;->jetStreamManagement:Lio/nats/client/JetStreamManagement;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, StreamSetupConfigurationTuple.class, Object.class), StreamSetupConfigurationTuple.class, "configuration;jetStreamManagement", "FIELD:Lio/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultStreamManagement$StreamSetupConfigurationTuple;->configuration:Lio/quarkiverse/reactive/messaging/nats/jetstream/client/api/StreamSetupConfiguration;", "FIELD:Lio/quarkiverse/reactive/messaging/nats/jetstream/client/DefaultStreamManagement$StreamSetupConfigurationTuple;->jetStreamManagement:Lio/nats/client/JetStreamManagement;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public StreamSetupConfiguration configuration() {
            return this.configuration;
        }

        public JetStreamManagement jetStreamManagement() {
            return this.jetStreamManagement;
        }
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.StreamManagement
    public Uni<Consumer> getConsumer(String str, String str2) {
        Context context = context();
        UniOnItem onItem = getJetStreamManagement().onItem().transformToUni(jetStreamManagement -> {
            return Uni.createFrom().item(Unchecked.supplier(() -> {
                try {
                    return jetStreamManagement.getConsumerInfo(str, str2);
                } catch (IOException | JetStreamApiException e) {
                    throw new SystemException(e);
                }
            }));
        }).onItem();
        ConsumerMapper consumerMapper = this.consumerMapper;
        Objects.requireNonNull(consumerMapper);
        return context.executeBlocking(onItem.transform(consumerMapper::of));
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.StreamManagement
    public Uni<Void> deleteConsumer(String str, String str2) {
        return context().executeBlocking(getJetStreamManagement().onItem().transformToUni(jetStreamManagement -> {
            return Uni.createFrom().item(Unchecked.supplier(() -> {
                try {
                    jetStreamManagement.deleteConsumer(str, str2);
                    return null;
                } catch (IOException | JetStreamApiException e) {
                    throw new SystemException(e);
                }
            }));
        }));
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.StreamManagement
    public Uni<Void> pauseConsumer(String str, String str2, ZonedDateTime zonedDateTime) {
        return context().executeBlocking(getJetStreamManagement().onItem().transformToUni(jetStreamManagement -> {
            return Uni.createFrom().item(Unchecked.supplier(() -> {
                try {
                    if (jetStreamManagement.pauseConsumer(str, str2, zonedDateTime).isPaused()) {
                        return null;
                    }
                    throw new SystemException(String.format("Unable to pause consumer %s in stream %s", str2, str));
                } catch (IOException | JetStreamApiException e) {
                    throw new SystemException(e);
                }
            }));
        }));
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.StreamManagement
    public Uni<Void> resumeConsumer(String str, String str2) {
        return context().executeBlocking(getJetStreamManagement().onItem().transformToUni(jetStreamManagement -> {
            return Uni.createFrom().item(Unchecked.supplier(() -> {
                try {
                    if (jetStreamManagement.resumeConsumer(str, str2)) {
                        return null;
                    }
                    throw new SystemException(String.format("Unable to resume consumer %s in stream %s", str2, str));
                } catch (IOException | JetStreamApiException e) {
                    throw new SystemException(e);
                }
            }));
        }));
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.StreamManagement
    public Uni<Long> getFirstSequence(String str) {
        return context().executeBlocking(getStreamInfo(str).onItem().transform(streamInfoTuple -> {
            return Long.valueOf(streamInfoTuple.streamInfo().getStreamState().getFirstSequence());
        }));
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.StreamManagement
    public Uni<List<String>> getStreams() {
        return context().executeBlocking(getJetStreamManagement().onItem().transformToUni(jetStreamManagement -> {
            UniCreate createFrom = Uni.createFrom();
            Objects.requireNonNull(jetStreamManagement);
            return createFrom.item(Unchecked.supplier(jetStreamManagement::getStreamNames));
        }));
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.StreamManagement
    public Uni<List<String>> getSubjects(String str) {
        return context().executeBlocking(getStreamInfo(str).onItem().transform(streamInfoTuple -> {
            return streamInfoTuple.streamInfo().getConfiguration().getSubjects();
        }));
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.StreamManagement
    public Uni<List<String>> getConsumerNames(String str) {
        return context().executeBlocking(getJetStreamManagement().onItem().transformToUni(jetStreamManagement -> {
            return Uni.createFrom().item(Unchecked.supplier(() -> {
                try {
                    return jetStreamManagement.getConsumerNames(str);
                } catch (IOException | JetStreamApiException e) {
                    throw new SystemException(e);
                }
            }));
        }));
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.StreamManagement
    public Uni<PurgeResult> purgeStream(String str) {
        return context().executeBlocking(getJetStreamManagement().onItem().transformToUni(jetStreamManagement -> {
            return Uni.createFrom().item(Unchecked.supplier(() -> {
                try {
                    PurgeResponse purgeStream = jetStreamManagement.purgeStream(str);
                    return new PurgeResult(str, purgeStream.isSuccess(), purgeStream.getPurged());
                } catch (IOException | JetStreamApiException e) {
                    throw new SystemException(e);
                }
            }));
        }));
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.StreamManagement
    public Uni<Void> deleteMessage(String str, long j, boolean z) {
        return context().executeBlocking(getJetStreamManagement().onItem().transformToUni(jetStreamManagement -> {
            return Uni.createFrom().item(Unchecked.supplier(() -> {
                try {
                    if (jetStreamManagement.deleteMessage(str, j, z)) {
                        return null;
                    }
                    throw new DeleteException(String.format("Unable to delete message in stream %s with sequence %d", str, Long.valueOf(j)));
                } catch (IOException | JetStreamApiException e) {
                    throw new DeleteException(String.format("Unable to delete message in stream %s with sequence %d: %s", str, Long.valueOf(j), e.getMessage()), e);
                }
            }));
        }));
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.StreamManagement
    public Uni<StreamState> getStreamState(String str) {
        return context().executeBlocking(getStreamInfo(str).onItem().transform(streamInfoTuple -> {
            return this.streamStateMapper.of(streamInfoTuple.streamInfo().getStreamState());
        }));
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.StreamManagement
    public Uni<StreamConfiguration> getStreamConfiguration(String str) {
        return context().executeBlocking(getStreamInfo(str).onItem().transform(streamInfoTuple -> {
            return StreamConfiguration.of(streamInfoTuple.streamInfo().getConfiguration());
        }));
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.StreamManagement
    public Uni<List<PurgeResult>> purgeAllStreams() {
        return context().executeBlocking(getStreams().onItem().transformToUni(this::purgeAllStreams));
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.StreamManagement
    public Uni<List<StreamResult>> addStreams(List<StreamSetupConfiguration> list) {
        return context().executeBlocking(getJetStreamManagement().onItem().transformToMulti(jetStreamManagement -> {
            return Multi.createFrom().items(list.stream().map(streamSetupConfiguration -> {
                return StreamSetupConfigurationTuple.builder().jetStreamManagement(jetStreamManagement).configuration(streamSetupConfiguration).build();
            }));
        }).onItem().transformToUniAndMerge(streamSetupConfigurationTuple -> {
            return addOrUpdateStream(streamSetupConfigurationTuple.jetStreamManagement(), streamSetupConfigurationTuple.configuration());
        }).collect().asList());
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.StreamManagement
    public Uni<Void> addSubject(String str, String str2) {
        return context().executeBlocking(getStreamInfo(str).onItem().transformToUni(streamInfoTuple -> {
            return Uni.createFrom().item(Unchecked.supplier(() -> {
                try {
                    HashSet hashSet = new HashSet(streamInfoTuple.streamInfo().getConfiguration().getSubjects());
                    if (hashSet.contains(str2)) {
                        return null;
                    }
                    hashSet.add(str2);
                    streamInfoTuple.jetStreamManagement().updateStream(io.nats.client.api.StreamConfiguration.builder(streamInfoTuple.streamInfo().getConfiguration()).subjects(hashSet).build());
                    return null;
                } catch (IOException | JetStreamApiException e) {
                    throw new SystemException(e);
                }
            }));
        }));
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.StreamManagement
    public Uni<Void> removeSubject(String str, String str2) {
        return context().executeBlocking(getStreamInfo(str).onItem().transformToUni(streamInfoTuple -> {
            return Uni.createFrom().item(Unchecked.supplier(() -> {
                try {
                    HashSet hashSet = new HashSet(streamInfoTuple.streamInfo().getConfiguration().getSubjects());
                    if (!hashSet.contains(str2)) {
                        return null;
                    }
                    hashSet.remove(str2);
                    streamInfoTuple.jetStreamManagement().updateStream(io.nats.client.api.StreamConfiguration.builder(streamInfoTuple.streamInfo().getConfiguration()).subjects(hashSet).build());
                    return null;
                } catch (IOException | JetStreamApiException e) {
                    throw new SystemException(e);
                }
            }));
        }));
    }

    private Uni<StreamInfoTuple> getStreamInfo(String str) {
        return getJetStreamManagement().onItem().transformToUni(jetStreamManagement -> {
            return getStreamInfo(jetStreamManagement, str);
        });
    }

    private Uni<StreamInfoTuple> getStreamInfo(JetStreamManagement jetStreamManagement, String str) {
        return Uni.createFrom().emitter(uniEmitter -> {
            try {
                uniEmitter.complete(StreamInfoTuple.builder().jetStreamManagement(jetStreamManagement).streamInfo(jetStreamManagement.getStreamInfo(str, StreamInfoOptions.allSubjects())).build());
            } catch (Throwable th) {
                uniEmitter.fail(new SystemException(String.format("Unable to read stream %s with message: %s", str, th.getMessage()), th));
            }
        });
    }

    private Uni<StreamResult> addOrUpdateStream(JetStreamManagement jetStreamManagement, StreamSetupConfiguration streamSetupConfiguration) {
        return getStreamInfo(jetStreamManagement, streamSetupConfiguration.configuration().name()).onItem().transformToUni(streamInfoTuple -> {
            return updateStream(streamInfoTuple.jetStreamManagement(), streamInfoTuple.streamInfo(), streamSetupConfiguration);
        }).onFailure().recoverWithUni(th -> {
            return createStream(jetStreamManagement, streamSetupConfiguration.configuration());
        });
    }

    private Uni<StreamResult> createStream(JetStreamManagement jetStreamManagement, StreamConfiguration streamConfiguration) {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                jetStreamManagement.addStream(new StreamConfigurationFactory().create(streamConfiguration));
                return StreamResult.builder().configuration(streamConfiguration).status(StreamStatus.Created).build();
            } catch (Exception e) {
                throw new SetupException(String.format("Unable to create stream: %s with message: %s", streamConfiguration.name(), e.getMessage()), e);
            }
        }));
    }

    private Uni<StreamResult> updateStream(JetStreamManagement jetStreamManagement, StreamInfo streamInfo, StreamSetupConfiguration streamSetupConfiguration) {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                Optional<io.nats.client.api.StreamConfiguration> create = new StreamConfigurationFactory().create(streamInfo.getConfiguration(), streamSetupConfiguration.configuration());
                if (!create.isPresent()) {
                    return StreamResult.builder().configuration(streamSetupConfiguration.configuration()).status(StreamStatus.NotModified).build();
                }
                log.debugf("Updating stream %s", streamSetupConfiguration.configuration().name());
                jetStreamManagement.updateStream(create.get());
                return StreamResult.builder().configuration(streamSetupConfiguration.configuration()).status(StreamStatus.Updated).build();
            } catch (Exception e) {
                log.errorf(e, "message: %s", e.getMessage());
                throw new SetupException(String.format("Unable to update stream: %s with message: %s", streamSetupConfiguration.configuration().name(), e.getMessage()), e);
            }
        })).onFailure().recoverWithUni(th -> {
            return ((th.getCause() instanceof JetStreamApiException) && streamSetupConfiguration.overwrite()) ? deleteStream(jetStreamManagement, streamSetupConfiguration.configuration().name()).onItem().transformToUni(r7 -> {
                return createStream(jetStreamManagement, streamSetupConfiguration.configuration());
            }) : Uni.createFrom().failure(th);
        });
    }

    private Uni<Void> deleteStream(JetStreamManagement jetStreamManagement, String str) {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                jetStreamManagement.deleteStream(str);
                return null;
            } catch (IOException | JetStreamApiException e) {
                throw new SetupException(String.format("Unable to delete stream: %s with message: %s", str, e.getMessage()), e);
            }
        }));
    }

    private Optional<PurgeResult> purgeStream(JetStreamManagement jetStreamManagement, String str) {
        try {
            PurgeResponse purgeStream = jetStreamManagement.purgeStream(str);
            return Optional.of(PurgeResult.builder().streamName(str).success(purgeStream.isSuccess()).purgeCount(purgeStream.getPurged()).build());
        } catch (IOException | JetStreamApiException e) {
            log.warnf(e, "Unable to purge stream %s with message: %s", str, e.getMessage());
            return Optional.empty();
        }
    }

    private Uni<List<PurgeResult>> purgeAllStreams(List<String> list) {
        return getJetStreamManagement().onItem().transformToUni(jetStreamManagement -> {
            return Uni.createFrom().item(Unchecked.supplier(() -> {
                return list.stream().flatMap(str -> {
                    return purgeStream(jetStreamManagement, str).stream();
                }).toList();
            }));
        });
    }

    private Uni<JetStreamManagement> getJetStreamManagement() {
        UniCreate createFrom = Uni.createFrom();
        io.nats.client.Connection connection = this.connection;
        Objects.requireNonNull(connection);
        return createFrom.item(Unchecked.supplier(connection::jetStreamManagement));
    }

    private Context context() {
        return this.vertx.getOrCreateContext();
    }

    @Generated
    public DefaultStreamManagement(io.nats.client.Connection connection, StreamStateMapper streamStateMapper, ConsumerMapper consumerMapper, Vertx vertx) {
        this.connection = connection;
        this.streamStateMapper = streamStateMapper;
        this.consumerMapper = consumerMapper;
        this.vertx = vertx;
    }
}
