package io.fluxcapacitor.javaclient.publishing;

import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.Registration;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.javaclient.FluxCapacitor;
import io.fluxcapacitor.javaclient.common.ClientUtils;
import io.fluxcapacitor.javaclient.configuration.client.Client;
import io.fluxcapacitor.javaclient.tracking.ConsumerConfiguration;
import io.fluxcapacitor.javaclient.tracking.IndexUtils;
import io.fluxcapacitor.javaclient.tracking.client.DefaultTracker;
import java.beans.ConstructorProperties;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/fluxcapacitor/javaclient/publishing/DefaultRequestHandler.class */
/**
 * {@link RequestHandler} that correlates outgoing requests with the responses received for this client.
 *
 * <p>Every outgoing request is stamped with a numeric request id and with {@code client.id()} as its source.
 * A future is kept per request id and is completed once {@link #handleMessages(List)} receives a message
 * carrying the same request id, or fails once the configured timeout elapses. The tracker that feeds
 * {@link #handleMessages(List)} is started lazily on the first request.
 */
public class DefaultRequestHandler implements RequestHandler {
    private static final Logger log = LoggerFactory.getLogger(DefaultRequestHandler.class);

    private final Client client;
    private final MessageType resultType;
    private final Duration timeout;
    /** Futures of requests that are still awaiting a response, keyed by request id. */
    private final Map<Integer, CompletableFuture<SerializedMessage>> callbacks = new ConcurrentHashMap<>();
    private final AtomicInteger nextId = new AtomicInteger();
    private final AtomicBoolean started = new AtomicBoolean();
    private volatile Registration registration;

    /**
     * Creates a handler that gives up on a request after 200 seconds.
     *
     * @param client      client whose id is used as request source and that is handed to the tracker
     * @param messageType type of the messages that are tracked as responses
     */
    public DefaultRequestHandler(Client client, MessageType messageType) {
        this(client, messageType, Duration.ofSeconds(200L));
    }

    /**
     * Creates a handler with an explicit request timeout.
     *
     * @param client      client whose id is used as request source and that is handed to the tracker
     * @param messageType type of the messages that are tracked as responses
     * @param duration    how long to wait for a response before the request's future fails
     */
    @ConstructorProperties({"client", "resultType", "timeout"})
    public DefaultRequestHandler(Client client, MessageType messageType, Duration duration) {
        this.client = client;
        this.resultType = messageType;
        this.timeout = duration;
    }

    /**
     * Registers a callback for the given request, stamps it with a request id and source, and passes it to
     * {@code consumer} for sending.
     *
     * <p>If {@code consumer} throws, the callback is discarded again and the exception is rethrown.
     *
     * @param serializedMessage the request; its request id and source are overwritten
     * @param consumer          sends the prepared request
     * @return a future that completes with the response, or exceptionally (with a
     *         {@link java.util.concurrent.TimeoutException} as cause) if no response arrives in time
     */
    @Override
    public CompletableFuture<SerializedMessage> sendRequest(SerializedMessage serializedMessage, Consumer<SerializedMessage> consumer) {
        ensureStarted();
        CompletableFuture<SerializedMessage> pending = register(serializedMessage);
        try {
            consumer.accept(serializedMessage);
        } catch (RuntimeException | Error e) {
            // The request never went out, so don't keep its callback around until the timeout.
            pending.completeExceptionally(e);
            throw e;
        }
        // Hand out a dependent stage rather than the tracked future: failures (e.g. a timeout) keep reaching
        // callers wrapped in a CompletionException, and callers cannot complete the tracked future themselves.
        return pending.copy();
    }

    /**
     * Batch variant of {@link #sendRequest(SerializedMessage, Consumer)}: all requests are registered and
     * stamped first, then a copy of the list is passed to {@code consumer} in one call.
     *
     * <p>If registration or {@code consumer} throws, all callbacks registered by this call are discarded and
     * the exception is rethrown.
     *
     * @param list     the requests; their request ids and sources are overwritten
     * @param consumer sends the prepared requests
     * @return one future per request, in the order of {@code list}
     */
    @Override
    public List<CompletableFuture<SerializedMessage>> sendRequests(List<SerializedMessage> list, Consumer<List<SerializedMessage>> consumer) {
        ensureStarted();
        List<CompletableFuture<SerializedMessage>> pending = new ArrayList<>(list.size());
        try {
            for (SerializedMessage request : list) {
                pending.add(register(request));
            }
            consumer.accept(new ArrayList<>(list));
        } catch (RuntimeException | Error e) {
            pending.forEach(future -> future.completeExceptionally(e));
            throw e;
        }
        List<CompletableFuture<SerializedMessage>> results = new ArrayList<>(pending.size());
        for (CompletableFuture<SerializedMessage> future : pending) {
            results.add(future.copy());
        }
        return results;
    }

    /**
     * Completes the pending request of every received response. Responses that carry no request id, or an id
     * for which no request is pending (anymore), are logged and skipped.
     *
     * @param list the received messages
     */
    protected void handleMessages(List<SerializedMessage> list) {
        for (SerializedMessage response : list) {
            Integer requestId = response.getRequestId();
            // ConcurrentHashMap rejects null keys; a response without id must not abort the rest of the batch.
            CompletableFuture<SerializedMessage> pending = requestId == null ? null : callbacks.remove(requestId);
            if (pending == null) {
                log.warn("Received response with index {} for unknown request {}", response.getIndex(), requestId);
            } else {
                pending.complete(response);
            }
        }
    }

    /**
     * Starts the response tracker if it has not been started yet. The tracker starts reading from the index
     * that corresponds to one second before the current time.
     *
     * <p>If starting fails, the handler is marked as not started again so that a later request can retry.
     */
    protected void ensureStarted() {
        if (started.compareAndSet(false, true)) {
            try {
                ConsumerConfiguration configuration = ConsumerConfiguration.builder()
                        .name(String.format("%s_%s", client.name(), "$request-handler"))
                        .ignoreSegment(true)
                        .filterMessageTarget(true)
                        .minIndex(Long.valueOf(IndexUtils.indexFromTimestamp(FluxCapacitor.currentTime().minusSeconds(1L))))
                        .build();
                registration = DefaultTracker.start(
                        (Consumer<List<SerializedMessage>>) this::handleMessages, resultType, configuration, client);
            } catch (RuntimeException | Error e) {
                started.set(false);
                throw e;
            }
        }
    }

    /**
     * Waits up to two seconds for pending requests, then cancels the tracker registration if there is one.
     */
    @Override
    public void close() {
        ClientUtils.waitForResults(Duration.ofSeconds(2L), callbacks.values());
        Registration current = registration;
        if (current != null) {
            current.cancel();
        }
    }

    /**
     * Stamps the request with a fresh request id and this client's id, and starts tracking a future for it.
     *
     * @return the tracked future; completing it (normally, exceptionally or by timeout) removes it from
     *         {@link #callbacks}
     */
    private CompletableFuture<SerializedMessage> register(SerializedMessage request) {
        // Everything that may throw happens before the future is put in the map.
        long timeoutMillis = timeoutMillis();
        int requestId = nextId.getAndIncrement();
        request.setRequestId(requestId);
        request.setSource(client.id());

        CompletableFuture<SerializedMessage> pending = new CompletableFuture<>();
        // Put before arming the timeout, so a timeout that fires right away still finds the entry to remove.
        callbacks.put(requestId, pending);
        // The timeout is armed on the tracked future itself, so completing that future also cancels the timer.
        pending.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                .whenComplete((response, error) -> callbacks.remove(requestId, pending));
        return pending;
    }

    /**
     * Returns the timeout in milliseconds, keeping sub-second precision. Durations too large to express in
     * milliseconds are clamped instead of failing.
     */
    private long timeoutMillis() {
        try {
            return timeout.toMillis();
        } catch (ArithmeticException e) {
            // Duration.toMillis() overflows for extreme durations; treat them as "practically forever".
            return timeout.isNegative() ? 0L : Long.MAX_VALUE;
        }
    }
}
