package io.xzxj.canal.core.client;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;
import io.xzxj.canal.core.handler.IMessageHandler;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/xzxj/canal/core/client/TcpCanalClient.class */
/**
 * Canal client that talks to a single canal server over TCP.
 *
 * <p>Instances are created through {@link #builder()}. The fields assigned by the builder
 * ({@code connector}, {@code destination}, {@code messageHandler}, {@code filter},
 * {@code unit}, {@code batchSize}, {@code timeout}) as well as {@code runStatus} and
 * {@code subscribe()} are not declared in this class; they are presumably inherited from
 * {@link AbstractCanalClient}.
 */
public class TcpCanalClient extends AbstractCanalClient {
    private static final Logger log = LoggerFactory.getLogger(TcpCanalClient.class);

    /** Marker text of the error that {@link #handleListening()} silently ignores. */
    private static final String NOT_STARTED_MARKER = "should start first";

    /** Pause between two consecutive reconnect attempts, in milliseconds. */
    private static final long RECONNECT_DELAY_MILLIS = 5000L;

    /**
     * Fluent builder for {@link TcpCanalClient}.
     *
     * <p>Defaults: empty filter, batch size {@code 1}, timeout {@code 1} with unit
     * {@link TimeUnit#SECONDS}. All other settings start out {@code null}.
     */
    public static class Builder {
        private String filter;
        private Integer batchSize;
        private Long timeout;
        private TimeUnit unit;
        private String hostname;
        private Integer port;
        private String destination;
        private String username;
        private String password;
        private IMessageHandler<?> messageHandler;

        private Builder() {
            this.filter = "";
            this.batchSize = 1;
            this.timeout = 1L;
            this.unit = TimeUnit.SECONDS;
        }

        /** Sets the host name used to build the server socket address. */
        public Builder hostname(String hostname) {
            this.hostname = hostname;
            return this;
        }

        /** Sets the port used to build the server socket address; required by {@link #build()}. */
        public Builder port(Integer port) {
            this.port = port;
            return this;
        }

        /** Sets the canal destination passed to the connector and to the message handler. */
        public Builder destination(String destination) {
            this.destination = destination;
            return this;
        }

        /** Sets the user name passed to the connector. */
        public Builder username(String username) {
            this.username = username;
            return this;
        }

        /** Sets the password passed to the connector. */
        public Builder password(String password) {
            this.password = password;
            return this;
        }

        /** Sets the filter stored on the client (default: empty string). */
        public Builder filter(String filter) {
            this.filter = filter;
            return this;
        }

        /** Sets the batch size requested on every fetch (default: {@code 1}). */
        public Builder batchSize(Integer batchSize) {
            this.batchSize = batchSize;
            return this;
        }

        /** Sets the fetch timeout, expressed in {@link #unit(TimeUnit)} (default: {@code 1}). */
        public Builder timeout(Long timeout) {
            this.timeout = timeout;
            return this;
        }

        /** Sets the unit of the fetch timeout (default: {@link TimeUnit#SECONDS}). */
        public Builder unit(TimeUnit unit) {
            this.unit = unit;
            return this;
        }

        /** Sets the handler that receives every non-empty message. */
        public Builder messageHandler(IMessageHandler<?> messageHandler) {
            this.messageHandler = messageHandler;
            return this;
        }

        /**
         * Creates the single-server connector and a client configured with this builder's values.
         *
         * @return a new, configured client
         * @throws NullPointerException if no port has been set
         */
        public TcpCanalClient build() {
            if (this.port == null) {
                // Same exception type the implicit unboxing would raise, but with a usable message.
                throw new NullPointerException("port must be set before calling build()");
            }
            CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                    new InetSocketAddress(this.hostname, this.port.intValue()),
                    this.destination,
                    this.username,
                    this.password);
            TcpCanalClient client = new TcpCanalClient();
            client.connector = canalConnector;
            client.destination = this.destination;
            client.messageHandler = this.messageHandler;
            client.filter = this.filter;
            client.unit = this.unit;
            client.batchSize = this.batchSize;
            client.timeout = this.timeout;
            return client;
        }
    }

    /**
     * Fetches one batch without auto-ack, hands it to the message handler when it carries
     * entries, and acknowledges it.
     *
     * <p>Failure handling:
     * <ul>
     *   <li>cause is an {@link IOException}: disconnect, clear {@code runStatus} and reconnect;</li>
     *   <li>cause message contains {@code "should start first"}: ignored;</li>
     *   <li>anything else (including exceptions without a cause): logged, then the batch is
     *       rolled back.</li>
     * </ul>
     */
    @Override // io.xzxj.canal.core.client.ICanalClient
    public void handleListening() {
        // Stays 0 when the fetch itself fails; that value is then what gets rolled back.
        long batchId = 0;
        try {
            Message message = this.connector.getWithoutAck(this.batchSize.intValue(), this.timeout, this.unit);
            log.debug("receive message={}", message);
            batchId = message.getId();
            if (batchId != -1 && !message.getEntries().isEmpty()) {
                this.messageHandler.handleMessage(this.destination, message);
            }
            this.connector.ack(batchId);
        } catch (Exception e) {
            // getCause() is null for exceptions that wrap nothing; never dereference it blindly.
            Throwable cause = e.getCause();
            if (cause instanceof IOException) {
                this.connector.disconnect();
                this.runStatus = false;
                reconnectToCanal();
                return;
            }
            if (isNotStartedError(cause)) {
                return;
            }
            log.error("canal 消费异常 回滚消息", e);
            if (this.connector != null) {
                this.connector.rollback(batchId);
            }
        }
    }

    /** Returns whether {@code cause} is non-null and its message contains the "not started" marker. */
    private static boolean isNotStartedError(Throwable cause) {
        if (cause == null) {
            return false;
        }
        String message = cause.getMessage();
        return message != null && message.contains(NOT_STARTED_MARKER);
    }

    /**
     * Reconnects and re-subscribes, retrying every {@value #RECONNECT_DELAY_MILLIS} ms until it
     * succeeds. Does nothing when {@code runStatus} is already {@code true}.
     *
     * <p>Implemented as a loop rather than recursion so a long outage cannot exhaust the stack.
     * If the thread is interrupted while waiting, the interrupt flag is restored and the method
     * returns with {@code runStatus} still {@code false}.
     */
    private void reconnectToCanal() {
        while (!this.runStatus) {
            try {
                log.info("canal client [{}] reconnecting", this.destination);
                this.connector.connect();
                subscribe();
                this.runStatus = true;
                log.info("canal client [{}] reconnect success", this.destination);
            } catch (Exception e) {
                // Trailing throwable argument makes SLF4J log the stack trace as well.
                log.error("canal client reconnect error: {}", e.getMessage(), e);
                try {
                    Thread.sleep(RECONNECT_DELAY_MILLIS);
                } catch (InterruptedException interrupted) {
                    // With the flag set every further sleep would throw at once, so stop retrying.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /** Returns a new builder pre-populated with the default settings. */
    public static Builder builder() {
        return new Builder();
    }
}
