Mostrando entradas con la etiqueta JMS. Mostrar todas las entradas
Mostrando entradas con la etiqueta JMS. Mostrar todas las entradas

viernes, 10 de octubre de 2014

Ejemplo: productor y consumidor de mensajes con ActiveMQ


Vamos a ver un ejemplo de un escenario donde podría ser interesante el uso de ActiveMQ.

Supongamos que tenemos una aplicación (o varias) con un elevado número de usuarios. En dicha aplicación queremos poder "trazar" la actividad del usuario, o lo que es lo mismo, queremos saber cómo interactúa el usuario con la aplicación para que, posteriormente, el departamento de negocio pueda explotar esa información.

Como hemos dicho, la aplicación tiene un elevado número de usuarios por lo que se decide que el responsable de procesar y almacenar la información de la actividad de los usuarios sea otra aplicación. De esta forma liberamos a la aplicación principal de carga de trabajo.

Para implementar esta solución haremos uso de ActiveMQ. Cada vez que la aplicación principal detecte una acción de un usuario (ej: cuando el usuario vaya a "Opciones de configuración") enviará un mensaje a nuestro intermediario de mensajes con la información de dicha acción.

Estos mensajes quedarán almacenados en nuestro broker de mensajería a la espera de que la aplicación que procesa los datos los consuma para su posterior tratamiento. Si NO se necesitase un tratamiento instantáneo de la información, podría ser un proceso nocturno quien se encargase de esto (se presupone que la cantidad de mensajes generados es muy elevada).

Dicho esto, vamos a ver cómo creamos un productor y un consumidor de mensajes.

Productor de mensajes


En el código que viene a continuación podemos ver cómo un emisor de mensajes envía 20 mensajes a nuestro intermediario con la información de las acciones que realizan los usuarios.

Para este ejemplo es necesaria la librería activemq-all-X.X.X.jar. Viene incluida en la distribución de Apache ActiveMQ.

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
 
import javax.jms.*;
import java.util.Random;
 
public class MessageSender {
 
    public enum UserAction {
 
        CONFIGURACION("IR A OPCIONES DE CONFIGURACION"),
        PORTADA("VER PORTADA"),
        LOGIN("ACCEDER A LA APLICACION"),
        SUGERENCIA("ENVIAR SUGERENCIA");
 
        private final String userAction;
 
        private UserAction(String userAction) {
            this.userAction = userAction;
        }
 
        public String getActionAsString() {
            return this.userAction;
        }
    }
 
    private static final Random RANDOM = new Random(System.currentTimeMillis());
 
    private static final String URL = "tcp://localhost:61616";
 
    private static final String USER = ActiveMQConnection.DEFAULT_USER;
 
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
 
    private static final String DESTINATION_QUEUE = "APLICATION1.QUEUE";
 
    private static final boolean TRANSACTED_SESSION = true;
     
    private static final int MESSAGES_TO_SEND = 20;
 
    public void sendMessages() throws JMSException {
 
        final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USER, PASSWORD, URL);
        Connection connection = connectionFactory.createConnection();
        connection.start();
 
        final Session session = connection.createSession(TRANSACTED_SESSION, Session.AUTO_ACKNOWLEDGE);
        final Destination destination = session.createQueue(DESTINATION_QUEUE);
 
        final MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
 
        sendMessages(session, producer);
        session.commit();
 
        session.close();
        connection.close();
 
        System.out.println("Mensajes enviados correctamente");
    }
 
    private void sendMessages(Session session, MessageProducer producer) throws JMSException {
        final MessageSender messageSender = new MessageSender();
        for (int i = 1; i <= MESSAGES_TO_SEND; i++) {
            final UserAction userActionToSend = getRandomUserAction();
            messageSender.sendMessage(userActionToSend.getActionAsString(), session, producer);
        }
    }
 
    private void sendMessage(String message, Session session, MessageProducer producer) throws JMSException {
        final TextMessage textMessage = session.createTextMessage(message);
        producer.send(textMessage);
    }
 
    private static UserAction getRandomUserAction() {
        final int userActionNumber = (int) (RANDOM.nextFloat() * UserAction.values().length);
        return UserAction.values()[userActionNumber];
    }
 
    public static void main(String[] args) throws JMSException {
        final MessageSender messageSender = new MessageSender();
        messageSender.sendMessages();
    }
 
}


Si nos fijamos en el código observamos que lo que se está haciendo es enviar un número de mensajes con acciones del usuario al azar. No es exactamente lo que hemos descrito en el punto anterior pero creo que así se entiende mejor.

Con ActiveMQ arrancado lanzamos el ejemplo (método main) y vemos en la consola de administración cómo la cola de mensajes destino (APLICATION1.QUEUE) guarda la información de los mensajes que le acabamos de enviar.


Si además tenemos configurado que los mensajes se almacenen en una base de datos relacional (en nuestro caso MySQL) podemos ver cómo en la tabla "activemq_msgs" tenemos guardados nuestros mensajes.



En caso de que tuviésemos varias colas de mensajes (imaginemos varios productores donde cada uno envía mensajes a una cola), podemos ver la cantidad de mensajes de un vistazo con la consola de administración



consumidor de mensajes.


Nuestro consumidor procesará los todos mensajes que haya en la cola "a demanda" (cuando se ejecute). Si quisiésemos que procesase cada mensaje cuando llega a la cola (no es nuestro caso) deberíamos implementar la interface javax.jms.MessageListener y lanzar nuestro proceso como un hilo que se queda en espera (Thread).

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
 
import javax.jms.*;
import java.util.HashMap;
import java.util.Map;
 
public class UserActionConsumer {
 
    private static final String URL = "tcp://localhost:61616";
 
    private static final String USER = ActiveMQConnection.DEFAULT_USER;
 
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
 
    private static final String DESTINATION_QUEUE = "APLICATION1.QUEUE";
 
    private static final boolean TRANSACTED_SESSION = false;
 
    private static final int TIMEOUT = 1000;
 
    private final Map consumedMessageTypes;
 
    private int totalConsumedMessages = 0;
 
    public UserActionConsumer() {
        this.consumedMessageTypes = new HashMap();
    }
 
    public void processMessages() throws JMSException {
 
        final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USER, PASSWORD, URL);
        final Connection connection = connectionFactory.createConnection();
 
        connection.start();
 
        final Session session = connection.createSession(TRANSACTED_SESSION, Session.AUTO_ACKNOWLEDGE);
        final Destination destination = session.createQueue(DESTINATION_QUEUE);
        final MessageConsumer consumer = session.createConsumer(destination);
 
        processAllMessagesInQueue(consumer);
 
        consumer.close();
        session.close();
        connection.close();
 
        showProcessedResults();
    }
 
    private void processAllMessagesInQueue(MessageConsumer consumer) throws JMSException {
        Message message;
        while ((message = consumer.receive(TIMEOUT)) != null) {
            proccessMessage(message);
        }
    }
 
    private void proccessMessage(Message message) throws JMSException {
        if (message instanceof TextMessage) {
            final TextMessage textMessage = (TextMessage) message;
            final String text = textMessage.getText();
            incrementMessageType(text);
            totalConsumedMessages++;
        }
    }
 
    private void incrementMessageType(String message) {
        if (consumedMessageTypes.get(message) == null) {
            consumedMessageTypes.put(message, 1);
        } else {
            final int numberOfTypeMessages = consumedMessageTypes.get(message);
            consumedMessageTypes.put(message, numberOfTypeMessages + 1);
        }
    }
 
    private void showProcessedResults() {
        System.out.println("Procesados un total de " + totalConsumedMessages + " mensajes");
        for (String messageType : consumedMessageTypes.keySet()) {
            final int numberOfTypeMessages = consumedMessageTypes.get(messageType);
            System.out.println("Tipo " + messageType + " Procesados " + numberOfTypeMessages + " (" +
                    (numberOfTypeMessages * 100 / totalConsumedMessages) + "%)");
        }
    }
 
    public static void main(String[] args) throws JMSException {
        final UserActionConsumer userActionConsumer = new UserActionConsumer();
        userActionConsumer.processMessages();
    }
}


Como vemos, lo que hace nuestro consumidor es procesar los mensajes de la cola y mostrar el número de acciones de usuario de cada tipo. Con ActiveMQ arrancado lanzamos el ejemplo y el resultado es el siguiente.



Si consultásemos la consola de administración o la base de datos, veríamos que todos los mensajes de la cola han desaparecido puesto que ya han sido consumidos.

jueves, 9 de octubre de 2014

ActiveMQ para intercambio de mensajes

Apache ActiveMQ


Apache ActiveMQ es un componente MOM (Message Oriented Middleware), es decir, un intermediario (broker) de mensajes que usan dos o más sistemas o aplicaciones para intercambiar mensajes. Ofrece "Características empresariales" tales como clustering, múltiples almacenes para mensajes, así como la capacidad de emplear cualquier administrador de base de datos como proveedor de persistencia JMS, aparte de VM, caché y persistencia de jornales.


En este artículo veremos las características de ActiveMQ, un intermediario de mensajes entre diferentes aplicaciones o sistemas, se describen algunos de los escenarios donde puede ser interesante su uso. Claro está, veremos cómo se instala, cómo se configura y un ejemplo de funcionamiento.

Características principales


  • Es Código Open Source. 
  • Actúa como mediador de mensajes entre aplicaciones emisoras y receptoras.
  • Proporciona comunicación asíncrona entre aplicaciones.
  • Pese a ser una implementación de la especificación JMS (Java Message Service), proporciona una gran cantidad de APIs para diferentes lenguajes como PHP, C/C++, .Net, Ruby, etc.
  • Soporta diferentes protocolos de conexión como HTTP, TCP, SSL, etc.
  • Tiene una GUI de administración.

Instalación


La instalación de Apache ActiveMQ es sencilla.Primero debemos descargar el software de: http://activemq.apache.org/download.html.  Una vez descargado el fichero, debemos seguir los siguientes pasos:
  1. Descomprimir la distribución descargada en el directorio deseado. 
    • Para entornos Unix: tar zxvf apache-activemq-X.X.X-bin.tar.gz
    • Comprobar que el script de arranque de la aplicación tiene permisos de ejecución. Entramos en el directorio bin  que se encuentra dentro de la carpeta de instalación, y asignamos al script "activemq" permisos de ejecución. chmod 755 activemq
  2. Arrancamos Apache ActiveMQ de la siguiente forma: directorio_instalacion_activemq]/bin/activemq start 
    • También podemos "parar" la aplicación, "reiniciarla" o comprobar su "estado" con: stop, restart o status.
  3. Para comprobar que nuestro intermediario de mensajes ha arrancado correctamente: netstat -an|grep 61616, donde 61616 es el puerto por defecto.
También podemos acceder a la consola de administración en http://localhost:8161/admin/

Para que puedas acceder:

  • Usuario: admin
  • Password: admin

Configuración


La configuración del ActiveMQ se encuentra en un fichero activemq.xml localizado en el directorio [directorio_instalacion_activemq]/conf/. No obstante, se puede cambiar el fichero de configuración con el que arrancar ActiveMQ de la siguiente forma:

[directorio_instalacion_activemq]/bin/activemq start xbean:[nombre_del_fichero_configuracion].xml

A continuación destacamos cada una de las propiedades principales:

Transport Connector

Define la conexión con el "broker" de mensajería. Se configura en el tag transportConnectors (dentro
del tag broker) de activemq.xml de la siguiente forma:

    ...
    
        
    
    ...



Se pueden definir varios tipos de conexiones dependiendo del protocolo de transporte (ver lista completa en http://activemq.apache.org/configuring-transports.html).

A continuación se muestran dos ejemplos por VM y TCP:


  • VM Transport permite la conexión con un cliente Java que corra bajo la misma JVM. Se define de la siguiente forma: vm://nombre_del_broker?transportOptions, donde:
    • nombre_del_broker: es el atributo brokerName del tag broker del activemq.xml
    • transportOptions: distintas opciones de la comunicación. Se puede ver la lista de opciones en http://activemq.apache.org/vm-transport-reference.html


    ...
    
        
    
    ...



  • TCP Transport permite la conexión entre el cliente y el gestor de mensajería mediante protocolo TCP. Se define de la siguiente forma: tcp://nombre_o_ip_de_la_maquina:puerto?options, donde:
    • nombre_o_ip_de_la_maquina: máquina o IP donde reside ActiveMQ
    • puerto: puerto de la máquina donde escucha ActiveMQ (por defecto suele ser el 61616)
    • options: opciones de la conexión. Se puede ver la lista entera de opciones en http://activemq.apache.org/tcp-transport-reference.html

    ...
    
        
    
    ...



http://www.adictosaltrabajo.com/tutoriales/tutoriales.php?pagina=ActiveMQ


Persistencia de mensajes

Define la manera de persistir los mensajes en caso de que sea necesario (ver especificación JMS). En este artículo hablaremos de dos tipos: con KahaDB y con JDBC.

Persistencia con KahaDB:
KahaDB es un sistema de almacenamiento propio de ActiveMQ diseñado específicamente para las necesidades de un intermediario de mensajes. Proporciona un rendimiento muy alto. Más que cualquier base de datos relacional. Es un sistema de almacenamiento en ficheros combinado con una caché en memoria.

A continuación un ejemplo de configuración de broker con persistencia en KahaDB, donde:

  • directory: directorio donde se almacenará la información.
  • maxDataFileLength: máximo número de bytes que tendrán los ficheros que compondrán KahaDB.



    ...
    
        
    
    ...




Persistencia con JDBC:

ActiveMQ tiene soporte para poder almacenar mensajes en la mayoría de bases de datos.



    ...
    
        
    
    ...

 


    
    
    
    
    



Importante: es necesario añadir el driver JDBC de conexión de cualquier base de datos al classpath de ActiveMQ, para ello basta con copiarlo en el directorio [directorio_instalacion_activemq]/lib/

Más información sobre persistencia de mensajes en http://activemq.apache.org/persistence.html.

Control de flujo


Existe un problema muy típico en ActiveMQ cuando se usan mensajes no persistentes (almacenados en memoria) y es que el buffer de almacenamiento termine con toda la RAM. Para solucionar esto existe un sistema de control de flujo, mediante el cual, es posible limitar el número de mensajes en memoria de uno o varios productores de mensajes no persistentes.

A continuación un ejemplo de configuración:


    ...
    
        
            
        
    
    ...



El valor del atributo queue igual a >, significa que este control de flujo se aplicará a cualquier cola de mensajes. Podríamos aplicar distintas políticas de control de flujo según el nombre de la cola. Ejemplo:

  • queue="TEST.FOO" : para todos los mensajes de la cola TEST.FOO
  • queue="TEST.>" : para todos los mensajes de las colas que comiencen por TEST.

Más info sobre control de flujo en http://activemq.apache.org/producer-flow-control.html

Uso del sistema



Es posible realizar una configuración más genérica de determinadas partes del sistema como puede ser el uso de memoria o el límite de almacenamiento físico o temporal.

Aquí vemos ejemplo:

....

    ...
    
        
            
                
            
            
                
            
            
                
            
        
    
    ...



Nótese que el límite de almacenamiento no influye en bases de datos externas aunque sí en KahaDB.


Ejemplo: productor y consumidor de mensajes.

En el siguiente artículo encontrarán un ejemplo para ActiveMQ

miércoles, 1 de octubre de 2014

Java Message Service: Topic vs Queue

Una cola de mensajes es un lugar común donde las aplicaciones pueden publicar y consumir mensajes. Existen 4 componentes principales en un sistema de mensajería:
  • el publicador,  es quien publica un Mensaje en una Cola
  • el consumidor, es quien consume Mensajes de una Cola
  • el mensaje, que tiene algún formato que tanto publicador como consumidor conocen.
  • la cola, es el lugar donde publicadores y consumidores se conectan y comunican a través de mensajes.

Una de las grandes ventajas que ofrece el uso de colas respecto a otras tecnologías, es que el publicador y el consumidor no tienen porque estar disponibles a la vez.

Existen dos tipos de colas en JMS(Java Message Service): Topic y Queue
  • En las colas tipo Queue, existen uno o varios publicadores, y un consumidor. Los publicadores van dejando sus mensajes en la cola, y son tomados en orden por el consumidor. Si el consumidor no está disponible, la cola va guardando los mensajes, de manera que el consumidor pueda retomar su procesamiento cuando vuelva a estar disponible.


  • En las colas tipo Topic,  siguen el modelo "publicador/subscriptor". Uno o varios consumidores se "suscriben" al Topic, y van recibiendo los mensajes que se publican. Cuando se desconectan dejan de recibir esos mensajes, y los pierden.



Conclusión

  • Las colas tipo Topic se utilizan cuando la información es "time sensitive", por ejemplo en las quotas de stocks...
  • También utilizaremos las colas Topic cuando la información se envía a una audiencia amplia.
  • En cambio utilizaremos colas de tipo Queue cuando se utilicen transacciones.