package streams.net;

import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Data;
import stream.data.DataFactory;
import streams.codec.Codec;
import streams.codec.DefaultCodec;
import streams.io.BobCodec;
import streams.logging.Message;
import streams.runtime.Hook;
import streams.runtime.Signals;

/* loaded from: input_file:streams/net/MessageQueue.class */
public class MessageQueue {
    static Logger log = LoggerFactory.getLogger(MessageQueue.class);
    static final LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();
    static List<Sender> senders = new ArrayList();
    private static Sender sender;

    /* loaded from: input_file:streams/net/MessageQueue$Sender.class */
    public static class Sender extends Thread {
        final Codec<Data> mc;
        private final int port;
        DataOutputStream out;
        final String host;
        BufferedReader in;
        final LinkedBlockingQueue<Message> messages;
        boolean running;

        public Sender(String str, int i) {
            this(str, i, new LinkedBlockingQueue());
        }

        public Sender(String str, int i, LinkedBlockingQueue<Message> linkedBlockingQueue) {
            this.mc = new DefaultCodec();
            this.running = false;
            this.host = str;
            this.port = i;
            this.messages = linkedBlockingQueue;
            setDaemon(true);
        }

        protected boolean connect() {
            try {
                Socket connect = SecureConnect.connect(this.host, this.port);
                try {
                    this.out = new DataOutputStream(connect.getOutputStream());
                    this.in = new BufferedReader(new InputStreamReader(connect.getInputStream()));
                    return true;
                } catch (IOException e) {
                    MessageQueue.log.error("Error while creating output and input readers using socket connection: {}", e.toString());
                    return false;
                }
            } catch (Exception e2) {
                MessageQueue.log.error("Connection could have not been build to {}:{}\nError message: {}", new Object[]{this.host, Integer.valueOf(this.port), e2.toString()});
                return false;
            }
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            this.running = true;
            if (!MessageQueue.senders.contains(this)) {
                MessageQueue.senders.add(this);
            }
            while (true) {
                if (!this.running && this.messages.isEmpty()) {
                    return;
                }
                try {
                    Message take = this.messages.take();
                    if (take != null) {
                        send(take);
                    }
                    MessageQueue.log.debug("Sending message " + take);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        public void send(Message message) {
            try {
                if ((this.out == null || this.in == null) && !connect()) {
                    MessageQueue.log.error("Connection could not have been established.");
                    return;
                }
                byte[] encode = this.mc.encode(DataFactory.create(message));
                MessageQueue.log.debug("Encoded message to " + encode.length + " bytes");
                MessageQueue.log.debug(BobCodec.writeBlock(encode, this.out) + " bytes written to socket...");
                this.out.flush();
            } catch (Exception e) {
                MessageQueue.log.error("Error while writing message to output stream: {}", e.toString());
            }
        }

        public int messagesPending() {
            return this.messages.size();
        }

        public void add(Message message) {
            this.messages.add(message);
        }
    }

    /* loaded from: input_file:streams/net/MessageQueue$Shutdown.class */
    public static class Shutdown extends Thread implements Hook {
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            signal(0);
        }

        public void signal(int i) {
            if (i == 0) {
                MessageQueue.log.info("Shutting down message queue...");
                for (Sender sender : MessageQueue.senders) {
                    sender.running = false;
                    while (sender != null && !MessageQueue.messages.isEmpty()) {
                        try {
                            MessageQueue.log.info("Waiting for sender to finish ({} messages pending)...", Integer.valueOf(MessageQueue.messages.size()));
                            sender.join(1000L);
                        } catch (Exception e) {
                            MessageQueue.log.error("Waiting for sender was interrupted: " + e);
                        }
                    }
                }
            }
        }
    }

    public static void add(Message message) {
        if (sender != null) {
            messages.offer(message);
        }
    }

    static {
        String property = System.getProperty("rlog.host");
        log.info("Initializing global MessageQueue rlog {}", property);
        if (property == null) {
            log.info("'rlog.host' not set, disabling rlog-sender");
            sender = null;
        } else {
            sender = new Sender(property, 6001, messages);
            log.info("rlog.host={} using port={}", property, 6001);
            sender.setDaemon(true);
            sender.start();
        }
        Signals.register(new Shutdown());
    }
}
