package io.gridgo.extras.flink;

import io.gridgo.connector.Connector;
import io.gridgo.connector.ConnectorFactory;
import io.gridgo.connector.Producer;
import io.gridgo.connector.impl.factories.DefaultConnectorFactory;
import io.gridgo.framework.support.Message;
import io.gridgo.utils.helper.Loggable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/* loaded from: input_file:io/gridgo/extras/flink/GridgoConnectorSink.class */
public class GridgoConnectorSink extends RichSinkFunction<Message> implements Loggable {
    private static final ConnectorFactory DEFAULT_FACTORY = new DefaultConnectorFactory();
    private static final long serialVersionUID = -5571127197760351635L;
    private String endpoint;
    private transient Connector connector;

    public GridgoConnectorSink(String str) {
        this.endpoint = str;
    }

    public void invoke(Message message) throws Exception {
        ((Producer) this.connector.getProducer().orElseThrow()).sendWithAck(message).fail(exc -> {
            getLogger().error("Exception caught while sending message", exc);
        });
    }

    public void open(Configuration configuration) throws Exception {
        this.connector = DEFAULT_FACTORY.createConnector(this.endpoint);
    }

    public void close() throws Exception {
        if (this.connector != null) {
            this.connector.stop();
        }
    }
}
