package org.springframework.integration.kafka.inbound;

import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.core.AttributeAccessor;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.context.OrderlyShutdownCapable;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.core.Pausable;
import org.springframework.integration.gateway.MessagingGatewaySupport;
import org.springframework.integration.kafka.support.RawRecordHeaderErrorMessageStrategy;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.integration.support.ErrorMessageUtils;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.support.json.JacksonJsonUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.JacksonPresent;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.converter.ConversionException;
import org.springframework.kafka.support.converter.KafkaMessageHeaders;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryListener;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;

/* loaded from: input_file:BOOT-INF/lib/spring-integration-kafka-5.5.19.jar:org/springframework/integration/kafka/inbound/KafkaInboundGateway.class */
public class KafkaInboundGateway<K, V, R> extends MessagingGatewaySupport implements Pausable, OrderlyShutdownCapable {
    private static final ThreadLocal<AttributeAccessor> ATTRIBUTES_HOLDER = new ThreadLocal<>();
    private final KafkaInboundGateway<K, V, R>.IntegrationRecordMessageListener listener = new IntegrationRecordMessageListener();
    private final AbstractMessageListenerContainer<K, V> messageListenerContainer;
    private final KafkaTemplate<K, R> kafkaTemplate;
    private RetryTemplate retryTemplate;
    private RecoveryCallback<?> recoveryCallback;
    private BiConsumer<Map<TopicPartition, Long>, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedSeekCallback;
    private boolean bindSourceRecord;
    private boolean containerDeliveryAttemptPresent;

    /* loaded from: input_file:BOOT-INF/lib/spring-integration-kafka-5.5.19.jar:org/springframework/integration/kafka/inbound/KafkaInboundGateway$IntegrationRecordMessageListener.class */
    private class IntegrationRecordMessageListener extends RecordMessagingMessageListenerAdapter<K, V> implements RetryListener {
        /**
         * Creates the listener, passing {@code null} for both superclass constructor arguments.
         * NOTE(review): presumably these are the target bean and handler method, which this gateway
         * does not use because {@code onMessage} is overridden here; confirm against the superclass.
         */
        IntegrationRecordMessageListener() {
            super(null, null);
        }

        /**
         * Forwards the partition assignment to the seek callback configured on the enclosing gateway.
         * Does nothing when no callback has been set.
         *
         * @param map the assigned partitions and their associated {@code Long} values, passed through unchanged
         * @param consumerSeekCallback the callback handed to the configured seek callback
         */
        @Override // org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter, org.springframework.kafka.listener.ConsumerSeekAware
        public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekAware.ConsumerSeekCallback consumerSeekCallback) {
            BiConsumer<Map<TopicPartition, Long>, ConsumerSeekAware.ConsumerSeekCallback> seekHandler =
                    KafkaInboundGateway.this.onPartitionsAssignedSeekCallback;
            if (seekHandler == null) {
                return;
            }
            seekHandler.accept(map, consumerSeekCallback);
        }

        /**
         * Converts the consumer record to a message, sends it through the gateway and publishes any
         * reply with the Kafka template.
         * <p>
         * A {@link RuntimeException} raised while converting or enhancing the record is wrapped in a
         * {@link ConversionException} and sent to the error channel when one is configured. Without an
         * error channel the failure is logged with its cause instead of being dropped silently.
         * In both cases the record is then skipped.
         *
         * @param consumerRecord the record received from Kafka
         * @param acknowledgment the acknowledgment passed to the message conversion
         * @param consumer the consumer passed to the message conversion
         */
        @Override // org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter, org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener
        public void onMessage(ConsumerRecord<K, V> consumerRecord, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
            Message<?> message = null;
            try {
                message = enhanceHeaders(toMessagingMessage(consumerRecord, acknowledgment, consumer), consumerRecord);
                KafkaInboundGateway.this.setAttributesIfNecessary(consumerRecord, message);
            } catch (RuntimeException e) {
                ConversionException conversionFailure =
                        new ConversionException("Failed to convert to message", consumerRecord, e);
                MessageChannel errorChannel = KafkaInboundGateway.this.getErrorChannel();
                if (errorChannel != null) {
                    // The destination is the channel itself; it must not be cast to MessagingTemplate.
                    KafkaInboundGateway.this.messagingTemplate.send(errorChannel,
                            KafkaInboundGateway.this.buildErrorMessage(null, conversionFailure));
                } else {
                    // No error channel: surface the failure rather than swallowing it.
                    KafkaInboundGateway.this.logger.error(conversionFailure,
                            () -> "Failed to convert record and no error channel is configured: " + consumerRecord);
                }
            }
            if (message == null) {
                KafkaInboundGateway.this.logger.debug(() -> {
                    return "Converter returned a null message for: " + consumerRecord;
                });
                return;
            }
            try {
                Message<?> sendAndReceiveMessage = KafkaInboundGateway.this.sendAndReceiveMessage(message);
                if (sendAndReceiveMessage != null) {
                    KafkaInboundGateway.this.kafkaTemplate.send(enhanceReply(message, sendAndReceiveMessage));
                }
            } finally {
                // With a retry template the holder is cleared by close() once the retry operation ends.
                if (KafkaInboundGateway.this.retryTemplate == null) {
                    KafkaInboundGateway.ATTRIBUTES_HOLDER.remove();
                }
            }
        }

        /**
         * Adds the delivery-attempt header and, when source-record binding is enabled, the raw
         * consumer record to the message headers.
         * <p>
         * Messages carrying {@link KafkaMessageHeaders} are updated in place through their raw header
         * map; any other message is rebuilt with a {@link MessageBuilder}.
         *
         * @param message the converted message
         * @param consumerRecord the record the message was converted from
         * @return the same message when its headers were updated in place, otherwise the rebuilt message
         */
        private Message<?> enhanceHeaders(Message<?> message, ConsumerRecord<K, V> consumerRecord) {
            AtomicInteger deliveryAttempt = resolveGatewayDeliveryAttempt(consumerRecord);
            boolean bindRecord = KafkaInboundGateway.this.bindSourceRecord;
            if (message.getHeaders() instanceof KafkaMessageHeaders) {
                Map<String, Object> rawHeaders = ((KafkaMessageHeaders) message.getHeaders()).getRawHeaders();
                if (deliveryAttempt != null) {
                    rawHeaders.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, deliveryAttempt);
                }
                if (bindRecord) {
                    rawHeaders.put(IntegrationMessageHeaderAccessor.SOURCE_DATA, consumerRecord);
                }
                return message;
            }
            MessageBuilder<?> builder = MessageBuilder.fromMessage(message);
            if (deliveryAttempt != null) {
                builder.setHeader(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, deliveryAttempt);
            }
            if (bindRecord) {
                builder.setHeader(IntegrationMessageHeaderAccessor.SOURCE_DATA, consumerRecord);
            }
            return builder.build();
        }

        /**
         * Determines the delivery attempt to expose: the retry count plus one when a retry template
         * is configured, otherwise the value of the container's delivery-attempt record header.
         *
         * @param consumerRecord the record whose headers may carry the container delivery attempt
         * @return the delivery attempt, or {@code null} when none is available
         */
        private AtomicInteger resolveGatewayDeliveryAttempt(ConsumerRecord<K, V> consumerRecord) {
            if (KafkaInboundGateway.this.retryTemplate != null) {
                return new AtomicInteger(((RetryContext) KafkaInboundGateway.ATTRIBUTES_HOLDER.get()).getRetryCount() + 1);
            }
            if (KafkaInboundGateway.this.containerDeliveryAttemptPresent) {
                // lastHeader() returns null when the header is absent, so the container flag alone
                // does not guarantee the header exists on every record.
                org.apache.kafka.common.header.Header header =
                        consumerRecord.headers().lastHeader(KafkaHeaders.DELIVERY_ATTEMPT);
                byte[] value = header != null ? header.value() : null;
                if (value != null && value.length >= Integer.BYTES) {
                    return new AtomicInteger(ByteBuffer.wrap(value).getInt());
                }
            }
            return null;
        }

        /**
         * Copies routing information from the request onto the reply where the reply lacks it:
         * the correlation id, the reply topic (as the topic) and the reply partition (as the
         * partition id). A header already present on the reply is never overwritten.
         *
         * @param message the request message
         * @param message2 the reply message
         * @return the reply itself when nothing had to be copied, otherwise a rebuilt reply
         */
        private Message<?> enhanceReply(Message<?> message, Message<?> message2) {
            MessageHeaders replyHeaders = message2.getHeaders();
            MessageHeaders requestHeaders = message.getHeaders();

            boolean copyCorrelation = replyHeaders.get(KafkaHeaders.CORRELATION_ID) == null
                    && requestHeaders.get(KafkaHeaders.CORRELATION_ID) != null;
            boolean copyTopic = replyHeaders.get(KafkaHeaders.TOPIC) == null
                    && requestHeaders.get(KafkaHeaders.REPLY_TOPIC) != null;
            boolean copyPartition = replyHeaders.get(KafkaHeaders.PARTITION_ID) == null
                    && requestHeaders.get(KafkaHeaders.REPLY_PARTITION) != null;

            if (!copyCorrelation && !copyTopic && !copyPartition) {
                return message2;
            }

            AbstractIntegrationMessageBuilder<?> replyBuilder =
                    KafkaInboundGateway.this.getMessageBuilderFactory().fromMessage(message2);
            if (copyCorrelation) {
                replyBuilder.setHeader(KafkaHeaders.CORRELATION_ID, requestHeaders.get(KafkaHeaders.CORRELATION_ID));
            }
            if (copyTopic) {
                replyBuilder.setHeader(KafkaHeaders.TOPIC, requestHeaders.get(KafkaHeaders.REPLY_TOPIC));
            }
            if (copyPartition) {
                replyBuilder.setHeader(KafkaHeaders.PARTITION_ID, requestHeaders.get(KafkaHeaders.REPLY_PARTITION));
            }
            return replyBuilder.build();
        }

        /**
         * Stores the retry context in the thread-local holder when a retry template is configured.
         *
         * @param retryContext the context of the retry operation being opened
         * @param retryCallback the callback about to be executed (unused)
         * @return always {@code true}
         */
        @Override // org.springframework.retry.RetryListener
        public <T, E extends Throwable> boolean open(RetryContext retryContext, RetryCallback<T, E> retryCallback) {
            if (KafkaInboundGateway.this.retryTemplate != null) {
                KafkaInboundGateway.ATTRIBUTES_HOLDER.set(retryContext);
            }
            return true;
        }

        /**
         * Clears the thread-local attribute holder for the current thread. Unlike {@code open}, the
         * removal is unconditional.
         *
         * @param retryContext the retry context (unused)
         * @param retryCallback the retry callback (unused)
         * @param th the last throwable, if any (unused)
         */
        @Override // org.springframework.retry.RetryListener
        public <T, E extends Throwable> void close(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable th) {
            KafkaInboundGateway.ATTRIBUTES_HOLDER.remove();
        }

        /**
         * Intentionally a no-op: this listener takes no action on individual retry errors.
         */
        @Override // org.springframework.retry.RetryListener
        public <T, E extends Throwable> void onError(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable th) {
        }

        // Bridge method (marked bridge/synthetic below): forwards the Object-typed call to the typed
        // onMessage(ConsumerRecord, Acknowledgment, Consumer) overload after casting the record.
        // NOTE(review): this looks like a decompiler artifact; the compiler normally generates such
        // bridges itself, so confirm whether it should exist in hand-maintained source.
        @Override // org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter, org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener, org.springframework.kafka.listener.GenericMessageListener
        public /* bridge */ /* synthetic */ void onMessage(Object obj, Acknowledgment acknowledgment, Consumer consumer) {
            onMessage((ConsumerRecord) obj, acknowledgment, (Consumer<?, ?>) consumer);
        }
    }

    /**
     * Creates a gateway that consumes through the given listener container and publishes replies
     * with the given template.
     * <p>
     * The container must not already have a message listener. Its auto-startup is switched off here;
     * the gateway starts it from {@code doStart()}. When Jackson 2 is present, the internal listener
     * receives a {@link MessagingMessageConverter} whose header mapper trusts the default packages
     * from {@link JacksonJsonUtils}.
     *
     * @param abstractMessageListenerContainer the container to consume from; must not be {@code null}
     * @param kafkaTemplate the template used to send replies; must not be {@code null}
     */
    public KafkaInboundGateway(AbstractMessageListenerContainer<K, V> abstractMessageListenerContainer, KafkaTemplate<K, R> kafkaTemplate) {
        Assert.notNull(abstractMessageListenerContainer, "messageListenerContainer is required");
        Assert.notNull(kafkaTemplate, "kafkaTemplate is required");
        Object existingListener = abstractMessageListenerContainer.getContainerProperties().getMessageListener();
        Assert.isNull(existingListener, "Container must not already have a listener");

        this.messageListenerContainer = abstractMessageListenerContainer;
        this.messageListenerContainer.setAutoStartup(false);
        this.kafkaTemplate = kafkaTemplate;
        setErrorMessageStrategy(new RawRecordHeaderErrorMessageStrategy());

        if (!JacksonPresent.isJackson2Present()) {
            return;
        }
        MessagingMessageConverter converter = new MessagingMessageConverter();
        DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper();
        String[] trustedPackages = JacksonJsonUtils.DEFAULT_TRUSTED_PACKAGES.toArray(new String[0]);
        headerMapper.addTrustedPackages(trustedPackages);
        converter.setHeaderMapper(headerMapper);
        this.listener.setMessageConverter(converter);
    }

    /**
     * Sets the converter the internal listener uses to turn consumer records into messages.
     *
     * @param recordMessageConverter the converter, passed straight to the internal listener
     */
    public void setMessageConverter(RecordMessageConverter recordMessageConverter) {
        this.listener.setMessageConverter(recordMessageConverter);
    }

    /**
     * Sets the payload type by passing it to the internal listener as its fallback type.
     *
     * @param cls the payload type
     */
    public void setPayloadType(Class<?> cls) {
        this.listener.setFallbackType(cls);
    }

    /**
     * Sets the retry template. When it is non-null at {@code onInit()}, the listener is wrapped in a
     * {@link RetryingMessageListenerAdapter} and registered as a retry listener on the template.
     *
     * @param retryTemplate the retry template, or {@code null} for no retry
     */
    public void setRetryTemplate(RetryTemplate retryTemplate) {
        this.retryTemplate = retryTemplate;
    }

    /**
     * Sets the recovery callback handed to the {@link RetryingMessageListenerAdapter} created in
     * {@code onInit()}. It is only used when a retry template is also configured.
     *
     * @param recoveryCallback the recovery callback
     */
    public void setRecoveryCallback(RecoveryCallback<?> recoveryCallback) {
        this.recoveryCallback = recoveryCallback;
    }

    /**
     * Sets the callback that the internal listener invokes from {@code onPartitionsAssigned} with
     * the assignment map and the seek callback.
     *
     * @param biConsumer the callback, or {@code null} to take no action on assignment
     */
    public void setOnPartitionsAssignedSeekCallback(BiConsumer<Map<TopicPartition, Long>, ConsumerSeekAware.ConsumerSeekCallback> biConsumer) {
        this.onPartitionsAssignedSeekCallback = biConsumer;
    }

    /**
     * Controls whether the raw consumer record is added to each message under the
     * {@code IntegrationMessageHeaderAccessor.SOURCE_DATA} header.
     *
     * @param z {@code true} to bind the source record to the message headers
     */
    public void setBindSourceRecord(boolean z) {
        this.bindSourceRecord = z;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Installs the message listener on the container. With a retry template configured, the
     * internal listener is wrapped in a {@link RetryingMessageListenerAdapter} and also registered
     * as a retry listener on the template. Finally records whether the container is configured to
     * provide the delivery-attempt header.
     */
    @Override // org.springframework.integration.gateway.MessagingGatewaySupport, org.springframework.integration.endpoint.AbstractEndpoint, org.springframework.integration.context.IntegrationObjectSupport
    public void onInit() {
        super.onInit();
        Object effectiveListener;
        if (this.retryTemplate == null) {
            effectiveListener = this.listener;
        } else {
            effectiveListener = new RetryingMessageListenerAdapter<K, V>(this.listener, this.retryTemplate, this.recoveryCallback);
            this.retryTemplate.registerListener(this.listener);
        }
        ContainerProperties properties = this.messageListenerContainer.getContainerProperties();
        properties.setMessageListener(effectiveListener);
        this.containerDeliveryAttemptPresent = properties.isDeliveryAttemptHeader();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Runs the superclass start logic, then starts the listener container.
     */
    @Override // org.springframework.integration.gateway.MessagingGatewaySupport, org.springframework.integration.endpoint.AbstractEndpoint
    public void doStart() {
        super.doStart();
        this.messageListenerContainer.start();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Runs the superclass stop logic, then stops the listener container.
     */
    @Override // org.springframework.integration.gateway.MessagingGatewaySupport, org.springframework.integration.endpoint.AbstractEndpoint
    public void doStop() {
        super.doStop();
        this.messageListenerContainer.stop();
    }

    /**
     * Pauses consumption by delegating to the listener container's {@code pause()}.
     */
    @Override // org.springframework.integration.core.Pausable
    public void pause() {
        this.messageListenerContainer.pause();
    }

    /**
     * Resumes consumption by delegating to the listener container's {@code resume()}.
     */
    @Override // org.springframework.integration.core.Pausable
    public void resume() {
        this.messageListenerContainer.resume();
    }

    /**
     * Reports the paused state of the listener container.
     *
     * @return the value of the container's {@code isContainerPaused()}
     */
    @Override // org.springframework.integration.core.Pausable
    public boolean isPaused() {
        return this.messageListenerContainer.isContainerPaused();
    }

    /**
     * Returns the component type identifier of this gateway.
     *
     * @return the constant {@code "kafka:inbound-gateway"}
     */
    @Override // org.springframework.integration.gateway.MessagingGatewaySupport, org.springframework.integration.context.IntegrationObjectSupport, org.springframework.integration.support.context.NamedComponent
    public String getComponentType() {
        return "kafka:inbound-gateway";
    }

    /**
     * Stops the listener container ahead of shutdown so that no further records are consumed.
     *
     * @return the value of {@code getPhase()}
     */
    @Override // org.springframework.integration.context.OrderlyShutdownCapable
    public int beforeShutdown() {
        this.messageListenerContainer.stop();
        return getPhase();
    }

    /**
     * Performs no work after shutdown.
     *
     * @return the value of {@code getPhase()}
     */
    @Override // org.springframework.integration.context.OrderlyShutdownCapable
    public int afterShutdown() {
        return getPhase();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Records the input message and raw record in the thread-local attribute holder so they are
     * available when an error message is built.
     * <p>
     * Without a retry template, a fresh holder is created only when an error channel is configured.
     * With a retry template, the holder already present on the thread is populated if there is one.
     *
     * @param obj the raw data stored under {@code KafkaHeaders.RAW_DATA}
     * @param message the message stored under {@code ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY}
     */
    public void setAttributesIfNecessary(Object obj, Message<?> message) {
        boolean errorChannelConfigured = getErrorChannel() != null;
        boolean retrying = this.retryTemplate != null;
        if (errorChannelConfigured && !retrying) {
            ATTRIBUTES_HOLDER.set(ErrorMessageUtils.getAttributeAccessor(null, null));
        } else if (!retrying) {
            // Neither an error channel nor retry: nothing will consume the attributes.
            return;
        }
        AttributeAccessor attributes = ATTRIBUTES_HOLDER.get();
        if (attributes != null) {
            attributes.setAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY, message);
            attributes.setAttribute(KafkaHeaders.RAW_DATA, obj);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the attributes held for the current thread, falling back to the superclass
     * implementation when none are held.
     *
     * @param message the message passed to the superclass when no thread-local attributes exist
     * @return the thread-local attributes, or the superclass result
     */
    @Override // org.springframework.integration.gateway.MessagingGatewaySupport
    public AttributeAccessor getErrorMessageAttributes(Message<?> message) {
        AttributeAccessor heldAttributes = ATTRIBUTES_HOLDER.get();
        if (heldAttributes != null) {
            return heldAttributes;
        }
        return super.getErrorMessageAttributes(message);
    }
}
