package com.amazonaws.services.kinesisanalytics.flink.connectors.producer.impl;

import com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants;
import com.amazonaws.services.kinesisanalytics.flink.connectors.exception.FlinkKinesisFirehoseException;
import com.amazonaws.services.kinesisanalytics.flink.connectors.exception.RecordCouldNotBeSentException;
import com.amazonaws.services.kinesisanalytics.flink.connectors.exception.TimeoutExpiredException;
import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.IProducer;
import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.impl.FirehoseProducer.UserRecordResult;
import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehose;
import com.amazonaws.services.kinesisfirehose.model.AmazonKinesisFirehoseException;
import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest;
import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResponseEntry;
import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult;
import com.amazonaws.services.kinesisfirehose.model.Record;
import com.amazonaws.services.kinesisfirehose.model.ServiceUnavailableException;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayDeque;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:com/amazonaws/services/kinesisanalytics/flink/connectors/producer/impl/FirehoseProducer.class */
public class FirehoseProducer<O extends UserRecordResult, R extends Record> implements IProducer<O, R> {
    private static final Logger LOGGER = LoggerFactory.getLogger(FirehoseProducer.class);
    private final int maxBufferSize;
    private final long bufferTimeoutInMillis;
    private final long bufferFullWaitTimeoutInMillis;
    private final long bufferTimeoutBetweenFlushes;
    private final int numberOfRetries;
    private final long maxBackOffInMillis;
    private final long baseBackOffInMillis;
    private final long maxOperationTimeoutInMillis;
    private final AmazonKinesisFirehose firehoseClient;
    private final String deliveryStream;
    private final ExecutorService flusher;

    @GuardedBy("this")
    private final Object producerBufferLock = new Object();
    private volatile Queue<Record> producerBuffer;
    private volatile Queue<Record> flusherBuffer;
    private volatile long lastSucceededFlushTimestamp;
    private volatile boolean isDestroyed;
    private volatile boolean syncFlush;
    private volatile boolean isFlusherFailed;

    /* loaded from: input_file:com/amazonaws/services/kinesisanalytics/flink/connectors/producer/impl/FirehoseProducer$UserRecordResult.class */
    public static class UserRecordResult {
        private Throwable exception;
        private boolean successful;

        public Throwable getException() {
            return this.exception;
        }

        public UserRecordResult setException(Throwable th) {
            this.exception = th;
            return this;
        }

        public boolean isSuccessful() {
            return this.successful;
        }

        public UserRecordResult setSuccessful(boolean z) {
            this.successful = z;
            return this;
        }
    }

    public FirehoseProducer(String str, AmazonKinesisFirehose amazonKinesisFirehose, Properties properties) {
        this.firehoseClient = (AmazonKinesisFirehose) Validate.notNull(amazonKinesisFirehose, "Kinesis Firehose client cannot be null", new Object[0]);
        this.deliveryStream = (String) Validate.notBlank(str, "Kinesis Firehose delivery stream cannot be null or empty.", new Object[0]);
        Validate.notNull(properties, "Firehose producer configuration properties cannot be null", new Object[0]);
        this.maxBufferSize = Integer.valueOf(properties.getProperty(ProducerConfigConstants.FIREHOSE_PRODUCER_BUFFER_MAX_SIZE, String.valueOf(500))).intValue();
        Validate.isTrue(this.maxBufferSize <= 0 || this.maxBufferSize <= 500, String.format("Buffer size cannot be <= 0 or > %s", 500), new Object[0]);
        this.bufferTimeoutInMillis = Long.valueOf(properties.getProperty(ProducerConfigConstants.FIREHOSE_PRODUCER_BUFFER_MAX_TIMEOUT, String.valueOf(ProducerConfigConstants.DEFAULT_MAX_BUFFER_TIMEOUT))).longValue();
        Validate.isTrue(this.bufferTimeoutInMillis >= 0, "Flush timeout should be > 0.", new Object[0]);
        this.numberOfRetries = Integer.valueOf(properties.getProperty(ProducerConfigConstants.FIREHOSE_PRODUCER_BUFFER_FLUSH_MAX_NUMBER_OF_RETRIES, String.valueOf(10))).intValue();
        Validate.isTrue(this.numberOfRetries >= 0, "Number of retries cannot be negative.", new Object[0]);
        this.bufferFullWaitTimeoutInMillis = Long.valueOf(properties.getProperty(ProducerConfigConstants.FIREHOSE_PRODUCER_BUFFER_FULL_WAIT_TIMEOUT, String.valueOf(100L))).longValue();
        Validate.isTrue(this.bufferFullWaitTimeoutInMillis >= 0, "Buffer full waiting timeout should be > 0.", new Object[0]);
        this.bufferTimeoutBetweenFlushes = Long.valueOf(properties.getProperty(ProducerConfigConstants.FIREHOSE_PRODUCER_BUFFER_FLUSH_TIMEOUT, String.valueOf(50L))).longValue();
        Validate.isTrue(this.bufferTimeoutBetweenFlushes >= 0, "Interval between flushes cannot be negative.", new Object[0]);
        this.maxBackOffInMillis = Long.valueOf(properties.getProperty(ProducerConfigConstants.FIREHOSE_PRODUCER_BUFFER_MAX_BACKOFF_TIMEOUT, String.valueOf(100L))).longValue();
        Validate.isTrue(this.maxBackOffInMillis >= 0, "Max backoff timeout should be > 0.", new Object[0]);
        this.baseBackOffInMillis = Long.valueOf(properties.getProperty(ProducerConfigConstants.FIREHOSE_PRODUCER_BUFFER_BASE_BACKOFF_TIMEOUT, String.valueOf(10L))).longValue();
        Validate.isTrue(this.baseBackOffInMillis >= 0, "Base backoff timeout should be > 0.", new Object[0]);
        this.maxOperationTimeoutInMillis = Long.valueOf(properties.getProperty(ProducerConfigConstants.FIREHOSE_PRODUCER_MAX_OPERATION_TIMEOUT, String.valueOf(ProducerConfigConstants.DEFAULT_MAX_OPERATION_TIMEOUT))).longValue();
        Validate.isTrue(this.maxOperationTimeoutInMillis >= 0, "Max operation timeout should be > 0.", new Object[0]);
        this.producerBuffer = new ArrayDeque(this.maxBufferSize);
        this.flusherBuffer = new ArrayDeque(this.maxBufferSize);
        this.flusher = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(false).setNameFormat("kda-writer-thread-%d").build());
        this.flusher.submit(() -> {
            flushBuffer();
        });
    }

    @Override // com.amazonaws.services.kinesisanalytics.flink.connectors.producer.IProducer
    public ListenableFuture<O> addUserRecord(R r) throws Exception {
        return addUserRecord((FirehoseProducer<O, R>) r, this.maxOperationTimeoutInMillis);
    }

    /* JADX WARN: Type inference failed for: r0v9, types: [java.lang.Throwable, java.lang.Object] */
    @Override // com.amazonaws.services.kinesisanalytics.flink.connectors.producer.IProducer
    public ListenableFuture<O> addUserRecord(R r, long j) throws TimeoutExpiredException, InterruptedException {
        Validate.notNull(r, "Record cannot be null.", new Object[0]);
        Validate.isTrue(j > 0, "Operation timeout should be > 0.", new Object[0]);
        long nanos = TimeUnit.MILLISECONDS.toNanos(j);
        synchronized (this.producerBufferLock) {
            long nanoTime = System.nanoTime();
            while (this.producerBuffer.size() >= this.maxBufferSize) {
                if (System.nanoTime() - nanoTime >= nanos) {
                    throw new TimeoutExpiredException("Timeout has expired for the given operation");
                }
                if (this.flusherBuffer.isEmpty()) {
                    this.producerBufferLock.notify();
                }
                this.producerBufferLock.wait(this.bufferFullWaitTimeoutInMillis);
            }
            this.producerBuffer.offer(r);
            if (this.producerBuffer.size() >= this.maxBufferSize && this.flusherBuffer.isEmpty()) {
                this.producerBufferLock.notify();
            }
        }
        UserRecordResult successful = new UserRecordResult().setSuccessful(true);
        SettableFuture create = SettableFuture.create();
        create.set(successful);
        return create;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v10, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v16 */
    /* JADX WARN: Type inference failed for: r0v31, types: [java.lang.Object] */
    /* JADX WARN: Type inference failed for: r0v32, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v34 */
    /* JADX WARN: Type inference failed for: r0v43, types: [java.lang.Object] */
    /* JADX WARN: Type inference failed for: r0v44, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v47, types: [boolean] */
    /* JADX WARN: Type inference failed for: r0v62 */
    /* JADX WARN: Type inference failed for: r0v66, types: [java.lang.Object] */
    /* JADX WARN: Type inference failed for: r0v71 */
    /* JADX WARN: Type inference failed for: r0v72 */
    /* JADX WARN: Type inference failed for: r0v9, types: [java.lang.Object] */
    private void flushBuffer() {
        this.lastSucceededFlushTimestamp = System.nanoTime();
        long nanos = TimeUnit.MILLISECONDS.toNanos(this.bufferTimeoutInMillis);
        while (true) {
            boolean z = System.nanoTime() - this.lastSucceededFlushTimestamp >= nanos;
            ?? r0 = this.producerBufferLock;
            synchronized (r0) {
                Validate.validState(this.flusherBuffer.isEmpty());
                if (this.isDestroyed) {
                    r0 = r0;
                    return;
                }
                if (!this.syncFlush && this.producerBuffer.size() < this.maxBufferSize) {
                    boolean z2 = z;
                    r0 = z2;
                    if (z2) {
                        int size = this.producerBuffer.size();
                        r0 = size;
                        if (size > 0) {
                        }
                    }
                    try {
                        r0 = this.producerBufferLock;
                        r0.wait(this.bufferTimeoutBetweenFlushes);
                    } catch (InterruptedException e) {
                        LOGGER.info("An interrupted exception has been thrown, while trying to sleep and release the lock during a flush.", e);
                    }
                }
                Queue<Record> queue = this.flusherBuffer;
                this.flusherBuffer = this.producerBuffer;
                this.producerBuffer = queue;
                this.producerBufferLock.notify();
                try {
                    submitBatchWithRetry(this.flusherBuffer);
                    ArrayDeque arrayDeque = new ArrayDeque(this.maxBufferSize);
                    ?? r02 = this.producerBufferLock;
                    synchronized (r02) {
                        r02 = this.flusherBuffer.isEmpty();
                        Validate.validState(r02 == 0);
                        this.flusherBuffer = arrayDeque;
                        if (this.syncFlush) {
                            this.syncFlush = false;
                            this.producerBufferLock.notify();
                        }
                    }
                } catch (Exception e2) {
                    if ((e2 instanceof AmazonKinesisFirehoseException) && ((AmazonKinesisFirehoseException) e2).getStatusCode() == 413) {
                        LOGGER.error(String.valueOf("An error has occurred while trying to send data to Kinesis Firehose.") + "Batch of records too large. Please try to reduce your batch size by passing FIREHOSE_PRODUCER_BUFFER_MAX_SIZE into your configuration.", e2);
                    } else {
                        LOGGER.error("An error has occurred while trying to send data to Kinesis Firehose.", e2);
                    }
                    ?? r03 = this.producerBufferLock;
                    synchronized (r03) {
                        this.isFlusherFailed = true;
                        r03 = r03;
                        throw e2;
                    }
                }
            }
        }
    }

    private void submitBatchWithRetry(Queue<Record> queue) throws AmazonKinesisFirehoseException, RecordCouldNotBeSentException {
        PutRecordBatchResult submitBatch;
        String str = null;
        for (int i = 0; i < this.numberOfRetries; i++) {
            try {
                LOGGER.debug("Trying to flush Buffer of size: {} on attempt: {}", Integer.valueOf(queue.size()), Integer.valueOf(i));
                submitBatch = submitBatch(queue);
            } catch (ServiceUnavailableException e) {
                LOGGER.info("Kinesis Firehose has thrown a recoverable exception.", e);
            } catch (AmazonKinesisFirehoseException e2) {
                throw e2;
            } catch (InterruptedException e3) {
                LOGGER.info("An interrupted exception has been thrown between retry attempts.", e3);
            }
            if (submitBatch.getFailedPutCount() == null || submitBatch.getFailedPutCount().intValue() == 0) {
                this.lastSucceededFlushTimestamp = System.nanoTime();
                LOGGER.debug("Firehose Buffer has been flushed with size: {} on attempt: {}", Integer.valueOf(queue.size()), Integer.valueOf(i));
                return;
            }
            PutRecordBatchResponseEntry orElse = submitBatch.getRequestResponses().stream().filter(putRecordBatchResponseEntry -> {
                return putRecordBatchResponseEntry.getRecordId() == null;
            }).findFirst().orElse(null);
            str = String.format("Number of failed records: %s.", submitBatch.getFailedPutCount());
            if (orElse != null) {
                str = String.format("Last Kinesis Firehose putRecordBath encountered an error and failed trying to put: %s records with error: %s - %s.", submitBatch.getFailedPutCount(), orElse.getErrorCode(), orElse.getErrorMessage());
            }
            LOGGER.warn(str);
            long nextLong = RandomUtils.nextLong(0L, Math.min(this.maxBackOffInMillis, this.baseBackOffInMillis * 2 * i));
            LOGGER.info("Sleeping for: {}ms on attempt: {}", Long.valueOf(nextLong), Integer.valueOf(i));
            Thread.sleep(nextLong);
        }
        throw new RecordCouldNotBeSentException("Exceeded number of attempts! " + str);
    }

    private PutRecordBatchResult submitBatch(Queue<Record> queue) throws AmazonKinesisFirehoseException {
        LOGGER.debug("Sending {} records to Kinesis Firehose on stream: {}", Integer.valueOf(queue.size()), this.deliveryStream);
        try {
            return this.firehoseClient.putRecordBatch(new PutRecordBatchRequest().withDeliveryStreamName(this.deliveryStream).withRecords(queue));
        } catch (AmazonKinesisFirehoseException e) {
            throw e;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object] */
    /* JADX WARN: Type inference failed for: r0v2, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v7 */
    @Override // com.amazonaws.services.kinesisanalytics.flink.connectors.producer.IProducer
    public void destroy() throws Exception {
        ?? r0 = this.producerBufferLock;
        synchronized (r0) {
            this.isDestroyed = true;
            this.producerBuffer = null;
            this.producerBufferLock.notify();
            r0 = r0;
            if (this.flusher.isShutdown() || this.flusher.isTerminated()) {
                return;
            }
            LOGGER.info("Shutting down scheduled service.");
            this.flusher.shutdown();
            try {
                LOGGER.info("Awaiting executor service termination...");
                this.flusher.awaitTermination(1L, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
                LOGGER.error("Error waiting executor writer termination.", e);
                throw new FlinkKinesisFirehoseException("Error waiting executor writer termination.", e);
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object] */
    /* JADX WARN: Type inference failed for: r0v2, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v4, types: [boolean] */
    @Override // com.amazonaws.services.kinesisanalytics.flink.connectors.producer.IProducer
    public boolean isDestroyed() {
        ?? r0 = this.producerBufferLock;
        synchronized (r0) {
            r0 = this.isDestroyed;
        }
        return r0;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object] */
    /* JADX WARN: Type inference failed for: r0v2, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v6, types: [int] */
    @Override // com.amazonaws.services.kinesisanalytics.flink.connectors.producer.IProducer
    public int getOutstandingRecordsCount() {
        ?? r0 = this.producerBufferLock;
        synchronized (r0) {
            r0 = this.producerBuffer.size() + this.flusherBuffer.size();
        }
        return r0;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object] */
    /* JADX WARN: Type inference failed for: r0v2, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v4, types: [boolean] */
    @Override // com.amazonaws.services.kinesisanalytics.flink.connectors.producer.IProducer
    public boolean isFlushFailed() {
        ?? r0 = this.producerBufferLock;
        synchronized (r0) {
            r0 = this.isFlusherFailed;
        }
        return r0;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object] */
    /* JADX WARN: Type inference failed for: r0v2, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v6 */
    @Override // com.amazonaws.services.kinesisanalytics.flink.connectors.producer.IProducer
    public void flush() {
        ?? r0 = this.producerBufferLock;
        synchronized (r0) {
            this.syncFlush = true;
            this.producerBufferLock.notify();
            r0 = r0;
        }
    }

    @Override // com.amazonaws.services.kinesisanalytics.flink.connectors.producer.IProducer
    public void flushSync() {
        while (getOutstandingRecordsCount() > 0 && !isFlushFailed()) {
            flush();
            try {
                Thread.sleep(500L);
            } catch (InterruptedException unused) {
                LOGGER.warn("An interruption has happened while trying to flush the buffer synchronously.");
                Thread.currentThread().interrupt();
            }
        }
        if (isFlushFailed()) {
            LOGGER.warn("The flusher thread has failed trying to synchronously flush the buffer.");
        }
    }
}
