package org.apache.flink.connector.jdbc.internal;

import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl;
import org.apache.flink.connector.jdbc.utils.JdbcUtils;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.class */
public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatchStatementExecutor<JdbcIn>> extends AbstractJdbcOutputFormat<In> {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) JdbcBatchingOutputFormat.class);
    private final JdbcExecutionOptions executionOptions;
    private final StatementExecutorFactory<JdbcExec> statementExecutorFactory;
    private final RecordExtractor<In, JdbcIn> jdbcRecordExtractor;
    private transient JdbcExec jdbcStatementExecutor;
    private transient int batchCount;
    private volatile transient boolean closed;
    private transient ScheduledExecutorService scheduler;
    private transient ScheduledFuture<?> scheduledFuture;
    private volatile transient Exception flushException;

    /* loaded from: input_file:org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat$Builder.class */
    public static class Builder {
        private JdbcOptions options;
        private String[] fieldNames;
        private String[] keyFields;
        private int[] fieldTypes;
        private JdbcExecutionOptions.Builder executionOptionsBuilder = JdbcExecutionOptions.builder();

        public Builder setOptions(JdbcOptions jdbcOptions) {
            this.options = jdbcOptions;
            return this;
        }

        public Builder setFieldNames(String[] strArr) {
            this.fieldNames = strArr;
            return this;
        }

        public Builder setKeyFields(String[] strArr) {
            this.keyFields = strArr;
            return this;
        }

        public Builder setFieldTypes(int[] iArr) {
            this.fieldTypes = iArr;
            return this;
        }

        public Builder setFlushMaxSize(int i) {
            this.executionOptionsBuilder.withBatchSize(i);
            return this;
        }

        public Builder setFlushIntervalMills(long j) {
            this.executionOptionsBuilder.withBatchIntervalMs(j);
            return this;
        }

        public Builder setMaxRetryTimes(int i) {
            this.executionOptionsBuilder.withMaxRetries(i);
            return this;
        }

        public JdbcBatchingOutputFormat<Tuple2<Boolean, Row>, Row, JdbcBatchStatementExecutor<Row>> build() {
            Preconditions.checkNotNull(this.options, "No options supplied.");
            Preconditions.checkNotNull(this.fieldNames, "No fieldNames supplied.");
            JdbcDmlOptions build = JdbcDmlOptions.builder().withTableName(this.options.getTableName()).withDialect(this.options.getDialect()).withFieldNames(this.fieldNames).withKeyFields(this.keyFields).withFieldTypes(this.fieldTypes).build();
            if (build.getKeyFields().isPresent() && build.getKeyFields().get().length > 0) {
                return new TableJdbcUpsertOutputFormat(new SimpleJdbcConnectionProvider(this.options), build, this.executionOptionsBuilder.build());
            }
            String parseNamedStatement = FieldNamedPreparedStatementImpl.parseNamedStatement(this.options.getDialect().getInsertIntoStatement(build.getTableName(), build.getFieldNames()), new HashMap());
            return new JdbcBatchingOutputFormat<>(new SimpleJdbcConnectionProvider(this.options), this.executionOptionsBuilder.build(), runtimeContext -> {
                return JdbcBatchingOutputFormat.createSimpleRowExecutor(parseNamedStatement, build.getFieldTypes(), runtimeContext.getExecutionConfig().isObjectReuseEnabled());
            }, tuple2 -> {
                Preconditions.checkArgument(((Boolean) tuple2.f0).booleanValue());
                return (Row) tuple2.f1;
            });
        }

        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            boolean z = -1;
            switch (implMethodName.hashCode()) {
                case -1483061429:
                    if (implMethodName.equals("lambda$build$19d3c953$1")) {
                        z = false;
                        break;
                    }
                    break;
                case 40926167:
                    if (implMethodName.equals("lambda$build$3957502f$1")) {
                        z = true;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat$StatementExecutorFactory") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat$Builder") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Lorg/apache/flink/connector/jdbc/internal/options/JdbcDmlOptions;Lorg/apache/flink/api/common/functions/RuntimeContext;)Lorg/apache/flink/connector/jdbc/internal/executor/JdbcBatchStatementExecutor;")) {
                        String str = (String) serializedLambda.getCapturedArg(0);
                        JdbcDmlOptions jdbcDmlOptions = (JdbcDmlOptions) serializedLambda.getCapturedArg(1);
                        return runtimeContext -> {
                            return JdbcBatchingOutputFormat.createSimpleRowExecutor(str, jdbcDmlOptions.getFieldTypes(), runtimeContext.getExecutionConfig().isObjectReuseEnabled());
                        };
                    }
                    break;
                case true:
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat$RecordExtractor") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat$Builder") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;)Lorg/apache/flink/types/Row;")) {
                        return tuple2 -> {
                            Preconditions.checkArgument(((Boolean) tuple2.f0).booleanValue());
                            return (Row) tuple2.f1;
                        };
                    }
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    /* loaded from: input_file:org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat$RecordExtractor.class */
    public interface RecordExtractor<F, T> extends Function<F, T>, Serializable {
        static <T> RecordExtractor<T, T> identity() {
            return obj -> {
                return obj;
            };
        }

        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            boolean z = -1;
            switch (implMethodName.hashCode()) {
                case 1492331246:
                    if (implMethodName.equals("lambda$identity$f334d097$1")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat$RecordExtractor") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat$RecordExtractor") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;")) {
                        return obj -> {
                            return obj;
                        };
                    }
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    /* loaded from: input_file:org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat$StatementExecutorFactory.class */
    public interface StatementExecutorFactory<T extends JdbcBatchStatementExecutor<?>> extends Function<RuntimeContext, T>, Serializable {
    }

    public JdbcBatchingOutputFormat(@Nonnull JdbcConnectionProvider jdbcConnectionProvider, @Nonnull JdbcExecutionOptions jdbcExecutionOptions, @Nonnull StatementExecutorFactory<JdbcExec> statementExecutorFactory, @Nonnull RecordExtractor<In, JdbcIn> recordExtractor) {
        super(jdbcConnectionProvider);
        this.batchCount = 0;
        this.closed = false;
        this.executionOptions = (JdbcExecutionOptions) Preconditions.checkNotNull(jdbcExecutionOptions);
        this.statementExecutorFactory = (StatementExecutorFactory) Preconditions.checkNotNull(statementExecutorFactory);
        this.jdbcRecordExtractor = (RecordExtractor) Preconditions.checkNotNull(recordExtractor);
    }

    @Override // org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat, org.apache.flink.api.common.io.OutputFormat
    public void open(int i, int i2) throws IOException {
        super.open(i, i2);
        this.jdbcStatementExecutor = createAndOpenStatementExecutor(this.statementExecutorFactory);
        if (this.executionOptions.getBatchIntervalMs() == 0 || this.executionOptions.getBatchSize() == 1) {
            return;
        }
        this.scheduler = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("jdbc-upsert-output-format"));
        this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
            synchronized (this) {
                if (!this.closed) {
                    try {
                        flush();
                    } catch (Exception e) {
                        this.flushException = e;
                    }
                }
            }
        }, this.executionOptions.getBatchIntervalMs(), this.executionOptions.getBatchIntervalMs(), TimeUnit.MILLISECONDS);
    }

    private JdbcExec createAndOpenStatementExecutor(StatementExecutorFactory<JdbcExec> statementExecutorFactory) throws IOException {
        JdbcExec jdbcexec = (JdbcExec) statementExecutorFactory.apply(getRuntimeContext());
        try {
            jdbcexec.prepareStatements(this.connectionProvider.getConnection());
            return jdbcexec;
        } catch (SQLException e) {
            throw new IOException("unable to open JDBC writer", e);
        }
    }

    private void checkFlushException() {
        if (this.flushException != null) {
            throw new RuntimeException("Writing records to JDBC failed.", this.flushException);
        }
    }

    @Override // org.apache.flink.api.common.io.OutputFormat
    public final synchronized void writeRecord(In in) throws IOException {
        checkFlushException();
        try {
            addToBatch(in, this.jdbcRecordExtractor.apply(in));
            this.batchCount++;
            if (this.executionOptions.getBatchSize() > 0 && this.batchCount >= this.executionOptions.getBatchSize()) {
                flush();
            }
        } catch (Exception e) {
            throw new IOException("Writing records to JDBC failed.", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void addToBatch(In in, JdbcIn jdbcin) throws SQLException {
        this.jdbcStatementExecutor.addToBatch(jdbcin);
    }

    @Override // org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat, java.io.Flushable
    public synchronized void flush() throws IOException {
        checkFlushException();
        for (int i = 0; i <= this.executionOptions.getMaxRetries(); i++) {
            try {
                attemptFlush();
                this.batchCount = 0;
                return;
            } catch (SQLException e) {
                LOG.error("JDBC executeBatch error, retry times = {}", Integer.valueOf(i), e);
                if (i >= this.executionOptions.getMaxRetries()) {
                    throw new IOException(e);
                }
                try {
                    if (!this.connectionProvider.isConnectionValid()) {
                        updateExecutor(true);
                    }
                    try {
                        Thread.sleep(1000 * i);
                    } catch (InterruptedException e2) {
                        Thread.currentThread().interrupt();
                        throw new IOException("unable to flush; interrupted while doing another attempt", e);
                    }
                } catch (Exception e3) {
                    LOG.error("JDBC connection is not valid, and reestablish connection failed.", (Throwable) e3);
                    throw new IOException("Reestablish JDBC connection failed", e3);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void attemptFlush() throws SQLException {
        this.jdbcStatementExecutor.executeBatch();
    }

    @Override // org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat, org.apache.flink.api.common.io.OutputFormat
    public synchronized void close() {
        if (!this.closed) {
            this.closed = true;
            if (this.scheduledFuture != null) {
                this.scheduledFuture.cancel(false);
                this.scheduler.shutdown();
            }
            if (this.batchCount > 0) {
                try {
                    flush();
                } catch (Exception e) {
                    LOG.warn("Writing records to JDBC failed.", (Throwable) e);
                    throw new RuntimeException("Writing records to JDBC failed.", e);
                }
            }
            try {
                if (this.jdbcStatementExecutor != null) {
                    this.jdbcStatementExecutor.closeStatements();
                }
            } catch (SQLException e2) {
                LOG.warn("Close JDBC writer failed.", (Throwable) e2);
            }
        }
        super.close();
        checkFlushException();
    }

    public static Builder builder() {
        return new Builder();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static JdbcBatchStatementExecutor<Row> createSimpleRowExecutor(String str, int[] iArr, boolean z) {
        return JdbcBatchStatementExecutor.simple(str, createRowJdbcStatementBuilder(iArr), z ? Row::copy : Function.identity());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static JdbcStatementBuilder<Row> createRowJdbcStatementBuilder(int[] iArr) {
        return (preparedStatement, row) -> {
            JdbcUtils.setRecordToStatement(preparedStatement, iArr, row);
        };
    }

    public void updateExecutor(boolean z) throws SQLException, ClassNotFoundException {
        this.jdbcStatementExecutor.closeStatements();
        this.jdbcStatementExecutor.prepareStatements(z ? this.connectionProvider.reestablishConnection() : this.connectionProvider.getConnection());
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -548063870:
                if (implMethodName.equals("lambda$createRowJdbcStatementBuilder$cae3d881$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/connector/jdbc/JdbcStatementBuilder") && serializedLambda.getFunctionalInterfaceMethodName().equals("accept") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat") && serializedLambda.getImplMethodSignature().equals("([ILjava/sql/PreparedStatement;Lorg/apache/flink/types/Row;)V")) {
                    int[] iArr = (int[]) serializedLambda.getCapturedArg(0);
                    return (preparedStatement, row) -> {
                        JdbcUtils.setRecordToStatement(preparedStatement, iArr, row);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
