package org.apache.pulsar.functions.metrics.sink;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Ticker;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.pulsar.functions.metrics.MetricsSink;

/* loaded from: input_file:org/apache/pulsar/functions/metrics/sink/AbstractWebSink.class */
public abstract class AbstractWebSink implements MetricsSink {
    private static final Logger LOG = Logger.getLogger(AbstractWebSink.class.getName());
    private static final int HTTP_STATUS_OK = 200;
    public static final String KEY_PORT = "port";
    public static final String KEY_PATH = "path";
    private static final String KEY_METRICS_CACHE_MAX_SIZE = "metrics-cache-max-size";
    private static final String DEFAULT_MAX_CACHE_SIZE = "1000000";
    private static final String KEY_METRICS_CACHE_TTL_SEC = "metrics-cache-ttl-sec";
    private static final String DEFAULT_CACHE_TTL_SECONDS = "600";
    private HttpServer httpServer;
    private long cacheMaxSize;
    private long cacheTtlSeconds;
    private final Ticker cacheTicker;

    /* JADX INFO: Access modifiers changed from: package-private */
    public AbstractWebSink() {
        this(Ticker.systemTicker());
    }

    @VisibleForTesting
    AbstractWebSink(Ticker ticker) {
        this.cacheTicker = ticker;
    }

    @Override // org.apache.pulsar.functions.metrics.MetricsSink
    public final void init(Map<String, String> map) {
        String str = map.get(KEY_PATH);
        this.cacheMaxSize = Long.valueOf(map.getOrDefault(KEY_METRICS_CACHE_MAX_SIZE, DEFAULT_MAX_CACHE_SIZE)).longValue();
        this.cacheTtlSeconds = Long.valueOf(map.getOrDefault(KEY_METRICS_CACHE_TTL_SEC, DEFAULT_CACHE_TTL_SECONDS)).longValue();
        initialize(map);
        startHttpServer(str, Integer.valueOf(map.getOrDefault("port", "9099")).intValue());
    }

    protected void startHttpServer(String str, int i) {
        LOG.info("Starting AbstractWebMetricSink at path" + str + " and port " + i);
        try {
            this.httpServer = HttpServer.create(new InetSocketAddress(i), 0);
            this.httpServer.createContext(str, httpExchange -> {
                byte[] generateResponse = generateResponse();
                httpExchange.sendResponseHeaders(200, generateResponse.length);
                OutputStream responseBody = httpExchange.getResponseBody();
                responseBody.write(generateResponse);
                responseBody.close();
                LOG.log(Level.INFO, "Received metrics request.");
            });
            this.httpServer.start();
        } catch (IOException e) {
            throw new RuntimeException("Failed to create Http server on port " + i, e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public <K, V> Cache<K, V> createCache() {
        return (Cache<K, V>) CacheBuilder.newBuilder().maximumSize(this.cacheMaxSize).expireAfterWrite(this.cacheTtlSeconds, TimeUnit.SECONDS).ticker(this.cacheTicker).build();
    }

    abstract byte[] generateResponse() throws IOException;

    abstract void initialize(Map<String, String> map);

    @Override // org.apache.pulsar.functions.metrics.MetricsSink
    public void flush() {
    }

    @Override // org.apache.pulsar.functions.metrics.MetricsSink, java.lang.AutoCloseable
    public void close() {
        if (this.httpServer != null) {
            this.httpServer.stop(0);
        }
    }
}
