package com.amazonaws.services.kinesisanalytics.flink.connectors.producer;

import com.amazonaws.services.kinesisanalytics.flink.connectors.config.AWSConfigConstants;
import com.amazonaws.services.kinesisanalytics.flink.connectors.exception.FlinkKinesisFirehoseException;
import com.amazonaws.services.kinesisanalytics.flink.connectors.exception.RecordCouldNotBeSentException;
import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.impl.FirehoseProducer;
import com.amazonaws.services.kinesisanalytics.flink.connectors.provider.credential.CredentialProvider;
import com.amazonaws.services.kinesisanalytics.flink.connectors.provider.credential.factory.CredentialProviderFactory;
import com.amazonaws.services.kinesisanalytics.flink.connectors.serialization.KinesisFirehoseSerializationSchema;
import com.amazonaws.services.kinesisanalytics.flink.connectors.util.AWSUtil;
import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehose;
import com.amazonaws.services.kinesisfirehose.model.Record;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import java.nio.ByteBuffer;
import java.util.Properties;
import javax.annotation.Nullable;
import org.apache.commons.lang3.Validate;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/amazonaws/services/kinesisanalytics/flink/connectors/producer/FlinkKinesisFirehoseProducer.class */
/**
 * Flink sink function that serializes incoming elements and hands them to an
 * {@link IProducer} for delivery to a Kinesis Firehose delivery stream.
 *
 * <p>Records are submitted asynchronously; the outcome of each submission is observed by a
 * callback registered in {@link #open(Configuration)}. When {@code failOnError} is enabled, a
 * failure seen by that callback is remembered and rethrown from the next call to
 * {@link #invoke(Object, SinkFunction.Context)}, {@link #snapshotState(FunctionSnapshotContext)}
 * or {@link #close()}. When it is disabled, failures are only logged.
 *
 * <p>On every checkpoint the producer buffer is flushed synchronously, and the snapshot fails
 * if records are still outstanding afterwards.
 *
 * @param <OUT> type of the elements accepted by this sink
 */
public class FlinkKinesisFirehoseProducer<OUT> extends RichSinkFunction<OUT> implements CheckpointedFunction {
    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkKinesisFirehoseProducer.class);

    /** Converts each incoming element into the bytes placed in the Firehose record. */
    private final KinesisFirehoseSerializationSchema<OUT> schema;

    /** Properties passed to the credential provider factory, the client factory and the producer. */
    private final Properties config;

    /** Selects which credential provider {@link #open(Configuration)} creates. */
    private final AWSConfigConstants.CredentialProviderType credentialProviderType;

    /** Name of the delivery stream the producer is created for. */
    private final String defaultDeliveryStream;

    /** When {@code true}, asynchronous send failures are rethrown instead of only being logged. */
    private boolean failOnError;

    /** Failure recorded by the callback, pending propagation by {@link #propagateAsyncExceptions()}. */
    private volatile transient Throwable lastThrownException;

    private transient CredentialProvider credentialsProvider;
    private transient AmazonKinesisFirehose firehoseClient;
    private transient IProducer<FirehoseProducer.UserRecordResult, Record> firehoseProducer;
    private transient FutureCallback<FirehoseProducer.UserRecordResult> monitorCallback;

    /**
     * Creates a sink with an explicit credential provider type.
     *
     * @param str name of the delivery stream; must not be blank
     * @param kinesisFirehoseSerializationSchema schema used to serialize elements; must not be null
     * @param properties configuration properties; must not be null
     * @param credentialProviderType credential provider type; must not be null
     */
    public FlinkKinesisFirehoseProducer(String str, KinesisFirehoseSerializationSchema<OUT> kinesisFirehoseSerializationSchema, Properties properties, AWSConfigConstants.CredentialProviderType credentialProviderType) {
        this.defaultDeliveryStream = Validate.notBlank(str, "Delivery stream cannot be null or empty");
        this.schema = Validate.notNull(kinesisFirehoseSerializationSchema, "Kinesis serialization schema cannot be null");
        this.config = Validate.notNull(properties, "Configuration properties cannot be null");
        this.credentialProviderType = Validate.notNull(credentialProviderType, "Credential Provider type cannot be null");
    }

    /**
     * Creates a sink from a plain Flink {@link SerializationSchema}, whose {@code byte[]} output
     * is wrapped in a {@link ByteBuffer}.
     *
     * @param str name of the delivery stream; must not be blank
     * @param serializationSchema Flink schema used to serialize elements
     * @param properties configuration properties; must not be null
     * @param credentialProviderType credential provider type; must not be null
     */
    public FlinkKinesisFirehoseProducer(String str, final SerializationSchema<OUT> serializationSchema, Properties properties, AWSConfigConstants.CredentialProviderType credentialProviderType) {
        this(str, new KinesisFirehoseSerializationSchema<OUT>() {
            @Override
            public ByteBuffer serialize(OUT out) {
                return ByteBuffer.wrap(serializationSchema.serialize(out));
            }
        }, properties, credentialProviderType);
    }

    /**
     * Creates a sink whose credential provider type is derived from {@code properties}
     * (see {@link #getCredentialProviderType(Properties)}).
     *
     * @param str name of the delivery stream; must not be blank
     * @param kinesisFirehoseSerializationSchema schema used to serialize elements; must not be null
     * @param properties configuration properties
     */
    public FlinkKinesisFirehoseProducer(String str, KinesisFirehoseSerializationSchema<OUT> kinesisFirehoseSerializationSchema, Properties properties) {
        this(str, kinesisFirehoseSerializationSchema, properties, getCredentialProviderType(properties));
    }

    /**
     * Creates a sink from a plain Flink {@link SerializationSchema}, with the credential provider
     * type derived from {@code properties}.
     *
     * @param str name of the delivery stream; must not be blank
     * @param serializationSchema Flink schema used to serialize elements
     * @param properties configuration properties
     */
    public FlinkKinesisFirehoseProducer(String str, SerializationSchema<OUT> serializationSchema, Properties properties) {
        this(str, serializationSchema, properties, getCredentialProviderType(properties));
    }

    /**
     * Test-only constructor that injects the client and the producer, so that
     * {@link #open(Configuration)} reuses them instead of creating new ones.
     */
    @VisibleForTesting
    FlinkKinesisFirehoseProducer(String str, KinesisFirehoseSerializationSchema<OUT> kinesisFirehoseSerializationSchema, Properties properties, AmazonKinesisFirehose amazonKinesisFirehose, IProducer<FirehoseProducer.UserRecordResult, Record> iProducer) {
        this(str, kinesisFirehoseSerializationSchema, properties);
        this.firehoseClient = Validate.notNull(amazonKinesisFirehose);
        this.firehoseProducer = Validate.notNull(iProducer);
    }

    /**
     * Chooses whether asynchronous send failures fail the sink.
     *
     * @param z {@code true} to rethrow recorded failures, {@code false} to only log them
     */
    public void setFailOnError(boolean z) {
        this.failOnError = z;
    }

    /**
     * Creates the credential provider and, unless they were injected, the Firehose client and
     * producer; then installs the callback that monitors every submitted record.
     *
     * @param configuration Flink configuration, forwarded to the superclass
     * @throws Exception if the superclass or any of the factories fails
     */
    @Override
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);

        this.credentialsProvider = CredentialProviderFactory.newCredentialProvider(this.credentialProviderType, this.config);
        LOGGER.info("Credential provider: {}", this.credentialsProvider.getAwsCredentialsProvider().getClass().getName());

        // Keep instances injected through the test constructor; otherwise build real ones.
        if (this.firehoseClient == null) {
            this.firehoseClient = AWSUtil.createKinesisFirehoseClientFromConfiguration(this.config, this.credentialsProvider);
        }
        if (this.firehoseProducer == null) {
            this.firehoseProducer = new FirehoseProducer<>(this.defaultDeliveryStream, this.firehoseClient, this.config);
        }
        LOGGER.info("Started Kinesis Firehose client. Delivering to stream: {}", this.defaultDeliveryStream);

        this.monitorCallback = new FutureCallback<FirehoseProducer.UserRecordResult>() {
            @Override
            public void onSuccess(@Nullable FirehoseProducer.UserRecordResult userRecordResult) {
                if (userRecordResult == null || userRecordResult.isSuccessful()) {
                    return;
                }
                // Only the first failure is recorded; later ones are logged so the
                // original cause is not overwritten.
                if (failOnError && lastThrownException == null) {
                    lastThrownException = new RecordCouldNotBeSentException("Record could not be successfully sent.", userRecordResult.getException());
                } else {
                    LOGGER.warn("Record could not be successfully sent.");
                }
            }

            @Override
            public void onFailure(Throwable th) {
                if (failOnError) {
                    lastThrownException = th;
                } else {
                    LOGGER.warn("An error has occurred trying to write a record.", th);
                }
            }
        };
    }

    /**
     * Serializes {@code out} and submits it to the producer, after rethrowing any failure
     * recorded for earlier records.
     *
     * @param out element to send; must not be null
     * @param context sink context (unused)
     * @throws IllegalStateException if the producer is missing or destroyed, or the client is missing
     * @throws Exception if a previously recorded asynchronous failure is propagated
     */
    @Override
    public void invoke(OUT out, SinkFunction.Context context) throws Exception {
        Validate.notNull(out);
        ByteBuffer payload = this.schema.serialize(out);

        Validate.validState(this.firehoseProducer != null && !this.firehoseProducer.isDestroyed(), "Firehose producer has been destroyed");
        Validate.validState(this.firehoseClient != null, "Kinesis Firehose client has been closed");

        propagateAsyncExceptions();
        Futures.addCallback(this.firehoseProducer.addUserRecord(new Record().withData(payload)), this.monitorCallback);
    }

    /**
     * Flushes all buffered records before the checkpoint completes.
     *
     * @param functionSnapshotContext snapshot context (unused)
     * @throws IllegalStateException if records are still outstanding after the flush
     * @throws Exception if a recorded asynchronous failure is propagated
     */
    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        propagateAsyncExceptions();

        LOGGER.debug("Outstanding records before snapshot: {}", this.firehoseProducer.getOutstandingRecordsCount());
        flushSync();
        LOGGER.debug("Outstanding records after snapshot: {}", this.firehoseProducer.getOutstandingRecordsCount());

        if (this.firehoseProducer.getOutstandingRecordsCount() > 0) {
            throw new IllegalStateException("An error has occurred trying to flush the buffer synchronously.");
        }
        // The flush itself may have recorded new failures.
        propagateAsyncExceptions();
    }

    /** No state is restored; this sink keeps no checkpointed state of its own. */
    @Override
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
    }

    /**
     * Propagates any pending failure, then flushes and destroys the producer and shuts down
     * the client.
     *
     * <p>Cleanup tolerates a sink that was never (fully) opened, and the client is shut down
     * even when flushing or destroying the producer throws.
     *
     * @throws Exception if the superclass fails, a recorded failure is propagated, or the
     *     producer cannot be destroyed
     */
    @Override
    public void close() throws Exception {
        try {
            super.close();
            propagateAsyncExceptions();
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
            throw e;
        } finally {
            try {
                // The producer is null when open() did not run or failed early.
                if (this.firehoseProducer != null) {
                    flushSync();
                    this.firehoseProducer.destroy();
                }
            } finally {
                if (this.firehoseClient != null) {
                    LOGGER.debug("Shutting down Kinesis Firehose client...");
                    this.firehoseClient.shutdown();
                }
            }
        }
    }

    /**
     * Picks {@code BASIC} when {@link AWSUtil#containsBasicProperties(Properties)} reports that
     * the properties carry basic credentials, and {@code AUTO} otherwise.
     */
    private static AWSConfigConstants.CredentialProviderType getCredentialProviderType(Properties properties) {
        return AWSUtil.containsBasicProperties(properties) ? AWSConfigConstants.CredentialProviderType.BASIC : AWSConfigConstants.CredentialProviderType.AUTO;
    }

    /**
     * Rethrows the recorded asynchronous failure when {@code failOnError} is set; otherwise
     * logs it and clears it.
     *
     * @throws FlinkKinesisFirehoseException wrapping the recorded failure when {@code failOnError} is set
     */
    private void propagateAsyncExceptions() throws Exception {
        if (this.lastThrownException == null) {
            return;
        }
        if (this.failOnError) {
            throw new FlinkKinesisFirehoseException("An exception has been thrown while trying to process a record", this.lastThrownException);
        }
        LOGGER.warn("An exception has been thrown while trying to process a record", this.lastThrownException);
        this.lastThrownException = null;
    }

    /**
     * Repeatedly flushes the producer, pausing 500 ms between attempts, until no records are
     * outstanding or the producer reports a failed flush. Stops early if the thread is
     * interrupted, leaving the interrupt flag set for the caller.
     */
    private void flushSync() {
        while (this.firehoseProducer.getOutstandingRecordsCount() > 0 && !this.firehoseProducer.isFlushFailed()) {
            this.firehoseProducer.flush();
            try {
                LOGGER.debug("Number of outstanding records before going to sleep: {}", this.firehoseProducer.getOutstandingRecordsCount());
                Thread.sleep(500L);
            } catch (InterruptedException interrupted) {
                LOGGER.warn("Flushing has been interrupted.");
                // Restore the flag so callers further up the stack can see the interruption.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
