package org.apache.eventmesh.client.grpc.consumer;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.client.grpc.config.EventMeshGrpcClientConfig;
import org.apache.eventmesh.client.grpc.util.EventMeshClientUtil;
import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.grpc.protos.ConsumerServiceGrpc;
import org.apache.eventmesh.common.protocol.grpc.protos.Heartbeat;
import org.apache.eventmesh.common.protocol.grpc.protos.HeartbeatServiceGrpc;
import org.apache.eventmesh.common.protocol.grpc.protos.RequestHeader;
import org.apache.eventmesh.common.protocol.grpc.protos.Response;
import org.apache.eventmesh.common.protocol.grpc.protos.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/client/grpc/consumer/EventMeshGrpcConsumer.class */
public class EventMeshGrpcConsumer implements AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(EventMeshGrpcConsumer.class);
    private static final String SDK_STREAM_URL = "grpc_stream";
    private final EventMeshGrpcClientConfig clientConfig;
    private final Map<String, String> subscriptionMap = new ConcurrentHashMap();
    private final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), new ThreadFactoryBuilder().setNameFormat("GRPCClientScheduler").setDaemon(true).build());
    private ManagedChannel channel;
    private ConsumerServiceGrpc.ConsumerServiceBlockingStub consumerClient;
    private ConsumerServiceGrpc.ConsumerServiceStub consumerAsyncClient;
    private HeartbeatServiceGrpc.HeartbeatServiceBlockingStub heartbeatClient;
    private ReceiveMsgHook<?> listener;
    private SubStreamHandler<?> subStreamHandler;

    public EventMeshGrpcConsumer(EventMeshGrpcClientConfig eventMeshGrpcClientConfig) {
        this.clientConfig = eventMeshGrpcClientConfig;
    }

    public void init() {
        this.channel = ManagedChannelBuilder.forAddress(this.clientConfig.getServerAddr(), this.clientConfig.getServerPort()).usePlaintext().build();
        this.consumerClient = ConsumerServiceGrpc.newBlockingStub(this.channel);
        this.consumerAsyncClient = ConsumerServiceGrpc.newStub(this.channel);
        this.heartbeatClient = HeartbeatServiceGrpc.newBlockingStub(this.channel);
        heartBeat();
    }

    public Response subscribe(List<SubscriptionItem> list, String str) {
        logger.info("Create subscription: " + list + ", url: " + str);
        addSubscription(list, str);
        try {
            Response subscribe = this.consumerClient.subscribe(buildSubscription(list, str));
            logger.info("Received response " + subscribe.toString());
            return subscribe;
        } catch (Exception e) {
            logger.error("Error in subscribe. error {}", e.getMessage());
            return null;
        }
    }

    public void subscribe(List<SubscriptionItem> list) {
        logger.info("Create streaming subscription: " + list);
        if (this.listener == null) {
            logger.error("Error in subscriber, no Event Listener is registered.");
            return;
        }
        addSubscription(list, SDK_STREAM_URL);
        Subscription buildSubscription = buildSubscription(list, null);
        synchronized (this) {
            if (this.subStreamHandler == null) {
                this.subStreamHandler = new SubStreamHandler<>(this.consumerAsyncClient, this.clientConfig, this.listener);
                this.subStreamHandler.start();
            }
        }
        this.subStreamHandler.sendSubscription(buildSubscription);
    }

    private void addSubscription(List<SubscriptionItem> list, String str) {
        Iterator<SubscriptionItem> it = list.iterator();
        while (it.hasNext()) {
            this.subscriptionMap.put(it.next().getTopic(), str);
        }
    }

    private void removeSubscription(List<SubscriptionItem> list) {
        Iterator<SubscriptionItem> it = list.iterator();
        while (it.hasNext()) {
            this.subscriptionMap.remove(it.next().getTopic());
        }
    }

    public Response unsubscribe(List<SubscriptionItem> list, String str) {
        logger.info("Removing subscription: " + list + ", url: " + str);
        removeSubscription(list);
        try {
            Response unsubscribe = this.consumerClient.unsubscribe(buildSubscription(list, str));
            logger.info("Received response " + unsubscribe.toString());
            return unsubscribe;
        } catch (Exception e) {
            logger.error("Error in unsubscribe. error {}", e.getMessage());
            return null;
        }
    }

    public Response unsubscribe(List<SubscriptionItem> list) {
        logger.info("Removing subscription stream: " + list);
        removeSubscription(list);
        try {
            Response unsubscribe = this.consumerClient.unsubscribe(buildSubscription(list, null));
            logger.info("Received response " + unsubscribe.toString());
            synchronized (this) {
                if (!this.subscriptionMap.containsValue(SDK_STREAM_URL) && this.subStreamHandler != null) {
                    this.subStreamHandler.close();
                    this.subStreamHandler = null;
                }
            }
            return unsubscribe;
        } catch (Exception e) {
            logger.error("Error in unsubscribe. error {}", e.getMessage());
            return null;
        }
    }

    private Subscription buildSubscription(List<SubscriptionItem> list, String str) {
        Subscription.Builder consumerGroup = Subscription.newBuilder().setHeader(EventMeshClientUtil.buildHeader(this.clientConfig, EventMeshCommon.EM_MESSAGE_PROTOCOL_NAME)).setConsumerGroup(this.clientConfig.getConsumerGroup());
        if (StringUtils.isNotEmpty(str)) {
            consumerGroup.setUrl(str);
        }
        for (SubscriptionItem subscriptionItem : list) {
            consumerGroup.addSubscriptionItems(Subscription.SubscriptionItem.newBuilder().setTopic(subscriptionItem.getTopic()).setMode(SubscriptionMode.CLUSTERING.equals(subscriptionItem.getMode()) ? Subscription.SubscriptionItem.SubscriptionMode.CLUSTERING : SubscriptionMode.BROADCASTING.equals(subscriptionItem.getMode()) ? Subscription.SubscriptionItem.SubscriptionMode.BROADCASTING : Subscription.SubscriptionItem.SubscriptionMode.UNRECOGNIZED).setType(SubscriptionType.ASYNC.equals(subscriptionItem.getType()) ? Subscription.SubscriptionItem.SubscriptionType.ASYNC : SubscriptionType.SYNC.equals(subscriptionItem.getType()) ? Subscription.SubscriptionItem.SubscriptionType.SYNC : Subscription.SubscriptionItem.SubscriptionType.UNRECOGNIZED).build());
        }
        return consumerGroup.build();
    }

    public synchronized void registerListener(ReceiveMsgHook<?> receiveMsgHook) {
        if (this.listener == null) {
            this.listener = receiveMsgHook;
        }
    }

    private void heartBeat() {
        RequestHeader buildHeader = EventMeshClientUtil.buildHeader(this.clientConfig, EventMeshCommon.EM_MESSAGE_PROTOCOL_NAME);
        this.scheduler.scheduleAtFixedRate(() -> {
            if (this.subscriptionMap.isEmpty()) {
                return;
            }
            Heartbeat.Builder clientType = Heartbeat.newBuilder().setHeader(buildHeader).setConsumerGroup(this.clientConfig.getConsumerGroup()).setClientType(Heartbeat.ClientType.SUB);
            for (Map.Entry<String, String> entry : this.subscriptionMap.entrySet()) {
                clientType.addHeartbeatItems(Heartbeat.HeartbeatItem.newBuilder().setTopic(entry.getKey()).setUrl(entry.getValue()).build());
            }
            try {
                Response heartbeat = this.heartbeatClient.heartbeat(clientType.build());
                if (logger.isDebugEnabled()) {
                    logger.debug("Grpc Consumer Heartbeat response: {}", heartbeat);
                }
            } catch (Exception e) {
                logger.error("Error in sending out heartbeat. error {}", e.getMessage());
            }
        }, 10000L, EventMeshCommon.HEARTBEAT, TimeUnit.MILLISECONDS);
        logger.info("Grpc Consumer Heartbeat started.");
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        if (this.subStreamHandler != null) {
            this.subStreamHandler.close();
        }
        this.channel.shutdown();
        this.scheduler.shutdown();
    }
}
