package org.apache.distributedlog.bk;

import dlshade.com.google.common.annotations.VisibleForTesting;
import dlshade.com.google.common.collect.Lists;
import dlshade.org.apache.bookkeeper.client.LedgerHandle;
import dlshade.org.apache.bookkeeper.common.concurrent.FutureEventListener;
import dlshade.org.apache.bookkeeper.common.concurrent.FutureUtils;
import dlshade.org.apache.bookkeeper.net.NodeBase;
import dlshade.org.apache.bookkeeper.util.ZkUtils;
import dlshade.org.apache.bookkeeper.versioning.LongVersion;
import dlshade.org.apache.bookkeeper.versioning.Versioned;
import dlshade.org.apache.zookeeper.AsyncCallback;
import dlshade.org.apache.zookeeper.CreateMode;
import dlshade.org.apache.zookeeper.KeeperException;
import dlshade.org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import org.apache.distributedlog.BookKeeperClient;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.exceptions.DLInterruptedException;
import org.apache.distributedlog.util.Transaction;
import org.apache.distributedlog.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/distributedlog/bk/LedgerAllocatorPool.class */
/**
 * A {@link LedgerAllocator} that multiplexes over a pool of {@link SimpleLedgerAllocator}s,
 * each backed by one child znode of {@code poolPath}.
 *
 * <p>Allocators move through the following collections:
 * <ul>
 *   <li>{@code pendingList}: allocators ready to be asked to {@link #allocate()};</li>
 *   <li>{@code allocatingList}: allocators on which {@code allocate()} was issued and which
 *       can be handed to {@link #tryObtain};</li>
 *   <li>{@code obtainMap} / {@code reverseObtainMap}: allocators whose ledger was obtained but
 *       whose transaction has not committed or aborted yet;</li>
 *   <li>{@code rescueMap}: allocators currently being re-read from ZooKeeper after a failure.</li>
 * </ul>
 *
 * <p>All of these collections are guarded by the monitor of this instance.
 */
public class LedgerAllocatorPool implements LedgerAllocator {
    private static final Logger logger = LoggerFactory.getLogger(LedgerAllocatorPool.class);

    private final DistributedLogConfiguration conf;
    private final QuorumConfigProvider quorumConfigProvider;
    private final BookKeeperClient bkc;
    private final ZooKeeperClient zkc;
    private final ScheduledExecutorService scheduledExecutorService;
    private final String poolPath;
    private final int corePoolSize;

    // The collections below are guarded by "this".
    private final LinkedList<SimpleLedgerAllocator> pendingList = new LinkedList<>();
    private final LinkedList<SimpleLedgerAllocator> allocatingList = new LinkedList<>();
    private final Map<String, SimpleLedgerAllocator> rescueMap = new HashMap<>();
    private final Map<LedgerHandle, SimpleLedgerAllocator> obtainMap = new HashMap<>();
    private final Map<SimpleLedgerAllocator, LedgerHandle> reverseObtainMap = new HashMap<>();

    /**
     * Creates the pool and synchronously initializes it from ZooKeeper.
     *
     * @param poolPath znode under which the allocator znodes live
     * @param corePoolSize minimum number of allocator znodes that must exist under {@code poolPath}
     * @param conf configuration (quorum config and rescue back-off are read from it)
     * @param zkc zookeeper client
     * @param bkc bookkeeper client
     * @param scheduledExecutorService executor used for delayed rescues and list processing
     * @throws IOException if the pool cannot be initialized
     */
    public LedgerAllocatorPool(String poolPath,
                               int corePoolSize,
                               DistributedLogConfiguration conf,
                               ZooKeeperClient zkc,
                               BookKeeperClient bkc,
                               ScheduledExecutorService scheduledExecutorService) throws IOException {
        this.poolPath = poolPath;
        this.corePoolSize = corePoolSize;
        this.conf = conf;
        this.quorumConfigProvider = new ImmutableQuorumConfigProvider(conf.getQuorumConfig());
        this.zkc = zkc;
        this.bkc = bkc;
        this.scheduledExecutorService = scheduledExecutorService;
        initializePool();
    }

    /**
     * Issues an allocation request on every allocator that is currently pending.
     * The allocators stay in the pending list.
     *
     * @throws IOException if any allocator fails to issue its allocation request
     */
    @Override // org.apache.distributedlog.bk.LedgerAllocator
    public void start() throws IOException {
        // Iterate over a snapshot so a concurrent rescue adding to pendingList cannot
        // break the iteration, and so no allocator code runs under the pool lock.
        final List<SimpleLedgerAllocator> snapshot;
        synchronized (this) {
            snapshot = new ArrayList<>(this.pendingList);
        }
        for (SimpleLedgerAllocator allocator : snapshot) {
            allocator.allocate();
        }
    }

    @VisibleForTesting
    synchronized int pendingListSize() {
        return this.pendingList.size();
    }

    @VisibleForTesting
    synchronized int allocatingListSize() {
        return this.allocatingList.size();
    }

    @VisibleForTesting
    public synchronized int obtainMapSize() {
        return this.obtainMap.size();
    }

    @VisibleForTesting
    synchronized int rescueSize() {
        return this.rescueMap.size();
    }

    @VisibleForTesting
    synchronized SimpleLedgerAllocator getLedgerAllocator(LedgerHandle ledgerHandle) {
        return this.obtainMap.get(ledgerHandle);
    }

    /**
     * Ensures {@code poolPath} exists with at least {@code corePoolSize} children and builds one
     * allocator per child.
     *
     * @throws IOException on ZooKeeper failure, or {@link DLInterruptedException} on interrupt
     */
    private void initializePool() throws IOException {
        try {
            List<String> allocatorNames;
            try {
                allocatorNames = this.zkc.get().getChildren(this.poolPath, false);
            } catch (KeeperException.NoNodeException e) {
                logger.info("Allocator Pool {} doesn't exist. Creating it.", this.poolPath);
                ZkUtils.createFullPathOptimistic(this.zkc.get(), this.poolPath, new byte[0],
                        this.zkc.getDefaultACL(), CreateMode.PERSISTENT);
                allocatorNames = this.zkc.get().getChildren(this.poolPath, false);
            }
            if (null == allocatorNames) {
                allocatorNames = new ArrayList<>();
            }
            if (allocatorNames.size() < this.corePoolSize) {
                createAllocators(this.corePoolSize - allocatorNames.size());
                // Re-list to pick up the sequential names that were just created.
                allocatorNames = this.zkc.get().getChildren(this.poolPath, false);
            }
            initializeAllocators(allocatorNames);
        } catch (KeeperException e) {
            throw new IOException("Encountered zookeeper exception when initializing pool " + this.poolPath + " : ", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new DLInterruptedException("Interrupted when ensuring " + this.poolPath + " created : ", e);
        }
    }

    /**
     * Creates {@code numAllocators} persistent-sequential znodes under {@code poolPath} and waits
     * until all creations succeeded or the first one failed.
     *
     * @param numAllocators number of allocator znodes to create; values {@code <= 0} are a no-op
     * @throws IOException if any creation failed
     * @throws InterruptedException if interrupted while waiting
     */
    private void createAllocators(final int numAllocators) throws InterruptedException, IOException {
        if (numAllocators <= 0) {
            // Nothing would ever count the latch down; avoid blocking forever.
            return;
        }
        final AtomicInteger numPendings = new AtomicInteger(numAllocators);
        final AtomicInteger numFailures = new AtomicInteger(0);
        final CountDownLatch latch = new CountDownLatch(1);
        final AsyncCallback.StringCallback createCallback = new AsyncCallback.StringCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, String name) {
                if (KeeperException.Code.OK.intValue() != rc) {
                    // Fail fast: release the waiter on the first failure.
                    numFailures.incrementAndGet();
                    latch.countDown();
                    return;
                }
                if (numPendings.decrementAndGet() == 0 && numFailures.get() == 0) {
                    latch.countDown();
                }
            }
        };
        for (int n = 0; n < numAllocators; n++) {
            this.zkc.get().create(this.poolPath + "/A", new byte[0], this.zkc.getDefaultACL(),
                    CreateMode.PERSISTENT_SEQUENTIAL, createCallback, null);
        }
        latch.await();
        if (numFailures.get() > 0) {
            throw new IOException("Failed to create " + numAllocators + " allocators.");
        }
    }

    /**
     * Reads the data of every named child of {@code poolPath}, builds and starts a
     * {@link SimpleLedgerAllocator} for each one and appends it to the pending list.
     *
     * @param allocatorNames child names under {@code poolPath}
     * @throws IOException if any read failed
     * @throws InterruptedException if interrupted while waiting
     */
    private void initializeAllocators(List<String> allocatorNames) throws IOException, InterruptedException {
        final AtomicInteger numPendings = new AtomicInteger(allocatorNames.size());
        final AtomicInteger numFailures = new AtomicInteger(0);
        // With no allocators there is nothing to wait for.
        final CountDownLatch latch = new CountDownLatch(numPendings.get() > 0 ? 1 : 0);
        final AsyncCallback.DataCallback dataCallback = new AsyncCallback.DataCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
                if (KeeperException.Code.OK.intValue() != rc) {
                    numFailures.incrementAndGet();
                    latch.countDown();
                    return;
                }
                Versioned<byte[]> allocatorData = new Versioned<>(data, new LongVersion(stat.getVersion()));
                SimpleLedgerAllocator allocator = new SimpleLedgerAllocator(
                        path, allocatorData, quorumConfigProvider, zkc, bkc);
                allocator.start();
                synchronized (LedgerAllocatorPool.this) {
                    pendingList.add(allocator);
                }
                if (numPendings.decrementAndGet() == 0 && numFailures.get() == 0) {
                    latch.countDown();
                }
            }
        };
        for (String name : allocatorNames) {
            this.zkc.get().getData(this.poolPath + NodeBase.PATH_SEPARATOR_STR + name, false, dataCallback, null);
        }
        latch.await();
        if (numFailures.get() > 0) {
            throw new IOException("Failed to initialize allocators : " + allocatorNames);
        }
    }

    /**
     * Schedules {@link #rescueAllocator} for the given allocator after the configured ZooKeeper
     * retry back-off start delay. A rejected schedule is logged and otherwise ignored.
     *
     * <p>NOTE(review): a decompiler note on the original indicated this method was private in the
     * source; it is kept public here to leave the visible signature unchanged.
     *
     * @param ledgerAllocator allocator to rescue later
     */
    public void scheduleAllocatorRescue(final SimpleLedgerAllocator ledgerAllocator) {
        try {
            this.scheduledExecutorService.schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        rescueAllocator(ledgerAllocator);
                    } catch (DLInterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }, this.conf.getZKRetryBackoffStartMillis(), TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            logger.warn("Failed to schedule rescuing ledger allocator {} : ", ledgerAllocator.allocatePath, e);
        }
    }

    /**
     * Re-reads the allocator's znode and, if it still exists, replaces the allocator with a fresh
     * one appended to the pending list. A missing znode drops the allocator; any other ZooKeeper
     * error schedules another rescue attempt. At most one rescue per allocate path is in flight.
     *
     * <p>NOTE(review): a decompiler note on the original indicated this method was private in the
     * source; it is kept public here to leave the visible signature unchanged.
     *
     * @param ledgerAllocator allocator to rescue
     * @throws DLInterruptedException if interrupted while obtaining the ZooKeeper handle
     */
    public void rescueAllocator(final SimpleLedgerAllocator ledgerAllocator) throws DLInterruptedException {
        final SimpleLedgerAllocator alreadyRescuing;
        synchronized (this) {
            alreadyRescuing = this.rescueMap.put(ledgerAllocator.allocatePath, ledgerAllocator);
        }
        if (alreadyRescuing != null) {
            logger.info("ledger allocator {} is being rescued.", ledgerAllocator.allocatePath);
            return;
        }
        try {
            this.zkc.get().getData(ledgerAllocator.allocatePath, false, new AsyncCallback.DataCallback() {
                @Override
                public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
                    boolean retry = false;
                    SimpleLedgerAllocator newAllocator = null;
                    if (KeeperException.Code.OK.intValue() == rc) {
                        Versioned<byte[]> allocatorData =
                                new Versioned<>(data, new LongVersion(stat.getVersion()));
                        logger.info("Rescuing ledger allocator {}.", path);
                        newAllocator = new SimpleLedgerAllocator(
                                path, allocatorData, quorumConfigProvider, zkc, bkc);
                        newAllocator.start();
                        logger.info("Rescued ledger allocator {}.", path);
                    } else if (KeeperException.Code.NONODE.intValue() == rc) {
                        logger.info("Ledger allocator {} doesn't exist, skip rescuing it.", path);
                    } else {
                        retry = true;
                    }
                    synchronized (LedgerAllocatorPool.this) {
                        rescueMap.remove(ledgerAllocator.allocatePath);
                        if (null != newAllocator) {
                            pendingList.addLast(newAllocator);
                        }
                    }
                    if (retry) {
                        scheduleAllocatorRescue(ledgerAllocator);
                    }
                }
            }, null);
        } catch (IOException e) {
            logger.warn("Failed to rescue ledger allocator {}, retry rescuing it later : ",
                    ledgerAllocator.allocatePath, e);
            synchronized (this) {
                this.rescueMap.remove(ledgerAllocator.allocatePath);
            }
            // Scheduling does not touch pool state, so do it outside the lock.
            scheduleAllocatorRescue(ledgerAllocator);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("Interrupted on rescuing ledger allocator {} : ", ledgerAllocator.allocatePath, e);
            synchronized (this) {
                this.rescueMap.remove(ledgerAllocator.allocatePath);
            }
            throw new DLInterruptedException(
                    "Interrupted on rescuing ledger allocator " + ledgerAllocator.allocatePath, e);
        }
    }

    /**
     * Takes the first pending allocator, issues an allocation on it and moves it to the
     * allocating list. If issuing the allocation fails, the allocator is sent to rescue.
     *
     * @throws IOException if no allocator is pending or the allocation request fails
     */
    @Override // org.apache.distributedlog.util.Allocator
    public void allocate() throws IOException {
        final SimpleLedgerAllocator allocator;
        synchronized (this) {
            if (this.pendingList.isEmpty()) {
                throw new IOException("No ledger allocator available under " + this.poolPath + ".");
            }
            allocator = this.pendingList.removeFirst();
        }
        boolean success = false;
        try {
            allocator.allocate();
            synchronized (this) {
                this.allocatingList.addLast(allocator);
            }
            success = true;
        } finally {
            if (!success) {
                rescueAllocator(allocator);
            }
        }
    }

    /**
     * Takes the first allocating allocator and tries to obtain its ledger within {@code transaction}.
     *
     * <p>On transaction commit the allocator returns to the pending list; on abort, or if obtaining
     * fails, it is sent to rescue. {@code opListener} is notified after the pool bookkeeping.
     *
     * @param transaction transaction the obtain operation is attached to
     * @param opListener listener notified on commit/abort of the obtain operation
     * @return future completed with the obtained ledger handle, or exceptionally if no allocator is
     *         available or the obtain failed
     */
    @Override // org.apache.distributedlog.util.Allocator
    public CompletableFuture<LedgerHandle> tryObtain(Transaction<Object> transaction,
                                                     final Transaction.OpListener<LedgerHandle> opListener) {
        final SimpleLedgerAllocator allocator;
        // Hold the pool lock only to dequeue; the delegate call and callbacks run outside it.
        synchronized (this) {
            if (this.allocatingList.isEmpty()) {
                return FutureUtils.exception(
                        new IOException("No ledger allocator available under " + this.poolPath + "."));
            }
            allocator = this.allocatingList.removeFirst();
        }

        final CompletableFuture<LedgerHandle> obtainPromise = new CompletableFuture<>();
        final Transaction.OpListener<LedgerHandle> poolListener = new Transaction.OpListener<LedgerHandle>() {
            @Override
            public void onCommit(LedgerHandle ledgerHandle) {
                confirmObtain(allocator);
                opListener.onCommit(ledgerHandle);
            }

            @Override
            public void onAbort(Throwable th) {
                abortObtain(allocator);
                opListener.onAbort(th);
            }
        };
        allocator.tryObtain(transaction, poolListener).whenComplete(new FutureEventListener<LedgerHandle>() {
            @Override
            public void onSuccess(LedgerHandle ledgerHandle) {
                synchronized (LedgerAllocatorPool.this) {
                    obtainMap.put(ledgerHandle, allocator);
                    reverseObtainMap.put(allocator, ledgerHandle);
                    obtainPromise.complete(ledgerHandle);
                }
            }

            @Override
            public void onFailure(Throwable th) {
                try {
                    rescueAllocator(allocator);
                } catch (IOException e) {
                    logger.info("Failed to rescue allocator {}", allocator.allocatePath, e);
                }
                obtainPromise.completeExceptionally(th);
            }
        });
        return obtainPromise;
    }

    /**
     * Removes the allocator from both obtain maps. Caller must hold the pool lock.
     */
    private void forgetObtained(SimpleLedgerAllocator ledgerAllocator) {
        LedgerHandle handle = this.reverseObtainMap.remove(ledgerAllocator);
        if (null != handle) {
            this.obtainMap.remove(handle);
        }
    }

    /**
     * Called when the obtain transaction committed: clears the obtain bookkeeping and returns the
     * allocator to the pending list.
     */
    void confirmObtain(SimpleLedgerAllocator simpleLedgerAllocator) {
        synchronized (this) {
            forgetObtained(simpleLedgerAllocator);
            this.pendingList.addLast(simpleLedgerAllocator);
        }
    }

    /**
     * Called when the obtain transaction aborted: clears the obtain bookkeeping and sends the
     * allocator to rescue.
     */
    void abortObtain(SimpleLedgerAllocator simpleLedgerAllocator) {
        synchronized (this) {
            forgetObtained(simpleLedgerAllocator);
        }
        try {
            rescueAllocator(simpleLedgerAllocator);
        } catch (DLInterruptedException e) {
            logger.warn("Interrupted on rescuing ledger allocator pool {} : ", this.poolPath, e);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Returns a snapshot of the pending, allocating and obtained allocators, taken under the
     * pool lock. Allocators that are only present in the rescue map are not included.
     */
    private List<LedgerAllocator> snapshotAllocators() {
        synchronized (this) {
            List<LedgerAllocator> allocators = Lists.newArrayListWithExpectedSize(
                    this.pendingList.size() + this.allocatingList.size() + this.obtainMap.size());
            allocators.addAll(this.pendingList);
            allocators.addAll(this.allocatingList);
            allocators.addAll(this.obtainMap.values());
            return allocators;
        }
    }

    /**
     * Closes every pending, allocating and obtained allocator.
     *
     * @return future completed once all of those allocators were processed
     */
    @Override // org.apache.distributedlog.io.AsyncCloseable
    public CompletableFuture<Void> asyncClose() {
        List<LedgerAllocator> allocatorsToClose = snapshotAllocators();
        return FutureUtils.processList(
                allocatorsToClose,
                allocator -> allocator.asyncClose(),
                this.scheduledExecutorService
        ).thenApply(values -> null);
    }

    /**
     * Deletes every pending, allocating and obtained allocator and then deletes {@code poolPath}
     * itself (any version).
     *
     * @return future completed once the pool znode was deleted
     */
    @Override // org.apache.distributedlog.io.AsyncDeleteable
    public CompletableFuture<Void> delete() {
        List<LedgerAllocator> allocatorsToDelete = snapshotAllocators();
        return FutureUtils.processList(
                allocatorsToDelete,
                allocator -> allocator.delete(),
                this.scheduledExecutorService
        ).thenCompose(values -> Utils.zkDelete(this.zkc, this.poolPath, new LongVersion(-1L)));
    }
}
