package org.apache.distributedlog.service.balancer;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.RateLimiter;
import com.twitter.common.zookeeper.ServerSet;
import com.twitter.finagle.builder.ClientBuilder;
import com.twitter.finagle.thrift.ClientId$;
import com.twitter.util.Await;
import com.twitter.util.Duration;
import java.net.URI;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.distributedlog.client.monitor.MonitorServiceClient;
import org.apache.distributedlog.client.serverset.DLZkServerSet;
import org.apache.distributedlog.impl.BKNamespaceDriver;
import org.apache.distributedlog.service.ClientUtils;
import org.apache.distributedlog.service.DLSocketAddress;
import org.apache.distributedlog.service.DistributedLogClient;
import org.apache.distributedlog.service.DistributedLogClientBuilder;
import org.apache.distributedlog.tools.Tool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

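/**
 * Command-line tool named "balancer" that registers the {@code clusterbalancer} and
 * {@code regionbalancer} commands.
 *
 * <p>Loaded from: input_file:org/apache/distributedlog/service/balancer/BalancerTool.class
 */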
public class BalancerTool extends Tool {
    private static final Logger logger = LoggerFactory.getLogger(BalancerTool.class);

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/distributedlog/service/balancer/BalancerTool$BalancerCommand.class */
    /**
     * Base class for the balancer commands.
     *
     * <p>Registers and parses the options shared by every balancer command
     * ({@code rwm}, {@code rtp}, {@code rc}, {@code r}) and drives the
     * parse-then-execute flow in {@link #runCmd(CommandLine)}.
     */
    public static abstract class BalancerCommand extends Tool.OptsCommand {
        protected Options options;
        protected int rebalanceWaterMark;
        protected double rebalanceTolerancePercentage;
        protected int rebalanceConcurrency;
        protected Double rate;
        protected Optional<RateLimiter> rateLimiter;

        /**
         * Creates the command and registers the shared rebalance options.
         *
         * @param name        first argument forwarded to the {@code Tool.OptsCommand} constructor
         * @param description second argument forwarded to the {@code Tool.OptsCommand} constructor
         */
        BalancerCommand(String name, String description) {
            super(name, description);
            this.options = new Options();
            this.rebalanceWaterMark = 0;
            this.rebalanceTolerancePercentage = 0.0d;
            this.rebalanceConcurrency = 1;
            this.rate = null;
            // Start with "no limiter" so getRateLimiter() never returns null, even when it is
            // called before parseCommandLine().
            this.rateLimiter = Optional.absent();
            this.options.addOption("rwm", "rebalance-water-mark", true, "Rebalance water mark per proxy");
            this.options.addOption("rtp", "rebalance-tolerance-percentage", true, "Rebalance tolerance percentage per proxy");
            this.options.addOption("rc", "rebalance-concurrency", true, "Concurrency to rebalance stream distribution");
            this.options.addOption("r", "rate", true, "Rebalance rate");
        }

        /**
         * @return the rate limiter built from the {@code r} option; absent when no positive rate
         *         was supplied or when the command line has not been parsed yet
         */
        Optional<RateLimiter> getRateLimiter() {
            return this.rateLimiter;
        }

        /**
         * @return the options registered for this command
         */
        protected Options getOptions() {
            return this.options;
        }

        /**
         * Reads the shared rebalance options from the command line and validates them.
         *
         * @param commandLine the parsed command line
         * @throws ParseException           declared for subclasses that reject their own options
         * @throws IllegalArgumentException if a numeric option is malformed (as
         *                                  {@link NumberFormatException}) or out of range
         */
        protected void parseCommandLine(CommandLine commandLine) throws ParseException {
            if (commandLine.hasOption("rwm")) {
                this.rebalanceWaterMark = Integer.parseInt(commandLine.getOptionValue("rwm"));
            }
            if (commandLine.hasOption("rtp")) {
                this.rebalanceTolerancePercentage = Double.parseDouble(commandLine.getOptionValue("rtp"));
            }
            if (commandLine.hasOption("rc")) {
                this.rebalanceConcurrency = Integer.parseInt(commandLine.getOptionValue("rc"));
            }
            if (commandLine.hasOption("r")) {
                this.rate = Double.valueOf(Double.parseDouble(commandLine.getOptionValue("r")));
            }
            Preconditions.checkArgument(this.rebalanceWaterMark >= 0, "Rebalance Water Mark should be a non-negative number");
            Preconditions.checkArgument(this.rebalanceTolerancePercentage >= 0.0d, "Rebalance Tolerance Percentage should be a non-negative number");
            Preconditions.checkArgument(this.rebalanceConcurrency > 0, "Rebalance Concurrency should be a positive number");
            // A missing or non-positive rate means "do not rate limit".
            if (null == this.rate || this.rate.doubleValue() <= 0.0d) {
                this.rateLimiter = Optional.absent();
            } else {
                this.rateLimiter = Optional.of(RateLimiter.create(this.rate.doubleValue()));
            }
        }

        /**
         * Parses the command line and then executes the command.
         *
         * <p>Any option error, including a malformed number or an out-of-range value reported
         * as {@link IllegalArgumentException} during parsing, is printed together with the
         * usage and turned into a {@code -1} return code.
         *
         * @param commandLine the parsed command line
         * @return the result of {@link #executeCommand(CommandLine)}, or {@code -1} on a parse error
         * @throws Exception any non-parse failure raised while executing the command
         */
        protected int runCmd(CommandLine commandLine) throws Exception {
            try {
                try {
                    parseCommandLine(commandLine);
                } catch (IllegalArgumentException e) {
                    // NumberFormatException and Preconditions failures are user input errors;
                    // report them the same way as any other unparsable command line.
                    ParseException parseFailure = new ParseException(e.getMessage());
                    parseFailure.initCause(e);
                    throw parseFailure;
                }
                return executeCommand(commandLine);
            } catch (ParseException e) {
                BalancerTool.println("ERROR: fail to parse commandline : '" + e.getMessage() + "'");
                printUsage();
                return -1;
            }
        }

        /**
         * Runs the command after its options have been parsed successfully.
         *
         * @param commandLine the parsed command line
         * @return the command's exit code
         * @throws Exception if the command fails
         */
        protected abstract int executeCommand(CommandLine commandLine) throws Exception;
    }

    /* loaded from: input_file:org/apache/distributedlog/service/balancer/BalancerTool$ClusterBalancerCommand.class */
    /**
     * The {@code clusterbalancer} command: balances streams inside a single cluster, either
     * across the whole cluster or away from one source proxy.
     */
    protected static class ClusterBalancerCommand extends BalancerCommand {
        protected URI uri;
        protected String source;

        protected ClusterBalancerCommand() {
            super("clusterbalancer", "Balance streams inside a cluster");
            this.source = null;
            this.options.addOption("u", "uri", true, "DistributedLog URI");
            this.options.addOption("sp", "source-proxy", true, "Source proxy to balance");
        }

        protected String getUsage() {
            return "clusterbalancer [options]";
        }

        /**
         * Parses the shared options plus the required {@code u} (URI) and the optional
         * {@code sp} (source proxy) options.
         *
         * @param commandLine the parsed command line
         * @throws ParseException if the URI is missing or malformed, or the source proxy
         *                        is not a valid socket address
         */
        @Override
        protected void parseCommandLine(CommandLine commandLine) throws ParseException {
            super.parseCommandLine(commandLine);
            if (!commandLine.hasOption("u")) {
                throw new ParseException("No DistributedLog URI provided.");
            }
            String uriValue = commandLine.getOptionValue("u");
            try {
                this.uri = URI.create(uriValue);
            } catch (IllegalArgumentException e) {
                // Report a malformed URI as a parse error, like the source proxy below.
                throw new ParseException("Invalid DistributedLog URI " + uriValue + " : " + e.getMessage());
            }
            if (commandLine.hasOption("sp")) {
                String sourceProxy = commandLine.getOptionValue("sp");
                try {
                    // Parsed only to validate; the raw string is what gets stored.
                    DLSocketAddress.parseSocketAddress(sourceProxy);
                    this.source = sourceProxy;
                } catch (IllegalArgumentException e) {
                    throw new ParseException("Invalid source proxy " + sourceProxy + " : " + e.getMessage());
                }
            }
        }

        /**
         * Creates the server set and cluster balancer for the configured URI, runs the
         * balancer, and closes both afterwards (exactly once, whether or not balancing fails).
         *
         * @param commandLine the parsed command line (unused here)
         * @return the result of {@link #runBalancer(DistributedLogClientBuilder, ClusterBalancer)}
         * @throws Exception if creating the resources, balancing, or closing fails
         */
        @Override
        protected int executeCommand(CommandLine commandLine) throws Exception {
            DLZkServerSet serverSet = DLZkServerSet.of(this.uri, 60000);
            BalancerTool.logger.info("Created serverset for {}", this.uri);
            try {
                DistributedLogClientBuilder clientBuilder =
                        BalancerTool.createDistributedLogClientBuilder(serverSet.getServerSet());
                ClusterBalancer balancer = new ClusterBalancer(clientBuilder);
                try {
                    return runBalancer(clientBuilder, balancer);
                } finally {
                    balancer.close();
                }
            } finally {
                serverSet.close();
            }
        }

        /**
         * Balances the whole cluster when no source proxy was given, otherwise moves streams
         * off the source proxy.
         *
         * @param distributedLogClientBuilder builder used to reach the source proxy
         * @param clusterBalancer             balancer to run
         * @return always {@code 0}
         * @throws Exception if balancing fails
         */
        protected int runBalancer(DistributedLogClientBuilder distributedLogClientBuilder, ClusterBalancer clusterBalancer) throws Exception {
            if (null == this.source) {
                clusterBalancer.balance(this.rebalanceWaterMark, this.rebalanceTolerancePercentage, this.rebalanceConcurrency, getRateLimiter());
            } else {
                balanceFromSource(distributedLogClientBuilder, clusterBalancer, this.source, getRateLimiter());
            }
            return 0;
        }

        /**
         * Tells the given proxy to stop accepting new streams, then balances all of its
         * streams away. The client built for the proxy is always closed.
         *
         * @param distributedLogClientBuilder builder to derive the single-host client from
         * @param clusterBalancer             balancer to run
         * @param str                         source proxy address
         * @param optional                    optional rate limiter handed to the balancer
         * @throws Exception if the proxy call, the balancing, or the close fails
         */
        protected void balanceFromSource(DistributedLogClientBuilder distributedLogClientBuilder, ClusterBalancer clusterBalancer, String str, Optional<RateLimiter> optional) throws Exception {
            Pair<DistributedLogClient, MonitorServiceClient> sourceClient = ClientUtils.buildClient(
                    DistributedLogClientBuilder.newBuilder(distributedLogClientBuilder)
                            .host(DLSocketAddress.parseSocketAddress(str)));
            try {
                Await.result(sourceClient.getRight().setAcceptNewStream(false));
                BalancerTool.logger.info("Disable accepting new stream on proxy {}.", str);
                clusterBalancer.balanceAll(str, this.rebalanceConcurrency, optional);
            } finally {
                sourceClient.getLeft().close();
            }
        }
    }

    /* loaded from: input_file:org/apache/distributedlog/service/balancer/BalancerTool$RegionBalancerCommand.class */
    /**
     * The {@code regionbalancer} command: balances streams between two regions, either in
     * both directions or away from one source region.
     */
    protected static class RegionBalancerCommand extends BalancerCommand {
        protected URI region1;
        protected URI region2;
        protected String source;

        protected RegionBalancerCommand() {
            super("regionbalancer", "Balance streams between regions");
            this.source = null;
            this.options.addOption("rs", "regions", true, "DistributedLog Region URI: uri1[,uri2]");
            this.options.addOption("s", "source", true, "DistributedLog Source Region to balance");
        }

        protected String getUsage() {
            return "regionbalancer [options]";
        }

        /**
         * Parses the shared options plus the required {@code rs} (regions) and the optional
         * {@code s} (source region) options.
         *
         * @param commandLine the parsed command line
         * @throws ParseException if the regions are missing, are not exactly two
         *                        comma-separated values, or are not valid URIs
         */
        @Override
        protected void parseCommandLine(CommandLine commandLine) throws ParseException {
            super.parseCommandLine(commandLine);
            if (!commandLine.hasOption("rs")) {
                throw new ParseException("No regions provided.");
            }
            String[] regions = commandLine.getOptionValue("rs").split(",");
            // Exactly two regions are required, even though the usage text shows the second
            // one in brackets.
            if (regions.length != 2) {
                throw new ParseException("Invalid regions provided. Expected : serverset1[,serverset2]");
            }
            this.region1 = parseRegionUri(regions[0]);
            this.region2 = parseRegionUri(regions[1]);
            if (commandLine.hasOption("s")) {
                this.source = commandLine.getOptionValue("s");
            }
        }

        /** Converts a region string into a URI, reporting a malformed value as a parse error. */
        private static URI parseRegionUri(String region) throws ParseException {
            try {
                return URI.create(region);
            } catch (IllegalArgumentException e) {
                throw new ParseException("Invalid region uri " + region + " : " + e.getMessage());
            }
        }

        /**
         * Creates a server set per region, runs the balancer between them, and closes every
         * resource that was created, even when a later creation step or an earlier close fails.
         *
         * @param commandLine the parsed command line (unused here)
         * @return the result of {@link #runBalancer(SimpleBalancer)}
         * @throws Exception if creating the resources, balancing, or closing fails
         */
        @Override
        protected int executeCommand(CommandLine commandLine) throws Exception {
            DLZkServerSet serverSet1 = DLZkServerSet.of(this.region1, 60000);
            try {
                BalancerTool.logger.info("Created serverset for {}", this.region1);
                DLZkServerSet serverSet2 = DLZkServerSet.of(this.region2, 60000);
                try {
                    BalancerTool.logger.info("Created serverset for {}", this.region2);
                    return balanceBetween(serverSet1, serverSet2);
                } finally {
                    serverSet2.close();
                }
            } finally {
                serverSet1.close();
            }
        }

        /**
         * Builds one client per server set, runs the balancer over both, and closes the
         * balancer and clients in reverse order of creation.
         */
        private int balanceBetween(DLZkServerSet serverSet1, DLZkServerSet serverSet2) throws Exception {
            Pair<DistributedLogClient, MonitorServiceClient> client1 = ClientUtils.buildClient(
                    BalancerTool.createDistributedLogClientBuilder(serverSet1.getServerSet()));
            try {
                Pair<DistributedLogClient, MonitorServiceClient> client2 = ClientUtils.buildClient(
                        BalancerTool.createDistributedLogClientBuilder(serverSet2.getServerSet()));
                try {
                    SimpleBalancer balancer = new SimpleBalancer(
                            BKNamespaceDriver.getZKServersFromDLUri(this.region1),
                            client1.getLeft(),
                            client1.getRight(),
                            BKNamespaceDriver.getZKServersFromDLUri(this.region2),
                            client2.getLeft(),
                            client2.getRight());
                    try {
                        return runBalancer(balancer);
                    } finally {
                        balancer.close();
                    }
                } finally {
                    client2.getLeft().close();
                }
            } finally {
                client1.getLeft().close();
            }
        }

        /**
         * Balances between both regions when no source was given, otherwise moves all streams
         * off the source region.
         *
         * @param simpleBalancer balancer to run
         * @return always {@code 0}
         * @throws Exception if balancing fails
         */
        protected int runBalancer(SimpleBalancer simpleBalancer) throws Exception {
            if (null == this.source) {
                simpleBalancer.balance(this.rebalanceWaterMark, this.rebalanceTolerancePercentage, this.rebalanceConcurrency, getRateLimiter());
            } else {
                simpleBalancer.balanceAll(this.source, this.rebalanceConcurrency, getRateLimiter());
            }
            return 0;
        }
    }

    /**
     * Builds the DistributedLog client builder used by the balancer commands.
     *
     * <p>The client is named and identified as {@code rebalancer_tool}, follows at most two
     * redirects, and uses a Finagle client with 2s connect timeouts, a 10s request timeout,
     * one connection per host, keep-alive enabled and fail-fast disabled.
     *
     * @param serverSet server set the client resolves proxies from
     * @return a configured, not yet built, client builder
     */
    static DistributedLogClientBuilder createDistributedLogClientBuilder(ServerSet serverSet) {
        final String toolName = "rebalancer_tool";
        return DistributedLogClientBuilder.newBuilder()
                .name(toolName)
                .clientId(ClientId$.MODULE$.apply(toolName))
                .maxRedirects(2)
                .serverSet(serverSet)
                .clientBuilder(ClientBuilder.get()
                        .connectionTimeout(Duration.fromSeconds(2))
                        .tcpConnectTimeout(Duration.fromSeconds(2))
                        .requestTimeout(Duration.fromSeconds(10))
                        .hostConnectionLimit(1)
                        .hostConnectionCoresize(1)
                        .keepAlive(true)
                        .failFast(false));
    }

    public BalancerTool() {
        addCommand(new ClusterBalancerCommand());
        addCommand(new RegionBalancerCommand());
    }

    /**
     * @return the name of this tool, {@code "balancer"}
     */
    protected String getName() {
        final String toolName = "balancer";
        return toolName;
    }
}
