05 April 2014

Java EE 7 brings some not-so-obvious additions. One of them is particularly interesting for resource adapter developers. This post shows what it is about, how to run it on WildFly and on TomEE, and how to test it.

Even though it seems that the new features of Java EE 7 are all about HTML5, WebSockets, ManagedExecutors and Batch, there are also some more subtle additions that are worth trying out. One very interesting point for developers of resource adapters that went into EJB 3.2 and JCA 1.7 is the addition of message-driven beans with no-method listener interfaces. Thanks go to David Blevins of Tomitribe for pointing me to that. Instead of requiring predefined methods on an interface, Java EE 7 lets the resource adapter reflect on the MDB class and invoke any of its public methods.

This way a resource adapter can offer MDBs that feel like JAX-RS endpoints: it can check the methods of the MDB for annotations and dynamically dispatch its events to the matching ones. Here I present an example that you can also find on https://github.com/robertpanzer/filesystemwatch-connector. This resource adapter uses the Java SE 7 java.nio.file WatchService to observe a directory for file changes. Whenever a file is created, deleted or modified in that directory, the MDB should be called.

The code

Let’s start with the API. The API for an inbound resource adapter is the listener interface. The user of that API is the MDB. To profit from this new programming model the listener interface has to be empty. So it simply looks like this:

public interface FSWatcher {}

The MDB implements methods that declare, via annotations, the type of event and the type of file for which they should be called. So instead of MDBs with generic methods like onCreate(File f) we get something like this:

@MessageDriven(activationConfig = {
	@ActivationConfigProperty(propertyName = "dir", propertyValue = ".") })
public class FSWatcherMDB implements FSWatcher {
	@Create(".*\\.txt")
	public void onNewTextFile(File f) {...}

	@Create(".*\\.pdf")
	public void onNewPdfFile(File f) {...}

	@Delete(".*\\.txt")
	public void onDeleteTextFile(File f) {...}
}

That’s pretty amazing, right? The MDB declares the directory it wants to watch in its activation config. For every kind of event and file type it implements a method, without having to do any dispatching itself. This code is very expressive and concise.
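The annotations used by the MDB are not shown in this post. As a minimal sketch, assuming @Create simply carries a regular expression as its value (and that @Delete and @Modify look alike), it could be declared roughly as below. SampleWatcherBean and CreateDispatchSketch are hypothetical names that only demonstrate reading the annotation reflectively from a plain object; the real adapter invokes the method through a MessageEndpoint instead.

```java
import java.io.File;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Assumed shape of @Create; @Delete and @Modify would be declared the same way.
@Retention(RetentionPolicy.RUNTIME) // must survive to runtime to be visible via reflection
@Target(ElementType.METHOD)
@interface Create {
    String value(); // regular expression matched against the file name
}

// Hypothetical stand-in for an MDB, used only to exercise the annotation.
class SampleWatcherBean {
    final List<String> calls = new ArrayList<>();

    @Create(".*\\.txt")
    public void onNewTextFile(File f) {
        calls.add("txt:" + f.getName());
    }

    @Create(".*\\.pdf")
    public void onNewPdfFile(File f) {
        calls.add("pdf:" + f.getName());
    }
}

class CreateDispatchSketch {
    // Invokes every public @Create method whose pattern matches the file name.
    static void dispatchCreate(Object bean, File file) {
        for (Method m : bean.getClass().getMethods()) {
            Create create = m.getAnnotation(Create.class);
            if (create != null && file.getName().matches(create.value())) {
                try {
                    m.invoke(bean, file);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("Could not invoke " + m, e);
                }
            }
        }
    }
}
```

The important detail is the RUNTIME retention: with the default retention the annotation would not be visible to isAnnotationPresent() or getAnnotation() at all.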

The last missing part now is the resource adapter. The interesting part is the method endpointActivation that is called to register the MDB to the resource adapter:

	@Override
	public void endpointActivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec) throws ResourceException {
		FSWatcherActivationSpec fsWatcherAS = (FSWatcherActivationSpec) activationSpec;

		try {
			WatchKey watchKey =
			    fileSystem.getPath(
			        fsWatcherAS.getDir()).register(
			            watchService,
			            StandardWatchEventKinds.ENTRY_CREATE,
			            StandardWatchEventKinds.ENTRY_DELETE,
			            StandardWatchEventKinds.ENTRY_MODIFY);
			listeners.put(watchKey, endpointFactory);

			endpointFactoryToBeanClass.put(
					endpointFactory,
					endpointFactory.getEndpointClass()); // (1)
		} catch (IOException e) {
			throw new ResourceException(e);
		}
	}
	public Class<?> getBeanClass(MessageEndpointFactory endpointFactory) {
		return endpointFactoryToBeanClass.get(endpointFactory);
	}
  1. endpointFactory.getEndpointClass() returns the pure Class of the MDB, not the version of the application server that decorates or subclasses the MDB.

Now when the watch service fires an event, the resource adapter can check the MDB's methods and call them if the annotations match. All this is done on a separate thread, spawned by the resource adapter, that waits for events from the WatchService.

	private void dispatchEvents(List<WatchEvent<?>> events, MessageEndpointFactory messageEndpointFactory) {
		for (WatchEvent<?> event: events) {
			Path path = (Path) event.context();

			try {
				MessageEndpoint endpoint = messageEndpointFactory.createEndpoint(null);
				Class<?> beanClass = resourceAdapter.getBeanClass(messageEndpointFactory);
				for (Method m: beanClass.getMethods()) {
					if (StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind())
							&& m.isAnnotationPresent(Create.class)
							&& path.toString().matches(m.getAnnotation(Create.class).value())) {
						invoke(endpoint, m, path);
					} else if (StandardWatchEventKinds.ENTRY_DELETE.equals(event.kind())
							&& m.isAnnotationPresent(Delete.class)
							&& path.toString().matches(m.getAnnotation(Delete.class).value())) {
						invoke(endpoint, m, path);
					} else if (StandardWatchEventKinds.ENTRY_MODIFY.equals(event.kind())
							&& m.isAnnotationPresent(Modify.class)
							&& path.toString().matches(m.getAnnotation(Modify.class).value())) {
						invoke(endpoint, m, path);
					}
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}
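The thread that waits on the WatchService is not shown above. A minimal sketch of such a loop in plain Java SE could look like this. WatchLoopSketch and its handler callback are hypothetical; in the adapter the handler would presumably look up the MessageEndpointFactory registered for the WatchKey and pass the events on to dispatchEvents.

```java
import java.nio.file.ClosedWatchServiceException;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical watcher loop: blocks on the WatchService and hands each batch of events to a callback.
class WatchLoopSketch implements Runnable {
    private final WatchService watchService;
    private final Consumer<List<WatchEvent<?>>> handler;

    WatchLoopSketch(WatchService watchService, Consumer<List<WatchEvent<?>>> handler) {
        this.watchService = watchService;
        this.handler = handler;
    }

    @Override
    public void run() {
        try {
            while (true) {
                WatchKey key = watchService.take(); // blocks until a key has pending events
                handler.accept(key.pollEvents());   // drain the events and pass them on
                key.reset();                        // re-arm the key, otherwise no further events arrive
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();     // stop the loop, keep the interrupt flag set
        } catch (ClosedWatchServiceException e) {
            // the watch service was closed, e.g. when the adapter stops: just end the loop
        }
    }
}
```

Closing the WatchService from another thread makes take() throw ClosedWatchServiceException, which gives the adapter a simple way to end the loop on shutdown.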

Great stuff, isn’t it? As a resource adapter developer you are now able to deliver a much more comfortable and elegant programming model to your MDB developers.

You said TomEE! Where is it?

TomEE+ supports JCA resource adapters, even though it is declared as only being certified for the Java EE Web Profile. And since that is the Java EE 6 Web Profile, you might think that this will not work. But in fact it does, with one small difference.

If the activation spec defines the methods getBeanClass() and setBeanClass(Class<?> cls), TomEE will set the bean class on the activation spec. So instead of calling MessageEndpointFactory.getEndpointClass() you call FSWatcherActivationSpec.getBeanClass(). Changing the method endpointActivation from above like this makes it work on TomEE as well as on WildFly:

			endpointFactoryToBeanClass.put(
					endpointFactory,
					fsWatcherAS.getBeanClass() != null ? fsWatcherAS.getBeanClass() : endpointFactory.getEndpointClass());

Show me that it works

Testing this is best done with the Arquillian framework. If you are a Java EE developer and do not know it yet, you should definitely try it out.

A first positive test should look like:

  1. Create a file in the directory referred to by the activation spec

  2. Check that the MDB was called with the right path

The fact that the MDB is called asynchronously makes the test more difficult. The naïve approach would be to wait a certain amount of time and then check whether a method has been called on the MDB. But that makes your tests run very long even when everything is fine and the MDB is called immediately. And if you wait too short a time, your tests become fragile.

Andrew Lee Rubinger and Aslak Knutsen propose an improved approach in their book "Continuous Enterprise Development in Java" using the class java.util.concurrent.CyclicBarrier, which has been part of Java SE since version 5. It implements a barrier that most of us probably know from the CS lectures on parallel programming. Together with CDI events this makes the test pass immediately if everything works fine and makes it wait only if there is a failure.

So the idea is that the MDB fires a CDI event whenever one of its methods is called. This event is observed by the test class, which then walks into the barrier. The test method is the second party going into the barrier.
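Stripped of CDI and Arquillian, the handshake can be sketched in plain Java as follows. BarrierHandshakeSketch and its two methods are hypothetical names: onEvent plays the role of the CDI observer and awaitEvent the role of the test method.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical two-party handshake between an asynchronous callback and a waiting test.
class BarrierHandshakeSketch {
    private final CyclicBarrier barrier = new CyclicBarrier(2); // two parties: observer and test
    private volatile String received;

    // Observer side: record the payload first, then enter the barrier.
    void onEvent(String payload) {
        received = payload;
        try {
            barrier.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (BrokenBarrierException e) {
            // the waiting side already gave up (timeout): nothing left to do
        }
    }

    // Test side: returns the payload as soon as the observer arrives, or null after the timeout.
    String awaitEvent(long timeout, TimeUnit unit) {
        try {
            barrier.await(timeout, unit);
            return received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (BrokenBarrierException | TimeoutException e) {
            return null;
        }
    }
}
```

If the event arrives, awaitEvent returns right away, however generous the timeout is. Only the failure case costs the full timeout.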

The test MDB basically looks like this:

@MessageDriven(activationConfig = { @ActivationConfigProperty(propertyName = "dir", propertyValue = ".") })
public class FSWatcherMDB implements FSWatcher {

	@Inject
	private Event<FileEvent> fileEvent;

	@Create(".*\\.txt")
	public void onNewTextFile(File f) {
		fileEvent.fire(new FileEvent(FileEvent.CREATE, f));
	}
	...
}

The test class looks like this:

@RunWith(Arquillian.class)
public class ResourceAdapterTest {

	@Deployment
	public static EnterpriseArchive deploy() throws Exception {...}

	private static CyclicBarrier barrier; // (1)

	private static File newFile;          // (1)

	private static int mode;              // (1)

	@Before
	public void init() throws Exception {
		newFile = null;
		mode = 0;
		barrier = new CyclicBarrier(2);  // (2)
	}

	@Test
	public void testTxtFile() throws Exception {

		File tempFile = new File(".", "testFile.txt");
		assertTrue("Could not create temp file", tempFile.createNewFile());

		barrier.await(10, TimeUnit.SECONDS);

		assertEquals(tempFile.getName(), newFile.getName());
		assertEquals(FileEvent.CREATE, mode);
	}

	public void notifyFileEvent(@Observes FileEvent fileEvent) {
		mode = fileEvent.getMode();
		newFile = fileEvent.getFile();
		try {
			barrier.await();
		} catch (InterruptedException | BrokenBarrierException e) {
			e.printStackTrace();
		}
	}

}
  1. These members are static because the CDI event will be delivered to another instance of the test class created by the CDI runtime.

  2. Declares a barrier for 2 parties. That means barrier.await() returns as soon as 2 threads have called that method; the variant with a timeout gives up once the given timeout has elapsed.

I want to try that

