package io.debezium.relational;

import io.debezium.data.Envelope;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.schema.DataCollectionSchema;
import io.debezium.util.Clock;
import java.util.Objects;
import org.apache.kafka.connect.data.Struct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/debezium-core-0.10.0.Final.jar:io/debezium/relational/RelationalChangeRecordEmitter.class */
public abstract class RelationalChangeRecordEmitter implements ChangeRecordEmitter {
    protected final Logger logger = LoggerFactory.getLogger(getClass());
    private final OffsetContext offsetContext;
    private final Clock clock;

    public RelationalChangeRecordEmitter(OffsetContext offsetContext, Clock clock) {
        this.offsetContext = offsetContext;
        this.clock = clock;
    }

    @Override // io.debezium.pipeline.spi.ChangeRecordEmitter
    public void emitChangeRecords(DataCollectionSchema dataCollectionSchema, ChangeRecordEmitter.Receiver receiver) throws InterruptedException {
        TableSchema tableSchema = (TableSchema) dataCollectionSchema;
        Envelope.Operation operation = getOperation();
        switch (operation) {
            case CREATE:
                emitCreateRecord(receiver, tableSchema);
                return;
            case READ:
                emitReadRecord(receiver, tableSchema);
                return;
            case UPDATE:
                emitUpdateRecord(receiver, tableSchema);
                return;
            case DELETE:
                emitDeleteRecord(receiver, tableSchema);
                return;
            default:
                throw new IllegalArgumentException("Unsupported operation: " + operation);
        }
    }

    @Override // io.debezium.pipeline.spi.ChangeRecordEmitter
    public OffsetContext getOffset() {
        return this.offsetContext;
    }

    private void emitCreateRecord(ChangeRecordEmitter.Receiver receiver, TableSchema tableSchema) throws InterruptedException {
        Object[] newColumnValues = getNewColumnValues();
        Object keyFromColumnData = tableSchema.keyFromColumnData(newColumnValues);
        Struct create = tableSchema.getEnvelopeSchema().create(tableSchema.valueFromColumnData(newColumnValues), this.offsetContext.getSourceInfo(), Long.valueOf(this.clock.currentTimeInMillis()));
        if (skipEmptyMessages() && (newColumnValues == null || newColumnValues.length == 0)) {
            this.logger.warn("no new values found for table '{}' from create message at '{}'; skipping record", tableSchema, this.offsetContext.getSourceInfo());
        } else {
            receiver.changeRecord(tableSchema, Envelope.Operation.CREATE, keyFromColumnData, create, this.offsetContext);
        }
    }

    private void emitReadRecord(ChangeRecordEmitter.Receiver receiver, TableSchema tableSchema) throws InterruptedException {
        Object[] newColumnValues = getNewColumnValues();
        receiver.changeRecord(tableSchema, Envelope.Operation.READ, tableSchema.keyFromColumnData(newColumnValues), tableSchema.getEnvelopeSchema().read(tableSchema.valueFromColumnData(newColumnValues), this.offsetContext.getSourceInfo(), Long.valueOf(this.clock.currentTimeInMillis())), this.offsetContext);
    }

    private void emitUpdateRecord(ChangeRecordEmitter.Receiver receiver, TableSchema tableSchema) throws InterruptedException {
        Object[] oldColumnValues = getOldColumnValues();
        Object[] newColumnValues = getNewColumnValues();
        Object keyFromColumnData = tableSchema.keyFromColumnData(oldColumnValues);
        Object keyFromColumnData2 = tableSchema.keyFromColumnData(newColumnValues);
        Struct valueFromColumnData = tableSchema.valueFromColumnData(newColumnValues);
        Struct valueFromColumnData2 = tableSchema.valueFromColumnData(oldColumnValues);
        if (skipEmptyMessages() && (newColumnValues == null || newColumnValues.length == 0)) {
            this.logger.warn("no new values found for table '{}' from update message at '{}'; skipping record", tableSchema, this.offsetContext.getSourceInfo());
            return;
        }
        if (keyFromColumnData == null || Objects.equals(keyFromColumnData, keyFromColumnData2)) {
            receiver.changeRecord(tableSchema, Envelope.Operation.UPDATE, keyFromColumnData2, tableSchema.getEnvelopeSchema().update(valueFromColumnData2, valueFromColumnData, this.offsetContext.getSourceInfo(), Long.valueOf(this.clock.currentTimeInMillis())), this.offsetContext);
        } else {
            receiver.changeRecord(tableSchema, Envelope.Operation.DELETE, keyFromColumnData, tableSchema.getEnvelopeSchema().delete(valueFromColumnData2, this.offsetContext.getSourceInfo(), Long.valueOf(this.clock.currentTimeInMillis())), this.offsetContext);
            receiver.changeRecord(tableSchema, Envelope.Operation.CREATE, keyFromColumnData2, tableSchema.getEnvelopeSchema().create(valueFromColumnData, this.offsetContext.getSourceInfo(), Long.valueOf(this.clock.currentTimeInMillis())), this.offsetContext);
        }
    }

    private void emitDeleteRecord(ChangeRecordEmitter.Receiver receiver, TableSchema tableSchema) throws InterruptedException {
        Object[] oldColumnValues = getOldColumnValues();
        Object keyFromColumnData = tableSchema.keyFromColumnData(oldColumnValues);
        Struct valueFromColumnData = tableSchema.valueFromColumnData(oldColumnValues);
        if (skipEmptyMessages() && (oldColumnValues == null || oldColumnValues.length == 0)) {
            this.logger.warn("no old values found for table '{}' from delete message at '{}'; skipping record", tableSchema, this.offsetContext.getSourceInfo());
        } else {
            receiver.changeRecord(tableSchema, Envelope.Operation.DELETE, keyFromColumnData, tableSchema.getEnvelopeSchema().delete(valueFromColumnData, this.offsetContext.getSourceInfo(), Long.valueOf(this.clock.currentTimeInMillis())), this.offsetContext);
        }
    }

    protected abstract Envelope.Operation getOperation();

    protected abstract Object[] getOldColumnValues();

    protected abstract Object[] getNewColumnValues();

    protected boolean skipEmptyMessages() {
        return false;
    }
}
