package org.tbk.bitcoin.tool.mqtt;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.moquette.broker.Server;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.bitcoinj.core.Block;
import org.bitcoinj.core.Transaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tbk.bitcoin.zeromq.client.MessagePublishService;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/tbk/bitcoin/tool/mqtt/BitcoinMqttServerImpl.class */
public class BitcoinMqttServerImpl extends AbstractIdleService implements BitcoinMqttServer {
    private static final Logger log = LoggerFactory.getLogger(BitcoinMqttServerImpl.class);
    private final String serviceId = Integer.toHexString(System.identityHashCode(this));
    private final ExecutorService publisherExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("zmq-msg-pub-" + this.serviceId + "-%d").setDaemon(false).build());
    private final Scheduler subscribeOnScheduler = Schedulers.newSingle("mqtt-msg-sub-" + this.serviceId);
    private final String clientId;
    private final Server mqttServer;
    private final MessagePublishService<Block> blockMessagePublisherService;
    private final MessagePublishService<Transaction> transactionMessagePublisherService;
    private Disposable blockSubscription;
    private Disposable transactionSubscription;

    public BitcoinMqttServerImpl(String str, Server server, MessagePublishService<Block> messagePublishService, MessagePublishService<Transaction> messagePublishService2) {
        Objects.requireNonNull(str);
        Preconditions.checkArgument(!str.isBlank(), "'clientId' must not be blank.");
        this.clientId = (String) Objects.requireNonNull(str);
        this.mqttServer = (Server) Objects.requireNonNull(server);
        this.blockMessagePublisherService = (MessagePublishService) Objects.requireNonNull(messagePublishService);
        this.transactionMessagePublisherService = (MessagePublishService) Objects.requireNonNull(messagePublishService2);
    }

    protected void startUp() {
        log.info("starting..");
        this.blockSubscription = Flux.from(this.blockMessagePublisherService).subscribeOn(this.subscribeOnScheduler).subscribe(block -> {
            this.mqttServer.internalPublish(MqttMessageBuilders.publish().topicName("/hashblock").retained(true).qos(MqttQoS.AT_LEAST_ONCE).payload(Unpooled.copiedBuffer(block.getHash().getBytes())).build(), this.clientId);
            this.mqttServer.internalPublish(MqttMessageBuilders.publish().topicName("/rawblock").retained(true).qos(MqttQoS.AT_LEAST_ONCE).payload(Unpooled.copiedBuffer(block.bitcoinSerialize())).build(), this.clientId);
        });
        this.transactionSubscription = Flux.from(this.transactionMessagePublisherService).subscribeOn(this.subscribeOnScheduler).subscribe(transaction -> {
            this.mqttServer.internalPublish(MqttMessageBuilders.publish().topicName("/hashtx").retained(false).qos(MqttQoS.AT_LEAST_ONCE).payload(Unpooled.copiedBuffer(transaction.getTxId().getBytes())).build(), this.clientId);
            this.mqttServer.internalPublish(MqttMessageBuilders.publish().topicName("/rawtx").retained(false).qos(MqttQoS.AT_LEAST_ONCE).payload(Unpooled.copiedBuffer(transaction.bitcoinSerialize())).build(), this.clientId);
        });
        log.info("started successfully");
    }

    protected void shutDown() {
        log.info("terminating..");
        this.blockSubscription.dispose();
        this.transactionSubscription.dispose();
        if (!MoreExecutors.shutdownAndAwaitTermination(this.publisherExecutor, Duration.ofSeconds(10L))) {
            log.warn("unclean shutdown of executor service");
        }
        log.info("terminated");
    }
}
