Get familiar with RabbitMQ, Spring Boot AMQP, and their features.

Time: 20 minutes.

Part 1: Code Snippet Manager AMQP

You will continue with the Code Snippet Manager code. For this lab you will need the snippet-code-manager-actuator project code.

There are new requirements for the Code Snippet Manager:

  • Receive code snippets using RabbitMQ:

    • Communication with the client should follow the RPC pattern (emulating synchronous communication).

    • Accept messages to save, update and delete code snippets.

    • Create Listeners for two queues: snippet.upsert and snippet.remove.

    • For every message processed, send a SnippetNotification to the snippet.notifications queue.

  • You should be able to create the following topology:

    [Figure: RabbitMQ Topology]
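
The topology can be read as a routing table: the snippet.manager direct exchange delivers a message to the queue whose binding key equals the routing key of the message. The routing keys used here (snippet.save, snippet.update and snippet.delete) are the ones declared in SnippetAmqpConfig in step 12. What follows is a minimal plain-Java sketch of that routing rule, not RabbitMQ code; the DirectExchangeSketch class and its methods are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of a direct exchange (illustrative only, no broker involved). */
class DirectExchangeSketch {

    // routing key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new LinkedHashMap<>();

    /** Records a binding between a queue and a routing key. */
    public void bind(String queue, String routingKey) {
        bindings.computeIfAbsent(routingKey, key -> new ArrayList<>()).add(queue);
    }

    /** A direct exchange delivers only to queues whose binding key equals the routing key. */
    public List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptyList());
    }

    /** Builds the three bindings used in this lab. */
    public static DirectExchangeSketch snippetManager() {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("snippet.upsert", "snippet.save");
        exchange.bind("snippet.remove", "snippet.delete");
        exchange.bind("snippet.upsert", "snippet.update");
        return exchange;
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = snippetManager();
        for (String key : new String[] {"snippet.save", "snippet.update", "snippet.delete", "snippet.unknown"}) {
            System.out.println(key + " -> " + exchange.route(key));
        }
    }
}
```

Note that save and update share the snippet.upsert queue, while a message with a routing key that has no binding is not delivered to any queue in this model.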

  1. Open a browser and hit the url: http://start.spring.io

  2. Click the Switch to the full version link.

  3. Fill out the Code Snippet Manager AMQP Project metadata with (See Figure 1.0):

    Table 1. Code Snippet Manager AMQP - metadata

    Property        Value
    Group:          io.pivotal.workshop
    Artifact:       code-snippet-manager-amqp
    Name:           code-snippet-manager-amqp
    Package Name:   io.pivotal.workshop.snippet
    Dependencies:   Web, DevTools, Groovy Templates, JPA, Rest Repositories, H2, MySQL, Actuator, HATEOAS, AMQP
    Spring Boot:    2.0.0.M7

    Figure 1.0: Spring Initializr - http://start.spring.io/


    Tip
    You can choose either Maven or Gradle project types.

  4. Type Web, DevTools, Groovy Templates, JPA, Rest Repositories, H2, MySQL, Actuator, HATEOAS and AMQP in the Dependencies field and press Enter.

  5. Click the Generate Project button.

  6. Unzip the file in any directory you want.

  7. Import your project in any IDE you want.

  8. Copy all the packages (with code) from the snippet-code-manager-actuator project into the new project.

  9. The existing code in the domain, actuator, controller, repository and config packages remains unchanged.

  10. Next, add the listeners that will consume from the queues. Create the SnippetAmqpListener class in the io.pivotal.workshop.snippet.amqp package:

    io.pivotal.workshop.snippet.amqp.SnippetAmqpListener.java
    package io.pivotal.workshop.snippet.amqp;
    
    import java.util.Date;
    
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.messaging.handler.annotation.SendTo;
    import org.springframework.stereotype.Component;
    
    import io.pivotal.workshop.snippet.domain.Snippet;
    import io.pivotal.workshop.snippet.domain.SnippetNotification;
    import io.pivotal.workshop.snippet.repository.SnippetRepository;
    
    @Component
    public class SnippetAmqpListener {
    
            private SnippetRepository repo;
            public SnippetAmqpListener(SnippetRepository repo){
                    this.repo = repo;
            }
    
    
            @RabbitListener(queues={"snippet.upsert"})
            @SendTo
            public SnippetNotification save(Snippet snippet){
                    Snippet result = this.repo.save(snippet);
                    return new SnippetNotification(result.getId(),"SAVE",new Date(),null);
            }
    
            @RabbitListener(queues={"snippet.remove"})
            @SendTo
            public SnippetNotification delete(Snippet snippet){
                    this.repo.deleteById(snippet.getId());
                    return new SnippetNotification(snippet.getId(),"DELETE",new Date(),null);
            }
    
    }

    The code above uses the @RabbitListener and @SendTo annotations. Two methods handle the messages (each receiving a Snippet) from the two queues, and both return a new SnippetNotification. That return value is sent back to the client as the reply: it goes to the reply-to address carried by the request, and @SendTo supplies a fallback destination for requests that carry none.

  11. In the domain package, create the SnippetNotification and SnippetError classes:

    io.pivotal.workshop.snippet.domain.SnippetNotification.java
    package io.pivotal.workshop.snippet.domain;
    
    import java.util.Date;
    
    public class SnippetNotification {
    
            private String snippetId;
            private String action;
            private Date processed;
            private SnippetError error;
    
            public SnippetNotification() {
            }
    
            public SnippetNotification(String snippetId, String action, Date processed, SnippetError error) {
                    super();
                    this.snippetId = snippetId;
                    this.action = action;
                    this.processed = processed;
                    this.error = error;
            }
    
            public String getSnippetId() {
                    return snippetId;
            }
    
            public void setSnippetId(String snippetId) {
                    this.snippetId = snippetId;
            }
    
            public String getAction() {
                    return action;
            }
    
            public void setAction(String action) {
                    this.action = action;
            }
    
            public Date getProcessed() {
                    return processed;
            }
    
            public void setProcessed(Date processed) {
                    this.processed = processed;
            }
    
            public SnippetError getError() {
                    return error;
            }
    
            public void setError(SnippetError error) {
                    this.error = error;
            }
    
    }

    io.pivotal.workshop.snippet.domain.SnippetError.java
    package io.pivotal.workshop.snippet.domain;
    
    public class SnippetError {
    
            private String message;
            private String errorCode;
    
            public SnippetError() {
            }
    
            public SnippetError(String message, String errorCode) {
                    this.message = message;
                    this.errorCode = errorCode;
            }
    
            public String getMessage() {
                    return message;
            }
    
            public void setMessage(String message) {
                    this.message = message;
            }
    
            public String getErrorCode() {
                    return errorCode;
            }
    
            public void setErrorCode(String errorCode) {
                    this.errorCode = errorCode;
            }
    }

  12. Next, create the configuration that converts incoming messages into Snippet instances and wires up the topology we need. Create the SnippetAmqpConfig class in the io.pivotal.workshop.snippet.config package:

    io.pivotal.workshop.snippet.config.SnippetAmqpConfig.java
    package io.pivotal.workshop.snippet.config;
    
    import org.springframework.amqp.core.AmqpAdmin;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.Binding.DestinationType;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class SnippetAmqpConfig {
    
            private final String SNIPPET_EXCHANGE = "snippet.manager";
    
            @Bean
            public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
                    SimpleRabbitListenerContainerFactory container = new SimpleRabbitListenerContainerFactory();
                    container.setConnectionFactory(connectionFactory);
                    container.setMessageConverter(new Jackson2JsonMessageConverter());
                    return container;
            }
    
            @Bean
            public DirectExchange directExchange(){
                    return new DirectExchange(SNIPPET_EXCHANGE, true, false);
            }
    
            @Bean
            public Queue upsert(){
                    return new Queue("snippet.upsert");
            }
    
            @Bean
            public Queue remove(){
                    return new Queue("snippet.remove");
            }
    
            @Bean
            public CommandLineRunner queuesAndBindings(AmqpAdmin admin){
                    return args -> {
                            admin.declareBinding(new Binding("snippet.upsert", DestinationType.QUEUE, SNIPPET_EXCHANGE, "snippet.save", null));
                            admin.declareBinding(new Binding("snippet.remove", DestinationType.QUEUE, SNIPPET_EXCHANGE, "snippet.delete", null));
                            admin.declareBinding(new Binding("snippet.upsert", DestinationType.QUEUE, SNIPPET_EXCHANGE, "snippet.update", null));
                    };
            }
    }

    In the code above we create a SimpleRabbitListenerContainerFactory configured with a Jackson2JsonMessageConverter so that incoming JSON can be converted into our Snippet class. In other words, the queues receive each snippet as a JSON object. We also declare the exchange and the queues, and once the application has started we bind the queues to the exchange by specifying the routing keys.

  13. Now you can run the code and open the RabbitMQ web console at http://localhost:15672 to see the exchange and queues that were created and the consumers listening on the queues.

Tip
The username/password for the RabbitMQ console is: guest/guest.
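
The request/reply flow that the listeners take part in can be pictured without a broker: the client publishes a request that names a reply queue, then blocks on that queue until the server answers or a timeout expires. The following is a minimal plain-Java sketch of that idea using java.util.concurrent queues. The RpcOverQueuesSketch class, its Request type and the "PROCESSED" reply text are hypothetical and only illustrate the pattern; they are not Spring AMQP API.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Plain-Java model of RPC over queues (illustrative only, no broker involved). */
class RpcOverQueuesSketch {

    /** A request carries its payload plus the queue where the reply is expected. */
    static final class Request {
        final String payload;
        final BlockingQueue<String> replyTo;

        Request(String payload, BlockingQueue<String> replyTo) {
            this.payload = payload;
            this.replyTo = replyTo;
        }
    }

    private final BlockingQueue<Request> requests = new LinkedBlockingQueue<>();

    /** Server side: consume requests and answer on each request's reply queue. */
    public Thread startServer() {
        Thread server = new Thread(() -> {
            try {
                while (true) {
                    Request request = requests.take();
                    request.replyTo.put("PROCESSED " + request.payload);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop quietly when interrupted
            }
        });
        server.setDaemon(true); // do not keep the JVM alive for the sketch
        server.start();
        return server;
    }

    /** Client side: publish, then block until a reply arrives; null means the wait timed out. */
    public String sendAndReceive(String payload, long timeoutMillis) {
        BlockingQueue<String> replyQueue = new ArrayBlockingQueue<>(1);
        requests.add(new Request(payload, replyQueue));
        try {
            return replyQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        RpcOverQueuesSketch rpc = new RpcOverQueuesSketch();
        rpc.startServer();
        System.out.println(rpc.sendAndReceive("snippet.save", 1000));
    }
}
```

The null result on timeout mirrors what a caller of RabbitTemplate's convertSendAndReceive has to handle when no reply arrives in time, which is why the client in Part 2 should not assume the returned object is always present.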

Part 2: Snippet AMQP Client

This application sends snippets to be saved or deleted and receives a notification in return.

  1. Open a browser and hit the url: http://start.spring.io

  2. Click the Switch to the full version link.

  3. Fill out the Snippet AMQP Client Project metadata with (See Figure 2.0):

    Table 2. Snippet AMQP Client - metadata

    Property        Value
    Group:          io.pivotal.workshop
    Artifact:       snippet-amqp-client
    Name:           snippet-amqp-client
    Package Name:   io.pivotal.workshop.snippet.client
    Dependencies:   DevTools, AMQP

    Figure 2.0: Spring Initializr - http://start.spring.io/


    Tip
    You can choose either Maven or Gradle project types.

  4. Type DevTools and AMQP in the Dependencies field and press Enter.

  5. Click the Generate Project button.

  6. Unzip the file in any directory you want.

  7. Import your project in any IDE you want.

  8. Let's start by creating the domain classes we will use. Create the following classes in the io.pivotal.workshop.snippet.client.domain package:

    io.pivotal.workshop.snippet.client.domain.Code.java
    package io.pivotal.workshop.snippet.client.domain;
    
    public class Code {
    
            private String source;
    
            public Code() {
            }
    
            public Code(String source) {
                    this.source = source;
            }
    
            public String getSource() {
                    return source;
            }
    
            public void setSource(String source) {
                    this.source = source;
            }
    
    }

    io.pivotal.workshop.snippet.client.domain.Language.java
    package io.pivotal.workshop.snippet.client.domain;
    
    public class Language {
    
            private String name;
            private String syntax = "text";
    
            public Language() {
            }
    
            public Language(String name) {
                    this();
                    this.name = name;
            }
    
            public Language(String name, String syntax) {
                    this(name);
                    this.syntax = syntax;
            }
    
            public String getName() {
                    return name;
            }
    
            public void setName(String name) {
                    this.name = name;
            }
    
            public String getSyntax() {
                    return syntax;
            }
    
            public void setSyntax(String syntax) {
                    this.syntax = syntax;
            }
    }

    io.pivotal.workshop.snippet.client.domain.Snippet.java
    package io.pivotal.workshop.snippet.client.domain;
    
    public class Snippet {
    
            private String title;
            private String keywords = "";
            private Language lang;
            private Code code;
    
            public Snippet() {
            }
    
            public Snippet(String title, String keywords, String description, Language lang, Code code) {
                    this.title = title;
                    this.keywords = keywords;
                    this.lang = lang;
                    this.code = code;
            }
    
            public Snippet(String title, Language lang, Code code) {
                    this(title, "", "", lang, code);
            }
    
            public String getTitle() {
                    return title;
            }
    
            public void setTitle(String title) {
                    this.title = title;
            }
    
            public Language getLang() {
                    return lang;
            }
    
            public void setLang(Language lang) {
                    this.lang = lang;
            }
    
            public String getKeywords() {
                    return keywords;
            }
    
            public void setKeywords(String keywords) {
                    this.keywords = keywords;
            }
    
            public Code getCode() {
                    return code;
            }
    
            public void setCode(Code code) {
                    this.code = code;
            }
    }

    io.pivotal.workshop.snippet.client.domain.SnippetError.java
    package io.pivotal.workshop.snippet.client.domain;
    
    public class SnippetError {
    
            private String message;
            private String errorCode;
    
            public SnippetError() {
            }
    
            public SnippetError(String message, String errorCode) {
                    this.message = message;
                    this.errorCode = errorCode;
            }
    
            public String getMessage() {
                    return message;
            }
    
            public void setMessage(String message) {
                    this.message = message;
            }
    
            public String getErrorCode() {
                    return errorCode;
            }
    
            public void setErrorCode(String errorCode) {
                    this.errorCode = errorCode;
            }
    }

    io.pivotal.workshop.snippet.client.domain.SnippetNotification.java
    package io.pivotal.workshop.snippet.client.domain;
    
    import java.util.Date;
    
    public class SnippetNotification {
    
            // Named snippetId so the property matches the JSON field sent by the server.
            private String snippetId;
            private String action;
            private Date processed;
            private SnippetError error;
    
            public SnippetNotification() {
            }
    
            public SnippetNotification(String snippetId, String action, Date processed, SnippetError error) {
                    super();
                    this.snippetId = snippetId;
                    this.action = action;
                    this.processed = processed;
                    this.error = error;
            }
    
            public String getSnippetId() {
                    return snippetId;
            }
    
            public void setSnippetId(String snippetId) {
                    this.snippetId = snippetId;
            }
    
            public String getAction() {
                    return action;
            }
    
            public void setAction(String action) {
                    this.action = action;
            }
    
            public Date getProcessed() {
                    return processed;
            }
    
            public void setProcessed(Date processed) {
                    this.processed = processed;
            }
    
            public SnippetError getError() {
                    return error;
            }
    
            public void setError(SnippetError error) {
                    this.error = error;
            }
    
            @Override
            public String toString() {
                    return "SnippetNotification [snippetId=" + snippetId + ", action=" + action + ", processed=" + processed
                                    + ", error=" + error + "]";
            }
    
    }

  9. Next, let's create a producer that sends the messages to the exchange. Create the SnippetProducer class in the io.pivotal.workshop.snippet.client.amqp package:

    io.pivotal.workshop.snippet.client.amqp.SnippetProducer.java
    package io.pivotal.workshop.snippet.client.amqp;
    
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.stereotype.Component;
    
    import io.pivotal.workshop.snippet.client.domain.Snippet;
    
    @Component
    public class SnippetProducer {
    
            public enum SnippetAction {
                    SAVE("snippet.save"), DELETE("snippet.delete"), UPDATE("snippet.update");
    
                    private final String routingKey;
                    private SnippetAction(String routingKey){
                            this.routingKey = routingKey;
                    }
    
                    public String getRoutingKey(){
                            return this.routingKey;
                    }
            }
    
            private RabbitTemplate template;
    
            public SnippetProducer(RabbitTemplate template){
                    this.template = template;
            }
    
            public Object send(SnippetAction action,Snippet snippet){
                    return this.template.convertSendAndReceive(action.getRoutingKey(),snippet);
            }
    
    }

    Here we use the RabbitTemplate and its convertSendAndReceive method, which performs the RPC (emulating synchronous communication): it sends the message and blocks until the reply arrives.

  10. Because this is an RPC, the Code Snippet Manager AMQP app sends back a SnippetNotification (as JSON), so we need to configure the RabbitTemplate to support this conversion. Create the SnippetClientAmqpConfig class in the io.pivotal.workshop.snippet.client.config package:

    io.pivotal.workshop.snippet.client.config.SnippetClientAmqpConfig.java
    package io.pivotal.workshop.snippet.client.config;
    
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.DefaultJackson2JavaTypeMapper;
    import org.springframework.amqp.support.converter.Jackson2JavaTypeMapper;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import com.fasterxml.jackson.databind.JavaType;
    
    @Configuration
    public class SnippetClientAmqpConfig {
    
            private final String SNIPPET_EXCHANGE = "snippet.manager";
    
            @Bean
            public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
                    Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
                    Jackson2JavaTypeMapper mapper = new DefaultJackson2JavaTypeMapper() {
    
                            @Override
                            public JavaType toJavaType(MessageProperties properties) {
                                    properties.setHeader("__TypeId__", "io.pivotal.workshop.snippet.client.domain.SnippetNotification");
                                    return super.toJavaType(properties);
                            }
    
                    };
                    converter.setJavaTypeMapper(mapper);
    
                    RabbitTemplate template = new RabbitTemplate(connectionFactory);
                    template.setMessageConverter(converter);
                    template.setExchange(SNIPPET_EXCHANGE);
                    return template;
            }
    
    }

    It's important to notice that we use the same converter as before, the Jackson2JsonMessageConverter, which maps the JSON object to the SnippetNotification. Here we add a little trick: we overwrite a special header, __TypeId__, which the converter uses as the reference for the mapping, so that it points to our own SnippetNotification class.

    Tip
    If you want to know more about this solution, ask the instructor to explain a little further. You can also take a look at the documentation: http://docs.spring.io/spring-amqp/reference/html/_reference.html#message-converters

  11. Now it's time to send a Snippet. Create the SnippetClientConfig class in the io.pivotal.workshop.snippet.client.config package:

    io.pivotal.workshop.snippet.client.config.SnippetClientConfig.java
    package io.pivotal.workshop.snippet.client.config;
    
    import java.nio.file.Files;
    import java.nio.file.Paths;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import io.pivotal.workshop.snippet.client.amqp.SnippetProducer;
    import io.pivotal.workshop.snippet.client.amqp.SnippetProducer.SnippetAction;
    import io.pivotal.workshop.snippet.client.domain.Code;
    import io.pivotal.workshop.snippet.client.domain.Language;
    import io.pivotal.workshop.snippet.client.domain.Snippet;
    
    @Configuration
    public class SnippetClientConfig {
    
            private final Logger log = LoggerFactory.getLogger("SNIPPET-SENDER");
    
            @Bean
            public CommandLineRunner sendSnippets(SnippetProducer producer){
                    return args -> {
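                            Snippet snippet = new Snippet("Hello World", new Language("Kotlin", "java"),
                                            new Code(new String(Files.readAllBytes(Paths.get("code/Hello.kt")))));
                            // The reply is null when none arrives in time, so avoid calling toString() on it directly.
                            Object obj = producer.send(SnippetAction.SAVE, snippet);
                            log.info(String.valueOf(obj));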
                    };
            }
    }

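    Here we use the SnippetProducer to send the Snippet message. The snippet source is read from code/Hello.kt, so that file must exist relative to the directory the application is started from.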

  12. Now you can run the application, and you should get your response back, a SnippetNotification object.
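
If the reply arrives but cannot be converted, the __TypeId__ override from step 10 is the first thing to check. The server writes the name of its own class (io.pivotal.workshop.snippet.domain.SnippetNotification) into the header, and that class does not exist in the client project. The following plain-Java sketch models why overwriting the header helps. It is an illustration under simplifying assumptions, not the Spring AMQP implementation: TypeIdOverrideSketch, ClientNotification and both resolve methods are hypothetical names, and a plain Map stands in for the message properties.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Plain-Java model of the __TypeId__ override (illustrative only). */
class TypeIdOverrideSketch {

    /** Hypothetical stand-in for the client's own SnippetNotification class. */
    public static class ClientNotification { }

    static final String TYPE_ID_HEADER = "__TypeId__";

    /** Default lookup: load the class whose name the sender wrote into the header. */
    static Optional<Class<?>> resolve(Map<String, Object> headers) {
        try {
            Class<?> type = Class.forName(String.valueOf(headers.get(TYPE_ID_HEADER)));
            return Optional.of(type);
        } catch (ClassNotFoundException e) {
            return Optional.empty(); // the sender's class is not on this classpath
        }
    }

    /** The trick: overwrite the header with a class the client does have, then resolve. */
    static Optional<Class<?>> resolveAsClientType(Map<String, Object> headers) {
        headers.put(TYPE_ID_HEADER, ClientNotification.class.getName());
        return resolve(headers);
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        // Header value as written by the server application
        headers.put(TYPE_ID_HEADER, "io.pivotal.workshop.snippet.domain.SnippetNotification");

        System.out.println("default lookup found a class: " + resolve(headers).isPresent());
        System.out.println("override resolved to: " + resolveAsClientType(headers).get().getName());
    }
}
```

In this model the default lookup finds nothing because the server's package is unknown to the client, while the overridden lookup always lands on the client's class. The real converter reports a conversion error instead of returning an empty result, but the reasoning is the same.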

Challenges

As you already know, we haven't implemented some of the requirements:

  • Create the snippet.notifications queue and bind it to the snippet.manager exchange.

  • The Code Snippet Manager AMQP app needs to send a notification every time it receives a Snippet:

    • Without modifying the SnippetAmqpListener add the behavior to send a SnippetNotification to the snippet.notifications queue.

      Tip
      Use AOP for this particular concern and use the RabbitTemplate to send an asynchronous notification.
  • Make the Snippet AMQP Client app listen for any notification.