package com.datarobot.mlops.spooler.kafka;

import com.datarobot.mlops.common.config.MappedConfig;
import com.datarobot.mlops.common.constants.ConfigConstants;
import com.datarobot.mlops.common.enums.SpoolerType;
import com.datarobot.mlops.common.exceptions.DRCommonException;
import com.datarobot.mlops.common.exceptions.DRQueueException;
import com.datarobot.mlops.common.records.Record;
import com.datarobot.mlops.common.spooler.RecordSpooler;
import com.datarobot.mlops.common.spooler.SpoolerOffsetManager;
import com.google.common.collect.ImmutableList;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datarobot/mlops/spooler/kafka/KafkaSpooler.class */
public class KafkaSpooler extends RecordSpooler {
    private static final int CLIENT_DEFAULT_POLL_TIMEOUT_MS = 3000;
    private static final int CLIENT_DEFAULT_MAX_POLL_RECORDS = 500;
    private static final int KAFKA_DEFAULT_MESSAGE_BYTE_SIZE_LIMIT = 1000000;
    private static final int DEFAULT_KAFKA_ACK_DEADLINE_MS = 600000;
    private static final int DEFAULT_KAFKA_ACK_RETRY = 3;
    private static final int KAFKA_NUM_RECORDS_PER_OFFSET = 1;
    private Map<String, Object> producerConfig;
    private Map<String, Object> consumerConfig;
    private KafkaProducer<String, String> producer;
    private KafkaConsumer<String, String> consumer;
    private Duration maxFlushTime;
    private String topic;
    private SpoolerOffsetManager spoolerOffsetManager;
    private int ackDeadline;
    private long lastRebalancing;
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) KafkaSpooler.class);
    private static final String CLIENT_DEFAULT_SERIALIZER = StringSerializer.class.getName();
    private static final String CLIENT_DEFAULT_DESERIALIZER = StringDeserializer.class.getName();

    public KafkaSpooler(MappedConfig mappedConfig) {
        super(mappedConfig);
    }

    @Override // com.datarobot.mlops.common.spooler.RecordSpooler
    public SpoolerType getType() {
        return SpoolerType.KAFKA;
    }

    @Override // com.datarobot.mlops.common.spooler.RecordSpooler
    public List<String> getRequiredConfigKeys() {
        return ImmutableList.builder().add((ImmutableList.Builder) ConfigConstants.SPOOLER_ACTION_STR).add((ImmutableList.Builder) ConfigConstants.KAFKA_BOOTSTRAP_SERVERS).add((ImmutableList.Builder) ConfigConstants.KAFKA_TOPIC_NAME).build();
    }

    @Override // com.datarobot.mlops.common.spooler.RecordSpooler
    public List<String> getOptionalConfigKeys() {
        return ImmutableList.builder().add((ImmutableList.Builder) ConfigConstants.KAFKA_CONSUMER_GROUP_ID).add((ImmutableList.Builder) ConfigConstants.KAFKA_CONSUMER_MAX_NUM_MESSAGES).add((ImmutableList.Builder) ConfigConstants.KAFKA_CONSUMER_POLL_TIMEOUT_MS).add((ImmutableList.Builder) ConfigConstants.KAFKA_MAX_FLUSH_MS).add((ImmutableList.Builder) ConfigConstants.KAFKA_BUFFER_MAX_KB).add((ImmutableList.Builder) ConfigConstants.KAFKA_MESSAGE_BYTE_SIZE_LIMIT).add((ImmutableList.Builder) ConfigConstants.KAFKA_ACK_DEADLINE_STR).add((ImmutableList.Builder) ConfigConstants.KAFKA_REQUEST_TIMEOUT_MS).add((ImmutableList.Builder) ConfigConstants.KAFKA_SESSION_TIMEOUT_MS).add((ImmutableList.Builder) ConfigConstants.KAFKA_DELIVERY_TIMEOUT_MS).add((ImmutableList.Builder) ConfigConstants.KAFKA_METADATA_MAX_AGE_MS).add((ImmutableList.Builder) ConfigConstants.KAFKA_CONNECTIONS_MAX_IDLE_MS).add((ImmutableList.Builder) ConfigConstants.KAFKA_LINGER_MS).add((ImmutableList.Builder) ConfigConstants.KAFKA_SECURITY_PROTOCOL).add((ImmutableList.Builder) ConfigConstants.KAFKA_SSL_TRUSTSTORE_LOCATION).add((ImmutableList.Builder) ConfigConstants.KAFKA_SSL_KEYSTORE_LOCATION).add((ImmutableList.Builder) ConfigConstants.KAFKA_SSL_KEYSTORE_PASSWORD).add((ImmutableList.Builder) ConfigConstants.KAFKA_SSL_KEY_PASSWORD).add((ImmutableList.Builder) ConfigConstants.KAFKA_SASL_MECHANISM).add((ImmutableList.Builder) ConfigConstants.KAFKA_SASL_JAAS_CONFIG).add((ImmutableList.Builder) ConfigConstants.KAFKA_SASL_LOGIN_CALLBACK_CLASS).build();
    }

    @Override // com.datarobot.mlops.common.spooler.RecordSpooler
    public void verifyConfig() throws DRCommonException {
        this.config.getStringValue(ConfigConstants.KAFKA_TOPIC_NAME);
        this.producerConfig = getKafkaProperties();
        this.consumerConfig = new HashMap(this.producerConfig);
        this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, CLIENT_DEFAULT_SERIALIZER);
        this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CLIENT_DEFAULT_SERIALIZER);
        this.consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, CLIENT_DEFAULT_DESERIALIZER);
        this.consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CLIENT_DEFAULT_DESERIALIZER);
        this.consumerConfig.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.config.getValueWithDefault(ConfigConstants.KAFKA_AUTO_RELEASE_OFFSET, "latest"));
        this.consumerConfig.putIfAbsent(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Integer.valueOf(this.config.getValueWithDefault(ConfigConstants.KAFKA_CONSUMER_MAX_NUM_MESSAGES, 500)));
        this.consumerConfig.putIfAbsent("group.id", this.config.getValueWithDefault(ConfigConstants.KAFKA_CONSUMER_GROUP_ID, "tracking-agent"));
        this.consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.valueOf(!this.enableDequeueAckRecord));
        this.maxFlushTime = Duration.ofMillis(this.config.getValueWithDefault(ConfigConstants.KAFKA_MAX_FLUSH_MS, Integer.MAX_VALUE));
    }

    @Override // com.datarobot.mlops.common.spooler.Spooler
    public Collection<Record> dequeue() throws DRQueueException {
        ArrayList arrayList = new ArrayList();
        int valueWithDefault = this.config.getValueWithDefault(ConfigConstants.KAFKA_CONSUMER_POLL_TIMEOUT_MS, 3000);
        try {
            if (this.enableDequeueAckRecord) {
                commitNextValidOffset();
                if (this.lastRebalancing + this.ackDeadline < System.currentTimeMillis()) {
                    this.lastRebalancing = System.currentTimeMillis();
                    this.consumer.enforceRebalance();
                }
                this.spoolerOffsetManager.clearRecordsProcessed();
            }
            Iterator<ConsumerRecord<String, String>> it = this.consumer.poll(Duration.ofMillis(valueWithDefault)).iterator();
            while (it.hasNext()) {
                ConsumerRecord<String, String> next = it.next();
                try {
                    Record fromJson = Record.fromJson(next.value());
                    if (!this.enableDequeueAckRecord || !this.spoolerOffsetManager.isRecordProcessed(fromJson.getId())) {
                        arrayList.add(fromJson);
                        if (this.enableDequeueAckRecord) {
                            addPendingRecord(fromJson.getId(), next);
                            this.spoolerOffsetManager.trackOffsetRecord(new SpoolerOffsetManager.OffsetMeta(next.offset(), 1L, fromJson.getId(), next.partition()));
                        }
                    }
                } catch (DRCommonException e) {
                    logger.error("Failed to de-serialized record: " + e.getMessage());
                    logger.debug(next.value());
                }
            }
            updateEmptyCount(arrayList.size());
            return arrayList;
        } catch (Exception e2) {
            logger.error("Dequeue failed: {}", e2.getMessage());
            throw new DRQueueException(e2.getMessage(), Collections.emptyList());
        }
    }

    private void commitNextValidOffset() {
        this.spoolerOffsetManager.findNextOffsets().forEach((num, linkedList) -> {
            if (linkedList.isEmpty()) {
                return;
            }
            try {
                SpoolerOffsetManager.OffsetMeta offsetMeta = (SpoolerOffsetManager.OffsetMeta) linkedList.getLast();
                ConsumerRecord consumerRecord = (ConsumerRecord) this.recordsPendingAck.get(offsetMeta.getRecordId());
                if (consumerRecord != null) {
                    this.consumer.commitSync(Collections.singletonMap(new TopicPartition(this.topic, consumerRecord.partition()), new OffsetAndMetadata(consumerRecord.offset() + 1, consumerRecord.leaderEpoch(), "")));
                    this.spoolerOffsetManager.setLastCommittedOffset(offsetMeta);
                }
                linkedList.forEach(offsetMeta2 -> {
                    this.recordsPendingAck.remove(offsetMeta2.getRecordId());
                });
            } catch (CommitFailedException | RebalanceInProgressException e) {
                logger.error("Fail to commit offset: " + e.getMessage());
            }
        });
    }

    @Override // com.datarobot.mlops.common.spooler.RecordSpooler, com.datarobot.mlops.common.spooler.Spooler
    public void ackRecords(Collection<String> collection) {
        if (this.enableDequeueAckRecord) {
            collection.forEach(str -> {
                ConsumerRecord consumerRecord = (ConsumerRecord) this.recordsPendingAck.get(str);
                if (consumerRecord != null) {
                    this.spoolerOffsetManager.ackRecord(Long.valueOf(consumerRecord.offset()), consumerRecord.partition());
                }
            });
        }
    }

    @Override // com.datarobot.mlops.common.spooler.RecordSpooler, com.datarobot.mlops.common.spooler.Spooler
    public void nackRecords(Collection<String> collection) throws DRQueueException {
    }

    @Override // com.datarobot.mlops.common.spooler.Spooler
    public int enqueue(Collection<Record> collection) throws DRQueueException {
        int i = 0;
        String valueWithDefault = this.config.getValueWithDefault(ConfigConstants.KAFKA_TOPIC_NAME, "");
        Iterator<Record> it = collection.iterator();
        while (it.hasNext()) {
            String str = null;
            try {
                str = it.next().toJson();
                this.producer.send(new ProducerRecord<>(valueWithDefault, str));
            } catch (Exception e) {
                logger.error("Failed to enqueue records into the Kafka topic {}. Error: {}", valueWithDefault, e.getMessage());
                logger.debug(str);
                i++;
            }
        }
        return collection.size() - i;
    }

    @Override // com.datarobot.mlops.common.spooler.Spooler
    public int getMessageByteSizeLimit() {
        return this.config.getValueWithDefault(ConfigConstants.KAFKA_MESSAGE_BYTE_SIZE_LIMIT, 1000000);
    }

    @Override // com.datarobot.mlops.common.spooler.Spooler
    public void open() throws DRCommonException {
        verifyConfig();
        String valueWithDefault = this.config.getValueWithDefault(ConfigConstants.SPOOLER_ACTION_STR, RecordSpooler.Action.ENQUEUE.name());
        this.topic = this.config.getStringValue(ConfigConstants.KAFKA_TOPIC_NAME);
        try {
            RecordSpooler.Action valueOf = RecordSpooler.Action.valueOf(valueWithDefault);
            if (valueOf == RecordSpooler.Action.ENQUEUE || valueOf == RecordSpooler.Action.ENQUEUE_DEQUEUE) {
                this.producer = new KafkaProducer<>(this.producerConfig);
            }
            if (valueOf == RecordSpooler.Action.DEQUEUE || valueOf == RecordSpooler.Action.ENQUEUE_DEQUEUE) {
                this.consumer = new KafkaConsumer<>(this.consumerConfig);
                this.consumer.subscribe(Collections.singletonList(this.topic));
                logger.info("List of topics: " + this.consumer.listTopics().keySet());
            }
            this.ackDeadline = this.config.getValueWithDefault(ConfigConstants.KAFKA_ACK_DEADLINE_STR, DEFAULT_KAFKA_ACK_DEADLINE_MS);
            this.lastRebalancing = System.currentTimeMillis();
            this.spoolerOffsetManager = new SpoolerOffsetManager(this.ackDeadline * 3, 3);
            logger.info("Connection to Kafka topic: '{}' successful", this.topic);
        } catch (Exception e) {
            logger.error("Error initializing Kafka Spooler. Error: {}", e.getMessage());
            logger.debug("Details: {}", (Throwable) e);
            throw new DRCommonException(e.getMessage());
        }
    }

    @Override // com.datarobot.mlops.common.spooler.Spooler
    public void close() {
        if (this.producer != null) {
            try {
                this.producer.close(this.maxFlushTime);
            } catch (Exception e) {
                logger.warn("Error during shutdown: {}", e.getMessage());
            }
        }
        if (this.consumer != null) {
            try {
                if (this.enableDequeueAckRecord) {
                    commitNextValidOffset();
                } else {
                    this.consumer.commitSync();
                }
                this.consumer.close();
            } catch (Exception e2) {
                logger.warn("Error during shutdown: {}", e2.getMessage());
            }
        }
    }

    @Override // com.datarobot.mlops.common.spooler.Spooler
    public boolean needsRetry() {
        return false;
    }

    private Map<String, Object> getKafkaProperties() throws DRCommonException {
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", this.config.getStringValue(ConfigConstants.KAFKA_BOOTSTRAP_SERVERS));
        String valueWithDefault = this.config.getValueWithDefault(ConfigConstants.KAFKA_CONNECTIONS_MAX_IDLE_MS, (String) null);
        if (valueWithDefault != null) {
            hashMap.put("connections.max.idle.ms", valueWithDefault);
        }
        String valueWithDefault2 = this.config.getValueWithDefault(ConfigConstants.KAFKA_METADATA_MAX_AGE_MS, (String) null);
        if (valueWithDefault2 != null) {
            hashMap.put("metadata.max.age.ms", valueWithDefault2);
        }
        String valueWithDefault3 = this.config.getValueWithDefault(ConfigConstants.KAFKA_REQUEST_TIMEOUT_MS, (String) null);
        if (valueWithDefault3 != null) {
            hashMap.put("request.timeout.ms", valueWithDefault3);
        }
        String valueWithDefault4 = this.config.getValueWithDefault(ConfigConstants.KAFKA_SESSION_TIMEOUT_MS, (String) null);
        if (valueWithDefault4 != null) {
            hashMap.put("session.timeout.ms", valueWithDefault4);
        }
        String valueWithDefault5 = this.config.getValueWithDefault(ConfigConstants.KAFKA_DELIVERY_TIMEOUT_MS, (String) null);
        if (valueWithDefault5 != null) {
            hashMap.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, valueWithDefault5);
        }
        String valueWithDefault6 = this.config.getValueWithDefault(ConfigConstants.KAFKA_LINGER_MS, (String) null);
        if (valueWithDefault6 != null) {
            hashMap.put(ProducerConfig.LINGER_MS_CONFIG, valueWithDefault6);
        }
        String valueWithDefault7 = this.config.getValueWithDefault(ConfigConstants.KAFKA_SSL_TRUSTSTORE_LOCATION, (String) null);
        if (valueWithDefault7 != null) {
            hashMap.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, valueWithDefault7);
        }
        String valueWithDefault8 = this.config.getValueWithDefault(ConfigConstants.KAFKA_SSL_KEYSTORE_LOCATION, (String) null);
        if (valueWithDefault8 != null) {
            hashMap.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, valueWithDefault8);
        }
        String valueWithDefault9 = this.config.getValueWithDefault(ConfigConstants.KAFKA_SSL_KEYSTORE_PASSWORD, (String) null);
        if (valueWithDefault9 != null) {
            hashMap.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, valueWithDefault9);
        }
        String valueWithDefault10 = this.config.getValueWithDefault(ConfigConstants.KAFKA_SSL_KEY_PASSWORD, (String) null);
        if (valueWithDefault10 != null) {
            hashMap.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, valueWithDefault10);
        }
        String valueWithDefault11 = this.config.getValueWithDefault(ConfigConstants.KAFKA_SECURITY_PROTOCOL, (String) null);
        if (valueWithDefault11 != null) {
            hashMap.put("security.protocol", valueWithDefault11);
        }
        String valueWithDefault12 = this.config.getValueWithDefault(ConfigConstants.KAFKA_SASL_MECHANISM, (String) null);
        if (valueWithDefault12 != null) {
            hashMap.put(SaslConfigs.SASL_MECHANISM, valueWithDefault12);
        }
        String valueWithDefault13 = this.config.getValueWithDefault(ConfigConstants.KAFKA_SASL_JAAS_CONFIG, (String) null);
        if (valueWithDefault13 != null) {
            hashMap.put(SaslConfigs.SASL_JAAS_CONFIG, valueWithDefault13);
        }
        String valueWithDefault14 = this.config.getValueWithDefault(ConfigConstants.KAFKA_SASL_LOGIN_CALLBACK_CLASS, (String) null);
        if (valueWithDefault14 != null) {
            hashMap.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, valueWithDefault14);
        }
        int valueWithDefault15 = this.config.getValueWithDefault(ConfigConstants.KAFKA_BUFFER_MAX_KB, -1);
        if (valueWithDefault15 != -1) {
            hashMap.put(ProducerConfig.BUFFER_MEMORY_CONFIG, Integer.valueOf(valueWithDefault15 * 1024));
        }
        return hashMap;
    }
}
