package com.amazonaws.services.kinesis.connectors.redshift;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSSessionCredentials;
import com.amazonaws.services.kinesis.connectors.KinesisConnectorConfiguration;
import com.amazonaws.services.kinesis.connectors.UnmodifiableBuffer;
import com.amazonaws.services.kinesis.connectors.interfaces.IEmitter;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:com/amazonaws/services/kinesis/connectors/redshift/RedshiftManifestEmitter.class */
public class RedshiftManifestEmitter implements IEmitter<String> {
    private static final Log LOG = LogFactory.getLog(RedshiftManifestEmitter.class);
    private final String s3Bucket;
    private final String dataTable;
    private final String fileTable;
    private final String fileKeyColumn;
    private final char dataDelimiter;
    private final AWSCredentialsProvider credentialsProvider;
    private final String s3Endpoint;
    private final AmazonS3Client s3Client;
    private final boolean copyMandatory;
    private final Properties loginProps;
    private final String redshiftURL;
    private static final String MANIFEST_PREFIX = "manifests/";

    public RedshiftManifestEmitter(KinesisConnectorConfiguration kinesisConnectorConfiguration) {
        this.dataTable = kinesisConnectorConfiguration.REDSHIFT_DATA_TABLE;
        this.fileTable = kinesisConnectorConfiguration.REDSHIFT_FILE_TABLE;
        this.fileKeyColumn = kinesisConnectorConfiguration.REDSHIFT_FILE_KEY_COLUMN;
        this.dataDelimiter = kinesisConnectorConfiguration.REDSHIFT_DATA_DELIMITER.charValue();
        this.copyMandatory = kinesisConnectorConfiguration.REDSHIFT_COPY_MANDATORY;
        this.s3Bucket = kinesisConnectorConfiguration.S3_BUCKET;
        this.s3Endpoint = kinesisConnectorConfiguration.S3_ENDPOINT;
        this.s3Client = new AmazonS3Client(kinesisConnectorConfiguration.AWS_CREDENTIALS_PROVIDER);
        if (this.s3Endpoint != null) {
            this.s3Client.setEndpoint(this.s3Endpoint);
        }
        this.credentialsProvider = kinesisConnectorConfiguration.AWS_CREDENTIALS_PROVIDER;
        this.loginProps = new Properties();
        this.loginProps.setProperty("user", kinesisConnectorConfiguration.REDSHIFT_USERNAME);
        this.loginProps.setProperty("password", kinesisConnectorConfiguration.REDSHIFT_PASSWORD);
        this.redshiftURL = kinesisConnectorConfiguration.REDSHIFT_URL;
    }

    @Override // com.amazonaws.services.kinesis.connectors.interfaces.IEmitter
    public List<String> emit(UnmodifiableBuffer<String> unmodifiableBuffer) throws IOException {
        List<String> records = unmodifiableBuffer.getRecords();
        String manifestFile = getManifestFile(records);
        try {
            try {
                Connection connection = DriverManager.getConnection(this.redshiftURL, this.loginProps);
                connection.setAutoCommit(false);
                List<String> checkForExistingFiles = checkForExistingFiles(connection, records);
                if (checkForExistingFiles.isEmpty()) {
                    LOG.info("All the files in this set were already copied to Redshift.");
                    rollbackAndCloseConnection(connection);
                    return Collections.emptyList();
                }
                if (checkForExistingFiles.size() != records.size()) {
                    manifestFile = getManifestFile(checkForExistingFiles);
                }
                try {
                    writeManifestToS3(manifestFile, checkForExistingFiles);
                    LOG.info("Inserting " + checkForExistingFiles.size() + " rows into the files table.");
                    insertRecords(connection, checkForExistingFiles);
                    LOG.info("Initiating Amazon Redshift manifest copy of " + checkForExistingFiles.size() + " files.");
                    redshiftCopy(connection, manifestFile);
                    connection.commit();
                    LOG.info("Successful Amazon Redshift manifest copy of " + getNumberOfCopiedRecords(connection) + " records from " + checkForExistingFiles.size() + " files using manifest s3://" + this.s3Bucket + "/" + getManifestFile(records));
                    closeConnection(connection);
                    return Collections.emptyList();
                } catch (Exception e) {
                    LOG.error("Error writing file " + manifestFile + " to S3. Failing this emit attempt.", e);
                    return unmodifiableBuffer.getRecords();
                }
            } catch (IOException | SQLException e2) {
                LOG.error("Error copying data from manifest file " + manifestFile + " into Amazon Redshift. Failing this emit attempt.", e2);
                rollbackAndCloseConnection(null);
                return unmodifiableBuffer.getRecords();
            }
        } catch (Exception e3) {
            LOG.error("Error copying data from manifest file " + manifestFile + " into Redshift. Failing this emit attempt.", e3);
            rollbackAndCloseConnection(null);
            return unmodifiableBuffer.getRecords();
        }
    }

    private void rollbackAndCloseConnection(Connection connection) {
        if (connection != null) {
            try {
                if (!connection.isClosed()) {
                    connection.rollback();
                }
            } catch (Exception e) {
                LOG.error("Unable to rollback Amazon Redshift transaction.", e);
            }
        }
        closeConnection(connection);
    }

    private void closeConnection(Connection connection) {
        if (connection != null) {
            try {
                if (!connection.isClosed()) {
                    connection.close();
                }
            } catch (Exception e) {
                LOG.error("Unable to close Amazon Redshift connection.", e);
            }
        }
    }

    @Override // com.amazonaws.services.kinesis.connectors.interfaces.IEmitter
    public void fail(List<String> list) {
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            LOG.error("Record failed: " + it.next());
        }
    }

    private String writeManifestToS3(String str, List<String> list) throws IOException {
        this.s3Client.putObject(new PutObjectRequest(this.s3Bucket, str, new ByteArrayInputStream(generateManifestFile(list).getBytes()), (ObjectMetadata) null));
        return str;
    }

    private void insertRecords(Connection connection, Collection<String> collection) throws IOException {
        executeStatement(connection, "INSERT INTO " + this.fileTable + " VALUES " + getCollectionString(collection, "(", "),(", ")") + ";");
    }

    private List<String> checkForExistingFiles(Connection connection, List<String> list) throws IOException {
        TreeSet treeSet = new TreeSet(list);
        String collectionString = getCollectionString(treeSet, "(", ",", ")");
        StringBuilder sb = new StringBuilder();
        sb.append("SELECT " + this.fileKeyColumn + " FROM ");
        sb.append(this.fileTable);
        sb.append(" WHERE ");
        sb.append(this.fileKeyColumn);
        sb.append(" IN ");
        sb.append(collectionString);
        sb.append(";");
        Statement statement = null;
        ResultSet resultSet = null;
        try {
            statement = connection.createStatement();
            resultSet = statement.executeQuery(sb.toString());
            while (resultSet.next()) {
                String string = resultSet.getString(1);
                LOG.info("File " + string + " has already been copied. Leaving it out.");
                treeSet.remove(string);
            }
            resultSet.close();
            statement.close();
            return new ArrayList(treeSet);
        } catch (SQLException e) {
            try {
                resultSet.close();
            } catch (Exception e2) {
            }
            try {
                statement.close();
            } catch (Exception e3) {
            }
            throw new IOException(e);
        }
    }

    private int getNumberOfCopiedRecords(Connection connection) throws IOException {
        Statement statement = null;
        ResultSet resultSet = null;
        try {
            statement = connection.createStatement();
            resultSet = statement.executeQuery("select pg_last_copy_count();");
            resultSet.next();
            int i = resultSet.getInt(1);
            resultSet.close();
            statement.close();
            return i;
        } catch (SQLException e) {
            try {
                resultSet.close();
            } catch (Exception e2) {
            }
            try {
                statement.close();
            } catch (Exception e3) {
            }
            throw new IOException(e);
        }
    }

    protected void redshiftCopy(Connection connection, String str) throws IOException {
        AWSSessionCredentials credentials = this.credentialsProvider.getCredentials();
        StringBuilder sb = new StringBuilder();
        sb.append("COPY " + this.dataTable + " ");
        sb.append("FROM 's3://" + this.s3Bucket + "/" + str + "' ");
        sb.append("CREDENTIALS '");
        sb.append("aws_access_key_id=" + credentials.getAWSAccessKeyId());
        sb.append(";");
        sb.append("aws_secret_access_key=" + credentials.getAWSSecretKey());
        if (credentials instanceof AWSSessionCredentials) {
            sb.append(";");
            sb.append("token=" + credentials.getSessionToken());
        }
        sb.append("' ");
        sb.append("DELIMITER '" + this.dataDelimiter + "' ");
        sb.append("MANIFEST");
        sb.append(";");
        executeStatement(connection, sb.toString());
    }

    private void executeStatement(Connection connection, String str) throws IOException {
        try {
            Statement createStatement = connection.createStatement();
            createStatement.execute(str);
            createStatement.close();
        } catch (SQLException e) {
            LOG.error("Amazon S3 endpoint set to: " + this.s3Endpoint);
            LOG.error("Error executing statement: " + str, e);
            throw new IOException(e);
        }
    }

    private String getCollectionString(Collection<String> collection, String str, String str2, String str3) {
        StringBuilder sb = new StringBuilder();
        sb.append(str);
        for (String str4 : collection) {
            sb.append("'");
            sb.append(str4);
            sb.append("'");
            sb.append(str2);
        }
        sb.replace(sb.length() - str2.length(), sb.length(), KinesisConnectorConfiguration.DEFAULT_ELASTICSEARCH_CLOUDFORMATION_KEY_PAIR_NAME);
        sb.append(str3);
        return sb.toString();
    }

    private String getManifestFile(List<String> list) {
        return MANIFEST_PREFIX + list.get(0) + "-" + list.get(list.size() - 1);
    }

    private String generateManifestFile(List<String> list) {
        StringBuilder sb = new StringBuilder();
        sb.append("{\n");
        sb.append("\t\"entries\": [\n");
        for (String str : list) {
            sb.append("\t\t{");
            sb.append("\"url\":\"s3://");
            sb.append(this.s3Bucket);
            sb.append("/");
            sb.append(str);
            sb.append("\"");
            sb.append(",");
            sb.append("\"mandatory\":" + Boolean.toString(this.copyMandatory));
            sb.append("},\n");
        }
        sb.replace(sb.length() - 2, sb.length() - 1, KinesisConnectorConfiguration.DEFAULT_ELASTICSEARCH_CLOUDFORMATION_KEY_PAIR_NAME);
        sb.append("\t]\n");
        sb.append("}\n");
        return sb.toString();
    }

    @Override // com.amazonaws.services.kinesis.connectors.interfaces.IEmitter
    public void shutdown() {
        this.s3Client.shutdown();
    }
}
