package solutions.a2.cdc.oracle;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import solutions.a2.cdc.oracle.utils.ExceptionUtils;
import solutions.a2.cdc.oracle.utils.Version;

/* loaded from: input_file:solutions/a2/cdc/oracle/OraCdcJdbcSinkTask.class */
public class OraCdcJdbcSinkTask extends SinkTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(OraCdcJdbcSinkTask.class);
    private OraCdcJdbcSinkConnectorConfig config;
    private int schemaType;
    private OraCdcJdbcSinkConnectionPool sinkPool;
    private final Map<String, OraTable4SinkConnector> tablesInProcessing = new HashMap();
    private int batchSize = 1000;

    public String version() {
        return Version.getVersion();
    }

    public void start(Map<String, String> map) {
        LOGGER.info("Starting oracdc '{}' Sink Task", map.get("name"));
        this.config = new OraCdcJdbcSinkConnectorConfig(map);
        try {
            LOGGER.debug("BEGIN: Hikari Connection Pool initialization.");
            this.sinkPool = new OraCdcJdbcSinkConnectionPool(map.get("name"), this.config.getString(ParamConstants.CONNECTION_URL_PARAM), this.config.getString(ParamConstants.CONNECTION_USER_PARAM), this.config.getPassword(ParamConstants.CONNECTION_PASSWORD_PARAM).value());
            LOGGER.debug("END: Hikari Connection Pool initialization.");
            this.batchSize = this.config.getInt(ParamConstants.BATCH_SIZE_PARAM).intValue();
            LOGGER.debug("batchSize = {} records.", Integer.valueOf(this.batchSize));
            String str = map.get(ParamConstants.SCHEMA_TYPE_PARAM);
            LOGGER.debug("a2.schema.type set to {}.", str);
            if (ParamConstants.SCHEMA_TYPE_DEBEZIUM.equals(str)) {
                this.schemaType = 1;
            } else {
                this.schemaType = 2;
            }
        } catch (SQLException e) {
            LOGGER.error("Unable to connect to {}", this.config.getString(ParamConstants.CONNECTION_URL_PARAM));
            LOGGER.error(ExceptionUtils.getExceptionStackTrace(e));
            throw new ConnectException("Unable to start oracdc Sink Connector Task.");
        }
    }

    public void put(Collection<SinkRecord> collection) {
        String str;
        LOGGER.debug("BEGIN: put()");
        HashSet<String> hashSet = new HashSet();
        Throwable th = null;
        try {
            try {
                Connection connection = this.sinkPool.getConnection();
                try {
                    int i = 0;
                    HashMap hashMap = new HashMap();
                    for (SinkRecord sinkRecord : collection) {
                        if (this.schemaType == 2 || this.schemaType == 3) {
                            str = sinkRecord.topic();
                            LOGGER.debug("Table name from Kafka topic = {}.", str);
                        } else {
                            str = ((Struct) sinkRecord.value()).getStruct("source").getString("table");
                            LOGGER.debug("Table name from 'source' field = {}.", str);
                        }
                        OraTable4SinkConnector oraTable4SinkConnector = this.tablesInProcessing.get(str);
                        if (oraTable4SinkConnector == null) {
                            LOGGER.debug("Create new table definition for {} and add it to processing map,", str);
                            oraTable4SinkConnector = new OraTable4SinkConnector(this.sinkPool, str, sinkRecord, this.schemaType, this.config);
                            this.tablesInProcessing.put(str, oraTable4SinkConnector);
                        }
                        if (!hashSet.contains(str)) {
                            LOGGER.debug("Adding {} to current batch set.", str);
                            hashSet.add(str);
                        }
                        if (oraTable4SinkConnector.duplicatedKeyInBatch(sinkRecord)) {
                            if (LOGGER.isDebugEnabled()) {
                                LOGGER.debug("Executing batch due to duplicate key for table {} .", oraTable4SinkConnector.getTableName());
                            }
                            for (String str2 : hashSet) {
                                LOGGER.debug("Executing batch for table {}.", str2);
                                if (StringUtils.equals(str2, str)) {
                                    oraTable4SinkConnector.execAndCloseCursors();
                                } else {
                                    this.tablesInProcessing.get(str2).exec();
                                }
                            }
                            flush(hashMap);
                            connection.commit();
                            hashMap.clear();
                            i = 0;
                        }
                        oraTable4SinkConnector.putData(connection, sinkRecord);
                        hashMap.put(new TopicPartition(sinkRecord.topic(), sinkRecord.kafkaPartition().intValue()), new OffsetAndMetadata(sinkRecord.kafkaOffset()));
                        i++;
                        if (i == this.batchSize) {
                            for (String str3 : hashSet) {
                                LOGGER.debug("Executing batch for table {}.", str3);
                                this.tablesInProcessing.get(str3).exec();
                            }
                            flush(hashMap);
                            connection.commit();
                            hashMap.clear();
                            i = 0;
                        }
                    }
                    LOGGER.debug("Execute and close cursors");
                    for (String str4 : hashSet) {
                        LOGGER.debug("Last batch execution and statements closing for table {}.", str4);
                        this.tablesInProcessing.get(str4).execAndCloseCursors();
                    }
                    connection.commit();
                    if (connection != null) {
                        connection.close();
                    }
                    LOGGER.debug("END: put()");
                } catch (Throwable th2) {
                    if (connection != null) {
                        connection.close();
                    }
                    throw th2;
                }
            } catch (Throwable th3) {
                if (0 == 0) {
                    th = th3;
                } else if (null != th3) {
                    th.addSuppressed(th3);
                }
                throw th;
            }
        } catch (SQLException e) {
            LOGGER.error("Error '{}' when put to target system, SQL errorCode = {}, SQL state = '{}'", new Object[]{e.getMessage(), Integer.valueOf(e.getErrorCode()), e.getSQLState()});
            LOGGER.error(ExceptionUtils.getExceptionStackTrace(e));
            throw new ConnectException(e);
        }
    }

    public void stop() {
        this.sinkPool = null;
    }
}
