package foundation.stack.datamill.http.impl;

import com.google.common.collect.Multimap;
import foundation.stack.datamill.http.Entity;
import foundation.stack.datamill.http.Response;
import foundation.stack.datamill.http.Route;
import foundation.stack.datamill.http.ServerRequest;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.function.BiFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscription;
import rx.subjects.ReplaySubject;

/* loaded from: input_file:foundation/stack/datamill/http/impl/ClientToServerChannelHandler.class */
public class ClientToServerChannelHandler extends ChannelInboundHandlerAdapter {
    private static final Logger logger = LoggerFactory.getLogger(ClientToServerChannelHandler.class);
    private final BiFunction<ServerRequest, Throwable, Observable<Response>> errorResponseConstructor;
    private final Route route;
    private final ExecutorService threadPool;
    private volatile boolean channelClosed;
    private volatile Subscription entitySubscription;
    private ReplaySubject<byte[]> entityStream;
    private ServerRequestImpl serverRequest;

    public ClientToServerChannelHandler(ExecutorService executorService, Route route, BiFunction<ServerRequest, Throwable, Observable<Response>> biFunction) {
        this.threadPool = executorService;
        this.route = route;
        this.errorResponseConstructor = biFunction;
    }

    private void sendGeneralServerError(ChannelHandlerContext channelHandlerContext) {
        channelHandlerContext.write(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR));
    }

    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        this.channelClosed = true;
        if (this.entitySubscription != null) {
            if (!this.entitySubscription.isUnsubscribed()) {
                this.entitySubscription.unsubscribe();
            }
            this.entitySubscription = null;
        }
    }

    public void channelReadComplete(ChannelHandlerContext channelHandlerContext) {
        channelHandlerContext.flush();
    }

    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
        if (obj instanceof HttpRequest) {
            HttpRequest httpRequest = (HttpRequest) obj;
            if (HttpUtil.is100ContinueExpected(httpRequest)) {
                sendContinueResponse(channelHandlerContext);
            }
            this.entityStream = ReplaySubject.create();
            this.serverRequest = ServerRequestBuilder.buildServerRequest(httpRequest, this.entityStream, this.threadPool);
            processRequest(channelHandlerContext, httpRequest);
            if (httpRequest.decoderResult().isFailure()) {
                this.entityStream.onError(httpRequest.decoderResult().cause());
            }
        }
        if (obj instanceof HttpContent) {
            HttpContent httpContent = (HttpContent) obj;
            ByteBuf content = httpContent.content();
            if (content.isReadable()) {
                byte[] bArr = new byte[content.readableBytes()];
                content.readBytes(bArr);
                this.entityStream.onNext(bArr);
                if (httpContent.decoderResult().isFailure()) {
                    this.entityStream.onError(httpContent.decoderResult().cause());
                }
            }
            if (obj instanceof LastHttpContent) {
                LastHttpContent lastHttpContent = (LastHttpContent) obj;
                if (!lastHttpContent.trailingHeaders().isEmpty()) {
                    this.serverRequest.setTrailingHeaders(ServerRequestBuilder.buildHeadersMap(lastHttpContent.trailingHeaders()));
                }
                this.entityStream.onCompleted();
            }
        }
    }

    private void processRequest(ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest) {
        this.threadPool.execute(() -> {
            try {
                Observable<Response> apply = this.route.apply(this.serverRequest);
                if (apply != null) {
                    this.threadPool.execute(() -> {
                        sendResponse(channelHandlerContext, httpRequest, (Response) apply.onErrorResumeNext(th -> {
                            Observable<Response> apply2 = this.errorResponseConstructor.apply(this.serverRequest, th);
                            if (apply2 == null) {
                                return Observable.just((Object) null);
                            }
                            logger.debug("Error occurred handling request, invoking application error handler");
                            return apply2.onErrorResumeNext(Observable.just((Object) null));
                        }).toBlocking().lastOrDefault((Object) null));
                    });
                } else {
                    logger.debug("Error occurred handling request, sending a generic server error (500)");
                    sendGeneralServerError(channelHandlerContext);
                }
            } catch (Exception e) {
                logger.debug("Error occurred handling request, sending a generic server error (500)", e);
                sendGeneralServerError(channelHandlerContext);
            }
        });
    }

    private void fillResponse(HttpRequest httpRequest, HttpResponse httpResponse, Multimap<String, String> multimap, int i) {
        if (HttpUtil.isKeepAlive(httpRequest)) {
            httpResponse.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        }
        if (i > -1) {
            httpResponse.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, i);
        }
        if (multimap == null || multimap.size() <= 0) {
            return;
        }
        for (Map.Entry entry : multimap.entries()) {
            httpResponse.headers().add((String) entry.getKey(), entry.getValue());
        }
    }

    private void sendResponseStart(ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest, int i, Multimap<String, String> multimap, int i2) {
        DefaultHttpResponse defaultHttpResponse = new DefaultHttpResponse(httpRequest.protocolVersion(), HttpResponseStatus.valueOf(i));
        fillResponse(httpRequest, defaultHttpResponse, multimap, i2);
        channelHandlerContext.write(defaultHttpResponse);
    }

    private void sendContent(ChannelHandlerContext channelHandlerContext, byte[] bArr) {
        channelHandlerContext.writeAndFlush(new DefaultHttpContent(bArr == null ? Unpooled.EMPTY_BUFFER : Unpooled.wrappedBuffer(bArr)));
    }

    private void sendResponseEnd(ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest) {
        writeAndFlush(channelHandlerContext, httpRequest, LastHttpContent.EMPTY_LAST_CONTENT);
    }

    private void sendFullResponse(ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest, int i, Multimap<String, String> multimap) {
        DefaultFullHttpResponse defaultFullHttpResponse = new DefaultFullHttpResponse(httpRequest.protocolVersion(), HttpResponseStatus.valueOf(i), Unpooled.EMPTY_BUFFER);
        fillResponse(httpRequest, defaultFullHttpResponse, multimap, 0);
        writeAndFlush(channelHandlerContext, httpRequest, defaultFullHttpResponse);
    }

    private void sendResponse(ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest, Response response) {
        Entity entity = response.entity();
        if (entity != null) {
            this.threadPool.execute(() -> {
                if (this.channelClosed) {
                    return;
                }
                boolean[] zArr = {true};
                this.entitySubscription = entity.asChunks().doOnNext(bArr -> {
                    if (!zArr[0]) {
                        sendContent(channelHandlerContext, bArr);
                        return;
                    }
                    sendResponseStart(channelHandlerContext, httpRequest, response.status().getCode(), response.headers(), bArr == null ? -1 : bArr.length);
                    sendContent(channelHandlerContext, bArr);
                    zArr[0] = false;
                }).finallyDo(() -> {
                    sendResponseEnd(channelHandlerContext, httpRequest);
                }).subscribe();
            });
        } else {
            sendFullResponse(channelHandlerContext, httpRequest, response.status().getCode(), response.headers());
        }
    }

    private static void sendContinueResponse(ChannelHandlerContext channelHandlerContext) {
        channelHandlerContext.write(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE));
    }

    private void writeAndFlush(ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest, HttpObject httpObject) {
        ChannelFuture writeAndFlush = channelHandlerContext.writeAndFlush(httpObject);
        if (HttpUtil.isKeepAlive(httpRequest)) {
            return;
        }
        writeAndFlush.addListener(ChannelFutureListener.CLOSE);
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
        th.printStackTrace();
        channelHandlerContext.close();
    }
}
