package org.apache.eventmesh.openconnect;

import io.cloudevents.CloudEvent;
import java.util.ArrayList;
import java.util.Optional;
import org.apache.eventmesh.client.tcp.EventMeshTCPClient;
import org.apache.eventmesh.client.tcp.EventMeshTCPClientFactory;
import org.apache.eventmesh.client.tcp.common.MessageUtils;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.common.utils.SystemUtils;
import org.apache.eventmesh.openconnect.api.config.SinkConfig;
import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
import org.apache.eventmesh.openconnect.api.sink.Sink;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.apache.eventmesh.openconnect.util.CloudEventUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/openconnect/SinkWorker.class */
public class SinkWorker implements ConnectorWorker {
    private static final Logger log = LoggerFactory.getLogger(SinkWorker.class);
    private final Sink sink;
    private final SinkConfig config;
    private final EventMeshTCPClient<CloudEvent> eventMeshTCPClient;

    /* loaded from: input_file:org/apache/eventmesh/openconnect/SinkWorker$EventHandler.class */
    static class EventHandler implements ReceiveMsgHook<CloudEvent> {
        private final Sink sink;

        public EventHandler(Sink sink) {
            this.sink = sink;
        }

        public Optional<CloudEvent> handle(CloudEvent cloudEvent) {
            ConnectRecord convertEventToRecord = CloudEventUtil.convertEventToRecord(cloudEvent);
            ArrayList arrayList = new ArrayList();
            arrayList.add(convertEventToRecord);
            this.sink.put(arrayList);
            return Optional.empty();
        }
    }

    public SinkWorker(Sink sink, SinkConfig sinkConfig) {
        this.sink = sink;
        this.config = sinkConfig;
        this.eventMeshTCPClient = buildEventMeshSubClient(sinkConfig);
    }

    private EventMeshTCPClient<CloudEvent> buildEventMeshSubClient(SinkConfig sinkConfig) {
        String meshAddress = sinkConfig.getPubSubConfig().getMeshAddress();
        String str = meshAddress.split(":")[0];
        int parseInt = Integer.parseInt(meshAddress.split(":")[1]);
        return EventMeshTCPClientFactory.createEventMeshTCPClient(EventMeshTCPClientConfig.builder().host(str).port(parseInt).userAgent(MessageUtils.generateSubClient(UserAgent.builder().env(sinkConfig.getPubSubConfig().getEnv()).host("localhost").password(sinkConfig.getPubSubConfig().getPassWord()).username(sinkConfig.getPubSubConfig().getUserName()).group(sinkConfig.getPubSubConfig().getGroup()).path("/").port(8362).subsystem(sinkConfig.getPubSubConfig().getAppId()).pid(Integer.parseInt(SystemUtils.getProcessId())).version("2.0").idc(sinkConfig.getPubSubConfig().getIdc()).build())).build(), CloudEvent.class);
    }

    @Override // org.apache.eventmesh.openconnect.ConnectorWorker
    public void init() {
        SinkConnectorContext sinkConnectorContext = new SinkConnectorContext();
        sinkConnectorContext.setSinkConfig(this.config);
        try {
            this.sink.init(sinkConnectorContext);
            this.eventMeshTCPClient.init();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.eventmesh.openconnect.ConnectorWorker
    public void start() {
        log.info("sink worker starting {}", this.sink.name());
        log.info("event mesh address is {}", this.config.getPubSubConfig().getMeshAddress());
        try {
            this.sink.start();
            this.eventMeshTCPClient.subscribe(this.config.getPubSubConfig().getSubject(), SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC);
            this.eventMeshTCPClient.registerSubBusiHandler(new EventHandler(this.sink));
            this.eventMeshTCPClient.listen();
        } catch (Exception e) {
            log.error("sink worker[{}] start fail", this.sink.name(), e);
        }
    }

    @Override // org.apache.eventmesh.openconnect.ConnectorWorker
    public void stop() {
        log.info("sink worker stopping");
        try {
            this.eventMeshTCPClient.unsubscribe();
            this.eventMeshTCPClient.close();
        } catch (Exception e) {
            log.error("event mesh client close", e);
        }
        try {
            this.sink.stop();
        } catch (Exception e2) {
            log.error("sink destroy error", e2);
        }
        log.info("source worker stopped");
    }
}
