package org.tbk.bitcoin.zeromq.client;

import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import org.reactivestreams.FlowAdapters;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/tbk/bitcoin/zeromq/client/MessagePublishService.class */
public final class MessagePublishService<T> extends AbstractIdleService implements Publisher<T> {
    private static final Logger log = LoggerFactory.getLogger(MessagePublishService.class);
    private final String serviceId = Integer.toHexString(System.identityHashCode(this));
    private final ExecutorService publisherExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("zmq-msg-pub-" + this.serviceId + "-%d").setDaemon(false).build());
    private final SubmissionPublisher<T> publisher = new SubmissionPublisher<>(this.publisherExecutor, Flow.defaultBufferSize());
    private final Scheduler subscribeOnScheduler = Schedulers.newSingle("zmq-msg-sub-" + this.serviceId);
    private final MessagePublisherFactory<T> bitcoinMessagePublisher;
    private Disposable subscription;

    public MessagePublishService(MessagePublisherFactory<T> messagePublisherFactory) {
        this.bitcoinMessagePublisher = (MessagePublisherFactory) Objects.requireNonNull(messagePublisherFactory);
    }

    public void subscribe(Subscriber<? super T> subscriber) {
        this.publisher.subscribe(FlowAdapters.toFlowSubscriber(subscriber));
    }

    protected String serviceName() {
        return String.format("%s-%s-%s", super.serviceName(), this.bitcoinMessagePublisher.getTopicName(), this.serviceId);
    }

    protected void startUp() {
        log.info("starting..");
        Flux subscribeOn = this.bitcoinMessagePublisher.create().subscribeOn(this.subscribeOnScheduler);
        SubmissionPublisher<T> submissionPublisher = this.publisher;
        Objects.requireNonNull(submissionPublisher);
        this.subscription = subscribeOn.subscribe(submissionPublisher::submit);
        log.info("started successfully");
    }

    protected void shutDown() {
        log.info("terminating..");
        this.subscription.dispose();
        this.subscribeOnScheduler.dispose();
        this.publisher.close();
        if (!MoreExecutors.shutdownAndAwaitTermination(this.publisherExecutor, Duration.ofSeconds(10L))) {
            log.warn("unclean shutdown of executor service");
        }
        log.info("terminated");
    }
}
