package io.mantisrx.publish;

import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.Timer;
import io.mantisrx.publish.api.Event;
import io.mantisrx.publish.config.MrePublishConfiguration;
import io.mantisrx.publish.internal.metrics.SpectatorUtils;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/* loaded from: input_file:io/mantisrx/publish/EventDrainer.class */
class EventDrainer implements Runnable {
    public static final String LOGGING_CONTEXT_KEY = "mantisLogCtx";
    static final String DEFAULT_THREAD_NAME = "mantisDrainer";
    public static final String LOGGING_CONTEXT_VALUE = "mantisDrainer";
    private static final Logger LOG = LoggerFactory.getLogger(EventDrainer.class);
    private final MrePublishConfiguration config;
    private final Timer mantisEventDrainTimer;
    private final StreamManager streamManager;
    private final EventTransmitter eventTransmitter;
    private final Clock clock;
    private EventProcessor eventProcessor;

    /* JADX INFO: Access modifiers changed from: package-private */
    public EventDrainer(MrePublishConfiguration mrePublishConfiguration, StreamManager streamManager, Registry registry, EventProcessor eventProcessor, EventTransmitter eventTransmitter, Clock clock) {
        this.config = mrePublishConfiguration;
        this.mantisEventDrainTimer = SpectatorUtils.buildAndRegisterTimer(registry, "mrePublishEventDrainTime");
        this.streamManager = streamManager;
        this.eventProcessor = eventProcessor;
        this.eventTransmitter = eventTransmitter;
        this.clock = clock;
        if (LOG.isDebugEnabled()) {
            LOG.debug("Starting drainer thread.");
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            MDC.put(LOGGING_CONTEXT_KEY, "mantisDrainer");
            long millis = this.clock.millis();
            for (String str : this.streamManager.getRegisteredStreams()) {
                ArrayList arrayList = new ArrayList();
                try {
                    Optional<BlockingQueue<Event>> queueForStream = this.streamManager.getQueueForStream(str);
                    if (queueForStream.isPresent()) {
                        queueForStream.get().drainTo(arrayList);
                        int size = arrayList.size();
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("Queue drained size: {} for stream {}", Integer.valueOf(size), str);
                        }
                        this.streamManager.getStreamMetrics(str).ifPresent(streamMetrics -> {
                            streamMetrics.getMantisEventsQueuedGauge().set(size);
                            if (size > 0) {
                                streamMetrics.updateLastEventOnStreamTimestamp();
                            }
                        });
                        arrayList.stream().map(event -> {
                            return process(str, event);
                        }).filter((v0) -> {
                            return Objects.nonNull(v0);
                        }).forEach(event2 -> {
                            this.eventTransmitter.send(event2, this.config.mantisJobCluster(str));
                        });
                        arrayList.clear();
                    }
                } catch (Exception e) {
                    LOG.warn("Exception processing events for stream {}", str, e);
                }
            }
            this.mantisEventDrainTimer.record(this.clock.millis() - millis, TimeUnit.MILLISECONDS);
            MDC.remove(LOGGING_CONTEXT_KEY);
        } catch (Throwable th) {
            MDC.remove(LOGGING_CONTEXT_KEY);
            throw th;
        }
    }

    private Event process(String str, Event event) {
        long millis = this.clock.millis();
        Event process = this.eventProcessor.process(str, event);
        long millis2 = this.clock.millis() - millis;
        this.streamManager.getStreamMetrics(str).ifPresent(streamMetrics -> {
            streamMetrics.getMantisEventsProcessTimeTimer().record(millis2, TimeUnit.MILLISECONDS);
        });
        return process;
    }
}
