package tech.ydb.topic.read.impl;

import java.util.ArrayList;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.core.Issue;
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.core.utils.ProtobufUtils;
import tech.ydb.proto.StatusCodesProtos;
import tech.ydb.proto.topic.YdbTopic;
import tech.ydb.topic.TopicRpc;
import tech.ydb.topic.read.events.DataReceivedEvent;
import tech.ydb.topic.settings.ReaderSettings;
import tech.ydb.topic.settings.StartPartitionSessionSettings;
import tech.ydb.topic.settings.TopicReadSettings;

/* loaded from: input_file:tech/ydb/topic/read/impl/ReaderImpl.class */
public abstract class ReaderImpl {
    private static final Logger logger = LoggerFactory.getLogger(ReaderImpl.class);
    private static final int MAX_RECONNECT_COUNT = 0;
    private static final int EXP_BACKOFF_BASE_MS = 256;
    private static final int EXP_BACKOFF_CEILING_MS = 40000;
    private static final int EXP_BACKOFF_MAX_POWER = 7;
    private static final int DEFAULT_DECOMPRESSION_THREAD_COUNT = 4;
    private final ReaderSettings settings;
    private final TopicRpc topicRpc;
    private final Executor decompressionExecutor;
    private final ExecutorService defaultDecompressionExecutorService;
    private ReadSession session;
    private String currentSessionId;
    private final AtomicInteger reconnectCounter = new AtomicInteger(MAX_RECONNECT_COUNT);
    protected final AtomicBoolean isStopped = new AtomicBoolean(false);
    private final AtomicLong sizeBytesAcquired = new AtomicLong(0);
    private final AtomicLong sizeBytesToRequest = new AtomicLong(0);
    private final Map<Long, PartitionSession> partitionSessions = new ConcurrentHashMap();
    private CompletableFuture<Void> initResultFuture = new CompletableFuture<>();
    private AtomicBoolean isReconnecting = new AtomicBoolean(false);
    private final String id = UUID.randomUUID().toString();

    public ReaderImpl(TopicRpc topicRpc, ReaderSettings readerSettings) {
        this.topicRpc = topicRpc;
        this.settings = readerSettings;
        this.session = new ReadSession(topicRpc);
        if (readerSettings.getDecompressionExecutor() != null) {
            this.defaultDecompressionExecutorService = null;
            this.decompressionExecutor = readerSettings.getDecompressionExecutor();
        } else {
            this.defaultDecompressionExecutorService = Executors.newFixedThreadPool(DEFAULT_DECOMPRESSION_THREAD_COUNT);
            this.decompressionExecutor = this.defaultDecompressionExecutorService;
        }
        StringBuilder sb = new StringBuilder("Reader");
        if (readerSettings.getReaderName() != null && !readerSettings.getReaderName().isEmpty()) {
            sb.append(" \"").append(readerSettings.getReaderName()).append("\"");
        }
        sb.append(" (generated id ").append(this.id).append(")");
        sb.append(" created for topic(s): ");
        for (TopicReadSettings topicReadSettings : readerSettings.getTopics()) {
            if (topicReadSettings != readerSettings.getTopics().get(MAX_RECONNECT_COUNT)) {
                sb.append(", ");
            }
            sb.append("\"").append(topicReadSettings.getPath()).append("\"");
        }
        sb.append(" and Consumer: \"").append(readerSettings.getConsumerName()).append("\"");
        logger.info(sb.toString());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public CompletableFuture<Void> initImpl() {
        logger.debug("[{}] initImpl started", this.id);
        this.session.start(this::processMessage).whenComplete(this::completeSession);
        this.initResultFuture = new CompletableFuture<>();
        YdbTopic.StreamReadMessage.InitRequest.Builder consumer = YdbTopic.StreamReadMessage.InitRequest.newBuilder().setConsumer(this.settings.getConsumerName());
        this.settings.getTopics().forEach(topicReadSettings -> {
            YdbTopic.StreamReadMessage.InitRequest.TopicReadSettings.Builder path = YdbTopic.StreamReadMessage.InitRequest.TopicReadSettings.newBuilder().setPath(topicReadSettings.getPath());
            if (topicReadSettings.getPartitionIds() != null && !topicReadSettings.getPartitionIds().isEmpty()) {
                path.addAllPartitionIds(topicReadSettings.getPartitionIds());
            }
            if (topicReadSettings.getMaxLag() != null) {
                path.setMaxLag(ProtobufUtils.durationToProto(topicReadSettings.getMaxLag()));
            }
            if (topicReadSettings.getReadFrom() != null) {
                path.setReadFrom(ProtobufUtils.instantToProto(topicReadSettings.getReadFrom()));
            }
            consumer.addTopicsReadSettings(path);
        });
        this.session.send(YdbTopic.StreamReadMessage.FromClient.newBuilder().setInitRequest(consumer).build());
        return this.initResultFuture;
    }

    private void sendReadRequest() {
        long andSet = this.sizeBytesToRequest.getAndSet(0L);
        if (andSet <= 0) {
            logger.debug("[{}] Nothing to request in DataRequest. sizeBytesToRequest == {}", this.id, Long.valueOf(andSet));
        } else {
            logger.debug("[{}] Sending DataRequest with {} bytes", this.id, Long.valueOf(andSet));
            this.session.send(YdbTopic.StreamReadMessage.FromClient.newBuilder().setReadRequest(YdbTopic.StreamReadMessage.ReadRequest.newBuilder().setBytesSize(andSet).build()).build());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void sendStartPartitionSessionResponse(YdbTopic.StreamReadMessage.StartPartitionSessionRequest startPartitionSessionRequest, StartPartitionSessionSettings startPartitionSessionSettings) {
        PartitionSession build = PartitionSession.newBuilder().setId(startPartitionSessionRequest.getPartitionSession().getPartitionSessionId()).setPath(startPartitionSessionRequest.getPartitionSession().getPath()).setPartitionId(startPartitionSessionRequest.getPartitionSession().getPartitionId()).setCommittedOffset(startPartitionSessionRequest.getCommittedOffset()).setPartitionOffsets(new OffsetsRange(startPartitionSessionRequest.getPartitionOffsets().getStart(), startPartitionSessionRequest.getPartitionOffsets().getEnd())).setDecompressionExecutor(this.decompressionExecutor).setDataEventCallback(this::handleDataReceivedEvent).setCommitFunction((v1, v2) -> {
            commitOffset(v1, v2);
        }).build();
        this.partitionSessions.put(Long.valueOf(build.getId()), build);
        YdbTopic.StreamReadMessage.StartPartitionSessionResponse.Builder partitionSessionId = YdbTopic.StreamReadMessage.StartPartitionSessionResponse.newBuilder().setPartitionSessionId(build.getId());
        if (startPartitionSessionSettings != null) {
            if (startPartitionSessionSettings.getReadOffset() != null) {
                partitionSessionId.setReadOffset(startPartitionSessionSettings.getReadOffset().longValue());
            }
            if (startPartitionSessionSettings.getCommitOffset() != null) {
                partitionSessionId.setCommitOffset(startPartitionSessionSettings.getCommitOffset().longValue());
            }
        }
        logger.info("[{}] Sending StartPartitionSessionResponse for partition session {} (partition {})", new Object[]{this.id, Long.valueOf(build.getId()), Long.valueOf(build.getPartitionId())});
        this.session.send(YdbTopic.StreamReadMessage.FromClient.newBuilder().setStartPartitionSessionResponse(partitionSessionId.build()).build());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void sendStopPartitionSessionResponse(long j) {
        PartitionSession partitionSession = this.partitionSessions.get(Long.valueOf(j));
        if (partitionSession != null) {
            logger.info("[{}] Sending StartPartitionSessionResponse for partition session {} (partition {})", new Object[]{this.id, Long.valueOf(j), Long.valueOf(partitionSession.getPartitionId())});
        } else {
            logger.warn("[{}] Sending StartPartitionSessionResponse for partition session {}, but have no such partition session running", this.id, Long.valueOf(j));
        }
        this.session.send(YdbTopic.StreamReadMessage.FromClient.newBuilder().setStopPartitionSessionResponse(YdbTopic.StreamReadMessage.StopPartitionSessionResponse.newBuilder().setPartitionSessionId(j).build()).build());
    }

    private void handleReadResponse(YdbTopic.StreamReadMessage.ReadResponse readResponse) {
        long bytesSize = readResponse.getBytesSize();
        this.sizeBytesAcquired.addAndGet(bytesSize);
        logger.trace("Received ReadResponse of {} bytes", Long.valueOf(bytesSize));
        ArrayList arrayList = new ArrayList();
        readResponse.getPartitionDataList().forEach(partitionData -> {
            long partitionSessionId = partitionData.getPartitionSessionId();
            PartitionSession partitionSession = this.partitionSessions.get(Long.valueOf(partitionSessionId));
            if (partitionSession != null) {
                arrayList.add(partitionSession.addBatches(partitionData.getBatchesList()));
            } else {
                logger.debug("[{}] Received PartitionData for unknown(closed?) PartitionSessionId={}", this.id, Long.valueOf(partitionSessionId));
            }
        });
        CompletableFuture.allOf((CompletableFuture[]) arrayList.toArray(new CompletableFuture[MAX_RECONNECT_COUNT])).whenComplete((r11, th) -> {
            if (th != null) {
                logger.error("[{}] Exception while waiting for batches to be read:", this.id, th);
            }
            boolean z = MAX_RECONNECT_COUNT;
            synchronized (this.sizeBytesAcquired) {
                long addAndGet = this.sizeBytesAcquired.addAndGet(-bytesSize);
                if (this.isReconnecting.get()) {
                    logger.trace("[{}] Finished handling ReadResponse of {} bytes. Reconnect is in progress -- no need to send ReadRequest", this.id, Long.valueOf(bytesSize));
                } else {
                    logger.trace("[{}] Finished handling ReadResponse of {} bytes. sizeBytesAcquired is now {}. Sending ReadRequest...", new Object[]{this.id, Long.valueOf(bytesSize), Long.valueOf(addAndGet)});
                    this.sizeBytesToRequest.addAndGet(bytesSize);
                    z = true;
                }
            }
            if (z) {
                sendReadRequest();
            }
        });
    }

    protected void handleCommitOffsetResponse(YdbTopic.StreamReadMessage.CommitOffsetResponse commitOffsetResponse) {
        for (YdbTopic.StreamReadMessage.CommitOffsetResponse.PartitionCommittedOffset partitionCommittedOffset : commitOffsetResponse.getPartitionsCommittedOffsetsList()) {
            PartitionSession partitionSession = this.partitionSessions.get(Long.valueOf(partitionCommittedOffset.getPartitionSessionId()));
            if (partitionSession != null) {
                partitionSession.handleCommitResponse(partitionCommittedOffset.getCommittedOffset());
            } else {
                logger.debug("[{}] Received CommitOffsetResponse for closed partition session with id={}", this.id, Long.valueOf(partitionCommittedOffset.getPartitionSessionId()));
            }
        }
    }

    protected void handleStopPartitionSessionRequest(YdbTopic.StreamReadMessage.StopPartitionSessionRequest stopPartitionSessionRequest) {
        if (stopPartitionSessionRequest.getGraceful()) {
            PartitionSession partitionSession = this.partitionSessions.get(Long.valueOf(stopPartitionSessionRequest.getPartitionSessionId()));
            if (partitionSession != null) {
                logger.info("[{}] Received graceful StopPartitionSessionRequest for partition session {} (partition {})", new Object[]{this.id, Long.valueOf(partitionSession.getId()), Long.valueOf(partitionSession.getPartitionId())});
            } else {
                logger.warn("[{}] Received graceful StopPartitionSessionRequest for partition session {}, but have no such partition session running", this.id, Long.valueOf(stopPartitionSessionRequest.getPartitionSessionId()));
            }
            handleStopPartitionSession(stopPartitionSessionRequest);
            return;
        }
        PartitionSession remove = this.partitionSessions.remove(Long.valueOf(stopPartitionSessionRequest.getPartitionSessionId()));
        if (remove == null) {
            logger.warn("[{}] Received force StopPartitionSessionRequest for partition session {}, but have no such partition session running", this.id, Long.valueOf(stopPartitionSessionRequest.getPartitionSessionId()));
        } else {
            logger.info("[{}] Received force StopPartitionSessionRequest for partition session {} (partition {})", new Object[]{this.id, Long.valueOf(remove.getId()), Long.valueOf(remove.getPartitionId())});
            closePartitionSession(remove);
        }
    }

    protected abstract CompletableFuture<Void> handleDataReceivedEvent(DataReceivedEvent dataReceivedEvent);

    protected abstract void handleStartPartitionSessionRequest(YdbTopic.StreamReadMessage.StartPartitionSessionRequest startPartitionSessionRequest);

    protected abstract void handleStopPartitionSession(YdbTopic.StreamReadMessage.StopPartitionSessionRequest stopPartitionSessionRequest);

    protected abstract void handleClosePartitionSession(tech.ydb.topic.read.PartitionSession partitionSession);

    protected abstract void handleCloseReader();

    private void commitOffset(long j, OffsetsRange offsetsRange) {
        this.session.send(YdbTopic.StreamReadMessage.FromClient.newBuilder().setCommitOffsetRequest(YdbTopic.StreamReadMessage.CommitOffsetRequest.newBuilder().addCommitOffsets(YdbTopic.StreamReadMessage.CommitOffsetRequest.PartitionCommitOffset.newBuilder().setPartitionSessionId(j).addOffsets(YdbTopic.OffsetsRange.newBuilder().setStart(offsetsRange.getStart()).setEnd(offsetsRange.getEnd())))).build());
    }

    private void reconnect() {
        logger.info("[{}] Reconnect #{} started. Creating new ReadSession", this.id, Integer.valueOf(this.reconnectCounter.get()));
        this.session = new ReadSession(this.topicRpc);
        initImpl();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public CompletableFuture<Void> shutdownImpl() {
        logger.info("[{}] Shutting down Topic Reader", this.id);
        this.isStopped.set(true);
        return CompletableFuture.runAsync(() -> {
            closePartitionSessions();
            handleCloseReader();
            if (this.defaultDecompressionExecutorService != null) {
                this.defaultDecompressionExecutorService.shutdown();
            }
            this.session.shutdown();
        });
    }

    private void shutdownImpl(String str) {
        if (!this.initResultFuture.isDone()) {
            initImpl().completeExceptionally(new RuntimeException(str));
        }
        shutdownImpl();
    }

    private void closeSessionsAndScheduleReconnect(int i) {
        this.isReconnecting.set(true);
        closePartitionSessions();
        int i2 = i <= EXP_BACKOFF_MAX_POWER ? EXP_BACKOFF_BASE_MS * (1 << i) : EXP_BACKOFF_CEILING_MS;
        int nextInt = i2 + ThreadLocalRandom.current().nextInt(i2);
        logger.warn("[{}] Retry #{}. Scheduling reconnect in {}ms...", new Object[]{this.id, Integer.valueOf(i), Integer.valueOf(nextInt)});
        this.topicRpc.getScheduler().schedule(this::reconnect, nextInt, TimeUnit.MILLISECONDS);
    }

    private void closePartitionSessions() {
        this.partitionSessions.values().forEach(this::closePartitionSession);
        this.partitionSessions.clear();
    }

    private void closePartitionSession(PartitionSession partitionSession) {
        partitionSession.shutdown();
        handleClosePartitionSession(partitionSession.getSessionInfo());
    }

    private void processMessage(YdbTopic.StreamReadMessage.FromServer fromServer) {
        logger.trace("[{}] processMessage called", this.id);
        if (fromServer.getStatus() != StatusCodesProtos.StatusIds.StatusCode.SUCCESS) {
            logger.error("[{}] Got non-success status in processMessage method: {}", this.id, fromServer);
            completeSession(Status.of(StatusCode.fromProto(fromServer.getStatus())).withIssues(new Issue[]{Issue.of("Got a message with non-success status: " + fromServer, Issue.Severity.ERROR)}), null);
            return;
        }
        this.reconnectCounter.set(MAX_RECONNECT_COUNT);
        if (fromServer.hasInitResponse()) {
            this.currentSessionId = fromServer.getInitResponse().getSessionId();
            this.initResultFuture.complete(null);
            synchronized (this.sizeBytesAcquired) {
                long maxMemoryUsageBytes = this.settings.getMaxMemoryUsageBytes() - this.sizeBytesAcquired.get();
                this.sizeBytesToRequest.addAndGet(maxMemoryUsageBytes);
                this.isReconnecting.set(false);
                logger.info("[{}] Session {} initialized. Requesting available {} bytes...", new Object[]{this.id, this.currentSessionId, Long.valueOf(maxMemoryUsageBytes)});
            }
            sendReadRequest();
            return;
        }
        if (fromServer.hasStartPartitionSessionRequest()) {
            YdbTopic.StreamReadMessage.StartPartitionSessionRequest startPartitionSessionRequest = fromServer.getStartPartitionSessionRequest();
            logger.info("[{}] Received StartPartitionSessionRequest: partition session {} (partition {})", new Object[]{this.id, Long.valueOf(startPartitionSessionRequest.getPartitionSession().getPartitionSessionId()), Long.valueOf(startPartitionSessionRequest.getPartitionSession().getPartitionId())});
            handleStartPartitionSessionRequest(startPartitionSessionRequest);
            return;
        }
        if (fromServer.hasStopPartitionSessionRequest()) {
            handleStopPartitionSessionRequest(fromServer.getStopPartitionSessionRequest());
            return;
        }
        if (fromServer.hasReadResponse()) {
            handleReadResponse(fromServer.getReadResponse());
            return;
        }
        if (fromServer.hasCommitOffsetResponse()) {
            handleCommitOffsetResponse(fromServer.getCommitOffsetResponse());
            return;
        }
        if (!fromServer.hasPartitionSessionStatusResponse()) {
            if (fromServer.hasUpdateTokenResponse()) {
                logger.debug("[{}] Received UpdateTokenResponse", this.id);
                return;
            } else {
                logger.error("[{}] Unhandled message from server: {}", this.id, fromServer);
                return;
            }
        }
        YdbTopic.StreamReadMessage.PartitionSessionStatusResponse partitionSessionStatusResponse = fromServer.getPartitionSessionStatusResponse();
        PartitionSession partitionSession = this.partitionSessions.get(Long.valueOf(partitionSessionStatusResponse.getPartitionSessionId()));
        Logger logger2 = logger;
        Object[] objArr = new Object[6];
        objArr[MAX_RECONNECT_COUNT] = this.id;
        objArr[1] = Long.valueOf(partitionSessionStatusResponse.getPartitionSessionId());
        objArr[2] = partitionSession == null ? "unknown" : Long.valueOf(partitionSession.getPartitionId());
        objArr[3] = Long.valueOf(partitionSessionStatusResponse.getPartitionOffsets().getStart());
        objArr[DEFAULT_DECOMPRESSION_THREAD_COUNT] = Long.valueOf(partitionSessionStatusResponse.getPartitionOffsets().getEnd());
        objArr[5] = Long.valueOf(partitionSessionStatusResponse.getCommittedOffset());
        logger2.info("[{}] Received PartitionSessionStatusResponse: partition session {} (partition {}). Partition offsets: [{}, {}). Committed offset: {}", objArr);
    }

    private void completeSession(Status status, Throwable th) {
        logger.info("[{}] CompleteSession called", this.id);
        this.session.stop();
        if (th != null) {
            logger.error("[{}] Exception in reading stream session {}: ", new Object[]{this.id, this.currentSessionId, th});
        } else {
            if (status.isSuccess()) {
                if (this.isStopped.get()) {
                    logger.info("[{}] Reading stream session {} closed successfully", this.id, this.currentSessionId);
                    return;
                } else {
                    logger.error("[{}] Reading stream session {} was closed unexpectedly. Shutting down the whole reader.", this.id, this.currentSessionId);
                    shutdownImpl("Reading stream session " + this.currentSessionId + " was closed unexpectedly. Shutting down reader.");
                    return;
                }
            }
            Logger logger2 = logger;
            Object[] objArr = new Object[3];
            objArr[MAX_RECONNECT_COUNT] = this.id;
            objArr[1] = this.currentSessionId != null ? this.currentSessionId : "";
            objArr[2] = status;
            logger2.error("[{}] Error in reading stream session {}: {}", objArr);
        }
        if (this.isStopped.get()) {
            return;
        }
        closeSessionsAndScheduleReconnect(this.reconnectCounter.incrementAndGet());
    }
}
