package org.flowable.eventregistry.spring.kafka;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.flowable.common.engine.api.FlowableException;
import org.flowable.eventregistry.api.OutboundEvent;
import org.flowable.eventregistry.api.OutboundEventChannelAdapter;
import org.springframework.kafka.core.KafkaOperations;

/* loaded from: input_file:WEB-INF/lib/flowable-event-registry-spring-7.1.0.jar:org/flowable/eventregistry/spring/kafka/KafkaOperationsOutboundEventChannelAdapter.class */
public class KafkaOperationsOutboundEventChannelAdapter implements OutboundEventChannelAdapter<Object> {
    protected KafkaOperations<Object, Object> kafkaOperations;
    protected KafkaPartitionProvider partitionProvider;
    protected KafkaMessageKeyProvider<?> messageKeyProvider;
    protected String topic;

    public KafkaOperationsOutboundEventChannelAdapter(KafkaOperations<Object, Object> kafkaOperations, KafkaPartitionProvider kafkaPartitionProvider, String str, String str2) {
        this(kafkaOperations, kafkaPartitionProvider, str, (KafkaMessageKeyProvider<?>) outboundEvent -> {
            return StringUtils.defaultIfEmpty(str2, null);
        });
    }

    public KafkaOperationsOutboundEventChannelAdapter(KafkaOperations<Object, Object> kafkaOperations, KafkaPartitionProvider kafkaPartitionProvider, String str, KafkaMessageKeyProvider<?> kafkaMessageKeyProvider) {
        this.kafkaOperations = kafkaOperations;
        this.partitionProvider = kafkaPartitionProvider;
        this.messageKeyProvider = kafkaMessageKeyProvider;
        this.topic = str;
    }

    @Override // org.flowable.eventregistry.api.OutboundEventChannelAdapter
    public void sendEvent(OutboundEvent<Object> outboundEvent) {
        try {
            Object body = outboundEvent.getBody();
            Map<String, Object> headers = outboundEvent.getHeaders();
            ArrayList arrayList = new ArrayList();
            for (String str : headers.keySet()) {
                Object obj = headers.get(str);
                if (obj != null) {
                    arrayList.add(new RecordHeader(str, obj.toString().getBytes(StandardCharsets.UTF_8)));
                }
            }
            this.kafkaOperations.send(new ProducerRecord<>(this.topic, this.partitionProvider == null ? null : this.partitionProvider.determinePartition(outboundEvent), this.messageKeyProvider == null ? null : this.messageKeyProvider.determineMessageKey(outboundEvent), body, arrayList)).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new FlowableException("Sending the event was interrupted", e);
        } catch (ExecutionException e2) {
            if (!(e2.getCause() instanceof RuntimeException)) {
                throw new FlowableException("failed to send event", e2.getCause());
            }
            throw ((RuntimeException) e2.getCause());
        }
    }

    @Override // org.flowable.eventregistry.api.OutboundEventChannelAdapter
    public void sendEvent(Object obj, Map<String, Object> map) {
        throw new UnsupportedOperationException("Outbound processor should never call this");
    }
}
