package org.apache.activemq.artemis.core.replication;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.EventLoop;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.util.internal.PlatformDependent;
import java.io.FileInputStream;
import java.lang.invoke.MethodHandles;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQReplicationTimeooutException;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationAddMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationAddTXMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationCommitMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeginMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLargeMessageEndMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPageEventMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPrepareMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPrimaryIsStoppingMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessageV2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumManager;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/core/replication/ReplicationManager.class */
public final class ReplicationManager implements ActiveMQComponent {
    /** Class logger; assigned in the static initializer at the bottom of the class. */
    private static final Logger logger;
    /** Owning server; used to read the node activation sequence and the cluster manager. */
    private final ActiveMQServer server;
    /** Channel obtained from the remoting connection on which replication packets are sent. */
    private final Channel replicatingChannel;
    /** Lifecycle flag written by {@link #start()} and {@link #stop(boolean)} under the instance monitor. */
    private boolean started;
    /** Gate checked before any packet is queued or flushed; set by start() and cleared by stop(boolean). */
    private volatile boolean enabled;
    /** Factory handed to {@code OperationContextImpl.getContext} when a replication token is created. */
    private final ExecutorFactory ioExecutorFactory;
    /** Listener registered on the remoting connection in start() and removed in stop(boolean). */
    private SessionFailureListener failureListener;
    /** Connection to the replication target; set to {@code null} by stop(boolean). */
    private CoreRemotingConnection remotingConnection;
    /** Maximum tolerated non-writable time in nanoseconds, or -1 when the constructor received a non-positive value. */
    private final long maxAllowedSlownessNanos;
    /** Timeout in milliseconds used while waiting for a file-sync chunk to be flushed. */
    private final long initialReplicationSyncTimeout;
    /** Requests queued by callers and drained on {@link #replicationStream}. */
    private final Queue<ReplicatePacketRequest> replicatePacketRequests;
    /** Executor that runs the flush logic: the Netty event loop when available, otherwise one from the factory. */
    private final Executor replicationStream;
    /** Scheduler for the slow-replication checker; {@code null} when the transport is not a NettyConnection. */
    private final ScheduledExecutorService scheduledExecutorService;
    /** Handle of the periodic slow-replication check, or {@code null} when none is scheduled. */
    private ScheduledFuture<?> slowReplicationChecker;
    /** {@code System.nanoTime()} captured when the connection was last found not writable. */
    private long notWritableFrom;
    /** True while the periodic checker should measure how long the connection has been non-writable. */
    private boolean checkSlowReplication;
    /** Callback passed to {@code isWritable}; it invokes {@link #resume()}. */
    private final ReadyListener onResume;
    /** Guard flag set for the duration of a flush in sendReplicatedPackets. */
    private boolean isFlushing;
    /** True while flushing is parked until {@link #onResume} fires. */
    private boolean awaitingResume;
    static final /* synthetic */ boolean $assertionsDisabled;
    /** Handler installed on the replicating channel to process replication responses. */
    private final ResponseHandler responseHandler = new ResponseHandler();
    /** Contexts of packets already sent and still awaiting a response, in send order. */
    private final Queue<OperationContext> pendingTokens = new ConcurrentLinkedQueue<>();
    /** Starts {@code true}; cleared once sendSynchronizationDone completes. Exposed by isSynchronizing(). */
    private volatile boolean inSync = true;
    /** Counted up before the "synchronization done" packet is sent and down when its acknowledgement arrives. */
    private final ReusableLatch synchronizationIsFinishedAcknowledgement = new ReusableLatch(0);

    /* loaded from: input_file:org/apache/activemq/artemis/core/replication/ReplicationManager$ADD_OPERATION_TYPE.class */
    /**
     * Kind of add operation carried by a replicated record, together with the
     * single-byte code used to represent it.
     */
    public enum ADD_OPERATION_TYPE {
        UPDATE((byte) 0),
        ADD((byte) 1),
        EVENT((byte) 2);

        /** Byte code associated with this constant. */
        private final byte record;

        ADD_OPERATION_TYPE(byte record) {
            this.record = record;
        }

        /**
         * @return the byte code of this operation type (0 = UPDATE, 1 = ADD, 2 = EVENT)
         */
        public byte toRecord() {
            return this.record;
        }

        /**
         * Maps a byte code back to its operation type.
         *
         * @param b the byte code
         * @return UPDATE for 0, EVENT for 2, and ADD for 1 or any other value
         */
        public static ADD_OPERATION_TYPE toOperation(byte b) {
            if (b == 0) {
                return UPDATE;
            }
            if (b == 2) {
                return EVENT;
            }
            // 1 and every unrecognised code both resolve to ADD.
            return ADD;
        }
    }

    /* loaded from: input_file:org/apache/activemq/artemis/core/replication/ReplicationManager$NullEncoding.class */
    /**
     * Stateless {@link EncodingSupport} that reads nothing, writes nothing and
     * reports a size of zero. Used where a record carries no payload.
     */
    private static final class NullEncoding implements EncodingSupport {
        /** Shared singleton; the class holds no state. */
        static final NullEncoding instance = new NullEncoding();

        private NullEncoding() {
            // singleton: use 'instance'
        }

        /** Does nothing; there is no payload to read. */
        @Override
        public void decode(ActiveMQBuffer buffer) {
            // intentionally empty
        }

        /** Does nothing; there is no payload to write. */
        @Override
        public void encode(ActiveMQBuffer buffer) {
            // intentionally empty
        }

        /** @return always 0 */
        @Override
        public int getEncodeSize() {
            return 0;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/activemq/artemis/core/replication/ReplicationManager$ReplicatePacketRequest.class */
    /**
     * Immutable entry of the outgoing replication queue: the packet to send,
     * the operation context tracking it, and an optional latch counted down
     * when the entry is sent or released.
     */
    public static final class ReplicatePacketRequest {
        /** Packet to send on the replicating channel. */
        final Packet packet;
        /** Context completed once the packet has been replicated or dropped. */
        final OperationContext context;
        /** Optional latch; may be {@code null}. */
        final ReusableLatch done;

        ReplicatePacketRequest(Packet toSend, OperationContext token, ReusableLatch doneLatch) {
            this.done = doneLatch;
            this.context = token;
            this.packet = toSend;
        }
    }

    /* loaded from: input_file:org/apache/activemq/artemis/core/replication/ReplicationManager$ReplicatedSessionFailureListener.class */
    /**
     * Failure listener for the replication connection: logs why the
     * connection went away and then stops the enclosing manager.
     */
    private final class ReplicatedSessionFailureListener implements SessionFailureListener {
        private ReplicatedSessionFailureListener() {
        }

        /**
         * Logs the failure (a DISCONNECTED type is logged separately from any
         * other failure) and stops replication. Errors raised by the stop are
         * logged, not propagated.
         *
         * @param activeMQException the failure reported for the connection
         * @param z                 not used by this listener
         */
        @Override
        public void connectionFailed(ActiveMQException activeMQException, boolean z) {
            final boolean disconnected = activeMQException.getType() == ActiveMQExceptionType.DISCONNECTED;
            if (!disconnected) {
                ActiveMQServerLogger.LOGGER.replicationStopOnBackupFail(activeMQException);
            } else {
                ActiveMQServerLogger.LOGGER.replicationStopOnBackupShutdown();
            }
            try {
                ReplicationManager.this.stop();
            } catch (Exception stopFailure) {
                ActiveMQServerLogger.LOGGER.errorStoppingReplication(stopFailure);
            }
        }

        /** Delegates to {@link #connectionFailed(ActiveMQException, boolean)}; {@code str} is ignored. */
        @Override
        public void connectionFailed(ActiveMQException activeMQException, boolean z, String str) {
            this.connectionFailed(activeMQException, z);
        }

        /** No action is taken before a reconnect attempt. */
        @Override
        public void beforeReconnect(ActiveMQException activeMQException) {
            // nothing to do
        }
    }

    /* loaded from: input_file:org/apache/activemq/artemis/core/replication/ReplicationManager$ResponseHandler.class */
    /**
     * Channel handler for replication responses. Every response of type 90 or
     * -9 completes the oldest pending token; a type -9 response that flags the
     * synchronization-finished acknowledgement also counts down the matching latch.
     */
    private final class ResponseHandler implements ChannelHandler {
        private ResponseHandler() {
        }

        /**
         * @param packet incoming packet; packets of any other type are ignored
         */
        @Override
        public void handlePacket(Packet packet) {
            final int type = packet.getType();
            final boolean isV2Response = type == -9;
            if (type != 90 && !isV2Response) {
                return;
            }
            ReplicationManager.this.replicated();
            if (!isV2Response) {
                return;
            }
            final ReplicationResponseMessageV2 response = (ReplicationResponseMessageV2) packet;
            if (response.isSynchronizationIsFinishedAcknowledgement()) {
                ReplicationManager.this.synchronizationIsFinishedAcknowledgement.countDown();
            }
        }
    }

    /**
     * Creates a manager bound to the given remoting connection. Nothing is
     * sent or scheduled until {@link #start()} is called.
     *
     * @param activeMQServer          the owning server
     * @param coreRemotingConnection  connection whose replication channel is used
     * @param j                       maximum allowed slowness in milliseconds; a value
     *                                of zero or less is stored as -1
     * @param j2                      initial replication sync timeout, in milliseconds
     * @param executorFactory         factory for operation contexts and, when the
     *                                transport is not a NettyConnection, for the
     *                                executor that runs the replication stream
     */
    public ReplicationManager(ActiveMQServer activeMQServer, CoreRemotingConnection coreRemotingConnection, long j, long j2, ExecutorFactory executorFactory) {
        this.server = activeMQServer;
        this.ioExecutorFactory = executorFactory;
        this.initialReplicationSyncTimeout = j2;
        this.replicatingChannel = coreRemotingConnection.getChannel(ChannelImpl.CHANNEL_ID.REPLICATION.id, -1);
        this.remotingConnection = coreRemotingConnection;

        // With a Netty transport the channel's event loop acts as both stream executor and scheduler.
        final Connection transport = coreRemotingConnection.getTransportConnection();
        final Executor stream;
        final ScheduledExecutorService scheduler;
        if (transport instanceof NettyConnection) {
            final EventLoop loop = ((NettyConnection) transport).getNettyChannel().eventLoop();
            stream = loop;
            scheduler = loop;
        } else {
            stream = executorFactory.getExecutor();
            scheduler = null;
        }
        this.replicationStream = stream;
        this.scheduledExecutorService = scheduler;

        long slownessNanos = -1L;
        if (j > 0) {
            slownessNanos = TimeUnit.MILLISECONDS.toNanos(j);
        }
        this.maxAllowedSlownessNanos = slownessNanos;

        this.replicatePacketRequests = PlatformDependent.newMpscQueue();
        this.onResume = this::resume;
        this.slowReplicationChecker = null;
        this.notWritableFrom = Long.MAX_VALUE;
        this.checkSlowReplication = false;
        this.awaitingResume = false;
        this.isFlushing = false;
    }

    /**
     * Queues a {@link ReplicationAddMessage} built from the arguments.
     * Does nothing when replication is not enabled.
     *
     * @throws Exception declared for callers; see the message constructor and send path
     */
    public void appendUpdateRecord(byte b, ADD_OPERATION_TYPE add_operation_type, long j, byte b2, Persister persister, Object obj) throws Exception {
        if (!this.enabled) {
            return;
        }
        final boolean beforeTwoEighteen = this.remotingConnection.isBeforeTwoEighteen();
        final Packet message = new ReplicationAddMessage(beforeTwoEighteen, b, add_operation_type, j, b2, persister, obj);
        sendReplicatePacket(message);
    }

    /**
     * Queues a {@link ReplicationDeleteMessage} for {@code (b, j)}.
     * Does nothing when replication is not enabled.
     */
    public void appendDeleteRecord(byte b, long j) throws Exception {
        if (!this.enabled) {
            return;
        }
        final Packet message = new ReplicationDeleteMessage(b, j);
        sendReplicatePacket(message);
    }

    /**
     * Queues a {@link ReplicationAddTXMessage} built from the arguments.
     * Does nothing when replication is not enabled.
     */
    public void appendAddRecordTransactional(byte b, ADD_OPERATION_TYPE add_operation_type, long j, long j2, byte b2, Persister persister, Object obj) throws Exception {
        if (!this.enabled) {
            return;
        }
        final boolean beforeTwoEighteen = this.remotingConnection.isBeforeTwoEighteen();
        final Packet message = new ReplicationAddTXMessage(beforeTwoEighteen, b, add_operation_type, j, j2, b2, persister, obj);
        sendReplicatePacket(message);
    }

    /**
     * Queues a {@link ReplicationCommitMessage} with its boolean flag set to
     * {@code false} (the rollback variant passes {@code true}).
     * Does nothing when replication is not enabled.
     *
     * @param z  not used by this method
     * @param z2 forwarded to the send path, where it controls whether the
     *           operation context is lined up for replication
     */
    public void appendCommitRecord(byte b, long j, boolean z, boolean z2) throws Exception {
        if (!this.enabled) {
            return;
        }
        final Packet message = new ReplicationCommitMessage(b, false, j);
        sendReplicatePacket(message, z2);
    }

    /**
     * Queues a {@link ReplicationDeleteTXMessage} carrying the supplied encoding.
     * Does nothing when replication is not enabled.
     */
    public void appendDeleteRecordTransactional(byte b, long j, long j2, EncodingSupport encodingSupport) throws Exception {
        if (!this.enabled) {
            return;
        }
        final Packet message = new ReplicationDeleteTXMessage(b, j, j2, encodingSupport);
        sendReplicatePacket(message);
    }

    /**
     * Queues a {@link ReplicationDeleteTXMessage} with an empty payload
     * ({@code NullEncoding.instance}). Does nothing when replication is not enabled.
     */
    public void appendDeleteRecordTransactional(byte b, long j, long j2) throws Exception {
        if (!this.enabled) {
            return;
        }
        final Packet message = new ReplicationDeleteTXMessage(b, j, j2, NullEncoding.instance);
        sendReplicatePacket(message);
    }

    /**
     * Queues a {@link ReplicationPrepareMessage} built from the arguments.
     * Does nothing when replication is not enabled.
     */
    public void appendPrepareRecord(byte b, long j, EncodingSupport encodingSupport) throws Exception {
        if (!this.enabled) {
            return;
        }
        final Packet message = new ReplicationPrepareMessage(b, j, encodingSupport);
        sendReplicatePacket(message);
    }

    /**
     * Queues a {@link ReplicationCommitMessage} with its boolean flag set to
     * {@code true} (the commit variant passes {@code false}).
     * Does nothing when replication is not enabled.
     */
    public void appendRollbackRecord(byte b, long j) throws Exception {
        if (!this.enabled) {
            return;
        }
        final Packet message = new ReplicationCommitMessage(b, true, j);
        sendReplicatePacket(message);
    }

    /**
     * Queues a {@link ReplicationPageEventMessage} with its boolean flag set
     * to {@code false} (pageDeleted passes {@code true}).
     * Does nothing when replication is not enabled.
     */
    public void pageClosed(SimpleString simpleString, long j) {
        if (!this.enabled) {
            return;
        }
        final boolean useLong = this.remotingConnection.isVersionUsingLongOnPageReplication();
        final Packet message = new ReplicationPageEventMessage(simpleString, j, false, useLong);
        sendReplicatePacket(message);
    }

    /**
     * Queues a {@link ReplicationPageEventMessage} with its boolean flag set
     * to {@code true} (pageClosed passes {@code false}).
     * Does nothing when replication is not enabled.
     */
    public void pageDeleted(SimpleString simpleString, long j) {
        if (!this.enabled) {
            return;
        }
        final boolean useLong = this.remotingConnection.isVersionUsingLongOnPageReplication();
        final Packet message = new ReplicationPageEventMessage(simpleString, j, true, useLong);
        sendReplicatePacket(message);
    }

    /**
     * Queues a {@link ReplicationPageWriteMessage} for the given paged message.
     * Does nothing when replication is not enabled.
     */
    public void pageWrite(PagedMessage pagedMessage, long j) {
        if (!this.enabled) {
            return;
        }
        final boolean useLong = this.remotingConnection.isVersionUsingLongOnPageReplication();
        final Packet message = new ReplicationPageWriteMessage(pagedMessage, j, useLong);
        sendReplicatePacket(message);
    }

    /**
     * Queues a {@link ReplicationLargeMessageBeginMessage} for id {@code j}.
     * Does nothing when replication is not enabled.
     */
    public void largeMessageBegin(long j) {
        if (!this.enabled) {
            return;
        }
        final Packet message = new ReplicationLargeMessageBeginMessage(j);
        sendReplicatePacket(message);
    }

    /**
     * Queues a {@link ReplicationLargeMessageEndMessage} with its boolean flag
     * set to {@code true} and a freshly generated id from the storage manager.
     * Does nothing when replication is not enabled.
     *
     * @param l                     large message id; must not be {@code null} when enabled
     * @param journalStorageManager source of the generated id
     */
    public void largeMessageDelete(Long l, JournalStorageManager journalStorageManager) {
        if (!this.enabled) {
            return;
        }
        // Unbox first, then generate the id: same evaluation order as passing both inline.
        final long messageId = l.longValue();
        final long generatedId = journalStorageManager.generateID();
        final Packet message = new ReplicationLargeMessageEndMessage(messageId, generatedId, true);
        sendReplicatePacket(message);
    }

    /**
     * Queues a {@link ReplicationLargeMessageEndMessage} with id -1 as the
     * second argument and its boolean flag set to {@code false}.
     * Does nothing when replication is not enabled.
     *
     * @param l                     large message id; must not be {@code null} when enabled
     * @param journalStorageManager not used by this method
     */
    public void largeMessageClosed(Long l, JournalStorageManager journalStorageManager) {
        if (!this.enabled) {
            return;
        }
        final long messageId = l.longValue();
        final Packet message = new ReplicationLargeMessageEndMessage(messageId, -1L, false);
        sendReplicatePacket(message);
    }

    /**
     * Queues a {@link ReplicationLargeMessageWriteMessage} carrying {@code bArr}
     * for large message {@code j}. Does nothing when replication is not enabled.
     */
    public void largeMessageWrite(long j, byte[] bArr) {
        if (!this.enabled) {
            return;
        }
        final Packet message = new ReplicationLargeMessageWriteMessage(j, bArr);
        sendReplicatePacket(message);
    }

    /**
     * @return whether {@link #start()} has completed and {@code stop} has not
     *         been called since; read under the instance monitor
     */
    @Override
    public boolean isStarted() {
        synchronized (this) {
            return this.started;
        }
    }

    /**
     * Installs the response handler and failure listener, optionally schedules
     * the slow-replication checker, and enables replication.
     *
     * <p>The checker is scheduled only when a scheduler is available and a
     * non-negative slowness limit is configured. Its period is one tenth of
     * that limit, clamped to the range 100 ms to 1 s.
     *
     * @throws IllegalStateException if the manager is already started
     */
    @Override
    public synchronized void start() throws ActiveMQException {
        if (this.started) {
            throw new IllegalStateException("ReplicationManager is already started");
        }
        this.replicatingChannel.setHandler(this.responseHandler);
        this.failureListener = new ReplicatedSessionFailureListener();
        this.remotingConnection.addFailureListener(this.failureListener);

        final boolean slownessLimitConfigured = this.maxAllowedSlownessNanos >= 0;
        if (this.scheduledExecutorService != null && slownessLimitConfigured) {
            final long upperBoundNanos = TimeUnit.SECONDS.toNanos(1L);
            final long lowerBoundNanos = TimeUnit.MILLISECONDS.toNanos(100L);
            long periodNanos = this.maxAllowedSlownessNanos / 10;
            if (periodNanos > upperBoundNanos) {
                periodNanos = upperBoundNanos;
            } else if (periodNanos < lowerBoundNanos) {
                final long configuredMillis = TimeUnit.NANOSECONDS.toMillis(this.maxAllowedSlownessNanos);
                logger.warn("The cluster call timeout is too low ie {} ms: consider raising it to save CPU", Long.valueOf(configuredMillis));
                periodNanos = lowerBoundNanos;
            }
            final long periodMillis = TimeUnit.NANOSECONDS.toMillis(periodNanos);
            logger.debug("Slow replication checker is running with a period of {} ms", Long.valueOf(periodMillis));
            this.slowReplicationChecker = this.scheduledExecutorService.scheduleAtFixedRate(this::checkSlowReplication, periodNanos, periodNanos, TimeUnit.NANOSECONDS);
        }

        this.started = true;
        this.enabled = true;
    }

    /**
     * Stops replication and completes all pending replication tokens;
     * equivalent to {@code stop(true)}.
     */
    @Override
    public void stop() throws Exception {
        final boolean clearTokens = true;
        stop(clearTokens);
    }

    /**
     * Stops replication: closes the replicating channel, cancels the slow
     * replication checker, disables further sends, optionally completes the
     * pending tokens, and destroys the remoting connection. The whole sequence
     * runs under the instance monitor and is a no-op when the manager is not
     * started.
     *
     * @param z when {@code true}, pending replication tokens are completed via
     *          {@link #clearReplicationTokens()}
     * @throws Exception declared for callers; any exception raised by the
     *                   teardown calls below propagates
     */
    public void stop(boolean z) throws Exception {
        synchronized (this) {
            if (!this.started) {
                logger.trace("Stopping being ignored as it hasn't been started");
                return;
            }
            // Flip the lifecycle flag first so a nested/concurrent stop returns early above.
            this.started = false;
            if (logger.isTraceEnabled()) {
                // The Exception is created only to attach the caller's stack trace to the log line.
                logger.trace("stop(clearTokens={})", Boolean.valueOf(z), new Exception("Trace"));
            }
            if (this.replicatingChannel != null) {
                this.replicatingChannel.close();
                // NOTE(review): presumably wakes anything waiting for the transport to become writable — confirm.
                this.replicatingChannel.getConnection().getTransportConnection().fireReady(true);
            }
            if (this.slowReplicationChecker != null) {
                this.slowReplicationChecker.cancel(false);
                this.slowReplicationChecker = null;
            }
            // From here on the append/send methods return without queuing anything.
            this.enabled = false;
            if (z) {
                clearReplicationTokens();
            }
            CoreRemotingConnection coreRemotingConnection = this.remotingConnection;
            if (coreRemotingConnection != null) {
                // Remove the listener before destroying the connection.
                coreRemotingConnection.removeFailureListener(this.failureListener);
                coreRemotingConnection.destroy();
            }
            this.remotingConnection = null;
        }
    }

    /**
     * Completes every pending replication token by calling
     * {@code replicationDone()} on it. A failure in one callback is logged and
     * does not stop the remaining tokens from being completed.
     *
     * <p>The queue is drained by polling until it returns {@code null} rather
     * than testing {@code isEmpty()} first: tokens are also polled when
     * responses arrive, so a separate emptiness check could be followed by a
     * {@code null} poll and a spurious error log.
     */
    public void clearReplicationTokens() {
        logger.trace("clearReplicationTokens initiating");
        OperationContext token;
        while ((token = this.pendingTokens.poll()) != null) {
            logger.trace("Calling ctx.replicationDone()");
            try {
                token.replicationDone();
            } catch (Throwable th) {
                ActiveMQServerLogger.LOGGER.errorCompletingCallbackOnReplicationManager(th);
            }
        }
        logger.trace("clearReplicationTokens finished");
    }

    /**
     * @return a new insertion-ordered set holding the tokens currently pending,
     *         in queue iteration order; later queue changes do not affect it
     */
    public Set<OperationContext> getActiveTokens() {
        final Set<OperationContext> snapshot = new LinkedHashSet<>();
        for (OperationContext token : this.pendingTokens) {
            snapshot.add(token);
        }
        return snapshot;
    }

    /**
     * Queues {@code packet} with the operation context lined up and no
     * completion latch.
     *
     * @return the context tracking the packet, or {@code null} when replication is disabled
     */
    private OperationContext sendReplicatePacket(Packet packet) {
        return sendReplicatePacket(packet, true, null);
    }

    /**
     * Queues {@code packet} without a completion latch.
     *
     * @param z whether to line up the operation context for replication
     * @return the context tracking the packet, or {@code null} when replication is disabled
     */
    private OperationContext sendReplicatePacket(Packet packet, boolean z) {
        final ReusableLatch noLatch = null;
        return sendReplicatePacket(packet, z, noLatch);
    }

    /**
     * Queues {@code packet} for sending and schedules a flush on the
     * replication stream. When replication is disabled the packet is released
     * immediately and nothing is queued.
     *
     * @param packet        packet to replicate
     * @param z             whether to call {@code replicationLineUp()} on the new context
     * @param reusableLatch optional latch stored with the request; may be {@code null}
     * @return the context tracking the packet, or {@code null} when replication is disabled
     */
    private OperationContext sendReplicatePacket(Packet packet, boolean z, ReusableLatch reusableLatch) {
        if (this.enabled) {
            final OperationContext token = OperationContextImpl.getContext(this.ioExecutorFactory);
            if (z) {
                token.replicationLineUp();
            }
            final ReplicatePacketRequest request = new ReplicatePacketRequest(packet, token, reusableLatch);
            this.replicatePacketRequests.add(request);
            this.replicationStream.execute(() -> {
                // Re-check on the stream: replication may have been disabled since queuing.
                if (!this.enabled) {
                    releaseReplicatedPackets(this.replicatePacketRequests);
                    return;
                }
                sendReplicatedPackets(false);
            });
            return token;
        }
        packet.release();
        return null;
    }

    /**
     * Drains {@code queue}, and for each request releases its packet, marks
     * its context as replicated and counts down its latch when one is present.
     * With assertions enabled, verifies the call runs on the event loop.
     */
    private void releaseReplicatedPackets(Queue<ReplicatePacketRequest> queue) {
        if (!$assertionsDisabled && !checkEventLoop()) {
            throw new AssertionError();
        }
        for (ReplicatePacketRequest request = queue.poll(); request != null; request = queue.poll()) {
            request.packet.release();
            request.context.replicationDone();
            final ReusableLatch latch = request.done;
            if (latch != null) {
                latch.countDown();
            }
        }
    }

    /**
     * Periodic check scheduled by {@link #start()}. While a slow-replication
     * check is armed, it either disarms the check because the connection is
     * writable again, or, once the connection has stayed non-writable for at
     * least {@code maxAllowedSlownessNanos}, releases all queued packets and
     * stops replication.
     */
    private void checkSlowReplication() {
        if (!this.enabled) {
            return;
        }
        if (!$assertionsDisabled && !checkEventLoop()) {
            throw new AssertionError();
        }
        if (!this.checkSlowReplication) {
            return;
        }
        final boolean writable = this.replicatingChannel.getConnection().blockUntilWritable(0L);
        if (writable) {
            this.checkSlowReplication = false;
            return;
        }
        final long stalledNanos = System.nanoTime() - this.notWritableFrom;
        if (stalledNanos < this.maxAllowedSlownessNanos) {
            return;
        }
        this.checkSlowReplication = false;
        releaseReplicatedPackets(this.replicatePacketRequests);
        try {
            ActiveMQServerLogger.LOGGER.slowReplicationResponse();
            stop();
        } catch (Exception stopFailure) {
            logger.warn(stopFailure.getMessage(), (Throwable) stopFailure);
        }
    }

    /**
     * Target of the {@code onResume} ready listener: clears the
     * awaiting-resume state and continues flushing queued packets.
     */
    private void resume() {
        final boolean resuming = true;
        sendReplicatedPackets(resuming);
    }

    /**
     * Flushes queued replicate requests to the replicating channel for as long
     * as the connection reports itself writable.
     *
     * <p>Each request taken from the queue has its context appended to
     * {@code pendingTokens} and its latch (if any) counted down before the
     * packet is sent. When the connection stops being writable with requests
     * still queued, the method either reschedules itself on the replication
     * stream or parks until the {@code onResume} listener fires, arming the
     * slow-replication check in the latter case.
     *
     * @param z {@code true} when called from {@link #resume()}, which clears
     *          the awaiting-resume state before anything else; {@code false}
     *          for every other caller
     */
    private void sendReplicatedPackets(boolean z) {
        if (!$assertionsDisabled && !checkEventLoop()) {
            throw new AssertionError();
        }
        if (z) {
            this.awaitingResume = false;
        }
        // Nothing to do while parked, while another flush is in progress, when disabled, or when the queue is empty.
        if (this.awaitingResume || this.isFlushing || !this.enabled || this.replicatePacketRequests.isEmpty()) {
            return;
        }
        this.isFlushing = true;
        CoreRemotingConnection connection = this.replicatingChannel.getConnection();
        // NOTE(review): blockUntilWritable(0L) is presumably a non-blocking writability probe — confirm.
        while (connection.blockUntilWritable(0L)) {
            try {
                try {
                    // The connection is writable, so any armed slow-replication check is disarmed.
                    this.checkSlowReplication = false;
                    ReplicatePacketRequest poll = this.replicatePacketRequests.poll();
                    if (poll == null) {
                        // Queue drained: flush what was written and finish.
                        this.replicatingChannel.flushConnection();
                        this.isFlushing = false;
                        return;
                    }
                    // Register the token before sending so the response handler can find it.
                    this.pendingTokens.add(poll.context);
                    Packet packet = poll.packet;
                    ReusableLatch reusableLatch = poll.done;
                    if (reusableLatch != null) {
                        reusableLatch.countDown();
                    }
                    this.replicatingChannel.send(packet, false);
                } catch (Throwable th) {
                    // With assertions enabled, an AssertionError raised while sending is rethrown rather than logged.
                    if (!$assertionsDisabled && (th instanceof AssertionError)) {
                        throw new AssertionError(th.getMessage());
                    }
                    if (connection.getTransportConnection().isOpen()) {
                        logger.warn("Unexpected error while flushing replicate packets", th);
                    } else {
                        // Transport is closed: drop everything still queued and fire the ready notification.
                        logger.trace("Transport connection closed: cleaning up replicate tokens", th);
                        releaseReplicatedPackets(this.replicatePacketRequests);
                        connection.getTransportConnection().fireReady(true);
                    }
                    this.isFlushing = false;
                    return;
                }
            } catch (Throwable th2) {
                // Anything escaping the handler above must still reset the flushing flag.
                this.isFlushing = false;
                throw th2;
            }
        }
        // The connection is no longer writable: flush what has been written so far.
        this.replicatingChannel.flushConnection();
        if (!$assertionsDisabled && this.awaitingResume) {
            throw new AssertionError();
        }
        if (!this.replicatePacketRequests.isEmpty()) {
            // NOTE(review): isWritable(listener) presumably registers onResume when it returns false — confirm.
            if (connection.isWritable(this.onResume)) {
                // Writable again already: schedule another pass instead of looping here.
                this.replicationStream.execute(() -> {
                    sendReplicatedPackets(false);
                });
            } else {
                // Park until onResume fires and start timing the stall for the slow-replication checker.
                this.checkSlowReplication = true;
                this.notWritableFrom = System.nanoTime();
                this.awaitingResume = true;
            }
        }
        this.isFlushing = false;
    }

    /**
     * @return {@code true} when the replication stream is not a
     *         {@link SingleThreadEventLoop}; otherwise whether the current
     *         thread is that event loop's thread
     */
    private boolean checkEventLoop() {
        if (!(this.replicationStream instanceof SingleThreadEventLoop)) {
            return true;
        }
        final SingleThreadEventLoop loop = (SingleThreadEventLoop) this.replicationStream;
        return loop.inEventLoop();
    }

    /**
     * Completes the oldest pending token in response to a replication
     * acknowledgement. Logs a warning when no token is pending.
     */
    private void replicated() {
        if (!$assertionsDisabled && !checkEventLoop()) {
            throw new AssertionError();
        }
        final OperationContext token = this.pendingTokens.poll();
        if (token != null) {
            token.replicationDone();
            return;
        }
        ActiveMQServerLogger.LOGGER.missingReplicationTokenOnQueue();
    }

    /**
     * Sends the full content of a journal file to the backup, working on a
     * clone of the file handle. Does nothing when replication is not enabled.
     *
     * <p>The clone is closed exactly once in a {@code finally} block when it
     * is still open, whether or not the transfer succeeded.
     *
     * @param journalFile    journal file to synchronize
     * @param journalContent journal content type forwarded with every chunk
     * @throws Exception if reading, sending or closing the file fails
     */
    public void syncJournalFile(JournalFile journalFile, AbstractJournalStorageManager.JournalContent journalContent) throws Exception {
        if (!this.enabled) {
            return;
        }
        final SequentialFile cloneFile = journalFile.getFile().cloneFile();
        try {
            ActiveMQServerLogger.LOGGER.replicaSyncFile(cloneFile, Long.valueOf(cloneFile.size()));
            sendLargeFile(journalContent, null, journalFile.getFileID(), cloneFile, Long.MAX_VALUE);
        } finally {
            if (cloneFile.isOpen()) {
                cloneFile.close();
            }
        }
    }

    /**
     * Sends up to {@code j} bytes of a large-message file to the backup under
     * id {@code j2}. Does nothing when replication is not enabled.
     *
     * @param sequentialFile file to send
     * @param j              maximum number of bytes to send
     * @param j2             id sent with every chunk
     */
    public void syncLargeMessageFile(SequentialFile sequentialFile, long j, long j2) throws Exception {
        if (!this.enabled) {
            return;
        }
        sendLargeFile(null, null, j2, sequentialFile, j);
    }

    /**
     * Sends a whole page file to the backup for the given store name.
     * Does nothing when replication is not enabled.
     *
     * @param sequentialFile page file to send
     * @param j              id sent with every chunk
     * @param simpleString   page store name sent with every chunk
     */
    public void syncPages(SequentialFile sequentialFile, long j, SimpleString simpleString) throws Exception {
        if (!this.enabled) {
            return;
        }
        final long noLimit = Long.MAX_VALUE;
        sendLargeFile(null, simpleString, j, sequentialFile, noLimit);
    }

    /**
     * Streams a file to the backup in 32 KiB chunks, each wrapped in a
     * {@link ReplicationSyncFileMessage}. Does nothing when replication is not
     * enabled.
     *
     * <p>Every tenth chunk, and the last one, is sent with a latch and the
     * method waits for it to be flushed before reading further, which bounds
     * how much data is queued. The last chunk is the one where the read hit
     * end of file (-1 or 0 bytes) or the byte budget {@code j2} reached zero.
     *
     * <p>The input stream and its channel are closed by try-with-resources,
     * and {@code sequentialFile} is closed afterwards when still open. If the
     * read fails, the pooled buffer is released here because no packet has
     * taken ownership of it yet.
     *
     * @param journalContent journal content type, or {@code null}
     * @param simpleString   page store name, or {@code null}
     * @param j              id sent with every chunk
     * @param sequentialFile file to read; opened here when not already open
     * @param j2             maximum number of bytes to send
     * @throws Exception if file access fails or a flush wait times out
     */
    private void sendLargeFile(AbstractJournalStorageManager.JournalContent journalContent, SimpleString simpleString, long j, SequentialFile sequentialFile, long j2) throws Exception {
        if (!this.enabled) {
            return;
        }
        if (!sequentialFile.isOpen()) {
            sequentialFile.open();
        }
        final int chunkSize = 32768;
        final int flowControlSize = 10;
        long remaining = j2;
        int packetsSent = 0;
        final ReusableLatch flushed = new ReusableLatch(1);
        try (FileInputStream fileInputStream = new FileInputStream(sequentialFile.getJavaFile());
             FileChannel channel = fileInputStream.getChannel()) {
            boolean lastPacket;
            do {
                // A fresh pooled buffer per chunk: the packet takes ownership once constructed.
                final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(chunkSize, chunkSize);
                final int bytesRead;
                try {
                    buffer.clear();
                    bytesRead = channel.read(buffer.writerIndex(chunkSize).readerIndex(0).nioBuffer());
                } catch (Throwable readFailure) {
                    // No packet owns the buffer yet, so it must be returned to the pool here.
                    buffer.release();
                    throw readFailure;
                }
                int toSend = bytesRead;
                if (bytesRead > 0) {
                    if (bytesRead >= remaining) {
                        // Budget exhausted: send only what is left of it.
                        toSend = (int) remaining;
                        remaining = 0;
                    } else {
                        remaining -= bytesRead;
                    }
                }
                if (logger.isDebugEnabled()) {
                    logger.debug("sending {} bytes on file {}", Integer.valueOf(buffer.writerIndex()), sequentialFile.getFileName());
                }
                lastPacket = bytesRead == -1 || bytesRead == 0 || remaining == 0;
                if (packetsSent % flowControlSize == 0 || lastPacket) {
                    // Flow control point: wait until this chunk has been handed to the channel.
                    flushed.setCount(1);
                    sendReplicatePacket(new ReplicationSyncFileMessage(journalContent, simpleString, j, toSend, buffer), true, flushed);
                    awaitFlushOfReplicationStream(flushed);
                } else {
                    sendReplicatePacket(new ReplicationSyncFileMessage(journalContent, simpleString, j, toSend, buffer), true);
                }
                packetsSent++;
            } while (!lastPacket);
        } finally {
            if (sequentialFile.isOpen()) {
                sequentialFile.close();
            }
        }
    }

    /**
     * Waits up to {@code initialReplicationSyncTimeout} milliseconds for the
     * latch to reach zero.
     *
     * @param reusableLatch latch counted down when the associated packet is sent or released
     * @throws Exception a replication synchronization timeout when the wait expires
     */
    private void awaitFlushOfReplicationStream(ReusableLatch reusableLatch) throws Exception {
        final boolean flushedInTime = reusableLatch.await(this.initialReplicationSyncTimeout, TimeUnit.MILLISECONDS);
        if (flushedInTime) {
            return;
        }
        throw ActiveMQMessageBundle.BUNDLE.replicationSynchronizationTimeout(this.initialReplicationSyncTimeout);
    }

    /**
     * Queues a {@link ReplicationStartSyncMessage} describing the journal
     * files about to be synchronized. Does nothing when replication is not
     * enabled.
     */
    public void sendStartSyncMessage(JournalFile[] journalFileArr, AbstractJournalStorageManager.JournalContent journalContent, String str, boolean z) throws ActiveMQException {
        if (!this.enabled) {
            return;
        }
        final boolean beforeTwoEighteen = this.remotingConnection.isBeforeTwoEighteen();
        final Packet message = new ReplicationStartSyncMessage(beforeTwoEighteen, journalFileArr, journalContent, str, z);
        sendReplicatePacket(message);
    }

    /**
     * Sends the "synchronization done" message and waits for the backup to
     * acknowledge it. Does nothing when replication is not enabled.
     *
     * <p>On acknowledgement the manager leaves the synchronizing state
     * ({@code inSync} becomes {@code false}). When the wait times out, the
     * critical error listener is notified if a quorum manager reports a
     * maximum cluster size of two or less, and the timeout exception is thrown.
     * Failures while consulting the cluster manager are logged and do not
     * replace the timeout exception.
     *
     * <p>An interrupt during the wait is logged at debug level and then
     * handled like a successful acknowledgement.
     *
     * @param str                    node id placed in the message
     * @param j                      value passed to the latch's {@code await} as the wait limit
     * @param iOCriticalErrorListener listener to notify on timeout; may be {@code null}
     * @throws ActiveMQReplicationTimeooutException when no acknowledgement arrives in time
     */
    public void sendSynchronizationDone(String str, long j, IOCriticalErrorListener iOCriticalErrorListener) throws ActiveMQReplicationTimeooutException {
        if (!this.enabled) {
            return;
        }
        if (logger.isTraceEnabled()) {
            logger.trace("sendSynchronizationDone ::{}, {}", str, Long.valueOf(j));
        }
        // Arm the latch before sending so the acknowledgement cannot be missed.
        this.synchronizationIsFinishedAcknowledgement.countUp();
        sendReplicatePacket(new ReplicationStartSyncMessage(this.remotingConnection.isBeforeTwoEighteen(), str, this.server.getNodeManager().getNodeActivationSequence()));
        try {
            if (!this.synchronizationIsFinishedAcknowledgement.await(j)) {
                ActiveMQReplicationTimeooutException replicationSynchronizationTimeout = ActiveMQMessageBundle.BUNDLE.replicationSynchronizationTimeout(j);
                if (this.server != null) {
                    try {
                        ClusterManager clusterManager = this.server.getClusterManager();
                        if (clusterManager != null) {
                            QuorumManager quorumManager = clusterManager.getQuorumManager();
                            if (iOCriticalErrorListener != null && quorumManager != null && quorumManager.getMaxClusterSize() <= 2) {
                                iOCriticalErrorListener.onIOException(replicationSynchronizationTimeout, replicationSynchronizationTimeout.getMessage(), null);
                            }
                        }
                    } catch (Throwable th) {
                        // Best effort: the timeout below is still reported to the caller.
                        logger.warn(th.getMessage(), th);
                    }
                }
                logger.trace("sendSynchronizationDone wasn't finished in time");
                throw replicationSynchronizationTimeout;
            }
        } catch (InterruptedException e) {
            // NOTE(review): the interrupt status is not restored here; confirm whether callers rely on that.
            logger.debug(e.getMessage(), (Throwable) e);
        }
        this.inSync = false;
        logger.trace("sendSynchronizationDone finished");
    }

    /**
     * Queues a {@link ReplicationStartSyncMessage} listing the ids (keys) of
     * the given large-message map. The id list is copied before the enabled
     * check, so {@code map} must not be {@code null} even when replication is
     * disabled.
     */
    public void sendLargeMessageIdListMessage(Map<Long, Pair<String, Long>> map) {
        final ArrayList<Long> largeMessageIds = new ArrayList<>(map.keySet());
        if (!this.enabled) {
            return;
        }
        final boolean beforeTwoEighteen = this.remotingConnection.isBeforeTwoEighteen();
        sendReplicatePacket(new ReplicationStartSyncMessage(beforeTwoEighteen, largeMessageIds));
    }

    /**
     * Queues a {@link ReplicationPrimaryIsStoppingMessage} carrying the given
     * reason.
     *
     * @param primaryStopping value placed in the message
     * @return the context tracking the packet, or {@code null} when replication is not enabled
     */
    public OperationContext sendPrimaryIsStopping(ReplicationPrimaryIsStoppingMessage.PrimaryStopping primaryStopping) {
        logger.debug("PRIMARY IS STOPPING?!? message={} enabled={}", primaryStopping, Boolean.valueOf(this.enabled));
        if (this.enabled) {
            logger.debug("PRIMARY IS STOPPING?!? message={} {}", primaryStopping, Boolean.valueOf(this.enabled));
            final Packet message = new ReplicationPrimaryIsStoppingMessage(primaryStopping);
            return sendReplicatePacket(message);
        }
        return null;
    }

    /**
     * @return the remoting connection used for replication, or {@code null}
     *         once the manager has been stopped
     */
    public CoreRemotingConnection getBackupTransportConnection() {
        final CoreRemotingConnection connection = this.remotingConnection;
        return connection;
    }

    /**
     * @return the current {@code inSync} flag: {@code true} from construction
     *         until {@code sendSynchronizationDone} completes
     */
    public boolean isSynchronizing() {
        final boolean synchronizing = this.inSync;
        return synchronizing;
    }

    // Static initializer: assigns the two static final fields declared at the top of the class.
    static {
        // True when assertions are NOT enabled for this class; read by the explicit
        // "!$assertionsDisabled && ..." checks that stand in for assert statements in this file.
        $assertionsDisabled = !ReplicationManager.class.desiredAssertionStatus();
        // lookupClass() yields the class performing the lookup, i.e. this class.
        logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    }
}
