package de.otto.synapse.channel;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import de.otto.synapse.endpoint.MessageInterceptorRegistry;
import de.otto.synapse.endpoint.receiver.AbstractMessageLogReceiverEndpoint;
import de.otto.synapse.endpoint.receiver.MessageLogReceiverEndpoint;
import de.otto.synapse.endpoint.receiver.MessageQueueReceiverEndpoint;
import de.otto.synapse.info.MessageReceiverStatus;
import de.otto.synapse.message.Header;
import de.otto.synapse.message.Message;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;

/* loaded from: input_file:de/otto/synapse/channel/InMemoryChannel.class */
public class InMemoryChannel extends AbstractMessageLogReceiverEndpoint implements MessageLogReceiverEndpoint, MessageQueueReceiverEndpoint {
    private static final Logger LOG = LoggerFactory.getLogger(InMemoryChannel.class);
    private final List<Message<String>> eventQueue;
    private final AtomicBoolean stopSignal;

    public InMemoryChannel(String str, MessageInterceptorRegistry messageInterceptorRegistry) {
        super(str, messageInterceptorRegistry, (ApplicationEventPublisher) null);
        this.stopSignal = new AtomicBoolean(false);
        this.eventQueue = Collections.synchronizedList(new ArrayList());
    }

    public InMemoryChannel(String str, MessageInterceptorRegistry messageInterceptorRegistry, ApplicationEventPublisher applicationEventPublisher) {
        super(str, messageInterceptorRegistry, applicationEventPublisher);
        this.stopSignal = new AtomicBoolean(false);
        this.eventQueue = Collections.synchronizedList(new ArrayList());
    }

    public synchronized void send(Message<String> message) {
        int size = this.eventQueue.size();
        LOG.info("Sending {} to {} at position{}", new Object[]{message, getChannelName(), Integer.valueOf(size)});
        this.eventQueue.add(Message.message(message.getKey(), Header.of(ShardPosition.fromPosition(getChannelName(), String.valueOf(size)), ImmutableMap.builder().putAll(message.getHeader().getAll()).put("synapse_msg_arrival_ts", Instant.now().toString()).build()), message.getPayload()));
    }

    @Nonnull
    public CompletableFuture<ChannelPosition> consumeUntil(@Nonnull ChannelPosition channelPosition, @Nonnull Predicate<ShardResponse> predicate) {
        publishEvent(MessageReceiverStatus.STARTING, "Starting InMemoryChannel " + getChannelName(), null);
        publishEvent(MessageReceiverStatus.STARTED, "Started InMemoryChannel " + getChannelName(), null);
        return CompletableFuture.supplyAsync(() -> {
            ImmutableList of;
            ShardPosition fromPosition;
            AtomicInteger atomicInteger = new AtomicInteger(positionOf(channelPosition.shard(getChannelName())));
            do {
                if (hasMessageAfter(atomicInteger.get())) {
                    int incrementAndGet = atomicInteger.incrementAndGet();
                    Message<String> message = this.eventQueue.get(incrementAndGet);
                    of = ImmutableList.of(message);
                    LOG.info("Received message from channel={} at position={}: message={}", new Object[]{getChannelName(), Integer.valueOf(incrementAndGet), message});
                    Message intercept = intercept(message);
                    if (intercept != null) {
                        getMessageDispatcher().accept(intercept);
                    }
                } else {
                    of = ImmutableList.of();
                    try {
                        Thread.sleep(100L);
                    } catch (InterruptedException e) {
                    }
                }
                fromPosition = ShardPosition.fromPosition(getChannelName(), String.valueOf(atomicInteger));
                if (predicate.test(ShardResponse.shardResponse(fromPosition, durationBehind(atomicInteger.get()), of))) {
                    break;
                }
            } while (!this.stopSignal.get());
            publishEvent(MessageReceiverStatus.FINISHED, "Finished InMemoryChannel " + getChannelName(), null);
            return ChannelPosition.channelPosition(new ShardPosition[]{fromPosition});
        }, Executors.newSingleThreadExecutor());
    }

    private int positionOf(ShardPosition shardPosition) {
        if (shardPosition.startFrom() == StartFrom.HORIZON) {
            return -1;
        }
        return Integer.valueOf(shardPosition.position()).intValue();
    }

    private Duration durationBehind(int i) {
        return (i != -1 || this.eventQueue.size() <= 0) ? (i < 0 || i > this.eventQueue.size()) ? Duration.ZERO : Duration.between(((Message) Iterables.getLast(this.eventQueue)).getHeader().getAsInstant("synapse_msg_arrival_ts"), this.eventQueue.get(i).getHeader().getAsInstant("synapse_msg_arrival_ts")).abs() : Duration.between(((Message) Iterables.getLast(this.eventQueue)).getHeader().getAsInstant("synapse_msg_arrival_ts"), this.eventQueue.get(this.eventQueue.size() - 1).getHeader().getAsInstant("synapse_msg_arrival_ts")).abs();
    }

    public CompletableFuture<Void> consume() {
        publishEvent(MessageReceiverStatus.STARTING, "Starting InMemoryChannel " + getChannelName(), null);
        Message message = this.eventQueue.isEmpty() ? null : (Message) Iterables.getLast(this.eventQueue);
        ChannelDurationBehind build = message != null ? ChannelDurationBehind.channelDurationBehind().with(getChannelName(), Duration.between(message.getHeader().getAsInstant("synapse_msg_arrival_ts"), Instant.now())).build() : null;
        publishEvent(MessageReceiverStatus.STARTED, "Started InMemoryChannel " + getChannelName(), build);
        return CompletableFuture.supplyAsync(() -> {
            do {
                if (this.eventQueue.isEmpty()) {
                    try {
                        Thread.sleep(100L);
                    } catch (InterruptedException e) {
                    }
                } else {
                    Message<String> remove = this.eventQueue.remove(0);
                    Message intercept = intercept(Message.message(remove.getKey(), Header.of((ShardPosition) null, remove.getHeader().getAll()), remove.getPayload()));
                    if (intercept != null) {
                        getMessageDispatcher().accept(intercept);
                    }
                }
            } while (!this.stopSignal.get());
            publishEvent(MessageReceiverStatus.FINISHED, "Finished InMemoryChannel " + getChannelName(), build);
            return null;
        }, Executors.newSingleThreadExecutor());
    }

    public void stop() {
        this.stopSignal.set(true);
    }

    private synchronized boolean hasMessageAfter(int i) {
        return this.eventQueue.size() > i + 1;
    }
}
