package io.reactivex.mantis.network.push;

import com.netflix.spectator.api.BasicTag;
import com.netflix.spectator.api.Tag;
import io.mantisrx.common.compression.CompressionUtils;
import io.mantisrx.common.messages.MantisMetaDroppedMessage;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Gauge;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.common.metrics.spectator.MetricGroupId;
import io.mantisrx.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.channel.Channel;
import io.netty.util.concurrent.GenericFutureListener;
import io.reactivx.mantis.operators.DisableBackPressureOperator;
import io.reactivx.mantis.operators.DropOperator;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import mantis.io.reactivex.netty.channel.DefaultChannelWriter;
import mantis.io.reactivex.netty.server.RxServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;

/* loaded from: input_file:io/reactivex/mantis/network/push/PushServer.class */
public abstract class PushServer<T, R> {
    private static final Logger logger = LoggerFactory.getLogger(PushServer.class);
    /** Port read from the server config in the constructor. */
    protected int port;
    /** Queue handed to the push trigger and drained by the chunker tasks created in the constructor. */
    protected MonitoredQueue<T> outboundBuffer;
    /** Tracks connections; connections are added on subscribe and removed when their channel closes. */
    protected ConnectionManager<T> connectionManager;
    /** Retry count applied to each batch write (passed to {@code retry(...)} on the write observable). */
    private int writeRetryCount;
    /** Server instance produced by {@link #createServer()}; null until start() or startAndBlock() runs. */
    private RxServer<?, ?> server;
    /** Counts every item seen in a write batch, whether or not the write is attempted. */
    private Counter processedWrites;
    /** Counts items in batches whose write completed. */
    private Counter successfulWrites;
    /** Counts items in batches that failed or were dropped. */
    private Counter failedWrites;
    /** Set to the item count of a batch when its write observable terminates. */
    private Gauge batchWriteSize;
    /** Stream of server signals that start(), blockUntilShutdown() and startAndBlock() consume. */
    private Observable<String> serverSignals;
    /** Name from the server config; used for the queue name, the metric group tag and log messages. */
    private String serverName;
    /** Seconds a channel may stay non-writable before it is closed; values <= 0 disable the check. */
    private final int maxNotWritableTimeSec;
    /** Runs the periodic per-connection channel-writability checks. */
    private final ScheduledExecutorService scheduledExecutorService;
    /** Registry that receives the server-level and per-connection metrics. */
    private final MetricsRegistry metricsRegistry;
    // Bytes placed before each payload when manageConnectionWithCompression frames output (its z4 flag).
    // NOTE(review): getBytes() uses the platform default charset; looks like SSE "data: " framing — confirm.
    final byte[] prefix = "data: ".getBytes();
    // Bytes placed after each framed payload (two newlines); same charset caveat as above.
    final byte[] nwnw = "\n\n".getBytes();
    /** Futures of the chunker tasks submitted in the constructor; cancelled by shutdown(). */
    private Set<Future<Void>> consumerThreadFutures = new HashSet();

    /**
     * Builds the outbound buffer, connection manager, queue-consumer tasks and metrics for this server.
     *
     * <p>The outbound buffer is handed to {@code pushTrigger} via {@code setBuffer}; the trigger's
     * {@code start()} and {@code stop()} are wrapped in the two callbacks given to the
     * {@link ConnectionManager}. Chunker tasks are submitted to a dedicated pool from here: a single
     * {@code SingleThreadedChunker} when the config asks for an SPSC queue, otherwise one
     * {@code TimedChunker} per configured consumer.
     *
     * @param pushTrigger  trigger that receives the outbound buffer and is started/stopped by the callbacks
     * @param serverConfig supplies name, port, buffer and chunk sizes, consumer count, write retry count,
     *                     metrics registry and chunk processor
     * @param observable   server signal stream consumed by {@link #start()}, {@link #blockUntilShutdown()}
     *                     and {@link #startAndBlock()}
     */
    public PushServer(final PushTrigger<T> pushTrigger, ServerConfig<T> serverConfig, Observable<String> observable) {
        this.serverSignals = observable;
        this.serverName = serverConfig.getName();
        this.maxNotWritableTimeSec = serverConfig.getMaxNotWritableTimeSec();
        this.metricsRegistry = serverConfig.getMetricsRegistry();
        this.outboundBuffer = new MonitoredQueue<>(this.serverName, serverConfig.getBufferCapacity(), serverConfig.useSpscQueue());
        pushTrigger.setBuffer(this.outboundBuffer);

        // Callbacks handed to the connection manager, in the same order as before.
        Action0 startTrigger = () -> pushTrigger.start();
        Action0 stopTrigger = () -> {
            logger.info("doOnZeroConnections Triggered");
            pushTrigger.stop();
        };

        String groupTagValue = this.serverName != null ? this.serverName : "none";
        MetricGroupId serverGroupId = new MetricGroupId("PushServer", new Tag[]{new BasicTag("groupId", groupTagValue)});
        this.connectionManager = new ConnectionManager<>(this.metricsRegistry, startTrigger, stopTrigger);

        // Fixed-size pool: core == max == number of queue consumers, with a work queue of the same capacity.
        int consumerCount = serverConfig.getNumQueueConsumers();
        ThreadPoolExecutor consumerExecutor = new ThreadPoolExecutor(
                consumerCount,
                consumerCount,
                5L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(consumerCount),
                new NamedThreadFactory("QueueConsumerPool"));
        MonitoredThreadPool consumerPool = new MonitoredThreadPool("QueueConsumerPool", consumerExecutor);
        logger.info(
                "PushServer create consumer threads, use spsc: {}, num threads: {}, buffer capacity: {}, chunk size: {}, chunk time ms: {}",
                serverConfig.useSpscQueue(),
                consumerCount,
                serverConfig.getBufferCapacity(),
                serverConfig.getMaxChunkSize(),
                serverConfig.getMaxChunkTimeMSec());

        if (!serverConfig.useSpscQueue()) {
            for (int consumer = 0; consumer < consumerCount; consumer++) {
                this.consumerThreadFutures.add(
                        consumerPool.submit(new TimedChunker(this.outboundBuffer, serverConfig.getMaxChunkSize(), serverConfig.getMaxChunkTimeMSec(), serverConfig.getChunkProcessor(), this.connectionManager)));
            }
        } else {
            this.consumerThreadFutures.add(
                    consumerPool.submit(new SingleThreadedChunker(serverConfig.getChunkProcessor(), this.outboundBuffer, serverConfig.getMaxChunkSize(), serverConfig.getMaxChunkTimeMSec(), this.connectionManager)));
        }

        Metrics serverMetrics = new Metrics.Builder()
                .id(serverGroupId)
                .addCounter("numProcessedWrites")
                .addCounter("numSuccessfulWrites")
                .addCounter("numFailedWrites")
                .addGauge(this.connectionManager.getActiveConnections(serverGroupId))
                .addGauge("batchWriteSize")
                .build();
        this.successfulWrites = serverMetrics.getCounter("numSuccessfulWrites");
        this.failedWrites = serverMetrics.getCounter("numFailedWrites");
        this.batchWriteSize = serverMetrics.getGauge("batchWriteSize");
        this.processedWrites = serverMetrics.getCounter("numProcessedWrites");
        registerMetrics(
                this.metricsRegistry,
                serverMetrics,
                consumerPool.getMetrics(),
                this.outboundBuffer.getMetrics(),
                pushTrigger.getMetrics(),
                serverConfig.getChunkProcessor().router.getMetrics());

        this.port = serverConfig.getPort();
        this.writeRetryCount = serverConfig.getWriteRetryCount();
        this.scheduledExecutorService = new ScheduledThreadPoolExecutor(
                10,
                new ThreadFactoryBuilder().setNameFormat("netty-channel-checker-%d").build());
    }

    /**
     * Registers the five metric groups with the given registry, in argument order.
     *
     * @param metricsRegistry registry that receives each group via {@code registerAndGet}
     * @param metrics         first group to register
     * @param metrics2        second group to register
     * @param metrics3        third group to register
     * @param metrics4        fourth group to register
     * @param metrics5        fifth group to register
     */
    private void registerMetrics(MetricsRegistry metricsRegistry, Metrics metrics, Metrics metrics2, Metrics metrics3, Metrics metrics4, Metrics metrics5) {
        final Metrics[] inRegistrationOrder = {metrics, metrics2, metrics3, metrics4, metrics5};
        for (Metrics group : inRegistrationOrder) {
            metricsRegistry.registerAndGet(group);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Convenience overload that manages a connection without a meta-message subject or subscription.
     *
     * <p>Delegates to the 18-argument {@code manageConnection}, passing {@code null} for both of those
     * arguments; every other argument is forwarded unchanged.
     *
     * @return the observable returned by the delegated call
     */
    public Observable<Void> manageConnection(DefaultChannelWriter<R> defaultChannelWriter, String str, int i, String str2, String str3, String str4, AtomicLong atomicLong, boolean z, Subscription subscription, boolean z2, long j, Func1<T, Boolean> func1, Action0 action0, Counter counter, Counter counter2, Action0 action02) {
        // This overload has no meta-message plumbing to offer.
        final SerializedSubject<String, String> noMetaSubject = null;
        final Subscription noMetaSubscription = null;
        return manageConnection(
                defaultChannelWriter, str, i, str2, str3, str4,
                atomicLong, z, subscription,
                z2, j,
                noMetaSubject, noMetaSubscription,
                func1, action0, counter, counter2, action02);
    }

    /**
     * Manages a connection without compression or payload framing.
     *
     * <p>Delegates to {@link #manageConnectionWithCompression} with both framing/compression flags
     * {@code false} and no delimiter or trailing string argument.
     *
     * @param defaultChannelWriter writer for the client channel
     * @param str                  forwarded; used with {@code i} to generate a connection id when {@code str4} is empty
     * @param i                    forwarded; see {@code str}
     * @param str2                 value for the client-id metric tag; falls back to the connection id when empty
     * @param str3                 value for the {@code slotId} tag; falls back to the connection id when empty
     * @param str4                 connection id; generated when null or empty
     * @param atomicLong           updated with the current time after a successful write when {@code z} is true
     * @param z                    whether {@code atomicLong} should be updated after successful writes
     * @param subscription         unsubscribed when the channel closes (may be null)
     * @param z2                   whether the stream is sampled
     * @param j                    sampling period in milliseconds when {@code z2} is true
     * @param serializedSubject    receives a dropped-message notice for each failed batch (may be null)
     * @param subscription2        unsubscribed when the channel closes (may be null)
     * @param func1                passed to the created {@code AsyncConnection}
     * @param action0              invoked when the channel closes (may be null)
     * @param counter              incremented by the batch size on successful writes (may be null)
     * @param counter2             incremented by the batch size on failed writes (may be null)
     * @param action02             invoked when the returned observable is subscribed (may be null)
     * @return the observable returned by {@code manageConnectionWithCompression}
     */
    protected Observable<Void> manageConnection(DefaultChannelWriter<R> defaultChannelWriter, String str, int i, String str2, String str3, String str4, AtomicLong atomicLong, boolean z, Subscription subscription, boolean z2, long j, SerializedSubject<String, String> serializedSubject, Subscription subscription2, Func1<T, Boolean> func1, Action0 action0, Counter counter, Counter counter2, Action0 action02) {
        // Forward the caller's meta-message subject and subscription. Passing null here would silently
        // discard them: no dropped-message notices would be emitted and the subscription would never be
        // unsubscribed on channel close.
        return manageConnectionWithCompression(defaultChannelWriter, str, i, str2, str3, str4, atomicLong, z, subscription, z2, j, serializedSubject, subscription2, func1, action0, counter, counter2, action02, false, false, null, null);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Registers a client connection and returns an observable that writes batched payloads to its channel.
     *
     * <p>On subscription the connection is added to the connection manager and {@code action02} is called.
     * Emitted lists are buffered for 200 ms, combined into one byte array and written with
     * {@code writeBytesAndFlush}. When the channel is inactive or not writable the batch is reported through
     * {@link #failedToWriteBatch} instead. When the channel closes, the connection is removed,
     * {@link #connectionCleanup} runs and the periodic writability check (if any) is cancelled.
     *
     * <p>Payload layout depends on the flags: with {@code z4} false the payloads are concatenated as-is;
     * with {@code z4} and {@code z3} true the whole batch is compressed/base64-encoded and wrapped once in
     * {@code prefix}/{@code nwnw}; with {@code z4} true and {@code z3} false each payload is wrapped
     * individually in {@code prefix}/{@code nwnw}.
     *
     * @param defaultChannelWriter writer for the client channel
     * @param str                  passed to the {@code AsyncConnection}; used with {@code i} and the current time
     *                             to generate {@code str4} when it is empty (presumably the host — confirm)
     * @param i                    passed to the {@code AsyncConnection} (presumably the port — confirm)
     * @param str2                 value of the client-id metric tag; defaults to {@code str4} when empty
     * @param str3                 value of the {@code slotId} tag; defaults to {@code str4} when empty
     * @param str4                 connection id; generated when null or empty
     * @param atomicLong           set to the current time after a successful write when {@code z} is true
     * @param z                    enables the {@code atomicLong} update
     * @param subscription         unsubscribed on channel close (may be null)
     * @param z2                   when true, the stream is sampled every {@code j} ms and only the last element
     *                             of each sampled list is kept
     * @param j                    sampling period in milliseconds
     * @param serializedSubject    receives a {@code MantisMetaDroppedMessage} string for failed batches (may be null)
     * @param subscription2        unsubscribed on channel close (may be null)
     * @param func1                passed to the {@code AsyncConnection}
     * @param action0              invoked on channel close (may be null)
     * @param counter              incremented by the batch size on successful writes (may be null)
     * @param counter2             incremented by the batch size on failed writes (may be null)
     * @param action02             invoked on subscription, after the connection is added (may be null)
     * @param z3                   selects the compressed layout; only consulted when {@code z4} is true
     * @param z4                   selects {@code prefix}/{@code nwnw} framing
     * @param bArr                 optional extra argument for {@code compressAndBase64EncodeBytes}
     *                             (presumably a delimiter — confirm)
     * @param str5                 passed to the {@code AsyncConnection}
     * @return observable of the batch writes; it emits nothing for empty or dropped batches
     */
    public Observable<Void> manageConnectionWithCompression(DefaultChannelWriter<R> defaultChannelWriter, String str, int i, String str2, String str3, String str4, AtomicLong atomicLong, boolean z, final Subscription subscription, boolean z2, long j, SerializedSubject<String, String> serializedSubject, final Subscription subscription2, Func1<T, Boolean> func1, final Action0 action0, Counter counter, Counter counter2, Action0 action02, boolean z3, boolean z4, byte[] bArr, String str5) {
        // Fill in missing identifiers: generate a connection id, then let slot id and client id default to it.
        if (str4 == null || str4.isEmpty()) {
            str4 = str + "_" + i + "_" + System.currentTimeMillis();
        }
        if (str3 == null || str3.isEmpty()) {
            str3 = str4;
        }
        if (str2 == null || str2.isEmpty()) {
            str2 = str4;
        }
        Tag basicTag = new BasicTag("slotId", str3);
        // Subject handed to the AsyncConnection; whatever is pushed into it flows through the pipeline below.
        SerializedSubject serializedSubject2 = new SerializedSubject(PublishSubject.create());
        Observable lift = serializedSubject2.lift(new DropOperator("batch_writes", new Tag[]{basicTag}));
        if (z2) {
            // Sampling: keep only the last element of each sampled list.
            lift = lift.sample(j, TimeUnit.MILLISECONDS).map(list -> {
                LinkedList linkedList = new LinkedList();
                if (!list.isEmpty()) {
                    linkedList.add(list.get(list.size() - 1));
                }
                return linkedList;
            });
        }
        // Per-connection writability metrics, tagged with the client id.
        Metrics build = new Metrics.Builder().id("PushServer", new Tag[]{new BasicTag(PushServerSse.CLIENT_ID_TAG_NAME, (String) Optional.ofNullable(str2).orElse("none"))}).addCounter("channelWritable").addCounter("channelNotWritable").addCounter("channelNotWritableTimeout").build();
        this.metricsRegistry.registerAndGet(build);
        Counter counter3 = build.getCounter("channelWritable");
        Counter counter4 = build.getCounter("channelNotWritable");
        Counter counter5 = build.getCounter("channelNotWritableTimeout");
        // Last time the channel was observed writable (refreshed by the checker and before each write).
        AtomicLong atomicLong2 = new AtomicLong(System.currentTimeMillis());
        // Every 10 seconds, close the connection if the channel has been non-writable for longer than
        // maxNotWritableTimeSec. The check is skipped entirely when that limit is <= 0.
        ScheduledFuture<?> scheduleAtFixedRate = this.maxNotWritableTimeSec > 0 ? this.scheduledExecutorService.scheduleAtFixedRate(() -> {
            long currentTimeMillis = System.currentTimeMillis();
            if (defaultChannelWriter.getChannel().isWritable()) {
                counter3.increment();
                atomicLong2.set(currentTimeMillis);
            } else {
                if (currentTimeMillis - atomicLong2.get() <= TimeUnit.SECONDS.toMillis(this.maxNotWritableTimeSec)) {
                    counter4.increment();
                    return;
                }
                logger.warn("Closing connection due to channel not writable for more than {} secs", Integer.valueOf(this.maxNotWritableTimeSec));
                counter5.increment();
                try {
                    defaultChannelWriter.close();
                } catch (Throwable th) {
                    logger.error("Failed to close connection.", th);
                }
            }
        }, 0L, 10L, TimeUnit.SECONDS) : null;
        final AsyncConnection asyncConnection = new AsyncConnection(str, i, str4, str3, str2, serializedSubject2, func1, str5);
        Channel channel = defaultChannelWriter.getChannel();
        final ScheduledFuture<?> scheduledFuture = scheduleAtFixedRate;
        // Channel-close hook: deregister the connection, run cleanup callbacks, stop the writability checker.
        channel.closeFuture().addListener(new GenericFutureListener<io.netty.util.concurrent.Future<Void>>() { // from class: io.reactivex.mantis.network.push.PushServer.3
            public void operationComplete(io.netty.util.concurrent.Future<Void> future) throws Exception {
                PushServer.this.connectionManager.remove(asyncConnection);
                PushServer.this.connectionCleanup(subscription, action0, subscription2);
                if (scheduledFuture != null) {
                    scheduledFuture.cancel(false);
                }
            }
        });
        return lift.doOnSubscribe(() -> {
            this.connectionManager.add(asyncConnection);
            if (action02 != null) {
                action02.call();
            }
        }).lift(new DisableBackPressureOperator()).buffer(200L, TimeUnit.MILLISECONDS).flatMap(list2 -> {
            ByteBuffer wrap;
            if (list2 != null && !list2.isEmpty()) {
                // i2/i3: total number of payloads across all lists in this 200 ms window.
                int i2 = 0;
                Iterator it = list2.iterator();
                while (it.hasNext()) {
                    i2 += ((List) it.next()).size();
                }
                int i3 = i2;
                this.processedWrites.increment(i3);
                if (channel.isActive() && channel.isWritable()) {
                    atomicLong2.set(System.currentTimeMillis());
                    if (!z4) {
                        // Unframed: size the buffer first, then copy payloads back-to-back.
                        int i4 = 0;
                        Iterator it2 = list2.iterator();
                        while (it2.hasNext()) {
                            Iterator it3 = ((List) it2.next()).iterator();
                            while (it3.hasNext()) {
                                i4 += ((byte[]) it3.next()).length;
                            }
                        }
                        wrap = ByteBuffer.wrap(new byte[i4]);
                        Iterator it4 = list2.iterator();
                        while (it4.hasNext()) {
                            Iterator it5 = ((List) it4.next()).iterator();
                            while (it5.hasNext()) {
                                wrap.put((byte[]) it5.next());
                            }
                        }
                    } else if (z3) {
                        // Framed + compressed: one encoded blob for the whole batch, wrapped once.
                        byte[] compressAndBase64EncodeBytes = bArr == null ? CompressionUtils.compressAndBase64EncodeBytes(list2, true) : CompressionUtils.compressAndBase64EncodeBytes(list2, true, bArr);
                        wrap = ByteBuffer.allocate(this.prefix.length + compressAndBase64EncodeBytes.length + this.nwnw.length);
                        wrap.put(this.prefix);
                        wrap.put(compressAndBase64EncodeBytes);
                        wrap.put(this.nwnw);
                    } else {
                        // Framed, uncompressed: every payload gets its own prefix and terminator.
                        int i5 = 0;
                        Iterator it6 = list2.iterator();
                        while (it6.hasNext()) {
                            Iterator it7 = ((List) it6.next()).iterator();
                            while (it7.hasNext()) {
                                i5 += ((byte[]) it7.next()).length + this.prefix.length + this.nwnw.length;
                            }
                        }
                        wrap = ByteBuffer.wrap(new byte[i5]);
                        Iterator it8 = list2.iterator();
                        while (it8.hasNext()) {
                            for (byte[] bArr2 : (List) it8.next()) {
                                wrap.put(this.prefix);
                                wrap.put(bArr2);
                                wrap.put(this.nwnw);
                            }
                        }
                    }
                    // Write the assembled bytes; account for the outcome once retries are exhausted or it completes.
                    return defaultChannelWriter.writeBytesAndFlush(wrap.array()).retry(this.writeRetryCount).doOnError(th -> {
                        failedToWriteBatch(asyncConnection, i3, counter2, serializedSubject);
                    }).doOnCompleted(() -> {
                        if (z && atomicLong != null) {
                            atomicLong.set(System.currentTimeMillis());
                        }
                        if (counter != null) {
                            counter.increment(i3);
                        }
                        this.successfulWrites.increment(i3);
                        this.connectionManager.successfulWrites(asyncConnection, Integer.valueOf(i3));
                    }).doOnTerminate(() -> {
                        this.batchWriteSize.set(i3);
                    });
                }
                // Channel inactive or not writable: the batch is dropped and reported as failed.
                failedToWriteBatch(asyncConnection, i3, counter2, serializedSubject);
            }
            return Observable.empty();
        });
    }

    /**
     * Records a batch that could not be written to a connection.
     *
     * <p>In order: bumps the optional caller-supplied counter, publishes a dropped-message notice on the
     * optional subject, bumps this server's failed-writes counter and notifies the connection manager.
     *
     * @param asyncConnection   connection the batch was destined for
     * @param i                 number of items in the failed batch
     * @param counter           extra counter to increment by {@code i}; ignored when null
     * @param serializedSubject subject that receives the dropped-message notice; ignored when null
     */
    protected void failedToWriteBatch(AsyncConnection<T> asyncConnection, int i, Counter counter, SerializedSubject<String, String> serializedSubject) {
        if (counter != null) {
            counter.increment(i);
        }

        if (serializedSubject != null) {
            final MantisMetaDroppedMessage dropped = new MantisMetaDroppedMessage(i, System.currentTimeMillis());
            final String notice = dropped.toString();
            serializedSubject.onNext(notice);
        }

        this.failedWrites.increment(i);

        final Integer failedCount = Integer.valueOf(i);
        this.connectionManager.failedWrites(asyncConnection, failedCount);
    }

    /**
     * Releases per-connection resources after a channel has closed.
     *
     * <p>Unsubscribes {@code subscription} and then {@code subscription2} (logging each), and finally
     * invokes {@code action0}. Any argument may be null, in which case that step is skipped.
     *
     * @param subscription  first subscription to release (logged as the heartbeat subscription)
     * @param action0       callback to run after the subscriptions are released
     * @param subscription2 second subscription to release (logged as the metaMsg subject subscription)
     */
    protected void connectionCleanup(Subscription subscription, Action0 action0, Subscription subscription2) {
        releaseWithLog(subscription, "Unsubscribing from heartbeats");
        releaseWithLog(subscription2, "Unsubscribing from metaMsg subject");
        if (action0 == null) {
            return;
        }
        action0.call();
    }

    /** Logs {@code message} and unsubscribes {@code target}; does nothing when {@code target} is null. */
    private static void releaseWithLog(Subscription target, String message) {
        if (target == null) {
            return;
        }
        logger.info(message);
        target.unsubscribe();
    }

    /**
     * Creates the server instance for this push server.
     *
     * <p>Called by {@link #start()} and {@link #startAndBlock()}, which store the result and call
     * {@code start()} on it.
     *
     * @return the server to be started
     */
    public abstract RxServer<?, ?> createServer();

    /**
     * Creates and starts the server, then subscribes to the server signal stream to log each signal,
     * an error, or completion. The signal subscription itself is not retained.
     */
    public void start() {
        final RxServer<?, ?> created = createServer();
        this.server = created;
        created.start();

        final String name = this.serverName;
        this.serverSignals.subscribe(
                signal -> logger.info("Signal received for server: " + name + " signal: " + signal),
                failure -> logger.info("Signal received for server: " + name + " signal: SERVER_ERROR", failure),
                () -> logger.info("Signal received for server: " + name + " signal: SERVER_COMPLETED"));
    }

    /**
     * Blocks the calling thread on the server signal stream, discarding every signal it emits.
     */
    public void blockUntilShutdown() {
        // The signal values are irrelevant here; the call is made only for its blocking effect.
        this.serverSignals
                .toBlocking()
                .forEach(ignoredSignal -> { });
    }

    /**
     * Creates and starts the server, then blocks on the server signal stream, logging every signal.
     *
     * <p>Normal termination of the stream is logged as {@code SERVER_COMPLETED}. Any throwable raised
     * while blocking is logged as {@code SERVER_ERROR} and rethrown to the caller.
     */
    public void startAndBlock() {
        final RxServer<?, ?> created = createServer();
        this.server = created;
        created.start();

        final String name = this.serverName;
        try {
            this.serverSignals
                    .toBlocking()
                    .forEach(signal -> logger.info("Signal received for server: " + name + " signal: " + signal));
            logger.info("Signal received for server: " + name + " signal: SERVER_COMPLETED");
        } catch (Throwable failure) {
            logger.info("Signal received for server: " + name + " signal: SERVER_ERROR", failure);
            throw failure;
        }
    }

    /**
     * Stops this push server: cancels the queue-consumer tasks, shuts down the underlying server if one
     * was started, and stops the executor that runs the channel-writability checks.
     *
     * <p>Safe to call before {@link #start()} / {@link #startAndBlock()}; in that case only the consumer
     * tasks and the checker executor are stopped.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread is
     *                          interrupted while the server shuts down; the interrupt flag is restored
     */
    public void shutdown() {
        for (Future<Void> consumer : this.consumerThreadFutures) {
            consumer.cancel(true);
        }
        try {
            // server stays null until start()/startAndBlock() has run; avoid an NPE in that case.
            if (this.server != null) {
                this.server.shutdown();
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack before wrapping.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            // The checker pool is owned by this instance and is not stopped anywhere else; without this
            // its worker threads would outlive the server.
            this.scheduledExecutorService.shutdownNow();
        }
    }
}
