package tech.ydb.topic.write.impl;

import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.core.Issue;
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.proto.StatusCodesProtos;
import tech.ydb.proto.topic.YdbTopic;
import tech.ydb.topic.TopicRpc;
import tech.ydb.topic.description.Codec;
import tech.ydb.topic.settings.WriterSettings;
import tech.ydb.topic.utils.Encoder;
import tech.ydb.topic.write.InitResult;
import tech.ydb.topic.write.Message;
import tech.ydb.topic.write.QueueOverflowException;
import tech.ydb.topic.write.WriteAck;

/* loaded from: input_file:tech/ydb/topic/write/impl/WriterImpl.class */
public abstract class WriterImpl {
    private static final Logger logger = LoggerFactory.getLogger(WriterImpl.class);

    // NOTE(review): the value 0 is also referenced elsewhere in this file where a plain zero
    // literal is clearly meant; that looks like a decompiler artifact -- confirm the intent.
    private static final int MAX_RECONNECT_COUNT = 0;
    // Reconnect delay is EXP_BACKOFF_BASE_MS * 2^attempt while attempt <= EXP_BACKOFF_MAX_POWER,
    // and EXP_BACKOFF_CEILING_MS afterwards (random jitter is added on top).
    private static final int EXP_BACKOFF_BASE_MS = 256;
    private static final int EXP_BACKOFF_CEILING_MS = 40000;
    private static final int EXP_BACKOFF_MAX_POWER = 7;

    private final WriterSettings settings;
    private final TopicRpc topicRpc;
    // Executor on which message encoding (compression) tasks are run.
    private final Executor compressionExecutor;
    // Also used as the monitor that serializes sendMessages/setSession/setSeqNo calls.
    private final MessageSender messageSender;
    private final long maxSendBufferMemorySize;
    // Replaced with a fresh instance on every reconnect.
    private WriteSession session;
    private String currentSessionId;
    // Remaining send-buffer budget in bytes; modified while holding the incomingQueue monitor.
    private long availableSizeBytes;
    // Ack future of the most recently accepted message; written while holding the incomingQueue monitor.
    private CompletableFuture<WriteAck> lastAcceptedMessageFuture;
    private CompletableFuture<InitResult> initResultFuture = new CompletableFuture<>();
    // Messages waiting for free space in the send buffer. Doubles as the monitor guarding the buffer accounting.
    private final Queue<IncomingMessage> incomingQueue = new LinkedList<>();
    // Accepted messages, in acceptance order, that are still being encoded.
    private final Queue<EnqueuedMessage> encodingMessages = new LinkedList<>();
    // Encoded messages ready to be written to the stream.
    private final Queue<EnqueuedMessage> sendingQueue = new ConcurrentLinkedQueue<>();
    // Messages already written to the stream and waiting for a server ack.
    private final Queue<EnqueuedMessage> sentMessages = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean writeRequestInProgress = new AtomicBoolean(false);
    private final AtomicBoolean isStopped = new AtomicBoolean(false);
    private final AtomicBoolean isReconnecting = new AtomicBoolean(false);
    // Number of consecutive reconnect attempts; reset when a successful server message arrives.
    private final AtomicInteger reconnectCounter = new AtomicInteger(0);
    // null until the first message is sent; afterwards records whether messages carry a seqNo.
    private Boolean isSeqNoProvided = null;
    // Number of accepted messages not yet freed; modified while holding the incomingQueue monitor.
    private int currentInFlightCount = 0;
    // Random identifier used as a prefix in log messages.
    private final String id = UUID.randomUUID().toString();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: tech.ydb.topic.write.impl.WriterImpl$1, reason: invalid class name */
    /* loaded from: input_file:tech/ydb/topic/write/impl/WriterImpl$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        // Compiler-style lookup tables translating enum ordinals into the dense case labels
        // (1, 2, ...) used by the switch statements in this file; 0 means "no matching case".
        static final /* synthetic */ int[] $SwitchMap$tech$ydb$proto$topic$YdbTopic$StreamWriteMessage$WriteResponse$WriteAck$Skipped$Reason;
        static final /* synthetic */ int[] $SwitchMap$tech$ydb$proto$topic$YdbTopic$StreamWriteMessage$WriteResponse$WriteAck$MessageWriteStatusCase;

        static {
            int[] statusCaseLabels = new int[YdbTopic.StreamWriteMessage.WriteResponse.WriteAck.MessageWriteStatusCase.values().length];
            try {
                statusCaseLabels[YdbTopic.StreamWriteMessage.WriteResponse.WriteAck.MessageWriteStatusCase.WRITTEN.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // The constant is absent from the enum version on the classpath: leave the slot at 0.
            }
            try {
                statusCaseLabels[YdbTopic.StreamWriteMessage.WriteResponse.WriteAck.MessageWriteStatusCase.SKIPPED.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // Same as above.
            }
            $SwitchMap$tech$ydb$proto$topic$YdbTopic$StreamWriteMessage$WriteResponse$WriteAck$MessageWriteStatusCase = statusCaseLabels;

            int[] skippedReasonLabels = new int[YdbTopic.StreamWriteMessage.WriteResponse.WriteAck.Skipped.Reason.values().length];
            try {
                skippedReasonLabels[YdbTopic.StreamWriteMessage.WriteResponse.WriteAck.Skipped.Reason.REASON_ALREADY_WRITTEN.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // Same as above.
            }
            try {
                skippedReasonLabels[YdbTopic.StreamWriteMessage.WriteResponse.WriteAck.Skipped.Reason.REASON_UNSPECIFIED.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // Same as above.
            }
            $SwitchMap$tech$ydb$proto$topic$YdbTopic$StreamWriteMessage$WriteResponse$WriteAck$Skipped$Reason = skippedReasonLabels;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:tech/ydb/topic/write/impl/WriterImpl$IncomingMessage.class */
    /**
     * A message parked in the incoming queue together with the future that is
     * completed once the message is accepted into the send buffer.
     */
    public static class IncomingMessage {
        private final EnqueuedMessage message;
        private final CompletableFuture<Void> future = new CompletableFuture<>();

        private IncomingMessage(EnqueuedMessage enqueuedMessage) {
            this.message = enqueuedMessage;
        }

        /* synthetic */ IncomingMessage(EnqueuedMessage enqueuedMessage, AnonymousClass1 anonymousClass1) {
            // Bridge constructor: the second argument is ignored.
            this(enqueuedMessage);
        }
    }

    public WriterImpl(TopicRpc topicRpc, WriterSettings writerSettings, Executor executor) {
        this.topicRpc = topicRpc;
        this.settings = writerSettings;
        this.session = new WriteSession(topicRpc);
        this.availableSizeBytes = writerSettings.getMaxSendBufferMemorySize();
        this.maxSendBufferMemorySize = writerSettings.getMaxSendBufferMemorySize();
        this.compressionExecutor = executor;
        this.messageSender = new MessageSender(this.session, writerSettings);
        logger.info("Writer (generated id " + this.id + ") created for topic \"" + writerSettings.getTopicPath() + "\" with producerId \"" + writerSettings.getProducerId() + "\" and messageGroupId \"" + writerSettings.getMessageGroupId() + "\"");
    }

    /**
     * Tries to place a message into the send buffer.
     *
     * <p>If both the in-flight count and the byte budget allow it and nothing is already
     * waiting, the message is accepted immediately. Otherwise it is either rejected
     * (when {@code z} is true and a limit is hit) or parked in the incoming queue.
     *
     * @param enqueuedMessage the message to enqueue
     * @param z               when true, fail immediately instead of waiting for buffer space
     * @return a future completed when the message has been accepted into the send buffer, or
     *         completed exceptionally with {@link QueueOverflowException} on immediate rejection
     */
    public CompletableFuture<Void> tryToEnqueue(EnqueuedMessage enqueuedMessage, boolean z) {
        synchronized (this.incomingQueue) {
            boolean inFlightLimitHit =
                    this.currentInFlightCount >= this.settings.getMaxSendBufferMessagesCount();
            // The size limit is only consulted when the in-flight limit is not hit.
            boolean sizeLimitHit = !inFlightLimitHit
                    && this.availableSizeBytes <= enqueuedMessage.getMessage().getData().length;

            // Fast path: room in the buffer and nobody is queued ahead of this message.
            if (!inFlightLimitHit && !sizeLimitHit && this.incomingQueue.isEmpty()) {
                logger.trace("[{}] Putting a message into the queue right now, enough space in send buffer", this.id);
                acceptMessageIntoSendingQueue(enqueuedMessage);
                return CompletableFuture.completedFuture(null);
            }

            if (z && inFlightLimitHit) {
                logger.trace("[{}] Rejecting a message due to reaching message queue in-flight limit", this.id);
                CompletableFuture<Void> rejected = new CompletableFuture<>();
                rejected.completeExceptionally(new QueueOverflowException("Message queue in-flight limit reached"));
                return rejected;
            }
            if (z && sizeLimitHit) {
                logger.trace("[{}] Rejecting a message due to reaching message queue size limit", this.id);
                CompletableFuture<Void> rejected = new CompletableFuture<>();
                rejected.completeExceptionally(new QueueOverflowException("Message queue size limit reached"));
                return rejected;
            }

            if (inFlightLimitHit) {
                logger.debug("[{}] Message queue in-flight limit reached. Putting the message into incoming waiting queue", this.id);
            } else if (sizeLimitHit) {
                logger.debug("[{}] Message queue size limit reached. Putting the message into incoming waiting queue", this.id);
            }

            // Wait behind whatever is already queued; free() accepts it once space is available.
            IncomingMessage waiting = new IncomingMessage(enqueuedMessage);
            this.incomingQueue.add(waiting);
            return waiting.future;
        }
    }

    /**
     * Reserves send-buffer space for a message and schedules its encoding.
     *
     * <p>Both call sites in this class invoke this method while holding the
     * {@code incomingQueue} monitor. Once encoding finishes, every message at the head of
     * {@code encodingMessages} that is ready is moved to {@code sendingQueue} in acceptance order.
     *
     * <p>If the asynchronous pipeline fails, the message is removed from
     * {@code encodingMessages} (otherwise it would sit at the head forever and block every
     * message accepted after it), its buffer reservation is released and its future is
     * completed exceptionally.
     *
     * @param enqueuedMessage the message being accepted
     */
    private void acceptMessageIntoSendingQueue(EnqueuedMessage enqueuedMessage) {
        this.lastAcceptedMessageFuture = enqueuedMessage.getFuture();
        this.currentInFlightCount++;
        this.availableSizeBytes -= enqueuedMessage.getUncompressedSizeBytes();
        if (logger.isTraceEnabled()) {
            logger.trace("[{}] Accepted 1 message of {} uncompressed bytes. Current In-flight: {}, AvailableSizeBytes: {} ({} / {} acquired)",
                    this.id, enqueuedMessage.getUncompressedSizeBytes(), this.currentInFlightCount,
                    this.availableSizeBytes, this.maxSendBufferMemorySize - this.availableSizeBytes,
                    this.maxSendBufferMemorySize);
        }
        this.encodingMessages.add(enqueuedMessage);
        CompletableFuture
                .runAsync(() -> encode(enqueuedMessage), this.compressionExecutor)
                .thenRunAsync(this::moveEncodedMessagesToSendingQueue)
                .exceptionally(th -> {
                    logger.error("[{}] Exception while encoding message: ", this.id, th);
                    synchronized (this.incomingQueue) {
                        // No-op when the message has already left the encoding queue.
                        this.encodingMessages.remove(enqueuedMessage);
                    }
                    // NOTE(review): this handler also runs when the second stage fails, i.e. possibly
                    // after the message reached sendingQueue; the reservation may then be released
                    // twice (here and on ack) -- confirm whether that case needs separate handling.
                    free(1, enqueuedMessage.getSizeBytes());
                    enqueuedMessage.getFuture().completeExceptionally(th);
                    // Messages queued behind the failed one may already be encoded: release them now.
                    moveEncodedMessagesToSendingQueue();
                    return null;
                });
    }

    /**
     * Moves every ready message from the head of {@code encodingMessages} to
     * {@code sendingQueue}, stopping at the first message that is still being encoded,
     * and triggers a send if anything was moved.
     */
    private void moveEncodedMessagesToSendingQueue() {
        boolean movedAny = false;
        synchronized (this.incomingQueue) {
            EnqueuedMessage head;
            while ((head = this.encodingMessages.peek()) != null
                    && (head.isCompressed() || this.settings.getCodec() == Codec.RAW)) {
                this.encodingMessages.remove();
                if (head.isCompressed()) {
                    if (logger.isTraceEnabled()) {
                        logger.trace("[{}] Message compressed from {} to {} bytes",
                                this.id, head.getUncompressedSizeBytes(), head.getCompressedSizeBytes());
                    }
                    // Give back the bytes saved by compression; the message itself stays in flight.
                    free(0, head.getUncompressedSizeBytes() - head.getCompressedSizeBytes());
                }
                logger.debug("[{}] Adding message to sending queue", this.id);
                this.sendingQueue.add(head);
                movedAny = true;
            }
        }
        if (movedAny) {
            sendDataRequestIfNeeded();
        }
    }

    /**
     * Writes everything currently in {@code sendingQueue} to the stream, unless a reconnect
     * is in progress, the init response has not arrived yet, or another thread is already sending.
     *
     * <p>{@code writeRequestInProgress} acts as a try-lock: only the thread that flips it to
     * {@code true} drains the queue. The flag is released in a {@code finally} block so that an
     * exception thrown while sending cannot leave the writer permanently unable to send. After
     * each batch the loop re-checks the queue to pick up messages added in the meantime.
     */
    private void sendDataRequestIfNeeded() {
        while (!this.isReconnecting.get()) {
            if (!this.initResultFuture.isDone()) {
                logger.debug("[{}] Can't send data: init was not yet received", this.id);
                return;
            }
            if (this.sendingQueue.isEmpty()) {
                logger.trace("[{}] Nothing to send -- sendingQueue is empty", this.id);
                return;
            }
            if (!this.writeRequestInProgress.compareAndSet(false, true)) {
                logger.debug("[{}] Send request is already in progress", this.id);
                return;
            }
            try {
                // Drain with poll(): linear, and each element is taken exactly once.
                Queue<EnqueuedMessage> batch = new LinkedList<>();
                EnqueuedMessage next;
                while ((next = this.sendingQueue.poll()) != null) {
                    batch.add(next);
                }
                if (batch.isEmpty()) {
                    logger.debug("[{}] Nothing to send -- sendingQueue is empty #2", this.id);
                } else {
                    // Track the batch as awaiting acks before it is written to the stream.
                    this.sentMessages.addAll(batch);
                    synchronized (this.messageSender) {
                        this.messageSender.sendMessages(batch);
                    }
                    logger.trace("[{}] Sent {} messages to server", this.id, batch.size());
                }
            } finally {
                if (!this.writeRequestInProgress.compareAndSet(true, false)) {
                    logger.error("[{}] Couldn't turn off writeRequestInProgress. Should not happen", this.id);
                }
            }
        }
        logger.debug("[{}] Can't send data: reconnect is in progress", this.id);
    }

    /**
     * Encodes the message payload in place with the configured codec.
     *
     * <p>For {@link Codec#RAW} the message is left untouched and is not marked as compressed.
     *
     * @param enqueuedMessage the message whose payload is encoded
     */
    private void encode(EnqueuedMessage enqueuedMessage) {
        logger.trace("[{}] Started encoding message", this.id);
        Codec codec = this.settings.getCodec();
        if (codec != Codec.RAW) {
            Message payload = enqueuedMessage.getMessage();
            payload.setData(Encoder.encode(codec, payload.getData()));
            // The compressed size is read back from the payload after it has been replaced.
            enqueuedMessage.setCompressedSizeBytes(payload.getData().length);
            enqueuedMessage.setCompressed(true);
            logger.trace("[{}] Successfully finished encoding message", this.id);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Starts the current write session and sends the init request built from the settings.
     *
     * <p>The settings are validated before anything is started, so an invalid configuration
     * no longer leaves a started session behind. The new init future is published before the
     * session is started, so a session that completes early acts on the current future
     * rather than the previous one.
     *
     * @return a future completed with the {@link InitResult} once the server's init response arrives
     * @throws RuntimeException if both a message group id and a partition id are configured
     */
    public CompletableFuture<InitResult> initImpl() {
        logger.debug("[{}] initImpl started", this.id);

        String producerId = this.settings.getProducerId();
        String messageGroupId = this.settings.getMessageGroupId();
        Long partitionId = this.settings.getPartitionId();
        if (messageGroupId != null && partitionId != null) {
            throw new RuntimeException("Both MessageGroupId and PartitionId are set in WriterSettings");
        }

        YdbTopic.StreamWriteMessage.InitRequest.Builder initRequest =
                YdbTopic.StreamWriteMessage.InitRequest.newBuilder().setPath(this.settings.getTopicPath());
        if (producerId != null) {
            initRequest.setProducerId(producerId);
        }
        if (messageGroupId != null) {
            initRequest.setMessageGroupId(messageGroupId);
        } else if (partitionId != null) {
            initRequest.setPartitionId(partitionId.longValue());
        }

        CompletableFuture<InitResult> initFuture = new CompletableFuture<>();
        this.initResultFuture = initFuture;
        this.session.start(this::processMessage).whenComplete(this::completeSession);
        this.session.send(YdbTopic.StreamWriteMessage.FromClient.newBuilder().setInitRequest(initRequest).build());
        return initFuture;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Validates and enqueues a message for writing.
     *
     * <p>The first message decides whether seqNo values are user-provided; every later
     * message must follow the same convention.
     *
     * @param message the message to write
     * @param z       when true, fail immediately instead of waiting if the send buffer is full
     * @return an outer future completed when the message is accepted into the send buffer,
     *         yielding the inner future that is completed with the server's ack
     * @throws RuntimeException if the writer is stopped or seqNo usage is inconsistent
     */
    public CompletableFuture<CompletableFuture<WriteAck>> sendImpl(Message message, boolean z) {
        if (this.isStopped.get()) {
            throw new RuntimeException("Writer is already stopped");
        }

        boolean hasSeqNo = message.getSeqNo() != null;
        if (this.isSeqNoProvided == null) {
            this.isSeqNoProvided = hasSeqNo;
        } else if (hasSeqNo && !this.isSeqNoProvided) {
            throw new RuntimeException("SeqNo was provided for a message after it had not been provided for another message. SeqNo should either be provided for all messages or none of them.");
        } else if (!hasSeqNo && this.isSeqNoProvided) {
            throw new RuntimeException("SeqNo was not provided for a message after it had been provided for another message. SeqNo should either be provided for all messages or none of them.");
        }

        EnqueuedMessage enqueued = new EnqueuedMessage(message);
        return tryToEnqueue(enqueued, z).thenApply(accepted -> enqueued.getFuture());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Returns a future tied to the most recently accepted message.
     *
     * <p>If no message has been accepted yet, or the last accepted message's future is
     * already done, the returned future is already completed normally. Otherwise it completes
     * when that message's future completes and mirrors its outcome, including failure.
     *
     * <p>The field is read under the {@code incomingQueue} monitor, the same monitor under
     * which it is written.
     *
     * @return a future signalling that the last accepted message has been processed
     */
    public CompletableFuture<Void> flushImpl() {
        synchronized (this.incomingQueue) {
            CompletableFuture<WriteAck> lastAccepted = this.lastAcceptedMessageFuture;
            if (lastAccepted == null || lastAccepted.isDone()) {
                return CompletableFuture.completedFuture(null);
            }
            return lastAccepted.thenApply(ack -> null);
        }
    }

    /**
     * Releases send-buffer capacity and accepts as many waiting messages as now fit.
     *
     * <p>Waiting messages are accepted strictly in queue order; the first one that does not
     * fit (by bytes or by in-flight count) stops the drain. The drain runs whenever either
     * slots or bytes were released, so messages blocked only by the in-flight limit are
     * also retried when zero-byte messages are freed.
     *
     * @param i number of in-flight message slots to release
     * @param j number of bytes to return to the send buffer
     */
    private void free(int i, long j) {
        synchronized (this.incomingQueue) {
            this.currentInFlightCount -= i;
            this.availableSizeBytes += j;
            if (logger.isTraceEnabled()) {
                logger.trace("[{}] Freed {} bytes in {} messages. Current In-flight: {}, current availableSize: {} ({} / {} acquired)",
                        this.id, j, i, this.currentInFlightCount, this.availableSizeBytes,
                        this.maxSendBufferMemorySize - this.availableSizeBytes, this.maxSendBufferMemorySize);
            }
            boolean capacityReleased = i > 0 || j > 0;
            if (!capacityReleased || this.incomingQueue.isEmpty()) {
                return;
            }
            IncomingMessage waiting;
            while ((waiting = this.incomingQueue.peek()) != null) {
                boolean fits = waiting.message.getUncompressedSizeBytes() <= this.availableSizeBytes
                        && this.currentInFlightCount < this.settings.getMaxSendBufferMessagesCount();
                if (!fits) {
                    logger.trace("[{}] There are messages in incomingQueue still, but no space in send buffer", this.id);
                    return;
                }
                logger.trace("[{}] Putting a message into send buffer after freeing some space", this.id);
                // complete() returns false if the waiter's future was already completed elsewhere
                // (e.g. cancelled); such a message is dropped from the queue without being accepted.
                if (waiting.future.complete(null)) {
                    acceptMessageIntoSendingQueue(waiting.message);
                }
                this.incomingQueue.remove();
            }
            logger.trace("[{}] All messages from incomingQueue are accepted into send buffer", this.id);
        }
    }

    /**
     * Replaces the write session with a fresh one, re-runs initialization and clears the
     * reconnect-in-progress flag.
     */
    private void reconnect() {
        logger.info("[{}] Reconnect #{} started. Creating new WriteSession", this.id, this.reconnectCounter.get());

        WriteSession freshSession = new WriteSession(this.topicRpc);
        this.session = freshSession;
        // The sender is shared with the send path, so swap its session under the same monitor.
        synchronized (this.messageSender) {
            this.messageSender.setSession(freshSession);
        }
        initImpl();

        boolean flagCleared = this.isReconnecting.compareAndSet(true, false);
        if (!flagCleared) {
            logger.warn("[{}] Couldn't reset reconnect flag. Shouldn't happen", this.id);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Marks the writer as stopped, waits for the last accepted message and then shuts the
     * session down.
     *
     * <p>The session is shut down whether the flush completes normally or exceptionally, so
     * a failed last message no longer leaves the session open. The returned future still
     * carries the flush failure, if any.
     *
     * @return a future completed once the flush has finished and the session shutdown was requested
     */
    public CompletableFuture<Void> shutdownImpl() {
        logger.info("[{}] Shutting down Topic Writer", this.id);
        this.isStopped.set(true);
        return flushImpl().whenComplete((ignored, flushError) -> this.session.shutdown());
    }

    /**
     * Shuts the writer down because of an error, failing the pending init future if there is one.
     *
     * <p>The pending future is failed directly. Going through {@code initImpl()} would start
     * the session again, send a new init request and replace the future, leaving the
     * previously pending one never completed.
     *
     * @param str human-readable reason used as the exception message
     */
    private void shutdownImpl(String str) {
        CompletableFuture<InitResult> pendingInit = this.initResultFuture;
        if (!pendingInit.isDone()) {
            pendingInit.completeExceptionally(new RuntimeException(str));
        }
        shutdownImpl();
    }

    /**
     * Handles a message received from the server on the write stream.
     *
     * <ul>
     *   <li>A non-success status ends the session via {@code completeSession}.</li>
     *   <li>An init response records the session id, seeds the sender's seqNo, re-sends
     *       messages still awaiting acks and completes the init future.</li>
     *   <li>A write response matches acks against {@code sentMessages} in order and releases
     *       the corresponding send-buffer capacity.</li>
     * </ul>
     *
     * @param fromServer the server message
     */
    private void processMessage(YdbTopic.StreamWriteMessage.FromServer fromServer) {
        logger.debug("[{}] processMessage called", this.id);
        if (fromServer.getStatus() != StatusCodesProtos.StatusIds.StatusCode.SUCCESS) {
            logger.error("[{}] Got non-success status in processMessage method: {}", this.id, fromServer);
            completeSession(Status.of(StatusCode.fromProto(fromServer.getStatus())).withIssues(new Issue[]{Issue.of("Got a message with non-success status: " + fromServer, Issue.Severity.ERROR)}), null);
            return;
        }
        // Any successful message means the connection is healthy again: restart the backoff sequence.
        this.reconnectCounter.set(0);

        if (fromServer.hasInitResponse()) {
            this.currentSessionId = fromServer.getInitResponse().getSessionId();
            logger.info("[{}] Session {} initialized", this.id, this.currentSessionId);
            long lastSeqNo = fromServer.getInitResponse().getLastSeqNo();
            synchronized (this.messageSender) {
                this.messageSender.setSeqNo(lastSeqNo);
                // Messages sent on a previous session but never acked are written again.
                if (!this.sentMessages.isEmpty()) {
                    this.messageSender.sendMessages(this.sentMessages);
                }
            }
            this.initResultFuture.complete(new InitResult(lastSeqNo));
            sendDataRequestIfNeeded();
            return;
        }

        if (fromServer.hasWriteResponse()) {
            List<YdbTopic.StreamWriteMessage.WriteResponse.WriteAck> acksList = fromServer.getWriteResponse().getAcksList();
            int freedCount = 0;
            long freedBytes = 0;
            for (YdbTopic.StreamWriteMessage.WriteResponse.WriteAck writeAck : acksList) {
                while (true) {
                    EnqueuedMessage oldest = this.sentMessages.peek();
                    if (oldest == null) {
                        // Nothing is waiting for an ack; without this exit the loop would spin forever.
                        break;
                    }
                    long oldestSeqNo = oldest.getSeqNo().longValue();
                    if (oldestSeqNo == writeAck.getSeqNo()) {
                        processWriteAck(oldest, writeAck);
                        freedCount++;
                        freedBytes += oldest.getSizeBytes();
                        this.sentMessages.remove();
                        break;
                    }
                    if (oldestSeqNo > writeAck.getSeqNo()) {
                        logger.info("[{}] Received an ack with seqNo {} which is older than the oldest message with seqNo {} waiting for ack",
                                this.id, writeAck.getSeqNo(), oldest.getSeqNo());
                        break;
                    }
                    // The ack is for a newer message: the oldest one was skipped by the server.
                    // Fail it and keep looking for the message this ack belongs to.
                    logger.warn("[{}] Received an ack for seqNo {}, but the oldest seqNo waiting for ack is {}",
                            this.id, writeAck.getSeqNo(), oldest.getSeqNo());
                    oldest.getFuture().completeExceptionally(new RuntimeException("Didn't get ack from server for this message"));
                    freedCount++;
                    freedBytes += oldest.getSizeBytes();
                    this.sentMessages.remove();
                }
            }
            free(freedCount, freedBytes);
        }
    }

    /**
     * Completes a message's future according to the server ack.
     *
     * <ul>
     *   <li>WRITTEN: completed with state {@code WRITTEN} and the written offset.</li>
     *   <li>SKIPPED with reason REASON_ALREADY_WRITTEN: completed with state
     *       {@code ALREADY_WRITTEN} and no details.</li>
     *   <li>SKIPPED with any other reason, or any other status case: completed exceptionally.</li>
     * </ul>
     *
     * <p>Each branch is terminal, so an ALREADY_WRITTEN ack can no longer fall through into
     * the "Unknown WriteAck state" failure.
     *
     * @param enqueuedMessage the message the ack belongs to
     * @param writeAck        the ack received from the server
     */
    private void processWriteAck(EnqueuedMessage enqueuedMessage, YdbTopic.StreamWriteMessage.WriteResponse.WriteAck writeAck) {
        YdbTopic.StreamWriteMessage.WriteResponse.WriteAck.MessageWriteStatusCase statusCase =
                writeAck.getMessageWriteStatusCase();

        if (statusCase == YdbTopic.StreamWriteMessage.WriteResponse.WriteAck.MessageWriteStatusCase.WRITTEN) {
            enqueuedMessage.getFuture().complete(new WriteAck(writeAck.getSeqNo(), WriteAck.State.WRITTEN,
                    new WriteAck.Details(writeAck.getWritten().getOffset())));
            return;
        }

        if (statusCase == YdbTopic.StreamWriteMessage.WriteResponse.WriteAck.MessageWriteStatusCase.SKIPPED) {
            YdbTopic.StreamWriteMessage.WriteResponse.WriteAck.Skipped.Reason reason =
                    writeAck.getSkipped().getReason();
            if (reason == YdbTopic.StreamWriteMessage.WriteResponse.WriteAck.Skipped.Reason.REASON_ALREADY_WRITTEN) {
                enqueuedMessage.getFuture().complete(
                        new WriteAck(writeAck.getSeqNo(), WriteAck.State.ALREADY_WRITTEN, null));
            } else {
                enqueuedMessage.getFuture().completeExceptionally(
                        new RuntimeException("Unknown WriteAck skipped reason"));
            }
            return;
        }

        enqueuedMessage.getFuture().completeExceptionally(new RuntimeException("Unknown WriteAck state"));
    }

    /**
     * Called when the write stream ends, or when a non-success message is received.
     *
     * <p>Stops the current session. A successful completion either just logs (the writer is
     * stopped or a reconnect is already scheduled) or shuts the writer down as an unexpected
     * close. A failure (exception or non-success status) schedules a reconnect with
     * exponential backoff and jitter, unless the writer is stopped or a reconnect is
     * already in progress.
     *
     * @param status completion status of the stream; only consulted when {@code th} is null
     * @param th     exception that terminated the stream, or null
     */
    private void completeSession(Status status, Throwable th) {
        logger.info("[{}] CompleteSession called", this.id);
        this.session.stop();

        if (th != null) {
            logger.error("[{}] Exception in writing stream session {}: ", this.id, this.currentSessionId, th);
        } else if (!status.isSuccess()) {
            String sessionIdForLog = this.currentSessionId != null ? this.currentSessionId : "";
            logger.error("[{}] Error in writing stream session {}: {}", this.id, sessionIdForLog, status);
        } else if (this.isStopped.get()) {
            logger.info("[{}] Writing stream session {} closed successfully", this.id, this.currentSessionId);
            return;
        } else if (this.isReconnecting.get()) {
            logger.info("[{}] Current session is closing, but reconnect is already scheduled", this.id);
            return;
        } else {
            logger.error("[{}] Writing stream session {} was closed unexpectedly. Shutting down the writer.", this.id, this.currentSessionId);
            shutdownImpl("Writing stream session " + this.currentSessionId + " was closed unexpectedly. Shutting down writer.");
            return;
        }

        // Failure path: reconnect unless the writer has been stopped.
        if (this.isStopped.get()) {
            return;
        }
        int attempt = this.reconnectCounter.incrementAndGet();
        if (!this.isReconnecting.compareAndSet(false, true)) {
            logger.debug("[{}] Should reconnect, but reconnect is already in progress", this.id);
            return;
        }

        int baseDelayMs;
        if (attempt <= EXP_BACKOFF_MAX_POWER) {
            baseDelayMs = EXP_BACKOFF_BASE_MS * (1 << attempt);
        } else {
            baseDelayMs = EXP_BACKOFF_CEILING_MS;
        }
        // Jitter: the actual delay is uniformly distributed in [base, 2 * base).
        int delayMs = baseDelayMs + ThreadLocalRandom.current().nextInt(baseDelayMs);
        logger.warn("[{}] Retry #" + attempt + ". Scheduling reconnect in {}ms...", this.id, delayMs);
        this.topicRpc.getScheduler().schedule(this::reconnect, delayMs, TimeUnit.MILLISECONDS);
    }
}
