package io.fluxcapacitor.javaclient.tracking.client;

import io.fluxcapacitor.common.Awaitable;
import io.fluxcapacitor.common.IndexUtils;
import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.Registration;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.common.api.tracking.MessageBatch;
import io.fluxcapacitor.javaclient.FluxCapacitor;
import io.fluxcapacitor.javaclient.publishing.client.GatewayClient;
import io.fluxcapacitor.javaclient.tracking.ConsumerConfiguration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/fluxcapacitor/javaclient/tracking/client/InMemoryMessageStore.class */
public class InMemoryMessageStore implements GatewayClient, TrackingClient {
    private static final Logger log = LoggerFactory.getLogger(InMemoryMessageStore.class);
    private final ExecutorService executorService = Executors.newCachedThreadPool();
    private final AtomicLong nextIndex = new AtomicLong();
    private final Map<String, TrackerRead> trackers = new ConcurrentHashMap();
    private final List<Consumer<SerializedMessage>> monitors = new CopyOnWriteArrayList();
    private final ConcurrentSkipListMap<Long, SerializedMessage> messageLog = new ConcurrentSkipListMap<>();
    private final Map<String, Long> consumerTokens = new ConcurrentHashMap();

    @Override // io.fluxcapacitor.javaclient.publishing.client.GatewayClient
    public Awaitable send(SerializedMessage... serializedMessageArr) {
        Arrays.stream(serializedMessageArr).forEach(serializedMessage -> {
            if (serializedMessage.getIndex() == null) {
                serializedMessage.setIndex(Long.valueOf(this.nextIndex.updateAndGet(j -> {
                    return j <= 0 ? IndexUtils.indexFromMillis(FluxCapacitor.currentClock().millis()) : j + 1;
                })));
            }
            this.messageLog.put(serializedMessage.getIndex(), serializedMessage);
            this.monitors.forEach(consumer -> {
                consumer.accept(serializedMessage);
            });
        });
        synchronized (this) {
            notifyAll();
        }
        return Awaitable.ready();
    }

    @Override // io.fluxcapacitor.javaclient.tracking.client.TrackingClient
    public CompletableFuture<MessageBatch> read(String str, String str2, Long l, ConsumerConfiguration consumerConfiguration) {
        return read(new SimpleTrackerRead(str, str2, l, consumerConfiguration));
    }

    public CompletableFuture<MessageBatch> read(TrackerRead trackerRead) {
        if (trackerRead.getMessageType() != MessageType.RESULT && !Objects.equals(trackerRead.getTrackerId(), this.trackers.computeIfAbsent(trackerRead.getConsumerName(), str -> {
            return trackerRead;
        }).getTrackerId())) {
            return CompletableFuture.supplyAsync(() -> {
                return new MessageBatch(new int[]{0, 0}, Collections.emptyList(), (Long) null);
            }, CompletableFuture.delayedExecutor(trackerRead.getDeadline() - System.currentTimeMillis(), TimeUnit.MILLISECONDS));
        }
        CompletableFuture<MessageBatch> completableFuture = new CompletableFuture<>();
        this.executorService.submit(() -> {
            synchronized (this) {
                Map<Long, SerializedMessage> emptyMap = Collections.emptyMap();
                while (System.currentTimeMillis() < trackerRead.getDeadline()) {
                    ConcurrentNavigableMap<Long, SerializedMessage> tailMap = this.messageLog.tailMap((ConcurrentSkipListMap<Long, SerializedMessage>) Optional.ofNullable(trackerRead.getLastTrackerIndex()).orElseGet(() -> {
                        return Long.valueOf(getLastIndex(trackerRead.getConsumerName()));
                    }), false);
                    emptyMap = tailMap;
                    if (!shouldWait(tailMap)) {
                        break;
                    }
                    try {
                        wait(trackerRead.getDeadline() - System.currentTimeMillis());
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
                List<SerializedMessage> filterMessages = filterMessages(new ArrayList(emptyMap.values()));
                List<SerializedMessage> subList = filterMessages.subList(0, Math.min(filterMessages.size(), trackerRead.getMaxSize()));
                Long index = subList.isEmpty() ? null : subList.get(subList.size() - 1).getIndex();
                Stream<SerializedMessage> stream = subList.stream();
                Objects.requireNonNull(trackerRead);
                completableFuture.complete(new MessageBatch(new int[]{0, 128}, (List) stream.filter(trackerRead::canHandle).collect(Collectors.toList()), index));
            }
        });
        return completableFuture;
    }

    protected boolean shouldWait(Map<Long, SerializedMessage> map) {
        return filterMessages(map.values()).isEmpty();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public List<SerializedMessage> filterMessages(Collection<SerializedMessage> collection) {
        return collection.isEmpty() ? Collections.emptyList() : collection instanceof List ? (List) collection : new ArrayList(collection);
    }

    @Override // io.fluxcapacitor.javaclient.tracking.client.TrackingClient
    public List<SerializedMessage> readFromIndex(long j, int i) {
        ArrayList arrayList = new ArrayList(this.messageLog.tailMap((ConcurrentSkipListMap<Long, SerializedMessage>) Long.valueOf(j)).values());
        return arrayList.subList(0, Math.min(i, arrayList.size()));
    }

    private long getLastIndex(String str) {
        return this.consumerTokens.computeIfAbsent(str, str2 -> {
            return -1L;
        }).longValue();
    }

    @Override // io.fluxcapacitor.javaclient.tracking.client.TrackingClient
    public Awaitable storePosition(String str, int[] iArr, long j) {
        return resetPosition(str, j);
    }

    @Override // io.fluxcapacitor.javaclient.tracking.client.TrackingClient
    public Awaitable resetPosition(String str, long j) {
        this.consumerTokens.put(str, Long.valueOf(j));
        return Awaitable.ready();
    }

    @Override // io.fluxcapacitor.javaclient.tracking.client.TrackingClient
    public Awaitable disconnectTracker(String str, String str2, boolean z) {
        disconnectTrackersMatching(trackerRead -> {
            return Objects.equals(str2, trackerRead.getTrackerId());
        });
        return Awaitable.ready();
    }

    public <T extends TrackerRead> void disconnectTrackersMatching(Predicate<T> predicate) {
        this.trackers.values().removeIf(trackerRead -> {
            return predicate.test(trackerRead);
        });
    }

    public Registration registerMonitor(Consumer<SerializedMessage> consumer) {
        this.monitors.add(consumer);
        return () -> {
            this.monitors.remove(consumer);
        };
    }

    @Override // io.fluxcapacitor.javaclient.publishing.client.GatewayClient, java.lang.AutoCloseable
    public void close() {
        this.executorService.shutdown();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public SerializedMessage getMessage(long j) {
        return this.messageLog.get(Long.valueOf(j));
    }
}
