package dev.responsive.kafka.internal.db.mongo;

import com.mongodb.MongoBulkWriteException;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.WriteModel;
import dev.responsive.kafka.internal.db.RemoteTable;
import dev.responsive.kafka.internal.db.RemoteWriter;
import dev.responsive.kafka.internal.stores.RemoteWriteResult;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dev/responsive/kafka/internal/db/mongo/MongoWriter.class */
/**
 * A {@link RemoteWriter} that buffers MongoDB write models in memory and submits them to a
 * {@link MongoCollection} as a single unordered bulk write when {@link #flush()} is called.
 *
 * <p>Buffered writes live in a plain {@link ArrayList}; this class performs no synchronization
 * of its own.
 *
 * @param <K> the key type accepted by the underlying {@link RemoteTable}
 * @param <P> the table-partition type reported back in each {@link RemoteWriteResult}
 * @param <D> the document type of the target {@link MongoCollection}
 */
public class MongoWriter<K, P, D> implements RemoteWriter<K, P> {
    private static final Logger LOG = LoggerFactory.getLogger(MongoWriter.class);

    private final RemoteTable<K, WriteModel<D>> table;
    private final int kafkaPartition;
    private final P tablePartition;
    private final Supplier<MongoCollection<D>> collection;
    // Writes buffered since the last successful flush.
    private final List<WriteModel<D>> accumulatedWrites = new ArrayList<>();

    /**
     * Creates a writer bound to one table and one partition.
     *
     * @param table          the table used to translate inserts/deletes into write models
     * @param kafkaPartition the partition number passed to the table on every insert/delete
     * @param tablePartition the partition identifier reported in every flush result
     * @param collection     supplies the collection to write to; invoked on each non-empty flush
     */
    public MongoWriter(
            RemoteTable<K, WriteModel<D>> table,
            int kafkaPartition,
            P tablePartition,
            Supplier<MongoCollection<D>> collection) {
        this.table = table;
        this.kafkaPartition = kafkaPartition;
        this.tablePartition = tablePartition;
        this.collection = collection;
    }

    /**
     * Buffers the write model that the table produces for an insert. Nothing is sent to
     * MongoDB until {@link #flush()} is called.
     *
     * @param key   the key to insert
     * @param bytes the raw bytes stored for the key
     * @param j     forwarded unchanged to {@code RemoteTable#insert}
     *              (NOTE(review): presumably a timestamp; confirm against the RemoteWriter contract)
     */
    @Override // dev.responsive.kafka.internal.db.RemoteWriter
    public void insert(K key, byte[] bytes, long j) {
        final WriteModel<D> write = this.table.insert(this.kafkaPartition, key, bytes, j);
        this.accumulatedWrites.add(write);
    }

    /**
     * Buffers the write model that the table produces for a delete. Nothing is sent to
     * MongoDB until {@link #flush()} is called.
     *
     * @param key the key to delete
     */
    @Override // dev.responsive.kafka.internal.db.RemoteWriter
    public void delete(K key) {
        final WriteModel<D> write = this.table.delete(this.kafkaPartition, key);
        this.accumulatedWrites.add(write);
    }

    /**
     * Submits all buffered writes as one unordered bulk write.
     *
     * <p>The bulk write is executed on the calling thread; the returned stage is already
     * completed when this method returns. With nothing buffered, no write is issued and a
     * success result is returned. After a successful write the buffer is cleared. When the
     * driver throws {@link MongoBulkWriteException} the error is logged, the buffer is left
     * untouched, and a failure result is returned. Any other exception propagates to the caller.
     *
     * @return a completed stage holding a success or failure result for this table partition
     */
    @Override // dev.responsive.kafka.internal.db.RemoteWriter
    public CompletionStage<RemoteWriteResult<P>> flush() {
        if (this.accumulatedWrites.isEmpty()) {
            LOG.info("Skipping empty bulk write for partition {}", this.tablePartition);
            return CompletableFuture.completedFuture(RemoteWriteResult.success(this.tablePartition));
        }

        try {
            // Unordered: the server may apply the writes in any order and keeps going past
            // individual failures instead of stopping at the first one.
            final BulkWriteOptions options = new BulkWriteOptions().ordered(false);
            this.collection.get().bulkWrite(this.accumulatedWrites, options);
            this.accumulatedWrites.clear();
            return CompletableFuture.completedFuture(RemoteWriteResult.success(this.tablePartition));
        } catch (MongoBulkWriteException e) {
            // The trailing Throwable has no matching placeholder, so SLF4J logs it as the exception.
            LOG.error(
                    "Failed to flush to {}[{}]. If the exception contains 'E11000 duplicate key', then it was likely this writer was fenced",
                    this.table.name(),
                    this.tablePartition,
                    e);
            return CompletableFuture.completedFuture(RemoteWriteResult.failure(this.tablePartition));
        }
    }
}
