package io.atomix.protocols.log.partition.impl;

import com.google.common.base.Preconditions;
import io.atomix.cluster.MemberId;
import io.atomix.cluster.messaging.ClusterCommunicationService;
import io.atomix.protocols.log.protocol.AppendRequest;
import io.atomix.protocols.log.protocol.AppendResponse;
import io.atomix.protocols.log.protocol.BackupRequest;
import io.atomix.protocols.log.protocol.BackupResponse;
import io.atomix.protocols.log.protocol.ConsumeRequest;
import io.atomix.protocols.log.protocol.ConsumeResponse;
import io.atomix.protocols.log.protocol.LogServerProtocol;
import io.atomix.protocols.log.protocol.RecordsRequest;
import io.atomix.protocols.log.protocol.ResetRequest;
import io.atomix.utils.serializer.Serializer;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;

/* loaded from: input_file:io/atomix/protocols/log/partition/impl/LogServerCommunicator.class */
public class LogServerCommunicator implements LogServerProtocol {
    private final LogMessageContext context;
    private final Serializer serializer;
    private final ClusterCommunicationService clusterCommunicator;

    public LogServerCommunicator(String str, Serializer serializer, ClusterCommunicationService clusterCommunicationService) {
        this.context = new LogMessageContext(str);
        this.serializer = (Serializer) Preconditions.checkNotNull(serializer, "serializer cannot be null");
        this.clusterCommunicator = (ClusterCommunicationService) Preconditions.checkNotNull(clusterCommunicationService, "clusterCommunicator cannot be null");
    }

    private <T> void unicast(String str, T t, MemberId memberId) {
        ClusterCommunicationService clusterCommunicationService = this.clusterCommunicator;
        Serializer serializer = this.serializer;
        Objects.requireNonNull(serializer);
        clusterCommunicationService.unicast(str, t, serializer::encode, memberId, false);
    }

    private <T, U> CompletableFuture<U> send(String str, T t, MemberId memberId) {
        ClusterCommunicationService clusterCommunicationService = this.clusterCommunicator;
        Serializer serializer = this.serializer;
        Objects.requireNonNull(serializer);
        Function function = serializer::encode;
        Serializer serializer2 = this.serializer;
        Objects.requireNonNull(serializer2);
        return clusterCommunicationService.send(str, t, function, serializer2::decode, memberId);
    }

    @Override // io.atomix.protocols.log.protocol.LogServerProtocol
    public void produce(MemberId memberId, String str, RecordsRequest recordsRequest) {
        unicast(str, recordsRequest, memberId);
    }

    @Override // io.atomix.protocols.log.protocol.LogServerProtocol
    public CompletableFuture<BackupResponse> backup(MemberId memberId, BackupRequest backupRequest) {
        return send(this.context.backupSubject, backupRequest, memberId);
    }

    @Override // io.atomix.protocols.log.protocol.LogServerProtocol
    public void registerBackupHandler(Function<BackupRequest, CompletableFuture<BackupResponse>> function) {
        ClusterCommunicationService clusterCommunicationService = this.clusterCommunicator;
        String str = this.context.backupSubject;
        Serializer serializer = this.serializer;
        Objects.requireNonNull(serializer);
        Function function2 = serializer::decode;
        Serializer serializer2 = this.serializer;
        Objects.requireNonNull(serializer2);
        clusterCommunicationService.subscribe(str, function2, function, (v1) -> {
            return r4.encode(v1);
        });
    }

    @Override // io.atomix.protocols.log.protocol.LogServerProtocol
    public void unregisterBackupHandler() {
        this.clusterCommunicator.unsubscribe(this.context.backupSubject);
    }

    @Override // io.atomix.protocols.log.protocol.LogServerProtocol
    public void registerAppendHandler(Function<AppendRequest, CompletableFuture<AppendResponse>> function) {
        ClusterCommunicationService clusterCommunicationService = this.clusterCommunicator;
        String str = this.context.appendSubject;
        Serializer serializer = this.serializer;
        Objects.requireNonNull(serializer);
        Function function2 = serializer::decode;
        Serializer serializer2 = this.serializer;
        Objects.requireNonNull(serializer2);
        clusterCommunicationService.subscribe(str, function2, function, (v1) -> {
            return r4.encode(v1);
        });
    }

    @Override // io.atomix.protocols.log.protocol.LogServerProtocol
    public void unregisterAppendHandler() {
        this.clusterCommunicator.unsubscribe(this.context.appendSubject);
    }

    @Override // io.atomix.protocols.log.protocol.LogServerProtocol
    public void registerConsumeHandler(Function<ConsumeRequest, CompletableFuture<ConsumeResponse>> function) {
        ClusterCommunicationService clusterCommunicationService = this.clusterCommunicator;
        String str = this.context.consumeSubject;
        Serializer serializer = this.serializer;
        Objects.requireNonNull(serializer);
        Function function2 = serializer::decode;
        Serializer serializer2 = this.serializer;
        Objects.requireNonNull(serializer2);
        clusterCommunicationService.subscribe(str, function2, function, (v1) -> {
            return r4.encode(v1);
        });
    }

    @Override // io.atomix.protocols.log.protocol.LogServerProtocol
    public void unregisterConsumeHandler() {
        this.clusterCommunicator.unsubscribe(this.context.consumeSubject);
    }

    @Override // io.atomix.protocols.log.protocol.LogServerProtocol
    public void registerResetConsumer(Consumer<ResetRequest> consumer, Executor executor) {
        ClusterCommunicationService clusterCommunicationService = this.clusterCommunicator;
        String str = this.context.resetSubject;
        Serializer serializer = this.serializer;
        Objects.requireNonNull(serializer);
        clusterCommunicationService.subscribe(str, serializer::decode, consumer, executor);
    }

    @Override // io.atomix.protocols.log.protocol.LogServerProtocol
    public void unregisterResetConsumer() {
        this.clusterCommunicator.unsubscribe(this.context.resetSubject);
    }
}
