package org.apache.spark.sql.mlsql.sources.hbase.wal;

import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.slf4j.Logger;
import org.spark_project.guava.cache.CacheBuilder;
import org.spark_project.guava.cache.CacheLoader;
import org.spark_project.guava.cache.LoadingCache;
import scala.Function0;
import scala.Function1;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.SeqLike;
import scala.collection.TraversableLike;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.collection.mutable.HashSet;
import scala.collection.mutable.ResizableArray;
import scala.math.Ordering$String$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.ObjectRef;
import tech.mlsql.binlog.common.HDFSContext;
import tech.mlsql.common.utils.log.Logging;

/* compiled from: HBaseWALClient.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u0005e\u0001B\u0001\u0003\u0001M\u0011a\u0002\u0013\"bg\u0016<\u0016\tT\"mS\u0016tGO\u0003\u0002\u0004\t\u0005\u0019q/\u00197\u000b\u0005\u00151\u0011!\u00025cCN,'BA\u0004\t\u0003\u001d\u0019x.\u001e:dKNT!!\u0003\u0006\u0002\u000b5d7/\u001d7\u000b\u0005-a\u0011aA:rY*\u0011QBD\u0001\u0006gB\f'o\u001b\u0006\u0003\u001fA\ta!\u00199bG\",'\"A\t\u0002\u0007=\u0014xm\u0001\u0001\u0014\u0007\u0001!\"\u0004\u0005\u0002\u001615\taCC\u0001\u0018\u0003\u0015\u00198-\u00197b\u0013\tIbC\u0001\u0004B]f\u0014VM\u001a\t\u00037\u0015j\u0011\u0001\b\u0006\u0003;y\t1\u0001\\8h\u0015\ty\u0002%A\u0003vi&d7O\u0003\u0002\"E\u000511m\\7n_:T!!C\u0012\u000b\u0003\u0011\nA\u0001^3dQ&\u0011a\u0005\b\u0002\b\u0019><w-\u001b8h\u0011!A\u0003A!A!\u0002\u0013I\u0013AC<bY2{w\rU1uQB\u0011!&\f\b\u0003+-J!\u0001\f\f\u0002\rA\u0013X\rZ3g\u0013\tqsF\u0001\u0004TiJLgn\u001a\u0006\u0003YYA\u0001\"\r\u0001\u0003\u0002\u0003\u0006I!K\u0001\u000e_2$w+\u0011'M_\u001e\u0004\u0016\r\u001e5\t\u0011M\u0002!\u0011!Q\u0001\nQ\n\u0011b\u001d;beR$\u0016.\\3\u0011\u0005U)\u0014B\u0001\u001c\u0017\u0005\u0011auN\\4\t\u0011a\u0002!\u0011!Q\u0001\ne\nAaY8oMB\u0011!HP\u0007\u0002w)\u0011\u0001\b\u0010\u0006\u0003{9\ta\u0001[1e_>\u0004\u0018BA 
<\u00055\u0019uN\u001c4jOV\u0014\u0018\r^5p]\")\u0011\t\u0001C\u0001\u0005\u00061A(\u001b8jiz\"RaQ#G\u000f\"\u0003\"\u0001\u0012\u0001\u000e\u0003\tAQ\u0001\u000b!A\u0002%BQ!\r!A\u0002%BQa\r!A\u0002QBQ\u0001\u000f!A\u0002eBqA\u0013\u0001C\u0002\u0013\u00051*A\u0004sK\u0006$WM]:\u0016\u00031\u00032!\u0014*U\u001b\u0005q%BA(Q\u0003\u001diW\u000f^1cY\u0016T!!\u0015\f\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u0002T\u001d\nY\u0011I\u001d:bs\n+hMZ3s!\t!U+\u0003\u0002W\u0005\ti\u0001+\u0019;i\u0003:$'+Z1eKJDa\u0001\u0017\u0001!\u0002\u0013a\u0015\u0001\u0003:fC\u0012,'o\u001d\u0011\t\u000fi\u0003!\u0019!C\u0001\u0017\u0006Yq\u000e\u001c3`e\u0016\fG-\u001a:t\u0011\u0019a\u0006\u0001)A\u0005\u0019\u0006aq\u000e\u001c3`e\u0016\fG-\u001a:tA!9a\f\u0001b\u0001\n\u0003y\u0016AD3wK:$H*[:uK:,'o]\u000b\u0002AB\u0019QJU1\u0011\u0005\u0011\u0013\u0017BA2\u0003\u0005UA%)Y:f/\u0006cUI^3oi2K7\u000f^3oKJDa!\u001a\u0001!\u0002\u0013\u0001\u0017aD3wK:$H*[:uK:,'o\u001d\u0011\t\u000f\u001d\u0004!\u0019!C\u0001Q\u0006aq/\u00197E_:,g)\u001b7fgV\t\u0011\u000e\u0005\u0003kc&JS\"A6\u000b\u00051l\u0017!B2bG\",'B\u00018p\u0003\u00159W/\u0019<b\u0015\t\u0001\b#A\u0007ta\u0006\u00148n\u00189s_*,7\r^\u0005\u0003e.\u0014A\u0002T8bI&twmQ1dQ\u0016Da\u0001\u001e\u0001!\u0002\u0013I\u0017!D<bY\u0012{g.\u001a$jY\u0016\u001c\b\u0005C\u0004w\u0001\t\u0007I\u0011\u00015\u0002\u001f=dGmV!M\t>tWMR5mKNDa\u0001\u001f\u0001!\u0002\u0013I\u0017\u0001E8mI^\u000bE\nR8oK\u001aKG.Z:!\u0011\u001dQ\bA1A\u0005\u0002m\fa\"Y2uSZ,w+\u00197GS2,7/F\u0001}!\riU0K\u0005\u0003}:\u0013q\u0001S1tQN+G\u000fC\u0004\u0002\u0002\u0001\u0001\u000b\u0011\u0002?\u0002\u001f\u0005\u001cG/\u001b<f/\u0006dg)\u001b7fg\u0002Bq!!\u0002\u0001\t\u0003\t9!A\u0004d_:tWm\u0019;\u0015\u0005\u0005%\u0001cA\u000b\u0002\f%\u0019\u0011Q\u0002\f\u0003\tUs\u0017\u000e\u001e\u0005\b\u0003#\u0001A\u0011BA\u0004\u000311W\r^2i\u001f2$w+\u0011't\u0011\u001d\t)\u0002\u0001C\u0005\u0003/\tq\"\u001b;fe\u0006$XMU3bI\u001aKG.\u001a\u000b\t\u0003\u0013\tI\"a\u0007\u0002\u0
01e!1!*a\u0005A\u00021CaaZA\n\u0001\u0004I\u0007\u0002CA\u0010\u0003'\u0001\r!!\t\u0002!%<gn\u001c:f\u0003\u000e$\u0018N^3GS2,\u0007cA\u000b\u0002$%\u0019\u0011Q\u0005\f\u0003\u000f\t{w\u000e\\3b]\"9\u0011\u0011\u0006\u0001\u0005\n\u0005\u001d\u0011!\u00024fi\u000eD\u0007bBA\u0017\u0001\u0011\u0005\u0011qF\u0001\u000bI&\u001c8i\u001c8oK\u000e$XCAA\u0005\u0011\u001d\t\u0019\u0004\u0001C\u0005\u0003k\t1!\\1q)\u0019\tI!a\u000e\u0002P!A\u0011\u0011HA\u0019\u0001\u0004\tY$A\u0003f]R\u0014\u0018\u0010\u0005\u0003\u0002>\u0005%c\u0002BA \u0003\u000bj!!!\u0011\u000b\u0007\r\t\u0019E\u0003\u0002\u0006y%!\u0011qIA!\u0003\r9\u0016\tT\u0005\u0005\u0003\u0017\niEA\u0003F]R\u0014\u0018P\u0003\u0003\u0002H\u0005\u0005\u0003\u0002CA)\u0003c\u0001\r!a\u0015\u0002\u0015\r|G\u000e\\3di\u00163H\u000fE\u0004\u0016\u0003+\nI&!\u0003\n\u0007\u0005]cCA\u0005Gk:\u001cG/[8ocA1\u00111LA6\u0003crA!!\u0018\u0002h9!\u0011qLA3\u001b\t\t\tGC\u0002\u0002dI\ta\u0001\u0010:p_Rt\u0014\"A\f\n\u0007\u0005%d#A\u0004qC\u000e\\\u0017mZ3\n\t\u00055\u0014q\u000e\u0002\u0004'\u0016\f(bAA5-A\u0019A)a\u001d\n\u0007\u0005U$A\u0001\tSC^D%)Y:f/\u0006cUI^3oi\"9\u0011\u0011\u0010\u0001\u0005\u0002\u0005m\u0014\u0001\u0003:fO&\u001cH/\u001a:\u0015\t\u0005%\u0011Q\u0010\u0005\b\u0003\u007f\n9\b1\u0001b\u00035)g/\u001a8u\u0019&\u001cH/\u001a8fe\u0002")
/* loaded from: input_file:org/apache/spark/sql/mlsql/sources/hbase/wal/HBaseWALClient.class */
public class HBaseWALClient implements Logging {
    // Root WAL directory. fetch() lists it and then lists every entry found directly under it.
    private final String walLogPath;
    // Old-WAL directory listed by fetchOldWALs(); an empty string makes fetchOldWALs() return without doing anything.
    private final String oldWALLogPath;
    // Hadoop configuration handed to every HDFSContext created by fetch() and fetchOldWALs().
    public final Configuration org$apache$spark$sql$mlsql$sources$hbase$wal$HBaseWALClient$$conf;
    // Work list for fetch(): passed to iterateReadFile(...) and cleared at the end of each fetch() pass.
    private final ArrayBuffer<PathAndReader> readers;
    // Work list for fetchOldWALs(): passed to iterateReadFile(...) and cleared at the end of each pass.
    private final ArrayBuffer<PathAndReader> old_readers;
    // Listeners added through register(...).
    private final ArrayBuffer<HBaseWALEventListener> eventListeners;
    // Cache handed to iterateReadFile(...) by fetch(): at most 100000 entries, each expiring 60 minutes
    // after write; a missing key loads WALFileStat.DEFAULT(). Presumably keyed by WAL file path -- TODO confirm.
    private final LoadingCache<String, String> walDoneFiles;
    // Same configuration as walDoneFiles, but handed to iterateReadFile(...) by fetchOldWALs().
    private final LoadingCache<String, String> oldWALDoneFiles;
    // Cleared at the start of every connect() iteration; fetch() adds the last path (in its sort order)
    // of each listed subdirectory that still has candidates after filtering.
    private final HashSet<String> activeWalFiles;
    // Backing field for the Logging trait's logger getter/setter pair defined below.
    private transient Logger tech$mlsql$common$utils$log$Logging$$log_;

    /** Returns the logger currently stored in the Logging backing field (may be null if none was set). */
    public Logger tech$mlsql$common$utils$log$Logging$$log_() {
        return this.tech$mlsql$common$utils$log$Logging$$log_;
    }

    /** Stores {@code logger} in the Logging backing field, replacing any previous value. */
    public void tech$mlsql$common$utils$log$Logging$$log__$eq(Logger logger) {
        this.tech$mlsql$common$utils$log$Logging$$log_ = logger;
    }

    /**
     * Forwards to the Logging trait's {@code logName} for this instance.
     * NOTE(review): {@code Logging.class.xxx(this)} looks like a decompiler rendering of the Scala
     * trait implementation class -- confirm before compiling this file as Java.
     */
    public String logName() {
        return Logging.class.logName(this);
    }

    /** Forwards to the Logging trait's {@code log} for this instance and returns its logger. */
    public Logger log() {
        return Logging.class.log(this);
    }

    /** Forwards to the Logging trait's {@code logInfo}, passing {@code function0} through unchanged. */
    public void logInfo(Function0<String> function0) {
        Logging.class.logInfo(this, function0);
    }

    /** Forwards to the Logging trait's {@code logDebug}, passing {@code function0} through unchanged. */
    public void logDebug(Function0<String> function0) {
        Logging.class.logDebug(this, function0);
    }

    /** Forwards to the Logging trait's {@code logTrace}, passing {@code function0} through unchanged. */
    public void logTrace(Function0<String> function0) {
        Logging.class.logTrace(this, function0);
    }

    /** Forwards to the Logging trait's {@code logWarning}, passing {@code function0} through unchanged. */
    public void logWarning(Function0<String> function0) {
        Logging.class.logWarning(this, function0);
    }

    /** Forwards to the Logging trait's {@code logError}, passing {@code function0} through unchanged. */
    public void logError(Function0<String> function0) {
        Logging.class.logError(this, function0);
    }

    /** Forwards to the Logging trait's two-argument {@code logInfo} with {@code function0} and {@code th}. */
    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.class.logInfo(this, function0, th);
    }

    /** Forwards to the Logging trait's two-argument {@code logDebug} with {@code function0} and {@code th}. */
    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.class.logDebug(this, function0, th);
    }

    /** Forwards to the Logging trait's two-argument {@code logTrace} with {@code function0} and {@code th}. */
    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.class.logTrace(this, function0, th);
    }

    /** Forwards to the Logging trait's two-argument {@code logWarning} with {@code function0} and {@code th}. */
    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.class.logWarning(this, function0, th);
    }

    /** Forwards to the Logging trait's two-argument {@code logError} with {@code function0} and {@code th}. */
    public void logError(Function0<String> function0, Throwable th) {
        Logging.class.logError(this, function0, th);
    }

    /** Returns whatever the Logging trait's {@code isTraceEnabled} reports for this instance. */
    public boolean isTraceEnabled() {
        return Logging.class.isTraceEnabled(this);
    }

    /** Forwards to the Logging trait's {@code initializeLogIfNecessary}, passing {@code z} through unchanged. */
    public void initializeLogIfNecessary(boolean z) {
        Logging.class.initializeLogIfNecessary(this, z);
    }

    /** Returns the live (not copied) buffer that fetch() iterates and then clears. */
    public ArrayBuffer<PathAndReader> readers() {
        return this.readers;
    }

    /** Returns the live (not copied) buffer that fetchOldWALs() iterates and then clears. */
    public ArrayBuffer<PathAndReader> old_readers() {
        return this.old_readers;
    }

    /** Returns the live (not copied) buffer of listeners that register(...) appends to. */
    public ArrayBuffer<HBaseWALEventListener> eventListeners() {
        return this.eventListeners;
    }

    /** Returns the cache that fetch() hands to iterateReadFile(...). */
    public LoadingCache<String, String> walDoneFiles() {
        return this.walDoneFiles;
    }

    /** Returns the cache that fetchOldWALs() hands to iterateReadFile(...). */
    public LoadingCache<String, String> oldWALDoneFiles() {
        return this.oldWALDoneFiles;
    }

    /** Returns the live set of path strings that connect() clears and fetch() repopulates on each pass. */
    public HashSet<String> activeWalFiles() {
        return this.activeWalFiles;
    }

    /**
     * Polls the WAL directories in an endless loop on the calling thread.
     *
     * <p>Each iteration empties the active-file set, then runs {@code fetch()} followed by
     * {@code fetchOldWALs()}.
     *
     * <p>NOTE: the stop flag is a local that nothing in this method ever sets to {@code true},
     * so the loop only ends if one of the calls throws.
     */
    public void connect() {
        final AtomicBoolean stopRequested = new AtomicBoolean(false);
        for (;;) {
            if (stopRequested.get()) {
                return;
            }
            activeWalFiles().clear();
            fetch();
            fetchOldWALs();
        }
    }

    /**
     * Runs one pass over the old-WAL directory ({@code oldWALLogPath}).
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Return immediately when no old-WAL directory is configured (null or empty path).</li>
     *   <li>List the directory and collect every entry's {@link Path}.</li>
     *   <li>Filter, sort by a String key, and filter again. The predicates and the sort key live in
     *       generated function classes that are not visible in this file.</li>
     *   <li>Apply a per-path function that also receives the {@link HDFSContext}
     *       (presumably it opens a reader into {@code old_readers()} -- TODO confirm).</li>
     *   <li>Run {@code iterateReadFile} over {@code old_readers()} with the old-WAL cache and
     *       {@code false} as the flag.</li>
     *   <li>Apply a per-reader function to {@code old_readers()} (presumably closing each reader --
     *       TODO confirm) and clear the buffer.</li>
     * </ol>
     */
    private void fetchOldWALs() {
        // A null path is treated like an empty one: Hadoop's Path(String) rejects both, so
        // there is nothing useful to scan and the pass is skipped.
        if (this.oldWALLogPath == null || this.oldWALLogPath.isEmpty()) {
            return;
        }

        Path oldWalDir = new Path(this.oldWALLogPath);
        HDFSContext hdfs = new HDFSContext(oldWalDir, this.org$apache$spark$sql$mlsql$sources$hbase$wal$HBaseWALClient$$conf);

        // Materialise the remote listing into a Scala buffer so the collection operators below apply.
        RemoteIterator statuses = hdfs.fc().listStatus(oldWalDir);
        ArrayBuffer listedPaths = ArrayBuffer$.MODULE$.apply(Nil$.MODULE$);
        while (statuses.hasNext()) {
            listedPaths.$plus$eq(((FileStatus) statuses.next()).getPath());
        }

        // filterNot -> sortBy(String key) -> filterNot, written as separate steps.
        SeqLike firstPass = (SeqLike) listedPaths.filterNot(new HBaseWALClient$$anonfun$1(this));
        ArrayBuffer sorted = (ArrayBuffer) firstPass.sortBy(new HBaseWALClient$$anonfun$2(this), Ordering$String$.MODULE$);
        ResizableArray candidates = (ResizableArray) sorted.filterNot(new HBaseWALClient$$anonfun$fetchOldWALs$1(this));
        candidates.foreach(new HBaseWALClient$$anonfun$fetchOldWALs$2(this, hdfs));

        iterateReadFile(old_readers(), oldWALDoneFiles(), false);
        old_readers().foreach(new HBaseWALClient$$anonfun$fetchOldWALs$3(this));
        old_readers().clear();
    }

    /**
     * Applies the generated per-reader function to every element of {@code arrayBuffer}.
     *
     * @param arrayBuffer  readers to visit, in buffer order
     * @param loadingCache cache forwarded to the per-reader function
     * @param z            flag forwarded to the per-reader function; {@code fetch()} passes
     *                     {@code true} and {@code fetchOldWALs()} passes {@code false}
     */
    private void iterateReadFile(ArrayBuffer<PathAndReader> arrayBuffer, LoadingCache<String, String> loadingCache, boolean z) {
        HBaseWALClient$$anonfun$iterateReadFile$1 perReader =
                new HBaseWALClient$$anonfun$iterateReadFile$1(this, loadingCache, z);
        arrayBuffer.foreach(perReader);
    }

    /**
     * Runs one pass over the live WAL directory ({@code walLogPath}).
     *
     * <p>The root is listed, and every entry found there is listed in turn. For each such
     * subdirectory the child paths are filtered and sorted by a String key (both defined in
     * generated function classes not visible in this file). When anything remains, the last path
     * in sort order is recorded in {@code activeWalFiles()}, and the remaining paths, after one
     * more filter, are handed to a per-path function together with the {@link HDFSContext} and
     * the subdirectory's {@link FileStatus}.
     *
     * <p>After all subdirectories are processed, {@code iterateReadFile} runs over
     * {@code readers()} with the live-WAL cache and {@code true} as the flag. A per-reader
     * function is then applied (presumably closing each reader -- TODO confirm) and the buffer
     * is cleared.
     */
    private void fetch() {
        Path walRoot = new Path(this.walLogPath);
        HDFSContext hdfs = new HDFSContext(walRoot, this.org$apache$spark$sql$mlsql$sources$hbase$wal$HBaseWALClient$$conf);

        RemoteIterator subDirs = hdfs.fc().listStatus(walRoot);
        while (subDirs.hasNext()) {
            FileStatus subDir = (FileStatus) subDirs.next();

            // Collect the children of this subdirectory into a Scala buffer.
            RemoteIterator children = hdfs.fc().listStatus(subDir.getPath());
            ArrayBuffer childPaths = ArrayBuffer$.MODULE$.apply(Nil$.MODULE$);
            while (children.hasNext()) {
                childPaths.$plus$eq(((FileStatus) children.next()).getPath());
            }

            SeqLike filtered = (SeqLike) childPaths.filterNot(new HBaseWALClient$$anonfun$4(this));
            ArrayBuffer sorted = (ArrayBuffer) filtered.sortBy(new HBaseWALClient$$anonfun$5(this), Ordering$String$.MODULE$);
            if (sorted.size() <= 0) {
                continue;
            }

            // The last path in sort order is tracked as the active file for this subdirectory.
            activeWalFiles().$plus$eq(((Path) sorted.last()).toString());
            ResizableArray candidates = (ResizableArray) sorted.filterNot(new HBaseWALClient$$anonfun$fetch$1(this));
            candidates.foreach(new HBaseWALClient$$anonfun$fetch$2(this, hdfs, subDir));
        }

        iterateReadFile(readers(), walDoneFiles(), true);
        readers().foreach(new HBaseWALClient$$anonfun$fetch$3(this));
        readers().clear();
    }

    /**
     * Currently a no-op: the body is empty, so calling this does not stop a running
     * {@code connect()} loop or release anything.
     */
    public void disConnect() {
    }

    /**
     * Converts one WAL entry into a sequence of {@link RawHBaseWALEvent}s and passes that
     * sequence to {@code function1}.
     *
     * <p>Namespace, table name, sequence id, encoded region name (decoded as UTF-8) and write
     * time are read from the entry's key. The entry's cells are filtered and then visited by a
     * generated function (not visible in this file) that receives those values, three mutable
     * references and the event buffer, so it may fill the references and/or append events itself.
     * After the cell pass, a Put left in the first reference and a Delete left in the second are
     * each appended as their own event. The third reference is only handed to the cell function
     * and never read here.
     *
     * @param entry     WAL entry to convert
     * @param function1 callback invoked exactly once with the collected events (possibly empty)
     */
    public void org$apache$spark$sql$mlsql$sources$hbase$wal$HBaseWALClient$$map(WAL.Entry entry, Function1<Seq<RawHBaseWALEvent>, BoxedUnit> function1) {
        WALKeyImpl walKey = entry.getKey();
        WALEdit walEdit = entry.getEdit();

        String namespace = walKey.getTableName().getNamespaceAsString();
        String table = walKey.getTableName().getNameAsString();
        long seqId = walKey.getSequenceId();
        String regionName = new String(walKey.getEncodedRegionName(), Charset.forName("utf-8"));
        long writtenAt = walKey.getWriteTime();

        // Mutable holders shared with the per-cell function below.
        ObjectRef putRef = ObjectRef.create((Object) null);
        ObjectRef deleteRef = ObjectRef.create((Object) null);
        ObjectRef spareRef = ObjectRef.create((Object) null);
        ArrayBuffer events = ArrayBuffer$.MODULE$.apply(Nil$.MODULE$);

        Object cells = JavaConverters$.MODULE$.asScalaBufferConverter(walEdit.getCells()).asScala();
        Object keptCells = ((TraversableLike) cells).filterNot(new HBaseWALClient$$anonfun$org$apache$spark$sql$mlsql$sources$hbase$wal$HBaseWALClient$$map$1(this));
        ((IterableLike) keptCells).foreach(new HBaseWALClient$$anonfun$org$apache$spark$sql$mlsql$sources$hbase$wal$HBaseWALClient$$map$2(this, namespace, table, seqId, regionName, writtenAt, putRef, deleteRef, spareRef, events));

        // Flush whatever Put / Delete the cell pass left pending.
        Put pendingPut = (Put) putRef.elem;
        if (pendingPut != null) {
            events.$plus$eq(new RawHBaseWALEvent(pendingPut, null, namespace, table, new RawHBaseEventOffset(regionName, seqId), writtenAt));
        }
        Delete pendingDelete = (Delete) deleteRef.elem;
        if (pendingDelete != null) {
            events.$plus$eq(new RawHBaseWALEvent(null, pendingDelete, namespace, table, new RawHBaseEventOffset(regionName, seqId), writtenAt));
        }

        function1.apply(events.toSeq());
    }

    /**
     * Appends {@code hBaseWALEventListener} to the listener buffer. No de-duplication or null
     * check is performed here.
     */
    public void register(HBaseWALEventListener hBaseWALEventListener) {
        eventListeners().$plus$eq(hBaseWALEventListener);
    }

    /**
     * Creates a client with empty reader/listener buffers, two independently built caches and
     * an empty active-file set. Nothing is read from the file system here.
     *
     * @param str           stored as {@code walLogPath}
     * @param str2          stored as {@code oldWALLogPath}
     * @param j             not read anywhere in this constructor
     *                      (NOTE(review): confirm it is intentionally unused)
     * @param configuration stored as the Hadoop configuration field
     */
    public HBaseWALClient(String str, String str2, long j, Configuration configuration) {
        this.walLogPath = str;
        this.oldWALLogPath = str2;
        this.org$apache$spark$sql$mlsql$sources$hbase$wal$HBaseWALClient$$conf = configuration;
        // NOTE(review): looks like a decompiler rendering of the Scala trait initializer -- confirm.
        Logging.class.$init$(this);
        this.readers = ArrayBuffer$.MODULE$.apply(Nil$.MODULE$);
        this.old_readers = ArrayBuffer$.MODULE$.apply(Nil$.MODULE$);
        this.eventListeners = ArrayBuffer$.MODULE$.apply(Nil$.MODULE$);
        // Cache for the live-WAL pass: at most 100000 entries, expiring 60 minutes after write.
        // NOTE(review): the `(this)` constructor argument on CacheLoader looks like a decompiler
        // artifact for the anonymous class's outer reference -- confirm.
        this.walDoneFiles = CacheBuilder.newBuilder().maximumSize(100000L).expireAfterWrite(60L, TimeUnit.MINUTES).build(new CacheLoader<String, String>(this) { // from class: org.apache.spark.sql.mlsql.sources.hbase.wal.HBaseWALClient$$anon$1
            // Any key not yet cached resolves to WALFileStat.DEFAULT(); the key itself is ignored.
            public String load(String str3) {
                return WALFileStat$.MODULE$.DEFAULT();
            }
        });
        // Cache for the old-WAL pass, built with the same limits and the same default loader.
        this.oldWALDoneFiles = CacheBuilder.newBuilder().maximumSize(100000L).expireAfterWrite(60L, TimeUnit.MINUTES).build(new CacheLoader<String, String>(this) { // from class: org.apache.spark.sql.mlsql.sources.hbase.wal.HBaseWALClient$$anon$2
            // Any key not yet cached resolves to WALFileStat.DEFAULT(); the key itself is ignored.
            public String load(String str3) {
                return WALFileStat$.MODULE$.DEFAULT();
            }
        });
        this.activeWalFiles = new HashSet<>();
    }
}
