package org.apache.bookkeeper.client;

import com.google.common.util.concurrent.RateLimiter;
import io.netty.buffer.Unpooled;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.MetadataUpdateLoop;
import org.apache.bookkeeper.client.api.BKException;
import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.replication.ReplicationStats;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.annotations.StatsDoc;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.zookeeper.AsyncCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@StatsDoc(name = ReplicationStats.REPLICATION_WORKER_SCOPE, help = "Ledger fragment replicator related stats")
/* loaded from: input_file:META-INF/bundled-dependencies/bookkeeper-server-4.14.8.jar:org/apache/bookkeeper/client/LedgerFragmentReplicator.class */
public class LedgerFragmentReplicator {
    // BookKeeper client; this class reads its configuration (getConf) and uses its bookie client (getBookieClient).
    private BookKeeper bkc;
    // Source of the counters and op-stats loggers declared below; they are looked up in the constructor.
    private StatsLogger statsLogger;

    // Incremented once per entry successfully read during replication.
    @StatsDoc(name = ReplicationStats.NUM_ENTRIES_READ, help = "Number of entries read by the replicator")
    private final Counter numEntriesRead;

    // Records the payload length of each entry read.
    @StatsDoc(name = ReplicationStats.NUM_BYTES_READ, help = "The distribution of size of entries read by the replicator")
    private final OpStatsLogger numBytesRead;

    // Incremented once per successful write acknowledgement.
    @StatsDoc(name = ReplicationStats.NUM_ENTRIES_WRITTEN, help = "Number of entries written by the replicator")
    private final Counter numEntriesWritten;

    // Records the entry length carried back by a successful write acknowledgement.
    @StatsDoc(name = ReplicationStats.NUM_BYTES_WRITTEN, help = "The distribution of size of entries written by the replicator")
    private final OpStatsLogger numBytesWritten;

    // Records the elapsed time between issuing an entry read and its successful completion.
    @StatsDoc(name = ReplicationStats.READ_DATA_LATENCY, help = "The distribution of latency of read entries by the replicator")
    private final OpStatsLogger readDataLatency;

    // Records the elapsed time measured around each addEntry call made while re-writing an entry.
    @StatsDoc(name = ReplicationStats.WRITE_DATA_LATENCY, help = "The distribution of latency of write entries by the replicator")
    private final OpStatsLogger writeDataLatency;
    // Optional byte-rate throttle; null unless the configured replication rate by bytes is positive.
    protected Throttler replicationThrottle;
    // Running weighted average of the packaged entry size; used as the permit count requested from the throttle.
    private AtomicInteger averageEntrySize;
    // Initial average entry size; equals the value the constructor seeds averageEntrySize with.
    private static final int INITIAL_AVERAGE_ENTRY_SIZE = 1024;
    // Weight given to the previous average when updateAverageEntrySize folds in a new sample.
    private static final double AVERAGE_ENTRY_SIZE_RATIO = 0.8d;
    // Class logger; assigned in the static initializer at the bottom of the class.
    private static final Logger LOG;
    // Decompiler-exposed assertion flag: true when assertions are disabled for this class (set in the static initializer).
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:META-INF/bundled-dependencies/bookkeeper-server-4.14.8.jar:org/apache/bookkeeper/client/LedgerFragmentReplicator$SingleFragmentCallback.class */
    static class SingleFragmentCallback implements AsyncCallback.VoidCallback {
        final AsyncCallback.VoidCallback ledgerFragmentsMcb;
        final LedgerHandle lh;
        final LedgerManager ledgerManager;
        final long fragmentStartId;
        final Map<BookieId, BookieId> oldBookie2NewBookie;

        /* JADX INFO: Access modifiers changed from: package-private */
        public SingleFragmentCallback(AsyncCallback.VoidCallback voidCallback, LedgerHandle ledgerHandle, LedgerManager ledgerManager, long j, Map<BookieId, BookieId> map) {
            this.ledgerFragmentsMcb = voidCallback;
            this.lh = ledgerHandle;
            this.ledgerManager = ledgerManager;
            this.fragmentStartId = j;
            this.oldBookie2NewBookie = map;
        }

        @Override // org.apache.zookeeper.AsyncCallback.VoidCallback
        public void processResult(int i, String str, Object obj) {
            if (i == 0) {
                LedgerFragmentReplicator.updateEnsembleInfo(this.ledgerManager, this.ledgerFragmentsMcb, this.fragmentStartId, this.lh, this.oldBookie2NewBookie);
            } else {
                LedgerFragmentReplicator.LOG.error("BK error replicating ledger fragments for ledger: " + this.lh.getId(), (Throwable) BKException.create(i));
                this.ledgerFragmentsMcb.processResult(i, null, null);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:META-INF/bundled-dependencies/bookkeeper-server-4.14.8.jar:org/apache/bookkeeper/client/LedgerFragmentReplicator$Throttler.class */
    /**
     * Thin wrapper around a Guava {@link RateLimiter} used to pace replication.
     */
    public static class Throttler {
        private final RateLimiter rateLimiter;

        /**
         * @param i permits per second handed to {@code RateLimiter.create}
         */
        Throttler(int i) {
            rateLimiter = RateLimiter.create(i);
        }

        /**
         * Requests {@code i} permits from the underlying rate limiter.
         *
         * @param i number of permits to acquire
         */
        void acquire(int i) {
            rateLimiter.acquire(i);
        }
    }

    /**
     * Creates a replicator that reports its metrics to the given stats logger.
     *
     * <p>A byte-rate throttle is installed only when the configured replication rate by
     * bytes is positive; otherwise {@code replicationThrottle} stays {@code null}.
     *
     * @param bookKeeper          client used to read entries and reach the target bookies
     * @param statsLogger         provider of the replication counters and op-stats loggers
     * @param clientConfiguration configuration supplying the replication rate by bytes
     */
    public LedgerFragmentReplicator(BookKeeper bookKeeper, StatsLogger statsLogger, ClientConfiguration clientConfiguration) {
        this.bkc = bookKeeper;
        this.statsLogger = statsLogger;
        this.numEntriesRead = statsLogger.getCounter(ReplicationStats.NUM_ENTRIES_READ);
        this.numBytesRead = statsLogger.getOpStatsLogger(ReplicationStats.NUM_BYTES_READ);
        this.numEntriesWritten = statsLogger.getCounter(ReplicationStats.NUM_ENTRIES_WRITTEN);
        this.numBytesWritten = statsLogger.getOpStatsLogger(ReplicationStats.NUM_BYTES_WRITTEN);
        this.readDataLatency = statsLogger.getOpStatsLogger(ReplicationStats.READ_DATA_LATENCY);
        this.writeDataLatency = statsLogger.getOpStatsLogger(ReplicationStats.WRITE_DATA_LATENCY);
        this.replicationThrottle = clientConfiguration.getReplicationRateByBytes() > 0
                ? new Throttler(clientConfiguration.getReplicationRateByBytes())
                : null;
        this.averageEntrySize = new AtomicInteger(INITIAL_AVERAGE_ENTRY_SIZE);
    }

    /**
     * Creates a replicator whose metrics go to {@link NullStatsLogger#INSTANCE}.
     *
     * @param bookKeeper          client used to read entries and reach the target bookies
     * @param clientConfiguration configuration supplying the replication rate by bytes
     */
    public LedgerFragmentReplicator(BookKeeper bookKeeper, ClientConfiguration clientConfiguration) {
        this(bookKeeper, NullStatsLogger.INSTANCE, clientConfiguration);
    }

    /**
     * Replicates every stored entry of one (sub-)fragment to the target bookies.
     *
     * <p>The fragment must be closed; otherwise the callback is completed with code -103
     * and nothing is replicated. A fragment with no stored entries completes the callback
     * immediately with code 0. Otherwise one asynchronous recovery is started per entry id
     * and the per-entry results are aggregated through a {@code MultiCallback}.
     *
     * @param ledgerHandle   handle of the ledger being replicated
     * @param ledgerFragment fragment whose entries are to be copied
     * @param voidCallback   notified when the whole fragment is done or has failed
     * @param set            bookies that should receive the entries
     * @param biConsumer     invoked with (ledgerId, entryId) when reading an entry fails
     * @throws InterruptedException propagated from the per-entry recovery
     */
    private void replicateFragmentInternal(LedgerHandle ledgerHandle, LedgerFragment ledgerFragment, AsyncCallback.VoidCallback voidCallback, Set<BookieId> set, BiConsumer<Long, Long> biConsumer) throws InterruptedException {
        if (!ledgerFragment.isClosed()) {
            LOG.error("Trying to replicate an unclosed fragment; This is not safe {}", ledgerFragment);
            voidCallback.processResult(-103, null, null);
            return;
        }
        // Read the bounds once so the validation below and the replication loop agree.
        final long firstEntryId = ledgerFragment.getFirstStoredEntryId();
        final long lastEntryId = ledgerFragment.getLastStoredEntryId();
        // Either both bounds are -1 (nothing stored) or neither is; a mix is inconsistent.
        if ((firstEntryId == -1) ^ (lastEntryId == -1)) {
            LOG.error("For LedgerFragment: {}, seeing inconsistent firstStoredEntryId: {} and lastStoredEntryId: {}", ledgerFragment, firstEntryId, lastEntryId);
            if (!$assertionsDisabled) {
                throw new AssertionError();
            }
        }
        if (firstEntryId > lastEntryId || lastEntryId <= -1) {
            // Nothing to copy: report success right away.
            voidCallback.processResult(0, null, null);
            return;
        }
        final List<Long> entriesToReplicate = new LinkedList<>();
        for (long entryId = firstEntryId; entryId <= lastEntryId; entryId++) {
            entriesToReplicate.add(entryId);
        }
        // Aggregates the per-entry callbacks into a single completion of voidCallback
        // (arguments: expected count, final callback, context, success code 0, failure code -10).
        final BookkeeperInternalCallbacks.MultiCallback multiCallback = new BookkeeperInternalCallbacks.MultiCallback(entriesToReplicate.size(), voidCallback, null, 0, -10);
        for (Long entryId : entriesToReplicate) {
            recoverLedgerFragmentEntry(entryId, ledgerHandle, multiCallback, set, biConsumer);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Replicates a ledger fragment to the given bookies, batch by batch.
     *
     * <p>The fragment is first split into sub-fragments according to the configured
     * re-replication entry batch size; the sub-fragments are then replicated one after
     * another.
     *
     * @param ledgerHandle   handle of the ledger being replicated
     * @param ledgerFragment fragment to replicate
     * @param voidCallback   notified once all sub-fragments are done or one has failed
     * @param set            bookies that should receive the entries
     * @param biConsumer     invoked with (ledgerId, entryId) when reading an entry fails
     * @throws InterruptedException declared for callers; see the per-entry recovery
     */
    public void replicate(LedgerHandle ledgerHandle, LedgerFragment ledgerFragment, AsyncCallback.VoidCallback voidCallback, Set<BookieId> set, BiConsumer<Long, Long> biConsumer) throws InterruptedException {
        final long batchSize = this.bkc.getConf().getRereplicationEntryBatchSize();
        final Set<LedgerFragment> subFragments = splitIntoSubFragments(ledgerHandle, ledgerFragment, batchSize);
        LOG.info("Replicating fragment {} in {} sub fragments.", ledgerFragment, subFragments.size());
        replicateNextBatch(ledgerHandle, subFragments.iterator(), voidCallback, set, biConsumer);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Replicates the next sub-fragment from the iterator and chains to the following one.
     *
     * <p>When the iterator is exhausted the callback is completed with code 0. A failure
     * of any sub-fragment stops the chain and is forwarded to the callback. If the thread
     * is interrupted the callback receives code -15 and the interrupt flag is restored.
     *
     * @param ledgerHandle handle of the ledger being replicated
     * @param it           remaining sub-fragments
     * @param voidCallback notified when the chain finishes or fails
     * @param set          bookies that should receive the entries
     * @param biConsumer   invoked with (ledgerId, entryId) when reading an entry fails
     */
    public void replicateNextBatch(final LedgerHandle ledgerHandle, final Iterator<LedgerFragment> it, final AsyncCallback.VoidCallback voidCallback, final Set<BookieId> set, final BiConsumer<Long, Long> biConsumer) {
        if (!it.hasNext()) {
            voidCallback.processResult(0, null, null);
            return;
        }
        // Continuation run when the current sub-fragment completes.
        final AsyncCallback.VoidCallback onBatchDone = new AsyncCallback.VoidCallback() {
            @Override // org.apache.zookeeper.AsyncCallback.VoidCallback
            public void processResult(int rc, String path, Object ctx) {
                if (rc == 0) {
                    LedgerFragmentReplicator.this.replicateNextBatch(ledgerHandle, it, voidCallback, set, biConsumer);
                    return;
                }
                voidCallback.processResult(rc, null, null);
            }
        };
        try {
            replicateFragmentInternal(ledgerHandle, it.next(), onBatchDone, set, biConsumer);
        } catch (InterruptedException interrupted) {
            voidCallback.processResult(-15, null, null);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Splits a fragment into sub-fragments of at most {@code j} entries each.
     *
     * <p>The original fragment is returned as the only element when {@code j} is not
     * positive or when the fragment holds fewer than {@code j} entries. Otherwise the
     * entry range is cut into consecutive full batches, followed by one smaller
     * sub-fragment for any remainder. All sub-fragments reuse the bookie indexes of the
     * original fragment.
     *
     * @param ledgerHandle   handle used to construct the sub-fragments
     * @param ledgerFragment fragment to split
     * @param j              maximum number of entries per sub-fragment; non-positive disables splitting
     * @return the set of sub-fragments (never empty)
     */
    static Set<LedgerFragment> splitIntoSubFragments(LedgerHandle ledgerHandle, LedgerFragment ledgerFragment, long j) {
        final Set<LedgerFragment> subFragments = new HashSet<>();
        if (j <= 0) {
            subFragments.add(ledgerFragment);
            return subFragments;
        }
        final long firstEntryId = ledgerFragment.getFirstStoredEntryId();
        final long lastEntryId = ledgerFragment.getLastStoredEntryId();
        // Either both bounds are -1 (nothing stored) or neither is; a mix is inconsistent.
        if ((firstEntryId == -1) ^ (lastEntryId == -1)) {
            LOG.error("For LedgerFragment: {}, seeing inconsistent firstStoredEntryId: {} and lastStoredEntryId: {}", ledgerFragment, firstEntryId, lastEntryId);
            if (!$assertionsDisabled) {
                throw new AssertionError();
            }
        }
        final long totalEntries = (lastEntryId - firstEntryId) + 1;
        final long fullBatches = totalEntries / j;
        if (fullBatches == 0) {
            // Fewer entries than one batch: keep the fragment as it is.
            subFragments.add(ledgerFragment);
            return subFragments;
        }
        long batchStart = firstEntryId;
        // A long counter is used because fullBatches is a long; an int counter could wrap.
        for (long batch = 0; batch < fullBatches; batch++) {
            final long batchEnd = (batchStart + j) - 1;
            subFragments.add(new LedgerFragment(ledgerHandle, batchStart, batchEnd, ledgerFragment.getBookiesIndexes()));
            batchStart = batchEnd + 1;
        }
        final long remainder = totalEntries % j;
        if (remainder > 0) {
            subFragments.add(new LedgerFragment(ledgerHandle, batchStart, (batchStart + remainder) - 1, ledgerFragment.getBookiesIndexes()));
        }
        return subFragments;
    }

    /**
     * Copies a single entry to every bookie in {@code set}.
     *
     * <p>The entry is read asynchronously through {@code ledgerHandle}; when the read
     * succeeds it is re-packaged with the ledger's digest manager and sent to each target
     * bookie via the bookie client. {@code voidCallback} is completed with the failing
     * code on the first write error, or with code 0 once the number of successful writes
     * equals {@code set.size()}. A read failure invokes {@code biConsumer} with
     * (ledgerId, entryId) and completes {@code voidCallback} with the read error code.
     *
     * <p>NOTE(review): if {@code set} is empty no write is issued, so on a successful read
     * nothing in this method completes {@code voidCallback} — confirm callers never pass an
     * empty set.
     *
     * @param l            id of the entry to copy
     * @param ledgerHandle handle of the ledger the entry belongs to
     * @param voidCallback completion callback for this entry
     * @param set          bookies that should receive the entry
     * @param biConsumer   invoked with (ledgerId, entryId) when the read fails
     * @throws InterruptedException declared by the signature; nothing in this body visibly throws it
     */
    private void recoverLedgerFragmentEntry(final Long l, LedgerHandle ledgerHandle, final AsyncCallback.VoidCallback voidCallback, final Set<BookieId> set, final BiConsumer<Long, Long> biConsumer) throws InterruptedException {
        final long id = ledgerHandle.getId();
        // Number of successful write acknowledgements received so far for this entry.
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        // Flipped to true by whichever write callback completes voidCallback first, so it fires at most once from the write path.
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        if (this.replicationThrottle != null) {
            // Pace replication: request as many permits as the current average entry size.
            this.replicationThrottle.acquire(this.averageEntrySize.get());
        }
        final BookkeeperInternalCallbacks.WriteCallback writeCallback = new BookkeeperInternalCallbacks.WriteCallback() { // from class: org.apache.bookkeeper.client.LedgerFragmentReplicator.2
            @Override // org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback
            public void writeComplete(int i, long j, long j2, BookieId bookieId, Object obj) {
                if (i != 0) {
                    LedgerFragmentReplicator.LOG.error("BK error writing entry for ledgerId: {}, entryId: {}, bookie: {}", Long.valueOf(j), Long.valueOf(j2), bookieId, BKException.create(i));
                    // Only the first failure (or final success) is reported to the caller.
                    if (atomicBoolean.compareAndSet(false, true)) {
                        voidCallback.processResult(i, null, null);
                        return;
                    }
                    return;
                }
                LedgerFragmentReplicator.this.numEntriesWritten.inc();
                // presumably obj is the Long entry length passed as context to addEntry below — confirm
                if (obj instanceof Long) {
                    LedgerFragmentReplicator.this.numBytesWritten.registerSuccessfulValue(((Long) obj).longValue());
                }
                if (LedgerFragmentReplicator.LOG.isDebugEnabled()) {
                    LedgerFragmentReplicator.LOG.debug("Success writing ledger id {}, entry id {} to a new bookie {}!", Long.valueOf(j), Long.valueOf(j2), bookieId);
                }
                // Report success once every target bookie has acknowledged the write.
                if (atomicInteger.incrementAndGet() == set.size() && atomicBoolean.compareAndSet(false, true)) {
                    voidCallback.processResult(i, null, null);
                }
            }
        };
        // Start time used to measure the read latency.
        final long nowInNano = MathUtils.nowInNano();
        // Read exactly one entry: first and last entry id are both l.
        ledgerHandle.asyncReadEntries(l.longValue(), l.longValue(), new AsyncCallback.ReadCallback() { // from class: org.apache.bookkeeper.client.LedgerFragmentReplicator.3
            @Override // org.apache.bookkeeper.client.AsyncCallback.ReadCallback
            public void readComplete(int i, LedgerHandle ledgerHandle2, Enumeration<LedgerEntry> enumeration, Object obj) {
                if (i != 0) {
                    LedgerFragmentReplicator.LOG.error("BK error reading ledger entry: " + l, (Throwable) BKException.create(i));
                    // Let the caller know which entry could not be read, then fail this entry.
                    biConsumer.accept(Long.valueOf(id), l);
                    voidCallback.processResult(i, null, null);
                    return;
                }
                LedgerFragmentReplicator.this.readDataLatency.registerSuccessfulEvent(MathUtils.elapsedNanos(nowInNano), TimeUnit.NANOSECONDS);
                LedgerEntry nextElement = enumeration.nextElement();
                byte[] entry = nextElement.getEntry();
                long length = entry.length;
                LedgerFragmentReplicator.this.numEntriesRead.inc();
                LedgerFragmentReplicator.this.numBytesRead.registerSuccessfulValue(length);
                // Re-package the payload with the ledger's digest so it can be sent to the bookies.
                ByteBufList computeDigestAndPackageForSending = ledgerHandle2.getDigestManager().computeDigestAndPackageForSending(l.longValue(), ledgerHandle2.getLastAddConfirmed(), nextElement.getLength(), Unpooled.wrappedBuffer(entry, 0, entry.length));
                if (LedgerFragmentReplicator.this.replicationThrottle != null) {
                    // Feed the packaged size into the running average used by the throttle.
                    LedgerFragmentReplicator.this.updateAverageEntrySize(computeDigestAndPackageForSending.readableBytes());
                }
                for (BookieId bookieId : set) {
                    long nowInNano2 = MathUtils.nowInNano();
                    // Each bookie gets its own clone of the packaged buffer; the literal 2 is the options/flags argument
                    // (presumably the recovery-add flag — confirm against BookieProtocol).
                    LedgerFragmentReplicator.this.bkc.getBookieClient().addEntry(bookieId, ledgerHandle2.getId(), ledgerHandle2.getLedgerKey(), l.longValue(), ByteBufList.clone(computeDigestAndPackageForSending), writeCallback, Long.valueOf(length), 2, false, WriteFlag.NONE);
                    // NOTE(review): this times the addEntry call itself, not the arrival of writeCallback — confirm that is intended.
                    LedgerFragmentReplicator.this.writeDataLatency.registerSuccessfulEvent(MathUtils.elapsedNanos(nowInNano2), TimeUnit.NANOSECONDS);
                }
                // Drop this method's own reference to the packaged buffer; the clones were handed to addEntry.
                computeDigestAndPackageForSending.release();
            }
        }, null);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Folds a new entry size into the running average used by the throttle.
     *
     * <p>The previous average is weighted by {@code AVERAGE_ENTRY_SIZE_RATIO} and the new
     * sample by the complementary weight; the result is truncated to an int.
     *
     * @param i size of the latest packaged entry, in bytes
     */
    public void updateAverageEntrySize(int i) {
        final double sampleWeight = 1 - AVERAGE_ENTRY_SIZE_RATIO;
        this.averageEntrySize.updateAndGet(previous -> (int) ((previous * AVERAGE_ENTRY_SIZE_RATIO) + (sampleWeight * i)));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Rewrites one ensemble of the ledger metadata so that replaced bookies point to
     * their replacements, then notifies the callback.
     *
     * <p>The update runs through a {@code MetadataUpdateLoop}: it is needed as long as the
     * ensemble stored under key {@code j} still contains any key of {@code map}, and the
     * transformation substitutes each such bookie with its mapped value while leaving the
     * others untouched. On completion the callback receives code 0, or the code derived
     * from the failure.
     *
     * @param ledgerManager ledger manager used by the update loop
     * @param voidCallback  notified with the outcome of the metadata update
     * @param j             key of the ensemble to rewrite
     * @param ledgerHandle  handle supplying and receiving the versioned ledger metadata
     * @param map           mapping from each replaced bookie to its replacement
     */
    public static void updateEnsembleInfo(LedgerManager ledgerManager, AsyncCallback.VoidCallback voidCallback, long j, LedgerHandle ledgerHandle, Map<BookieId, BookieId> map) {
        final long ledgerId = ledgerHandle.getId();
        // The update is required while any replaced bookie is still listed in the ensemble.
        final MetadataUpdateLoop.NeedsUpdatePredicate needsUpdate = metadata -> {
            final List<BookieId> ensemble = metadata.getAllEnsembles().get(j);
            return map.keySet().stream().anyMatch(ensemble::contains);
        };
        // Swap each replaced bookie for its replacement; bookies not in the map are kept.
        final MetadataUpdateLoop.MetadataTransform replaceBookies = metadata -> {
            final List<BookieId> currentEnsemble = metadata.getAllEnsembles().get(j);
            final List<BookieId> newEnsemble = currentEnsemble.stream()
                    .map(bookieId -> map.getOrDefault(bookieId, bookieId))
                    .collect(Collectors.toList());
            return LedgerMetadataBuilder.from(metadata).replaceEnsembleEntry(j, newEnsemble).build();
        };
        new MetadataUpdateLoop(ledgerManager, ledgerId, ledgerHandle::getVersionedLedgerMetadata, needsUpdate, replaceBookies, ledgerHandle::setLedgerMetadata).run().whenComplete((versioned, th) -> {
            if (th == null) {
                LOG.info("Updated ZK to point ledger fragments from old bookies to new bookies: {}", map);
                voidCallback.processResult(0, null, null);
            } else {
                LOG.error("Error updating ledger config metadata for ledgerId {}", Long.valueOf(ledgerHandle.getId()), th);
                voidCallback.processResult(BKException.getExceptionCode(th, BKException.Code.UnexpectedConditionException), null, null);
            }
        });
    }

    static {
        $assertionsDisabled = !LedgerFragmentReplicator.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger((Class<?>) LedgerFragmentReplicator.class);
    }
}
