package link.thingscloud.vertx.remoting.impl;

import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import link.thingscloud.vertx.remoting.api.AsyncHandler;
import link.thingscloud.vertx.remoting.api.RemotingEndPoint;
import link.thingscloud.vertx.remoting.api.RemotingHandlerContext;
import link.thingscloud.vertx.remoting.api.RemotingService;
import link.thingscloud.vertx.remoting.api.RequestProcessor;
import link.thingscloud.vertx.remoting.api.channel.ChannelEventListener;
import link.thingscloud.vertx.remoting.api.channel.RemotingChannel;
import link.thingscloud.vertx.remoting.api.command.RemotingCommand;
import link.thingscloud.vertx.remoting.api.command.RemotingCommandFactory;
import link.thingscloud.vertx.remoting.api.command.TrafficType;
import link.thingscloud.vertx.remoting.api.exception.RemotingAccessException;
import link.thingscloud.vertx.remoting.api.exception.RemotingRuntimeException;
import link.thingscloud.vertx.remoting.api.exception.RemotingTimeoutException;
import link.thingscloud.vertx.remoting.api.exception.SemaphoreExhaustedException;
import link.thingscloud.vertx.remoting.api.interceptor.Interceptor;
import link.thingscloud.vertx.remoting.api.interceptor.InterceptorGroup;
import link.thingscloud.vertx.remoting.api.interceptor.RequestContext;
import link.thingscloud.vertx.remoting.api.interceptor.ResponseContext;
import link.thingscloud.vertx.remoting.common.ChannelEventListenerGroup;
import link.thingscloud.vertx.remoting.common.Pair;
import link.thingscloud.vertx.remoting.common.ResponseFuture;
import link.thingscloud.vertx.remoting.common.SemaphoreReleaseOnlyOnce;
import link.thingscloud.vertx.remoting.config.RemotingConfig;
import link.thingscloud.vertx.remoting.external.ThreadUtils;
import link.thingscloud.vertx.remoting.impl.command.RemotingCommandFactoryImpl;
import link.thingscloud.vertx.remoting.impl.command.RemotingSysResponseCode;
import link.thingscloud.vertx.remoting.internal.RemotingUtil;

/* loaded from: input_file:link/thingscloud/vertx/remoting/impl/VertxRemotingAbstract.class */
public abstract class VertxRemotingAbstract implements RemotingService {
    /** Bounds the number of in-flight one-way sends; one permit is taken per send in {@code invokeOneway0}. */
    private final Semaphore semaphoreOneway;
    /** Bounds the number of in-flight async requests; one permit is taken per request in {@code invokeAsync0}. */
    private final Semaphore semaphoreAsync;
    /** Default executor used for request processors registered without an explicit executor. */
    private final ExecutorService publicExecutor;
    /** Executor on which async response handlers are invoked (see {@code executeAsyncHandler}). */
    private final ExecutorService asyncHandlerExecutor;
    protected static final Logger LOG = LoggerFactory.getLogger(VertxRemotingAbstract.class);
    /** Single-threaded scheduler that periodically scans {@link #ackTables} for timed-out requests. */
    protected ScheduledExecutorService houseKeepingService = ThreadUtils.newSingleThreadScheduledExecutor("HouseKeepingService", true);
    /** Background thread that dispatches channel events to the registered listeners. */
    protected final ChannelEventExecutor channelEventExecutor = new ChannelEventExecutor("ChannelEventExecutor");
    private final InterceptorGroup interceptorGroup = new InterceptorGroup();
    /** Never reassigned; declared final so the dispatcher thread always observes the initialized group. */
    private final ChannelEventListenerGroup channelEventListenerGroup = new ChannelEventListenerGroup();
    /** Pending requests awaiting a response, keyed by request id. */
    private final Map<Integer, ResponseFuture> ackTables = new ConcurrentHashMap<>(256);
    /** Request processors keyed first by URI, then by command code. */
    private final Map<String, Map<Integer, Pair<RequestProcessor, ExecutorService>>> processorTables = new ConcurrentHashMap<>();
    private final RemotingCommandFactory remotingCommandFactory = new RemotingCommandFactoryImpl();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: link.thingscloud.vertx.remoting.impl.VertxRemotingAbstract$2, reason: invalid class name */
    /* loaded from: input_file:link/thingscloud/vertx/remoting/impl/VertxRemotingAbstract$2.class */
    /**
     * Compiler-generated lookup tables recovered by the decompiler.
     *
     * <p>Each array is indexed by an enum constant's {@code ordinal()} and holds the small
     * integer used as the {@code case} label in the corresponding {@code switch} statement of
     * the enclosing class. A slot left at {@code 0} matches no case label.
     */
    public static /* synthetic */ class AnonymousClass2 {
        // TrafficType ordinal -> case label (1 = REQUEST_ONEWAY, 2 = REQUEST_ASYNC, 3 = REQUEST_SYNC, 4 = RESPONSE).
        static final /* synthetic */ int[] $SwitchMap$link$thingscloud$vertx$remoting$api$command$TrafficType;
        // NettyChannelEventType ordinal -> case label (1 = CLOSE, 2 = CONNECT, 3 = EXCEPTION).
        static final /* synthetic */ int[] $SwitchMap$link$thingscloud$vertx$remoting$impl$NettyChannelEventType = new int[NettyChannelEventType.values().length];

        static {
            // Each assignment is guarded separately: a NoSuchFieldError for one constant is
            // ignored and simply leaves that constant without a case label.
            try {
                $SwitchMap$link$thingscloud$vertx$remoting$impl$NettyChannelEventType[NettyChannelEventType.CLOSE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$link$thingscloud$vertx$remoting$impl$NettyChannelEventType[NettyChannelEventType.CONNECT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$link$thingscloud$vertx$remoting$impl$NettyChannelEventType[NettyChannelEventType.EXCEPTION.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            $SwitchMap$link$thingscloud$vertx$remoting$api$command$TrafficType = new int[TrafficType.values().length];
            try {
                $SwitchMap$link$thingscloud$vertx$remoting$api$command$TrafficType[TrafficType.REQUEST_ONEWAY.ordinal()] = 1;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$link$thingscloud$vertx$remoting$api$command$TrafficType[TrafficType.REQUEST_ASYNC.ordinal()] = 2;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$link$thingscloud$vertx$remoting$api$command$TrafficType[TrafficType.REQUEST_SYNC.ordinal()] = 3;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$link$thingscloud$vertx$remoting$api$command$TrafficType[TrafficType.RESPONSE.ordinal()] = 4;
            } catch (NoSuchFieldError e7) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:link/thingscloud/vertx/remoting/impl/VertxRemotingAbstract$ChannelEventExecutor.class */
    /**
     * Background thread that drains queued {@link NettyChannelEvent}s and forwards each one to
     * the enclosing instance's {@code ChannelEventListenerGroup}.
     *
     * <p>The thread runs until it is interrupted. An exception thrown while dispatching a single
     * event is logged and does not stop the thread, so one failing listener cannot silently
     * disable event delivery for the rest of the service's lifetime.
     */
    public class ChannelEventExecutor extends Thread {
        /** Soft cap on queued events; events offered beyond it are dropped with a warning. */
        private static final int MAX_SIZE = 10000;
        private final LinkedBlockingQueue<NettyChannelEvent> eventQueue;
        private final String name;

        /**
         * @param str thread name, also used in the start-up log line
         */
        public ChannelEventExecutor(String str) {
            super(str);
            this.eventQueue = new LinkedBlockingQueue<>();
            this.name = str;
        }

        /**
         * Enqueues an event for asynchronous dispatch, or drops it (with a warning) when the
         * queue already holds more than {@code MAX_SIZE} events.
         *
         * @param nettyChannelEvent the event to dispatch
         */
        public void putNettyEvent(NettyChannelEvent nettyChannelEvent) {
            if (this.eventQueue.size() <= MAX_SIZE) {
                this.eventQueue.add(nettyChannelEvent);
            } else {
                VertxRemotingAbstract.LOG.warn(String.format("Event queue size[%s] meets the limit, so drop this event %s", Integer.valueOf(this.eventQueue.size()), nettyChannelEvent.toString()));
            }
        }

        /**
         * Polls the queue (3 second timeout per poll) and dispatches every event by type.
         * Returns only when the thread is interrupted.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            VertxRemotingAbstract.LOG.info(this.name + " service started");
            ChannelEventListenerGroup listeners = VertxRemotingAbstract.this.channelEventListenerGroup;
            while (true) {
                NettyChannelEvent event;
                try {
                    event = this.eventQueue.poll(3000L, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    // Interruption is the shutdown signal: restore the flag for callers and exit.
                    Thread.currentThread().interrupt();
                    VertxRemotingAbstract.LOG.info(this.name + " service interrupted, stopping");
                    return;
                }
                if (event == null || listeners == null) {
                    continue;
                }
                try {
                    dispatch(listeners, event);
                } catch (Exception e) {
                    // Keep the dispatcher alive; the failed event has already been consumed.
                    VertxRemotingAbstract.LOG.warn("Exception thrown when dispatching channel event", e);
                }
            }
        }

        /** Routes one event to the listener callback matching its type. */
        private void dispatch(ChannelEventListenerGroup listeners, NettyChannelEvent event) {
            VertxRemotingAbstract.LOG.info(String.format("Dispatch received channel event, %s", event));
            switch (event.getType()) {
                case CLOSE:
                    listeners.onChannelClose(event.getChannel());
                    break;
                case CONNECT:
                    listeners.onChannelConnect(event.getChannel());
                    break;
                case EXCEPTION:
                    listeners.onChannelException(event.getChannel(), event.getCause());
                    break;
                default:
                    // Other event types have no listener callback.
                    break;
            }
        }
    }

    public VertxRemotingAbstract(RemotingConfig remotingConfig) {
        this.semaphoreOneway = new Semaphore(remotingConfig.getOnewayInvokeSemaphore(), true);
        this.semaphoreAsync = new Semaphore(remotingConfig.getAsyncInvokeSemaphore(), true);
        this.publicExecutor = ThreadUtils.newFixedThreadPool(remotingConfig.getPublicExecutorThreads(), 10000, "Remoting-PublicExecutor", true);
        this.asyncHandlerExecutor = ThreadUtils.newFixedThreadPool(remotingConfig.getAsyncHandlerExecutorThreads(), 10000, "Remoting-AsyncExecutor", true);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Hands a channel event to the dispatcher thread, but only when at least one listener is
     * registered; otherwise the event is discarded immediately.
     *
     * @param nettyChannelEvent the event to queue
     */
    public void putNettyEvent(NettyChannelEvent nettyChannelEvent) {
        final ChannelEventListenerGroup listeners = this.channelEventListenerGroup;
        if (listeners != null && listeners.size() != 0) {
            this.channelEventExecutor.putNettyEvent(nettyChannelEvent);
        }
    }

    /**
     * Adds a listener to the channel event listener group.
     *
     * <p>The dispatcher thread is only started by {@link #start()} when the group is non-empty
     * at that moment, so listeners should be registered before {@code start()} is called.
     *
     * @param channelEventListener the listener to add
     */
    public void registerChannelEventListener(ChannelEventListener channelEventListener) {
        final ChannelEventListenerGroup listeners = this.channelEventListenerGroup;
        listeners.registerChannelEventListener(channelEventListener);
    }

    /**
     * Starts the periodic timeout scan and, if any channel event listener has been registered,
     * the channel event dispatcher thread.
     */
    public void start() {
        startUpHouseKeepingService();
        if (this.channelEventListenerGroup.size() <= 0) {
            // No listeners registered: nothing would consume the events.
            return;
        }
        this.channelEventExecutor.start();
    }

    /**
     * Shuts down the background services in a fixed order: the house-keeping scheduler
     * (3000 ms budget), the public executor and the async handler executor (2000 ms each),
     * and finally the channel event dispatcher thread.
     */
    public void stop() {
        ThreadUtils.shutdownGracefully(this.houseKeepingService, 3000L, TimeUnit.MILLISECONDS);
        ThreadUtils.shutdownGracefully(this.publicExecutor, 2000L, TimeUnit.MILLISECONDS);
        ThreadUtils.shutdownGracefully(this.asyncHandlerExecutor, 2000L, TimeUnit.MILLISECONDS);
        // NOTE(review): presumably interrupts and joins the dispatcher thread — confirm in ThreadUtils.
        ThreadUtils.shutdownGracefully(this.channelEventExecutor);
    }

    /**
     * Adds an interceptor to the group consulted before requests are sent or processed and
     * after responses are received or produced.
     *
     * @param interceptor the interceptor to add
     */
    public void registerInterceptor(Interceptor interceptor) {
        final InterceptorGroup interceptors = this.interceptorGroup;
        interceptors.registerInterceptor(interceptor);
    }

    /**
     * Registers a request processor that runs on the shared public executor.
     *
     * @param str              URI the processor is bound to
     * @param i                command code the processor handles
     * @param requestProcessor the processor to invoke
     */
    public void registerRequestProcessor(String str, int i, RequestProcessor requestProcessor) {
        final ExecutorService defaultExecutor = this.publicExecutor;
        registerRequestProcessor(str, i, requestProcessor, defaultExecutor);
    }

    /**
     * Registers a request processor for a URI and command code, to be run on the given executor.
     *
     * <p>The first registration for a (URI, command code) pair wins; later registrations for the
     * same pair are silently ignored. The insert is a single atomic {@code putIfAbsent}, so
     * concurrent registrations cannot overwrite each other.
     *
     * @param str              URI the processor is bound to
     * @param i                command code the processor handles
     * @param requestProcessor the processor to invoke
     * @param executorService  executor on which the processor task is submitted
     */
    public void registerRequestProcessor(String str, int i, RequestProcessor requestProcessor, ExecutorService executorService) {
        Map<Integer, Pair<RequestProcessor, ExecutorService>> processorsByCode =
                this.processorTables.computeIfAbsent(str, uri -> new ConcurrentHashMap<>(8));
        processorsByCode.putIfAbsent(Integer.valueOf(i), new Pair<>(requestProcessor, executorService));
    }

    /**
     * Removes the processor registered for the given URI and command code, if any.
     *
     * @param str URI the processor was bound to
     * @param i   command code of the processor to remove
     */
    public void unregisterRequestProcessor(String str, int i) {
        final Map<Integer, Pair<RequestProcessor, ExecutorService>> processorsByCode = this.processorTables.get(str);
        if (processorsByCode == null) {
            return;
        }
        processorsByCode.remove(Integer.valueOf(i));
    }

    /**
     * Returns the processors registered for a URI, keyed by command code.
     *
     * @param str the URI to look up
     * @return the live per-URI table, or an immutable empty map when the URI is unknown
     */
    public Map<Integer, Pair<RequestProcessor, ExecutorService>> processor(String str) {
        final Map<Integer, Pair<RequestProcessor, ExecutorService>> none = Collections.emptyMap();
        return this.processorTables.getOrDefault(str, none);
    }

    /**
     * Looks up the processor registered for a URI and command code.
     *
     * @param str the URI to look up
     * @param i   the command code to look up
     * @return the processor/executor pair, or {@code null} when none is registered
     */
    public Pair<RequestProcessor, ExecutorService> processor(String str, int i) {
        final Map<Integer, Pair<RequestProcessor, ExecutorService>> processorsByCode =
                this.processorTables.getOrDefault(str, Collections.emptyMap());
        return processorsByCode.get(Integer.valueOf(i));
    }

    /**
     * @return the factory this service uses to create response commands
     */
    public RemotingCommandFactory commandFactory() {
        final RemotingCommandFactory factory = this.remotingCommandFactory;
        return factory;
    }

    /**
     * Sends a request asynchronously after running the {@code beforeRequest} interceptors.
     *
     * <p>The command is marked {@code REQUEST_ASYNC} before the interceptors see it.
     *
     * @param remotingChannel channel to send on
     * @param remotingCommand request to send
     * @param asyncHandler    callback run when the request completes, fails or times out
     * @param j               timeout in milliseconds
     */
    public void invokeAsyncWithInterceptor(RemotingChannel remotingChannel, RemotingCommand remotingCommand, AsyncHandler asyncHandler, long j) {
        remotingCommand.trafficType(TrafficType.REQUEST_ASYNC);
        final String remoteAddress = RemotingUtil.extractRemoteAddress(remotingChannel);
        final RequestContext requestContext = new RequestContext(RemotingEndPoint.REQUEST, remoteAddress, remotingCommand);
        this.interceptorGroup.beforeRequest(requestContext);
        invokeAsync0(remoteAddress, remotingChannel, remotingCommand, asyncHandler, j);
    }

    /**
     * Performs the actual asynchronous send.
     *
     * <p>When no async permit is available the handler is failed immediately with a
     * {@code SemaphoreExhaustedException}. Otherwise the pending future is recorded in
     * {@code ackTables} before the command is written, and any send failure (reported by the
     * write callback or thrown synchronously) fails the request via {@code requestFail}.
     *
     * @param str             remote address used for logging and recorded on the future
     * @param remotingChannel channel to send on
     * @param remotingCommand request to send
     * @param asyncHandler    callback run on completion
     * @param j               timeout in milliseconds
     */
    private void invokeAsync0(String str, RemotingChannel remotingChannel, RemotingCommand remotingCommand, AsyncHandler asyncHandler, long j) {
        if (this.semaphoreAsync.tryAcquire()) {
            final int pendingId = remotingCommand.requestID();
            final ResponseFuture pending = new ResponseFuture(pendingId, j, asyncHandler, new SemaphoreReleaseOnlyOnce(this.semaphoreAsync));
            pending.setRequestCommand(remotingCommand);
            pending.setRemoteAddr(str);
            // Register before writing so a fast response always finds its future.
            this.ackTables.put(Integer.valueOf(pendingId), pending);
            try {
                remotingChannel.reply(remotingCommand, sendResult -> {
                    pending.setSendRequestOK(sendResult.succeeded());
                    if (!sendResult.succeeded()) {
                        requestFail(pendingId, (RemotingRuntimeException) new RemotingAccessException(RemotingUtil.extractRemoteAddress(remotingChannel), sendResult.cause()));
                        LOG.warn(String.format("Send request command to channel %s failed.", str));
                    }
                });
            } catch (Exception sendError) {
                requestFail(pendingId, (RemotingRuntimeException) new RemotingAccessException(RemotingUtil.extractRemoteAddress(remotingChannel), sendError));
                LOG.error("Send request command to channel " + remotingChannel + " error !", sendError);
            }
            return;
        }

        // No permit: fail fast with a future that carries no semaphore to release.
        final String message = String.format("No available async semaphore to issue the request request %s", remotingCommand.toString());
        final ResponseFuture rejected = new ResponseFuture(remotingCommand.requestID(), j, asyncHandler, null);
        requestFail(rejected, (RemotingRuntimeException) new SemaphoreExhaustedException(message));
        LOG.error(message);
    }

    /**
     * Sends a fire-and-forget request after running the {@code beforeRequest} interceptors.
     *
     * <p>The command is marked {@code REQUEST_ONEWAY} before the interceptors see it.
     *
     * @param remotingChannel channel to send on
     * @param remotingCommand request to send
     */
    public void invokeOnewayWithInterceptor(RemotingChannel remotingChannel, RemotingCommand remotingCommand) {
        remotingCommand.trafficType(TrafficType.REQUEST_ONEWAY);
        final String remoteAddress = RemotingUtil.extractRemoteAddress(remotingChannel);
        final RequestContext requestContext = new RequestContext(RemotingEndPoint.REQUEST, remoteAddress, remotingCommand);
        this.interceptorGroup.beforeRequest(requestContext);
        invokeOneway0(remotingChannel, remotingCommand);
    }

    /**
     * Performs the actual one-way send.
     *
     * <p>The request is dropped (with an error log) when no one-way permit is available. The
     * permit is returned when the write callback fires, or immediately if the send throws.
     *
     * @param remotingChannel channel to send on
     * @param remotingCommand request to send
     */
    private void invokeOneway0(RemotingChannel remotingChannel, RemotingCommand remotingCommand) {
        if (this.semaphoreOneway.tryAcquire()) {
            final SemaphoreReleaseOnlyOnce permit = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
            try {
                final SocketAddress peer = remotingChannel.remoteAddress();
                remotingChannel.reply(remotingCommand, sendResult -> {
                    permit.release();
                    if (!sendResult.succeeded()) {
                        LOG.warn(String.format("Send request command to channel %s failed !", peer));
                    }
                });
            } catch (Exception sendError) {
                permit.release();
                LOG.error("Send request command to channel " + remotingChannel + " error !", sendError);
            }
            return;
        }
        LOG.error(String.format("No available oneway semaphore to issue the request %s", remotingCommand.toString()));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Entry point for an inbound command: routes requests (one-way, async, sync) to
     * {@code processRequestCommand} and responses to {@code processResponseCommand}.
     * A {@code null} command is ignored; any other traffic type is logged as unsupported.
     *
     * @param remotingHandlerContext context of the channel the command arrived on
     * @param remotingCommand        the received command, may be {@code null}
     */
    public void processMessageReceived(RemotingHandlerContext remotingHandlerContext, RemotingCommand remotingCommand) {
        if (remotingCommand == null) {
            return;
        }
        switch (remotingCommand.trafficType()) {
            case REQUEST_ONEWAY:
            case REQUEST_ASYNC:
            case REQUEST_SYNC:
                processRequestCommand(remotingHandlerContext, remotingCommand);
                break;
            case RESPONSE:
                processResponseCommand(remotingHandlerContext, remotingCommand);
                break;
            default:
                LOG.warn(String.format("The traffic type %s is NOT supported!", remotingCommand.trafficType()));
                break;
        }
    }

    /**
     * Schedules the periodic scan that expires timed-out requests.
     *
     * <p>The scan first runs after 3 seconds and then once per second. The previous unit,
     * {@code TimeUnit.MICROSECONDS}, made the scan run every millisecond; all other durations in
     * this class (request timeouts, queue polling, shutdown budgets) are in milliseconds.
     */
    private void startUpHouseKeepingService() {
        this.houseKeepingService.scheduleAtFixedRate(this::scanResponseTable, 3000L, 1000L, TimeUnit.MILLISECONDS);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Expires pending requests whose deadline (begin timestamp plus timeout) has passed.
     *
     * <p>Works in two phases: collect the ids of expired futures, then remove each id from
     * {@code ackTables}. Only a future this call actually removed is failed with a
     * {@code RemotingTimeoutException}, so a request completed concurrently by a response is
     * not handled twice.
     */
    public void scanResponseTable() {
        final ArrayList<Integer> expiredIds = new ArrayList<>();
        for (ResponseFuture pending : this.ackTables.values()) {
            if (pending.getBeginTimestamp() + pending.getTimeoutMillis() <= System.currentTimeMillis()) {
                expiredIds.add(Integer.valueOf(pending.getRequestId()));
            }
        }

        for (Integer expiredId : expiredIds) {
            final ResponseFuture expired = this.ackTables.remove(expiredId);
            if (expired == null) {
                // Already completed or failed by another path.
                continue;
            }
            LOG.warn("Removes timeout request " + expired.getRequestCommand());
            expired.setCause(new RemotingTimeoutException(String.format("Request to %s timeout", expired.getRemoteAddr()), expired.getTimeoutMillis()));
            executeAsyncHandler(expired);
        }
    }

    /**
     * Dispatches an inbound request to the processor registered for the context's URI and the
     * command's code, submitting the work to that processor's executor.
     *
     * <p>An unknown URI or command code is answered with a {@code REQUEST_CODE_NOT_SUPPORTED}
     * response. If the executor rejects the task, a {@code SYSTEM_BUSY} response is sent unless
     * the request is one-way.
     *
     * @param remotingHandlerContext context of the channel the request arrived on
     * @param remotingCommand        the request to process
     */
    public void processRequestCommand(RemotingHandlerContext remotingHandlerContext, RemotingCommand remotingCommand) {
        final Map<Integer, Pair<RequestProcessor, ExecutorService>> processorsByCode = this.processorTables.get(remotingHandlerContext.uri());
        if (processorsByCode == null) {
            replyRequestCodeNotSupported(remotingHandlerContext, remotingCommand);
            LOG.warn(String.format("The uri %s is NOT supported!", remotingHandlerContext.uri()));
            return;
        }

        final Pair<RequestProcessor, ExecutorService> registration = processorsByCode.get(Integer.valueOf(remotingCommand.cmdCode()));
        if (registration == null) {
            replyRequestCodeNotSupported(remotingHandlerContext, remotingCommand);
            LOG.warn(String.format("The command code %s is NOT supported!", Integer.valueOf(remotingCommand.cmdCode())));
            return;
        }

        try {
            ((ExecutorService) registration.getRight()).submit(buildProcessorTask(remotingHandlerContext, remotingCommand, registration));
        } catch (RejectedExecutionException rejected) {
            LOG.warn(String.format("Request %s from %s is rejected by server executor %s !", remotingCommand, RemotingUtil.extractRemoteAddress(remotingHandlerContext.channel()), ((ExecutorService) registration.getRight()).toString()));
            if (remotingCommand.trafficType() == TrafficType.REQUEST_ONEWAY) {
                // One-way callers do not expect any response.
                return;
            }
            final RemotingCommand busy = this.remotingCommandFactory.createResponse(remotingCommand);
            busy.opCode(RemotingSysResponseCode.SYSTEM_BUSY);
            busy.remark("SYSTEM_BUSY");
            remotingHandlerContext.reply(busy);
        }
    }

    /** Replies to {@code request} with a response whose op code is {@code REQUEST_CODE_NOT_SUPPORTED}. */
    private void replyRequestCodeNotSupported(RemotingHandlerContext remotingHandlerContext, RemotingCommand request) {
        final RemotingCommand notSupported = commandFactory().createResponse(request);
        notSupported.opCode(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED);
        remotingHandlerContext.reply(notSupported);
    }

    /**
     * Completes the pending request matching an inbound response.
     *
     * <p>The future is removed from {@code ackTables}, given the response, and released; the
     * {@code afterResponseReceived} interceptors then run. A future with an async handler is
     * handed to {@code executeAsyncHandler}; otherwise the response is stored on the future via
     * {@code putResponse}. A response with no matching request is only logged.
     *
     * @param remotingHandlerContext context of the channel the response arrived on
     * @param remotingCommand        the received response
     */
    private void processResponseCommand(RemotingHandlerContext remotingHandlerContext, RemotingCommand remotingCommand) {
        final ResponseFuture pending = this.ackTables.remove(Integer.valueOf(remotingCommand.requestID()));
        if (pending == null) {
            LOG.warn(String.format("Response %s from %s doesn't have a matched request!", remotingCommand, RemotingUtil.extractRemoteAddress(remotingHandlerContext.channel())));
            return;
        }

        pending.setResponseCommand(remotingCommand);
        pending.release();

        final ResponseContext responseContext = new ResponseContext(
                RemotingEndPoint.REQUEST,
                RemotingUtil.extractRemoteAddress(remotingHandlerContext.channel()),
                pending.getRequestCommand(),
                remotingCommand);
        this.interceptorGroup.afterResponseReceived(responseContext);

        if (pending.getAsyncHandler() == null) {
            pending.putResponse(remotingCommand);
            // Second release kept from the original flow.
            pending.release();
            return;
        }
        executeAsyncHandler(pending);
    }

    /**
     * Builds the task that processes one request on the processor's executor.
     *
     * <p>The task runs the {@code beforeRequest} interceptors, invokes the processor, runs the
     * {@code afterResponseReceived} interceptors and sends the response. Any {@code Throwable}
     * raised along the way is logged and passed to {@code handleException}.
     *
     * @param remotingHandlerContext context of the channel the request arrived on
     * @param remotingCommand        the request to process
     * @param pair                   the processor (left) and its executor (right)
     * @return the runnable to submit
     */
    private Runnable buildProcessorTask(RemotingHandlerContext remotingHandlerContext, RemotingCommand remotingCommand, Pair<RequestProcessor, ExecutorService> pair) {
        return () -> {
            try {
                final RequestContext requestContext = new RequestContext(RemotingEndPoint.RESPONSE, RemotingUtil.extractRemoteAddress(remotingHandlerContext.channel()), remotingCommand);
                this.interceptorGroup.beforeRequest(requestContext);

                final RequestProcessor requestProcessor = (RequestProcessor) pair.getLeft();
                final RemotingCommand response = requestProcessor.processRequest(remotingHandlerContext.channel(), remotingCommand);

                final ResponseContext responseContext = new ResponseContext(RemotingEndPoint.RESPONSE, RemotingUtil.extractRemoteAddress(remotingHandlerContext.channel()), remotingCommand, response);
                this.interceptorGroup.afterResponseReceived(responseContext);

                handleResponse(response, remotingCommand, remotingHandlerContext);
            } catch (Throwable failure) {
                LOG.error(String.format("Process request %s error !", remotingCommand.toString()), failure);
                handleException(failure, remotingCommand, remotingHandlerContext);
            }
        };
    }

    /**
     * Runs the future's async handler, preferably on the async handler executor.
     *
     * <p>If the executor is absent or refuses the task, the handler runs on the calling thread
     * instead. On every path the future is released exactly once after the handler attempt, and
     * handler exceptions are logged rather than propagated.
     *
     * @param responseFuture the completed, failed or timed-out future whose handler should run
     */
    private void executeAsyncHandler(ResponseFuture responseFuture) {
        final ExecutorService executor = this.asyncHandlerExecutor;
        if (executor != null) {
            try {
                executor.submit(() -> {
                    runAsyncHandlerThenRelease(responseFuture, "Execute async handler in specific executor exception, ");
                });
                return;
            } catch (Throwable th) {
                // Typically a rejection: fall through and run the handler on this thread.
                LOG.warn("Execute async handler in executor exception, maybe the executor is busy now", th);
            }
        }
        runAsyncHandlerThenRelease(responseFuture, "Execute async handler in current thread exception");
    }

    /** Invokes the handler, logs any failure with {@code failureMessage}, and always releases the future. */
    private static void runAsyncHandlerThenRelease(ResponseFuture responseFuture, String failureMessage) {
        try {
            responseFuture.executeAsyncHandler();
        } catch (Throwable th) {
            LOG.warn(failureMessage, th);
        } finally {
            responseFuture.release();
        }
    }

    /**
     * Fails the pending request with the given id, if it is still pending.
     *
     * <p>The future is removed from {@code ackTables}, marked as not sent, given a {@code null}
     * response and the supplied cause, and its async handler is then executed.
     *
     * @param i                        id of the request to fail
     * @param remotingRuntimeException the failure cause recorded on the future
     */
    private void requestFail(int i, RemotingRuntimeException remotingRuntimeException) {
        final ResponseFuture failed = this.ackTables.remove(Integer.valueOf(i));
        if (failed == null) {
            // Already completed, failed or timed out elsewhere.
            return;
        }
        failed.setSendRequestOK(false);
        failed.putResponse(null);
        failed.setCause(remotingRuntimeException);
        executeAsyncHandler(failed);
    }

    /**
     * Fails a future that was never registered in {@code ackTables}: records the cause and runs
     * its async handler.
     *
     * @param responseFuture           the future to fail
     * @param remotingRuntimeException the failure cause recorded on the future
     */
    private void requestFail(ResponseFuture responseFuture, RemotingRuntimeException remotingRuntimeException) {
        final ResponseFuture failed = responseFuture;
        failed.setCause(remotingRuntimeException);
        executeAsyncHandler(failed);
    }

    /**
     * Sends a processor's response back to the requester.
     *
     * <p>Nothing is sent for one-way requests or when the processor returned {@code null}.
     * A failure while replying is logged and not propagated.
     *
     * @param remotingCommand        the response produced by the processor, may be {@code null}
     * @param remotingCommand2       the original request
     * @param remotingHandlerContext context used to reply
     */
    private void handleResponse(RemotingCommand remotingCommand, RemotingCommand remotingCommand2, RemotingHandlerContext remotingHandlerContext) {
        final boolean expectsResponse = remotingCommand2.trafficType() != TrafficType.REQUEST_ONEWAY;
        if (expectsResponse && remotingCommand != null) {
            try {
                remotingHandlerContext.reply(remotingCommand);
            } catch (Throwable replyFailure) {
                LOG.error(String.format("Process request %s success, but transfer response %s failed !", remotingCommand2, remotingCommand), replyFailure);
            }
        }
    }

    /**
     * Answers a request whose processing failed with a {@code SYSTEM_ERROR} response.
     * One-way requests get no reply. The throwable itself is not included in the response.
     *
     * @param th                     the processing failure (unused here; logged by the caller)
     * @param remotingCommand        the request that failed
     * @param remotingHandlerContext context used to reply
     */
    private void handleException(Throwable th, RemotingCommand remotingCommand, RemotingHandlerContext remotingHandlerContext) {
        if (remotingCommand.trafficType() == TrafficType.REQUEST_ONEWAY) {
            return;
        }
        final RemotingCommand errorResponse = this.remotingCommandFactory.createResponse(remotingCommand);
        errorResponse.opCode(RemotingSysResponseCode.SYSTEM_ERROR);
        errorResponse.remark("SYSTEM_ERROR");
        remotingHandlerContext.reply(errorResponse);
    }
}
