package org.apache.plc4x.nifi;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.util.StopWatch;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.api.messages.PlcWriteRequest;
import org.apache.plc4x.java.api.messages.PlcWriteResponse;
import org.apache.plc4x.java.api.model.PlcTag;

@CapabilityDescription("Processor able to write data to industrial PLCs using Apache PLC4X")
@WritesAttributes({@WritesAttribute(attribute = Plc4xSinkRecordProcessor.RESULT_ROW_COUNT, description = "Number of rows from the input FlowFile written into the PLC"), @WritesAttribute(attribute = Plc4xSinkRecordProcessor.RESULT_QUERY_EXECUTION_TIME, description = "Time between request and response from the PLC"), @WritesAttribute(attribute = "input.flowfile.uuid", description = "UUID of the input FlowFile")})
@ReadsAttributes({@ReadsAttribute(attribute = "value", description = "some value")})
@TriggerSerially
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"plc4x", "put", "sink", "record"})
@SeeAlso({Plc4xSourceRecordProcessor.class, Plc4xListenRecordProcessor.class})
/* loaded from: input_file:org/apache/plc4x/nifi/Plc4xSinkRecordProcessor.class */
/**
 * NiFi processor that reads records from an incoming FlowFile with a configured
 * {@link RecordReaderFactory} and issues one PLC4X write request per record.
 *
 * <p>On success the FlowFile is routed to {@code REL_SUCCESS} carrying the
 * {@link #RESULT_ROW_COUNT}, {@link #RESULT_QUERY_EXECUTION_TIME} and
 * {@link #INPUT_FLOWFILE_UUID} attributes. On failure it is routed to
 * {@code REL_FAILURE} with the {@link #EXCEPTION} attribute set and the
 * {@link ProcessException} is rethrown.
 */
public class Plc4xSinkRecordProcessor extends BasePlc4xProcessor {
    /** Attribute holding the accumulated counter reported by the write requests of this FlowFile. */
    public static final String RESULT_ROW_COUNT = "plc4x.write.row.count";
    /** Attribute holding the elapsed time, in milliseconds, spent processing the FlowFile. */
    public static final String RESULT_QUERY_EXECUTION_TIME = "plc4x.write.query.executiontime";
    /** Attribute holding the UUID of the incoming FlowFile. */
    public static final String INPUT_FLOWFILE_UUID = "input.flowfile.uuid";
    /** Attribute holding the localized message of the exception that caused a failure. */
    public static final String EXCEPTION = "plc4x.write.exception";

    /** Controller service used to create the {@link RecordReader} for the incoming FlowFile content. */
    public static final PropertyDescriptor PLC_RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
        .name("record-reader")
        .displayName("Record Reader")
        .description("Specifies the Controller Service to use for reading record from a FlowFile. The Record Reader may use Inherit Schema to emulate the inferred schema behavior, i.e. an explicit schema need not be defined in the reader, and will be supplied by the same logic used to infer the schema from the column types.")
        .identifiesControllerService(RecordReaderFactory.class)
        .required(true)
        .build();

    /**
     * Initializes the processor: keeps the relationships of the base processor and
     * appends {@link #PLC_RECORD_READER_FACTORY} to the base property descriptors.
     *
     * @param processorInitializationContext initialization context forwarded to the base class
     */
    @Override // org.apache.plc4x.nifi.BasePlc4xProcessor
    public void init(ProcessorInitializationContext processorInitializationContext) {
        super.init(processorInitializationContext);
        this.relationships = Collections.unmodifiableSet(new HashSet<>(super.getRelationships()));
        ArrayList<PropertyDescriptor> descriptors = new ArrayList<>(super.getSupportedPropertyDescriptors());
        descriptors.add(PLC_RECORD_READER_FACTORY);
        this.properties = Collections.unmodifiableList(descriptors);
    }

    /**
     * Takes one FlowFile from the session, writes each of its records to the PLC and
     * routes the FlowFile to success or failure.
     *
     * @param processContext context used to resolve the record reader, addresses, connection string and timeout
     * @param processSession session providing the FlowFile and used to transfer it
     * @throws ProcessException if reading the records or writing any of them to the PLC fails;
     *                          the FlowFile has been transferred to {@code REL_FAILURE} before the rethrow
     */
    @Override
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        final FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            // Nothing queued: nothing to do.
            return;
        }
        final ComponentLog logger = getLogger();
        final AtomicLong totalCount = new AtomicLong(0L);
        final StopWatch stopWatch = new StopWatch(true);
        try {
            processSession.read(flowFile, inputStream -> {
                try (RecordReader recordReader = processContext.getProperty(PLC_RECORD_READER_FACTORY)
                        .asControllerService(RecordReaderFactory.class)
                        .createRecordReader(flowFile, inputStream, logger)) {
                    Record record;
                    while ((record = recordReader.nextRecord()) != null) {
                        // Per-record counter filled in by getWriteRequest.
                        final AtomicLong recordCount = new AtomicLong(0L);
                        final Map<String, String> addressMap = getPlcAddressMap(processContext, flowFile);
                        final Map<String, PlcTag> cachedTags = getSchemaCache().retrieveTags(addressMap);
                        PlcWriteRequest writeRequest;
                        try (PlcConnection connection = getConnectionManager().getConnection(getConnectionString(processContext, flowFile))) {
                            writeRequest = getWriteRequest(logger, addressMap, cachedTags, record.toMap(), connection, recordCount);
                            PlcWriteResponse writeResponse = writeRequest.execute().get(getTimeout(processContext, flowFile), TimeUnit.MILLISECONDS);
                            evaluateWriteResponse(logger, record.toMap(), writeResponse);
                        } catch (TimeoutException e) {
                            logger.error("Timeout writting the data to the PLC", e);
                            // Drop the cached connection so the next attempt does not reuse one that timed out.
                            getConnectionManager().removeCachedConnection(getConnectionString(processContext, flowFile));
                            throw new ProcessException(e);
                        } catch (PlcConnectionException e) {
                            // Must precede the generic Exception handler, otherwise it is unreachable.
                            logger.error("Error getting the PLC connection", e);
                            throw new ProcessException("Got an a PlcConnectionException while trying to get a connection", e);
                        } catch (Exception e) {
                            if (e instanceof InterruptedException) {
                                // Preserve the interrupt status for the calling framework thread.
                                Thread.currentThread().interrupt();
                            }
                            logger.error("Exception writting the data to the PLC", e);
                            throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
                        }
                        if (cachedTags == null) {
                            // No cached resolution for this address map yet: store the one just built.
                            if (this.debugEnabled) {
                                logger.debug("Adding PlcTypes resolution into cache with key: " + addressMap);
                            }
                            getSchemaCache().addSchema(addressMap, writeRequest.getTagNames(), writeRequest.getTags(), null);
                        }
                        totalCount.getAndAdd(recordCount.get());
                    }
                } catch (Exception e) {
                    // Always propagate: a swallowed ProcessException would route a failed write to 'success'.
                    throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
                }
            });

            final long elapsed = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
            final Map<String, String> attributes = new HashMap<>();
            attributes.put(RESULT_ROW_COUNT, String.valueOf(totalCount.get()));
            attributes.put(RESULT_QUERY_EXECUTION_TIME, String.valueOf(elapsed));
            attributes.put(INPUT_FLOWFILE_UUID, flowFile.getAttribute(CoreAttributes.UUID.key()));
            // Session mutators return the updated FlowFile; use that reference from here on.
            final FlowFile updatedFlowFile = processSession.putAllAttributes(flowFile, attributes);
            processSession.transfer(updatedFlowFile, REL_SUCCESS);
            logger.info("Writing {} fields from {} records; transferring to 'success'", new Object[]{Long.valueOf(totalCount.get()), updatedFlowFile});
            if (processContext.hasIncomingConnection()) {
                processSession.getProvenanceReporter().fetch(updatedFlowFile, "Writted " + totalCount.get() + " rows", elapsed);
            } else {
                processSession.getProvenanceReporter().receive(updatedFlowFile, "Writted " + totalCount.get() + " rows", elapsed);
            }
        } catch (ProcessException e) {
            logger.error("Exception writing the data to the PLC", e);
            final FlowFile failedFlowFile = processSession.putAttribute(flowFile, EXCEPTION, e.getLocalizedMessage());
            processSession.transfer(failedFlowFile, REL_FAILURE);
            processSession.commitAsync();
            throw e;
        }
    }
}
