package io.xzxj.canal.core.client;

import com.alibaba.otter.canal.client.kafka.KafkaCanalConnector;
import com.alibaba.otter.canal.protocol.FlatMessage;
import io.xzxj.canal.core.handler.IMessageHandler;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/xzxj/canal/core/client/KafkaCanalClient.class */
public class KafkaCanalClient extends AbstractCanalClient {
    private static final Logger log = LoggerFactory.getLogger(KafkaCanalClient.class);

    /* loaded from: input_file:io/xzxj/canal/core/client/KafkaCanalClient$Builder.class */
    public static final class Builder {
        private String filter;
        private Integer batchSize;
        private Long timeout;
        private TimeUnit unit;
        private String servers;
        private String topic;
        private Integer partition;
        private String groupId;
        private IMessageHandler<?> messageHandler;

        private Builder() {
            this.filter = "";
            this.batchSize = 1;
            this.timeout = 1L;
            this.unit = TimeUnit.SECONDS;
        }

        public Builder servers(String str) {
            this.servers = str;
            return this;
        }

        public Builder topic(String str) {
            this.topic = str;
            return this;
        }

        public Builder partition(Integer num) {
            this.partition = num;
            return this;
        }

        public Builder groupId(String str) {
            this.groupId = str;
            return this;
        }

        public Builder filter(String str) {
            this.filter = str;
            return this;
        }

        public Builder batchSize(Integer num) {
            this.batchSize = num;
            return this;
        }

        public Builder timeout(Long l) {
            this.timeout = l;
            return this;
        }

        public Builder unit(TimeUnit timeUnit) {
            this.unit = timeUnit;
            return this;
        }

        public Builder messageHandler(IMessageHandler<?> iMessageHandler) {
            this.messageHandler = iMessageHandler;
            return this;
        }

        public KafkaCanalClient build() {
            KafkaCanalConnector kafkaCanalConnector = new KafkaCanalConnector(this.servers, this.topic, this.partition, this.groupId, this.batchSize, true);
            KafkaCanalClient kafkaCanalClient = new KafkaCanalClient();
            kafkaCanalClient.messageHandler = this.messageHandler;
            kafkaCanalClient.connector = kafkaCanalConnector;
            kafkaCanalClient.filter = this.filter;
            kafkaCanalClient.unit = this.unit;
            kafkaCanalClient.batchSize = this.batchSize;
            kafkaCanalClient.timeout = this.timeout;
            return kafkaCanalClient;
        }
    }

    @Override // io.xzxj.canal.core.client.AbstractCanalClient
    public void subscribe() {
        this.connector.subscribe();
    }

    @Override // io.xzxj.canal.core.client.ICanalClient
    public void handleListening() {
        KafkaCanalConnector kafkaCanalConnector = this.connector;
        while (this.runStatus) {
            try {
                List flatListWithoutAck = kafkaCanalConnector.getFlatListWithoutAck(this.timeout, this.unit);
                log.debug("receive message={}", flatListWithoutAck);
                Iterator it = flatListWithoutAck.iterator();
                while (it.hasNext()) {
                    this.messageHandler.handleMessage((FlatMessage) it.next());
                }
                kafkaCanalConnector.ack();
            } catch (Exception e) {
                log.error("canal client exception", e);
                return;
            }
        }
    }

    public static Builder builder() {
        return new Builder();
    }
}
