package io.fluxcapacitor.javaclient.web;

import io.fluxcapacitor.common.Guarantee;
import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.Registration;
import io.fluxcapacitor.common.api.Data;
import io.fluxcapacitor.common.api.Metadata;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.javaclient.FluxCapacitor;
import io.fluxcapacitor.javaclient.common.serialization.DeserializingMessage;
import io.fluxcapacitor.javaclient.publishing.client.GatewayClient;
import io.fluxcapacitor.javaclient.publishing.correlation.CorrelationDataProvider;
import io.fluxcapacitor.javaclient.publishing.correlation.DefaultCorrelationDataProvider;
import io.fluxcapacitor.javaclient.tracking.ConsumerConfiguration;
import io.fluxcapacitor.javaclient.tracking.client.DefaultTracker;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpHeaders;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/fluxcapacitor/javaclient/web/ForwardingWebConsumer.class */
public class ForwardingWebConsumer implements AutoCloseable {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(ForwardingWebConsumer.class);
    private final String host;
    private final LocalServerConfig localServerConfig;
    private final ConsumerConfiguration configuration;

    @Generated
    private final Object $lock = new Object[0];
    private final AtomicReference<Registration> registration = new AtomicReference<>();
    private final HttpClient httpClient = HttpClient.newHttpClient();

    public ForwardingWebConsumer(LocalServerConfig localServerConfig, ConsumerConfiguration consumerConfiguration) {
        this.host = "http://localhost:" + localServerConfig.getPort();
        this.localServerConfig = localServerConfig;
        this.configuration = consumerConfiguration;
    }

    public void start(FluxCapacitor fluxCapacitor) {
        synchronized (this.$lock) {
            close();
            GatewayClient gatewayClient = fluxCapacitor.client().getGatewayClient(MessageType.WEBRESPONSE);
            BiConsumer biConsumer = (serializedMessage, serializedMessage2) -> {
                serializedMessage2.setTarget(serializedMessage.getSource());
                serializedMessage2.setRequestId(serializedMessage.getRequestId());
                serializedMessage2.setMetadata(serializedMessage2.getMetadata().with(FluxCapacitor.currentCorrelationData()));
                gatewayClient.append(Guarantee.NONE, serializedMessage2);
            };
            Consumer consumer = list -> {
                list.forEach(serializedMessage3 -> {
                    Map<String, String> correlationData = getCorrelationData(serializedMessage3);
                    try {
                        this.httpClient.sendAsync(createRequest(serializedMessage3), HttpResponse.BodyHandlers.ofByteArray()).whenComplete((httpResponse, th) -> {
                            if (th == null && httpResponse.statusCode() == 404 && this.localServerConfig.isIgnore404()) {
                                return;
                            }
                            biConsumer.accept(serializedMessage3, th == null ? toMessage((HttpResponse<byte[]>) httpResponse, (Map<String, String>) correlationData) : toMessage(th, (Map<String, String>) correlationData));
                        });
                    } catch (Exception e) {
                        try {
                            biConsumer.accept(serializedMessage3, toMessage(e, correlationData));
                        } catch (Exception e2) {
                            log.error("Failed to create response message from exception", e2);
                        }
                    }
                });
            };
            this.registration.getAndUpdate(registration -> {
                return registration == null ? DefaultTracker.start((Consumer<List<SerializedMessage>>) consumer, MessageType.WEBREQUEST, this.configuration, fluxCapacitor) : registration;
            });
        }
    }

    protected Map<String, String> getCorrelationData(SerializedMessage serializedMessage) {
        try {
            return ((CorrelationDataProvider) FluxCapacitor.getOptionally().map((v0) -> {
                return v0.correlationDataProvider();
            }).orElse(DefaultCorrelationDataProvider.INSTANCE)).getCorrelationData(new DeserializingMessage(serializedMessage, (Function<Class<?>, Object>) cls -> {
                return null;
            }, MessageType.WEBRESPONSE, (String) null));
        } catch (Exception e) {
            log.error("Failed to get correlation data for request message", e);
            return Collections.emptyMap();
        }
    }

    protected HttpRequest createRequest(SerializedMessage serializedMessage) {
        try {
            HttpRequest.Builder method = HttpRequest.newBuilder().uri(new URI(this.host + WebRequest.getUrl(serializedMessage.getMetadata()))).method(WebRequest.getMethod(serializedMessage.getMetadata()).name(), ((byte[]) serializedMessage.getData().getValue()).length == 0 ? HttpRequest.BodyPublishers.noBody() : HttpRequest.BodyPublishers.ofByteArray((byte[]) serializedMessage.getData().getValue()));
            String[] strArr = (String[]) WebRequest.getHeaders(serializedMessage.getMetadata()).entrySet().stream().filter(entry -> {
                return !isRestricted((String) entry.getKey());
            }).flatMap(entry2 -> {
                return ((List) entry2.getValue()).stream().flatMap(str -> {
                    return Stream.of((Object[]) new String[]{(String) entry2.getKey(), str});
                });
            }).toArray(i -> {
                return new String[i];
            });
            if (strArr.length > 0) {
                method.headers(strArr);
            }
            if (serializedMessage.getData().getFormat() != null) {
                method.header("Content-Type", serializedMessage.getData().getFormat());
            }
            return method.build();
        } catch (Exception e) {
            throw new IllegalStateException("Failed to create HttpRequest", e);
        }
    }

    protected boolean isRestricted(String str) {
        return Set.of("connection", "content-length", "expect", "host", "upgrade").contains(str.toLowerCase());
    }

    protected SerializedMessage toMessage(HttpResponse<byte[]> httpResponse, Map<String, String> map) {
        HttpHeaders headers = httpResponse.headers();
        return new SerializedMessage(new Data((byte[]) httpResponse.body(), (String) null, 0, (String) headers.firstValue("content-type").orElse(null)), Metadata.of(map).with(WebResponse.asMetadata(httpResponse.statusCode(), headers.map())), FluxCapacitor.generateId(), Long.valueOf(System.currentTimeMillis()));
    }

    protected SerializedMessage toMessage(Throwable th, Map<String, String> map) {
        log.error("Failed to handle web request: " + th.getMessage() + ". Continuing with next request.", th);
        return new SerializedMessage(new Data("The request failed due to a server error".getBytes(), (String) null, 0, "text/plain"), Metadata.of(map).with(WebResponse.asMetadata(500, Collections.emptyMap())), FluxCapacitor.generateId(), Long.valueOf(System.currentTimeMillis()));
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.registration.getAndUpdate(registration -> {
            if (registration == null) {
                return null;
            }
            registration.cancel();
            return null;
        });
    }
}
