package org.apache.hadoop.hive.kafka;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.function.Predicate;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.kafka.KafkaOutputFormat;
import org.apache.hadoop.hive.kafka.RetryUtils;
import org.apache.hadoop.hive.metastore.DefaultHiveMetaHook;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.LockType;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
import org.apache.hadoop.hive.ql.metadata.StorageHandlerInfo;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider;
import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
import org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kafkaesqueesque.clients.consumer.OffsetAndMetadata;
import org.apache.kafkaesqueesque.common.TopicPartition;
import org.apache.kafkaesqueesque.common.errors.ProducerFencedException;
import org.apache.kafkaesqueesque.common.serialization.ByteArrayDeserializer;
import org.apache.kafkaesqueesque.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/hive/kafka/KafkaStorageHandler.class */
public class KafkaStorageHandler extends DefaultHiveMetaHook implements HiveStorageHandler {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaStorageHandler.class);
    private static final String KAFKA_STORAGE_HANDLER = "org.apache.hadoop.hive.kafka.KafkaStorageHandler";
    private Configuration configuration;
    private static final String KAFKA_PREFIX = "kafka:";

    public Class<? extends InputFormat> getInputFormatClass() {
        return KafkaInputFormat.class;
    }

    public Class<? extends OutputFormat> getOutputFormatClass() {
        return KafkaOutputFormat.class;
    }

    public Class<? extends AbstractSerDe> getSerDeClass() {
        return KafkaSerDe.class;
    }

    public HiveMetaHook getMetaHook() {
        return this;
    }

    public HiveAuthorizationProvider getAuthorizationProvider() {
        return new DefaultHiveAuthorizationProvider();
    }

    public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> map) {
        configureCommonProperties(tableDesc, map);
    }

    private void configureCommonProperties(TableDesc tableDesc, Map<String, String> map) {
        String property = tableDesc.getProperties().getProperty(KafkaTableProperties.HIVE_KAFKA_TOPIC.getName(), "");
        if (property.isEmpty()) {
            throw new IllegalArgumentException("Kafka topic missing set table property->" + KafkaTableProperties.HIVE_KAFKA_TOPIC.getName());
        }
        map.put(KafkaTableProperties.HIVE_KAFKA_TOPIC.getName(), property);
        String property2 = tableDesc.getProperties().getProperty(KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName(), "");
        if (property2.isEmpty()) {
            throw new IllegalArgumentException("Broker address missing set table property->" + KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName());
        }
        map.put(KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName(), property2);
        Arrays.stream(KafkaTableProperties.values()).filter(kafkaTableProperties -> {
            return !kafkaTableProperties.isMandatory();
        }).forEach(kafkaTableProperties2 -> {
        });
        if (map.get(KafkaTableProperties.WRITE_SEMANTIC_PROPERTY.getName()).equals(KafkaOutputFormat.WriteSemantic.EXACTLY_ONCE.name())) {
            map.put("kafka.consumer.isolation.level", "read_committed");
        }
    }

    public void configureInputJobCredentials(TableDesc tableDesc, Map<String, String> map) {
    }

    public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> map) {
        configureCommonProperties(tableDesc, map);
    }

    public void configureTableJobProperties(TableDesc tableDesc, Map<String, String> map) {
        configureInputJobProperties(tableDesc, map);
        configureOutputJobProperties(tableDesc, map);
    }

    public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
        HashMap hashMap = new HashMap();
        configureInputJobProperties(tableDesc, hashMap);
        configureOutputJobProperties(tableDesc, hashMap);
        jobConf.getClass();
        hashMap.forEach(jobConf::set);
        try {
            KafkaUtils.copyDependencyJars(jobConf, KafkaStorageHandler.class);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void setConf(Configuration configuration) {
        this.configuration = configuration;
    }

    public Configuration getConf() {
        return this.configuration;
    }

    public String toString() {
        return KAFKA_STORAGE_HANDLER;
    }

    public StorageHandlerInfo getStorageHandlerInfo(Table table) throws MetaException {
        String str = (String) table.getParameters().get(KafkaTableProperties.HIVE_KAFKA_TOPIC.getName());
        if (str == null || str.isEmpty()) {
            throw new MetaException("topic is null or empty");
        }
        String str2 = (String) table.getParameters().get(KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName());
        if (str2 == null || str2.isEmpty()) {
            throw new MetaException("kafka brokers string is null or empty");
        }
        Properties properties = new Properties();
        properties.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
        properties.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
        properties.setProperty("bootstrap.servers", str2);
        properties.setProperty("client.id", Utilities.getTaskId(getConf()));
        if (UserGroupInformation.isSecurityEnabled()) {
            KafkaUtils.addKerberosJaasConf(getConf(), properties);
        }
        table.getParameters().entrySet().stream().filter(entry -> {
            return ((String) entry.getKey()).toLowerCase().startsWith("kafka.consumer");
        }).forEach(entry2 -> {
            String substring = ((String) entry2.getKey()).substring("kafka.consumer".length() + 1);
            if (KafkaUtils.FORBIDDEN_PROPERTIES.contains(substring)) {
                throw new IllegalArgumentException("Not suppose to set Kafka Property " + substring);
            }
            properties.put(substring, entry2.getValue());
        });
        return new KafkaStorageHandlerInfo(str, properties);
    }

    public URI getURIForAuth(Table table) throws URISyntaxException {
        Map tableProperties = HiveCustomStorageHandlerUtils.getTableProperties(table);
        String str = tableProperties.get(KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName()) != null ? (String) tableProperties.get(KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName()) : this.configuration.get(KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName());
        Preconditions.checkNotNull(str, "Set Table property " + KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName());
        String str2 = (String) tableProperties.get(KafkaTableProperties.HIVE_KAFKA_TOPIC.getName());
        Preconditions.checkNotNull(str2, "Set Table property " + KafkaTableProperties.HIVE_KAFKA_TOPIC.getName());
        return new URI("kafka://" + str + "/" + str2);
    }

    private Properties buildProducerProperties(Table table) {
        String str = (String) table.getParameters().get(KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName());
        if (str == null || str.isEmpty()) {
            throw new RuntimeException("kafka brokers string is null or empty");
        }
        Properties properties = new Properties();
        properties.setProperty("key.serializer", ByteArraySerializer.class.getName());
        properties.setProperty("value.serializer", ByteArraySerializer.class.getName());
        properties.setProperty("bootstrap.servers", str);
        if (UserGroupInformation.isSecurityEnabled()) {
            KafkaUtils.addKerberosJaasConf(getConf(), properties);
        }
        table.getParameters().entrySet().stream().filter(entry -> {
            return ((String) entry.getKey()).toLowerCase().startsWith("kafka.producer");
        }).forEach(entry2 -> {
            String substring = ((String) entry2.getKey()).substring("kafka.producer".length() + 1);
            if (KafkaUtils.FORBIDDEN_PROPERTIES.contains(substring)) {
                throw new IllegalArgumentException("Not suppose to set Kafka Property " + substring);
            }
            properties.put(substring, entry2.getValue());
        });
        return properties;
    }

    public LockType getLockType(WriteEntity writeEntity) {
        return writeEntity.getWriteType().equals(WriteEntity.WriteType.INSERT) ? LockType.SHARED_READ : LockType.SHARED_WRITE;
    }

    private String getQueryId() {
        return HiveConf.getVar(getConf(), HiveConf.ConfVars.HIVEQUERYID);
    }

    public void commitInsertTable(Table table, boolean z) throws MetaException {
        boolean equals = ((String) table.getParameters().get(KafkaTableProperties.WRITE_SEMANTIC_PROPERTY.getName())).equals(KafkaOutputFormat.WriteSemantic.EXACTLY_ONCE.name());
        boolean z2 = !Boolean.parseBoolean((String) table.getParameters().get(KafkaTableProperties.HIVE_KAFKA_OPTIMISTIC_COMMIT.getName()));
        if (equals && z2) {
            final Path queryWorkingDir = getQueryWorkingDir(table);
            int parseInt = Integer.parseInt((String) table.getParameters().get(KafkaTableProperties.MAX_RETRIES.getName()));
            try {
                final Map map = (Map) RetryUtils.retry(new RetryUtils.Task<Map<String, Pair<Long, Short>>>() { // from class: org.apache.hadoop.hive.kafka.KafkaStorageHandler.1
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // org.apache.hadoop.hive.kafka.RetryUtils.Task
                    public Map<String, Pair<Long, Short>> perform() throws Exception {
                        return TransactionalKafkaWriter.getTransactionsState(FileSystem.get(KafkaStorageHandler.this.getConf()), queryWorkingDir);
                    }
                }, th -> {
                    return th instanceof IOException;
                }, parseInt);
                final Properties buildProducerProperties = buildProducerProperties(table);
                final HashMap hashMap = new HashMap();
                RetryUtils.Task<Void> task = new RetryUtils.Task<Void>() { // from class: org.apache.hadoop.hive.kafka.KafkaStorageHandler.2
                    static final /* synthetic */ boolean $assertionsDisabled;

                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // org.apache.hadoop.hive.kafka.RetryUtils.Task
                    public Void perform() throws Exception {
                        if (!$assertionsDisabled && hashMap.size() != 0) {
                            throw new AssertionError();
                        }
                        Map map2 = map;
                        Properties properties = buildProducerProperties;
                        Map map3 = hashMap;
                        map2.forEach((str, pair) -> {
                            properties.setProperty("transactional.id", str);
                            HiveKafkaProducer hiveKafkaProducer = new HiveKafkaProducer(properties);
                            hiveKafkaProducer.resumeTransaction(((Long) pair.getLeft()).longValue(), ((Short) pair.getRight()).shortValue());
                            hiveKafkaProducer.sendOffsetsToTransaction((Map<TopicPartition, OffsetAndMetadata>) ImmutableMap.of(), "__dry_run");
                            map3.put(str, hiveKafkaProducer);
                        });
                        return null;
                    }

                    static {
                        $assertionsDisabled = !KafkaStorageHandler.class.desiredAssertionStatus();
                    }
                };
                RetryUtils.CleanupAfterFailure cleanupAfterFailure = new RetryUtils.CleanupAfterFailure() { // from class: org.apache.hadoop.hive.kafka.KafkaStorageHandler.3
                    @Override // org.apache.hadoop.hive.kafka.RetryUtils.CleanupAfterFailure
                    public void cleanup() {
                        hashMap.forEach((str, hiveKafkaProducer) -> {
                            hiveKafkaProducer.close(Duration.ofMillis(0L));
                        });
                        hashMap.clear();
                    }
                };
                Predicate predicate = th2 -> {
                    return (KafkaUtils.exceptionIsFatal(th2) || (th2 instanceof ProducerFencedException)) ? false : true;
                };
                try {
                    RetryUtils.retry(task, predicate, cleanupAfterFailure, parseInt, "Error while Builing Producers");
                    final HashSet hashSet = new HashSet();
                    try {
                        RetryUtils.retry(new RetryUtils.Task() { // from class: org.apache.hadoop.hive.kafka.KafkaStorageHandler.4
                            @Override // org.apache.hadoop.hive.kafka.RetryUtils.Task
                            public Object perform() throws Exception {
                                Map map2 = hashMap;
                                Set set = hashSet;
                                map2.forEach((str, hiveKafkaProducer) -> {
                                    if (set.contains(str)) {
                                        return;
                                    }
                                    hiveKafkaProducer.commitTransaction();
                                    set.add(str);
                                    hiveKafkaProducer.close();
                                    KafkaStorageHandler.LOG.info("Committed Transaction [{}]", str);
                                });
                                return null;
                            }
                        }, predicate, parseInt);
                        try {
                            RetryUtils.retry(new RetryUtils.Task<Void>() { // from class: org.apache.hadoop.hive.kafka.KafkaStorageHandler.5
                                /* JADX WARN: Can't rename method to resolve collision */
                                @Override // org.apache.hadoop.hive.kafka.RetryUtils.Task
                                public Void perform() throws Exception {
                                    KafkaStorageHandler.this.cleanWorkingDirectory(queryWorkingDir);
                                    return null;
                                }
                            }, th3 -> {
                                return th3 instanceof IOException;
                            }, parseInt);
                        } catch (Exception e) {
                            LOG.error("Faild to clean Query Working Directory [{}] due to [{}]", queryWorkingDir, e.getMessage());
                        }
                    } catch (Exception e2) {
                        hashMap.forEach((str, hiveKafkaProducer) -> {
                            hiveKafkaProducer.close(Duration.ofMillis(0L));
                        });
                        LOG.error("Commit transaction failed", e2);
                        if (hashSet.size() > 0) {
                            LOG.error("Partial Data Got Commited Some actions need to be Done");
                            hashSet.stream().forEach(str2 -> {
                                LOG.error("Transaction [{}] is an orphen commit", str2);
                            });
                        }
                        throw new MetaException(e2.getMessage());
                    }
                } catch (Exception e3) {
                    LOG.error("Can not fetch build produces due [{}]", e3);
                    throw new MetaException(e3.getMessage());
                }
            } catch (Exception e4) {
                LOG.error("Can not fetch Transaction states due [{}]", e4.getMessage());
                throw new MetaException(e4.getMessage());
            }
        }
    }

    public void preInsertTable(Table table, boolean z) throws MetaException {
        if (z) {
            throw new MetaException("Kafa Table does not support the overwite SQL Smentic");
        }
    }

    public void rollbackInsertTable(Table table, boolean z) throws MetaException {
    }

    public void preCreateTable(Table table) throws MetaException {
        if (!table.getTableType().equals(TableType.EXTERNAL_TABLE.toString())) {
            throw new MetaException("org.apache.hadoop.hive.kafka.KafkaStorageHandler supports only " + TableType.EXTERNAL_TABLE);
        }
        Arrays.stream(KafkaTableProperties.values()).filter((v0) -> {
            return v0.isMandatory();
        }).forEach(kafkaTableProperties -> {
        });
        Arrays.stream(KafkaTableProperties.values()).forEach(kafkaTableProperties2 -> {
            if (table.getParameters().get(kafkaTableProperties2.getName()) == null) {
                table.putToParameters(kafkaTableProperties2.getName(), kafkaTableProperties2.getDefaultValue());
            }
        });
    }

    public void rollbackCreateTable(Table table) throws MetaException {
    }

    public void commitCreateTable(Table table) throws MetaException {
        commitInsertTable(table, false);
    }

    public void preDropTable(Table table) throws MetaException {
    }

    public void rollbackDropTable(Table table) throws MetaException {
    }

    public void commitDropTable(Table table, boolean z) throws MetaException {
    }

    private Path getQueryWorkingDir(Table table) {
        return new Path(table.getSd().getLocation(), getQueryId());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void cleanWorkingDirectory(Path path) throws IOException {
        FileSystem.get(getConf()).delete(path, true);
    }
}
