301. CloudTrail Log Analysis

In this tutorial, we will show you how to import CloudTrail logs into MySQL for further analysis. Here we assume that you have downloaded the CloudTrail logs into a local folder on your EC2 instance (or your local computer).

(1) Understanding CloudTrail Log

In short, CloudTrail log files are Amazon S3 objects. These log files are written in JSON (JavaScript Object Notation) format. We recommend that you refer to the AWS CloudTrail documentation for a quick introduction to what CloudTrail is and what information is included in the CloudTrail logs.

Several tools are available for CloudTrail log processing; for example, using Athena for CloudTrail log analysis is quite convenient.

(2) The MySQL Database

Here we assume that you have a MySQL server (either a local installation, or an RDS MySQL instance) that can be used to store the CloudTrail logs. The database configuration is defined in your configuration file db.properties, as below:

db_hostname=127.0.0.1
db_username=root
db_password=root
db_database=cloudtrail

In the database, you have a table called logs, with the following schema.

mysql> describe logs;
+--------------------+-------------+------+-----+---------+-------+
| Field              | Type        | Null | Key | Default | Extra |
+--------------------+-------------+------+-----+---------+-------+
| eventVersion       | varchar(10) | YES  |     | NULL    |       |
| userIdentity       | text        | YES  |     | NULL    |       |
| eventTime          | varchar(24) | YES  |     | NULL    |       |
| eventSource        | varchar(80) | YES  |     | NULL    |       |
| eventName          | varchar(80) | YES  |     | NULL    |       |
| awsRegion          | varchar(20) | YES  |     | NULL    |       |
| sourceIPAddress    | varchar(40) | YES  |     | NULL    |       |
| userAgent          | text        | YES  |     | NULL    |       |
| errorCode          | varchar(80) | YES  |     | NULL    |       |
| errorMessage       | text        | YES  |     | NULL    |       |
| requestParameters  | text        | YES  |     | NULL    |       |
| responseElements   | longtext    | YES  |     | NULL    |       |
| requestID          | varchar(80) | YES  |     | NULL    |       |
| eventID            | varchar(80) | YES  |     | NULL    |       |
| eventType          | varchar(24) | YES  |     | NULL    |       |
| recipientAccountId | varchar(12) | YES  |     | NULL    |       |
+--------------------+-------------+------+-----+---------+-------+
16 rows in set (1.59 sec)

As you can see, the table does not have any indexes. This is to improve the performance of bulk data loading. After the bulk loading finishes, you should add the indexes you need to speed up your queries.

In this GitHub repository, there is a database dump file cloudtrail.sql, which contains the above-mentioned table schema but no data. You can create the table with the following command:

$ mysql -h hostname -u username -p database < cloudtrail.sql

(3) The Java Code

The log file importing class is implemented as a subclass of Thread, allowing multiple instances to run in parallel. The number of threads equals the number of CPU cores available on the system. A ConcurrentLinkedQueue holds the list of input files, so we do not need to handle the synchronization of work between threads ourselves.

The application accepts one input parameter, which can be a local file, a local folder, an S3 object, or an S3 prefix.

public class ImportCloudTrailLogs extends Thread
{
	public Connection conn = null;
	ConcurrentLinkedQueue<String> jobs;

	// ......

	public static void main(String[] args)
	{
		try
		{
			boolean is_s3 = false;
			String s3_region = null, s3_bucket = null, s3_key = null;
			ConcurrentLinkedQueue<String> jobs = new ConcurrentLinkedQueue<String>();

			if (args[0].startsWith("s3://"))
			{
				is_s3 = true;

				// Input location is in S3, probably with a prefix
				AmazonS3URI s3Uri = new AmazonS3URI(args[0]);
				s3_bucket = s3Uri.getBucket();
				s3_key = s3Uri.getKey();

				// Obtain the bucket location
				AmazonS3Client client = new AmazonS3Client();
				s3_region = client.getBucketLocation(s3_bucket);
				client.configureRegion(Regions.fromName(s3_region));

				// List all the S3 objects
				ObjectListing listing = client.listObjects(s3_bucket, s3_key);
				for (S3ObjectSummary object : listing.getObjectSummaries())
				{
					jobs.add(object.getKey());
				}
				// The listing may be truncated; keep fetching the next batch of objects
				while (listing.isTruncated())
				{
					ListNextBatchOfObjectsRequest request = new ListNextBatchOfObjectsRequest(listing);
					listing = client.listNextBatchOfObjects(request);
					for (S3ObjectSummary object : listing.getObjectSummaries())
					{
						jobs.add(object.getKey());
					}
				}
			}
			else
			{
				// Input location is local disk
				File file = new File(args[0]);
				if (file.isDirectory())
				{
					File[] files = file.listFiles();
				for (File currentFile : files)
				{
					jobs.add(currentFile.getPath());
				}
				}
				else
				{
					jobs.add(file.getPath());
				}
			}


			int threads = Runtime.getRuntime().availableProcessors();
			ImportCloudTrailLogs[] ictl = new ImportCloudTrailLogs[threads];
			for (int i=0; i<threads; i++)
			{
				ictl[i] = new ImportCloudTrailLogs();
				ictl[i].setJobs(is_s3, s3_region, s3_bucket, jobs);
				ictl[i].start();
			}
			for (int i=0; i<threads; i++)
			{
				ictl[i].join();
			}
		} catch (Exception e)
		{
			System.out.println(e.getMessage());
			e.printStackTrace();
		}
	}
}
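
The setJobs() method is not shown in the excerpt above. Conceptually, it stores the shared job queue and the S3 location details in the worker thread, and creates an S3 client in the bucket's region when the input is in S3. Below is a minimal sketch of what it might look like; the field names (is_s3, s3_region, s3_bucket, s3_client) and the method body are assumptions based on how they are used elsewhere in this class, not code copied from the repository.

	// Assumed sketch of setJobs(): store the shared queue and the S3 details, and
	// create a per-thread S3 client in the bucket's region when reading from S3.
	boolean is_s3;
	String s3_region, s3_bucket;
	AmazonS3Client s3_client;

	public void setJobs(boolean is_s3, String s3_region, String s3_bucket, ConcurrentLinkedQueue<String> jobs)
	{
		this.is_s3 = is_s3;
		this.s3_region = s3_region;
		this.s3_bucket = s3_bucket;
		this.jobs = jobs;
		if (is_s3)
		{
			s3_client = new AmazonS3Client();
			s3_client.configureRegion(Regions.fromName(s3_region));
		}
	}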

When the application starts, each thread establishes its own connection to the MySQL database. The database configuration details are loaded from the property file db.properties:

public class ImportCloudTrailLogs extends Thread
{
	public Connection conn = null;
	ConcurrentLinkedQueue<String> jobs;

	public ImportCloudTrailLogs()
	{
		try
		{
			// Getting database properties from db.properties
			Properties prop = new Properties();
			InputStream input = new FileInputStream("db.properties");
			prop.load(input);
			String db_hostname = prop.getProperty("db_hostname");
			String db_username = prop.getProperty("db_username");
			String db_password = prop.getProperty("db_password");
			String db_database = prop.getProperty("db_database");

			// Load the MySQL JDBC driver
			Class.forName("com.mysql.jdbc.Driver");
			String jdbc_url = "jdbc:mysql://" + db_hostname + "/" + db_database + "?user=" + db_username + "&password=" + db_password;
			// Create a connection using the JDBC driver
			conn = DriverManager.getConnection(jdbc_url);
		} catch (Exception e)
		{
			System.out.println(e.getMessage());
			e.printStackTrace();
		}
	}

When the thread runs, it keeps polling the ConcurrentLinkedQueue for the next input file and imports it, until the queue is drained.

	public void run()
	{
		// poll() returns null once the queue is drained (another thread may empty
		// the queue between an isEmpty() check and a poll(), so test the polled value)
		String job;
		while ((job = jobs.poll()) != null)
		{
			System.out.println(job);
			importLog(job);
		}
	}

For each log file, the import work involves parsing the input file into a JSON object, traversing all the records in the JSON object, and inserting the records into the MySQL database one by one:

	public void importLog(String job)
	{
		try
		{
			InputStream stream = null;
			if (is_s3)
			{
				S3Object s3 = s3_client.getObject(s3_bucket, job);
				stream = s3.getObjectContent();
			}
			else
			{
				stream = new FileInputStream(job);
			}
			InputStream gzipStream = new GZIPInputStream(stream);
			Reader reader = new InputStreamReader(gzipStream);

			JSONParser parser = new JSONParser();
			Object obj = parser.parse(reader);
			JSONObject jsonObj = (JSONObject) obj;
			JSONArray records = (JSONArray) jsonObj.get("Records");
			Iterator it = records.iterator();
			while (it.hasNext())
			{
				String[] attributes = new String[16];
				JSONObject record = (JSONObject) it.next();
				attributes[0] = getAttribute(record, "eventVersion");
				attributes[1] = getAttribute(record, "userIdentity");
				attributes[2] = getAttribute(record, "eventTime");
				attributes[3] = getAttribute(record, "eventSource");
				attributes[4] = getAttribute(record, "eventName");
				attributes[5] = getAttribute(record, "awsRegion");
				attributes[6] = getAttribute(record, "sourceIPAddress");
				attributes[7] = getAttribute(record, "userAgent");
				attributes[8] = getAttribute(record, "errorCode");
				attributes[9] = getAttribute(record, "errorMessage");
				attributes[10] = getAttribute(record, "requestParameters");
				attributes[11] = getAttribute(record, "responseElements");
				attributes[12] = getAttribute(record, "requestID");
				attributes[13] = getAttribute(record, "eventID");
				attributes[14] = getAttribute(record, "eventType");
				attributes[15] = getAttribute(record, "recipientAccountId");

				insertRecord(attributes);
			}
		} catch (Exception e)
		{
			System.out.println(e.getMessage());
			e.printStackTrace();
		}
	}

	public String getAttribute(JSONObject record, String attribute)
	{
		Object obj = record.get(attribute);

		if (obj != null)
		{
			if (obj.getClass() == JSONObject.class)
			{
				JSONObject jsonObj = (JSONObject) obj;
				return jsonObj.toJSONString();
			}
			else
			{
				return (String) obj;
			}
		}
		else
		{
			return null;
		}
	}
	public void insertRecord(String[] attributes)
	{
		try
		{
			String sql = "INSERT INTO logs (eventVersion, userIdentity, eventTime, eventSource, eventName, awsRegion, sourceIPAddress, userAgent, errorCode, errorMessage, requestParameters, responseElements, requestID, eventID, eventType, recipientAccountId) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
			PreparedStatement preparedStatement = conn.prepareStatement(sql);

			for (int i=0; i<attributes.length; i++)
			{
				preparedStatement.setString(i+1, attributes[i]);
			}
			preparedStatement.executeUpdate();
			// Close the statement so we do not leak resources across millions of inserts
			preparedStatement.close();
		} catch (Exception e)
		{
			System.out.println(e.getMessage());
			e.printStackTrace();
		}
	}
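
The code above issues one INSERT statement per record. If the import is still too slow even with more threads, one possible optimization (not part of the original code) is to accumulate records and insert them with a JDBC batch. Below is a minimal sketch of such a variant; the insertRecords name, the java.util.List parameter, and the batch size are assumptions, and with MySQL Connector/J you would typically also add rewriteBatchedStatements=true to the JDBC URL so that batches are rewritten into multi-row INSERTs.

	// Hypothetical batched variant of insertRecord() (not in the original code).
	// The caller would accumulate, say, 500 records into a List and then call this once.
	public void insertRecords(java.util.List<String[]> batch)
	{
		String sql = "INSERT INTO logs (eventVersion, userIdentity, eventTime, eventSource, "
			+ "eventName, awsRegion, sourceIPAddress, userAgent, errorCode, errorMessage, "
			+ "requestParameters, responseElements, requestID, eventID, eventType, recipientAccountId) "
			+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
		try (PreparedStatement preparedStatement = conn.prepareStatement(sql))
		{
			for (String[] attributes : batch)
			{
				for (int i = 0; i < attributes.length; i++)
				{
					preparedStatement.setString(i + 1, attributes[i]);
				}
				preparedStatement.addBatch();
			}
			// Execute all queued inserts as one batch
			preparedStatement.executeBatch();
		} catch (Exception e)
		{
			System.out.println(e.getMessage());
			e.printStackTrace();
		}
	}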

In order to parse the JSON records in the CloudTrail log files, you will need a JSON parser. In this example, we use the json-simple library. This library uses Maven for project management, so we need to install Maven to build it.

$ sudo apt-get update
$ sudo apt-get install maven
$ cd ~
$ git clone https://github.com/fangyidong/json-simple
$ cd json-simple
$ mvn compile
$ mvn package

After the build completes, you should have target/json-simple-1.1.1.jar in the json-simple project folder. You will need to add json-simple-1.1.1.jar to your CLASSPATH. You also need the MySQL JDBC driver to make the application work. If you do not know how to set up JDBC, you should revisit the 105. AWS RDS 01 tutorial.

If you have a very large number of log files and you want the import to complete faster, you might want to increase the number of concurrent threads. For example:

				int threads = 10 * Runtime.getRuntime().availableProcessors();

It should be noted that the json-simple library is quite memory-hungry. As such, you need to make sure you have sufficient memory available for your Java heap to avoid out-of-memory errors. On an r3.4xlarge instance with 122 GB of memory, I use the following commands to give the import application 100 GB of heap:

$ java -Xmx100G ImportCloudTrailLogs /data/log_folder/
$ java -Xmx100G ImportCloudTrailLogs /data/log_folder/log_file.gz
$ java -Xmx100G ImportCloudTrailLogs s3://bucket-name/log_folder/
$ java -Xmx100G ImportCloudTrailLogs s3://bucket-name/log_folder/log_file.gz

Also, the disk I/O capacity of the EBS volume (or instance-store volume) can be a performance bottleneck. On an EC2 instance, the default MySQL server installation stores data on the root EBS volume. For more information on disk I/O performance, you should revisit our 102. AWS EC2 02 tutorial.

(4) Working with MySQL

After we finish bulk data loading, we need to add a few indexes to the table to speed up queries:

mysql> ALTER TABLE logs ADD INDEX (eventSource);
Query OK, 0 rows affected (1 min 24.79 sec)
Records: 0  Duplicates: 0  Warnings: 0

mysql> ALTER TABLE logs ADD INDEX (eventName);
Query OK, 0 rows affected (1 min 24.79 sec)
Records: 0  Duplicates: 0  Warnings: 0

mysql> ALTER TABLE logs ADD INDEX (errorCode);
Query OK, 0 rows affected (1 min 24.79 sec)
Records: 0  Duplicates: 0  Warnings: 0

mysql> ALTER TABLE logs ADD INDEX (requestID);
Query OK, 0 rows affected (1 min 24.79 sec)
Records: 0  Duplicates: 0  Warnings: 0

mysql> ALTER TABLE logs ADD INDEX (eventType);
Query OK, 0 rows affected (1 min 24.79 sec)
Records: 0  Duplicates: 0  Warnings: 0

Below is an example of log analysis using standard SQL queries. In this example, we want to know the distribution of API calls to the EMR service during a 24-hour period. The logs imported into the MySQL database cover only that 24-hour period.

mysql> DESCRIBE logs;
+--------------------+-------------+------+-----+---------+-------+
| Field              | Type        | Null | Key | Default | Extra |
+--------------------+-------------+------+-----+---------+-------+
| eventVersion       | varchar(10) | YES  |     | NULL    |       |
| userIdentity       | text        | YES  |     | NULL    |       |
| eventTime          | varchar(24) | YES  |     | NULL    |       |
| eventSource        | varchar(80) | YES  | MUL | NULL    |       |
| eventName          | varchar(80) | YES  | MUL | NULL    |       |
| awsRegion          | varchar(20) | YES  |     | NULL    |       |
| sourceIPAddress    | varchar(40) | YES  |     | NULL    |       |
| userAgent          | text        | YES  |     | NULL    |       |
| errorCode          | varchar(80) | YES  | MUL | NULL    |       |
| errorMessage       | text        | YES  |     | NULL    |       |
| requestParameters  | text        | YES  |     | NULL    |       |
| responseElements   | longtext    | YES  |     | NULL    |       |
| requestID          | varchar(80) | YES  | MUL | NULL    |       |
| eventID            | varchar(80) | YES  |     | NULL    |       |
| eventType          | varchar(24) | YES  | MUL | NULL    |       |
| recipientAccountId | varchar(12) | YES  |     | NULL    |       |
+--------------------+-------------+------+-----+---------+-------+
16 rows in set (0.00 sec)

mysql> SELECT COUNT(*) FROM logs;
+----------+
| COUNT(*) |
+----------+
| 14181841 |
+----------+
1 row in set (17.37 sec)

mysql> SELECT COUNT(*), eventSource FROM logs GROUP BY eventSource;
+----------+------------------------------------+
| COUNT(*) | eventSource                        |
+----------+------------------------------------+
|       20 | acm.amazonaws.com                  |
|    76981 | autoscaling.amazonaws.com          |
|      377 | cloudfront.amazonaws.com           |
|      147 | cloudtrail.amazonaws.com           |
|    62206 | datapipeline.amazonaws.com         |
|  4038742 | dynamodb.amazonaws.com             |
|   337033 | ec2.amazonaws.com                  |
|   505627 | ecr.amazonaws.com                  |
|     2096 | ecs.amazonaws.com                  |
|       96 | elasticache.amazonaws.com          |
|       11 | elasticbeanstalk.amazonaws.com     |
|     4343 | elasticloadbalancing.amazonaws.com |
|    33067 | elasticmapreduce.amazonaws.com     |
|      394 | es.amazonaws.com                   |
|      149 | firehose.amazonaws.com             |
|    29751 | iam.amazonaws.com                  |
|     5472 | kinesis.amazonaws.com              |
|  2031085 | kms.amazonaws.com                  |
|     1635 | lambda.amazonaws.com               |
|     2945 | logs.amazonaws.com                 |
|    62844 | monitoring.amazonaws.com           |
|     1098 | rds.amazonaws.com                  |
|      512 | redshift.amazonaws.com             |
|      819 | route53.amazonaws.com              |
|   224774 | s3.amazonaws.com                   |
|      351 | signin.amazonaws.com               |
|  1303311 | sns.amazonaws.com                  |
|     8576 | sqs.amazonaws.com                  |
|  5447188 | sts.amazonaws.com                  |
|       78 | support.amazonaws.com              |
|       87 | swf.amazonaws.com                  |
|        2 | waf.amazonaws.com                  |
|       24 | workspaces.amazonaws.com           |
+----------+------------------------------------+
33 rows in set (7.30 sec)

mysql> SELECT count(*), eventName FROM logs WHERE eventSource='elasticmapreduce.amazonaws.com' GROUP BY eventName;
+----------+-------------------+
| count(*) | eventName         |
+----------+-------------------+
|    23782 | DescribeCluster   |
|     1131 | ListClusters      |
|     7491 | ListSteps         |
|      330 | RunJobFlow        |
|      333 | TerminateJobFlows |
+----------+-------------------+
5 rows in set (0.95 sec)

mysql> SELECT count(*), substring(eventTime, 1, 13) as time FROM logs WHERE eventSource='elasticmapreduce.amazonaws.com' GROUP BY time;
+----------+---------------+
| count(*) | time          |
+----------+---------------+
|      125 | 2017-04-09T23 |
|      992 | 2017-04-10T00 |
|      882 | 2017-04-10T01 |
|      749 | 2017-04-10T02 |
|      716 | 2017-04-10T03 |
|      712 | 2017-04-10T04 |
|      716 | 2017-04-10T05 |
|      710 | 2017-04-10T06 |
|      672 | 2017-04-10T07 |
|      668 | 2017-04-10T08 |
|      437 | 2017-04-10T09 |
|       69 | 2017-04-10T10 |
|     5859 | 2017-04-10T11 |
|     4901 | 2017-04-10T12 |
|     2329 | 2017-04-10T13 |
|     1694 | 2017-04-10T14 |
|     1603 | 2017-04-10T15 |
|     1480 | 2017-04-10T16 |
|     1360 | 2017-04-10T17 |
|     1191 | 2017-04-10T18 |
|     1209 | 2017-04-10T19 |
|     1115 | 2017-04-10T20 |
|     1060 | 2017-04-10T21 |
|     1027 | 2017-04-10T22 |
|      791 | 2017-04-10T23 |
+----------+---------------+
25 rows in set (0.94 sec)

mysql> SELECT count(*), SUBSTRING(eventTime, 1, 16) as time FROM logs WHERE eventSource='elasticmapreduce.amazonaws.com' AND SUBSTRING(eventTime, 1, 13)='2017-04-10T11' GROUP BY time;
+----------+------------------+
| count(*) | time             |
+----------+------------------+
|       86 | 2017-04-10T11:00 |
|       50 | 2017-04-10T11:01 |
|       77 | 2017-04-10T11:02 |
|       77 | 2017-04-10T11:03 |
|       69 | 2017-04-10T11:04 |
|       75 | 2017-04-10T11:05 |
|       75 | 2017-04-10T11:06 |
|       83 | 2017-04-10T11:07 |
|       86 | 2017-04-10T11:08 |
|       86 | 2017-04-10T11:09 |
|       99 | 2017-04-10T11:10 |
|       97 | 2017-04-10T11:11 |
|       88 | 2017-04-10T11:12 |
|       99 | 2017-04-10T11:13 |
|       98 | 2017-04-10T11:14 |
|      106 | 2017-04-10T11:15 |
|       99 | 2017-04-10T11:16 |
|       96 | 2017-04-10T11:17 |
|       78 | 2017-04-10T11:18 |
|      122 | 2017-04-10T11:19 |
|       97 | 2017-04-10T11:20 |
|       93 | 2017-04-10T11:21 |
|      101 | 2017-04-10T11:22 |
|      105 | 2017-04-10T11:23 |
|       78 | 2017-04-10T11:24 |
|      118 | 2017-04-10T11:25 |
|       97 | 2017-04-10T11:26 |
|      103 | 2017-04-10T11:27 |
|      115 | 2017-04-10T11:28 |
|       97 | 2017-04-10T11:29 |
|       95 | 2017-04-10T11:30 |
|      102 | 2017-04-10T11:31 |
|      125 | 2017-04-10T11:32 |
|       99 | 2017-04-10T11:33 |
|      117 | 2017-04-10T11:34 |
|      124 | 2017-04-10T11:35 |
|      121 | 2017-04-10T11:36 |
|       90 | 2017-04-10T11:37 |
|       94 | 2017-04-10T11:38 |
|       96 | 2017-04-10T11:39 |
|       82 | 2017-04-10T11:40 |
|       93 | 2017-04-10T11:41 |
|       95 | 2017-04-10T11:42 |
|      102 | 2017-04-10T11:43 |
|       90 | 2017-04-10T11:44 |
|       91 | 2017-04-10T11:45 |
|      103 | 2017-04-10T11:46 |
|       92 | 2017-04-10T11:47 |
|       92 | 2017-04-10T11:48 |
|      104 | 2017-04-10T11:49 |
|      128 | 2017-04-10T11:50 |
|      119 | 2017-04-10T11:51 |
|      101 | 2017-04-10T11:52 |
|      108 | 2017-04-10T11:53 |
|      102 | 2017-04-10T11:54 |
|       96 | 2017-04-10T11:55 |
|      107 | 2017-04-10T11:56 |
|      112 | 2017-04-10T11:57 |
|       96 | 2017-04-10T11:58 |
|      133 | 2017-04-10T11:59 |
+----------+------------------+
60 rows in set (0.90 sec)

mysql> SELECT count(*), eventTime FROM logs WHERE eventSource='elasticmapreduce.amazonaws.com' AND SUBSTRING(eventTime, 1, 16)='2017-04-10T11:35' GROUP BY eventTime;
+----------+----------------------+
| count(*) | eventTime            |
+----------+----------------------+
|        1 | 2017-04-10T11:35:00Z |
|        6 | 2017-04-10T11:35:01Z |
|        4 | 2017-04-10T11:35:02Z |
|        3 | 2017-04-10T11:35:03Z |
|        2 | 2017-04-10T11:35:05Z |
|        1 | 2017-04-10T11:35:07Z |
|        4 | 2017-04-10T11:35:08Z |
|        3 | 2017-04-10T11:35:09Z |
|        1 | 2017-04-10T11:35:11Z |
|        2 | 2017-04-10T11:35:14Z |
|        5 | 2017-04-10T11:35:15Z |
|        1 | 2017-04-10T11:35:16Z |
|        1 | 2017-04-10T11:35:21Z |
|        3 | 2017-04-10T11:35:22Z |
|        3 | 2017-04-10T11:35:23Z |
|        5 | 2017-04-10T11:35:24Z |
|        4 | 2017-04-10T11:35:25Z |
|        1 | 2017-04-10T11:35:26Z |
|        7 | 2017-04-10T11:35:27Z |
|        4 | 2017-04-10T11:35:28Z |
|        3 | 2017-04-10T11:35:34Z |
|        1 | 2017-04-10T11:35:35Z |
|        4 | 2017-04-10T11:35:36Z |
|        2 | 2017-04-10T11:35:38Z |
|        7 | 2017-04-10T11:35:40Z |
|        5 | 2017-04-10T11:35:47Z |
|        1 | 2017-04-10T11:35:48Z |
|        3 | 2017-04-10T11:35:49Z |
|        6 | 2017-04-10T11:35:50Z |
|        7 | 2017-04-10T11:35:51Z |
|        7 | 2017-04-10T11:35:52Z |
|        2 | 2017-04-10T11:35:53Z |
|        1 | 2017-04-10T11:35:54Z |
|        7 | 2017-04-10T11:35:55Z |
|        5 | 2017-04-10T11:35:56Z |
|        2 | 2017-04-10T11:35:57Z |
+----------+----------------------+
36 rows in set (0.88 sec)

(5) Summary

If you just want a quick way to use this utility, simply clone the code from the GitHub repository:

$ cd ~
$ git clone https://github.com/qyjohn/AWS_Tutorials
$ export CLASSPATH=~/AWS_Tutorials/lib/*:.

Create a MySQL database to store the logs (you will need to update db.properties in the Java folder with the proper database connection details):

$ mysql -u root -p
mysql> CREATE DATABASE cloudtrail;
mysql> quit
$ cd AWS_Tutorials/Java
$ mysql -u root -p < cloudtrail.sql

Then compile and run the code to import logs:

$ javac ImportCloudTrailLogs.java
$ java ImportCloudTrailLogs /data/log_folder/
$ java ImportCloudTrailLogs /data/log_folder/log_file.gz
$ java ImportCloudTrailLogs s3://bucket-name/log_folder/
$ java ImportCloudTrailLogs s3://bucket-name/log_folder/log_file.gz

After the import finishes, you need to add indexes to make your queries faster:

mysql> ALTER TABLE logs ADD INDEX (eventSource);
mysql> ALTER TABLE logs ADD INDEX (eventName);
mysql> ALTER TABLE logs ADD INDEX (errorCode);
mysql> ALTER TABLE logs ADD INDEX (requestID);
mysql> ALTER TABLE logs ADD INDEX (eventType);