package com.sevenparadigms.tcpclient;

import com.fasterxml.jackson.databind.JsonNode;
import com.sevenparadigms.common.CommonExtensionsKt;
import com.sevenparadigms.dsl.R2dbcDslRepository;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelOption;
import io.r2dbc.postgresql.api.Notification;
import java.time.LocalDateTime;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import kotlin.Metadata;
import kotlin.collections.CollectionsKt;
import kotlin.jvm.internal.Intrinsics;
import kotlin.text.StringsKt;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;

/* compiled from: AsyncTcpClient.kt */
@Metadata(mv = {1, 1, 16}, bv = {1, 0, 3}, k = 1, d1 = {"��j\n\u0002\u0018\u0002\n\u0002\u0010��\n��\n\u0002\u0010\u000e\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\t\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0006\n\u0002\u0010(\n��\n\u0002\u0010\u0012\n��\n\u0002\u0010\u000b\n\u0002\b\u0005\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010 \n\u0002\b\u0007\b&\u0018��2\u00020\u0001B\u001d\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u000e\u0010\u0004\u001a\n\u0012\u0002\b\u0003\u0012\u0002\b\u00030\u0005¢\u0006\u0002\u0010\u0006J\b\u0010 \u001a\u00020!H\u0002J\b\u0010\"\u001a\u00020!H\u0002J\u0010\u0010#\u001a\n\u0012\u0006\b\u0001\u0012\u00020%0$H\u0002J\b\u0010&\u001a\u00020!H\u0004J\u0014\u0010'\u001a\u000e\u0012\n\u0012\b\u0012\u0004\u0012\u00020\u00010(0$H&J\u0010\u0010)\u001a\n\u0012\u0006\b\u0001\u0012\u00020%0$H\u0002J\u0016\u0010*\u001a\b\u0012\u0004\u0012\u00020\u00010$2\u0006\u0010+\u001a\u00020\u0015H&J\u000e\u0010,\u001a\u00020!2\u0006\u0010+\u001a\u00020\u0015J\u0010\u0010-\u001a\u00020!2\u0006\u0010.\u001a\u00020\u0001H&R\u000e\u0010\u0007\u001a\u00020\bX\u0082D¢\u0006\u0002\n��R\u001c\u0010\t\u001a\u0010\u0012\f\u0012\n\u0012\u0006\u0012\u0004\u0018\u00010\f0\u000b0\nX\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\r\u001a\b\u0012\u0004\u0012\u00020\u00030\nX\u0082\u0004¢\u0006\u0002\n��R\u001a\u0010\u0002\u001a\u00020\u0003X\u0086\u000e¢\u0006\u000e\n��\u001a\u0004\b\u000e\u0010\u000f\"\u0004\b\u0010\u0010\u0011R\u001a\u0010\u0012\u001a\u000e\u0012\n\u0012\b\u0012\u0004\u0012\u00020\u00030\u00130\nX\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\u0014\u001a\b\u0012\u0004\u0012\u00020\u00150\nX\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\u0016\u001a\b\u0012\u0004\u0012\u00020\u00170\nX\u0082\u0004¢\u0006\u0002\n��R\"\u0010\u0004\u001a\n\u0012\u0002\b\u0003\u0012\u0002\b\u00030\u0005X\u0086\u000e¢\u0006\u000e\n��\u001a\u0004\b\u0018\u0010\u0019\"\u0004\b\u001a\u0010\u001bR\u0014\u0010\u001c\u001a\b\u0012\u0004\u0012\u00020\u001d0\nX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u001e\u001a\u00020\u001fX\u0082\u0004¢\u0006\u0002\n��¨\u0006/"}, d2 = {"Lcom/sevenparadigms/tcpclient/AsyncTcpClient;", "", "hosts", "", "repository", "Lcom/sevenparadigms/dsl/R2dbcDslRepository;", "(Ljava/lang/String;Lcom/sevenparadigms/dsl/R2dbcDslRepository;)V", "delaySeconds", "", "flux", "Ljava/util/concurrent/atomic/AtomicReference;", "Lreactor/core/publisher/FluxSink;", "Lio/netty/buffer/ByteBuf;", "host", "getHosts", "()Ljava/lang/String;", "setHosts", "(Ljava/lang/String;)V", "it", "", "lastPayload", "", "locking", "", "getRepository", "()Lcom/sevenparadigms/dsl/R2dbcDslRepository;", "setRepository", "(Lcom/sevenparadigms/dsl/R2dbcDslRepository;)V", "timer", "Ljava/time/LocalDateTime;", "trying", "Ljava/util/concurrent/atomic/AtomicInteger;", "checkAndSend", "", "checkReceiveTimer", "createClient", "Lreactor/core/publisher/Mono;", "Lreactor/netty/Connection;", "forceNext", "getNext", "", "init", "receive", "bytes", "sendMessage", "sendNext", "entity", "async-tcp-client"})
/* loaded from: input_file:com/sevenparadigms/tcpclient/AsyncTcpClient.class */
public abstract class AsyncTcpClient {
    private final long delaySeconds = 5;
    private final AtomicReference<Boolean> locking;
    private final AtomicReference<FluxSink<ByteBuf>> flux;
    private final AtomicReference<String> host;
    private final AtomicReference<Iterator<String>> it;
    private final AtomicReference<LocalDateTime> timer;
    private final AtomicInteger trying;
    private final AtomicReference<byte[]> lastPayload;

    @NotNull
    private String hosts;

    @NotNull
    private R2dbcDslRepository<?, ?> repository;

    private final Mono<? extends Connection> init() {
        this.locking.set(false);
        this.flux.set(null);
        this.host.set(null);
        this.it.set(CollectionsKt.toSet(StringsKt.split$default(this.hosts, new String[]{","}, false, 0, 6, (Object) null)).iterator());
        this.timer.set(LocalDateTime.now().plusSeconds(this.delaySeconds));
        return createClient();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final void checkReceiveTimer() {
        if (this.locking.get().booleanValue() || !LocalDateTime.now().isAfter(this.timer.get())) {
            Boolean bool = this.locking.get();
            Intrinsics.checkExpressionValueIsNotNull(bool, "locking.get()");
            if (!bool.booleanValue() || !LocalDateTime.now().isAfter(this.timer.get().plusSeconds(this.delaySeconds))) {
                return;
            }
        }
        this.timer.set(LocalDateTime.now().plusSeconds(this.delaySeconds));
        this.locking.set(true);
        checkAndSend();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final void checkAndSend() {
        Boolean bool = this.locking.get();
        Intrinsics.checkExpressionValueIsNotNull(bool, "locking.get()");
        if (bool.booleanValue()) {
            this.locking.set(false);
            getNext().subscribe(new Consumer<List<? extends Object>>() { // from class: com.sevenparadigms.tcpclient.AsyncTcpClient$checkAndSend$1
                @Override // java.util.function.Consumer
                public final void accept(List<? extends Object> list) {
                    AtomicReference atomicReference;
                    Intrinsics.checkExpressionValueIsNotNull(list, "it");
                    if (!list.isEmpty()) {
                        AsyncTcpClient.this.sendNext(CollectionsKt.first(list));
                    } else {
                        atomicReference = AsyncTcpClient.this.locking;
                        atomicReference.set(true);
                    }
                }
            });
        }
    }

    @NotNull
    public abstract Mono<List<Object>> getNext();

    public abstract void sendNext(@NotNull Object obj);

    @NotNull
    public abstract Mono<Object> receive(@NotNull byte[] bArr);

    /* JADX INFO: Access modifiers changed from: private */
    public final Mono<? extends Connection> createClient() {
        this.host.set(null);
        if (!this.it.get().hasNext()) {
            return init();
        }
        final String next = this.it.get().next();
        Iterator it = StringsKt.split$default(next, new String[]{":"}, false, 0, 6, (Object) null).iterator();
        Mono<? extends Connection> onErrorResume = TcpClient.create().host((String) it.next()).port(Integer.parseInt((String) it.next())).handle(new AsyncTcpClient$createClient$1(this)).option(ChannelOption.AUTO_CLOSE, true).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 2000).option(ChannelOption.SO_KEEPALIVE, false).connect().doOnSuccess(new Consumer<Connection>() { // from class: com.sevenparadigms.tcpclient.AsyncTcpClient$createClient$2
            @Override // java.util.function.Consumer
            public final void accept(Connection connection) {
                AtomicReference atomicReference;
                AtomicReference atomicReference2;
                AtomicInteger atomicInteger;
                CommonExtensionsKt.info(AsyncTcpClient.this, "Connected to " + next, new Object[0]);
                atomicReference = AsyncTcpClient.this.host;
                atomicReference.set(next);
                atomicReference2 = AsyncTcpClient.this.locking;
                atomicReference2.set(true);
                atomicInteger = AsyncTcpClient.this.trying;
                atomicInteger.set(0);
            }
        }).onErrorResume(new Function<Throwable, Mono>() { // from class: com.sevenparadigms.tcpclient.AsyncTcpClient$createClient$3
            @Override // java.util.function.Function
            @Nullable
            public final Mono apply(Throwable th) {
                AtomicInteger atomicInteger;
                Mono createClient;
                atomicInteger = AsyncTcpClient.this.trying;
                if (atomicInteger.incrementAndGet() > 2) {
                    throw new IllegalArgumentException("Host " + next + " is unavailable");
                }
                TimeUnit.MILLISECONDS.sleep(1000L);
                createClient = AsyncTcpClient.this.createClient();
                return createClient;
            }
        });
        Intrinsics.checkExpressionValueIsNotNull(onErrorResume, "TcpClient.create().host(…g>?\n                    }");
        return onErrorResume;
    }

    public final synchronized void sendMessage(@NotNull byte[] bArr) {
        String str;
        FluxSink<ByteBuf> fluxSink;
        Intrinsics.checkParameterIsNotNull(bArr, "bytes");
        AtomicInteger atomicInteger = new AtomicInteger(40);
        while (true) {
            synchronized (this.host) {
                str = this.host.get();
            }
            if (str != null) {
                synchronized (this.flux) {
                    fluxSink = this.flux.get();
                }
                if (fluxSink != null) {
                    break;
                }
            }
            if (atomicInteger.getAndDecrement() == 1) {
                break;
            } else {
                TimeUnit.MILLISECONDS.sleep(100L);
            }
        }
        if (atomicInteger.get() != 0) {
            byte[] bArr2 = this.lastPayload.get();
            Intrinsics.checkExpressionValueIsNotNull(bArr2, "lastPayload.get()");
            if (!Intrinsics.areEqual(CommonExtensionsKt.hex(bArr2), CommonExtensionsKt.hex(bArr))) {
                CommonExtensionsKt.debug(this, "sending(" + this.host.get() + ") => [" + CommonExtensionsKt.hex(bArr) + ']', new Object[0]);
                this.lastPayload.set(bArr);
                this.timer.set(LocalDateTime.now().plusSeconds(this.delaySeconds));
                Intrinsics.checkExpressionValueIsNotNull(this.flux.get().next(Unpooled.wrappedBuffer(bArr)), "flux.get().next(Unpooled.wrappedBuffer(bytes))");
                return;
            }
        }
        this.lastPayload.set(new byte[0]);
        CommonExtensionsKt.info(this, "Found lost connection, attempting connect to next host", new Object[0]);
        Intrinsics.checkExpressionValueIsNotNull(createClient().subscribe(new Consumer<Connection>() { // from class: com.sevenparadigms.tcpclient.AsyncTcpClient$sendMessage$3
            @Override // java.util.function.Consumer
            public final void accept(Connection connection) {
                AsyncTcpClient.this.forceNext();
            }
        }), "createClient().subscribe…forceNext()\n            }");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void forceNext() {
        this.locking.set(true);
        checkAndSend();
    }

    @NotNull
    public final String getHosts() {
        return this.hosts;
    }

    public final void setHosts(@NotNull String str) {
        Intrinsics.checkParameterIsNotNull(str, "<set-?>");
        this.hosts = str;
    }

    @NotNull
    public final R2dbcDslRepository<?, ?> getRepository() {
        return this.repository;
    }

    public final void setRepository(@NotNull R2dbcDslRepository<?, ?> r2dbcDslRepository) {
        Intrinsics.checkParameterIsNotNull(r2dbcDslRepository, "<set-?>");
        this.repository = r2dbcDslRepository;
    }

    public AsyncTcpClient(@NotNull String str, @NotNull R2dbcDslRepository<?, ?> r2dbcDslRepository) {
        Intrinsics.checkParameterIsNotNull(str, "hosts");
        Intrinsics.checkParameterIsNotNull(r2dbcDslRepository, "repository");
        this.hosts = str;
        this.repository = r2dbcDslRepository;
        this.delaySeconds = 5L;
        this.locking = new AtomicReference<>(false);
        this.flux = new AtomicReference<>();
        this.host = new AtomicReference<>();
        this.it = new AtomicReference<>();
        this.timer = new AtomicReference<>();
        this.trying = new AtomicInteger();
        this.lastPayload = new AtomicReference<>(new byte[0]);
        init().subscribe(new Consumer<Connection>() { // from class: com.sevenparadigms.tcpclient.AsyncTcpClient.1
            @Override // java.util.function.Consumer
            public final void accept(Connection connection) {
                Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(new Runnable() { // from class: com.sevenparadigms.tcpclient.AsyncTcpClient.1.1
                    @Override // java.lang.Runnable
                    public final void run() {
                        AsyncTcpClient.this.checkReceiveTimer();
                    }
                }, AsyncTcpClient.this.delaySeconds, AsyncTcpClient.this.delaySeconds, TimeUnit.SECONDS);
                AsyncTcpClient.this.getRepository().listen().doOnNext(new Consumer<Notification>() { // from class: com.sevenparadigms.tcpclient.AsyncTcpClient.1.2
                    @Override // java.util.function.Consumer
                    public final void accept(Notification notification) {
                        Intrinsics.checkExpressionValueIsNotNull(notification, "notification");
                        String parameter = notification.getParameter();
                        if (parameter == null) {
                            Intrinsics.throwNpe();
                        }
                        JsonNode jsonNode = CommonExtensionsKt.toJsonNode(parameter.toString());
                        if (Intrinsics.areEqual(jsonNode.get("operation").asText(), "INSERT")) {
                            CommonExtensionsKt.info(AsyncTcpClient.this, "database <= " + jsonNode, new Object[0]);
                            AsyncTcpClient.this.checkAndSend();
                        }
                    }
                }).subscribe();
            }
        });
    }
}
