package io.pravega.client.netty.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.concurrent.ScheduledFuture;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.util.ReusableFutureLatch;
import io.pravega.shared.protocol.netty.AppendBatchSizeTracker;
import io.pravega.shared.protocol.netty.ConnectionFailedException;
import io.pravega.shared.protocol.netty.Reply;
import io.pravega.shared.protocol.netty.ReplyProcessor;
import io.pravega.shared.protocol.netty.WireCommands;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/client/netty/impl/FlowHandler.class */
/**
 * Netty inbound handler that multiplexes several logical "flows" over a single channel.
 *
 * <p>Each flow is identified by an integer flow id and has one {@link ReplyProcessor} registered in
 * {@link #flowIdReplyProcessorMap}. Incoming {@link Reply} objects are routed to the processor whose
 * flow id is derived from the reply's request id. Alternatively the handler can be used in a
 * "flow disabled" mode, in which a single processor is registered under {@link #FLOW_DISABLED} and
 * receives every reply.
 *
 * <p>While a channel is registered, a {@link KeepAliveTask} is scheduled on the channel's event loop.
 */
public class FlowHandler extends ChannelInboundHandlerAdapter implements AutoCloseable {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(FlowHandler.class);

    /** Flow id under which the single reply processor is stored when flows are disabled. */
    private static final int FLOW_DISABLED = -1;

    private final String connectionName;
    private final AppendBatchSizeTracker batchSizeTracker;

    /** The currently registered channel, or {@code null} when no channel is registered. */
    private final AtomicReference<Channel> channel = new AtomicReference<>();

    /** Handle of the scheduled {@link KeepAliveTask}; set in {@link #channelRegistered}. */
    private final AtomicReference<ScheduledFuture<?>> keepAliveFuture = new AtomicReference<>();

    /** Set through {@link #setRecentMessage()}; read and cleared by each keep-alive run. */
    private final AtomicBoolean recentMessage = new AtomicBoolean(false);

    private final AtomicBoolean closed = new AtomicBoolean(false);

    /** Released in {@link #channelRegistered} and reset in {@link #channelUnregistered}. */
    private final ReusableFutureLatch<Void> registeredFutureLatch = new ReusableFutureLatch<>();

    @VisibleForTesting
    private final ConcurrentHashMap<Integer, ReplyProcessor> flowIdReplyProcessorMap = new ConcurrentHashMap<>();

    /** Becomes {@code true} once {@link #createConnectionWithFlowDisabled} has been called. */
    private final AtomicBoolean disableFlow = new AtomicBoolean(false);

    /**
     * Periodic task that writes a {@link WireCommands.KeepAlive} when no message has been flagged
     * through {@link FlowHandler#setRecentMessage()} since the previous run.
     */
    private final class KeepAliveTask implements Runnable {

        @Override // java.lang.Runnable
        public void run() {
            try {
                // getAndSet(false) both reads the flag and re-arms it for the next period.
                if (!recentMessage.getAndSet(false)) {
                    Futures.getAndHandleExceptions(getChannel().writeAndFlush(new WireCommands.KeepAlive()), ConnectionFailedException::new);
                }
            } catch (Exception e) {
                // Covers both "no channel" (getChannel throws) and a failed write.
                log.warn("Keep alive failed, killing connection {} due to {}", connectionName, e.getMessage());
                FlowHandler.this.close();
            }
        }
    }

    /**
     * Creates a handler with no channel attached yet.
     *
     * @param str name of the connection; used in log and exception messages and handed to the
     *            {@link ClientConnectionImpl} instances created by this handler
     * @param appendBatchSizeTracker tracker that receives the event number of every
     *            {@link WireCommands.DataAppended} reply
     */
    public FlowHandler(String str, AppendBatchSizeTracker appendBatchSizeTracker) {
        this.connectionName = str;
        this.batchSizeTracker = appendBatchSizeTracker;
    }

    /**
     * Registers {@code replyProcessor} for the given flow and returns a connection bound to that flow.
     *
     * <p>An already registered processor is never replaced: if the flow id is in use the existing
     * registration is left untouched and the call fails.
     *
     * @param flow the flow whose id is used as the routing key
     * @param replyProcessor processor that will receive replies for this flow
     * @return a new {@link ClientConnectionImpl} for the flow
     * @throws IllegalStateException if flows have been disabled on this handler
     * @throws IllegalArgumentException if a processor is already registered for the flow id
     */
    public ClientConnection createFlow(Flow flow, ReplyProcessor replyProcessor) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkState(!this.disableFlow.get(), "Ensure flows are enabled.");
        final int flowId = flow.getFlowId();
        log.info("Creating Flow {} for endpoint {}. The current Channel is {}.", flowId, this.connectionName, this.channel.get());
        // putIfAbsent (rather than put) so that a rejected duplicate does not clobber the
        // processor of the flow that is already open.
        if (this.flowIdReplyProcessorMap.putIfAbsent(flowId, replyProcessor) != null) {
            throw new IllegalArgumentException("Multiple flows cannot be created with the same Flow id " + flowId);
        }
        return new ClientConnectionImpl(this.connectionName, flowId, this.batchSizeTracker, this);
    }

    /**
     * Switches this handler to "flow disabled" mode and registers {@code replyProcessor} as the
     * single recipient of all replies.
     *
     * @param replyProcessor processor that will receive every reply
     * @return a new {@link ClientConnectionImpl} using the {@link #FLOW_DISABLED} flow id
     * @throws IllegalStateException if flows were already disabled by an earlier call
     */
    public ClientConnection createConnectionWithFlowDisabled(ReplyProcessor replyProcessor) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        // getAndSet makes the mode switch and the "only once" check a single atomic step.
        Preconditions.checkState(!this.disableFlow.getAndSet(true), "Flows are disabled, incorrect usage pattern.");
        log.info("Creating a new connection with flow disabled for endpoint {}. The current Channel is {}.", this.connectionName, this.channel.get());
        this.flowIdReplyProcessorMap.put(FLOW_DISABLED, replyProcessor);
        return new ClientConnectionImpl(this.connectionName, FLOW_DISABLED, this.batchSizeTracker, this);
    }

    /**
     * Removes the reply processor registered for the flow of the given connection.
     *
     * @param clientConnection a connection created by this handler; it is cast to
     *                         {@link ClientConnectionImpl} to read its flow id
     */
    public void closeFlow(ClientConnection clientConnection) {
        final ClientConnectionImpl connection = (ClientConnectionImpl) clientConnection;
        final int flowId = connection.getFlowId();
        log.info("Closing Flow {} for endpoint {}", flowId, connection.getConnectionName());
        this.flowIdReplyProcessorMap.remove(flowId);
    }

    /**
     * @return the number of reply processors currently registered
     */
    public int getOpenFlowCount() {
        return this.flowIdReplyProcessorMap.size();
    }

    /**
     * @return {@code true} while a channel is registered with this handler
     */
    public boolean isConnectionEstablished() {
        return this.channel.get() != null;
    }

    /**
     * Returns the registered channel. (Marked by the decompiler as originally package-private.)
     *
     * @return the current channel, never {@code null}
     * @throws ConnectionFailedException if no channel is currently registered
     */
    public Channel getChannel() throws ConnectionFailedException {
        final Channel current = this.channel.get();
        if (current == null) {
            throw new ConnectionFailedException("Connection to " + this.connectionName + " is not established.");
        }
        return current;
    }

    /**
     * Flags that a message was handled recently, which makes the next keep-alive run skip its
     * write. (Marked by the decompiler as originally package-private.)
     */
    public void setRecentMessage() {
        this.recentMessage.set(true);
    }

    /**
     * Registers {@code completableFuture} with the latch that is released when a channel is
     * registered. (Marked by the decompiler as originally package-private.)
     *
     * @param completableFuture the future to register; must not be {@code null}
     * @throws NullPointerException if {@code completableFuture} is {@code null}
     */
    public void completeWhenRegistered(CompletableFuture<Void> completableFuture) {
        Preconditions.checkNotNull(completableFuture, "future");
        this.registeredFutureLatch.register(completableFuture);
    }

    /**
     * Records the new channel, writes the initial {@link WireCommands.Hello}, releases the
     * registration latch and (re)schedules the keep-alive task.
     */
    @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
    public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelRegistered(channelHandlerContext);
        final Channel registered = channelHandlerContext.channel();
        this.channel.set(registered);
        log.info("Connection established with endpoint {} on ChannelId {}", this.connectionName, registered);
        // NOTE(review): 8 and 5 are presumably the current and the oldest compatible wire
        // protocol versions - confirm against WireCommands.
        registered.write(new WireCommands.Hello(8, 5), registered.voidPromise());
        this.registeredFutureLatch.release(null);
        // First run after 20 seconds, then 10 seconds after each run completes.
        final ScheduledFuture<?> previous = this.keepAliveFuture.getAndSet(
                registered.eventLoop().scheduleWithFixedDelay(new KeepAliveTask(), 20L, 10L, TimeUnit.SECONDS));
        if (previous != null) {
            previous.cancel(false);
        }
    }

    /**
     * Resets the registration latch, cancels the keep-alive task, clears the channel and notifies
     * every registered reply processor that the connection was dropped.
     */
    @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
    public void channelUnregistered(ChannelHandlerContext channelHandlerContext) throws Exception {
        this.registeredFutureLatch.reset();
        final ScheduledFuture<?> keepAlive = this.keepAliveFuture.get();
        if (keepAlive != null) {
            keepAlive.cancel(false);
        }
        this.channel.set(null);
        this.flowIdReplyProcessorMap.forEach((flowId, processor) -> {
            try {
                log.debug("Connection dropped for flow id {}", flowId);
                processor.connectionDropped();
            } catch (Exception e) {
                // One failing processor must not prevent the others from being notified.
                log.warn("Encountered exception invoking ReplyProcessor for flow id {}", flowId, e);
            }
        });
        super.channelUnregistered(channelHandlerContext);
    }

    /**
     * Dispatches an incoming reply.
     *
     * <p>{@link WireCommands.Hello} is broadcast to every registered processor. Any other reply is
     * delivered to the single processor selected by {@link #getReplyProcessor(Reply)};
     * {@link WireCommands.DataAppended} replies are additionally reported to the batch size tracker.
     *
     * @param obj the inbound message; it is cast to {@link Reply}
     */
    @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
        final Reply reply = (Reply) obj;
        // The connection name is passed as an argument instead of being concatenated into the
        // format string, so a name containing "{}" cannot shift the placeholders.
        log.debug("{} processing reply {} with flow {}", this.connectionName, reply, Flow.from(reply.getRequestId()));
        if (reply instanceof WireCommands.Hello) {
            final WireCommands.Hello hello = (WireCommands.Hello) reply;
            this.flowIdReplyProcessorMap.forEach((flowId, processor) -> {
                try {
                    processor.hello(hello);
                } catch (Exception e) {
                    log.warn("Encountered exception invoking ReplyProcessor.hello for flow id {}", flowId, e);
                }
            });
            return;
        }
        if (reply instanceof WireCommands.DataAppended) {
            this.batchSizeTracker.recordAck(((WireCommands.DataAppended) reply).getEventNumber());
        }
        getReplyProcessor(reply).ifPresent(processor -> {
            try {
                processor.process(reply);
            } catch (Exception e) {
                log.warn("ReplyProcessor.process failed for reply {} due to {}", reply, e.getMessage());
                processor.processingFailure(e);
            }
        });
    }

    /**
     * Reports a channel-level failure to every registered reply processor, wrapped in a
     * {@link ConnectionFailedException}. The exception is not forwarded to the superclass.
     */
    @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelHandlerAdapter, io.netty.channel.ChannelHandler, io.netty.channel.ChannelInboundHandler
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        this.flowIdReplyProcessorMap.forEach((flowId, processor) -> {
            try {
                log.debug("Exception observed for flow id {} due to {}", flowId, th.getMessage());
                processor.processingFailure(new ConnectionFailedException(th));
            } catch (Exception e) {
                log.warn("Encountered exception invoking ReplyProcessor.processingFailure for flow id {}", flowId, e);
            }
        });
    }

    /**
     * Marks this handler closed and closes the registered channel, if any. Only the first call has
     * an effect. Flows that are still registered are reported in a warning but not removed.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        final Channel current = this.channel.get();
        if (current == null) {
            // NOTE(review): when close() runs before a channel is registered nothing is closed
            // here; verify that a channel registered afterwards is cleaned up elsewhere.
            return;
        }
        log.debug("Closing channel {} ", current);
        final int openFlows = this.flowIdReplyProcessorMap.size();
        if (openFlows != 0) {
            log.warn("{} flows are not closed", openFlows);
        }
        current.close();
    }

    /**
     * Looks up the processor responsible for {@code reply}: the {@link #FLOW_DISABLED} entry when
     * flows are disabled, otherwise the entry for the flow id derived from the reply's request id.
     *
     * @return the matching processor, or an empty {@link Optional} (after logging a warning) if
     *         none is registered
     */
    private Optional<ReplyProcessor> getReplyProcessor(Reply reply) {
        final int flowId = this.disableFlow.get() ? FLOW_DISABLED : Flow.from(reply.getRequestId()).getFlowId();
        final ReplyProcessor processor = this.flowIdReplyProcessorMap.get(flowId);
        if (processor == null) {
            log.warn("No ReplyProcessor found for the provided flowId {}. Ignoring response", flowId);
        }
        return Optional.ofNullable(processor);
    }

    /**
     * @return the batch size tracker supplied at construction
     */
    @SuppressFBWarnings(justification = "generated code")
    public AppendBatchSizeTracker getBatchSizeTracker() {
        return this.batchSizeTracker;
    }

    /**
     * @return the live (not copied) flow-id to reply-processor map; package-private accessor for
     *         the field annotated {@code @VisibleForTesting}
     */
    @SuppressFBWarnings(justification = "generated code")
    ConcurrentHashMap<Integer, ReplyProcessor> getFlowIdReplyProcessorMap() {
        return this.flowIdReplyProcessorMap;
    }
}
