package moka.sdk.internal.rsocket;

import io.rsocket.ConnectionSetupPayload;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import io.rsocket.core.RSocketServer;
import io.rsocket.transport.ServerTransport;
import io.rsocket.transport.netty.server.TcpServerTransport;
import java.io.Closeable;
import kotlin.Lazy;
import kotlin.LazyKt;
import kotlin.Metadata;
import kotlin.Unit;
import kotlin.coroutines.Continuation;
import kotlin.coroutines.CoroutineContext;
import kotlin.coroutines.intrinsics.IntrinsicsKt;
import kotlin.coroutines.jvm.internal.ContinuationImpl;
import kotlin.coroutines.jvm.internal.DebugMetadata;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.internal.Intrinsics;
import kotlinx.coroutines.channels.BufferOverflow;
import kotlinx.coroutines.flow.Flow;
import kotlinx.coroutines.flow.FlowCollector;
import kotlinx.coroutines.flow.FlowKt;
import kotlinx.coroutines.flow.MutableSharedFlow;
import kotlinx.coroutines.flow.SharedFlowKt;
import kotlinx.coroutines.reactor.MonoKt;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;

/* compiled from: RSocketSubscribe.kt */
@Metadata(mv = {1, 5, 1}, k = 1, xi = 48, d1 = {"��<\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0010\b\n��\n\u0002\u0010\u000e\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0010\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\b��\u0018��2\u00020\u0001B\u0015\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005¢\u0006\u0002\u0010\u0006J\b\u0010\u0010\u001a\u00020\u0011H\u0016J\u0018\u0010\u0012\u001a\b\u0012\u0004\u0012\u00020\u000b0\u00132\n\u0010\u0014\u001a\u00060\u0005j\u0002`\u0015R\u000e\u0010\u0007\u001a\u00020\bX\u0082\u0004¢\u0006\u0002\n��R!\u0010\t\u001a\b\u0012\u0004\u0012\u00020\u000b0\n8BX\u0082\u0084\u0002¢\u0006\f\n\u0004\b\u000e\u0010\u000f\u001a\u0004\b\f\u0010\r¨\u0006\u0016"}, d2 = {"Lmoka/sdk/internal/rsocket/SharedRSocketServer;", "Ljava/io/Closeable;", "port", "", "token", "", "(ILjava/lang/String;)V", "disposable", "Lreactor/core/Disposable;", "mutableSharedFlow", "Lkotlinx/coroutines/flow/MutableSharedFlow;", "Lio/rsocket/Payload;", "getMutableSharedFlow", "()Lkotlinx/coroutines/flow/MutableSharedFlow;", "mutableSharedFlow$delegate", "Lkotlin/Lazy;", "close", "", "subscribeTo", "Lkotlinx/coroutines/flow/Flow;", "topic", "Lmoka/sdk/api/Topic;", "moka-sdk-core"})
/* loaded from: input_file:moka/sdk/internal/rsocket/SharedRSocketServer.class */
public final class SharedRSocketServer implements Closeable {

    @NotNull
    private final Disposable disposable;

    @NotNull
    private final Lazy mutableSharedFlow$delegate;

    public SharedRSocketServer(int i, @NotNull String str) {
        Intrinsics.checkNotNullParameter(str, "token");
        this.mutableSharedFlow$delegate = LazyKt.lazy(new Function0<MutableSharedFlow<Payload>>() { // from class: moka.sdk.internal.rsocket.SharedRSocketServer$mutableSharedFlow$2
            @NotNull
            /* renamed from: invoke, reason: merged with bridge method [inline-methods] */
            public final MutableSharedFlow<Payload> m54invoke() {
                return SharedFlowKt.MutableSharedFlow$default(0, 64, BufferOverflow.DROP_OLDEST, 1, (Object) null);
            }
        });
        SocketAcceptor socketAcceptor = (v2, v3) -> {
            return m50_init_$lambda1(r0, r1, v2, v3);
        };
        ServerTransport create = TcpServerTransport.create(i);
        Intrinsics.checkNotNullExpressionValue(create, "create(port)");
        Disposable bindNow = RSocketServer.create(socketAcceptor).bindNow(create);
        Intrinsics.checkNotNullExpressionValue(bindNow, "create(acceptor).bindNow(transport)");
        this.disposable = bindNow;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final MutableSharedFlow<Payload> getMutableSharedFlow() {
        return (MutableSharedFlow) this.mutableSharedFlow$delegate.getValue();
    }

    @NotNull
    public final Flow<Payload> subscribeTo(@NotNull final String str) {
        Intrinsics.checkNotNullParameter(str, "topic");
        final Flow mutableSharedFlow = getMutableSharedFlow();
        final Flow<Payload> flow = new Flow<Payload>() { // from class: moka.sdk.internal.rsocket.SharedRSocketServer$subscribeTo$$inlined$map$1

            /* compiled from: Collect.kt */
            @Metadata(mv = {1, 5, 1}, k = 1, xi = 48, d1 = {"��\u0013\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\u0002\n\u0002\b\u0005*\u0001��\b\n\u0018��2\b\u0012\u0004\u0012\u00028��0\u0001J\u0019\u0010\u0002\u001a\u00020\u00032\u0006\u0010\u0004\u001a\u00028��H\u0096@ø\u0001��¢\u0006\u0002\u0010\u0005\u0082\u0002\u0004\n\u0002\b\u0019¨\u0006\u0006¸\u0006\b"}, d2 = {"kotlinx/coroutines/flow/FlowKt__CollectKt$collect$3", "Lkotlinx/coroutines/flow/FlowCollector;", "emit", "", "value", "(Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;", "kotlinx-coroutines-core", "kotlinx/coroutines/flow/FlowKt__EmittersKt$unsafeTransform$lambda-1$$inlined$collect$1", "kotlinx/coroutines/flow/FlowKt__TransformKt$map$$inlined$unsafeTransform$1$2"})
            /* renamed from: moka.sdk.internal.rsocket.SharedRSocketServer$subscribeTo$$inlined$map$1$2, reason: invalid class name */
            /* loaded from: input_file:moka/sdk/internal/rsocket/SharedRSocketServer$subscribeTo$$inlined$map$1$2.class */
            public static final class AnonymousClass2 implements FlowCollector<Payload> {
                final /* synthetic */ FlowCollector $this_unsafeFlow$inlined;

                @Metadata(mv = {1, 5, 1}, k = 3, xi = 48)
                @DebugMetadata(f = "RSocketSubscribe.kt", l = {137, 137}, i = {}, s = {}, n = {}, m = "emit", c = "moka.sdk.internal.rsocket.SharedRSocketServer$subscribeTo$$inlined$map$1$2")
                /* renamed from: moka.sdk.internal.rsocket.SharedRSocketServer$subscribeTo$$inlined$map$1$2$1, reason: invalid class name */
                /* loaded from: input_file:moka/sdk/internal/rsocket/SharedRSocketServer$subscribeTo$$inlined$map$1$2$1.class */
                public static final class AnonymousClass1 extends ContinuationImpl {
                    /* synthetic */ Object result;
                    int label;
                    Object L$0;

                    public AnonymousClass1(Continuation continuation) {
                        super(continuation);
                    }

                    @Nullable
                    public final Object invokeSuspend(@NotNull Object obj) {
                        this.result = obj;
                        this.label |= Integer.MIN_VALUE;
                        return AnonymousClass2.this.emit(null, (Continuation) this);
                    }
                }

                public AnonymousClass2(FlowCollector flowCollector) {
                    this.$this_unsafeFlow$inlined = flowCollector;
                }

                /* JADX WARN: Failed to find 'out' block for switch in B:7:0x003d. Please report as an issue. */
                /* JADX WARN: Removed duplicated region for block: B:15:0x00ef  */
                /* JADX WARN: Removed duplicated region for block: B:19:0x00b9  */
                /* JADX WARN: Removed duplicated region for block: B:20:0x00f2  */
                /* JADX WARN: Removed duplicated region for block: B:21:0x0104  */
                /* JADX WARN: Removed duplicated region for block: B:8:0x0058  */
                @org.jetbrains.annotations.Nullable
                /*
                    Code decompiled incorrectly, please refer to instructions dump.
                    To view partially-correct add '--show-bad-code' argument
                */
                public java.lang.Object emit(java.lang.Object r7, @org.jetbrains.annotations.NotNull kotlin.coroutines.Continuation r8) {
                    /*
                        Method dump skipped, instructions count: 270
                        To view this dump add '--comments-level debug' option
                    */
                    throw new UnsupportedOperationException("Method not decompiled: moka.sdk.internal.rsocket.SharedRSocketServer$subscribeTo$$inlined$map$1.AnonymousClass2.emit(java.lang.Object, kotlin.coroutines.Continuation):java.lang.Object");
                }
            }

            @Nullable
            public Object collect(@NotNull FlowCollector flowCollector, @NotNull Continuation continuation) {
                Object collect = mutableSharedFlow.collect(new AnonymousClass2(flowCollector), continuation);
                return collect == IntrinsicsKt.getCOROUTINE_SUSPENDED() ? collect : Unit.INSTANCE;
            }
        };
        return FlowKt.cancellable(FlowKt.buffer$default(new Flow<Payload>() { // from class: moka.sdk.internal.rsocket.SharedRSocketServer$subscribeTo$$inlined$filter$1

            /* compiled from: Collect.kt */
            @Metadata(mv = {1, 5, 1}, k = 1, xi = 48, d1 = {"��\u0013\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\u0002\n\u0002\b\u0005*\u0001��\b\n\u0018��2\b\u0012\u0004\u0012\u00028��0\u0001J\u0019\u0010\u0002\u001a\u00020\u00032\u0006\u0010\u0004\u001a\u00028��H\u0096@ø\u0001��¢\u0006\u0002\u0010\u0005\u0082\u0002\u0004\n\u0002\b\u0019¨\u0006\u0006¸\u0006\b"}, d2 = {"kotlinx/coroutines/flow/FlowKt__CollectKt$collect$3", "Lkotlinx/coroutines/flow/FlowCollector;", "emit", "", "value", "(Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;", "kotlinx-coroutines-core", "kotlinx/coroutines/flow/FlowKt__EmittersKt$unsafeTransform$lambda-1$$inlined$collect$1", "kotlinx/coroutines/flow/FlowKt__TransformKt$filter$$inlined$unsafeTransform$1$2"})
            /* renamed from: moka.sdk.internal.rsocket.SharedRSocketServer$subscribeTo$$inlined$filter$1$2, reason: invalid class name */
            /* loaded from: input_file:moka/sdk/internal/rsocket/SharedRSocketServer$subscribeTo$$inlined$filter$1$2.class */
            public static final class AnonymousClass2 implements FlowCollector<Payload> {
                final /* synthetic */ FlowCollector $this_unsafeFlow$inlined;
                final /* synthetic */ String $topic$inlined;

                @Metadata(mv = {1, 5, 1}, k = 3, xi = 48)
                @DebugMetadata(f = "RSocketSubscribe.kt", l = {138, 137}, i = {0, 0, 0}, s = {"L$0", "L$1", "L$2"}, n = {"this", "value", "$this$filter_u24lambda_u2d0"}, m = "emit", c = "moka.sdk.internal.rsocket.SharedRSocketServer$subscribeTo$$inlined$filter$1$2")
                /* renamed from: moka.sdk.internal.rsocket.SharedRSocketServer$subscribeTo$$inlined$filter$1$2$1, reason: invalid class name */
                /* loaded from: input_file:moka/sdk/internal/rsocket/SharedRSocketServer$subscribeTo$$inlined$filter$1$2$1.class */
                public static final class AnonymousClass1 extends ContinuationImpl {
                    /* synthetic */ Object result;
                    int label;
                    Object L$0;
                    Object L$1;
                    Object L$2;

                    public AnonymousClass1(Continuation continuation) {
                        super(continuation);
                    }

                    @Nullable
                    public final Object invokeSuspend(@NotNull Object obj) {
                        this.result = obj;
                        this.label |= Integer.MIN_VALUE;
                        return AnonymousClass2.this.emit(null, (Continuation) this);
                    }
                }

                public AnonymousClass2(FlowCollector flowCollector, String str) {
                    this.$this_unsafeFlow$inlined = flowCollector;
                    this.$topic$inlined = str;
                }

                /* JADX WARN: Can't wrap try/catch for region: R(6:1|(2:3|(4:5|6|7|8))|37|6|7|8) */
                /* JADX WARN: Code restructure failed: missing block: B:35:0x00f5, code lost:
                
                    r23 = move-exception;
                 */
                /* JADX WARN: Code restructure failed: missing block: B:36:0x00f7, code lost:
                
                    r0 = kotlin.coroutines.jvm.internal.Boxing.boxBoolean(false);
                    r0 = r0.booleanValue();
                    r0 = moka.sdk.internal.rsocket.RSocketSubscribeKt.logger;
                    r0.error(r23, moka.sdk.internal.rsocket.SharedRSocketServer$subscribeTo$2$2$1.INSTANCE);
                    r22 = r0.booleanValue();
                 */
                /* JADX WARN: Failed to find 'out' block for switch in B:8:0x003d. Please report as an issue. */
                /* JADX WARN: Removed duplicated region for block: B:19:0x012c  */
                /* JADX WARN: Removed duplicated region for block: B:22:0x013b  */
                /* JADX WARN: Removed duplicated region for block: B:28:0x0130  */
                /* JADX WARN: Removed duplicated region for block: B:30:0x00c1  */
                /* JADX WARN: Removed duplicated region for block: B:32:0x0162  */
                /* JADX WARN: Removed duplicated region for block: B:33:0x0178  */
                /* JADX WARN: Removed duplicated region for block: B:9:0x0058  */
                @org.jetbrains.annotations.Nullable
                /*
                    Code decompiled incorrectly, please refer to instructions dump.
                    To view partially-correct add '--show-bad-code' argument
                */
                public java.lang.Object emit(java.lang.Object r7, @org.jetbrains.annotations.NotNull kotlin.coroutines.Continuation r8) {
                    /*
                        Method dump skipped, instructions count: 386
                        To view this dump add '--comments-level debug' option
                    */
                    throw new UnsupportedOperationException("Method not decompiled: moka.sdk.internal.rsocket.SharedRSocketServer$subscribeTo$$inlined$filter$1.AnonymousClass2.emit(java.lang.Object, kotlin.coroutines.Continuation):java.lang.Object");
                }
            }

            @Nullable
            public Object collect(@NotNull FlowCollector flowCollector, @NotNull Continuation continuation) {
                Object collect = flow.collect(new AnonymousClass2(flowCollector, str), continuation);
                return collect == IntrinsicsKt.getCOROUTINE_SUSPENDED() ? collect : Unit.INSTANCE;
            }
        }, 0, (BufferOverflow) null, 3, (Object) null));
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (this.disposable.isDisposed()) {
            return;
        }
        this.disposable.dispose();
    }

    /* renamed from: _init_$lambda-1, reason: not valid java name */
    private static final Mono m50_init_$lambda1(String str, final SharedRSocketServer sharedRSocketServer, ConnectionSetupPayload connectionSetupPayload, RSocket rSocket) {
        Intrinsics.checkNotNullParameter(str, "$token");
        Intrinsics.checkNotNullParameter(sharedRSocketServer, "this$0");
        Intrinsics.checkNotNullParameter(connectionSetupPayload, "setupPayload");
        Intrinsics.checkNotNullParameter(rSocket, "$noName_1");
        RSocketPayloads.INSTANCE.check(connectionSetupPayload, str);
        return Mono.just(new RSocket() { // from class: moka.sdk.internal.rsocket.SharedRSocketServer$acceptor$1$1
            @NotNull
            public Mono<Void> fireAndForget(@NotNull Payload payload) {
                Intrinsics.checkNotNullParameter(payload, "payload");
                Mono<Void> flatMap = MonoKt.mono$default((CoroutineContext) null, new SharedRSocketServer$acceptor$1$1$fireAndForget$1(SharedRSocketServer.this, payload, null), 1, (Object) null).flatMap(SharedRSocketServer$acceptor$1$1::m52fireAndForget$lambda0);
                Intrinsics.checkNotNullExpressionValue(flatMap, "internal class SharedRSocketServer(port: Int, token: String) : Closeable {\n\n  /** The [Disposable] that is running the [RSocketServer]. */\n  private val disposable: Disposable\n\n  /** The internal [MutableSharedFlow] for distributing [Payload] data to subscriber(s). */\n  private val mutableSharedFlow by lazy {\n    MutableSharedFlow<Payload>(\n        extraBufferCapacity = 64, onBufferOverflow = BufferOverflow.DROP_OLDEST)\n  }\n\n  init {\n    // Socket connection acceptor that defines server semantics\n    val acceptor = SocketAcceptor { setupPayload, _ ->\n      setupPayload.check(token)\n\n      // RSocket handling the incoming requests\n      object : RSocket {\n            override fun fireAndForget(payload: Payload): Mono<Void> =\n                mono { mutableSharedFlow.emit(payload) }.flatMap { Mono.empty() }\n          }\n          .let { rSocket -> Mono.just(rSocket) }\n    }\n\n    // TCP server socket used by RSocketServer\n    val transport = TcpServerTransport.create(port)\n\n    disposable = RSocketServer.create(acceptor).bindNow(transport)\n  }\n\n  /**\n   * Receive [Payload] data for messages published to a [topic].\n   *\n   * @param topic the [Topic] to subscribe to\n   * @return the [Flow] containing received payloads\n   */\n  fun subscribeTo(topic: Topic): Flow<Payload> {\n    return mutableSharedFlow\n        // Defensively copy the payload since it could be emitted to multiple subscribers\n        .map { payload -> withContext(Dispatchers.Default) { payload.copy() } }\n        // Only emit payloads that were routed to the topic\n        .filter { payload ->\n          try {\n            withContext(Dispatchers.Default) { payload.decodeRoutingMetadata() } == topic\n          } catch (ex: Exception) {\n            false.also { logger.error(ex) { \"failed decode payload routing metadata\" } }\n          }\n        }\n        .buffer()\n        .cancellable()\n  }\n\n  /** Release all server resources through [Disposable.dispose]. */\n  override fun close() {\n    if (!disposable.isDisposed) disposable.dispose()\n  }\n}");
                return flatMap;
            }

            /* renamed from: fireAndForget$lambda-0, reason: not valid java name */
            private static final Mono m52fireAndForget$lambda0(Unit unit) {
                return Mono.empty();
            }
        });
    }
}
