package tech.ydb.topic.read.impl;

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.NavigableMap;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.core.utils.ProtobufUtils;
import tech.ydb.proto.topic.YdbTopic;
import tech.ydb.topic.description.Codec;
import tech.ydb.topic.description.MetadataItem;
import tech.ydb.topic.read.Message;
import tech.ydb.topic.read.OffsetsRange;
import tech.ydb.topic.read.PartitionSession;
import tech.ydb.topic.read.events.DataReceivedEvent;
import tech.ydb.topic.read.impl.MessageImpl;
import tech.ydb.topic.read.impl.events.DataReceivedEventImpl;
import tech.ydb.topic.utils.Encoder;

/* loaded from: input_file:tech/ydb/topic/read/impl/PartitionSessionImpl.class */
public class PartitionSessionImpl {
    private static final Logger logger = LoggerFactory.getLogger(PartitionSessionImpl.class);
    // Identifier of this partition session, copied from the builder.
    private final long id;
    // Topic path, copied from the builder; used as the leading "[{}]" argument of this class's log messages.
    private final String path;
    // Identifier of the partition this session reads, copied from the builder.
    private final long partitionId;
    // PartitionSession object built in the constructor from (id, partitionId, path); returned by getSessionInfo().
    private final PartitionSession sessionInfo;
    // Partition offsets range copied from the builder; within this class it is only read for the start-up log line.
    private final OffsetsRange partitionOffsets;
    // Executor on which decode(batch) is scheduled by addBatches().
    private final Executor decompressionExecutor;
    // True from construction until shutdown() sets it to false; checked before accepting batches,
    // commits and before delivering data.
    private final AtomicBoolean isWorking;
    // Batches in arrival order that have been scheduled for decoding but not yet moved to readingQueue.
    // Every access in this class happens inside synchronized (decodingBatches).
    private final Queue<Batch> decodingBatches;
    // Batches that are ready to be handed to dataEventCallback, in delivery order.
    private final Queue<Batch> readingQueue;
    // Callback invoked with one DataReceivedEvent per batch; the returned future signals
    // that handling of that event has finished.
    private final Function<DataReceivedEvent, CompletableFuture<Void>> dataEventCallback;
    // Flag claimed with compareAndSet in sendDataToReadersIfNeeded() so that only one
    // callback invocation is in flight at a time.
    private final AtomicBoolean isReadingNow;
    // Receives the list of offset ranges that should be committed.
    private final Consumer<List<OffsetsRange>> commitFunction;
    // Pending commit futures keyed by the end offset of the committed range; completed by
    // handleCommitResponse() and failed by shutdown(). Also used as the monitor for commit/shutdown.
    private final NavigableMap<Long, CompletableFuture<Void>> commitFutures;
    // Offset following the highest message offset received so far (message offset + 1);
    // starts at the builder's committedOffset.
    private long lastReadOffset;
    // Last committed offset known to this session; starts at the builder's committedOffset.
    private long lastCommittedOffset;

    /* loaded from: input_file:tech/ydb/topic/read/impl/PartitionSessionImpl$Builder.class */
    /**
     * Fluent builder collecting the values the private {@link PartitionSessionImpl}
     * constructor copies into a new session. Every setter returns this builder.
     */
    public static class Builder {
        private long id;
        private String path;
        private long partitionId;
        private long committedOffset;
        private OffsetsRange partitionOffsets;
        private Executor decompressionExecutor;
        private Function<DataReceivedEvent, CompletableFuture<Void>> dataEventCallback;
        private Consumer<List<OffsetsRange>> commitFunction;

        /** Sets the partition session identifier. */
        public Builder setId(long j) {
            id = j;
            return this;
        }

        /** Sets the topic path. */
        public Builder setPath(String str) {
            path = str;
            return this;
        }

        /** Sets the identifier of the partition being read. */
        public Builder setPartitionId(long j) {
            partitionId = j;
            return this;
        }

        /**
         * Sets the committed offset; the session uses it as the initial value of both
         * its last read offset and its last committed offset.
         */
        public Builder setCommittedOffset(long j) {
            committedOffset = j;
            return this;
        }

        /** Sets the partition offsets range (must be non-null: the session logs its bounds on start). */
        public Builder setPartitionOffsets(OffsetsRange offsetsRange) {
            partitionOffsets = offsetsRange;
            return this;
        }

        /** Sets the executor on which received batches are decoded. */
        public Builder setDecompressionExecutor(Executor executor) {
            decompressionExecutor = executor;
            return this;
        }

        /** Sets the callback that receives a {@link DataReceivedEvent} for every batch. */
        public Builder setDataEventCallback(Function<DataReceivedEvent, CompletableFuture<Void>> function) {
            dataEventCallback = function;
            return this;
        }

        /** Sets the function that is given the offset ranges to commit. */
        public Builder setCommitFunction(Consumer<List<OffsetsRange>> consumer) {
            commitFunction = consumer;
            return this;
        }

        /** Creates a partition session from the values collected so far. */
        public PartitionSessionImpl build() {
            return new PartitionSessionImpl(this);
        }
    }

    /**
     * Copies the builder's values, creates the internal queues and flags, and logs
     * that the session has started.
     *
     * @param builder source of the configuration; its partition offsets must be
     *                non-null because their bounds are read for the start-up log line
     */
    private PartitionSessionImpl(Builder builder) {
        // Values supplied by the caller.
        id = builder.id;
        path = builder.path;
        partitionId = builder.partitionId;
        partitionOffsets = builder.partitionOffsets;
        decompressionExecutor = builder.decompressionExecutor;
        dataEventCallback = builder.dataEventCallback;
        commitFunction = builder.commitFunction;

        // Both offsets start from the committed offset reported by the builder.
        lastReadOffset = builder.committedOffset;
        lastCommittedOffset = builder.committedOffset;

        // Internal state.
        sessionInfo = new PartitionSession(id, partitionId, path);
        isWorking = new AtomicBoolean(true);
        isReadingNow = new AtomicBoolean();
        decodingBatches = new LinkedList<>();
        readingQueue = new ConcurrentLinkedQueue<>();
        commitFutures = new ConcurrentSkipListMap<>();

        logger.info("[{}] Partition session {} (partition {}) is started. CommittedOffset: {}. Partition offsets: {}-{}",
                path, id, partitionId, lastReadOffset, partitionOffsets.getStart(), partitionOffsets.getEnd());
    }

    /**
     * Creates an empty {@link Builder} for a partition session.
     *
     * @return a new builder with no values set
     */
    public static Builder newBuilder() {
        return new Builder();
    }

    /** @return the identifier of this partition session */
    public long getId() {
        return id;
    }

    /** @return the identifier of the partition this session reads */
    public long getPartitionId() {
        return partitionId;
    }

    /** @return the topic path this session was created with */
    public String getPath() {
        return path;
    }

    /** @return the {@link PartitionSession} object created from this session's id, partition id and path */
    public PartitionSession getSessionInfo() {
        return sessionInfo;
    }

    /**
     * Overwrites the last read offset.
     *
     * @param j new value; the following received message is expected to have an offset of at least {@code j}
     */
    public void setLastReadOffset(long j) {
        lastReadOffset = j;
    }

    /**
     * Overwrites the last committed offset.
     *
     * @param j new value; later commit responses are compared against it
     */
    public void setLastCommittedOffset(long j) {
        lastCommittedOffset = j;
    }

    /**
     * Accepts batches received from the server: converts every protobuf message into a
     * {@link MessageImpl}, advances {@code lastReadOffset}, schedules decoding of each batch on
     * the decompression executor and, once decoded, moves batches (in arrival order) to the
     * reading queue.
     *
     * <p>NOTE(review): {@code lastReadOffset} is updated here without synchronization, so this
     * method is presumably invoked from a single thread at a time — confirm against the caller.
     *
     * @param list batches from a read response
     * @return a future that completes when the read futures of all passed batches are completed;
     *         an already completed future if the session has been shut down
     */
    public CompletableFuture<Void> addBatches(List<YdbTopic.StreamReadMessage.ReadResponse.Batch> list) {
        if (!this.isWorking.get()) {
            return CompletableFuture.completedFuture(null);
        }
        List<CompletableFuture<Void>> readFutures = new ArrayList<>(list.size());
        for (YdbTopic.StreamReadMessage.ReadResponse.Batch protoBatch : list) {
            BatchMeta batchMeta = new BatchMeta(protoBatch);
            Batch batch = new Batch(batchMeta);
            List<YdbTopic.StreamReadMessage.ReadResponse.MessageData> protoMessages = protoBatch.getMessageDataList();
            if (protoMessages.isEmpty()) {
                logger.error("[{}] Received empty batch for partition session {} (partition {}). This shouldn't happen",
                        this.path, this.id, this.partitionId);
            } else if (logger.isDebugEnabled()) {
                logger.debug("[{}] Received a batch of {} messages (offsets {} - {}) for partition session {} (partition {})",
                        this.path, protoMessages.size(), protoMessages.get(0).getOffset(),
                        protoMessages.get(protoMessages.size() - 1).getOffset(), this.id, this.partitionId);
            }
            for (YdbTopic.StreamReadMessage.ReadResponse.MessageData messageData : protoMessages) {
                addMessageToBatch(batch, batchMeta, messageData);
            }
            readFutures.add(batch.getReadFuture());
            // Register the batch before scheduling the decode so arrival order is preserved
            // even when a later batch finishes decoding first.
            synchronized (this.decodingBatches) {
                this.decodingBatches.add(batch);
            }
            CompletableFuture.runAsync(() -> decode(batch), this.decompressionExecutor)
                    .thenRun(this::moveDecodedBatchesToReadingQueue);
        }
        return CompletableFuture.allOf(readFutures.toArray(new CompletableFuture<?>[0]));
    }

    /** Converts one protobuf message into a MessageImpl, appends it to the batch and advances lastReadOffset. */
    private void addMessageToBatch(Batch batch, BatchMeta batchMeta,
                                   YdbTopic.StreamReadMessage.ReadResponse.MessageData messageData) {
        // The commit range of a message starts where the previous one ended, so remember
        // the value before it is advanced.
        long commitOffsetFrom = this.lastReadOffset;
        long offset = messageData.getOffset();
        long nextReadOffset = offset + 1;
        if (nextReadOffset > this.lastReadOffset) {
            this.lastReadOffset = nextReadOffset;
            if (logger.isTraceEnabled()) {
                logger.trace("[{}] Received a message with offset {} for partition session {} (partition {}). lastReadOffset is now {}",
                        this.path, offset, this.id, this.partitionId, this.lastReadOffset);
            }
        } else {
            logger.error("[{}] Received a message with offset {} which is less than last read offset {} for partition session {} (partition {})",
                    this.path, offset, this.lastReadOffset, this.id, this.partitionId);
        }
        List<MetadataItem> metadataItems = messageData.getMetadataItemsList().stream()
                .map(item -> new MetadataItem(item.getKey(), item.getValue().toByteArray()))
                .collect(Collectors.toList());
        batch.addMessage(new MessageImpl.Builder()
                .setBatchMeta(batchMeta)
                .setPartitionSession(this)
                .setData(messageData.getData().toByteArray())
                .setOffset(offset)
                .setSeqNo(messageData.getSeqNo())
                .setCommitOffsetFrom(commitOffsetFrom)
                .setCreatedAt(ProtobufUtils.protoToInstant(messageData.getCreatedAt()))
                .setMessageGroupId(messageData.getMessageGroupId())
                .setMetadataItems(metadataItems)
                .build());
    }

    /** Moves the leading run of already decoded (or RAW) batches to the reading queue and triggers delivery. */
    private void moveDecodedBatchesToReadingQueue() {
        boolean haveNewBatchesToRead = false;
        synchronized (this.decodingBatches) {
            Batch head = this.decodingBatches.peek();
            // Stop at the first batch that is still being decoded: later batches must wait for it.
            while (head != null && (head.isDecompressed() || head.getCodec() == Codec.RAW)) {
                this.decodingBatches.remove();
                if (logger.isTraceEnabled()) {
                    List<MessageImpl> messages = head.getMessages();
                    // An empty batch has no offsets to report; indexing it would throw here and
                    // the batch would be lost after leaving decodingBatches.
                    if (!messages.isEmpty()) {
                        logger.trace("[{}] Adding batch with offsets {}-{} to reading queue of partition session {} (partition {})",
                                this.path, messages.get(0).getOffset(), messages.get(messages.size() - 1).getOffset(),
                                this.id, this.partitionId);
                    }
                }
                this.readingQueue.add(head);
                haveNewBatchesToRead = true;
                head = this.decodingBatches.peek();
            }
        }
        if (haveNewBatchesToRead) {
            sendDataToReadersIfNeeded();
        }
    }

    /**
     * Requests a commit of a single offset range and returns a future tracking it.
     *
     * <p>While the session is working, the future is registered under the range's end offset
     * and the range is passed to the commit function. After shutdown the future is completed
     * exceptionally with a {@link RuntimeException} and nothing is sent. Everything happens
     * under the {@code commitFutures} monitor, which {@link #shutdown()} also takes.
     *
     * @param offsetsRange range to commit
     * @return future completed by a matching commit response, or failed if the session is closed
     */
    public CompletableFuture<Void> commitOffsetRange(OffsetsRange offsetsRange) {
        CompletableFuture<Void> commitFuture = new CompletableFuture<>();
        synchronized (commitFutures) {
            if (isWorking.get()) {
                if (logger.isDebugEnabled()) {
                    logger.debug("[{}] Offset range [{}, {}) is requested to be committed for partition session {} (partition {}). Last committed offset is {} (commit lag is {})",
                            path, offsetsRange.getStart(), offsetsRange.getEnd(), id, partitionId,
                            lastCommittedOffset, offsetsRange.getStart() - lastCommittedOffset);
                }
                commitFutures.put(offsetsRange.getEnd(), commitFuture);
                List<OffsetsRange> singleRange = new ArrayList<>(1);
                singleRange.add(offsetsRange);
                commitFunction.accept(singleRange);
            } else {
                logger.info("[{}] Offset range [{}, {}) is requested to be committed, but partition session {} (partition {}) is already closed",
                        path, offsetsRange.getStart(), offsetsRange.getEnd(), id, partitionId);
                commitFuture.completeExceptionally(new RuntimeException("Partition session " + id + " (partition " + partitionId + ") for " + path + " is already closed"));
            }
        }
        return commitFuture;
    }

    /**
     * Passes several offset ranges to the commit function without creating commit futures.
     * If the session is already shut down, the request is only logged.
     *
     * @param list offset ranges to commit
     */
    public void commitOffsetRanges(List<OffsetsRange> list) {
        if (!isWorking.get()) {
            if (logger.isInfoEnabled()) {
                StringBuilder closedMessage = new StringBuilder("[").append(path).append("] Offset ranges ");
                addRangesToString(closedMessage, list);
                closedMessage.append(" are requested to be committed, but partition session ")
                        .append(id)
                        .append(" (partition ")
                        .append(partitionId)
                        .append(") is already closed");
                logger.info(closedMessage.toString());
            }
            return;
        }

        if (logger.isInfoEnabled()) {
            StringBuilder commitMessage = new StringBuilder("[")
                    .append(path)
                    .append("] Sending CommitRequest for partition session ")
                    .append(id)
                    .append(" (partition ")
                    .append(partitionId)
                    .append(") with offset ranges ");
            addRangesToString(commitMessage, list);
            logger.info(commitMessage.toString());
        }
        commitFunction.accept(list);
    }

    /**
     * Appends the ranges to {@code sb} as {@code [start,end)} items separated by {@code ", "}.
     * Nothing is appended for an empty list.
     *
     * @param sb   builder receiving the text
     * @param list ranges to render
     */
    private static void addRangesToString(StringBuilder sb, List<OffsetsRange> list) {
        String separator = "";
        for (OffsetsRange range : list) {
            sb.append(separator)
                    .append("[")
                    .append(range.getStart())
                    .append(",")
                    .append(range.getEnd())
                    .append(")");
            separator = ", ";
        }
    }

    /**
     * Handles a commit response: records the new committed offset and completes every pending
     * commit future whose range end is less than or equal to it.
     *
     * <p>A response whose offset is not greater than the current committed offset is logged as
     * an error and ignored.
     *
     * @param j committed offset reported by the server
     */
    public void handleCommitResponse(long j) {
        if (j <= this.lastCommittedOffset) {
            logger.error("[{}] Commit response received for partition session {} (partition {}). Committed offset: {} which is not greater than previous committed offset: {}.",
                    this.path, this.id, this.partitionId, j, this.lastCommittedOffset);
            return;
        }
        // Live view of all futures registered under an end offset <= j.
        NavigableMap<Long, CompletableFuture<Void>> futuresToComplete = this.commitFutures.headMap(j, true);
        if (logger.isDebugEnabled()) {
            logger.debug("[{}] Commit response received for partition session {} (partition {}). Committed offset: {}. Previous committed offset: {} (diff is {} message(s)). Completing {} commit futures",
                    this.path, this.id, this.partitionId, j, this.lastCommittedOffset,
                    j - this.lastCommittedOffset, futuresToComplete.size());
        }
        this.lastCommittedOffset = j;
        // Remove and complete one entry at a time. Completing all values and then clearing the
        // view is not atomic on a concurrent map: a future registered between the two steps
        // would be removed without ever being completed.
        java.util.Map.Entry<Long, CompletableFuture<Void>> entry;
        while ((entry = futuresToComplete.pollFirstEntry()) != null) {
            entry.getValue().complete(null);
        }
    }

    /**
     * Decodes the data of every message of the batch in place using the batch codec and marks
     * the batch as decompressed. Batches with the {@code RAW} codec are left untouched (and are
     * not marked). A message whose decoding throws an {@link IOException} keeps that exception
     * instead of being marked as decompressed; the remaining messages are still processed.
     *
     * @param batch batch to decode
     */
    private void decode(Batch batch) {
        boolean traceEnabled = logger.isTraceEnabled();
        if (traceEnabled) {
            logger.trace("[{}] Started decoding batch for partition session {} (partition {})", path, id, partitionId);
        }
        if (batch.getCodec() == Codec.RAW) {
            return;
        }

        for (MessageImpl message : batch.getMessages()) {
            try {
                byte[] decoded = Encoder.decode(batch.getCodec(), message.getData());
                message.setData(decoded);
                message.setDecompressed(true);
            } catch (IOException decodeError) {
                message.setException(decodeError);
                logger.info("[{}] Exception was thrown while decoding a message in partition session {} (partition {})", path, id, partitionId);
            }
        }
        batch.setDecompressed(true);

        if (logger.isTraceEnabled()) {
            logger.trace("[{}] Finished decoding batch for partition session {} (partition {})", path, id, partitionId);
        }
    }

    /**
     * Delivers the next batch from the reading queue to the data event callback, unless the
     * session is shut down or another delivery is already in progress.
     *
     * <p>Only one callback invocation is in flight at a time: the {@code isReadingNow} flag is
     * claimed with compare-and-set and released when the future returned by the callback
     * completes (successfully or not). At that point the batch is completed and this method
     * is called again for the next batch.
     */
    private void sendDataToReadersIfNeeded() {
        if (!this.isWorking.get()) {
            return;
        }
        if (!this.isReadingNow.compareAndSet(false, true)) {
            if (logger.isTraceEnabled()) {
                logger.trace("[{}] Partition session {} (partition {}) - no need to send data to readers: reading is already being performed",
                        this.path, this.id, this.partitionId);
            }
            return;
        }
        Batch batchToRead = this.readingQueue.poll();
        if (batchToRead == null) {
            this.isReadingNow.set(false);
            // A producer may have enqueued a batch and failed the compare-and-set while this
            // thread still held the flag; recheck so that batch is not left waiting for an
            // unrelated future trigger.
            if (!this.readingQueue.isEmpty()) {
                sendDataToReadersIfNeeded();
            }
            return;
        }
        List<MessageImpl> messages = batchToRead.getMessages();
        if (messages.isEmpty()) {
            // Nothing to deliver. Indexing the first/last message below would throw with the
            // flag still set and block every later delivery, so finish the batch and move on.
            this.isReadingNow.set(false);
            batchToRead.complete();
            sendDataToReadersIfNeeded();
            return;
        }
        List<Message> messagesToRead = new ArrayList<>(messages);
        // The committable range starts at the first message's commit-from offset and ends
        // right after the last message.
        OffsetsRangeImpl offsetsToCommit = new OffsetsRangeImpl(
                messages.get(0).getCommitOffsetFrom(),
                messages.get(messages.size() - 1).getOffset() + 1);
        DataReceivedEventImpl event = new DataReceivedEventImpl(this, messagesToRead, offsetsToCommit);
        if (logger.isDebugEnabled()) {
            logger.debug("[{}] DataReceivedEvent callback with {} message(s) (offsets {}-{}) for partition session {} (partition {}) is about to be called...",
                    this.path, messagesToRead.size(), messagesToRead.get(0).getOffset(),
                    messagesToRead.get(messagesToRead.size() - 1).getOffset(), this.id, this.partitionId);
        }
        this.dataEventCallback.apply(event).whenComplete((ignored, error) -> {
            if (error != null) {
                // The trailing throwable is not bound to a placeholder; the logger reports it as the exception.
                logger.error("[{}] DataReceivedEvent callback with {} message(s) (offsets {}-{}) for partition session {} (partition {}) finished with error: ",
                        this.path, messagesToRead.size(), messagesToRead.get(0).getOffset(),
                        messagesToRead.get(messagesToRead.size() - 1).getOffset(), this.id, this.partitionId, error);
            } else if (logger.isDebugEnabled()) {
                logger.debug("[{}] DataReceivedEvent callback with {} message(s) (offsets {}-{}) for partition session {} (partition {}) successfully finished",
                        this.path, messagesToRead.size(), messagesToRead.get(0).getOffset(),
                        messagesToRead.get(messagesToRead.size() - 1).getOffset(), this.id, this.partitionId);
            }
            this.isReadingNow.set(false);
            batchToRead.complete();
            sendDataToReadersIfNeeded();
        });
    }

    /**
     * Stops the session: marks it as not working, fails every pending commit future with a
     * {@link RuntimeException}, and completes all batches that are still waiting for decoding
     * or for delivery.
     */
    public void shutdown() {
        synchronized (commitFutures) {
            isWorking.set(false);
            logger.info("[{}] Partition session {} (partition {}) is shutting down. Failing {} commit futures...",
                    path, id, partitionId, commitFutures.size());
            for (CompletableFuture<Void> pendingCommit : commitFutures.values()) {
                pendingCommit.completeExceptionally(new RuntimeException("Partition session " + id + " (partition " + partitionId + ") for " + path + " is closed"));
            }
        }
        synchronized (decodingBatches) {
            decodingBatches.forEach(Batch::complete);
            readingQueue.forEach(Batch::complete);
        }
    }
}
