Ben Nadel at cf.Objective() 2014 (Bloomington, MN) with: Simon Free

Using Java's Concurrent Queues For Asynchronous Processing In Lucee CFML 5.3.7.47


I'm utterly fascinated with asynchronous processing. But, I also know that there's always a healthy tension between complexity and performance, stability, and availability. Which is why I'm constantly noodling on different ways to perform asynchronous processing (such as with Task threads in Lucee CFML). The other day, I came across a set of "Concurrent Queues" in Java. These are queues that use efficient locking and non-locking techniques to allow for safe, high-throughput queues in a multi-threaded environment. These piqued my interest; and, I wanted to see how I might be able to leverage these concurrent queues for asynchronous processing in Lucee CFML 5.3.7.47.

To be clear, anything that can be done in one of Java's Concurrent Queues can also be done with a ColdFusion Array. The major difference is that with a ColdFusion Array, we have to implement all the locking and synchronization on our own. To see what I mean, take a look at my old AsyncTaskQueue.cfc project - it has four CFLock tags in it. Each one of these locks represents a possible choke-point in the application performance; not to mention decreased readability and an opportunity to introduce bugs. If we can offload these locks to the Java layers, it will likely mean better performance and fewer bugs.
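To make the "offload the locks to Java" idea concrete, here is a minimal sketch in plain Java (not the Lucee CFML used in the rest of this post). It assumes a hypothetical class name, `ConcurrentOfferSketch`, and simply has several threads push onto one shared `ConcurrentLinkedQueue` without any locking in the calling code:

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class; the name is an assumption for illustration only.
final class ConcurrentOfferSketch {

    // Pushes items onto one shared queue from several threads, with no explicit
    // locking in our own code, and returns the final queue size.
    static int offerFromManyThreads(int threadCount, int itemsPerThread) {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        Thread[] producers = new Thread[threadCount];

        for (int t = 0; t < threadCount; t++) {
            final int producerId = t;
            producers[t] = new Thread(() -> {
                for (int i = 0; i < itemsPerThread; i++) {
                    // offer() is safe to call concurrently; the queue handles
                    // the synchronization details internally.
                    queue.offer("producer-" + producerId + "-item-" + i);
                }
            });
            producers[t].start();
        }

        // Wait for every producer to finish before counting.
        for (Thread producer : producers) {
            try {
                producer.join();
            } catch (InterruptedException error) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for producers", error);
            }
        }

        // NOTE: size() walks the whole queue for this class; fine for a demo.
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println("Queued items: " + offerFromManyThreads(4, 1000));
    }
}
```

No item is lost even though the calling code never takes a lock, which is exactly the bookkeeping a plain array would need CFLock tags for.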

To explore this idea, I'm going to use the AsyncTaskQueue.cfc pattern in which we have an abstract base class that provides the queuing mechanics. Then, we'll implement a concrete sub-class that consumes the internal queue provided by the super class.

One of the most obvious use-cases for asynchronous processing is analytics tracking. Analytics typically entails non-mission-critical data that - ideally - should never block processing or disrupt the parent request. Analytics events should be queued-up and then processed in the background.

To this end, I've created an AnalyticsService.cfc ColdFusion component that extends an AbstractAsyncQueue.cfc base component. The AbstractAsyncQueue.cfc component provides an internal queue while the AnalyticsService.cfc provides the logic for processing the items in that queue.

To see how this all fits together, let's start from the outside-in, looking at how our application logic might track analytics events - assume that my AnalyticsService.cfc has been cached in the application scope:

<cfscript>

	application.analyticsService.trackAsync(
		userID = 4,
		event = {
			type: "LoggedIn",
			source: "homepage",
			experiment: "A3"
		}
	);
	application.analyticsService.trackAsync(
		userID = 4,
		event = {
			type: "HomePage.Viewed"
		}
	);
	application.analyticsService.trackAsync(
		userID = 4,
		event = {
			type: "HomePage.FilterSelected"
		}
	);
	application.analyticsService.trackAsync(
		userID = 4,
		event = {
			type: "HomePage.ProjectSelected",
			projectID: 1234
		}
	);
	application.analyticsService.trackAsync(
		userID = 4,
		event = {
			type: "ProjectDetail.Loaded",
			projectID: 1234
		}
	);

	systemOutput( "Top-level page has completed.", true, true );

</cfscript>

There's nothing too interesting happening here - we're just calling the .trackAsync() method a bunch of times. The "Async" token in the method name is meant to imply a few things:

  • The events are tracked asynchronously - that's the obvious one.

  • Calling this method is safe - this method should never raise an exception and should not need to be wrapped in any try/catch blocks.

Ok, now let's look at how the AnalyticsService.cfc is implemented. As mentioned above, this ColdFusion component is going to extend an abstract base class, which means that it interacts with the base class using both explicit super calls and concrete methods. Roughly speaking, the interface for the AbstractAsyncQueue.cfc is this:

  • abstract void handleError( required any error )
  • abstract void processItem( required any item )
  • boolean addItem( required any item )

The .addItem() method allows the concrete sub-class to push items onto the abstract queue while the abstract method, .processItem(), allows the super-class to push the processing details of the items down into the sub-class.
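The division of labor between the two classes can be sketched in plain Java as a small template-method example. This is only an illustration under stated assumptions: the class names `AbstractQueueSketch` and `RecordingQueueSketch` are hypothetical, and, to keep the example deterministic, the queue is drained on the calling thread via a made-up `drain()` method rather than in a background thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical base class: owns the queue and the error routing.
abstract class AbstractQueueSketch<T> {

    private final Queue<T> queue = new ConcurrentLinkedQueue<>();

    // The sub-class decides what "processing" and "error handling" mean.
    protected abstract void processItem(T item);
    protected abstract void handleError(RuntimeException error);

    // The sub-class pushes work onto the queue through this method.
    public boolean addItem(T item) {
        return queue.offer(item);
    }

    // ASSUMPTION: drained synchronously here for simplicity; the component
    // described in this post drains in a background thread instead.
    public void drain() {
        T item;
        while ((item = queue.poll()) != null) {
            try {
                processItem(item);
            } catch (RuntimeException error) {
                // Processing errors never escape; they go to the sub-class.
                handleError(error);
            }
        }
    }
}

// Hypothetical concrete class: records what it was asked to process.
final class RecordingQueueSketch extends AbstractQueueSketch<String> {

    final List<String> processed = new ArrayList<>();
    final List<String> errors = new ArrayList<>();

    @Override
    protected void processItem(String item) {
        if (item.isEmpty()) {
            throw new IllegalArgumentException("Empty event type");
        }
        processed.add(item);
    }

    @Override
    protected void handleError(RuntimeException error) {
        errors.add(error.getMessage());
    }

    public static void main(String[] args) {
        RecordingQueueSketch sketch = new RecordingQueueSketch();
        sketch.addItem("LoggedIn");
        sketch.addItem(""); // Will fail inside processItem().
        sketch.addItem("HomePage.Viewed");
        sketch.drain();
        System.out.println("Processed: " + sketch.processed);
        System.out.println("Errors: " + sketch.errors);
    }
}
```

The failing item is routed to `handleError()` and the remaining items are still processed, which is the contract the abstract methods are meant to express.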

Here's the AnalyticsService.cfc:

component
	extends = "AbstractAsyncQueue"
	output = false
	hint = "I provide methods for tracking analytics events."
	{

	/**
	* I initialize the analytics service with the given queue capacity. In order to
	* prevent blocking of the parent request, events are queued-up internally and then
	* processed in a background thread. By default, the queue is unbounded and will eat
	* up as much memory as it can get (assuming that the asynchronous processing has
	* failed in some way). But, if a max queue length is provided, events will be "shed"
	* (ie, discarded) once the queue hits its max size.
	* 
	* @maxQueueLength I am the max length of the internal queue. 0 means no max length.
	*/
	public void function init( numeric maxQueueLength = 0 ) {

		variables.maxQueueLength = arguments.maxQueueLength;

		super.init( capacity = maxQueueLength );

	}

	// ---
	// PUBLIC METHODS.
	// ---

	/**
	* I track the given event, processing it synchronously.
	* 
	* CAUTION: When called as a means to process events synchronously, you must explicitly
	* handle any errors raised during processing.
	* 
	* @userID I am the user being tracked.
	* @event I am the event data being tracked.
	*/
	public void function track(
		required numeric userID,
		required struct event
		) {

		sleep( randRange( 100, 1000 ) );
		systemOutput( "Processing event: #event.type#", true, true );

	}


	/**
	* I queue-up the given tracking event and process it asynchronously.
	* 
	* @userID I am the user being tracked.
	* @event I am the event data being tracked.
	*/
	public void function trackAsync(
		required numeric userID,
		required struct event
		) {

		// CAUTION: The queued-up item will be consumed later-on as invocation
		// arguments for the track() method using the "argumentCollection" syntax.
		var itemAdded = super.addItem( arguments );

		// NOTE: Adding an item will only ever fail if the service was initialized with a
		// maxQueueLength (and the queue is fully populated).
		if ( ! itemAdded ) {

			systemOutput( "CAUTION: AnalyticsService is shedding load, capacity ( #maxQueueLength# ).", true, true );

		}
		
	}

	// ---
	// CONCRETE METHODS.
	// ---

	/**
	* CONCRETE METHOD FOR ABSTRACT SUPER CLASS. NOT INTENDED FOR PUBLIC USE.
	* --
	* I handle the given error. Should be limited to CFLock, CFThread, and item
	* processing errors. This method may be invoked during either the top-level page
	* request or inside the asynchronous worker thread.
	* 
	* @error I am the error being handled.
	*/
	public void function handleError( required any error ) {

		systemOutput( error, true, true );

	}


	/**
	* CONCRETE METHOD FOR ABSTRACT SUPER CLASS. NOT INTENDED FOR PUBLIC USE.
	* --
	* I process the given item. This method will be invoked inside the asynchronous
	* worker thread. Errors raised inside this method will be caught by the super 
	* class and passed to handleError().
	* 
	* @item I am the queued item being processed.
	*/
	public void function processItem( required any item ) {

		track( argumentCollection = item );

	}

}

In a production application, our .processItem() method would likely push analytics events to an external system like Segment, NewRelic, or Amplitude; but, in this case, I'm just logging events to the console. After all, our goal here is simply to explore the queuing feature in Java.

That said, we can see here that the .trackAsync() method is just turning around and calling super.addItem() on the base class. The base class then queues that item up and starts a background worker thread which ultimately calls the .processItem() method.

Now let's look at the AbstractAsyncQueue.cfc super class. If you noticed above, the super constructor can be invoked with a capacity parameter. The existence of a non-zero value will determine which Concurrent Queue is used internally: one with a max-size; or, an unbounded one. For this demo, assume that I've instantiated my AnalyticsService.cfc with a maxQueueLength of 3.

component
	modifier = "abstract"
	output = false
	hint = "I provide an abstract base class for an asynchronous, atomic queue."
	{

	/**
	* I initialize the asynchronous atomic queue. If a capacity is provided, a bounded
	* queue will be used (and will shed-load if the capacity is reached). If no capacity
	* is provided, an unbounded queue will be used.
	* 
	* @capacity I determine the max size of the bounded queue. 0 means no max size.
	*/
	public void function init( numeric capacity = 0 ) {

		variables.queue = ( capacity )
			? createObject( "java", "java.util.concurrent.LinkedBlockingQueue" ).init( capacity )
			: createObject( "java", "java.util.concurrent.ConcurrentLinkedQueue" ).init()
		;

		// When we spawn the asynchronous CFThread tag, we're going to keep a reference
		// to it so that we can determine if background processing is taking place.
		variables.asyncThread = nullValue();
		variables.asyncThreadLockName = "AbstractAsyncQueue.Lock.#createUniqueID()#";

		// Used for demo debugging.
		variables.concreteClassName = getComponentMetadata( this ).name;

	}

	// ---
	// ABSTRACT METHODS.
	// ---

	/**
	* ABSTRACT METHOD: I handle any errors raised during item queuing or item processing.
	* 
	* @error I am the error being raised.
	*/
	public void function handleError( required any error )
		modifier = "abstract"
		{

		throw( type = "MethodNotImplemented" );

	}


	/**
	* ABSTRACT METHOD: I process the given item which has been dequeued. Any errors
	* raised during the processing will be caught and handed-off to the handleError()
	* method for logging.
	* 
	* @item I am the queue item to process.
	*/
	public void function processItem( required any item )
		modifier = "abstract"
		{

		throw( type = "MethodNotImplemented" );

	}

	// ---
	// PUBLIC METHODS.
	// ---

	/**
	* I add the given item to the internal queue. Processing of the queue will be handled
	* asynchronously. Any error raised during the item queuing process will be handed-off
	* to the handleError() method.
	* 
	* When using a BOUNDED queue (with a capacity), this method returns FALSE if the
	* item exceeded capacity and had to be skipped. Otherwise, this method returns TRUE.
	* 
	* @item I am the item to add to the queue.
	*/
	public boolean function addItem( required any item ) {

		try {

			return( queue.offer( item ) );

		} finally {

			ensureAsyncQueueProcessing();

		}

	}

	// ---
	// PRIVATE METHODS.
	// ---

	/**
	* I ensure that the background CFThread tag is running and processing items.
	*/
	private void function ensureAsyncQueueProcessing() {

		if ( isAsyncThreadRunning() ) {

			return;

		}

		try {

			// Perform a DOUBLE-CHECK LOCK to make sure that we only spawn ONE THREAD
			// in a concurrent request model.
			lock
				name = asyncThreadLockName
				type = "exclusive"
				timeout = 5
				throwOnTimeout = true
				{

				// Make sure that a CFThread wasn't spawned while we were waiting to
				// obtain the lock (this is the second check in the "DOUBLE-CHECK" lock
				// algorithm).
				if ( isAsyncThreadRunning() ) {

					return;

				}

				// CAUTION: Since this method may end up getting called multiple times in
				// a single request, we must make sure that it is uniquely named.
				var asyncThreadName = "AbstractAsyncQueue.Thread.#createUniqueID()#";

				thread
					name = asyncThreadName
					priority = "low"
					{

					systemOutput( "Spawning worker thread for #concreteClassName#.", true, true );

					while ( true ) {

						// Get the next item or NULL if the queue is empty.
						var item = queue.poll();

						if ( isNull( item ) ) {

							// RACE CONDITION REMEDIATION: There's a race condition in
							// between when we just checked for the last queue item and
							// when a new item may get pushed onto the queue while the
							// CFThread "appears" to still be processing items. To help
							// cope with this case, we're going to explicitly remove the
							// Thread reference and then perform one more check to see if
							// there are any items in the queue.
							// --
							// NOTE: The "variables" scope MUST BE PROVIDED or this
							// assignment will affect the CFThread local-scope, not the
							// intended shared scope of the component.
							variables.asyncThread = nullValue();

							// If a new item has been pushed onto the queue AFTER our
							// internal check but BEFORE we nullified the Thread, then
							// let's just spawn the worker thread one more time.
							if ( ! isNull( queue.peek() ) ) {

								ensureAsyncQueueProcessing();

							}

							return;

						}

						try {

							processItem( item );

						} catch ( any error ) {

							handleError( error );

						}

					} // END: While-loop.

				} // END: Worker-thread.

				// Now that we've spawned the thread, keep track of it so that subsequent
				// requests will be able to check its execution status.
				variables.asyncThread = cfthread[ asyncThreadName ];

			} // END: Lock.

		} catch ( any error ) {

			// The two possible errors at this point should be limited to Lock timeouts
			// and failures to spawn a new thread.
			handleError( error );

		}

	}


	/**
	* I determine if the asynchronous processing thread is running.
	*/
	private boolean function isAsyncThreadRunning() {

		// NOTE: Struct access is implicitly synchronized in ColdFusion. As such, we
		// don't have to worry about checking this property in a multi-threaded context.
		switch ( asyncThread.status ?: "" ) {
			case "NOT_STARTED":
			case "RUNNING":
				return( true );
			break;
			// I'm including the non-running cases (in addition to "default") primarily
			// for documentation of the CFThread behavior.
			case "TERMINATED":
			case "COMPLETED":
			case "WAITING":
			default:
				return( false );
			break;
		}

	}

}

Earlier, I mentioned that an old implementation of an asynchronous task queue had 4 CFLock tags. Now, with this implementation - when using Java's Concurrent Queue features - I only have a single lock. And, that lock is only used if there is no existing CFThread running in the background.

Now, one thing to notice is that my CFThread tag calls itself recursively. This may be a Lucee CFML-Only feature. I know that this did not work in Adobe ColdFusion (ACF) 2018; though, I'm not sure if this has changed at all in ACF 2021.

In this case, I am using a recursive CFThread tag to hedge against a race condition in which a new item may be pushed onto the internal queue just as the CFThread tag is about to exit.
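The "clear the marker, then look at the queue one more time" step can be sketched in plain Java. To be clear about the swap: this sketch uses an `AtomicBoolean` with `compareAndSet()` where the component above uses a CFLock and a CFThread reference, so it illustrates the same idea rather than reproducing the component. The class name `DrainAndRecheckSketch` and the `processAll()` helper are hypothetical.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; names are assumptions for illustration only.
final class DrainAndRecheckSketch {

    private final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean workerRunning = new AtomicBoolean(false);

    void addItem(Runnable item) {
        queue.offer(item);
        ensureWorker();
    }

    private void ensureWorker() {
        // Only the caller that flips the flag from false to true spawns a worker.
        if (!workerRunning.compareAndSet(false, true)) {
            return;
        }
        Thread worker = new Thread(this::drain);
        worker.setDaemon(true);
        worker.start();
    }

    private void drain() {
        while (true) {
            Runnable item = queue.poll();
            if (item == null) {
                // Mark the worker as stopped FIRST, then re-check the queue. An
                // item added between the poll() and this line saw the flag as
                // "running" and did not spawn a worker, so we must do it here.
                workerRunning.set(false);
                if (queue.peek() != null) {
                    ensureWorker();
                }
                return;
            }
            try {
                item.run();
            } catch (RuntimeException error) {
                // A real implementation would hand this to an error handler.
            }
        }
    }

    // Pushes items from several threads and returns how many were processed.
    static int processAll(int producerCount, int itemsPerProducer) {
        DrainAndRecheckSketch sketch = new DrainAndRecheckSketch();
        int total = producerCount * itemsPerProducer;
        AtomicInteger processed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(total);

        for (int p = 0; p < producerCount; p++) {
            new Thread(() -> {
                for (int i = 0; i < itemsPerProducer; i++) {
                    sketch.addItem(() -> {
                        processed.incrementAndGet();
                        done.countDown();
                    });
                }
            }).start();
        }

        try {
            // Give the background workers a generous window to finish.
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException error) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("Processed items: " + processAll(4, 500));
    }
}
```

Without the second look at the queue, an item added in that narrow window would sit unprocessed until some later call happened to spawn a new worker.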

Now, if we run our test code, which invokes our AnalyticsService.cfc, we get the following terminal output:

CAUTION: AnalyticsService is shedding load, capacity ( 3 ).
Spawning worker thread for scribble.atomic-queue.AnalyticsService.
CAUTION: AnalyticsService is shedding load, capacity ( 3 ).
Top-level page has completed.
Processing event: LoggedIn
Processing event: HomePage.Viewed
Processing event: HomePage.FilterSelected

Notice that the top-level page completed before any of the events were processed asynchronously. And, that we "shed" (ie, discarded) two of the events because we immediately exceeded our queue capacity of 3.
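The load-shedding behavior comes down to how `offer()` behaves on the two queue types. Here is a small plain-Java sketch of that difference, assuming a hypothetical class name, `QueueSelectionSketch`, and no consumer draining the queue while items are being offered:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; the name is an assumption for illustration only.
final class QueueSelectionSketch {

    // A positive capacity selects a bounded queue; zero selects an unbounded one.
    static Queue<String> createQueue(int capacity) {
        if (capacity > 0) {
            return new LinkedBlockingQueue<String>(capacity);
        }
        return new ConcurrentLinkedQueue<String>();
    }

    // Offers the given number of items and counts how many were rejected.
    static int countRejected(Queue<String> queue, int attempts) {
        int rejected = 0;
        for (int i = 0; i < attempts; i++) {
            // offer() never blocks: on a full bounded queue it returns false
            // immediately instead of waiting for space.
            if (!queue.offer("event-" + i)) {
                rejected++;
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("Bounded (3): rejected " + countRejected(createQueue(3), 5));
        System.out.println("Unbounded: rejected " + countRejected(createQueue(0), 5));
    }
}
```

With a capacity of 3, five offers, and nothing consuming, two offers are rejected, which lines up with the two "shedding load" messages in the output above. In the real component the worker thread may free up space between offers, so the exact number shed depends on timing.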

This is cool stuff! I haven't tried this specific code in a production environment; but, I've used variations on this code for years - only, using native ColdFusion Arrays and a lot of locking. So, I have no reason to think this would cause an issue. But, I'm excited that by using Java's Concurrent Queues, I can improve performance, reduce bugs, and increase readability in some of my asynchronous processing in Lucee CFML.

Epilogue: Why Not Just Use a Message Queue or a Redis List?

When processing data asynchronously, your default instinct might be to push the data into an external queuing system like Amazon SQS, RabbitMQ, or a Redis List. And, there's nothing wrong with that. But, using an external message queue adds a non-trivial amount of complexity. It also means that you have another piece of infrastructure that you have to maintain and interface with. Which, when it comes to mission critical data, may very well be worth the effort.

But, as I mentioned above, every application necessarily has a healthy tension between complexity and performance, stability, and availability. In this case, for something like analytics events, I think it is perfectly legitimate for that tension to bias towards simplicity at the potential cost of an "ility" like "durability". There is no one-size-fits-all approach to data processing.


Ben Nadel