package org.apache.pulsar.io.rabbitmq;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.Map;
import lombok.Generated;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A Pulsar IO sink that publishes the raw bytes of each incoming record to a RabbitMQ exchange.
 *
 * <p>The connection, exchange, routing key and optional queue are all taken from
 * {@link RabbitMQSinkConfig}. A record may override the configured routing key by carrying a
 * non-empty {@code routingKey} property.
 */
@Connector(name = "rabbitmq", type = IOType.SINK, help = "A sink connector is used for moving messages from Pulsar to RabbitMQ.", configClass = RabbitMQSinkConfig.class)
public class RabbitMQSink implements Sink<byte[]> {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(RabbitMQSink.class);

    /** Record property that, when present and non-empty, overrides the default routing key. */
    private static final String ROUTING_KEY_PROPERTY = "routingKey";

    private Connection rabbitMQConnection;
    private Channel rabbitMQChannel;
    private RabbitMQSinkConfig rabbitMQSinkConfig;
    private String exchangeName;
    private String defaultRoutingKey;

    /**
     * Loads and validates the sink configuration, connects to RabbitMQ and declares the target
     * exchange (plus the queue and its binding when a queue name is configured).
     *
     * @param map         raw connector configuration
     * @param sinkContext context supplied by the Pulsar IO runtime
     * @throws Exception if the configuration is rejected or any RabbitMQ operation fails
     */
    @Override // org.apache.pulsar.io.core.Sink
    public void open(Map<String, Object> map, SinkContext sinkContext) throws Exception {
        rabbitMQSinkConfig = RabbitMQSinkConfig.load(map, sinkContext);
        rabbitMQSinkConfig.validate();

        rabbitMQConnection = rabbitMQSinkConfig.createConnectionFactory()
                .newConnection(rabbitMQSinkConfig.getConnectionName());
        log.info("A new connection to {}:{} has been opened successfully.",
                rabbitMQConnection.getAddress().getCanonicalHostName(),
                rabbitMQConnection.getPort());

        exchangeName = rabbitMQSinkConfig.getExchangeName();
        defaultRoutingKey = rabbitMQSinkConfig.getRoutingKey();
        String exchangeType = rabbitMQSinkConfig.getExchangeType();
        String queueName = rabbitMQSinkConfig.getQueueName();

        rabbitMQChannel = rabbitMQConnection.createChannel();

        if (StringUtils.isNotEmpty(queueName)) {
            // A queue is configured: the exchange is declared as a durable DIRECT exchange
            // (the configured exchange type is not used on this path), then a durable,
            // non-exclusive, non-auto-delete queue is declared and bound with the default key.
            rabbitMQChannel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true);
            rabbitMQChannel.queueDeclare(queueName, true, false, false, null);
            rabbitMQChannel.queueBind(queueName, exchangeName, defaultRoutingKey);
        } else {
            // No queue configured: only declare a durable exchange of the configured type.
            rabbitMQChannel.exchangeDeclare(exchangeName, exchangeType, true);
        }
    }

    /**
     * Publishes the record value to the configured exchange and acknowledges the record.
     *
     * <p>On an {@link IOException} the record is failed and the error is logged. On an unchecked
     * exception (for example the client's {@code AlreadyClosedException} when the channel has
     * been shut down) the record is failed as well and the exception is rethrown unchanged, so
     * the record is never left neither acknowledged nor failed.
     *
     * @param record the record whose value is published
     */
    @Override // org.apache.pulsar.io.core.Sink
    public void write(Record<byte[]> record) {
        byte[] payload = record.getValue();
        String overrideKey = record.getProperties().get(ROUTING_KEY_PROPERTY);
        String routingKey = StringUtils.isEmpty(overrideKey) ? defaultRoutingKey : overrideKey;

        try {
            rabbitMQChannel.basicPublish(exchangeName, routingKey, null, payload);
        } catch (IOException e) {
            record.fail();
            log.warn("Failed to publish the message to RabbitMQ ", e);
            return;
        } catch (RuntimeException e) {
            // Still propagate, but make sure the record is negatively acknowledged first.
            record.fail();
            throw e;
        }
        record.ack();
    }

    /**
     * Closes the channel and then the connection.
     *
     * <p>The connection is closed in a {@code finally} block so that a failure while closing the
     * channel (e.g. it was already shut down by the broker) does not leak the connection.
     *
     * @throws Exception if closing the channel or the connection fails
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        try {
            if (rabbitMQChannel != null) {
                rabbitMQChannel.close();
            }
        } finally {
            if (rabbitMQConnection != null) {
                rabbitMQConnection.close();
            }
        }
    }
}
