Setting up a WSO2 JMSMessageStore and Message Processor

Following quickly on from my last post, which gave an overview of the WSO2 MessageStore and Message Processor, here's a quick guide to actually using the stuff and setting it up with WSO2's own message broker.

(btw: I used version 4.8.1 of the ESB and 2.2.0 of the Message Broker.)

Configure WSO2 Message Broker

I'm not going to tell you how to install the message broker; it's pretty much a case of downloading it from the WSO2 site and unzipping it into a directory of your choice.

If you intend to run the broker and the ESB on the same node then make sure that you run them on different ports.

The underlying message store and transport in Message Broker 2.2.0 is Cassandra. Unfortunately, changing the port offset for the message broker doesn't automatically change the Cassandra ports as well, so we have to change the value manually in any virtual hosts that we have defined. Cassandra has some concepts which are best not thought about in most cases, so use the default virtual hosts file whenever you can (<MB_INSTALL>/repository/conf/advanced/andes-virtualhosts.xml). If you use the default file, offset the port in the connection string under <carbon><store><connectionString>. The default is 9160, so with an offset of 1 change it to 9161. You don't strictly have to do this, but I find it best practice so that I can keep track of what all the ports are!
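To keep track of which port ends up where, the offset arithmetic can be sketched in a few lines of Python. This is only an illustration: the dictionary keys are labels I've made up, and the two default values are the ones quoted in this post (a real installation listens on other ports too).

```python
# Default ports quoted in this post; the keys are illustrative labels only.
DEFAULT_PORTS = {
    "amqp": 5672,       # broker runtime port used in the JNDI connection URL
    "cassandra": 9160,  # port in the virtual hosts connectionString
}


def offset_ports(offset, defaults=DEFAULT_PORTS):
    """Return each default port shifted by the Carbon port offset."""
    return {name: port + offset for name, port in defaults.items()}


# List the ports to expect when the broker runs with an offset of 1.
for name, port in offset_ports(1).items():
    print(f"{name}: {port}")
```

Writing the shifted values down once like this makes it easier to spot a file that still carries the unshifted default.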

Once you've done that you can start the broker (bin/wso2server.sh).

You should then be able to go to the broker admin console and play around - create queues and put messages on them. When you're happy, move on to the ESB configuration.

Setting up the WSO2 ESB

(Again, do a download and unzip of the ESB from the main WSO2 pages to get going.)

Decide if you're going to do a port offset or not and change the carbon.xml file accordingly. At this point a lot of docs will tell you to configure the axis2 transport that the ESB uses so that it has a JMS transport available to it. HOWEVER, if you are *only* using JMS for Message Stores then you don't need to do this. In our case, as we're not setting up general JMS endpoints for the ESB, we'll leave this step out and jump straight into defining what we need for the message store. Before that, though, we do have to give the ESB some of the Message Broker's jar files, so...

Copy the following jar files from <MB_INSTALL>/client-lib to <ESB_INSTALL>/repository/components/lib:

andes-client-0.13.wso2v4
geronimo-jms_1.1_spec-1.1.0.wso2v1


Now you can go ahead and start the ESB.

Set up the MessageStore

The first thing we need to do is set up some JNDI objects so that the MessageStore can use them to talk to the underlying broker. At this point you could create various queues and connection factories, but you don't need to in order to 'just work' (something I like to do ;-)). So, here's the minimum and why...

By default we can use the basic file JNDI provider given to us by the message broker jars we just copied across and we can use the default file location in the ESB <ESB_INSTALL>/repository/conf/jndi.properties as the JNDI repository file. If you want to use a different JNDI provider or location then you can do so (but remember to put the relevant jar files into the location mentioned above). Each JNDI configuration file will look different per provider.

Assuming you're working with the WSO2 broker provider (org.wso2.andes.jndi.PropertiesFileInitialContextFactory), your config file will look like the one in the above location. As a minimum, the only thing you need in this file to run a JMSMessageStore against WSO2 Message Broker is a QueueConnectionFactory. There is already one defined in the JNDI file but you may need to change a few things on it in order to connect correctly to your broker. The line we're looking for (if we want to use the 'QueueConnectionFactory' object in our MessageStore) is the one below...

connectionfactory.QueueConnectionFactory = amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5672'

There are three quirks which may catch your attention in the connection URL...
  1. clientID/carbon: There are two parts to this. Firstly, the clientID is literally that - a client's ID. If you want to set it to something else to make your JMS client unique then you can do so. ClientID is a pretty standard JMS concept; it gets added on to messages so you can see which client your messages came from, and it is used in many different messaging patterns. So don't be scared of it at first glance - just leave it alone if you don't care (I would always set it to something just to make sure everything works smoothly, though)!
  2. The second part of 'clientID/carbon' is the 'carbon' element. Do you remember those Cassandra virtual hosts I mentioned in the andes-virtualhosts.xml file earlier on? Well, this is saying that we are connecting to the carbon virtual host defined in that file. (We could also use 'default', which is usually defined as an alias for the 'carbon' virtual host, if we wanted.)
  3. The next thing to note about this URL string is the port number. If you changed the port that the message broker runs on (using the port offset) then you need to change this port value too. The default port for the message broker runtime is 5672, so add on the port offset that the broker is set to, e.g. if the broker is set to offset 1 then this value should be 5673.

That's it for the JNDI entries - you can ignore the queues, topics and TopicConnectionFactory, as these are not used in the simple case where we just want a JMSMessageStore up and running. We can now go to the MessageStore and set that up...
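Before moving on, the connection URL can be sanity-checked with a few lines of Python. This is just a sketch for illustration: the helper names describe and apply_offset are my own, and the parsing only covers the single-broker form of the URL shown above.

```python
import re
from urllib.parse import parse_qs, urlsplit

DEFAULT_URL = "amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5672'"


def describe(url):
    """Split the connection URL into the pieces discussed above."""
    parts = urlsplit(url)
    # Split the netloc by hand so the client ID keeps its original case.
    credentials, _, client_id = parts.netloc.rpartition("@")
    user, _, password = credentials.partition(":")
    broker = urlsplit(parse_qs(parts.query)["brokerlist"][0].strip("'"))
    return {
        "user": user,
        "client_id": client_id,
        "virtual_host": parts.path.lstrip("/"),
        "broker_host": broker.hostname,
        "broker_port": broker.port,
    }


def apply_offset(url, offset):
    """Shift the broker port in the brokerlist by the Carbon port offset."""
    return re.sub(r":(\d+)'", lambda m: f":{int(m.group(1)) + offset}'", url)


print(describe(DEFAULT_URL))
print(apply_offset(DEFAULT_URL, 1))
```

If the virtual host or port printed here doesn't match what the broker is actually running with, the MessageStore won't be able to connect.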

Creating the JMSMessageStore in the ESB eclipse project

Let's assume you read my previous blog entry and have decided to create your JMSMessageStore in eclipse...

Creating the Store is easy enough. Do the usual eclipse method of new->messagestore and you'll be taken through a wizard. This wizard asks you a number of questions, so let's go through what each property means to you. I've listed each property as represented in the wizard and also how it's represented in the underlying XML file.

MessageStore wizard panel


REQUIRED


MessageStore Name

Anything you want, but it's what you put in your MessageStore mediator in your flows so best to make it meaningful!

MessageStore type

A drop down list - we're making a JMS Message Store.
(I've discussed the various types available to you in my previous blogs.)

Initial context factory (java.naming.factory.initial)

This is the class that connects to the JNDI provider of choice - in this case the file based JNDI that I mentioned above. (NOTE: Your specific provider jars need to be moved to the locations I mentioned above if you're using a different JNDI provider.)

Provider URL (java.naming.provider.url)

This is the URL of the JNDI repository location (in this case a local file system). NOTE: if you're using a file based system then you need to ensure that file is available on all the nodes that you are running this mediation on.

OPTIONAL


JNDI Queue Name (store.jms.destination)

This is the JNDI name of the queue that the JMSMessageStore will put messages on when storing them. If you don't specify anything here then WSO2 ESB assumes a default of <MessageStore Name>_Queue. If you're using WSO2 MB as the messaging provider then the queue gets created the first time it's used, so you don't need to pre-create it. However, not all messaging providers do that, so it's best to define a queue explicitly in your provider and put the name here!

Connection factory (store.jms.connection.factory)

This is the name of the connection factory we created earlier in the JNDI repository. This factory is used to connect to the Messaging provider.

username/password (store.jms.username/store.jms.password)

These are details of how the connection will be made to the message broker - ensure that they are authorised for the queue that the messages are going to be stored on.

JMS API Specification Version (store.jms.JMSSpecVersion)

I doubt you'll need to worry about this as most people use 1.1 of the JMS spec (which is the default). This has to do with whether you are connecting using general factories (the unified ones introduced in JMS 1.1) or specific factories (the separate queue and topic ones from JMS 1.0.2b) - something most people don't need to know about. Leave this at 1.1 in nearly all cases.

Connection caching (store.jms.cache.connection)

This is an advanced property which you may want to use as you get more messages flowing through your sequence. Ignore it for the 'just get it working' cases ;-)

Receive timeout

This sets whether, and after how long, the receiver should stop waiting for a response. This is a JMS setting, so if you understand messaging and JMS then tinker to your heart's content.


Once you've saved all that you should have a message store. You need to add that to your .car file so that this definition gets deployed along with the rest of your project to the ESB. However, that won't enable your mediation to run just yet. At this point your mediation will put messages on the queue and that's it - we haven't created anything that's going to take them off the queue and send them to an endpoint ! This is where the Message Processors come in.
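To give a feel for what the wizard writes out, here's a small Python sketch that renders a minimal store definition from the parameter names listed above. Treat it as an approximation rather than the exact file eclipse generates: the store name OrderStore is hypothetical, the relative provider URL is my assumption based on the default jndi.properties location, and the class name is my assumption for ESB 4.8.x, so compare it with a store created by the wizard before relying on it.

```python
from xml.sax.saxutils import escape, quoteattr

SYNAPSE_NS = "http://ws.apache.org/ns/synapse"

# Assumption: the JMS store implementation class shipped with ESB 4.8.x.
# Compare with a wizard-generated store before relying on it.
JMS_STORE_CLASS = "org.apache.synapse.message.store.impl.jms.JmsStore"


def render_message_store(name, params, store_class=JMS_STORE_CLASS):
    """Render a <messageStore> definition with one <parameter> per entry."""
    lines = [
        f"<messageStore xmlns={quoteattr(SYNAPSE_NS)} "
        f"name={quoteattr(name)} class={quoteattr(store_class)}>"
    ]
    for key, value in params.items():
        lines.append(f"    <parameter name={quoteattr(key)}>{escape(value)}</parameter>")
    lines.append("</messageStore>")
    return "\n".join(lines)


store_xml = render_message_store(
    "OrderStore",  # hypothetical store name
    {
        "java.naming.factory.initial": "org.wso2.andes.jndi.PropertiesFileInitialContextFactory",
        # Assumption: path relative to the ESB home directory.
        "java.naming.provider.url": "repository/conf/jndi.properties",
        "store.jms.connection.factory": "QueueConnectionFactory",
    },
)
print(store_xml)
```

I've left store.jms.destination out on purpose, so the store would fall back to the default queue name described above.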

Creating the Message Processor in the ESB eclipse project

MessageProcessor wizard panel

The properties here are slightly more complex...

REQUIRED

Message processor Type

I discussed the various types here in my previous entry and why you may want to use one over the other. Here I'll use a scheduled message processor - which sends messages directly to an endpoint.

Message Processor Name

Anything you want - you don't cross-reference this anywhere else so just make it fit to whatever your best-practice naming convention is.

MessageStore

This is the name of the message store that the MessageProcessor will connect to. NOTE: If you didn't already create the MessageStore in the eclipse project then you can't create the Message Processor. This is because this name has to be provided and it can only be provided using a drop-down list of available message stores from your eclipse workspace.

Endpoint name

The endpoint that you want the message processor to send the messages to.

ProcessorState (is.active)

Whether you want to make it run or not when it's first deployed.

OPTIONAL

Quartz Configuration File path (quartz.config)

Quartz is the name of the underlying system that manages the timing elements of the MessageProcessor. If you need to alter this underlying system (many customers don't) then you can specify the attributes in this file. You can do things like fine-tune database retries, set the size of the time window a timer has to fire within, and more.

cron expression (cronExpression)

The whole idea behind the MessageProcessor is that it fires up regularly to take messages off the queues. This cron expression establishes when to send messages. It is a more complex way of setting the forwarding interval - if you just want it every n milliseconds then use the 'interval' (below). The syntax is the Quartz cron syntax and allows you patterns like "every ten minutes, Monday through Friday".

Pinned Servers (pinnedServers)

In my previous entry I mentioned that it's possible, and indeed sometimes best practice, to run multiple MessageProcessors against a single MessageStore. This is achieved by using the pinned servers list. This is a comma-separated list of hostnames on which this specific MessageProcessor should run. This means that although the ESB sequence may be running on lots of servers, you can decide which of these servers (zero, one or many) process messages for you.

Forwarding Interval (interval)

This is a key attribute. This property tells the message processor how often to pick messages out of the MessageStore and attempt to send them onwards (default: 1000 ms). If you have set a cronExpression then the cronExpression will take precedence.
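The precedence rule between the two settings can be written down as a tiny Python sketch. The function name effective_schedule is made up for illustration, and the cron string is my reading of the Quartz syntax for "every ten minutes, Monday through Friday", so check it against the Quartz documentation before using it.

```python
DEFAULT_INTERVAL_MS = 1000  # default forwarding interval quoted above


def effective_schedule(params):
    """Return which setting drives the processor: cron wins over interval."""
    cron = params.get("cronExpression")
    if cron:
        return ("cron", cron)
    return ("interval", int(params.get("interval", DEFAULT_INTERVAL_MS)))


# Both set: the cron expression takes precedence and the interval is ignored.
print(effective_schedule({"interval": "5000", "cronExpression": "0 0/10 * ? * MON-FRI"}))
# Only the interval set.
print(effective_schedule({"interval": "5000"}))
# Neither set: fall back to the default interval.
print(effective_schedule({}))
```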

retry interval (client.retry.interval)

If the endpoint was unavailable then the Message Processor will attempt to resend the message at a later time. This is the time between those retries (default: 1000 ms).

Non-Retry HTTP Status codes (non.retry.status.codes)

In certain circumstances an endpoint may be in a specific state that you may not want to retry at all. This is a comma separated list of the HTTP response codes that you want to look out for which will stop the MessageProcessor retrying.

Maximum Delivery attempts (max.delivery.attempts)

How many times do you want the MessageProcessor to keep trying the endpoint? (-1 means forever! Default: 4 if not set.)

If the MessageProcessor still hasn't managed to send the message after the maximum number of delivery attempts, the message will be put back on to the message store queue and the MessageProcessor will be de-activated. The only way to get the MessageProcessor back on-line again is to manually reactivate it.

There is an alternative to this behaviour. You can make the MessageProcessor drop the message and continue processing other messages. In this case you will lose messages ! You can configure this behaviour by setting the max.delivery.drop parameter to Enabled.
NOTE: max.delivery.drop is not a documented feature at this time !
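The two failure behaviours can be modelled in a few lines of Python. This is a toy model of the behaviour as described above, not the ESB's actual code: the function forward, its arguments and the outcome labels are all made up for illustration.

```python
def forward(message, send, max_attempts=4, drop_on_failure=False):
    """Try to deliver one message and report what happens to it.

    send is any callable returning True on success.
    max_attempts=-1 means keep trying forever.
    drop_on_failure stands in for max.delivery.drop being Enabled.
    """
    attempts = 0
    while max_attempts == -1 or attempts < max_attempts:
        attempts += 1
        if send(message):
            return {"outcome": "delivered", "attempts": attempts, "processor_active": True}
    if drop_on_failure:
        # The message is lost, but the processor carries on with the next one.
        return {"outcome": "dropped", "attempts": attempts, "processor_active": True}
    # The message goes back to the store and the processor is de-activated.
    return {"outcome": "returned_to_store", "attempts": attempts, "processor_active": False}


def always_down(message):
    """A stand-in endpoint that never accepts the message."""
    return False


print(forward("order-1", always_down))
print(forward("order-1", always_down, drop_on_failure=True))
```

The trade-off is visible in the two results: one keeps the message but stops the processor, the other keeps the processor running but loses the message.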

Axis2 client repository (axis2.repo)

At the end of the day there are two transports involved in a MessageProcessor - the first being the JMS transport that pulls the messages out of the store. The second being the link to the endpoint where the messages are sent. The latter, the link to the endpoint, uses the underlying axis2 transports to send the message. axis2 has the concept of handlers and this is where you can put those 'modules' and 'services', if you need to do any special handling of messages that are being sent to the endpoint.

Axis2 Configuration (axis2.config)

This is the configuration file where you set up the axis client that uses the modules that are defined above.

Reply Sequence Name (message.processor.reply.sequence)

If a response is received from the endpoint then the response will be routed through this reply sequence. For example, in more complex asynchronous messaging scenarios you may need to do things like put the message on to a queue for the originating client to pick up - this is where you would specify the sequence that would do that for you.

Fault Sequence Name (message.processor.fault.sequence)

This sequence is called if the MessageProcessor has a fault during its execution of the sequences.


And, that's it!
Hopefully you now have a very clear idea of what Message Stores and Message Processors are and what the core capabilities are. You've also (hopefully) learned more about their basic setup.
