package io.axoniq.eventstore.client;

import io.axoniq.eventstore.Event;
import io.axoniq.eventstore.client.util.Broadcaster;
import io.axoniq.eventstore.client.util.EventCipher;
import io.axoniq.eventstore.client.util.EventStoreClientException;
import io.axoniq.eventstore.client.util.GrpcExceptionParser;
import io.axoniq.eventstore.grpc.ClusterGrpc;
import io.axoniq.eventstore.grpc.ClusterInfo;
import io.axoniq.eventstore.grpc.Confirmation;
import io.axoniq.eventstore.grpc.EventStoreGrpc;
import io.axoniq.eventstore.grpc.EventWithToken;
import io.axoniq.eventstore.grpc.GetAggregateEventsRequest;
import io.axoniq.eventstore.grpc.GetEventsRequest;
import io.axoniq.eventstore.grpc.NodeInfo;
import io.axoniq.eventstore.grpc.RetrieveClusterInfoRequest;
import io.grpc.Channel;
import io.grpc.ClientInterceptor;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/axoniq/eventstore/client/EventStoreClient.class */
public class EventStoreClient {
    private final EventStoreConfiguration eventStoreConfiguration;
    private final TokenAddingInterceptor tokenAddingInterceptor;
    private final EventCipher eventCipher;
    private final ChannelManager channelManager;
    private boolean shutdown;
    private final Logger logger = LoggerFactory.getLogger(EventStoreClient.class);
    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
    private final AtomicReference<ClusterInfo> eventStoreServer = new AtomicReference<>();

    public EventStoreClient(EventStoreConfiguration eventStoreConfiguration) {
        this.eventStoreConfiguration = eventStoreConfiguration;
        this.tokenAddingInterceptor = new TokenAddingInterceptor(eventStoreConfiguration.getToken());
        this.channelManager = new ChannelManager(eventStoreConfiguration.isSslEnabled(), eventStoreConfiguration.getCertFile());
        this.eventCipher = eventStoreConfiguration.getEventCipher();
    }

    public void shutdown() {
        this.shutdown = true;
        this.channelManager.cleanup();
    }

    private EventStoreGrpc.EventStoreStub eventStoreStub() {
        return (EventStoreGrpc.EventStoreStub) EventStoreGrpc.newStub(getChannelToEventStore()).withInterceptors(new ClientInterceptor[]{this.tokenAddingInterceptor});
    }

    private ClusterInfo discoverEventStore() {
        this.eventStoreServer.set(null);
        try {
            new Broadcaster(this.eventStoreConfiguration.getServerNodes(), this::retrieveClusterInfo, this::nodeReceived).broadcast(1, TimeUnit.SECONDS);
            return this.eventStoreServer.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ClientConnectionException("Thread was interrupted while attempting to connect to the server", e);
        }
    }

    private void nodeReceived(ClusterInfo clusterInfo) {
        this.logger.info("Received: {}:{}", clusterInfo.getMaster().getHostName(), Integer.valueOf(clusterInfo.getMaster().getGrpcPort()));
        this.eventStoreServer.set(clusterInfo);
    }

    private void retrieveClusterInfo(NodeInfo nodeInfo, StreamObserver<ClusterInfo> streamObserver) {
        ((ClusterGrpc.ClusterStub) ClusterGrpc.newStub(this.channelManager.getChannel(nodeInfo)).withInterceptors(new ClientInterceptor[]{new TokenAddingInterceptor(this.eventStoreConfiguration.getToken())})).retrieveClusterInfo(RetrieveClusterInfoRequest.newBuilder().m579build(), streamObserver);
    }

    private Channel getChannelToEventStore() {
        if (this.shutdown) {
            return null;
        }
        CompletableFuture<ClusterInfo> completableFuture = new CompletableFuture<>();
        getEventStoreAsync(this.eventStoreConfiguration.getConnectionRetryCount(), completableFuture);
        try {
            return this.channelManager.getChannel(completableFuture.get().getMaster());
        } catch (InterruptedException e) {
            throw new EventStoreClientException("AXONIQ-0001", e.getMessage(), e);
        } catch (ExecutionException e2) {
            throw ((RuntimeException) e2.getCause());
        }
    }

    private void getEventStoreAsync(int i, CompletableFuture<ClusterInfo> completableFuture) {
        ClusterInfo clusterInfo = this.eventStoreServer.get();
        if (clusterInfo != null) {
            completableFuture.complete(clusterInfo);
            return;
        }
        ClusterInfo discoverEventStore = discoverEventStore();
        if (discoverEventStore != null) {
            completableFuture.complete(discoverEventStore);
        } else if (i > 0) {
            this.executorService.schedule(() -> {
                getEventStoreAsync(i - 1, completableFuture);
            }, this.eventStoreConfiguration.getConnectionRetry(), TimeUnit.MILLISECONDS);
        } else {
            completableFuture.completeExceptionally(new EventStoreClientException("AXONIQ-0001", "No available event store server"));
        }
    }

    private void stopChannelToEventStore() {
        ClusterInfo andSet = this.eventStoreServer.getAndSet(null);
        if (andSet != null) {
            this.logger.info("Shutting down gRPC channel");
            this.channelManager.shutdown(andSet);
        }
    }

    public Stream<Event> listAggregateEvents(final GetAggregateEventsRequest getAggregateEventsRequest) throws ExecutionException, InterruptedException {
        final CompletableFuture completableFuture = new CompletableFuture();
        final long currentTimeMillis = System.currentTimeMillis();
        eventStoreStub().listAggregateEvents(getAggregateEventsRequest, new StreamObserver<Event>() { // from class: io.axoniq.eventstore.client.EventStoreClient.1
            Stream.Builder<Event> eventStream = Stream.builder();
            int count;

            public void onNext(Event event) {
                this.eventStream.accept(EventStoreClient.this.eventCipher.decrypt(event));
                this.count++;
            }

            public void onError(Throwable th) {
                EventStoreClient.this.checkConnectionException(th);
                completableFuture.completeExceptionally(GrpcExceptionParser.parse(th));
            }

            public void onCompleted() {
                completableFuture.complete(this.eventStream.build());
                if (EventStoreClient.this.logger.isDebugEnabled()) {
                    EventStoreClient.this.logger.debug("Done request for {}: {}ms, {} events", new Object[]{getAggregateEventsRequest.getAggregateId(), Long.valueOf(System.currentTimeMillis() - currentTimeMillis), Integer.valueOf(this.count)});
                }
            }
        });
        return (Stream) completableFuture.get();
    }

    public StreamObserver<GetEventsRequest> listEvents(final StreamObserver<EventWithToken> streamObserver) {
        return eventStoreStub().listEvents(new StreamObserver<EventWithToken>() { // from class: io.axoniq.eventstore.client.EventStoreClient.2
            public void onNext(EventWithToken eventWithToken) {
                streamObserver.onNext(EventStoreClient.this.eventCipher.decrypt(eventWithToken));
            }

            public void onError(Throwable th) {
                EventStoreClient.this.checkConnectionException(th);
                streamObserver.onError(GrpcExceptionParser.parse(th));
            }

            public void onCompleted() {
                streamObserver.onCompleted();
            }
        });
    }

    public CompletableFuture<Confirmation> appendSnapshot(Event event) {
        final CompletableFuture<Confirmation> completableFuture = new CompletableFuture<>();
        eventStoreStub().appendSnapshot(this.eventCipher.encrypt(event), new StreamObserver<Confirmation>() { // from class: io.axoniq.eventstore.client.EventStoreClient.3
            public void onNext(Confirmation confirmation) {
                completableFuture.complete(confirmation);
            }

            public void onError(Throwable th) {
                EventStoreClient.this.checkConnectionException(th);
                completableFuture.completeExceptionally(GrpcExceptionParser.parse(th));
            }

            public void onCompleted() {
            }
        });
        return completableFuture;
    }

    public AppendEventTransaction createAppendEventConnection() {
        final CompletableFuture completableFuture = new CompletableFuture();
        return new AppendEventTransaction(eventStoreStub().appendEvent(new StreamObserver<Confirmation>() { // from class: io.axoniq.eventstore.client.EventStoreClient.4
            public void onNext(Confirmation confirmation) {
                completableFuture.complete(confirmation);
            }

            public void onError(Throwable th) {
                EventStoreClient.this.checkConnectionException(th);
                completableFuture.completeExceptionally(GrpcExceptionParser.parse(th));
            }

            public void onCompleted() {
            }
        }), completableFuture, this.eventCipher);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void checkConnectionException(Throwable th) {
        if ((th instanceof StatusRuntimeException) && ((StatusRuntimeException) th).getStatus().getCode().equals(Status.UNAVAILABLE.getCode())) {
            stopChannelToEventStore();
        }
    }
}
