package com.github.sakserv.minicluster.impl;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import com.github.sakserv.minicluster.MiniCluster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/sakserv/minicluster/impl/StormLocalCluster.class */
public class StormLocalCluster implements MiniCluster {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) StormLocalCluster.class);
    private String zookeeperHost;
    private Long zookeeperPort;
    private Boolean enableDebug;
    private Integer numWorkers;
    private Config stormConf;
    private LocalCluster localCluster;

    /* loaded from: input_file:com/github/sakserv/minicluster/impl/StormLocalCluster$Builder.class */
    public static class Builder {
        private String zookeeperHost;
        private Long zookeeperPort;
        private Boolean enableDebug;
        private Integer numWorkers;
        private Config stormConf;

        public Builder setZookeeperHost(String str) {
            this.zookeeperHost = str;
            return this;
        }

        public Builder setZookeeperPort(Long l) {
            this.zookeeperPort = l;
            return this;
        }

        public Builder setEnableDebug(Boolean bool) {
            this.enableDebug = bool;
            return this;
        }

        public Builder setNumWorkers(Integer num) {
            this.numWorkers = num;
            return this;
        }

        public Builder setStormConfig(Config config) {
            this.stormConf = config;
            return this;
        }

        public StormLocalCluster build() {
            StormLocalCluster stormLocalCluster = new StormLocalCluster(this);
            validateObject(stormLocalCluster);
            return stormLocalCluster;
        }

        public void validateObject(StormLocalCluster stormLocalCluster) {
            if (stormLocalCluster.getZookeeperHost() == null) {
                throw new IllegalArgumentException("ERROR: Missing required config: Zookeeper Host");
            }
            if (stormLocalCluster.getZookeeperPort() == null) {
                throw new IllegalArgumentException("ERROR: Missing required config: Zookeeper Port");
            }
            if (stormLocalCluster.getEnableDebug() == null) {
                throw new IllegalArgumentException("ERROR: Missing required config: Enable Debug");
            }
            if (stormLocalCluster.getNumWorkers() == null) {
                throw new IllegalArgumentException("ERROR: Missing required config: Num Workers");
            }
            if (stormLocalCluster.getStormConf() == null) {
                throw new IllegalArgumentException("ERROR: Missing required config: Storm Config");
            }
        }
    }

    private StormLocalCluster(Builder builder) {
        this.zookeeperHost = builder.zookeeperHost;
        this.zookeeperPort = builder.zookeeperPort;
        this.enableDebug = builder.enableDebug;
        this.numWorkers = builder.numWorkers;
        this.stormConf = builder.stormConf;
    }

    public String getZookeeperHost() {
        return this.zookeeperHost;
    }

    public Long getZookeeperPort() {
        return this.zookeeperPort;
    }

    public Boolean getEnableDebug() {
        return this.enableDebug;
    }

    public Integer getNumWorkers() {
        return this.numWorkers;
    }

    public Config getStormConf() {
        return this.stormConf;
    }

    @Override // com.github.sakserv.minicluster.MiniCluster
    public void start() throws Exception {
        LOG.info("STORM: Starting StormLocalCluster");
        configure();
        this.localCluster = new LocalCluster(this.zookeeperHost, this.zookeeperPort);
    }

    @Override // com.github.sakserv.minicluster.MiniCluster
    public void stop() throws Exception {
        stop(true);
    }

    @Override // com.github.sakserv.minicluster.MiniCluster
    public void stop(boolean z) throws Exception {
        LOG.info("STORM: Stopping StormLocalCluster");
        this.localCluster.shutdown();
        if (z) {
            cleanUp();
        }
    }

    @Override // com.github.sakserv.minicluster.MiniCluster
    public void configure() throws Exception {
        this.stormConf.setDebug(this.enableDebug.booleanValue());
        this.stormConf.setNumWorkers(this.numWorkers.intValue());
    }

    @Override // com.github.sakserv.minicluster.MiniCluster
    public void cleanUp() throws Exception {
    }

    public void submitTopology(String str, Config config, StormTopology stormTopology) {
        this.localCluster.submitTopology(str, config, stormTopology);
    }

    public void stop(String str) throws Exception {
        this.localCluster.killTopology(str);
        stop();
    }
}
