Thursday, August 18, 2022
No menu items!
HomeDatabase ManagementBackfilling an Amazon DynamoDB Time to Live attribute using Amazon EMR: Part...

Backfilling an Amazon DynamoDB Time to Live attribute using Amazon EMR: Part 2

Amazon DynamoDB is a fully managed, serverless, key-value NoSQL database designed to run high-performance applications at any scale. DynamoDB can store and retrieve any amount of data. As the data grows and becomes cold, many use cases such as session management or order management require archival of the older unneeded items. DynamoDB has a feature called Time to Live (TTL), which expires and deletes items without consuming write capacity units (WCU). Managing item expiry with TTL can lower your storage and WCU (deletes) costs.

We suggest choosing a TTL attribute before writing data to your DynamoDB table. Sometimes, customers start using TTL with tables that already contain data. To efficiently manage backfilling DynamoDB TTL attributes, we recommended using Amazon EMR because it’s a highly scalable solution with built-in functionality for connecting with DynamoDB. You can run an Amazon EMR job after you modify your application to add a TTL attribute for all new items.

Part 1 of this series shows you how to create an Amazon EMR cluster and run a Hive query in the cluster to backfill a TTL attribute in the items that are missing it. The method works when there are no complex or collection data types such as maps or lists in the data. In this post, I show you how to backfill TTL attributes in items when data has map, list, Boolean, or null data types, which aren’t natively supported by DynamoDBStorageHandler in Hive. The steps in the post work whether the set of data attributes for each item is uniform or discrete.

This technique works best for insert-only workloads or during periods of maintenance when updates and deletes aren’t allowed. If data is being updated or deleted during the process, it may be unintentionally deleted.

DynamoDB schema

Although DynamoDB only defines and enforces primary key attributes, data for most applications has some form of schema. In this scenario, the current schema contains the following attributes:

order_id – The partition key, which is a string in universally unique identifier (UUID) form
creation_timestamp – A string that represents the item’s creation timestamp in ISO 8601 format
delivery_address – A map that represents the order’s delivery address
is_cod – A Boolean value that represents if the payment mode is cash on delivery
item_list – A list of ordered product codes

This post uses a table called Orders with 2 million items that are missing the expiration_timestamp attribute. Figure 1 that follows is a screenshot showing a sample of the items in the Orders table.

Figure 1: Sample of the Orders table

Fetching one of the items from this table using the AWS Command Line Interface (AWS CLI) get-item call, you can see that there is no expiration_timestamp attribute present:

aws dynamodb get-item –table-name Orders –key ‘{“order_id”:{“S”:”e9bba98e-d579-43bb-a571-93ccdb32c960″}}’
{
“Item”: {
“order_id”: {
“S”: “e9bba98e-d579-43bb-a571-93ccdb32c960”
},
“delivery_address”: {
“M”: {
“city”: {
“S”: “Jonathan”
},
“state”: {
“S”: “VA”
},
“door”: {
“S”: “5482”
},
“pin”: {
“S”: “28458”
}
}
},
“creation_timestamp”: {
“S”: “2022-02-01T20:23:25Z”
},
“is_cod”: {
“BOOL”: true
},
“item_list”: {
“L”: [
{
“S”: “pglyg”
}
]
}
}
}

You add a TTL attribute called expiration_timestamp to each of these items using Hive commands. In this case, you want to delete the order 180 days after it was created, so the expiration_timestamp attribute contains a number that represents the item’s expiration time in seconds since the epoch, which is 180 days after the creation_timestamp.

After the backfill process, you can use the same call to verify if the TTL attribute has been added to this item.

Running Hive CLI commands

You can calculate the new TTL attribute on a per-item basis using another timestamp attribute that already exists in each item. For more information about creating an Amazon EMR cluster, connecting to it through SSH, and sizing considerations, refer to Part 1 of this series.

Amazon EMR cluster version 5.34 has been used for this post. You create this cluster with five nodes (one master and four core nodes) of type c5.4xlarge.

Log in to the master node of the cluster and log in to the Hive CLI by entering hive on the master node terminal. Create a database and log in to the database using the following commands:

hive> show databases;
OK
default
Time taken: 0.632 seconds, Fetched: 1 row(s)

hive> create database dynamodb_ttl;
OK
Time taken: 0.221 seconds

hive> use dynamodb_ttl;
OK
Time taken: 0.039 seconds

Now create a wrapper over the DynamoDB table so that Hive queries can be performed against it. In the DynamoDBStorageHandler, there are no column mappings for map, list, and Boolean data types, which the DynamoDB data contains. So, create a single entity called item that represents the entire DynamoDB item as a map of strings for both keys and values in the map. Include another column called expiration_timestamp of type bigint, where TTL values will actually be backfilled. See the following Hive query:

hive> CREATE EXTERNAL TABLE Orders(item map<string,string>, expiration_timestamp bigint COMMENT ‘from deserializer’)
STORED BY ‘org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler’
TBLPROPERTIES (‘dynamodb.table.name’=’Orders’, “dynamodb.column.mapping” = “expiration_timestamp:expiration_timestamp”);

The backfill time largely depends on two factors: the DynamoDB capacity available for the process, and the number of nodes in the Amazon EMR cluster. For this test, the table is in the provisioned throughput mode with the ProvisionedReadCapacity and ProvisionedWriteCapacity set to 40000 each.

With some Amazon EMR versions (5.25.0 to 5.34.0), you must set the dynamodb.throughput.write and dynamodb.throughput.read parameters on Hive.

hive> SET dynamodb.throughput.write=40000;

hive> SET dynamodb.throughput.read=40000;

Now run the following command to find the number of items in the table without expiration_timestamp:

hive> select count(*) from Orders where item[“expiration_timestamp”] IS NULL;

Query ID = hadoop_20220328145453_32f2eb87-825e-4838-9aeb-845f4e339cc8
Total jobs = 1
Launching Job 1 out of 1
Status: Running (Executing on YARN cluster with App id application_1648478810565_0001)

———————————————————————————————-
VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
———————————————————————————————-
Map 1 ………. container SUCCEEDED 62 62 0 0 0 0
Reducer 2 …… container SUCCEEDED 1 1 0 0 0 0
———————————————————————————————-
VERTICES: 02/02 [==========================>>] 100% ELAPSED TIME: 19.88 s
———————————————————————————————-
OK
2000000
Time taken: 20.597 seconds, Fetched: 1 row(s)

As shown in the preceding output, there are 2 million items without the TTL attribute. Now run an INSERT OVERWRITE command with the regexp_replace() function to carry out the calculation and backfill in place. If data is being updated or deleted between the time when the SELECT part of the query runs and when the INSERT OVERWITE part runs, you could end up with incorrect results. This is why we caution you to only use this technique for insert-only workloads or during a maintenance window when no updates or deletes are occurring. To calculate the expiration_timestamp value, first parse the creation_timestamp, which is an ISO 8601 formatted string, into a format expected by Hive’s unix_timestamp function. Then replace the parsed value with an integer timestamp, which is measured in seconds since the epoch as required by the DynamoDB TTL functionality. In this case, items need to be expired 180 days later than their creation_timestamp. See the following Hive query:

hive> INSERT OVERWRITE TABLE Orders SELECT item,(unix_timestamp(regexp_replace(item[“creation_timestamp”], ‘\{“s”:”(.+?)T(.+?)Z”\}$’,’$1 $2′)) + (60*60*24*180)) FROM Orders;
Query ID = hadoop_20220329170732_cf1168d9-4604-49c9-9a14-db0acdd7cb27
Total jobs = 1
Launching Job 1 out of 1
Status: Running (Executing on YARN cluster with App id application_1648571110202_0004)
———————————————————————————————-
VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
———————————————————————————————-
Map 1 ………. container SUCCEEDED 62 62 0 0 0 0
———————————————————————————————-
VERTICES: 01/01 [==========================>>] 100% ELAPSED TIME: 124.17 s
———————————————————————————————-
OK
Time taken: 125.11 seconds

When you switch the table to on-demand throughput mode, the peak value is now set to 40,000 reads and 40,000 writes. When you run another test, you can observe that the time is similar to the earlier test when the table was using provisioned capacity:

hive> INSERT OVERWRITE TABLE Orders SELECT item,(unix_timestamp(regexp_replace(item[“creation_timestamp”], ‘\{“s”:”(.+?)T(.+?)Z”\}$’,’$1 $2′)) + (60*60*24*180)) FROM Orders;
Query ID = hadoop_20220329170155_047dee68-e499-4383-ba08-2d6d77029d47
Total jobs = 1
Launching Job 1 out of 1
Status: Running (Executing on YARN cluster with App id application_1648571110202_0004)
———————————————————————————————-
VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
———————————————————————————————-
Map 1 ………. container SUCCEEDED 62 62 0 0 0 0
———————————————————————————————-
VERTICES: 01/01 [==========================>>] 100% ELAPSED TIME: 124.34 s
———————————————————————————————-
OK
Time taken: 125.337 seconds

To verify if all items now have the expiration_timestamp, run the following Hive query again:

hive> select count(*) from Orders where item[“expiration_timestamp”] IS NULL;

Query ID = hadoop_20220313115251_ed5d2e68-b855-4cb1-a6d3-dd1efb63b36a
Total jobs = 1
Launching Job 1 out of 1
Status: Running (Executing on YARN cluster with App id application_1647166731942_0011)

———————————————————————————————-
VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
———————————————————————————————-
Map 1 ………. container SUCCEEDED 62 62 0 0 0 0
Reducer 2 …… container SUCCEEDED 1 1 0 0 0 0
———————————————————————————————-
VERTICES: 02/02 [==========================>>] 100% ELAPSED TIME: 21.06 s
———————————————————————————————-
OK
0
Time taken: 21.849 seconds, Fetched: 1 row(s)

As shown in the output, there are no items where the TTL attribute value is absent anymore. To look at the expiration timestamp for the earlier item, run the same get-item call:

aws dynamodb get-item –table-name Orders –key ‘{“order_id”:{“S”:”e9bba98e-d579-43bb-a571-93ccdb32c960″}}’
{
“Item”: {
“delivery_address”: {
“M”: {
“city”: {
“S”: “Jonathan”
},
“state”: {
“S”: “VA”
},
“door”: {
“S”: “5482”
},
“pin”: {
“S”: “28458”
}
}
},
“is_cod”: {
“BOOL”: true
},
“item_list”: {
“L”: [
{
“S”: “pglyg”
}
]
},
“order_id”: {
“S”: “e9bba98e-d579-43bb-a571-93ccdb32c960”
},
“creation_timestamp”: {
“S”: “2022-02-01T20:23:25Z”
},
“expiration_timestamp”: {
“N”: “1659299005”
}
}
}

The epoch 1659299005 corresponds to 2022-07-31T20:23:25Z as per Epoch Converter, which is 180 days after the creation_timestamp value of 2022-02-01T20:23:25Z for this item.

Clean up

If you don’t need the Amazon EMR cluster anymore, delete it after the backfill is complete to avoid unnecessary costs.

Conclusion

In this post, I showed you how to backfill the TTL attribute in a DynamoDB table with complex data types using Hive queries. Hive offers a flexible way to detect, calculate, and overwrite data based on conditions such as missing attributes, along with rate-limiting the throughput consumption for backfill.

Additional reading:

Getting started with DynamoDB
Hive tutorial
Creating an External Table in Hive
Date Functions in the Apache Hive

About the author

Juhi Patil is a London-based DynamoDB Specialist Solutions Architect with a background in big data technologies. In her current role, she helps customers design, evaluate, and optimize their DynamoDB-based solutions.

Read MoreAWS Database Blog

RELATED ARTICLES

LEAVE A REPLY

Please enter your comment!
Please enter your name here

Most Popular

Recent Comments