package org.apache.distributedlog.service.balancer;

import com.google.common.base.Optional;
import com.google.common.util.concurrent.RateLimiter;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.distributedlog.client.monitor.MonitorServiceClient;
import org.apache.distributedlog.service.DistributedLogClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/distributedlog/service/balancer/SimpleBalancer.class */
public class SimpleBalancer implements Balancer {
    private static final Logger logger = LoggerFactory.getLogger(SimpleBalancer.class);
    protected final String target1;
    protected final String target2;
    protected final DistributedLogClient targetClient1;
    protected final DistributedLogClient targetClient2;
    protected final MonitorServiceClient targetMonitor1;
    protected final MonitorServiceClient targetMonitor2;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/distributedlog/service/balancer/SimpleBalancer$RegionMover.class */
    /**
     * Worker task that repeatedly asks a {@link StreamChooser} for a stream and hands it to a
     * {@link StreamMover}.
     *
     * <p>The loop ends when the chooser returns {@code null} or after {@link #shutdown()} has been
     * called. The completion latch is counted down exactly once per {@link #run()} invocation,
     * whether the loop ends normally or an unchecked exception escapes from the rate limiter,
     * the chooser or the mover, so a thread awaiting the latch is not left blocked by a failed worker.
     */
    public static class RegionMover implements Runnable {
        final StreamChooser streamChooser;
        final StreamMover streamMover;
        final Optional<RateLimiter> rateLimiter;
        final CountDownLatch doneLatch;
        // Written by shutdown() and read by run(); volatile so the update is visible across threads.
        volatile boolean running = true;

        /**
         * @param streamChooser source of stream names; a {@code null} result ends the loop
         * @param streamMover performs the move of each chosen stream
         * @param optional optional rate limiter; when present a permit is acquired before each choice
         * @param countDownLatch latch counted down once when {@link #run()} exits
         */
        RegionMover(StreamChooser streamChooser, StreamMover streamMover, Optional<RateLimiter> optional, CountDownLatch countDownLatch) {
            this.streamChooser = streamChooser;
            this.streamMover = streamMover;
            this.rateLimiter = optional;
            this.doneLatch = countDownLatch;
        }

        /**
         * Moves streams until the chooser is exhausted or {@link #shutdown()} is requested.
         */
        @Override
        public void run() {
            try {
                while (this.running) {
                    if (this.rateLimiter.isPresent()) {
                        this.rateLimiter.get().acquire();
                    }
                    String stream = this.streamChooser.choose();
                    if (null == stream) {
                        // Nothing left to move.
                        break;
                    }
                    this.streamMover.moveStream(stream);
                }
            } finally {
                // Always signal completion, even on failure, so waiters cannot hang.
                this.doneLatch.countDown();
            }
        }

        /**
         * Requests that {@link #run()} stop before choosing the next stream. A move already in
         * progress is not interrupted.
         */
        void shutdown() {
            this.running = false;
        }
    }

    /**
     * Creates a balancer operating on two targets, each described by a name, a client and a
     * monitor client.
     *
     * @param str name of the first target
     * @param distributedLogClient client for the first target
     * @param monitorServiceClient monitor client for the first target
     * @param str2 name of the second target
     * @param distributedLogClient2 client for the second target
     * @param monitorServiceClient2 monitor client for the second target
     */
    public SimpleBalancer(String str, DistributedLogClient distributedLogClient, MonitorServiceClient monitorServiceClient, String str2, DistributedLogClient distributedLogClient2, MonitorServiceClient monitorServiceClient2) {
        // Target names.
        this.target1 = str;
        this.target2 = str2;

        // Clients.
        this.targetClient1 = distributedLogClient;
        this.targetClient2 = distributedLogClient2;

        // Monitor clients.
        this.targetMonitor1 = monitorServiceClient;
        this.targetMonitor2 = monitorServiceClient2;
    }

    /**
     * Sums the sizes of all stream sets in an ownership distribution.
     *
     * @param map ownership distribution: one set of stream names per socket address
     * @return the total number of entries across all sets; {@code 0} for an empty map
     */
    protected static int countNumberStreams(Map<SocketAddress, Set<String>> map) {
        int total = 0;
        for (Set<String> streams : map.values()) {
            total += streams.size();
        }
        return total;
    }

    /**
     * Moves streams from the more loaded target to the less loaded one.
     *
     * <p>Both monitors are queried for their stream ownership distribution and the counts are
     * logged. The target owning strictly more streams becomes the source; on a tie the second
     * target is treated as the source. {@code BalancerUtils.calculateNumStreamsToRebalance}
     * decides how many streams to move; when that number is not positive nothing is moved.
     *
     * @param i forwarded to {@code BalancerUtils.calculateNumStreamsToRebalance}
     *          (presumably the rebalance watermark; confirm against BalancerUtils)
     * @param d forwarded to {@code BalancerUtils.calculateNumStreamsToRebalance}
     *          (presumably the tolerance percentage; confirm against BalancerUtils)
     * @param i2 number of concurrent workers used to move streams
     * @param optional optional rate limiter applied by the workers before each stream is chosen
     */
    @Override
    public void balance(int i, double d, int i2, Optional<RateLimiter> optional) {
        Map<SocketAddress, Set<String>> distribution1 = this.targetMonitor1.getStreamOwnershipDistribution();
        Map<SocketAddress, Set<String>> distribution2 = this.targetMonitor2.getStreamOwnershipDistribution();
        int proxyCount1 = distribution1.size();
        int streamCount1 = countNumberStreams(distribution1);
        int proxyCount2 = distribution2.size();
        int streamCount2 = countNumberStreams(distribution2);
        logger.info("'{}' has {} streams by {} proxies; while '{}' has {} streams by {} proxies.", new Object[]{this.target1, streamCount1, proxyCount1, this.target2, streamCount2, proxyCount2});

        // The side with strictly more streams is drained; ties fall through to target2 as source.
        String source;
        int sourceStreamCount;
        DistributedLogClient sourceClient;
        MonitorServiceClient sourceMonitor;
        Map<SocketAddress, Set<String>> sourceDistribution;
        String destination;
        int destinationStreamCount;
        DistributedLogClient destinationClient;
        MonitorServiceClient destinationMonitor;
        if (streamCount1 > streamCount2) {
            source = this.target1;
            sourceStreamCount = streamCount1;
            sourceClient = this.targetClient1;
            sourceMonitor = this.targetMonitor1;
            sourceDistribution = distribution1;
            destination = this.target2;
            destinationStreamCount = streamCount2;
            destinationClient = this.targetClient2;
            destinationMonitor = this.targetMonitor2;
        } else {
            source = this.target2;
            sourceStreamCount = streamCount2;
            sourceClient = this.targetClient2;
            sourceMonitor = this.targetMonitor2;
            sourceDistribution = distribution2;
            destination = this.target1;
            destinationStreamCount = streamCount1;
            destinationClient = this.targetClient1;
            destinationMonitor = this.targetMonitor1;
        }

        Map<String, Integer> loadDistribution = new HashMap<String, Integer>();
        loadDistribution.put(source, sourceStreamCount);
        loadDistribution.put(destination, destinationStreamCount);
        int numStreamsToRebalance = BalancerUtils.calculateNumStreamsToRebalance(source, loadDistribution, i, d);
        if (numStreamsToRebalance <= 0) {
            logger.info("No streams need to be rebalanced from '{}' to '{}'.", source, destination);
            return;
        }

        StreamChooser chooser = LimitedStreamChooser.of(new CountBasedStreamChooser(sourceDistribution), numStreamsToRebalance);
        StreamMover mover = new StreamMoverImpl(source, sourceClient, sourceMonitor, destination, destinationClient, destinationMonitor);
        moveStreams(chooser, mover, i2, optional);
    }

    /**
     * Moves every stream owned by the named target over to the other target.
     *
     * <p>Returns without doing anything when the source target's ownership distribution is empty.
     *
     * @param str name of the target to drain; must equal one of the two configured target names
     * @param i number of concurrent workers used to move streams
     * @param optional optional rate limiter applied by the workers before each stream is chosen
     * @throws IllegalArgumentException if {@code str} matches neither configured target
     */
    @Override
    public void balanceAll(String str, int i, Optional<RateLimiter> optional) {
        final boolean drainFirst = this.target1.equals(str);
        if (!drainFirst && !this.target2.equals(str)) {
            throw new IllegalArgumentException("Unknown target " + str);
        }

        final DistributedLogClient sourceClient;
        final MonitorServiceClient sourceMonitor;
        final String destination;
        final DistributedLogClient destinationClient;
        final MonitorServiceClient destinationMonitor;
        if (drainFirst) {
            sourceClient = this.targetClient1;
            sourceMonitor = this.targetMonitor1;
            destination = this.target2;
            destinationClient = this.targetClient2;
            destinationMonitor = this.targetMonitor2;
        } else {
            sourceClient = this.targetClient2;
            sourceMonitor = this.targetMonitor2;
            destination = this.target1;
            destinationClient = this.targetClient1;
            destinationMonitor = this.targetMonitor1;
        }

        Map<SocketAddress, Set<String>> distribution = sourceMonitor.getStreamOwnershipDistribution();
        if (distribution.isEmpty()) {
            // The source owns nothing, so there is nothing to move.
            return;
        }

        StreamChooser chooser = new CountBasedStreamChooser(distribution);
        StreamMover mover = new StreamMoverImpl(str, sourceClient, sourceMonitor, destination, destinationClient, destinationMonitor);
        moveStreams(chooser, mover, i, optional);
    }

    /**
     * Runs {@code i} concurrent workers that share one {@link RegionMover} and blocks until all of
     * them have counted down the completion latch.
     *
     * <p>The thread pool is shut down only after every worker has been submitted and the wait has
     * ended. Shutting it down inside the submit loop would reject every submission after the
     * first. If the calling thread is interrupted while waiting, the mover is asked to stop and
     * the thread's interrupt status is restored for the caller.
     *
     * @param streamChooser supplies the streams to move
     * @param streamMover moves each chosen stream
     * @param i number of workers; also the size of the thread pool and of the latch
     * @param optional optional rate limiter handed to the workers
     */
    private void moveStreams(StreamChooser streamChooser, StreamMover streamMover, int i, Optional<RateLimiter> optional) {
        CountDownLatch doneLatch = new CountDownLatch(i);
        RegionMover regionMover = new RegionMover(streamChooser, streamMover, optional, doneLatch);
        ExecutorService executor = Executors.newFixedThreadPool(i);
        try {
            for (int worker = 0; worker < i; worker++) {
                executor.submit(regionMover);
            }
            try {
                doneLatch.await();
            } catch (InterruptedException e) {
                logger.info("{} is interrupted. Stopping it ...", streamMover);
                regionMover.shutdown();
                // Preserve the interrupt so callers further up the stack can react to it.
                Thread.currentThread().interrupt();
            }
        } finally {
            // Orderly shutdown: already-submitted workers finish, no new tasks are accepted.
            executor.shutdown();
        }
    }

    /**
     * Closes this balancer. The body is empty: the clients and monitor clients supplied to the
     * constructor are not closed here.
     */
    @Override // org.apache.distributedlog.service.balancer.Balancer
    public void close() {
        // No work is performed.
    }
}
