package dev.langchain4j.http.client;

import dev.langchain4j.exception.HttpException;
import dev.langchain4j.http.client.sse.DefaultServerSentEventParser;
import dev.langchain4j.http.client.sse.ServerSentEvent;
import dev.langchain4j.http.client.sse.ServerSentEventListener;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mockito;

@EnabledIfEnvironmentVariable(named = "OPENAI_API_KEY", matches = ".+")
/* loaded from: input_file:dev/langchain4j/http/client/HttpClientIT.class */
public abstract class HttpClientIT {
    // Read once from the environment at class initialisation; the class-level
    // @EnabledIfEnvironmentVariable condition requires this variable to be non-empty.
    private static final String OPENAI_API_KEY = System.getenv("OPENAI_API_KEY");

    /**
     * Outcome of the streaming request issued by
     * {@code should_return_successful_http_response_async}.
     *
     * <p>Declared as a real {@code record}: Java source cannot extend
     * {@link java.lang.Record} explicitly, and the compiler generates
     * {@code equals}, {@code hashCode} and {@code toString} itself.
     *
     * @param response the response handed to the listener's {@code onOpen} callback
     * @param events   the server-sent events collected by {@code onEvent}
     * @param threads  the threads on which listener callbacks were invoked
     */
    record C1StreamingResult(SuccessfulHttpResponse response, List<ServerSentEvent> events, Set<Thread> threads) {
    }

    /**
     * Outcome of the streaming request issued by
     * {@code should_return_successful_http_response_with_double_newline_async}.
     *
     * <p>Declared as a real {@code record}: Java source cannot extend
     * {@link java.lang.Record} explicitly, and the compiler generates
     * {@code equals}, {@code hashCode} and {@code toString} itself.
     *
     * @param response the response handed to the listener's {@code onOpen} callback
     * @param events   the server-sent events collected by {@code onEvent}
     * @param threads  the threads on which listener callbacks were invoked
     */
    record C2StreamingResult(SuccessfulHttpResponse response, List<ServerSentEvent> events, Set<Thread> threads) {
    }

    /**
     * Outcome of the failing streaming request issued by {@code should_throw_400_async}.
     *
     * <p>Declared as a real {@code record}: Java source cannot extend
     * {@link java.lang.Record} explicitly, and the compiler generates
     * {@code equals}, {@code hashCode} and {@code toString} itself.
     *
     * @param throwable the error handed to the listener's {@code onError} callback
     * @param threads   the threads on which {@code onError} was invoked
     */
    record C3StreamingResult(Throwable throwable, Set<Thread> threads) {
    }

    /**
     * Supplies the {@link HttpClient} instances to exercise; every test in this class
     * iterates over the returned list and runs its scenario against each client.
     *
     * @return the clients under test
     */
    protected abstract List<HttpClient> clients();

    /**
     * Sends a non-streaming chat completion request through each client returned by
     * {@link #clients()} and checks that the synchronous response has status 200,
     * non-empty headers and a body mentioning "Berlin".
     */
    @Test
    void should_return_successful_http_response_sync() {
        String requestBody = """
                {
                    "model": "gpt-4o-mini",
                    "messages": [
                        {
                            "role" : "user",
                            "content" : "What is the capital of Germany?"
                        }
                    ]
                }
                """;

        for (HttpClient client : clients()) {
            // given
            HttpRequest request = HttpRequest.builder()
                    .method(HttpMethod.POST)
                    .url("https://api.openai.com/v1/chat/completions")
                    .addHeader("Authorization", new String[]{"Bearer " + OPENAI_API_KEY})
                    .addHeader("Content-Type", new String[]{"application/json"})
                    .body(requestBody)
                    .build();

            // when
            SuccessfulHttpResponse response = client.execute(request);

            // then
            Assertions.assertThat(response.statusCode()).isEqualTo(200);
            Assertions.assertThat(response.headers()).isNotEmpty();
            Assertions.assertThat(response.body()).contains("Berlin");
        }
    }

    /**
     * Sends a request without the {@code messages} field through each client and expects
     * the synchronous call to throw an {@link HttpException} with status 400.
     */
    @Test
    void should_throw_400_sync() {
        String invalidBody = """
                {
                    "model": "gpt-4o-mini"
                }
                """;

        for (HttpClient client : clients()) {
            // given
            HttpRequest request = HttpRequest.builder()
                    .method(HttpMethod.POST)
                    .url("https://api.openai.com/v1/chat/completions")
                    .addHeader("Authorization", new String[]{"Bearer " + OPENAI_API_KEY})
                    .addHeader("Content-Type", new String[]{"application/json"})
                    .body(invalidBody)
                    .build();

            try {
                // when
                client.execute(request);
                // fail() throws AssertionError, which the catch (Exception) below does not swallow
                Assertions.fail("Should have thrown an exception");
            } catch (Exception e) {
                // then
                Assertions.assertThat(e).isExactlyInstanceOf(HttpException.class);
                // explicit downcast: an Exception is not assignable to HttpException without it
                HttpException httpException = (HttpException) e;
                Assertions.assertThat(httpException.statusCode()).isEqualTo(400);
                Assertions.assertThat(httpException.getMessage()).contains("Missing required parameter: 'messages'");
            }
        }
    }

    /**
     * Sends a valid request with an incorrect API key through each client and expects
     * the synchronous call to throw an {@link HttpException} with status 401.
     */
    @Test
    void should_throw_401_sync() {
        String incorrectApiKey = "banana";
        String requestBody = """
                {
                    "model": "gpt-4o-mini",
                    "messages": [
                        {
                            "role" : "user",
                            "content" : "What is the capital of Germany?"
                        }
                    ]
                }
                """;

        for (HttpClient client : clients()) {
            // given
            HttpRequest request = HttpRequest.builder()
                    .method(HttpMethod.POST)
                    .url("https://api.openai.com/v1/chat/completions")
                    .addHeader("Authorization", new String[]{"Bearer " + incorrectApiKey})
                    .addHeader("Content-Type", new String[]{"application/json"})
                    .body(requestBody)
                    .build();

            try {
                // when
                client.execute(request);
                // fail() throws AssertionError, which the catch (Exception) below does not swallow
                Assertions.fail("Should have thrown an exception");
            } catch (Exception e) {
                // then
                Assertions.assertThat(e).isExactlyInstanceOf(HttpException.class);
                // explicit downcast: an Exception is not assignable to HttpException without it
                HttpException httpException = (HttpException) e;
                Assertions.assertThat(httpException.statusCode()).isEqualTo(401);
                Assertions.assertThat(httpException.getMessage()).contains("Incorrect API key provided");
            }
        }
    }

    /**
     * Streams a chat completion through each client and checks the response passed to
     * {@code onOpen}, the collected events, the callback thread, and the callback order
     * ({@code onOpen}, one or more {@code onEvent}, then {@code onClose}).
     *
     * @throws Exception if the stream reports an error or does not close within 30 seconds
     */
    @Test
    void should_return_successful_http_response_async() throws Exception {
        String requestBody = """
                {
                    "model": "gpt-4o-mini",
                    "messages": [
                        {
                            "role" : "user",
                            "content" : "What is the capital of Germany?"
                        }
                    ],
                    "stream": true
                }
                """;

        for (HttpClient client : clients()) {
            // given
            HttpRequest request = HttpRequest.builder()
                    .method(HttpMethod.POST)
                    .url("https://api.openai.com/v1/chat/completions")
                    .addHeader("Authorization", new String[]{"Bearer " + OPENAI_API_KEY})
                    .addHeader("Content-Type", new String[]{"application/json"})
                    .body(requestBody)
                    .build();

            // completed by onClose() with the collected state, or exceptionally by onError()
            final CompletableFuture<C1StreamingResult> completion = new CompletableFuture<>();

            ServerSentEventListener listener = new ServerSentEventListener() {
                private final AtomicReference<SuccessfulHttpResponse> response = new AtomicReference<>();
                private final List<ServerSentEvent> events = new ArrayList<>();
                private final Set<Thread> threads = new HashSet<>();

                @Override
                public void onOpen(SuccessfulHttpResponse successfulHttpResponse) {
                    this.threads.add(Thread.currentThread());
                    this.response.set(successfulHttpResponse);
                }

                @Override
                public void onEvent(ServerSentEvent serverSentEvent) {
                    this.threads.add(Thread.currentThread());
                    this.events.add(serverSentEvent);
                }

                @Override
                public void onError(Throwable th) {
                    this.threads.add(Thread.currentThread());
                    completion.completeExceptionally(th);
                }

                @Override
                public void onClose() {
                    this.threads.add(Thread.currentThread());
                    completion.complete(new C1StreamingResult(this.response.get(), this.events, this.threads));
                }
            };
            ServerSentEventListener spyListener = Mockito.spy(listener);

            // when
            client.execute(request, new DefaultServerSentEventParser(), spyListener);

            // then
            C1StreamingResult result = completion.get(30, TimeUnit.SECONDS);

            Assertions.assertThat(result.response()).isNotNull();
            Assertions.assertThat(result.response().statusCode()).isEqualTo(200);
            Assertions.assertThat(result.response().headers()).isNotEmpty();
            Assertions.assertThat(result.response().body()).isNull();

            Assertions.assertThat(result.events()).isNotEmpty();
            String streamedData = result.events().stream()
                    .map(ServerSentEvent::data)
                    .collect(Collectors.joining(""));
            Assertions.assertThat(streamedData).contains("Berlin");

            // every callback ran on one thread, and it was not the test thread
            Assertions.assertThat(result.threads()).hasSize(1);
            Assertions.assertThat(result.threads().iterator().next()).isNotEqualTo(Thread.currentThread());

            InOrder inOrder = Mockito.inOrder(spyListener);
            inOrder.verify(spyListener, Mockito.times(1)).onOpen(ArgumentMatchers.any());
            inOrder.verify(spyListener, Mockito.atLeastOnce()).onEvent(ArgumentMatchers.any());
            inOrder.verify(spyListener, Mockito.times(1)).onClose();
            inOrder.verifyNoMoreInteractions();
        }
    }

    /**
     * Streams a completion whose answer is asked to contain a double newline and checks
     * that the concatenated event data contains "Berlin", "Paris" and the escaped
     * sequence {@code \n\n}, along with the same response, thread and callback-order
     * checks as the plain streaming test.
     *
     * @throws Exception if the stream reports an error or does not close within 30 seconds
     */
    @Test
    void should_return_successful_http_response_with_double_newline_async() throws Exception {
        String requestBody = """
                {
                    "model": "gpt-4o-mini",
                    "messages": [
                        {
                            "role" : "user",
                            "content" : "What is the capital of Germany? What is a capital of France? Your answers must be separated by a double newline!"
                        }
                    ],
                    "temperature": 0.0,
                    "stream": true
                }
                """;

        for (HttpClient client : clients()) {
            // given
            HttpRequest request = HttpRequest.builder()
                    .method(HttpMethod.POST)
                    .url("https://api.openai.com/v1/chat/completions")
                    .addHeader("Authorization", new String[]{"Bearer " + OPENAI_API_KEY})
                    .addHeader("Content-Type", new String[]{"application/json"})
                    .body(requestBody)
                    .build();

            // completed by onClose() with the collected state, or exceptionally by onError()
            final CompletableFuture<C2StreamingResult> completion = new CompletableFuture<>();

            ServerSentEventListener listener = new ServerSentEventListener() {
                private final AtomicReference<SuccessfulHttpResponse> response = new AtomicReference<>();
                private final List<ServerSentEvent> events = new ArrayList<>();
                private final Set<Thread> threads = new HashSet<>();

                @Override
                public void onOpen(SuccessfulHttpResponse successfulHttpResponse) {
                    this.threads.add(Thread.currentThread());
                    this.response.set(successfulHttpResponse);
                }

                @Override
                public void onEvent(ServerSentEvent serverSentEvent) {
                    this.threads.add(Thread.currentThread());
                    this.events.add(serverSentEvent);
                }

                @Override
                public void onError(Throwable th) {
                    this.threads.add(Thread.currentThread());
                    completion.completeExceptionally(th);
                }

                @Override
                public void onClose() {
                    this.threads.add(Thread.currentThread());
                    completion.complete(new C2StreamingResult(this.response.get(), this.events, this.threads));
                }
            };
            ServerSentEventListener spyListener = Mockito.spy(listener);

            // when
            client.execute(request, new DefaultServerSentEventParser(), spyListener);

            // then
            C2StreamingResult result = completion.get(30, TimeUnit.SECONDS);

            Assertions.assertThat(result.response()).isNotNull();
            Assertions.assertThat(result.response().statusCode()).isEqualTo(200);
            Assertions.assertThat(result.response().headers()).isNotEmpty();
            Assertions.assertThat(result.response().body()).isNull();

            Assertions.assertThat(result.events()).isNotEmpty();
            String streamedData = result.events().stream()
                    .map(ServerSentEvent::data)
                    .collect(Collectors.joining(""));
            // "\\n\\n" is a literal backslash-n pair, i.e. the newline as it appears escaped in the event data
            Assertions.assertThat(streamedData).contains("Berlin", "Paris", "\\n\\n");

            // every callback ran on one thread, and it was not the test thread
            Assertions.assertThat(result.threads()).hasSize(1);
            Assertions.assertThat(result.threads().iterator().next()).isNotEqualTo(Thread.currentThread());

            InOrder inOrder = Mockito.inOrder(spyListener);
            inOrder.verify(spyListener, Mockito.times(1)).onOpen(ArgumentMatchers.any());
            inOrder.verify(spyListener, Mockito.atLeastOnce()).onEvent(ArgumentMatchers.any());
            inOrder.verify(spyListener, Mockito.times(1)).onClose();
            inOrder.verifyNoMoreInteractions();
        }
    }

    /**
     * Streams a request without the {@code messages} field through each client and expects
     * exactly one {@code onError} callback carrying an {@link HttpException} with status 400.
     * Any other callback completes the future exceptionally and so fails the test.
     *
     * @throws Exception if an unexpected callback fires or {@code onError} is not invoked
     *                   within 30 seconds
     */
    @Test
    void should_throw_400_async() throws Exception {
        String invalidBody = """
                {
                    "model": "gpt-4o-mini",
                    "stream": true
                }
                """;

        for (HttpClient client : clients()) {
            // given
            HttpRequest request = HttpRequest.builder()
                    .method(HttpMethod.POST)
                    .url("https://api.openai.com/v1/chat/completions")
                    .addHeader("Authorization", new String[]{"Bearer " + OPENAI_API_KEY})
                    .addHeader("Content-Type", new String[]{"application/json"})
                    .body(invalidBody)
                    .build();

            // completed normally only by onError(); every other callback fails it
            final CompletableFuture<C3StreamingResult> completion = new CompletableFuture<>();

            ServerSentEventListener listener = new ServerSentEventListener() {
                private final Set<Thread> threads = new HashSet<>();

                @Override
                public void onOpen(SuccessfulHttpResponse successfulHttpResponse) {
                    completion.completeExceptionally(new IllegalStateException("onOpen() should not be called"));
                }

                @Override
                public void onEvent(ServerSentEvent serverSentEvent) {
                    completion.completeExceptionally(new IllegalStateException("onEvent() should not be called"));
                }

                @Override
                public void onError(Throwable th) {
                    this.threads.add(Thread.currentThread());
                    completion.complete(new C3StreamingResult(th, this.threads));
                }

                @Override
                public void onClose() {
                    completion.completeExceptionally(new IllegalStateException("onClose() should not be called"));
                }
            };
            ServerSentEventListener spyListener = Mockito.spy(listener);

            // when
            client.execute(request, new DefaultServerSentEventParser(), spyListener);

            // then
            C3StreamingResult result = completion.get(30, TimeUnit.SECONDS);

            Assertions.assertThat(result.throwable())
                    .isExactlyInstanceOf(HttpException.class)
                    .extracting("statusCode")
                    .isEqualTo(400);
            Assertions.assertThat(result.throwable()).hasMessageContaining("Missing required parameter: 'messages'");

            // onError ran on one thread, and it was not the test thread
            Assertions.assertThat(result.threads()).hasSize(1);
            Assertions.assertThat(result.threads().iterator().next()).isNotEqualTo(Thread.currentThread());

            InOrder inOrder = Mockito.inOrder(spyListener);
            inOrder.verify(spyListener, Mockito.times(1)).onError(ArgumentMatchers.any());
            inOrder.verifyNoMoreInteractions();
        }
    }

    /**
     * Streams a completion with a listener whose {@code onOpen} throws, then checks that
     * {@code onOpen} was the only callback invoked: no events, no errors, nothing after it.
     *
     * @throws Exception if the waiting thread is interrupted
     */
    @Test
    void should_fail_when_listener_onOpen_throws_exception() throws Exception {
        String requestBody = """
                {
                    "model": "gpt-4o-mini",
                    "messages": [
                        {
                            "role" : "user",
                            "content" : "What is the capital of Germany?"
                        }
                    ],
                    "stream": true
                }
                """;

        for (HttpClient client : clients()) {
            // given
            HttpRequest request = HttpRequest.builder()
                    .method(HttpMethod.POST)
                    .url("https://api.openai.com/v1/chat/completions")
                    .addHeader("Authorization", new String[]{"Bearer " + OPENAI_API_KEY})
                    .addHeader("Content-Type", new String[]{"application/json"})
                    .body(requestBody)
                    .build();

            // written by listener callbacks and read by the test thread, hence the thread-safe holders
            final AtomicReference<SuccessfulHttpResponse> response = new AtomicReference<>();
            final List<ServerSentEvent> events = Collections.synchronizedList(new ArrayList<>());
            final List<Throwable> errors = Collections.synchronizedList(new ArrayList<>());
            final Set<Thread> threads = Collections.synchronizedSet(new HashSet<>());

            ServerSentEventListener listener = new ServerSentEventListener() {
                @Override
                public void onOpen(SuccessfulHttpResponse successfulHttpResponse) {
                    response.set(successfulHttpResponse);
                    threads.add(Thread.currentThread());
                    throw new RuntimeException("Unexpected exception in onOpen()");
                }

                @Override
                public void onEvent(ServerSentEvent serverSentEvent) {
                    events.add(serverSentEvent);
                    threads.add(Thread.currentThread());
                }

                @Override
                public void onError(Throwable th) {
                    errors.add(th);
                    threads.add(Thread.currentThread());
                }

                @Override
                public void onClose() {
                    threads.add(Thread.currentThread());
                }
            };
            ServerSentEventListener spyListener = Mockito.spy(listener);

            // when
            client.execute(request, new DefaultServerSentEventParser(), spyListener);

            // No callback signals completion in this scenario, so wait a fixed time
            // to give any further (unexpected) callbacks a chance to arrive.
            Thread.sleep(5000L);

            // then
            Assertions.assertThat(response.get()).isNotNull();
            Assertions.assertThat(events).isEmpty();
            Assertions.assertThat(errors).isEmpty();

            Assertions.assertThat(threads).hasSize(1);
            Assertions.assertThat(threads.iterator().next()).isNotEqualTo(Thread.currentThread());

            InOrder inOrder = Mockito.inOrder(spyListener);
            inOrder.verify(spyListener, Mockito.times(1)).onOpen(ArgumentMatchers.any());
            inOrder.verifyNoMoreInteractions();
        }
    }

    /**
     * Streams a completion with a listener whose {@code onEvent} throws, then checks that
     * the callbacks stopped after {@code onOpen} and a single {@code onEvent}: one event
     * recorded, no errors, nothing after it.
     *
     * @throws Exception if the waiting thread is interrupted
     */
    @Test
    void should_fail_when_listener_onEvent_throws_exception() throws Exception {
        String requestBody = """
                {
                    "model": "gpt-4o-mini",
                    "messages": [
                        {
                            "role" : "user",
                            "content" : "What is the capital of Germany?"
                        }
                    ],
                    "stream": true
                }
                """;

        for (HttpClient client : clients()) {
            // given
            HttpRequest request = HttpRequest.builder()
                    .method(HttpMethod.POST)
                    .url("https://api.openai.com/v1/chat/completions")
                    .addHeader("Authorization", new String[]{"Bearer " + OPENAI_API_KEY})
                    .addHeader("Content-Type", new String[]{"application/json"})
                    .body(requestBody)
                    .build();

            // written by listener callbacks and read by the test thread, hence the thread-safe holders
            final AtomicReference<SuccessfulHttpResponse> response = new AtomicReference<>();
            final List<ServerSentEvent> events = Collections.synchronizedList(new ArrayList<>());
            final List<Throwable> errors = Collections.synchronizedList(new ArrayList<>());
            final Set<Thread> threads = Collections.synchronizedSet(new HashSet<>());

            ServerSentEventListener listener = new ServerSentEventListener() {
                @Override
                public void onOpen(SuccessfulHttpResponse successfulHttpResponse) {
                    response.set(successfulHttpResponse);
                    threads.add(Thread.currentThread());
                }

                @Override
                public void onEvent(ServerSentEvent serverSentEvent) {
                    events.add(serverSentEvent);
                    threads.add(Thread.currentThread());
                    throw new RuntimeException("Unexpected exception in onEvent()");
                }

                @Override
                public void onError(Throwable th) {
                    errors.add(th);
                    threads.add(Thread.currentThread());
                }

                @Override
                public void onClose() {
                    threads.add(Thread.currentThread());
                }
            };
            ServerSentEventListener spyListener = Mockito.spy(listener);

            // when
            client.execute(request, new DefaultServerSentEventParser(), spyListener);

            // No callback signals completion in this scenario, so wait a fixed time
            // to give any further (unexpected) callbacks a chance to arrive.
            Thread.sleep(5000L);

            // then
            Assertions.assertThat(response.get()).isNotNull();
            Assertions.assertThat(events).hasSize(1);
            Assertions.assertThat(errors).isEmpty();

            Assertions.assertThat(threads).hasSize(1);
            Assertions.assertThat(threads.iterator().next()).isNotEqualTo(Thread.currentThread());

            InOrder inOrder = Mockito.inOrder(spyListener);
            inOrder.verify(spyListener, Mockito.times(1)).onOpen(ArgumentMatchers.any());
            inOrder.verify(spyListener, Mockito.times(1)).onEvent(ArgumentMatchers.any());
            inOrder.verifyNoMoreInteractions();
        }
    }

    /**
     * Streams a request with an incorrect API key and a listener whose {@code onError}
     * throws, then checks that {@code onError} was the only callback invoked and that it
     * received an {@link HttpException} mentioning the incorrect key.
     *
     * @throws Exception if the waiting thread is interrupted
     */
    @Test
    void should_fail_when_listener_onError_throws_exception() throws Exception {
        String incorrectApiKey = "banana";
        String requestBody = """
                {
                    "model": "gpt-4o-mini",
                    "messages": [
                        {
                            "role" : "user",
                            "content" : "What is the capital of Germany?"
                        }
                    ],
                    "stream": true
                }
                """;

        for (HttpClient client : clients()) {
            // given
            HttpRequest request = HttpRequest.builder()
                    .method(HttpMethod.POST)
                    .url("https://api.openai.com/v1/chat/completions")
                    .addHeader("Authorization", new String[]{"Bearer " + incorrectApiKey})
                    .addHeader("Content-Type", new String[]{"application/json"})
                    .body(requestBody)
                    .build();

            // written by listener callbacks and read by the test thread, hence the thread-safe holders
            final AtomicReference<SuccessfulHttpResponse> response = new AtomicReference<>();
            final List<ServerSentEvent> events = Collections.synchronizedList(new ArrayList<>());
            final List<Throwable> errors = Collections.synchronizedList(new ArrayList<>());
            final Set<Thread> threads = Collections.synchronizedSet(new HashSet<>());

            ServerSentEventListener listener = new ServerSentEventListener() {
                @Override
                public void onOpen(SuccessfulHttpResponse successfulHttpResponse) {
                    response.set(successfulHttpResponse);
                    threads.add(Thread.currentThread());
                }

                @Override
                public void onEvent(ServerSentEvent serverSentEvent) {
                    events.add(serverSentEvent);
                    threads.add(Thread.currentThread());
                }

                @Override
                public void onError(Throwable th) {
                    errors.add(th);
                    threads.add(Thread.currentThread());
                    throw new RuntimeException("Unexpected exception in onError()");
                }

                @Override
                public void onClose() {
                    threads.add(Thread.currentThread());
                }
            };
            ServerSentEventListener spyListener = Mockito.spy(listener);

            // when
            client.execute(request, new DefaultServerSentEventParser(), spyListener);

            // No callback signals completion in this scenario, so wait a fixed time
            // to give any further (unexpected) callbacks a chance to arrive.
            Thread.sleep(5000L);

            // then
            Assertions.assertThat(response.get()).isNull();
            Assertions.assertThat(events).isEmpty();
            Assertions.assertThat(errors).hasSize(1);
            Assertions.assertThat(errors.get(0))
                    .isExactlyInstanceOf(HttpException.class)
                    .hasMessageContaining("Incorrect API key provided");

            Assertions.assertThat(threads).hasSize(1);
            Assertions.assertThat(threads.iterator().next()).isNotEqualTo(Thread.currentThread());

            InOrder inOrder = Mockito.inOrder(spyListener);
            inOrder.verify(spyListener, Mockito.times(1)).onError(ArgumentMatchers.any());
            inOrder.verifyNoMoreInteractions();
        }
    }
}
