package io.gridgo.connector.kafka;

import io.gridgo.bean.BFactory;
import io.gridgo.bean.BObject;
import io.gridgo.connector.impl.AbstractConsumer;
import io.gridgo.connector.support.config.ConnectorContext;
import io.gridgo.framework.execution.ExecutionStrategy;
import io.gridgo.framework.execution.impl.ExecutorExecutionStrategy;
import io.gridgo.framework.support.Message;
import io.gridgo.framework.support.impl.MultipartMessage;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.regex.Pattern;
import lombok.NonNull;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.Header;
import org.joo.promise4j.Promise;
import org.joo.promise4j.impl.AsyncDeferredObject;
import org.joo.promise4j.impl.SimpleDonePromise;
import org.joo.promise4j.impl.SimpleFailurePromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/gridgo/connector/kafka/KafkaConsumer.class */
public class KafkaConsumer extends AbstractConsumer {
    /** Logger shared by the consumer and its fetch tasks. */
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
    /** Argument passed to the default {@link ExecutorExecutionStrategy} (presumably its thread count -- confirm). */
    private static final int DEFAULT_THREADS = 8;
    /** Fallback strategy used by {@code onStart()} when the context supplies no consumer execution strategy. */
    private static final ExecutionStrategy DEFAULT_EXECUTION_STRATEGY = new ExecutorExecutionStrategy(DEFAULT_THREADS);
    /** Connector configuration; never null (checked in the constructor). */
    private final KafkaConfiguration configuration;
    /** Fetch tasks created by {@code onStart()}; remains null until {@code onStart()} has run. */
    private List<KafkaFetchRecords> tasks;

    /* loaded from: input_file:io/gridgo/connector/kafka/KafkaConsumer$KafkaFetchRecords.class */
    class KafkaFetchRecords implements Runnable {
        /**
         * Underlying Kafka client. Replaced by {@code doInit()} on every reconnect and read by
         * {@code shutdown()}; volatile so that a reader that runs on a different thread than the
         * poll loop (as the volatile {@code stopped} flag suggests) sees the current instance.
         */
        private volatile org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> consumer;
        /** Topic name, comma-separated topic list, or the regex source when pattern subscription is on. */
        private final String topicName;
        /** Properties handed to every Kafka client this task creates. */
        private final Properties kafkaProps;
        /** Task identifier; only used as the suffix of the worker thread name. */
        private final String id;
        /** Set by {@code shutdown()} to make the poll loop exit. */
        private volatile boolean stopped = false;
        /** Subscription pattern; only used when the configuration says the topic is a pattern. */
        private final Pattern pattern;

        /**
         * Creates a fetch task. No Kafka client is created here; that happens in {@code doInit()}.
         *
         * @param str        topic name (or comma-separated list / regex source)
         * @param pattern    subscription pattern, may be null when the topic is not a pattern
         * @param str2       task identifier used in the worker thread name
         * @param properties Kafka consumer properties
         */
        public KafkaFetchRecords(String str, Pattern pattern, String str2, Properties properties) {
            this.kafkaProps = properties;
            this.id = str2;
            this.topicName = str;
            this.pattern = pattern;
        }

        /**
         * Drives the consumer: runs one poll session and, for as long as a session asks for a
         * reconnect, waits one poll timeout, builds a fresh Kafka client and runs again.
         *
         * <p>The first session uses the client that {@code onStart()} created through
         * {@code doInit()} before submitting this task. The loop ends when a session returns
         * {@code false}, when the task has been shut down, or when the thread is interrupted
         * while waiting. If re-creating the client fails, the attempt is retried after another
         * wait instead of running a session on the previous, already closed client.
         */
        @Override // java.lang.Runnable
        public void run() {
            boolean reconnect = doRun();
            while (reconnect && !this.stopped) {
                // Back off for one poll timeout before trying to reconnect.
                try {
                    Thread.sleep(KafkaConsumer.this.configuration.getPollTimeoutMs().longValue());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (this.stopped) {
                    return;
                }
                try {
                    // The previous client was closed at the end of doRun(); start from a fresh one.
                    doInit();
                } catch (RuntimeException e) {
                    KafkaConsumer.log.warn("Exception caught when initializing KafkaConsumer", e);
                    continue;
                }
                reconnect = doRun();
            }
        }

        /**
         * Creates a new Kafka client from {@code kafkaProps} and stores it in {@code consumer}.
         * While the client is being constructed the thread's context class loader is switched to
         * the one that loaded the Kafka client class; the previous loader is always restored.
         */
        protected void doInit() {
            final Thread current = Thread.currentThread();
            final ClassLoader previousLoader = current.getContextClassLoader();
            try {
                current.setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
                this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(this.kafkaProps);
            } finally {
                current.setContextClassLoader(previousLoader);
            }
        }

        /**
         * Runs one session on the current client: subscribe, optionally seek, then poll and
         * process records partition by partition until the task is stopped, the thread is
         * interrupted, or a failure requests a reconnect.
         *
         * <p>After each non-empty partition batch the last successfully processed offset is
         * committed. When the loop ends without a failure and auto-commit is enabled, a final
         * commit is issued according to the {@code autoCommitOnStop} setting. The client is
         * unsubscribed and closed exactly once, whatever the outcome.
         *
         * @return {@code true} if the caller should create a fresh client and call again (record
         *         processing failed or a {@link KafkaException} was thrown) and the task has not
         *         been stopped; {@code false} otherwise
         */
        private boolean doRun() {
            boolean reconnect = false;
            Duration pollTimeout = Duration.ofMillis(100L);
            boolean batchEnabled = KafkaConsumer.this.configuration.isBatchEnabled();
            Thread.currentThread().setName("KAFKA-CONSUMER-" + this.topicName + "-" + this.id);
            try {
                subscribeTopics();
                seekOffset(pollTimeout);
                while (!this.stopped && !reconnect && !Thread.currentThread().isInterrupted()) {
                    ConsumerRecords<Object, Object> polled = this.consumer.poll(pollTimeout);
                    for (TopicPartition partition : polled.partitions()) {
                        List<ConsumerRecord<Object, Object>> records = polled.records(partition);
                        if (records.isEmpty()) {
                            continue;
                        }
                        // -1 means "nothing to commit"; see commitOffset().
                        long lastOffset = -1;
                        try {
                            Promise<Long, Exception> outcome = batchEnabled
                                    ? processBatchRecords(records)
                                    : processSingleRecord(partition, records);
                            lastOffset = outcome.get();
                        } catch (Exception e) {
                            KafkaConsumer.log.error("Exception caught on processing records", e);
                            KafkaConsumer.this.getContext().getExceptionHandler().accept(e);
                            reconnect = true;
                        }
                        commitOffset(lastOffset, partition);
                    }
                }
                if (!reconnect && KafkaConsumer.this.configuration.isAutoCommitEnable()) {
                    if ("async".equals(KafkaConsumer.this.configuration.getAutoCommitOnStop())) {
                        this.consumer.commitAsync();
                    } else if ("sync".equals(KafkaConsumer.this.configuration.getAutoCommitOnStop())) {
                        this.consumer.commitSync();
                    }
                }
            } catch (WakeupException e) {
                // Must precede KafkaException (its supertype): a wake-up comes from shutdown()
                // and is not a reason to reconnect.
                KafkaConsumer.log.warn("WakeupException caught on consumer thread", e);
                reconnect = false;
            } catch (KafkaException e) {
                KafkaConsumer.log.error("KafkaException caught on consumer thread", e);
                reconnect = true;
            } catch (Exception e) {
                KafkaConsumer.log.error("Exception caught on consumer thread", e);
                KafkaConsumer.this.getContext().getExceptionHandler().accept(e);
            } finally {
                // Clean up once; a failing clean-up must not escape and end the reconnect loop.
                try {
                    cleanUpConsumer();
                } catch (RuntimeException e) {
                    KafkaConsumer.log.warn("Exception caught while cleaning up consumer", e);
                }
            }
            // Never ask for a reconnect once the task has been shut down.
            return reconnect && !this.stopped;
        }

        /**
         * Subscribes the client either to the compiled pattern or, when the topic is not a
         * pattern, to every name in the comma-separated {@code topicName}.
         */
        private void subscribeTopics() {
            boolean byPattern = KafkaConsumer.this.configuration.isTopicIsPattern();
            if (byPattern) {
                this.consumer.subscribe(this.pattern);
                return;
            }
            String[] topicNames = this.topicName.split(",");
            this.consumer.subscribe(Arrays.asList(topicNames));
        }

        /**
         * Unsubscribes and closes the current Kafka client.
         *
         * <p>Safe to call when no client exists yet and safe to call again after the client has
         * been closed: {@code doRun()} can reach this method more than once on an error path, and
         * {@code unsubscribe()} on a closed client throws {@link IllegalStateException}.
         */
        private void cleanUpConsumer() {
            org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> current = this.consumer;
            if (current == null) {
                return;
            }
            try {
                current.unsubscribe();
            } catch (IllegalStateException e) {
                // The client is already closed; there is nothing left to unsubscribe.
                KafkaConsumer.log.debug("Consumer already closed, skipping unsubscribe", e);
            } finally {
                current.close();
            }
        }

        /**
         * Synchronously commits the position following offset {@code j} for one partition.
         *
         * @param j              offset of the last processed record, or -1 when there is nothing to commit
         * @param topicPartition partition the offset belongs to
         */
        private void commitOffset(long j, TopicPartition topicPartition) {
            if (j == -1) {
                return;
            }
            // The committed value is the offset of the next record to read.
            OffsetAndMetadata next = new OffsetAndMetadata(j + 1);
            this.consumer.commitSync(Collections.singletonMap(topicPartition, next));
        }

        /**
         * Applies the configured {@code seekTo} option, if any: polls once and then moves every
         * assigned partition to its beginning or end.
         *
         * @param duration timeout for the poll issued before seeking
         * @throws IllegalArgumentException if {@code seekTo} is set to anything other than
         *         {@code "beginning"} or {@code "end"}
         */
        private void seekOffset(Duration duration) {
            String seekTo = KafkaConsumer.this.configuration.getSeekTo();
            if (seekTo == null) {
                return;
            }
            switch (seekTo) {
                case "beginning":
                    this.consumer.poll(duration);
                    this.consumer.seekToBeginning(this.consumer.assignment());
                    break;
                case "end":
                    this.consumer.poll(duration);
                    this.consumer.seekToEnd(this.consumer.assignment());
                    break;
                default:
                    throw new IllegalArgumentException("Invalid seekTo option: " + seekTo);
            }
        }

        /**
         * Publishes all records of one partition as a single batch message.
         *
         * @param list records of one partition; must not be empty (the caller skips empty lists)
         * @return a promise that, once the published message is handled successfully, yields the
         *         offset of the last record in {@code list}
         */
        private Promise<Long, Exception> processBatchRecords(List<ConsumerRecord<Object, Object>> list) {
            long lastOffset = list.get(list.size() - 1).offset();
            // Parameterized instead of raw so the returned promise is type-checked.
            AsyncDeferredObject<Message, Exception> deferred = new AsyncDeferredObject<>();
            KafkaConsumer.this.publish(buildMessageForBatch(list), deferred);
            return deferred.promise().filterDone(response -> Long.valueOf(lastOffset));
        }

        /**
         * Wraps the given records into one multipart message. Each record becomes a part built by
         * {@code buildMessage}; the multipart headers carry the partition, topic, timestamp and
         * offset of the last record together with the batch flag and batch size.
         *
         * @param list records to wrap; must not be empty
         * @return the multipart message
         */
        private Message buildMessageForBatch(List<ConsumerRecord<Object, Object>> list) {
            Message[] parts = list.stream().map(this::buildMessage).toArray(Message[]::new);
            MultipartMessage batch = new MultipartMessage(parts);
            ConsumerRecord<Object, Object> lastRecord = list.get(list.size() - 1);
            BObject batchHeaders = batch.getPayload().getHeaders();
            populateCommonHeaders(batchHeaders, lastRecord);
            batchHeaders.putAny(KafkaConstants.IS_BATCH, true);
            batchHeaders.putAny(KafkaConstants.BATCH_SIZE, Integer.valueOf(list.size()));
            batchHeaders.putAny(KafkaConstants.OFFSET, Long.valueOf(lastRecord.offset()));
            return batch;
        }

        /**
         * Publishes the records of one partition one at a time, waiting for each to be handled
         * before publishing the next.
         *
         * <p>A failed record is logged. With {@code breakOnFirstError} enabled, the offset of the
         * last successful record (if any) is committed immediately and a failed promise is
         * returned; otherwise the failed record is skipped and processing continues.
         *
         * @param topicPartition partition the records belong to
         * @param list           records to process, in order
         * @return a resolved promise holding the offset of the last successfully processed record
         *         (-1 if none succeeded), or a failed promise when processing stopped on an error
         */
        private Promise<Long, Exception> processSingleRecord(TopicPartition topicPartition, List<ConsumerRecord<Object, Object>> list) {
            boolean breakOnFirstError = KafkaConsumer.this.configuration.isBreakOnFirstError();
            long lastOffset = -1;
            for (ConsumerRecord<Object, Object> consumerRecord : list) {
                Message message = buildMessage(consumerRecord);
                // Parameterized instead of raw so publish() and the promise are type-checked.
                AsyncDeferredObject<Message, Exception> deferred = new AsyncDeferredObject<>();
                KafkaConsumer.this.publish(message, deferred);
                try {
                    deferred.promise().get();
                    lastOffset = consumerRecord.offset();
                } catch (Exception e) {
                    KafkaConsumer.log.error("Exception caught while processing ConsumerRecord", e);
                    if (breakOnFirstError) {
                        // Keep what succeeded so far before reporting the failure.
                        commitOffset(lastOffset, topicPartition);
                        return new SimpleFailurePromise<>(e);
                    }
                }
            }
            return new SimpleDonePromise<>(Long.valueOf(lastOffset));
        }

        /**
         * Converts one Kafka record into a message. The headers contain partition, topic,
         * timestamp, offset, the record key (when not null) and every Kafka header of the record.
         * The body is decoded with {@code fromRaw} when a header named {@code KafkaConstants.RAW}
         * is present, and with {@code fromAny} otherwise.
         *
         * @param consumerRecord the record to convert
         * @return the message created through the enclosing consumer
         */
        private Message buildMessage(ConsumerRecord<Object, Object> consumerRecord) {
            BObject headers = BObject.ofEmpty();
            populateCommonHeaders(headers, consumerRecord);
            headers.putAny(KafkaConstants.OFFSET, Long.valueOf(consumerRecord.offset()));
            Object key = consumerRecord.key();
            if (key != null) {
                headers.putAny(KafkaConstants.KEY, key);
            }
            boolean rawBody = false;
            for (Header header : consumerRecord.headers()) {
                headers.putAny(header.key(), header.value());
                rawBody |= KafkaConstants.RAW.equals(header.key());
            }
            if (rawBody) {
                return KafkaConsumer.this.createMessage(headers, BFactory.DEFAULT.fromRaw((byte[]) consumerRecord.value()));
            }
            return KafkaConsumer.this.createMessage(headers, BFactory.DEFAULT.fromAny(consumerRecord.value()));
        }

        /**
         * Copies the partition, topic and timestamp of a record into the given headers.
         *
         * @param bObject        headers to write into
         * @param consumerRecord record to read from
         */
        private void populateCommonHeaders(BObject bObject, ConsumerRecord<Object, Object> consumerRecord) {
            Integer partition = Integer.valueOf(consumerRecord.partition());
            String topic = consumerRecord.topic();
            Long timestamp = Long.valueOf(consumerRecord.timestamp());
            bObject.putAny(KafkaConstants.PARTITION, partition);
            bObject.putAny(KafkaConstants.TOPIC, topic);
            bObject.putAny(KafkaConstants.TIMESTAMP, timestamp);
        }

        /**
         * Asks the task to stop: raises the {@code stopped} flag and, when a client exists,
         * wakes it up.
         */
        private void shutdown() {
            this.stopped = true;
            if (this.consumer == null) {
                return;
            }
            this.consumer.wakeup();
        }
    }

    /**
     * Creates a consumer for the given context and configuration.
     *
     * @param connectorContext   context passed to the superclass
     * @param kafkaConfiguration connector configuration; must not be null
     * @throws NullPointerException if {@code kafkaConfiguration} is null
     */
    public KafkaConsumer(ConnectorContext connectorContext, @NonNull KafkaConfiguration kafkaConfiguration) {
        super(connectorContext);
        this.configuration = Objects.requireNonNull(kafkaConfiguration, "configuration is marked @NonNull but is null");
    }

    /**
     * Starts the consumer: starts the execution strategy (the context's one, or the shared
     * default), then creates, initialises and submits one fetch task per configured consumer.
     * Each task's Kafka client is created on the calling thread before the task is submitted.
     * Every submitted task is recorded in {@code tasks} so that {@code onStop()} can shut it down.
     */
    protected void onStart() {
        ExecutionStrategy strategy = (ExecutionStrategy) getContext().getConsumerExecutionStrategy().orElse(DEFAULT_EXECUTION_STRATEGY);
        strategy.start();
        this.tasks = new ArrayList<>();
        Properties props = getProps();
        String topic = this.configuration.getTopic();
        Pattern topicPattern = this.configuration.isTopicIsPattern() ? Pattern.compile(topic) : null;
        for (int i = 0; i < this.configuration.getConsumersCount(); i++) {
            // The task id is a String; convert the loop index explicitly.
            KafkaFetchRecords fetcher = new KafkaFetchRecords(topic, topicPattern, String.valueOf(i), props);
            fetcher.doInit();
            strategy.execute(fetcher);
            this.tasks.add(fetcher);
        }
    }

    /**
     * Returns the Kafka consumer properties produced by the configuration.
     *
     * @return the result of {@code createConsumerProperties()} on the configuration
     */
    private Properties getProps() {
        Properties consumerProps = this.configuration.createConsumerProperties();
        return consumerProps;
    }

    /**
     * Stops the consumer: shuts down every fetch task, then stops the context's consumer
     * execution strategy when one is configured. The shared default strategy is not stopped here.
     */
    protected void onStop() {
        for (KafkaFetchRecords task : this.tasks) {
            task.shutdown();
        }
        getContext().getConsumerExecutionStrategy().ifPresent(ExecutionStrategy::stop);
    }

    /**
     * Builds this consumer's name from a fixed prefix and the configured topic.
     *
     * @return {@code "consumer.kafka."} followed by the topic
     */
    protected String generateName() {
        String topic = this.configuration.getTopic();
        return "consumer.kafka." + topic;
    }

    // Start the shared default strategy at class-load time and stop it when the JVM shuts down.
    static {
        DEFAULT_EXECUTION_STRATEGY.start();
        Thread stopHook = new Thread(DEFAULT_EXECUTION_STRATEGY::stop);
        Runtime.getRuntime().addShutdownHook(stopHook);
    }
}
