This guide demonstrates how your Quarkus application can utilize MicroProfile Reactive Messaging to consume and produce HTTP messages.
Prerequisites
To complete this guide, you need:
- less than 15 minutes
- an IDE
- JDK 1.8+ installed with JAVA_HOME configured appropriately
- Apache Maven 3.6.3
- GraalVM, Docker or Podman installed if you want to run in native mode
Architecture
In this guide we will implement a service, CostConverter, that consumes HTTP messages with costs in multiple currencies and converts each cost to its value in euros.
To let a user easily try out the service, we will implement an HTTP resource that sums up the costs (CostCollector), and a simple web page to add new costs and watch the sum.
Solution
We recommend that you follow the instructions in the next sections and create the application step by step. However, you can go right to the completed example.
Clone the Git repository: git clone https://github.com/quarkusio/quarkus-quickstarts.git, or download an archive.
The solution is located in the reactive-messaging-http-quickstart directory.
Creating the Maven Project
First, create a new project with the following command:
mvn io.quarkus:quarkus-maven-plugin:1.13.0.CR1:create \
-DprojectGroupId=org.acme \
-DprojectArtifactId=reactive-messaging-http-quickstart \
-Dextensions="reactive-messaging-http" \
-DnoExamples
cd reactive-messaging-http-quickstart
This command generates a Maven project, importing the Reactive Messaging and HTTP connector extensions.
The Converter
Create the src/main/java/org/acme/reactivehttp/CostConverter.java file, with the following content:
package org.acme.reactivehttp;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.jboss.logging.Logger;
import javax.enterprise.context.ApplicationScoped;
import java.util.HashMap;
import java.util.Map;
/**
* A bean consuming costs in multiple currencies and producing prices in EUR from them
*/
@ApplicationScoped
public class CostConverter {
private static final Logger log = Logger.getLogger(CostConverter.class);
private static final Map<String, Double> conversionRatios = new HashMap<>();
static {
conversionRatios.put("CHF", 0.93);
conversionRatios.put("USD", 0.84);
conversionRatios.put("PLN", 0.22);
conversionRatios.put("EUR", 1.);
}
@Incoming("incoming-costs") // (1)
@Outgoing("outgoing-costs") // (2)
double convert(Cost cost) { // (3)
Double conversionRatio = conversionRatios.get(cost.getCurrency().toUpperCase());
if (conversionRatio == null) {
return 0.;
}
return conversionRatio * cost.getValue();
}
}
(1) Consume messages from the incoming-costs stream.
(2) Dispatch returned values to the outgoing-costs stream.
(3) Consume an event with a payload of type Cost and produce a double. When consuming an arbitrary object, the reactive-messaging-http extension attempts to deserialize the request body, assuming it is JSON.
Let’s define the Cost class:
package org.acme.reactivehttp;
public class Cost {
private double value;
private String currency;
public double getValue() {
return value;
}
public void setValue(double value) {
this.value = value;
}
public String getCurrency() {
return currency;
}
public void setCurrency(String currency) {
this.currency = currency;
}
}
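To make the conversion concrete, here is a minimal plain-Java sketch of the same lookup-and-multiply logic, without the messaging annotations, so that it can be run on its own. The class name CostConversionSketch and its toEuro method are hypothetical and introduced only for illustration; the ratios are the ones hard-coded in CostConverter. Under the JSON assumption described above, a request body such as {"value": 10, "currency": "usd"} would be turned into a Cost and converted in the same way.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical stand-alone sketch; mirrors the logic of CostConverter.convert
class CostConversionSketch {
    private static final Map<String, Double> RATIOS = new HashMap<>();
    static {
        RATIOS.put("CHF", 0.93);
        RATIOS.put("USD", 0.84);
        RATIOS.put("PLN", 0.22);
        RATIOS.put("EUR", 1.0);
    }

    // Returns the value in EUR, or 0 for an unknown currency (as the guide's converter does).
    // Locale.ROOT is used here so that upper-casing does not depend on the default locale.
    static double toEuro(double value, String currency) {
        Double ratio = RATIOS.get(currency.toUpperCase(Locale.ROOT));
        return ratio == null ? 0.0 : ratio * value;
    }

    public static void main(String[] args) {
        System.out.println(toEuro(10, "usd")); // known currency, case-insensitive lookup
        System.out.println(toEuro(5, "XYZ"));  // unknown currency falls back to 0
    }
}
```

Note that an unknown currency silently contributes 0 to the sum; a real service might prefer to reject such a message instead.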
In the next step, we will create configurations for both streams in the application.properties file.
Configuring the HTTP connector
We need to configure the HTTP connector. This is done in the application.properties file.
The keys are structured as follows:
mp.messaging.[outgoing|incoming].{channel-name}.{property}=value
The channel-name segment must match the value set in the @Incoming and @Outgoing annotations:
- incoming-costs → a source that receives costs
- outgoing-costs → a sink that receives converted costs
mp.messaging.outgoing.outgoing-costs.connector=quarkus-http
# here we are using a URL pointing to a test endpoint
# you can use e.g. an environment variable to change it
mp.messaging.outgoing.outgoing-costs.url=http://localhost:${quarkus.http.port}/cost-collector
# POST is the default method. Another possibility is PUT
mp.messaging.outgoing.outgoing-costs.method=POST
mp.messaging.incoming.incoming-costs.connector=quarkus-http
# the incoming-costs channel will be fed via an endpoint on the `/costs` path
mp.messaging.incoming.incoming-costs.path=/costs
# POST is the default method. Another possibility is PUT
mp.messaging.incoming.incoming-costs.method=POST
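The key structure can also be read back mechanically. The following is a small sketch that scans a set of properties for keys ending in .connector and reports which connector each channel uses. It relies only on java.util.Properties; the class name ChannelKeySketch and the connectors method are hypothetical, and the sketch is not how Quarkus itself reads its configuration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper illustrating the mp.messaging.<direction>.<channel>.<property> layout
class ChannelKeySketch {
    private static final String PREFIX = "mp.messaging.";
    private static final String SUFFIX = ".connector";

    // Maps "<direction>.<channel-name>" to the configured connector name
    static Map<String, String> connectors(Properties props) {
        Map<String, String> result = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            boolean longEnough = key.length() > PREFIX.length() + SUFFIX.length();
            if (longEnough && key.startsWith(PREFIX) && key.endsWith(SUFFIX)) {
                String middle = key.substring(PREFIX.length(), key.length() - SUFFIX.length());
                result.put(middle, props.getProperty(key));
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(
                "mp.messaging.outgoing.outgoing-costs.connector=quarkus-http\n"
                + "mp.messaging.incoming.incoming-costs.connector=quarkus-http\n"
                + "mp.messaging.incoming.incoming-costs.path=/costs\n"));
        // Prints one entry per channel; the path property is ignored
        System.out.println(connectors(props));
    }
}
```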
The CostCollector
To illustrate that converting messages and passing them through works, let’s add an endpoint that will receive the outgoing costs and sum them up. This is a regular JAX-RS endpoint.
package org.acme.reactivehttp;
import javax.enterprise.context.ApplicationScoped;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
@Path("/cost-collector")
@ApplicationScoped
public class CostCollector {
private double sum = 0;
@POST
public void consumeCost(String valueAsString) {
sum += Double.parseDouble(valueAsString);
}
@GET
public double getSum() {
return sum;
}
}
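Note that sum is a plain double field on an application-scoped bean, so updates that arrive on different threads at the same time could in principle be lost. For a demo this rarely matters. If it does, one possible variation (our assumption, not part of the quickstart) is to accumulate with java.util.concurrent.atomic.DoubleAdder. The sketch below leaves out the JAX-RS annotations so that it runs on its own; the class name ConcurrentSumSketch is hypothetical.

```java
import java.util.concurrent.atomic.DoubleAdder;

// Hypothetical thread-safe variant of the collector's summing logic
class ConcurrentSumSketch {
    private final DoubleAdder sum = new DoubleAdder();

    // Same contract as CostCollector.consumeCost: parse the body and add it to the total
    void consumeCost(String valueAsString) {
        sum.add(Double.parseDouble(valueAsString));
    }

    double getSum() {
        return sum.sum();
    }

    public static void main(String[] args) throws InterruptedException {
        ConcurrentSumSketch collector = new ConcurrentSumSketch();
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    collector.consumeCost("1.0");
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        // All 4 x 1000 additions are accounted for once the threads have finished
        System.out.println(collector.getSum());
    }
}
```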
The HTML page
To conveniently interact with the application, let’s create a simple web page.
The page provides a form to add costs and shows the current sum of costs.
It periodically refreshes the sum by requesting it from /cost-collector.
Create the src/main/resources/META-INF/resources/index.html file, with the following content:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Costs</title>
<link rel="stylesheet" type="text/css"
href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly.min.css">
<link rel="stylesheet" type="text/css"
href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly-additions.min.css">
</head>
<body>
<div class="container">
<h2>Add a cost</h2>
<div>
<div>
<label for="value">Value</label>
<input type="text" id="value">
</div>
<div>
<label for="currency">Currency</label>
<select id="currency">
<option value="CHF">Swiss franc</option>
<option value="USD">United States dollar</option>
<option value="PLN">Polish złoty</option>
</select>
</div>
<input type="button" onclick="add()" value="Add">
</div>
<h2>Total cost</h2>
<div class="row">
<p class="col-md-12">The total cost is <strong><span id="content">N/A</span> €</strong>.</p>
</div>
</div>
</body>
<script>
add = function() {
var value = document.getElementById('value').value;
var currency = document.getElementById('currency').value;
var cost = {
value: value,
currency: currency
};
fetch('costs', { method: 'POST', body: JSON.stringify(cost) });
}
updateCost = function() {
fetch('cost-collector').then(response => response.text()).then(sum =>
document.getElementById('content').textContent=sum
);
}
window.setInterval(updateCost, 500);
</script>
</html>
Get it running
Run the application using:
./mvnw quarkus:dev
Open http://localhost:8080/index.html in your browser.
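Besides the web page, the endpoints can be exercised from code. The following is a minimal sketch of a client that posts one cost to /costs and then reads the sum from /cost-collector, using only java.net.HttpURLConnection so that it also works on JDK 8. It assumes the application is running on http://localhost:8080; the class name CostClientSketch and its helper methods are hypothetical, and the Content-Type header is our choice rather than a documented requirement of the connector.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

// Hypothetical client for trying out the quickstart endpoints
class CostClientSketch {
    // Builds the JSON body for a Cost (fields: value, currency)
    static String toJson(double value, String currency) {
        return "{\"value\": " + value + ", \"currency\": \"" + currency + "\"}";
    }

    // POSTs the body and returns the HTTP status code
    static int post(String url, String body) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/json"); // assumption, see above
        try (OutputStream out = conn.getOutputStream()) {
            out.write(body.getBytes(StandardCharsets.UTF_8));
        }
        return conn.getResponseCode();
    }

    // GETs the URL and returns the response body as text
    static String get(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        try (InputStream in = conn.getInputStream();
             Scanner scanner = new Scanner(in, "UTF-8").useDelimiter("\\A")) {
            return scanner.hasNext() ? scanner.next() : "";
        }
    }

    public static void main(String[] args) {
        String base = "http://localhost:8080"; // assumed dev-mode address
        try {
            int status = post(base + "/costs", toJson(10, "USD"));
            System.out.println("POST /costs -> HTTP " + status);
            System.out.println("Current sum: " + get(base + "/cost-collector"));
        } catch (IOException e) {
            System.out.println("Could not reach the application: " + e.getMessage());
        }
    }
}
```

Because the cost travels through the messaging pipeline before it reaches the collector, the sum read immediately after the POST may not yet include the new value.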
Going further
HTTP connector options
All quarkus-http connector options:
# OUTGOING
# The target URL
mp.messaging.outgoing.<channelName>.url=http://localhost:8213
# Message payload serializer, optional, implementation of `io.quarkus.reactivemessaging.http.runtime.serializers.Serializer`
mp.messaging.outgoing.<channelName>.serializer=com.example.MySerializer
# The number of attempts to make for sending a request to a remote endpoint. Must not be less than zero
# Zero by default
mp.messaging.outgoing.<channelName>.maxRetries=3
# Configures the random factor when using back-off with maxRetries > 0. 0.5 by default
mp.messaging.outgoing.<channelName>.jitter=0.3
# Configures a back-off delay between attempts to send a request.
# A random factor (jitter) is applied to increase the delay when several failures happen.
mp.messaging.outgoing.<channelName>.delay=1s
# The HTTP method (either `POST` or `PUT`), `POST` by default
mp.messaging.outgoing.<channelName>.method=PUT
# INCOMING
# The HTTP method (either `POST` or `PUT`), `POST` by default
mp.messaging.incoming.<channelName>.method=POST
# The path of the endpoint
mp.messaging.incoming.<channelName>.path=/my-reactive-ws-endpoint
# HTTP endpoint buffers messages if a consumer is not able to keep up. This setting specifies the size of the buffer.
# 8 by default.
mp.messaging.incoming.<channelName>.buffer-size=13
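To give a feel for how delay, jitter and maxRetries interact, the sketch below computes a randomized back-off delay for each retry. The guide does not state the exact formula the connector uses, so the scheme here (doubling the delay on each retry and shifting it by a random fraction of up to ±jitter) is purely an assumption for illustration. The class name BackoffSketch and the delayMillis method are hypothetical.

```java
import java.util.Random;

// Hypothetical illustration of back-off with jitter; NOT taken from the connector's source
class BackoffSketch {
    // Delay in milliseconds before the given retry (1-based).
    // ASSUMPTION: exponential doubling with a symmetric random factor.
    static long delayMillis(long baseDelayMillis, double jitter, int retry, Random random) {
        double backoff = baseDelayMillis * Math.pow(2, retry - 1);
        // Random offset between -jitter and +jitter, as a fraction of the current back-off
        double offset = backoff * jitter * (2 * random.nextDouble() - 1);
        return Math.round(backoff + offset);
    }

    public static void main(String[] args) {
        Random random = new Random();
        int maxRetries = 3;      // as in maxRetries=3
        long delay = 1000;       // as in delay=1s
        double jitter = 0.3;     // as in jitter=0.3
        for (int retry = 1; retry <= maxRetries; retry++) {
            System.out.println("retry " + retry + ": wait about "
                    + delayMillis(delay, jitter, retry, random) + " ms");
        }
    }
}
```

With a jitter of 0 the delays in this sketch are exactly 1000, 2000 and 4000 ms; a larger jitter spreads retries from different senders apart so that they are less likely to hit the remote endpoint at the same instant.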
WebSockets
Besides the quarkus-http connector, the quarkus-reactive-messaging-http extension also brings in quarkus-websocket, a connector for exposing and feeding WebSockets.
At the moment only binary data is supported.
While the sink of the HTTP connector checks whether the message is consumed by the remote endpoint, the WebSocket sink does not. A failure to receive a message may therefore go unreported, e.g. if the remote side closes the WebSocket connection at a critical moment.
The quarkus-websocket connector is configured with the following properties:
# OUTGOING
# The target URL
mp.messaging.outgoing.<channelName>.url=ws://localhost:8234/
# Message serializer, optional, implementation of `io.quarkus.reactivemessaging.http.runtime.serializers.Serializer`
mp.messaging.outgoing.<channelName>.serializer=com.example.MySerializer
# The number of retries to make for sending a message to a remote websocket endpoint.
# A value greater than 0 is advised. Otherwise, a web socket timeout can result in a dropped message
# The default value is 1
mp.messaging.outgoing.<channelName>.maxRetries=1
# Configures the random factor when using back-off with maxRetries > 0, 0.5 by default
mp.messaging.outgoing.<channelName>.jitter=0.7
# Configures a back-off delay between attempts to send a request.
# A random factor (jitter) is applied to increase the delay when several failures happen.
mp.messaging.outgoing.<channelName>.delay=2s
# INCOMING
# The path of the endpoint
mp.messaging.incoming.<channelName>.path=/my-ws-endpoint
# Web socket endpoint buffers messages if a consumer is not able to keep up.
# This setting specifies the size of the buffer. 8 by default
mp.messaging.incoming.<channelName>.buffer-size=3
Reactive Messaging
This extension utilizes MicroProfile Reactive Messaging to build data streaming applications.
If you have completed the Kafka or AMQP quickstart, you will have noticed that the code is the same. The only difference is the connector configuration.
If you want to go further, check the documentation of SmallRye Reactive Messaging, the implementation used in Quarkus.