package io.atomix.protocols.log.impl;

import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.MemberId;
import io.atomix.primitive.Replication;
import io.atomix.primitive.partition.GroupMember;
import io.atomix.primitive.partition.ManagedMemberGroupService;
import io.atomix.primitive.partition.MemberGroup;
import io.atomix.primitive.partition.PrimaryElection;
import io.atomix.primitive.partition.PrimaryElectionEventListener;
import io.atomix.primitive.partition.PrimaryTerm;
import io.atomix.protocols.log.DistributedLogServer;
import io.atomix.protocols.log.protocol.AppendRequest;
import io.atomix.protocols.log.protocol.AppendResponse;
import io.atomix.protocols.log.protocol.BackupRequest;
import io.atomix.protocols.log.protocol.BackupResponse;
import io.atomix.protocols.log.protocol.ConsumeRequest;
import io.atomix.protocols.log.protocol.ConsumeResponse;
import io.atomix.protocols.log.protocol.LogEntry;
import io.atomix.protocols.log.protocol.LogResponse;
import io.atomix.protocols.log.protocol.LogServerProtocol;
import io.atomix.protocols.log.protocol.ResetRequest;
import io.atomix.protocols.log.roles.FollowerRole;
import io.atomix.protocols.log.roles.LeaderRole;
import io.atomix.protocols.log.roles.LogServerRole;
import io.atomix.protocols.log.roles.NoneRole;
import io.atomix.storage.journal.JournalReader;
import io.atomix.storage.journal.JournalSegment;
import io.atomix.storage.journal.JournalWriter;
import io.atomix.storage.journal.SegmentedJournal;
import io.atomix.utils.Managed;
import io.atomix.utils.concurrent.Futures;
import io.atomix.utils.concurrent.Scheduled;
import io.atomix.utils.concurrent.ThreadContext;
import io.atomix.utils.concurrent.ThreadContextFactory;
import io.atomix.utils.logging.ContextualLogger;
import io.atomix.utils.logging.LoggerContext;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/atomix/protocols/log/impl/DistributedLogServerContext.class */
/**
 * Shared state of a single distributed log server.
 *
 * <p>The context owns the journal writer/reader, tracks the current term, leader, followers and
 * commit index, and swaps the active {@link LogServerRole} whenever the primary election reports a
 * new term. Role changes and incoming append/backup/consume requests are executed on the
 * {@link ThreadContext} created in the constructor.
 *
 * <p>The mutable fields ({@code leader}, {@code followers}, {@code role}, {@code currentTerm},
 * {@code commitIndex}) are plain fields without synchronization; this class itself only writes
 * {@code role}, {@code leader}, {@code followers} and {@code currentTerm} from tasks submitted to
 * the thread context (see {@link #changeRole(PrimaryTerm)}).
 */
public class DistributedLogServerContext implements Managed<Void> {
    /** Delay between {@link #start()} and the compaction run it schedules. */
    private static final Duration COMPACT_DELAY = Duration.ofSeconds(30L);

    private final Logger log;
    private final String serverName;
    private final MemberId memberId;
    private final ClusterMembershipService clusterMembershipService;
    private final ManagedMemberGroupService memberGroupService;
    private final LogServerProtocol protocol;
    private final int replicationFactor;
    private final Replication replicationStrategy;
    private final ThreadContextFactory threadContextFactory;
    private final ThreadContext threadContext;
    private final boolean closeOnStop;
    private final PrimaryElection primaryElection;
    private MemberId leader;
    private List<MemberId> followers;
    private LogServerRole role;
    private long currentTerm;
    private long commitIndex;
    private final SegmentedJournal<LogEntry> journal;
    private final JournalWriter<LogEntry> writer;
    private final JournalReader<LogEntry> reader;
    private final long maxLogSize;
    private final Duration maxLogAge;
    private Scheduled compactTimer;
    private final PrimaryElectionEventListener primaryElectionListener = event -> changeRole(event.term());
    private final AtomicBoolean started = new AtomicBoolean();

    /**
     * Creates the server context.
     *
     * @param serverName name of this server; also added as a value to the logger context
     * @param clusterMembershipService source of the local member and its identifier
     * @param memberGroupService member group service started and stopped together with this context
     * @param protocol protocol on which the request handlers are registered
     * @param primaryElection election used to learn the current term and to enter as a candidate
     * @param replicationFactor replication factor; {@code replicationFactor - 1} backups are
     *     requested from each term
     * @param replicationStrategy replication strategy exposed through {@link #replicationStrategy()}
     * @param journal journal backing the log; its writer is obtained and a reader is opened at index 1
     * @param maxLogSize journal size above which size-based compaction kicks in; values
     *     {@code <= 0} disable it
     * @param maxLogAge segment age above which age-based compaction kicks in; {@code null} disables it
     * @param threadContextFactory factory used to create this server's thread context
     * @param closeOnStop whether {@link #stop()} should close {@code threadContextFactory}
     */
    public DistributedLogServerContext(
            String serverName,
            ClusterMembershipService clusterMembershipService,
            ManagedMemberGroupService memberGroupService,
            LogServerProtocol protocol,
            PrimaryElection primaryElection,
            int replicationFactor,
            Replication replicationStrategy,
            SegmentedJournal<LogEntry> journal,
            long maxLogSize,
            Duration maxLogAge,
            ThreadContextFactory threadContextFactory,
            boolean closeOnStop) {
        this.serverName = serverName;
        this.memberId = clusterMembershipService.getLocalMember().id();
        this.clusterMembershipService = clusterMembershipService;
        this.memberGroupService = memberGroupService;
        this.protocol = protocol;
        this.replicationFactor = replicationFactor;
        this.replicationStrategy = replicationStrategy;
        this.threadContextFactory = threadContextFactory;
        this.threadContext = threadContextFactory.createContext();
        this.closeOnStop = closeOnStop;
        this.journal = journal;
        this.writer = journal.writer();
        this.reader = journal.openReader(1L);
        this.maxLogSize = maxLogSize;
        this.maxLogAge = maxLogAge;
        this.primaryElection = primaryElection;
        this.log = new ContextualLogger(
                LoggerFactory.getLogger(getClass()),
                LoggerContext.builder(getClass()).addValue(serverName).build());
    }

    /** Returns the server name given at construction. */
    public String serverName() {
        return this.serverName;
    }

    /** Returns the local member identifier captured at construction. */
    public MemberId memberId() {
        return this.memberId;
    }

    /** Returns the server protocol. */
    public LogServerProtocol protocol() {
        return this.protocol;
    }

    /** Returns the journal backing the log. */
    public SegmentedJournal<LogEntry> journal() {
        return this.journal;
    }

    /** Returns the journal writer. */
    public JournalWriter<LogEntry> writer() {
        return this.writer;
    }

    /** Returns the journal reader opened at index 1 during construction. */
    public JournalReader<LogEntry> reader() {
        return this.reader;
    }

    /** Returns the configured replication factor. */
    public int replicationFactor() {
        return this.replicationFactor;
    }

    /** Returns the configured replication strategy. */
    public Replication replicationStrategy() {
        return this.replicationStrategy;
    }

    /** Returns the thread context on which role changes and requests are executed. */
    public ThreadContext threadContext() {
        return this.threadContext;
    }

    /**
     * Queries the primary election for the current term and reports whether the local member is
     * its primary.
     *
     * @return {@code LEADER} when the term's primary is the local member; {@code FOLLOWER}
     *     otherwise, including when the term has no primary
     */
    public DistributedLogServer.Role getRole() {
        PrimaryTerm term = Futures.get(this.primaryElection.getTerm());
        // A term may have no primary (changeRole handles the same case); treat that as "not leader"
        // instead of failing with a NullPointerException.
        GroupMember primary = term != null ? term.primary() : null;
        MemberId primaryId = primary != null ? primary.memberId() : null;
        return Objects.equals(primaryId, this.clusterMembershipService.getLocalMember().id())
                ? DistributedLogServer.Role.LEADER
                : DistributedLogServer.Role.FOLLOWER;
    }

    /** Returns the last recorded leader; {@code null} until one has been recorded. */
    public MemberId leader() {
        return this.leader;
    }

    /** Returns the last recorded followers; {@code null} until the first term change is applied. */
    public List<MemberId> followers() {
        return this.followers;
    }

    /** Returns the last recorded term. */
    public long currentTerm() {
        return this.currentTerm;
    }

    /**
     * Unconditionally overwrites the current term and leader.
     *
     * @param term the new term
     * @param leader the new leader
     */
    public void resetTerm(long term, MemberId leader) {
        this.currentTerm = term;
        this.leader = leader;
    }

    /**
     * Raises the commit index to {@code commitIndex} if that is higher than the current value and
     * commits the resulting index on the journal writer. The stored index never decreases.
     *
     * @param commitIndex the proposed commit index
     * @return the commit index after the update
     */
    public long setCommitIndex(long commitIndex) {
        this.commitIndex = Math.max(this.commitIndex, commitIndex);
        this.writer.commit(this.commitIndex);
        return this.commitIndex;
    }

    /** Returns the current commit index. */
    public long getCommitIndex() {
        return this.commitIndex;
    }

    /** Runs size-based compaction followed by age-based compaction. */
    public void compact() {
        compactBySize();
        compactByAge();
    }

    /**
     * Compacts the journal when its total size exceeds {@code maxLogSize}.
     *
     * <p>Segments are scanned in iteration order. A segment is "outsized" while the segments
     * starting after its last index still sum to more than {@code maxLogSize}. The journal is
     * compacted up to the index of the first non-outsized segment that follows an outsized one.
     */
    private void compactBySize() {
        if (this.maxLogSize <= 0 || this.journal.size() <= this.maxLogSize) {
            return;
        }
        boolean foundOutsized = false;
        for (JournalSegment<LogEntry> segment : this.journal.segments()) {
            long remainingSize = this.journal.segments(segment.lastIndex() + 1).stream()
                    .mapToLong(JournalSegment::size)
                    .sum();
            if (remainingSize > this.maxLogSize) {
                this.log.debug("Found outsize journal segment {}", segment.file().file());
                foundOutsized = true;
            } else if (foundOutsized) {
                long compactIndex = segment.index();
                this.log.info("Compacting journal by size up to {}", compactIndex);
                this.journal.compact(compactIndex);
                // Stop scanning: nothing after the compaction point is inspected.
                return;
            }
        }
    }

    /**
     * Compacts the journal when segments are older than {@code maxLogAge}.
     *
     * <p>A segment is "expired" when the time since its descriptor's {@code updated()} timestamp
     * exceeds {@code maxLogAge}. The journal is compacted up to the index of the first non-expired
     * segment that follows an expired one. Does nothing when {@code maxLogAge} is {@code null}.
     */
    private void compactByAge() {
        if (this.maxLogAge == null) {
            return;
        }
        long now = System.currentTimeMillis();
        long maxAgeMillis = this.maxLogAge.toMillis();
        boolean foundExpired = false;
        for (JournalSegment<LogEntry> segment : this.journal.segments()) {
            if (now - segment.descriptor().updated() > maxAgeMillis) {
                this.log.debug("Found expired journal segment {}", segment.file().file());
                foundExpired = true;
            } else if (foundExpired) {
                long compactIndex = segment.index();
                this.log.info("Compacting journal by age up to {}", compactIndex);
                this.journal.compact(compactIndex);
                // Stop scanning: nothing after the compaction point is inspected.
                return;
            }
        }
    }

    /**
     * Registers the protocol handlers, schedules compaction, starts the member group service and
     * then enters the primary election on the thread context.
     *
     * <p>NOTE(review): compaction is scheduled through a single
     * {@code schedule(Duration, Runnable)} call and is never re-armed in this class; confirm
     * whether a repeating schedule was intended.
     *
     * @return a future completed once the server is marked as running
     */
    @Override
    public CompletableFuture<Void> start() {
        registerListeners();
        this.compactTimer = this.threadContext.schedule(COMPACT_DELAY, this::compact);
        return this.memberGroupService.start()
                .thenComposeAsync(ignored -> enterElection(), this.threadContext)
                .thenApply(ignored -> {
                    this.started.set(true);
                    return null;
                });
    }

    /**
     * Subscribes to election events and, if the local member belongs to a member group, enters the
     * election and applies the resulting term.
     */
    private CompletableFuture<Void> enterElection() {
        MemberGroup group = this.memberGroupService.getMemberGroup(this.clusterMembershipService.getLocalMember());
        this.primaryElection.addListener(this.primaryElectionListener);
        if (group == null) {
            return CompletableFuture.completedFuture(null);
        }
        GroupMember self = new GroupMember(this.clusterMembershipService.getLocalMember().id(), group.id());
        return this.primaryElection.enter(self).thenAccept(this::changeRole);
    }

    /**
     * Applies a term on the thread context: records term, leader and followers, then switches to
     * the leader, follower or none role depending on where the local member appears in the term.
     * Terms lower than the current term are ignored.
     *
     * @param primaryTerm the term reported by the primary election
     */
    private void changeRole(PrimaryTerm primaryTerm) {
        this.threadContext.execute(() -> {
            if (primaryTerm.term() < this.currentTerm) {
                return;
            }
            this.log.debug("{} - Term changed: {}", this.memberId, primaryTerm);
            this.currentTerm = primaryTerm.term();
            this.leader = primaryTerm.primary() != null ? primaryTerm.primary().memberId() : null;
            this.followers = primaryTerm.backups(this.replicationFactor - 1).stream()
                    .map(GroupMember::memberId)
                    .collect(Collectors.toList());

            MemberId localId = this.clusterMembershipService.getLocalMember().id();
            if (Objects.equals(this.leader, localId)) {
                transitionTo(DistributedLogServer.Role.LEADER, () -> new LeaderRole(this), localId);
            } else if (this.followers.contains(localId)) {
                transitionTo(DistributedLogServer.Role.FOLLOWER, () -> new FollowerRole(this), localId);
            } else {
                transitionTo(DistributedLogServer.Role.NONE, () -> new NoneRole(this), localId);
            }
        });
    }

    /**
     * Installs a new role unless the active role already has the target type; a previous role of a
     * different type is closed before the replacement is created.
     */
    private void transitionTo(DistributedLogServer.Role target, Supplier<LogServerRole> roleFactory, MemberId localId) {
        if (this.role != null) {
            if (this.role.role() == target) {
                return;
            }
            this.role.close();
        }
        this.role = roleFactory.get();
        this.log.debug("{} transitioning to {}", localId, target);
    }

    /** Forwards an append request to the active role on the thread context. */
    private CompletableFuture<AppendResponse> append(AppendRequest appendRequest) {
        return runOnContext(() -> activeRole().append(appendRequest));
    }

    /** Forwards a backup request to the active role on the thread context. */
    private CompletableFuture<BackupResponse> backup(BackupRequest backupRequest) {
        return runOnContext(() -> activeRole().backup(backupRequest));
    }

    /** Forwards a consume request to the active role on the thread context. */
    private CompletableFuture<ConsumeResponse> consume(ConsumeRequest consumeRequest) {
        return runOnContext(() -> activeRole().consume(consumeRequest));
    }

    /**
     * Forwards a reset request to the active role. The request is dropped (and logged) when no
     * role has been assigned yet, since there is no response channel to report a failure on.
     */
    private void reset(ResetRequest resetRequest) {
        LogServerRole current = this.role;
        if (current == null) {
            this.log.debug("Ignoring reset request received before a role was assigned");
            return;
        }
        current.reset(resetRequest);
    }

    /**
     * Returns the active role.
     *
     * @throws IllegalStateException if no role has been assigned yet
     */
    private LogServerRole activeRole() {
        LogServerRole current = this.role;
        if (current == null) {
            throw new IllegalStateException("Log server " + this.serverName + " has not been assigned a role yet");
        }
        return current;
    }

    /**
     * Runs {@code supplier} on the thread context and mirrors the outcome of the future it returns
     * into the future returned from this method.
     *
     * <p>If the supplier itself throws (for example because no role is assigned yet), the returned
     * future is completed exceptionally rather than being left pending forever.
     */
    private <R extends LogResponse> CompletableFuture<R> runOnContext(Supplier<CompletableFuture<R>> supplier) {
        CompletableFuture<R> result = new CompletableFuture<>();
        this.threadContext.execute(() -> {
            try {
                supplier.get().whenComplete((response, error) -> {
                    if (error == null) {
                        result.complete(response);
                    } else {
                        result.completeExceptionally(error);
                    }
                });
            } catch (RuntimeException e) {
                result.completeExceptionally(e);
            }
        });
        return result;
    }

    /** Registers the append, backup, consume and reset handlers on the protocol. */
    private void registerListeners() {
        this.protocol.registerAppendHandler(this::append);
        this.protocol.registerBackupHandler(this::backup);
        this.protocol.registerConsumeHandler(this::consume);
        this.protocol.registerResetConsumer(this::reset, this.threadContext);
    }

    /** Removes the handlers installed by {@link #registerListeners()}. */
    private void unregisterListeners() {
        this.protocol.unregisterAppendHandler();
        this.protocol.unregisterBackupHandler();
        this.protocol.unregisterConsumeHandler();
        this.protocol.unregisterResetConsumer();
    }

    /** Returns {@code true} between the completion of {@link #start()} and a call to {@link #stop()}. */
    @Override
    public boolean isRunning() {
        return this.started.get();
    }

    /**
     * Unregisters the handlers and the election listener, cancels the compaction timer, closes the
     * journal and stops the member group service. A failure to stop the member group service is
     * logged and swallowed. The thread context factory is closed afterwards when
     * {@code closeOnStop} was set.
     *
     * <p>NOTE(review): neither the active role nor the thread context created in the constructor
     * is closed here; confirm that this is intentional.
     *
     * @return a future completed once shutdown has finished
     */
    @Override
    public CompletableFuture<Void> stop() {
        unregisterListeners();
        this.primaryElection.removeListener(this.primaryElectionListener);
        if (this.compactTimer != null) {
            this.compactTimer.cancel();
        }
        this.journal.close();
        this.started.set(false);
        return this.memberGroupService.stop()
                .exceptionally(error -> {
                    this.log.error("Failed stopping member group service", error);
                    return null;
                })
                .thenRunAsync(() -> {
                    if (this.closeOnStop) {
                        this.threadContextFactory.close();
                    }
                });
    }
}
