package com.github.jitsni.rx.servlet.servlet.examples.basic;

import com.github.jitsni.rx.servlet.servlet.ObservableServlet;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import rx.Observable;
import rx.Observer;

/**
 * Example servlet, mapped to {@code /test}, that exercises {@link ObservableServlet}.
 *
 * <p>{@code GET} answers synchronously with a short text. {@code POST} puts the request into
 * asynchronous mode, subscribes a {@link ReadObserver} to the observable created from the request
 * input stream and, once that observable completes, writes a large generated payload back through
 * {@link ObservableServlet#write}. The asynchronous context is completed on every terminal path
 * (success or error) so a failed request is not left open until the container's async timeout.
 */
@WebServlet(value = {"/test"}, asyncSupported = true)
public class ObservableTestServlet extends HttpServlet {

    /**
     * Observer for the request body. The received buffers are ignored; when the read side
     * completes, the generated response payload is written out.
     */
    static class ReadObserver implements Observer<ByteBuffer> {
        /** Number of buffers (one text line each) in the generated response payload. */
        private static final int CHUNK_COUNT = 1000000;

        /** Text appended to the line index to form each payload line. */
        private static final String CHUNK_SUFFIX = "0000000000000\n";

        private final HttpServletResponse resp;
        private final AsyncContext ac;

        /**
         * @param httpServletResponse response the payload is written to
         * @param asyncContext        asynchronous context of the request; completed by this
         *                            observer on failure, or by the {@link WriteObserver} it creates
         */
        ReadObserver(HttpServletResponse httpServletResponse, AsyncContext asyncContext) {
            this.resp = httpServletResponse;
            this.ac = asyncContext;
        }

        /**
         * Starts writing the generated payload to the response output stream. If the stream
         * cannot be obtained the request is ended with a 500 status instead of continuing with a
         * {@code null} stream.
         */
        @Override
        public void onCompleted() {
            System.out.println("Read onCompleted=" + Thread.currentThread());
            this.resp.setStatus(HttpServletResponse.SC_OK);
            ServletOutputStream servletOutputStream;
            try {
                servletOutputStream = this.resp.getOutputStream();
            } catch (IOException e) {
                e.printStackTrace();
                // Without an output stream nothing can be written: end the request here.
                abort();
                return;
            }
            // The payload is only built once we know there is a stream to write it to.
            ObservableServlet.write(data(), servletOutputStream).subscribe(new WriteObserver(this.ac));
        }

        /**
         * Logs the read failure and ends the asynchronous request.
         *
         * @param th the error signalled by the request-body observable
         */
        @Override
        public void onError(Throwable th) {
            System.out.println("read onError=" + Thread.currentThread());
            th.printStackTrace();
            abort();
        }

        /** Request body content is intentionally ignored by this example. */
        @Override
        public void onNext(ByteBuffer byteBuffer) {
        }

        /**
         * Builds the response payload: {@value #CHUNK_COUNT} buffers, the i-th holding the line
         * {@code i + "0000000000000\n"}.
         *
         * @return an observable emitting the buffers in index order
         */
        Observable<ByteBuffer> data() {
            ByteBuffer[] chunks = new ByteBuffer[CHUNK_COUNT];
            for (int i = 0; i < chunks.length; i++) {
                // Explicit charset: the bytes must not depend on the platform default encoding.
                chunks[i] = ByteBuffer.wrap((i + CHUNK_SUFFIX).getBytes(StandardCharsets.UTF_8));
            }
            return Observable.from(chunks);
        }

        /** Marks the response as failed (when still possible) and completes the async context. */
        private void abort() {
            if (!this.resp.isCommitted()) {
                this.resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
            }
            this.ac.complete();
        }
    }

    /**
     * Observer for the response write. Completes the asynchronous context once the write has
     * finished, whether it succeeded or failed.
     */
    static class WriteObserver implements Observer<Void> {
        private final AsyncContext ac;

        /**
         * @param asyncContext asynchronous context to complete when the write terminates
         */
        public WriteObserver(AsyncContext asyncContext) {
            this.ac = asyncContext;
        }

        /** Completes the asynchronous request after the write observable has completed. */
        @Override
        public void onCompleted() {
            System.out.println("Composite Write onCompleted");
            this.ac.complete();
        }

        /**
         * Logs the write failure and ends the asynchronous request so it does not stay open
         * until the container's async timeout.
         *
         * @param th the error signalled by the write observable
         */
        @Override
        public void onError(Throwable th) {
            System.out.println("write onError=" + Thread.currentThread());
            th.printStackTrace();
            this.ac.complete();
        }

        /** The write observable carries no values of interest. */
        @Override
        public void onNext(Void r2) {
        }
    }

    /**
     * Responds with status 200 and the text {@code "Working ..."}.
     *
     * @throws IOException if the response writer cannot be obtained
     */
    @Override
    protected void doGet(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) throws ServletException, IOException {
        httpServletResponse.setStatus(HttpServletResponse.SC_OK);
        PrintWriter writer = httpServletResponse.getWriter();
        writer.write("Working ...");
        writer.flush();
    }

    /**
     * Switches the request to asynchronous mode and subscribes a {@link ReadObserver} to the
     * observable created from the request input stream; the observer takes over responsibility
     * for completing the asynchronous context.
     *
     * @throws IOException if the request input stream cannot be obtained
     */
    @Override
    protected void doPost(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) throws ServletException, IOException {
        ObservableServlet.create(httpServletRequest.getInputStream())
                .subscribe(new ReadObserver(httpServletResponse, httpServletRequest.startAsync()));
    }
}
