Spring Integration 5: WebFluxInboundEndpoint and Remote Client Address

By | October 14, 2017

Spring Integration 5 introduces the WebFluxInboundEndpoint, an endpoint that lets you use reactive programming when handling HTTP requests in your Spring Integration application. The WebFluxInboundEndpoint is also implemented in a way that makes it much more suitable for reuse. In this article I am going to show you how to add an HTTP header that contains the remote address and port of the client, provided that this information is available. I am also going to look at testing code that depends on the remote client address having been set by the endpoint.

Original image by Blake Burkhart. Modified by the article author.

HTTP Messaging Gateway?

You may wonder about the regular Spring Integration HTTP endpoint, the HttpRequestHandlingMessagingGateway, which has been around since Spring Integration 2. Regretfully, that class uses both private and final methods, which makes it much more difficult to write code that modifies HTTP requests. My guess is that it would be easier to implement a new class that is a sibling to HttpRequestHandlingMessagingGateway rather than trying to inherit from it. I will not attempt such an exercise in this article.

Approach

Normally I would want to write one or more tests that exposed the conditions of what I want to change, in order to be able to see a clear difference between before and after. However, in the case of starting a WebFluxInboundEndpoint as part of a test in a Spring Boot project, a regular server (Netty in this case) will not be started. The consequence of this in this particular case is that the remote address of the client having sent a request will never be set on the HTTP request object.

The approach I have used in this article is to run the example as a Spring Boot application and use Postman to send requests to the application. Any other HTTP client capable of sending POST requests and viewing the response will of course do, such as the REST Client in IntelliJ IDEA.
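If you would rather send the request from code, a minimal sketch using the HTTP client that ships with Java 11 and later could look like the following. The class name PostToWebFluxEndpoint is hypothetical, and the URL assumes Spring Boot's default port 8080 and the /webflux path that the example uses later in this article.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Sends a plain-text POST request to the example endpoint and prints the response. */
class PostToWebFluxEndpoint {
    /** Assumption: default Spring Boot port and the path used by the example. */
    static final String ENDPOINT_URL = "http://localhost:8080/webflux";

    /** Builds the POST request without sending it. */
    static HttpRequest buildRequest(final String inBody) {
        return HttpRequest.newBuilder(URI.create(ENDPOINT_URL))
            .header("Content-Type", "text/plain")
            .header("Accept", "text/plain")
            .POST(HttpRequest.BodyPublishers.ofString(inBody))
            .build();
    }

    public static void main(final String[] inArgs) throws InterruptedException {
        try {
            final HttpResponse<String> theResponse = HttpClient.newHttpClient().send(
                buildRequest("Hello WebFlux Endpoint!"),
                HttpResponse.BodyHandlers.ofString());
            System.out.println(theResponse.statusCode() + ": " + theResponse.body());
        } catch (final IOException theException) {
            /* Typically means that the example application is not running. */
            System.err.println("Request failed: " + theException);
        }
    }
}
```

Building the request in a separate method keeps it inspectable without a running server; the request is only sent from main.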

Create the Example Project

I am using Spring Initializr to generate the skeleton for the example project. Use this link to generate your own copy.
With Spring Integration WebFlux having been separated into its own module, we also need to add it to the dependencies of the example project. In the <dependencies> element in the pom.xml file, add the following dependency:

    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-webflux</artifactId>
    </dependency>

Configuration

One quality of Spring Integration 5 that I like very much, and that has apparently received some effort and thought, is that things of the same type, such as message channels and endpoints, are largely interchangeable. Switching from a HttpRequestHandlingMessagingGateway to a WebFluxInboundEndpoint thus requires minimal effort: change the type of the object created and, if you have declared a variable for it, the type of the variable.

In this example I am going to start out with a configuration that uses the regular WebFluxInboundEndpoint and later modify it. This is what the first version of the class containing the Spring Java configuration looks like:

package se.ivankrizsan.springintegration.remoteaddresswebfluxinboundendpoint;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.http.inbound.RequestMapping;
import org.springframework.integration.http.support.DefaultHttpHeaderMapper;
import org.springframework.integration.mapping.HeaderMapper;
import org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.reactive.config.EnableWebFlux;

/**
 * Configuration for the application trying out the webflux inbound endpoint that
 * stores the remote address of the client sending request to the endpoint in a HTTP
 * header.
 * Note that if this were the configuration for a test and the inbound endpoint
 * was started as part of that test, then no real server (e.g. Netty) would be started
 * and there would be no remote address set on requests.
 *
 * @author Ivan Krizsan
 */
@Configuration
@EnableWebFlux
@EnableIntegration
public class RemoteAddressWebFluxInboundEndpointConfiguration {
    /* Constant(s): */
    private static final Logger LOGGER = LoggerFactory.getLogger(
        RemoteAddressWebFluxInboundEndpointConfiguration.class);
    public static final String INBOUND_ENDPOINT_REQUEST_CHANNEL
        = "webfluxInboundEndpointRequestChannel";
    public static final String INBOUND_ENDPOINT_PATH_PATTERN = "/webflux";

    /**
     * Creates and configures the webflux inbound endpoint.
     *
     * @param inInboundHeaderMapper Mapper to be set on the inbound endpoint.
     * The mapper will map HTTP headers to message headers in requests and message headers
     * to HTTP headers in responses.
     * @return Inbound endpoint bean.
     */
    @Bean
    WebFluxInboundEndpoint webfluxInboundEndpoint(
        final HeaderMapper<HttpHeaders> inInboundHeaderMapper) {
        final WebFluxInboundEndpoint theInboundEndpoint =
            new WebFluxInboundEndpoint();

        final RequestMapping theRequestMapping = new RequestMapping();
        /* Accept only POST requests. */
        theRequestMapping.setMethods(HttpMethod.POST);
        /* Accept all media types in requests. */
        theRequestMapping.setConsumes(MediaType.ALL_VALUE);
        /* Produces plaintext responses. */
        theRequestMapping.setProduces(MediaType.TEXT_PLAIN_VALUE);
        /* HTTP path pattern at which the endpoint will listen. */
        theRequestMapping.setPathPatterns(INBOUND_ENDPOINT_PATH_PATTERN);

        theInboundEndpoint.setRequestMapping(theRequestMapping);
        theInboundEndpoint.setRequestPayloadTypeClass(String.class);
        theInboundEndpoint.setHeaderMapper(inInboundHeaderMapper);
        theInboundEndpoint.setRequestChannelName(INBOUND_ENDPOINT_REQUEST_CHANNEL);

        return theInboundEndpoint;
    }

    /**
     * Creates the header mapper that will map HTTP headers to message headers
     * for inbound requests.
     * Note that the HTTP header containing the remote client address must be added to
     * the names of inbound HTTP headers to be mapped to message headers, otherwise it will not
     * appear in request messages.
     *
     * @return HTTP header mapper for the webflux inbound endpoint.
     */
    @Bean
    HeaderMapper<HttpHeaders> requestHeaderMapper() {
        final DefaultHttpHeaderMapper theHttpHeaderMapper = DefaultHttpHeaderMapper.inboundMapper();
        final String[] theInboundHeaderNames = new String[] {
            DefaultHttpHeaderMapper.HTTP_REQUEST_HEADER_NAME_PATTERN,
            RemoteExtractingWebFluxInboundEndpoint.REMOTE_ADDRESS_HEADER
        };
        theHttpHeaderMapper.setInboundHeaderNames(theInboundHeaderNames);
        return theHttpHeaderMapper;
    }

    /**
     * Handles requests received by the inbound endpoint during tests.
     * If there is a remote address header in the request, then the value of this header
     * is to become the payload of the response message.
     * Otherwise a message saying that no remote address header present in request will
     * be the response payload.
     *
     * @param inRequestMessage Request message.
     * @return Response message.
     */
    @ServiceActivator(inputChannel = INBOUND_ENDPOINT_REQUEST_CHANNEL)
    public Message<String> handleRequest(final Message<String> inRequestMessage) {
        String theResponsePayload = "Uh-oh, no remote address found!";

        LOGGER.info("WebFlux inbound endpoint received a request message: {}",
            inRequestMessage);

        final Object theRemoteHeaderValue = inRequestMessage.getHeaders().get(
            RemoteExtractingWebFluxInboundEndpoint.REMOTE_ADDRESS_HEADER);
        if (theRemoteHeaderValue != null) {
            theResponsePayload = theRemoteHeaderValue.toString();

            LOGGER.info("Message header in request contained the remote client address: {}",
                theRemoteHeaderValue);
        } else {
            LOGGER.warn("No message header containing the client remote address found in request!");
        }

        return MessageBuilder
            .withPayload(theResponsePayload)
            /*
             * All headers from the request are copied to the response. No need to remove
             * any headers as the HTTP header mapper will take care of not mapping any message
             * headers that it does not have in its list of outbound header names to HTTP headers.
             */
            .copyHeaders(inRequestMessage.getHeaders())
            .build();
    }
}

I am going to assume little or no previous experience with Spring Integration, so if you are experienced in the area please bear with me.

  • Like any class containing Spring Java configuration, this class is annotated with @Configuration.
  • The configuration class is also annotated with @EnableWebFlux and @EnableIntegration.
    @EnableWebFlux imports the default configuration for Spring Reactive Web, which is similar to Spring MVC but with support for reactive programming.
    @EnableIntegration enables the use of Spring Integration in the example program by creating a number of Spring beans, adding several post-processors and annotation processors etc.
  • The method webfluxInboundEndpoint creates the webflux inbound endpoint Spring bean.
    The endpoint is created and configured to accept only POST requests, accept all media types in requests, produce plaintext responses and receive requests at the “/webflux” path.
    In addition, the endpoint is configured to post a message on the Spring Integration message channel named webfluxInboundEndpointRequestChannel for every request received.
  • The webfluxInboundEndpoint method takes a parameter of the type HeaderMapper<HttpHeaders>.
    When the endpoint has received a request and is to create a Spring Integration message from it, this header mapper determines which HTTP headers in the request become message headers. In the same way, when an HTTP response is to be created from a Spring Integration message, the header mapper determines which message headers become HTTP headers in the response.
  • The method requestHeaderMapper creates the header mapper Spring bean.
    This bean will be injected into the webflux inbound endpoint bean and has the functionality as described above.
    Note how the string array of inbound HTTP header names to map to message headers contains the single constant DefaultHttpHeaderMapper.HTTP_REQUEST_HEADER_NAME_PATTERN, which stands for the standard inbound HTTP headers, followed by our own remote address header name. This allows you to rely on Spring Integration to decide which HTTP headers to map by default and then add your own HTTP headers to be mapped.
  • There is a method named handleRequest which is not annotated with @Bean.
    This method does not create a Spring bean, but it is annotated with the @ServiceActivator annotation, in which the inputChannel is set to webfluxInboundEndpointRequestChannel. Being a service activator method, it will be invoked as soon as there is a message available on the webfluxInboundEndpointRequestChannel message channel. The message from the message channel is passed as a parameter to the handleRequest method.
    Had this been a larger program, I would have moved this method into a class of its own and created a Spring bean of that class.
  • The handleRequest method will, for each request received, examine the message headers to determine whether there is a message header with the name Remote-Addr. If there is such a header, then the value of the header will become the payload of the response message. Otherwise the response payload will be “Uh-oh, no remote address found!”.
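To illustrate why the remote address header name has to be on the header mapper's list, here is a deliberately simplified model in plain Java. It is not Spring Integration code and does not reproduce how DefaultHttpHeaderMapper works internally; the class InboundHeaderAllowList is hypothetical, and the case-insensitive name matching is an assumption made for the sketch.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/**
 * Simplified stand-in for an inbound header mapper: only HTTP headers whose
 * names are on an allow-list become message headers.
 */
class InboundHeaderAllowList {
    /* Assumption: header names are compared without regard to case. */
    private final Set<String> mAllowedNames = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);

    InboundHeaderAllowList(final String... inAllowedNames) {
        mAllowedNames.addAll(Arrays.asList(inAllowedNames));
    }

    /** Returns the subset of HTTP headers that would be copied to the message. */
    Map<String, String> toMessageHeaders(final Map<String, String> inHttpHeaders) {
        final Map<String, String> theMessageHeaders = new LinkedHashMap<>();
        inHttpHeaders.forEach((theName, theValue) -> {
            if (mAllowedNames.contains(theName)) {
                theMessageHeaders.put(theName, theValue);
            }
        });
        return theMessageHeaders;
    }

    public static void main(final String[] inArgs) {
        final Map<String, String> theHttpHeaders = new LinkedHashMap<>();
        theHttpHeaders.put("Host", "localhost:8080");
        theHttpHeaders.put("Remote-Addr", "/127.0.0.1:49332");

        /* Without Remote-Addr on the list, the header never reaches the message. */
        System.out.println(new InboundHeaderAllowList("Host")
            .toMessageHeaders(theHttpHeaders));
        /* With Remote-Addr added, it is mapped like any other listed header. */
        System.out.println(new InboundHeaderAllowList("Host", "Remote-Addr")
            .toMessageHeaders(theHttpHeaders));
    }
}
```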

All we need to do is create the above configuration class in the appropriate package in src/main/java in the example Spring Boot project, and it will be picked up when the application starts.

First Run

Let’s take the example program out for a first test-run. I will use Postman to send requests to the webflux inbound endpoint.

  • Start the Spring Boot application.
    If all goes according to plan, you should see log output in the console which ends with lines similar to the ones below. Note especially that Netty has been started and on which port (8080). Also note the path (/webflux), the method (POST), the consumes (*/*) and the produces (text/plain) values for the endpoint.
2017-10-11 22:03:29.642 INFO 820 --- [ main] xIntegrationRequestMappingHandlerMapping : Mapped "{[/webflux],methods=[POST],consumes=[*/*],produces=[text/plain]}" onto public abstract reactor.core.publisher.Mono<java.lang.Void> org.springframework.web.server.WebHandler.handle(org.springframework.web.server.ServerWebExchange)
2017-10-11 22:03:29.709 INFO 820 --- [ctor-http-nio-1] r.ipc.netty.tcp.BlockingNettyContext : Started HttpServer on /0:0:0:0:0:0:0:0:8080
2017-10-11 22:03:29.709 INFO 820 --- [ main] o.s.b.web.embedded.netty.NettyWebServer : Netty started on port(s): 8080
2017-10-11 22:03:29.714 INFO 820 --- [ main] addressWebfluxinboundendpointApplication : Started RemoteaddressWebfluxinboundendpointApplication in 1.625 seconds (JVM running for 2.015)
  • Send a POST request to http://localhost:8080/webflux.
Using Postman to send a request to the example program - first attempt.

In the lower right corner of the above picture from Postman, you can see the result of my POST request. Apparently no HTTP header containing the client remote address was found in the request. If we look at the console log, the entire request Spring Integration message is logged:

WebFlux inbound endpoint received a request message: [Payload String content=Hello WebFlux Endpoint!][Headers={content-length=23, http_requestMethod=POST, errorChannel=org.springframework.integration.gateway.MessagingGatewaySupport$FutureReplyChannel@2854fdb7, Accept=*/*, User-Agent=PostmanRuntime/6.4.0, Connection=keep-alive, Host=localhost:8080, replyChannel=org.springframework.integration.gateway.MessagingGatewaySupport$FutureReplyChannel@2854fdb7, http_requestUrl=http://localhost:8080/webflux, id=cb7b0d7f-e4a0-32b1-1a17-7ed5aafbc66f, cache-control=no-cache, contentType=text/plain, accept-encoding=gzip, deflate, timestamp=1507666421012}]

We can see that the message contains the following headers:

  • content-length
  • http_requestMethod
  • errorChannel
  • Accept
  • User-Agent
  • Connection
  • Host
  • replyChannel
  • http_requestUrl
  • id
  • cache-control
  • contentType
  • accept-encoding
  • timestamp

The Host header contains the server endpoint address and port. The http_requestUrl header contains the URI of the request, that is, the URI to which the originator of the request sent the request.
The headers that have the prefix “http_” are Spring Integration HTTP headers defined in the org.springframework.integration.http.HttpHeaders class.
None of the headers contain the address of the client that sent the request.

A New WebFlux Inbound Endpoint

We have thus seen that the webflux inbound endpoint in Spring Integration 5 will not set the client remote address on the message. For the curious, the code that would be a candidate to set such a header is located in the buildMessage method in the WebFluxInboundEndpoint class. Regretfully this method is private so it cannot be overridden. Going up the method call-chain from buildMessage, the first method that can be overridden is the handle method. Implementing a subclass of WebFluxInboundEndpoint that overrides this method to store the remote address of the client sending the request in a header on the request may look like this:

package se.ivankrizsan.springintegration.remoteaddresswebfluxinboundendpoint;

import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint;
import org.springframework.util.Assert;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

import java.net.InetSocketAddress;

/**
 * WebFlux inbound endpoint that inserts the remote address of a client into
 * a HTTP header in the request, when one is available.
 *
 * @author Ivan Krizsan
 */
public class RemoteExtractingWebFluxInboundEndpoint extends WebFluxInboundEndpoint {
    /* Constant(s): */
    /** Default remote address header. */
    public final static String REMOTE_ADDRESS_HEADER = "Remote-Addr";

    /* Instance variable(s): */
    protected String mRemoteAddressHeader = REMOTE_ADDRESS_HEADER;

    /**
     * Handles a request.
     * This method stores the remote address of the client having sent the request in
     * a HTTP header in the request if the remote address is available.
     * Further processing of requests is delegated to the superclass.
     *
     * @param inServerWebExchange Current server exchange.
     * @return To indicate when request handling is complete.
     */
    @Override
    public Mono<Void> handle(final ServerWebExchange inServerWebExchange) {
        ServerWebExchange theServerWebExchange = inServerWebExchange;

        final InetSocketAddress theRemoteAddress = inServerWebExchange
            .getRequest()
            .getRemoteAddress();

        /* Only insert the remote address header if a remote address is indeed available. */
        if (theRemoteAddress != null) {
            /*
             * Need to create a mutable wrapper around the ServerHttpRequest
             * since the original ServerHttpRequest object is immutable.
             */
            final ServerHttpRequest theNewServerHttpRequest = inServerWebExchange
                .getRequest()
                .mutate()
                .header(mRemoteAddressHeader, theRemoteAddress.toString())
                .build();

            /*
             * In the same way as with the ServerHttpRequest, a mutable wrapper also needs
             * to be created around the ServerWebExchange object in order to be able to
             * set the new ServerHttpRequest on the ServerWebExchange.
             */
            theServerWebExchange = inServerWebExchange
                .mutate()
                .request(theNewServerHttpRequest)
                .build();
        }

        return super.handle(theServerWebExchange);
    }

    public String getRemoteAddressHeader() {
        return mRemoteAddressHeader;
    }

    public void setRemoteAddressHeader(final String inRemoteAddressHeader) {
        Assert.hasText(inRemoteAddressHeader,
            "Remote address header name must not be null or empty.");
        mRemoteAddressHeader = inRemoteAddressHeader;
    }
}

Note that:

  • The name of the header that will contain the client remote address can be modified.
    The default name is “Remote-Addr”, as defined in the constant REMOTE_ADDRESS_HEADER.
  • In the handle method, the remote client address is retrieved from the request (type ServerHttpRequest) object in the server web exchange object.
    As before, if you run a test in your Spring Boot application the application will not start a full-blown web server which will result in the remote address not being set on the request object. Thus the code does not assume that the remote address always is available.
  • The ServerHttpRequest object from the server web exchange object, as well as the server web exchange object itself (type ServerWebExchange) are immutable.
    Fortunately the good people developing these classes have taken into account that someone may want to modify objects of these classes and created a nifty mechanism with which the original, immutable, object is wrapped by a decorator object that allows for, for instance, setting of request headers.
  • Using the wrapping mechanism described above, we can first create a request decorator that wraps the original request object and then set the remote address header to the string representation of the client remote address.
  • Again using the wrapping mechanism, a wrapper for the original server web exchange is created, in order to be able to set the new server HTTP request object on the server web exchange.
  • Finally, the remaining processing of a request is left to the superclass.
  • I have also added getter and setter methods for the remote address header name.
    These methods are not used in this example.
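The mutate-and-build idea described above can be sketched in plain Java. The class ImmutableRequest below is hypothetical and only illustrates the pattern; for brevity it copies the headers into a new object, whereas the Spring classes wrap the original object in a decorator.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical immutable request, used only to illustrate mutate-and-build. */
final class ImmutableRequest {
    private final Map<String, String> mHeaders;

    ImmutableRequest(final Map<String, String> inHeaders) {
        mHeaders = Collections.unmodifiableMap(new LinkedHashMap<>(inHeaders));
    }

    Map<String, String> getHeaders() {
        return mHeaders;
    }

    /** Starts a builder pre-populated with the headers of this request. */
    Builder mutate() {
        return new Builder(mHeaders);
    }

    /** Collects changes and produces a new request; the original is never modified. */
    static final class Builder {
        private final Map<String, String> mHeaders;

        private Builder(final Map<String, String> inHeaders) {
            mHeaders = new LinkedHashMap<>(inHeaders);
        }

        Builder header(final String inName, final String inValue) {
            mHeaders.put(inName, inValue);
            return this;
        }

        ImmutableRequest build() {
            return new ImmutableRequest(mHeaders);
        }
    }

    public static void main(final String[] inArgs) {
        final ImmutableRequest theOriginal =
            new ImmutableRequest(Map.of("Host", "localhost:8080"));
        final ImmutableRequest theModified = theOriginal
            .mutate()
            .header("Remote-Addr", "/127.0.0.1:49332")
            .build();

        /* The original request is unchanged; only the new one has the header. */
        System.out.println(theOriginal.getHeaders().containsKey("Remote-Addr"));
        System.out.println(theModified.getHeaders().get("Remote-Addr"));
    }
}
```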

Configuration Modification

In order for the new inbound endpoint type to be used, we need to make a single modification to the configuration class that was implemented earlier.

  • In the RemoteAddressWebFluxInboundEndpointConfiguration class, change the line that instantiates the endpoint so that it creates a RemoteExtractingWebFluxInboundEndpoint, as in the last line of the code snippet below.
    @Bean
    WebFluxInboundEndpoint webfluxInboundEndpoint(
        final HeaderMapper<HttpHeaders> inInboundHeaderMapper) {
        final WebFluxInboundEndpoint theInboundEndpoint =
            new RemoteExtractingWebFluxInboundEndpoint();

Second Run

With the new inbound endpoint class and the configuration modification in place, we are now ready to run the application again and see if its behaviour has changed.

  • Start the Spring Boot application.
    The log output in the console should be almost identical to the output seen with the first run.
  • Send a POST request to http://localhost:8080/webflux.
    In Postman I can observe this:
Using Postman to send a request to the example program - second attempt.

This time an address was indeed found, although it does not look quite like what I was expecting: the address is an IPv6 address with a port number appended. Nevertheless, it is a success!
If we examine the console log related to the request, we see:

2017-10-13 06:28:08.720  INFO 711 --- [ctor-http-nio-3] dressWebFluxInboundEndpointConfiguration : WebFlux inbound endpoint received a request message: [Payload String content=Hello WebFlux Endpoint!][Headers={content-length=23, http_requestMethod=POST, errorChannel=org.springframework.integration.gateway.MessagingGatewaySupport$FutureReplyChannel@40cca244, Accept=*/*, User-Agent=PostmanRuntime/6.4.0, Connection=keep-alive, Host=localhost:8080, replyChannel=org.springframework.integration.gateway.MessagingGatewaySupport$FutureReplyChannel@40cca244, http_requestUrl=http://localhost:8080/webflux, id=242b11b7-71b6-c2ef-f79a-2ed55013c95f, cache-control=no-cache, contentType=text/plain, accept-encoding=gzip, deflate, Remote-Addr=/0:0:0:0:0:0:0:1:49329, timestamp=1507868888691}]
2017-10-13 06:28:08.720  INFO 711 --- [ctor-http-nio-3] dressWebFluxInboundEndpointConfiguration : Message header in request contained the remote client address: /0:0:0:0:0:0:0:1:49329

In the request message, there now indeed is a header named Remote-Addr and it has the value /0:0:0:0:0:0:0:1:49329.

  • Send a POST request to http://127.0.0.1:8080/webflux.
    In Postman I now see the response below.
Using Postman to send another request to the example program - second attempt.

Looking at the console log:

2017-10-13 06:28:47.166  INFO 711 --- [ctor-http-nio-4] dressWebFluxInboundEndpointConfiguration : WebFlux inbound endpoint received a request message: [Payload String content=Hello WebFlux Endpoint!][Headers={content-length=23, http_requestMethod=POST, errorChannel=org.springframework.integration.gateway.MessagingGatewaySupport$FutureReplyChannel@2371f3b3, Accept=*/*, User-Agent=PostmanRuntime/6.4.0, Connection=keep-alive, Host=127.0.0.1:8080, replyChannel=org.springframework.integration.gateway.MessagingGatewaySupport$FutureReplyChannel@2371f3b3, http_requestUrl=http://127.0.0.1:8080/webflux, id=359f0a97-2c20-6b3a-e181-2d071c649c91, cache-control=no-cache, contentType=text/plain, accept-encoding=gzip, deflate, Remote-Addr=/127.0.0.1:49332, timestamp=1507868927166}]
2017-10-13 06:28:47.166  INFO 711 --- [ctor-http-nio-4] dressWebFluxInboundEndpointConfiguration : Message header in request contained the remote client address: /127.0.0.1:49332
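The header values above are whatever InetSocketAddress.toString() produces, leading slash included. If you prefer a value whose shape you control, one option is to assemble it from the address and the port. The helper RemoteAddressFormatter below is a hypothetical sketch; bracketing IPv6 addresses is my own choice and not something the endpoint does.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;

/** Formats a socket address as "ip:port" (IPv4) or "[ip]:port" (IPv6). */
class RemoteAddressFormatter {
    static String format(final InetSocketAddress inAddress) {
        final InetAddress theInetAddress = inAddress.getAddress();
        if (theInetAddress == null) {
            /* Unresolved address: fall back to the host name as given. */
            return inAddress.getHostString() + ":" + inAddress.getPort();
        }
        final String theIp = theInetAddress.getHostAddress();
        /* An IPv6 address contains colons, so bracket it to keep the port unambiguous. */
        final String theHost = theIp.contains(":") ? "[" + theIp + "]" : theIp;
        return theHost + ":" + inAddress.getPort();
    }

    public static void main(final String[] inArgs) {
        /* An IP literal is parsed directly, without a DNS lookup. */
        System.out.println(format(new InetSocketAddress("127.0.0.1", 49332)));
        System.out.println(format(InetSocketAddress.createUnresolved("client.example", 1234)));
    }
}
```

In the handle method of the endpoint, such a helper could be called in place of theRemoteAddress.toString().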

Testing

What about testing code that expects the header containing the remote client address to be set?
Implement the following test in the example project:

package se.ivankrizsan.springintegration.remoteaddresswebfluxinboundendpoint;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.ApplicationContext;
import org.springframework.http.MediaType;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.reactive.server.FluxExchangeResult;
import org.springframework.test.web.reactive.server.WebTestClient;
import org.springframework.web.reactive.function.BodyInserters;

/**
 * Test of the {@code RemoteExtractingWebFluxInboundEndpoint}.
 *
 * @author Ivan Krizsan
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class RemoteAddressWebFluxInboundEndpointTests {
    @Autowired
    protected ApplicationContext mApplicationContext;
    protected WebTestClient mWebTestClient;

    /**
     * Prepares for each test.
     */
    @Before
    public void setup() {
        /*
         * Note that the web test client must bind to the application context, since
         * no real server is started when the tests are run.
         */
        mWebTestClient = WebTestClient
            .bindToApplicationContext(mApplicationContext)
            .build();
    }

    /**
     * Tests sending a POST request to the WebFlux inbound endpoint.
     * Expected result: There should be a response from the endpoint with a payload
     * that contains the client remote address.
     */
    @Test
    public void sendPostRequestTest() {
        final FluxExchangeResult<String> theResponseMessage = mWebTestClient
            .post()
            .uri("/webflux")
            .accept(MediaType.TEXT_PLAIN)
            .contentType(MediaType.TEXT_PLAIN)
            .body(BodyInserters.fromObject("Hello WebFlux Endpoint, testing!"))
            .exchange()
            .expectStatus()
            .isOk()
            .returnResult(String.class);

        final String theResponsePayload = theResponseMessage.getResponseBody().blockFirst();

        Assert.assertTrue("The response should contain an address, but it contains: "
            + theResponsePayload,
            theResponsePayload != null
                && !theResponsePayload.contains("no remote address found"));
    }
}

Note that:

  • In the setup method, a web test client is created that binds to the Spring application context.
    When the example program is started from a test, a server will not be started and the endpoint will not be exposed outside of the application context.
    The curious can verify this by introducing a long delay in the test and then attempting to connect to the http://localhost:8080/webflux URL using, for instance, Postman.
  • The sendPostRequestTest method sends a POST request containing a greeting to the webflux inbound endpoint.
    In addition, it expects a response having the HTTP status 200 and with a payload that does not contain the string “no remote address found”.

If we now run the test, it will fail and the following message should be displayed in the console:

2017-10-13 05:46:55.798  INFO 4954 --- [           main] dressWebFluxInboundEndpointConfiguration : WebFlux inbound endpoint received a request message: [Payload String content=Hello WebFlux Endpoint, testing!][Headers={http_requestMethod=POST, replyChannel=org.springframework.integration.gateway.MessagingGatewaySupport$FutureReplyChannel@52cb4f50, errorChannel=org.springframework.integration.gateway.MessagingGatewaySupport$FutureReplyChannel@52cb4f50, Accept=text/plain, http_requestUrl=/webflux, id=585038eb-b764-2859-760a-8311f6638ada, contentType=text/plain, timestamp=1507866415781}]
2017-10-13 05:46:55.798  WARN 4954 --- [           main] dressWebFluxInboundEndpointConfiguration : No message header containing the client remote address found in request!

java.lang.AssertionError: The response should contain an address, but it contains: Uh-oh, no remote address found!

We can see that, first and foremost, the request did not contain a Remote-Addr header. Second, there is a warning about no client remote address found in the request and finally we see that the response payload was “Uh-oh, no remote address found!”.

To make the test pass, modify the sendPostRequestTest method and add a header to the request:

    @Test
    public void sendPostRequestTest() {
        final FluxExchangeResult<String> theResponseMessage = mWebTestClient
            .post()
            .uri("/webflux")
            .accept(MediaType.TEXT_PLAIN)
            .contentType(MediaType.TEXT_PLAIN)
            .body(BodyInserters.fromObject("Hello WebFlux Endpoint, testing!"))
            .header(RemoteExtractingWebFluxInboundEndpoint.REMOTE_ADDRESS_HEADER, "/127.1.2.3:1234")
            .exchange()
            .expectStatus()
            .isOk()
            .returnResult(String.class);

        final String theResponsePayload = theResponseMessage.getResponseBody().blockFirst();

        Assert.assertTrue("The response should contain an address, but it contains: "
            + theResponsePayload,
            theResponsePayload != null
                && !theResponsePayload.contains("no remote address found"));
    }

If we now run the test, it should pass and you should see the following line written to the console log:

Message header in request contained the remote client address: /127.1.2.3:1234

Thus you can test code that depends on the remote address header being present but you will have to set it yourself. This does have the advantage that you can test different scenarios that depend on the client remote address.
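Code that depends on the header will often need the host and the port separately. The following is a minimal sketch of a parser for that purpose, assuming the header keeps the "/host:port" format seen in the log output of this article. The class RemoteAddressHeaderParser is hypothetical and not part of the example project.

```java
/** Splits a header value such as "/127.0.0.1:49332" into host and port. */
class RemoteAddressHeaderParser {
    /** Simple value holder for the parsed parts. */
    static final class HostAndPort {
        final String host;
        final int port;

        HostAndPort(final String inHost, final int inPort) {
            host = inHost;
            port = inPort;
        }
    }

    static HostAndPort parse(final String inHeaderValue) {
        /* The port follows the last colon; earlier colons belong to an IPv6 address. */
        final int thePortSeparator = inHeaderValue.lastIndexOf(':');
        if (thePortSeparator < 0) {
            throw new IllegalArgumentException("No port in header value: " + inHeaderValue);
        }
        /* Skip an optional "hostname/" or "/" prefix in front of the address. */
        final int theHostStart = inHeaderValue.lastIndexOf('/', thePortSeparator) + 1;
        final String theHost = inHeaderValue.substring(theHostStart, thePortSeparator);
        final int thePort = Integer.parseInt(inHeaderValue.substring(thePortSeparator + 1));
        return new HostAndPort(theHost, thePort);
    }

    public static void main(final String[] inArgs) {
        final HostAndPort theParsed = parse("/127.1.2.3:1234");
        System.out.println(theParsed.host + " on port " + theParsed.port);
    }
}
```

Since the test sets the header itself, values such as "/127.1.2.3:1234" or "/0:0:0:0:0:0:0:1:49329" can be fed through code like this to cover both IPv4 and IPv6 clients.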

Happy coding!

2 thoughts on “Spring Integration 5: WebFluxInboundEndpoint and Remote Client Address”

  1. Phuong D. Pham

    Hello, how can I pass path variable to channel?
    E.g. /countries/province/{provinceCode}

    and pass provinceCode to the channel input with inboundChannel.
    Thanks,

    1. Ivan Krizsan Post author

      Hello!
      If you change the inbound endpoint path pattern to, for example, this:
      "/webflux/{pathParam}"
      then you can retrieve the value of the path parameter in the endpoint’s handle method using the following code:
      final Map thePathParams =
      (Map) inServerWebExchange.getAttributes().get(
      HandlerMapping.URI_TEMPLATE_VARIABLES_ATTRIBUTE);
      System.out.println("Path param value: " + thePathParams.get("pathParam"));

      Then you can do whatever you like with the path parameter value: pass it as metadata or payload of a message that you send to a message channel, etc. Hope this helps.
      Happy coding!
