package io.reactivex.mantis.remote.observable;

import com.mantisrx.common.utils.NettyUtils;
import io.mantisrx.common.MantisGroup;
import io.mantisrx.common.codec.Decoder;
import io.mantisrx.common.codec.Encoder;
import io.mantisrx.server.core.ServiceRegistry;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.compression.JdkZlibDecoder;
import io.netty.handler.codec.compression.JdkZlibEncoder;
import io.netty.handler.codec.compression.ZlibWrapper;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.reactivex.mantis.remote.observable.ConnectToObservable;
import io.reactivex.mantis.remote.observable.RemoteRxEvent;
import io.reactivex.mantis.remote.observable.RemoteRxServer;
import io.reactivex.mantis.remote.observable.ServeObservable;
import io.reactivex.mantis.remote.observable.ingress.IngressPolicies;
import io.reactivex.mantis.remote.observable.ingress.IngressPolicy;
import io.reactivx.mantis.operators.DropOperator;
import io.reactivx.mantis.operators.GroupedObservableUtils;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import mantis.io.reactivex.netty.RxNetty;
import mantis.io.reactivex.netty.channel.ObservableConnection;
import mantis.io.reactivex.netty.pipeline.PipelineConfigurator;
import mantis.io.reactivex.netty.pipeline.PipelineConfiguratorComposite;
import org.jctools.queues.SpscArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Notification;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.observables.GroupedObservable;

/* loaded from: input_file:io/reactivex/mantis/remote/observable/RemoteObservable.class */
public class RemoteObservable {
    /** Class-wide SLF4J logger. */
    private static final Logger logger = LoggerFactory.getLogger(RemoteObservable.class);
    // The tunables below are re-read by loadFastProperties(), which every
    // createTcpConnectionTo* method invokes before it builds a client pipeline.
    /** When true, new pipelines get the "idleStateHandler" and "heartbeat" handlers. */
    private static boolean enableHeartBeating = true;
    /** When true, a Netty LoggingHandler (LogLevel.ERROR) is added first in new pipelines. */
    private static boolean enableNettyLogging = false;
    /** When true, new pipelines get the GZIP JdkZlibEncoder/JdkZlibDecoder pair. */
    private static boolean enableCompression = true;
    /** Maximum frame length handed to LengthFieldBasedFrameDecoder; 5242880 = 5 * 1024 * 1024. */
    private static int maxFrameLength = 5242880;
    /** Argument for rebatchRequests() on the incoming stream; values <= 0 are replaced by 1 at the call sites. */
    private static int bufferSize = 0;
    /** Fallback for the "workerClient.buffer.size" property. */
    private static final String DEFAULT_BUFFER_SIZE_STR = "0";

    /** Private and empty: every member visible in this class is static, so no instance is created here. */
    private RemoteObservable() {
    }

    /**
     * Refreshes the static connection tunables from the properties service.
     *
     * <p>The three boolean flags are only ever moved away from their initial
     * values here: once a flag has been flipped, a later call never flips it
     * back. The two numeric settings are parsed with {@link Integer#parseInt},
     * so a non-numeric property value surfaces as a
     * {@link NumberFormatException}.
     */
    private static void loadFastProperties() {
        String heartBeating = lookupProperty("mantis.netty.enableHeartBeating", "true");
        if (heartBeating.equals("false")) {
            enableHeartBeating = false;
        }

        String nettyLogging = lookupProperty("mantis.netty.enableLogging", "false");
        if (nettyLogging.equals("true")) {
            enableNettyLogging = true;
        }

        String compression = lookupProperty("mantis.netty.enableCompression", "true");
        if (compression.equals("false")) {
            enableCompression = false;
        }

        // An absent or empty value leaves the current frame length untouched.
        String frameLength = lookupProperty("mantis.netty.maxFrameLength", "5242880");
        if (frameLength != null && !frameLength.isEmpty()) {
            maxFrameLength = Integer.parseInt(frameLength);
        }

        // A null lookup result falls back to the default buffer size string.
        String configuredBuffer = lookupProperty("workerClient.buffer.size", DEFAULT_BUFFER_SIZE_STR);
        bufferSize = Integer.parseInt(configuredBuffer != null ? configuredBuffer : DEFAULT_BUFFER_SIZE_STR);
    }

    /** Reads one string property through the service registry's properties service. */
    private static String lookupProperty(String key, String fallback) {
        return ServiceRegistry.INSTANCE.getPropertiesService().getStringValue(key, fallback);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds the notification handler passed to {@code retryWhen} for remote
     * subscriptions.
     *
     * <p>Each upstream error is paired with an attempt number taken from
     * {@code 1..connectToConfig.getSubscribeAttempts()}. For every attempt
     * before the last, the handler emits after a delay of
     * {@code attempt} seconds, which triggers a resubscribe. On the last
     * attempt (when the throwable is non-null) the original error is
     * propagated instead.
     *
     * @param connectToConfig connection settings supplying the name, host,
     *        port and maximum number of subscribe attempts
     * @return the function to hand to {@code Observable.retryWhen}
     */
    public static Func1<? super Observable<? extends Throwable>, ? extends Observable<?>> retryLogic(final ConnectToConfig connectToConfig) {
        return new Func1<Observable<? extends Throwable>, Observable<?>>() {
            public Observable<?> call(Observable<? extends Throwable> observable) {
                // The anonymous classes below read the captured final parameter directly.
                return observable.zipWith(Observable.range(1, connectToConfig.getSubscribeAttempts()), new Func2<Throwable, Integer, ThrowableWithCount>() {
                    public ThrowableWithCount call(Throwable th, Integer num) {
                        return new ThrowableWithCount(th, num);
                    }
                }).flatMap(new Func1<ThrowableWithCount, Observable<?>>() {
                    public Observable<?> call(ThrowableWithCount throwableWithCount) {
                        RemoteObservable.logger.debug("Failed to subscribe to remote observable: " + connectToConfig.getName(), throwableWithCount.getThrowable());
                        RemoteObservable.logger.info("Failed to subscribe to remote observable: " + connectToConfig.getName() + " at host: " + connectToConfig.getHost() + " on port: " + connectToConfig.getPort() + " subscribe attempt: " + throwableWithCount.getCount() + " of: " + connectToConfig.getSubscribeAttempts());
                        boolean attemptsExhausted = throwableWithCount.getCount().intValue() == connectToConfig.getSubscribeAttempts()
                                && throwableWithCount.getThrowable() != null;
                        if (attemptsExhausted) {
                            // Last allowed attempt: surface the failure to the subscriber.
                            return Observable.error(throwableWithCount.getThrowable());
                        }
                        // Linear back-off: wait as many seconds as the attempt number.
                        return Observable.timer(throwableWithCount.getCount().intValue(), TimeUnit.SECONDS);
                    }
                });
            }
        };
    }

    /**
     * Creates a lazily-connected remote observable for a single-stream endpoint.
     *
     * <p>The TCP connection is only opened when the returned connection's
     * observable is subscribed to. A {@link RemoteUnsubscribe} is attached to
     * each subscriber before the connection is created. Unlike the grouped
     * variants in this class, no {@code retryWhen} is applied here.
     *
     * @param connectToObservable connection settings (name, callbacks, close trigger)
     * @param <T> element type produced by the configured decoder
     * @return a connection wrapping the observable, its metrics and its close trigger
     */
    public static <T> RemoteRxConnection<T> connect(final ConnectToObservable<T> connectToObservable) {
        final RxMetrics rxMetrics = new RxMetrics();
        return new RemoteRxConnection<>(Observable.create(new Observable.OnSubscribe<T>() {
            public void call(Subscriber<? super T> subscriber) {
                // The captured final parameter is used directly inside the anonymous class.
                RemoteUnsubscribe remoteUnsubscribe = new RemoteUnsubscribe(connectToObservable.getName());
                subscriber.add(remoteUnsubscribe);
                RemoteObservable.createTcpConnectionToServer(connectToObservable, remoteUnsubscribe, rxMetrics, connectToObservable.getConnectionDisconnectCallback(), (Observable<Integer>) connectToObservable.getCloseTrigger()).subscribe(subscriber);
            }
        }), rxMetrics, connectToObservable.getCloseTrigger());
    }

    /**
     * Creates a lazily-connected remote observable of grouped observables.
     *
     * <p>The TCP connection is opened on subscription. A
     * {@link RemoteUnsubscribe} is attached to each subscriber first, and
     * failures are retried according to {@link #retryLogic}.
     *
     * @param connectToGroupedObservable connection settings (name, decoders, callbacks, close trigger)
     * @param <K> group key type
     * @param <V> group value type
     * @return a connection wrapping the observable, its metrics and its close trigger
     */
    public static <K, V> RemoteRxConnection<GroupedObservable<K, V>> connect(final ConnectToGroupedObservable<K, V> connectToGroupedObservable) {
        final RxMetrics rxMetrics = new RxMetrics();
        return new RemoteRxConnection<>(Observable.create(new Observable.OnSubscribe<GroupedObservable<K, V>>() {
            public void call(Subscriber<? super GroupedObservable<K, V>> subscriber) {
                // The captured final parameter is used directly inside the anonymous class.
                RemoteUnsubscribe remoteUnsubscribe = new RemoteUnsubscribe(connectToGroupedObservable.getName());
                subscriber.add(remoteUnsubscribe);
                RemoteObservable.createTcpConnectionToServer(connectToGroupedObservable, remoteUnsubscribe, rxMetrics, connectToGroupedObservable.getConnectionDisconnectCallback(), (Observable<Integer>) connectToGroupedObservable.getCloseTrigger()).retryWhen(RemoteObservable.retryLogic(connectToGroupedObservable)).subscribe(subscriber);
            }
        }), rxMetrics, connectToGroupedObservable.getCloseTrigger());
    }

    /**
     * Creates a lazily-connected remote observable that emits flat
     * {@link MantisGroup} key/value items instead of nested grouped observables.
     *
     * <p>The TCP connection is opened on subscription. A
     * {@link RemoteUnsubscribe} is attached to each subscriber first, and
     * failures are retried according to {@link #retryLogic}.
     *
     * @param connectToGroupedObservable connection settings (name, decoders, callbacks, close trigger)
     * @param spscArrayQueue queue forwarded unchanged to {@code createTcpConnectionToGOServer}
     * @param <K> group key type
     * @param <V> group value type
     * @return a connection wrapping the observable, its metrics and its close trigger
     */
    public static <K, V> RemoteRxConnection<MantisGroup<K, V>> connectToMGO(final ConnectToGroupedObservable<K, V> connectToGroupedObservable, final SpscArrayQueue<MantisGroup<?, ?>> spscArrayQueue) {
        final RxMetrics rxMetrics = new RxMetrics();
        return new RemoteRxConnection<>(Observable.create(new Observable.OnSubscribe<MantisGroup<K, V>>() {
            public void call(Subscriber<? super MantisGroup<K, V>> subscriber) {
                // The captured final parameter is used directly inside the anonymous class.
                RemoteUnsubscribe remoteUnsubscribe = new RemoteUnsubscribe(connectToGroupedObservable.getName());
                subscriber.add(remoteUnsubscribe);
                RemoteObservable.createTcpConnectionToGOServer(connectToGroupedObservable, remoteUnsubscribe, rxMetrics, connectToGroupedObservable.getConnectionDisconnectCallback(), connectToGroupedObservable.getCloseTrigger(), spscArrayQueue).retryWhen(RemoteObservable.retryLogic(connectToGroupedObservable)).subscribe(subscriber);
            }
        }), rxMetrics, connectToGroupedObservable.getCloseTrigger());
    }

    /**
     * Convenience overload: connects to {@code str:i} with the given decoder
     * and returns only the resulting observable.
     *
     * @param str remote host
     * @param i remote port
     * @param decoder decoder applied to incoming payloads
     * @param <T> decoded element type
     * @return the observable of the connection built from these settings
     */
    public static <T> Observable<T> connect(String str, int i, Decoder<T> decoder) {
        RemoteRxConnection<T> connection = connect(
                new ConnectToObservable.Builder()
                        .host(str)
                        .port(i)
                        .decoder(decoder)
                        .build());
        return connection.getObservable();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Opens a TCP client to the configured host and port, sends a "subscribed"
     * event and exposes the incoming events as an observable of grouped
     * observables.
     *
     * <p>Each payload is laid out as: one notification-type byte
     * (1 = next, 2 = completed, 3 = error), a 4-byte key length, the key bytes,
     * then the value (or serialized throwable) bytes. Connection completion and
     * connection errors both invoke {@code action0}; a connection error is then
     * replaced by an empty stream.
     *
     * @param connectToGroupedObservable connection settings and key/value decoders
     * @param remoteUnsubscribe receives the live connection once established
     * @param rxMetrics counters incremented per next/error/completed event
     * @param action0 callback run when the connection completes or fails
     * @param observable close trigger; the stream stops when it emits
     * @param <K> group key type
     * @param <V> group value type
     * @return observable emitting one grouped observable per decoded key
     */
    public static <K, V> Observable<GroupedObservable<K, V>> createTcpConnectionToServer(final ConnectToGroupedObservable<K, V> connectToGroupedObservable, final RemoteUnsubscribe remoteUnsubscribe, final RxMetrics rxMetrics, final Action0 action0, Observable<Integer> observable) {
        final Decoder<K> keyDecoder = connectToGroupedObservable.getKeyDecoder();
        final Decoder<V> valueDecoder = connectToGroupedObservable.getValueDecoder();
        loadFastProperties();
        return RxNetty.createTcpClient(connectToGroupedObservable.getHost(), connectToGroupedObservable.getPort(), new PipelineConfiguratorComposite(new PipelineConfigurator[]{new PipelineConfigurator<RemoteRxEvent, List<RemoteRxEvent>>() {
            public void configureNewPipeline(ChannelPipeline channelPipeline) {
                if (RemoteObservable.enableNettyLogging) {
                    channelPipeline.addFirst(new ChannelHandler[]{new LoggingHandler(LogLevel.ERROR)});
                }
                if (RemoteObservable.enableHeartBeating) {
                    // IdleStateHandler arguments: reader idle 10 s, writer idle 2 s, all-idle disabled.
                    channelPipeline.addLast("idleStateHandler", new IdleStateHandler(10, 2, 0));
                    channelPipeline.addLast("heartbeat", new HeartbeatHandler());
                }
                if (RemoteObservable.enableCompression) {
                    // NOTE(review): the handler names look swapped relative to the handler types
                    // (encoder registered as "gzipInflater"); left unchanged in case other code
                    // looks the handlers up by name.
                    channelPipeline.addLast("gzipInflater", new JdkZlibEncoder(ZlibWrapper.GZIP));
                    channelPipeline.addLast("gzipDeflater", new JdkZlibDecoder(ZlibWrapper.GZIP));
                }
                // 4-byte length prefix on both directions; the decoder strips the prefix.
                channelPipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                channelPipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(RemoteObservable.maxFrameLength, 0, 4, 0, 4));
            }
        }, new BatchedRxEventPipelineConfigurator()})).connect().flatMap(new Func1<ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>>, Observable<RemoteRxEvent>>() {
            public Observable<RemoteRxEvent> call(ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>> observableConnection) {
                // Announce the subscription, then hand the connection to the unsubscribe hook.
                observableConnection.writeAndFlush(RemoteRxEvent.subscribed(connectToGroupedObservable.getName(), connectToGroupedObservable.getSubscribeParameters()));
                remoteUnsubscribe.setConnection(observableConnection);
                return observableConnection.getInput().lift(new DropOperator("incoming_" + RemoteObservable.class.getCanonicalName() + "_createTcpConnectionToServerGroups")).rebatchRequests(RemoteObservable.bufferSize <= 0 ? 1 : RemoteObservable.bufferSize);
            }
        }).doOnCompleted(new Action0() {
            public void call() {
                RemoteObservable.logger.warn("Detected connection completed when trying to connect to host: " + connectToGroupedObservable.getHost() + " port: " + connectToGroupedObservable.getPort());
                action0.call();
            }
        }).onErrorResumeNext(new Func1<Throwable, Observable<RemoteRxEvent>>() {
            public Observable<RemoteRxEvent> call(Throwable th) {
                RemoteObservable.logger.warn("Detected connection error when trying to connect to host: " + connectToGroupedObservable.getHost() + " port: " + connectToGroupedObservable.getPort(), th);
                action0.call();
                return Observable.empty();
            }
        }).takeUntil(observable).map(new Func1<RemoteRxEvent, Notification<byte[]>>() {
            public Notification<byte[]> call(RemoteRxEvent remoteRxEvent) {
                if (remoteRxEvent.getType() == RemoteRxEvent.Type.next) {
                    rxMetrics.incrementNextCount();
                    return Notification.createOnNext(remoteRxEvent.getData());
                }
                if (remoteRxEvent.getType() == RemoteRxEvent.Type.error) {
                    rxMetrics.incrementErrorCount();
                    return Notification.createOnError(RemoteObservable.fromBytesToThrowable(remoteRxEvent.getData()));
                }
                if (remoteRxEvent.getType() != RemoteRxEvent.Type.completed) {
                    throw new RuntimeException("RemoteRxEvent of type:" + remoteRxEvent.getType() + ", not supported.");
                }
                rxMetrics.incrementCompletedCount();
                return Notification.createOnCompleted();
            }
        }).dematerialize().groupBy(new Func1<byte[], K>() {
            public K call(byte[] bArr) {
                // Key selector: skip the notification-type byte, then read the length-prefixed key.
                ByteBuffer wrap = ByteBuffer.wrap(bArr);
                wrap.get();
                byte[] keyBytes = new byte[wrap.getInt()];
                wrap.get(keyBytes);
                return (K) keyDecoder.decode(keyBytes);
            }
        }, new Func1<byte[], Notification<V>>() {
            public Notification<V> call(byte[] bArr) {
                // Element selector: turn the per-group payload back into a notification.
                ByteBuffer wrap = ByteBuffer.wrap(bArr);
                byte notificationType = wrap.get();
                if (notificationType == 1) {
                    int keyLength = wrap.getInt();
                    // Payload = everything after type byte (1) + key length field (4) + key bytes.
                    int valueLength = ((wrap.limit() - 4) - 1) - keyLength;
                    byte[] valueBytes = new byte[valueLength];
                    wrap.position(5 + keyLength);
                    wrap.get(valueBytes, 0, valueLength);
                    return Notification.createOnNext(valueDecoder.decode(valueBytes));
                }
                if (notificationType == 2) {
                    return Notification.createOnCompleted();
                }
                if (notificationType != 3) {
                    throw new RuntimeException("Notification encoding not support: " + ((int) notificationType));
                }
                int keyLength = wrap.getInt();
                int errorLength = ((wrap.limit() - 4) - 1) - keyLength;
                byte[] errorBytes = new byte[errorLength];
                wrap.position(5 + keyLength);
                wrap.get(errorBytes, 0, errorLength);
                return Notification.createOnError(RemoteObservable.fromBytesToThrowable(errorBytes));
            }
        }).map(new Func1<GroupedObservable<K, Notification<V>>, GroupedObservable<K, V>>() {
            public GroupedObservable<K, V> call(GroupedObservable<K, Notification<V>> groupedObservable) {
                return GroupedObservableUtils.createGroupedObservable(groupedObservable.getKey(), groupedObservable.dematerialize());
            }
        }).doOnEach(new Observer<GroupedObservable<K, V>>() {
            public void onCompleted() {
                RemoteObservable.logger.info("RemoteRxEvent, name: {} onCompleted()", connectToGroupedObservable.getName());
            }

            public void onError(Throwable th) {
                RemoteObservable.logger.error("RemoteRxEvent, name: {} onError()", connectToGroupedObservable.getName(), th);
            }

            public void onNext(GroupedObservable<K, V> groupedObservable) {
                if (RemoteObservable.logger.isDebugEnabled()) {
                    RemoteObservable.logger.debug("RemoteRxEvent, name: {} new key: {}", connectToGroupedObservable.getName(), groupedObservable.getKey());
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Opens a TCP client to the configured host and port, sends a "subscribed"
     * event and exposes the incoming "next" events as flat {@link MantisGroup}
     * key/value items.
     *
     * <p>Events whose type is not {@code next} are filtered out. Each payload
     * is laid out as: one notification-type byte (must be 1), a 4-byte key
     * length, the key bytes, then the value bytes. Connection completion and
     * connection errors both invoke {@code action0}; a connection error is then
     * replaced by an empty stream.
     *
     * @param connectToGroupedObservable connection settings and key/value decoders
     * @param remoteUnsubscribe receives the live connection once established
     * @param rxMetrics counter incremented per next event
     * @param action0 callback run when the connection completes or fails
     * @param observable close trigger; the stream stops when it emits
     * @param spscArrayQueue not used by this method
     * @param <K> group key type
     * @param <V> group value type
     * @return observable of decoded key/value groups
     */
    public static <K, V> Observable<MantisGroup<K, V>> createTcpConnectionToGOServer(final ConnectToGroupedObservable<K, V> connectToGroupedObservable, final RemoteUnsubscribe remoteUnsubscribe, final RxMetrics rxMetrics, final Action0 action0, Observable<Integer> observable, SpscArrayQueue<MantisGroup<?, ?>> spscArrayQueue) {
        final Decoder<K> keyDecoder = connectToGroupedObservable.getKeyDecoder();
        final Decoder<V> valueDecoder = connectToGroupedObservable.getValueDecoder();
        loadFastProperties();
        return RxNetty.createTcpClient(connectToGroupedObservable.getHost(), connectToGroupedObservable.getPort(), new PipelineConfiguratorComposite(new PipelineConfigurator[]{new PipelineConfigurator<RemoteRxEvent, List<RemoteRxEvent>>() {
            public void configureNewPipeline(ChannelPipeline channelPipeline) {
                if (RemoteObservable.enableNettyLogging) {
                    channelPipeline.addFirst(new ChannelHandler[]{new LoggingHandler(LogLevel.ERROR)});
                }
                if (RemoteObservable.enableHeartBeating) {
                    // IdleStateHandler arguments: reader idle 10 s, writer idle 2 s, all-idle disabled.
                    channelPipeline.addLast("idleStateHandler", new IdleStateHandler(10, 2, 0));
                    channelPipeline.addLast("heartbeat", new HeartbeatHandler());
                }
                if (RemoteObservable.enableCompression) {
                    // NOTE(review): the handler names look swapped relative to the handler types
                    // (encoder registered as "gzipInflater"); left unchanged in case other code
                    // looks the handlers up by name.
                    channelPipeline.addLast("gzipInflater", new JdkZlibEncoder(ZlibWrapper.GZIP));
                    channelPipeline.addLast("gzipDeflater", new JdkZlibDecoder(ZlibWrapper.GZIP));
                }
                // 4-byte length prefix on both directions; the decoder strips the prefix.
                channelPipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                channelPipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(RemoteObservable.maxFrameLength, 0, 4, 0, 4));
            }
        }, new BatchedRxEventPipelineConfigurator()})).connect().flatMap(new Func1<ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>>, Observable<RemoteRxEvent>>() {
            public Observable<RemoteRxEvent> call(ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>> observableConnection) {
                // Announce the subscription, then hand the connection to the unsubscribe hook.
                observableConnection.writeAndFlush(RemoteRxEvent.subscribed(connectToGroupedObservable.getName(), connectToGroupedObservable.getSubscribeParameters()));
                remoteUnsubscribe.setConnection(observableConnection);
                return observableConnection.getInput().lift(new DropOperator("incoming_" + RemoteObservable.class.getCanonicalName() + "_createTcpConnectionToServerGroups")).rebatchRequests(RemoteObservable.bufferSize <= 0 ? 1 : RemoteObservable.bufferSize);
            }
        }).doOnCompleted(new Action0() {
            public void call() {
                RemoteObservable.logger.warn("Detected connection completed when trying to connect to host: " + connectToGroupedObservable.getHost() + " port: " + connectToGroupedObservable.getPort());
                action0.call();
            }
        }).onErrorResumeNext(new Func1<Throwable, Observable<RemoteRxEvent>>() {
            public Observable<RemoteRxEvent> call(Throwable th) {
                RemoteObservable.logger.warn("Detected connection error when trying to connect to host: " + connectToGroupedObservable.getHost() + " port: " + connectToGroupedObservable.getPort(), th);
                action0.call();
                return Observable.empty();
            }
        }).takeUntil(observable).filter(new Func1<RemoteRxEvent, Boolean>() {
            public Boolean call(RemoteRxEvent remoteRxEvent) {
                return Boolean.valueOf(remoteRxEvent.getType() == RemoteRxEvent.Type.next);
            }
        }).map(new Func1<RemoteRxEvent, Notification<byte[]>>() {
            public Notification<byte[]> call(RemoteRxEvent remoteRxEvent) {
                rxMetrics.incrementNextCount();
                return Notification.createOnNext(remoteRxEvent.getData());
            }
        }).dematerialize().map(new Func1<byte[], MantisGroup<K, V>>() {
            public MantisGroup<K, V> call(byte[] bArr) {
                ByteBuffer frame = ByteBuffer.wrap(bArr);
                byte notificationType = frame.get();
                int keyLength = frame.getInt();
                byte[] keyBytes = new byte[keyLength];
                frame.get(keyBytes);
                // Typed as K so the MantisGroup<K, V> construction below type-checks.
                K key = keyDecoder.decode(keyBytes);
                if (notificationType != 1) {
                    throw new RuntimeException("Notification encoding not support: " + ((int) notificationType));
                }
                // The buffer is now positioned just past the key; the rest is the value.
                byte[] valueBytes = new byte[frame.remaining()];
                frame.get(valueBytes);
                return new MantisGroup<>(key, valueDecoder.decode(valueBytes));
            }
        }).doOnEach(new Observer<MantisGroup<K, V>>() {
            public void onCompleted() {
                RemoteObservable.logger.info("RemoteRxEvent, name: " + connectToGroupedObservable.getName() + " onCompleted()");
            }

            public void onError(Throwable th) {
                RemoteObservable.logger.error("RemoteRxEvent, name: " + connectToGroupedObservable.getName() + " onError()", th);
            }

            public void onNext(MantisGroup<K, V> mantisGroup) {
                if (RemoteObservable.logger.isDebugEnabled()) {
                    RemoteObservable.logger.debug("RemoteRxEvent, name: " + connectToGroupedObservable.getName() + " new key: " + mantisGroup.getKeyValue());
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Opens a TCP client to the configured host and port, sends a "subscribed"
     * event and exposes the incoming events as a decoded observable.
     *
     * <p>{@code next} events are decoded with the configured decoder,
     * {@code error} events are deserialized into a throwable and delivered as
     * {@code onError}, and {@code completed} events complete the stream. Any
     * other event type raises a {@link RuntimeException}. Connection completion
     * and connection errors both invoke {@code action0}; a connection error is
     * then replaced by an empty stream.
     *
     * @param connectToObservable connection settings and decoder
     * @param remoteUnsubscribe receives the live connection once established
     * @param rxMetrics counters incremented per next/error/completed event
     * @param action0 callback run when the connection completes or fails
     * @param observable close trigger; the stream stops when it emits
     * @param <T> decoded element type
     * @return observable of decoded elements
     */
    public static <T> Observable<T> createTcpConnectionToServer(final ConnectToObservable<T> connectToObservable, final RemoteUnsubscribe remoteUnsubscribe, final RxMetrics rxMetrics, final Action0 action0, Observable<Integer> observable) {
        final Decoder<T> decoder = connectToObservable.getDecoder();
        loadFastProperties();
        return RxNetty.createTcpClient(connectToObservable.getHost(), connectToObservable.getPort(), new PipelineConfiguratorComposite(new PipelineConfigurator[]{new PipelineConfigurator<RemoteRxEvent, List<RemoteRxEvent>>() {
            public void configureNewPipeline(ChannelPipeline channelPipeline) {
                if (RemoteObservable.enableNettyLogging) {
                    channelPipeline.addFirst(new ChannelHandler[]{new LoggingHandler(LogLevel.ERROR)});
                }
                if (RemoteObservable.enableHeartBeating) {
                    // IdleStateHandler arguments: reader idle 10 s, writer idle 2 s, all-idle disabled.
                    channelPipeline.addLast("idleStateHandler", new IdleStateHandler(10, 2, 0));
                    channelPipeline.addLast("heartbeat", new HeartbeatHandler());
                }
                if (RemoteObservable.enableCompression) {
                    // NOTE(review): the handler names look swapped relative to the handler types
                    // (encoder registered as "gzipInflater"); left unchanged in case other code
                    // looks the handlers up by name.
                    channelPipeline.addLast("gzipInflater", new JdkZlibEncoder(ZlibWrapper.GZIP));
                    channelPipeline.addLast("gzipDeflater", new JdkZlibDecoder(ZlibWrapper.GZIP));
                }
                // 4-byte length prefix on both directions; the decoder strips the prefix.
                channelPipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                channelPipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(RemoteObservable.maxFrameLength, 0, 4, 0, 4));
            }
        }, new BatchedRxEventPipelineConfigurator()})).connect().flatMap(new Func1<ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>>, Observable<RemoteRxEvent>>() {
            public Observable<RemoteRxEvent> call(ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>> observableConnection) {
                // Announce the subscription, then hand the connection to the unsubscribe hook.
                observableConnection.writeAndFlush(RemoteRxEvent.subscribed(connectToObservable.getName(), connectToObservable.getSubscribeParameters()));
                remoteUnsubscribe.setConnection(observableConnection);
                return observableConnection.getInput().lift(new DropOperator("incoming_" + RemoteObservable.class.getCanonicalName() + "_createTcpConnectionToServer")).rebatchRequests(RemoteObservable.bufferSize <= 0 ? 1 : RemoteObservable.bufferSize);
            }
        }).doOnCompleted(new Action0() {
            public void call() {
                RemoteObservable.logger.warn("Detected connection completed when trying to connect to host: " + connectToObservable.getHost() + " port: " + connectToObservable.getPort());
                action0.call();
            }
        }).onErrorResumeNext(new Func1<Throwable, Observable<RemoteRxEvent>>() {
            public Observable<RemoteRxEvent> call(Throwable th) {
                RemoteObservable.logger.warn("Detected connection error when trying to connect to host: " + connectToObservable.getHost() + " port: " + connectToObservable.getPort(), th);
                action0.call();
                return Observable.empty();
            }
        }).takeUntil(observable).map(new Func1<RemoteRxEvent, Notification<T>>() {
            public Notification<T> call(RemoteRxEvent remoteRxEvent) {
                if (remoteRxEvent.getType() == RemoteRxEvent.Type.next) {
                    rxMetrics.incrementNextCount();
                    return Notification.createOnNext(decoder.decode(remoteRxEvent.getData()));
                }
                if (remoteRxEvent.getType() == RemoteRxEvent.Type.error) {
                    rxMetrics.incrementErrorCount();
                    return Notification.createOnError(RemoteObservable.fromBytesToThrowable(remoteRxEvent.getData()));
                }
                if (remoteRxEvent.getType() != RemoteRxEvent.Type.completed) {
                    throw new RuntimeException("RemoteRxEvent of type: " + remoteRxEvent.getType() + ", not supported.");
                }
                rxMetrics.incrementCompletedCount();
                return Notification.createOnCompleted();
            }
        }).dematerialize().doOnEach(new Observer<T>() {
            public void onCompleted() {
                RemoteObservable.logger.info("RemoteRxEvent: " + connectToObservable.getName() + " onCompleted()");
            }

            public void onError(Throwable th) {
                RemoteObservable.logger.error("RemoteRxEvent: " + connectToObservable.getName() + " onError()", th);
            }

            public void onNext(T t) {
                if (RemoteObservable.logger.isDebugEnabled()) {
                    RemoteObservable.logger.debug("RemoteRxEvent: " + connectToObservable.getName() + " onNext(): " + t);
                }
            }
        });
    }

    /**
     * Serves {@code observable} on port {@code i} without a name, using an
     * allow-all ingress policy.
     *
     * @param i port to listen on
     * @param observable source to expose to remote subscribers
     * @param encoder encoder for outgoing elements
     * @param <T> element type
     * @return a server built from these settings
     */
    public static <T> RemoteRxServer serve(int i, Observable<T> observable, Encoder<T> encoder) {
        final String unnamed = null;
        RemoteRxServer.Builder serverBuilder =
                configureServerFromParams(unnamed, i, observable, encoder, IngressPolicies.allowAll());
        return new RemoteRxServer(serverBuilder);
    }

    /**
     * Serves {@code observable} on port {@code i} under the name {@code str},
     * using an allow-all ingress policy.
     *
     * @param i port to listen on
     * @param str name under which the observable is registered
     * @param observable source to expose to remote subscribers
     * @param encoder encoder for outgoing elements
     * @param <T> element type
     * @return a server built from these settings
     */
    public static <T> RemoteRxServer serve(int i, String str, Observable<T> observable, Encoder<T> encoder) {
        RemoteRxServer.Builder serverBuilder =
                configureServerFromParams(str, i, observable, encoder, IngressPolicies.allowAll());
        return new RemoteRxServer(serverBuilder);
    }

    /**
     * Assembles a server builder for a single served observable.
     *
     * <p>The observable is registered with {@code subscriptionPerConnection()}
     * enabled on its builder.
     *
     * @param str name for the served observable (callers in this class may pass null)
     * @param i port to listen on
     * @param observable source to expose
     * @param encoder encoder for outgoing elements
     * @param ingressPolicy policy applied to incoming connections
     * @param <T> element type
     * @return the configured, not yet built, server builder
     */
    private static <T> RemoteRxServer.Builder configureServerFromParams(String str, int i, Observable<T> observable, Encoder<T> encoder, IngressPolicy ingressPolicy) {
        return new RemoteRxServer.Builder()
                .port(i)
                .ingressPolicy(ingressPolicy)
                .addObservable(
                        new ServeObservable.Builder()
                                .name(str)
                                .encoder(encoder)
                                .observable(observable)
                                .subscriptionPerConnection()
                                .build());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Serializes a throwable into bytes using Java object serialization.
     *
     * <p>The throwable is not serialized directly. A fresh instance of the same
     * class is created through its {@code (String)} constructor with the
     * original message, the original stack trace is copied onto it, and that
     * copy is written. As a consequence the cause chain is not carried over,
     * and throwable classes without an accessible {@code (String)} constructor
     * fail with a {@link RuntimeException}.
     *
     * @param th throwable to serialize; must not be null
     * @return the serialized bytes of the copy
     * @throws RuntimeException wrapping any reflection or I/O failure,
     *         including a failure while closing the stream
     */
    public static byte[] fromThrowableToBytes(Throwable th) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // try-with-resources closes (and thereby flushes) the object stream before the bytes are read.
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            Throwable copy = th.getClass().getConstructor(String.class).newInstance(th.getMessage());
            copy.setStackTrace(th.getStackTrace());
            out.writeObject(copy);
        } catch (IOException | IllegalAccessException | IllegalArgumentException | InstantiationException | NoSuchMethodException | SecurityException | InvocationTargetException e) {
            logger.error("Failed to convert throwable to bytes", e);
            throw new RuntimeException(e);
        }
        return buffer.toByteArray();
    }

    /**
     * Deserializes a throwable from bytes written with Java object serialization.
     *
     * <p>SECURITY NOTE(review): the callers in this class pass bytes received
     * from the remote peer, and Java-native deserialization of untrusted data
     * is unsafe. Consider an {@code ObjectInputFilter} or a non-native wire
     * format if the peer is not fully trusted.
     *
     * @param bArr serialized throwable bytes; must not be null
     * @return the deserialized throwable
     * @throws RuntimeException wrapping the {@link IOException} or
     *         {@link ClassNotFoundException} raised while reading
     * @throws ClassCastException if the stream holds an object that is not a throwable
     */
    static Throwable fromBytesToThrowable(byte[] bArr) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bArr))) {
            return (Throwable) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    // Runs once when the class is initialized. Delegates to NettyUtils.setNettyThreads(),
    // which is defined elsewhere — presumably it configures Netty thread counts; confirm
    // against NettyUtils before relying on that.
    static {
        NettyUtils.setNettyThreads();
    }
}
