package net.sourceforge.jbizmo.commons.spring.kafka;

import jakarta.inject.Inject;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.ExecutionException;
import net.sourceforge.jbizmo.commons.avro.util.AvroObjectSerializer;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * Spring component that publishes Avro records to Kafka topics.
 * <p>
 * Every message is serialized with {@link AvroObjectSerializer}, sent without a key, stamped with the current system
 * time and tagged with a correlation ID header ({@link KafkaHeaderHelper#CORRELATION_ID_KEY}). Sending is synchronous:
 * the calling thread waits until the future returned by the {@link KafkaTemplate} has completed.
 */
@Component
public class KafkaSender {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    @Inject
    private KafkaTemplate<String, byte[]> kafkaTemplate;

    /**
     * Send a message to the given topic without requesting a specific partition.
     * @param <T> the Avro record type
     * @param str the name of the target topic
     * @param t the Avro record that forms the message payload
     * @param bArr the correlation ID that is added as a record header
     * @throws KafkaSenderException if waiting for the send operation has been interrupted or if sending has failed
     */
    public <T extends SpecificRecordBase> void sendResponse(String str, T t, byte[] bArr) {
        sendResponse(str, t, bArr, null);
    }

    /**
     * Send a message to the given topic and wait until the send operation has completed.
     * @param <T> the Avro record type
     * @param str the name of the target topic
     * @param t the Avro record that forms the message payload
     * @param bArr the correlation ID that is added as a record header
     * @param num the target partition, or {@code null} if no specific partition is requested
     * @throws KafkaSenderException if waiting for the send operation has been interrupted or if sending has failed
     */
    public <T extends SpecificRecordBase> void sendResponse(String str, T t, byte[] bArr, Integer num) {
        if (logger.isDebugEnabled()) {
            logger.debug("Send message to {}", describeDestination(str, num));
        }

        final byte[] payload = new AvroObjectSerializer<>(t).serialize();

        // The record is sent without a key and carries the current system time as its timestamp
        final ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(str, num, System.currentTimeMillis(), null,
                payload);
        producerRecord.headers().add(new RecordHeader(KafkaHeaderHelper.CORRELATION_ID_KEY, bArr));

        try {
            // Block until the asynchronous send operation has finished
            this.kafkaTemplate.send(producerRecord).get();

            if (logger.isDebugEnabled()) {
                logger.debug("Message sent to {}", describeDestination(str, num));
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so that code further up the call stack can react to it
            Thread.currentThread().interrupt();

            logger.warn("Waiting for message to be sent to {} has been interrupted!", describeDestination(str, num));

            throw new KafkaSenderException(e);
        } catch (ExecutionException e) {
            logger.warn("Error while sending message to {}!", describeDestination(str, num));

            throw new KafkaSenderException(e);
        }
    }

    /**
     * Build the destination text that is used in log messages.
     * @param topic the name of the topic
     * @param partition the partition, or {@code null} if no specific partition is requested
     * @return either "topic &lt;topic&gt;" or "partition &lt;partition&gt; of topic &lt;topic&gt;"
     */
    private static String describeDestination(String topic, Integer partition) {
        if (partition == null) {
            return "topic " + topic;
        }

        return "partition " + partition + " of topic " + topic;
    }
}
