package io.reactivex.netty.protocol.tcp.server;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.reactivex.netty.channel.AbstractConnectionToChannelBridge;
import io.reactivex.netty.channel.ChannelSubscriberEvent;
import io.reactivex.netty.channel.ConnectionImpl;
import io.reactivex.netty.channel.EmitConnectionEvent;
import io.reactivex.netty.events.Clock;
import io.reactivex.netty.events.EventAttributeKeys;
import io.reactivex.netty.protocol.tcp.server.events.TcpServerEventPublisher;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Func1;

/* loaded from: input_file:io/reactivex/netty/protocol/tcp/server/TcpServerConnectionToChannelBridge.class */
/**
 * Channel handler that, for every channel it is registered on, wraps the channel in a
 * connection, passes that connection to the configured {@link ConnectionHandler} and
 * publishes connection-handling events to the configured {@link TcpServerEventPublisher}.
 *
 * <p>Instances are created and installed through
 * {@link #addToPipeline(ChannelPipeline, ConnectionHandler, TcpServerEventPublisher, boolean)}.
 *
 * @param <R> type parameter forwarded to the {@link ConnectionHandler} and the parent bridge
 * @param <W> type parameter forwarded to the {@link ConnectionHandler} and the parent bridge
 */
public class TcpServerConnectionToChannelBridge<R, W> extends AbstractConnectionToChannelBridge<R, W> {
    private static final Logger logger = LoggerFactory.getLogger(TcpServerConnectionToChannelBridge.class);
    private static final String HANDLER_NAME = "server-conn-channel-bridge";
    private final ConnectionHandler<R, W> connectionHandler;
    private final TcpServerEventPublisher eventPublisher;
    private final boolean isSecure;
    private final ChannelSubscriberEvent<R, W> channelSubscriberEvent;

    /**
     * Subscriber that receives each new {@link Channel}, hands the resulting connection to
     * the {@link ConnectionHandler} and closes the connection once handling terminates.
     */
    public final class NewChannelSubscriber extends Subscriber<Channel> {
        private NewChannelSubscriber() {
        }

        /** No-op: nothing needs to happen when the channel stream completes. */
        @Override // rx.Observer
        public void onCompleted() {
        }

        /**
         * Logs an error raised by the channel stream itself.
         *
         * @param th the error to log
         */
        @Override // rx.Observer
        public void onError(Throwable th) {
            logger.error("Error while listening for new client connections.", th);
        }

        /**
         * Handles one newly available channel: attaches the event publisher to the channel
         * attributes, invokes the connection handler, and closes the connection when the
         * handler's result terminates or the connection's close listener fires first.
         *
         * <p>For each connection at most one of {@code onConnectionHandlingFailed} and
         * {@code onConnectionHandlingSuccess} is published.
         *
         * @param channel the channel to handle
         */
        @Override // rx.Observer
        public void onNext(Channel channel) {
            channel.attr(EventAttributeKeys.EVENT_PUBLISHER).set(eventPublisher);
            channel.attr(EventAttributeKeys.CONNECTION_EVENT_LISTENER).set(eventPublisher);

            final ConnectionImpl<R, W> connection = ConnectionImpl.fromChannel(channel);
            // -1 is a placeholder start time; it is only read when publishing is enabled.
            final long startTimeNanos = eventPublisher.publishingEnabled() ? Clock.newStartTimeNanos() : -1L;
            if (eventPublisher.publishingEnabled()) {
                eventPublisher.onNewClientConnected();
            }

            Observable<Void> handled;
            try {
                if (eventPublisher.publishingEnabled()) {
                    eventPublisher.onConnectionHandlingStart(Clock.onEndNanos(startTimeNanos), TimeUnit.NANOSECONDS);
                }
                handled = connectionHandler.handle(connection);
            } catch (Throwable th) {
                // A handler that throws synchronously is treated like one whose result fails.
                handled = Observable.error(th);
            }
            if (null == handled) {
                logger.error("Connection handler returned null.");
                handled = Observable.empty();
            }

            // Set when handling failed. The failure path resumes with close(), which completes
            // normally, so without this flag the completion callback below would also publish
            // a success event for a connection already reported as failed.
            final AtomicBoolean handlingFailed = new AtomicBoolean();

            handled.onErrorResumeNext(new Func1<Throwable, Observable<? extends Void>>() {
                @Override // rx.functions.Func1
                public Observable<? extends Void> call(Throwable error) {
                    if (error instanceof ClosedChannelException) {
                        // A closed channel is not reported as a handling failure.
                        return Observable.empty();
                    }
                    handlingFailed.set(true);
                    if (eventPublisher.publishingEnabled()) {
                        eventPublisher.onConnectionHandlingFailed(Clock.onEndNanos(startTimeNanos), TimeUnit.NANOSECONDS, error);
                    }
                    logger.error("Error processing connection.", error);
                    return connection.close();
                }
            }).ambWith(connection.closeListener()).concatWith(connection.close()).doOnCompleted(new Action0() {
                @Override // rx.functions.Action0
                public void call() {
                    if (!handlingFailed.get() && eventPublisher.publishingEnabled()) {
                        eventPublisher.onConnectionHandlingSuccess(Clock.onEndNanos(startTimeNanos), TimeUnit.NANOSECONDS);
                    }
                }
            }).subscribe();
        }
    }

    /**
     * Creates the bridge; use
     * {@link #addToPipeline(ChannelPipeline, ConnectionHandler, TcpServerEventPublisher, boolean)}
     * to obtain an instance.
     *
     * @param connectionHandler handler invoked with every new connection
     * @param tcpServerEventPublisher publisher for connection events; also passed twice to the parent constructor
     * @param isSecure when {@code true}, {@link EmitConnectionEvent} is not fired on channel registration
     */
    private TcpServerConnectionToChannelBridge(ConnectionHandler<R, W> connectionHandler, TcpServerEventPublisher tcpServerEventPublisher, boolean isSecure) {
        super(HANDLER_NAME, tcpServerEventPublisher, tcpServerEventPublisher);
        this.connectionHandler = connectionHandler;
        this.eventPublisher = tcpServerEventPublisher;
        this.isSecure = isSecure;
        this.channelSubscriberEvent = new ChannelSubscriberEvent<>(new NewChannelSubscriber());
    }

    /**
     * Fires the channel-subscriber event and, for non-secure bridges, the
     * {@link EmitConnectionEvent}, then delegates to the parent implementation.
     *
     * @param channelHandlerContext context of the channel being registered
     * @throws Exception if event handling or the parent implementation throws
     */
    @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
    public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {
        userEventTriggered(channelHandlerContext, this.channelSubscriberEvent);
        if (!this.isSecure) {
            // NOTE(review): for secure bridges the event is presumably emitted elsewhere
            // (e.g. once the TLS handshake completes) - confirm against the SSL handler.
            userEventTriggered(channelHandlerContext, EmitConnectionEvent.INSTANCE);
        }
        super.channelRegistered(channelHandlerContext);
    }

    /**
     * Creates a new bridge and appends it to the end of the given pipeline under the name
     * {@code "server-conn-channel-bridge"}.
     *
     * @param channelPipeline pipeline to append the bridge to
     * @param connectionHandler handler invoked with every new connection
     * @param tcpServerEventPublisher publisher for connection events
     * @param z whether the bridge is secure; when {@code true}, {@link EmitConnectionEvent}
     *          is not fired on channel registration
     * @param <R> type parameter forwarded to the {@link ConnectionHandler} and the bridge
     * @param <W> type parameter forwarded to the {@link ConnectionHandler} and the bridge
     * @return the bridge that was added to the pipeline
     */
    public static <R, W> TcpServerConnectionToChannelBridge<R, W> addToPipeline(ChannelPipeline channelPipeline, ConnectionHandler<R, W> connectionHandler, TcpServerEventPublisher tcpServerEventPublisher, boolean z) {
        TcpServerConnectionToChannelBridge<R, W> bridge = new TcpServerConnectionToChannelBridge<>(connectionHandler, tcpServerEventPublisher, z);
        channelPipeline.addLast(HANDLER_NAME, bridge);
        return bridge;
    }
}
