package org.springframework.batch.integration.partition;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.sql.DataSource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.explore.support.JobExplorerFactoryBean;
import org.springframework.batch.core.partition.support.AbstractPartitionHandler;
import org.springframework.batch.poller.DirectPoller;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.integration.MessageTimeoutException;
import org.springframework.integration.annotation.Aggregator;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Payloads;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

@MessageEndpoint
/* loaded from: input_file:org/springframework/batch/integration/partition/MessageChannelPartitionHandler.class */
public class MessageChannelPartitionHandler extends AbstractPartitionHandler implements InitializingBean {
    private static Log logger = LogFactory.getLog(MessageChannelPartitionHandler.class);
    private MessagingTemplate messagingGateway;
    private String stepName;
    private JobExplorer jobExplorer;
    private DataSource dataSource;
    private PollableChannel replyChannel;
    private long pollInterval = 10000;
    private boolean pollRepositoryForResults = false;
    private long timeout = -1;

    public void afterPropertiesSet() throws Exception {
        Assert.state(this.stepName != null, "A step name must be provided for the remote workers.");
        Assert.state(this.messagingGateway != null, "The MessagingOperations must be set");
        this.pollRepositoryForResults = (this.dataSource == null && this.jobExplorer == null) ? false : true;
        if (this.pollRepositoryForResults) {
            logger.debug("MessageChannelPartitionHandler is configured to poll the job repository for worker results");
        }
        if (this.dataSource != null && this.jobExplorer == null) {
            JobExplorerFactoryBean jobExplorerFactoryBean = new JobExplorerFactoryBean();
            jobExplorerFactoryBean.setDataSource(this.dataSource);
            jobExplorerFactoryBean.afterPropertiesSet();
            this.jobExplorer = jobExplorerFactoryBean.getObject();
        }
        if (this.pollRepositoryForResults || this.replyChannel != null) {
            return;
        }
        this.replyChannel = new QueueChannel();
    }

    public void setTimeout(long j) {
        this.timeout = j;
    }

    public void setJobExplorer(JobExplorer jobExplorer) {
        this.jobExplorer = jobExplorer;
    }

    public void setPollInterval(long j) {
        this.pollInterval = j;
    }

    public void setDataSource(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    public void setMessagingOperations(MessagingTemplate messagingTemplate) {
        this.messagingGateway = messagingTemplate;
    }

    public void setStepName(String str) {
        this.stepName = str;
    }

    @Aggregator(sendPartialResultsOnExpiry = "true")
    public List<?> aggregate(@Payloads List<?> list) {
        return list;
    }

    public void setReplyChannel(PollableChannel pollableChannel) {
        this.replyChannel = pollableChannel;
    }

    protected Set<StepExecution> doHandle(StepExecution stepExecution, Set<StepExecution> set) throws Exception {
        if (CollectionUtils.isEmpty(set)) {
            return set;
        }
        int i = 0;
        for (StepExecution stepExecution2 : set) {
            int i2 = i;
            i++;
            Message<StepExecutionRequest> createMessage = createMessage(i2, set.size(), new StepExecutionRequest(this.stepName, stepExecution2.getJobExecutionId(), stepExecution2.getId()), this.replyChannel);
            if (logger.isDebugEnabled()) {
                logger.debug("Sending request: " + createMessage);
            }
            this.messagingGateway.send(createMessage);
        }
        return !this.pollRepositoryForResults ? receiveReplies(this.replyChannel) : pollReplies(stepExecution, set);
    }

    private Set<StepExecution> pollReplies(final StepExecution stepExecution, final Set<StepExecution> set) throws Exception {
        final HashSet hashSet = new HashSet(set.size());
        Future poll = new DirectPoller(this.pollInterval).poll(new Callable<Set<StepExecution>>() { // from class: org.springframework.batch.integration.partition.MessageChannelPartitionHandler.1
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Set<StepExecution> call() throws Exception {
                Set set2 = (Set) set.stream().map((v0) -> {
                    return v0.getId();
                }).collect(Collectors.toSet());
                Stream filter = MessageChannelPartitionHandler.this.jobExplorer.getJobExecution(stepExecution.getJobExecutionId()).getStepExecutions().stream().filter(stepExecution2 -> {
                    return set2.contains(stepExecution2.getId());
                });
                Set set3 = hashSet;
                Stream filter2 = filter.filter(stepExecution3 -> {
                    return !set3.contains(stepExecution3);
                }).filter(stepExecution4 -> {
                    return !stepExecution4.getStatus().isRunning();
                });
                Set set4 = hashSet;
                Objects.requireNonNull(set4);
                filter2.forEach((v1) -> {
                    r1.add(v1);
                });
                if (MessageChannelPartitionHandler.logger.isDebugEnabled()) {
                    MessageChannelPartitionHandler.logger.debug(String.format("Currently waiting on %s partitions to finish", Integer.valueOf(set.size())));
                }
                if (hashSet.size() == set.size()) {
                    return hashSet;
                }
                return null;
            }
        });
        return this.timeout >= 0 ? (Set) poll.get(this.timeout, TimeUnit.MILLISECONDS) : (Set) poll.get();
    }

    private Set<StepExecution> receiveReplies(PollableChannel pollableChannel) {
        Message receive = this.messagingGateway.receive(pollableChannel);
        if (receive == null) {
            throw new MessageTimeoutException("Timeout occurred before all partitions returned");
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Received replies: " + receive);
        }
        return new HashSet((Collection) receive.getPayload());
    }

    private Message<StepExecutionRequest> createMessage(int i, int i2, StepExecutionRequest stepExecutionRequest, PollableChannel pollableChannel) {
        return MessageBuilder.withPayload(stepExecutionRequest).setSequenceNumber(Integer.valueOf(i)).setSequenceSize(Integer.valueOf(i2)).setCorrelationId(stepExecutionRequest.getJobExecutionId() + ":" + stepExecutionRequest.getStepName()).setReplyChannel(pollableChannel).build();
    }
}
