Web technologies -- Laboratory 15 -- 2009-2010 -- info.uvt.ro
Message Queue Systems
- Message queues are components used for inter-process/thread communication.
- They rely on queues to deliver/process messages in a FIFO manner.
- The communication is usually asynchronous.
- They rely on point-to-point (the sender needs to know the destination) or publish/subscribe (the sender does not need to know the destination) models to handle messages.
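The points above can be tried out without any middleware. The following is a minimal in-memory sketch of the point-to-point model, assuming a plain java.util.concurrent.BlockingQueue as a stand-in for the message queue; the class and method names (FifoQueueSketch, sendAndReceive) are made up for this illustration. A real message queue system adds a broker, network transport, persistence and acknowledgements on top of this idea.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory illustration only; not a replacement for a real broker.
class FifoQueueSketch {

    // Sends the messages from a producer thread and returns them in the
    // order in which the consumer (the calling thread) received them.
    static List<String> sendAndReceive(List<String> messages) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // The producer only knows the queue and does not wait for the
        // consumer to be ready (asynchronous hand-off).
        Thread producer = new Thread(() -> {
            for (String message : messages) {
                queue.add(message);
            }
        });
        producer.start();

        // The consumer blocks in take() until a message is available.
        List<String> received = new ArrayList<>();
        for (int i = 0; i < messages.size(); i++) {
            received.add(queue.take());
        }
        producer.join();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        // prints [first, second, third]
        System.out.println(sendAndReceive(Arrays.asList("first", "second", "third")));
    }
}
```

With one producer and one consumer the messages come out in the order they were sent, which is the FIFO behaviour mentioned above.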
Java Message Service (JMS)
JMS is a middleware API for sending messages between two or more clients. The current version of the specification is JMS 1.1 (since 2002).
ActiveMQ
Apache's ActiveMQ middleware is one JMS solution. It has support for Java, .NET, C++, Python, PHP, Ruby, etc. The current version is 5.4.2 (since December 2010). It can be downloaded from the project site, http://activemq.apache.org/.
Installing ActiveMQ is straightforward:
- unzip the archive into the desired directory;
- open a console;
- cd into that directory;
- you should see several subdirectories including one called bin;
- cd into bin;
- type activemq into the console;
- the server should start and some information should be displayed on the console;
- test your server by opening http://localhost:8161/admin in a browser.
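The last step can also be checked from code. The sketch below only tests whether something accepts TCP connections on a port; it does not verify that the listener is really ActiveMQ. Port 8161 comes from the URL above and 61616 is the port used by the example that follows; the class and method names (BrokerPortCheck, isPortOpen) are made up for this illustration.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class BrokerPortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isPortOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out or host unreachable
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("Web console (8161): " + isPortOpen("localhost", 8161, 1000));
        System.out.println("Message port (61616): " + isPortOpen("localhost", 61616, 1000));
    }
}
```

If the server started correctly both lines should report true; the actual result depends on your machine and configuration.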
A simple example showing how to create a producer and a consumer using ActiveMQ is shown below:
NOTE: Remember to add the libraries found in the lib directory of the ActiveMQ distribution to the Java project.
// Classes required by ActiveMQ JMS
import javax.jms.JMSException;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
public class SimpleActiveMQExample {
    private static ActiveMQConnectionFactory connectionFactory;
    private static Connection connection;
    private static Session session;
    private static Destination destination;
    private static boolean transacted = false;
    public static void main(String[] args) throws Exception {
        // Create the service broker. It is embedded in this program, so stop any
        // standalone ActiveMQ server first: both would listen on port 61616
        BrokerService broker = new BrokerService();
        broker.setUseJmx(true);
        // Add a connector to the port where the messages will be sent
        broker.addConnector("tcp://localhost:61616");
        // Start the broker
        broker.start();
        // Initialise the system
        setUp();
        // Initialise the producer and send a message
        createProducerAndSendAMessage();
        // Sleep a little; the message waits in the queue in the meantime
        Thread.sleep(4000);
        // Initialise the consumer; the message is delivered asynchronously to its listener
        createConsumerAndReceiveAMessage();
        // Give the listener time to receive the message before shutting down
        Thread.sleep(1000);
        // Close the client connection, then stop the broker
        connection.close();
        broker.stop();
    }
    private static void setUp() throws JMSException {
        // We need a connection factory. It produces connections
        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                ActiveMQConnection.DEFAULT_BROKER_URL);
        // Create a connection (this one is used for the producer)
        connection = connectionFactory.createConnection();
        // Activate it
        connection.start();
        // Attach a session to the connection
        session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
        // Get a reference to the queue with the given name
        destination = session.createQueue("my first active mq queue");
    }
    private static void createProducerAndSendAMessage() throws JMSException {
        // Create a producer attached to the destination (queue)
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        // Create a simple text message. Other options are available. Even binary data can be sent
        TextMessage message = session.createTextMessage("Hello World!");
        // Send the message
        System.out.println("Sending message: " + message.getText());
        producer.send(message);
    }
    private static void createConsumerAndReceiveAMessage() throws JMSException, InterruptedException {
        // The producer is done: close its connection (this also closes its session)
        connection.close();
        // Create a separate connection for the consumer
        connection = connectionFactory.createConnection();
        // Create the actual consumer
        MyConsumer myConsumer = new MyConsumer();
        // Exceptions and messages are triggered by events asynchronously
        connection.setExceptionListener(myConsumer);
        // Attach a new session to the consumer connection
        session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
        // Create a consumer attached to the destination (queue)
        // NOTICE HOW THE DESTINATION IS THE SAME FOR BOTH CONSUMER AND PRODUCER
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(myConsumer);
        // Activate the connection so that message delivery starts
        connection.start();
    }
    /**
     * This is the consumer class. It can be in its own file but would need to duplicate the
     * setUp() method to set up the system
     */
    private static class MyConsumer implements MessageListener, ExceptionListener {
        public synchronized void onException(JMSException ex) {
            System.out.println("JMS Exception occurred. Shutting down client.");
            System.exit(1);
        }
        public void onMessage(Message message) {
            // Remember that the message we've sent is of type Text. If other types
            // are used simply add more ifs
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("Received message: " + textMessage.getText());
                } catch (JMSException ex) {
                    System.out.println("Error reading message: " + ex);
                }
            } else {
                System.out.println("Received: " + message);
            }
        }
    }
}
Links
- http://en.wikipedia.org/wiki/Java_Message_Service
- http://en.wikipedia.org/wiki/ActiveMQ
- http://activemq.apache.org/
Advanced Message Queuing Protocol (AMQP)
AMQP is an open protocol for message-oriented middleware. It offers:
- message orientation;
- queuing;
- routing (including point-to-point and publish-and-subscribe);
- reliability;
- security.
Unlike JMS, which is an API, AMQP is a wire-level protocol: it defines the format of the data sent over the network. This opens the way for interoperability, as any software adhering to this format is able to attach itself to the global system.
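To make "wire-level" concrete: a client opens an AMQP connection by sending a fixed 8-byte protocol header. The sketch below builds that header for AMQP 0-9-1 and for AMQP 1.0, based on my reading of the two specifications; check the values against the specification of the version you target. The class and method names are made up for this illustration.

```java
import java.nio.charset.StandardCharsets;

class AmqpProtocolHeader {

    // Builds the 8-byte header: the ASCII letters "AMQP" followed by four one-byte fields.
    static byte[] header(int b4, int b5, int b6, int b7) {
        byte[] header = new byte[8];
        byte[] magic = "AMQP".getBytes(StandardCharsets.US_ASCII);
        System.arraycopy(magic, 0, header, 0, magic.length);
        header[4] = (byte) b4;
        header[5] = (byte) b5;
        header[6] = (byte) b6;
        header[7] = (byte) b7;
        return header;
    }

    // AMQP 0-9-1 (assumption from the spec): constant 0, then major 0, minor 9, revision 1.
    static byte[] amqp091() {
        return header(0, 0, 9, 1);
    }

    // AMQP 1.0 (assumption from the spec): protocol id 0, then major 1, minor 0, revision 0.
    static byte[] amqp10() {
        return header(0, 1, 0, 0);
    }

    // Formats the bytes as space-separated two-digit hex values.
    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            if (sb.length() > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println("AMQP 0-9-1: " + toHex(amqp091())); // AMQP 0-9-1: 41 4d 51 50 00 00 09 01
        System.out.println("AMQP 1.0:   " + toHex(amqp10()));  // AMQP 1.0:   41 4d 51 50 00 01 00 00
    }
}
```

Because these bytes are fixed by the protocol and not by a library, a client written in any language can talk to any broker that implements the same version.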
AMQP relies on:
- exchanges: entities to which the messages are sent. They can be of type direct, fanout, topic or headers;
- queues: entities which store the messages until consumers read them. They are bound to exchanges;
- messages: the communication atoms. They are sent to exchanges, which route them to the bound queues; consumers then read them from those queues.
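The relation between exchanges, bindings and queues can be sketched in memory as follows. This is a simplified model and not a broker implementation: it assumes each queue is bound at most once per exchange, it leaves out the headers type, and all names (ExchangeSketch, bind, publish, topicMatches) are made up for this illustration. For the topic type it assumes the usual pattern rules, where "*" stands for exactly one word and "#" for zero or more words.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class ExchangeSketch {
    enum Type { DIRECT, FANOUT, TOPIC }

    // A binding links one queue to the exchange under a binding key.
    private static final class Binding {
        final String bindingKey;
        final Queue<String> queue;

        Binding(String bindingKey, Queue<String> queue) {
            this.bindingKey = bindingKey;
            this.queue = queue;
        }
    }

    private final Type type;
    private final List<Binding> bindings = new ArrayList<>();

    ExchangeSketch(Type type) {
        this.type = type;
    }

    // Binds a queue to this exchange; fanout exchanges ignore the key.
    void bind(Queue<String> queue, String bindingKey) {
        bindings.add(new Binding(bindingKey, queue));
    }

    // Copies the message into every queue whose binding matches the routing key.
    // Returns the number of queues reached; 0 means the message was dropped.
    int publish(String routingKey, String message) {
        int delivered = 0;
        for (Binding binding : bindings) {
            if (matches(binding.bindingKey, routingKey)) {
                binding.queue.add(message);
                delivered++;
            }
        }
        return delivered;
    }

    private boolean matches(String bindingKey, String routingKey) {
        switch (type) {
            case FANOUT:
                return true;
            case DIRECT:
                return bindingKey.equals(routingKey);
            default:
                return topicMatches(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
        }
    }

    // Matches dot-separated words: '*' is exactly one word, '#' is zero or more words.
    static boolean topicMatches(String[] pattern, int p, String[] key, int k) {
        if (p == pattern.length) {
            return k == key.length;
        }
        if (pattern[p].equals("#")) {
            // Let '#' absorb 0, 1, 2, ... words and try to match the rest
            for (int next = k; next <= key.length; next++) {
                if (topicMatches(pattern, p + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (k == key.length) {
            return false;
        }
        if (pattern[p].equals("*") || pattern[p].equals(key[k])) {
            return topicMatches(pattern, p + 1, key, k + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        Queue<String> errors = new ArrayDeque<>();
        ExchangeSketch direct = new ExchangeSketch(Type.DIRECT);
        direct.bind(errors, "error");
        direct.publish("error", "disk full"); // reaches the queue
        direct.publish("info", "started");    // no matching binding: dropped
        System.out.println(errors);           // prints [disk full]
    }
}
```

Note that the producer only names an exchange and a routing key; which queues receive the message is decided entirely by the bindings.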
The latest version is 1.0 (May 2010).
RabbitMQ
RabbitMQ is one of several AMQP-based solutions. It can be downloaded from http://www.rabbitmq.com/.
Installation steps are explained on the same site. If you plan to test it on your own machine, install both the client and the server.
NOTE: Erlang also needs to be installed on your machine for the RabbitMQ server to work.
The following example shows how a producer and consumer can be created using RabbitMQ:
Producer:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
public class RabbitMQProducer {
    public static void main(String[] args) throws Exception {
        // Create the connection factory. It produces connections
        ConnectionFactory factory = new ConnectionFactory();
        // Set up the credentials and the address of the server
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        // Create a connection
        Connection conn = factory.newConnection();
        // Create the communication channel attached to this connection
        Channel channel = conn.createChannel();
        String exchangeName = "myExchange";
        String routingKey = "testRoute";
        // Declare the exchange with the same settings as the consumer, so that
        // publishing does not fail if the consumer has not created it yet
        channel.exchangeDeclare(exchangeName, "direct", true);
        // Create the message
        byte[] messageBodyBytes = "Hello, world!".getBytes("UTF-8");
        // Publish the message to the exchange. The exchange uses the routing key to decide
        // which bound queues receive it. If no queue is bound with this key yet, the message
        // is discarded, so start the consumer first
        channel.basicPublish(
                exchangeName,
                routingKey,
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                messageBodyBytes);
        // Close the channel and the connection
        channel.close();
        conn.close();
    }
}
Consumer:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class RabbitMQConsumer {
    public static void main(String[] args) throws Exception {
        // Create the connection factory. It produces connections
        ConnectionFactory factory = new ConnectionFactory();
        // Set up the credentials and the address of the server
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        // Create a connection
        Connection conn = factory.newConnection();
        // Create a communication channel attached to the connection
        Channel channel = conn.createChannel();
        String exchangeName = "myExchange";
        String queueName = "myQueue";
        String routingKey = "testRoute";
        boolean durable = true;
        // Create an exchange to which messages are to be sent
        channel.exchangeDeclare(exchangeName, "direct", durable);
        // Create a queue to hold the messages (durable, not exclusive, not auto-deleted)
        channel.queueDeclare(queueName, durable, false, false, null);
        // Bind the queue to the exchange
        channel.queueBind(queueName, exchangeName, routingKey);
        // noAck = false: every message must be acknowledged explicitly (see basicAck below)
        boolean noAck = false;
        // Create the queue consumer. QueueingConsumer belongs to the client API of that time;
        // newer versions of the Java client deprecated it and later removed it
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, noAck, consumer);
        boolean runInfinite = true;
        while (runInfinite) {
            // In a loop grab the next message
            QueueingConsumer.Delivery delivery;
            try {
                // If nextDelivery receives no argument it will wait indefinitely for a message.
                // This blocks the calling thread. One solution is to start this in a separate thread
                delivery = consumer.nextDelivery();
            } catch (InterruptedException ie) {
                // Interrupted while waiting: leave the loop so that the cleanup below runs
                runInfinite = false;
                continue;
            }
            System.out.println("Message received: " + new String(delivery.getBody(), "UTF-8"));
            // Acknowledge this single message (second argument: multiple = false)
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
        // Close the channel and the connection
        channel.close();
        conn.close();
    }
}
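The comment in the consumer suggests running the blocking receive loop in a separate thread. A minimal sketch of that idea follows, assuming a java.util.concurrent.BlockingQueue as a stand-in for the RabbitMQ consumer: take() blocks the same way nextDelivery() does. The names (BackgroundConsumerSketch, startConsumer) are made up for this illustration and no RabbitMQ classes are used.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

class BackgroundConsumerSketch {

    // Starts a worker thread that blocks on the queue and passes every
    // message to the handler. Interrupting the thread stops the loop.
    static Thread startConsumer(BlockingQueue<String> deliveries, Consumer<String> handler) {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    // Stand-in for the blocking nextDelivery() call
                    handler.accept(deliveries.take());
                }
            } catch (InterruptedException stop) {
                // Interruption is the shutdown signal: restore the flag and end the thread
                Thread.currentThread().interrupt();
            }
        }, "consumer-loop");
        worker.start();
        return worker;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> deliveries = new LinkedBlockingQueue<>();
        List<String> handled = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        Thread worker = startConsumer(deliveries, message -> {
            handled.add(message);
            done.countDown();
        });

        // The main thread is not blocked by the receive loop and can do other work
        deliveries.put("Hello, world!");

        // Wait until the message was handled, then stop the worker
        done.await();
        worker.interrupt();
        worker.join();
        System.out.println(handled); // prints [Hello, world!]
    }
}
```

In the RabbitMQ consumer the same structure would wrap the while loop, with the channel and the connection closed after the worker thread ends.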
Links
- http://en.wikipedia.org/wiki/Advanced_Message_Queuing_Protocol
- http://www.amqp.org/confluence/display/AMQP/Advanced+Message+Queuing+Protocol
- http://www.rabbitmq.com/
Comparison between various Message Queuing Systems
- http://bhavin.directi.com/rabbitmq-vs-apache-activemq-vs-apache-qpid/
- http://wiki.secondlife.com/wiki/Message_Queue_Evaluation_Notes
Exercises
- Install a JMS or AMQP implementation (for example ActiveMQ or RabbitMQ).
- Create a simple chat using either JMS or AMQP. Compare its efficiency with the application from http://beta.wikiversity.org/wiki/Web_technologies_--_Laboratory_7_--_2009-2010_--_info.uvt.ro.