package dev.responsive.kafka.internal.db;

import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
import com.datastax.oss.driver.api.querybuilder.relation.Relation;
import com.datastax.oss.driver.api.querybuilder.update.Update;
import dev.responsive.kafka.internal.db.partitioning.SubPartitioner;
import dev.responsive.kafka.internal.utils.SessionClients;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dev/responsive/kafka/internal/db/LwtWriterFactory.class */
/**
 * {@link WriterFactory} whose writers are tied to a single epoch value stored in the
 * metadata row of a remote table.
 *
 * <p>An instance is normally obtained through one of the {@code reserve} factory methods,
 * which first claim the epoch on every sub partition with a conditional update
 * ({@code IF epoch < newEpoch}) and then prepare a second conditional update
 * ({@code IF epoch = newEpoch}) that is handed to every writer created by this factory.
 *
 * @param <K> key type of the remote table
 */
public class LwtWriterFactory<K> implements WriterFactory<K> {
    private static final Logger LOG = LoggerFactory.getLogger(LwtWriterFactory.class);

    private final RemoteTable<K> table;
    private final long epoch;
    private final PreparedStatement ensureEpoch;

    /**
     * Creates a factory around an already claimed epoch.
     *
     * @param remoteTable       table handed to every writer created by this factory
     * @param j                 the epoch this factory represents
     * @param preparedStatement prepared epoch-check statement; {@link #createWriter} binds the
     *                          partition key marker of this statement
     */
    public LwtWriterFactory(RemoteTable<K> remoteTable, long j, PreparedStatement preparedStatement) {
        this.table = remoteTable;
        this.epoch = j;
        this.ensureEpoch = preparedStatement;
    }

    /**
     * Same as {@link #reserve(RemoteTable, CassandraClient, SubPartitioner, int)} but uses the
     * windowed statement variants, which additionally restrict the metadata row to
     * {@code WINDOW_START = 0}.
     *
     * @param remoteTable     table whose metadata rows hold the epoch
     * @param cassandraClient client used to execute and prepare the statements
     * @param subPartitioner  supplies the sub partitions belonging to {@code i}
     * @param i               partition to reserve
     * @return a factory bound to the newly claimed epoch
     * @throws TaskMigratedException if the epoch could not be claimed on some sub partition
     */
    public static <K> LwtWriterFactory<K> reserveWindowed(RemoteTable<K> remoteTable, CassandraClient cassandraClient, SubPartitioner subPartitioner, int i) {
        return reserve(remoteTable, cassandraClient, subPartitioner, i, true);
    }

    /**
     * Claims the next epoch for partition {@code i} using the non-windowed statements.
     *
     * @param remoteTable     table whose metadata rows hold the epoch
     * @param cassandraClient client used to execute and prepare the statements
     * @param subPartitioner  supplies the sub partitions belonging to {@code i}
     * @param i               partition to reserve
     * @return a factory bound to the newly claimed epoch
     * @throws TaskMigratedException if the epoch could not be claimed on some sub partition
     */
    public static <K> LwtWriterFactory<K> reserve(RemoteTable<K> remoteTable, CassandraClient cassandraClient, SubPartitioner subPartitioner, int i) {
        return reserve(remoteTable, cassandraClient, subPartitioner, i, false);
    }

    /**
     * Resolves the sub partitions of {@code i} and the epoch to claim, then delegates to the
     * array based overload. The epoch to claim is the epoch currently stored for the first
     * sub partition plus one.
     */
    private static <K> LwtWriterFactory<K> reserve(RemoteTable<K> remoteTable, CassandraClient cassandraClient, SubPartitioner subPartitioner, int i, boolean windowed) {
        final int[] subPartitions = subPartitioner.all(i).toArray();
        final long nextEpoch = remoteTable.metadata(subPartitioner.first(i)).epoch + 1;
        return reserve(remoteTable, cassandraClient, subPartitions, i, nextEpoch, windowed);
    }

    /**
     * Claims epoch {@code j} on every sub partition in {@code iArr}, in array order, and
     * returns a factory bound to that epoch.
     *
     * <p>The claim stops at the first sub partition whose conditional update is not applied;
     * sub partitions claimed before that point are not reverted by this method.
     *
     * @param remoteTable     table whose metadata rows hold the epoch
     * @param cassandraClient client used to execute and prepare the statements
     * @param iArr            sub partitions on which the epoch must be claimed
     * @param i               partition the sub partitions belong to; only used in the error message
     * @param j               epoch to claim
     * @param z               {@code true} to use the windowed statement variants
     * @return a factory bound to epoch {@code j}
     * @throws TaskMigratedException if the conditional update was not applied on a sub partition
     */
    public static <K> LwtWriterFactory<K> reserve(RemoteTable<K> remoteTable, CassandraClient cassandraClient, int[] iArr, int i, long j, boolean z) {
        final int partition = i;
        final long newEpoch = j;
        final boolean windowed = z;

        for (final int subPartition : iArr) {
            final ResultSet reservation;
            if (windowed) {
                reservation = reserveEpochWindowed(remoteTable, cassandraClient, subPartition, newEpoch);
            } else {
                reservation = reserveEpoch(remoteTable, cassandraClient, subPartition, newEpoch);
            }
            if (reservation.wasApplied()) {
                continue;
            }

            // The claim was rejected: re-read the metadata to report the epoch that fenced us.
            final String message = String.format(
                    "Could not initialize commit buffer %s[%d] - attempted to claim epoch %d, but was fenced by a writer that claimed epoch %d on sub partition %d",
                    remoteTable.name(),
                    partition,
                    newEpoch,
                    Long.valueOf(remoteTable.metadata(subPartition).epoch),
                    subPartition);
            final TaskMigratedException fenced = new TaskMigratedException(message);
            LOG.warn(message, fenced);
            throw fenced;
        }

        final PreparedStatement epochCheck;
        if (windowed) {
            epochCheck = ensureEpochWindowed(remoteTable, cassandraClient, newEpoch);
        } else {
            epochCheck = ensureEpoch(remoteTable, cassandraClient, newEpoch);
        }
        return new LwtWriterFactory<>(remoteTable, newEpoch, epochCheck);
    }

    /**
     * Creates a writer whose statement supplier yields the prepared epoch check bound to
     * partition {@code i}.
     *
     * @param sessionClients source of the Cassandra client passed to the writer
     * @param i              value bound to the partition key marker; also passed to the writer
     * @param i2             passed through unchanged to the {@code LwtWriter} constructor
     *                       (its meaning is defined there, not in this class)
     * @return a new {@code LwtWriter}
     */
    @Override
    public RemoteWriter<K> createWriter(SessionClients sessionClients, int i, int i2) {
        return new LwtWriter(
                sessionClients.cassandraClient(),
                () -> ensureEpoch.bind().setInt(ColumnName.PARTITION_KEY.bind(), i),
                table,
                i,
                i2);
    }

    @Override
    public String toString() {
        return "LwtWriterFactory{epoch=" + epoch + "}";
    }

    /**
     * Executes the non-windowed claim: sets the epoch of the metadata row of {@code partition}
     * to {@code epoch}, conditional on the stored epoch being less than {@code epoch}.
     */
    private static ResultSet reserveEpoch(RemoteTable<?> table, CassandraClient client, int partition, long epoch) {
        final Update claim = (Update) QueryBuilder.update(table.name())
                .setColumn(ColumnName.EPOCH.column(), ColumnName.EPOCH.literal(Long.valueOf(epoch)))
                .where(rowTypeIsMetadata())
                .where(dataKeyIsMetadata())
                .where(partitionKeyIs(partition))
                .ifColumn(ColumnName.EPOCH.column())
                .isLessThan(ColumnName.EPOCH.literal(Long.valueOf(epoch)));
        final Statement<?> statement = claim.build();
        return client.execute(statement);
    }

    /**
     * Executes the windowed claim: like {@link #reserveEpoch} but the metadata row is also
     * addressed by {@code WINDOW_START = 0}.
     */
    private static ResultSet reserveEpochWindowed(RemoteTable<?> table, CassandraClient client, int partition, long epoch) {
        final Update claim = (Update) QueryBuilder.update(table.name())
                .setColumn(ColumnName.EPOCH.column(), ColumnName.EPOCH.literal(Long.valueOf(epoch)))
                .where(partitionKeyIs(partition))
                .where(rowTypeIsMetadata())
                .where(dataKeyIsMetadata())
                .where(windowStartIsZero())
                .ifColumn(ColumnName.EPOCH.column())
                .isLessThan(ColumnName.EPOCH.literal(Long.valueOf(epoch)));
        final Statement<?> statement = claim.build();
        return client.execute(statement);
    }

    /**
     * Prepares the non-windowed epoch check: re-writes {@code epoch} into the metadata row of
     * the bound partition, conditional on the stored epoch being equal to {@code epoch}.
     */
    private static PreparedStatement ensureEpoch(RemoteTable<?> table, CassandraClient client, long epoch) {
        final Update check = (Update) QueryBuilder.update(table.name())
                .setColumn(ColumnName.EPOCH.column(), ColumnName.EPOCH.literal(Long.valueOf(epoch)))
                .where(partitionKeyIsBound())
                .where(rowTypeIsMetadata())
                .where(dataKeyIsMetadata())
                .ifColumn(ColumnName.EPOCH.column())
                .isEqualTo(ColumnName.EPOCH.literal(Long.valueOf(epoch)));
        return client.prepare(check.build());
    }

    /**
     * Prepares the windowed epoch check: like {@link #ensureEpoch} but the metadata row is also
     * addressed by {@code WINDOW_START = 0}.
     */
    private static PreparedStatement ensureEpochWindowed(RemoteTable<?> table, CassandraClient client, long epoch) {
        final Update check = (Update) QueryBuilder.update(table.name())
                .setColumn(ColumnName.EPOCH.column(), ColumnName.EPOCH.literal(Long.valueOf(epoch)))
                .where(partitionKeyIsBound())
                .where(rowTypeIsMetadata())
                .where(dataKeyIsMetadata())
                .where(windowStartIsZero())
                .ifColumn(ColumnName.EPOCH.column())
                .isEqualTo(ColumnName.EPOCH.literal(Long.valueOf(epoch)));
        return client.prepare(check.build());
    }

    /** Relation {@code ROW_TYPE = METADATA_ROW}. */
    private static Relation rowTypeIsMetadata() {
        return (Relation) ColumnName.ROW_TYPE.relation().isEqualTo(RowType.METADATA_ROW.literal());
    }

    /** Relation {@code DATA_KEY = METADATA_KEY}. */
    private static Relation dataKeyIsMetadata() {
        return (Relation) ColumnName.DATA_KEY.relation().isEqualTo(ColumnName.DATA_KEY.literal(ColumnName.METADATA_KEY));
    }

    /** Relation {@code WINDOW_START = 0}. */
    private static Relation windowStartIsZero() {
        return (Relation) ColumnName.WINDOW_START.relation().isEqualTo(ColumnName.WINDOW_START.literal(0L));
    }

    /** Relation {@code PARTITION_KEY = <partition>} with the value inlined as a literal. */
    private static Relation partitionKeyIs(int partition) {
        return (Relation) ColumnName.PARTITION_KEY.relation().isEqualTo(ColumnName.PARTITION_KEY.literal(Integer.valueOf(partition)));
    }

    /** Relation {@code PARTITION_KEY = :marker}, leaving the value to be bound later. */
    private static Relation partitionKeyIsBound() {
        return (Relation) ColumnName.PARTITION_KEY.relation().isEqualTo(QueryBuilder.bindMarker(ColumnName.PARTITION_KEY.bind()));
    }
}
