package io.pravega.client.stream.impl;

import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.segment.impl.Segment;
import io.pravega.client.segment.impl.SegmentOutputStream;
import io.pravega.client.segment.impl.SegmentOutputStreamFactory;
import io.pravega.client.segment.impl.SegmentSealedException;
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.Serializer;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.Transaction;
import io.pravega.client.stream.TxnFailedException;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.ExecutorServiceHelpers;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.util.Retry;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import javax.annotation.concurrent.GuardedBy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/client/stream/impl/EventStreamWriterImpl.class */
public class EventStreamWriterImpl<Type> implements EventStreamWriter<Type> {

    /** Class-wide SLF4J logger; assigned in the static initializer of this class. */
    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log;
    /** Stream supplied to the constructor; passed to the controller, selector and pinger. */
    private final Stream stream;
    /** Converts events to {@link ByteBuffer}s before they are written; also handed to segment transactions. */
    private final Serializer<Type> serializer;
    /** Factory used to open per-segment output streams for transactions (and given to the selector). */
    private final SegmentOutputStreamFactory outputStreamFactory;
    /** Controller client used for transaction lifecycle calls and segment lookups. */
    private final Controller controller;
    /** Writer configuration: supplies the retry backoff settings and the transaction timeout. */
    private final EventWriterConfig config;
    /** Resolves a routing key to the {@link SegmentOutputStream} that should receive the event. */
    private final SegmentSelector selector;
    /** Executor that runs the sealed-segment handling task; shut down by {@link #close()}. */
    private final ExecutorService retransmitPool;
    /** Started for a transaction in {@code beginTxn}, stopped on commit/abort, closed by {@link #close()}. */
    private final Pinger pinger;
    // Assigned in the static initializer from Class.desiredAssertionStatus(); the constructor checks it
    // before running its sanity check. NOTE(review): looks like the compiler's synthetic assertion flag
    // surfaced by decompilation -- confirm before renaming or removing.
    static final /* synthetic */ boolean $assertionsDisabled;
    /** Outer lock: held by event writes, {@link #flush()} and {@link #close()}. */
    private final Object writeFlushLock = new Object();
    /** Inner lock: held by event writes (inside {@code writeFlushLock}) and by the sealed-segment task. */
    private final Object writeSealLock = new Object();
    /** Set to {@code true} once by {@link #close()}; checked by write and flush entry points. */
    private final AtomicBoolean closed = new AtomicBoolean(false);
    /** Segments reported as sealed; filled by the seal callback and drained while holding {@code writeSealLock}. */
    private final ConcurrentLinkedQueue<Segment> sealedSegmentQueue = new ConcurrentLinkedQueue<>();
    /** Callback given to the selector whenever writers are refreshed; routes to {@code handleLogSealed}. */
    private final Consumer<Segment> segmentSealedCallBack = this::handleLogSealed;

    /* loaded from: input_file:io/pravega/client/stream/impl/EventStreamWriterImpl$TransactionImpl.class */
    /**
     * {@link Transaction} handle backed by one {@link SegmentTransaction} per segment.
     *
     * <p>An instance is either "live" (built with the per-segment map, the segment layout and a
     * pinger) or "already closed" (built with only the id, controller and stream). A closed
     * instance rejects writes, flushes and commits with {@link TxnFailedException} and treats
     * {@link #abort()} as a no-op, so its {@code null} map and pinger are never dereferenced.
     */
    private static class TransactionImpl<Type> implements Transaction<Type> {
        private final Map<Segment, SegmentTransaction<Type>> inner;
        private final UUID txId;
        private final AtomicBoolean closed;
        private final Controller controller;
        private final Stream stream;
        private final Pinger pinger;
        private StreamSegments segments;

        /**
         * Creates a live transaction handle.
         *
         * @param uuid           transaction id
         * @param map            per-segment transactions, keyed by segment
         * @param streamSegments segment layout used to route a key to a segment
         * @param controller     controller used for commit/abort/status calls
         * @param stream         stream the transaction belongs to
         * @param pinger         pinger to notify when the transaction ends
         */
        TransactionImpl(UUID uuid, Map<Segment, SegmentTransaction<Type>> map, StreamSegments streamSegments, Controller controller, Stream stream, Pinger pinger) {
            this.txId = uuid;
            this.inner = map;
            this.segments = streamSegments;
            this.controller = controller;
            this.stream = stream;
            this.pinger = pinger;
            this.closed = new AtomicBoolean(false);
        }

        /**
         * Creates a handle that is closed from the start; only status checks and
         * {@link #getTxnId()} are useful on it.
         *
         * @param uuid       transaction id
         * @param controller controller used for status calls
         * @param stream     stream the transaction belongs to
         */
        TransactionImpl(UUID uuid, Controller controller, Stream stream) {
            this.txId = uuid;
            this.inner = null;
            this.segments = null;
            this.controller = controller;
            this.stream = stream;
            this.pinger = null;
            // Born closed: every mutating operation is rejected before touching the null fields.
            this.closed = new AtomicBoolean(true);
        }

        /** Writes an event using the transaction id's string form as the routing key. */
        @Override // io.pravega.client.stream.Transaction
        public void writeEvent(Type type) throws TxnFailedException {
            final String routingKey = txId.toString();
            writeEvent(routingKey, type);
        }

        /**
         * Writes an event to the segment transaction selected by the routing key.
         *
         * @throws TxnFailedException if this handle is closed (or the segment write fails)
         */
        @Override // io.pravega.client.stream.Transaction
        public void writeEvent(String str, Type type) throws TxnFailedException {
            Preconditions.checkNotNull(type);
            throwIfClosed();
            final Segment target = segments.getSegmentForKey(str);
            final SegmentTransaction<Type> segmentTxn = inner.get(target);
            segmentTxn.writeEvent(type);
        }

        /**
         * Closes every per-segment transaction, asks the controller to commit, then stops
         * pinging and marks this handle closed.
         *
         * @throws TxnFailedException if this handle is closed, a segment close fails, or the
         *                            controller call fails
         */
        @Override // io.pravega.client.stream.Transaction
        public void commit() throws TxnFailedException {
            throwIfClosed();
            for (SegmentTransaction<Type> segmentTxn : inner.values()) {
                segmentTxn.close();
            }
            Futures.getAndHandleExceptions(controller.commitTransaction(stream, txId), TxnFailedException::new);
            pinger.stopPing(txId);
            closed.set(true);
        }

        /**
         * Aborts the transaction unless this handle is already closed. Failures while closing
         * individual segment transactions are logged at debug level and otherwise ignored.
         */
        @Override // io.pravega.client.stream.Transaction
        public void abort() {
            if (closed.get()) {
                return;
            }
            for (SegmentTransaction<Type> segmentTxn : inner.values()) {
                try {
                    segmentTxn.close();
                } catch (TxnFailedException e) {
                    log.debug("Got exception while writing to transaction on abort: {}", e.getMessage());
                }
            }
            pinger.stopPing(txId);
            Futures.getAndHandleExceptions(controller.abortTransaction(stream, txId), RuntimeException::new);
            closed.set(true);
        }

        /** Asks the controller for the current status of this transaction. */
        @Override // io.pravega.client.stream.Transaction
        public Transaction.Status checkStatus() {
            final Transaction.Status status = (Transaction.Status) Futures.getAndHandleExceptions(controller.checkTransactionStatus(stream, txId), RuntimeException::new);
            return status;
        }

        /**
         * Flushes every per-segment transaction.
         *
         * @throws TxnFailedException if this handle is closed or a segment flush fails
         */
        @Override // io.pravega.client.stream.Transaction
        public void flush() throws TxnFailedException {
            throwIfClosed();
            for (SegmentTransaction<Type> segmentTxn : inner.values()) {
                segmentTxn.flush();
            }
        }

        /** Returns the id this handle was created with. */
        @Override // io.pravega.client.stream.Transaction
        public UUID getTxnId() {
            return txId;
        }

        /** Rejects the calling operation when this handle has been closed. */
        private void throwIfClosed() throws TxnFailedException {
            final boolean alreadyClosed = closed.get();
            if (alreadyClosed) {
                throw new TxnFailedException();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Wires up a writer for the given stream and immediately asks the selector to open the
     * segment writers.
     *
     * @param stream                     stream to write to; must not be null
     * @param controller                 controller client; must not be null
     * @param segmentOutputStreamFactory factory for segment output streams; must not be null
     * @param serializer                 event serializer; must not be null
     * @param eventWriterConfig          writer configuration
     * @param executorService            executor for sealed-segment handling; must not be null
     */
    public EventStreamWriterImpl(Stream stream, Controller controller, SegmentOutputStreamFactory segmentOutputStreamFactory, Serializer<Type> serializer, EventWriterConfig eventWriterConfig, ExecutorService executorService) {
        this.stream = Preconditions.checkNotNull(stream);
        this.controller = Preconditions.checkNotNull(controller);
        this.outputStreamFactory = Preconditions.checkNotNull(segmentOutputStreamFactory);
        this.selector = new SegmentSelector(stream, controller, segmentOutputStreamFactory, eventWriterConfig);
        this.serializer = Preconditions.checkNotNull(serializer);
        this.config = eventWriterConfig;
        this.retransmitPool = Preconditions.checkNotNull(executorService);
        this.pinger = new Pinger(eventWriterConfig, stream, controller);

        // Nothing has been written yet, so the initial refresh must not hand back failed events.
        final List<PendingEvent> failedEvents = this.selector.refreshSegmentEventWriters(this.segmentSealedCallBack);
        if (!$assertionsDisabled) {
            if (!failedEvents.isEmpty()) {
                throw new AssertionError("There should not be any events to have failed");
            }
        }
    }

    /**
     * Writes an event without a routing key.
     *
     * @param type the event to write; must not be null
     * @return a future that is handed to the segment writer together with the event
     */
    @Override // io.pravega.client.stream.EventStreamWriter
    public CompletableFuture<Void> writeEvent(Type type) {
        final String noRoutingKey = null;
        return writeEventInternal(noRoutingKey, type);
    }

    /**
     * Writes an event using the given routing key.
     *
     * @param str  routing key; must not be null
     * @param type the event to write; must not be null
     * @return a future that is handed to the segment writer together with the event
     */
    @Override // io.pravega.client.stream.EventStreamWriter
    public CompletableFuture<Void> writeEvent(String str, Type type) {
        final String routingKey = Preconditions.checkNotNull(str);
        return writeEventInternal(routingKey, type);
    }

    /**
     * Serializes the event and hands it to the segment writer chosen for the routing key.
     *
     * <p>Both locks are taken, {@code writeFlushLock} first and {@code writeSealLock} inside it.
     * While the selector has no writer for the key, the writers are refreshed and the lookup is
     * repeated.
     *
     * @param str  routing key, or {@code null} when the caller supplied none
     * @param type the event; must not be null
     * @return the future passed along with the pending event
     */
    private CompletableFuture<Void> writeEventInternal(String str, Type type) {
        Preconditions.checkNotNull(type);
        Exceptions.checkNotClosed(this.closed.get(), this);
        final ByteBuffer payload = this.serializer.serialize(type);
        final CompletableFuture<Void> ack = new CompletableFuture<>();
        synchronized (this.writeFlushLock) {
            synchronized (this.writeSealLock) {
                SegmentOutputStream writer;
                // Keep refreshing until the selector can supply a writer for this key.
                while ((writer = this.selector.getSegmentOutputStreamForKey(str)) == null) {
                    log.info("Don't have a writer for segment: {}", this.selector.getSegmentForEvent(str));
                    handleMissingLog();
                }
                writer.write(PendingEvent.withHeader(str, payload, ack));
            }
        }
        return ack;
    }

    /**
     * Refreshes the selector's segment writers and resends whatever events the refresh
     * handed back.
     */
    @GuardedBy("writeSealLock")
    private void handleMissingLog() {
        final List<PendingEvent> toResend = this.selector.refreshSegmentEventWriters(this.segmentSealedCallBack);
        resend(toResend);
    }

    /**
     * Callback invoked with a segment that has been sealed.
     *
     * <p>The segment is queued and a task is submitted to {@code retransmitPool}. The task runs
     * through the {@link Retry} helper, configured from the writer config's backoff settings,
     * and any exception it reports is logged at error level. While holding
     * {@code writeSealLock} the task drains the queue; for each sealed segment it refreshes the
     * writers, resends the events the refresh returned, and then flushes all writers.
     *
     * <p>Because one task drains the whole queue, a later task may find it empty; in that case
     * nothing is logged or done.
     *
     * @param segment the segment reported as sealed
     */
    private void handleLogSealed(Segment segment) {
        this.sealedSegmentQueue.add(segment);
        this.retransmitPool.execute(() -> {
            Retry.indefinitelyWithExpBackoff(this.config.getInitalBackoffMillis(), this.config.getBackoffMultiple(), this.config.getMaxBackoffMillis(), th -> {
                log.error("Encountered exception when handling a sealed segment: ", th);
            }).run(() -> {
                synchronized (this.writeSealLock) {
                    boolean first = true;
                    Segment sealed;
                    // Only real segments are logged; an empty poll simply ends the loop.
                    while ((sealed = this.sealedSegmentQueue.poll()) != null) {
                        if (first) {
                            log.info("Sealing segment {} ", sealed);
                            first = false;
                        } else {
                            log.info("Sealing another segment {} ", sealed);
                        }
                        resend(this.selector.refreshSegmentEventWritersUponSealed(sealed, this.segmentSealedCallBack));
                        flushInternal();
                    }
                }
                return null;
            });
        });
    }

    /**
     * Re-submits events that were handed back by the selector, in order.
     *
     * <p>Events are written one by one to the writer chosen for their routing key. When the
     * selector has no writer for an event, the writers are refreshed and the remainder of the
     * pass is deferred to the next round. The next round processes, in this order: the events
     * returned by the refresh, the event that could not be written, and every event that
     * followed it in the list. Rounds repeat until nothing is left.
     *
     * <p>The event that found no writer must be carried into the next round; it has not been
     * given to any writer, so dropping it would lose the event and leave its future incomplete.
     *
     * @param list events to resend; not modified
     */
    @GuardedBy("writeSealLock")
    private void resend(List<PendingEvent> list) {
        List<PendingEvent> toResend = list;
        while (!toResend.isEmpty()) {
            List<PendingEvent> unsent = new ArrayList<>();
            boolean sendFailed = false;
            log.info("Resending {} events", toResend.size());
            for (PendingEvent event : toResend) {
                if (sendFailed) {
                    // A refresh already happened in this round; preserve order by deferring the rest.
                    unsent.add(event);
                    continue;
                }
                SegmentOutputStream writer = this.selector.getSegmentOutputStreamForKey(event.getRoutingKey());
                if (writer == null) {
                    log.info("No writer for segment during resend.");
                    unsent.addAll(this.selector.refreshSegmentEventWriters(this.segmentSealedCallBack));
                    // Retry this event after the ones the refresh returned, ahead of its successors.
                    unsent.add(event);
                    sendFailed = true;
                } else {
                    writer.write(event);
                }
            }
            toResend = unsent;
        }
    }

    /**
     * Starts a new transaction on the stream.
     *
     * <p>Asks the controller to create the transaction (using the configured transaction
     * timeout), opens one transactional output stream per segment returned by the controller,
     * registers the transaction with the pinger, and returns a live handle.
     *
     * @return a handle for the newly created transaction
     */
    @Override // io.pravega.client.stream.EventStreamWriter
    public Transaction<Type> beginTxn() {
        TxnSegments txnSegments = (TxnSegments) Futures.getAndHandleExceptions(this.controller.createTransaction(this.stream, this.config.getTransactionTimeoutTime()), RuntimeException::new);
        UUID txnId = txnSegments.getTxnId();
        // Look the segment layout up once and reuse it for routing and for the handle.
        StreamSegments streamSegments = txnSegments.getSteamSegments();
        Map<Segment, SegmentTransaction<Type>> transactions = new HashMap<>();
        for (Segment segment : streamSegments.getSegments()) {
            transactions.put(segment, new SegmentTransactionImpl<Type>(txnId, this.outputStreamFactory.createOutputStreamForTransaction(segment, txnId, this.config, streamSegments.getDelegationToken()), this.serializer));
        }
        this.pinger.startPing(txnId);
        return new TransactionImpl<>(txnId, transactions, streamSegments, this.controller, this.stream, this.pinger);
    }

    /**
     * Returns a handle for an existing transaction.
     *
     * <p>The controller is asked for the transaction's status first. If it is anything other
     * than {@code OPEN}, an already-closed handle is returned and the stream's segments are
     * not looked up. Otherwise the current segments of the stream are fetched and one
     * transactional output stream is opened per segment.
     *
     * @param uuid id of the transaction to look up
     * @return a live handle when the transaction is open, otherwise a closed handle
     */
    @Override // io.pravega.client.stream.EventStreamWriter
    public Transaction<Type> getTxn(UUID uuid) {
        Transaction.Status status = (Transaction.Status) Futures.getAndHandleExceptions(this.controller.checkTransactionStatus(this.stream, uuid), RuntimeException::new);
        if (status != Transaction.Status.OPEN) {
            // The segment layout is only needed for a writable handle; skip that controller call.
            return new TransactionImpl<>(uuid, this.controller, this.stream);
        }
        StreamSegments streamSegments = (StreamSegments) Futures.getAndHandleExceptions(this.controller.getCurrentSegments(this.stream.getScope(), this.stream.getStreamName()), RuntimeException::new);
        Map<Segment, SegmentTransaction<Type>> transactions = new HashMap<>();
        for (Segment segment : streamSegments.getSegments()) {
            transactions.put(segment, new SegmentTransactionImpl<Type>(uuid, this.outputStreamFactory.createOutputStreamForTransaction(segment, uuid, this.config, streamSegments.getDelegationToken()), this.serializer));
        }
        return new TransactionImpl<>(uuid, transactions, streamSegments, this.controller, this.stream, this.pinger);
    }

    /**
     * Flushes all segment writers, repeating the pass until one completes without any writer
     * reporting a sealed segment.
     *
     * @throws IllegalStateException if the writer has been closed
     */
    @Override // io.pravega.client.stream.EventStreamWriter
    public void flush() {
        Preconditions.checkState(!this.closed.get());
        synchronized (this.writeFlushLock) {
            boolean allFlushed;
            do {
                allFlushed = flushInternal();
            } while (!allFlushed);
        }
    }

    /**
     * Runs one flush pass over every writer the selector currently holds.
     *
     * <p>A {@link SegmentSealedException} from a writer is logged as a warning and does not
     * stop the pass; the remaining writers are still flushed.
     *
     * @return {@code true} when no writer reported a sealed segment during this pass
     */
    private boolean flushInternal() {
        int sealedWriters = 0;
        for (SegmentOutputStream writer : this.selector.getWriters()) {
            try {
                writer.flush();
            } catch (SegmentSealedException sealed) {
                sealedWriters++;
                log.warn("Flush on segment {} failed due to {}, it will be retried.", writer.getSegmentName(), sealed.getMessage());
            }
        }
        return sealedWriters == 0;
    }

    /**
     * Closes the writer; only the first call has any effect.
     *
     * <p>The pinger is closed, then (under {@code writeFlushLock}) every segment writer is
     * closed. If any writer reports a sealed segment the whole pass is repeated. Finally the
     * retransmit executor is shut down.
     */
    @Override // io.pravega.client.stream.EventStreamWriter, java.lang.AutoCloseable
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            // Someone already closed this writer.
            return;
        }
        this.pinger.close();
        synchronized (this.writeFlushLock) {
            boolean retry;
            do {
                retry = false;
                for (SegmentOutputStream writer : this.selector.getWriters()) {
                    try {
                        writer.close();
                    } catch (SegmentSealedException e) {
                        retry = true;
                        log.warn("Close failed due to {}, it will be retried.", e.getMessage());
                    }
                }
            } while (retry);
        }
        ExecutorServiceHelpers.shutdown(this.retransmitPool);
    }

    /**
     * Returns the configuration this writer was constructed with.
     *
     * @return the writer configuration
     */
    @Override // io.pravega.client.stream.EventStreamWriter
    public EventWriterConfig getConfig() {
        return config;
    }

    /**
     * Renders the writer as {@code EventStreamWriterImpl(stream=..., closed=...)}.
     *
     * @return a short description containing the stream and the closed flag
     */
    @SuppressFBWarnings(justification = "generated code")
    public String toString() {
        final StringBuilder text = new StringBuilder("EventStreamWriterImpl(stream=");
        text.append(this.stream);
        text.append(", closed=");
        text.append(this.closed);
        text.append(")");
        return text.toString();
    }

    static {
        $assertionsDisabled = !EventStreamWriterImpl.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger((Class<?>) EventStreamWriterImpl.class);
    }
}
