package org.apache.hadoop.hdfs.qjournal.client;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.protobuf.TextFormat;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.TimeoutException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.qjournal.client.AsyncLogger;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.namenode.JournalManager;
import org.apache.hadoop.hdfs.server.namenode.JournalSet;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringUtils;
import org.mortbay.util.URIUtil;

/* JADX WARN: Classes with same name are omitted:
  input_file:classes/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.class
  input_file:hadoop-hdfs-2.5.2/share/hadoop/hdfs/hadoop-hdfs-2.5.2.jar:org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.class
 */
@InterfaceAudience.Private
/* loaded from: input_file:hadoop-hdfs-2.5.2.jar:org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.class */
public class QuorumJournalManager implements JournalManager {
    // Class-wide logger; assigned in the static initializer of this class.
    static final Log LOG;

    // Per-operation timeouts in milliseconds. Each one is read from the
    // Configuration in the four-argument constructor.
    private final int startSegmentTimeoutMs;
    private final int prepareRecoveryTimeoutMs;
    private final int acceptRecoveryTimeoutMs;
    private final int finalizeSegmentTimeoutMs;
    private final int selectInputStreamsTimeoutMs;
    private final int getJournalStateTimeoutMs;
    private final int newEpochTimeoutMs;
    private final int writeTxnsTimeoutMs;

    // Fixed 60-second timeouts (in milliseconds), one per administrative
    // operation; these are not configurable.
    private static final int FORMAT_TIMEOUT_MS = 60000;
    private static final int HASDATA_TIMEOUT_MS = 60000;
    private static final int CAN_ROLL_BACK_TIMEOUT_MS = 60000;
    private static final int FINALIZE_TIMEOUT_MS = 60000;
    private static final int PRE_UPGRADE_TIMEOUT_MS = 60000;
    private static final int ROLL_BACK_TIMEOUT_MS = 60000;
    private static final int UPGRADE_TIMEOUT_MS = 60000;
    private static final int GET_JOURNAL_CTIME_TIMEOUT_MS = 60000;
    private static final int DISCARD_SEGMENTS_TIMEOUT_MS = 60000;

    // Constructor arguments, kept so createLoggers() can read them.
    private final Configuration conf;
    private final URI uri;
    private final NamespaceInfo nsInfo;

    // Set to true at the end of recoverUnfinalizedSegments(); startLogSegment()
    // refuses to run until then.
    private boolean isActiveWriter;

    // The set of loggers every quorum call in this class is issued through.
    private final AsyncLoggerSet loggers;

    // Buffer capacity handed to each new QuorumOutputStream; initialised to
    // 524288 in the constructor and changeable via setOutputBufferCapacity().
    private int outputBufferCapacity;

    // Passed to EditLogFileInputStream.fromUrl() when building input streams.
    private final URLConnectionFactory connectionFactory;

    // Decompiler-exposed assertion flag: true when assertions are disabled for
    // this class (derived from desiredAssertionStatus() in the static initializer).
    static final /* synthetic */ boolean $assertionsDisabled;

    /**
     * Convenience constructor: delegates to the four-argument constructor,
     * supplying {@code IPCLoggerChannel.FACTORY} as the logger factory.
     *
     * @param configuration configuration to read timeouts from; must not be null
     * @param uri           journal URI (authority lists the loggers, path names the journal)
     * @param namespaceInfo namespace information handed to each logger
     * @throws IOException if the delegated constructor fails
     */
    public QuorumJournalManager(Configuration configuration, URI uri, NamespaceInfo namespaceInfo) throws IOException {
        this(configuration, uri, namespaceInfo, IPCLoggerChannel.FACTORY);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds the manager: stores its inputs, creates the logger set through
     * {@link #createLoggers(AsyncLogger.Factory)}, and reads every per-operation
     * timeout from the configuration.
     *
     * @param configuration configuration to read timeouts from; must not be null
     * @param uri           journal URI (authority lists the loggers, path names the journal)
     * @param namespaceInfo namespace information handed to each logger
     * @param factory       factory used to create one logger per address in the URI
     * @throws IOException if the loggers cannot be created
     */
    public QuorumJournalManager(Configuration configuration, URI uri, NamespaceInfo namespaceInfo, AsyncLogger.Factory factory) throws IOException {
        // Default output buffer capacity: 524288 bytes (512 * 1024).
        this.outputBufferCapacity = 524288;
        Preconditions.checkArgument(configuration != null, "must be configured");
        // conf, uri and nsInfo must be assigned BEFORE createLoggers() runs:
        // that (overridable) method reads these three fields.
        this.conf = configuration;
        this.uri = uri;
        this.nsInfo = namespaceInfo;
        this.loggers = new AsyncLoggerSet(createLoggers(factory));
        this.connectionFactory = URLConnectionFactory.newDefaultURLConnectionFactory(configuration);
        // Timeouts in milliseconds; the second argument is the fallback used
        // when the key is absent from the configuration.
        this.startSegmentTimeoutMs = configuration.getInt(DFSConfigKeys.DFS_QJOURNAL_START_SEGMENT_TIMEOUT_KEY, 20000);
        this.prepareRecoveryTimeoutMs = configuration.getInt(DFSConfigKeys.DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_KEY, 120000);
        this.acceptRecoveryTimeoutMs = configuration.getInt(DFSConfigKeys.DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_KEY, 120000);
        this.finalizeSegmentTimeoutMs = configuration.getInt(DFSConfigKeys.DFS_QJOURNAL_FINALIZE_SEGMENT_TIMEOUT_KEY, 120000);
        this.selectInputStreamsTimeoutMs = configuration.getInt(DFSConfigKeys.DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_KEY, 20000);
        this.getJournalStateTimeoutMs = configuration.getInt(DFSConfigKeys.DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_KEY, 120000);
        this.newEpochTimeoutMs = configuration.getInt(DFSConfigKeys.DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_KEY, 120000);
        this.writeTxnsTimeoutMs = configuration.getInt(DFSConfigKeys.DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_KEY, 20000);
    }

    /**
     * Creates the loggers for this manager from its stored configuration,
     * URI and namespace info. Called from the constructor; subclasses may
     * override it to supply different loggers.
     *
     * @param factory factory used to create each logger
     * @return one logger per address found in the journal URI
     * @throws IOException if a logger cannot be created
     */
    protected List<AsyncLogger> createLoggers(AsyncLogger.Factory factory) throws IOException {
        List<AsyncLogger> created = createLoggers(conf, uri, nsInfo, factory);
        return created;
    }

    /**
     * Extracts the journal identifier from the path component of a journal URI.
     *
     * @param uri journal URI whose path names the journal
     * @return the path with its first character removed, after validation by
     *         {@link #checkJournalId(String)}
     * @throws IllegalArgumentException if the path is missing/empty or the
     *         resulting identifier is rejected
     */
    static String parseJournalId(URI uri) {
        final String path = uri.getPath();
        final boolean hasPath = path != null && !path.isEmpty();
        Preconditions.checkArgument(hasPath, "Bad URI '%s': must identify journal in path component", uri);
        // Drop the first character of the path (presumably the leading '/').
        final String journalId = path.substring(1);
        checkJournalId(journalId);
        return journalId;
    }

    /**
     * Validates a journal identifier: it must be non-null, non-empty, must not
     * contain {@code URIUtil.SLASH}, and must not start with a dot.
     *
     * @param str candidate journal identifier
     * @throws IllegalArgumentException if any of the rules above is violated
     */
    public static void checkJournalId(String str) {
        final boolean valid = str != null
                && !str.isEmpty()
                && !str.contains(URIUtil.SLASH)
                && !str.startsWith(".");
        Preconditions.checkArgument(valid, "bad journal id: " + str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Establishes a new epoch: queries the loggers for their journal state,
     * takes the highest last-promised epoch among the responses, proposes that
     * value plus one via {@code newEpoch}, and records it on the logger set.
     *
     * @return the per-logger responses to the {@code newEpoch} call
     * @throws IOException if either quorum call fails
     * @throws IllegalStateException if an epoch has already been established
     */
    public Map<AsyncLogger, QJournalProtocolProtos.NewEpochResponseProto> createNewUniqueEpoch() throws IOException {
        Preconditions.checkState(!loggers.isEpochEstablished(), "epoch already created");

        Map<AsyncLogger, QJournalProtocolProtos.GetJournalStateResponseProto> journalStates =
                loggers.waitForWriteQuorum(loggers.getJournalState(), getJournalStateTimeoutMs, "getJournalState()");

        long highestPromised = Long.MIN_VALUE;
        for (QJournalProtocolProtos.GetJournalStateResponseProto state : journalStates.values()) {
            highestPromised = Math.max(highestPromised, state.getLastPromisedEpoch());
        }
        // Manual assertion guard (decompiled form of an assert statement).
        if (!$assertionsDisabled && highestPromised < 0) {
            throw new AssertionError();
        }

        final long proposedEpoch = highestPromised + 1;
        Map<AsyncLogger, QJournalProtocolProtos.NewEpochResponseProto> epochResponses =
                loggers.waitForWriteQuorum(loggers.newEpoch(nsInfo, proposedEpoch), newEpochTimeoutMs, "newEpoch(" + proposedEpoch + ")");
        loggers.setEpoch(proposedEpoch);
        return epochResponses;
    }

    /**
     * Issues {@code format} to the logger set and waits for the call to finish.
     *
     * @param namespaceInfo namespace information passed to the loggers
     * @throws IOException if the wait is interrupted or times out (the original
     *         exception is attached as the cause), or if any logger reported an
     *         exception
     */
    @Override // org.apache.hadoop.hdfs.server.namenode.JournalManager
    public void format(NamespaceInfo namespaceInfo) throws IOException {
        QuorumCall<AsyncLogger, Void> call = loggers.format(namespaceInfo);
        try {
            // NOTE(review): the arguments presumably mean "responses and
            // successes from every logger, zero tolerated exceptions" --
            // confirm against QuorumCall.waitFor.
            call.waitFor(loggers.size(), loggers.size(), 0, FORMAT_TIMEOUT_MS, "format");
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up can see it.
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted waiting for format() response", e);
        } catch (TimeoutException e) {
            throw new IOException("Timed out waiting for format() response", e);
        }
        if (call.countExceptions() > 0) {
            call.rethrowException("Could not format one or more JournalNodes");
        }
    }

    /**
     * Asks the logger set whether each logger is formatted and reports whether
     * at least one of them answered {@code true}.
     *
     * @return {@code true} if any logger result is {@code true}, otherwise {@code false}
     * @throws IOException if the wait is interrupted or times out (the original
     *         exception is attached as the cause), or if any logger reported an
     *         exception
     */
    @Override // org.apache.hadoop.hdfs.server.common.Storage.FormatConfirmable
    public boolean hasSomeData() throws IOException {
        QuorumCall<AsyncLogger, Boolean> call = loggers.isFormatted();
        try {
            // NOTE(review): presumably "wait for a response from every logger,
            // with no minimum number of successes" -- confirm against
            // QuorumCall.waitFor.
            call.waitFor(loggers.size(), 0, 0, HASDATA_TIMEOUT_MS, "hasSomeData");
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up can see it.
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while determining if JNs have data", e);
        } catch (TimeoutException e) {
            throw new IOException("Timed out waiting for response from loggers", e);
        }
        if (call.countExceptions() > 0) {
            call.rethrowException("Unable to check if JNs are ready for formatting");
        }
        for (Boolean formatted : call.getResults().values()) {
            if (formatted.booleanValue()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Recovers the unclosed log segment that starts at the given transaction id.
     *
     * <p>Steps, in order: issue {@code prepareRecovery} and collect the
     * responses; pick the maximum response according to
     * {@code SegmentRecoveryComparator.INSTANCE}; if that response carries
     * neither an accepted epoch nor a segment state, there is nothing to
     * recover and the method returns; otherwise issue {@code acceptRecovery}
     * for the chosen segment state (fetching from the chosen logger's URL) and
     * finally {@code finalizeLogSegment} for that segment's txid range.
     *
     * @param j first transaction id of the segment to recover; must be positive
     * @throws IOException if any of the quorum calls fails
     * @throws AssertionError if a responder has seen a committed txid beyond the
     *         end of the segment chosen for synchronization
     */
    private void recoverUnclosedSegment(long j) throws IOException {
        Preconditions.checkArgument(j > 0);
        LOG.info("Beginning recovery of unclosed segment starting at txid " + j);

        Map<AsyncLogger, QJournalProtocolProtos.PrepareRecoveryResponseProto> prepareResponses =
                loggers.waitForWriteQuorum(loggers.prepareRecovery(j), prepareRecoveryTimeoutMs, "prepareRecovery(" + j + ")");
        LOG.info("Recovery prepare phase complete. Responses:\n" + QuorumCall.mapToString(prepareResponses));

        // The comparator's maximum entry is the logger whose log will be synchronized from.
        Map.Entry<AsyncLogger, QJournalProtocolProtos.PrepareRecoveryResponseProto> bestEntry =
                Collections.max(prepareResponses.entrySet(), SegmentRecoveryComparator.INSTANCE);
        AsyncLogger bestLogger = bestEntry.getKey();
        QJournalProtocolProtos.PrepareRecoveryResponseProto bestResponse = bestEntry.getValue();

        if (bestResponse.hasAcceptedInEpoch()) {
            LOG.info("Using already-accepted recovery for segment starting at txid " + j + ": " + bestEntry);
        } else if (bestResponse.hasSegmentState()) {
            LOG.info("Using longest log: " + bestEntry);
        } else {
            // The best response has no segment; with assertions enabled, verify
            // that no other responder has one either.
            for (QJournalProtocolProtos.PrepareRecoveryResponseProto response : prepareResponses.values()) {
                if (!$assertionsDisabled && response.hasSegmentState()) {
                    throw new AssertionError("One of the loggers had a response, but no best logger was found.");
                }
            }
            LOG.info("None of the responders had a log to recover: " + QuorumCall.mapToString(prepareResponses));
            return;
        }

        QJournalProtocolProtos.SegmentStateProto logToSync = bestResponse.getSegmentState();
        // Manual assertion guard (decompiled form of an assert statement).
        if (!$assertionsDisabled && j != logToSync.getStartTxId()) {
            throw new AssertionError();
        }

        // Sanity check (always on): no responder may have seen a committed
        // txid past the end of the segment being synchronized to.
        for (Map.Entry<AsyncLogger, QJournalProtocolProtos.PrepareRecoveryResponseProto> entry : prepareResponses.entrySet()) {
            AsyncLogger logger = entry.getKey();
            QJournalProtocolProtos.PrepareRecoveryResponseProto response = entry.getValue();
            if (response.hasLastCommittedTxId() && response.getLastCommittedTxId() > logToSync.getEndTxId()) {
                throw new AssertionError("Decided to synchronize log to " + logToSync + " but logger " + logger + " had seen txid " + response.getLastCommittedTxId() + " committed");
            }
        }

        loggers.waitForWriteQuorum(
                loggers.acceptRecovery(logToSync, bestLogger.buildURLToFetchLogs(j)),
                acceptRecoveryTimeoutMs,
                "acceptRecovery(" + TextFormat.shortDebugString(logToSync) + ")");

        loggers.waitForWriteQuorum(
                loggers.finalizeLogSegment(logToSync.getStartTxId(), logToSync.getEndTxId()),
                finalizeSegmentTimeoutMs,
                String.format("finalizeLogSegment(%s-%s)", Long.valueOf(logToSync.getStartTxId()), Long.valueOf(logToSync.getEndTxId())));
    }

    /**
     * Creates one logger per address listed in the authority of the journal URI.
     *
     * @param configuration configuration handed to the factory
     * @param uri           journal URI; its authority lists the addresses and
     *                      its path names the journal
     * @param namespaceInfo namespace information handed to the factory
     * @param factory       factory that creates each logger
     * @return the created loggers, in the order the addresses appear in the URI
     * @throws IOException if the addresses cannot be resolved
     * @throws IllegalArgumentException if the URI has no authority or a bad journal id
     */
    static List<AsyncLogger> createLoggers(Configuration configuration, URI uri, NamespaceInfo namespaceInfo, AsyncLogger.Factory factory) throws IOException {
        List<AsyncLogger> created = Lists.newArrayList();
        // Addresses are resolved before the journal id is parsed, so an
        // invalid authority is reported ahead of an invalid path.
        List<InetSocketAddress> addresses = getLoggerAddresses(uri);
        String journalId = parseJournalId(uri);
        for (InetSocketAddress address : addresses) {
            created.add(factory.createLogger(configuration, namespaceInfo, journalId, address));
        }
        return created;
    }

    /**
     * Parses the semicolon-separated authority of a journal URI into socket
     * addresses, using {@code DFS_JOURNALNODE_RPC_PORT_DEFAULT} as the default
     * port. Logs a warning when the number of entries is even.
     *
     * @param uri journal URI whose authority lists the addresses
     * @return one socket address per authority entry, in order
     * @throws IOException declared for callers; see the helpers used here
     * @throws IllegalArgumentException if the URI has no authority
     */
    private static List<InetSocketAddress> getLoggerAddresses(URI uri) throws IOException {
        String authority = uri.getAuthority();
        boolean hasAuthority = authority != null && !authority.isEmpty();
        Preconditions.checkArgument(hasAuthority, "URI has no authority: " + uri);

        String[] entries = StringUtils.split(authority, ';');
        if (entries.length % 2 == 0) {
            LOG.warn("Quorum journal URI '" + uri + "' has an even number of Journal Nodes specified. This is not recommended!");
        }

        List<InetSocketAddress> addresses = Lists.newArrayList();
        for (String entry : entries) {
            // Surrounding whitespace in an entry is ignored.
            addresses.add(NetUtils.createSocketAddr(entry.trim(), DFSConfigKeys.DFS_JOURNALNODE_RPC_PORT_DEFAULT));
        }
        return addresses;
    }

    /**
     * Starts a new log segment on the loggers and returns a stream that writes
     * to it. May only be called after {@link #recoverUnfinalizedSegments()} has
     * marked this manager as the active writer.
     *
     * @param j first transaction id of the new segment
     * @param i second argument forwarded unchanged to the loggers'
     *          {@code startLogSegment} call
     * @return an output stream built from the logger set, {@code j}, the current
     *         output buffer capacity and the write timeout
     * @throws IOException if the quorum call fails
     * @throws IllegalStateException if this manager is not yet the active writer
     */
    @Override // org.apache.hadoop.hdfs.server.namenode.JournalManager
    public EditLogOutputStream startLogSegment(long j, int i) throws IOException {
        Preconditions.checkState(isActiveWriter, "must recover segments before starting a new one");
        QuorumCall<AsyncLogger, Void> startCall = loggers.startLogSegment(j, i);
        loggers.waitForWriteQuorum(startCall, startSegmentTimeoutMs, "startLogSegment(" + j + ")");
        return new QuorumOutputStream(loggers, j, outputBufferCapacity, writeTxnsTimeoutMs);
    }

    /**
     * Finalizes the log segment covering the given transaction id range on the
     * loggers.
     *
     * @param j  first transaction id of the segment
     * @param j2 last transaction id of the segment
     * @throws IOException if the quorum call fails
     */
    @Override // org.apache.hadoop.hdfs.server.namenode.JournalManager
    public void finalizeLogSegment(long j, long j2) throws IOException {
        QuorumCall<AsyncLogger, Void> finalizeCall = loggers.finalizeLogSegment(j, j2);
        String operationName = String.format("finalizeLogSegment(%s-%s)", j, j2);
        loggers.waitForWriteQuorum(finalizeCall, finalizeSegmentTimeoutMs, operationName);
    }

    /**
     * Sets the buffer capacity handed to output streams created by later calls
     * to {@link #startLogSegment(long, int)}.
     *
     * @param i new buffer capacity
     */
    @Override // org.apache.hadoop.hdfs.server.namenode.JournalManager
    public void setOutputBufferCapacity(int i) {
        outputBufferCapacity = i;
    }

    /**
     * Logs the request and forwards it to the logger set. This method does not
     * wait on any result of the forwarded call.
     *
     * @param j transaction id threshold passed to the loggers
     * @throws IOException declared by the interface
     */
    @Override // org.apache.hadoop.hdfs.server.namenode.LogsPurgeable
    public void purgeLogsOlderThan(long j) throws IOException {
        final String message = "Purging remote journals older than txid " + j;
        LOG.info(message);
        loggers.purgeLogsOlderThan(j);
    }

    /**
     * Makes this manager the active writer: establishes a new epoch, finds the
     * highest last-segment txid reported in the {@code newEpoch} responses and,
     * if any logger reported one, recovers the segment starting there.
     *
     * @throws IOException if establishing the epoch or recovering the segment fails
     * @throws IllegalStateException if this manager is already the active writer
     */
    @Override // org.apache.hadoop.hdfs.server.namenode.JournalManager
    public void recoverUnfinalizedSegments() throws IOException {
        Preconditions.checkState(!isActiveWriter, "already active writer");
        LOG.info("Starting recovery process for unclosed journal segments...");

        Map<AsyncLogger, QJournalProtocolProtos.NewEpochResponseProto> epochResponses = createNewUniqueEpoch();
        LOG.info("Successfully started new epoch " + loggers.getEpoch());
        if (LOG.isDebugEnabled()) {
            LOG.debug("newEpoch(" + loggers.getEpoch() + ") responses:\n" + QuorumCall.mapToString(epochResponses));
        }

        // Long.MIN_VALUE doubles as the "no logger reported a segment" marker.
        long latestSegmentTxId = Long.MIN_VALUE;
        for (QJournalProtocolProtos.NewEpochResponseProto response : epochResponses.values()) {
            if (!response.hasLastSegmentTxId()) {
                continue;
            }
            latestSegmentTxId = Math.max(latestSegmentTxId, response.getLastSegmentTxId());
        }

        final boolean segmentReported = latestSegmentTxId != Long.MIN_VALUE;
        if (segmentReported) {
            recoverUnclosedSegment(latestSegmentTxId);
        }
        isActiveWriter = true;
    }

    /**
     * Closes the underlying logger set.
     *
     * @throws IOException declared by the interface
     */
    @Override // org.apache.hadoop.hdfs.server.namenode.JournalManager, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        loggers.close();
    }

    /**
     * Collects edit-log input streams from the loggers: fetches each logger's
     * edit-log manifest, opens a URL-backed stream for every log listed, and
     * hands the streams (ordered by
     * {@code JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR}) to
     * {@code JournalSet.chainAndMakeRedundantStreams}, which fills
     * {@code collection}.
     *
     * @param collection output collection receiving the resulting streams
     * @param j          transaction id forwarded to the manifest request and to
     *                   the chaining step
     * @param z          flag forwarded unchanged to the manifest request
     * @throws IOException if the manifest quorum call fails
     */
    @Override // org.apache.hadoop.hdfs.server.namenode.LogsPurgeable
    public void selectInputStreams(Collection<EditLogInputStream> collection, long j, boolean z) throws IOException {
        Map<AsyncLogger, RemoteEditLogManifest> manifests =
                loggers.waitForWriteQuorum(loggers.getEditLogManifest(j, z), selectInputStreamsTimeoutMs, "selectInputStreams");
        // Only build the (potentially large) manifest dump when it will be logged.
        if (LOG.isDebugEnabled()) {
            LOG.debug("selectInputStream manifests:\n" + Joiner.on(IOUtils.LINE_SEPARATOR_UNIX).withKeyValueSeparator(": ").join(manifests));
        }

        PriorityQueue<EditLogInputStream> allStreams =
                new PriorityQueue<EditLogInputStream>(64, JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
        for (Map.Entry<AsyncLogger, RemoteEditLogManifest> entry : manifests.entrySet()) {
            AsyncLogger logger = entry.getKey();
            for (RemoteEditLog remoteLog : entry.getValue().getLogs()) {
                long startTxId = remoteLog.getStartTxId();
                allStreams.add(EditLogFileInputStream.fromUrl(
                        connectionFactory,
                        logger.buildURLToFetchLogs(startTxId),
                        startTxId,
                        remoteLog.getEndTxId(),
                        remoteLog.isInProgress()));
            }
        }
        JournalSet.chainAndMakeRedundantStreams(collection, allStreams, j);
    }

    /**
     * @return the text {@code "QJM to "} followed by the string form of the logger set
     */
    @Override // org.apache.hadoop.hdfs.server.common.Storage.FormatConfirmable
    public String toString() {
        return "QJM to " + String.valueOf(loggers);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Test hook exposing the logger set this manager issues its calls through.
     *
     * @return the internal logger set (not a copy)
     */
    @VisibleForTesting
    public AsyncLoggerSet getLoggerSetForTests() {
        return loggers;
    }

    /**
     * Issues {@code doPreUpgrade} to the logger set and waits for the call to finish.
     *
     * @throws IOException if the wait is interrupted or times out (the original
     *         exception is attached as the cause), or if any logger reported an
     *         exception
     */
    @Override // org.apache.hadoop.hdfs.server.namenode.JournalManager
    public void doPreUpgrade() throws IOException {
        QuorumCall<AsyncLogger, Void> call = loggers.doPreUpgrade();
        try {
            // NOTE(review): the arguments presumably mean "responses and
            // successes from every logger, zero tolerated exceptions" --
            // confirm against QuorumCall.waitFor.
            call.waitFor(loggers.size(), loggers.size(), 0, PRE_UPGRADE_TIMEOUT_MS, "doPreUpgrade");
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up can see it.
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted waiting for doPreUpgrade() response", e);
        } catch (TimeoutException e) {
            throw new IOException("Timed out waiting for doPreUpgrade() response", e);
        }
        if (call.countExceptions() > 0) {
            call.rethrowException("Could not do pre-upgrade of one or more JournalNodes");
        }
    }

    /**
     * Issues {@code doUpgrade} to the logger set and waits for the call to finish.
     *
     * @param storage storage object forwarded unchanged to the loggers
     * @throws IOException if the wait is interrupted or times out (the original
     *         exception is attached as the cause), or if any logger reported an
     *         exception
     */
    @Override // org.apache.hadoop.hdfs.server.namenode.JournalManager
    public void doUpgrade(Storage storage) throws IOException {
        QuorumCall<AsyncLogger, Void> call = loggers.doUpgrade(storage);
        try {
            // NOTE(review): the arguments presumably mean "responses and
            // successes from every logger, zero tolerated exceptions" --
            // confirm against QuorumCall.waitFor.
            call.waitFor(loggers.size(), loggers.size(), 0, UPGRADE_TIMEOUT_MS, "doUpgrade");
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up can see it.
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted waiting for doUpgrade() response", e);
        } catch (TimeoutException e) {
            throw new IOException("Timed out waiting for doUpgrade() response", e);
        }
        if (call.countExceptions() > 0) {
            call.rethrowException("Could not perform upgrade of one or more JournalNodes");
        }
    }

    /**
     * Issues {@code doFinalize} to the logger set and waits for the call to finish.
     *
     * @throws IOException if the wait is interrupted or times out (the original
     *         exception is attached as the cause), or if any logger reported an
     *         exception
     */
    @Override // org.apache.hadoop.hdfs.server.namenode.JournalManager
    public void doFinalize() throws IOException {
        QuorumCall<AsyncLogger, Void> call = loggers.doFinalize();
        try {
            // NOTE(review): the arguments presumably mean "responses and
            // successes from every logger, zero tolerated exceptions" --
            // confirm against QuorumCall.waitFor.
            call.waitFor(loggers.size(), loggers.size(), 0, FINALIZE_TIMEOUT_MS, "doFinalize");
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up can see it.
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted waiting for doFinalize() response", e);
        } catch (TimeoutException e) {
            throw new IOException("Timed out waiting for doFinalize() response", e);
        }
        if (call.countExceptions() > 0) {
            call.rethrowException("Could not finalize one or more JournalNodes");
        }
    }

    /**
     * Asks the logger set whether a rollback is possible and returns the
     * answer, which must be identical across all results.
     *
     * @param storageInfo  first storage descriptor, forwarded unchanged to the loggers
     * @param storageInfo2 second storage descriptor, forwarded unchanged to the loggers
     * @param i            integer argument forwarded unchanged to the loggers
     * @return the common answer given by the loggers
     * @throws IOException if the wait is interrupted or times out (the original
     *         exception is attached as the cause), if any logger reported an
     *         exception, or if the loggers' answers differ
     */
    @Override // org.apache.hadoop.hdfs.server.namenode.JournalManager
    public boolean canRollBack(StorageInfo storageInfo, StorageInfo storageInfo2, int i) throws IOException {
        QuorumCall<AsyncLogger, Boolean> call = loggers.canRollBack(storageInfo, storageInfo2, i);
        try {
            // NOTE(review): the arguments presumably mean "responses and
            // successes from every logger, zero tolerated exceptions" --
            // confirm against QuorumCall.waitFor.
            call.waitFor(loggers.size(), loggers.size(), 0, CAN_ROLL_BACK_TIMEOUT_MS, "canRollBack");
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up can see it.
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted waiting for canRollBack() response", e);
        } catch (TimeoutException e) {
            throw new IOException("Timed out waiting for canRollBack() response", e);
        }
        if (call.countExceptions() > 0) {
            call.rethrowException("Could not check if roll back possible for one or more JournalNodes");
        }

        // Only the equality check is wrapped: an AssertionError from anywhere
        // else must not be misreported as "results differed".
        try {
            DFSUtil.assertAllResultsEqual(call.getResults().values());
        } catch (AssertionError ae) {
            throw new IOException("Results differed for canRollBack", ae);
        }

        // All results are equal at this point, so the first one is the answer.
        Iterator<Boolean> results = call.getResults().values().iterator();
        if (results.hasNext()) {
            return results.next().booleanValue();
        }
        throw new AssertionError("Unreachable code.");
    }

    /**
     * Issues {@code doRollback} to the logger set and waits for the call to finish.
     *
     * @throws IOException if the wait is interrupted or times out (the original
     *         exception is attached as the cause), or if any logger reported an
     *         exception
     */
    @Override // org.apache.hadoop.hdfs.server.namenode.JournalManager
    public void doRollback() throws IOException {
        QuorumCall<AsyncLogger, Void> call = loggers.doRollback();
        try {
            // NOTE(review): the arguments presumably mean "responses and
            // successes from every logger, zero tolerated exceptions" --
            // confirm against QuorumCall.waitFor.
            call.waitFor(loggers.size(), loggers.size(), 0, ROLL_BACK_TIMEOUT_MS, "doRollback");
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up can see it.
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted waiting for doRollback() response", e);
        } catch (TimeoutException e) {
            throw new IOException("Timed out waiting for doRollback() response", e);
        }
        if (call.countExceptions() > 0) {
            call.rethrowException("Could not perform rollback of one or more JournalNodes");
        }
    }

    /**
     * Asks the logger set for the journal CTime and returns it; the value must
     * be identical across all results.
     *
     * @return the common CTime value reported by the loggers
     * @throws IOException if the wait is interrupted or times out (the original
     *         exception is attached as the cause), if any logger reported an
     *         exception, or if the loggers' answers differ
     */
    @Override // org.apache.hadoop.hdfs.server.namenode.JournalManager
    public long getJournalCTime() throws IOException {
        QuorumCall<AsyncLogger, Long> call = loggers.getJournalCTime();
        try {
            // NOTE(review): the arguments presumably mean "responses and
            // successes from every logger, zero tolerated exceptions" --
            // confirm against QuorumCall.waitFor.
            call.waitFor(loggers.size(), loggers.size(), 0, GET_JOURNAL_CTIME_TIMEOUT_MS, "getJournalCTime");
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up can see it.
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted waiting for getJournalCTime() response", e);
        } catch (TimeoutException e) {
            throw new IOException("Timed out waiting for getJournalCTime() response", e);
        }
        if (call.countExceptions() > 0) {
            call.rethrowException("Could not get journal CTime for one or more JournalNodes");
        }

        // Only the equality check is wrapped: an AssertionError from anywhere
        // else must not be misreported as "results differed".
        try {
            DFSUtil.assertAllResultsEqual(call.getResults().values());
        } catch (AssertionError ae) {
            throw new IOException("Results differed for getJournalCTime", ae);
        }

        // All results are equal at this point, so the first one is the answer.
        Iterator<Long> results = call.getResults().values().iterator();
        if (results.hasNext()) {
            return results.next().longValue();
        }
        throw new AssertionError("Unreachable code.");
    }

    /**
     * Issues {@code discardSegments} to the logger set and waits for the call to finish.
     *
     * @param j transaction id forwarded unchanged to the loggers
     * @throws IOException if the wait is interrupted or times out (the original
     *         exception is attached as the cause), or if any logger reported an
     *         exception
     */
    @Override // org.apache.hadoop.hdfs.server.namenode.JournalManager
    public void discardSegments(long j) throws IOException {
        QuorumCall<AsyncLogger, Void> call = loggers.discardSegments(j);
        try {
            // NOTE(review): the arguments presumably mean "responses and
            // successes from every logger, zero tolerated exceptions" --
            // confirm against QuorumCall.waitFor.
            call.waitFor(loggers.size(), loggers.size(), 0, DISCARD_SEGMENTS_TIMEOUT_MS, "discardSegments");
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up can see it.
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted waiting for discardSegments() response", e);
        } catch (TimeoutException e) {
            throw new IOException("Timed out waiting for discardSegments() response", e);
        }
        if (call.countExceptions() > 0) {
            call.rethrowException("Could not perform discardSegments of one or more JournalNodes");
        }
    }

    // Static initializer: sets the two blank static finals declared at the top
    // of the class.
    static {
        // True when assertions are disabled for this class; consulted by the
        // manual "if (!$assertionsDisabled && ...)" guards in the methods above.
        $assertionsDisabled = !QuorumJournalManager.class.desiredAssertionStatus();
        // Class-wide commons-logging logger.
        LOG = LogFactory.getLog(QuorumJournalManager.class);
    }
}
