package io.github.cfraser.connekt.redis;

import io.github.cfraser.connekt.api.Transport;
import io.github.cfraser.connekt.redis.RedisTransport;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.codec.ToByteBufEncoder;
import io.lettuce.core.pubsub.RedisPubSubListener;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.reactive.ChannelMessage;
import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands;
import io.lettuce.core.resource.ClientResources;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.time.Duration;
import kotlin.Lazy;
import kotlin.LazyKt;
import kotlin.Metadata;
import kotlin.Result;
import kotlin.ResultKt;
import kotlin.Unit;
import kotlin.collections.CollectionsKt;
import kotlin.coroutines.Continuation;
import kotlin.coroutines.intrinsics.IntrinsicsKt;
import kotlin.coroutines.jvm.internal.ContinuationImpl;
import kotlin.coroutines.jvm.internal.DebugMetadata;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlin.jvm.internal.MutablePropertyReference1Impl;
import kotlin.jvm.internal.Reflection;
import kotlin.properties.Delegates;
import kotlin.properties.ReadWriteProperty;
import kotlin.reflect.KProperty;
import kotlin.text.Charsets;
import kotlinx.coroutines.BuildersKt;
import kotlinx.coroutines.CoroutineScope;
import kotlinx.coroutines.Dispatchers;
import kotlinx.coroutines.Job;
import kotlinx.coroutines.channels.BufferOverflow;
import kotlinx.coroutines.flow.Flow;
import kotlinx.coroutines.flow.FlowCollector;
import kotlinx.coroutines.flow.FlowKt;
import kotlinx.coroutines.flow.MutableSharedFlow;
import kotlinx.coroutines.flow.SharedFlow;
import kotlinx.coroutines.flow.SharedFlowKt;
import kotlinx.coroutines.reactor.MonoKt;
import mu.KLogger;
import mu.KotlinLogging;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Mono;

/* compiled from: RedisTransport.kt */
@Metadata(mv = {1, 5, 1}, k = 1, xi = 48, d1 = {"��@\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0010\u000e\n\u0002\u0010\u0012\n\u0002\b\u0002\n\u0002\u0010\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0006\u0018�� \u00192\u00020\u0001:\u0005\u0016\u0017\u0018\u0019\u001aB\u000f\b\u0002\u0012\u0006\u0010\u0002\u001a\u00020\u0003¢\u0006\u0002\u0010\u0004J\b\u0010\r\u001a\u00020\u000eH\u0016J!\u0010\u000f\u001a\u00020\u000e2\u0006\u0010\u0010\u001a\u00020\n2\u0006\u0010\u0011\u001a\u00020\u000bH\u0096@ø\u0001��¢\u0006\u0002\u0010\u0012J\u001a\u0010\u0013\u001a\b\u0012\u0004\u0012\u00020\u000b0\u0014*\u00020\u00152\u0006\u0010\u0010\u001a\u00020\nH\u0016R\u000e\u0010\u0005\u001a\u00020\u0006X\u0082\u0004¢\u0006\u0002\n��R \u0010\u0007\u001a\u0014\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00020\n\u0012\u0004\u0012\u00020\u000b0\t0\bX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0002\u001a\u00020\u0003X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\f\u001a\u00020\u0006X\u0082\u0004¢\u0006\u0002\n��\u0082\u0002\u0004\n\u0002\b\u0019¨\u0006\u001b"}, d2 = {"Lio/github/cfraser/connekt/redis/RedisTransport;", "Lio/github/cfraser/connekt/api/Transport$Base;", "redisClient", "Lio/lettuce/core/RedisClient;", "(Lio/lettuce/core/RedisClient;)V", "receiveConnection", "Lio/github/cfraser/connekt/redis/RedisTransport$PubSubConnection;", "receivedChannelMessages", "Lkotlinx/coroutines/flow/SharedFlow;", "Lio/lettuce/core/pubsub/api/reactive/ChannelMessage;", "", "", "sendConnection", "close", "", "send", "queue", "byteArray", "(Ljava/lang/String;[BLkotlin/coroutines/Continuation;)Ljava/lang/Object;", "receive", "Lkotlinx/coroutines/flow/Flow;", "Lkotlinx/coroutines/CoroutineScope;", "Builder", "ChannelMessageListener", "Codec", "Companion", "PubSubConnection", "connekt-redis"})
/* loaded from: input_file:io/github/cfraser/connekt/redis/RedisTransport.class */
public final class RedisTransport extends Transport.Base {

    @NotNull
    private final RedisClient redisClient;

    @NotNull
    private final SharedFlow<ChannelMessage<String, byte[]>> receivedChannelMessages;

    @NotNull
    private final PubSubConnection receiveConnection;

    @NotNull
    private final PubSubConnection sendConnection;

    @NotNull
    private static final Companion Companion = new Companion(null);

    @Deprecated
    @NotNull
    private static final Lazy<KLogger> logger$delegate = LazyKt.lazy(new Function0<KLogger>() { // from class: io.github.cfraser.connekt.redis.RedisTransport$Companion$logger$2
        @NotNull
        /* renamed from: invoke, reason: merged with bridge method [inline-methods] */
        public final KLogger m6invoke() {
            return KotlinLogging.INSTANCE.logger(new Function0<Unit>() { // from class: io.github.cfraser.connekt.redis.RedisTransport$Companion$logger$2.1
                public final void invoke() {
                }

                /* renamed from: invoke, reason: collision with other method in class */
                public /* bridge */ /* synthetic */ Object m8invoke() {
                    invoke();
                    return Unit.INSTANCE;
                }
            });
        }
    });

    /* compiled from: RedisTransport.kt */
    @Metadata(mv = {1, 5, 1}, k = 1, xi = 48, d1 = {"��&\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\b\n\u0002\u0018\u0002\n��\u0018��2\u00020\u0001B\u0005¢\u0006\u0002\u0010\u0002J\b\u0010\u0010\u001a\u00020\u0011H\u0016J\u000e\u0010\u0003\u001a\u00020��2\u0006\u0010\u0003\u001a\u00020\u0004J\u000e\u0010\u0005\u001a\u00020��2\u0006\u0010\u0005\u001a\u00020\u0006J\u000e\u0010\t\u001a\u00020��2\u0006\u0010\t\u001a\u00020\bR\u0010\u0010\u0003\u001a\u0004\u0018\u00010\u0004X\u0082\u000e¢\u0006\u0002\n��R\u0010\u0010\u0005\u001a\u0004\u0018\u00010\u0006X\u0082\u000e¢\u0006\u0002\n��R+\u0010\t\u001a\u00020\b2\u0006\u0010\u0007\u001a\u00020\b8B@BX\u0082\u008e\u0002¢\u0006\u0012\n\u0004\b\u000e\u0010\u000f\u001a\u0004\b\n\u0010\u000b\"\u0004\b\f\u0010\r¨\u0006\u0012"}, d2 = {"Lio/github/cfraser/connekt/redis/RedisTransport$Builder;", "Lio/github/cfraser/connekt/api/Transport$Builder;", "()V", "clientOptions", "Lio/lettuce/core/ClientOptions;", "clientResources", "Lio/lettuce/core/resource/ClientResources;", "<set-?>", "Lio/lettuce/core/RedisURI;", "redisURI", "getRedisURI", "()Lio/lettuce/core/RedisURI;", "setRedisURI", "(Lio/lettuce/core/RedisURI;)V", "redisURI$delegate", "Lkotlin/properties/ReadWriteProperty;", "build", "Lio/github/cfraser/connekt/api/Transport;", "connekt-redis"})
    /* loaded from: input_file:io/github/cfraser/connekt/redis/RedisTransport$Builder.class */
    public static final class Builder implements Transport.Builder {
        static final /* synthetic */ KProperty<Object>[] $$delegatedProperties = {(KProperty) Reflection.mutableProperty1(new MutablePropertyReference1Impl(Builder.class, "redisURI", "getRedisURI()Lio/lettuce/core/RedisURI;", 0))};

        @NotNull
        private final ReadWriteProperty redisURI$delegate = Delegates.INSTANCE.notNull();

        @Nullable
        private ClientResources clientResources;

        @Nullable
        private ClientOptions clientOptions;

        private final RedisURI getRedisURI() {
            return (RedisURI) this.redisURI$delegate.getValue(this, $$delegatedProperties[0]);
        }

        private final void setRedisURI(RedisURI redisURI) {
            this.redisURI$delegate.setValue(this, $$delegatedProperties[0], redisURI);
        }

        @NotNull
        public final Builder redisURI(@NotNull RedisURI redisURI) {
            Intrinsics.checkNotNullParameter(redisURI, "redisURI");
            setRedisURI(redisURI);
            return this;
        }

        @NotNull
        public final Builder clientResources(@NotNull ClientResources clientResources) {
            Intrinsics.checkNotNullParameter(clientResources, "clientResources");
            this.clientResources = clientResources;
            return this;
        }

        @NotNull
        public final Builder clientOptions(@NotNull ClientOptions clientOptions) {
            Intrinsics.checkNotNullParameter(clientOptions, "clientOptions");
            this.clientOptions = clientOptions;
            return this;
        }

        @NotNull
        public Transport build() {
            RedisClient create = this.clientResources != null ? RedisClient.create(this.clientResources, getRedisURI()) : RedisClient.create(getRedisURI());
            if (this.clientOptions != null) {
                create.setOptions(this.clientOptions);
            }
            Intrinsics.checkNotNullExpressionValue(create, "redisClient");
            return new RedisTransport(create, null);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* compiled from: RedisTransport.kt */
    @Metadata(mv = {1, 5, 1}, k = 1, xi = 48, d1 = {"��.\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0010\u000e\n\u0002\u0010\u0012\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\u0002\n\u0002\b\u0004\n\u0002\u0010\t\n\u0002\b\u0004\b\u0002\u0018��2\u000e\u0012\u0004\u0012\u00020\u0002\u0012\u0004\u0012\u00020\u00030\u0001B\u001f\u0012\u0018\u0010\u0004\u001a\u0014\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00020\u0002\u0012\u0004\u0012\u00020\u00030\u00060\u0005¢\u0006\u0002\u0010\u0007J\u001c\u0010\b\u001a\u00020\t2\b\u0010\n\u001a\u0004\u0018\u00010\u00022\b\u0010\b\u001a\u0004\u0018\u00010\u0003H\u0016J&\u0010\b\u001a\u00020\t2\b\u0010\u000b\u001a\u0004\u0018\u00010\u00022\b\u0010\n\u001a\u0004\u0018\u00010\u00022\b\u0010\b\u001a\u0004\u0018\u00010\u0003H\u0016J\u001a\u0010\f\u001a\u00020\t2\b\u0010\u000b\u001a\u0004\u0018\u00010\u00022\u0006\u0010\r\u001a\u00020\u000eH\u0016J\u001a\u0010\u000f\u001a\u00020\t2\b\u0010\u000b\u001a\u0004\u0018\u00010\u00022\u0006\u0010\r\u001a\u00020\u000eH\u0016J\u001a\u0010\u0010\u001a\u00020\t2\b\u0010\n\u001a\u0004\u0018\u00010\u00022\u0006\u0010\r\u001a\u00020\u000eH\u0016J\u001a\u0010\u0011\u001a\u00020\t2\b\u0010\n\u001a\u0004\u0018\u00010\u00022\u0006\u0010\r\u001a\u00020\u000eH\u0016R \u0010\u0004\u001a\u0014\u0012\u0010\u0012\u000e\u0012\u0004\u0012\u00020\u0002\u0012\u0004\u0012\u00020\u00030\u00060\u0005X\u0082\u0004¢\u0006\u0002\n��¨\u0006\u0012"}, d2 = {"Lio/github/cfraser/connekt/redis/RedisTransport$ChannelMessageListener;", "Lio/lettuce/core/pubsub/RedisPubSubListener;", "", "", "mutableSharedFlow", "Lkotlinx/coroutines/flow/MutableSharedFlow;", "Lio/lettuce/core/pubsub/api/reactive/ChannelMessage;", "(Lkotlinx/coroutines/flow/MutableSharedFlow;)V", "message", "", "channel", "pattern", "psubscribed", "count", "", "punsubscribed", "subscribed", "unsubscribed", "connekt-redis"})
    /* loaded from: input_file:io/github/cfraser/connekt/redis/RedisTransport$ChannelMessageListener.class */
    public static final class ChannelMessageListener implements RedisPubSubListener<String, byte[]> {

        @NotNull
        private final MutableSharedFlow<ChannelMessage<String, byte[]>> mutableSharedFlow;

        public ChannelMessageListener(@NotNull MutableSharedFlow<ChannelMessage<String, byte[]>> mutableSharedFlow) {
            Intrinsics.checkNotNullParameter(mutableSharedFlow, "mutableSharedFlow");
            this.mutableSharedFlow = mutableSharedFlow;
        }

        public void message(@Nullable String str, @Nullable byte[] bArr) {
            if (str == null || bArr == null) {
                return;
            }
            BuildersKt.runBlocking(Dispatchers.getDefault(), new RedisTransport$ChannelMessageListener$message$1(this, str, bArr, null));
        }

        public void message(@Nullable final String str, @Nullable final String str2, @Nullable byte[] bArr) {
            RedisTransport.Companion.getLogger().debug(new Function0<Object>() { // from class: io.github.cfraser.connekt.redis.RedisTransport$ChannelMessageListener$message$2
                /* JADX INFO: Access modifiers changed from: package-private */
                /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
                {
                    super(0);
                }

                @Nullable
                public final Object invoke() {
                    return "Received message for channel " + str2 + " with pattern " + str;
                }
            });
        }

        public void subscribed(@Nullable final String str, long j) {
            RedisTransport.Companion.getLogger().debug(new Function0<Object>() { // from class: io.github.cfraser.connekt.redis.RedisTransport$ChannelMessageListener$subscribed$1
                /* JADX INFO: Access modifiers changed from: package-private */
                /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
                {
                    super(0);
                }

                @Nullable
                public final Object invoke() {
                    return "Subscribed to channel " + str;
                }
            });
        }

        public void psubscribed(@Nullable final String str, long j) {
            RedisTransport.Companion.getLogger().debug(new Function0<Object>() { // from class: io.github.cfraser.connekt.redis.RedisTransport$ChannelMessageListener$psubscribed$1
                /* JADX INFO: Access modifiers changed from: package-private */
                /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
                {
                    super(0);
                }

                @Nullable
                public final Object invoke() {
                    return "Subscribed to pattern " + str;
                }
            });
        }

        public void unsubscribed(@Nullable final String str, long j) {
            RedisTransport.Companion.getLogger().debug(new Function0<Object>() { // from class: io.github.cfraser.connekt.redis.RedisTransport$ChannelMessageListener$unsubscribed$1
                /* JADX INFO: Access modifiers changed from: package-private */
                /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
                {
                    super(0);
                }

                @Nullable
                public final Object invoke() {
                    return "Unsubscribed from channel " + str;
                }
            });
        }

        public void punsubscribed(@Nullable final String str, long j) {
            RedisTransport.Companion.getLogger().debug(new Function0<Object>() { // from class: io.github.cfraser.connekt.redis.RedisTransport$ChannelMessageListener$punsubscribed$1
                /* JADX INFO: Access modifiers changed from: package-private */
                /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
                {
                    super(0);
                }

                @Nullable
                public final Object invoke() {
                    return "Unsubscribed from pattern " + str;
                }
            });
        }
    }

    /* compiled from: RedisTransport.kt */
    @Metadata(mv = {1, 5, 1}, k = 1, xi = 48, d1 = {"��B\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0010\u000e\n\u0002\u0010\u0012\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0010\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0010\b\n��\n\u0002\u0010��\n��\bÂ\u0002\u0018��2\u000e\u0012\u0004\u0012\u00020\u0002\u0012\u0004\u0012\u00020\u00030\u00012\u000e\u0012\u0004\u0012\u00020\u0002\u0012\u0004\u0012\u00020\u00030\u0004B\u0007\b\u0002¢\u0006\u0002\u0010\u0005J\u0012\u0010\t\u001a\u00020\u00022\b\u0010\n\u001a\u0004\u0018\u00010\u000bH\u0016J\u0012\u0010\f\u001a\u00020\u00032\b\u0010\n\u001a\u0004\u0018\u00010\u000bH\u0016J\u0012\u0010\r\u001a\u00020\u000b2\b\u0010\u000e\u001a\u0004\u0018\u00010\u0002H\u0016J\u001c\u0010\r\u001a\u00020\u000f2\b\u0010\u000e\u001a\u0004\u0018\u00010\u00022\b\u0010\u0010\u001a\u0004\u0018\u00010\u0011H\u0016J\u0012\u0010\u0012\u001a\u00020\u000b2\b\u0010\u0013\u001a\u0004\u0018\u00010\u0003H\u0016J\u001c\u0010\u0012\u001a\u00020\u000f2\b\u0010\u0013\u001a\u0004\u0018\u00010\u00032\b\u0010\u0010\u001a\u0004\u0018\u00010\u0011H\u0016J\u0012\u0010\u0014\u001a\u00020\u00152\b\u0010\u0016\u001a\u0004\u0018\u00010\u0017H\u0016R\u000e\u0010\u0006\u001a\u00020\u0007X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\b\u001a\u00020\u0003X\u0082\u0004¢\u0006\u0002\n��¨\u0006\u0018"}, d2 = {"Lio/github/cfraser/connekt/redis/RedisTransport$Codec;", "Lio/lettuce/core/codec/RedisCodec;", "", "", "Lio/lettuce/core/codec/ToByteBufEncoder;", "()V", "charset", "Ljava/nio/charset/Charset;", "empty", "decodeKey", "bytes", "Ljava/nio/ByteBuffer;", "decodeValue", "encodeKey", "key", "", "target", "Lio/netty/buffer/ByteBuf;", "encodeValue", "value", "estimateSize", "", "keyOrValue", "", "connekt-redis"})
    /* loaded from: input_file:io/github/cfraser/connekt/redis/RedisTransport$Codec.class */
    private static final class Codec implements RedisCodec<String, byte[]>, ToByteBufEncoder<String, byte[]> {

        @NotNull
        public static final Codec INSTANCE = new Codec();

        @NotNull
        private static final byte[] empty = new byte[0];

        @NotNull
        private static final Charset charset = Charsets.UTF_8;

        private Codec() {
        }

        @NotNull
        /* renamed from: decodeKey, reason: merged with bridge method [inline-methods] */
        public String m3decodeKey(@Nullable ByteBuffer byteBuffer) {
            String byteBuf = Unpooled.wrappedBuffer(byteBuffer == null ? ByteBuffer.wrap(empty) : byteBuffer).toString(charset);
            Intrinsics.checkNotNullExpressionValue(byteBuf, "wrappedBuffer(bytes ?: B…empty)).toString(charset)");
            return byteBuf;
        }

        @NotNull
        /* renamed from: decodeValue, reason: merged with bridge method [inline-methods] */
        public byte[] m4decodeValue(@Nullable ByteBuffer byteBuffer) {
            int remaining;
            if (byteBuffer != null && (remaining = byteBuffer.remaining()) != 0) {
                byte[] bArr = new byte[remaining];
                byteBuffer.get(bArr);
                return bArr;
            }
            return empty;
        }

        @NotNull
        public ByteBuffer encodeKey(@Nullable String str) {
            if (str == null) {
                ByteBuffer wrap = ByteBuffer.wrap(empty);
                Intrinsics.checkNotNullExpressionValue(wrap, "wrap(empty)");
                return wrap;
            }
            ByteBuffer allocate = ByteBuffer.allocate(ByteBufUtil.utf8MaxBytes(str));
            ByteBuf wrappedBuffer = Unpooled.wrappedBuffer(allocate);
            wrappedBuffer.clear();
            ByteBufUtil.writeUtf8(wrappedBuffer, str);
            allocate.limit(wrappedBuffer.writerIndex());
            Intrinsics.checkNotNullExpressionValue(allocate, "buffer");
            return allocate;
        }

        @NotNull
        public ByteBuffer encodeValue(@Nullable byte[] bArr) {
            ByteBuffer wrap = ByteBuffer.wrap(bArr == null ? empty : bArr);
            Intrinsics.checkNotNullExpressionValue(wrap, "wrap(value ?: empty)");
            return wrap;
        }

        public void encodeKey(@Nullable String str, @Nullable ByteBuf byteBuf) {
            if (str == null || byteBuf == null) {
                return;
            }
            ByteBufUtil.writeUtf8(byteBuf, str);
        }

        public void encodeValue(@Nullable byte[] bArr, @Nullable ByteBuf byteBuf) {
            if (bArr == null || byteBuf == null) {
                return;
            }
            byteBuf.writeBytes(bArr);
        }

        public int estimateSize(@Nullable Object obj) {
            if (obj instanceof String) {
                return ByteBufUtil.utf8MaxBytes((CharSequence) obj);
            }
            if (obj instanceof byte[]) {
                return ((byte[]) obj).length;
            }
            return 0;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* compiled from: RedisTransport.kt */
    @Metadata(mv = {1, 5, 1}, k = 1, xi = 48, d1 = {"��\u0014\n\u0002\u0018\u0002\n\u0002\u0010��\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0005\b\u0082\u0003\u0018��2\u00020\u0001B\u0007\b\u0002¢\u0006\u0002\u0010\u0002R\u001b\u0010\u0003\u001a\u00020\u00048FX\u0086\u0084\u0002¢\u0006\f\n\u0004\b\u0007\u0010\b\u001a\u0004\b\u0005\u0010\u0006¨\u0006\t"}, d2 = {"Lio/github/cfraser/connekt/redis/RedisTransport$Companion;", "", "()V", "logger", "Lmu/KLogger;", "getLogger", "()Lmu/KLogger;", "logger$delegate", "Lkotlin/Lazy;", "connekt-redis"})
    /* loaded from: input_file:io/github/cfraser/connekt/redis/RedisTransport$Companion.class */
    public static final class Companion {
        private Companion() {
        }

        @NotNull
        public final KLogger getLogger() {
            return (KLogger) RedisTransport.logger$delegate.getValue();
        }

        public /* synthetic */ Companion(DefaultConstructorMarker defaultConstructorMarker) {
            this();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* compiled from: RedisTransport.kt */
    @Metadata(mv = {1, 5, 1}, k = 1, xi = 48, d1 = {"��0\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0010\u000e\n\u0002\u0010\u0012\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0010\u0002\n��\b\u0002\u0018��2\u00020\u0001B\r\u0012\u0006\u0010\u0002\u001a\u00020\u0003¢\u0006\u0002\u0010\u0004J\b\u0010\u000f\u001a\u00020\u0010H\u0016R\u001d\u0010\u0005\u001a\u000e\u0012\u0004\u0012\u00020\u0007\u0012\u0004\u0012\u00020\b0\u0006¢\u0006\b\n��\u001a\u0004\b\t\u0010\nR\u001d\u0010\u000b\u001a\u000e\u0012\u0004\u0012\u00020\u0007\u0012\u0004\u0012\u00020\b0\f¢\u0006\b\n��\u001a\u0004\b\r\u0010\u000e¨\u0006\u0011"}, d2 = {"Lio/github/cfraser/connekt/redis/RedisTransport$PubSubConnection;", "Ljava/io/Closeable;", "redisClient", "Lio/lettuce/core/RedisClient;", "(Lio/lettuce/core/RedisClient;)V", "commands", "Lio/lettuce/core/pubsub/api/reactive/RedisPubSubReactiveCommands;", "", "", "getCommands", "()Lio/lettuce/core/pubsub/api/reactive/RedisPubSubReactiveCommands;", "connection", "Lio/lettuce/core/pubsub/StatefulRedisPubSubConnection;", "getConnection", "()Lio/lettuce/core/pubsub/StatefulRedisPubSubConnection;", "close", "", "connekt-redis"})
    /* loaded from: input_file:io/github/cfraser/connekt/redis/RedisTransport$PubSubConnection.class */
    public static final class PubSubConnection implements Closeable {

        @NotNull
        private final StatefulRedisPubSubConnection<String, byte[]> connection;

        @NotNull
        private final RedisPubSubReactiveCommands<String, byte[]> commands;

        public PubSubConnection(@NotNull RedisClient redisClient) {
            Intrinsics.checkNotNullParameter(redisClient, "redisClient");
            StatefulRedisPubSubConnection<String, byte[]> connectPubSub = redisClient.connectPubSub(Codec.INSTANCE);
            Intrinsics.checkNotNullExpressionValue(connectPubSub, "redisClient.connectPubSub(Codec)");
            this.connection = connectPubSub;
            RedisPubSubReactiveCommands<String, byte[]> reactive = this.connection.reactive();
            Intrinsics.checkNotNullExpressionValue(reactive, "connection.reactive()");
            this.commands = reactive;
        }

        @NotNull
        public final StatefulRedisPubSubConnection<String, byte[]> getConnection() {
            return this.connection;
        }

        @NotNull
        public final RedisPubSubReactiveCommands<String, byte[]> getCommands() {
            return this.commands;
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            if (this.connection.isOpen()) {
                this.connection.close();
            }
        }
    }

    private RedisTransport(RedisClient redisClient) {
        this.redisClient = redisClient;
        MutableSharedFlow MutableSharedFlow$default = SharedFlowKt.MutableSharedFlow$default(0, 0, (BufferOverflow) null, 7, (Object) null);
        this.receivedChannelMessages = FlowKt.asSharedFlow(MutableSharedFlow$default);
        PubSubConnection pubSubConnection = new PubSubConnection(this.redisClient);
        pubSubConnection.getCommands().getStatefulConnection().addListener(new ChannelMessageListener(MutableSharedFlow$default));
        Unit unit = Unit.INSTANCE;
        this.receiveConnection = pubSubConnection;
        this.sendConnection = new PubSubConnection(this.redisClient);
    }

    @NotNull
    public Flow<byte[]> receive(@NotNull CoroutineScope coroutineScope, @NotNull final String str) {
        Intrinsics.checkNotNullParameter(coroutineScope, "<this>");
        Intrinsics.checkNotNullParameter(str, "queue");
        BuildersKt.runBlocking(Dispatchers.getIO(), new RedisTransport$receive$1(this, str, null));
        Job job = coroutineScope.getCoroutineContext().get(Job.Key);
        if (job != null) {
            job.invokeOnCompletion(new Function1<Throwable, Unit>() { // from class: io.github.cfraser.connekt.redis.RedisTransport$receive$2
                /* JADX INFO: Access modifiers changed from: package-private */
                /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
                {
                    super(1);
                }

                public final void invoke(@Nullable Throwable th) {
                    RedisTransport.PubSubConnection pubSubConnection;
                    pubSubConnection = RedisTransport.this.receiveConnection;
                    pubSubConnection.getCommands().unsubscribe(new String[]{str}).block(Duration.ofSeconds(10L));
                }

                public /* bridge */ /* synthetic */ Object invoke(Object obj) {
                    invoke((Throwable) obj);
                    return Unit.INSTANCE;
                }
            });
        }
        final Flow flow = this.receivedChannelMessages;
        final Flow<ChannelMessage<String, byte[]>> flow2 = new Flow<ChannelMessage<String, byte[]>>() { // from class: io.github.cfraser.connekt.redis.RedisTransport$receive$$inlined$filter$1

            /* compiled from: Collect.kt */
            @Metadata(mv = {1, 5, 1}, k = 1, xi = 48, d1 = {"��\u0013\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\u0002\n\u0002\b\u0005*\u0001��\b\n\u0018��2\b\u0012\u0004\u0012\u00028��0\u0001J\u0019\u0010\u0002\u001a\u00020\u00032\u0006\u0010\u0004\u001a\u00028��H\u0096@ø\u0001��¢\u0006\u0002\u0010\u0005\u0082\u0002\u0004\n\u0002\b\u0019¨\u0006\u0006¸\u0006\b"}, d2 = {"kotlinx/coroutines/flow/FlowKt__CollectKt$collect$3", "Lkotlinx/coroutines/flow/FlowCollector;", "emit", "", "value", "(Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;", "kotlinx-coroutines-core", "kotlinx/coroutines/flow/FlowKt__EmittersKt$unsafeTransform$lambda-1$$inlined$collect$1", "kotlinx/coroutines/flow/FlowKt__TransformKt$filter$$inlined$unsafeTransform$1$2"})
            /* renamed from: io.github.cfraser.connekt.redis.RedisTransport$receive$$inlined$filter$1$2, reason: invalid class name */
            /* loaded from: input_file:io/github/cfraser/connekt/redis/RedisTransport$receive$$inlined$filter$1$2.class */
            public static final class AnonymousClass2 implements FlowCollector<ChannelMessage<String, byte[]>> {
                final /* synthetic */ FlowCollector $this_unsafeFlow$inlined;
                final /* synthetic */ String $queue$inlined;

                @Metadata(mv = {1, 5, 1}, k = 3, xi = 48)
                @DebugMetadata(f = "RedisTransport.kt", l = {137}, i = {}, s = {}, n = {}, m = "emit", c = "io.github.cfraser.connekt.redis.RedisTransport$receive$$inlined$filter$1$2")
                /* renamed from: io.github.cfraser.connekt.redis.RedisTransport$receive$$inlined$filter$1$2$1, reason: invalid class name */
                /* loaded from: input_file:io/github/cfraser/connekt/redis/RedisTransport$receive$$inlined$filter$1$2$1.class */
                public static final class AnonymousClass1 extends ContinuationImpl {
                    /* synthetic */ Object result;
                    int label;
                    Object L$0;
                    Object L$1;

                    public AnonymousClass1(Continuation continuation) {
                        super(continuation);
                    }

                    @Nullable
                    public final Object invokeSuspend(@NotNull Object obj) {
                        this.result = obj;
                        this.label |= Integer.MIN_VALUE;
                        return AnonymousClass2.this.emit(null, (Continuation) this);
                    }
                }

                public AnonymousClass2(FlowCollector flowCollector, String str) {
                    this.$this_unsafeFlow$inlined = flowCollector;
                    this.$queue$inlined = str;
                }

                /* JADX WARN: Removed duplicated region for block: B:17:0x00a9  */
                /* JADX WARN: Removed duplicated region for block: B:18:0x00bf  */
                /* JADX WARN: Removed duplicated region for block: B:8:0x0054  */
                @org.jetbrains.annotations.Nullable
                /*
                    Code decompiled incorrectly, please refer to instructions dump.
                    To view partially-correct add '--show-bad-code' argument
                */
                public java.lang.Object emit(java.lang.Object r7, @org.jetbrains.annotations.NotNull kotlin.coroutines.Continuation r8) {
                    /*
                        r6 = this;
                        r0 = r8
                        boolean r0 = r0 instanceof io.github.cfraser.connekt.redis.RedisTransport$receive$$inlined$filter$1.AnonymousClass2.AnonymousClass1
                        if (r0 == 0) goto L24
                        r0 = r8
                        io.github.cfraser.connekt.redis.RedisTransport$receive$$inlined$filter$1$2$1 r0 = (io.github.cfraser.connekt.redis.RedisTransport$receive$$inlined$filter$1.AnonymousClass2.AnonymousClass1) r0
                        r9 = r0
                        r0 = r9
                        int r0 = r0.label
                        r1 = -2147483648(0xffffffff80000000, float:-0.0)
                        r0 = r0 & r1
                        if (r0 == 0) goto L24
                        r0 = r9
                        r1 = r0
                        int r1 = r1.label
                        r2 = -2147483648(0xffffffff80000000, float:-0.0)
                        int r1 = r1 - r2
                        r0.label = r1
                        goto L2e
                    L24:
                        io.github.cfraser.connekt.redis.RedisTransport$receive$$inlined$filter$1$2$1 r0 = new io.github.cfraser.connekt.redis.RedisTransport$receive$$inlined$filter$1$2$1
                        r1 = r0
                        r2 = r6
                        r3 = r8
                        r1.<init>(r3)
                        r9 = r0
                    L2e:
                        r0 = r9
                        java.lang.Object r0 = r0.result
                        r10 = r0
                        java.lang.Object r0 = kotlin.coroutines.intrinsics.IntrinsicsKt.getCOROUTINE_SUSPENDED()
                        r11 = r0
                        r0 = r9
                        int r0 = r0.label
                        switch(r0) {
                            case 0: goto L54;
                            case 1: goto La9;
                            default: goto Lbf;
                        }
                    L54:
                        r0 = r10
                        kotlin.ResultKt.throwOnFailure(r0)
                        r0 = r7
                        r1 = r9
                        r12 = r1
                        r13 = r0
                        r0 = 0
                        r14 = r0
                        r0 = r6
                        kotlinx.coroutines.flow.FlowCollector r0 = r0.$this_unsafeFlow$inlined
                        r1 = r13
                        r2 = r9
                        r15 = r2
                        r16 = r1
                        r17 = r0
                        r0 = 0
                        r18 = r0
                        r0 = r16
                        r1 = r9
                        kotlin.coroutines.Continuation r1 = (kotlin.coroutines.Continuation) r1
                        r19 = r1
                        io.lettuce.core.pubsub.api.reactive.ChannelMessage r0 = (io.lettuce.core.pubsub.api.reactive.ChannelMessage) r0
                        r20 = r0
                        r0 = 0
                        r21 = r0
                        r0 = r20
                        java.lang.Object r0 = r0.getChannel()
                        r1 = r6
                        java.lang.String r1 = r1.$queue$inlined
                        boolean r0 = kotlin.jvm.internal.Intrinsics.areEqual(r0, r1)
                        if (r0 == 0) goto Lba
                        r0 = r17
                        r1 = r16
                        r2 = r9
                        r3 = r9
                        r4 = 1
                        r3.label = r4
                        java.lang.Object r0 = r0.emit(r1, r2)
                        r1 = r0
                        r2 = r11
                        if (r1 != r2) goto Lb6
                        r1 = r11
                        return r1
                    La9:
                        r0 = 0
                        r14 = r0
                        r0 = 0
                        r18 = r0
                        r0 = r10
                        kotlin.ResultKt.throwOnFailure(r0)
                        r0 = r10
                    Lb6:
                        goto Lbb
                    Lba:
                    Lbb:
                        kotlin.Unit r0 = kotlin.Unit.INSTANCE
                        return r0
                    Lbf:
                        java.lang.IllegalStateException r0 = new java.lang.IllegalStateException
                        r1 = r0
                        java.lang.String r2 = "call to 'resume' before 'invoke' with coroutine"
                        r1.<init>(r2)
                        throw r0
                    */
                    throw new UnsupportedOperationException("Method not decompiled: io.github.cfraser.connekt.redis.RedisTransport$receive$$inlined$filter$1.AnonymousClass2.emit(java.lang.Object, kotlin.coroutines.Continuation):java.lang.Object");
                }
            }

            @Nullable
            public Object collect(@NotNull FlowCollector flowCollector, @NotNull Continuation continuation) {
                Object collect = flow.collect(new AnonymousClass2(flowCollector, str), continuation);
                return collect == IntrinsicsKt.getCOROUTINE_SUSPENDED() ? collect : Unit.INSTANCE;
            }
        };
        return new Flow<byte[]>() { // from class: io.github.cfraser.connekt.redis.RedisTransport$receive$$inlined$mapNotNull$1

            /* compiled from: Collect.kt */
            @Metadata(mv = {1, 5, 1}, k = 1, xi = 48, d1 = {"��\u0013\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\u0002\n\u0002\b\u0005*\u0001��\b\n\u0018��2\b\u0012\u0004\u0012\u00028��0\u0001J\u0019\u0010\u0002\u001a\u00020\u00032\u0006\u0010\u0004\u001a\u00028��H\u0096@ø\u0001��¢\u0006\u0002\u0010\u0005\u0082\u0002\u0004\n\u0002\b\u0019¨\u0006\u0006¸\u0006\b"}, d2 = {"kotlinx/coroutines/flow/FlowKt__CollectKt$collect$3", "Lkotlinx/coroutines/flow/FlowCollector;", "emit", "", "value", "(Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;", "kotlinx-coroutines-core", "kotlinx/coroutines/flow/FlowKt__EmittersKt$unsafeTransform$lambda-1$$inlined$collect$1", "kotlinx/coroutines/flow/FlowKt__TransformKt$mapNotNull$$inlined$unsafeTransform$1$2"})
            /* renamed from: io.github.cfraser.connekt.redis.RedisTransport$receive$$inlined$mapNotNull$1$2, reason: invalid class name */
            /* loaded from: input_file:io/github/cfraser/connekt/redis/RedisTransport$receive$$inlined$mapNotNull$1$2.class */
            public static final class AnonymousClass2 implements FlowCollector<ChannelMessage<String, byte[]>> {
                final /* synthetic */ FlowCollector $this_unsafeFlow$inlined;

                @Metadata(mv = {1, 5, 1}, k = 3, xi = 48)
                @DebugMetadata(f = "RedisTransport.kt", l = {138}, i = {}, s = {}, n = {}, m = "emit", c = "io.github.cfraser.connekt.redis.RedisTransport$receive$$inlined$mapNotNull$1$2")
                /* renamed from: io.github.cfraser.connekt.redis.RedisTransport$receive$$inlined$mapNotNull$1$2$1, reason: invalid class name */
                /* loaded from: input_file:io/github/cfraser/connekt/redis/RedisTransport$receive$$inlined$mapNotNull$1$2$1.class */
                public static final class AnonymousClass1 extends ContinuationImpl {
                    /* synthetic */ Object result;
                    int label;
                    Object L$0;

                    public AnonymousClass1(Continuation continuation) {
                        super(continuation);
                    }

                    @Nullable
                    public final Object invokeSuspend(@NotNull Object obj) {
                        this.result = obj;
                        this.label |= Integer.MIN_VALUE;
                        return AnonymousClass2.this.emit(null, (Continuation) this);
                    }
                }

                public AnonymousClass2(FlowCollector flowCollector) {
                    this.$this_unsafeFlow$inlined = flowCollector;
                }

                /* JADX WARN: Removed duplicated region for block: B:17:0x00ad  */
                /* JADX WARN: Removed duplicated region for block: B:18:0x00bf  */
                /* JADX WARN: Removed duplicated region for block: B:8:0x0054  */
                @org.jetbrains.annotations.Nullable
                /*
                    Code decompiled incorrectly, please refer to instructions dump.
                    To view partially-correct add '--show-bad-code' argument
                */
                public java.lang.Object emit(java.lang.Object r7, @org.jetbrains.annotations.NotNull kotlin.coroutines.Continuation r8) {
                    /*
                        r6 = this;
                        r0 = r8
                        boolean r0 = r0 instanceof io.github.cfraser.connekt.redis.RedisTransport$receive$$inlined$mapNotNull$1.AnonymousClass2.AnonymousClass1
                        if (r0 == 0) goto L24
                        r0 = r8
                        io.github.cfraser.connekt.redis.RedisTransport$receive$$inlined$mapNotNull$1$2$1 r0 = (io.github.cfraser.connekt.redis.RedisTransport$receive$$inlined$mapNotNull$1.AnonymousClass2.AnonymousClass1) r0
                        r9 = r0
                        r0 = r9
                        int r0 = r0.label
                        r1 = -2147483648(0xffffffff80000000, float:-0.0)
                        r0 = r0 & r1
                        if (r0 == 0) goto L24
                        r0 = r9
                        r1 = r0
                        int r1 = r1.label
                        r2 = -2147483648(0xffffffff80000000, float:-0.0)
                        int r1 = r1 - r2
                        r0.label = r1
                        goto L2e
                    L24:
                        io.github.cfraser.connekt.redis.RedisTransport$receive$$inlined$mapNotNull$1$2$1 r0 = new io.github.cfraser.connekt.redis.RedisTransport$receive$$inlined$mapNotNull$1$2$1
                        r1 = r0
                        r2 = r6
                        r3 = r8
                        r1.<init>(r3)
                        r9 = r0
                    L2e:
                        r0 = r9
                        java.lang.Object r0 = r0.result
                        r10 = r0
                        java.lang.Object r0 = kotlin.coroutines.intrinsics.IntrinsicsKt.getCOROUTINE_SUSPENDED()
                        r11 = r0
                        r0 = r9
                        int r0 = r0.label
                        switch(r0) {
                            case 0: goto L54;
                            case 1: goto Lad;
                            default: goto Lbf;
                        }
                    L54:
                        r0 = r10
                        kotlin.ResultKt.throwOnFailure(r0)
                        r0 = r7
                        r1 = r9
                        r12 = r1
                        r13 = r0
                        r0 = 0
                        r14 = r0
                        r0 = r6
                        kotlinx.coroutines.flow.FlowCollector r0 = r0.$this_unsafeFlow$inlined
                        r1 = r13
                        r2 = r9
                        r15 = r2
                        r16 = r1
                        r17 = r0
                        r0 = 0
                        r18 = r0
                        r0 = r16
                        r1 = r9
                        kotlin.coroutines.Continuation r1 = (kotlin.coroutines.Continuation) r1
                        r19 = r1
                        io.lettuce.core.pubsub.api.reactive.ChannelMessage r0 = (io.lettuce.core.pubsub.api.reactive.ChannelMessage) r0
                        r20 = r0
                        r0 = 0
                        r21 = r0
                        r0 = r20
                        java.lang.Object r0 = r0.getMessage()
                        r22 = r0
                        r0 = r22
                        if (r0 != 0) goto L91
                        goto Lbb
                    L91:
                        r0 = r22
                        r23 = r0
                        r0 = r17
                        r1 = r23
                        r2 = r9
                        r3 = r9
                        r4 = 1
                        r3.label = r4
                        java.lang.Object r0 = r0.emit(r1, r2)
                        r1 = r0
                        r2 = r11
                        if (r1 != r2) goto Lba
                        r1 = r11
                        return r1
                    Lad:
                        r0 = 0
                        r14 = r0
                        r0 = 0
                        r18 = r0
                        r0 = r10
                        kotlin.ResultKt.throwOnFailure(r0)
                        r0 = r10
                    Lba:
                    Lbb:
                        kotlin.Unit r0 = kotlin.Unit.INSTANCE
                        return r0
                    Lbf:
                        java.lang.IllegalStateException r0 = new java.lang.IllegalStateException
                        r1 = r0
                        java.lang.String r2 = "call to 'resume' before 'invoke' with coroutine"
                        r1.<init>(r2)
                        throw r0
                    */
                    throw new UnsupportedOperationException("Method not decompiled: io.github.cfraser.connekt.redis.RedisTransport$receive$$inlined$mapNotNull$1.AnonymousClass2.emit(java.lang.Object, kotlin.coroutines.Continuation):java.lang.Object");
                }
            }

            @Nullable
            public Object collect(@NotNull FlowCollector flowCollector, @NotNull Continuation continuation) {
                Object collect = flow2.collect(new AnonymousClass2(flowCollector), continuation);
                return collect == IntrinsicsKt.getCOROUTINE_SUSPENDED() ? collect : Unit.INSTANCE;
            }
        };
    }

    @Nullable
    public Object send(@NotNull String str, @NotNull byte[] bArr, @NotNull Continuation<? super Unit> continuation) {
        Mono publish = this.sendConnection.getCommands().publish(str, bArr);
        Intrinsics.checkNotNullExpressionValue(publish, "sendConnection.commands.publish(queue, byteArray)");
        Object awaitSingleOrNull = MonoKt.awaitSingleOrNull(publish, continuation);
        return awaitSingleOrNull == IntrinsicsKt.getCOROUTINE_SUSPENDED() ? awaitSingleOrNull : Unit.INSTANCE;
    }

    public void close() {
        Object obj;
        for (Closeable closeable : CollectionsKt.listOf(new Closeable[]{this.receiveConnection, this.sendConnection})) {
            try {
                Result.Companion companion = Result.Companion;
                closeable.close();
                obj = Result.constructor-impl(Unit.INSTANCE);
            } catch (Throwable th) {
                Result.Companion companion2 = Result.Companion;
                obj = Result.constructor-impl(ResultKt.createFailure(th));
            }
            Throwable th2 = Result.exceptionOrNull-impl(obj);
            if (th2 != null) {
                Companion.getLogger().warn(th2, new Function0<Object>() { // from class: io.github.cfraser.connekt.redis.RedisTransport$close$2$1
                    @Nullable
                    public final Object invoke() {
                        return "Failed to close connection";
                    }
                });
            }
        }
        this.redisClient.shutdown();
    }

    public /* synthetic */ RedisTransport(RedisClient redisClient, DefaultConstructorMarker defaultConstructorMarker) {
        this(redisClient);
    }
}
