package org.apache.shardingsphere.data.pipeline.core.importer.sink.type;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.sql.DataSource;
import lombok.Generated;
import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineImporterJobWriteException;
import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.RecordUtils;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.group.DataRecordGroupEngine;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.group.GroupedDataRecord;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineImportSQLBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
import org.apache.shardingsphere.infra.util.json.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.class */
/**
 * Pipeline sink that writes {@link DataRecord}s into a JDBC {@link DataSource}.
 *
 * <p>Records are grouped by {@link DataRecordGroupEngine}; for every group the deletes are
 * written first, then the inserts, then the updates. A failed write is retried up to
 * {@code importerConfig.getRetryTimes()} times with an exponentially growing delay.</p>
 *
 * <p>The statement that is currently being executed is published through
 * {@link #runningStatement} so that {@link #close()} can cancel it.</p>
 */
public final class PipelineDataSourceSink implements PipelineSink {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(PipelineDataSourceSink.class);

    /** Query timeout, in seconds, applied to batch insert and batch delete statements. */
    private static final int BATCH_QUERY_TIMEOUT_SECONDS = 30;

    /** Delay before the first retry; doubled for every further retry. */
    private static final long BASE_RETRY_DELAY_MILLIS = 1000L;

    /** Upper bound of the delay between two retries. */
    private static final long MAX_RETRY_DELAY_MILLIS = 300000L;

    /**
     * Largest shift applied to {@link #BASE_RETRY_DELAY_MILLIS}. {@code 1000L << 9} already exceeds
     * {@link #MAX_RETRY_DELAY_MILLIS}, so capping the shift here can never overflow and never
     * changes the effective delay.
     */
    private static final int MAX_RETRY_DELAY_SHIFT = 9;

    private final ImporterConfiguration importerConfig;
    private final DataSource dataSource;
    private final PipelineImportSQLBuilder importSQLBuilder;
    private final DataRecordGroupEngine groupEngine = new DataRecordGroupEngine();
    private final AtomicReference<PreparedStatement> runningStatement = new AtomicReference<>();

    /**
     * Creates a sink for the data source described by the importer configuration.
     *
     * @param importerConfiguration importer configuration; supplies the data source configuration,
     *                              retry count, rate limit algorithm, schema names and sharding columns
     * @param pipelineDataSourceManager manager the target data source is obtained from
     */
    public PipelineDataSourceSink(ImporterConfiguration importerConfiguration, PipelineDataSourceManager pipelineDataSourceManager) {
        this.importerConfig = importerConfiguration;
        this.dataSource = pipelineDataSourceManager.getDataSource(importerConfiguration.getDataSourceConfig());
        this.importSQLBuilder = new PipelineImportSQLBuilder(importerConfiguration.getDataSourceConfig().getDatabaseType());
    }

    /**
     * Writes all {@link DataRecord}s contained in the given records; other record types are ignored.
     *
     * @param str not used by this implementation
     * @param collection records to write
     * @return progress parameter carrying the number of INSERT records in {@code collection},
     *         or {@code 0} when it contains no data record
     */
    @Override // org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink
    public PipelineJobProgressUpdatedParameter write(String str, Collection<Record> collection) {
        List<DataRecord> dataRecords = collection.stream()
                .filter(DataRecord.class::isInstance)
                .map(DataRecord.class::cast)
                .collect(Collectors.toList());
        if (dataRecords.isEmpty()) {
            return new PipelineJobProgressUpdatedParameter(0);
        }
        for (GroupedDataRecord each : this.groupEngine.group(dataRecords)) {
            batchWrite(each.getDeleteDataRecords());
            batchWrite(each.getInsertDataRecords());
            batchWrite(each.getUpdateDataRecords());
        }
        long insertCount = dataRecords.stream()
                .filter(each -> PipelineSQLOperationType.INSERT == each.getType())
                .count();
        return new PipelineJobProgressUpdatedParameter((int) insertCount);
    }

    /**
     * Writes one homogeneous batch of records, retrying on {@link SQLException}.
     *
     * <p>An {@link InterruptedException} raised while waiting between retries propagates to the
     * caller unchanged, although it is not declared.</p>
     *
     * @param records records of a single operation type; an empty collection is a no-op
     * @throws PipelineImporterJobWriteException when the last permitted attempt fails
     */
    private void batchWrite(Collection<DataRecord> records) {
        if (records.isEmpty()) {
            return;
        }
        int retryTimes = this.importerConfig.getRetryTimes();
        // NOTE(review): Thread.interrupted() clears the interrupt flag, and when it returns true the
        // loop ends without writing and without signalling the caller - confirm this is intended.
        for (int attempt = 0; !Thread.interrupted() && attempt <= retryTimes; attempt++) {
            try {
                doWrite(records, 0 == attempt);
                return;
            } catch (SQLException ex) {
                log.error("Flush failed {}/{} times.", attempt, retryTimes, ex);
                if (attempt == retryTimes) {
                    throw new PipelineImporterJobWriteException(ex);
                }
                sleepBeforeRetry(attempt);
            }
        }
    }

    /**
     * Sleeps {@code min(300000, 1000 * 2^attempt)} milliseconds.
     *
     * <p>The delay is computed with {@code long} arithmetic and a capped shift, so a large retry
     * count can no longer overflow into a negative value that {@link Thread#sleep(long)} rejects.</p>
     *
     * @param attempt zero-based index of the attempt that just failed
     */
    private void sleepBeforeRetry(int attempt) {
        long delayMillis = Math.min(MAX_RETRY_DELAY_MILLIS, BASE_RETRY_DELAY_MILLIS << Math.min(attempt, MAX_RETRY_DELAY_SHIFT));
        try {
            Thread.sleep(delayMillis);
        } catch (InterruptedException ex) {
            // PipelineSink#write declares no checked exception, so the interruption is rethrown as-is.
            PipelineDataSourceSink.<RuntimeException>sneakyThrow(ex);
        }
    }

    /**
     * Rethrows the given throwable without wrapping it, bypassing checked-exception analysis.
     *
     * @param throwable throwable to rethrow
     * @param <T> exception type the compiler is told about
     * @throws T always
     */
    @SuppressWarnings("unchecked")
    private static <T extends Throwable> void sneakyThrow(Throwable throwable) throws T {
        throw (T) throwable;
    }

    /**
     * Dispatches the batch according to the operation type of its first record. Types other than
     * INSERT, UPDATE and DELETE are ignored.
     *
     * @param records non-empty batch of records
     * @param firstTimeRun {@code true} on the first attempt, {@code false} on a retry
     * @throws SQLException when the database rejects the write
     */
    private void doWrite(Collection<DataRecord> records, boolean firstTimeRun) throws SQLException {
        PipelineSQLOperationType type = records.iterator().next().getType();
        switch (type) {
            case INSERT:
                applyRateLimit(type);
                executeBatchInsert(records, firstTimeRun);
                break;
            case UPDATE:
                applyRateLimit(type);
                executeUpdate(records, firstTimeRun);
                break;
            case DELETE:
                applyRateLimit(type);
                executeBatchDelete(records);
                break;
            default:
                break;
        }
    }

    /**
     * Passes the operation to the configured rate limit algorithm, if there is one.
     *
     * @param type operation type about to be executed
     */
    private void applyRateLimit(PipelineSQLOperationType type) {
        Optional.ofNullable(this.importerConfig.getRateLimitAlgorithm())
                .ifPresent(algorithm -> algorithm.intercept(type, 1));
    }

    /**
     * Inserts the batch on a fresh connection, using the insert SQL built from the first record.
     *
     * @param records non-empty batch of INSERT records
     * @param firstTimeRun {@code true} to send one JDBC batch, {@code false} to insert row by row
     * @throws SQLException when the insert fails
     */
    private void executeBatchInsert(Collection<DataRecord> records, boolean firstTimeRun) throws SQLException {
        DataRecord firstRecord = records.iterator().next();
        String insertSQL = this.importSQLBuilder.buildInsertSQL(this.importerConfig.findSchemaName(firstRecord.getTableName()).orElse(null), firstRecord);
        // try-with-resources guarantees that statement and connection are closed on every path.
        try (Connection connection = this.dataSource.getConnection();
             PreparedStatement preparedStatement = connection.prepareStatement(insertSQL)) {
            this.runningStatement.set(preparedStatement);
            if (firstTimeRun) {
                executeBatchInsertFirstTime(connection, preparedStatement, records);
            } else {
                retryBatchInsert(preparedStatement, records);
            }
        } finally {
            this.runningStatement.set(null);
        }
    }

    /**
     * Sends all records as one JDBC batch; a batch of more than one record is wrapped in an
     * explicit transaction.
     *
     * @param connection connection the statement belongs to
     * @param preparedStatement prepared insert statement
     * @param records records to insert
     * @throws SQLException when binding, executing or committing fails
     */
    private void executeBatchInsertFirstTime(Connection connection, PreparedStatement preparedStatement, Collection<DataRecord> records) throws SQLException {
        boolean transactional = records.size() > 1;
        if (transactional) {
            connection.setAutoCommit(false);
        }
        preparedStatement.setQueryTimeout(BATCH_QUERY_TIMEOUT_SECONDS);
        for (DataRecord each : records) {
            bindInsertParameters(preparedStatement, each);
            preparedStatement.addBatch();
        }
        preparedStatement.executeBatch();
        if (transactional) {
            connection.commit();
        }
    }

    /**
     * Retry path: executes the insert once per record instead of as one batch.
     *
     * @param preparedStatement prepared insert statement
     * @param records records to insert
     * @throws SQLException when binding or executing fails
     */
    private void retryBatchInsert(PreparedStatement preparedStatement, Collection<DataRecord> records) throws SQLException {
        for (DataRecord each : records) {
            bindInsertParameters(preparedStatement, each);
            preparedStatement.executeUpdate();
        }
    }

    /**
     * Binds every column value of the record to the statement, in column order.
     *
     * @param preparedStatement prepared insert statement
     * @param dataRecord record supplying the values
     * @throws SQLException when a parameter cannot be set
     */
    private void bindInsertParameters(PreparedStatement preparedStatement, DataRecord dataRecord) throws SQLException {
        for (int index = 0; index < dataRecord.getColumnCount(); index++) {
            preparedStatement.setObject(index + 1, dataRecord.getColumn(index).getValue());
        }
    }

    /**
     * Applies the UPDATE records one by one on a single connection. On the first attempt a batch
     * of more than one record is wrapped in an explicit transaction.
     *
     * @param records non-empty batch of UPDATE records
     * @param firstTimeRun {@code true} on the first attempt, {@code false} on a retry
     * @throws SQLException when an update fails
     */
    private void executeUpdate(Collection<DataRecord> records, boolean firstTimeRun) throws SQLException {
        try (Connection connection = this.dataSource.getConnection()) {
            boolean transactional = firstTimeRun && records.size() > 1;
            if (transactional) {
                connection.setAutoCommit(false);
            }
            for (DataRecord each : records) {
                executeUpdate(connection, each);
            }
            if (transactional) {
                connection.commit();
            }
        }
    }

    /**
     * Executes the UPDATE for a single record. Parameters are bound as the updated columns first,
     * followed by the condition columns.
     *
     * @param connection connection to run the statement on
     * @param dataRecord record to apply
     * @throws SQLException when preparing or executing the statement fails; logged before rethrow
     */
    private void executeUpdate(Connection connection, DataRecord dataRecord) throws SQLException {
        Set<String> shardingColumns = this.importerConfig.getShardingColumns(dataRecord.getTableName());
        List<Column> conditionColumns = RecordUtils.extractConditionColumns(dataRecord, shardingColumns);
        List<Column> setColumns = dataRecord.getColumns().stream().filter(Column::isUpdated).collect(Collectors.toList());
        String updateSQL = this.importSQLBuilder.buildUpdateSQL(this.importerConfig.findSchemaName(dataRecord.getTableName()).orElse(null), dataRecord, conditionColumns);
        try (PreparedStatement preparedStatement = connection.prepareStatement(updateSQL)) {
            this.runningStatement.set(preparedStatement);
            int parameterIndex = 1;
            for (Column each : setColumns) {
                preparedStatement.setObject(parameterIndex++, each.getValue());
            }
            for (Column each : conditionColumns) {
                // A sharding column without an old value is matched on its current value instead.
                boolean useCurrentValue = shardingColumns.contains(each.getName()) && null == each.getOldValue();
                preparedStatement.setObject(parameterIndex++, useCurrentValue ? each.getValue() : each.getOldValue());
            }
            int updateCount = preparedStatement.executeUpdate();
            if (1 != updateCount) {
                log.warn("execute update failed, update count: {}, sql: {}, set columns: {}, sharding columns: {}, condition columns: {}",
                        updateCount, updateSQL, setColumns, JsonUtils.toJsonString(shardingColumns), JsonUtils.toJsonString(conditionColumns));
            }
        } catch (SQLException ex) {
            log.error("execute update failed, sql: {}, set columns: {}, sharding columns: {}, condition columns: {}, error message: {}, data record: {}",
                    updateSQL, setColumns, JsonUtils.toJsonString(shardingColumns), JsonUtils.toJsonString(conditionColumns), ex.getMessage(), dataRecord);
            throw ex;
        } finally {
            this.runningStatement.set(null);
        }
    }

    /**
     * Deletes the batch on a fresh connection; a batch of more than one record is wrapped in an
     * explicit transaction.
     *
     * @param records non-empty batch of DELETE records
     * @throws SQLException when the delete fails
     */
    private void executeBatchDelete(Collection<DataRecord> records) throws SQLException {
        try (Connection connection = this.dataSource.getConnection()) {
            boolean transactional = records.size() > 1;
            if (transactional) {
                connection.setAutoCommit(false);
            }
            Set<String> shardingColumns = this.importerConfig.getShardingColumns(records.iterator().next().getTableName());
            executeBatchDelete(connection, records, shardingColumns);
            if (transactional) {
                connection.commit();
            }
        }
    }

    /**
     * Sends all deletes as one JDBC batch. The delete SQL is built from the first record; each
     * record then binds the old values of its own condition columns.
     *
     * @param connection connection to run the statement on
     * @param records non-empty batch of DELETE records
     * @param set sharding columns used to build the delete SQL from the first record
     * @throws SQLException when preparing, binding or executing fails
     */
    private void executeBatchDelete(Connection connection, Collection<DataRecord> records, Set<String> set) throws SQLException {
        DataRecord firstRecord = records.iterator().next();
        String deleteSQL = this.importSQLBuilder.buildDeleteSQL(
                this.importerConfig.findSchemaName(firstRecord.getTableName()).orElse(null), firstRecord, RecordUtils.extractConditionColumns(firstRecord, set));
        // try-with-resources guarantees that the statement is closed even when the batch fails.
        try (PreparedStatement preparedStatement = connection.prepareStatement(deleteSQL)) {
            this.runningStatement.set(preparedStatement);
            preparedStatement.setQueryTimeout(BATCH_QUERY_TIMEOUT_SECONDS);
            for (DataRecord each : records) {
                List<Column> conditionColumns = RecordUtils.extractConditionColumns(each, this.importerConfig.getShardingColumns(each.getTableName()));
                for (int index = 0; index < conditionColumns.size(); index++) {
                    Object oldValue = conditionColumns.get(index).getOldValue();
                    if (null == oldValue) {
                        log.warn("Record old value is null, record: {}", each);
                    }
                    preparedStatement.setObject(index + 1, oldValue);
                }
                preparedStatement.addBatch();
            }
            preparedStatement.executeBatch();
        } finally {
            this.runningStatement.set(null);
        }
    }

    /**
     * Cancels the statement that is currently running, if any.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        Optional.ofNullable(this.runningStatement.get()).ifPresent(PipelineJdbcUtils::cancelStatement);
    }
}
