package org.apache.spark.sql.mlsql.sources;

import java.io.DataOutputStream;
import java.net.Socket;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
import org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerCommand$;
import org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerInExecutor;
import org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerInExecutor$;
import org.apache.spark.sql.mlsql.sources.mysql.binlog.ExecutorBinlogServer;
import org.apache.spark.sql.mlsql.sources.mysql.binlog.MySQLBinlogServer;
import org.apache.spark.sql.mlsql.sources.mysql.binlog.MySQLConnectionInfo;
import org.apache.spark.sql.mlsql.sources.mysql.binlog.ReportBinlogSocketServerHostAndPort;
import org.apache.spark.sql.mlsql.sources.mysql.binlog.ShutdownBinlogServer;
import org.apache.spark.sql.mlsql.sources.mysql.binlog.SocketServerInExecutor$;
import org.apache.spark.util.SerializableConfiguration;
import org.apache.spark.util.TaskCompletionListener;
import org.apache.spark.util.TaskFailureListener;
import scala.Option;
import scala.Serializable;
import scala.runtime.AbstractFunction1;

/* compiled from: MLSQLBinLogDataSource.scala */
/* loaded from: input_file:org/apache/spark/sql/mlsql/sources/MLSQLBinLogDataSource$$anonfun$org$apache$spark$sql$mlsql$sources$MLSQLBinLogDataSource$$launchBinlogServer$1$1.class */
/**
 * Function object that launches a binlog socket server from inside a running Spark task.
 *
 * <p>When applied it:
 * <ol>
 *   <li>creates a {@link BinLogSocketServerInExecutor} bound to the current {@link TaskContext};</li>
 *   <li>registers task failure/completion listeners that clear the task-context reference and
 *       send a {@link ShutdownBinlogServer} request to that server;</li>
 *   <li>reports the server's host and port to the temporary socket server;</li>
 *   <li>registers the server via {@code SocketServerInExecutor.addNewBinlogServer} and connects
 *       it to MySQL;</li>
 *   <li>blocks, polling once per second, until the task is interrupted or the server is closed.</li>
 * </ol>
 *
 * <p>Field names and {@code serialVersionUID} are kept exactly as generated because they are part
 * of the serialized form of this closure.
 */
public final class MLSQLBinLogDataSource$$anonfun$org$apache$spark$sql$mlsql$sources$MLSQLBinLogDataSource$$launchBinlogServer$1$1 extends AbstractFunction1<String, ExecutorBinlogServer> implements Serializable {
    public static final long serialVersionUID = 0;

    // MySQL connection settings, forwarded to MySQLConnectionInfo / MySQLBinlogServer.
    private final String bingLogHost$1;
    private final int bingLogPort$1;
    private final String bingLogUserName$1;
    private final String bingLogPassword$1;
    private final Option databaseNamePattern$1;
    private final Option tableNamePattern$1;

    // Passed as the second constructor argument of BinLogSocketServerInExecutor.
    private final String checkPointDir$1;

    // Optional binlog file name / position, forwarded to MySQLConnectionInfo.
    private final Option binlogFilename$1;
    private final Option binlogPos$1;

    // Address of the temporary socket server that receives this server's host and port.
    private final String tempSocketServerHost$1;
    private final int tempSocketServerPort$1;

    // Forwarded to BinLogSocketServerInExecutor.setMaxBinlogQueueSize.
    private final long maxBinlogQueueSize$1;

    // Wrapper whose value() is handed to the BinLogSocketServerInExecutor constructor.
    private final SerializableConfiguration broadcastedHadoopConf$1;

    /**
     * Starts the binlog server inside the current task and blocks until the task is interrupted
     * or the server reports that it is closed.
     *
     * @param str unused by this implementation
     * @return the host and port the binlog server was listening on
     */
    public final ExecutorBinlogServer apply(String str) {
        // Looked up once; every use below happens on this same task thread.
        final TaskContext taskContext = TaskContext$.MODULE$.get();
        final AtomicReference atomicReference = new AtomicReference();
        atomicReference.set(taskContext);

        // The fourth argument is the Scala-declared default of the constructor's fourth parameter.
        final BinLogSocketServerInExecutor<?> binLogSocketServerInExecutor = new BinLogSocketServerInExecutor<>(
                atomicReference,
                this.checkPointDir$1,
                this.broadcastedHadoopConf$1.value(),
                BinLogSocketServerInExecutor$.MODULE$.$lessinit$greater$default$4());
        binLogSocketServerInExecutor.setMaxBinlogQueueSize(this.maxBinlogQueueSize$1);

        // On task failure: drop the task-context reference and ask the server to shut down.
        taskContext.addTaskFailureListener(new TaskFailureListener() {
            public void onTaskFailure(TaskContext failedContext, Throwable error) {
                atomicReference.set(null);
                org$apache$spark$sql$mlsql$sources$MLSQLBinLogDataSource$$anonfun$$sendStopBinlogServerRequest$1(binLogSocketServerInExecutor);
            }
        });

        // On task completion: same clean-up as on failure.
        taskContext.addTaskCompletionListener(new TaskCompletionListener() {
            public void onTaskCompletion(TaskContext completedContext) {
                atomicReference.set(null);
                org$apache$spark$sql$mlsql$sources$MLSQLBinLogDataSource$$anonfun$$sendStopBinlogServerRequest$1(binLogSocketServerInExecutor);
            }
        });

        // Report where the binlog server listens. try-with-resources guarantees the socket is
        // closed even when sending the request fails.
        try (Socket reportSocket = new Socket(this.tempSocketServerHost$1, this.tempSocketServerPort$1)) {
            BinLogSocketServerCommand$.MODULE$.sendRequest(
                    new DataOutputStream(reportSocket.getOutputStream()),
                    new ReportBinlogSocketServerHostAndPort(binLogSocketServerInExecutor.host(), binLogSocketServerInExecutor.port()));
        }

        SocketServerInExecutor$.MODULE$.addNewBinlogServer(
                new MySQLBinlogServer(this.bingLogHost$1, this.bingLogPort$1),
                binLogSocketServerInExecutor);

        // NOTE(review): the meaning of the trailing `true` flag is defined by connectMySQL - confirm there.
        binLogSocketServerInExecutor.connectMySQL(
                new MySQLConnectionInfo(
                        this.bingLogHost$1,
                        this.bingLogPort$1,
                        this.bingLogUserName$1,
                        this.bingLogPassword$1,
                        this.binlogFilename$1,
                        this.binlogPos$1,
                        this.databaseNamePattern$1,
                        this.tableNamePattern$1),
                true);

        // Keep the task alive while the server runs; poll once per second.
        while (!taskContext.isInterrupted() && !binLogSocketServerInExecutor.isClosed()) {
            Thread.sleep(1000L);
        }
        return new ExecutorBinlogServer(binLogSocketServerInExecutor.host(), binLogSocketServerInExecutor.port());
    }

    /**
     * Opens a connection to the given binlog server and sends it a {@link ShutdownBinlogServer}
     * request. The socket is always closed, including when sending the request fails.
     *
     * @param binLogSocketServerInExecutor server whose {@code host()} and {@code port()} are dialled
     */
    public final void org$apache$spark$sql$mlsql$sources$MLSQLBinLogDataSource$$anonfun$$sendStopBinlogServerRequest$1(BinLogSocketServerInExecutor binLogSocketServerInExecutor) {
        try (Socket stopSocket = new Socket(binLogSocketServerInExecutor.host(), binLogSocketServerInExecutor.port())) {
            BinLogSocketServerCommand$.MODULE$.sendRequest(new DataOutputStream(stopSocket.getOutputStream()), new ShutdownBinlogServer());
        }
    }

    /**
     * Captures the values the closure needs when it later runs inside a task.
     *
     * @param mLSQLBinLogDataSource enclosing data source; not stored by this constructor
     * @param str MySQL host
     * @param i MySQL port
     * @param str2 MySQL user name
     * @param str3 MySQL password
     * @param option database name pattern
     * @param option2 table name pattern
     * @param str4 checkpoint directory
     * @param option3 binlog file name
     * @param option4 binlog position
     * @param str5 host of the temporary socket server
     * @param i2 port of the temporary socket server
     * @param j maximum binlog queue size
     * @param serializableConfiguration wrapper around the configuration passed to the server
     */
    public MLSQLBinLogDataSource$$anonfun$org$apache$spark$sql$mlsql$sources$MLSQLBinLogDataSource$$launchBinlogServer$1$1(MLSQLBinLogDataSource mLSQLBinLogDataSource, String str, int i, String str2, String str3, Option option, Option option2, String str4, Option option3, Option option4, String str5, int i2, long j, SerializableConfiguration serializableConfiguration) {
        this.bingLogHost$1 = str;
        this.bingLogPort$1 = i;
        this.bingLogUserName$1 = str2;
        this.bingLogPassword$1 = str3;
        this.databaseNamePattern$1 = option;
        this.tableNamePattern$1 = option2;
        this.checkPointDir$1 = str4;
        this.binlogFilename$1 = option3;
        this.binlogPos$1 = option4;
        this.tempSocketServerHost$1 = str5;
        this.tempSocketServerPort$1 = i2;
        this.maxBinlogQueueSize$1 = j;
        this.broadcastedHadoopConf$1 = serializableConfiguration;
    }
}
