This guide demonstrates how your Quarkus application can utilize MicroProfile Reactive Messaging to consume and produce HTTP messages.

Prerequisites

To complete this guide, you need:

  • less than 15 minutes

  • an IDE

  • JDK 1.8+ installed with JAVA_HOME configured appropriately

  • Apache Maven 3.6.3

  • GraalVM, Docker or Podman installed if you want to run in native mode.

Architecture

In this guide we will implement a service, namely CostConverter that consumes HTTP messages with costs in multiple currencies and converts each cost to its value in Euro.

To let a user easily try out the service, we will implement an HTTP resource summing up the costs (CostCollector), and a simple web page to add new costs and watch the sum.

Solution

We recommend that you follow the instructions in the next sections and create the application step by step. However, you can go right to the completed example.

Clone the Git repository: git clone https://github.com/quarkusio/quarkus-quickstarts.git, or download an archive.

The solution is located in the reactive-messaging-http-quickstart directory.

Creating the Maven Project

First, we need a new project. Create a new project with the following command:

mvn io.quarkus:quarkus-maven-plugin:1.11.0.Final:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=reactive-messaging-http-quickstart \
    -Dextensions="reactive-messaging-http"
cd reactive-http

This command generates a Maven project, importing the Reactive Messaging and HTTP connector extensions.

The Converter

Create the src/main/java/org/acme/reactivehttp/CostConverter.java file, with the following content:

package org.acme.reactivehttp;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.jboss.logging.Logger;

import javax.enterprise.context.ApplicationScoped;
import java.util.HashMap;
import java.util.Map;

/**
 * A bean consuming costs in multiple currencies and producing prices in EUR from them
 */
@ApplicationScoped
public class CostConverter {

    private static final Logger log = Logger.getLogger(CostConverter.class);

    private static final Map<String, Double> conversionRatios = new HashMap<>();

    static {
        conversionRatios.put("CHF", 0.93);
        conversionRatios.put("USD", 0.84);
        conversionRatios.put("PLN", 0.22);
        conversionRatios.put("EUR", 1.);
    }

    @Incoming("incoming-costs") (1)
    @Outgoing("outgoing-costs") (2)
    double convert(Cost cost) { (3)
        Double conversionRatio = conversionRatios.get(cost.getCurrency().toUpperCase());
        if (conversionRatio == null) {
            return 0.;
        }
        return conversionRatio * cost.getValue();
    }
}
1 Consume messages from the incoming-costs stream.
2 Dispatch returned values to the outgoing-costs stream.
3 Consume an event with payload of type Cost and produce a double. In the case of consuming an arbitrary object, the reactive-messaging-http extension will attempt to deserialize the request body assuming it is JSON.

Let’s define the Cost class:

package org.acme.reactivehttp;

public class Cost {
    private double value;
    private String currency;

    public double getValue() {
        return value;
    }

    public void setValue(double value) {
        this.value = value;
    }

    public String getCurrency() {
        return currency;
    }

    public void setCurrency(String currency) {
        this.currency = currency;
    }
}

In the next step, we will create configurations for both streams in the application.properties file.

Configuring the HTTP connector

We need to configure the HTTP connector. This is done in the application.properties file. The keys are structured as follows:

mp.messaging.[outgoing|incoming].{channel-name}.{property}=value

The channel-name segment must match the value set in the @Incoming and @Outgoing annotation:

  • incoming-costs → a source that receives costs

  • outgoing-costs → a sink that receives converted costs

mp.messaging.outgoing.outgoing-costs.connector=quarkus-http

# here we are using a URL pointing to a test endpoint
# you can use e.g. an environment variable to change it
mp.messaging.outgoing.outgoing-costs.url=http://localhost:${quarkus.http.port}/cost-collector

# POST is the default method. Another possibility is PUT
mp.messaging.outgoing.outgoing-costs.method=POST


mp.messaging.incoming.incoming-costs.connector=quarkus-http

# the incoming-costs channel will be fed via an endpoint on the `/costs` path
mp.messaging.incoming.incoming-costs.path=/costs

# POST is the default method. Another possibility is PUT
mp.messaging.incoming.incoming-costs.method=POST

The CostCollector

To illustrate that converting messages and passing them through works, let’s add an endpoint that will receive the outgoing costs and sum them up. This is a usual JAX-RS endpoint.

package org.acme.reactivehttp;

import javax.enterprise.context.ApplicationScoped;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;

@Path("/cost-collector")
@ApplicationScoped
public class CostCollector {

    private double sum = 0;

    @POST
    public void consumeCost(String valueAsString) {
        sum += Double.parseDouble(valueAsString);
    }

    @GET
    public double getSum() {
        return sum;
    }

}

The HTML page

To conveniently interact with the application, let’s create a simple web page.

The page will provide a form to add costs, and an info of the current sum of costs. The page periodically updates the sum by requesting the current sum from /cost-collector.

Create the src/main/resources/META-INF/resources/index.html file, with the following content:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Costs</title>

    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly.min.css">
    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly-additions.min.css">
</head>
<body>
<div class="container">

    <h2>Add a cost</h2>
    <div>
        <div>
            <label for="value">Value</label>
            <input type="text" id="value">
        </div>
        <div>
            <label for="currency">Currency</label>
            <select id="currency">
                <option value="CHF">Swiss franc</option>
                <option value="USD">United States dollar</option>
                <option value="CHF">Polish złoty</option>
            </select>
        </div>
        <input type="button" onclick="add()" value="Add">
    </div>


    <h2>Last cost</h2>
    <div class="row">
        <p class="col-md-12">The total cost is <strong><span id="content">N/A</span>&nbsp;&euro;</strong>.</p>
    </div>
</div>
</body>
<script>
    add = function() {
        var value = document.getElementById('value').value;
        var currency = document.getElementById('currency').value;

        var cost = {
            value: document.getElementById('value').value,
            currency: document.getElementById('currency').value
        };

        fetch('costs', { method: 'POST', body: JSON.stringify(cost) });
    }

    updateCost = function() {
        fetch('cost-collector').then(response => response.text()).then(sum =>
            document.getElementById('content').textContent=sum
        );
    }

    window.setInterval(updateCost, 500);
</script>
</html>

Get it running

If you followed the instructions, you should have the AMQP broker running. Then, you just need to run the application using:

./mvnw quarkus:dev

Open http://localhost:8080/index.html in your browser.

If you started the AMQP broker with docker compose, stop it using CTRL+C followed by docker-compose down.

Running Native

You can build the native executable with:

./mvnw package -Pnative

Going further

Web Sockets

Except of the quarkus-http connector, the quarkus-reactive-messaging-http extension also brings in quarkus-websocket - a connector for exposing and feeding Web Sockets. At the moment only binary data is supported.

While the sink of the HTTP connector checks if the message is consumed by the remote endpoint, the Web Socket sink does not. It may happen that a failure to receive a message is not reported, e.g. if the remote side closes the Web Socket connection in a crucial moment.

Reactive Messaging

This extension utilizes MicroProfile Reactive Messaging to build data streaming applications.

If you did the Kafka or AMQP quickstart, you have realized that it’s the same code. The only difference is the connector configuration.

If you want to go further check the documentation of SmallRye Reactive Messaging, the implementation used in Quarkus.