Asynchronous tests with GPars (Osoco test gallery, part 4)

If you wrote multithreaded code in Groovy/Grails, I’m sure you stumbled upon the GPars library. It simplifies parallel processing – sometimes it’s as easy as wrapping a code fragment with a GPars closure, changing the iteration method (in our example from each to eachParallel) and passing the closure to a GPars pool. The library implementation handles the executor pool creation and adds new methods to collections.

Whenever I deliberately add multithreading, I always separate responsibilities (processing logic and concurrency handling). I wrap the single-threaded class with a new one, handing the parallel execution (creating a pool, managing threads, handling cancellation, etc.):

class MultithreadedProcessorService {
    ProcessorService processorService
    def threadPoolSize = ConfigurationHolder.config.threadPool.size

    void processMultithreaded(batches) {
        GParsPool.withPool(threadPoolSize) {
            batches.eachParallel { batch ->
                processorService.process(batch)
            }
        }
    }
}

class ProcessorService {
    void process(batch) {
        // some processing
    }
}

To test the multithreaded class you can you use Spock interactions. As opposed to Groovy and Grails mocks, they are thread-safe and you won’t get any weird and unstable results:

class MultithreadedProcessorServiceSpec extends UnitSpec {
    private MultithreadedProcessorService multithreadedProcessorService
    // Using Spock mocks because they are thread-safe
    private ProcessorService processorService = Mock(ProcessorService)

    def setup() {
        mockConfig('threadPool.size=2')
        multithreadedProcessorService = new MultithreadedProcessorService(
            processorService: processorService
        )
    }

    def 'processes batches using multiple threads'() {
        given:
        def batches = [[0, 1], [2, 3, 4], [5]]
        def processedBatches = markedAsNotProcessed(batches)

        when:
        multithreadedProcessorService.processMultithreaded(batches)

        then:
        batches.size() * processorService.process({ batch ->
            processedBatches[batch] = true
            batch in batches
        })
        assertAllProcessed(processedBatches)

    }

    private markedAsNotProcessed(batches) {
        batches.inject([:]) { processed, batch ->
            processed[batch] = false
            processed
        }
    }

    private void assertAllProcessed(batches) {
        assert batches*.value == [true] * batches.size()
    }
}

This post series present the best of Osoco tests – tests that were tricky or we are just proud of. You can find a runnable source code for this test and more in the Grails Test Gallery project shared on GitHub.

Advertisements