package org.apache.rocketmq.client.java.impl;

import apache.rocketmq.v2.AckMessageRequest;
import apache.rocketmq.v2.AckMessageResponse;
import apache.rocketmq.v2.ChangeInvisibleDurationRequest;
import apache.rocketmq.v2.ChangeInvisibleDurationResponse;
import apache.rocketmq.v2.EndTransactionRequest;
import apache.rocketmq.v2.EndTransactionResponse;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueRequest;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueResponse;
import apache.rocketmq.v2.HeartbeatRequest;
import apache.rocketmq.v2.HeartbeatResponse;
import apache.rocketmq.v2.NotifyClientTerminationRequest;
import apache.rocketmq.v2.NotifyClientTerminationResponse;
import apache.rocketmq.v2.QueryAssignmentRequest;
import apache.rocketmq.v2.QueryAssignmentResponse;
import apache.rocketmq.v2.QueryRouteRequest;
import apache.rocketmq.v2.QueryRouteResponse;
import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.ReceiveMessageResponse;
import apache.rocketmq.v2.SendMessageRequest;
import apache.rocketmq.v2.SendMessageResponse;
import apache.rocketmq.v2.TelemetryCommand;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.grpc.Metadata;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.net.ssl.SSLException;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.java.exception.InternalErrorException;
import org.apache.rocketmq.client.java.misc.ClientId;
import org.apache.rocketmq.client.java.misc.ExecutorServices;
import org.apache.rocketmq.client.java.misc.MetadataUtils;
import org.apache.rocketmq.client.java.misc.Utilities;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.apache.rocketmq.client.java.rpc.Context;
import org.apache.rocketmq.client.java.rpc.RpcClient;
import org.apache.rocketmq.client.java.rpc.RpcClientImpl;
import org.apache.rocketmq.client.java.rpc.RpcFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/rocketmq/client/java/impl/ClientManagerImpl.class */
public class ClientManagerImpl extends ClientManager {
    public static final Duration RPC_CLIENT_MAX_IDLE_DURATION = Duration.ofMinutes(30);
    public static final Duration RPC_CLIENT_IDLE_CHECK_INITIAL_DELAY = Duration.ofSeconds(5);
    public static final Duration RPC_CLIENT_IDLE_CHECK_PERIOD = Duration.ofMinutes(1);
    public static final Duration HEART_BEAT_INITIAL_DELAY = Duration.ofSeconds(1);
    public static final Duration HEART_BEAT_PERIOD = Duration.ofSeconds(10);
    public static final Duration LOG_STATS_INITIAL_DELAY = Duration.ofSeconds(1);
    public static final Duration LOG_STATS_PERIOD = Duration.ofSeconds(60);
    public static final Duration SYNC_SETTINGS_DELAY = Duration.ofSeconds(1);
    public static final Duration SYNC_SETTINGS_PERIOD = Duration.ofMinutes(5);
    private static final Logger log = LoggerFactory.getLogger(ClientManagerImpl.class);
    private final Client client;

    @GuardedBy("rpcClientTableLock")
    private final Map<Endpoints, RpcClient> rpcClientTable = new HashMap();
    private final ReadWriteLock rpcClientTableLock = new ReentrantReadWriteLock();
    private final ScheduledExecutorService scheduler;
    private final ExecutorService asyncWorker;

    public ClientManagerImpl(Client client) {
        this.client = client;
        long index = client.getClientId().getIndex();
        this.scheduler = Executors.newSingleThreadScheduledExecutor(Thread.ofVirtual().name("ClientSchedulerVirtual" + index).factory());
        this.asyncWorker = Executors.newThreadPerTaskExecutor(Thread.ofVirtual().name("ClientAsyncWorkerVirtual" + index).factory());
    }

    private void clearIdleRpcClients() throws InterruptedException {
        this.rpcClientTableLock.writeLock().lock();
        try {
            Iterator<Map.Entry<Endpoints, RpcClient>> it = this.rpcClientTable.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Endpoints, RpcClient> next = it.next();
                Endpoints key = next.getKey();
                RpcClient value = next.getValue();
                Duration idleDuration = value.idleDuration();
                if (idleDuration.compareTo(RPC_CLIENT_MAX_IDLE_DURATION) > 0) {
                    it.remove();
                    value.shutdown();
                    log.info("Rpc client has been idle for a long time, endpoints={}, idleDuration={}, rpcClientMaxIdleDuration={}, clientId={}", new Object[]{key, idleDuration, RPC_CLIENT_MAX_IDLE_DURATION, this.client.getClientId()});
                }
            }
        } finally {
            this.rpcClientTableLock.writeLock().unlock();
        }
    }

    private RpcClient getRpcClient(Endpoints endpoints) throws ClientException {
        this.rpcClientTableLock.readLock().lock();
        try {
            RpcClient rpcClient = this.rpcClientTable.get(endpoints);
            if (null != rpcClient) {
                return rpcClient;
            }
            this.rpcClientTableLock.readLock().unlock();
            this.rpcClientTableLock.writeLock().lock();
            try {
                RpcClient rpcClient2 = this.rpcClientTable.get(endpoints);
                if (null != rpcClient2) {
                    return rpcClient2;
                }
                try {
                    RpcClientImpl rpcClientImpl = new RpcClientImpl(endpoints, this.client.isSslEnabled());
                    this.rpcClientTable.put(endpoints, rpcClientImpl);
                    this.rpcClientTableLock.writeLock().unlock();
                    return rpcClientImpl;
                } catch (SSLException e) {
                    log.error("Failed to get RPC client, endpoints={}, clientId={}", new Object[]{endpoints, this.client.getClientId(), e});
                    throw new ClientException("Failed to generate RPC client", e);
                }
            } finally {
                this.rpcClientTableLock.writeLock().unlock();
            }
        } finally {
            this.rpcClientTableLock.readLock().unlock();
        }
    }

    @Override // org.apache.rocketmq.client.java.impl.ClientManager
    public RpcFuture<QueryRouteRequest, QueryRouteResponse> queryRoute(Endpoints endpoints, QueryRouteRequest queryRouteRequest, Duration duration) {
        try {
            Metadata sign = this.client.sign();
            return new RpcFuture<>(new Context(endpoints, sign), queryRouteRequest, getRpcClient(endpoints).queryRoute(sign, queryRouteRequest, this.asyncWorker, duration));
        } catch (Throwable th) {
            return new RpcFuture<>(th);
        }
    }

    @Override // org.apache.rocketmq.client.java.impl.ClientManager
    public RpcFuture<HeartbeatRequest, HeartbeatResponse> heartbeat(Endpoints endpoints, HeartbeatRequest heartbeatRequest, Duration duration) {
        try {
            Metadata sign = this.client.sign();
            return new RpcFuture<>(new Context(endpoints, sign), heartbeatRequest, getRpcClient(endpoints).heartbeat(sign, heartbeatRequest, this.asyncWorker, duration));
        } catch (Throwable th) {
            return new RpcFuture<>(th);
        }
    }

    @Override // org.apache.rocketmq.client.java.impl.ClientManager
    public RpcFuture<SendMessageRequest, SendMessageResponse> sendMessage(Endpoints endpoints, SendMessageRequest sendMessageRequest, Duration duration) {
        try {
            Metadata sign = this.client.sign();
            return new RpcFuture<>(new Context(endpoints, sign), sendMessageRequest, getRpcClient(endpoints).sendMessage(sign, sendMessageRequest, this.asyncWorker, duration));
        } catch (Throwable th) {
            return new RpcFuture<>(th);
        }
    }

    @Override // org.apache.rocketmq.client.java.impl.ClientManager
    public RpcFuture<QueryAssignmentRequest, QueryAssignmentResponse> queryAssignment(Endpoints endpoints, QueryAssignmentRequest queryAssignmentRequest, Duration duration) {
        try {
            Metadata sign = this.client.sign();
            return new RpcFuture<>(new Context(endpoints, sign), queryAssignmentRequest, getRpcClient(endpoints).queryAssignment(sign, queryAssignmentRequest, this.asyncWorker, duration));
        } catch (Throwable th) {
            return new RpcFuture<>(th);
        }
    }

    @Override // org.apache.rocketmq.client.java.impl.ClientManager
    public RpcFuture<ReceiveMessageRequest, List<ReceiveMessageResponse>> receiveMessage(Endpoints endpoints, ReceiveMessageRequest receiveMessageRequest, Duration duration) {
        try {
            Metadata sign = this.client.sign();
            return new RpcFuture<>(new Context(endpoints, sign), receiveMessageRequest, getRpcClient(endpoints).receiveMessage(sign, receiveMessageRequest, this.asyncWorker, duration));
        } catch (Throwable th) {
            return new RpcFuture<>(th);
        }
    }

    @Override // org.apache.rocketmq.client.java.impl.ClientManager
    public RpcFuture<AckMessageRequest, AckMessageResponse> ackMessage(Endpoints endpoints, AckMessageRequest ackMessageRequest, Duration duration) {
        try {
            Metadata sign = this.client.sign();
            return new RpcFuture<>(new Context(endpoints, sign), ackMessageRequest, getRpcClient(endpoints).ackMessage(sign, ackMessageRequest, this.asyncWorker, duration));
        } catch (Throwable th) {
            return new RpcFuture<>(th);
        }
    }

    @Override // org.apache.rocketmq.client.java.impl.ClientManager
    public RpcFuture<ChangeInvisibleDurationRequest, ChangeInvisibleDurationResponse> changeInvisibleDuration(Endpoints endpoints, ChangeInvisibleDurationRequest changeInvisibleDurationRequest, Duration duration) {
        try {
            Metadata sign = this.client.sign();
            return new RpcFuture<>(new Context(endpoints, sign), changeInvisibleDurationRequest, getRpcClient(endpoints).changeInvisibleDuration(sign, changeInvisibleDurationRequest, this.asyncWorker, duration));
        } catch (Throwable th) {
            return new RpcFuture<>(th);
        }
    }

    @Override // org.apache.rocketmq.client.java.impl.ClientManager
    public RpcFuture<ForwardMessageToDeadLetterQueueRequest, ForwardMessageToDeadLetterQueueResponse> forwardMessageToDeadLetterQueue(Endpoints endpoints, ForwardMessageToDeadLetterQueueRequest forwardMessageToDeadLetterQueueRequest, Duration duration) {
        try {
            Metadata sign = this.client.sign();
            return new RpcFuture<>(new Context(endpoints, sign), forwardMessageToDeadLetterQueueRequest, getRpcClient(endpoints).forwardMessageToDeadLetterQueue(sign, forwardMessageToDeadLetterQueueRequest, this.asyncWorker, duration));
        } catch (Throwable th) {
            return new RpcFuture<>(th);
        }
    }

    @Override // org.apache.rocketmq.client.java.impl.ClientManager
    public RpcFuture<EndTransactionRequest, EndTransactionResponse> endTransaction(Endpoints endpoints, EndTransactionRequest endTransactionRequest, Duration duration) {
        try {
            Metadata sign = this.client.sign();
            return new RpcFuture<>(new Context(endpoints, sign), endTransactionRequest, getRpcClient(endpoints).endTransaction(sign, endTransactionRequest, this.asyncWorker, duration));
        } catch (Throwable th) {
            return new RpcFuture<>(th);
        }
    }

    @Override // org.apache.rocketmq.client.java.impl.ClientManager
    public RpcFuture<NotifyClientTerminationRequest, NotifyClientTerminationResponse> notifyClientTermination(Endpoints endpoints, NotifyClientTerminationRequest notifyClientTerminationRequest, Duration duration) {
        try {
            Metadata sign = this.client.sign();
            return new RpcFuture<>(new Context(endpoints, sign), notifyClientTerminationRequest, getRpcClient(endpoints).notifyClientTermination(sign, notifyClientTerminationRequest, this.asyncWorker, duration));
        } catch (Throwable th) {
            return new RpcFuture<>(th);
        }
    }

    @Override // org.apache.rocketmq.client.java.impl.ClientManager
    public StreamObserver<TelemetryCommand> telemetry(Endpoints endpoints, Duration duration, StreamObserver<TelemetryCommand> streamObserver) throws ClientException {
        try {
            return getRpcClient(endpoints).telemetry(this.client.sign(), this.asyncWorker, duration, streamObserver);
        } catch (Throwable th) {
            throw new InternalErrorException(th);
        }
    }

    @Override // org.apache.rocketmq.client.java.impl.ClientManager
    public ScheduledExecutorService getScheduler() {
        return this.scheduler;
    }

    protected void startUp() {
        ClientId clientId = this.client.getClientId();
        log.info("Begin to start the client manager, clientId={}", clientId);
        this.scheduler.scheduleWithFixedDelay(() -> {
            try {
                clearIdleRpcClients();
            } catch (Throwable th) {
                log.error("Exception raised during the clearing of idle rpc clients, clientId={}", clientId, th);
            }
        }, RPC_CLIENT_IDLE_CHECK_INITIAL_DELAY.toNanos(), RPC_CLIENT_IDLE_CHECK_PERIOD.toNanos(), TimeUnit.NANOSECONDS);
        this.scheduler.scheduleWithFixedDelay(() -> {
            try {
                this.client.doHeartbeat();
            } catch (Throwable th) {
                log.error("Exception raised during heartbeat, clientId={}", clientId, th);
            }
        }, HEART_BEAT_INITIAL_DELAY.toNanos(), HEART_BEAT_PERIOD.toNanos(), TimeUnit.NANOSECONDS);
        this.scheduler.scheduleWithFixedDelay(() -> {
            try {
                log.info("Start to log statistics, clientVersion={}, clientWrapperVersion={}, clientEndpoints={}, os description=[{}], java description=[{}], clientId={}", new Object[]{MetadataUtils.getVersion(), MetadataUtils.getWrapperVersion(), this.client.getEndpoints(), Utilities.getOsDescription(), Utilities.getJavaDescription(), clientId});
                this.client.doStats();
            } catch (Throwable th) {
                log.error("Exception raised during statistics logging, clientId={}", clientId, th);
            }
        }, LOG_STATS_INITIAL_DELAY.toNanos(), LOG_STATS_PERIOD.toNanos(), TimeUnit.NANOSECONDS);
        this.scheduler.scheduleWithFixedDelay(() -> {
            try {
                this.client.syncSettings();
            } catch (Throwable th) {
                log.error("Exception raised during the setting synchronization, clientId={}", clientId, th);
            }
        }, SYNC_SETTINGS_DELAY.toNanos(), SYNC_SETTINGS_PERIOD.toNanos(), TimeUnit.NANOSECONDS);
        log.info("The client manager starts successfully, clientId={}", clientId);
    }

    /* JADX WARN: Finally extract failed */
    protected void shutDown() throws IOException {
        ClientId clientId = this.client.getClientId();
        log.info("Begin to shutdown the client manager, clientId={}", clientId);
        this.scheduler.shutdown();
        try {
            if (ExecutorServices.awaitTerminated(this.scheduler)) {
                log.info("Shutdown the client scheduler successfully, clientId={}", clientId);
            } else {
                log.error("[Bug] Timeout to shutdown the client scheduler, clientId={}", clientId);
            }
            this.rpcClientTableLock.writeLock().lock();
            try {
                Iterator<Map.Entry<Endpoints, RpcClient>> it = this.rpcClientTable.entrySet().iterator();
                while (it.hasNext()) {
                    RpcClient value = it.next().getValue();
                    it.remove();
                    value.shutdown();
                }
                this.rpcClientTableLock.writeLock().unlock();
                log.info("Shutdown all rpc client(s) successfully, clientId={}", clientId);
                this.asyncWorker.shutdown();
                if (ExecutorServices.awaitTerminated(this.asyncWorker)) {
                    log.info("Shutdown the client async worker successfully, clientId={}", clientId);
                } else {
                    log.error("[Bug] Timeout to shutdown the client async worker, clientId={}", clientId);
                }
                log.info("Shutdown the client manager successfully, clientId={}", clientId);
            } catch (Throwable th) {
                this.rpcClientTableLock.writeLock().unlock();
                throw th;
            }
        } catch (InterruptedException e) {
            log.error("[Bug] Unexpected exception raised while shutdown client manager, clientId={}", clientId, e);
            throw new IOException(e);
        }
    }

    protected String serviceName() {
        return super.serviceName() + "-" + this.client.getClientId().getIndex();
    }
}
