REST with Asynchronous Jersey and RXJava – Part 4

By | December 29, 2016

In this series of articles I will implement a RESTful web service using Spring Boot and Jersey. I will then modify the service to use asynchronous server-side request/response processing and RXJava. Load tests will be run using Gatling before and after the modifications, in order to determine whether there are any improvements.

The first article in this series is available here, the second part here and the third part here.

In this fourth part of the series, I must first of all stand corrected: the Jersey features I have used in my example program do not enable asynchronous request processing, but asynchronous server-side response processing.

I will modify the example program to use Jersey’s asynchronous server-side response processing and, as usual by now, load-test the modified version of the program. As in the previous article in this series, I will continue to use an external database with the exact same configuration.

Jersey Asynchronous Response Processing

Introducing Jersey’s asynchronous response processing requires smaller modifications than introducing RXJava did. The resource methods no longer return the response; instead, Jersey provides an asynchronous response object as a method parameter, and the response is sent to that object. This not only reduces the amount of code but also fits in better with RXJava.

Jersey’s asynchronous response processing means that there is no longer an I/O thread blocked waiting for a response from a request processing method. Instead, the I/O thread may immediately return to the container, Tomcat in the case of my example program, while the processing of the request continues in another thread.
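The hand-off described above can be illustrated with nothing but the JDK. The following is a minimal sketch, not Jersey code: the class AsyncHandOffSketch, the slowLookup method and the callback parameter are all hypothetical names made up for this illustration. The callback plays the role that the asynchronous response object plays in Jersey, and the worker pool stands in for whatever thread continues the processing.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Plain-JDK illustration of handing a request off to a worker thread (hypothetical, not Jersey). */
class AsyncHandOffSketch {

    /** Hypothetical stand-in for a slow service call, for example a database query. */
    static String slowLookup(final long inEntityId) {
        try {
            TimeUnit.MILLISECONDS.sleep(100);
        } catch (final InterruptedException theException) {
            Thread.currentThread().interrupt();
        }
        return "entity-" + inEntityId;
    }

    /**
     * Schedules the processing on a worker thread and returns at once.
     * The supplied callback receives the result when processing has finished.
     */
    static void handleRequest(final long inEntityId,
        final ExecutorService inWorkers, final Consumer<String> inResume) {
        inWorkers.execute(() -> inResume.accept(slowLookup(inEntityId)));
    }

    /** Runs one request and reports whether the response was produced on another thread. */
    static String demo() {
        final ExecutorService theWorkers = Executors.newSingleThreadExecutor();
        final CompletableFuture<String> theResponse = new CompletableFuture<>();
        final String theCallerThread = Thread.currentThread().getName();
        try {
            handleRequest(42L, theWorkers, inBody -> theResponse.complete(
                inBody + " resumed-on-other-thread="
                    + !Thread.currentThread().getName().equals(theCallerThread)));
            /* handleRequest has already returned here; only this demo waits for the result. */
            return theResponse.join();
        } finally {
            theWorkers.shutdown();
        }
    }
}
```

Calling AsyncHandOffSketch.demo() returns "entity-42 resumed-on-other-thread=true". The point is that handleRequest returns before the slow lookup has completed, so the calling thread is free to do other work in the meantime.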

New Version of the REST Resource Base Class

First I will show the entire new version of the REST resource base class and then a detailed analysis will follow.

package se.ivankrizsan.restexample.restadapter;


import se.ivankrizsan.restexample.domain.LongIdEntity;
import se.ivankrizsan.restexample.services.AbstractServiceBaseRxJava;

import javax.validation.constraints.NotNull;
import javax.ws.rs.*;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.util.List;

/**
 * Abstract base class for REST resources exposing operations on an entity type.
 * All operations will return HTTP status 500 with a plain text body containing an
 * error message if an error occurred during request processing.
 *
 * @param <E> Entity type.
 * @author Ivan Krizsan
 */
@Produces({MediaType.APPLICATION_JSON, MediaType.TEXT_PLAIN})
@Consumes({MediaType.APPLICATION_JSON})
public abstract class RestResourceBaseRxJava<E extends LongIdEntity> {
    /* Constant(s): */

    /* Instance variable(s): */
    protected AbstractServiceBaseRxJava<E> mService;

    /**
     * Retrieves all entities.
     *
     * @param inAsyncResponse Asynchronous response object.
     */
    @GET
    public void getAll(@Suspended final AsyncResponse inAsyncResponse) {

        mService.findAll().subscribe(
            inResult -> inAsyncResponse.resume(
                Response.ok(entityListToArray(inResult)).build()),
            inError -> inAsyncResponse.resume(
                Response.
                    status(500).
                    entity("An error occurred retrieving all entities: "
                        + inError.getMessage()).
                    type(MediaType.TEXT_PLAIN_TYPE).
                    build()));
    }

    /**
     * Deletes the entity with supplied id.
     *
     * @param inAsyncResponse Asynchronous response object.
     * @param inEntityId Id of entity to delete.
     */
    @DELETE
    @Path("{id}")
    public void deleteEntityById(
        @Suspended final AsyncResponse inAsyncResponse,
        @PathParam("id") @NotNull final Long inEntityId) {
        mService.delete(inEntityId).subscribe(
            inResult -> inAsyncResponse.resume(Response.ok().build()),
            inError -> inAsyncResponse.resume(
                Response.
                    status(500).
                    entity("An error occurred deleting entity with id " + inEntityId).
                    type(MediaType.TEXT_PLAIN_TYPE).
                    build()),
            () -> inAsyncResponse.resume(Response.ok().build()));
    }

    /**
     * Deletes all entities.
     * Will return HTTP status 500 if error occurred during request processing.
     *
     * @param inAsyncResponse Asynchronous response object.
     */
    @DELETE
    public void deleteAllEntities(@Suspended final AsyncResponse inAsyncResponse) {
        mService.deleteAll().subscribe(
            inResult -> inAsyncResponse.resume(Response.ok().build()),
            inError -> inAsyncResponse.resume(
                Response.
                    status(500).
                    entity("An error occurred deleting all entities.").
                    type(MediaType.TEXT_PLAIN_TYPE).
                    build()),
            () -> inAsyncResponse.resume(Response.ok().build()));
    }

    /**
     * Retrieves entity with supplied id.
     *
     * @param inEntityId Id of entity to retrieve.
     * @param inAsyncResponse Asynchronous response object.
     */
    @GET
    @Path("{id}")
    public void getEntityById(@PathParam("id") Long inEntityId,
        @Suspended final AsyncResponse inAsyncResponse) {
        mService.find(inEntityId).subscribe(
            inResult -> inAsyncResponse.resume(Response.ok(inResult).build()),
            inError -> inAsyncResponse.resume(
                Response.
                    status(500).
                    entity(inError.getMessage()).
                    type(MediaType.TEXT_PLAIN_TYPE).
                    build()));
    }

    /**
     * Updates the entity with supplied id by overwriting it with the supplied entity.
     *
     * @param inAsyncResponse Asynchronous response object.
     * @param inEntity Entity data to write.
     * @param inEntityId Id of entity to update.
     */
    @PUT
    @Path("{id}")
    public void updateEntity(@Suspended final AsyncResponse inAsyncResponse,
        final E inEntity, @PathParam("id") @NotNull final Long inEntityId) {

        inEntity.setId(inEntityId);
        mService.update(inEntity).subscribe(
            inResult -> inAsyncResponse.resume(Response.ok(inResult).build()),
            inError -> inAsyncResponse.resume(
                Response.
                    status(500).
                    entity("An error occurred updating entity with id "
                        + inEntityId + ": " + inError.getMessage()).
                    type(MediaType.TEXT_PLAIN_TYPE).
                    build()));
    }

    /**
     * Creates a new entity using the supplied entity data.
     *
     * @param inAsyncResponse Asynchronous response object.
     * @param inEntity Entity data to use when creating new entity.
     */
    @POST
    public void createEntity(
        @Suspended final AsyncResponse inAsyncResponse, final E inEntity) {
        if (inEntity.getId() != null) {
            final Response response = Response.status(400).entity(
                "Id must not be set on new entity").build();
            inAsyncResponse.resume(response);
            /* Stop here so that the invalid entity is not saved. */
            return;
        }

        mService.save(inEntity).subscribe(
            inResult -> inAsyncResponse.resume(Response.ok(inResult).build()),
            inError -> inAsyncResponse.resume(
                Response.
                    status(500).
                    entity("An error occurred creating a new entity: "
                        + inError.getMessage()).
                    type(MediaType.TEXT_PLAIN_TYPE).
                    build()));
    }

    /**
     * Creates an array containing the entities in the supplied list.
     *
     * @param inEntityList List of entities.
     * @return Array containing the entities from the list.
     */
    protected abstract E[] entityListToArray(final List<E> inEntityList);

    public AbstractServiceBaseRxJava<E> getService() {
        return mService;
    }

    public void setService(final AbstractServiceBaseRxJava<E> inService) {
        mService = inService;
    }
}

The method signatures and part of the implementation of all the methods annotated with HTTP verbs (@GET, @POST, @PUT and @DELETE) have changed. The changes are the same in all of these methods, so we’ll just have a closer look at one of them.

A Method Before and After Asynchronous Response Processing

Let’s compare the previous and the new versions of the getEntityById method. First, recall the previous version:

    @GET
    @Path("{id}")
    public Response getEntityById(@PathParam("id") Long inEntityId) {
        final AtomicReference<Response> theResponseContainer =
            new AtomicReference<>();

        mService.find(inEntityId).subscribe(
            inResult -> theResponseContainer.set(Response.ok(inResult).build()),
            inError -> theResponseContainer.set(
                Response.
                    status(500).
                    entity(inError.getMessage()).
                    type(MediaType.TEXT_PLAIN_TYPE).
                    build()));
        return theResponseContainer.get();
    }

The new version looks like this:

    @GET
    @Path("{id}")
    public void getEntityById(@PathParam("id") Long inEntityId,
        @Suspended final AsyncResponse inAsyncResponse) {
        mService.find(inEntityId).subscribe(
            inResult -> inAsyncResponse.resume(Response.ok(inResult).build()),
            inError -> inAsyncResponse.resume(
                Response.
                    status(500).
                    entity(inError.getMessage()).
                    type(MediaType.TEXT_PLAIN_TYPE).
                    build()));
    }

Note that:

  • In the new version, the method return type is void.
    Since response processing is now asynchronous, the result of the method is no longer returned but sent to a response object, as we will shortly see.
  • In the new version, the method has an additional parameter – an AsyncResponse object.
    This is an object to which the response will be sent when the processing of the request has finished.
  • The additional parameter in the new version of the method is annotated with the @Suspended annotation.
    The @Suspended annotation tells Jersey that a new AsyncResponse object is to be injected into the annotated parameter whenever a request arrives bound for this method. The annotation also tells Jersey that the method is to be executed in an asynchronous fashion.
  • In the new version of the method, there is no longer any need to use the response container of the type AtomicReference.
    The AsyncResponse object is a final method parameter and thus readily available in the RXJava callback code blocks. Instead of storing the response object created in the RXJava callbacks, the response object can be sent back directly to the asynchronous response using the resume method.
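One detail of the resume method is worth a small illustration. In the JAX-RS API, AsyncResponse.resume returns a boolean that is true if the request processing was resumed and false if it was not suspended and could not be resumed. Some methods in the base class above call resume from more than one callback, so only the first call is expected to have an effect. The sketch below mimics that contract with a hypothetical stand-in class, OneShotResponseSketch, which is not the JAX-RS class and only models the "first call wins" behaviour.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in that mimics a resume-once contract; not the JAX-RS AsyncResponse. */
class OneShotResponseSketch {
    private final AtomicReference<Object> mResponse = new AtomicReference<>();

    /**
     * Stores the response if none has been stored yet.
     *
     * @return true for the first call, false for every later call.
     */
    boolean resume(final Object inResponse) {
        Objects.requireNonNull(inResponse, "response must not be null");
        return mResponse.compareAndSet(null, inResponse);
    }

    /** Returns the response that was actually sent, or null if none was sent. */
    Object sentResponse() {
        return mResponse.get();
    }

    /** Resumes twice and reports both results and the response that won. */
    static String demo() {
        final OneShotResponseSketch theResponse = new OneShotResponseSketch();
        final boolean theFirst = theResponse.resume("400 Id must not be set on new entity");
        final boolean theSecond = theResponse.resume("200 OK");
        return theFirst + "," + theSecond + "," + theResponse.sentResponse();
    }
}
```

OneShotResponseSketch.demo() returns "true,false,400 Id must not be set on new entity": the second call is ignored and the first response stands.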

Run Tests

To verify that these latest modifications have not broken the program, we now run all the tests. Recall that the tests are TestNG tests; in my IDE there is a dedicated option for running all the TestNG tests in a project.

Alternatively the tests can be run using Maven. In a terminal window, use the command:

mvn test

All tests should pass.

Running the Load Test

It is now time to run the load test once again, this time against the new version of the REST service.

  • Start the example program.
    As usual, right-click the RestExampleApplication class and run it.
  • Run the Gatling load test in a terminal window.
    mvn gatling:test
  • Examine the Gatling load test report.

The text-version of the report obtained after my load-test looks like this:

================================================================================
---- Global Information --------------------------------------------------------
> request count                                       5287 (OK=5287   KO=0     )
> min response time                                     14 (OK=14     KO=-     )
> max response time                                  13258 (OK=13258  KO=-     )
> mean response time                                  2174 (OK=2174   KO=-     )
> std deviation                                       2404 (OK=2404   KO=-     )
> response time 50th percentile                       1511 (OK=1511   KO=-     )
> response time 75th percentile                       2165 (OK=2165   KO=-     )
> response time 95th percentile                       8555 (OK=8555   KO=-     )
> response time 99th percentile                      12384 (OK=12384  KO=-     )
> mean requests/sec                                 85.274 (OK=85.274 KO=-     )
---- Response Time Distribution ------------------------------------------------
> t < 800 ms                                           998 ( 19%)
> 800 ms < t < 1200 ms                                 973 ( 18%)
> t > 1200 ms                                         3316 ( 63%)
> failed                                                 0 (  0%)
================================================================================

These numbers are almost identical to the result from the load test after having introduced RXJava in the previous article in this series. The graphs for this load test look like this:

Load test response time distribution graph for the final version of the example program.

Load test responses per second graph for the final version of the example program.

Why did we not achieve a better result from the load-test?
If one studies the Jersey documentation on the asynchronous server API, the following can be read:

“Note that the use of server-side asynchronous processing model will not improve the request processing time perceived by the client.”

So, why fiddle around with the Jersey asynchronous server API when it does not give us any improvements that can be perceived by the client?
While using an asynchronous processing model may not increase the performance, it does increase the robustness of the service. A service using a synchronous processing model that receives a high number of requests may run out of threads to process them, which results in requests failing. A service using an asynchronous processing model, on the other hand, will not run out of resources as easily and is able to queue up more requests for processing. Requests may of course eventually time out if the service is not able to keep up with the processing, but that is another problem.
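The robustness argument can be made concrete with a toy model. The sketch below is pure bookkeeping under loud assumptions: no real threads, sockets or Jersey are involved, the class and method names are hypothetical, and the backend is assumed to answer nothing until the whole burst of requests has arrived. Real containers such as Tomcat also have accept queues and connection limits, which this model ignores.

```java
/** Toy bookkeeping model of a request burst; no real threads or sockets are involved. */
class ThreadExhaustionSketch {

    /**
     * Counts how many requests in a burst find no free request thread.
     * Assumes the backend answers none of the requests until the burst is over.
     *
     * @param inRequestThreads Number of request threads in the modelled container.
     * @param inRequests Number of requests arriving in the burst.
     * @param inAsynchronous True if request threads hand work off and return at once.
     * @return Number of requests that could not be accepted.
     */
    static int rejectedRequests(final int inRequestThreads, final int inRequests,
        final boolean inAsynchronous) {
        int theFreeRequestThreads = inRequestThreads;
        int theRejected = 0;

        for (int theRequest = 0; theRequest < inRequests; theRequest++) {
            if (theFreeRequestThreads == 0) {
                /* No request thread available: the request fails. */
                theRejected++;
                continue;
            }
            /* A request thread picks up the request. */
            theFreeRequestThreads--;
            if (inAsynchronous) {
                /* The work is queued for later and the request thread is free again. */
                theFreeRequestThreads++;
            }
            /* Synchronous case: the thread stays occupied until the backend answers. */
        }
        return theRejected;
    }
}
```

With two request threads and a burst of five requests, rejectedRequests(2, 5, false) returns 3, while rejectedRequests(2, 5, true) returns 0. The queued work still has to be processed by someone, which is why response times do not improve; the model only shows why fewer requests fail outright.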

In my example program, the database is still as slow as ever and it is still the bottleneck so we can’t really expect any performance improvements.

Conclusion

So I wasted all this time fiddling around with RXJava and Jersey’s asynchronous response processing?

Personally, I do not think so. I have expanded my knowledge regarding load testing. I have learned to create more robust JAX-RS web services with Jersey. Finally, I have had a peek into RXJava, which I am sure I will get back to in the future.

This concludes this series of articles on REST with Jersey and RXJava. I hope you have learned something along the way.
As before, all the source-code is on GitHub. There is a “before” branch which shows the program before any changes were made. The “master” branch contains the final version of the example program with all modifications applied.

Happy coding!
