Mark Eschbach

Software Developer && System Analyst

A Simple STOMP Ruby Client

Why Ruby?

Ruby is a great dynamic language, especially for creating testable solutions for system administration purposes. I tend to use and recommend Ruby where the overhead of a Java Virtual Machine would be too great and simple shell scripts will not suffice. A minimal Java Virtual Machine instance would easily consume 16 megabytes of memory, while Ruby run as a script on my production server consumes, at the time of writing (3/17/2011), 8 megabytes of shared memory plus an additional 5 megabytes per instance.

What is STOMP?

The Streaming Text Oriented Messaging Protocol, or STOMP for short, is a standardized protocol that uses HTTP-like message frames for communicating with a remote messaging broker. Despite the word ‘text’ in the name, the protocol is well designed and supports binary messages. Many messaging brokers support STOMP, and a detailed list may be found on Wikipedia’s STOMP page. For this tutorial I will be working with an ActiveMQ message broker.
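To make the "HTTP-like" description concrete, here is a minimal sketch of what a STOMP frame looks like on the wire: a command line, header lines, a blank line, the body, and a terminating NUL byte. The helper names build_frame and parse_frame are my own for illustration and are not part of any library; the parser is simplified and assumes the body contains no NUL byte.

```ruby
# Build a frame: command, "key:value" header lines, blank line, body, NUL byte.
def build_frame(command, headers, body = "")
  lines = [command] + headers.map { |key, value| "#{key}:#{value}" }
  lines.join("\n") + "\n\n" + body + "\0"
end

# Split a frame back into its parts.
# Simplified: ignores content-length and assumes no NUL inside the body.
def parse_frame(frame)
  head, body = frame.chomp("\0").split("\n\n", 2)
  command, *header_lines = head.split("\n")
  headers = header_lines.map { |line| line.split(":", 2) }.to_h
  [command, headers, body]
end

frame = build_frame("SEND", { "destination" => "/queue/stdin" }, "hello")
command, headers, body = parse_frame(frame)

puts command
puts headers.inspect
puts body
```

In practice a client library handles this framing for you; the sketch is only meant to show how little there is to the format.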

Goal Overview

The goal of this blog entry is to walk a reader through the basics of transporting messages to and from a remote message broker using the STOMP protocol within the Ruby language. The concrete example consists of two separate scripts: a producer and a consumer. The producer will send each line of its standard input as a message to a queue on the message broker. When the end of the standard input is reached, a termination message will be sent to the message consumer. The consumer will subscribe to our queue and will print the line of text contained within each message until a termination message is received. As we are using Ruby, we will use a GEM written by Guy M. Allard to handle the details of the STOMP protocol.

The GEM

For this article I will be using the Ruby GEM recommended for communication using STOMP in a command line environment. If you are using Ruby On Rails, the STOMP group recommends a different connectivity layer. The GEM, simply named ‘stomp’, provides many features such as fail-over, transactions, and asynchronous message handling.

If you have yet to install a GEM, then in theory it is easy:

# gem install stomp

This will go out, retrieve the GEM package, and install the package and its documentation. Once it completes the process you are good to start coding against it.

The Producer

First goal: connect and send each line as a message to a remote STOMP broker.

So, the first step is to set up a basic Ruby script file:

#!/usr/local/bin/ruby18

require 'rubygems'
require 'stomp'

Okay, so this sets up our basic script template. As with most scripting languages, the hash character (#) indicates a comment. The ‘require’ statements pull in the GEM system and the STOMP client we installed above. So now onto our goal:

port = 61613
client = Stomp::Client.new( "user", "password", "broker.host.name", port )
$stdin.each { |line|
  client.send( "/queue/stdin", line, { 'amq-msg-type' => 'text' } )
}
client.close

So, the second line above creates our connection, authenticating with the given user and password against the broker at “broker.host.name” on the given port. This does not initiate a secure connection, so your authentication information is sent unencrypted; for testing purposes, however, this works. The next line reads the standard input, and for each line read the producer sends a message to the queue “stdin” with the body set to the contents of that line.

In messaging there are at least two types of destinations: topics and queues. A topic is a broadcast from one producer to many consumers, similar to an implementation of the event listener pattern in object oriented systems. A message sent or published to a topic will only be delivered to those who are subscribed to that topic when it is processed by the broker. A queue, on the other hand, represents a one-to-one relationship, similar to a delegate in object oriented programming; each message sent to a queue will be delivered to only one consumer of that queue. This is an over-simplified view of message-oriented middleware, but a good start if you do not have prior experience.
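The difference between the two destination types can be sketched with a toy, in-memory model. ToyTopic and ToyQueue are hypothetical classes of my own, not part of the stomp GEM or of any broker, and the round-robin choice of consumer is an assumption made for the sake of the example; a real broker decides how to dispatch and also holds queued messages until a consumer arrives.

```ruby
# Toy model of a topic: every current subscriber gets each message.
class ToyTopic
  def initialize
    @subscribers = []
  end

  def subscribe(&handler)
    @subscribers << handler
  end

  def publish(message)
    @subscribers.each { |handler| handler.call(message) }
  end
end

# Toy model of a queue: exactly one consumer gets each message.
class ToyQueue
  def initialize
    @consumers = []
    @next = 0
  end

  def subscribe(&handler)
    @consumers << handler
  end

  def publish(message)
    # A real broker would store the message; this toy simply drops it.
    return if @consumers.empty?
    @consumers[@next % @consumers.size].call(message)
    @next += 1
  end
end

topic_log = []
topic = ToyTopic.new
topic.subscribe { |message| topic_log << [:a, message] }
topic.subscribe { |message| topic_log << [:b, message] }
topic.publish("one")

queue_log = []
queue = ToyQueue.new
queue.subscribe { |message| queue_log << [:a, message] }
queue.subscribe { |message| queue_log << [:b, message] }
queue.publish("one")
queue.publish("two")

p topic_log  # both subscribers saw "one"
p queue_log  # each message went to a single consumer
```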

The map given as the last argument in line 4 above allows you to set header values for the message you are sending. The ‘amq-msg-type’ key tells ActiveMQ to treat the message as a TextMessage. If you do not specify this header, then ActiveMQ assumes your message is a BytesMessage. Ruby STOMP clients do not care about the message type; however, the message type may be significant within other APIs, such as JMS.

Once we have reached the end of the standard input stream we arrive at the final line, client.close, which cleans up the resources of the connection. The basic STOMP producer is now complete, as we are able to publish to our queue. After the next section this producer will be revisited to add a termination message.

Download the entire send example here.

The Consumer

The next goal is to build a consumer which is able to subscribe to a queue and print the body of each received message to the standard output. There are two choices in the consumption of messages: a pull style, which is able to time out while waiting for messages; and a push notification, which will consume messages until unsubscribed. As this application is not concerned with timeouts we will use the push method. Here is the basic template copied from above:

#!/usr/local/bin/ruby18

require 'rubygems'
require 'stomp'

port = 61613
client = Stomp::Client.new( "user", "password", "broker.host.name", port )

A connection has now been established to the remote broker, so the next step is to subscribe to the queue to receive messages. A closure is passed to the subscribe method. This closure will be executed any time the consumer is notified that a message has been sent to the queue. The following code will accomplish our goal.

client.subscribe('/queue/stdin') { |message|
  puts message.body
}

If the application were to execute in its current state it would not consume any messages. This is because, unlike many Ruby methods taking blocks as parameters, the subscribe method does not block, so the script would reach its end and exit. To wait for messages we must join with the client, which blocks until the connection to the broker is closed.

  client.join
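The subscribe-then-join pattern can be illustrated without a broker. The sketch below assumes that the client delivers messages to the block from a background thread, which is how I understand this style of client to behave; ToyClient and the :closed sentinel are hypothetical stand-ins of my own and not part of the stomp GEM.

```ruby
require 'thread'

# Hypothetical stand-in for a messaging client.
class ToyClient
  def initialize(incoming)
    @incoming = incoming
    @listener = nil
  end

  # Returns immediately; the block runs later on the listener thread.
  def subscribe(&handler)
    @listener = Thread.new do
      # The :closed sentinel stands in for the connection being closed.
      while (message = @incoming.pop) != :closed
        handler.call(message)
      end
    end
    nil
  end

  # Blocks the caller until the listener thread has finished.
  def join
    @listener.join
  end
end

incoming = Queue.new
received = []
client = ToyClient.new(incoming)
client.subscribe { |message| received << message }

incoming.push("first")
incoming.push("second")
incoming.push(:closed)

client.join
puts received.inspect
```

Without the call to join, the main thread could reach the end of the script before the listener thread had handled anything.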

Terminating the listener

If you have been playing along and attempt to execute the consumer in its current state you will notice the consumer never exits. This is because the connection to the remote broker is never closed, so our primary thread does not return from the join. With the current design of the consumer, the message processing block must explicitly close the connection to the broker.

The modification needed for termination involves checking the headers of each incoming message. If the ‘type’ header is set to ‘terminate’, the consumer will close the connection and exit. Under all other conditions the consumer will continue to print the body of the received message.

client.subscribe('/queue/stdin') { |message|
  if message.headers['type'] == 'terminate'
    client.close
  else
    puts message.body
  end
}

There is now a mismatch between the protocols the consumer and producer are speaking. To fix this design issue the producer will be modified to send the correct termination header when the standard input has finished.

$stdin.each { |line|
  client.send( "/queue/stdin", line, { 'amq-msg-type' => 'text', 'type' => 'content' } )
}
client.send( "/queue/stdin", "", { 'amq-msg-type' => 'text', 'type' => 'terminate' } )
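If you would like to try the termination handshake without a broker, the following sketch simulates it in a single process. Ruby's built-in Queue class stands in for the broker queue and a StringIO stands in for standard input; both substitutions, and the hash layout used for messages, are assumptions made for illustration only.

```ruby
require 'stringio'
require 'thread'

# Hypothetical in-process stand-in for the broker queue.
broker_queue = Queue.new

# Producer: one 'content' message per input line, then a 'terminate' message.
producer = Thread.new do
  input = StringIO.new("alpha\nbeta\n")
  input.each_line do |line|
    broker_queue.push({ :headers => { 'type' => 'content' }, :body => line })
  end
  broker_queue.push({ :headers => { 'type' => 'terminate' }, :body => '' })
end

# Consumer: collect bodies until the terminate message arrives.
printed = []
consumer = Thread.new do
  loop do
    message = broker_queue.pop
    break if message[:headers]['type'] == 'terminate'
    printed << message[:body]
  end
end

[producer, consumer].each { |thread| thread.join }
print printed.join
```

The consumer stops because of the header value, not because of the empty body, so an empty line in the input would still be passed through as content.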

At this point both the consumer and producer fulfil the problem statement. Please take a look at the completed sources!

Download the publishing agent.

Download the consuming agent.