package org.apache.iotdb.db.subscription.receiver;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.consensus.ConfigRegionId;
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.confignode.rpc.thrift.TCloseConsumerReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateConsumerReq;
import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq;
import org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
import org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingQueue;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
import org.apache.iotdb.db.subscription.metric.SubscriptionPrefetchingQueueMetrics;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.rpc.subscription.config.ConsumerConfig;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionPayloadExceedException;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionPipeTimeoutException;
import org.apache.iotdb.rpc.subscription.payload.poll.PollFilePayload;
import org.apache.iotdb.rpc.subscription.payload.poll.PollPayload;
import org.apache.iotdb.rpc.subscription.payload.poll.PollTabletsPayload;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollRequest;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollRequestType;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeCloseReq;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeCommitReq;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeHandshakeReq;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeHeartbeatReq;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribePollReq;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeRequestType;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeRequestVersion;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeSubscribeReq;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeUnsubscribeReq;
import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeCloseResp;
import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeCommitResp;
import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeHandshakeResp;
import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeHeartbeatResp;
import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribePollResp;
import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeResponseType;
import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeResponseVersion;
import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeSubscribeResp;
import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeUnsubscribeResp;
import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeReq;
import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
import org.apache.iotdb.session.subscription.util.PollTimer;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.class */
public class SubscriptionReceiverV1 implements SubscriptionReceiver {
    private static final double POLL_PAYLOAD_SIZE_EXCEED_THRESHOLD = 0.9d;
    private final ThreadLocal<ConsumerConfig> consumerConfigThreadLocal = new ThreadLocal<>();
    private final ThreadLocal<PollTimer> pollTimerThreadLocal = new ThreadLocal<>();
    private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionReceiverV1.class);
    private static final IClientManager<ConfigRegionId, ConfigNodeClient> CONFIG_NODE_CLIENT_MANAGER = ConfigNodeClientManager.getInstance();
    private static final TPipeSubscribeResp SUBSCRIPTION_MISSING_CUSTOMER_RESP = new TPipeSubscribeResp(RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_MISSING_CUSTOMER, "Missing consumer config, please handshake first."), PipeSubscribeResponseVersion.VERSION_1.getVersion(), PipeSubscribeResponseType.ACK.getType());

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.iotdb.db.subscription.receiver.SubscriptionReceiverV1$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$iotdb$rpc$subscription$payload$request$PipeSubscribeRequestType;
        static final /* synthetic */ int[] $SwitchMap$org$apache$iotdb$rpc$subscription$payload$poll$SubscriptionPollRequestType = new int[SubscriptionPollRequestType.values().length];

        static {
            try {
                $SwitchMap$org$apache$iotdb$rpc$subscription$payload$poll$SubscriptionPollRequestType[SubscriptionPollRequestType.POLL.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$iotdb$rpc$subscription$payload$poll$SubscriptionPollRequestType[SubscriptionPollRequestType.POLL_FILE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$iotdb$rpc$subscription$payload$poll$SubscriptionPollRequestType[SubscriptionPollRequestType.POLL_TABLETS.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            $SwitchMap$org$apache$iotdb$rpc$subscription$payload$request$PipeSubscribeRequestType = new int[PipeSubscribeRequestType.values().length];
            try {
                $SwitchMap$org$apache$iotdb$rpc$subscription$payload$request$PipeSubscribeRequestType[PipeSubscribeRequestType.HANDSHAKE.ordinal()] = 1;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$iotdb$rpc$subscription$payload$request$PipeSubscribeRequestType[PipeSubscribeRequestType.HEARTBEAT.ordinal()] = 2;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$iotdb$rpc$subscription$payload$request$PipeSubscribeRequestType[PipeSubscribeRequestType.SUBSCRIBE.ordinal()] = 3;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$iotdb$rpc$subscription$payload$request$PipeSubscribeRequestType[PipeSubscribeRequestType.UNSUBSCRIBE.ordinal()] = 4;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$apache$iotdb$rpc$subscription$payload$request$PipeSubscribeRequestType[PipeSubscribeRequestType.POLL.ordinal()] = 5;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$org$apache$iotdb$rpc$subscription$payload$request$PipeSubscribeRequestType[PipeSubscribeRequestType.COMMIT.ordinal()] = 6;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$org$apache$iotdb$rpc$subscription$payload$request$PipeSubscribeRequestType[PipeSubscribeRequestType.CLOSE.ordinal()] = 7;
            } catch (NoSuchFieldError e10) {
            }
        }
    }

    @Override // org.apache.iotdb.db.subscription.receiver.SubscriptionReceiver
    public final TPipeSubscribeResp handle(TPipeSubscribeReq tPipeSubscribeReq) {
        short type = tPipeSubscribeReq.getType();
        if (PipeSubscribeRequestType.isValidatedRequestType(type)) {
            switch (AnonymousClass1.$SwitchMap$org$apache$iotdb$rpc$subscription$payload$request$PipeSubscribeRequestType[PipeSubscribeRequestType.valueOf(type).ordinal()]) {
                case 1:
                    return handlePipeSubscribeHandshake(PipeSubscribeHandshakeReq.fromTPipeSubscribeReq(tPipeSubscribeReq));
                case 2:
                    return handlePipeSubscribeHeartbeat(PipeSubscribeHeartbeatReq.fromTPipeSubscribeReq(tPipeSubscribeReq));
                case 3:
                    return handlePipeSubscribeSubscribe(PipeSubscribeSubscribeReq.fromTPipeSubscribeReq(tPipeSubscribeReq));
                case 4:
                    return handlePipeSubscribeUnsubscribe(PipeSubscribeUnsubscribeReq.fromTPipeSubscribeReq(tPipeSubscribeReq));
                case 5:
                    return handlePipeSubscribePoll(PipeSubscribePollReq.fromTPipeSubscribeReq(tPipeSubscribeReq));
                case 6:
                    return handlePipeSubscribeCommit(PipeSubscribeCommitReq.fromTPipeSubscribeReq(tPipeSubscribeReq));
                case 7:
                    return handlePipeSubscribeClose(PipeSubscribeCloseReq.fromTPipeSubscribeReq(tPipeSubscribeReq));
            }
        }
        TSStatus status = RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_TYPE_ERROR, String.format("Unknown PipeSubscribeRequestType %s.", Short.valueOf(type)));
        LOGGER.warn("Subscription: Unknown PipeSubscribeRequestType, response status = {}.", status);
        return new TPipeSubscribeResp(status, PipeSubscribeResponseVersion.VERSION_1.getVersion(), PipeSubscribeResponseType.ACK.getType());
    }

    @Override // org.apache.iotdb.db.subscription.receiver.SubscriptionReceiver
    public PipeSubscribeRequestVersion getVersion() {
        return PipeSubscribeRequestVersion.VERSION_1;
    }

    @Override // org.apache.iotdb.db.subscription.receiver.SubscriptionReceiver
    public void handleExit() {
        if (Objects.nonNull(this.consumerConfigThreadLocal.get())) {
            LOGGER.info("Subscription: remove consumer config {} when handling exit", this.consumerConfigThreadLocal.get());
            this.consumerConfigThreadLocal.remove();
        }
    }

    @Override // org.apache.iotdb.db.subscription.receiver.SubscriptionReceiver
    public long remainingMs() {
        PollTimer pollTimer = this.pollTimerThreadLocal.get();
        return Objects.isNull(pollTimer) ? SubscriptionConfig.getInstance().getSubscriptionDefaultTimeoutInMs() : pollTimer.remainingMs();
    }

    private TPipeSubscribeResp handlePipeSubscribeHandshake(PipeSubscribeHandshakeReq pipeSubscribeHandshakeReq) {
        try {
            return handlePipeSubscribeHandshakeInternal(pipeSubscribeHandshakeReq);
        } catch (Exception e) {
            LOGGER.warn("Exception occurred when handshaking with request {}", pipeSubscribeHandshakeReq, e);
            return PipeSubscribeHandshakeResp.toTPipeSubscribeResp(RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_HANDSHAKE_ERROR, String.format("Subscription: something unexpected happened when handshaking with request %s: %s", pipeSubscribeHandshakeReq, e)), -1, "", "");
        }
    }

    private TPipeSubscribeResp handlePipeSubscribeHandshakeInternal(PipeSubscribeHandshakeReq pipeSubscribeHandshakeReq) throws SubscriptionException {
        ConsumerConfig consumerConfig = this.consumerConfigThreadLocal.get();
        ConsumerConfig consumerConfig2 = pipeSubscribeHandshakeReq.getConsumerConfig();
        String consumerId = consumerConfig2.getConsumerId();
        if (Objects.isNull(consumerId)) {
            consumerId = UUID.randomUUID().toString();
            consumerConfig2.setConsumerId(consumerId);
        }
        String consumerGroupId = consumerConfig2.getConsumerGroupId();
        if (Objects.isNull(consumerGroupId)) {
            consumerGroupId = UUID.randomUUID().toString();
            consumerConfig2.setConsumerGroupId(consumerGroupId);
        }
        if (Objects.isNull(consumerConfig)) {
            this.consumerConfigThreadLocal.set(consumerConfig2);
        } else if (!consumerConfig.equals(consumerConfig2)) {
            LOGGER.warn("Subscription: Detect stale consumer config when handshaking, stale consumer config {} will be cleared, consumer config will set to the incoming consumer config {}.", consumerConfig, consumerConfig2);
            dropConsumer(consumerConfig);
            this.consumerConfigThreadLocal.set(consumerConfig2);
        }
        if (SubscriptionAgent.consumer().isConsumerExisted(consumerGroupId, consumerId)) {
            LOGGER.info("Subscription: The consumer {} has already existed when handshaking, skip creating consumer.", consumerConfig2);
        } else {
            createConsumer(consumerConfig2);
        }
        int dataNodeId = IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
        LOGGER.info("Subscription: consumer {} handshake successfully, data node id: {}", pipeSubscribeHandshakeReq.getConsumerConfig(), Integer.valueOf(dataNodeId));
        return PipeSubscribeHandshakeResp.toTPipeSubscribeResp(RpcUtils.SUCCESS_STATUS, dataNodeId, consumerId, consumerGroupId);
    }

    private TPipeSubscribeResp handlePipeSubscribeHeartbeat(PipeSubscribeHeartbeatReq pipeSubscribeHeartbeatReq) {
        try {
            return handlePipeSubscribeHeartbeatInternal(pipeSubscribeHeartbeatReq);
        } catch (Exception e) {
            LOGGER.warn("Exception occurred when heartbeat with request {}", pipeSubscribeHeartbeatReq, e);
            return PipeSubscribeHeartbeatResp.toTPipeSubscribeResp(RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_HEARTBEAT_ERROR, String.format("Subscription: something unexpected happened when heartbeat with request %s: %s", pipeSubscribeHeartbeatReq, e)));
        }
    }

    private TPipeSubscribeResp handlePipeSubscribeHeartbeatInternal(PipeSubscribeHeartbeatReq pipeSubscribeHeartbeatReq) throws IOException {
        ConsumerConfig consumerConfig = this.consumerConfigThreadLocal.get();
        if (Objects.isNull(consumerConfig)) {
            LOGGER.warn("Subscription: missing consumer config when handling PipeSubscribeHeartbeatReq: {}", pipeSubscribeHeartbeatReq);
            return SUBSCRIPTION_MISSING_CUSTOMER_RESP;
        }
        LOGGER.info("Subscription: consumer {} heartbeat successfully", consumerConfig);
        return PipeSubscribeHeartbeatResp.toTPipeSubscribeResp(RpcUtils.SUCCESS_STATUS, SubscriptionAgent.topic().getTopicConfigs(SubscriptionAgent.consumer().getTopicNamesSubscribedByConsumer(consumerConfig.getConsumerGroupId(), consumerConfig.getConsumerId())));
    }

    private TPipeSubscribeResp handlePipeSubscribeSubscribe(PipeSubscribeSubscribeReq pipeSubscribeSubscribeReq) {
        try {
            return handlePipeSubscribeSubscribeInternal(pipeSubscribeSubscribeReq);
        } catch (SubscriptionPipeTimeoutException e) {
            return PipeSubscribeSubscribeResp.toTPipeSubscribeResp(RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_PIPE_TIMEOUT_ERROR, e.getMessage()));
        } catch (Exception e2) {
            LOGGER.warn("Exception occurred when subscribing with request {}", pipeSubscribeSubscribeReq, e2);
            return PipeSubscribeSubscribeResp.toTPipeSubscribeResp(RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_SUBSCRIBE_ERROR, String.format("Subscription: something unexpected happened when subscribing with request %s: %s", pipeSubscribeSubscribeReq, e2)));
        }
    }

    private TPipeSubscribeResp handlePipeSubscribeSubscribeInternal(PipeSubscribeSubscribeReq pipeSubscribeSubscribeReq) throws SubscriptionException, IOException {
        ConsumerConfig consumerConfig = this.consumerConfigThreadLocal.get();
        if (Objects.isNull(consumerConfig)) {
            LOGGER.warn("Subscription: missing consumer config when handling PipeSubscribeSubscribeReq: {}", pipeSubscribeSubscribeReq);
            return SUBSCRIPTION_MISSING_CUSTOMER_RESP;
        }
        Set<String> topicNames = pipeSubscribeSubscribeReq.getTopicNames();
        subscribe(consumerConfig, topicNames);
        LOGGER.info("Subscription: consumer {} subscribe {} successfully", consumerConfig, topicNames);
        return PipeSubscribeSubscribeResp.toTPipeSubscribeResp(RpcUtils.SUCCESS_STATUS, SubscriptionAgent.topic().getTopicConfigs(SubscriptionAgent.consumer().getTopicNamesSubscribedByConsumer(consumerConfig.getConsumerGroupId(), consumerConfig.getConsumerId())));
    }

    private TPipeSubscribeResp handlePipeSubscribeUnsubscribe(PipeSubscribeUnsubscribeReq pipeSubscribeUnsubscribeReq) {
        try {
            return handlePipeSubscribeUnsubscribeInternal(pipeSubscribeUnsubscribeReq);
        } catch (SubscriptionPipeTimeoutException e) {
            return PipeSubscribeSubscribeResp.toTPipeSubscribeResp(RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_PIPE_TIMEOUT_ERROR, e.getMessage()));
        } catch (Exception e2) {
            LOGGER.warn("Exception occurred when unsubscribing with request {}", pipeSubscribeUnsubscribeReq, e2);
            return PipeSubscribeUnsubscribeResp.toTPipeSubscribeResp(RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_UNSUBSCRIBE_ERROR, String.format("Subscription: something unexpected happened when unsubscribing with request %s: %s", pipeSubscribeUnsubscribeReq, e2)));
        }
    }

    private TPipeSubscribeResp handlePipeSubscribeUnsubscribeInternal(PipeSubscribeUnsubscribeReq pipeSubscribeUnsubscribeReq) throws IOException {
        ConsumerConfig consumerConfig = this.consumerConfigThreadLocal.get();
        if (Objects.isNull(consumerConfig)) {
            LOGGER.warn("Subscription: missing consumer config when handling PipeSubscribeUnsubscribeReq: {}", pipeSubscribeUnsubscribeReq);
            return SUBSCRIPTION_MISSING_CUSTOMER_RESP;
        }
        Set<String> topicNames = pipeSubscribeUnsubscribeReq.getTopicNames();
        unsubscribe(consumerConfig, topicNames);
        LOGGER.info("Subscription: consumer {} unsubscribe {} successfully", consumerConfig, topicNames);
        return PipeSubscribeUnsubscribeResp.toTPipeSubscribeResp(RpcUtils.SUCCESS_STATUS, SubscriptionAgent.topic().getTopicConfigs(SubscriptionAgent.consumer().getTopicNamesSubscribedByConsumer(consumerConfig.getConsumerGroupId(), consumerConfig.getConsumerId())));
    }

    private TPipeSubscribeResp handlePipeSubscribePoll(PipeSubscribePollReq pipeSubscribePollReq) {
        try {
            try {
                TPipeSubscribeResp handlePipeSubscribePollInternal = handlePipeSubscribePollInternal(pipeSubscribePollReq);
                this.pollTimerThreadLocal.remove();
                return handlePipeSubscribePollInternal;
            } catch (Exception e) {
                LOGGER.warn("Exception occurred when polling with request {}", pipeSubscribePollReq, e);
                PipeSubscribePollResp tPipeSubscribeResp = PipeSubscribePollResp.toTPipeSubscribeResp(RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_POLL_ERROR, String.format("Subscription: something unexpected happened when polling with request %s: %s", pipeSubscribePollReq, e)), Collections.emptyList());
                this.pollTimerThreadLocal.remove();
                return tPipeSubscribeResp;
            }
        } catch (Throwable th) {
            this.pollTimerThreadLocal.remove();
            throw th;
        }
    }

    private TPipeSubscribeResp handlePipeSubscribePollInternal(PipeSubscribePollReq pipeSubscribePollReq) throws SubscriptionException {
        List<SubscriptionEvent> list;
        ConsumerConfig consumerConfig = this.consumerConfigThreadLocal.get();
        if (Objects.isNull(consumerConfig)) {
            LOGGER.warn("Subscription: missing consumer config when handling PipeSubscribePollReq: {}", pipeSubscribePollReq);
            return SUBSCRIPTION_MISSING_CUSTOMER_RESP;
        }
        SubscriptionPollRequest request = pipeSubscribePollReq.getRequest();
        this.pollTimerThreadLocal.set(new PollTimer(System.currentTimeMillis(), request.getTimeoutMs()));
        long maxBytes = (long) (request.getMaxBytes() * POLL_PAYLOAD_SIZE_EXCEED_THRESHOLD);
        short requestType = request.getRequestType();
        if (SubscriptionPollRequestType.isValidatedRequestType(requestType)) {
            switch (AnonymousClass1.$SwitchMap$org$apache$iotdb$rpc$subscription$payload$poll$SubscriptionPollRequestType[SubscriptionPollRequestType.valueOf(requestType).ordinal()]) {
                case 1:
                    list = handlePipeSubscribePollRequest(consumerConfig, (PollPayload) request.getPayload(), maxBytes);
                    break;
                case 2:
                    list = handlePipeSubscribePollTsFileRequest(consumerConfig, (PollFilePayload) request.getPayload());
                    break;
                case 3:
                    list = handlePipeSubscribePollTabletsRequest(consumerConfig, (PollTabletsPayload) request.getPayload());
                    break;
                default:
                    list = null;
                    break;
            }
        } else {
            list = null;
        }
        if (Objects.isNull(list)) {
            throw new SubscriptionException(String.format("unexpected request type: %s", Short.valueOf(requestType)));
        }
        AtomicLong atomicLong = new AtomicLong();
        return PipeSubscribePollResp.toTPipeSubscribeResp(RpcUtils.SUCCESS_STATUS, (List) list.stream().map(subscriptionEvent -> {
            SubscriptionCommitContext commitContext = subscriptionEvent.getCommitContext();
            SubscriptionPollResponse currentResponse = subscriptionEvent.getCurrentResponse();
            if (Objects.isNull(currentResponse)) {
                LOGGER.warn("Subscription: consumer {} poll null response for event {} with request: {}", new Object[]{consumerConfig, subscriptionEvent, pipeSubscribePollReq.getRequest()});
                SubscriptionAgent.broker().commit(consumerConfig, Collections.singletonList(commitContext), true);
                return null;
            }
            try {
                ByteBuffer currentResponseByteBuffer = subscriptionEvent.getCurrentResponseByteBuffer();
                long currentResponseSize = subscriptionEvent.getCurrentResponseSize();
                if (atomicLong.get() + currentResponseSize > maxBytes) {
                    throw new SubscriptionPayloadExceedException(String.format("payload size %s byte(s) will exceed the threshold %s byte(s)", Long.valueOf(atomicLong.get() + currentResponseSize), Long.valueOf(maxBytes)));
                }
                atomicLong.getAndAdd(currentResponseSize);
                SubscriptionPrefetchingQueueMetrics.getInstance().mark(SubscriptionPrefetchingQueue.generatePrefetchingQueueId(commitContext.getConsumerGroupId(), commitContext.getTopicName()), currentResponseSize);
                subscriptionEvent.invalidateCurrentResponseByteBuffer();
                LOGGER.info("Subscription: consumer {} poll {} successfully with request: {}", new Object[]{consumerConfig, currentResponse, pipeSubscribePollReq.getRequest()});
                return currentResponseByteBuffer;
            } catch (Exception e) {
                if (e instanceof SubscriptionPayloadExceedException) {
                    LOGGER.error("Subscription: consumer {} poll excessive payload {} with request: {}, something unexpected happened with parameter configuration or payload control...", new Object[]{consumerConfig, currentResponse, pipeSubscribePollReq.getRequest(), e});
                } else {
                    LOGGER.warn("Subscription: consumer {} poll {} failed with request: {}", new Object[]{consumerConfig, currentResponse, pipeSubscribePollReq.getRequest(), e});
                }
                SubscriptionAgent.broker().commit(consumerConfig, Collections.singletonList(commitContext), true);
                return null;
            }
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).collect(Collectors.toList()));
    }

    private List<SubscriptionEvent> handlePipeSubscribePollRequest(ConsumerConfig consumerConfig, PollPayload pollPayload, long j) {
        Set<String> topicNamesSubscribedByConsumer = SubscriptionAgent.consumer().getTopicNamesSubscribedByConsumer(consumerConfig.getConsumerGroupId(), consumerConfig.getConsumerId());
        Set<String> topicNames = pollPayload.getTopicNames();
        if (topicNames.isEmpty()) {
            return Collections.emptyList();
        }
        topicNames.removeIf(str -> {
            return !topicNamesSubscribedByConsumer.contains(str);
        });
        return SubscriptionAgent.broker().poll(consumerConfig, topicNames, j);
    }

    private List<SubscriptionEvent> handlePipeSubscribePollTsFileRequest(ConsumerConfig consumerConfig, PollFilePayload pollFilePayload) {
        return SubscriptionAgent.broker().pollTsFile(consumerConfig, pollFilePayload.getCommitContext(), pollFilePayload.getWritingOffset());
    }

    private List<SubscriptionEvent> handlePipeSubscribePollTabletsRequest(ConsumerConfig consumerConfig, PollTabletsPayload pollTabletsPayload) {
        return SubscriptionAgent.broker().pollTablets(consumerConfig, pollTabletsPayload.getCommitContext(), pollTabletsPayload.getOffset());
    }

    private TPipeSubscribeResp handlePipeSubscribeCommit(PipeSubscribeCommitReq pipeSubscribeCommitReq) {
        try {
            return handlePipeSubscribeCommitInternal(pipeSubscribeCommitReq);
        } catch (Exception e) {
            LOGGER.warn("Exception occurred when committing with request {}", pipeSubscribeCommitReq, e);
            return PipeSubscribeCommitResp.toTPipeSubscribeResp(RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_COMMIT_ERROR, String.format("Subscription: something unexpected happened when committing with request %s: %s", pipeSubscribeCommitReq, e)));
        }
    }

    private TPipeSubscribeResp handlePipeSubscribeCommitInternal(PipeSubscribeCommitReq pipeSubscribeCommitReq) {
        ConsumerConfig consumerConfig = this.consumerConfigThreadLocal.get();
        if (Objects.isNull(consumerConfig)) {
            LOGGER.warn("Subscription: missing consumer config when handling PipeSubscribeCommitReq: {}", pipeSubscribeCommitReq);
            return SUBSCRIPTION_MISSING_CUSTOMER_RESP;
        }
        List<SubscriptionCommitContext> commitContexts = pipeSubscribeCommitReq.getCommitContexts();
        boolean isNack = pipeSubscribeCommitReq.isNack();
        List<SubscriptionCommitContext> commit = SubscriptionAgent.broker().commit(consumerConfig, commitContexts, isNack);
        if (Objects.equals(Integer.valueOf(commit.size()), Integer.valueOf(commitContexts.size()))) {
            LOGGER.info("Subscription: consumer {} commit (nack: {}) successfully, commit contexts: {}", new Object[]{consumerConfig, Boolean.valueOf(isNack), commitContexts});
        } else {
            LOGGER.warn("Subscription: consumer {} commit (nack: {}) partially successful, commit contexts: {}, successful commit contexts: {}", new Object[]{consumerConfig, Boolean.valueOf(isNack), commitContexts, commit});
        }
        return PipeSubscribeCommitResp.toTPipeSubscribeResp(RpcUtils.SUCCESS_STATUS);
    }

    private TPipeSubscribeResp handlePipeSubscribeClose(PipeSubscribeCloseReq pipeSubscribeCloseReq) {
        try {
            return handlePipeSubscribeCloseInternal(pipeSubscribeCloseReq);
        } catch (Exception e) {
            LOGGER.warn("Exception occurred when closing with request {}", pipeSubscribeCloseReq, e);
            return PipeSubscribeCloseResp.toTPipeSubscribeResp(RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_CLOSE_ERROR, String.format("Subscription: something unexpected happened when closing with request %s: %s", pipeSubscribeCloseReq, e)));
        }
    }

    private TPipeSubscribeResp handlePipeSubscribeCloseInternal(PipeSubscribeCloseReq pipeSubscribeCloseReq) {
        ConsumerConfig consumerConfig = this.consumerConfigThreadLocal.get();
        if (Objects.isNull(consumerConfig)) {
            LOGGER.warn("Subscription: missing consumer config when handling PipeSubscribeCloseReq: {}", pipeSubscribeCloseReq);
            return SUBSCRIPTION_MISSING_CUSTOMER_RESP;
        }
        closeConsumer(consumerConfig);
        return PipeSubscribeCloseResp.toTPipeSubscribeResp(RpcUtils.SUCCESS_STATUS);
    }

    private void closeConsumer(ConsumerConfig consumerConfig) {
        Set<String> topicNamesSubscribedByConsumer = SubscriptionAgent.consumer().getTopicNamesSubscribedByConsumer(consumerConfig.getConsumerGroupId(), consumerConfig.getConsumerId());
        if (!topicNamesSubscribedByConsumer.isEmpty()) {
            LOGGER.info("Subscription: unsubscribe all subscribed topics {} before close consumer {}", topicNamesSubscribedByConsumer, consumerConfig);
            try {
                unsubscribe(consumerConfig, topicNamesSubscribedByConsumer);
            } catch (SubscriptionPipeTimeoutException e) {
                LOGGER.warn(e.getMessage());
            }
        }
        if (SubscriptionAgent.consumer().isConsumerExisted(consumerConfig.getConsumerGroupId(), consumerConfig.getConsumerId())) {
            dropConsumer(consumerConfig);
        } else {
            LOGGER.info("Subscription: The consumer {} does not existed when closing, skip dropping consumer.", consumerConfig);
        }
        LOGGER.info("Subscription: consumer {} close successfully", consumerConfig);
    }

    private void createConsumer(ConsumerConfig consumerConfig) throws SubscriptionException {
        try {
            ConfigNodeClient configNodeClient = (ConfigNodeClient) CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
            try {
                TSStatus createConsumer = configNodeClient.createConsumer(new TCreateConsumerReq().setConsumerId(consumerConfig.getConsumerId()).setConsumerGroupId(consumerConfig.getConsumerGroupId()).setConsumerAttributes(consumerConfig.getAttribute()));
                if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != createConsumer.getCode()) {
                    LOGGER.warn("Unexpected status code {} when creating consumer {} in config node", createConsumer, consumerConfig);
                    throw new SubscriptionException(String.format("Subscription: Failed to create consumer %s in config node, status is %s.", consumerConfig, createConsumer));
                }
                if (configNodeClient != null) {
                    configNodeClient.close();
                }
            } finally {
            }
        } catch (ClientManagerException | TException e) {
            LOGGER.warn("Exception occurred when creating consumer {} in config node", consumerConfig, e);
            throw new SubscriptionException(String.format("Subscription: Failed to create consumer %s in config node, exception is %s.", consumerConfig, e));
        }
    }

    private void dropConsumer(ConsumerConfig consumerConfig) throws SubscriptionException {
        try {
            ConfigNodeClient configNodeClient = (ConfigNodeClient) CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
            try {
                TSStatus closeConsumer = configNodeClient.closeConsumer(new TCloseConsumerReq().setConsumerId(consumerConfig.getConsumerId()).setConsumerGroupId(consumerConfig.getConsumerGroupId()));
                if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != closeConsumer.getCode()) {
                    LOGGER.warn("Unexpected status code {} when closing consumer {} in config node", closeConsumer, consumerConfig);
                    throw new SubscriptionException(String.format("Subscription: Failed to close consumer %s in config node, status is %s.", consumerConfig, closeConsumer));
                }
                if (configNodeClient != null) {
                    configNodeClient.close();
                }
            } finally {
            }
        } catch (ClientManagerException | TException e) {
            LOGGER.warn("Exception occurred when closing consumer {} in config node", consumerConfig, e);
            throw new SubscriptionException(String.format("Subscription: Failed to close consumer %s in config node, exception is %s.", consumerConfig, e));
        }
    }

    private void subscribe(ConsumerConfig consumerConfig, Set<String> set) throws SubscriptionException {
        try {
            ConfigNodeClient configNodeClient = (ConfigNodeClient) CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
            try {
                TSStatus createSubscription = configNodeClient.createSubscription(new TSubscribeReq().setConsumerId(consumerConfig.getConsumerId()).setConsumerGroupId(consumerConfig.getConsumerGroupId()).setTopicNames(set));
                if (TSStatusCode.SUCCESS_STATUS.getStatusCode() == createSubscription.getCode()) {
                    if (configNodeClient != null) {
                        configNodeClient.close();
                    }
                } else {
                    LOGGER.warn("Unexpected status code {} when subscribing topics {} for consumer {} in config node", new Object[]{createSubscription, set, consumerConfig});
                    String format = String.format("Subscription: Failed to subscribe topics %s for consumer %s in config node, status is %s.", set, consumerConfig, createSubscription);
                    if (TSStatusCode.SUBSCRIPTION_PIPE_TIMEOUT_ERROR.getStatusCode() != createSubscription.getCode()) {
                        throw new SubscriptionException(format);
                    }
                    throw new SubscriptionPipeTimeoutException(format);
                }
            } finally {
            }
        } catch (ClientManagerException | TException e) {
            LOGGER.warn("Exception occurred when subscribing topics {} for consumer {} in config node", new Object[]{set, consumerConfig, e});
            throw new SubscriptionException(String.format("Subscription: Failed to subscribe topics %s for consumer %s in config node, exception is %s.", set, consumerConfig, e));
        }
    }

    private void unsubscribe(ConsumerConfig consumerConfig, Set<String> set) throws SubscriptionException {
        try {
            ConfigNodeClient configNodeClient = (ConfigNodeClient) CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
            try {
                TSStatus dropSubscription = configNodeClient.dropSubscription(new TUnsubscribeReq().setConsumerId(consumerConfig.getConsumerId()).setConsumerGroupId(consumerConfig.getConsumerGroupId()).setTopicNames(set));
                if (TSStatusCode.SUCCESS_STATUS.getStatusCode() == dropSubscription.getCode()) {
                    if (configNodeClient != null) {
                        configNodeClient.close();
                    }
                } else {
                    LOGGER.warn("Unexpected status code {} when unsubscribing topics {} for consumer {} in config node", new Object[]{dropSubscription, set, consumerConfig});
                    String format = String.format("Subscription: Failed to unsubscribe topics %s for consumer %s in config node, status is %s.", set, consumerConfig, dropSubscription);
                    if (TSStatusCode.SUBSCRIPTION_PIPE_TIMEOUT_ERROR.getStatusCode() != dropSubscription.getCode()) {
                        throw new SubscriptionException(format);
                    }
                    throw new SubscriptionPipeTimeoutException(format);
                }
            } finally {
            }
        } catch (ClientManagerException | TException e) {
            LOGGER.warn("Exception occurred when unsubscribing topics {} for consumer {} in config node", new Object[]{set, consumerConfig, e});
            throw new SubscriptionException(String.format("Subscription: Failed to unsubscribe topics %s for consumer %s in config node, exception is %s.", set, consumerConfig, e));
        }
    }
}
