package com.xlmkit.springboot.iot;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.xlmkit.springboot.iot.prototype.MqttAdapter;
import com.xlmkit.springboot.iot.prototype.OutMqttPahoMessageHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.PostConstruct;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.util.StringUtils;

/* loaded from: input_file:com/xlmkit/springboot/iot/IotService.class */
/**
 * Request/response bridge over MQTT.
 *
 * <p>As a provider, the service subscribes to {@code /proservs/<username>/#}, resolves each
 * incoming topic to a registered {@link MethodInvoker}, invokes it with the JSON payload and
 * publishes the result to the {@code _returnTopic} named in the request.
 *
 * <p>As a consumer, {@link #use(Class)} hands out dynamic proxies whose interface calls are
 * turned into MQTT requests by {@link #rpc(Method, Object[])}; the caller blocks until the
 * reply arrives on a per-call {@code /return/...} topic or the timeout elapses.
 */
public class IotService implements MessageHandler {
    private static final Logger log = LoggerFactory.getLogger(IotService.class);

    /** Seconds an RPC caller waits for the remote reply before a failure response is built. */
    private static final long RPC_TIMEOUT_SECONDS = 5L;

    @Autowired
    private ApplicationContext applicationContext;

    @Autowired
    private IotServiceConfig config;
    private DirectChannel mqttInputChannel;
    private DirectChannel mqttOutputChannel;
    private MqttPahoMessageDrivenChannelAdapter adapter;
    /** Outbound handler created by {@link #start()}; kept so {@link #stop()} can release it. */
    private OutMqttPahoMessageHandler outboundHandler;
    private List<ProvideConfig> provideList;
    private IotProxyHandler iotProxyHandler;
    private Class[] uses;
    private Set<Role> roles = new HashSet<>();
    private MqttConnectOptions options = new MqttConnectOptions();
    private DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory();
    private List<ServiceInfo> serviceInfos = new ArrayList<>();
    /** Provided methods keyed by {@link MethodInvoker#getPath()}. */
    private HashMap<String, MethodInvoker> methodInfoMap = new HashMap<>();
    /** Reply queues of in-flight RPC calls keyed by their return topic. */
    private ConcurrentHashMap<String, ArrayBlockingQueue<String>> returnConcurrentHashMap = new ConcurrentHashMap<>();
    private ExecutorService executorService = Executors.newCachedThreadPool();

    /**
     * Creates the service, prepares the MQTT connection options and registers this instance as
     * the handler of the inbound channel. No connection is opened until {@link #start()}.
     *
     * @param iotServiceConfig broker host, credentials and extra topics
     */
    public IotService(IotServiceConfig iotServiceConfig) {
        this.config = iotServiceConfig;
        initOptions();
        this.mqttInputChannel = new DirectChannel();
        this.mqttOutputChannel = new DirectChannel();
        this.mqttInputChannel.subscribe(this);
    }

    /** Copies host and credentials from the config into the shared Paho client factory. */
    private void initOptions() {
        this.options.setServerURIs(new String[]{this.config.getHost()});
        this.options.setUserName(this.config.getUsername());
        // A missing password is left unset instead of failing on toCharArray().
        String password = this.config.getPassword();
        if (password != null) {
            this.options.setPassword(password.toCharArray());
        }
        this.clientFactory.setConnectionOptions(this.options);
    }

    /** Starts the service once dependency injection has completed. */
    @PostConstruct
    public void init() {
        start();
    }

    /**
     * Registers the configured providers, derives the topic subscriptions from the active
     * roles and starts the inbound adapter and the outbound handler.
     */
    public void start() {
        if (this.iotProxyHandler != null) {
            this.iotProxyHandler.setIotService(this);
        }
        if (this.uses != null && this.uses.length > 0) {
            this.roles.add(Role.CONSUMER);
        }
        if (this.provideList != null) {
            for (ProvideConfig provideConfig : this.provideList) {
                provide(provideConfig);
            }
        }
        for (ServiceInfo serviceInfo : this.serviceInfos) {
            serviceInfo.setTarget(this.applicationContext.getBean(serviceInfo.getType()));
            for (MethodInvoker methodInvoker : serviceInfo.getMethodInfos()) {
                log.info("provide={}", methodInvoker);
                this.methodInfoMap.put(methodInvoker.getPath(), methodInvoker);
            }
        }

        // Mask the broker password so the credential never reaches the log.
        Object configJson = JSON.toJSON(this.config);
        if (configJson instanceof JSONObject && ((JSONObject) configJson).containsKey("password")) {
            ((JSONObject) configJson).put("password", "******");
        }
        log.info("config={}", JSON.toJSONString(configJson, true));

        // Client id: <username>[-<ROLE>...]-<epoch millis>.
        StringBuilder idBuilder = new StringBuilder().append(this.config.getUsername());
        for (Role role : this.roles) {
            idBuilder.append('-').append(role.name());
        }
        String clientId = idBuilder.append('-').append(System.currentTimeMillis()).toString();

        List<Object> topics = new ArrayList<>(this.config.getTopices());
        if (this.roles.contains(Role.CONSUMER)) {
            topics.add("/#");
        }
        if (this.roles.contains(Role.PROVIDER)) {
            topics.add("/proservs/" + this.config.getUsername() + "/#");
        }
        topics.add("/clients/" + clientId + "/#");

        this.adapter = (MqttPahoMessageDrivenChannelAdapter) this.applicationContext.getBean(
                MqttAdapter.class, clientId, this.clientFactory, this.mqttInputChannel, topics);
        OutMqttPahoMessageHandler handler = (OutMqttPahoMessageHandler) this.applicationContext.getBean(
                OutMqttPahoMessageHandler.class, clientId + "-2", this.clientFactory);
        handler.setAsync(true);
        this.mqttOutputChannel.subscribe(handler);
        handler.start();
        this.outboundHandler = handler;
        this.adapter.start();
        log.info("started");
    }

    /**
     * Marks this service as a provider and queues every class of the given config for
     * registration.
     *
     * @param provideConfig path rule plus the service classes it applies to
     */
    public void provide(ProvideConfig provideConfig) {
        this.roles.add(Role.PROVIDER);
        for (Class cls : provideConfig.getValue()) {
            this.serviceInfos.add(new ServiceInfo(provideConfig.getPathRule(), cls));
        }
    }

    /**
     * Dispatches an inbound MQTT message.
     *
     * <p>Messages on {@code /return/...} complete the matching pending RPC call. Messages on
     * {@code /proservs/...} are routed to the registered {@link MethodInvoker} and its result is
     * published to the request's {@code _returnTopic}. Anything else is logged and dropped.
     *
     * @param message inbound message; the topic is read from the {@code mqtt_receivedTopic} header
     */
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        log.debug("message={}", message);
        String topic = message.getHeaders().getOrDefault("mqtt_receivedTopic", "").toString();
        log.debug("receivedTopic={}", topic);
        log.debug("payload={}", message.getPayload());
        if (topic.isEmpty()) {
            return;
        }
        if (topic.startsWith("/return/")) {
            deliverReply(topic, message.getPayload().toString());
            return;
        }
        if (!topic.startsWith("/proservs/")) {
            log.error("无法提供服务1,topic={}", topic);
            return;
        }
        MethodMatcher matcher = MethodMatcher.of(topic);
        if (matcher == null) {
            log.error("无效服务2,topic={}", topic);
            return;
        }
        log.debug("matcher,matcher={}", matcher);
        MethodInvoker methodInvoker = this.methodInfoMap.get(matcher.getMethod());
        if (methodInvoker == null) {
            log.error("无效服务3,topic={}", topic);
            return;
        }
        JSONObject request;
        try {
            request = JSON.parseObject(message.getPayload().toString());
        } catch (RuntimeException e) {
            // A malformed payload must not propagate out of the MQTT callback.
            log.error("invalid JSON payload, topic={}", topic, e);
            return;
        }
        if (request == null) {
            log.error("empty payload, topic={}", topic);
            return;
        }
        String returnTopic = request.getString("_returnTopic");
        if (returnTopic == null) {
            log.error("无效服务4,topic={}", topic);
            return;
        }
        log.debug("methodInvoker,methodInvoker={}", methodInvoker);
        String responseBody = methodInvoker.invoke(request);
        log.debug("responseBody={}", responseBody);
        this.mqttOutputChannel.send(new GenericMessage<>(responseBody, mqttHeaders(returnTopic)));
    }

    /** Hands a reply payload to the RPC call waiting on the given return topic, if any. */
    private void deliverReply(String returnTopic, String payload) {
        ArrayBlockingQueue<String> replyQueue = this.returnConcurrentHashMap.get(returnTopic);
        if (replyQueue == null) {
            log.error("return超时,topic={}", returnTopic);
            return;
        }
        // offer() instead of add(): a duplicate reply must not throw on the full queue.
        if (!replyQueue.offer(payload)) {
            log.warn("duplicate reply dropped, topic={}", returnTopic);
        }
    }

    /** Builds the outbound MQTT headers (QoS 0, not retained, not duplicate) for a topic. */
    private static Map<String, Object> mqttHeaders(String topic) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("mqtt_topic", topic);
        headers.put("mqtt_qos", 0);
        headers.put("mqtt_retained", false);
        headers.put("mqtt_duplicate", false);
        return headers;
    }

    /**
     * Creates a consumer proxy for the given interface. Interface calls are forwarded to
     * {@link #rpc(Method, Object[])}; {@code equals}, {@code hashCode} and {@code toString}
     * are answered locally with identity semantics.
     *
     * @param cls interface to proxy
     * @param <T> type the caller assigns the proxy to
     * @return a proxy implementing {@code cls}
     */
    @SuppressWarnings("unchecked") // the proxy implements cls; T is chosen by the caller
    public <T> T use(Class<?> cls) {
        this.roles.add(Role.CONSUMER);
        // Use the interface's own loader: the system loader cannot see classes loaded by a
        // framework or container class loader, which makes newProxyInstance fail.
        return (T) Proxy.newProxyInstance(cls.getClassLoader(), new Class<?>[]{cls}, (proxy, method, args) -> {
            if (!Object.class.equals(method.getDeclaringClass())) {
                return rpc(method, args);
            }
            // Only equals, hashCode and toString of Object are routed to an invocation handler.
            String name = method.getName();
            if ("equals".equals(name)) {
                return proxy == args[0];
            }
            if ("hashCode".equals(name)) {
                return System.identityHashCode(proxy);
            }
            return cls.getName() + "@" + Integer.toHexString(System.identityHashCode(proxy));
        });
    }

    /**
     * Publishes a request for {@code method} and waits for the reply.
     *
     * <p>{@code objArr[0]} is inserted into the request topic after {@code /proservs/}
     * (presumably the target provider's name; verify against callers) and {@code objArr[1]}
     * is the request body, which must serialize to a JSON object.
     *
     * @param method interface method being invoked; its return type is the reply type
     * @param objArr call arguments, at least two elements
     * @return the parsed reply, or an object with {@code result_code=FAIL} built locally
     *         when the call fails or no reply arrives in time
     * @throws IllegalArgumentException if fewer than two arguments are given or the request
     *         body does not serialize to a JSON object
     */
    public Object rpc(Method method, Object[] objArr) {
        if (objArr == null || objArr.length < 2) {
            throw new IllegalArgumentException("rpc requires (target, request) arguments: " + method);
        }
        Object requestJson = JSON.toJSON(objArr[1]);
        if (!(requestJson instanceof JSONObject)) {
            throw new IllegalArgumentException("rpc request must serialize to a JSON object: " + method);
        }
        String topic = "/proservs/" + objArr[0] + "/prod/" + method.getDeclaringClass().getName() + "/" + method.getName();
        String returnTopic = "/return/" + objArr[0] + "/" + UUID.randomUUID();
        JSONObject request = (JSONObject) requestJson;
        request.put("_returnTopic", returnTopic);
        GenericMessage<String> outbound = new GenericMessage<>(request.toJSONString(), mqttHeaders(topic));
        ArrayBlockingQueue<String> replyQueue = new ArrayBlockingQueue<>(1);
        this.returnConcurrentHashMap.put(returnTopic, replyQueue);
        log.info("rpc={}", topic);
        Future<String> pending = null;
        try {
            pending = this.executorService.submit(() -> {
                return call(replyQueue, outbound);
            });
            String reply = pending.get(RPC_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            if (reply == null) {
                // The queue wait expired first; report it like any other timeout.
                throw new TimeoutException("no reply on " + returnTopic);
            }
            return JSONObject.parseObject(reply, method.getReturnType());
        } catch (Throwable th) {
            if (th instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            log.error("rpc失败,topic={},returnTopic={}", topic, returnTopic, th);
            JSONObject failure = new JSONObject();
            failure.put("result_code", "FAIL");
            failure.put("error_code", "LOCAL:" + th.getClass().getSimpleName());
            failure.put("error_des", th.getMessage());
            return failure.toJavaObject(method.getReturnType());
        } finally {
            if (pending != null) {
                // Frees the worker still blocked on the reply queue; no-op once completed.
                pending.cancel(true);
            }
            log.info("rpc结束={}", topic);
            this.returnConcurrentHashMap.remove(returnTopic);
        }
    }

    /**
     * Sends the request and waits for the reply on the given queue.
     *
     * @return the reply payload, or {@code null} if none arrived within the timeout
     * @throws InterruptedException if the waiting thread is interrupted
     */
    private String call(ArrayBlockingQueue<String> arrayBlockingQueue, GenericMessage<String> genericMessage)
            throws InterruptedException {
        // Sent on this worker thread so a send failure reaches the caller immediately.
        if (!this.mqttOutputChannel.send(genericMessage)) {
            throw new IllegalStateException("mqtt output channel rejected the request");
        }
        return arrayBlockingQueue.poll(RPC_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Stops the inbound adapter and releases the outbound handler. Does nothing for parts
     * that were never started.
     */
    public void stop() {
        if (this.adapter != null) {
            this.adapter.stop();
        }
        if (this.outboundHandler != null) {
            // Unsubscribe so a later start() does not stack a second handler on the channel.
            this.mqttOutputChannel.unsubscribe(this.outboundHandler);
            this.outboundHandler.stop();
            this.outboundHandler = null;
        }
    }

    /** Currently a no-op. */
    public void subscribe(String str) {
    }

    /**
     * Summarizes the registered services.
     *
     * @return one map per service with its {@code type} and {@code methodSize}
     */
    public List<Map<String, Object>> provideList() {
        List<Map<String, Object>> summary = new ArrayList<>(this.serviceInfos.size());
        for (ServiceInfo serviceInfo : this.serviceInfos) {
            Map<String, Object> entry = new HashMap<>();
            entry.put("type", serviceInfo.getType());
            entry.put("methodSize", serviceInfo.getMethodInfos().size());
            summary.add(entry);
        }
        return summary;
    }

    public List<ProvideConfig> getProvideList() {
        return this.provideList;
    }

    public void setProvideList(List<ProvideConfig> list) {
        this.provideList = list;
    }

    public IotProxyHandler getIotProxyHandler() {
        return this.iotProxyHandler;
    }

    public void setIotProxyHandler(IotProxyHandler iotProxyHandler) {
        this.iotProxyHandler = iotProxyHandler;
    }

    public Class[] getUses() {
        return this.uses;
    }

    public void setUses(Class[] clsArr) {
        this.uses = clsArr;
    }
}
