package org.blobit.core.cluster;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory;
import org.apache.bookkeeper.net.NetworkTopologyImpl;
import org.apache.commons.pool2.KeyedPooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
import org.blobit.core.api.BucketMetadata;
import org.blobit.core.api.Configuration;
import org.blobit.core.api.DownloadPromise;
import org.blobit.core.api.GetPromise;
import org.blobit.core.api.LocationInfo;
import org.blobit.core.api.ObjectManagerException;
import org.blobit.core.api.ObjectMetadata;
import org.blobit.core.api.PutOptions;
import org.blobit.core.api.PutPromise;

/* loaded from: input_file:org/blobit/core/cluster/BookKeeperBlobManager.class */
public class BookKeeperBlobManager implements AutoCloseable {
    private final HerdDBMetadataStorageManager metadataStorageManager;
    private final BookKeeper bookKeeper;
    final GenericKeyedObjectPool<String, BucketWriter> writers;
    final GenericKeyedObjectPool<Long, BucketReader> readers;
    private final int replicationFactor;
    private final long maxBytesPerLedger;
    private final int maxEntrySize;
    private final boolean enableChecksum;
    private final boolean deferredSync;
    private final ExecutorService callbacksExecutor;
    private final ExecutorService threadpool = Executors.newSingleThreadExecutor();
    private ConcurrentMap<Long, BucketWriter> activeWriters = new ConcurrentHashMap();
    private final Stats stats = new Stats();
    private final long maxWriterTtl;
    private static final Logger LOG = Logger.getLogger(BookKeeperBlobManager.class.getName());
    static final byte[] EMPTY_BYTE_ARRAY = new byte[0];

    /* loaded from: input_file:org/blobit/core/cluster/BookKeeperBlobManager$ReadersFactory.class */
    private final class ReadersFactory implements KeyedPooledObjectFactory<Long, BucketReader> {
        private ReadersFactory() {
        }

        @Override // org.apache.commons.pool2.KeyedPooledObjectFactory
        public PooledObject<BucketReader> makeObject(Long l) throws Exception {
            BucketReader bucketReader;
            BucketWriter bucketWriter = (BucketWriter) BookKeeperBlobManager.this.activeWriters.get(l);
            if (bucketWriter == null || !bucketWriter.isValid()) {
                bucketReader = new BucketReader(l.longValue(), BookKeeperBlobManager.this.bookKeeper, BookKeeperBlobManager.this);
            } else {
                BookKeeperBlobManager.this.stats.usedWritersAsReaders.increment();
                bucketReader = new BucketReader(bucketWriter.getLh(), BookKeeperBlobManager.this);
            }
            return new DefaultPooledObject(bucketReader);
        }

        @Override // org.apache.commons.pool2.KeyedPooledObjectFactory
        public void destroyObject(Long l, PooledObject<BucketReader> pooledObject) throws Exception {
            pooledObject.getObject().close();
        }

        @Override // org.apache.commons.pool2.KeyedPooledObjectFactory
        public boolean validateObject(Long l, PooledObject<BucketReader> pooledObject) {
            return pooledObject.getObject().isValid();
        }

        @Override // org.apache.commons.pool2.KeyedPooledObjectFactory
        public void activateObject(Long l, PooledObject<BucketReader> pooledObject) throws Exception {
        }

        @Override // org.apache.commons.pool2.KeyedPooledObjectFactory
        public void passivateObject(Long l, PooledObject<BucketReader> pooledObject) throws Exception {
        }
    }

    /* loaded from: input_file:org/blobit/core/cluster/BookKeeperBlobManager$Stats.class */
    public static final class Stats {
        private final LongAdder usedWritersAsReaders = new LongAdder();

        public long getUsedWritersAsReaders() {
            return this.usedWritersAsReaders.longValue();
        }
    }

    /* loaded from: input_file:org/blobit/core/cluster/BookKeeperBlobManager$WritersFactory.class */
    private final class WritersFactory implements KeyedPooledObjectFactory<String, BucketWriter> {
        private WritersFactory() {
        }

        @Override // org.apache.commons.pool2.KeyedPooledObjectFactory
        public PooledObject<BucketWriter> makeObject(String str) throws Exception {
            BucketWriter bucketWriter = new BucketWriter(str, BookKeeperBlobManager.this.bookKeeper, BookKeeperBlobManager.this.replicationFactor, BookKeeperBlobManager.this.maxEntrySize, BookKeeperBlobManager.this.maxBytesPerLedger, BookKeeperBlobManager.this.enableChecksum, BookKeeperBlobManager.this.deferredSync, BookKeeperBlobManager.this.metadataStorageManager, BookKeeperBlobManager.this, System.currentTimeMillis() + BookKeeperBlobManager.this.maxWriterTtl);
            BookKeeperBlobManager.this.activeWriters.put(bucketWriter.getId(), bucketWriter);
            return new DefaultPooledObject(bucketWriter);
        }

        @Override // org.apache.commons.pool2.KeyedPooledObjectFactory
        public void destroyObject(String str, PooledObject<BucketWriter> pooledObject) throws Exception {
            pooledObject.getObject().close();
        }

        @Override // org.apache.commons.pool2.KeyedPooledObjectFactory
        public boolean validateObject(String str, PooledObject<BucketWriter> pooledObject) {
            return pooledObject.getObject().isValid();
        }

        @Override // org.apache.commons.pool2.KeyedPooledObjectFactory
        public void activateObject(String str, PooledObject<BucketWriter> pooledObject) throws Exception {
        }

        @Override // org.apache.commons.pool2.KeyedPooledObjectFactory
        public void passivateObject(String str, PooledObject<BucketWriter> pooledObject) throws Exception {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<? extends LocationInfo> getLocationInfo(BKEntryId bKEntryId) {
        CompletableFuture<? extends LocationInfo> completableFuture = new CompletableFuture<>();
        this.bookKeeper.getLedgerManager().readLedgerMetadata(bKEntryId.ledgerId).whenComplete((versioned, th) -> {
            if (th != null) {
                completableFuture.completeExceptionally(new ObjectManagerException(th));
            } else {
                completableFuture.complete(new BKLocationInfo(bKEntryId, (LedgerMetadata) versioned.getValue()));
            }
        });
        return completableFuture;
    }

    public PutPromise put(String str, String str2, long j, InputStream inputStream, PutOptions putOptions) {
        if (j == 0) {
            CompletableFuture completableFuture = new CompletableFuture();
            completableFuture.complete(null);
            return new PutPromise(BKEntryId.EMPTY_ENTRY_ID, completableFuture);
        }
        try {
            BucketWriter borrowObject = this.writers.borrowObject(str);
            try {
                PutPromise writeBlob = borrowObject.writeBlob(str, str2, j, inputStream, putOptions);
                this.writers.returnObject(str, borrowObject);
                return writeBlob;
            } catch (Throwable th) {
                this.writers.returnObject(str, borrowObject);
                throw th;
            }
        } catch (Exception e) {
            return new PutPromise(null, wrapGenericException(e));
        }
    }

    public PutPromise put(String str, String str2, byte[] bArr, int i, int i2, PutOptions putOptions) {
        if (bArr.length < i + i2 || i < 0 || i2 < 0) {
            throw new IndexOutOfBoundsException();
        }
        if (i2 == 0) {
            CompletableFuture completableFuture = new CompletableFuture();
            if (str2 == null) {
                completableFuture.complete(null);
            } else {
                try {
                    this.metadataStorageManager.appendEmptyObject(str, str2, putOptions.isOverwrite());
                    completableFuture.complete(null);
                } catch (ObjectManagerException e) {
                    completableFuture.completeExceptionally(e);
                }
            }
            return new PutPromise(BKEntryId.EMPTY_ENTRY_ID, completableFuture);
        }
        try {
            BucketWriter borrowObject = this.writers.borrowObject(str);
            try {
                PutPromise writeBlob = borrowObject.writeBlob(str, str2, bArr, i, i2, putOptions);
                this.writers.returnObject(str, borrowObject);
                return writeBlob;
            } catch (Throwable th) {
                this.writers.returnObject(str, borrowObject);
                throw th;
            }
        } catch (Exception e2) {
            return new PutPromise(null, wrapGenericException(e2));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static <T> CompletableFuture<T> wrapGenericException(Exception exc) {
        CompletableFuture<T> completableFuture = new CompletableFuture<>();
        completableFuture.completeExceptionally(new ObjectManagerException(exc));
        return completableFuture;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DownloadPromise download(String str, String str2, Consumer<Long> consumer, OutputStream outputStream, long j, long j2) {
        if (str2 == null) {
            return new DownloadPromise(null, 0L, wrapGenericException(new IllegalArgumentException("null id")));
        }
        if (BKEntryId.EMPTY_ENTRY_ID.equals(str2) || j2 == 0) {
            consumer.accept(0L);
            CompletableFuture completableFuture = new CompletableFuture();
            completableFuture.complete(null);
            return new DownloadPromise(str2, 0L, completableFuture);
        }
        try {
            BKEntryId parseId = BKEntryId.parseId(str2);
            if (j >= parseId.length) {
                consumer.accept(0L);
                CompletableFuture completableFuture2 = new CompletableFuture();
                completableFuture2.complete(null);
                return new DownloadPromise(str2, 0L, completableFuture2);
            }
            long min = j2 < 0 ? parseId.length : Math.min(j2, parseId.length);
            if (min < 0) {
                min = 0;
            }
            if (min > parseId.length - j) {
                consumer.accept(Long.valueOf(parseId.length - j));
            } else {
                consumer.accept(Long.valueOf(min));
            }
            BucketReader borrowObject = this.readers.borrowObject(Long.valueOf(parseId.ledgerId));
            try {
                DownloadPromise downloadPromise = new DownloadPromise(str2, parseId.length, borrowObject.streamObject(parseId.firstEntryId, (parseId.firstEntryId + parseId.numEntries) - 1, min, parseId.entrySize, parseId.length, outputStream, j));
                this.readers.returnObject(Long.valueOf(parseId.ledgerId), borrowObject);
                return downloadPromise;
            } catch (Throwable th) {
                this.readers.returnObject(Long.valueOf(parseId.ledgerId), borrowObject);
                throw th;
            }
        } catch (Exception e) {
            return new DownloadPromise(str2, 0L, wrapGenericException(e));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public GetPromise get(String str, String str2) {
        if (str2 == null) {
            return new GetPromise(null, 0L, wrapGenericException(new IllegalArgumentException("null id")));
        }
        if (BKEntryId.EMPTY_ENTRY_ID.equals(str2)) {
            CompletableFuture completableFuture = new CompletableFuture();
            completableFuture.complete(EMPTY_BYTE_ARRAY);
            return new GetPromise(str2, 0L, completableFuture);
        }
        try {
            BKEntryId parseId = BKEntryId.parseId(str2);
            if (parseId.length >= 2147483647L) {
                return new GetPromise(str2, 0L, wrapGenericException(new UnsupportedOperationException("Cannot read an " + parseId.length + " bytes object into a byte[]")));
            }
            int i = (int) parseId.length;
            BucketReader borrowObject = this.readers.borrowObject(Long.valueOf(parseId.ledgerId));
            try {
                GetPromise getPromise = new GetPromise(str2, parseId.length, borrowObject.readObject(parseId.firstEntryId, (parseId.firstEntryId + parseId.numEntries) - 1, i));
                this.readers.returnObject(Long.valueOf(parseId.ledgerId), borrowObject);
                return getPromise;
            } catch (Throwable th) {
                this.readers.returnObject(Long.valueOf(parseId.ledgerId), borrowObject);
                throw th;
            }
        } catch (Exception e) {
            return new GetPromise(str2, 0L, wrapGenericException(e));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static ObjectMetadata stat(String str, String str2) {
        if (str2 == null) {
            return null;
        }
        return BKEntryId.EMPTY_ENTRY_ID.equals(str2) ? new ObjectMetadata(str2, 0L) : new ObjectMetadata(str2, BKEntryId.parseId(str2).length);
    }

    public BookKeeperBlobManager(Configuration configuration, HerdDBMetadataStorageManager herdDBMetadataStorageManager) throws ObjectManagerException {
        try {
            this.replicationFactor = configuration.getReplicationFactor();
            this.maxBytesPerLedger = configuration.getMaxBytesPerLedger();
            this.maxEntrySize = configuration.getMaxEntrySize();
            this.metadataStorageManager = herdDBMetadataStorageManager;
            int concurrentWriters = configuration.getConcurrentWriters();
            int maxReaders = configuration.getMaxReaders();
            this.enableChecksum = configuration.isEnableChecksum();
            this.deferredSync = configuration.isDeferredSync();
            this.callbacksExecutor = Executors.newFixedThreadPool(concurrentWriters);
            ClientConfiguration clientConfiguration = new ClientConfiguration();
            clientConfiguration.setThrottleValue(0);
            clientConfiguration.setLedgerManagerFactoryClass(HierarchicalLedgerManagerFactory.class);
            clientConfiguration.setEnableDigestTypeAutodetection(true);
            String str = "zk+null://" + configuration.getZookkeeperUrl().replace(NetworkTopologyImpl.NODE_SEPARATOR, ";") + configuration.getProperty(Configuration.BOOKKEEPER_ZK_LEDGERS_ROOT_PATH, "/ledgers");
            LOG.log(Level.INFO, "BlobIt client is using BookKeeper metadataservice URI: {0}", str);
            clientConfiguration.setMetadataServiceUri(str);
            for (String str2 : configuration.keys()) {
                if (str2.startsWith("bookkeeper.")) {
                    clientConfiguration.setProperty(str2.substring("bookkeeper.".length()), configuration.getProperty(str2));
                }
            }
            LOG.info("ObjectManager will use BookKeeper ensemble at " + configuration.getZookkeeperUrl() + ", that is BK configuration bookkeeper.metadataServiceUri=" + clientConfiguration.getMetadataServiceUriUnchecked());
            GenericKeyedObjectPoolConfig genericKeyedObjectPoolConfig = new GenericKeyedObjectPoolConfig();
            genericKeyedObjectPoolConfig.setMaxTotalPerKey(concurrentWriters);
            genericKeyedObjectPoolConfig.setMaxIdlePerKey(concurrentWriters);
            genericKeyedObjectPoolConfig.setTestOnReturn(true);
            genericKeyedObjectPoolConfig.setTestOnBorrow(true);
            genericKeyedObjectPoolConfig.setBlockWhenExhausted(true);
            this.writers = new GenericKeyedObjectPool<>(new WritersFactory(), genericKeyedObjectPoolConfig);
            GenericKeyedObjectPoolConfig genericKeyedObjectPoolConfig2 = new GenericKeyedObjectPoolConfig();
            genericKeyedObjectPoolConfig2.setMaxTotalPerKey(1);
            genericKeyedObjectPoolConfig2.setMaxIdlePerKey(1);
            genericKeyedObjectPoolConfig2.setMaxTotal(maxReaders);
            genericKeyedObjectPoolConfig2.setTestOnReturn(true);
            genericKeyedObjectPoolConfig2.setTestOnBorrow(true);
            genericKeyedObjectPoolConfig2.setBlockWhenExhausted(true);
            this.readers = new GenericKeyedObjectPool<>(new ReadersFactory(), genericKeyedObjectPoolConfig2);
            this.bookKeeper = BookKeeper.forConfig(clientConfiguration).build();
            this.maxWriterTtl = configuration.getWriterMaxTtl();
        } catch (IOException | InterruptedException | BKException e) {
            throw new ObjectManagerException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void scanAndDeleteLedgersForBuckets(List<BucketMetadata> list) throws ObjectManagerException {
        try {
            BookKeeperAdmin bookKeeperAdmin = new BookKeeperAdmin(this.bookKeeper);
            Iterator<Long> it = bookKeeperAdmin.listLedgers().iterator();
            while (it.hasNext()) {
                long longValue = it.next().longValue();
                LedgerHandle openLedgerNoRecovery = this.bookKeeper.openLedgerNoRecovery(longValue, BookKeeper.DigestType.CRC32C, BucketWriter.DUMMY_PWD);
                try {
                    Map<String, byte[]> customMetadata = bookKeeperAdmin.getLedgerMetadata(openLedgerNoRecovery).getCustomMetadata();
                    byte[] bArr = customMetadata.get("bucketUUID");
                    byte[] bArr2 = customMetadata.get("bucketId");
                    if (bArr != null && bArr2 != null) {
                        String str = new String(bArr, StandardCharsets.UTF_8);
                        String str2 = new String(bArr2, StandardCharsets.UTF_8);
                        if (openLedgerNoRecovery != null) {
                            openLedgerNoRecovery.close();
                        }
                        if (list.stream().anyMatch(bucketMetadata -> {
                            return bucketMetadata.getBucketId().equals(str2) && bucketMetadata.getUuid().equals(str);
                        })) {
                            LOG.log(Level.INFO, "found droppable ledger {0}, for {1}, {2}", new Object[]{Long.valueOf(longValue), str2, str});
                            this.bookKeeper.deleteLedger(longValue);
                        }
                    } else if (openLedgerNoRecovery != null) {
                        openLedgerNoRecovery.close();
                    }
                } finally {
                }
            }
        } catch (IOException | InterruptedException | BKException e) {
            throw new ObjectManagerException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean dropLedger(long j) throws ObjectManagerException {
        if (this.activeWriters.containsKey(Long.valueOf(j))) {
            LOG.log(Level.FINE, "cannot drop ledger used locally {0}", Long.valueOf(j));
            return false;
        }
        try {
            LOG.log(Level.INFO, "dropping ledger {0}", Long.valueOf(j));
            this.bookKeeper.deleteLedger(j);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (BKException.BKNoSuchLedgerExistsException e2) {
            return true;
        } catch (BKException e3) {
            throw new ObjectManagerException(e3);
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.writers.close();
        this.readers.close();
        waitForWritersTermination();
        if (this.bookKeeper != null) {
            try {
                this.bookKeeper.close();
            } catch (InterruptedException | BKException e) {
                LOG.log(Level.SEVERE, "Error while closing BK", e);
            }
        }
        this.threadpool.shutdown();
        this.callbacksExecutor.shutdown();
    }

    private void waitForWritersTermination() {
        Iterator<BucketWriter> it = this.activeWriters.values().iterator();
        while (it.hasNext()) {
            it.next().awaitTermination();
        }
        while (!this.activeWriters.isEmpty()) {
            for (BucketWriter bucketWriter : this.activeWriters.values()) {
                scheduleWriterDisposal(bucketWriter);
                bucketWriter.awaitTermination();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Future<?> scheduleWriterDisposal(BucketWriter bucketWriter) {
        return bucketWriter.isClosed() ? FutureUtils.Void() : this.threadpool.submit(() -> {
            if (bucketWriter.releaseResources()) {
                this.activeWriters.remove(bucketWriter.getId(), bucketWriter);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Future<?> scheduleReaderDisposal(BucketReader bucketReader) {
        return this.threadpool.submit(() -> {
            bucketReader.releaseResources();
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ExecutorService getCallbacksExecutor() {
        return this.callbacksExecutor;
    }

    void closeAllActiveWritersForTests() {
        ArrayList arrayList = new ArrayList(this.activeWriters.values());
        this.writers.clear();
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            ((BucketWriter) it.next()).awaitTermination();
        }
    }

    public Stats getStats() {
        return this.stats;
    }
}
