package org.apache.rocketmq.streams.common.channel.sinkcache.impl;

import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
import org.apache.rocketmq.streams.common.context.IMessage;

/* loaded from: input_file:org/apache/rocketmq/streams/common/channel/sinkcache/impl/AbstractMultiSplitMessageCache.class */
/**
 * Message cache that keeps one {@link MessageCache} per split and routes every added message to
 * the cache of the split it belongs to.
 *
 * <p>The split of a message is decided by {@link #createSplitId(Object)}. Per-split caches are
 * created lazily on the first message for a split and are all wired to the same wrapped flush
 * callback, which keeps the aggregate {@code messageCount} in step with what has been flushed.
 *
 * @param <R> type of the cached messages
 */
public abstract class AbstractMultiSplitMessageCache<R> extends MessageCache<R> {
    /** Per-split caches, keyed by the id returned from {@link #createSplitId(Object)}. */
    protected ConcurrentHashMap<String, MessageCache<IMessage>> queueMessageCaches;
    /** Whether a cache created for a new split gets {@code openAutoFlush()} called on it. */
    protected transient Boolean isOpenAutoFlush;
    /**
     * Pool used by {@link #flush(Set)} to flush several splits in parallel.
     * NOTE(review): no shutdown of this pool is visible in this class — confirm its lifecycle is
     * handled elsewhere.
     */
    protected transient ExecutorService executorService;

    /**
     * Callback decorator handed to every per-split cache: delegates the flush to the user supplied
     * callback and then subtracts the flushed batch size from the enclosing cache's
     * {@code messageCount}.
     */
    protected class MessageFlushCallBack implements IMessageFlushCallBack<R> {
        /** The user supplied callback that performs the real flush. */
        protected IMessageFlushCallBack<R> callBack;

        /**
         * @param iMessageFlushCallBack callback that receives each flushed batch
         */
        public MessageFlushCallBack(IMessageFlushCallBack<R> iMessageFlushCallBack) {
            this.callBack = iMessageFlushCallBack;
        }

        /**
         * Forwards the batch to the wrapped callback, then decrements the aggregate message count
         * by {@code list.size()}. The decrement happens whatever value the delegate returns; it is
         * skipped only if the delegate throws.
         *
         * @param list batch being flushed
         * @return the value returned by the wrapped callback
         */
        @Override
        public boolean flushMessage(List<R> list) {
            boolean delegateResult = this.callBack.flushMessage(list);
            AbstractMultiSplitMessageCache.this.messageCount.addAndGet(-list.size());
            return delegateResult;
        }
    }

    /**
     * Creates an empty multi-split cache with auto flush enabled for future split caches and a
     * fixed pool of 10 threads for parallel flushing.
     *
     * @param iMessageFlushCallBack callback that receives flushed batches; it is wrapped in a
     *                              {@link MessageFlushCallBack} before being stored
     */
    public AbstractMultiSplitMessageCache(IMessageFlushCallBack<R> iMessageFlushCallBack) {
        super(null);
        this.queueMessageCaches = new ConcurrentHashMap<>();
        this.isOpenAutoFlush = true;
        this.executorService = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        this.flushCallBack = new MessageFlushCallBack(iMessageFlushCallBack);
    }

    /**
     * Adds a message to the cache of its split, creating that cache on first use.
     *
     * <p>After the add, the aggregate count is incremented; if {@code batchSize} is positive and
     * the aggregate count has reached it, the split that just received the message is flushed and
     * the count is re-read.
     *
     * @param r message to cache
     * @return the aggregate message count after the add (and after the triggered flush, if any)
     */
    @Override
    public int addCache(R r) {
        String splitId = createSplitId(r);
        MessageCache<IMessage> splitCache = this.queueMessageCaches.get(splitId);
        if (splitCache == null) {
            splitCache = createSplitCacheIfAbsent(splitId);
        }
        splitCache.addCache(r);
        int total = this.messageCount.incrementAndGet();
        if (this.batchSize > 0 && total >= this.batchSize) {
            flush(splitId);
            total = this.messageCount.get();
        }
        return total;
    }

    /**
     * Returns the cache registered for {@code splitId}, creating, configuring and registering a new
     * one under this instance's monitor when none exists yet.
     */
    private MessageCache<IMessage> createSplitCacheIfAbsent(String splitId) {
        synchronized (this) {
            // Re-check under the lock: another thread may have created the cache in the meantime.
            MessageCache<IMessage> existing = this.queueMessageCaches.get(splitId);
            if (existing != null) {
                return existing;
            }
            MessageCache<IMessage> created = new MessageCache<>(this.flushCallBack);
            created.setAutoFlushSize(this.autoFlushSize);
            created.setAutoFlushTimeGap(this.autoFlushTimeGap);
            created.setBatchSize(this.batchSize);
            if (this.isOpenAutoFlush.booleanValue()) {
                created.openAutoFlush();
            }
            // NOTE(review): the executor is assigned after openAutoFlush(); order kept as in the
            // original — confirm openAutoFlush() does not need the executor up front.
            created.setAutoFlushExecutorService(this.autoFlushExecutorService);
            MessageCache<IMessage> raced = this.queueMessageCaches.putIfAbsent(splitId, created);
            return raced != null ? raced : created;
        }
    }

    /**
     * Decides which split a message belongs to.
     *
     * @param r message being cached
     * @return id of the split; used as the key into {@link #queueMessageCaches}
     */
    protected abstract String createSplitId(R r);

    /**
     * Flushes every split cache sequentially on the calling thread.
     *
     * @return sum of the values returned by each split cache's {@code flush()}; 0 when no split
     *         cache map exists
     */
    @Override
    public int flush() {
        if (this.queueMessageCaches == null) {
            return 0;
        }
        int flushed = 0;
        for (MessageCache<IMessage> splitCache : this.queueMessageCaches.values()) {
            flushed += splitCache.flush();
        }
        return flushed;
    }

    /**
     * Flushes the caches of the given splits.
     *
     * <p>A single split is flushed inline on the calling thread. Several splits are submitted to
     * {@link #executorService} and this method waits until every submitted task has finished. If
     * the calling thread is interrupted while waiting, the interrupt flag is restored and the count
     * accumulated so far is returned.
     *
     * @param set ids of the splits to flush; ids without a cache are ignored
     * @return sum of the values returned by the flushed split caches, 0 if there was nothing to do
     */
    @Override
    public int flush(Set<String> set) {
        if (this.queueMessageCaches == null || this.queueMessageCaches.isEmpty() || set == null || set.isEmpty()) {
            return 0;
        }
        if (set.size() == 1) {
            MessageCache<IMessage> onlyCache = this.queueMessageCaches.get(set.iterator().next());
            return onlyCache == null ? 0 : onlyCache.flush();
        }
        final AtomicInteger flushed = new AtomicInteger(0);
        final CountDownLatch pending = new CountDownLatch(set.size());
        for (final String splitId : set) {
            this.executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        MessageCache<IMessage> splitCache = AbstractMultiSplitMessageCache.this.queueMessageCaches.get(splitId);
                        if (splitCache != null) {
                            flushed.addAndGet(splitCache.flush());
                        }
                    } finally {
                        // Always release the latch, otherwise a failing flush would leave the
                        // caller blocked in await() forever.
                        pending.countDown();
                    }
                }
            });
        }
        try {
            pending.await();
        } catch (InterruptedException e) {
            // Preserve the interruption for callers instead of swallowing it.
            Thread.currentThread().interrupt();
        }
        return flushed.get();
    }

    /**
     * Flushes a single split by delegating to {@link #flush(Set)} with a one-element set.
     *
     * @param str id of the split to flush
     * @return the value returned by {@link #flush(Set)}
     */
    protected int flush(String str) {
        Set<String> singleSplit = new HashSet<>();
        singleSplit.add(str);
        return flush(singleSplit);
    }

    /**
     * @return the current aggregate message count
     */
    @Override
    public Integer getMessageCount() {
        return Integer.valueOf(this.messageCount.get());
    }

    /**
     * Pushes the current auto flush size and time gap to every existing split cache, opens auto
     * flush on each of them and marks auto flush as enabled for split caches created later.
     * Does nothing (and leaves the flag untouched) when the split cache map is {@code null}.
     */
    @Override
    public void openAutoFlush() {
        if (this.queueMessageCaches == null) {
            return;
        }
        for (MessageCache<IMessage> splitCache : this.queueMessageCaches.values()) {
            splitCache.setAutoFlushSize(this.autoFlushSize);
            splitCache.setAutoFlushTimeGap(this.autoFlushTimeGap);
            splitCache.openAutoFlush();
        }
        this.isOpenAutoFlush = true;
    }

    /**
     * Marks auto flush as disabled for split caches created later and closes auto flush on every
     * existing split cache.
     */
    @Override
    public void closeAutoFlush() {
        this.isOpenAutoFlush = false;
        if (this.queueMessageCaches == null) {
            return;
        }
        for (MessageCache<IMessage> splitCache : this.queueMessageCaches.values()) {
            splitCache.closeAutoFlush();
        }
    }

    /**
     * Sets the batch size used by {@link #addCache(Object)} and handed to split caches created
     * afterwards. Split caches that already exist are not updated by this method.
     *
     * @param i new batch size; values {@code <= 0} disable the count-triggered flush in
     *          {@link #addCache(Object)}
     */
    @Override
    public void setBatchSize(int i) {
        this.batchSize = i;
    }

    /**
     * @return the batch size currently configured on this cache
     */
    @Override
    public int getBatchSize() {
        return this.batchSize;
    }
}
