viernes, 10 de octubre de 2014

Ejemplo: productor y consumidor de mensajes con ActiveMQ


Vamos a ver un ejemplo de un escenario donde podría ser interesante el uso de ActiveMQ.

Supongamos que tenemos una aplicación (o varias) con un elevado número de usuarios. En dicha aplicación queremos poder "trazar" la actividad del usuario, o lo que es lo mismo, queremos saber cómo interactúa el usuario con la aplicación para que, posteriormente, el departamento de negocio pueda explotar esa información.

Como hemos dicho, la aplicación tiene un elevado número de usuarios por lo que se decide que el responsable de procesar y almacenar la información de la actividad de los usuarios sea otra aplicación. De esta forma liberamos a la aplicación principal de carga de trabajo.

Para implementar esta solución haremos uso de ActiveMQ. Cada vez que la aplicación principal detecte una acción de un usuario (ej: cuando el usuario vaya a "Opciones de configuración") enviará un mensaje a nuestro intermediario de mensajes con la información de dicha acción.

Estos mensajes quedarán almacenados en nuestro broker de mensajería a la espera de que la aplicación que procesa los datos los consuma para su posterior tratamiento. Si NO se necesitase un tratamiento instantáneo de la información, podría ser un proceso nocturno quien se encargase de esto (se presupone que la cantidad de mensajes generados es muy elevada).

Dicho esto, vamos a ver cómo creamos un productor y un consumidor de mensajes.

Productor de mensajes


En el código que viene a continuación podemos ver cómo un emisor de mensajes envía 20 mensajes a nuestro intermediario con la información de las acciones que realizan los usuarios.

Para este ejemplo es necesaria la librería activemq-all-X.X.X.jar. Viene incluida en la distribución de Apache ActiveMQ.

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
 
import javax.jms.*;
import java.util.Random;
 
public class MessageSender {
 
    public enum UserAction {
 
        CONFIGURACION("IR A OPCIONES DE CONFIGURACION"),
        PORTADA("VER PORTADA"),
        LOGIN("ACCEDER A LA APLICACION"),
        SUGERENCIA("ENVIAR SUGERENCIA");
 
        private final String userAction;
 
        private UserAction(String userAction) {
            this.userAction = userAction;
        }
 
        public String getActionAsString() {
            return this.userAction;
        }
    }
 
    private static final Random RANDOM = new Random(System.currentTimeMillis());
 
    private static final String URL = "tcp://localhost:61616";
 
    private static final String USER = ActiveMQConnection.DEFAULT_USER;
 
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
 
    private static final String DESTINATION_QUEUE = "APLICATION1.QUEUE";
 
    private static final boolean TRANSACTED_SESSION = true;
     
    private static final int MESSAGES_TO_SEND = 20;
 
    public void sendMessages() throws JMSException {
 
        final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USER, PASSWORD, URL);
        Connection connection = connectionFactory.createConnection();
        connection.start();
 
        final Session session = connection.createSession(TRANSACTED_SESSION, Session.AUTO_ACKNOWLEDGE);
        final Destination destination = session.createQueue(DESTINATION_QUEUE);
 
        final MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
 
        sendMessages(session, producer);
        session.commit();
 
        session.close();
        connection.close();
 
        System.out.println("Mensajes enviados correctamente");
    }
 
    private void sendMessages(Session session, MessageProducer producer) throws JMSException {
        final MessageSender messageSender = new MessageSender();
        for (int i = 1; i <= MESSAGES_TO_SEND; i++) {
            final UserAction userActionToSend = getRandomUserAction();
            messageSender.sendMessage(userActionToSend.getActionAsString(), session, producer);
        }
    }
 
    private void sendMessage(String message, Session session, MessageProducer producer) throws JMSException {
        final TextMessage textMessage = session.createTextMessage(message);
        producer.send(textMessage);
    }
 
    private static UserAction getRandomUserAction() {
        final int userActionNumber = (int) (RANDOM.nextFloat() * UserAction.values().length);
        return UserAction.values()[userActionNumber];
    }
 
    public static void main(String[] args) throws JMSException {
        final MessageSender messageSender = new MessageSender();
        messageSender.sendMessages();
    }
 
}


Si nos fijamos en el código observamos que lo que se está haciendo es enviar un número de mensajes con acciones del usuario al azar. No es exactamente lo que hemos descrito en el punto anterior pero creo que así se entiende mejor.

Con ActiveMQ arrancado lanzamos el ejemplo (método main) y vemos en la consola de administración cómo la cola de mensajes destino (APLICATION1.QUEUE) guarda la información de los mensajes que le acabamos de enviar.


Si además tenemos configurado que los mensajes se almacenen en una base de datos relacional (en nuestro caso MySQL) podemos ver cómo en la tabla "activemq_msgs" tenemos guardados nuestros mensajes.



En caso de que tuviésemos varias colas de mensajes (imaginemos varios productores donde cada uno envía mensajes a una cola), podemos ver la cantidad de mensajes de un vistazo con la consola de administración



consumidor de mensajes.


Nuestro consumidor procesará los todos mensajes que haya en la cola "a demanda" (cuando se ejecute). Si quisiésemos que procesase cada mensaje cuando llega a la cola (no es nuestro caso) deberíamos implementar la interface javax.jms.MessageListener y lanzar nuestro proceso como un hilo que se queda en espera (Thread).

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
 
import javax.jms.*;
import java.util.HashMap;
import java.util.Map;
 
public class UserActionConsumer {
 
    private static final String URL = "tcp://localhost:61616";
 
    private static final String USER = ActiveMQConnection.DEFAULT_USER;
 
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
 
    private static final String DESTINATION_QUEUE = "APLICATION1.QUEUE";
 
    private static final boolean TRANSACTED_SESSION = false;
 
    private static final int TIMEOUT = 1000;
 
    private final Map consumedMessageTypes;
 
    private int totalConsumedMessages = 0;
 
    public UserActionConsumer() {
        this.consumedMessageTypes = new HashMap();
    }
 
    public void processMessages() throws JMSException {
 
        final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USER, PASSWORD, URL);
        final Connection connection = connectionFactory.createConnection();
 
        connection.start();
 
        final Session session = connection.createSession(TRANSACTED_SESSION, Session.AUTO_ACKNOWLEDGE);
        final Destination destination = session.createQueue(DESTINATION_QUEUE);
        final MessageConsumer consumer = session.createConsumer(destination);
 
        processAllMessagesInQueue(consumer);
 
        consumer.close();
        session.close();
        connection.close();
 
        showProcessedResults();
    }
 
    private void processAllMessagesInQueue(MessageConsumer consumer) throws JMSException {
        Message message;
        while ((message = consumer.receive(TIMEOUT)) != null) {
            proccessMessage(message);
        }
    }
 
    private void proccessMessage(Message message) throws JMSException {
        if (message instanceof TextMessage) {
            final TextMessage textMessage = (TextMessage) message;
            final String text = textMessage.getText();
            incrementMessageType(text);
            totalConsumedMessages++;
        }
    }
 
    private void incrementMessageType(String message) {
        if (consumedMessageTypes.get(message) == null) {
            consumedMessageTypes.put(message, 1);
        } else {
            final int numberOfTypeMessages = consumedMessageTypes.get(message);
            consumedMessageTypes.put(message, numberOfTypeMessages + 1);
        }
    }
 
    private void showProcessedResults() {
        System.out.println("Procesados un total de " + totalConsumedMessages + " mensajes");
        for (String messageType : consumedMessageTypes.keySet()) {
            final int numberOfTypeMessages = consumedMessageTypes.get(messageType);
            System.out.println("Tipo " + messageType + " Procesados " + numberOfTypeMessages + " (" +
                    (numberOfTypeMessages * 100 / totalConsumedMessages) + "%)");
        }
    }
 
    public static void main(String[] args) throws JMSException {
        final UserActionConsumer userActionConsumer = new UserActionConsumer();
        userActionConsumer.processMessages();
    }
}


Como vemos, lo que hace nuestro consumidor es procesar los mensajes de la cola y mostrar el número de acciones de usuario de cada tipo. Con ActiveMQ arrancado lanzamos el ejemplo y el resultado es el siguiente.



Si consultásemos la consola de administración o la base de datos, veríamos que todos los mensajes de la cola han desaparecido puesto que ya han sido consumidos.

No hay comentarios:

Publicar un comentario