package io.gridgo.connector.kafka;

import io.gridgo.bean.BElement;
import io.gridgo.bean.BObject;
import io.gridgo.bean.BValue;
import io.gridgo.connector.impl.AbstractProducer;
import io.gridgo.connector.support.config.ConnectorContext;
import io.gridgo.framework.support.Message;
import io.gridgo.framework.support.impl.MultipartMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.joo.promise4j.Deferred;
import org.joo.promise4j.Promise;
import org.joo.promise4j.impl.CompletableDeferredObject;
import org.joo.promise4j.impl.JoinedPromise;
import org.joo.promise4j.impl.JoinedResults;

/* loaded from: input_file:io/gridgo/connector/kafka/KafkaProducer.class */
public class KafkaProducer extends AbstractProducer {
    private KafkaConfiguration configuration;
    private org.apache.kafka.clients.producer.KafkaProducer<Object, Object> producer;
    private String[] topics;

    public KafkaProducer(ConnectorContext connectorContext, KafkaConfiguration kafkaConfiguration) {
        super(connectorContext);
        this.configuration = kafkaConfiguration;
        this.topics = kafkaConfiguration.getTopic().split(",");
    }

    public void send(Message message) {
        for (String str : this.topics) {
            this.producer.send(buildProducerRecord(str, message));
        }
    }

    public Promise<Message, Exception> sendWithAck(Message message) {
        ArrayList arrayList = new ArrayList();
        for (String str : this.topics) {
            CompletableDeferredObject completableDeferredObject = new CompletableDeferredObject();
            this.producer.send(buildProducerRecord(str, message), (recordMetadata, exc) -> {
                ack(completableDeferredObject, recordMetadata, exc);
            });
            arrayList.add(completableDeferredObject);
        }
        return arrayList.size() == 1 ? (Promise) arrayList.get(0) : JoinedPromise.from(arrayList).filterDone(this::convertJoinedResult);
    }

    public Message convertJoinedResult(JoinedResults<Message> joinedResults) {
        return new MultipartMessage(joinedResults);
    }

    private void ack(Deferred<Message, Exception> deferred, RecordMetadata recordMetadata, Exception exc) {
        ack(deferred, buildAckMessage(recordMetadata), exc);
    }

    private ProducerRecord<Object, Object> buildProducerRecord(String str, Message message) {
        BObject headers = message.getPayload().getHeaders();
        BValue value = headers.getValue(KafkaConstants.PARTITION);
        Integer integer = value != null ? value.getInteger() : null;
        BValue value2 = headers.getValue(KafkaConstants.TIMESTAMP);
        Long l = value2 != null ? value2.getLong() : null;
        BValue value3 = headers.getValue(KafkaConstants.KEY);
        Object data = value3 != null ? value3.getData() : null;
        BElement body = message.getPayload().getBody();
        ProducerRecord<Object, Object> producerRecord = new ProducerRecord<>(str, integer, l, data, convert(body));
        if (body != null && !body.isValue()) {
            producerRecord.headers().add(KafkaConstants.RAW, new byte[]{1});
        }
        for (Map.Entry entry : headers.entrySet()) {
            if (((BElement) entry.getValue()).isValue()) {
                producerRecord.headers().add((String) entry.getKey(), ((BElement) entry.getValue()).asValue().toBytes());
            }
        }
        return producerRecord;
    }

    private Object convert(BElement bElement) {
        if (bElement == null) {
            return null;
        }
        return bElement.isValue() ? bElement.asValue().getData() : bElement.toBytes();
    }

    private Message buildAckMessage(RecordMetadata recordMetadata) {
        if (recordMetadata == null) {
            return null;
        }
        return createMessage(BObject.ofEmpty().setAny(KafkaConstants.IS_ACK_MSG, "true").setAny(KafkaConstants.TIMESTAMP, Long.valueOf(recordMetadata.timestamp())).setAny(KafkaConstants.OFFSET, Long.valueOf(recordMetadata.offset())).setAny(KafkaConstants.PARTITION, Integer.valueOf(recordMetadata.partition())).setAny(KafkaConstants.TOPIC, recordMetadata.topic()), BValue.ofEmpty());
    }

    public Promise<Message, Exception> call(Message message) {
        throw new UnsupportedOperationException();
    }

    protected void onStart() {
        if (this.configuration.isTopicIsPattern()) {
            getLogger().warn("topicIsPattern won't work with KafkaProducer, will ignore");
        }
        Properties props = getProps();
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.producer.KafkaProducer.class.getClassLoader());
            this.producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
            Thread.currentThread().setContextClassLoader(contextClassLoader);
        } catch (Throwable th) {
            Thread.currentThread().setContextClassLoader(contextClassLoader);
            throw th;
        }
    }

    protected void onStop() {
        if (this.producer != null) {
            this.producer.close();
        }
    }

    private Properties getProps() {
        return this.configuration.createProducerProperties();
    }

    protected String generateName() {
        return "producer.kafka." + this.configuration.getTopic();
    }

    public boolean isCallSupported() {
        return false;
    }
}
