package org.apache.distributedlog.service.placement;

import com.twitter.util.Duration;
import com.twitter.util.Function;
import com.twitter.util.Future;
import com.twitter.util.Futures;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.client.routing.RoutingService;
import org.apache.distributedlog.service.placement.PlacementStateManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.runtime.BoxedUnit;

/* loaded from: input_file:org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.class */
/**
 * Placement policy that hands each new stream to the first server in a sorted set of
 * {@link ServerLoad}s (the least-loaded server, assuming {@code ServerLoad}'s natural ordering
 * sorts by load — TODO confirm against {@code ServerLoad.compareTo}).
 *
 * <p>The policy keeps two pieces of state: the sorted server loads and a stream-to-server index
 * derived from them. Both are replaced wholesale by {@link #refresh()} and
 * {@link #load(TreeSet)}; all mutations go through {@code synchronized} methods on this instance.
 */
public class LeastLoadPlacementPolicy extends PlacementPolicy {
    private static final Logger logger = LoggerFactory.getLogger(LeastLoadPlacementPolicy.class);

    /** Server loads in sorted order; replaced as a whole on refresh/load. */
    private TreeSet<ServerLoad> serverLoads;

    /** Index from stream name to owning server, derived from {@link #serverLoads}. */
    private Map<String, String> streamToServer;

    /**
     * Creates the policy with empty state and registers the {@code placement/load.diff} gauge,
     * which reports the load difference between the last and first entries of the server set.
     * All arguments are forwarded unchanged to the {@link PlacementPolicy} constructor.
     */
    public LeastLoadPlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService, Namespace namespace, PlacementStateManager placementStateManager, Duration duration, StatsLogger statsLogger) {
        super(loadAppraiser, routingService, namespace, placementStateManager, duration, statsLogger);
        this.serverLoads = new TreeSet<>();
        this.streamToServer = new HashMap<>();
        statsLogger.registerGauge("placement/load.diff", new Gauge<Number>() {
            @Override
            public Number getDefaultValue() {
                return 0;
            }

            @Override
            public Number getSample() {
                // Read the field once so the emptiness check and first()/last() see the same set
                // instance even if refresh()/load() swaps the field concurrently.
                // NOTE(review): this read is not synchronized with placeStreamSynchronized, which
                // briefly removes the first element; confirm whether the gauge needs the lock.
                TreeSet<ServerLoad> snapshot = LeastLoadPlacementPolicy.this.serverLoads;
                if (snapshot.isEmpty()) {
                    return getDefaultValue();
                }
                return Long.valueOf(snapshot.last().getLoad() - snapshot.first().getLoad());
            }
        });
    }

    /** Returns the server recorded for {@code stream}, or {@code null} if none is recorded. */
    private synchronized String getStreamOwner(String stream) {
        return this.streamToServer.get(stream);
    }

    /**
     * Resolves the server for a stream.
     *
     * @param stream name of the stream to place
     * @return a future holding the already-recorded owner if there is one; otherwise a future that
     *     fetches the stream's load from the load appraiser and places it on the first server of
     *     the sorted server set
     */
    @Override // org.apache.distributedlog.service.placement.PlacementPolicy
    public Future<String> placeStream(String stream) {
        String knownOwner = getStreamOwner(stream);
        if (knownOwner != null) {
            return Future.value(knownOwner);
        }
        return this.loadAppraiser.getStreamLoad(stream).map(new Function<StreamLoad, String>() {
            @Override
            public String apply(StreamLoad streamLoad) {
                return LeastLoadPlacementPolicy.this.placeStreamSynchronized(streamLoad);
            }
        });
    }

    /**
     * Adds {@code streamLoad} to the first server of the sorted set and re-inserts that server so
     * the set is re-sorted with its updated load.
     *
     * <p>Kept {@code public} to preserve the existing signature; it is invoked from the anonymous
     * callback in {@link #placeStream(String)}.
     *
     * <p>NOTE(review): the placement is not recorded in {@code streamToServer}, so a repeated call
     * for the same stream before the next refresh/load adds its load again — confirm intended.
     *
     * @param streamLoad load descriptor of the stream being placed
     * @return the name of the server that received the stream
     * @throws IllegalStateException if no servers are currently known
     */
    public synchronized String placeStreamSynchronized(StreamLoad streamLoad) {
        ServerLoad target = this.serverLoads.pollFirst();
        if (target == null) {
            // pollFirst() yields null on an empty set; fail with a clear message instead of an NPE.
            throw new IllegalStateException("No servers available to place stream " + streamLoad.getStream());
        }
        target.addStream(streamLoad);
        this.serverLoads.add(target);
        return target.getServer();
    }

    /**
     * Refreshes the load appraiser's cache, recomputes the placement for the current servers and
     * streams, and installs the result. If the new mapping cannot be persisted the error is logged
     * and the previous state is kept. The resulting future is not returned to the caller.
     */
    @Override // org.apache.distributedlog.service.placement.PlacementPolicy
    public void refresh() {
        logger.info("Refreshing server loads.");
        Future<Void> cacheRefreshed = this.loadAppraiser.refreshCache();
        final Set<String> servers = getServers();
        final Set<String> streams = getStreams();
        cacheRefreshed.flatMap(new Function<Void, Future<TreeSet<ServerLoad>>>() {
            @Override
            public Future<TreeSet<ServerLoad>> apply(Void ignored) {
                return LeastLoadPlacementPolicy.this.calculate(servers, streams);
            }
        }).map(new Function<TreeSet<ServerLoad>, BoxedUnit>() {
            @Override
            public BoxedUnit apply(TreeSet<ServerLoad> newLoads) {
                try {
                    LeastLoadPlacementPolicy.this.updateServerLoads(newLoads);
                } catch (PlacementStateManager.StateManagerSaveException e) {
                    LeastLoadPlacementPolicy.logger.error("The refreshed mapping could not be persisted and will not be used.", e);
                }
                return BoxedUnit.UNIT;
            }
        });
    }

    /**
     * Persists {@code newLoads} through the placement state manager and, only if that succeeds,
     * installs it as the current state.
     *
     * <p>Kept {@code public} to preserve the existing signature; it is invoked from the anonymous
     * callback in {@link #refresh()}.
     *
     * @param newLoads freshly calculated server loads
     * @throws PlacementStateManager.StateManagerSaveException if saving the ownership fails; the
     *     in-memory state is left untouched in that case
     */
    public synchronized void updateServerLoads(TreeSet<ServerLoad> newLoads) throws PlacementStateManager.StateManagerSaveException {
        // Save first: if it throws, the assignments below are skipped and the old state remains.
        this.placementStateManager.saveOwnership(newLoads);
        this.streamToServer = serverLoadsToMap(newLoads);
        this.serverLoads = newLoads;
    }

    /**
     * Installs the given server loads as the current state without persisting them.
     *
     * @param newLoads server loads to adopt
     */
    @Override // org.apache.distributedlog.service.placement.PlacementPolicy
    public synchronized void load(TreeSet<ServerLoad> newLoads) {
        this.serverLoads = newLoads;
        this.streamToServer = serverLoadsToMap(newLoads);
    }

    /**
     * Computes a fresh distribution of {@code streams} over {@code servers}.
     *
     * <p>Fetches the load of every stream from the load appraiser, then distributes the streams
     * once all loads are available. Success and failure latencies are recorded in
     * {@code placementCalcStats}; failures are also logged.
     *
     * @param servers names of the servers that can own streams
     * @param streams names of the streams to distribute
     * @return a future holding the resulting server loads; it fails if any stream load lookup
     *     fails, or with {@link IllegalStateException} if there are streams but no servers
     */
    public Future<TreeSet<ServerLoad>> calculate(final Set<String> servers, Set<String> streams) {
        logger.info("Calculating server loads");
        final long startMillis = System.currentTimeMillis();
        List<Future<StreamLoad>> pendingLoads = new ArrayList<>(streams.size());
        for (String stream : streams) {
            pendingLoads.add(this.loadAppraiser.getStreamLoad(stream));
        }
        return Futures.collect(pendingLoads).map(new Function<List<StreamLoad>, TreeSet<ServerLoad>>() {
            @Override
            public TreeSet<ServerLoad> apply(List<StreamLoad> streamLoads) {
                return distribute(servers, streamLoads);
            }
        }).onSuccess(new Function<TreeSet<ServerLoad>, BoxedUnit>() {
            @Override
            public BoxedUnit apply(TreeSet<ServerLoad> ignored) {
                // The elapsed value comes from currentTimeMillis(), so it is in milliseconds.
                LeastLoadPlacementPolicy.this.placementCalcStats.registerSuccessfulEvent(System.currentTimeMillis() - startMillis, TimeUnit.MILLISECONDS);
                return BoxedUnit.UNIT;
            }
        }).onFailure(new Function<Throwable, BoxedUnit>() {
            @Override
            public BoxedUnit apply(Throwable cause) {
                LeastLoadPlacementPolicy.logger.error("Failure calculating loads", cause);
                LeastLoadPlacementPolicy.this.placementCalcStats.registerFailedEvent(System.currentTimeMillis() - startMillis, TimeUnit.MILLISECONDS);
                return BoxedUnit.UNIT;
            }
        });
    }

    /**
     * Distributes stream loads over servers: every server is first seeded with one stream (taken
     * in sorted order while any remain), then each remaining stream goes to whichever server is
     * first in the sorted server set at that moment.
     *
     * @throws IllegalStateException if there are streams to place but no servers
     */
    private static TreeSet<ServerLoad> distribute(Set<String> servers, List<StreamLoad> streamLoads) {
        TreeSet<StreamLoad> unplaced = new TreeSet<>(streamLoads);
        if (servers.isEmpty() && !unplaced.isEmpty()) {
            // Without this guard the loop below would dereference a null pollFirst() result.
            throw new IllegalStateException("Cannot place " + unplaced.size() + " stream(s): no servers available");
        }
        TreeSet<ServerLoad> result = new TreeSet<>();
        for (String server : servers) {
            ServerLoad serverLoad = new ServerLoad(server);
            StreamLoad seed = unplaced.pollFirst();
            if (seed != null) {
                serverLoad.addStream(seed);
            }
            result.add(serverLoad);
        }
        while (!unplaced.isEmpty()) {
            // Remove and re-add so the set re-sorts the server under its updated load.
            ServerLoad target = result.pollFirst();
            target.addStream(unplaced.pollFirst());
            result.add(target);
        }
        return result;
    }

    /** Builds a stream-name to server-name index from the streams held by each server load. */
    private static Map<String, String> serverLoadsToMap(Collection<ServerLoad> loads) {
        Map<String, String> index = new HashMap<>(loads.size());
        for (ServerLoad serverLoad : loads) {
            for (StreamLoad streamLoad : serverLoad.getStreamLoads()) {
                index.put(streamLoad.getStream(), serverLoad.getServer());
            }
        }
        return index;
    }
}
