package org.apache.rocketmq.client.java.impl;

import apache.rocketmq.v2.Code;
import apache.rocketmq.v2.HeartbeatRequest;
import apache.rocketmq.v2.HeartbeatResponse;
import apache.rocketmq.v2.NotifyClientTerminationRequest;
import apache.rocketmq.v2.PrintThreadStackTraceCommand;
import apache.rocketmq.v2.QueryRouteRequest;
import apache.rocketmq.v2.QueryRouteResponse;
import apache.rocketmq.v2.RecoverOrphanedTransactionCommand;
import apache.rocketmq.v2.Resource;
import apache.rocketmq.v2.Status;
import apache.rocketmq.v2.TelemetryCommand;
import apache.rocketmq.v2.ThreadStackTrace;
import apache.rocketmq.v2.VerifyMessageCommand;
import apache.rocketmq.v2.VerifyMessageResult;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.grpc.Metadata;
import io.grpc.stub.StreamObserver;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.java.exception.InternalErrorException;
import org.apache.rocketmq.client.java.exception.StatusChecker;
import org.apache.rocketmq.client.java.hook.CompositedMessageInterceptor;
import org.apache.rocketmq.client.java.hook.MessageInterceptor;
import org.apache.rocketmq.client.java.hook.MessageInterceptorContext;
import org.apache.rocketmq.client.java.impl.producer.ClientSessionHandler;
import org.apache.rocketmq.client.java.message.GeneralMessage;
import org.apache.rocketmq.client.java.metrics.ClientMeterManager;
import org.apache.rocketmq.client.java.metrics.MessageMeterInterceptor;
import org.apache.rocketmq.client.java.metrics.Metric;
import org.apache.rocketmq.client.java.misc.ClientId;
import org.apache.rocketmq.client.java.misc.ExecutorServices;
import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
import org.apache.rocketmq.client.java.misc.Utilities;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.apache.rocketmq.client.java.route.TopicRouteData;
import org.apache.rocketmq.client.java.rpc.RpcFuture;
import org.apache.rocketmq.client.java.rpc.Signature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/rocketmq/client/java/impl/ClientImpl.class */
public abstract class ClientImpl extends AbstractIdleService implements Client, ClientSessionHandler, MessageInterceptor {
    private static final Logger log = LoggerFactory.getLogger(ClientImpl.class);
    private static final Duration TELEMETRY_TIMEOUT = Duration.ofDays(21900);
    protected final ClientConfiguration clientConfiguration;
    protected final Endpoints endpoints;
    protected final Set<String> topics;
    protected final ExecutorService clientCallbackExecutor;
    protected final ClientMeterManager clientMeterManager;
    protected final ThreadPoolExecutor telemetryCommandExecutor;
    private volatile ScheduledFuture<?> updateRouteCacheFuture;
    private final CompositedMessageInterceptor compositedMessageInterceptor;
    protected final ClientId clientId = new ClientId();
    private final ConcurrentMap<String, TopicRouteData> topicRouteCache = new ConcurrentHashMap();

    @GuardedBy("inflightRouteFutureLock")
    private final Map<String, Set<SettableFuture<TopicRouteData>>> inflightRouteFutureTable = new ConcurrentHashMap();
    private final Lock inflightRouteFutureLock = new ReentrantLock();

    @GuardedBy("sessionsLock")
    private final Map<Endpoints, ClientSessionImpl> sessionsTable = new HashMap();
    private final ReadWriteLock sessionsLock = new ReentrantReadWriteLock();
    protected final Set<Endpoints> isolated = Collections.newSetFromMap(new ConcurrentHashMap());
    private final ClientManager clientManager = new ClientManagerImpl(this);

    public ClientImpl(ClientConfiguration clientConfiguration, Set<String> set) {
        this.clientConfiguration = (ClientConfiguration) Preconditions.checkNotNull(clientConfiguration, "clientConfiguration should not be null");
        this.endpoints = new Endpoints(clientConfiguration.getEndpoints());
        this.topics = set;
        long index = this.clientId.getIndex();
        this.clientCallbackExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 60L, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactoryImpl("ClientCallbackWorker", index));
        this.clientMeterManager = new ClientMeterManager(this.clientId, clientConfiguration);
        this.compositedMessageInterceptor = new CompositedMessageInterceptor(Collections.singletonList(new MessageMeterInterceptor(this, this.clientMeterManager)));
        this.telemetryCommandExecutor = new ThreadPoolExecutor(1, 1, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactoryImpl("CommandExecutor", index));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void startUp() throws Exception {
        log.info("Begin to start the rocketmq client, clientId={}", this.clientId);
        this.clientManager.startAsync().awaitRunning();
        log.info("Begin to fetch topic(s) route data from remote during client startup, clientId={}, topics={}", this.clientId, this.topics);
        Iterator<String> it = this.topics.iterator();
        while (it.hasNext()) {
            fetchTopicRoute(it.next()).get();
        }
        log.info("Fetch topic route data from remote successfully during startup, clientId={}, topics={}", this.clientId, this.topics);
        this.updateRouteCacheFuture = this.clientManager.getScheduler().scheduleWithFixedDelay(() -> {
            try {
                updateRouteCache();
            } catch (Throwable th) {
                log.error("Exception raised while updating topic route cache, clientId={}", this.clientId, th);
            }
        }, 10L, 30L, TimeUnit.SECONDS);
        log.info("The rocketmq client starts successfully, clientId={}", this.clientId);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void shutDown() throws InterruptedException {
        log.info("Begin to shutdown the rocketmq client, clientId={}", this.clientId);
        notifyClientTermination();
        if (null != this.updateRouteCacheFuture) {
            this.updateRouteCacheFuture.cancel(false);
        }
        this.telemetryCommandExecutor.shutdown();
        if (ExecutorServices.awaitTerminated(this.telemetryCommandExecutor)) {
            log.info("Shutdown the telemetry command executor successfully, clientId={}", this.clientId);
        } else {
            log.error("[Bug] Timeout to shutdown the telemetry command executor, clientId={}", this.clientId);
        }
        log.info("Begin to release all telemetry sessions, clientId={}", this.clientId);
        releaseClientSessions();
        log.info("Release all telemetry sessions successfully, clientId={}", this.clientId);
        this.clientManager.stopAsync().awaitTerminated();
        this.clientCallbackExecutor.shutdown();
        if (!ExecutorServices.awaitTerminated(this.clientCallbackExecutor)) {
            log.error("[Bug] Timeout to shutdown the client callback executor, clientId={}", this.clientId);
        }
        this.clientMeterManager.shutdown();
        log.info("Shutdown the rocketmq client successfully, clientId={}", this.clientId);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void addMessageInterceptor(MessageInterceptor messageInterceptor) {
        if (isRunning()) {
            return;
        }
        this.compositedMessageInterceptor.addInterceptor(messageInterceptor);
    }

    @Override // org.apache.rocketmq.client.java.hook.MessageInterceptor
    public void doBefore(MessageInterceptorContext messageInterceptorContext, List<GeneralMessage> list) {
        try {
            this.compositedMessageInterceptor.doBefore(messageInterceptorContext, list);
        } catch (Throwable th) {
            log.error("[Bug] Exception raised while handling messages, clientId={}", this.clientId, th);
        }
    }

    @Override // org.apache.rocketmq.client.java.hook.MessageInterceptor
    public void doAfter(MessageInterceptorContext messageInterceptorContext, List<GeneralMessage> list) {
        try {
            this.compositedMessageInterceptor.doAfter(messageInterceptorContext, list);
        } catch (Throwable th) {
            log.error("[Bug] Exception raised while handling messages, clientId={}", this.clientId, th);
        }
    }

    @Override // org.apache.rocketmq.client.java.impl.producer.ClientSessionHandler
    public TelemetryCommand settingsCommand() {
        return TelemetryCommand.newBuilder().setSettings(getSettings().toProtobuf()).build();
    }

    @Override // org.apache.rocketmq.client.java.impl.producer.ClientSessionHandler
    public StreamObserver<TelemetryCommand> telemetry(Endpoints endpoints, StreamObserver<TelemetryCommand> streamObserver) throws ClientException {
        try {
            return this.clientManager.telemetry(endpoints, TELEMETRY_TIMEOUT, streamObserver);
        } catch (ClientException e) {
            throw e;
        } catch (Throwable th) {
            throw new InternalErrorException(th);
        }
    }

    @Override // org.apache.rocketmq.client.java.impl.producer.ClientSessionHandler
    public boolean isEndpointsDeprecated(Endpoints endpoints) {
        return !getTotalRouteEndpoints().contains(endpoints);
    }

    @Override // org.apache.rocketmq.client.java.impl.producer.ClientSessionHandler
    public void onPrintThreadStackTraceCommand(Endpoints endpoints, PrintThreadStackTraceCommand printThreadStackTraceCommand) {
        String nonce = printThreadStackTraceCommand.getNonce();
        try {
            this.telemetryCommandExecutor.submit(() -> {
                try {
                    telemetry(endpoints, TelemetryCommand.newBuilder().setThreadStackTrace(ThreadStackTrace.newBuilder().setThreadStackTrace(Utilities.stackTrace()).setNonce(printThreadStackTraceCommand.getNonce()).build()).setStatus(Status.newBuilder().setCode(Code.OK).build()).build());
                } catch (Throwable th) {
                    log.error("Failed to send thread stack trace to remote, endpoints={}, nonce={}, clientId={}", new Object[]{endpoints, nonce, this.clientId, th});
                }
            });
        } catch (Throwable th) {
            log.error("[Bug] Exception raised while submitting task to print thread stack trace, endpoints={}, nonce={}, clientId={}", new Object[]{endpoints, nonce, this.clientId, th});
        }
    }

    public abstract Settings getSettings();

    @Override // org.apache.rocketmq.client.java.impl.producer.ClientSessionHandler
    public final void onSettingsCommand(Endpoints endpoints, apache.rocketmq.v2.Settings settings) {
        this.clientMeterManager.reset(new Metric(settings.getMetric()));
        getSettings().sync(settings);
    }

    @Override // org.apache.rocketmq.client.java.impl.Client
    public void syncSettings() {
        TelemetryCommand build = TelemetryCommand.newBuilder().setSettings(getSettings().toProtobuf()).build();
        for (Endpoints endpoints : getTotalRouteEndpoints()) {
            try {
                telemetry(endpoints, build);
            } catch (Throwable th) {
                log.error("Failed to telemeter settings, clientId={}, endpoints={}", new Object[]{this.clientId, endpoints, th});
            }
        }
    }

    public void telemetry(Endpoints endpoints, TelemetryCommand telemetryCommand) {
        try {
            getClientSession(endpoints).write(telemetryCommand);
        } catch (Throwable th) {
            log.error("Failed to fire write telemetry command, clientId={}, endpoints={}", new Object[]{this.clientId, endpoints, th});
        }
    }

    private void releaseClientSessions() {
        this.sessionsLock.readLock().lock();
        try {
            this.sessionsTable.values().forEach((v0) -> {
                v0.release();
            });
        } finally {
            this.sessionsLock.readLock().unlock();
        }
    }

    @Override // org.apache.rocketmq.client.java.impl.producer.ClientSessionHandler
    public void removeClientSession(Endpoints endpoints, ClientSessionImpl clientSessionImpl) {
        this.sessionsLock.writeLock().lock();
        try {
            log.info("Remove client session, clientId={}, endpoints={}", this.clientId, endpoints);
            this.sessionsTable.remove(endpoints, clientSessionImpl);
        } finally {
            this.sessionsLock.writeLock().unlock();
        }
    }

    private ClientSessionImpl getClientSession(Endpoints endpoints) throws ClientException {
        this.sessionsLock.readLock().lock();
        try {
            ClientSessionImpl clientSessionImpl = this.sessionsTable.get(endpoints);
            if (null != clientSessionImpl) {
                return clientSessionImpl;
            }
            this.sessionsLock.readLock().unlock();
            this.sessionsLock.writeLock().lock();
            try {
                ClientSessionImpl clientSessionImpl2 = this.sessionsTable.get(endpoints);
                if (null != clientSessionImpl2) {
                    return clientSessionImpl2;
                }
                ClientSessionImpl clientSessionImpl3 = new ClientSessionImpl(this, this.clientConfiguration.getRequestTimeout(), endpoints);
                this.sessionsTable.put(endpoints, clientSessionImpl3);
                this.sessionsLock.writeLock().unlock();
                return clientSessionImpl3;
            } finally {
                this.sessionsLock.writeLock().unlock();
            }
        } finally {
            this.sessionsLock.readLock().unlock();
        }
    }

    public ListenableFuture<TopicRouteData> onTopicRouteDataFetched(String str, TopicRouteData topicRouteData) throws ClientException {
        HashSet hashSet = new HashSet((Collection) Sets.difference((Set) topicRouteData.getMessageQueues().stream().map(messageQueueImpl -> {
            return messageQueueImpl.getBroker().getEndpoints();
        }).collect(Collectors.toSet()), getTotalRouteEndpoints()));
        ArrayList arrayList = new ArrayList();
        Iterator it = hashSet.iterator();
        while (it.hasNext()) {
            arrayList.add(getClientSession((Endpoints) it.next()).syncSettings());
        }
        return Futures.transform(Futures.allAsList(arrayList), obj -> {
            this.topicRouteCache.put(str, topicRouteData);
            onTopicRouteDataUpdate0(str, topicRouteData);
            return topicRouteData;
        }, MoreExecutors.directExecutor());
    }

    public void onTopicRouteDataUpdate0(String str, TopicRouteData topicRouteData) {
    }

    @Override // org.apache.rocketmq.client.java.impl.producer.ClientSessionHandler
    public void onVerifyMessageCommand(Endpoints endpoints, VerifyMessageCommand verifyMessageCommand) {
        log.warn("Ignore verify message command from remote, which is not expected, clientId={}, command={}", this.clientId, verifyMessageCommand);
        String nonce = verifyMessageCommand.getNonce();
        try {
            telemetry(endpoints, TelemetryCommand.newBuilder().setVerifyMessageResult(VerifyMessageResult.newBuilder().setNonce(nonce).build()).setStatus(Status.newBuilder().setCode(Code.NOT_IMPLEMENTED).build()).build());
        } catch (Throwable th) {
            log.warn("Failed to send message verification result, clientId={}", this.clientId, th);
        }
    }

    @Override // org.apache.rocketmq.client.java.impl.producer.ClientSessionHandler
    public void onRecoverOrphanedTransactionCommand(Endpoints endpoints, RecoverOrphanedTransactionCommand recoverOrphanedTransactionCommand) {
        log.warn("Ignore orphaned transaction recovery command from remote, which is not expected, clientId={}, command={}", this.clientId, recoverOrphanedTransactionCommand);
    }

    private void updateRouteCache() {
        log.info("Start to update route cache for a new round, clientId={}", this.clientId);
        this.topicRouteCache.keySet().forEach(str -> {
            Futures.addCallback(fetchTopicRoute(str), new FutureCallback<TopicRouteData>() { // from class: org.apache.rocketmq.client.java.impl.ClientImpl.1
                public void onSuccess(TopicRouteData topicRouteData) {
                }

                public void onFailure(Throwable th) {
                    ClientImpl.log.error("Failed to fetch topic route for update cache, topic={}, clientId={}", new Object[]{str, ClientImpl.this.clientId, th});
                }
            }, MoreExecutors.directExecutor());
        });
    }

    public abstract NotifyClientTerminationRequest wrapNotifyClientTerminationRequest();

    private void notifyClientTermination() {
        log.info("Notify remote that client is terminated, clientId={}", this.clientId);
        Set<Endpoints> totalRouteEndpoints = getTotalRouteEndpoints();
        NotifyClientTerminationRequest wrapNotifyClientTerminationRequest = wrapNotifyClientTerminationRequest();
        try {
            Iterator<Endpoints> it = totalRouteEndpoints.iterator();
            while (it.hasNext()) {
                this.clientManager.notifyClientTermination(it.next(), wrapNotifyClientTerminationRequest, this.clientConfiguration.getRequestTimeout());
            }
        } catch (Throwable th) {
            log.error("[Bug] Exception raised while notifying client's termination, clientId={}", this.clientId, th);
        }
    }

    public ClientManager getClientManager() {
        return this.clientManager;
    }

    @Override // org.apache.rocketmq.client.java.impl.Client
    public Endpoints getEndpoints() {
        return this.endpoints;
    }

    @Override // org.apache.rocketmq.client.java.impl.Client, org.apache.rocketmq.client.java.impl.producer.ClientSessionHandler
    public ClientId getClientId() {
        return this.clientId;
    }

    @Override // org.apache.rocketmq.client.java.impl.Client
    public void doHeartbeat() {
        Set<Endpoints> totalRouteEndpoints = getTotalRouteEndpoints();
        HeartbeatRequest wrapHeartbeatRequest = wrapHeartbeatRequest();
        Iterator<Endpoints> it = totalRouteEndpoints.iterator();
        while (it.hasNext()) {
            doHeartbeat(wrapHeartbeatRequest, it.next());
        }
    }

    @Override // org.apache.rocketmq.client.java.impl.Client
    public Metadata sign() throws NoSuchAlgorithmException, InvalidKeyException {
        return Signature.sign(this.clientConfiguration, this.clientId);
    }

    @Override // org.apache.rocketmq.client.java.impl.Client
    public boolean isSslEnabled() {
        return this.clientConfiguration.isSslEnabled();
    }

    private void doHeartbeat(HeartbeatRequest heartbeatRequest, final Endpoints endpoints) {
        try {
            Futures.addCallback(this.clientManager.heartbeat(endpoints, heartbeatRequest, this.clientConfiguration.getRequestTimeout()), new FutureCallback<HeartbeatResponse>() { // from class: org.apache.rocketmq.client.java.impl.ClientImpl.2
                public void onSuccess(HeartbeatResponse heartbeatResponse) {
                    Status status = heartbeatResponse.getStatus();
                    Code code = status.getCode();
                    if (Code.OK != code) {
                        ClientImpl.log.warn("Failed to send heartbeat, code={}, status message=[{}], endpoints={}, clientId={}", new Object[]{code, status.getMessage(), endpoints, ClientImpl.this.clientId});
                        return;
                    }
                    ClientImpl.log.info("Send heartbeat successfully, endpoints={}, clientId={}", endpoints, ClientImpl.this.clientId);
                    if (ClientImpl.this.isolated.remove(endpoints)) {
                        ClientImpl.log.info("Rejoin endpoints which is isolated before, clientId={}, endpoints={}", ClientImpl.this.clientId, endpoints);
                    }
                }

                public void onFailure(Throwable th) {
                    ClientImpl.log.warn("Failed to send heartbeat, endpoints={}, clientId={}", new Object[]{endpoints, ClientImpl.this.clientId, th});
                }
            }, MoreExecutors.directExecutor());
        } catch (Throwable th) {
            log.error("[Bug] Exception raised while preparing heartbeat, endpoints={}, clientId={}", new Object[]{endpoints, this.clientId, th});
        }
    }

    public abstract HeartbeatRequest wrapHeartbeatRequest();

    @Override // org.apache.rocketmq.client.java.impl.Client
    public void doStats() {
    }

    private ListenableFuture<TopicRouteData> fetchTopicRoute(final String str) {
        ListenableFuture<TopicRouteData> transformAsync = Futures.transformAsync(fetchTopicRoute0(str), topicRouteData -> {
            return onTopicRouteDataFetched(str, topicRouteData);
        }, MoreExecutors.directExecutor());
        Futures.addCallback(transformAsync, new FutureCallback<TopicRouteData>() { // from class: org.apache.rocketmq.client.java.impl.ClientImpl.3
            public void onSuccess(TopicRouteData topicRouteData2) {
                ClientImpl.log.info("Fetch topic route successfully, clientId={}, topic={}, topicRouteData={}", new Object[]{ClientImpl.this.clientId, str, topicRouteData2});
            }

            public void onFailure(Throwable th) {
                ClientImpl.log.error("Failed to fetch topic route, clientId={}, topic={}", new Object[]{ClientImpl.this.clientId, str, th});
            }
        }, MoreExecutors.directExecutor());
        return transformAsync;
    }

    protected ListenableFuture<TopicRouteData> fetchTopicRoute0(String str) {
        RpcFuture<QueryRouteRequest, QueryRouteResponse> queryRoute = this.clientManager.queryRoute(this.endpoints, QueryRouteRequest.newBuilder().setTopic(Resource.newBuilder().setResourceNamespace(this.clientConfiguration.getNamespace()).setName(str).build()).setEndpoints(this.endpoints.toProtobuf()).build(), this.clientConfiguration.getRequestTimeout());
        return Futures.transformAsync(queryRoute, queryRouteResponse -> {
            StatusChecker.check(queryRouteResponse.getStatus(), queryRoute);
            return Futures.immediateFuture(new TopicRouteData(queryRouteResponse.getMessageQueuesList()));
        }, MoreExecutors.directExecutor());
    }

    protected Set<Endpoints> getTotalRouteEndpoints() {
        HashSet hashSet = new HashSet();
        Iterator<TopicRouteData> it = this.topicRouteCache.values().iterator();
        while (it.hasNext()) {
            hashSet.addAll(it.next().getTotalEndpoints());
        }
        return hashSet;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ListenableFuture<TopicRouteData> getRouteData(final String str) {
        SettableFuture<TopicRouteData> create = SettableFuture.create();
        TopicRouteData topicRouteData = this.topicRouteCache.get(str);
        if (null != topicRouteData) {
            create.set(topicRouteData);
            return create;
        }
        this.inflightRouteFutureLock.lock();
        try {
            TopicRouteData topicRouteData2 = this.topicRouteCache.get(str);
            if (null != topicRouteData2) {
                create.set(topicRouteData2);
                this.inflightRouteFutureLock.unlock();
                return create;
            }
            Set<SettableFuture<TopicRouteData>> set = this.inflightRouteFutureTable.get(str);
            if (null != set) {
                set.add(create);
                this.inflightRouteFutureLock.unlock();
                return create;
            }
            HashSet hashSet = new HashSet();
            hashSet.add(create);
            this.inflightRouteFutureTable.put(str, hashSet);
            this.inflightRouteFutureLock.unlock();
            Futures.addCallback(fetchTopicRoute(str), new FutureCallback<TopicRouteData>() { // from class: org.apache.rocketmq.client.java.impl.ClientImpl.4
                public void onSuccess(TopicRouteData topicRouteData3) {
                    ClientImpl.this.inflightRouteFutureLock.lock();
                    try {
                        try {
                            Set set2 = (Set) ClientImpl.this.inflightRouteFutureTable.remove(str);
                            if (null == set2) {
                                ClientImpl.log.error("[Bug] in-flight route futures was empty, topic={}, clientId={}", str, ClientImpl.this.clientId);
                                ClientImpl.this.inflightRouteFutureLock.unlock();
                                return;
                            }
                            ClientImpl.log.debug("Fetch topic route successfully, topic={}, in-flight route future size={}, clientId={}", new Object[]{str, Integer.valueOf(set2.size()), ClientImpl.this.clientId});
                            Iterator it = set2.iterator();
                            while (it.hasNext()) {
                                ((SettableFuture) it.next()).set(topicRouteData3);
                            }
                            ClientImpl.this.inflightRouteFutureLock.unlock();
                        } catch (Throwable th) {
                            ClientImpl.log.error("[Bug] Exception raised while update route data, topic={}, clientId={}", new Object[]{str, ClientImpl.this.clientId, th});
                            ClientImpl.this.inflightRouteFutureLock.unlock();
                        }
                    } catch (Throwable th2) {
                        ClientImpl.this.inflightRouteFutureLock.unlock();
                        throw th2;
                    }
                }

                public void onFailure(Throwable th) {
                    ClientImpl.this.inflightRouteFutureLock.lock();
                    try {
                        Set set2 = (Set) ClientImpl.this.inflightRouteFutureTable.remove(str);
                        if (null == set2) {
                            ClientImpl.log.error("[Bug] in-flight route futures was empty, topic={}, clientId={}", str, ClientImpl.this.clientId);
                            ClientImpl.this.inflightRouteFutureLock.unlock();
                        } else {
                            ClientImpl.log.debug("Failed to fetch topic route, topic={}, in-flight route future size={}, clientId={}", new Object[]{str, Integer.valueOf(set2.size()), ClientImpl.this.clientId, th});
                            Iterator it = set2.iterator();
                            while (it.hasNext()) {
                                ((SettableFuture) it.next()).setException(th);
                            }
                        }
                    } finally {
                        ClientImpl.this.inflightRouteFutureLock.unlock();
                    }
                }
            }, MoreExecutors.directExecutor());
            return create;
        } catch (Throwable th) {
            this.inflightRouteFutureLock.unlock();
            throw th;
        }
    }

    @Override // org.apache.rocketmq.client.java.impl.producer.ClientSessionHandler
    public ScheduledExecutorService getScheduler() {
        return this.clientManager.getScheduler();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public <T> T handleClientFuture(ListenableFuture<T> listenableFuture) throws ClientException {
        try {
            return (T) listenableFuture.get();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (ExecutionException e2) {
            ClientException cause = e2.getCause();
            if (cause instanceof ClientException) {
                throw cause;
            }
            if (cause instanceof RuntimeException) {
                throw ((RuntimeException) cause);
            }
            throw new ClientException(null == cause ? e2 : cause);
        }
    }

    public ClientConfiguration getClientConfiguration() {
        return this.clientConfiguration;
    }

    protected String serviceName() {
        return super.serviceName() + "-" + this.clientId.getIndex();
    }
}
