package org.opensearch.rest.action.document;

import io.netty.handler.codec.rtsp.RtspHeaders;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.bulk.BulkItemResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.bulk.BulkShardRequest;
import org.opensearch.action.support.ActiveShardCount;
import org.opensearch.client.Requests;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.support.XContentHttpChunk;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.MediaType;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.http.HttpChunk;
import org.opensearch.ingest.AbstractBatchingProcessor;
import org.opensearch.ingest.PipelineProcessor;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestHandler;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.StreamingRestChannel;
import org.opensearch.search.fetch.subphase.FetchSourceContext;
import org.opensearch.threadpool.ThreadPool;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@ExperimentalApi
/* loaded from: input_file:WEB-INF/lib/opensearch-2.19.1.jar:org/opensearch/rest/action/document/RestBulkStreamingAction.class */
public class RestBulkStreamingAction extends BaseRestHandler {
    private static final BulkResponse EMPTY = new BulkResponse(new BulkItemResponse[0], 0);
    private final boolean allowExplicitIndex;

    public RestBulkStreamingAction(Settings settings) {
        this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings).booleanValue();
    }

    @Override // org.opensearch.rest.RestHandler
    public List<RestHandler.Route> routes() {
        return Collections.unmodifiableList(Arrays.asList(new RestHandler.Route(RestRequest.Method.POST, "/_bulk/stream"), new RestHandler.Route(RestRequest.Method.PUT, "/_bulk/stream"), new RestHandler.Route(RestRequest.Method.POST, "/{index}/_bulk/stream"), new RestHandler.Route(RestRequest.Method.PUT, "/{index}/_bulk/stream")));
    }

    @Override // org.opensearch.rest.BaseRestHandler
    public String getName() {
        return "streaming_bulk_action";
    }

    @Override // org.opensearch.rest.BaseRestHandler
    public BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient nodeClient) throws IOException {
        String param = restRequest.param("index");
        String param2 = restRequest.param("routing");
        String param3 = restRequest.param(PipelineProcessor.TYPE);
        String param4 = restRequest.param("wait_for_active_shards");
        Boolean paramAsBoolean = restRequest.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, (Boolean) null);
        TimeValue paramAsTime = restRequest.paramAsTime(RtspHeaders.Values.TIMEOUT, BulkShardRequest.DEFAULT_TIMEOUT);
        String param5 = restRequest.param(ThreadPool.Names.REFRESH);
        TimeValue paramAsTime2 = restRequest.paramAsTime("batch_interval", null);
        int paramAsInt = restRequest.paramAsInt(AbstractBatchingProcessor.BATCH_SIZE_FIELD, 1);
        boolean hasParam = restRequest.hasParam(AbstractBatchingProcessor.BATCH_SIZE_FIELD);
        if (paramAsTime2 != null && paramAsTime2.duration() <= 0) {
            throw new IllegalArgumentException("The batch_interval value should be non-negative [" + paramAsTime2.millis() + "ms].");
        }
        if (paramAsInt <= 0) {
            throw new IllegalArgumentException("The batch_size value should be non-negative [" + paramAsInt + "].");
        }
        BaseRestHandler.StreamingRestChannelConsumer streamingRestChannelConsumer = streamingRestChannel -> {
            MediaType mediaType = restRequest.getMediaType();
            FetchSourceContext parseFromRestRequest = FetchSourceContext.parseFromRestRequest(restRequest);
            BulkRequest bulkRequest = Requests.bulkRequest();
            if (param4 != null) {
                bulkRequest.waitForActiveShards(ActiveShardCount.parseString(param4));
            }
            bulkRequest.timeout(paramAsTime);
            bulkRequest.setRefreshPolicy(param5);
            streamingRestChannel.prepareResponse(RestStatus.OK, Map.of("Content-Type", List.of(mediaType.mediaTypeWithoutParameters())));
            createBufferedFlux(paramAsTime2, paramAsInt, hasParam, streamingRestChannel).zipWith(Flux.fromStream(Stream.generate(() -> {
                BulkRequest bulkRequest2 = Requests.bulkRequest();
                bulkRequest2.waitForActiveShards(bulkRequest.waitForActiveShards());
                bulkRequest2.timeout(bulkRequest.timeout());
                bulkRequest2.setRefreshPolicy(bulkRequest.getRefreshPolicy());
                return bulkRequest2;
            }))).map(tuple2 -> {
                boolean z = false;
                List<HttpChunk> list = (List) tuple2.getT1();
                BulkRequest bulkRequest2 = (BulkRequest) tuple2.getT2();
                for (HttpChunk httpChunk : list) {
                    z |= httpChunk.isLast();
                    try {
                        try {
                            bulkRequest2.add(httpChunk.content(), param, param2, parseFromRestRequest, param3, paramAsBoolean, this.allowExplicitIndex, restRequest.getMediaType());
                            if (httpChunk != null) {
                                httpChunk.close();
                            }
                        } finally {
                        }
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                }
                return Tuple.tuple(Boolean.valueOf(z), bulkRequest2);
            }).flatMap(tuple -> {
                final CompletableFuture completableFuture = new CompletableFuture();
                if (((BulkRequest) tuple.v2()).requests().isEmpty()) {
                    completableFuture.complete(EMPTY);
                } else {
                    nodeClient.bulk((BulkRequest) tuple.v2(), new ActionListener<BulkResponse>() { // from class: org.opensearch.rest.action.document.RestBulkStreamingAction.1
                        @Override // org.opensearch.core.action.ActionListener
                        public void onResponse(BulkResponse bulkResponse) {
                            completableFuture.complete(bulkResponse);
                        }

                        @Override // org.opensearch.core.action.ActionListener
                        public void onFailure(Exception exc) {
                            completableFuture.completeExceptionally(exc);
                        }
                    });
                    if (((Boolean) tuple.v1()).booleanValue()) {
                        return Flux.just((Object[]) new CompletableFuture[]{completableFuture, CompletableFuture.completedFuture(EMPTY)});
                    }
                }
                return Mono.just(completableFuture);
            }).concatMap(completableFuture -> {
                return Mono.fromFuture(completableFuture).doOnNext(bulkResponse -> {
                    try {
                        if (bulkResponse == EMPTY) {
                            streamingRestChannel.sendChunk(XContentHttpChunk.last());
                        } else {
                            XContentBuilder newBuilder = streamingRestChannel.newBuilder(mediaType, true);
                            try {
                                streamingRestChannel.sendChunk(XContentHttpChunk.from(bulkResponse.toXContent(newBuilder, ToXContent.EMPTY_PARAMS)));
                                if (newBuilder != null) {
                                    newBuilder.close();
                                }
                            } finally {
                            }
                        }
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                });
            }).onErrorComplete(th -> {
                if (th instanceof Error) {
                    return false;
                }
                try {
                    streamingRestChannel.sendResponse(new BytesRestResponse(streamingRestChannel, (Exception) th));
                    return true;
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }).subscribe();
        };
        return restChannel -> {
            if (restChannel instanceof StreamingRestChannel) {
                streamingRestChannelConsumer.accept((StreamingRestChannel) restChannel);
                return;
            }
            ActionRequestValidationException actionRequestValidationException = new ActionRequestValidationException();
            actionRequestValidationException.addValidationError("Unable to initiate request / response streaming over non-streaming channel");
            restChannel.sendResponse(new BytesRestResponse(restChannel, actionRequestValidationException));
        };
    }

    @Override // org.opensearch.rest.RestHandler
    public boolean supportsContentStream() {
        return true;
    }

    @Override // org.opensearch.rest.RestHandler
    public boolean supportsStreaming() {
        return true;
    }

    @Override // org.opensearch.rest.RestHandler
    public boolean allowsUnsafeBuffers() {
        return true;
    }

    private Flux<List<HttpChunk>> createBufferedFlux(TimeValue timeValue, int i, boolean z, StreamingRestChannel streamingRestChannel) {
        return timeValue != null ? z ? Flux.from(streamingRestChannel).bufferTimeout(i, Duration.ofMillis(timeValue.millis())) : Flux.from(streamingRestChannel).buffer(Duration.ofMillis(timeValue.millis())) : Flux.from(streamingRestChannel).buffer(i);
    }
}
