package org.apache.flink.streaming.connectors.cassandra;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.MoreExecutors;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.class */
public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> implements CheckpointedFunction {
    protected final Logger log = LoggerFactory.getLogger(getClass());
    protected transient Cluster cluster;
    protected transient Session session;
    private AtomicReference<Throwable> throwable;
    private FutureCallback<V> callback;
    private Semaphore semaphore;
    private final ClusterBuilder builder;
    private final CassandraSinkBaseConfig config;
    private final CassandraFailureHandler failureHandler;

    /* JADX INFO: Access modifiers changed from: package-private */
    public CassandraSinkBase(ClusterBuilder clusterBuilder, CassandraSinkBaseConfig cassandraSinkBaseConfig, CassandraFailureHandler cassandraFailureHandler) {
        this.builder = clusterBuilder;
        this.config = cassandraSinkBaseConfig;
        this.failureHandler = (CassandraFailureHandler) Preconditions.checkNotNull(cassandraFailureHandler);
        ClosureCleaner.clean(clusterBuilder, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
    }

    public void open(Configuration configuration) {
        this.callback = new FutureCallback<V>() { // from class: org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.1
            public void onSuccess(V v) {
                CassandraSinkBase.this.semaphore.release();
            }

            public void onFailure(Throwable th) {
                CassandraSinkBase.this.throwable.compareAndSet(null, th);
                CassandraSinkBase.this.log.error("Error while sending value.", th);
                CassandraSinkBase.this.semaphore.release();
            }
        };
        this.cluster = this.builder.getCluster();
        this.session = createSession();
        this.throwable = new AtomicReference<>();
        this.semaphore = new Semaphore(this.config.getMaxConcurrentRequests());
    }

    public void close() throws Exception {
        try {
            checkAsyncErrors();
            flush();
            checkAsyncErrors();
            try {
                if (this.session != null) {
                    this.session.close();
                }
            } catch (Exception e) {
                this.log.error("Error while closing session.", e);
            }
            try {
                if (this.cluster != null) {
                    this.cluster.close();
                }
            } catch (Exception e2) {
                this.log.error("Error while closing cluster.", e2);
            }
        } catch (Throwable th) {
            try {
                if (this.session != null) {
                    this.session.close();
                }
            } catch (Exception e3) {
                this.log.error("Error while closing session.", e3);
            }
            try {
                if (this.cluster != null) {
                    this.cluster.close();
                }
            } catch (Exception e4) {
                this.log.error("Error while closing cluster.", e4);
            }
            throw th;
        }
    }

    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        checkAsyncErrors();
        flush();
        checkAsyncErrors();
    }

    public void invoke(IN in) throws Exception {
        checkAsyncErrors();
        tryAcquire(1);
        try {
            Futures.addCallback(send(in), this.callback, MoreExecutors.directExecutor());
        } catch (Throwable th) {
            this.semaphore.release();
            throw th;
        }
    }

    protected Session createSession() {
        return this.cluster.connect();
    }

    public abstract ListenableFuture<V> send(IN in);

    private void tryAcquire(int i) throws InterruptedException, TimeoutException {
        if (!this.semaphore.tryAcquire(i, this.config.getMaxConcurrentRequestsTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
            throw new TimeoutException(String.format("Failed to acquire %d out of %d permits to send value in %s.", Integer.valueOf(i), Integer.valueOf(this.config.getMaxConcurrentRequests()), this.config.getMaxConcurrentRequestsTimeout()));
        }
    }

    private void checkAsyncErrors() throws Exception {
        Throwable andSet = this.throwable.getAndSet(null);
        if (andSet != null) {
            this.failureHandler.onFailure(andSet);
        }
    }

    private void flush() throws InterruptedException, TimeoutException {
        tryAcquire(this.config.getMaxConcurrentRequests());
        this.semaphore.release(this.config.getMaxConcurrentRequests());
    }

    @VisibleForTesting
    int getAvailablePermits() {
        return this.semaphore.availablePermits();
    }

    @VisibleForTesting
    int getAcquiredPermits() {
        return this.config.getMaxConcurrentRequests() - this.semaphore.availablePermits();
    }
}
