package org.apache.eventmesh.openconnect;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.v1.CloudEventBuilder;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.eventmesh.client.tcp.EventMeshTCPClient;
import org.apache.eventmesh.client.tcp.EventMeshTCPClientFactory;
import org.apache.eventmesh.client.tcp.common.MessageUtils;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.common.utils.SystemUtils;
import org.apache.eventmesh.openconnect.api.callback.SendExcepionContext;
import org.apache.eventmesh.openconnect.api.callback.SendMessageCallback;
import org.apache.eventmesh.openconnect.api.callback.SendResult;
import org.apache.eventmesh.openconnect.api.config.SourceConfig;
import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
import org.apache.eventmesh.openconnect.api.source.Source;
import org.apache.eventmesh.openconnect.offsetmgmt.api.config.OffsetStorageConfig;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordOffsetManagement;
import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.DefaultOffsetManagementServiceImpl;
import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetManagementService;
import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetStorageReaderImpl;
import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetStorageWriterImpl;
import org.apache.eventmesh.openconnect.util.CloudEventUtil;
import org.apache.eventmesh.spi.EventMeshExtensionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/openconnect/SourceWorker.class */
public class SourceWorker implements ConnectorWorker {
    private static final Logger log = LoggerFactory.getLogger(SourceWorker.class);
    private final Source source;
    private final SourceConfig config;
    private static final int MAX_RETRY_TIMES = 3;
    public static final String CALLBACK_EXTENSION = "callBackExtension";
    private OffsetStorageWriterImpl offsetStorageWriter;
    private OffsetStorageReaderImpl offsetStorageReader;
    private OffsetManagementService offsetManagementService;
    private RecordOffsetManagement offsetManagement;
    private volatile RecordOffsetManagement.CommittableOffsets committableOffsets;
    private final EventMeshTCPClient<CloudEvent> eventMeshTCPClient;
    private final ExecutorService pollService = Executors.newSingleThreadExecutor();
    private final ExecutorService startService = Executors.newSingleThreadExecutor();
    private volatile boolean isRunning = false;
    private final BlockingQueue<ConnectRecord> queue = new LinkedBlockingQueue(1000);

    public SourceWorker(Source source, SourceConfig sourceConfig) {
        this.source = source;
        this.config = sourceConfig;
        this.eventMeshTCPClient = buildEventMeshPubClient(sourceConfig);
    }

    private EventMeshTCPClient<CloudEvent> buildEventMeshPubClient(SourceConfig sourceConfig) {
        String meshAddress = sourceConfig.getPubSubConfig().getMeshAddress();
        String str = meshAddress.split(":")[0];
        int parseInt = Integer.parseInt(meshAddress.split(":")[1]);
        return EventMeshTCPClientFactory.createEventMeshTCPClient(EventMeshTCPClientConfig.builder().host(str).port(parseInt).userAgent(MessageUtils.generatePubClient(UserAgent.builder().env(sourceConfig.getPubSubConfig().getEnv()).host("localhost").password(sourceConfig.getPubSubConfig().getPassWord()).username(sourceConfig.getPubSubConfig().getUserName()).group(sourceConfig.getPubSubConfig().getGroup()).path("/").port(8362).subsystem(sourceConfig.getPubSubConfig().getAppId()).pid(Integer.parseInt(SystemUtils.getProcessId())).version("2.0").idc(sourceConfig.getPubSubConfig().getIdc()).build())).build(), CloudEvent.class);
    }

    @Override // org.apache.eventmesh.openconnect.ConnectorWorker
    public void init() {
        SourceConnectorContext sourceConnectorContext = new SourceConnectorContext();
        sourceConnectorContext.setSourceConfig(this.config);
        sourceConnectorContext.setOffsetStorageReader(this.offsetStorageReader);
        try {
            this.source.init(sourceConnectorContext);
            this.eventMeshTCPClient.init();
            this.offsetManagement = new RecordOffsetManagement();
            this.committableOffsets = RecordOffsetManagement.CommittableOffsets.EMPTY;
            OffsetStorageConfig offsetStorageConfig = this.config.getOffsetStorageConfig();
            this.offsetManagementService = (OffsetManagementService) Optional.ofNullable(offsetStorageConfig).map((v0) -> {
                return v0.getOffsetStorageType();
            }).map(str -> {
                return (OffsetManagementService) EventMeshExtensionFactory.getExtension(OffsetManagementService.class, str);
            }).orElse(new DefaultOffsetManagementServiceImpl());
            this.offsetManagementService.initialize(offsetStorageConfig);
            this.offsetStorageWriter = new OffsetStorageWriterImpl(this.source.name(), this.offsetManagementService);
            this.offsetStorageReader = new OffsetStorageReaderImpl(this.source.name(), this.offsetManagementService);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.eventmesh.openconnect.ConnectorWorker
    public void start() {
        log.info("source worker starting {}", this.source.name());
        log.info("event mesh address is {}", this.config.getPubSubConfig().getMeshAddress());
        this.offsetManagementService.start();
        this.isRunning = true;
        this.pollService.execute(this::startPollAndSend);
        this.startService.execute(() -> {
            try {
                startConnector();
            } catch (Exception e) {
                log.error("source worker[{}] start fail", this.source.name(), e);
                stop();
            }
        });
    }

    public void startPollAndSend() {
        loop0: while (this.isRunning) {
            ConnectRecord connectRecord = null;
            try {
                connectRecord = this.queue.poll(5L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.error("poll connect record error", e);
            }
            if (connectRecord != null) {
                CloudEvent convertRecordToEvent = convertRecordToEvent(connectRecord);
                Optional<RecordOffsetManagement.SubmittedPosition> prepareToUpdateRecordOffset = prepareToUpdateRecordOffset(connectRecord);
                Optional map = Optional.ofNullable(connectRecord.getExtensionObj(CALLBACK_EXTENSION)).map(obj -> {
                    return (SendMessageCallback) obj;
                });
                int i = 0;
                while (true) {
                    if (i >= 3) {
                        break;
                    }
                    try {
                        if (this.eventMeshTCPClient.publish(convertRecordToEvent, 3000L).getHeader().getCode() != 0) {
                            throw new EventMeshException("failed to send record.");
                            break loop0;
                        }
                        this.source.commit(connectRecord);
                        prepareToUpdateRecordOffset.ifPresent((v0) -> {
                            v0.ack();
                        });
                        map.ifPresent(sendMessageCallback -> {
                            sendMessageCallback.onSuccess(convertToSendResult(convertRecordToEvent));
                        });
                        break;
                    } catch (Throwable th) {
                        i++;
                        log.error("{} failed to send record to {}, retry times = {}, failed record {}, throw {}", new Object[]{this, convertRecordToEvent.getSubject(), Integer.valueOf(i), connectRecord, th.getMessage()});
                        map.ifPresent(sendMessageCallback2 -> {
                            sendMessageCallback2.onException(convertToExceptionContext(convertRecordToEvent, th));
                        });
                    }
                }
                this.offsetManagement.awaitAllMessages(5000L, TimeUnit.MILLISECONDS);
                updateCommittableOffsets();
                commitOffsets();
            }
        }
    }

    private void startConnector() throws Exception {
        this.source.start();
        while (this.isRunning) {
            List<ConnectRecord> poll = this.source.poll();
            if (!CollectionUtils.isEmpty(poll)) {
                Iterator<ConnectRecord> it = poll.iterator();
                while (it.hasNext()) {
                    this.queue.put(it.next());
                }
            }
        }
    }

    private CloudEvent convertRecordToEvent(ConnectRecord connectRecord) {
        CloudEventBuilder v1 = io.cloudevents.core.builder.CloudEventBuilder.v1();
        v1.withId(UUID.randomUUID().toString()).withSubject(this.config.getPubSubConfig().getSubject()).withSource(URI.create("/")).withDataContentType("application/cloudevents+json").withType("cloudevents").withData(((String) Objects.requireNonNull(JsonUtils.toJSONString(connectRecord.getData()))).getBytes(StandardCharsets.UTF_8)).withExtension("ttl", 10000);
        if (connectRecord.getExtensions() != null) {
            for (String str : connectRecord.getExtensions().keySet()) {
                if (CloudEventUtil.validateExtensionType(connectRecord.getExtensionObj(str))) {
                    v1.withExtension(str, connectRecord.getExtension(str));
                }
            }
        }
        return v1.build();
    }

    private SendResult convertToSendResult(CloudEvent cloudEvent) {
        SendResult sendResult = new SendResult();
        sendResult.setMessageId(cloudEvent.getId());
        sendResult.setTopic(cloudEvent.getSubject());
        return sendResult;
    }

    private SendExcepionContext convertToExceptionContext(CloudEvent cloudEvent, Throwable th) {
        SendExcepionContext sendExcepionContext = new SendExcepionContext();
        sendExcepionContext.setTopic(cloudEvent.getId());
        sendExcepionContext.setMessageId(cloudEvent.getId());
        sendExcepionContext.setCause(th);
        return sendExcepionContext;
    }

    @Override // org.apache.eventmesh.openconnect.ConnectorWorker
    public void stop() {
        log.info("source worker stopping");
        this.isRunning = false;
        try {
            this.source.stop();
        } catch (Exception e) {
            e.printStackTrace();
            log.error("source destroy error", e);
        }
        log.info("pollService stopping");
        this.pollService.shutdown();
        try {
            this.pollService.awaitTermination(10L, TimeUnit.SECONDS);
        } catch (InterruptedException e2) {
            log.error("awaitTermination error", e2);
        }
        log.info("offsetMgmtService stopping");
        this.offsetManagementService.stop();
        try {
            log.info("eventmesh client closing");
            this.eventMeshTCPClient.close();
        } catch (Exception e3) {
            log.error("event mesh client close error", e3);
        }
        log.info("source worker stopped");
    }

    public Optional<RecordOffsetManagement.SubmittedPosition> prepareToUpdateRecordOffset(ConnectRecord connectRecord) {
        return Optional.of(this.offsetManagement.submitRecord(connectRecord.getPosition()));
    }

    public void updateCommittableOffsets() {
        RecordOffsetManagement.CommittableOffsets committableOffsets = this.offsetManagement.committableOffsets();
        synchronized (this) {
            this.committableOffsets = this.committableOffsets.updatedWith(committableOffsets);
        }
    }

    public boolean commitOffsets() {
        RecordOffsetManagement.CommittableOffsets committableOffsets;
        log.info("Start Committing offsets");
        long currentTimeMillis = System.currentTimeMillis() + 5000;
        synchronized (this) {
            committableOffsets = this.committableOffsets;
            this.committableOffsets = RecordOffsetManagement.CommittableOffsets.EMPTY;
        }
        if (this.committableOffsets.isEmpty()) {
            log.debug("Either no records were produced since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors.");
        } else {
            log.info("{} Committing offsets for {} acknowledged messages", this, Integer.valueOf(this.committableOffsets.numCommittableMessages()));
            if (this.committableOffsets.hasPending()) {
                log.debug("{} There are currently {} pending messages spread across {} source partitions whose offsets will not be committed. The source partition with the most pending messages is {}, with {} pending messages", new Object[]{this, Integer.valueOf(this.committableOffsets.numUncommittableMessages()), Integer.valueOf(this.committableOffsets.numDeques()), this.committableOffsets.largestDequePartition(), Integer.valueOf(this.committableOffsets.largestDequeSize())});
            } else {
                log.debug("{} There are currently no pending messages for this offset commit; all messages dispatched to the task's producer since the last commit have been acknowledged", this);
            }
        }
        Map offsets = committableOffsets.offsets();
        OffsetStorageWriterImpl offsetStorageWriterImpl = this.offsetStorageWriter;
        offsetStorageWriterImpl.getClass();
        offsets.forEach(offsetStorageWriterImpl::writeOffset);
        if (!this.offsetStorageWriter.beginFlush()) {
            return true;
        }
        try {
            this.offsetStorageWriter.doFlush().get(Math.max(currentTimeMillis - System.currentTimeMillis(), 0L), TimeUnit.MILLISECONDS);
            return true;
        } catch (InterruptedException e) {
            log.warn("{} Flush of offsets interrupted, cancelling", this);
            this.offsetStorageWriter.cancelFlush();
            return false;
        } catch (ExecutionException e2) {
            log.error("{} Flush of offsets threw an unexpected exception: ", this, e2);
            this.offsetStorageWriter.cancelFlush();
            return false;
        } catch (TimeoutException e3) {
            log.error("{} Timed out waiting to flush offsets to storage; will try again on next flush interval with latest offsets", this);
            this.offsetStorageWriter.cancelFlush();
            return false;
        }
    }
}
