Sunday, May 26, 2024

Introducing the Amazon Timestream UNLOAD statement: Export time-series data for additional insights

Amazon Timestream is a fully managed, scalable, and serverless time series database service that makes it easy to store and analyze trillions of events per day. Customers across a broad range of industry verticals have adopted Timestream to derive real-time insights, monitor critical business applications, and analyze millions of real-time events across websites and applications. Timestream makes it easy to build these solutions because it can automatically scale depending on the workload without operational overhead to manage the underlying infrastructure.

Many Timestream customers want to derive additional value from their time series data by using it in other contexts, such as adding it to their data lake, training machine learning (ML) models for forecasting, or enriching it with data using other AWS or third-party services. However, doing so is time-consuming because it requires complex custom solutions. To address these needs, we are excited to introduce support for the UNLOAD statement, a secure and cost-effective way to build AI/ML pipelines and simplify extract, transform, and load (ETL) processes.

In this post, we demonstrate how to export time series data from Timestream to Amazon Simple Storage Service (Amazon S3) using the UNLOAD statement.

Whether you’re a climate researcher predicting weather trends, a healthcare provider monitoring patient health, a manufacturing engineer overseeing production, a supply chain manager optimizing operations, or an ecommerce manager tracking sales, you can use the new UNLOAD statement to derive additional value from your time series data.

The following are example scenarios where the UNLOAD statement can help:

Healthcare analytics – Healthcare organizations monitor the health metrics of their patients over a period of time, generating massive amounts of time series data. They can use Timestream to track and monitor patient health in real time. They can now export the data to their data lake where they can enrich and further analyze it to predict outcomes and improve patient care.
Supply chain analytics – Supply chain analysts can use Timestream to track metrics across the supply chain such as inventory levels, delivery times and delays, to optimize their supply chains. They can now use the UNLOAD statement to export gigabytes of the data into Amazon S3 where they can use other AWS services or third-party services for predictive modeling.
Ecommerce analytics – Ecommerce managers can use Timestream to track ecommerce store and website metrics, such as source of traffic, clickthrough rate, and quantity sold. They can now use the UNLOAD statement to export the data to Amazon S3 and analyze it with other relevant non-time series data such as customer demographics to optimize marketing investments.

UNLOAD statement overview

The UNLOAD statement in Timestream enables you to export your query results into Amazon S3 in a secure and cost-effective manner. With UNLOAD, you can export gigabytes of time series data to select S3 buckets in either Apache Parquet or comma-separated values (CSV) format, giving you the flexibility to store, combine, and analyze time series data using other services such as Amazon Athena, Amazon EMR, and Amazon SageMaker. You can also encrypt the exported data using Amazon S3 managed keys (SSE-S3) or AWS Key Management Service (AWS KMS) managed keys (SSE-KMS) to prevent unauthorized data access, and compress it to reduce storage costs. In addition, you can choose one or more columns to partition the exported data, so that downstream services scan only the data relevant to a query, minimizing processing time and cost.

The syntax for the UNLOAD statement is as follows:

UNLOAD (SELECT statement)
TO 's3://bucket-name/folder'
WITH ( option = expression [, ...] )

We define option as follows:

{ partitioned_by = ARRAY[ col_name[,...] ]
| format = [ '{ CSV | PARQUET }' ]
| compression = [ '{ GZIP | NONE }' ]
| encryption = [ '{ SSE_KMS | SSE_S3 }' ]
| kms_key = '<string>'
| field_delimiter = '<character>'
| escaped_by = '<character>'
| include_header = ['{true, false}']
| max_file_size = '<value>'
}
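If you generate UNLOAD statements from application code, the option grammar above maps naturally onto keyword arguments. The following Python sketch uses a hypothetical helper, build_unload (not part of any AWS SDK), that renders partitioned_by as an ARRAY and every other option as a single-quoted string literal. It only assembles the SQL text; it does not check option names or values against what Timestream accepts.

```python
def build_unload(select_sql, s3_uri, **options):
    """Assemble an UNLOAD statement (hypothetical helper).

    partitioned_by takes a list of column names; every other option is
    rendered as a single-quoted string, so pass values the way the syntax
    spells them, for example include_header='true'.
    """
    clauses = []
    for name, value in options.items():  # keyword order is preserved
        if name == "partitioned_by":
            columns = ", ".join(f"'{col}'" for col in value)
            clauses.append(f"partitioned_by = ARRAY[{columns}]")
        else:
            # Double any embedded single quote, as SQL string literals require
            literal = str(value).replace("'", "''")
            clauses.append(f"{name} = '{literal}'")
    statement = f"UNLOAD ({select_sql})\nTO '{s3_uri}'"
    if clauses:
        statement += "\nWITH (" + ", ".join(clauses) + ")"
    return statement


print(build_unload(
    'SELECT user_id, quantity, channel FROM "timestream_sample_database"."timestream_sample_unload_table"',
    "s3://<your_bucket_name>/partition_by_channel",
    format="PARQUET",
    compression="GZIP",
    partitioned_by=["channel"],
))
```

Keeping the options as keyword arguments means the WITH clause is emitted only when at least one option is supplied, matching the optional WITH in the syntax.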

You can run the UNLOAD statement using the AWS Management Console, AWS Command Line Interface (AWS CLI), or AWS SDK.
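With the AWS SDK for Python (Boto3), an UNLOAD statement is submitted like any other query, through the query operation of a timestream-query client such as the query_client created later in this post. The sketch below follows NextToken until no token is returned, because a Timestream query may return pages that contain no rows while it is still running. The helper name run_query is hypothetical, and the sketch only extracts scalar cell values.

```python
def run_query(query_client, sql):
    """Run a Timestream query and collect every page of results (hypothetical helper).

    query_client is assumed to be a Boto3 'timestream-query' client. Each row is
    flattened to a list of ScalarValue strings; NULL and non-scalar cells
    (arrays, rows, time series) become None in this sketch.
    """
    kwargs = {"QueryString": sql}
    column_names, rows = [], []
    while True:
        page = query_client.query(**kwargs)
        column_names = [col.get("Name") for col in page.get("ColumnInfo", [])]
        for row in page.get("Rows", []):
            # Scalar cells carry 'ScalarValue'; NULL cells carry 'NullValue' instead
            rows.append([cell.get("ScalarValue") for cell in row["Data"]])
        token = page.get("NextToken")
        if not token:
            return column_names, rows
        # QueryString is required on every call, so resend it along with the token
        kwargs["NextToken"] = token
```

For example, columns, rows = run_query(query_client, unload_sql) runs an UNLOAD statement held in a string named unload_sql (a hypothetical variable) and returns the summary that Timestream reports for the export.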

Solution overview

In this post, we walk through the steps and best practices for exporting data from Timestream to Amazon S3 and deriving additional insights. The high-level steps are as follows:

Ingest sample data into Timestream.
Perform data analysis.
Use the UNLOAD statement to export the query result set to Amazon S3.
Create an AWS Glue Data Catalog table.
Derive additional business insights using Athena.

The following diagram illustrates the solution architecture.

Note that you will incur the cost of AWS resources used at public pricing if you choose to reproduce the solution in your environment.

We demonstrate the solution through a sample use case where we use Timestream for tracking metrics from an ecommerce website. Every time a product is sold, the sales data—including product ID, quantity sold, the channel that drove the customer to the website (such as social media or organic search), timestamp of the transaction, and other relevant details—is recorded and ingested into Timestream. We’ve created sample data that you can ingest into Timestream. This data has been generated using Faker and cleaned for the purposes of this demonstration.

The data contains the following information: channel, ip_address, session_id, user_id, event, user_group, current_time, query, product_id, product, and quantity. Whenever a search results in a purchase, the product_id and quantity are recorded. When ingesting the data into Timestream, we used the following data model:

Dimensions – We used channel, ip_address, session_id, user_id, event, and user_group. For more information about dimensions, refer to Amazon Timestream concepts.
Time – We used current_time. Note that the sample data might contain outdated timestamps; the sample code in this post replaces them with recent timestamps during ingestion.
Multi-measure records – We used query, product_id, product, and quantity. For more information, refer to Multi-measure records.

Prerequisites

To follow along with this post, you must meet the following prerequisites:

To create databases and tables, you need IAM permissions that allow CRUD operations in Timestream
To insert records, you need IAM permissions that allow insert operations
To run an UNLOAD query, you need to meet the prerequisites for writing data to Amazon S3
Before you run the code blocks, export the appropriate AWS account credentials as environment variables

Ingest data into Timestream

You can use the sample code in this post to create a database and table, and then ingest ecommerce website sales data into Timestream. Complete the following steps:

Set up a Jupyter notebook or integrated development environment (IDE) of your choice. The following code is split into multiple parts for illustrative purposes and uses Python version 3.9. If you intend to use the same code, combine the code blocks into a single program or use a Jupyter notebook to follow the sample.
Initialize your Timestream clients:

import boto3
from botocore.config import Config

session = boto3.Session()

write_client = session.client('timestream-write', config=Config(region_name="<region>", read_timeout=20, max_pool_connections=5000, retries={'max_attempts': 10}))
query_client = session.client('timestream-query', config=Config(region_name="<region>"))

Create a database:

database_name = "timestream_sample_database"
write_client.create_database(DatabaseName=database_name)
print("Database [%s] created successfully." % database_name)

After you create the database, you can view it on the Timestream console.

Create a table:

table_name = "timestream_sample_unload_table"
retention_properties = {
    'MemoryStoreRetentionPeriodInHours': 12,
    'MagneticStoreRetentionPeriodInDays': 7
}
write_client.create_table(DatabaseName=database_name, TableName=table_name,
                          RetentionProperties=retention_properties)
print("Table [%s] successfully created." % table_name)

The table is now viewable on the Timestream console.

Ingest the sample data into the table:

import csv
import time


def __submit_batch(records, n):
    # Write one batch of up to 100 records to the Timestream table
    try:
        result = write_client.write_records(DatabaseName=database_name, TableName=table_name,
                                            Records=records, CommonAttributes={})
        if result and result['ResponseMetadata']:
            print("Processed [%d] records. WriteRecords Status: [%s]" % (n, result['ResponseMetadata']['HTTPStatusCode']))
    except Exception as err:
        print("Error:", err)


with open("Downloads/sample_unload.csv", 'r') as csvfile:
    csvreader = csv.reader(csvfile)

    records = []
    current_time = str(int(round(time.time() * 1000)))

    header_row = []
    row_counter = 0
    # extract the CSV file content row by row
    for i, row in enumerate(csvreader, 1):
        # skip empty lines
        if len(row) == 0:
            continue

        # keep the CSV header for column names, but don't ingest it
        if i == 1:
            header_row = row
            continue

        row_counter = row_counter + 1

        record = {
            # columns 0-5 (channel through user_group) are dimensions
            'Dimensions': [{'Name': header_row[j], 'Value': row[j]} for j in range(6)],
            # replace the sample's time (column 6) with a recent timestamp in milliseconds
            'Time': str(int(current_time) - (i * 50))
        }

        # columns 7-10 (query, product_id, product, quantity) become measures; empty values are skipped
        measure_types = {7: 'VARCHAR', 8: 'VARCHAR', 9: 'VARCHAR', 10: 'DOUBLE'}
        measure_values = []
        for col, measure_type in measure_types.items():
            if row[col] != "":
                measure_values.append({
                    'Name': header_row[col],
                    'Value': row[col],
                    'Type': measure_type,
                })

        record.update({
            'MeasureName': "metrics",
            'MeasureValueType': "MULTI",
            'MeasureValues': measure_values
        })
        records.append(record)

        # WriteRecords accepts at most 100 records per call
        if len(records) == 100:
            __submit_batch(records, row_counter)
            records = []

    if records:
        __submit_batch(records, row_counter)

print("Ingested %d records" % row_counter)

After ingesting, you can preview the contents of the table using the query editor on the Timestream console.
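The ingestion loop flushes every 100 records because a single WriteRecords call accepts at most 100 records. It also rewrites the sample timestamps because, as we understand the default behavior, a table without magnetic store writes enabled rejects records older than its memory store retention (12 hours for the table created here). The following sketch applies both rules to already-built records using a hypothetical prepare_batches helper; treat it as a pre-flight check, not a replacement for handling RejectedRecordsException.

```python
import time

MAX_RECORDS_PER_WRITE = 100  # WriteRecords limit per request


def prepare_batches(records, now_ms=None, memory_retention_hours=12,
                    batch_size=MAX_RECORDS_PER_WRITE):
    """Split records into write-sized batches (hypothetical helper).

    Records whose millisecond 'Time' falls before the memory store window are
    returned separately instead of being batched.
    """
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    oldest_allowed_ms = now_ms - memory_retention_hours * 3600 * 1000
    writable, too_old = [], []
    for record in records:
        if int(record["Time"]) >= oldest_allowed_ms:
            writable.append(record)
        else:
            too_old.append(record)
    batches = [writable[i:i + batch_size] for i in range(0, len(writable), batch_size)]
    return batches, too_old
```

Each returned batch can then be passed as Records to write_client.write_records, and any records in too_old can be logged or re-timestamped before retrying.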

Perform data analysis

In Timestream, you can perform real-time analytics on the ingested data. For example, you can query the number of units sold per product in a day, the number of customers landing on the store from social media advertising in the past week, trends in sales, patterns in purchases for the last hour, and so on.

To find the number of units sold per product in the last 24 hours, use the following query:

SELECT sum(quantity) as number_of_units_sold, product
FROM "timestream_sample_database"."timestream_sample_unload_table"
WHERE time between ago(1d) and now() GROUP BY product
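To make the semantics of this query concrete, the following sketch computes the same aggregation over in-memory rows: BETWEEN includes both ends of the 24-hour window, and sum() ignores missing quantities. The event dictionaries, product names, and the helper name are illustrative assumptions; in Timestream the aggregation runs server-side.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone


def units_sold_per_product(events, now=None, window=timedelta(days=1)):
    """Local analogue of sum(quantity) ... WHERE time BETWEEN ago(1d) AND now() GROUP BY product."""
    if now is None:
        now = datetime.now(timezone.utc)
    start = now - window
    totals = defaultdict(float)
    for event in events:
        # BETWEEN is inclusive on both ends
        if not (start <= event["time"] <= now):
            continue
        # Rows without a purchase are skipped here; SQL would report them as a NULL product group
        if event.get("product") is None or event.get("quantity") is None:
            continue
        totals[event["product"]] += event["quantity"]
    return dict(totals)
```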

Export the data to Amazon S3

You can use the UNLOAD statement to export the time series data to Amazon S3 for additional analysis. In this example, we analyze customers based on the channel through which they arrived on the website. You can use the partitioned_by clause to export each channel's data into its own folder. In this example, we export the data in Parquet format:

UNLOAD(SELECT user_id, ip_address, event, session_id, measure_name, time, query, quantity, product_id, channel
FROM "timestream_sample_database"."timestream_sample_unload_table" WHERE time BETWEEN ago(2d) AND now())
TO 's3://<your_bucket_name>/partition_by_channel' WITH (format = 'PARQUET', partitioned_by = ARRAY['channel'])

When you use the partitioned_by clause, the columns used in the partitioned_by field must be the same as the last columns in the SELECT statement. They must be put into the ARRAY value in the same order they appear in the SELECT statement.
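The ordering rule is easy to break when a SELECT list is edited later. The following sketch, with hypothetical helper names, checks a plain list of column names against partitioned_by and shows one way to reorder the list so the partition columns come last; it does not parse SQL.

```python
def partition_columns_ok(select_columns, partitioned_by):
    """True if partitioned_by names the trailing SELECT columns, in the same order."""
    n = len(partitioned_by)
    return 0 < n <= len(select_columns) and list(select_columns[-n:]) == list(partitioned_by)


def move_partition_columns_last(select_columns, partitioned_by):
    """Reorder a SELECT list so the partition columns come last, in partitioned_by order."""
    rest = [col for col in select_columns if col not in partitioned_by]
    return rest + list(partitioned_by)
```

Applied to the column list of the UNLOAD query above, partition_columns_ok returns True for ['channel'] and for ['product_id', 'channel'], but False for ['channel', 'product_id'], because the order must match the SELECT list.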

After you run the preceding query containing the UNLOAD statement, you can review the details in the Export to Amazon S3 summary section on the Query results tab.

When you view the results folder in Amazon S3, you can see that data is partitioned by the channel name.
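Each channel's files land under a Hive-style folder named channel=<value>, which is the layout that the AWS Glue crawler later recognizes as a partition. The sketch below extracts those name=value segments from object keys and groups the keys by channel. The helper names and the file names in the example are hypothetical; in practice you might obtain the keys from the Boto3 list_objects_v2 paginator.

```python
from collections import defaultdict


def partition_values(key):
    """Extract Hive-style name=value folder segments from an S3 object key."""
    values = {}
    for segment in key.split("/")[:-1]:  # the last segment is the file name
        name, sep, value = segment.partition("=")
        if sep:
            values[name] = value
    return values


def keys_by_partition(keys, column):
    """Group object keys by the value of one partition column, such as channel."""
    groups = defaultdict(list)
    for key in keys:
        value = partition_values(key).get(column)
        if value is not None:
            groups[value].append(key)
    return dict(groups)


keys = [
    "partition_by_channel/results/channel=Social media/part-0.parquet",
    "partition_by_channel/results/channel=Organic search/part-0.parquet",
    "partition_by_channel/results/channel=Social media/part-1.parquet",
]
print(keys_by_partition(keys, "channel"))
```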

Create an AWS Glue Data Catalog table

Next, you create an AWS Glue crawler to scan the data in the S3 bucket, infer the schema, and create a metadata table in the AWS Glue Data Catalog for the data exported from Timestream. Assuming you have the required AWS Glue permissions, this section presents two options: create a metadata table for each channel separately, or crawl the entire results folder and let AWS Glue detect the partitions automatically.

Option 1: Create an AWS Glue Data Catalog table for each channel separately

If you need to perform different analyses for each channel and you used the partitioned_by clause to separate the time series data by channel, you can create an AWS Glue Data Catalog table for a particular channel. For this example, we create a table for the Social media channel. Complete the following steps:

On the AWS Glue console, choose Crawlers in the navigation pane.
Choose Create crawler.
Add a new S3 data source with the location s3://<your_bucket_name>/partition_by_channel/results/channel=Social media.

This is the location that contains all Social media channel-related time series data.

Create a new AWS Glue database or use an existing one as per your needs.
Usually, AWS Glue infers the table name from the provided S3 folder structure, but if needed, you can add an optional table prefix. In this case, because we kept the table prefix empty, the final table name will be channel_social_media.
Keep the schedule set to On demand because we’re just going to crawl it one time to create a Data Catalog.
Fill in the other required fields and choose Create crawler.

After the crawler is created, select it on the Crawlers page and choose Run crawler to create a Data Catalog table based on the exported Timestream data.

When the run is complete, you will see the run history, where it says “1 table change,” which indicates that one table was added to the Data Catalog.

If you navigate to the Tables page on the AWS Glue console, you should see the new table channel_social_media with a schema that has been auto-inferred by the crawler.

You can now use Athena to view the data in this table:

SELECT * FROM "AwsDataCatalog"."timestream-sample"."channel_social_media";
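You can also run this query programmatically. The following sketch uses the Boto3 Athena client's start_query_execution, get_query_execution, and get_query_results operations. The helper name and polling interval are assumptions, and you must supply an S3 location where Athena can store query results.

```python
import time


def run_athena_query(athena, sql, database, output_location, poll_seconds=1.0):
    """Run a query with a Boto3 Athena client and return rows as lists of strings (hypothetical helper)."""
    query_id = athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": output_location},
    )["QueryExecutionId"]

    # Poll until the query reaches a terminal state
    while True:
        status = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]
        if status["State"] in ("SUCCEEDED", "FAILED", "CANCELLED"):
            break
        time.sleep(poll_seconds)
    if status["State"] != "SUCCEEDED":
        raise RuntimeError(f"Query {query_id} {status['State']}: {status.get('StateChangeReason', '')}")

    # Page through the results; for a SELECT, the first row holds the column names
    rows, kwargs = [], {"QueryExecutionId": query_id}
    while True:
        page = athena.get_query_results(**kwargs)
        for row in page["ResultSet"]["Rows"]:
            rows.append([cell.get("VarCharValue") for cell in row["Data"]])
        if not page.get("NextToken"):
            return rows
        kwargs["NextToken"] = page["NextToken"]
```

For example, run_athena_query(boto3.client("athena"), 'SELECT * FROM "channel_social_media" LIMIT 10', "timestream-sample", "s3://<your_bucket_name>/athena-results/") would return the header row followed by up to 10 data rows; the athena-results prefix is a placeholder.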

Option 2: Crawl the results folder with auto-detected partitions by the AWS Glue metastore

Crawler creation for this option follows the same procedure as before. The only difference is that you select the parent folder, results, as the S3 location.

This time, when the crawler runs successfully, you will see that the table changes indicate one table was created with five partitions.

In the table schema, notice that channel is auto-inferred as a partition key.

Creating a partitioned AWS Glue table makes it straightforward to query across channels without joining tables, as well as query on a per-channel basis. For more information, refer to Work with partitioned data in AWS Glue.

Derive insights with Amazon Athena

You can combine the time series data that you track and analyze in Timestream with non-time series data that you have outside of Timestream to derive insightful trends using services such as Athena.

For this example, we use a publicly available user dataset. This dataset has details such as user_id, first_name, last_name, zip, job, and age. Because user_id is also a dimension in our time series data, we use it to join the time series data with the non-time series data and derive insights about the behavior of customers who land on the page from the Social media channel.

Upload the file to Amazon S3 and create a table in Athena for this data:

CREATE EXTERNAL TABLE user_data
(
  user_id string,
  first_name string,
  last_name string,
  zip string,
  job string,
  age int
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
  'separatorChar' = ',',
  'quoteChar' = '"',
  'escapeChar' = '\\'
)
STORED AS TEXTFILE
LOCATION 's3://<your_bucket_name>/csv_user_data/'
TBLPROPERTIES (
  "skip.header.line.count"="1")

We also use a free zip code dataset, which has the columns Zipcode, ZipCodeType, City, State, LocationType, Lat, Long, Location, Decommisioned, TaxReturnsFiled, EstimatedPopulation, and TotalWages. We use this dataset to derive demographic insights.

Upload this file to Amazon S3 and create a table in Athena for the data. Because every field in the file is quoted, we import all the fields as strings for simplicity and cast them to the appropriate types later when needed.

CREATE EXTERNAL TABLE zipcode_data
(
  Zipcode string,
  ZipCodeType string,
  City string,
  State string,
  LocationType string,
  Lat string,
  Long string,
  Location string,
  Decommisioned string,
  TaxReturnsFiled string,
  EstimatedPopulation string,
  TotalWages string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
  'separatorChar' = ',',
  'quoteChar' = '"',
  'escapeChar' = '\\'
)
STORED AS TEXTFILE
LOCATION 's3://<your_bucket_name>/zipcode_data/'
TBLPROPERTIES (
  "skip.header.line.count"="1")
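When you later cast these string columns in Athena, CAST fails the whole query on a value it can't parse (for example, an empty string), whereas TRY_CAST returns NULL. The following Python sketch mirrors that TRY_CAST behavior, which can help when checking the file locally before querying it; the helper names and the choice of numeric columns are illustrative assumptions. Zipcode stays a string so that leading zeros survive, which also keeps it comparable with user_data.zip in the joins that follow.

```python
def try_float(value):
    """Like TRY_CAST(value AS double): None instead of an error on bad input."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def try_int(value):
    """Like TRY_CAST(value AS integer) for whole-number strings."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


# Hypothetical choice of numeric columns; every other column stays a string
CONVERTERS = {
    "Lat": try_float,
    "Long": try_float,
    "TaxReturnsFiled": try_int,
    "EstimatedPopulation": try_int,
    "TotalWages": try_float,
}


def convert_zipcode_row(row):
    """Return a copy of a zip code row with the numeric columns converted."""
    return {name: CONVERTERS[name](value) if name in CONVERTERS else value
            for name, value in row.items()}
```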

Perform a join of all three S3 datasets:

SELECT * FROM "timestream-sample"."user_data" user_data
JOIN "timestream-sample"."channel_social_media" social_media ON social_media.user_id = user_data.user_id
JOIN "timestream-sample"."zipcode_data" zipcode ON user_data.zip = zipcode.zipcode;
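Both joins are inner joins, so a Social media event appears in the output only if its user_id matches a row in user_data and that user's zip matches a row in zipcode_data. The following sketch reproduces that logic with dictionaries for a small local sample; the function name and row shapes are assumptions. One consequence worth checking: if the zip code file contains more than one row for the same zip code, each matching event is repeated once per row, which would inflate sum(quantity) in the queries that follow.

```python
def join_social_media(user_rows, social_rows, zipcode_rows):
    """Inner-join Social media events to users on user_id, then to zip codes on zip.

    Unlike SELECT *, columns with the same name collapse into one dictionary key.
    """
    users_by_id, zips_by_code = {}, {}
    for user in user_rows:
        users_by_id.setdefault(user["user_id"], []).append(user)
    for zip_row in zipcode_rows:
        zips_by_code.setdefault(zip_row["zipcode"], []).append(zip_row)

    joined = []
    for event in social_rows:
        # An event with no matching user, or a user with no matching zip code, is dropped
        for user in users_by_id.get(event["user_id"], []):
            for zip_row in zips_by_code.get(user["zip"], []):
                joined.append({**user, **event, **zip_row})
    return joined
```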

Use the following query to find sales by age group for the Social media channel:

SELECT sum(social_media.quantity) as quantity_sold, case
when user_data.age < 18 then 'Under 18'
when user_data.age between 18 and 35 then '18-35'
when user_data.age between 36 and 60 then '36-60'
else 'Above 60'
END AS age_range FROM "timestream-sample"."user_data" user_data
JOIN "timestream-sample"."channel_social_media" social_media ON social_media.user_id = user_data.user_id
GROUP BY case
when user_data.age < 18 then 'Under 18'
when user_data.age between 18 and 35 then '18-35'
when user_data.age between 36 and 60 then '36-60'
else 'Above 60'
END

We get the following results:

quantity_sold  age_range
29.0           18-35
2.0            Under 18
38.0           Above 60
28.0           36-60
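The CASE expression assigns each row to exactly one bucket, and the same expression is repeated in GROUP BY so that rows are grouped by bucket rather than by raw age. The following Python sketch mirrors it; the helper names are hypothetical. Note the ELSE branch: in SQL, a NULL age makes every WHEN condition unknown, so rows with a missing age would be counted under Above 60. Adding an explicit WHEN user_data.age IS NULL branch to the query would separate them.

```python
from collections import defaultdict


def age_range(age):
    """Mirror of the CASE expression, including its ELSE fallthrough for NULL."""
    if age is None:
        return "Above 60"  # SQL: NULL comparisons are never true, so ELSE applies
    if age < 18:
        return "Under 18"
    if 18 <= age <= 35:
        return "18-35"
    if 36 <= age <= 60:
        return "36-60"
    return "Above 60"


def quantity_by_age_range(rows):
    """Sum quantity per age bucket; like SQL sum(), missing quantities are skipped."""
    totals = defaultdict(float)
    for row in rows:
        if row.get("quantity") is not None:
            totals[age_range(row.get("age"))] += row["quantity"]
    return dict(totals)
```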

Use the following query to view sales by state:

SELECT sum(social_media.quantity) as quantity_sold, zipcode.state FROM "timestream-sample"."user_data" user_data
JOIN "timestream-sample"."channel_social_media" social_media ON social_media.user_id = user_data.user_id
JOIN "timestream-sample"."zipcode_data" zipcode ON user_data.zip = zipcode.zipcode
GROUP BY zipcode.state

We get the following results:

quantity_sold  state
4.0            MI
7.0            TN
3.0            WA
0.0            FL
16.0           AP
4.0            AL
29.0           TX
1.0            VT
9.0            NH
2.0            WI
0.0            CA
12.0           CO
0.0            IL
2.0            NC
7.0            GA
0.0            KS
1.0            MN

Clean up

To avoid future costs, remove all the resources you created for this post:

On the Timestream console, choose Tables in the navigation pane and delete the table you created.
Choose Databases in the navigation pane and delete the database you created. (Timestream deletes a database only after all of its tables have been deleted.)
On the AWS Glue console, choose Crawlers in the navigation pane.
Select the crawler you created and on the Action menu, choose Delete crawler.
Choose Tables in the navigation pane and delete the Data Catalog tables created for this post.
On the Amazon S3 console, choose Buckets in the navigation pane.
Empty and delete the bucket created for this post.
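If you prefer to clean up from code, the following sketch performs the same steps with Boto3. It assumes you pass in a timestream-write client, an AWS Glue client, and an S3 resource, plus the names you used; the function and parameter names are hypothetical. It deletes the Timestream table before the database, and it deletes only current objects, so a versioned bucket would also need its object versions removed (for example, with bucket.object_versions.all().delete()).

```python
def clean_up(timestream_write, glue, s3, *, database, table, crawler,
             glue_database, glue_tables, bucket):
    """Delete the resources created for this walkthrough (hypothetical helper)."""
    # Timestream: the table must be deleted before its database
    timestream_write.delete_table(DatabaseName=database, TableName=table)
    timestream_write.delete_database(DatabaseName=database)

    # AWS Glue: the crawler, then each Data Catalog table
    glue.delete_crawler(Name=crawler)
    for name in glue_tables:
        glue.delete_table(DatabaseName=glue_database, Name=name)

    # Amazon S3: empty the bucket (current objects only), then delete it
    s3_bucket = s3.Bucket(bucket)
    s3_bucket.objects.all().delete()
    s3_bucket.delete()
```

For this post, the call might look like clean_up(write_client, boto3.client("glue"), boto3.resource("s3"), database="timestream_sample_database", table="timestream_sample_unload_table", crawler="<your_crawler_name>", glue_database="timestream-sample", glue_tables=["channel_social_media", "user_data", "zipcode_data"], bucket="<your_bucket_name>").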

Conclusion

The UNLOAD statement in Timestream enables you to unload your time series data into Amazon S3 in a secure and cost-effective manner. The statement allows flexibility through a range of options, such as partitioning by one or more columns or choosing the format, compression, and encryption. Whether you plan to add time series data to a data warehouse, build an AI/ML pipeline, or simplify ETL processes for time series data, the UNLOAD statement will make the process more straightforward.

To learn more about the UNLOAD statement, refer to UNLOAD Concepts.

About the Authors

Shravanthi Rajagopal is a Senior Software Engineer at Amazon Timestream. With over 8 years of experience in building scalable distributed systems, she demonstrates excellence in ensuring seamless customer experiences. Shravanthi has been part of the Timestream team since its inception and is currently a tech lead working on exciting customer features. Away from screens, she finds joy in singing and embarking on culinary adventures in search of delicious cuisine.

Praneeth Kavuri is a Senior Product Manager in AWS working on Amazon Timestream. He enjoys building scalable solutions and working with customers to help deploy and optimize database workloads on AWS.

AWS Database Blog
