package io.xzxj.canal.core.client;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import io.xzxj.canal.core.handler.IMessageHandler;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/xzxj/canal/core/client/AbstractCanalClient.class */
public abstract class AbstractCanalClient implements ICanalClient {
    private static final Logger log = LoggerFactory.getLogger(AbstractCanalClient.class);
    protected volatile boolean runStatus;
    private Thread thread;
    protected CanalConnector connector;
    protected IMessageHandler messageHandler;
    protected String filter = "";
    protected Integer batchSize = 1;
    protected Long timeout = 1L;
    protected TimeUnit unit = TimeUnit.SECONDS;

    @Override // io.xzxj.canal.core.client.ICanalClient
    public void init() {
        log.info("canal client init");
        connectCanal();
        this.thread = new Thread(this::handleListening);
        this.thread.setName("canal-client-thread");
        this.runStatus = true;
        this.thread.start();
    }

    private void connectCanal() {
        try {
            log.info("canal client connecting");
            this.connector.connect();
            subscribe();
            log.info("canal client connect success");
        } catch (CanalClientException e) {
            log.error("canal client connect error: {}", e.getMessage(), e);
            destroy();
        }
    }

    public void subscribe() {
        this.connector.subscribe(this.filter);
    }

    @Override // io.xzxj.canal.core.client.ICanalClient
    public void destroy() {
        this.connector.unsubscribe();
        log.info("canal client destroy");
        if (this.thread != null) {
            this.thread.interrupt();
        }
        this.runStatus = false;
    }
}
