Premier Training & Business Partner Red Hat

Messaging subsystem su JBoss EAP 7

Alessandro Cassano
RHCJA Certified JBoss Specialist
Ti piacerebbe diventare anche tu uno di noi e
pubblicare i tuoi articoli nel blog degli RHCE italiani?

Il subsystem messaging in Jboss EAP 7 implementa le API JMS 2.0 dettate dalle specifiche JEE 7. JMS (Java Messaging System, definito nella JSR343) è il set di API che si occupa della gestione dei messaggi asincroni internamente alle applicazioni.

Jboss EAP 7 implementa le specifiche JMS tramite il message broker  Artemis-ActiveMQ.

Per messaggi in questo caso non intendiamo qualcosa come e-mail, chat o altri tipi di messaggistica user-based ma di vere e proprie comunicazioni interne alle applicazioni così da comunicare tra loro le varie operazioni da eseguire sulle applicazioni.

Esempio: Al termine di un acquisto su un sito di e-commerce il messaging subsystem tramite un messaggio interno notifica alla mia applicazione di pagamento la transazione in modo da poter procedere istantaneamente  per poi successivamente inviare un altro messaggio per notificare all’ applicazione che l’ oggetto non sarà più disponibile nell’ inventario in quanto appena acquistato.

In ambito JEE si tratterà essenzialmente di messaggi prodotti e consumati da specifiche classi Java. Una nota particolare meritano i Message Driven Bean (MDB) che sono speciali EJB (Enterprise Java Bean) che consumano messaggi in modo asincrono da una determinata coda o topic e che svolgono delle azioni nel momento in cui ricevono il messaggio.

Funzionamento del JMS:

L’ applicazione che crea e invia il messaggio si chiama Producer, mentre quella che riceve (consuma) il messaggio si chiama Consumer.

Per collegare Producer e Consumer le applicazioni si affideranno ad una ConnectionFactory (https://docs.oracle.com/javaee/7/api/javax/jms/ConnectionFactory.html) che è una interfaccia JMS che si occupa di creare le connessioni.

L’ applicazione si servirà della ConnectionFactory attraverso il suo JNDI name.

Per comprendere il concetto di connection factory ci avvaliamo di un estratto del codice sorgente di una applicazione presente tra i quickstarts di wildfly che ha lo scopo di mostrare il funzionamento del messaging service. (https://github.com/wildfly/quickstart/tree/master/helloworld-jms)

// Set up all the default values
    private static final String DEFAULT_MESSAGE = "Hello, World!";
    private static final String DEFAULT_CONNECTION_FACTORY = "jms/RemoteConnectionFactory";
    private static final String DEFAULT_DESTINATION = "jms/queue/test";
    private static final String DEFAULT_MESSAGE_COUNT = "1";
    private static final String DEFAULT_USERNAME = "quickstartUser";
    private static final String DEFAULT_PASSWORD = "quickstartPwd1!";
    private static final String INITIAL_CONTEXT_FACTORY = "org.wildfly.naming.client.WildFlyInitialContextFactory";
    private static final String PROVIDER_URL = "http-remoting://127.0.0.1:8080";

    public static void main(String[] args) {

        Context namingContext = null;

        try {
            String userName = System.getProperty("username", DEFAULT_USERNAME);
            String password = System.getProperty("password", DEFAULT_PASSWORD);

            // Set up the namingContext for the JNDI lookup
            final Properties env = new Properties();
            env.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY);
            env.put(Context.PROVIDER_URL, System.getProperty(Context.PROVIDER_URL, PROVIDER_URL));
            env.put(Context.SECURITY_PRINCIPAL, userName);
            env.put(Context.SECURITY_CREDENTIALS, password);
            namingContext = new InitialContext(env);

            // Perform the JNDI lookups
            String connectionFactoryString = System.getProperty("connection.factory", DEFAULT_CONNECTION_FACTORY);
            log.info("Attempting to acquire connection factory \"" + connectionFactoryString + "\"");
            ConnectionFactory connectionFactory = (ConnectionFactory) namingContext.lookup(connectionFactoryString);
            log.info("Found connection factory \"" + connectionFactoryString + "\" in JNDI");

            String destinationString = System.getProperty("destination", DEFAULT_DESTINATION);
            log.info("Attempting to acquire destination \"" + destinationString + "\"");
            Destination destination = (Destination) namingContext.lookup(destinationString);
            log.info("Found destination \"" + destinationString + "\" in JNDI");

In questo esempio vediamo come l’ applicazione si interfaccia alla connection factory dal JNDI NAME jms/RemoteConnectionFactory. Questa CF è preimpostata in Jboss EAP e usa come connector il protocollo http che verrà gestito dal subsystem Undertow. Vediamo infatti che nell’ applicazione è specificato anche la URL del server con  cui l’ applicazione dovrà interfacciarsi per effettuare la sua richiesta alla connection factory:

 private static final String PROVIDER_URL = 
"http-remoting://127.0.0.1:8080";

Nel caso in cui non vi sia bisogno di interfacciarsi ad un server remoto in quanto la CF Nel caso in cui non vi sia bisogno di interfacciarsi ad un server remoto in quanto la CF si trova nello stesso server dell’ applicazione producer/consumer è possibile utilizzare il connector in-vm che specifica tale evenienza.

Nel subsystem JMS sono già specificati due connectionfactory di default:si trova nello stesso server dell’ applicazione producer/consumer è possibile utilizzare il connector in-vm che specifica tale evenienza.

Nel JMS sono già specificati due connectionfactory di default:

<connection-factory name="InVmConnectionFactory" connectors="in-vm" entries="java:/ConnectionFactory"/>

<connection-factory name="RemoteConnectionFactory" connectors="http-connector"

entries="java:jboss/exported/jms/RemoteConnectionFactory"/>

Volendo è possibile creare delle nuove connection factory custom da CLI.

N.B.: Nella creazione di una remote connection factory bisognerà ricordare di utilizzare il JNDI namespace java:jboss/exported altrimenti le connessioni non funzioneranno, ad esempio:

[standalone@localhost:9990 /] /subsystem=messaging-activemq/server=default/connection-factory=remotecustomfactory:add(connectors=[http-connector],entries=[java:jboss/exported/jms/remotecustomfactory ]

{"outcome" => "success"}

Inoltre possiamo utilizzare delle PooledConnectionFactory , che altro non sono che delle ConnectionFactory con un pool di connessioni già attive e pronte all’ Inoltre possiamo utilizzare delle PooledConnectionFactory , che altro non sono che delle ConnectionFactory con un pool di connessioni già attive e pronte all’ uso e da allocare in caso di necessità.

Questo tipo di ConnectionFactory sono preferibili in quanto maggiormente performanti.

I messaggi sono scambiati su delle Destionations che possono essere Queue (code) o Topics.

Per l’invio dei messaggi il producer si serve di una Queue, che viene riconosciuta dal JMS attraverso un JNDI specificato nel codice sorgente dell’ applicazione, esempio:uso e da allocare in caso di necessità.

Questo tipo di ConnectionFactory sono preferibili in quanto maggiormente performanti.

I messaggi sono scambiati su delle Destionations che possono essere Queue (code) o Topics.

Per l’invio dei messaggi il producer si serve di una Queue, che viene riconosciuta dal JMS attraverso un JNDI specificato nel codice sorgente dell’ applicazione, esempio:

private static final String DEFAULT_DESTINATION = "jms/queue/test";

conoscendo il JNDI delle code dei messages possiamo impostarla nel nostro JMS tramite CLI in modo tale da renderla operativa sul nostro application server, prendendo ad esempio la stringa mostrata in precedenza:

[standalone@localhost:9990 /] /subsystem=messaging-activemq/server=default/jms-queue=testqueue:add(entries=[java:/jms/queue/test])

In questo esempio è stata creata una coda Testqueue nel nostro JMS che ha come destinazione il JNDI jms/queue/test specificato nell’ applicazione. Il namespace JNDI utilizzato in questo caso è java:/.
Il producer pertanto invierà il messaggio alla coda che verrà ricevuto dal primo consumer impostato per consumarlo.

Nel caso in cui un Producer dovrà inviare lo stesso messaggio a più consumer si avvarrà di un Topic .

Più consumer possono interfacciarsi allo stesso Topic diventandone pertanto Subscriber, mentre il producer verrà denominato Publisher. Quando il messaggio verrà prodotto per quello specifico topic sarà poi ricevuto da tutti i suoi iscritti.

Se un subscriber di quel topic risulterà irraggiungibile il messaggio resterà in standby per poi essere consumato non appena tornerà online.

Anche i Topic sono specificati nell’ applicazione tramite un JNDI name che poi verrà  utilizzato dal JMS per essere riconosciuto e gestito.

Esempio:

[standalone@localhost:9990 /] /subsystem=messaging-activemq/server=default/jms-topic=testopic:add(entries=[java:/jms/testtopic/topicforsubs])

 In questo caso tutti I subscribers registrati presso il topic che fa riferimento al JNDI name jms/testtopic/topicforsubs consumeranno il messaggio prodotto verso quel topic.
Cosa accade se un producer invia un messaggio alla coda ma il consumer non è disponibile per la ricezione?

JMS ha dei settaggi preimpostati che specificano dove verranno indirizzati questi messaggi in caso di mancato consumo, per controllarli possiamo usare la CLI:

[standalone@localhost:9990 /] /subsystem=messaging-activemq/server=default/address-setting=#:read-resource

{

"outcome" => "success",

"result" => {

     "address-full-policy" => "PAGE",

     "auto-create-jms-queues" => false,

     "auto-delete-jms-queues" => false,

     "dead-letter-address" => "jms.queue.DLQ",

     "expiry-address" => "jms.queue.ExpiryQueue",

     "expiry-delay" => -1L,

     "last-value-queue" => false,

     "max-delivery-attempts" => 10,

     "max-redelivery-delay" => 0L,

     "max-size-bytes" => 10485760L,

        "message-counter-history-day-limit" => 10,

        "page-max-cache-size" => 5,

     "page-size-bytes" => 2097152L,

     "redelivery-delay" => 0L,

     "redelivery-multiplier" => 1.0,

     "redistribution-delay" => -1L,

     "send-to-dla-on-no-route" => false,

     "slow-consumer-check-period" => 5L,

     "slow-consumer-policy" => "NOTIFY",

     "slow-consumer-threshold" => -1L

}

}

Da queste impostazioni evinciamo che:

I messaggi che non arrivano a destinazione vengono inviati nella coda “jms.queue.DLQ”  

"dead-letter-address" => "jms.queue.DLQ",

I messaggi che vanno nella ExpiryQueue invece sono quelli in cui è stata settata un’Expiration in fase di creazione e che non sono stati consegnati in quel lasso di tempo. Hanno insomma un Timetolive scaduto ed in questo caso finiranno nella coda con address jms.queue.ExpiryQueue

"expiry-address" => "jms.queue.ExpiryQueue"",

ovviamente possiamo modificare i parametri sempre tramite CLI oppure creare degli address setting appositi per ogni coda presente inserita da noi sul nostro JMS (opzione consigliata).

Per fare ciò porto ad esempio la coda creata precedentemente, testqueue.

Questa coda alla creazione avrà un suo address setting, che possiamo verificare così:

[standalone@localhost:9990 /] /subsystem=messaging-activemq/server=default/jms-queue=testqueue:read-attribute(name=queue-address)

{

"outcome" => "success",

"result" => "jms.queue.testqueue"

 }

Ora dobbiamo pertanto creare due code custom dove far finire i messaggi DLQ e i messaggi expired:

[standalone@localhost:9990 /] /subsystem=messaging-activemq/server=default/jms-queue=customDLQ:add(entries=[java:/jms/queue/customDLQ])

{"outcome" => "success"}

[standalone@localhost:9990 /] /subsystem=messaging-activemq/server=default/jms-queue=customEXQ:add(entries=[java:/jms/queue/customEXQ])

{"outcome" => "success"}

Successivamente non ci resterà che creare un address customizzato per far puntare i messaggi non consumati alle code appena create.

Per fare ciò dobbiamo prima vedere l’indirizzo delle nostre code appena create ( solitamente sarà come il campo entry ca con i . a dividere i vari campi anzichè / ):

[standalone@localhost:9990 /] /subsystem=messaging-activemq/server=default/jms-queue=customEXQ:read-attribute(name=queue-address)

{

"outcome" => "success",

"result" => "jms.queue.customEXQ"

}

[standalone@localhost:9990 /] /subsystem=messaging-activemq/server=default/jms-queue=customDLQ:read-attribute(name=queue-address)

{

"outcome" => "success",

"result" => "jms.queue.customDLQ"

Ora non ci resta che creare l’ address:

[standalone@localhost:9990 /] /subsystem=messaging-activemq/server=default/address-setting=jms.queue.testqueue:add(expiry-address=jms.queue.customEXQ,dead-letter-address=jms.queue.customDLQ,max-delivery-attempts=5)

{"outcome" => "success"}
Nell’esempio appena mostrato ho creato un address setting relativo alla testqueue (jms.queue.testqueue) con expiry address e dead letter address che puntano verso le code custom precedentemente create, inoltre ho impostato 5 tentativi massimi di consegna dei messaggi inviati prima di inserire i messaggi nella ExpiryQueue.
[standalone@localhost:9990 /] /subsystem=messaging-activemq/server=default/address-setting=jms.queue.testqueue:read-resource(recursive=true)
{

"outcome" => "success",

"result" => {

     "address-full-policy" => "PAGE",

     "auto-create-jms-queues" => false,

     "auto-delete-jms-queues" => false,

     "dead-letter-address" => "jms.queue.customDLQ",

     "expiry-address" => "jms.queue.customEXQ",

     "expiry-delay" => -1L,

     "last-value-queue" => false,

     "max-delivery-attempts" => 5,

     "max-redelivery-delay" => 0L,

     "max-size-bytes" => -1L,

        "message-counter-history-day-limit" => 0,

     "page-max-cache-size" => 5,

     "page-size-bytes" => 10485760L,

     "redelivery-delay" => 0L,

     "redelivery-multiplier" => 1.0,

     "redistribution-delay" => -1L,

     "send-to-dla-on-no-route" => false,

     "slow-consumer-check-period" => 5L,

     "slow-consumer-policy" => "NOTIFY",

     "slow-consumer-threshold" => -1L

}

}

Ma dove finiscono i messaggi che non vengono consegnati e restano di conseguenza appesi nelle code Expiry o dead letter a livello di sistema?

Il JMS inserisce questi messaggi direttamente su file system, attraverso dei file preallocati su di esso che si chiamano Journals

I Journals possono utilizzare due tipi di meccanismi di I/O:

NIO:  Che utilizza l’ API Java “NIO” per interfacciarsi con il FS.

AIO ( Async IO):  Può essere utilizzato solo su Linux in quanto utilizza le librerie native Asynchronous I/O  e fornisce delle migliori performance rispetto a NIO.

I journals vengono creati direttamente sul FS e possono avere diverse impostazioni a seconda delle proprie esigenze:

[standalone@localhost:9990 /] /subsystem=messaging-activemq/server=default:read-resource(recursive=true)

{

    "outcome" => "success",

    "result" => {

        "async-connection-execution-enabled" => true,

        "cluster-password" => "CHANGE ME!!",

        "cluster-user" => "ACTIVEMQ.CLUSTER.ADMIN.USER",

        "connection-ttl-override" => -1L,

        "create-bindings-dir" => true,

        "create-journal-dir" => true,

        "id-cache-size" => 20000,

        "incoming-interceptors" => undefined,

        "jmx-domain" => "org.apache.activemq.artemis",

        "jmx-management-enabled" => false,

        "journal-buffer-size" => undefined,

        "journal-buffer-timeout" => undefined,

        "journal-compact-min-files" => 10,

        "journal-compact-percentage" => 30,

        "journal-file-size" => 10485760,

        "journal-max-io" => undefined,

        "journal-min-files" => 2,

        "journal-pool-files" => -1,

        "journal-sync-non-transactional" => true,    
        "journal-sync-transactional" => true,

        "journal-type" => "ASYNCIO",

[...]

Come possiamo vedere i journals possono essere gestiti come meglio riteniamo per la nostra applicazione, ad esempio al momento verranno creati sul FS 2 journal files di tipo AIO che avranno la dimensione di 10MB.

Infine vediamo come funziona l’ invio dei messaggi sia lato applicazione che lato JMS.

Sempre dai quickstarts di wildfly ricaviamo il codice sorgente di un app che invia messaggi, “helloworld”

package org.jboss.as.quickstarts.jms;

 

import java.util.logging.Logger;

import java.util.Properties;

 

import javax.jms.ConnectionFactory;

import javax.jms.Destination;

import javax.jms.JMSConsumer;

import javax.jms.JMSContext;

import javax.naming.Context;

import javax.naming.InitialContext;

import javax.naming.NamingException;

 

public class HelloWorldJMSClient {

    private static final Logger log = Logger.getLogger(HelloWorldJMSClient.class.getName());

 

    // Set up all the default values

    private static final String DEFAULT_MESSAGE = "Hello, World!";

    private static final String DEFAULT_CONNECTION_FACTORY = "jms/RemoteConnectionFactory";

    private static final String DEFAULT_DESTINATION = "jms/queue/test";

    private static final String DEFAULT_MESSAGE_COUNT = "1";

    private static final String DEFAULT_USERNAME = "quickstartUser";

    private static final String DEFAULT_PASSWORD = "quickstartPwd1!";

    private static final String INITIAL_CONTEXT_FACTORY = "org.wildfly.naming.client.WildFlyInitialContextFactory";

    private static final String PROVIDER_URL = "http-remoting://127.0.0.1:8080";

 

    public static void main(String[] args) {

 

        Context namingContext = null;

 

        try {

            String userName = System.getProperty("username", DEFAULT_USERNAME);

            String password = System.getProperty("password", DEFAULT_PASSWORD);

 

            // Set up the namingContext for the JNDI lookup

            final Properties env = new Properties();

            env.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY);

            env.put(Context.PROVIDER_URL, System.getProperty(Context.PROVIDER_URL, PROVIDER_URL));

            env.put(Context.SECURITY_PRINCIPAL, userName);

            env.put(Context.SECURITY_CREDENTIALS, password);

            namingContext = new InitialContext(env);

 

            // Perform the JNDI lookups

            String connectionFactoryString = System.getProperty("connection.factory", DEFAULT_CONNECTION_FACTORY);

            log.info("Attempting to acquire connection factory \"" + connectionFactoryString + "\"");

            ConnectionFactory connectionFactory = (ConnectionFactory) namingContext.lookup(connectionFactoryString);

            log.info("Found connection factory \"" + connectionFactoryString + "\" in JNDI");

 

            String destinationString = System.getProperty("destination", DEFAULT_DESTINATION);

            log.info("Attempting to acquire destination \"" + destinationString + "\"");

            Destination destination = (Destination) namingContext.lookup(destinationString);

            log.info("Found destination \"" + destinationString + "\" in JNDI");

 

            int count = Integer.parseInt(System.getProperty("message.count", DEFAULT_MESSAGE_COUNT));

            String content = System.getProperty("message.content", DEFAULT_MESSAGE);

 

            try (JMSContext context = connectionFactory.createContext(userName, password)) {

                log.info("Sending " + count + " messages with content: " + content);

                // Send the specified number of messages

                for (int i = 0; i < count; i++) {

                    context.createProducer().send(destination, content);

                }

 

                // Create the JMS consumer

                JMSConsumer consumer = context.createConsumer(destination);

                // Then receive the same number of messages that were sent

                for (int i = 0; i < count; i++) {

                    String text = consumer.receiveBody(String.class, 5000);

                    log.info("Received message with content " + text);

                }

            }

        } catch (NamingException e) {

            log.severe(e.getMessage());

        } finally {

            if (namingContext != null) {

                try {

                    namingContext.close();

                } catch (NamingException e) {

                    log.severe(e.getMessage());

                }

            }

        }

    }

}

Il funzionamento di questo codice è riassumibile con il seguente grafico:

Schema messaging subsystem JBoss EAP7

Conclusioni:

In questo testo abbiamo visto in modo abbastanza semplificato come gestire il messaging subsystem.

Per approfondire le tematiche relative all’amministrazione di JBoss EAP 7 nella sua completezza vi consigliamo di iscrivervi al corso JB248 – Red Hat JBoss Application Administration I. Se invece siete dei developers Java interessati ad approfondire gli aspetti cruciali dello sviluppo JEE vi consigliamo il nuovo corso Red Hat JB183 – Application Development I.

Info about author

Alessandro Cassano