package io.gridgo.connector.impl;

import io.gridgo.bean.BElement;
import io.gridgo.bean.BObject;
import io.gridgo.connector.Consumer;
import io.gridgo.connector.support.config.ConnectorContext;
import io.gridgo.framework.AbstractComponentLifecycle;
import io.gridgo.framework.support.Message;
import io.gridgo.framework.support.Payload;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;
import lombok.NonNull;
import org.joo.promise4j.Deferred;

/* loaded from: input_file:io/gridgo/connector/impl/AbstractConsumer.class */
/**
 * Base class for {@link Consumer} implementations.
 *
 * <p>It keeps the registered subscribers, offers helpers that create or parse
 * {@link Message} instances while making sure their payload carries an id, and
 * dispatches published messages to every subscriber through the callback
 * invoker strategy supplied by the {@link ConnectorContext}.
 */
public abstract class AbstractConsumer extends AbstractComponentLifecycle implements Consumer {
    /*
     * Declared with the concrete type (rather than Collection) so that
     * subscribe() can rely on the atomic addIfAbsent() operation.
     */
    private final CopyOnWriteArrayList<BiConsumer<Message, Deferred<Message, Exception>>> subscribers =
            new CopyOnWriteArrayList<>();
    private final ConnectorContext context;

    /**
     * Creates a consumer bound to the given connector context.
     *
     * @param connectorContext context used for id generation, callback invocation
     *                         and exception handling
     */
    public AbstractConsumer(ConnectorContext connectorContext) {
        this.context = connectorContext;
    }

    /** Removes every registered subscriber. */
    @Override
    public void clearSubscribers() {
        this.subscribers.clear();
    }

    /**
     * Creates a message built from a {@code null} payload.
     *
     * @return the result of {@code Message.of((Payload) null)}
     */
    protected Message createMessage() {
        return Message.of((Payload) null);
    }

    /**
     * Creates a message with an empty header object and the given second payload element.
     *
     * @param bElement element passed as the second argument of {@code Payload.of}
     * @return a message whose payload has been passed through {@link #ensurePayloadId(Payload)}
     */
    protected Message createMessage(BElement bElement) {
        return createMessage(BObject.ofEmpty(), bElement);
    }

    /**
     * Creates a message from the given header object and element.
     *
     * @param bObject  first argument of {@code Payload.of}; the null-check message
     *                 refers to it as "headers"; must not be {@code null}
     * @param bElement second argument of {@code Payload.of}
     * @return a message whose payload has been passed through {@link #ensurePayloadId(Payload)}
     * @throws NullPointerException if {@code bObject} is {@code null}
     */
    protected Message createMessage(@NonNull BObject bObject, BElement bElement) {
        if (bObject == null) {
            throw new NullPointerException("headers is marked @NonNull but is null");
        }
        Payload payload = Payload.of(bObject, bElement);
        ensurePayloadId(payload);
        return Message.of(payload);
    }

    /**
     * Applies {@link #ensurePayloadId(Payload)} to the payload of the given message.
     *
     * @param message message to inspect; {@code null} is ignored
     */
    protected void ensurePayloadId(Message message) {
        if (message != null) {
            ensurePayloadId(message.getPayload());
        }
    }

    /**
     * Sets an id produced by the context's id generator on the payload, but only
     * when the payload is non-null and {@code payload.getId().isEmpty()} is true.
     *
     * @param payload payload to inspect; {@code null} is ignored
     */
    protected void ensurePayloadId(Payload payload) {
        if (payload == null || !payload.getId().isEmpty()) {
            return;
        }
        payload.setId(this.context.getIdGenerator().generateId());
    }

    /**
     * Reports a failure: logs it, rejects the deferred (when present) and hands the
     * exception to the context's exception handler. Any exception raised while doing
     * so is logged and not propagated.
     *
     * @param deferred deferred to reject, may be {@code null}
     * @param exc      the failure to report
     */
    private void notifyErrors(Deferred<Message, Exception> deferred, Exception exc) {
        try {
            getLogger().error("Exception caught while publishing message", exc);
            if (deferred != null) {
                deferred.reject(exc);
            }
            getContext().getExceptionHandler().accept(exc);
        } catch (Exception e) {
            getLogger().error("Exception caught while trying to handle exception :(", e);
        }
    }

    /**
     * Invokes a single subscriber, routing any exception it throws to
     * {@link #notifyErrors(Deferred, Exception)}.
     *
     * @param message    message handed to the subscriber
     * @param deferred   deferred handed to the subscriber
     * @param biConsumer the subscriber to invoke
     */
    private void notifySubscriber(Message message, Deferred<Message, Exception> deferred, BiConsumer<Message, Deferred<Message, Exception>> biConsumer) {
        try {
            biConsumer.accept(message, deferred);
        } catch (Exception e) {
            notifyErrors(deferred, e);
        }
    }

    /**
     * Parses a message from a {@link BElement} and ensures its payload has an id.
     *
     * @param bElement source handed to {@code Message.parse}
     * @return the parsed message
     */
    protected Message parseMessage(BElement bElement) {
        Message parsed = Message.parse(bElement);
        ensurePayloadId(parsed);
        return parsed;
    }

    /**
     * Parses a message from a byte array and ensures its payload has an id.
     *
     * @param bArr source handed to {@code Message.parse}
     * @return the parsed message
     */
    protected Message parseMessage(byte[] bArr) {
        Message parsed = Message.parse(bArr);
        ensurePayloadId(parsed);
        return parsed;
    }

    /**
     * Parses a message from a {@link ByteBuffer} and ensures its payload has an id.
     *
     * @param byteBuffer source handed to {@code Message.parse}
     * @return the parsed message
     */
    protected Message parseMessage(ByteBuffer byteBuffer) {
        Message parsed = Message.parse(byteBuffer);
        ensurePayloadId(parsed);
        return parsed;
    }

    /**
     * Parses a message from an {@link InputStream} and ensures its payload has an id.
     *
     * @param inputStream source handed to {@code Message.parse}
     * @return the parsed message
     */
    protected Message parseMessage(InputStream inputStream) {
        Message parsed = Message.parse(inputStream);
        ensurePayloadId(parsed);
        return parsed;
    }

    /**
     * Publishes a message to every registered subscriber.
     *
     * <p>The message is first tagged with this component's name via
     * {@code attachSource}. Each subscriber call is then handed to the context's
     * callback invoker strategy; an exception thrown while handing over the task is
     * reported through {@link #notifyErrors(Deferred, Exception)} and does not stop
     * the remaining subscribers from being notified.
     *
     * @param message  message to publish; must not be {@code null}
     * @param deferred deferred passed along to each subscriber, may be {@code null}
     * @throws NullPointerException if {@code message} is {@code null}
     */
    protected void publish(@NonNull Message message, Deferred<Message, Exception> deferred) {
        if (message == null) {
            throw new NullPointerException("message is marked @NonNull but is null");
        }
        message.attachSource(getName());
        for (BiConsumer<Message, Deferred<Message, Exception>> subscriber : this.subscribers) {
            try {
                this.context.getCallbackInvokerStrategy().execute(() -> {
                    notifySubscriber(message, deferred, subscriber);
                });
            } catch (Exception e) {
                notifyErrors(deferred, e);
            }
        }
    }

    /**
     * Registers a subscriber unless an equal one is already registered.
     *
     * <p>The presence check and the insertion are performed as one atomic
     * {@code addIfAbsent} operation, so concurrent calls with the same subscriber
     * cannot register it twice.
     *
     * @param biConsumer callback receiving each published message and its deferred
     * @return this consumer
     */
    @Override
    public Consumer subscribe(BiConsumer<Message, Deferred<Message, Exception>> biConsumer) {
        this.subscribers.addIfAbsent(biConsumer);
        return this;
    }

    /**
     * Returns the live collection of subscribers (not a copy).
     *
     * @return the internal subscriber collection
     */
    protected Collection<BiConsumer<Message, Deferred<Message, Exception>>> getSubscribers() {
        return this.subscribers;
    }

    /**
     * Returns the connector context this consumer was created with.
     *
     * @return the connector context
     */
    public ConnectorContext getContext() {
        return this.context;
    }
}
