package stream.net;

import java.net.ServerSocket;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.jfree.base.log.LogConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Data;
import stream.annotations.Parameter;
import stream.io.Sink;

/* loaded from: input_file:stream/net/DataTapSink.class */
/**
 * A {@link Sink} that opens a TCP server socket ("tap") and hands every item
 * written to it over to a {@link ConnectionHandler}.
 *
 * <p>Lifecycle: configure the sink through its setters, call {@link #init()}
 * once to bind the port and start the connection handler, push items with
 * {@code write(...)}, and finally call {@link #close()}.
 *
 * <p>How items are buffered and delivered to the connected clients is decided
 * by {@code ConnectionHandler}, which is not part of this file.
 */
public class DataTapSink implements Sink {
    private static final Logger log = LoggerFactory.getLogger(DataTapSink.class);

    /** TCP port the tap listens on. */
    protected Integer port = 9100;

    /** Per-client buffer size, counted in items. */
    protected int clientBufferSize = 10;

    /** Whether the TCP stream is GZIP-compressed. */
    protected boolean gzip = false;

    /*
     * NOTE(review): this flag is stored but never passed to the connection
     * handler in init() - confirm whether ConnectionHandler is supposed to
     * receive it.
     */
    protected boolean detectClientClose = false;

    /** Whether clients with a completely filled buffer are disconnected. */
    protected boolean disconnectSlowClients = false;

    /*
     * NOTE(review): no accessor exists for this flag and nothing in this class
     * reads it - presumably a leftover; verify against ConnectionHandler.
     */
    protected boolean logBufferFull = false;

    /** Handler created by {@link #init()}; {@code null} until then. */
    protected ConnectionHandler connectionHandler;

    /** Identifier of this sink. */
    protected String id;

    /** Thread pool created by {@link #init()}; kept so that {@link #close()} can shut it down. */
    private ExecutorService executor;

    @Override // stream.io.Sink
    public String getId() {
        return this.id;
    }

    @Override // stream.io.Sink
    public void setId(String str) {
        this.id = str;
    }

    public Integer getPort() {
        return this.port;
    }

    @Parameter(description = "The port to listen on for incoming tap connections, defaults to 9100.")
    public void setPort(Integer num) {
        this.port = num;
    }

    public int getClientBufferSize() {
        return this.clientBufferSize;
    }

    @Parameter(description = "The buffer size (number of items) used for each client", defaultValue = "10")
    public void setClientBufferSize(int i) {
        this.clientBufferSize = i;
    }

    public boolean isGzip() {
        return this.gzip;
    }

    @Parameter(description = "This parameter allows for enabling GZIP compression on the TCP stream, default is no compression.")
    public void setGzip(boolean z) {
        this.gzip = z;
    }

    public boolean isActivelyDetectClientClose() {
        return this.detectClientClose;
    }

    // The default is spelled as the literal "false"; the jfree logging constant
    // previously referenced here is unrelated to this parameter.
    @Parameter(required = false, defaultValue = "false", description = "Defines if this sink actively listens to client disconnect events (= client's input stream read-method returns '-1').If true, this sink will close the connection immediately if the event occurs.Otherwise a client disconnect will be detected (and the connection will also be closed)the next time, an item should be transferred to the client (and therefore the client's output stream write-method is unsuccessful).")
    public void setDetectClientClose(boolean z) {
        this.detectClientClose = z;
    }

    public boolean isDisconnectSlowClients() {
        return this.disconnectSlowClients;
    }

    @Parameter(required = false, defaultValue = "false", description = "Defines if slow clients should be disconnected. A client is defined to be 'slow', if its buffer is completely filled.")
    public void setDisconnectSlowClients(boolean z) {
        this.disconnectSlowClients = z;
    }

    /**
     * Binds the server socket on the configured port, creates and initializes
     * the connection handler and starts it on a cached thread pool.
     *
     * <p>If any step fails, the socket and the thread pool created here are
     * released again before the failure is propagated, so the port does not
     * stay occupied by a sink that never started.
     *
     * @throws NullPointerException if the port has been set to {@code null}
     * @throws Exception if the port cannot be bound or the handler fails to start
     */
    @Override // stream.io.Sink
    public void init() throws Exception {
        if (this.port == null) {
            throw new NullPointerException("port must not be null");
        }
        ExecutorService pool = Executors.newCachedThreadPool();
        ServerSocket serverSocket = null;
        boolean started = false;
        try {
            serverSocket = new ServerSocket(this.port.intValue());
            ConnectionHandler handler = new ConnectionHandler(pool, serverSocket);
            handler.init(this.clientBufferSize, this.gzip, this.disconnectSlowClients);
            pool.execute(handler);
            // Publish the handler and pool only once everything is up and running.
            this.connectionHandler = handler;
            this.executor = pool;
            started = true;
        } finally {
            if (!started) {
                pool.shutdownNow();
                if (serverSocket != null) {
                    try {
                        serverSocket.close();
                    } catch (java.io.IOException closeFailure) {
                        // Log only: throwing here would mask the original start-up failure.
                        log.warn("Failed to close the tap server socket after a failed start", closeFailure);
                    }
                }
            }
        }
    }

    /**
     * Writes every item of the collection via {@link #write(Data)}.
     *
     * @param collection the items to write
     * @return always {@code true}
     * @throws Exception if writing an item fails
     */
    @Override // stream.io.Sink
    public boolean write(Collection<Data> collection) throws Exception {
        for (Data item : collection) {
            write(item);
        }
        return true;
    }

    /**
     * Passes a single item to the connection handler; {@code null} items are
     * silently skipped.
     *
     * @param data the item to write, may be {@code null}
     * @return always {@code true}
     * @throws Exception if the connection handler fails to accept the item
     */
    @Override // stream.io.Sink
    public boolean write(Data data) throws Exception {
        if (data == null) {
            return true;
        }
        this.connectionHandler.write(data);
        return true;
    }

    /**
     * Closes the connection handler and shuts down the thread pool created by
     * {@link #init()}. Does nothing if the sink was never initialized.
     *
     * @throws Exception if closing the connection handler fails
     */
    @Override // stream.io.Sink
    public void close() throws Exception {
        ConnectionHandler handler = this.connectionHandler;
        ExecutorService pool = this.executor;
        try {
            if (handler != null) {
                handler.close();
            }
        } finally {
            if (pool != null) {
                // Orderly shutdown: running tasks may finish, no new ones are accepted.
                pool.shutdown();
            }
        }
    }
}
