package io.xzxj.canal.core.client;

import com.alibaba.fastjson2.JSONException;
import com.alibaba.otter.canal.client.CanalMQConnector;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.alibaba.otter.canal.protocol.Message;
import java.util.Iterator;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.lang.NonNull;

/* loaded from: input_file:io/xzxj/canal/core/client/AbstractMqCanalClient.class */
/**
 * Base class for canal clients that pull change events from a message queue through a
 * {@link CanalMQConnector}.
 *
 * <p>The connector, run flag, poll timeout/unit and message handler used here are not declared
 * in this class; they are inherited members.
 */
public abstract class AbstractMqCanalClient extends AbstractCanalClient {
    private static final Logger log = LoggerFactory.getLogger(AbstractMqCanalClient.class);

    /**
     * Flat-message switch, {@code TRUE} by default. It is not read inside this class;
     * presumably subclasses pass it to {@link #handleListening(Boolean)} (confirm against callers).
     */
    protected Boolean flatMessage = Boolean.TRUE;

    /**
     * Polls and dispatches batches for as long as {@code runStatus} is true, then unsubscribes
     * and disconnects the connector.
     *
     * <p>Decompiler note: the access modifier of this method was changed from package-private.
     *
     * @param bool {@code TRUE} to consume {@link FlatMessage} batches; any other value consumes
     *             {@link Message} batches
     */
    public void handleListening(@NonNull Boolean bool) {
        try {
            while (this.runStatus) {
                if (Boolean.TRUE.equals(bool)) {
                    defaultFlatMessageHandle();
                } else {
                    defaultMessageHandle();
                }
            }
        } finally {
            // The per-batch handlers call rollback() from inside their catch blocks, so a failure
            // there (or any Error) escapes the loop. Release the connector on that path as well.
            try {
                this.connector.unsubscribe();
            } finally {
                // Still disconnect when unsubscribe() itself throws.
                this.connector.disconnect();
            }
        }
    }

    /**
     * Fetches one batch of {@link FlatMessage}s without auto-ack, passes each one to the message
     * handler and acknowledges the batch afterwards.
     *
     * <p>A {@link JSONException} is only logged (the batch is neither acked nor rolled back);
     * any other exception is logged and the batch is rolled back.
     */
    protected final void defaultFlatMessageHandle() {
        CanalMQConnector mqConnector = this.connector;
        try {
            List<FlatMessage> batch = mqConnector.getFlatListWithoutAck(this.timeout, this.unit);
            log.debug("receive message = {}", batch);
            for (FlatMessage message : batch) {
                this.messageHandler.handleMessage(message);
            }
            // Acknowledge only after every message of the batch was handled without an exception.
            mqConnector.ack();
        } catch (JSONException e) {
            log.error("canal 消息json解析异常", e);
        } catch (Exception e) {
            log.error("canal 消费异常 回滚消息", e);
            mqConnector.rollback();
        }
    }

    /**
     * Fetches one batch of {@link Message}s without auto-ack, passes each one to the message
     * handler and acknowledges the batch afterwards.
     *
     * <p>A {@link JSONException} is only logged (the batch is neither acked nor rolled back);
     * any other exception is logged and the batch is rolled back.
     */
    protected final void defaultMessageHandle() {
        CanalMQConnector mqConnector = this.connector;
        try {
            List<Message> batch = mqConnector.getListWithoutAck(this.timeout, this.unit);
            log.debug("receive message = {}", batch);
            for (Message message : batch) {
                this.messageHandler.handleMessage(message);
            }
            // Acknowledge only after every message of the batch was handled without an exception.
            mqConnector.ack();
        } catch (JSONException e) {
            log.error("canal 消息json解析异常", e);
        } catch (Exception e) {
            log.error("canal 消费异常 回滚消息", e);
            mqConnector.rollback();
        }
    }
}
