package org.apache.distributedlog.service.stream;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.RateLimiter;
import com.twitter.util.Future;
import com.twitter.util.Promise;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
import org.apache.distributedlog.exceptions.ServiceUnavailableException;
import org.apache.distributedlog.exceptions.StreamUnavailableException;
import org.apache.distributedlog.exceptions.UnexpectedException;
import org.apache.distributedlog.service.config.StreamConfigProvider;
import org.apache.distributedlog.service.streamset.PartitionMap;
import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
import org.apache.distributedlog.util.ConfUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/distributedlog/service/stream/StreamManagerImpl.class */
public class StreamManagerImpl implements StreamManager {
    private static final Logger logger = LoggerFactory.getLogger(StreamManagerImpl.class);
    private final StreamPartitionConverter partitionConverter;
    private final ScheduledExecutorService executorService;
    private final DistributedLogConfiguration dlConfig;
    private final StreamConfigProvider streamConfigProvider;
    private final String clientId;
    private final StreamFactory streamFactory;
    private final Namespace dlNamespace;
    private final ConcurrentHashMap<String, Stream> streams = new ConcurrentHashMap<>();
    private final AtomicInteger numCached = new AtomicInteger(0);
    private final ConcurrentHashMap<String, Stream> acquiredStreams = new ConcurrentHashMap<>();
    private final AtomicInteger numAcquired = new AtomicInteger(0);
    private final PartitionMap cachedPartitions = new PartitionMap();
    private final PartitionMap acquiredPartitions = new PartitionMap();
    final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock();
    private boolean closed = false;

    public StreamManagerImpl(String str, DistributedLogConfiguration distributedLogConfiguration, ScheduledExecutorService scheduledExecutorService, StreamFactory streamFactory, StreamPartitionConverter streamPartitionConverter, StreamConfigProvider streamConfigProvider, Namespace namespace) {
        this.clientId = str;
        this.executorService = scheduledExecutorService;
        this.streamFactory = streamFactory;
        this.partitionConverter = streamPartitionConverter;
        this.dlConfig = distributedLogConfiguration;
        this.streamConfigProvider = streamConfigProvider;
        this.dlNamespace = namespace;
    }

    private DynamicDistributedLogConfiguration getDynConf(String str) {
        Optional<DynamicDistributedLogConfiguration> dynamicStreamConfig = this.streamConfigProvider.getDynamicStreamConfig(str);
        return dynamicStreamConfig.isPresent() ? (DynamicDistributedLogConfiguration) dynamicStreamConfig.get() : ConfUtils.getConstDynConf(this.dlConfig);
    }

    @Override // org.apache.distributedlog.service.stream.StreamManager
    public boolean allowAcquire(Stream stream) {
        return this.acquiredPartitions.addPartition(stream.getPartition(), stream.getStreamConfiguration().getMaxAcquiredPartitionsPerProxy());
    }

    @Override // org.apache.distributedlog.service.stream.StreamManager
    public Future<Void> deleteAndRemoveAsync(final String str) {
        final Promise promise = new Promise();
        return null == schedule(new Runnable() { // from class: org.apache.distributedlog.service.stream.StreamManagerImpl.1
            @Override // java.lang.Runnable
            public void run() {
                promise.become(StreamManagerImpl.this.doDeleteAndRemoveAsync(str));
            }
        }, 0L) ? Future.exception(new ServiceUnavailableException("Couldn't schedule a delete task.")) : promise;
    }

    @Override // org.apache.distributedlog.service.stream.StreamManager
    public Future<Void> closeAndRemoveAsync(final String str) {
        final Promise promise = new Promise();
        return null == schedule(new Runnable() { // from class: org.apache.distributedlog.service.stream.StreamManagerImpl.2
            @Override // java.lang.Runnable
            public void run() {
                promise.become(StreamManagerImpl.this.doCloseAndRemoveAsync(str));
            }
        }, 0L) ? Future.exception(new ServiceUnavailableException("Couldn't schedule a release task.")) : promise;
    }

    @Override // org.apache.distributedlog.service.stream.StreamManager
    public Future<Void> createStreamAsync(final String str) {
        final Promise promise = new Promise();
        return null == schedule(new Runnable() { // from class: org.apache.distributedlog.service.stream.StreamManagerImpl.3
            @Override // java.lang.Runnable
            public void run() {
                try {
                    StreamManagerImpl.this.dlNamespace.createLog(str);
                    promise.setValue((Object) null);
                } catch (Exception e) {
                    promise.setException(e);
                }
            }
        }, 0L) ? Future.exception(new ServiceUnavailableException("Couldn't schedule a create task.")) : promise;
    }

    @Override // org.apache.distributedlog.service.stream.StreamManager
    public void notifyReleased(Stream stream) {
        this.acquiredPartitions.removePartition(stream.getPartition());
        if (this.acquiredStreams.remove(stream.getStreamName(), stream)) {
            this.numAcquired.getAndDecrement();
        }
    }

    @Override // org.apache.distributedlog.service.stream.StreamManager
    public void notifyAcquired(Stream stream) {
        if (null == this.acquiredStreams.put(stream.getStreamName(), stream)) {
            this.numAcquired.getAndIncrement();
        }
    }

    @Override // org.apache.distributedlog.service.stream.StreamManager
    public boolean notifyRemoved(Stream stream) {
        this.cachedPartitions.removePartition(stream.getPartition());
        if (!this.streams.remove(stream.getStreamName(), stream)) {
            return false;
        }
        this.numCached.getAndDecrement();
        return true;
    }

    @Override // org.apache.distributedlog.service.stream.StreamManager
    public Map<String, String> getStreamOwnershipMap(Optional<String> optional) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, Stream> entry : this.acquiredStreams.entrySet()) {
            String key = entry.getKey();
            if (!optional.isPresent() || key.matches((String) optional.get())) {
                Stream value = entry.getValue();
                if (null != value && null == value.getOwner()) {
                    hashMap.put(key, this.clientId);
                }
            }
        }
        return hashMap;
    }

    @Override // org.apache.distributedlog.service.stream.StreamManager
    public Stream getStream(String str) {
        return this.streams.get(str);
    }

    @Override // org.apache.distributedlog.service.stream.StreamManager
    public Stream getOrCreateStream(String str, boolean z) throws IOException {
        Stream stream = this.streams.get(str);
        if (null == stream) {
            this.closeLock.readLock().lock();
            try {
                if (this.closed) {
                    return null;
                }
                DynamicDistributedLogConfiguration dynConf = getDynConf(str);
                int maxCachedPartitionsPerProxy = dynConf.getMaxCachedPartitionsPerProxy();
                if (!this.cachedPartitions.addPartition(this.partitionConverter.convert(str), maxCachedPartitionsPerProxy)) {
                    throw new StreamUnavailableException("Stream " + str + " is not allowed to cache more than " + maxCachedPartitionsPerProxy + " partitions");
                }
                stream = newStream(str, dynConf);
                Stream putIfAbsent = this.streams.putIfAbsent(str, stream);
                if (null != putIfAbsent) {
                    stream = putIfAbsent;
                } else {
                    this.numCached.getAndIncrement();
                    logger.info("Inserted mapping stream name {} -> stream {}", str, stream);
                    stream.initialize();
                    if (z) {
                        stream.start();
                    }
                }
                this.closeLock.readLock().unlock();
            } finally {
                this.closeLock.readLock().unlock();
            }
        }
        return stream;
    }

    @Override // org.apache.distributedlog.service.stream.StreamManager
    public Future<List<Void>> closeStreams() {
        logger.info("Closing all acquired streams : acquired = {}, cached = {}.", Integer.valueOf(this.acquiredStreams.size()), Integer.valueOf(this.streams.size()));
        HashSet hashSet = new HashSet();
        hashSet.addAll(this.streams.values());
        return closeStreams(hashSet, Optional.absent());
    }

    @Override // org.apache.distributedlog.service.stream.StreamManager
    public void scheduleRemoval(final Stream stream, long j) {
        if (j > 0) {
            logger.info("Scheduling removal of stream {} from cache after {} sec.", stream.getStreamName(), Long.valueOf(j));
        }
        schedule(new Runnable() { // from class: org.apache.distributedlog.service.stream.StreamManagerImpl.4
            @Override // java.lang.Runnable
            public void run() {
                if (StreamManagerImpl.this.notifyRemoved(stream)) {
                    StreamManagerImpl.logger.info("Removed cached stream {} after probation.", stream.getStreamName());
                } else {
                    StreamManagerImpl.logger.info("Cached stream {} already removed.", stream.getStreamName());
                }
            }
        }, j);
    }

    @Override // org.apache.distributedlog.service.stream.StreamManager
    public int numAcquired() {
        return this.numAcquired.get();
    }

    @Override // org.apache.distributedlog.service.stream.StreamManager
    public int numCached() {
        return this.numCached.get();
    }

    @Override // org.apache.distributedlog.service.stream.StreamManager
    public boolean isAcquired(String str) {
        return this.acquiredStreams.containsKey(str);
    }

    @Override // org.apache.distributedlog.service.stream.StreamManager
    public void close() {
        this.closeLock.writeLock().lock();
        try {
            if (this.closed) {
                return;
            }
            this.closed = true;
        } finally {
            this.closeLock.writeLock().unlock();
        }
    }

    private Future<List<Void>> closeStreams(Set<Stream> set, Optional<RateLimiter> optional) {
        if (set.isEmpty()) {
            logger.info("No streams to close.");
            return Future.value(new ArrayList());
        }
        ArrayList arrayList = new ArrayList(set.size());
        for (Stream stream : set) {
            if (optional.isPresent()) {
                ((RateLimiter) optional.get()).acquire();
            }
            arrayList.add(stream.requestClose("Close Streams"));
        }
        return Future.collect(arrayList);
    }

    private Stream newStream(String str, DynamicDistributedLogConfiguration dynamicDistributedLogConfiguration) {
        return this.streamFactory.create(str, dynamicDistributedLogConfiguration, this);
    }

    public Future<Void> doCloseAndRemoveAsync(String str) {
        Stream stream = this.streams.get(str);
        if (null != stream) {
            return stream.requestClose("release ownership");
        }
        logger.info("No stream {} to release.", str);
        return Future.value((Object) null);
    }

    private java.util.concurrent.Future<?> schedule(Runnable runnable, long j) {
        this.closeLock.readLock().lock();
        try {
            try {
                if (this.closed) {
                    this.closeLock.readLock().unlock();
                    return null;
                }
                if (j > 0) {
                    ScheduledFuture<?> schedule = this.executorService.schedule(runnable, j, TimeUnit.MILLISECONDS);
                    this.closeLock.readLock().unlock();
                    return schedule;
                }
                java.util.concurrent.Future<?> submit = this.executorService.submit(runnable);
                this.closeLock.readLock().unlock();
                return submit;
            } catch (RejectedExecutionException e) {
                logger.error("Failed to schedule task {} in {} ms : ", new Object[]{runnable, Long.valueOf(j), e});
                this.closeLock.readLock().unlock();
                return null;
            }
        } catch (Throwable th) {
            this.closeLock.readLock().unlock();
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Future<Void> doDeleteAndRemoveAsync(String str) {
        Future<Void> exception;
        Stream stream = this.streams.get(str);
        if (null == stream) {
            logger.warn("No stream {} to delete.", str);
            return Future.exception(new UnexpectedException("No stream " + str + " to delete."));
        }
        logger.info("Deleting stream {}, {}", str, stream);
        try {
            stream.delete();
            exception = stream.requestClose("Stream Deleted");
        } catch (IOException e) {
            logger.error("Failed on removing stream {} : ", str, e);
            exception = Future.exception(e);
        }
        return exception;
    }

    @VisibleForTesting
    public ConcurrentHashMap<String, Stream> getCachedStreams() {
        return this.streams;
    }

    @VisibleForTesting
    public ConcurrentHashMap<String, Stream> getAcquiredStreams() {
        return this.acquiredStreams;
    }
}
