package io.pravega.client.stream.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.grpc.LoadBalancer;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.NameResolver;
import io.grpc.StatusRuntimeException;
import io.grpc.auth.MoreCallCredentials;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.stub.StreamObserver;
import io.grpc.util.RoundRobinLoadBalancerFactory;
import io.netty.handler.ssl.SslContextBuilder;
import io.pravega.client.segment.impl.Segment;
import io.pravega.client.stream.InvalidStreamException;
import io.pravega.client.stream.PingFailedException;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.client.stream.StreamCut;
import io.pravega.client.stream.Transaction;
import io.pravega.client.stream.TxnFailedException;
import io.pravega.common.Exceptions;
import io.pravega.common.LoggerHelpers;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.util.Retry;
import io.pravega.controller.stream.api.grpc.v1.Controller;
import io.pravega.controller.stream.api.grpc.v1.ControllerServiceGrpc;
import io.pravega.shared.protocol.netty.PravegaNodeUri;
import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.net.ssl.SSLException;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.utils.ZKPaths;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/client/stream/impl/ControllerImpl.class */
public class ControllerImpl implements Controller {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ControllerImpl.class);
    private static final long DEFAULT_KEEPALIVE_TIME_MINUTES = 6;
    private final Retry.RetryAndThrowExceptionally<StatusRuntimeException, Exception> retryConfig;
    private final ScheduledExecutorService executor;
    private final AtomicBoolean closed;
    private final ManagedChannel channel;
    private final ControllerServiceGrpc.ControllerServiceStub client;

    /* loaded from: input_file:io/pravega/client/stream/impl/ControllerImpl$RPCAsyncCallback.class */
    private static final class RPCAsyncCallback<T> implements StreamObserver<T> {
        private final long traceId;
        private final String method;
        private T result = null;
        private final CompletableFuture<T> future = new CompletableFuture<>();

        RPCAsyncCallback(long j, String str) {
            this.traceId = j;
            this.method = str;
        }

        @Override // io.grpc.stub.StreamObserver
        public void onNext(T t) {
            this.result = t;
        }

        @Override // io.grpc.stub.StreamObserver
        public void onError(Throwable th) {
            ControllerImpl.log.warn("gRPC call for {} with trace id {} failed with server error.", this.method, Long.valueOf(this.traceId), th);
            this.future.completeExceptionally(th);
        }

        @Override // io.grpc.stub.StreamObserver
        public void onCompleted() {
            this.future.complete(this.result);
        }

        public CompletableFuture<T> getFuture() {
            return this.future;
        }
    }

    public ControllerImpl(ControllerImplConfig controllerImplConfig, ScheduledExecutorService scheduledExecutorService) {
        this(NettyChannelBuilder.forTarget(controllerImplConfig.getClientConfig().getControllerURI().toString()).nameResolverFactory((NameResolver.Factory) new ControllerResolverFactory()).loadBalancerFactory((LoadBalancer.Factory) RoundRobinLoadBalancerFactory.getInstance()).keepAliveTime(DEFAULT_KEEPALIVE_TIME_MINUTES, TimeUnit.MINUTES), controllerImplConfig, scheduledExecutorService);
        log.info("Controller client connecting to server at {}", controllerImplConfig.getClientConfig().getControllerURI().getAuthority());
    }

    @VisibleForTesting
    public ControllerImpl(ManagedChannelBuilder<?> managedChannelBuilder, ControllerImplConfig controllerImplConfig, ScheduledExecutorService scheduledExecutorService) {
        NettyChannelBuilder negotiationType;
        this.closed = new AtomicBoolean(false);
        Preconditions.checkNotNull(managedChannelBuilder, "channelBuilder");
        this.executor = scheduledExecutorService;
        this.retryConfig = Retry.withExpBackoff(controllerImplConfig.getInitialBackoffMillis(), controllerImplConfig.getBackoffMultiple(), controllerImplConfig.getRetryAttempts(), controllerImplConfig.getMaxBackoffMillis()).retryingOn(StatusRuntimeException.class).throwingOn(Exception.class);
        if (controllerImplConfig.getClientConfig().isEnableTls()) {
            String trustStore = controllerImplConfig.getClientConfig().getTrustStore();
            SslContextBuilder forClient = GrpcSslContexts.forClient();
            try {
                negotiationType = ((NettyChannelBuilder) managedChannelBuilder).sslContext((Strings.isNullOrEmpty(trustStore) ? forClient : forClient.trustManager(new File(trustStore))).build()).negotiationType(NegotiationType.TLS);
            } catch (SSLException e) {
                throw new CompletionException(e);
            }
        } else {
            negotiationType = ((NettyChannelBuilder) managedChannelBuilder).negotiationType(NegotiationType.PLAINTEXT);
        }
        this.channel = negotiationType.build();
        ControllerServiceGrpc.ControllerServiceStub newStub = ControllerServiceGrpc.newStub(this.channel);
        Credentials credentials = controllerImplConfig.getClientConfig().getCredentials();
        this.client = credentials != null ? newStub.withCallCredentials(MoreCallCredentials.from(new PravegaCredsWrapper(credentials))) : newStub;
    }

    @Override // io.pravega.client.stream.impl.Controller
    public CompletableFuture<Boolean> createScope(String str) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        long traceEnter = LoggerHelpers.traceEnter(log, "createScope", str);
        return this.retryConfig.runAsync(() -> {
            RPCAsyncCallback rPCAsyncCallback = new RPCAsyncCallback(traceEnter, "createScope");
            this.client.createScope(Controller.ScopeInfo.newBuilder().setScope(str).build(), rPCAsyncCallback);
            return rPCAsyncCallback.getFuture();
        }, this.executor).thenApply((Function<? super ReturnT, ? extends U>) createScopeStatus -> {
            switch (createScopeStatus.getStatus()) {
                case FAILURE:
                    log.warn("Failed to create scope: {}", str);
                    throw new ControllerFailureException("Failed to create scope: " + str);
                case INVALID_SCOPE_NAME:
                    log.warn("Illegal scope name: {}", str);
                    throw new IllegalArgumentException("Illegal scope name: " + str);
                case SCOPE_EXISTS:
                    log.warn("Scope already exists: {}", str);
                    return false;
                case SUCCESS:
                    log.info("Scope created successfully: {}", str);
                    return true;
                case UNRECOGNIZED:
                default:
                    throw new ControllerFailureException("Unknown return status creating scope " + str + StringUtils.SPACE + createScopeStatus.getStatus());
            }
        }).whenComplete((bool, th) -> {
            if (th != null) {
                log.warn("createScope failed: ", th);
            }
            LoggerHelpers.traceLeave(log, "createScope", traceEnter, new Object[0]);
        });
    }

    @Override // io.pravega.client.stream.impl.Controller
    public CompletableFuture<Boolean> deleteScope(String str) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        long traceEnter = LoggerHelpers.traceEnter(log, "deleteScope", str);
        return this.retryConfig.runAsync(() -> {
            RPCAsyncCallback rPCAsyncCallback = new RPCAsyncCallback(traceEnter, "deleteScope");
            this.client.deleteScope(Controller.ScopeInfo.newBuilder().setScope(str).build(), rPCAsyncCallback);
            return rPCAsyncCallback.getFuture();
        }, this.executor).thenApply((Function<? super ReturnT, ? extends U>) deleteScopeStatus -> {
            switch (deleteScopeStatus.getStatus()) {
                case FAILURE:
                    log.warn("Failed to delete scope: {}", str);
                    throw new ControllerFailureException("Failed to delete scope: " + str);
                case SCOPE_NOT_EMPTY:
                    log.warn("Cannot delete non empty scope: {}", str);
                    throw new IllegalStateException("Scope " + str + " is not empty.");
                case SCOPE_NOT_FOUND:
                    log.warn("Scope not found: {}", str);
                    return false;
                case SUCCESS:
                    log.info("Scope deleted successfully: {}", str);
                    return true;
                case UNRECOGNIZED:
                default:
                    throw new ControllerFailureException("Unknown return status deleting scope " + str + StringUtils.SPACE + deleteScopeStatus.getStatus());
            }
        }).whenComplete((bool, th) -> {
            if (th != null) {
                log.warn("deleteScope failed: ", th);
            }
            LoggerHelpers.traceLeave(log, "deleteScope", traceEnter, new Object[0]);
        });
    }

    @Override // io.pravega.client.stream.impl.Controller
    public CompletableFuture<Boolean> createStream(StreamConfiguration streamConfiguration) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkNotNull(streamConfiguration, "streamConfig");
        long traceEnter = LoggerHelpers.traceEnter(log, "createStream", streamConfiguration);
        return this.retryConfig.runAsync(() -> {
            RPCAsyncCallback rPCAsyncCallback = new RPCAsyncCallback(traceEnter, "createStream");
            this.client.createStream(ModelHelper.decode(streamConfiguration), rPCAsyncCallback);
            return rPCAsyncCallback.getFuture();
        }, this.executor).thenApply((Function<? super ReturnT, ? extends U>) createStreamStatus -> {
            switch (createStreamStatus.getStatus()) {
                case FAILURE:
                    log.warn("Failed to create stream: {}", streamConfiguration.getStreamName());
                    throw new ControllerFailureException("Failed to create stream: " + streamConfiguration);
                case INVALID_STREAM_NAME:
                    log.warn("Illegal stream name: {}", streamConfiguration.getStreamName());
                    throw new IllegalArgumentException("Illegal stream name: " + streamConfiguration);
                case SCOPE_NOT_FOUND:
                    log.warn("Scope not found: {}", streamConfiguration.getScope());
                    throw new IllegalArgumentException("Scope does not exist: " + streamConfiguration);
                case STREAM_EXISTS:
                    log.warn("Stream already exists: {}", streamConfiguration.getStreamName());
                    return false;
                case SUCCESS:
                    log.info("Stream created successfully: {}", streamConfiguration.getStreamName());
                    return true;
                case UNRECOGNIZED:
                default:
                    throw new ControllerFailureException("Unknown return status creating stream " + streamConfiguration + StringUtils.SPACE + createStreamStatus.getStatus());
            }
        }).whenComplete((bool, th) -> {
            if (th != null) {
                log.warn("createStream failed: ", th);
            }
            LoggerHelpers.traceLeave(log, "createStream", traceEnter, new Object[0]);
        });
    }

    @Override // io.pravega.client.stream.impl.Controller
    public CompletableFuture<Boolean> updateStream(StreamConfiguration streamConfiguration) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkNotNull(streamConfiguration, "streamConfig");
        long traceEnter = LoggerHelpers.traceEnter(log, "updateStream", streamConfiguration);
        return this.retryConfig.runAsync(() -> {
            RPCAsyncCallback rPCAsyncCallback = new RPCAsyncCallback(traceEnter, "updateStream");
            this.client.updateStream(ModelHelper.decode(streamConfiguration), rPCAsyncCallback);
            return rPCAsyncCallback.getFuture();
        }, this.executor).thenApply((Function<? super ReturnT, ? extends U>) updateStreamStatus -> {
            switch (updateStreamStatus.getStatus()) {
                case FAILURE:
                    log.warn("Failed to update stream: {}", streamConfiguration.getStreamName());
                    throw new ControllerFailureException("Failed to update stream: " + streamConfiguration);
                case SCOPE_NOT_FOUND:
                    log.warn("Scope not found: {}", streamConfiguration.getScope());
                    throw new IllegalArgumentException("Scope does not exist: " + streamConfiguration);
                case STREAM_NOT_FOUND:
                    log.warn("Stream does not exist: {}", streamConfiguration.getStreamName());
                    throw new IllegalArgumentException("Stream does not exist: " + streamConfiguration);
                case SUCCESS:
                    log.info("Successfully updated stream: {}", streamConfiguration.getStreamName());
                    return true;
                case UNRECOGNIZED:
                default:
                    throw new ControllerFailureException("Unknown return status updating stream " + streamConfiguration + StringUtils.SPACE + updateStreamStatus.getStatus());
            }
        }).whenComplete((bool, th) -> {
            if (th != null) {
                log.warn("updateStream failed: ", th);
            }
            LoggerHelpers.traceLeave(log, "updateStream", traceEnter, new Object[0]);
        });
    }

    @Override // io.pravega.client.stream.impl.Controller
    public CompletableFuture<Boolean> truncateStream(String str, String str2, StreamCut streamCut) {
        return truncateStream(str, str2, getStreamCutMap(streamCut));
    }

    private CompletableFuture<Boolean> truncateStream(String str, String str2, Map<Long, Long> map) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkNotNull(map, "streamCut");
        long traceEnter = LoggerHelpers.traceEnter(log, "truncateStream", map);
        return this.retryConfig.runAsync(() -> {
            RPCAsyncCallback rPCAsyncCallback = new RPCAsyncCallback(traceEnter, "truncateStream");
            this.client.truncateStream(ModelHelper.decode(str, str2, map), rPCAsyncCallback);
            return rPCAsyncCallback.getFuture();
        }, this.executor).thenApply((Function<? super ReturnT, ? extends U>) updateStreamStatus -> {
            switch (updateStreamStatus.getStatus()) {
                case FAILURE:
                    log.warn("Failed to truncate stream: {}/{}", str, str2);
                    throw new ControllerFailureException("Failed to truncate stream: " + str + ZKPaths.PATH_SEPARATOR + str2);
                case SCOPE_NOT_FOUND:
                    log.warn("Scope not found: {}", str);
                    throw new IllegalArgumentException("Scope does not exist: " + str);
                case STREAM_NOT_FOUND:
                    log.warn("Stream does not exist: {}/{}", str, str2);
                    throw new IllegalArgumentException("Stream does not exist: " + str2);
                case SUCCESS:
                    log.info("Successfully updated stream: {}/{}", str, str2);
                    return true;
                case UNRECOGNIZED:
                default:
                    throw new ControllerFailureException("Unknown return status updating stream " + str + ZKPaths.PATH_SEPARATOR + str2 + StringUtils.SPACE + updateStreamStatus.getStatus());
            }
        }).whenComplete((bool, th) -> {
            if (th != null) {
                log.warn("updateStream failed: ", th);
            }
            LoggerHelpers.traceLeave(log, "updateStream", traceEnter, new Object[0]);
        });
    }

    @Override // io.pravega.client.stream.impl.Controller
    public CancellableRequest<Boolean> scaleStream(Stream stream, List<Long> list, Map<Double, Double> map, ScheduledExecutorService scheduledExecutorService) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        CancellableRequest<Boolean> cancellableRequest = new CancellableRequest<>();
        startScaleInternal(stream, list, map, LoggerHelpers.traceEnter(log, "scaleStream", stream), "scaleStream").whenComplete((scaleResponse, th) -> {
            if (th != null) {
                log.error("failed to start scale {}", th);
                cancellableRequest.start(() -> {
                    return Futures.failedFuture(th);
                }, bool -> {
                    return true;
                }, scheduledExecutorService);
                return;
            }
            try {
                boolean booleanValue = handleScaleResponse(stream, scaleResponse).booleanValue();
                cancellableRequest.start(() -> {
                    return booleanValue ? checkScaleStatus(stream, scaleResponse.getEpoch()) : CompletableFuture.completedFuture(false);
                }, bool2 -> {
                    return !booleanValue || bool2.booleanValue();
                }, scheduledExecutorService);
            } catch (Exception e) {
                cancellableRequest.start(() -> {
                    return Futures.failedFuture(e);
                }, bool3 -> {
                    return true;
                }, scheduledExecutorService);
            }
        });
        return cancellableRequest;
    }

    @Override // io.pravega.client.stream.impl.Controller
    public CompletableFuture<Boolean> startScale(Stream stream, List<Long> list, Map<Double, Double> map) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        long traceEnter = LoggerHelpers.traceEnter(log, "scaleStream", stream);
        return startScaleInternal(stream, list, map, traceEnter, "scaleStream").thenApply(scaleResponse -> {
            return handleScaleResponse(stream, scaleResponse);
        }).whenComplete((BiConsumer<? super U, ? super Throwable>) (bool, th) -> {
            if (th != null) {
                log.warn("scaleStream failed: ", th);
            }
            LoggerHelpers.traceLeave(log, "scaleStream", traceEnter, new Object[0]);
        });
    }

    private Boolean handleScaleResponse(Stream stream, Controller.ScaleResponse scaleResponse) {
        switch (scaleResponse.getStatus()) {
            case FAILURE:
                log.warn("Failed to scale stream: {}", stream.getStreamName());
                throw new ControllerFailureException("Failed to scale stream: " + stream);
            case PRECONDITION_FAILED:
                log.warn("Precondition failed for scale stream: {}", stream.getStreamName());
                return false;
            case STARTED:
                log.info("Successfully started scale stream: {}", stream.getStreamName());
                return true;
            case UNRECOGNIZED:
            default:
                throw new ControllerFailureException("Unknown return status scaling stream " + stream + StringUtils.SPACE + scaleResponse.getStatus());
        }
    }

    @Override // io.pravega.client.stream.impl.Controller
    public CompletableFuture<Boolean> checkScaleStatus(Stream stream, int i) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkNotNull(stream, "stream");
        Preconditions.checkArgument(i >= 0);
        long traceEnter = LoggerHelpers.traceEnter(log, "checkScale", stream);
        return this.retryConfig.runAsync(() -> {
            RPCAsyncCallback rPCAsyncCallback = new RPCAsyncCallback(traceEnter, "checkScale");
            this.client.checkScale(Controller.ScaleStatusRequest.newBuilder().setStreamInfo(ModelHelper.createStreamInfo(stream.getScope(), stream.getStreamName())).setEpoch(i).build(), rPCAsyncCallback);
            return rPCAsyncCallback.getFuture();
        }, this.executor).thenApply((Function<? super ReturnT, ? extends U>) scaleStatusResponse -> {
            switch (scaleStatusResponse.getStatus()) {
                case IN_PROGRESS:
                    return false;
                case SUCCESS:
                    return true;
                case INVALID_INPUT:
                    throw new ControllerFailureException("invalid input");
                case INTERNAL_ERROR:
                default:
                    throw new ControllerFailureException("unknown error");
            }
        }).whenComplete((bool, th) -> {
            if (th != null) {
                log.warn("checking status failed: ", th);
            }
            LoggerHelpers.traceLeave(log, "checkScale", traceEnter, new Object[0]);
        });
    }

    private CompletableFuture<Controller.ScaleResponse> startScaleInternal(Stream stream, List<Long> list, Map<Double, Double> map, long j, String str) {
        Preconditions.checkNotNull(stream, "stream");
        Preconditions.checkNotNull(list, "sealedSegments");
        Preconditions.checkNotNull(map, "newKeyRanges");
        return this.retryConfig.runAsync(() -> {
            RPCAsyncCallback rPCAsyncCallback = new RPCAsyncCallback(j, str);
            this.client.scale(Controller.ScaleRequest.newBuilder().setStreamInfo(ModelHelper.createStreamInfo(stream.getScope(), stream.getStreamName())).addAllSealedSegments(list).addAllNewKeyRanges((Iterable) map.entrySet().stream().map(entry -> {
                return Controller.ScaleRequest.KeyRangeEntry.newBuilder().setStart(((Double) entry.getKey()).doubleValue()).setEnd(((Double) entry.getValue()).doubleValue()).build();
            }).collect(Collectors.toList())).setScaleTimestamp(System.currentTimeMillis()).build(), rPCAsyncCallback);
            return rPCAsyncCallback.getFuture();
        }, this.executor);
    }

    @Override // io.pravega.client.stream.impl.Controller
    public CompletableFuture<Boolean> sealStream(String str, String str2) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Exceptions.checkNotNullOrEmpty(str, "scope");
        Exceptions.checkNotNullOrEmpty(str2, "streamName");
        long traceEnter = LoggerHelpers.traceEnter(log, "sealStream", str, str2);
        return this.retryConfig.runAsync(() -> {
            RPCAsyncCallback rPCAsyncCallback = new RPCAsyncCallback(traceEnter, "sealStream");
            this.client.sealStream(ModelHelper.createStreamInfo(str, str2), rPCAsyncCallback);
            return rPCAsyncCallback.getFuture();
        }, this.executor).thenApply((Function<? super ReturnT, ? extends U>) updateStreamStatus -> {
            switch (updateStreamStatus.getStatus()) {
                case FAILURE:
                    log.warn("Failed to seal stream: {}", str2);
                    throw new ControllerFailureException("Failed to seal stream: " + str2);
                case SCOPE_NOT_FOUND:
                    log.warn("Scope not found: {}", str);
                    throw new InvalidStreamException("Scope does not exist: " + str);
                case STREAM_NOT_FOUND:
                    log.warn("Stream does not exist: {}", str2);
                    throw new InvalidStreamException("Stream does not exist: " + str2);
                case SUCCESS:
                    log.info("Successfully sealed stream: {}", str2);
                    return true;
                case UNRECOGNIZED:
                default:
                    throw new ControllerFailureException("Unknown return status scealing stream " + str2 + StringUtils.SPACE + updateStreamStatus.getStatus());
            }
        }).whenComplete((bool, th) -> {
            if (th != null) {
                log.warn("sealStream failed: ", th);
            }
            LoggerHelpers.traceLeave(log, "sealStream", traceEnter, new Object[0]);
        });
    }

    @Override // io.pravega.client.stream.impl.Controller
    public CompletableFuture<Boolean> deleteStream(String str, String str2) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Exceptions.checkNotNullOrEmpty(str, "scope");
        Exceptions.checkNotNullOrEmpty(str2, "streamName");
        long traceEnter = LoggerHelpers.traceEnter(log, "deleteStream", str, str2);
        return this.retryConfig.runAsync(() -> {
            RPCAsyncCallback rPCAsyncCallback = new RPCAsyncCallback(traceEnter, "deleteStream");
            this.client.deleteStream(ModelHelper.createStreamInfo(str, str2), rPCAsyncCallback);
            return rPCAsyncCallback.getFuture();
        }, this.executor).thenApply((Function<? super ReturnT, ? extends U>) deleteStreamStatus -> {
            switch (deleteStreamStatus.getStatus()) {
                case FAILURE:
                    log.warn("Failed to delete stream: {}", str2);
                    throw new ControllerFailureException("Failed to delete stream: " + str2);
                case STREAM_NOT_FOUND:
                    log.warn("Stream does not exist: {}", str2);
                    return false;
                case STREAM_NOT_SEALED:
                    log.warn("Stream is not sealed: {}", str2);
                    throw new IllegalArgumentException("Stream is not sealed: " + str2);
                case SUCCESS:
                    log.info("Successfully deleted stream: {}", str2);
                    return true;
                case UNRECOGNIZED:
                default:
                    throw new ControllerFailureException("Unknown return status deleting stream " + str2 + StringUtils.SPACE + deleteStreamStatus.getStatus());
            }
        }).whenComplete((bool, th) -> {
            if (th != null) {
                log.warn("deleteStream failed: ", th);
            }
            LoggerHelpers.traceLeave(log, "deleteStream", traceEnter, new Object[0]);
        });
    }

    @Override // io.pravega.client.stream.impl.Controller
    public CompletableFuture<Map<Segment, Long>> getSegmentsAtTime(Stream stream, long j) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkNotNull(stream, "stream");
        long traceEnter = LoggerHelpers.traceEnter(log, "getSegmentsAtTime", stream, Long.valueOf(j));
        return this.retryConfig.runAsync(() -> {
            RPCAsyncCallback rPCAsyncCallback = new RPCAsyncCallback(traceEnter, "getSegmentsAtTime");
            this.client.getSegments(Controller.GetSegmentsRequest.newBuilder().setStreamInfo(ModelHelper.createStreamInfo(stream.getScope(), stream.getStreamName())).setTimestamp(j).build(), rPCAsyncCallback);
            return rPCAsyncCallback.getFuture();
        }, this.executor).thenApply((Function<? super ReturnT, ? extends U>) segmentsAtTime -> {
            log.debug("Received the following data from the controller {}", segmentsAtTime.getSegmentsList());
            return (Map) segmentsAtTime.getSegmentsList().stream().collect(Collectors.toMap(segmentLocation -> {
                return ModelHelper.encode(segmentLocation.getSegmentId());
            }, segmentLocation2 -> {
                return Long.valueOf(segmentLocation2.getOffset());
            }));
        }).whenComplete((map, th) -> {
            if (th != null) {
                log.warn("getSegmentsAtTime failed: ", th);
            }
            LoggerHelpers.traceLeave(log, "getSegmentsAtTime", traceEnter, new Object[0]);
        });
    }

    @Override // io.pravega.client.stream.impl.Controller
    public CompletableFuture<StreamSegmentsWithPredecessors> getSuccessors(Segment segment) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        long traceEnter = LoggerHelpers.traceEnter(log, "getSuccessors", segment);
        return this.retryConfig.runAsync(() -> {
            RPCAsyncCallback rPCAsyncCallback = new RPCAsyncCallback(traceEnter, "getSuccessors");
            this.client.getSegmentsImmediatlyFollowing(ModelHelper.decode(segment), rPCAsyncCallback);
            return rPCAsyncCallback.getFuture();
        }, this.executor).thenApply((Function<? super ReturnT, ? extends U>) successorResponse -> {
            log.debug("Received the following data from the controller {}", successorResponse.getSegmentsList());
            HashMap hashMap = new HashMap();
            for (Controller.SuccessorResponse.SegmentEntry segmentEntry : successorResponse.getSegmentsList()) {
                hashMap.put(ModelHelper.encode(segmentEntry.getSegment()), segmentEntry.getValueList());
            }
            return new StreamSegmentsWithPredecessors(hashMap, successorResponse.getDelegationToken());
        }).whenComplete((streamSegmentsWithPredecessors, th) -> {
            if (th != null) {
                log.warn("getSuccessors failed: ", th);
            }
            LoggerHelpers.traceLeave(log, "getSuccessors", traceEnter, new Object[0]);
        });
    }

    @Override // io.pravega.client.stream.impl.Controller
    public CompletableFuture<StreamSegmentSuccessors> getSuccessors(StreamCut streamCut) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        long traceEnter = LoggerHelpers.traceEnter(log, "getSuccessorsFromCut", streamCut.asImpl().getStream());
        return getSegmentsBetweenStreamCuts(streamCut, StreamCut.UNBOUNDED).whenComplete((streamSegmentSuccessors, th) -> {
            if (th != null) {
                log.warn("getSuccessors failed: ", th);
            }
            LoggerHelpers.traceLeave(log, "getSuccessors", traceEnter, new Object[0]);
        });
    }

    @Override // io.pravega.client.stream.impl.Controller
    public CompletableFuture<StreamSegmentSuccessors> getSegments(StreamCut streamCut, StreamCut streamCut2) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkNotNull(streamCut, "fromStreamCut");
        Preconditions.checkNotNull(streamCut2, "toStreamCut");
        Preconditions.checkArgument(streamCut.asImpl().getStream().equals(streamCut2.asImpl().getStream()), "Ensure streamCuts for the same stream is passed");
        long traceEnter = LoggerHelpers.traceEnter(log, "getSegments", streamCut.asImpl().getStream());
        return getSegmentsBetweenStreamCuts(streamCut, streamCut2).whenComplete((streamSegmentSuccessors, th) -> {
            if (th != null) {
                log.warn("getSuccessors failed: ", th);
            }
            LoggerHelpers.traceLeave(log, "getSuccessors", traceEnter, new Object[0]);
        });
    }

    private CompletableFuture<StreamSegmentSuccessors> getSegmentsBetweenStreamCuts(StreamCut streamCut, StreamCut streamCut2) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Stream stream = streamCut.asImpl().getStream();
        long traceEnter = LoggerHelpers.traceEnter(log, "getSegments", stream);
        getOrRefreshDelegationTokenFor(stream.getScope(), stream.getStreamName());
        return this.retryConfig.runAsync(() -> {
            RPCAsyncCallback rPCAsyncCallback = new RPCAsyncCallback(traceEnter, "getSuccessorsFromCut");
            this.client.getSegmentsBetween(ModelHelper.decode(stream.getScope(), stream.getStreamName(), getStreamCutMap(streamCut), getStreamCutMap(streamCut2)), rPCAsyncCallback);
            return rPCAsyncCallback.getFuture();
        }, this.executor).thenApply((Function<? super ReturnT, ? extends U>) streamCutRangeResponse -> {
            log.debug("Received the following data from the controller {}", streamCutRangeResponse.getSegmentsList());
            return new StreamSegmentSuccessors((Set) streamCutRangeResponse.getSegmentsList().stream().map(ModelHelper::encode).collect(Collectors.toSet()), streamCutRangeResponse.getDelegationToken());
        });
    }

    private Map<Long, Long> getStreamCutMap(StreamCut streamCut) {
        return streamCut.equals(StreamCut.UNBOUNDED) ? Collections.emptyMap() : (Map) streamCut.asImpl().getPositions().entrySet().stream().collect(Collectors.toMap(entry -> {
            return Long.valueOf(((Segment) entry.getKey()).getSegmentId());
        }, (v0) -> {
            return v0.getValue();
        }));
    }

    @Override // io.pravega.client.stream.impl.Controller
    public CompletableFuture<StreamSegments> getCurrentSegments(String str, String str2) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Exceptions.checkNotNullOrEmpty(str, "scope");
        Exceptions.checkNotNullOrEmpty(str2, "stream");
        long traceEnter = LoggerHelpers.traceEnter(log, "getCurrentSegments", str, str2);
        return this.retryConfig.runAsync(() -> {
            RPCAsyncCallback rPCAsyncCallback = new RPCAsyncCallback(traceEnter, "getCurrentSegments");
            this.client.getCurrentSegments(ModelHelper.createStreamInfo(str, str2), rPCAsyncCallback);
            return rPCAsyncCallback.getFuture();
        }, this.executor).thenApply((Function<? super ReturnT, ? extends U>) segmentRanges -> {
            log.debug("Received the following data from the controller {}", segmentRanges.getSegmentRangesList());
            TreeMap treeMap = new TreeMap();
            for (Controller.SegmentRange segmentRange : segmentRanges.getSegmentRangesList()) {
                Preconditions.checkState(segmentRange.getMinKey() <= segmentRange.getMaxKey(), "Min keyrange %s was not less than maximum keyRange %s for segment %s", Double.valueOf(segmentRange.getMinKey()), Double.valueOf(segmentRange.getMaxKey()), segmentRange.getSegmentId());
                treeMap.put(Double.valueOf(segmentRange.getMaxKey()), ModelHelper.encode(segmentRange.getSegmentId()));
            }
            return new StreamSegments(treeMap, segmentRanges.getDelegationToken());
        }).whenComplete((streamSegments, th) -> {
            if (th != null) {
                log.warn("getCurrentSegments failed: ", th);
            }
            LoggerHelpers.traceLeave(log, "getCurrentSegments", traceEnter, new Object[0]);
        });
    }

    @Override // io.pravega.client.stream.impl.Controller
    public CompletableFuture<PravegaNodeUri> getEndpointForSegment(String str) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Exceptions.checkNotNullOrEmpty(str, "qualifiedSegmentName");
        long traceEnter = LoggerHelpers.traceEnter(log, "getEndpointForSegment", str);
        return this.retryConfig.runAsync(() -> {
            RPCAsyncCallback rPCAsyncCallback = new RPCAsyncCallback(traceEnter, "getEndpointForSegment");
            Segment fromScopedName = Segment.fromScopedName(str);
            this.client.getURI(ModelHelper.createSegmentId(fromScopedName.getScope(), fromScopedName.getStreamName(), fromScopedName.getSegmentId()), rPCAsyncCallback);
            return rPCAsyncCallback.getFuture();
        }, this.executor).thenApply((Function<? super ReturnT, ? extends U>) ModelHelper::encode).whenComplete((pravegaNodeUri, th) -> {
            if (th != null) {
                log.warn("getEndpointForSegment failed: ", th);
            }
            LoggerHelpers.traceLeave(log, "getEndpointForSegment", traceEnter, new Object[0]);
        });
    }

    @Override // io.pravega.client.stream.impl.Controller
    public CompletableFuture<Boolean> isSegmentOpen(Segment segment) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        long traceEnter = LoggerHelpers.traceEnter(log, "isSegmentOpen", segment);
        return this.retryConfig.runAsync(() -> {
            RPCAsyncCallback rPCAsyncCallback = new RPCAsyncCallback(traceEnter, "isSegmentOpen");
            this.client.isSegmentValid(ModelHelper.createSegmentId(segment.getScope(), segment.getStreamName(), segment.getSegmentId()), rPCAsyncCallback);
            return rPCAsyncCallback.getFuture();
        }, this.executor).thenApply((Function<? super ReturnT, ? extends U>) (v0) -> {
            return v0.getResponse();
        }).whenComplete((bool, th) -> {
            if (th != null) {
                log.warn("isSegmentOpen failed: ", th);
            }
            LoggerHelpers.traceLeave(log, "isSegmentOpen", traceEnter, new Object[0]);
        });
    }

    @Override // io.pravega.client.stream.impl.Controller
    public CompletableFuture<TxnSegments> createTransaction(Stream stream, long j) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkNotNull(stream, "stream");
        long traceEnter = LoggerHelpers.traceEnter(log, "createTransaction", stream, Long.valueOf(j));
        return this.retryConfig.runAsync(() -> {
            RPCAsyncCallback rPCAsyncCallback = new RPCAsyncCallback(traceEnter, "createTransaction");
            this.client.createTransaction(Controller.CreateTxnRequest.newBuilder().setStreamInfo(ModelHelper.createStreamInfo(stream.getScope(), stream.getStreamName())).setLease(j).build(), rPCAsyncCallback);
            return rPCAsyncCallback.getFuture();
        }, this.executor).thenApply((Function<? super ReturnT, ? extends U>) this::convert).whenComplete((txnSegments, th) -> {
            if (th != null) {
                log.warn("createTransaction failed: ", th);
            }
            LoggerHelpers.traceLeave(log, "createTransaction", traceEnter, new Object[0]);
        });
    }

    private TxnSegments convert(Controller.CreateTxnResponse createTxnResponse) {
        TreeMap treeMap = new TreeMap();
        for (Controller.SegmentRange segmentRange : createTxnResponse.getActiveSegmentsList()) {
            Preconditions.checkState(segmentRange.getMinKey() <= segmentRange.getMaxKey());
            treeMap.put(Double.valueOf(segmentRange.getMaxKey()), ModelHelper.encode(segmentRange.getSegmentId()));
        }
        return new TxnSegments(new StreamSegments(treeMap, createTxnResponse.getDelegationToken()), ModelHelper.encode(createTxnResponse.getTxnId()));
    }

    @Override // io.pravega.client.stream.impl.Controller
    public CompletableFuture<Void> pingTransaction(Stream stream, UUID uuid, long j) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        long traceEnter = LoggerHelpers.traceEnter(log, "pingTransaction", stream, uuid, Long.valueOf(j));
        return Futures.toVoidExpecting(this.retryConfig.runAsync(() -> {
            RPCAsyncCallback rPCAsyncCallback = new RPCAsyncCallback(traceEnter, "pingTransaction");
            this.client.pingTransaction(Controller.PingTxnRequest.newBuilder().setStreamInfo(ModelHelper.createStreamInfo(stream.getScope(), stream.getStreamName())).setTxnId(ModelHelper.decode(uuid)).setLease(j).build(), rPCAsyncCallback);
            return rPCAsyncCallback.getFuture();
        }, this.executor), Controller.PingTxnStatus.newBuilder().setStatus(Controller.PingTxnStatus.Status.OK).build(), PingFailedException::new).whenComplete((r8, th) -> {
            if (th != null) {
                log.warn("pingTransaction failed: ", th);
            }
            LoggerHelpers.traceLeave(log, "pingTransaction", traceEnter, new Object[0]);
        });
    }

    @Override // io.pravega.client.stream.impl.Controller
    public CompletableFuture<Void> commitTransaction(Stream stream, UUID uuid) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkNotNull(stream, "stream");
        Preconditions.checkNotNull(uuid, "txId");
        long traceEnter = LoggerHelpers.traceEnter(log, "commitTransaction", stream, uuid);
        return Futures.toVoidExpecting(this.retryConfig.runAsync(() -> {
            RPCAsyncCallback rPCAsyncCallback = new RPCAsyncCallback(traceEnter, "commitTransaction");
            this.client.commitTransaction(Controller.TxnRequest.newBuilder().setStreamInfo(ModelHelper.createStreamInfo(stream.getScope(), stream.getStreamName())).setTxnId(ModelHelper.decode(uuid)).build(), rPCAsyncCallback);
            return rPCAsyncCallback.getFuture();
        }, this.executor), Controller.TxnStatus.newBuilder().setStatus(Controller.TxnStatus.Status.SUCCESS).build(), TxnFailedException::new).whenComplete((r8, th) -> {
            if (th != null) {
                log.warn("commitTransaction failed: ", th);
            }
            LoggerHelpers.traceLeave(log, "commitTransaction", traceEnter, new Object[0]);
        });
    }

    @Override // io.pravega.client.stream.impl.Controller
    public CompletableFuture<Void> abortTransaction(Stream stream, UUID uuid) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkNotNull(stream, "stream");
        Preconditions.checkNotNull(uuid, "txId");
        long traceEnter = LoggerHelpers.traceEnter(log, "abortTransaction", stream, uuid);
        return Futures.toVoidExpecting(this.retryConfig.runAsync(() -> {
            RPCAsyncCallback rPCAsyncCallback = new RPCAsyncCallback(traceEnter, "abortTransaction");
            this.client.abortTransaction(Controller.TxnRequest.newBuilder().setStreamInfo(ModelHelper.createStreamInfo(stream.getScope(), stream.getStreamName())).setTxnId(ModelHelper.decode(uuid)).build(), rPCAsyncCallback);
            return rPCAsyncCallback.getFuture();
        }, this.executor), Controller.TxnStatus.newBuilder().setStatus(Controller.TxnStatus.Status.SUCCESS).build(), TxnFailedException::new).whenComplete((r8, th) -> {
            if (th != null) {
                log.warn("abortTransaction failed: ", th);
            }
            LoggerHelpers.traceLeave(log, "abortTransaction", traceEnter, new Object[0]);
        });
    }

    @Override // io.pravega.client.stream.impl.Controller
    public CompletableFuture<Transaction.Status> checkTransactionStatus(Stream stream, UUID uuid) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkNotNull(stream, "stream");
        Preconditions.checkNotNull(uuid, "txId");
        long traceEnter = LoggerHelpers.traceEnter(log, "checkTransactionStatus", stream, uuid);
        return this.retryConfig.runAsync(() -> {
            RPCAsyncCallback rPCAsyncCallback = new RPCAsyncCallback(traceEnter, "checkTransactionStatus");
            this.client.checkTransactionState(Controller.TxnRequest.newBuilder().setStreamInfo(ModelHelper.createStreamInfo(stream.getScope(), stream.getStreamName())).setTxnId(ModelHelper.decode(uuid)).build(), rPCAsyncCallback);
            return rPCAsyncCallback.getFuture();
        }, this.executor).thenApply((Function<? super ReturnT, ? extends U>) txnState -> {
            return ModelHelper.encode(txnState.getState(), stream + StringUtils.SPACE + uuid);
        }).whenComplete((status, th) -> {
            if (th != null) {
                log.warn("checkTransactionStatus failed: ", th);
            }
            LoggerHelpers.traceLeave(log, "checkTransactionStatus", traceEnter, new Object[0]);
        });
    }

    @Override // io.pravega.client.stream.impl.Controller, java.lang.AutoCloseable
    public void close() {
        if (this.closed.getAndSet(true)) {
            return;
        }
        this.channel.shutdownNow();
    }

    @Override // io.pravega.client.stream.impl.Controller
    public CompletableFuture<String> getOrRefreshDelegationTokenFor(String str, String str2) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Exceptions.checkNotNullOrEmpty(str, "scope");
        Exceptions.checkNotNullOrEmpty(str2, "stream");
        long traceEnter = LoggerHelpers.traceEnter(log, "getOrRefreshDelegationTokenFor", str, str2);
        return this.retryConfig.runAsync(() -> {
            RPCAsyncCallback rPCAsyncCallback = new RPCAsyncCallback(traceEnter, "getOrRefreshDelegationTokenFor");
            this.client.getDelegationToken(ModelHelper.createStreamInfo(str, str2), rPCAsyncCallback);
            return rPCAsyncCallback.getFuture();
        }, this.executor).thenApply((Function<? super ReturnT, ? extends U>) delegationToken -> {
            return delegationToken.getDelegationToken();
        }).whenComplete((str3, th) -> {
            if (th != null) {
                log.warn("getCurrentSegments failed: ", th);
            }
            LoggerHelpers.traceLeave(log, "getCurrentSegments", traceEnter, new Object[0]);
        });
    }
}
