package io.reactivex.mantis.remote.observable;

import io.mantisrx.common.codec.Encoder;
import io.mantisrx.common.network.WritableEndpoint;
import io.reactivex.mantis.remote.observable.RemoteRxEvent;
import io.reactivex.mantis.remote.observable.ingress.IngressPolicy;
import io.reactivex.mantis.remote.observable.slotting.SlottingStrategy;
import io.reactivx.mantis.operators.DisableBackPressureOperator;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import mantis.io.reactivex.netty.channel.ConnectionHandler;
import mantis.io.reactivex.netty.channel.ObservableConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Notification;
import rx.Observable;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.observables.GroupedObservable;

/* loaded from: input_file:io/reactivex/mantis/remote/observable/RemoteObservableConnectionHandler.class */
public class RemoteObservableConnectionHandler implements ConnectionHandler<RemoteRxEvent, List<RemoteRxEvent>> {
    private static final Logger logger = LoggerFactory.getLogger(RemoteObservableConnectionHandler.class);
    private Map<String, ServeConfig> observables;
    private RxMetrics serverMetrics;
    private IngressPolicy ingressPolicy;
    private int writeBufferTimeMSec;

    public RemoteObservableConnectionHandler(Map<String, ServeConfig> map, IngressPolicy ingressPolicy, RxMetrics rxMetrics, int i) {
        this.observables = map;
        this.ingressPolicy = ingressPolicy;
        this.serverMetrics = rxMetrics;
        this.writeBufferTimeMSec = i;
    }

    public Observable<Void> handle(ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>> observableConnection) {
        logger.info("Connection received: " + observableConnection.getChannel().remoteAddress());
        return this.ingressPolicy.allowed(observableConnection) ? setupConnection(observableConnection) : Observable.error(new RemoteObservableException("Connection rejected due to ingress policy"));
    }

    private <T> Subscription serveObservable(Observable<T> observable, ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>> observableConnection, final RemoteRxEvent remoteRxEvent, Func1<Map<String, String>, Func1<T, Boolean>> func1, final Encoder<T> encoder, ServeObservable<T> serveObservable, WritableEndpoint<Observable<T>> writableEndpoint) {
        MutableReference mutableReference = new MutableReference();
        mutableReference.setValue(observable.filter((Func1) func1.call(remoteRxEvent.getSubscribeParameters())).doOnCompleted(new Action0() { // from class: io.reactivex.mantis.remote.observable.RemoteObservableConnectionHandler.5
            public void call() {
                RemoteObservableConnectionHandler.logger.info("OnCompleted recieved in serveObservable, sending to client.");
            }
        }).doOnError(new Action1<Throwable>() { // from class: io.reactivex.mantis.remote.observable.RemoteObservableConnectionHandler.4
            public void call(Throwable th) {
                RemoteObservableConnectionHandler.logger.info("OnError received in serveObservable, sending to client: ", th);
            }
        }).materialize().lift(new DisableBackPressureOperator()).buffer(this.writeBufferTimeMSec, TimeUnit.MILLISECONDS).filter(new Func1<List<Notification<T>>, Boolean>() { // from class: io.reactivex.mantis.remote.observable.RemoteObservableConnectionHandler.3
            public Boolean call(List<Notification<T>> list) {
                return Boolean.valueOf((list == null || list.isEmpty()) ? false : true);
            }
        }).map(new Func1<List<Notification<T>>, List<RemoteRxEvent>>() { // from class: io.reactivex.mantis.remote.observable.RemoteObservableConnectionHandler.2
            public List<RemoteRxEvent> call(List<Notification<T>> list) {
                ArrayList arrayList = new ArrayList(list.size());
                for (Notification<T> notification : list) {
                    if (notification.getKind() == Notification.Kind.OnNext) {
                        arrayList.add(RemoteRxEvent.next(remoteRxEvent.getName(), encoder.encode(notification.getValue())));
                    } else if (notification.getKind() == Notification.Kind.OnError) {
                        arrayList.add(RemoteRxEvent.error(remoteRxEvent.getName(), RemoteObservable.fromThrowableToBytes(notification.getThrowable())));
                    } else {
                        if (notification.getKind() != Notification.Kind.OnCompleted) {
                            throw new RuntimeException("Unsupported notification kind: " + notification.getKind());
                        }
                        arrayList.add(RemoteRxEvent.completed(remoteRxEvent.getName()));
                    }
                }
                return arrayList;
            }
        }).filter(new Func1<List<RemoteRxEvent>, Boolean>() { // from class: io.reactivex.mantis.remote.observable.RemoteObservableConnectionHandler.1
            public Boolean call(List<RemoteRxEvent> list) {
                return Boolean.valueOf((list == null || list.isEmpty()) ? false : true);
            }
        }).subscribe(new WriteBytesObserver(observableConnection, mutableReference, this.serverMetrics, serveObservable.getSlottingStrategy(), writableEndpoint)));
        return (Subscription) mutableReference.getValue();
    }

    private <T> Subscription serveNestedObservable(Observable<T> observable, ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>> observableConnection, final RemoteRxEvent remoteRxEvent, Func1<Map<String, String>, Func1<T, Boolean>> func1, final Encoder<T> encoder, ServeNestedObservable<Observable<T>> serveNestedObservable, WritableEndpoint<Observable<T>> writableEndpoint) {
        MutableReference mutableReference = new MutableReference();
        mutableReference.setValue(observable.filter((Func1) func1.call(remoteRxEvent.getSubscribeParameters())).doOnCompleted(new Action0() { // from class: io.reactivex.mantis.remote.observable.RemoteObservableConnectionHandler.11
            public void call() {
                RemoteObservableConnectionHandler.logger.info("OnCompleted recieved in serveNestedObservable, sending to client.");
            }
        }).doOnError(new Action1<Throwable>() { // from class: io.reactivex.mantis.remote.observable.RemoteObservableConnectionHandler.10
            public void call(Throwable th) {
                RemoteObservableConnectionHandler.logger.info("OnError received in serveNestedObservable, sending to client: ", th);
            }
        }).map(new Func1<T, byte[]>() { // from class: io.reactivex.mantis.remote.observable.RemoteObservableConnectionHandler.9
            public byte[] call(T t) {
                return encoder.encode(t);
            }

            /* JADX WARN: Multi-variable type inference failed */
            /* renamed from: call, reason: collision with other method in class */
            public /* bridge */ /* synthetic */ Object m10call(Object obj) {
                return call((AnonymousClass9<T>) obj);
            }
        }).materialize().map(new Func1<Notification<byte[]>, RemoteRxEvent>() { // from class: io.reactivex.mantis.remote.observable.RemoteObservableConnectionHandler.8
            public RemoteRxEvent call(Notification<byte[]> notification) {
                if (notification.getKind() == Notification.Kind.OnNext) {
                    return RemoteRxEvent.next(remoteRxEvent.getName(), (byte[]) notification.getValue());
                }
                if (notification.getKind() == Notification.Kind.OnError) {
                    return RemoteRxEvent.error(remoteRxEvent.getName(), RemoteObservable.fromThrowableToBytes(notification.getThrowable()));
                }
                if (notification.getKind() == Notification.Kind.OnCompleted) {
                    return RemoteRxEvent.completed(remoteRxEvent.getName());
                }
                throw new RuntimeException("Unsupported notification kind: " + notification.getKind());
            }
        }).lift(new DisableBackPressureOperator()).buffer(this.writeBufferTimeMSec, TimeUnit.MILLISECONDS).filter(new Func1<List<RemoteRxEvent>, Boolean>() { // from class: io.reactivex.mantis.remote.observable.RemoteObservableConnectionHandler.7
            public Boolean call(List<RemoteRxEvent> list) {
                return Boolean.valueOf((list == null || list.isEmpty()) ? false : true);
            }
        }).filter(new Func1<List<RemoteRxEvent>, Boolean>() { // from class: io.reactivex.mantis.remote.observable.RemoteObservableConnectionHandler.6
            public Boolean call(List<RemoteRxEvent> list) {
                return Boolean.valueOf((list == null || list.isEmpty()) ? false : true);
            }
        }).subscribe(new WriteBytesObserver(observableConnection, mutableReference, this.serverMetrics, serveNestedObservable.getSlottingStrategy(), writableEndpoint)));
        return (Subscription) mutableReference.getValue();
    }

    private <K, V> Subscription serveGroupedObservable(Observable<Group<K, V>> observable, ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>> observableConnection, final RemoteRxEvent remoteRxEvent, final Func1<Map<String, String>, Func1<K, Boolean>> func1, Encoder<K> encoder, final Encoder<V> encoder2, ServeGroupedObservable<K, V> serveGroupedObservable, WritableEndpoint<GroupedObservable<K, V>> writableEndpoint) {
        MutableReference mutableReference = new MutableReference();
        mutableReference.setValue(observable.filter(new Func1<Group<K, V>, Boolean>() { // from class: io.reactivex.mantis.remote.observable.RemoteObservableConnectionHandler.17
            public Boolean call(Group<K, V> group) {
                return (Boolean) ((Func1) func1.call(remoteRxEvent.getSubscribeParameters())).call(group.getKeyValue());
            }
        }).doOnCompleted(new Action0() { // from class: io.reactivex.mantis.remote.observable.RemoteObservableConnectionHandler.16
            public void call() {
                RemoteObservableConnectionHandler.logger.info("OnCompleted recieved in serveGroupedObservable, sending to client.");
            }
        }).doOnError(new Action1<Throwable>() { // from class: io.reactivex.mantis.remote.observable.RemoteObservableConnectionHandler.15
            public void call(Throwable th) {
                RemoteObservableConnectionHandler.logger.info("OnError received in serveGroupedObservable, sending to client: ", th);
            }
        }).materialize().lift(new DisableBackPressureOperator()).buffer(this.writeBufferTimeMSec, TimeUnit.MILLISECONDS).filter(new Func1<List<Notification<Group<K, V>>>, Boolean>() { // from class: io.reactivex.mantis.remote.observable.RemoteObservableConnectionHandler.14
            public Boolean call(List<Notification<Group<K, V>>> list) {
                return Boolean.valueOf((list == null || list.isEmpty()) ? false : true);
            }
        }).map(new Func1<List<Notification<Group<K, V>>>, List<RemoteRxEvent>>() { // from class: io.reactivex.mantis.remote.observable.RemoteObservableConnectionHandler.13
            public List<RemoteRxEvent> call(List<Notification<Group<K, V>>> list) {
                ArrayList arrayList = new ArrayList(list.size());
                for (Notification<Group<K, V>> notification : list) {
                    if (Notification.Kind.OnNext == notification.getKind()) {
                        Group group = (Group) notification.getValue();
                        int length = group.getKeyBytes().length;
                        Notification<V> notification2 = ((Group) notification.getValue()).getNotification();
                        byte[] bArr = null;
                        if (Notification.Kind.OnNext == notification2.getKind()) {
                            byte[] encode = encoder2.encode(notification2.getValue());
                            bArr = ByteBuffer.allocate(5 + length + encode.length).put((byte) 1).putInt(length).put(group.getKeyBytes()).put(encode).array();
                        } else if (Notification.Kind.OnCompleted == notification2.getKind()) {
                            bArr = ByteBuffer.allocate(5 + length).put((byte) 2).putInt(length).put(group.getKeyBytes()).array();
                        } else if (Notification.Kind.OnError == notification2.getKind()) {
                            byte[] fromThrowableToBytes = RemoteObservable.fromThrowableToBytes(notification2.getThrowable());
                            bArr = ByteBuffer.allocate(5 + length + fromThrowableToBytes.length).put((byte) 3).putInt(length).put(group.getKeyBytes()).put(fromThrowableToBytes).array();
                        }
                        arrayList.add(RemoteRxEvent.next(remoteRxEvent.getName(), bArr));
                    } else if (Notification.Kind.OnCompleted == notification.getKind()) {
                        arrayList.add(RemoteRxEvent.completed(remoteRxEvent.getName()));
                    } else {
                        if (Notification.Kind.OnError != notification.getKind()) {
                            throw new RuntimeException("Unsupported notification type: " + notification.getKind());
                        }
                        arrayList.add(RemoteRxEvent.error(remoteRxEvent.getName(), RemoteObservable.fromThrowableToBytes(notification.getThrowable())));
                    }
                }
                return arrayList;
            }
        }).filter(new Func1<List<RemoteRxEvent>, Boolean>() { // from class: io.reactivex.mantis.remote.observable.RemoteObservableConnectionHandler.12
            public Boolean call(List<RemoteRxEvent> list) {
                return Boolean.valueOf((list == null || list.isEmpty()) ? false : true);
            }
        }).subscribe(new WriteBytesObserver(observableConnection, mutableReference, this.serverMetrics, serveGroupedObservable.getSlottingStrategy(), writableEndpoint)));
        return (Subscription) mutableReference.getValue();
    }

    private void subscribe(MutableReference<Subscription> mutableReference, RemoteRxEvent remoteRxEvent, ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>> observableConnection, ServeConfig serveConfig, WritableEndpoint writableEndpoint) {
        Func1 filterFunction = serveConfig.getFilterFunction();
        Subscription subscription = null;
        if (serveConfig instanceof ServeObservable) {
            ServeObservable serveObservable = (ServeObservable) serveConfig;
            if (serveObservable.isSubscriptionPerConnection()) {
                Observable observable = serveObservable.getObservable();
                if (serveObservable.isHotStream()) {
                    observable = observable.share();
                }
                subscription = serveObservable(observable, observableConnection, remoteRxEvent, filterFunction, serveObservable.getEncoder(), serveObservable, writableEndpoint);
            } else {
                subscription = serveObservable(writableEndpoint.read(), observableConnection, remoteRxEvent, filterFunction, serveObservable.getEncoder(), serveObservable, writableEndpoint);
            }
        } else if (serveConfig instanceof ServeGroupedObservable) {
            ServeGroupedObservable serveGroupedObservable = (ServeGroupedObservable) serveConfig;
            subscription = serveGroupedObservable(writableEndpoint.read(), observableConnection, remoteRxEvent, filterFunction, serveGroupedObservable.getKeyEncoder(), serveGroupedObservable.getValueEncoder(), serveGroupedObservable, writableEndpoint);
        } else if (serveConfig instanceof ServeNestedObservable) {
            ServeNestedObservable serveNestedObservable = (ServeNestedObservable) serveConfig;
            subscription = serveNestedObservable(writableEndpoint.read(), observableConnection, remoteRxEvent, filterFunction, serveNestedObservable.getEncoder(), serveNestedObservable, writableEndpoint);
        }
        mutableReference.setValue(subscription);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Observable<Void> handleSubscribeRequest(RemoteRxEvent remoteRxEvent, ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>> observableConnection, MutableReference<SlottingStrategy> mutableReference, MutableReference<Subscription> mutableReference2, MutableReference<WritableEndpoint> mutableReference3) {
        String name = remoteRxEvent.getName();
        ServeConfig serveConfig = this.observables.get(name);
        if (serveConfig == null) {
            return Observable.error(new RemoteObservableException("No remote observable configuration found for name: " + name));
        }
        if (remoteRxEvent.getType() == RemoteRxEvent.Type.subscribed) {
            String str = null;
            Map<String, String> subscribeParameters = remoteRxEvent.getSubscribeParameters();
            if (subscribeParameters != null) {
                str = subscribeParameters.get("slotId");
            }
            InetSocketAddress inetSocketAddress = (InetSocketAddress) observableConnection.getChannel().remoteAddress();
            WritableEndpoint writableEndpoint = new WritableEndpoint(inetSocketAddress.getHostName(), inetSocketAddress.getPort(), str, observableConnection);
            SlottingStrategy slottingStrategy = serveConfig.getSlottingStrategy();
            mutableReference3.setValue(writableEndpoint);
            mutableReference.setValue(slottingStrategy);
            logger.info("Connection received on server from client endpoint: " + writableEndpoint + ", subscribed to observable: " + name);
            this.serverMetrics.incrementSubscribedCount();
            subscribe(mutableReference2, remoteRxEvent, observableConnection, serveConfig, writableEndpoint);
            if (!slottingStrategy.addConnection(writableEndpoint)) {
                logger.warn("Failed to slot connection for endpoint: " + writableEndpoint);
                observableConnection.close(true);
            }
        }
        return Observable.empty();
    }

    private Observable<Void> setupConnection(final ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>> observableConnection) {
        final MutableReference mutableReference = new MutableReference();
        final MutableReference mutableReference2 = new MutableReference();
        final MutableReference mutableReference3 = new MutableReference();
        return observableConnection.getInput().filter(new Func1<RemoteRxEvent, Boolean>() { // from class: io.reactivex.mantis.remote.observable.RemoteObservableConnectionHandler.19
            public Boolean call(RemoteRxEvent remoteRxEvent) {
                boolean z = false;
                if (remoteRxEvent.getType() == RemoteRxEvent.Type.subscribed || remoteRxEvent.getType() == RemoteRxEvent.Type.unsubscribed) {
                    z = true;
                }
                return Boolean.valueOf(z);
            }
        }).flatMap(new Func1<RemoteRxEvent, Observable<Void>>() { // from class: io.reactivex.mantis.remote.observable.RemoteObservableConnectionHandler.18
            public Observable<Void> call(RemoteRxEvent remoteRxEvent) {
                if (remoteRxEvent.getType() == RemoteRxEvent.Type.subscribed) {
                    return RemoteObservableConnectionHandler.this.handleSubscribeRequest(remoteRxEvent, observableConnection, mutableReference2, mutableReference, mutableReference3);
                }
                if (remoteRxEvent.getType() == RemoteRxEvent.Type.unsubscribed) {
                    Subscription subscription = (Subscription) mutableReference.getValue();
                    if (subscription != null) {
                        subscription.unsubscribe();
                    }
                    RemoteObservableConnectionHandler.this.serverMetrics.incrementUnsubscribedCount();
                    if (mutableReference2.getValue() != null && !((SlottingStrategy) mutableReference2.getValue()).removeConnection((WritableEndpoint) mutableReference3.getValue())) {
                        RemoteObservableConnectionHandler.logger.error("Failed to remove endpoint from slot,  endpoint: " + mutableReference3.getValue());
                    }
                    RemoteObservableConnectionHandler.logger.info("Connection: " + observableConnection.getChannel().remoteAddress() + " unsubscribed, closing connection");
                    observableConnection.close(true);
                }
                return Observable.empty();
            }
        });
    }
}
