package org.apache.distributedlog.service;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import com.twitter.common.zookeeper.ServerSet;
import com.twitter.finagle.builder.ClientBuilder;
import com.twitter.finagle.stats.Stat;
import com.twitter.finagle.stats.StatsReceiver;
import com.twitter.finagle.thrift.ClientId$;
import com.twitter.util.Duration;
import com.twitter.util.FutureEventListener;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.lang.StringUtils;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.LogSegmentMetadata;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.distributedlog.callback.LogSegmentListener;
import org.apache.distributedlog.callback.NamespaceListener;
import org.apache.distributedlog.client.monitor.MonitorServiceClient;
import org.apache.distributedlog.client.serverset.DLZkServerSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/distributedlog/service/MonitorService.class */
public class MonitorService implements NamespaceListener {
    private static final Logger logger = LoggerFactory.getLogger(MonitorService.class);

    // Optional arguments as handed to the constructor; runServer() resolves them into the
    // concrete settings declared further below.
    private final Optional<String> uriArg;
    private final Optional<String> confFileArg;
    private final Optional<String> serverSetArg;
    private final Optional<Integer> intervalArg;
    private final Optional<Integer> regionIdArg;
    private final Optional<String> streamRegexArg;
    private final Optional<Integer> instanceIdArg;
    private final Optional<Integer> totalInstancesArg;
    private final Optional<Integer> heartbeatEveryChecksArg;
    private final Optional<Boolean> handshakeWithClientInfoArg;
    private final Optional<Boolean> watchNamespaceChangesArg;
    private final Optional<Boolean> isThriftMuxArg;

    // Stats plumbing: monitorReceiver is statsReceiver scoped to "monitor"; the two stats are
    // fed with the elapsed microseconds of each successful / failed check.
    private final StatsProvider statsProvider;
    private final StatsReceiver statsReceiver;
    private final StatsReceiver monitorReceiver;
    private final Stat successStat;
    private final Stat failureStat;

    // Runtime components, created by runServer()/runMonitor() and released by close().
    private Namespace dlNamespace = null;
    private MonitorServiceClient dlClient = null;
    private DLZkServerSet[] zkServerSets = null;
    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
    // Released by close(); join() blocks on it.
    private final CountDownLatch keepAliveLatch = new CountDownLatch(1);
    // Stream name -> checker. Mutated only while synchronized on the map itself (see onStreamsChanged).
    private final Map<String, StreamChecker> knownStreams = new HashMap<>();

    // Effective settings; the values below are the defaults used when the matching argument is absent.
    private int regionId = 0;
    // Delay between two checks of the same stream, in milliseconds.
    private int interval = 100;
    // When non-null, only stream names matching this regex are monitored.
    private String streamRegex = null;
    private boolean watchNamespaceChanges = false;
    private boolean handshakeWithClientInfo = false;
    // When > 0, every N-th check of a stream is sent as a heartbeat instead of a plain check.
    private int heartbeatEveryChecks = 0;
    // instanceId/totalInstances partition the streams across monitor instances by name hash.
    private int instanceId = -1;
    private int totalInstances = -1;
    private boolean isThriftMux = false;
    private final HashFunction hashFunction = Hashing.md5();

    // Gauge reporting how many streams this instance currently tracks.
    // NOTE: the size is read without holding the knownStreams monitor, so the sample may be slightly stale.
    private final Gauge<Number> numOfStreamsGauge = new Gauge<Number>() {
        public Number getDefaultValue() {
            return 0;
        }

        public Number getSample() {
            return Integer.valueOf(MonitorService.this.knownStreams.size());
        }
    };

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/distributedlog/service/MonitorService$StreamChecker.class */
    public class StreamChecker implements Runnable, FutureEventListener<Void>, LogSegmentListener {
        private final String name;
        private volatile boolean closed = false;
        private volatile boolean checking = false;
        private final Stopwatch stopwatch = Stopwatch.createUnstarted();
        private DistributedLogManager dlm = null;
        private int numChecks = 0;

        StreamChecker(String str) {
            this.name = str;
        }

        @Override // java.lang.Runnable
        public void run() {
            boolean z;
            if (null == this.dlm) {
                try {
                    this.dlm = MonitorService.this.dlNamespace.openLog(this.name);
                    this.dlm.registerListener(this);
                    return;
                } catch (IOException e) {
                    if (null != this.dlm) {
                        try {
                            this.dlm.close();
                        } catch (IOException e2) {
                            MonitorService.logger.error("Failed to close dlm for {} : ", this.name, e2);
                        }
                        this.dlm = null;
                    }
                    MonitorService.this.executorService.schedule(this, MonitorService.this.interval, TimeUnit.MILLISECONDS);
                    return;
                }
            }
            this.stopwatch.reset().start();
            if (MonitorService.this.heartbeatEveryChecks > 0) {
                synchronized (this) {
                    this.numChecks++;
                    if (this.numChecks >= Integer.MAX_VALUE) {
                        this.numChecks = 0;
                    }
                    z = this.numChecks % MonitorService.this.heartbeatEveryChecks == 0;
                }
            } else {
                z = false;
            }
            if (z) {
                MonitorService.this.dlClient.heartbeat(this.name).addEventListener(this);
            } else {
                MonitorService.this.dlClient.check(this.name).addEventListener(this);
            }
        }

        public void onSegmentsUpdated(List<LogSegmentMetadata> list) {
            if (list.size() <= 0 || list.get(0).getRegionId() != MonitorService.this.regionId) {
                if (this.checking) {
                    MonitorService.logger.info("Stop checking stream {}.", this.name);
                }
            } else {
                if (this.checking) {
                    return;
                }
                MonitorService.logger.info("Start checking stream {}.", this.name);
                this.checking = true;
                run();
            }
        }

        public void onLogStreamDeleted() {
            MonitorService.logger.info("Stream {} is deleted", this.name);
        }

        public void onSuccess(Void r5) {
            MonitorService.this.successStat.add((float) this.stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
            scheduleCheck();
        }

        public void onFailure(Throwable th) {
            MonitorService.this.failureStat.add((float) this.stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
            scheduleCheck();
        }

        private void scheduleCheck() {
            if (!this.closed && this.checking) {
                try {
                    MonitorService.this.executorService.schedule(this, MonitorService.this.interval, TimeUnit.MILLISECONDS);
                } catch (RejectedExecutionException e) {
                    MonitorService.logger.error("Failed to schedule checking stream {} in {} ms : ", new Object[]{this.name, Integer.valueOf(MonitorService.this.interval), e});
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void close() {
            this.closed = true;
            if (null != this.dlm) {
                try {
                    this.dlm.close();
                } catch (IOException e) {
                    MonitorService.logger.error("Failed to close dlm for {} : ", this.name, e);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public MonitorService(Optional<String> optional, Optional<String> optional2, Optional<String> optional3, Optional<Integer> optional4, Optional<Integer> optional5, Optional<String> optional6, Optional<Integer> optional7, Optional<Integer> optional8, Optional<Integer> optional9, Optional<Boolean> optional10, Optional<Boolean> optional11, Optional<Boolean> optional12, StatsReceiver statsReceiver, StatsProvider statsProvider) {
        this.uriArg = optional;
        this.confFileArg = optional2;
        this.serverSetArg = optional3;
        this.intervalArg = optional4;
        this.regionIdArg = optional5;
        this.streamRegexArg = optional6;
        this.instanceIdArg = optional7;
        this.totalInstancesArg = optional8;
        this.heartbeatEveryChecksArg = optional9;
        this.handshakeWithClientInfoArg = optional10;
        this.watchNamespaceChangesArg = optional11;
        this.isThriftMuxArg = optional12;
        this.statsReceiver = statsReceiver;
        this.monitorReceiver = statsReceiver.scope("monitor");
        this.successStat = this.monitorReceiver.stat0("success");
        this.failureStat = this.monitorReceiver.stat0("failure");
        this.statsProvider = statsProvider;
    }

    /**
     * Resolves the arguments, loads the optional configuration file, starts the stats provider,
     * builds the monitor client over the configured server sets and finally starts monitoring
     * via {@link #runMonitor(DistributedLogConfiguration, URI)}.
     *
     * @throws IllegalArgumentException if the uri or server set argument is missing, if the
     *         instance id / total instances pair is invalid, or if no server set path is given
     * @throws IOException if the configuration file cannot be loaded (the underlying failure is
     *         attached as the cause) or the namespace cannot be built
     */
    public void runServer() throws IllegalArgumentException, IOException {
        Preconditions.checkArgument(this.uriArg.isPresent(), "No distributedlog uri provided.");
        Preconditions.checkArgument(this.serverSetArg.isPresent(), "No proxy server set provided.");
        if (this.intervalArg.isPresent()) {
            this.interval = this.intervalArg.get();
        }
        if (this.regionIdArg.isPresent()) {
            this.regionId = this.regionIdArg.get();
        }
        if (this.streamRegexArg.isPresent()) {
            this.streamRegex = this.streamRegexArg.get();
        }
        if (this.instanceIdArg.isPresent()) {
            this.instanceId = this.instanceIdArg.get();
        }
        if (this.totalInstancesArg.isPresent()) {
            this.totalInstances = this.totalInstancesArg.get();
        }
        if (this.heartbeatEveryChecksArg.isPresent()) {
            this.heartbeatEveryChecks = this.heartbeatEveryChecksArg.get();
        }
        if (this.instanceId < 0 || this.totalInstances <= 0 || this.instanceId >= this.totalInstances) {
            throw new IllegalArgumentException("Invalid instance id or total instances number.");
        }
        // Honour the carried value: an explicit Optional.of(false) must not enable the flag.
        this.handshakeWithClientInfo = this.handshakeWithClientInfoArg.or(Boolean.FALSE);
        this.watchNamespaceChanges = this.watchNamespaceChangesArg.or(Boolean.FALSE);
        this.isThriftMux = this.isThriftMuxArg.or(Boolean.FALSE);

        URI dlUri = URI.create(this.uriArg.get());
        DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
        if (this.confFileArg.isPresent()) {
            String confFile = this.confFileArg.get();
            try {
                dlConf.loadConf(new File(confFile).toURI().toURL());
            } catch (ConfigurationException e) {
                // Keep the cause so the real parse/load error is visible to the operator.
                throw new IOException("Failed to load distributedlog configuration from " + confFile + ".", e);
            } catch (MalformedURLException e) {
                throw new IOException("Failed to load distributedlog configuration from malformed " + confFile + ".", e);
            }
        }
        logger.info("Starting stats provider : {}.", this.statsProvider.getClass());
        this.statsProvider.start(dlConf);

        String[] serverSetPaths = StringUtils.split(this.serverSetArg.get(), ",");
        if (serverSetPaths.length == 0) {
            throw new IllegalArgumentException("Invalid serverset paths provided : " + this.serverSetArg.get());
        }
        // The first server set is the local one; any others are passed as remotes.
        ServerSet[] allServerSets = createServerSets(serverSetPaths);
        ServerSet local = allServerSets[0];
        ServerSet[] remotes = new ServerSet[allServerSets.length - 1];
        System.arraycopy(allServerSets, 1, remotes, 0, remotes.length);

        ClientBuilder finagleBuilder = ClientBuilder.get().connectTimeout(Duration.fromSeconds(1)).tcpConnectTimeout(Duration.fromSeconds(1)).requestTimeout(Duration.fromSeconds(2)).keepAlive(true).failFast(false);
        if (!this.isThriftMux) {
            // Connection limits are only applied to the non-mux transport.
            finagleBuilder = finagleBuilder.hostConnectionLimit(2).hostConnectionCoresize(2);
        }
        this.dlClient = DistributedLogClientBuilder.newBuilder().name("monitor").thriftmux(this.isThriftMux).clientId(ClientId$.MODULE$.apply("monitor")).redirectBackoffMaxMs(50).redirectBackoffStartMs(100).requestTimeoutMs(2000).maxRedirects(2).serverSets(local, remotes).streamNameRegex(this.streamRegex).handshakeWithClientInfo(this.handshakeWithClientInfo).clientBuilder(finagleBuilder).statsReceiver(this.monitorReceiver.scope("client")).buildMonitorClient();
        runMonitor(dlConf, dlUri);
    }

    /**
     * Parses every given path into a {@link DLZkServerSet}, remembers those in
     * {@code zkServerSets} (so {@link #close()} can release them) and returns the wrapped
     * server sets in the same order.
     *
     * @param strArr server set paths
     * @return one server set per path, index-aligned with {@code strArr}
     */
    ServerSet[] createServerSets(String[] strArr) {
        final int count = strArr.length;
        final ServerSet[] result = new ServerSet[count];
        // The field is published before it is filled, exactly one slot per path.
        this.zkServerSets = new DLZkServerSet[count];
        int index = 0;
        for (String path : strArr) {
            DLZkServerSet parsed = parseServerSet(path);
            this.zkServerSets[index] = parsed;
            result[index] = parsed.getServerSet();
            index++;
        }
        return result;
    }

    /**
     * Builds a {@link DLZkServerSet} from a server set path. Protected so subclasses can
     * substitute their own server sets.
     *
     * @param str server set path, parsed as a URI
     * @return the server set created for that URI with a timeout argument of 60000
     */
    protected DLZkServerSet parseServerSet(String str) {
        final URI serverSetUri = URI.create(str);
        final int timeoutArg = 60000;
        return DLZkServerSet.of(serverSetUri, timeoutArg);
    }

    /**
     * Reconciles the tracked streams with the latest namespace listing.
     *
     * <p>A stream belongs to this instance when it matches {@code streamRegex} (if set) and its
     * name hash falls into this instance's bucket. Streams no longer assigned are dropped and
     * their checkers closed; newly assigned streams get a checker which is run immediately.
     *
     * @param it names of all streams currently in the namespace
     */
    public void onStreamsChanged(Iterator<String> it) {
        HashSet<String> assigned = new HashSet<>();
        while (it.hasNext()) {
            String stream = it.next();
            if (null != this.streamRegex && !stream.matches(this.streamRegex)) {
                continue;
            }
            int hash = this.hashFunction.hashUnencodedChars(stream).asInt();
            // Take the remainder before abs(): Math.abs(Integer.MIN_VALUE) is still negative, which
            // would leave such a stream unassigned on every instance. For all other hash values
            // this yields the same bucket as abs(hash) % totalInstances.
            if (Math.abs(hash % this.totalInstances) == this.instanceId) {
                assigned.add(stream);
            }
        }
        List<StreamChecker> removedCheckers = new ArrayList<>();
        synchronized (this.knownStreams) {
            HashSet<String> tracked = new HashSet<>(this.knownStreams.keySet());
            ImmutableSet<String> toRemove = Sets.difference(tracked, assigned).immutableCopy();
            ImmutableSet<String> toAdd = Sets.difference(assigned, tracked).immutableCopy();
            for (String stream : toRemove) {
                StreamChecker checker = this.knownStreams.remove(stream);
                if (null != checker) {
                    logger.info("Removed stream {}", stream);
                    removedCheckers.add(checker);
                }
            }
            for (String stream : toAdd) {
                if (!this.knownStreams.containsKey(stream)) {
                    logger.info("Added stream {}", stream);
                    StreamChecker checker = new StreamChecker(stream);
                    this.knownStreams.put(stream, checker);
                    checker.run();
                }
            }
        }
        // Close the dropped checkers outside the lock.
        for (StreamChecker checker : removedCheckers) {
            checker.close();
        }
    }

    /**
     * Registers the stream-count gauge, builds the namespace for {@code uri} and discovers the
     * streams to monitor: through a namespace listener when namespace watching is enabled,
     * otherwise through a single listing of the current logs.
     *
     * @param distributedLogConfiguration configuration used to build the namespace
     * @param uri namespace uri
     * @throws IOException if the namespace cannot be built or its logs cannot be listed
     */
    void runMonitor(DistributedLogConfiguration distributedLogConfiguration, URI uri) throws IOException {
        this.statsProvider.getStatsLogger("monitor").registerGauge("num_streams", this.numOfStreamsGauge);
        logger.info("Construct dl namespace @ {}", uri);
        NamespaceBuilder namespaceBuilder = NamespaceBuilder.newBuilder();
        namespaceBuilder = namespaceBuilder.conf(distributedLogConfiguration).uri(uri);
        this.dlNamespace = namespaceBuilder.build();
        if (!this.watchNamespaceChanges) {
            // One-shot mode: take a single snapshot of the streams.
            onStreamsChanged(this.dlNamespace.getLogs());
            return;
        }
        this.dlNamespace.registerNamespaceListener(this);
    }

    /**
     * Shuts the monitor down: closes the client, the zookeeper server sets and the namespace,
     * stops the executor (waiting up to one minute before forcing it), unregisters the gauge and
     * stops the stats provider. Threads blocked in {@link #join()} are released even if one of
     * the steps throws.
     */
    public void close() {
        logger.info("Closing monitor service.");
        try {
            if (null != this.dlClient) {
                this.dlClient.close();
            }
            if (null != this.zkServerSets) {
                for (DLZkServerSet dLZkServerSet : this.zkServerSets) {
                    // Slots stay null when createServerSets() failed part-way through.
                    if (null != dLZkServerSet) {
                        dLZkServerSet.close();
                    }
                }
            }
            if (null != this.dlNamespace) {
                this.dlNamespace.close();
            }
            this.executorService.shutdown();
            try {
                if (!this.executorService.awaitTermination(1L, TimeUnit.MINUTES)) {
                    this.executorService.shutdownNow();
                }
            } catch (InterruptedException e) {
                logger.error("Interrupted on waiting shutting down monitor executor service : ", e);
                // Do not leave tasks running, and preserve the interrupt for the caller.
                this.executorService.shutdownNow();
                Thread.currentThread().interrupt();
            }
            if (null != this.statsProvider) {
                unregisterGauge();
                this.statsProvider.stop();
            }
        } finally {
            // Always release join(), otherwise a failing close step would hang the process.
            this.keepAliveLatch.countDown();
        }
        logger.info("Closed monitor service.");
    }

    /**
     * Blocks the calling thread until {@link #close()} has released the keep-alive latch.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public void join() throws InterruptedException {
        final CountDownLatch latch = this.keepAliveLatch;
        latch.await();
    }

    /** Removes the "num_streams" gauge from the "monitor" stats logger of the stats provider. */
    private void unregisterGauge() {
        final String scope = "monitor";
        final String gaugeName = "num_streams";
        this.statsProvider.getStatsLogger(scope).unregisterGauge(gaugeName, this.numOfStreamsGauge);
    }
}
