109. Scalable Batch Processing - qyjohn/AWS_Tutorials GitHub Wiki
In this training, we will use the SQS service to implement a scalable batch processing system.
(1) SQS Basics
Before you get started, we recommend that you read the following AWS documentation:
Then, follow these steps to create a queue:
Also, you need to use the AWS CLI to create, describe, and delete SQS queues:
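For example, you can create, inspect, and delete a queue from the command line as follows. The queue name my-batch-queue is just a placeholder, and the queue URL shown must be replaced with the one returned by create-queue for your own account and region:
$ aws sqs create-queue --queue-name my-batch-queue
$ aws sqs list-queues
$ aws sqs get-queue-attributes --queue-url https://sqs.ap-southeast-2.amazonaws.com/123456789012/my-batch-queue --attribute-names All
$ aws sqs delete-queue --queue-url https://sqs.ap-southeast-2.amazonaws.com/123456789012/my-batch-queue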
It is important to understand that SQS is a distributed queue system. Multiple copies of your messages are stored on multiple servers for redundancy and high availability.
- A standard queue makes a best effort to preserve the order of messages, but more than one copy of a message might be delivered out of order. If your system requires that order be preserved, we recommend using a FIFO (First-In-First-Out) queue or adding sequencing information in each message so you can reorder the messages when they're received.
- Amazon SQS stores copies of your messages on multiple servers for redundancy and high availability. On rare occasions, one of the servers that stores a copy of a message might be unavailable when you receive or delete a message. If this occurs, the copy of the message isn't deleted on that unavailable server, and you might get that message copy again when you receive messages. You should design your applications to be idempotent (they should not be affected adversely when processing the same message more than once).
The following diagram shows the lifecycle of a message in an SQS queue:
The behavior of consuming messages from the queue depends on whether you use short (standard) polling, the default behavior, or long polling. When you consume messages from the queue using short polling, Amazon SQS samples a subset of the servers (based on a weighted random distribution) and returns messages from just these servers. Thus, a particular receive request might not return all of your messages. If you keep consuming from your queues, Amazon SQS eventually samples all of the servers, and you'll receive all your messages. The following figure shows the short-polling behavior of messages returned after one of your system components makes a receive request. Amazon SQS samples several of the servers (in gray) and returns the messages from those servers (Message A, C, D, and B). Message E isn't returned to this particular request, but is returned to a subsequent request.
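If you want long polling instead, you can tell SQS to wait for messages to arrive rather than returning immediately. Below is a minimal sketch, separate from the demo programs in the next section; the class name LongPollDemo, the 20-second wait time, and the batch size of 10 are illustrative values only:
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
public class LongPollDemo
{
    public static void main(String[] args)
    {
        // Pass your queue URL as the first command line argument
        String queueUrl = args[0];
        AmazonSQSClient client = new AmazonSQSClient();
        client.configureRegion(Regions.AP_SOUTHEAST_2);
        // Wait up to 20 seconds for messages to arrive, return up to 10 messages per call
        ReceiveMessageRequest request = new ReceiveMessageRequest(queueUrl)
                .withWaitTimeSeconds(20)
                .withMaxNumberOfMessages(10);
        ReceiveMessageResult result = client.receiveMessage(request);
        for (Message message : result.getMessages())
        {
            System.out.println(message.getMessageId() + "\t" + message.getBody());
        }
    }
}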
(2) Simple Java Demo
In this section we use the AmazonSQSClient to build a simple batch processing system:
In a batch processing system, jobs are sent to a queue, and workers poll the queue for jobs and execute them. The applications sending jobs to the queue are called "producers", while the workers polling the queue for jobs are called "consumers". In this example, we use one Java program (SqsProducer) to send messages to your SQS queue, and another Java program (SqsConsumer) to consume messages from your SQS queue. Both Java programs share the same configuration file sqs.properties.
Below is the content of configuration file sqs.properties:
queueUrl=sqs-queue-url
Below is the program that sends a message to the SQS queue at a certain interval. Each message represents a job to be executed on a worker node. The program accepts a sleep interval in milliseconds: if you want to send a message every second, use 1000 (ms) as the sleep interval; if you want to send a message every 100 ms, use 100 (ms).
import java.io.*;
import java.net.*;
import java.util.*;
import com.amazonaws.*;
import com.amazonaws.auth.*;
import com.amazonaws.auth.profile.*;
import com.amazonaws.regions.*;
import com.amazonaws.services.sqs.*;
import com.amazonaws.services.sqs.model.*;
public class SqsProducer
{
public AmazonSQSClient client;
public String queueUrl;
public String ip;
public SqsProducer()
{
client = new AmazonSQSClient();
client.configureRegion(Regions.AP_SOUTHEAST_2);
try
{
Properties prop = new Properties();
InputStream input = new FileInputStream("sqs.properties");
prop.load(input);
queueUrl = prop.getProperty("queueUrl");
ip = "" + InetAddress.getLocalHost().getHostAddress();
System.out.println(ip);
}catch (Exception e)
{
System.out.println(e.getMessage());
e.printStackTrace();
}
}
public void send(int sleep)
{
int start = 100000;
while (true)
{
try
{
String msg = ip + "-" + start;
client.sendMessage(queueUrl, msg);
start++;
Thread.sleep(sleep);
} catch (Exception e)
{
System.out.println(e.getMessage());
e.printStackTrace();
}
}
}
public static void main(String[] args)
{
try
{
SqsProducer sp = new SqsProducer();
int sleep = Integer.parseInt(args[0]);
sp.send(sleep);
} catch (Exception e)
{
System.out.println(e.getMessage());
e.printStackTrace();
}
}
}
Below is the program that consumes messages from the queue; each message represents a job to execute. To simulate the work, the program sleeps for a random amount of time (between 0 and X seconds, where X is your runtime parameter), representing the time needed to execute the job. After the job is successfully executed (after the sleep), we delete the message from the queue.
import java.io.*;
import java.util.*;
import java.math.*;
import com.amazonaws.*;
import com.amazonaws.auth.*;
import com.amazonaws.auth.profile.*;
import com.amazonaws.regions.*;
import com.amazonaws.services.sqs.*;
import com.amazonaws.services.sqs.model.*;
public class SqsConsumer
{
public AmazonSQSClient client;
public String queueUrl;
public SqsConsumer()
{
client = new AmazonSQSClient();
client.configureRegion(Regions.AP_SOUTHEAST_2);
try
{
Properties prop = new Properties();
InputStream input = new FileInputStream("sqs.properties");
prop.load(input);
queueUrl = prop.getProperty("queueUrl");
}catch (Exception e)
{
System.out.println(e.getMessage());
e.printStackTrace();
}
}
public void receive(int time)
{
Set<String> attrs = new HashSet<String>();
attrs.add("ApproximateNumberOfMessages");
while (true)
{
try
{
GetQueueAttributesRequest request = new GetQueueAttributesRequest(queueUrl).withAttributeNames(attrs);
Map<String,String> response = client.getQueueAttributes(request).getAttributes();
int count = Integer.parseInt(response.get("ApproximateNumberOfMessages"));
System.out.println("\n");
System.out.println("Approximate Number of Messages in SQS: " + count);
System.out.println("\n");
ReceiveMessageResult result = client.receiveMessage(queueUrl);
for (Message message : result.getMessages())
{
System.out.println(message.getMessageId() + "\t" + message.getBody());
int sleep = (int) (time * 1000 * Math.random());
Thread.sleep(sleep);
System.out.println("Processing time: " + sleep + " ms.");
client.deleteMessage(queueUrl, message.getReceiptHandle());
}
} catch (Exception e)
{
System.out.println(e.getMessage());
e.printStackTrace();
}
}
}
public static void main(String[] args)
{
try
{
SqsConsumer sc = new SqsConsumer();
int time = Integer.parseInt(args[0]);
sc.receive(time);
} catch (Exception e)
{
System.out.println(e.getMessage());
e.printStackTrace();
}
}
}
Run the SqsProducer to produce a message every 1 second. You can run multiple instances of the SqsProducer on the same EC2 instance or on different EC2 instances.
$ java SqsProducer 1000
Run the SqsConsumer to process the messages in the queue. The input parameter indicates the maximum amount of time (in seconds) needed to process a single message. In this example, we set it to 30 seconds. Similarly, you can run multiple instances of the SqsConsumer on the same EC2 instance or on different EC2 instances. Change this value to something bigger than your queue's visibility timeout and observe that some of the messages (as identified by the IP address and message ID) may be processed more than once by your SqsConsumer.
$ java SqsConsumer 30
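You can check and change the visibility timeout of your queue with the AWS CLI. Replace <your-queue-url> with your own queue URL; 60 is just an example value in seconds:
$ aws sqs get-queue-attributes --queue-url <your-queue-url> --attribute-names VisibilityTimeout
$ aws sqs set-queue-attributes --queue-url <your-queue-url> --attributes VisibilityTimeout=60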
(3) S3 Event Notification to SQS
Amazon S3 has the capability of sending event notifications to SQS. In this section, you will need to create an S3 bucket and configure S3 to send a notification to your SQS queue when a new object is added to the bucket.
Use a program (Java, PHP, or any other tool) to upload a number of objects to your S3 bucket, then use a modified version of the SqsConsumer program to observe the messages arriving in your SQS queue.
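If you choose Java for the upload, a minimal sketch could look like the following. The bucket name my-training-bucket is a placeholder, and the local folder containing the files is passed as a command line argument:
import java.io.File;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3Client;
public class S3Uploader
{
    public static void main(String[] args)
    {
        // Placeholder bucket name; replace with your own bucket
        String bucket = "my-training-bucket";
        // Local folder containing the files, passed as the first command line argument
        File folder = new File(args[0]);
        AmazonS3Client s3 = new AmazonS3Client();
        s3.configureRegion(Regions.AP_SOUTHEAST_2);
        for (File file : folder.listFiles())
        {
            // Each PUT generates one S3 event notification to your SQS queue
            s3.putObject(bucket, file.getName(), file);
            System.out.println("Uploaded " + file.getName());
        }
    }
}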
A message generated by an S3 event notification looks like the following. In your program, you will need a JSON parser to decode the message.
{
"Records": [{
"eventVersion": "2.0",
"eventSource": "aws:s3",
"awsRegion": "ap-southeast-2",
"eventTime": "2017-03-30T02:16:13.620Z",
"eventName": "ObjectCreated:Put",
"userIdentity": {
"principalId": "AWS:AROAICO7I6JYOVIW5R2XU:i-00d7924a1bced1313"
},
"requestParameters": {
"sourceIPAddress": "172.31.5.49"
},
"responseElements": {
"x-amz-request-id": "0B3137C637C977BF",
"x-amz-id-2": "CNZJ8w1fHW9IGTFvltJADD7IsG19foXXBGu8YW3XZPspDXaufYGD+qFrwrmHbRH0xp3zWiLb5kM="
},
"s3": {
"s3SchemaVersion": "1.0",
"configurationId": "new object",
"bucket": {
"name": "331982-training",
"ownerIdentity": {
"principalId": "A2L4CVMW42DK8D"
},
"arn": "arn:aws:s3:::331982-training"
},
"object": {
"key": "clouds10.jpg",
"size": 206683,
"eTag": "51fb9d061dc125f57aa07343d63a0573",
"sequencer": "0058DC6A6D86DDA1AB"
}
}
}]
}
(4) Batch Image Processing
By now you have a website that accepts user uploads to your S3 bucket. You want to convert the uploaded images to different formats, including JPG, PNG, TIF, and GIF. For example, if a user uploads abcd.jpg, you want to automatically generate abcd.png, abcd.tif, and abcd.gif. On Linux, image format conversion can be achieved with ImageMagick:
$ sudo apt-get update
$ sudo apt-get install imagemagick
If you want to convert abcd.jpg to abcd.png, you can use the following command:
$ convert abcd.jpg abcd.png
Modify your SqsConsumer to read from your SQS queue. If the message indicates that a new image file (check the filename extension) has been uploaded to your S3 bucket, do the following:
- download the file from S3 to local disk
- convert the file to jpg, png, gif, and tif formats
- upload all four files to another S3 bucket (with no SQS notification configured, to avoid triggering further events)
- delete the temp files on local disk (to save disk space)
Ideally, you should write a bash script to do these jobs, and execute the bash script from your Java application. In Java, you can execute a bash script with the following code. Obviously, you can modify the String command to add arguments to your bash script (for example, the S3 key of the object to process); a sketch of such a script follows the code below.
String command = "/bin/bash myscript.sh";
Process p = Runtime.getRuntime().exec(command);
p.waitFor();
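As an illustration only, a sketch of myscript.sh might look like this. It takes the S3 object key as its first argument; my-input-bucket and my-output-bucket are placeholder bucket names you need to change:
#!/bin/bash
# $1 is the S3 object key passed in from the Java application
KEY=$1
BASE=${KEY%.*}
# Download the original image from the input bucket
aws s3 cp s3://my-input-bucket/$KEY /tmp/$KEY
# Convert the image to the four target formats
for EXT in jpg png gif tif
do
    convert /tmp/$KEY /tmp/$BASE.$EXT
done
# Upload the converted files to the output bucket (which has no SQS notification)
for EXT in jpg png gif tif
do
    aws s3 cp /tmp/$BASE.$EXT s3://my-output-bucket/$BASE.$EXT
done
# Remove the temporary files to save disk space
rm -f /tmp/$KEY /tmp/$BASE.*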
In order to decode the JSON message produced by the S3 event notification, you will need a JSON parser. In this example, we use the json-simple library. This library uses Maven for project management, so we need to install Maven to build it.
$ sudo apt-get update
$ sudo apt-get install maven
$ cd ~
$ git clone https://github.com/fangyidong/json-simple
$ cd json-simple
$ mvn compile
$ mvn package
After the building process is completed, you should have target/json-simple-1.1.1.jar in the json-simple project folder. You will need to add json-simple-1.1.1.jar to your CLASSPATH.
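For example, assuming you cloned and built json-simple in your home directory as shown above:
$ export CLASSPATH=$CLASSPATH:$HOME/json-simple/target/json-simple-1.1.1.jar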
At the beginning of your SqsConsumer, import the following libraries:
import org.json.simple.*;
import org.json.simple.parser.*;
Then use the following code to receive messages from your SQS queue and parse the S3 object key from each message received.
ReceiveMessageResult result = client.receiveMessage(queueUrl);
for (Message message : result.getMessages())
{
System.out.println(message.getMessageId() + "\t" + message.getBody());
JSONParser parser = new JSONParser();
Object body = parser.parse(message.getBody());
JSONObject jsonObj = (JSONObject) body;
JSONArray records = (JSONArray) jsonObj.get("Records");
Iterator i = records.iterator();
while (i.hasNext())
{
JSONObject record = (JSONObject) i.next();
JSONObject s3 = (JSONObject) record.get("s3");
JSONObject object = (JSONObject) s3.get("object");
String key = (String) object.get("key");
System.out.println(key);
// Do your image conversion here
}
client.deleteMessage(queueUrl, message.getReceiptHandle());
}
After you finish working on your Java code, upload a large number of different images to your S3 bucket to see if your batch image processing system is working.
Create an AMI for your EC2 instance. In the AMI, set up something to launch your image processing application as soon as the operating system is booted.
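One simple way to do this on Ubuntu is to add a line to /etc/rc.local (before the final "exit 0"). The user name, paths, classpath, and parameter below are placeholders for your own setup:
su ubuntu -c 'cd /home/ubuntu/batch && java -cp ".:/home/ubuntu/jars/*" SqsConsumer 30 >> consumer.log 2>&1 &'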
Create an AutoScaling group with an SQS based scaling policy to dynamically change the number of EC2 instances in your batch image processing system.
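One way to set this up, sketched here with the AWS CLI, is to create a simple scaling policy and trigger it from a CloudWatch alarm on the queue's ApproximateNumberOfMessagesVisible metric. The Auto Scaling group name, queue name, and threshold are all placeholder values:
$ aws autoscaling put-scaling-policy --auto-scaling-group-name my-batch-asg --policy-name scale-out --adjustment-type ChangeInCapacity --scaling-adjustment 1
$ aws cloudwatch put-metric-alarm --alarm-name sqs-backlog-high --namespace AWS/SQS --metric-name ApproximateNumberOfMessagesVisible --dimensions Name=QueueName,Value=my-batch-queue --statistic Average --period 60 --evaluation-periods 2 --threshold 100 --comparison-operator GreaterThanThreshold --alarm-actions <scale-out-policy-arn>
The put-scaling-policy command returns a PolicyARN, which you use as the alarm action. Create a similar scale-in policy and alarm (with a negative scaling adjustment and a low threshold) so the group shrinks when the backlog is cleared.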
(5) Batch Video Processing
Now we extend the batch image processing system with the capability to process video using ffmpeg. First, we need to install ffmpeg and some other libraries needed for video processing:
$ sudo add-apt-repository ppa:jonathonf/ffmpeg-3
$ sudo apt-get update
$ sudo apt-get install ffmpeg libav-tools x264 x265
With ffmpeg, you can convert video files from one format to another, for example:
$ ffmpeg -i youtube.flv -c:v libx264 filename.mp4
$ ffmpeg -i video.wmv -c:v libx264 -preset ultrafast video.mp4
For more information on using ffmpeg, you can refer to the following quick tutorial:
Modify your SqsConsumer: if the incoming object is an image, do image processing; if the incoming object is a video, do video processing by converting the video from its original format to flv, mp4, wmv, and avi. Then upload the output files to another S3 bucket and clean up the temporary files produced.
Congratulations! By now you have built your own Elastic Transcoder service.