package cascading.local.tap.kafka;

import cascading.flow.FlowProcess;
import cascading.local.tap.kafka.decorator.ForwardingConsumer;
import cascading.property.PropertyUtil;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import cascading.tuple.TupleEntrySchemeCollector;
import cascading.tuple.TupleEntrySchemeIterator;
import cascading.util.CloseableIterator;
import cascading.util.Util;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cascading/local/tap/kafka/KafkaTap.class */
public class KafkaTap<K, V> extends Tap<Properties, KafkaConsumerRecordIterator<K, V>, Producer<K, V>> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaTap.class);
    public static final Properties CONSUME_AUTO_COMMIT_LATEST = new Properties() { // from class: cascading.local.tap.kafka.KafkaTap.1
        {
            setProperty("enable.auto.commit", "true");
            setProperty("auto.commit.interval.ms", "1000");
            setProperty("auto.offset.reset", "latest");
        }
    };
    public static final Properties CONSUME_AUTO_COMMIT_EARLIEST = new Properties() { // from class: cascading.local.tap.kafka.KafkaTap.2
        {
            setProperty("enable.auto.commit", "true");
            setProperty("auto.commit.interval.ms", "1000");
            setProperty("auto.offset.reset", "earliest");
        }
    };
    public static final Properties PRODUCE_ACK_ALL_NO_RETRY = new Properties() { // from class: cascading.local.tap.kafka.KafkaTap.3
        {
            setProperty("acks", "all");
            setProperty("retries", "0");
        }
    };
    public static final long DEFAULT_POLL_TIMEOUT = 10000;
    public static final short DEFAULT_REPLICATION_FACTOR = 1;
    public static final int DEFAULT_NUM_PARTITIONS = 1;
    Properties defaultProperties;
    String hostname;
    String[] topics;
    boolean isTopicPattern;
    int numPartitions;
    short replicationFactor;
    String clientID;
    String groupID;
    long pollTimeout;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: cascading.local.tap.kafka.KafkaTap$4, reason: invalid class name */
    /* loaded from: input_file:cascading/local/tap/kafka/KafkaTap$4.class */
    public class AnonymousClass4 implements CloseableIterator<Iterator<ConsumerRecord<K, V>>> {
        boolean completed = false;
        ConsumerRecords<K, V> records;
        final /* synthetic */ Consumer val$consumer;

        AnonymousClass4(Consumer consumer) {
            this.val$consumer = consumer;
        }

        public boolean hasNext() {
            if (this.records != null) {
                return true;
            }
            if (this.completed) {
                return false;
            }
            this.records = this.val$consumer.poll(KafkaTap.this.pollTimeout);
            if (KafkaTap.LOG.isDebugEnabled()) {
                KafkaTap.LOG.debug("kafka records polled: {}", Integer.valueOf(this.records.count()));
            }
            if (this.records.isEmpty()) {
                this.completed = true;
                this.records = null;
            }
            return this.records != null;
        }

        /* renamed from: next, reason: merged with bridge method [inline-methods] */
        public Iterator<ConsumerRecord<K, V>> m1next() {
            if (!hasNext()) {
                throw new NoSuchElementException("no more elements");
            }
            try {
                return new KafkaConsumerRecordIterator<K, V>() { // from class: cascading.local.tap.kafka.KafkaTap.4.1
                    Iterator<ConsumerRecord<K, V>> delegate;
                    Supplier<Boolean> hasNext = () -> {
                        return Boolean.valueOf(this.delegate.hasNext());
                    };

                    {
                        this.delegate = AnonymousClass4.this.records.iterator();
                    }

                    @Override // cascading.local.tap.kafka.KafkaConsumerRecordIterator
                    protected Consumer<K, V> getConsumer() {
                        return AnonymousClass4.this.val$consumer;
                    }

                    public void close() throws IOException {
                        this.hasNext = () -> {
                            return false;
                        };
                        this.close();
                    }

                    public boolean hasNext() {
                        return this.hasNext.get().booleanValue();
                    }

                    /* renamed from: next, reason: merged with bridge method [inline-methods] */
                    public ConsumerRecord<K, V> m2next() {
                        return this.delegate.next();
                    }
                };
            } finally {
                this.records = null;
            }
        }

        public void close() {
            try {
                try {
                    this.val$consumer.close();
                } catch (IllegalStateException e) {
                    KafkaTap.LOG.error("ignoring exception on closing", e);
                }
            } finally {
                this.completed = true;
            }
        }
    }

    public static URI makeURI(String str, String... strArr) {
        if (str == null) {
            throw new IllegalArgumentException("hostname may not be null");
        }
        Arrays.sort(strArr);
        try {
            return new URI("kafka", str, "/" + Util.join(",", strArr), null, null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e.getMessage(), e);
        }
    }

    public KafkaTap(Properties properties, KafkaScheme<K, V, ?, ?> kafkaScheme, URI uri) {
        this(properties, (KafkaScheme) kafkaScheme, uri, DEFAULT_POLL_TIMEOUT, 1, (short) 1);
    }

    public KafkaTap(KafkaScheme<K, V, ?, ?> kafkaScheme, URI uri, long j) {
        this((KafkaScheme) kafkaScheme, uri, j, 1, (short) 1);
    }

    public KafkaTap(KafkaScheme<K, V, ?, ?> kafkaScheme, URI uri, int i, short s) {
        this(kafkaScheme, uri, DEFAULT_POLL_TIMEOUT, i, s);
    }

    public KafkaTap(KafkaScheme<K, V, ?, ?> kafkaScheme, URI uri, long j, int i, short s) {
        this((Properties) null, kafkaScheme, uri, j, i, s);
    }

    public KafkaTap(KafkaScheme<K, V, ?, ?> kafkaScheme, URI uri) {
        this((KafkaScheme) kafkaScheme, uri, DEFAULT_POLL_TIMEOUT, 1, (short) 1);
    }

    public KafkaTap(Properties properties, KafkaScheme<K, V, ?, ?> kafkaScheme, URI uri, long j) {
        this(properties, (KafkaScheme) kafkaScheme, uri, j, 1, (short) 1);
    }

    public KafkaTap(Properties properties, KafkaScheme<K, V, ?, ?> kafkaScheme, URI uri, int i, short s) {
        this(properties, kafkaScheme, uri, DEFAULT_POLL_TIMEOUT, i, s);
    }

    public KafkaTap(Properties properties, KafkaScheme<K, V, ?, ?> kafkaScheme, URI uri, long j, int i, short s) {
        this(properties, kafkaScheme, uri, (String) null, j, i, s);
    }

    public KafkaTap(Properties properties, KafkaScheme<K, V, ?, ?> kafkaScheme, URI uri, String str) {
        this(properties, (KafkaScheme) kafkaScheme, uri, str, DEFAULT_POLL_TIMEOUT, 1, (short) 1);
    }

    public KafkaTap(Properties properties, KafkaScheme<K, V, ?, ?> kafkaScheme, URI uri, String str, String str2) {
        this(properties, (KafkaScheme) kafkaScheme, uri, str, str2, DEFAULT_POLL_TIMEOUT, 1, (short) 1);
    }

    public KafkaTap(KafkaScheme<K, V, ?, ?> kafkaScheme, URI uri, String str, long j) {
        this((KafkaScheme) kafkaScheme, uri, str, j, 1, (short) 1);
    }

    public KafkaTap(KafkaScheme<K, V, ?, ?> kafkaScheme, URI uri, String str, int i, short s) {
        this(kafkaScheme, uri, str, DEFAULT_POLL_TIMEOUT, i, s);
    }

    public KafkaTap(KafkaScheme<K, V, ?, ?> kafkaScheme, URI uri, String str, long j, int i, short s) {
        this((Properties) null, kafkaScheme, uri, str, j, i, s);
    }

    public KafkaTap(KafkaScheme<K, V, ?, ?> kafkaScheme, URI uri, String str) {
        this((KafkaScheme) kafkaScheme, uri, str, DEFAULT_POLL_TIMEOUT, 1, (short) 1);
    }

    public KafkaTap(KafkaScheme<K, V, ?, ?> kafkaScheme, URI uri, String str, String str2) {
        this((Properties) null, (KafkaScheme) kafkaScheme, uri, str, str2, DEFAULT_POLL_TIMEOUT, 1, (short) 1);
    }

    public KafkaTap(Properties properties, KafkaScheme<K, V, ?, ?> kafkaScheme, URI uri, String str, long j) {
        this(properties, (KafkaScheme) kafkaScheme, uri, str, j, 1, (short) 1);
    }

    public KafkaTap(Properties properties, KafkaScheme<K, V, ?, ?> kafkaScheme, URI uri, String str, int i, short s) {
        this(properties, kafkaScheme, uri, str, DEFAULT_POLL_TIMEOUT, i, s);
    }

    public KafkaTap(Properties properties, KafkaScheme<K, V, ?, ?> kafkaScheme, URI uri, String str, long j, int i, short s) {
        this(properties, kafkaScheme, uri, str, (String) null, j, i, s);
    }

    public KafkaTap(Properties properties, KafkaScheme<K, V, ?, ?> kafkaScheme, URI uri, String str, String str2, long j, int i, short s) {
        super(kafkaScheme, SinkMode.UPDATE);
        this.defaultProperties = PropertyUtil.merge(new Properties[]{CONSUME_AUTO_COMMIT_EARLIEST, PRODUCE_ACK_ALL_NO_RETRY});
        this.isTopicPattern = false;
        this.numPartitions = 1;
        this.replicationFactor = (short) 1;
        this.clientID = null;
        this.groupID = Tap.id(this);
        this.pollTimeout = DEFAULT_POLL_TIMEOUT;
        if (properties != null) {
            this.defaultProperties = new Properties(properties);
        }
        if (uri == null) {
            throw new IllegalArgumentException("identifier may not be null");
        }
        if (!uri.getScheme().equalsIgnoreCase("kafka")) {
            throw new IllegalArgumentException("identifier does not have kafka scheme");
        }
        this.hostname = uri.getHost();
        if (uri.getPort() != -1) {
            this.hostname += ":" + uri.getPort();
        }
        if (uri.getQuery() == null) {
            throw new IllegalArgumentException("must have at least one topic in the query part of the URI");
        }
        if (str != null) {
            this.clientID = str;
        }
        if (str2 != null) {
            this.groupID = str2;
        }
        this.pollTimeout = j;
        this.numPartitions = i;
        this.replicationFactor = s;
        applyTopics(uri.getQuery().split(","));
    }

    public KafkaTap(KafkaScheme<K, V, ?, ?> kafkaScheme, String str, long j, String... strArr) {
        this((KafkaScheme) kafkaScheme, str, j, 1, (short) 1, strArr);
    }

    public KafkaTap(KafkaScheme<K, V, ?, ?> kafkaScheme, String str, long j, int i, short s, String... strArr) {
        this((Properties) null, kafkaScheme, str, j, i, s, strArr);
    }

    public KafkaTap(Properties properties, KafkaScheme<K, V, ?, ?> kafkaScheme, String str, int i, short s, String... strArr) {
        this(properties, kafkaScheme, str, DEFAULT_POLL_TIMEOUT, i, s, strArr);
    }

    public KafkaTap(Properties properties, KafkaScheme<K, V, ?, ?> kafkaScheme, String str, String... strArr) {
        this(properties, (KafkaScheme) kafkaScheme, str, DEFAULT_POLL_TIMEOUT, 1, (short) 1, strArr);
    }

    public KafkaTap(Properties properties, KafkaScheme<K, V, ?, ?> kafkaScheme, String str, long j, String... strArr) {
        this(properties, (KafkaScheme) kafkaScheme, str, j, 1, (short) 1, strArr);
    }

    public KafkaTap(Properties properties, KafkaScheme<K, V, ?, ?> kafkaScheme, String str, long j, int i, short s, String... strArr) {
        this(properties, kafkaScheme, str, (String) null, j, i, s, strArr);
    }

    public KafkaTap(KafkaScheme<K, V, ?, ?> kafkaScheme, String str, String str2, String... strArr) {
        this((KafkaScheme) kafkaScheme, str, str2, DEFAULT_POLL_TIMEOUT, 1, (short) 1, strArr);
    }

    public KafkaTap(KafkaScheme<K, V, ?, ?> kafkaScheme, String str, String str2, long j, String... strArr) {
        this((KafkaScheme) kafkaScheme, str, str2, j, 1, (short) 1, strArr);
    }

    public KafkaTap(KafkaScheme<K, V, ?, ?> kafkaScheme, String str, String str2, long j, int i, short s, String... strArr) {
        this((Properties) null, kafkaScheme, str, str2, j, i, s, strArr);
    }

    public KafkaTap(Properties properties, KafkaScheme<K, V, ?, ?> kafkaScheme, String str, String str2, int i, short s, String... strArr) {
        this(properties, kafkaScheme, str, str2, DEFAULT_POLL_TIMEOUT, i, s, strArr);
    }

    public KafkaTap(Properties properties, KafkaScheme<K, V, ?, ?> kafkaScheme, String str, String str2, String... strArr) {
        this(properties, (KafkaScheme) kafkaScheme, str, str2, DEFAULT_POLL_TIMEOUT, 1, (short) 1, strArr);
    }

    public KafkaTap(Properties properties, KafkaScheme<K, V, ?, ?> kafkaScheme, String str, String str2, long j, String... strArr) {
        this(properties, (KafkaScheme) kafkaScheme, str, str2, j, 1, (short) 1, strArr);
    }

    public KafkaTap(Properties properties, KafkaScheme<K, V, ?, ?> kafkaScheme, String str, String str2, long j, int i, short s, String... strArr) {
        this(properties, kafkaScheme, str, str2, null, j, i, s, strArr);
    }

    public KafkaTap(Properties properties, KafkaScheme<K, V, ?, ?> kafkaScheme, String str, String str2, String str3, long j, int i, short s, String... strArr) {
        super(kafkaScheme, SinkMode.UPDATE);
        this.defaultProperties = PropertyUtil.merge(new Properties[]{CONSUME_AUTO_COMMIT_EARLIEST, PRODUCE_ACK_ALL_NO_RETRY});
        this.isTopicPattern = false;
        this.numPartitions = 1;
        this.replicationFactor = (short) 1;
        this.clientID = null;
        this.groupID = Tap.id(this);
        this.pollTimeout = DEFAULT_POLL_TIMEOUT;
        if (properties != null) {
            this.defaultProperties = new Properties(properties);
        }
        this.hostname = str;
        if (str2 != null) {
            this.clientID = str2;
        }
        if (str3 != null) {
            this.groupID = str3;
        }
        this.pollTimeout = j;
        this.numPartitions = i;
        this.replicationFactor = s;
        applyTopics(strArr);
    }

    protected void applyTopics(String[] strArr) {
        if (strArr[0].matches("^/([^/]|//)*/$")) {
            this.topics = new String[]{strArr[0].substring(1, strArr[0].length() - 1)};
            this.isTopicPattern = true;
        } else {
            this.topics = new String[strArr.length];
            System.arraycopy(strArr, 0, this.topics, 0, strArr.length);
        }
    }

    public String getHostname() {
        return this.hostname;
    }

    public String getClientID() {
        return this.clientID;
    }

    public String getGroupID() {
        return this.groupID;
    }

    public String[] getTopics() {
        return this.topics;
    }

    public boolean isTopicPattern() {
        return this.isTopicPattern;
    }

    public String getIdentifier() {
        return makeURI(this.hostname, this.topics).toString();
    }

    protected Consumer<K, V> createKafkaConsumer(Properties properties) {
        return new ForwardingConsumer(properties);
    }

    public TupleEntryIterator openForRead(FlowProcess<? extends Properties> flowProcess, KafkaConsumerRecordIterator<K, V> kafkaConsumerRecordIterator) throws IOException {
        Properties merge = PropertyUtil.merge(new Properties[]{(Properties) flowProcess.getConfig(), this.defaultProperties});
        merge.setProperty("bootstrap.servers", this.hostname);
        Set<String> stringPropertyNames = merge.stringPropertyNames();
        if (this.clientID != null && !stringPropertyNames.contains("client.id")) {
            merge.setProperty("client.id", this.clientID);
        }
        if (!stringPropertyNames.contains("group.id")) {
            merge.setProperty("group.id", this.groupID);
        }
        sourceConfInit(flowProcess, merge);
        Consumer<K, V> createKafkaConsumer = createKafkaConsumer(PropertyUtil.retain(merge, ConsumerConfig.configNames()));
        preConsumerSubscribe(createKafkaConsumer);
        if (this.isTopicPattern) {
            createKafkaConsumer.subscribe(Pattern.compile(this.topics[0]), getConsumerRebalanceListener(createKafkaConsumer));
        } else {
            createKafkaConsumer.subscribe(Arrays.asList(getTopics()), getConsumerRebalanceListener(createKafkaConsumer));
        }
        postConsumerSubscribe(createKafkaConsumer);
        return new TupleEntrySchemeIterator(flowProcess, this, getScheme(), new AnonymousClass4(createKafkaConsumer));
    }

    protected void preConsumerSubscribe(Consumer<K, V> consumer) {
    }

    protected void postConsumerSubscribe(Consumer<K, V> consumer) {
    }

    protected ConsumerRebalanceListener getConsumerRebalanceListener(Consumer<K, V> consumer) {
        return new NoOpConsumerRebalanceListener();
    }

    public TupleEntryCollector openForWrite(FlowProcess<? extends Properties> flowProcess, Producer<K, V> producer) throws IOException {
        Properties merge = PropertyUtil.merge(new Properties[]{(Properties) flowProcess.getConfig(), this.defaultProperties});
        merge.setProperty("bootstrap.servers", this.hostname);
        sinkConfInit(flowProcess, merge);
        return new TupleEntrySchemeCollector(flowProcess, this, getScheme(), new KafkaProducer(PropertyUtil.retain(merge, ProducerConfig.configNames())));
    }

    protected AdminClient createAdminClient(Properties properties) {
        Properties properties2 = new Properties(properties);
        properties2.setProperty("bootstrap.servers", this.hostname);
        return AdminClient.create(properties2);
    }

    public boolean createResource(Properties properties) {
        AdminClient createAdminClient = createAdminClient(properties);
        ArrayList arrayList = new ArrayList(getTopics().length);
        for (String str : getTopics()) {
            arrayList.add(new NewTopic(str, this.numPartitions, this.replicationFactor));
        }
        try {
            createAdminClient.createTopics(arrayList).all().get();
            return true;
        } catch (InterruptedException | ExecutionException e) {
            LOG.info("unable to create topics");
            return false;
        }
    }

    public boolean deleteResource(Properties properties) {
        try {
            createAdminClient(properties).deleteTopics(Arrays.asList(getTopics())).all().get();
            return true;
        } catch (InterruptedException | ExecutionException e) {
            LOG.info("unable to create topics");
            return false;
        }
    }

    public boolean resourceExists(Properties properties) {
        try {
            return ((Map) createAdminClient(properties).describeTopics(Arrays.asList(getTopics())).all().get()).size() == getTopics().length;
        } catch (InterruptedException | ExecutionException e) {
            LOG.info("unable to create topics");
            return false;
        }
    }

    public long getModifiedTime(Properties properties) throws IOException {
        return resourceExists(properties) ? Long.MAX_VALUE : 0L;
    }

    public /* bridge */ /* synthetic */ TupleEntryCollector openForWrite(FlowProcess flowProcess, Object obj) throws IOException {
        return openForWrite((FlowProcess<? extends Properties>) flowProcess, (Producer) obj);
    }

    public /* bridge */ /* synthetic */ TupleEntryIterator openForRead(FlowProcess flowProcess, Object obj) throws IOException {
        return openForRead((FlowProcess<? extends Properties>) flowProcess, (KafkaConsumerRecordIterator) obj);
    }
}
