package io.xzxj.canal.core.handler;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import io.xzxj.canal.core.listener.EntryListener;
import io.xzxj.canal.core.util.TableInfoUtil;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/xzxj/canal/core/handler/AbstractMessageHandler.class */
public abstract class AbstractMessageHandler implements IMessageHandler<Message> {
    private static final Logger log = LoggerFactory.getLogger(AbstractMessageHandler.class);
    private final Map<String, EntryListener<?>> entryListenerMap;
    private final RowDataHandler<CanalEntry.RowData> rowDataHandler;

    public AbstractMessageHandler(List<EntryListener<?>> list, RowDataHandler<CanalEntry.RowData> rowDataHandler) {
        this.entryListenerMap = (Map) list.stream().collect(Collectors.toMap(TableInfoUtil::getTableName, entryListener -> {
            return entryListener;
        }, (entryListener2, entryListener3) -> {
            return entryListener2;
        }));
        this.rowDataHandler = rowDataHandler;
    }

    @Override // io.xzxj.canal.core.handler.IMessageHandler
    public void handleMessage(Message message) {
        for (CanalEntry.Entry entry : message.getEntries()) {
            if (CanalEntry.EntryType.ROWDATA.equals(entry.getEntryType())) {
                String schemaName = entry.getHeader().getSchemaName();
                String tableName = entry.getHeader().getTableName();
                EntryListener<?> entryListener = this.entryListenerMap.get(schemaName + "." + tableName);
                if (entryListener == null) {
                    entryListener = this.entryListenerMap.get(tableName);
                }
                try {
                    CanalEntry.RowChange parseFrom = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                    List<CanalEntry.RowData> rowDatasList = parseFrom.getRowDatasList();
                    CanalEntry.EventType eventType = parseFrom.getEventType();
                    try {
                        for (CanalEntry.RowData rowData : rowDatasList) {
                            if (entryListener != null) {
                                this.rowDataHandler.handleRowData(rowData, entryListener, eventType);
                            }
                        }
                    } catch (Exception e) {
                        log.error("handle row data error", e);
                    }
                } catch (Exception e2) {
                    throw new RuntimeException("parse event has an error , data:" + entry, e2);
                }
            }
        }
    }
}
