package io.xzxj.canal.core.client;

import com.alibaba.fastjson2.JSON;
import com.alibaba.otter.canal.client.CanalMQConnector;
import com.alibaba.otter.canal.client.kafka.MessageDeserializer;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import com.google.common.collect.Lists;
import io.xzxj.canal.core.context.MqTopicMap;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.keyvalue.MultiKey;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

/* loaded from: input_file:io/xzxj/canal/core/client/DynamicTopicKafkaCanalConnector.class */
public class DynamicTopicKafkaCanalConnector implements CanalMQConnector {
    /** Consumer for binary {@link Message} payloads; created by {@code connect()} only when {@code flatMessage} is false. */
    protected KafkaConsumer<String, Message> kafkaConsumer;
    /** Consumer for JSON string payloads; created by {@code connect()} only when {@code flatMessage} is true. */
    protected KafkaConsumer<String, String> kafkaConsumer2;
    /** Topic name mapped to a comma-separated list of partition numbers; when non-empty, partitions are assigned explicitly. */
    private final Map<String, String> dynamicTopic;
    /** Topics subscribed to when {@code dynamicTopic} is empty. */
    private final List<String> topics;
    /** Selects which of the two consumers (and which value deserializer) is used. */
    protected boolean flatMessage;
    protected volatile boolean connected = false;
    protected volatile boolean running = false;
    /** Consumer position per topic/partition as recorded right after the most recent poll; read by rollback. */
    private final Map<TopicPartitionKey, Long> currentOffsets = new ConcurrentHashMap<>();
    /** Kafka consumer configuration, populated in the constructor and by {@code setSessionTimeout}. */
    protected Properties properties = new Properties();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/xzxj/canal/core/client/DynamicTopicKafkaCanalConnector$TopicPartitionKey.class */
    /**
     * Map key identifying one partition of one topic.
     *
     * <p>The topic name and partition number are handed to {@link MultiKey} (which supplies
     * equality and hashing) and are also kept in typed fields so they can be read back
     * without casting.
     */
    public static class TopicPartitionKey extends MultiKey<Object> {
        private final String topic;
        private final Integer partition;

        /**
         * Builds a key from a Kafka {@link TopicPartition}.
         *
         * @param topicPartition source of the topic name and partition number
         */
        public TopicPartitionKey(TopicPartition topicPartition) {
            this(topicPartition.topic(), Integer.valueOf(topicPartition.partition()));
        }

        /**
         * Builds a key from its two components.
         *
         * @param str topic name
         * @param num partition number
         */
        public TopicPartitionKey(String str, Integer num) {
            super(str, num);
            this.partition = num;
            this.topic = str;
        }

        /** @return a new Kafka {@link TopicPartition} carrying this key's topic and partition */
        public TopicPartition toTopicPartition() {
            int partitionNumber = this.partition.intValue();
            return new TopicPartition(this.topic, partitionNumber);
        }

        /** @return the topic name */
        public String getTopic() {
            return this.topic;
        }

        /** @return the partition number */
        public Integer getPartition() {
            return this.partition;
        }
    }

    public DynamicTopicKafkaCanalConnector(String str, Map<String, String> map, List<String> list, String str2, Integer num, boolean z) {
        this.flatMessage = z;
        this.properties.put("bootstrap.servers", str);
        this.properties.put("group.id", str2);
        this.properties.put("enable.auto.commit", false);
        this.properties.put("auto.commit.interval.ms", "1000");
        this.properties.put("auto.offset.reset", "latest");
        this.properties.put("request.timeout.ms", "40000");
        this.properties.put("session.timeout.ms", "30000");
        this.properties.put("isolation.level", "read_committed");
        this.properties.put("max.poll.records", (num == null ? 100 : num).toString());
        this.properties.put("key.deserializer", StringDeserializer.class.getName());
        if (z) {
            this.properties.put("value.deserializer", StringDeserializer.class.getName());
        } else {
            this.properties.put("value.deserializer", MessageDeserializer.class.getName());
        }
        this.dynamicTopic = map;
        this.topics = list;
    }

    /**
     * Creates the Kafka consumer matching the configured message mode, if it does not exist yet.
     *
     * <p>Calling this while already connected is a no-op. The {@code connected} flag is raised
     * only after the consumer has been constructed, so a failed construction (for example an
     * invalid configuration) does not leave the connector marked as connected with no consumer
     * behind it; a later call can retry.
     */
    public void connect() {
        if (this.connected) {
            return;
        }
        if (this.flatMessage) {
            if (this.kafkaConsumer2 == null) {
                this.kafkaConsumer2 = new KafkaConsumer<>(this.properties);
            }
        } else if (this.kafkaConsumer == null) {
            this.kafkaConsumer = new KafkaConsumer<>(this.properties);
        }
        this.connected = true;
    }

    /**
     * Closes whichever consumers exist and marks the connector as disconnected.
     *
     * <p>The references are cleared and {@code connected} is reset even when a {@code close()}
     * call throws, so a failure on the first consumer can no longer skip closing the second one
     * or leave the connector flagged as connected. Any exception from {@code close()} still
     * propagates to the caller.
     */
    public void disconnect() {
        try {
            if (this.kafkaConsumer != null) {
                this.kafkaConsumer.close();
            }
        } finally {
            this.kafkaConsumer = null;
            try {
                if (this.kafkaConsumer2 != null) {
                    this.kafkaConsumer2.close();
                }
            } finally {
                this.kafkaConsumer2 = null;
                this.connected = false;
            }
        }
    }

    /**
     * Marks the connector as running. Despite the name, this implementation does not block or
     * wait for anything; it only sets the {@code running} flag to {@code true}.
     */
    protected void waitClientRunning() {
        this.running = true;
    }

    /**
     * Reports whether the connector is valid. This implementation performs no check.
     *
     * @return always {@code true}
     */
    public boolean checkValid() {
        return true;
    }

    /**
     * Attaches the existing consumer(s) to their input.
     *
     * <p>When {@code dynamicTopic} is {@code null} or empty, the consumers subscribe to
     * {@code topics}. Otherwise each {@code dynamicTopic} entry is read as
     * {@code topic -> "p1,p2,..."} and the consumers are assigned exactly those partitions.
     * Entries with a blank value and blank tokens inside a list (e.g. {@code "0,,1"}) are skipped.
     * If no partition results from the map, nothing is subscribed or assigned.
     *
     * @throws NumberFormatException if a non-blank partition token is not an integer
     */
    public void subscribe() {
        waitClientRunning();
        if (!this.running) {
            return;
        }
        if (this.dynamicTopic == null || this.dynamicTopic.isEmpty()) {
            if (this.kafkaConsumer != null) {
                this.kafkaConsumer.subscribe(this.topics);
            }
            if (this.kafkaConsumer2 != null) {
                this.kafkaConsumer2.subscribe(this.topics);
            }
            return;
        }
        List<TopicPartition> assignments = new ArrayList<>();
        for (Map.Entry<String, String> entry : this.dynamicTopic.entrySet()) {
            String partitionList = entry.getValue();
            if (StringUtils.isBlank(partitionList)) {
                continue;
            }
            for (String token : partitionList.split(",")) {
                // Tolerate stray separators instead of failing on an empty token.
                if (StringUtils.isNotBlank(token)) {
                    assignments.add(new TopicPartition(entry.getKey(), Integer.parseInt(token.trim())));
                }
            }
        }
        if (assignments.isEmpty()) {
            return;
        }
        if (this.kafkaConsumer != null) {
            this.kafkaConsumer.assign(assignments);
        }
        if (this.kafkaConsumer2 != null) {
            this.kafkaConsumer2.assign(assignments);
        }
    }

    /**
     * Calls {@code unsubscribe()} on each consumer that currently exists.
     */
    public void unsubscribe() {
        waitClientRunning();
        if (!this.running) {
            return;
        }
        if (this.kafkaConsumer != null) {
            this.kafkaConsumer.unsubscribe();
        }
        if (this.kafkaConsumer2 != null) {
            this.kafkaConsumer2.unsubscribe();
        }
    }

    /**
     * Polls for {@link Message}s and, if any were returned, acknowledges them immediately.
     *
     * @param l        poll timeout amount
     * @param timeUnit unit of {@code l}
     * @return the polled messages; an empty list when not running
     * @throws CanalClientException declared by the connector interface
     */
    public List<Message> getList(Long l, TimeUnit timeUnit) throws CanalClientException {
        waitClientRunning();
        if (this.running) {
            List<Message> batch = getListWithoutAck(l, timeUnit);
            boolean gotMessages = batch != null && !batch.isEmpty();
            if (gotMessages) {
                ack();
            }
            return batch;
        }
        return Lists.newArrayList();
    }

    /**
     * Polls the binary-message consumer once and returns the received {@link Message}s without
     * committing offsets.
     *
     * <p>After the poll, the consumer position of every partition that delivered records is
     * stored in {@code currentOffsets} (replacing the previous contents). For every returned
     * message, its id is registered with {@link MqTopicMap} together with the topic and
     * partition it came from.
     *
     * <p>The timeout is converted with {@link TimeUnit#toMillis(long)}. The previous
     * {@code ChronoUnit.valueOf(timeUnit.name())} conversion threw
     * {@link IllegalArgumentException} for {@code NANOSECONDS}, {@code MICROSECONDS} and
     * {@code MILLISECONDS}, whose {@code ChronoUnit} counterparts are named differently.
     *
     * @param l        poll timeout amount
     * @param timeUnit unit of {@code l}
     * @return the received messages (records with a {@code null} value are skipped);
     *         an empty list when not running or when nothing was received
     * @throws CanalClientException declared by the connector interface
     */
    public List<Message> getListWithoutAck(Long l, TimeUnit timeUnit) throws CanalClientException {
        waitClientRunning();
        if (!this.running) {
            return Lists.newArrayList();
        }
        Duration timeout = Duration.ofMillis(timeUnit.toMillis(l.longValue()));
        ConsumerRecords<String, Message> records = this.kafkaConsumer.poll(timeout);
        this.currentOffsets.clear();
        for (TopicPartition topicPartition : records.partitions()) {
            this.currentOffsets.put(new TopicPartitionKey(topicPartition), Long.valueOf(this.kafkaConsumer.position(topicPartition)));
        }
        if (records.isEmpty()) {
            return Lists.newArrayList();
        }
        List<Message> messages = new ArrayList<>(records.count());
        for (ConsumerRecord<String, Message> consumerRecord : records) {
            Message message = consumerRecord.value();
            if (message == null) {
                // A record without a payload has no id to register; skip it instead of failing the batch.
                continue;
            }
            messages.add(message);
            MqTopicMap.setTopic(Long.valueOf(message.getId()), consumerRecord.topic(), consumerRecord.partition());
        }
        return messages;
    }

    /**
     * Polls for {@link FlatMessage}s and, if any were returned, acknowledges them immediately.
     *
     * @param l        poll timeout amount
     * @param timeUnit unit of {@code l}
     * @return the polled flat messages; an empty list when not running
     * @throws CanalClientException declared by the connector interface
     */
    public List<FlatMessage> getFlatList(Long l, TimeUnit timeUnit) throws CanalClientException {
        waitClientRunning();
        if (this.running) {
            List<FlatMessage> batch = getFlatListWithoutAck(l, timeUnit);
            boolean gotMessages = batch != null && !batch.isEmpty();
            if (gotMessages) {
                ack();
            }
            return batch;
        }
        return Lists.newArrayList();
    }

    /**
     * Polls the string-valued consumer once, parses every record value as a JSON
     * {@link FlatMessage}, and returns the results without committing offsets.
     *
     * <p>After the poll, the consumer position of every partition that delivered records is
     * stored in {@code currentOffsets} (replacing the previous contents). For every returned
     * message, its id is registered with {@link MqTopicMap} together with the topic and
     * partition it came from.
     *
     * <p>The timeout is converted with {@link TimeUnit#toMillis(long)}. The previous
     * {@code ChronoUnit.valueOf(timeUnit.name())} conversion threw
     * {@link IllegalArgumentException} for {@code NANOSECONDS}, {@code MICROSECONDS} and
     * {@code MILLISECONDS}, whose {@code ChronoUnit} counterparts are named differently.
     *
     * @param l        poll timeout amount
     * @param timeUnit unit of {@code l}
     * @return the parsed flat messages (records whose value parses to {@code null} are skipped);
     *         an empty list when not running or when nothing was received
     * @throws CanalClientException declared by the connector interface
     */
    public List<FlatMessage> getFlatListWithoutAck(Long l, TimeUnit timeUnit) throws CanalClientException {
        waitClientRunning();
        if (!this.running) {
            return Lists.newArrayList();
        }
        Duration timeout = Duration.ofMillis(timeUnit.toMillis(l.longValue()));
        ConsumerRecords<String, String> records = this.kafkaConsumer2.poll(timeout);
        this.currentOffsets.clear();
        for (TopicPartition topicPartition : records.partitions()) {
            this.currentOffsets.put(new TopicPartitionKey(topicPartition), Long.valueOf(this.kafkaConsumer2.position(topicPartition)));
        }
        if (records.isEmpty()) {
            return Lists.newArrayList();
        }
        List<FlatMessage> messages = new ArrayList<>(records.count());
        for (ConsumerRecord<String, String> consumerRecord : records) {
            FlatMessage parsed = JSON.parseObject(consumerRecord.value(), FlatMessage.class);
            if (parsed == null) {
                // Nothing was parsed from this record, so there is no id to register; skip it.
                continue;
            }
            messages.add(parsed);
            MqTopicMap.setTopic(Long.valueOf(parsed.getId()), consumerRecord.topic(), consumerRecord.partition());
        }
        return messages;
    }

    /**
     * Rewinds both consumers (whichever exist) using the positions recorded by the last poll.
     */
    public void rollback() {
        waitClientRunning();
        if (!this.running) {
            return;
        }
        rollback(this.kafkaConsumer);
        rollback(this.kafkaConsumer2);
    }

    /**
     * Seeks the given consumer, for every entry in {@code currentOffsets}, to one less than the
     * recorded position. Does nothing when the consumer is {@code null}.
     *
     * <p>NOTE(review): the recorded value is the consumer position taken right after the poll,
     * so {@code position - 1} appears to rewind only a single record per partition rather than
     * the whole polled batch — confirm whether that is intended.
     *
     * @param kafkaConsumer consumer to rewind; may be {@code null}
     */
    private void rollback(KafkaConsumer<String, ?> kafkaConsumer) {
        if (kafkaConsumer != null) {
            this.currentOffsets.forEach((key, position) ->
                    kafkaConsumer.seek(key.toTopicPartition(), position.longValue() - 1));
        }
    }

    /**
     * Synchronously commits offsets on each consumer that currently exists.
     */
    public void ack() {
        waitClientRunning();
        if (!this.running) {
            return;
        }
        if (this.kafkaConsumer != null) {
            this.kafkaConsumer.commitSync();
        }
        if (this.kafkaConsumer2 != null) {
            this.kafkaConsumer2.commitSync();
        }
    }

    /**
     * Not supported by this MQ-based connector.
     *
     * @param str ignored
     * @throws CanalClientException always
     */
    public void subscribe(String str) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    /**
     * Not supported by this MQ-based connector.
     *
     * @param i ignored
     * @return never returns normally
     * @throws CanalClientException always
     */
    public Message get(int i) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    /**
     * Not supported by this MQ-based connector.
     *
     * @param i        ignored
     * @param l        ignored
     * @param timeUnit ignored
     * @return never returns normally
     * @throws CanalClientException always
     */
    public Message get(int i, Long l, TimeUnit timeUnit) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    /**
     * Not supported by this MQ-based connector.
     *
     * @param i ignored
     * @return never returns normally
     * @throws CanalClientException always
     */
    public Message getWithoutAck(int i) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    /**
     * Not supported by this MQ-based connector.
     *
     * @param i        ignored
     * @param l        ignored
     * @param timeUnit ignored
     * @return never returns normally
     * @throws CanalClientException always
     */
    public Message getWithoutAck(int i, Long l, TimeUnit timeUnit) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    /**
     * Not supported by this MQ-based connector; use the no-argument {@code ack()} instead.
     *
     * @param j ignored
     * @throws CanalClientException always
     */
    public void ack(long j) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    /**
     * Not supported by this MQ-based connector; use the no-argument {@code rollback()} instead.
     *
     * @param j ignored
     * @throws CanalClientException always
     */
    public void rollback(long j) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    /**
     * Overrides the session and request timeouts in the consumer properties.
     * {@code session.timeout.ms} becomes the given duration in milliseconds and
     * {@code request.timeout.ms} becomes that value plus 60000.
     *
     * <p>Only the {@code properties} object is modified here; consumers are built from it
     * in {@code connect()}.
     *
     * @param l        timeout amount
     * @param timeUnit unit of {@code l}
     */
    public void setSessionTimeout(Long l, TimeUnit timeUnit) {
        final long sessionMillis = timeUnit.toMillis(l.longValue());
        final long requestMillis = sessionMillis + 60000;
        this.properties.put("session.timeout.ms", Long.toString(sessionMillis));
        this.properties.put("request.timeout.ms", Long.toString(requestMillis));
    }
}
