JMS queue listener integration tests with Grails (Osoco test gallery, part 1)

In our application we have to consume JMS messages. We install Grails JMS plugin and write a listener for this task. To not violate SRP, our consumer receives a message and routes it to a MessageProcessorService for further processing.

class QueueListenerService {
    static final QUEUE_NAME = 'aQueue'
    static exposes = ['jms']

    MessageProcessorService messageProcessorService

    @Queue(name = QueueListenerService.QUEUE_NAME)
    void onMessage(msg) {
        messageProcessorService.process(msg)
    }
}

MessageProcessorService is not relevant for this test; we have a stub for it:

class MessageProcessorService {
    void process(msg) {
        log.info("Processing message ${msg}")
    }
}

We use Spock for testing. We want to verify that our listener:

  • receives a message and passes it to further processing
  • message is returned to the queue if an exception is thrown (assuming that we configured a redelivery policy)

In this kind of spec we have to deal with asychrony. We are going to send a message from the spec, but it will be received by another thread. We synchronize the test and listener threads by a signal (CountDownLatch) that will indicate us that the JMS message has been received and passed to the processor. We initialize the latch at the beginning of the test. Then we send the message and block until the counter reaches zero (that means, our mock has received the message).

Last but not least we want to check if the queue either has been emptied (first test method) or not (second test method). Once again we have to handle the asynchronous behaviour of JMS. After receiving the message we start to poll the queue checking if the given condition (queue empty or not ) is fulfilled. We establish a 5 second timeout – it should be enough for JMS broker to handle a processing exception (see the Timeout helper class).

So, ladies and gentlemen, the final code:

class QueueListenerServiceSpec extends IntegrationSpec {
    private static final QUEUE_RECEPTION_TIMEOUT_SEC = 5
    private static final QUEUE_POLL_TIMEOUT_MILLIS = 5000
    private static final QUEUE_POLL_INTERVAL_MILLIS = 500

    QueueListenerService queueListenerService
    JmsService jmsService

    private messageReceived = new CountDownLatch(1)
    private messageProcessor = Mock(MessageProcessorService)

    def setup() {
        queueListenerService.messageProcessorService = messageProcessor
    }

    def 'receives a message and passes it to further processing'() {
        when:
        jmsService.send(QueueListenerService.QUEUE_NAME, message())
        messageReceived.await(QUEUE_RECEPTION_TIMEOUT_SEC, SECONDS)

        then:
        1 * messageProcessor.process({ Map msg ->
            messageReceived.countDown()
            msg == message()
        })

        assertQueue(empty())
    }

    def 'message is returned to the queue if an exception is thrown'() {
        given:
        messageProcessor.process(_) >> {
            messageReceived.countDown()
            throw new IllegalStateException('Processing error')
        }

        when:
        jmsService.send(QueueListenerService.QUEUE_NAME, message())
        messageReceived.await(QUEUE_RECEPTION_TIMEOUT_SEC, SECONDS)

        then:
        assertQueue(notEmpty())
    }

    private message() {
        [key: 'a value']
    }

    private void assertQueue(condition) {
        def timeout = new Timeout(QUEUE_POLL_TIMEOUT_MILLIS)
        while (!condition.fulfilled()) {
            if (timeout.hasTimedOut()) {
                throw new AssertionError(condition.describeFailure())
            }

            sleep(QUEUE_POLL_INTERVAL_MILLIS)
        }
    }

    private empty() {
        [
            fulfilled: { jmsService.browse(QueueListenerService.QUEUE_NAME) == []},
            describeFailure: 'Expected queue to be empty'
        ]
    }

    private notEmpty() {
        [
            fulfilled: { jmsService.browse(QueueListenerService.QUEUE_NAME) != []},
            describeFailure: 'Expected queue to be NOT empty'
        ]
    }
}
class Timeout {
    private endTime

    Timeout(duration) {
        endTime = System.currentTimeMillis() + duration
    }

    def hasTimedOut() {
        timeRemaining() <= 0
    }

    def timeRemaining() {
        endTime - System.currentTimeMillis()
    }
}

This post series present the best of Osoco tests – tests that were tricky or we are just proud of. You can find a runnable source code for this test and more in the Grails Test Gallery project shared on GitHub.

Advertisements