Ben Nadel
On User Experience (UX) Design, JavaScript, ColdFusion, Node.js, Life, and Love.

Generate And Incrementally Stream A ZIP Archive To Amazon S3 Using Multipart Uploads In Lucee CFML 5.3.7.47

By Ben Nadel on
Tags: ColdFusion

Last week, I looked at using the ZipOutputStream Java class to generate and incrementally stream a Zip archive to the browser using Lucee CFML. In response to that, James Moberg and I were having a discussion about generating Zip archives asynchronously. This got me thinking about pushing the Zip file up to Amazon S3. And, more specifically, whether there was a way for me to incrementally stream the Zip archive to S3 as I was generating it. From what I can see, there's nothing about "streams" in the Java SDK for AWS. But, I have used S3's multipart upload workflow to break apart a file transfer. As a fun experiment, I wanted to see if I could generate and incrementally stream a Zip archive to S3 using this multipart upload workflow in Lucee CFML 5.3.7.47.

The Amazon S3 multipart workflow is fairly straightforward (conceptually):

  1. Initialize a multipart upload and get a unique ID.
  2. Upload a number of binary chunks associated to the unique ID.
  3. Finalize the multipart upload.

On the S3 side, Amazon is literally just taking your binary chunks and then concatenating them together once the multipart upload is finalized. There are some finer details about aborting requests, payments, etc.; but, the basic idea is that you're taking one binary value and splitting it up across several PUT request bodies.

In my previous post, I was taking the ZipOutputStream and piping it directly into the output stream of the ColdFusion response, which is how I was incrementally streaming the Zip to the browser. But, in this case, I want to translate the ZipOutputStream into a series of upload requests to S3. To do this, I'm going to write the ZipOutputStream to a ByteArrayOutputStream. The nice thing about the ByteArrayOutputStream class is that I can inspect the content using the .toByteArray() method; and, I can reset the content using the .reset() method. This means I can slice chunks of data off the output stream as they become available.
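As a rough illustration of that slice-and-reset idea, here is a minimal sketch in plain Java (the stream classes are Java's own, so the same method calls apply from CFML). The class name `SliceAndReset`, its helper methods, and the `file-N.txt` entry names are all made up for this example, and it slices after every entry rather than waiting for any size threshold.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

final class SliceAndReset {

    // Writes each text as its own Zip entry and slices the buffer after every entry.
    // The entry names ("file-N.txt") are hypothetical.
    static List<byte[]> zipInSlices(List<String> texts) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        ZipOutputStream zip = new ZipOutputStream(buffer);
        List<byte[]> slices = new ArrayList<>();

        for (int i = 0; i < texts.size(); i++) {
            zip.putNextEntry(new ZipEntry("file-" + (i + 1) + ".txt"));
            zip.write(texts.get(i).getBytes(StandardCharsets.UTF_8));
            zip.closeEntry();
            zip.flush();

            slices.add(buffer.toByteArray()); // Copy of everything buffered so far.
            buffer.reset();                   // Empty the buffer for the next slice.
        }

        // Closing the Zip stream writes the central directory into the buffer,
        // so the final slice is not empty even right after a reset.
        zip.close();
        slices.add(buffer.toByteArray());
        return slices;
    }

    // Glues the slices back together in order.
    static byte[] concat(List<byte[]> slices) {
        ByteArrayOutputStream joined = new ByteArrayOutputStream();
        for (byte[] slice : slices) {
            joined.write(slice, 0, slice.length);
        }
        return joined.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        List<byte[]> slices = zipInSlices(List.of("alpha", "beta"));
        System.out.println("Slices: " + slices.size());
        System.out.println("Total bytes: " + concat(slices).length);
    }
}
```

Resetting the buffer does not confuse the ZipOutputStream: it keeps its own count of bytes written, so the offsets recorded in the central directory still line up once the slices are concatenated.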

I can also use the .size() method to check the length of the ByteArrayOutputStream. This is important because the multipart upload workflow has a part-size minimum: when chunking a file across multiple PUT requests, each "part" has to be at least 5mb in size, except for the last part, which has no minimum.

Given the requirements, the algorithm for streaming a ZIP archive to S3 starts to look something like this:

  1. Download a file.
  2. Write it to the ZipOutputStream.
  3. Check the size of the ByteArrayOutputStream.
  4. If the size is less than 5mb, skip to step 7.
  5. If the size is 5mb or more, slice off the buffered content.
  6. Push the slice up to S3.
  7. If there are more files to archive, GOTO step 1.
  8. If there are no more files to archive, push whatever is left as the last part and finalize the multipart upload.
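The steps above can be sketched in plain Java without touching S3 at all. This is only an illustration under stated assumptions: `ChunkedZipper` and the `PartSink` callback are hypothetical stand-ins (the callback plays the role of "push the slice up to S3"), the "downloaded files" are just in-memory byte arrays, and the demo uses a tiny threshold instead of 5mb so it runs instantly.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.zip.CRC32;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

final class ChunkedZipper {

    // Hypothetical stand-in for "push this part up to S3".
    interface PartSink {
        void accept(int partNumber, byte[] content);
    }

    // Archives the files with the STORED method and hands a part to the sink
    // whenever the buffer reaches minPartSize. Returns the number of parts emitted.
    static int zipInParts(List<byte[]> files, int minPartSize, PartSink sink) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        int partNumber = 0;

        try (ZipOutputStream zip = new ZipOutputStream(buffer)) {
            for (int i = 0; i < files.size(); i++) {
                byte[] file = files.get(i);
                CRC32 crc = new CRC32();
                crc.update(file);

                // STORED entries need their size and CRC set before they are written.
                ZipEntry entry = new ZipEntry("file-" + (i + 1) + ".bin");
                entry.setMethod(ZipEntry.STORED);
                entry.setSize(file.length);
                entry.setCrc(crc.getValue());

                zip.putNextEntry(entry);
                zip.write(file);
                zip.closeEntry();
                zip.flush();

                // Steps 3-6: only slice off a part once the minimum size is reached.
                if (buffer.size() >= minPartSize) {
                    sink.accept(++partNumber, buffer.toByteArray());
                    buffer.reset();
                }
            }
        }

        // Step 8: the stream is now closed, so the buffer holds any leftover entries
        // plus the central directory. This goes up as the last part.
        sink.accept(++partNumber, buffer.toByteArray());
        return partNumber;
    }

    public static void main(String[] args) throws IOException {
        List<byte[]> files = List.of(new byte[600], new byte[600], new byte[600]);
        int total = zipInParts(files, 1000, (partNumber, content) ->
            System.out.println("Part " + partNumber + ": " + content.length + " bytes"));
        System.out.println("Parts emitted: " + total);
    }
}
```

Because slicing only happens between entries, a part can overshoot the minimum by up to one whole file; that is harmless for S3, which only enforces the lower bound on non-final parts.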

To explore this workflow, I'm going to revamp my previous post which archived remote image URLs using the STORED method. Only, instead of streaming the archive to the browser, I'm going to "stream it to S3". And, to make this even more exciting, I'm going to use Futures to perform the downloads and the uploads in parallel.

One of the nice things about the S3 multipart upload is that the parts are explicitly numbered. Which means, the parts don't have to arrive in order on the S3 side - they just have to be identified properly. As long as all parts are accounted for at the time of finalization, the order in which the parts arrive is irrelevant.
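To picture why arrival order doesn't matter, here is a toy model in plain Java of a receiver that files parts away by number and only concatenates them at the end. This is not how S3 is actually implemented; `PartAssembler`, `receive`, and `assemble` are names invented for the sketch.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

final class PartAssembler {

    // A TreeMap keeps its keys sorted, so parts come back out in part-number order
    // no matter what order they were received in.
    private final Map<Integer, byte[]> parts = new TreeMap<>();

    void receive(int partNumber, byte[] content) {
        parts.put(partNumber, content);
    }

    // "Finalization": concatenate the parts by ascending part number.
    byte[] assemble() {
        ByteArrayOutputStream joined = new ByteArrayOutputStream();
        for (byte[] content : parts.values()) {
            joined.write(content, 0, content.length);
        }
        return joined.toByteArray();
    }

    public static void main(String[] args) {
        PartAssembler assembler = new PartAssembler();
        // Part 2 "arrives" before part 1.
        assembler.receive(2, "World".getBytes(StandardCharsets.UTF_8));
        assembler.receive(1, "Hello, ".getBytes(StandardCharsets.UTF_8));
        // prints "Hello, World"
        System.out.println(new String(assembler.assemble(), StandardCharsets.UTF_8));
    }
}
```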

This means we can use the runAsync() function to push each part to S3 without blocking subsequent image downloads. Of course, we still have to block-and-wait for all part-uploads to finish before we finalize the multipart workflow. But, the .get() method on the ColdFusion futures will do this for us effortlessly.
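The same fire-now, collect-later pattern can be sketched in plain Java using CompletableFuture, which plays a role similar to the Future returned by runAsync(). Everything here is an assumption for illustration: `ParallelParts` is a made-up class and `fakeUpload` merely returns a fake "ETag" string instead of talking to S3.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class ParallelParts {

    // Hypothetical stand-in for an S3 part upload: returns a fake "ETag".
    static String fakeUpload(int partNumber, byte[] content) {
        return "etag-" + partNumber + "-" + content.length;
    }

    // Starts one asynchronous upload per part, then blocks until all have finished.
    static List<String> uploadAll(List<byte[]> parts) {
        List<CompletableFuture<String>> futures = new ArrayList<>();

        for (byte[] part : parts) {
            // Part numbers start at 1; the list length doubles as the counter.
            int partNumber = futures.size() + 1;
            futures.add(CompletableFuture.supplyAsync(() -> fakeUpload(partNumber, part)));
        }

        // The uploads may finish in any order, but walking the list of futures
        // in order yields the results in part-number order.
        List<String> etags = new ArrayList<>();
        for (CompletableFuture<String> future : futures) {
            etags.add(future.join()); // Blocks until this particular upload is done.
        }
        return etags;
    }

    public static void main(String[] args) {
        System.out.println(uploadAll(List.of(new byte[3], new byte[5])));
    }
}
```

Note that the part number and the byte array are captured before the task is started; if the task read from a shared buffer instead, a later reset could change the data out from under it.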

ASIDE: ColdFusion Futures introduce more complexity, especially around error handling. For example, what happens if we abort a multipart upload while part-uploads are still running in the background? I am not accounting for such complexity in this exploration; but, it is something you would have to consider in a production context.

To run this experiment, I'm using Lucee CFML's ability to dynamically load classes from JAR files. And, I'm using the following JAR files which I downloaded from Maven:

  • avalon-framework-4.1.5.jar
  • aws-java-sdk-core-1.11.870.jar
  • aws-java-sdk-kms-1.11.870.jar
  • aws-java-sdk-s3-1.11.870.jar
  • commons-codec-1.11.jar
  • commons-logging-1.1.3.jar
  • commons-logging-1.2.jar
  • httpclient-4.5.9.jar
  • httpcore-4.4.11.jar
  • ion-java-1.0.2.jar
  • jackson-annotations-2.6.0.jar
  • jackson-core-2.6.7.jar
  • jackson-databind-2.6.7.3.jar
  • jackson-dataformat-cbor-2.6.7.jar
  • jmespath-java-1.11.870.jar
  • joda-time-2.8.1.jar

With that said, here's the ColdFusion code that I came up with:

<cfscript>

	// NOTE: We'll be using Lucee CFML's ability to create Java classes from a set of JAR
	// files. In this case, we have the JAR files for AWS SDK 1.11.870 locally.
	s3Client = new S3Client(
		awsAccessID = server.aws.accessID,
		awsSecretKey = server.aws.secretKey,
		awsRegion = server.aws.region,
		awsBucket = server.aws.bucket,
		jarPaths = directoryList(
			path = expandPath( "../aws-s3-sdk/" ),
			recurse = true,
			listInfo = "path",
			type = "file",
			filter = "*.jar"
		)
	);

	// NOTE: Defines an array, "imageUrls", that contains remote files to download and
	// then zip / archive together.
	include "./image_urls.cfm";

	// ------------------------------------------------------------------------------- //
	// ------------------------------------------------------------------------------- //

	// While we can't "stream" the ZIP archive to Amazon S3 in the purest sense, we can
	// "chunk" the ZIP archive using a Multipart Upload wherein we slice-off parts of the
	// Output Stream and upload them when they are large enough (5mb minimum). Every
	// upload request requires us to provide the S3 resource path (key).
	path = "multipart-stream/images-#createUniqueId()#.zip";
	contentType = "application/zip";

	// When we finalize the multipart upload, we have to provide all of the ETags
	// returned by the individual parts so that Amazon can ensure nothing was corrupted.
	// And, since we have to keep track of these, we'll use the LENGTH of this array to
	// determine the Part Number of each upload.
	partFutures = [];

	uploadID = s3Client
		.createMultipartUpload(
			resourcePath = path,
			resourceContentType = contentType
		)
		.id
	;

	try {

		ZipEntryClass = createObject( "java", "java.util.zip.ZipEntry" );

		// In order to chunk the ZIP archive, we need to write the output to an in-memory
		// stream so that we can read from the stream as it is being generated. This way,
		// as we reach our minimum-part-size, we can stop and upload it.
		binaryOutputStream = javaNew( "java.io.ByteArrayOutputStream" )
			.init()
		;
		zipOutputStream = javaNew( "java.util.zip.ZipOutputStream" )
			.init( binaryOutputStream )
		;

		// In a multipart upload, Amazon requires that all parts be AT LEAST 5mb in size.
		// Except for the last part, which has no limit. As such, we can only initiate an
		// upload when our ByteArrayOutputStream reaches this limit.
		minimumPartSize = ( 1024 * 1024 * 5 );

		imageUrls.each(
			( imageUrl ) => {

				info( "Downloading: #imageUrl#" );

				// Download the remote URL.
				// --
				// NOTE: How awesome is it that we can download the remote URL with a
				// simple file-read operation. Yo, come on. Lucee, you complete me!
				var imageFilename = getFileFromPath( imageUrl );
				var imageBinary = fileReadBinary( imageUrl );

				// Write the image binary to the Zip output stream. Notice that we are
				// using the STORED method which means that we're not actually
				// compressing the entries - we're just archiving them. Since image files
				// already have compression, this should result in faster processing /
				// less CPU overhead.
				var zipEntry = javaNew( "java.util.zip.ZipEntry" )
					.init( "streaming-zip/images/#imageFilename#" )
				;
				zipEntry.setMethod( ZipEntryClass.STORED );
				zipEntry.setSize( arrayLen( imageBinary ) );
				zipEntry.setCrc( crc32( imageBinary ) );

				zipOutputStream.putNextEntry( zipEntry );
				zipOutputStream.write( imageBinary );
				zipOutputStream.closeEntry();
				zipOutputStream.flush();

				info( "Buffer size: #numberFormat( binaryOutputStream.size() )#" );

				// Each upload in the multipart upload (less the last part) has to be at
				// least 5mb in size. As such, let's check our output stream after each
				// Zip entry to see if we've reached the upload size threshold.
				if ( binaryOutputStream.size() < minimumPartSize ) {

					return;

				}

				// We are OVER THE MINIMUM PART SIZE! At this point, we can strip-off
				// whatever ZIP archive data we have flushed to our output stream and
				// push it up to Amazon S3.
				info( "**** ByteArrayOutputStream size has passed minimum size requirement ****" );
				info( "**** Uploading part to S3 ****" );

				var partNumber = ( partFutures.len() + 1 );
				var partContent = binaryOutputStream.toByteArray();

				// To try and keep things moving along quickly, we're going to try
				// uploading the part IN PARALLEL with the rest of the operations using a
				// Future.
				partFutures.append(
					runAsync(
						() => {

							var apiResponse = s3Client.uploadPart(
								resourcePath = path,
								multipartUploadID = uploadID,
								partNumber = partNumber,
								content = partContent,
								isLastPart = false
							);

							info( "**** Completed part #partNumber# upload ****" );
							return( apiResponse );

						}
					)
				);

				// Reset the output stream so all new ZIP archive data will be pushed to
				// the next "part".
				binaryOutputStream.reset();

			}
		); // END: each().

		// Finalize the Zip content.
		zipOutputStream.flush();
		zipOutputStream.close();
		binaryOutputStream.close();

		info( "Uploading LAST part to S3" );

		// TODO: What if output stream is empty at this point? Is that something we have
		// to account for with the ZipOutputStream class?

		partNumber = ( partFutures.len() + 1 );
		partContent = binaryOutputStream.toByteArray();
		partFutures.append(
			runAsync(
				() => {

					var apiResponse = s3Client.uploadPart(
						resourcePath = path,
						multipartUploadID = uploadID,
						partNumber = partNumber,
						content = partContent,
						isLastPart = true
					);

					info( "**** Completed LAST PART upload ****" );
					return( apiResponse );

				}
			)
		);

		info( "Waiting for parts to finish uploading" );

		// At this point, we've INITIATED all the part uploads; but, they may not have
		// finished. By mapping the Futures onto the Responses we will block-and-wait
		// for each part to complete uploading.
		etags = partFutures.map(
			( future ) => {

				return( future.get().awsResponse.getPartETag() );

			}
		);

		apiResponse = s3Client.completeMultipartUpload(
			resourcePath = path,
			multipartUploadID = uploadID,
			multipartETags = etags
		);

		info( "*******************************" );
		info( "-- Multipart upload complete --" );
		info( "*******************************" );
		echo( "Zip archive complete: #path#" );

	// If any errors occur during the processing, let's abort the multi-part request so
	// that we don't keep getting charged for the parts that have been uploaded.
	} catch ( any error ) {

		s3Client.abortMultipartUpload(
			resourcePath = path,
			multipartUploadID = uploadID
		);

		dump( error );

	}

	// ------------------------------------------------------------------------------- //
	// ------------------------------------------------------------------------------- //

	/**
	* I compute the CRC-32 checksum for the byte array.
	*
	* @input I am the input being checked.
	*/
	public numeric function crc32( required binary input ) {

		var checksum = createObject( "java", "java.util.zip.CRC32" ).init();
		checksum.update( input );

		return( checksum.getValue() );
	}


	/**
	* I log the given message to the standard output on its own line.
	* 
	* @message I am the message being logged.
	*/
	public void function info( required string message ) {

		systemOutput( message, true );

	}


	/**
	* I create a Java class instance with the given class name. This is just a short-hand
	* method for the createObject() call.
	* 
	* @className I am the Java class being created.
	*/
	public any function javaNew( required string className ) {

		return( createObject( "java", className ) );

	}

</cfscript>

As you can see, whenever my ByteArrayOutputStream exceeds 5mb in size, I'm slicing off the binary content (byte array) and pushing it to Amazon S3 inside a runAsync() call. This call runs in the background and returns a Future. Then, once all uploads have completed, I finalize the multipart upload.

Now, if we run this ColdFusion code and watch the server logs, we get the following output:

[Image: Server logs showing a ZIP archive being generated and incrementally streamed to S3 using multipart uploads in Lucee CFML.]

As you can see, as the files are being pulled down and written to the Zip archive, the processed chunks of the archive are being sliced off (once they hit 5mb in size) and "streamed" to S3 in the background. Then, once all the files have been downloaded and the parts have been uploaded, the multipart upload is completed.

This is pretty cool! As I mentioned above, there's more to be considered in terms of error handling and how you make sure to clean-up after yourself (should an error occur). But, the basic concept here feels pretty good.

The 5mb minimum is an interesting aspect of the multipart upload workflow. It does make me wonder how many of the ZIP archives I generate are actually over 5mb. But, regardless, it's nice to have this in my back pocket.
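For a feel of the numbers, here is a small back-of-the-envelope sketch in Java. It assumes the 5mb minimum (1024 * 1024 * 5 bytes, as in the demo code) and the part-number range of 1 to 10,000; `PartMath` and its method names are made up for illustration. Since every non-final part is at least 5mb and the last part is never empty, the worst case is one part per 5mb of archive, rounded up.

```java
final class PartMath {

    static final long MIN_PART_SIZE = 5L * 1024 * 1024; // Same value as the demo code.
    static final int MAX_PARTS = 10_000;                // Part numbers run 1 to 10,000.

    // Upper bound on the number of parts this approach can produce for an archive
    // of the given size: one part per 5mb, rounded up.
    static long maxPartCount(long archiveBytes) {
        if (archiveBytes <= 0) {
            throw new IllegalArgumentException("archiveBytes must be positive");
        }
        return (archiveBytes + MIN_PART_SIZE - 1) / MIN_PART_SIZE;
    }

    // True when even the worst case stays within the part-number range.
    static boolean alwaysFits(long archiveBytes) {
        return maxPartCount(archiveBytes) <= MAX_PARTS;
    }

    public static void main(String[] args) {
        long fourMb = 4L * 1024 * 1024;
        long twelveMb = 12L * 1024 * 1024;
        System.out.println("4mb archive, parts at most: " + maxPartCount(fourMb));
        System.out.println("12mb archive, parts at most: " + maxPartCount(twelveMb));
        System.out.println("Largest always-safe archive (bytes): " + (MIN_PART_SIZE * MAX_PARTS));
    }
}
```

An archive under 5mb never crosses the threshold, so it goes up as a single "last" part, which the workflow allows because the last part has no minimum.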

For completeness, here's the S3Client.cfc ColdFusion component that I created for the demo. It just wraps the Amazon AWS SDK for Java and exposes methods specifically for the multipart upload:

component
	output = false
	hint = "I provide a simple wrapper for the S3 Client SDK classes."
	{

	/**
	* I initialize the S3 Client wrapper with the given configuration.
	* 
	* JavaDocs: https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/
	*/
	public void function init(
		required string awsAccessID,
		required string awsSecretKey,
		required string awsRegion,
		required string awsBucket,
		required array jarPaths
		) {

		variables.awsAccessID = arguments.awsAccessID;
		variables.awsSecretKey = arguments.awsSecretKey;
		variables.awsRegion = arguments.awsRegion;
		variables.awsBucket = arguments.awsBucket;
		variables.jarPaths = arguments.jarPaths;

		variables.s3Client = createS3Client();

	}

	// ---
	// PUBLIC METHODS.
	// ---

	/**
	* I abort the multipart upload with the given ID.
	* 
	* @resourcePath I am the key at which the final upload was going to be stored.
	* @multipartUploadID I am the ID for the multipart upload workflow.
	*/
	public void function abortMultipartUpload(
		required string resourcePath,
		required string multipartUploadID
		) {

		var abortRequest = loadClass( "com.amazonaws.services.s3.model.AbortMultipartUploadRequest" )
			.init( awsBucket, resourcePath, multipartUploadID )
		;

		s3Client.abortMultipartUpload( abortRequest );

	}


	/**
	* I complete the multipart upload with the given ID.
	* 
	* @resourcePath I am the key at which the final upload is going to be stored.
	* @multipartUploadID I am the ID for the multipart upload workflow.
	* @multipartETags I am the collection of ETags returned from the part-uploads.
	*/
	public struct function completeMultipartUpload(
		required string resourcePath,
		required string multipartUploadID,
		required array multipartETags
		) {

		var completeRequest = loadClass( "com.amazonaws.services.s3.model.CompleteMultipartUploadRequest" )
			.init( awsBucket, resourcePath, multipartUploadID, multipartETags )
		;

		var awsResponse = s3Client.completeMultipartUpload( completeRequest );

		return({
			awsResponse: awsResponse
		});

	}


	/**
	* I initialize a multipart upload for the given path.
	* 
	* @resourcePath I am the key at which the final upload is going to be stored.
	* @resourceContentType I am the mime-type of the final upload.
	*/
	public struct function createMultipartUpload(
		required string resourcePath,
		required string resourceContentType
		) {

		var uploadMetadata = loadClass( "com.amazonaws.services.s3.model.ObjectMetadata" ).init();
		uploadMetadata.setContentType( resourceContentType );

		var uploadRequest = loadClass( "com.amazonaws.services.s3.model.InitiateMultipartUploadRequest" )
			.init( awsBucket, resourcePath, uploadMetadata )
		;

		var awsResponse = s3Client.initiateMultipartUpload( uploadRequest );

		return({
			id: awsResponse.getUploadId(),
			awsResponse: awsResponse
		});

	}


	/**
	* I upload the given binary content as a unique part in a multipart upload.
	* 
	* @resourcePath I am the key at which the final upload is going to be stored.
	* @multipartUploadID I am the ID for the multipart upload workflow.
	* @partNumber I am the unique part number (1 - 10,000).
	* @content I am the binary / byte array data for the part.
	* @isLastPart I determine if this is the last part in the workflow.
	*/
	public struct function uploadPart(
		required string resourcePath,
		required string multipartUploadID,
		required numeric partNumber,
		required binary content,
		required boolean isLastPart
		) {

		var uploadRequest = loadClass( "com.amazonaws.services.s3.model.UploadPartRequest" )
			.init()
			.withBucketName( awsBucket )
			.withKey( resourcePath )
			.withUploadId( multipartUploadID )
			.withPartNumber( partNumber )
			.withLastPart( isLastPart )
			.withInputStream( asByteArrayInputStream( content ) )
			.withPartSize( arrayLen( content ) )
		;

		var awsResponse = s3Client.uploadPart( uploadRequest );

		return({
			awsResponse: awsResponse
		});

	}

	// ---
	// PRIVATE METHODS.
	// ---

	/**
	* I create a byte-array input stream from the given binary value.
	* 
	* @value I am the binary payload being wrapped.
	*/
	private any function asByteArrayInputStream( required binary value ) {

		return( javaNew( "java.io.ByteArrayInputStream" ).init( value ) );

	}


	/**
	* I create the S3 Client instance that this component is wrapping.
	*/
	private any function createS3Client() {

		// Load some classes that we'll use for static references.
		var ProtocolClass = loadClass( "com.amazonaws.Protocol" );
		var RegionClass = loadClass( "com.amazonaws.regions.Region" );
		var RegionsClass = loadClass( "com.amazonaws.regions.Regions" );

		var clientConfig = loadClass( "com.amazonaws.ClientConfiguration" ).init();
		clientConfig.setProtocol( ProtocolClass.HTTPS );
		clientConfig.setMaxErrorRetry( 4 );

		var clientCredentials = loadClass( "com.amazonaws.auth.BasicAWSCredentials" )
			.init( awsAccessID, awsSecretKey )
		;

		var clientOptions = loadClass( "com.amazonaws.services.s3.S3ClientOptions" )
			.init()
			.withPathStyleAccess( true )
		;

		var clientRegion = RegionClass.getRegion( RegionsClass.fromName( awsRegion ) );

		var s3Client = loadClass( "com.amazonaws.services.s3.AmazonS3Client" )
			.init( clientCredentials, clientConfig )
		;
		s3Client.setS3ClientOptions( clientOptions );
		s3Client.setRegion( clientRegion );

		return( s3Client );

	}


	/**
	* I load the Java class with the given name using the base class loader.
	* 
	* @className I am the class to load.
	*/
	private any function javaNew( required string className ) {

		return( createObject( "java", className ) );

	}


	/**
	* I load the Java class with the given name (using the AWS SDK JAR files).
	* 
	* @className I am the class to load.
	*/
	private any function loadClass( required string className ) {

		return( createObject( "java", className, jarPaths ) );

	}

}

Lucee CFML is just bad-ass, amiright?! Not only does it offer amazing functionality on its own; but, it's built on top of Java, which means that we get to leverage decades-worth of battle-tested structures like the ZipOutputStream and the ByteArrayOutputStream. This is what allows us to generate Zip archives and stream them, incrementally, to Amazon S3 without breaking a sweat!


