package org.apache.iotdb.session.subscription.consumer.base;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.subscription.config.ConsumerConfig;
import org.apache.iotdb.rpc.subscription.config.TopicConfig;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionConnectionException;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionPipeTimeoutException;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionRuntimeCriticalException;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionRuntimeNonCriticalException;
import org.apache.iotdb.rpc.subscription.payload.poll.PollFilePayload;
import org.apache.iotdb.rpc.subscription.payload.poll.PollPayload;
import org.apache.iotdb.rpc.subscription.payload.poll.PollTabletsPayload;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollRequest;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollRequestType;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeCloseReq;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeCommitReq;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeHandshakeReq;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeHeartbeatReq;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribePollReq;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeSubscribeReq;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeUnsubscribeReq;
import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeHandshakeResp;
import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeHeartbeatResp;
import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribePollResp;
import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeSubscribeResp;
import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeUnsubscribeResp;
import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
import org.apache.iotdb.session.AbstractSessionBuilder;
import org.apache.iotdb.session.subscription.SubscriptionSessionConnection;
import org.apache.iotdb.session.subscription.SubscriptionSessionWrapper;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.class */
/**
 * Client-side handle for one subscription connection to a single endpoint.
 *
 * <p>The provider owns a {@link SubscriptionSessionWrapper} and sends every subscription request
 * (handshake, heartbeat, subscribe, unsubscribe, poll, commit, close) through
 * {@code getSessionConnection().pipeSubscribe(...)}. Each response status is checked by
 * {@link #verifyPipeSubscribeSuccess(TSStatus)} and mapped to a {@link SubscriptionException}
 * subtype on failure.
 *
 * <p>Two flags track the lifecycle:
 *
 * <ul>
 *   <li>{@code isClosed} starts as {@code true}, becomes {@code false} after a successful
 *       {@link #handshake()} and {@code true} again after {@link #close()}.
 *   <li>{@code isAvailable} starts as {@code false}, is set by a successful handshake and cleared
 *       whenever a request fails with a {@link TException} or the provider is closed.
 * </ul>
 *
 * <p>Only {@link #handshake()} and {@link #close()} are {@code synchronized}; the request methods
 * and the getters are not.
 */
public abstract class AbstractSubscriptionProvider {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSubscriptionProvider.class);
    private static final String STATUS_FORMATTER = "Status code is [%s], status message is [%s].";
    private static final String INTERNAL_ERROR_FORMATTER = "Internal error occurred. Status code is [%s], status message is [%s].";
    private static final String SUBSCRIPTION_PIPE_TIMEOUT_FORMATTER = "A timeout has occurred in procedures related to the pipe within the subscription procedure. Please manually check the subscription correctness later. Status code is [%s], status message is [%s].";
    private final SubscriptionSessionWrapper session;
    // Both ids are seeded by the constructor and overwritten with the values returned by the
    // server on a successful handshake.
    private String consumerId;
    private String consumerGroupId;
    private final AtomicBoolean isClosed = new AtomicBoolean(true);
    private final AtomicBoolean isAvailable = new AtomicBoolean(false);
    private final TEndPoint endPoint;
    // Only meaningful after a successful handshake; holds the int default (0) before that.
    private int dataNodeId;

    /**
     * Builds the session builder that backs this provider's {@link SubscriptionSessionWrapper}.
     *
     * <p>This method is invoked from the constructor, so implementations must not depend on
     * subclass state that is initialized after {@code super(...)} returns.
     *
     * @param str receives the endpoint's {@code ip}
     * @param i receives the endpoint's {@code port}
     * @param str2 forwarded unchanged from the constructor's second argument
     *     (presumably a user name; confirm against implementations)
     * @param str3 forwarded unchanged from the constructor's third argument
     *     (presumably a password; confirm against implementations)
     * @param i2 forwarded unchanged from the constructor's last argument
     *     (presumably the Thrift max frame size; confirm against implementations)
     * @return the builder used to create the underlying session
     */
    protected abstract AbstractSessionBuilder constructSubscriptionSessionBuilder(String str, int i, String str2, String str3, int i2);

    /**
     * Creates a provider for the given endpoint. The session is constructed here but not opened;
     * call {@link #handshake()} to open it.
     *
     * @param tEndPoint endpoint whose {@code ip} and {@code port} are handed to the session builder
     * @param str forwarded to {@link #constructSubscriptionSessionBuilder}
     * @param str2 forwarded to {@link #constructSubscriptionSessionBuilder}
     * @param str3 initial consumer id
     * @param str4 initial consumer group id
     * @param i forwarded to {@link #constructSubscriptionSessionBuilder}
     */
    public AbstractSubscriptionProvider(TEndPoint tEndPoint, String str, String str2, String str3, String str4, int i) {
        this.session = new SubscriptionSessionWrapper(constructSubscriptionSessionBuilder(tEndPoint.ip, tEndPoint.port, str, str2, i));
        this.endPoint = tEndPoint;
        this.consumerId = str3;
        this.consumerGroupId = str4;
    }

    /** Returns the connection of the wrapped session, through which all requests are sent. */
    public SubscriptionSessionConnection getSessionConnection() {
        return this.session.getSessionConnection();
    }

    /** Returns the current value of the availability flag. */
    public boolean isAvailable() {
        return this.isAvailable.get();
    }

    /** Marks this provider as available. */
    public void setAvailable() {
        this.isAvailable.set(true);
    }

    /** Marks this provider as unavailable. */
    public void setUnavailable() {
        this.isAvailable.set(false);
    }

    /** Returns the data node id reported by the last successful handshake. */
    public int getDataNodeId() {
        return this.dataNodeId;
    }

    /** Returns the consumer id (the server-reported one after a successful handshake). */
    public String getConsumerId() {
        return this.consumerId;
    }

    /** Returns the consumer group id (the server-reported one after a successful handshake). */
    public String getConsumerGroupId() {
        return this.consumerGroupId;
    }

    /** Returns the endpoint this provider was created for. */
    TEndPoint getEndPoint() {
        return this.endPoint;
    }

    /**
     * Opens the session and performs the subscription handshake, if this provider is currently
     * closed. Does nothing when the provider is already open.
     *
     * <p>On success the data node id, consumer id and consumer group id are replaced with the
     * values from the handshake response, and the provider is marked open and available.
     *
     * <p>If the handshake fails after the session has been opened, the session is closed again
     * before the failure propagates. Without that, the provider would still be flagged as closed,
     * {@link #close()} would return early, and the opened session could never be released.
     *
     * @throws SubscriptionException if the handshake request fails or is rejected
     * @throws IoTDBConnectionException if the session cannot be opened
     */
    public synchronized void handshake() throws SubscriptionException, IoTDBConnectionException {
        if (!this.isClosed.get()) {
            return;
        }

        this.session.open();

        boolean handshakeSucceeded = false;
        try {
            Map<String, String> consumerAttributes = new HashMap<>();
            consumerAttributes.put("group-id", this.consumerGroupId);
            consumerAttributes.put("consumer-id", this.consumerId);

            PipeSubscribeHandshakeResp resp = handshake(new ConsumerConfig(consumerAttributes));
            this.dataNodeId = resp.getDataNodeId();
            this.consumerId = resp.getConsumerId();
            this.consumerGroupId = resp.getConsumerGroupId();

            this.isClosed.set(false);
            setAvailable();
            handshakeSucceeded = true;
        } finally {
            if (!handshakeSucceeded) {
                closeSessionAfterFailedHandshake();
            }
        }
    }

    /**
     * Best-effort release of a session whose handshake did not complete. A failure to close is
     * logged rather than thrown so that it cannot mask the original handshake failure.
     */
    private void closeSessionAfterFailedHandshake() {
        try {
            this.session.close();
        } catch (Exception closeFailure) {
            LOGGER.warn("Failed to close the session of SubscriptionProvider {} after an unsuccessful handshake", this, closeFailure);
        }
    }

    /**
     * Sends a handshake request carrying the given consumer configuration.
     *
     * @param consumerConfig configuration serialized into the request
     * @return the parsed handshake response
     * @throws SubscriptionException a {@link SubscriptionConnectionException} on {@link TException}
     *     (the provider is also marked unavailable), a
     *     {@link SubscriptionRuntimeNonCriticalException} on {@link IOException}, or whatever
     *     {@link #verifyPipeSubscribeSuccess(TSStatus)} raises for a non-success status
     */
    PipeSubscribeHandshakeResp handshake(ConsumerConfig consumerConfig) throws SubscriptionException {
        try {
            TPipeSubscribeResp resp = getSessionConnection().pipeSubscribe(PipeSubscribeHandshakeReq.toTPipeSubscribeReq(consumerConfig));
            verifyPipeSubscribeSuccess(resp.status);
            return PipeSubscribeHandshakeResp.fromTPipeSubscribeResp(resp);
        } catch (TException e) {
            LOGGER.warn("TException occurred when SubscriptionProvider {} handshake with request {}, set SubscriptionProvider unavailable", this, consumerConfig, e);
            setUnavailable();
            throw new SubscriptionConnectionException(e.getMessage(), e);
        } catch (IOException e) {
            LOGGER.warn("IOException occurred when SubscriptionProvider {} serialize handshake request {}", this, consumerConfig, e);
            throw new SubscriptionRuntimeNonCriticalException(e.getMessage(), e);
        }
    }

    /**
     * Sends the close request and closes the session, if this provider is currently open. Does
     * nothing when the provider is already closed.
     *
     * <p>The session is closed even when the close request fails, and the provider is marked
     * unavailable and closed even when closing the session itself throws. An exception from
     * closing the session replaces one from the close request.
     *
     * @throws SubscriptionException if the close request fails or is rejected
     * @throws IoTDBConnectionException if closing the session fails
     */
    public synchronized void close() throws SubscriptionException, IoTDBConnectionException {
        if (this.isClosed.get()) {
            return;
        }
        try {
            closeInternal();
        } finally {
            try {
                this.session.close();
            } finally {
                // Must run even if session.close() throws; otherwise the provider would keep
                // reporting itself as open and (possibly) available.
                setUnavailable();
                this.isClosed.set(true);
            }
        }
    }

    /**
     * Sends the close request over the current connection and verifies the response status.
     *
     * @throws SubscriptionException a {@link SubscriptionConnectionException} on {@link TException}
     *     (the provider is also marked unavailable), or whatever
     *     {@link #verifyPipeSubscribeSuccess(TSStatus)} raises for a non-success status
     */
    void closeInternal() throws SubscriptionException {
        try {
            TPipeSubscribeResp resp = getSessionConnection().pipeSubscribe(PipeSubscribeCloseReq.toTPipeSubscribeReq());
            verifyPipeSubscribeSuccess(resp.status);
        } catch (TException e) {
            LOGGER.warn("TException occurred when SubscriptionProvider {} close, set SubscriptionProvider unavailable", this, e);
            setUnavailable();
            throw new SubscriptionConnectionException(e.getMessage(), e);
        }
    }

    /**
     * Sends a heartbeat request.
     *
     * @return the topics contained in the heartbeat response, keyed by string
     *     (presumably the topic name; confirm against the response type)
     * @throws SubscriptionException a {@link SubscriptionConnectionException} on {@link TException}
     *     (the provider is also marked unavailable), or whatever
     *     {@link #verifyPipeSubscribeSuccess(TSStatus)} raises for a non-success status
     */
    public Map<String, TopicConfig> heartbeat() throws SubscriptionException {
        try {
            TPipeSubscribeResp resp = getSessionConnection().pipeSubscribe(PipeSubscribeHeartbeatReq.toTPipeSubscribeReq());
            verifyPipeSubscribeSuccess(resp.status);
            return PipeSubscribeHeartbeatResp.fromTPipeSubscribeResp(resp).getTopics();
        } catch (TException e) {
            LOGGER.warn("TException occurred when SubscriptionProvider {} heartbeat, set SubscriptionProvider unavailable", this, e);
            setUnavailable();
            throw new SubscriptionConnectionException(e.getMessage(), e);
        }
    }

    /**
     * Sends a subscribe request for the given set of strings (presumably topic names).
     *
     * @param set values serialized into the subscribe request
     * @return the topics contained in the subscribe response
     * @throws SubscriptionException a {@link SubscriptionConnectionException} on {@link TException}
     *     (the provider is also marked unavailable), a
     *     {@link SubscriptionRuntimeNonCriticalException} on {@link IOException}, or whatever
     *     {@link #verifyPipeSubscribeSuccess(TSStatus)} raises for a non-success status
     */
    public Map<String, TopicConfig> subscribe(Set<String> set) throws SubscriptionException {
        try {
            TPipeSubscribeResp resp = getSessionConnection().pipeSubscribe(PipeSubscribeSubscribeReq.toTPipeSubscribeReq(set));
            verifyPipeSubscribeSuccess(resp.status);
            return PipeSubscribeSubscribeResp.fromTPipeSubscribeResp(resp).getTopics();
        } catch (TException e) {
            LOGGER.warn("TException occurred when SubscriptionProvider {} subscribe with request {}, set SubscriptionProvider unavailable", this, set, e);
            setUnavailable();
            throw new SubscriptionConnectionException(e.getMessage(), e);
        } catch (IOException e) {
            LOGGER.warn("IOException occurred when SubscriptionProvider {} serialize subscribe request {}", this, set, e);
            throw new SubscriptionRuntimeNonCriticalException(e.getMessage(), e);
        }
    }

    /**
     * Sends an unsubscribe request for the given set of strings (presumably topic names).
     *
     * @param set values serialized into the unsubscribe request
     * @return the topics contained in the unsubscribe response
     * @throws SubscriptionException a {@link SubscriptionConnectionException} on {@link TException}
     *     (the provider is also marked unavailable), a
     *     {@link SubscriptionRuntimeNonCriticalException} on {@link IOException}, or whatever
     *     {@link #verifyPipeSubscribeSuccess(TSStatus)} raises for a non-success status
     */
    public Map<String, TopicConfig> unsubscribe(Set<String> set) throws SubscriptionException {
        try {
            TPipeSubscribeResp resp = getSessionConnection().pipeSubscribe(PipeSubscribeUnsubscribeReq.toTPipeSubscribeReq(set));
            verifyPipeSubscribeSuccess(resp.status);
            return PipeSubscribeUnsubscribeResp.fromTPipeSubscribeResp(resp).getTopics();
        } catch (TException e) {
            LOGGER.warn("TException occurred when SubscriptionProvider {} unsubscribe with request {}, set SubscriptionProvider unavailable", this, set, e);
            setUnavailable();
            throw new SubscriptionConnectionException(e.getMessage(), e);
        } catch (IOException e) {
            LOGGER.warn("IOException occurred when SubscriptionProvider {} serialize unsubscribe request {}", this, set, e);
            throw new SubscriptionRuntimeNonCriticalException(e.getMessage(), e);
        }
    }

    /**
     * Builds a {@code POLL} request from a {@link PollPayload} and sends it.
     *
     * @param set wrapped in the {@link PollPayload} (presumably topic names)
     * @param j third argument of the poll request (presumably a timeout; confirm against
     *     {@link SubscriptionPollRequest})
     * @return the responses contained in the poll response
     * @throws SubscriptionException see {@link #poll(SubscriptionPollRequest)}
     */
    public List<SubscriptionPollResponse> poll(Set<String> set, long j) throws SubscriptionException {
        PollPayload payload = new PollPayload(set);
        return poll(new SubscriptionPollRequest(SubscriptionPollRequestType.POLL.getType(), payload, j, this.session.getThriftMaxFrameSize()));
    }

    /**
     * Builds a {@code POLL_FILE} request from a {@link PollFilePayload} and sends it.
     *
     * @param subscriptionCommitContext commit context placed in the payload
     * @param j second argument of the payload (presumably a file offset; confirm against
     *     {@link PollFilePayload})
     * @param j2 third argument of the poll request (presumably a timeout; confirm against
     *     {@link SubscriptionPollRequest})
     * @return the responses contained in the poll response
     * @throws SubscriptionException see {@link #poll(SubscriptionPollRequest)}
     */
    public List<SubscriptionPollResponse> pollFile(SubscriptionCommitContext subscriptionCommitContext, long j, long j2) throws SubscriptionException {
        PollFilePayload payload = new PollFilePayload(subscriptionCommitContext, j);
        return poll(new SubscriptionPollRequest(SubscriptionPollRequestType.POLL_FILE.getType(), payload, j2, this.session.getThriftMaxFrameSize()));
    }

    /**
     * Builds a {@code POLL_TABLETS} request from a {@link PollTabletsPayload} and sends it.
     *
     * @param subscriptionCommitContext commit context placed in the payload
     * @param i second argument of the payload (presumably an offset; confirm against
     *     {@link PollTabletsPayload})
     * @param j third argument of the poll request (presumably a timeout; confirm against
     *     {@link SubscriptionPollRequest})
     * @return the responses contained in the poll response
     * @throws SubscriptionException see {@link #poll(SubscriptionPollRequest)}
     */
    public List<SubscriptionPollResponse> pollTablets(SubscriptionCommitContext subscriptionCommitContext, int i, long j) throws SubscriptionException {
        PollTabletsPayload payload = new PollTabletsPayload(subscriptionCommitContext, i);
        return poll(new SubscriptionPollRequest(SubscriptionPollRequestType.POLL_TABLETS.getType(), payload, j, this.session.getThriftMaxFrameSize()));
    }

    /**
     * Sends an already-built poll request.
     *
     * @param subscriptionPollRequest request to serialize and send
     * @return the responses contained in the poll response
     * @throws SubscriptionException a {@link SubscriptionConnectionException} on {@link TException}
     *     (the provider is also marked unavailable), a
     *     {@link SubscriptionRuntimeNonCriticalException} on {@link IOException}, or whatever
     *     {@link #verifyPipeSubscribeSuccess(TSStatus)} raises for a non-success status
     */
    List<SubscriptionPollResponse> poll(SubscriptionPollRequest subscriptionPollRequest) throws SubscriptionException {
        try {
            TPipeSubscribeResp resp = getSessionConnection().pipeSubscribe(PipeSubscribePollReq.toTPipeSubscribeReq(subscriptionPollRequest));
            verifyPipeSubscribeSuccess(resp.status);
            return PipeSubscribePollResp.fromTPipeSubscribeResp(resp).getResponses();
        } catch (TException e) {
            LOGGER.warn("TException occurred when SubscriptionProvider {} poll with request {}, set SubscriptionProvider unavailable", this, subscriptionPollRequest, e);
            setUnavailable();
            throw new SubscriptionConnectionException(e.getMessage(), e);
        } catch (IOException e) {
            LOGGER.warn("IOException occurred when SubscriptionProvider {} serialize poll request {}", this, subscriptionPollRequest, e);
            throw new SubscriptionRuntimeNonCriticalException(e.getMessage(), e);
        }
    }

    /**
     * Sends a commit request for the given commit contexts.
     *
     * @param list commit contexts serialized into the request
     * @param z flag serialized into the request (presumably a negative-acknowledgement marker;
     *     confirm against {@link PipeSubscribeCommitReq})
     * @throws SubscriptionException a {@link SubscriptionConnectionException} on {@link TException}
     *     (the provider is also marked unavailable), a
     *     {@link SubscriptionRuntimeNonCriticalException} on {@link IOException}, or whatever
     *     {@link #verifyPipeSubscribeSuccess(TSStatus)} raises for a non-success status
     */
    public void commit(List<SubscriptionCommitContext> list, boolean z) throws SubscriptionException {
        try {
            TPipeSubscribeResp resp = getSessionConnection().pipeSubscribe(PipeSubscribeCommitReq.toTPipeSubscribeReq(list, z));
            verifyPipeSubscribeSuccess(resp.status);
        } catch (TException e) {
            LOGGER.warn("TException occurred when SubscriptionProvider {} commit with request {}, set SubscriptionProvider unavailable", this, list, e);
            setUnavailable();
            throw new SubscriptionConnectionException(e.getMessage(), e);
        } catch (IOException e) {
            LOGGER.warn("IOException occurred when SubscriptionProvider {} serialize commit request {}", this, list, e);
            throw new SubscriptionRuntimeNonCriticalException(e.getMessage(), e);
        }
    }

    /**
     * Maps a response status to "success" or to the matching exception.
     *
     * <ul>
     *   <li>{@code 200}: returns normally.
     *   <li>{@code 1902}-{@code 1908}: {@link SubscriptionRuntimeNonCriticalException}.
     *   <li>{@code 1911}: {@link SubscriptionPipeTimeoutException}.
     *   <li>{@code 1900}, {@code 1901}, {@code 1909} and every other code:
     *       {@link SubscriptionRuntimeCriticalException}.
     * </ul>
     *
     * <p>Every failure is logged at warn level, and the thrown exception carries the same formatted
     * text (status code plus status message) that was logged.
     *
     * @param tSStatus status taken from a subscription response
     * @throws SubscriptionException for any status code other than {@code 200}
     */
    private static void verifyPipeSubscribeSuccess(TSStatus tSStatus) throws SubscriptionException {
        switch (tSStatus.code) {
            case 200:
                return;
            case 1902:
            case 1903:
            case 1904:
            case 1905:
            case 1906:
            case 1907:
            case 1908: {
                String errorMessage = String.format(INTERNAL_ERROR_FORMATTER, tSStatus.code, tSStatus.message);
                LOGGER.warn(errorMessage);
                throw new SubscriptionRuntimeNonCriticalException(errorMessage);
            }
            case 1911: {
                String errorMessage = String.format(SUBSCRIPTION_PIPE_TIMEOUT_FORMATTER, tSStatus.code, tSStatus.message);
                LOGGER.warn(errorMessage);
                throw new SubscriptionPipeTimeoutException(errorMessage);
            }
            case 1900:
            case 1901:
            case 1909:
            default: {
                // Unknown codes are deliberately treated like the listed critical ones.
                String errorMessage = String.format(INTERNAL_ERROR_FORMATTER, tSStatus.code, tSStatus.message);
                LOGGER.warn(errorMessage);
                throw new SubscriptionRuntimeCriticalException(errorMessage);
            }
        }
    }

    /** Returns a diagnostic string with the endpoint, ids and both lifecycle flags. */
    @Override
    public String toString() {
        return "SubscriptionProvider{endPoint=" + this.endPoint
            + ", dataNodeId=" + this.dataNodeId
            + ", consumerId=" + this.consumerId
            + ", consumerGroupId=" + this.consumerGroupId
            + ", isAvailable=" + this.isAvailable
            + ", isClosed=" + this.isClosed
            + "}";
    }
}
