package io.tidb.bigdata.flink.connector.source.enumerator;

import io.tidb.bigdata.flink.connector.source.split.TiDBSourceSplit;
import io.tidb.bigdata.tidb.ClientConfig;
import io.tidb.bigdata.tidb.ClientSession;
import io.tidb.bigdata.tidb.SplitManagerInternal;
import io.tidb.bigdata.tidb.TableHandleInternal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.meta.TiTimestamp;

/* loaded from: input_file:io/tidb/bigdata/flink/connector/source/enumerator/TiDBSourceSplitEnumerator.class */
public class TiDBSourceSplitEnumerator implements SplitEnumerator<TiDBSourceSplit, TiDBSourceSplitEnumState> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) TiDBSourceSplitEnumerator.class);
    private final Map<String, String> properties;
    private final SplitEnumeratorContext<TiDBSourceSplit> context;
    private final Map<Integer, Set<TiDBSourceSplit>> pendingSplitAssignment;
    private final Set<Integer> assignedReaders;
    private final Set<Integer> notifiedReaders;
    private final Set<TiDBSourceSplit> assignedSplits;
    private TiTimestamp timestamp;

    public TiDBSourceSplitEnumerator(Map<String, String> map, SplitEnumeratorContext<TiDBSourceSplit> splitEnumeratorContext) {
        this(map, splitEnumeratorContext, Collections.emptySet());
    }

    public TiDBSourceSplitEnumerator(Map<String, String> map, SplitEnumeratorContext<TiDBSourceSplit> splitEnumeratorContext, Set<TiDBSourceSplit> set) {
        this.properties = map;
        this.context = splitEnumeratorContext;
        this.assignedSplits = new HashSet(set);
        this.pendingSplitAssignment = new HashMap();
        this.assignedReaders = new HashSet();
        this.notifiedReaders = new HashSet();
        initPendingSplitAssignment();
    }

    private void assignPendingSplits(Set<Integer> set) {
        HashMap hashMap = new HashMap();
        Iterator<Integer> it = set.iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            Set<TiDBSourceSplit> remove = this.pendingSplitAssignment.remove(Integer.valueOf(intValue));
            if (remove != null && !remove.isEmpty()) {
                ((List) hashMap.computeIfAbsent(Integer.valueOf(intValue), num -> {
                    return new ArrayList();
                })).addAll(remove);
                this.assignedSplits.addAll(remove);
            }
            this.assignedReaders.add(Integer.valueOf(intValue));
        }
        if (!hashMap.isEmpty()) {
            LOG.info("Assigning splits to readers {}", hashMap);
            this.context.assignSplits(new SplitsAssignment<>(hashMap));
        }
        Iterator<Integer> it2 = this.assignedReaders.iterator();
        while (it2.hasNext()) {
            int intValue2 = it2.next().intValue();
            if (!this.notifiedReaders.contains(Integer.valueOf(intValue2)) && this.context.registeredReaders().containsKey(Integer.valueOf(intValue2))) {
                this.context.signalNoMoreSplits(intValue2);
                this.notifiedReaders.add(Integer.valueOf(intValue2));
            }
        }
    }

    public void initPendingSplitAssignment() {
        try {
            ClientSession createWithSingleConnection = ClientSession.createWithSingleConnection(new ClientConfig(this.properties));
            try {
                String str = this.properties.get("tidb.database.name");
                String str2 = this.properties.get("tidb.table.name");
                createWithSingleConnection.getTableMust(str, str2);
                this.timestamp = createWithSingleConnection.getTimestamp();
                List list = (List) new SplitManagerInternal(createWithSingleConnection).getSplits(new TableHandleInternal(UUID.randomUUID().toString(), str, str2), this.timestamp).stream().map(TiDBSourceSplit::new).collect(Collectors.toList());
                int currentParallelism = this.context.currentParallelism();
                for (int i = 0; i < list.size(); i++) {
                    this.pendingSplitAssignment.computeIfAbsent(Integer.valueOf(i % currentParallelism), num -> {
                        return new HashSet();
                    }).add((TiDBSourceSplit) list.get(i));
                }
                if (createWithSingleConnection != null) {
                    createWithSingleConnection.close();
                }
            } finally {
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    @Override // org.apache.flink.api.connector.source.SplitEnumerator
    public void start() {
    }

    public TiTimestamp getTimestamp() {
        return this.timestamp;
    }

    @Override // org.apache.flink.api.connector.source.SplitEnumerator
    public void handleSplitRequest(int i, @Nullable String str) {
    }

    @Override // org.apache.flink.api.connector.source.SplitEnumerator
    public void addSplitsBack(List<TiDBSourceSplit> list, int i) {
        this.pendingSplitAssignment.computeIfAbsent(Integer.valueOf(i), num -> {
            return new HashSet();
        }).addAll(list);
        this.notifiedReaders.remove(Integer.valueOf(i));
    }

    @Override // org.apache.flink.api.connector.source.SplitEnumerator
    public void addReader(int i) {
        LOG.debug("Adding reader {} to TiDBSourceSplitEnumerator", Integer.valueOf(i));
        assignPendingSplits(Collections.singleton(Integer.valueOf(i)));
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.apache.flink.api.connector.source.SplitEnumerator
    public TiDBSourceSplitEnumState snapshotState(long j) {
        return new TiDBSourceSplitEnumState(this.assignedSplits);
    }

    @Override // org.apache.flink.api.connector.source.SplitEnumerator, java.lang.AutoCloseable
    public void close() {
    }

    @Override // org.apache.flink.api.connector.source.SplitEnumerator
    public void handleSourceEvent(int i, SourceEvent sourceEvent) {
        super.handleSourceEvent(i, sourceEvent);
    }
}
