Sunday, February 5, 2023

Writing results from an Athena query to Amazon DynamoDB

Many industries are taking advantage of the Internet of Things (IoT) to track information from and about connected devices. One example is the energy industry, which is using smart electricity meters to collect energy consumption from customers for analytics and control purposes.

Vector, a New Zealand energy company, combines its energy knowledge with Amazon Web Services (AWS) and IoT to improve the way energy consumption data is collected and processed. ENGIE, an independent energy producer, used AWS IoT services to build solutions that provide monitoring and real-time data analysis for smart buildings and power stations. Tibber, a Nordic energy startup, created an IoT solution on AWS that shows consumers their real-time electricity consumption and costs.

In this post, you walk through a scenario for a hypothetical energy company where IoT data from electricity meter sensors is ingested into an Amazon Simple Storage Service (Amazon S3) bucket, which works as the central storage of a data lake in the AWS Cloud. Then, the company can use Amazon Athena to run SQL queries on top of these files in the S3 bucket, without the need to load them to a specific datastore. Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. It’s serverless, so there’s no infrastructure to manage and you pay only for the queries that you run.

Additional Athena queries run on top of the metering data inside the data lake to get customer electricity consumption per hour, date, and month. In a production application, these statistics would be stored on Amazon DynamoDB to be consumed by thousands of customers through dashboards in web pages or mobile apps without losing performance. DynamoDB is a serverless key-value NoSQL database that delivers single-digit-millisecond performance at any scale. It’s a fully managed, multi-Region, multi-active database with built-in security, backup and restore, and in-memory caching for internet-scale applications. DynamoDB can handle more than 10 trillion requests per day and support peaks of more than 20 million requests per second. Hundreds of thousands of AWS customers have chosen DynamoDB as their database for mobile, web, gaming, ad tech, IoT, and other applications that need low-latency data access at any scale.

Scenario overview and solution walkthrough

In this scenario, you simulate the generation of statistics for January, 2022.

You collect data from IoT sensors and convert and summarize it into CSV files, then upload the files to an S3 bucket that’s configured as a data lake. In another path in the same bucket, you create another CSV file with customer information, associating each customer to a specific meter. The statistics generation with Athena and subsequent write to DynamoDB in this solution are fully automated. The architectural diagram for this solution is shown in Figure 1 that follows:

Figure 1: IoT analytics architecture

The workflow of this solution is:

There is a dataset stored in CSV format to simulate data collected from electricity meter sensors (IoT) from 10 customers.
This meter data is generated hourly and sent daily to an S3 bucket, which is configured as a data lake. This scenario assumes the following:
For each customer, there’s a row of meter data in the CSV file for each hour of the day.
For each day of January, 2022, there are 24 files to be stored in the data lake.
The S3 bucket path, defined as the data lake location, will be partitioned (Apache Hive style) by year, month, and day-of-month to improve performance of Athena queries, which will generate statistics filtered by these periods.

There will be two external tables defined in AWS Glue Data Catalog—one pointing to the meter data, and another pointing to the customer data, which resides in the same bucket.
Each night, a Lambda function is triggered to run Athena queries to summarize meter data by customer, grouped by hour, day, and month. This meter data will be joined with customer data in each query.
Each query result will be stored as CSV outputs in another S3 bucket.
When a new query result file is written to the output bucket, it initiates another Lambda function, which will read the content of the query result file and write it to a DynamoDB table named ElectricityMeteredByPeriod, which will be accessed by customers.
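The Hive-style partitioning described in the workflow can be sketched as a small key builder. This is only an illustration: the prefix iot_electricity_metering and the file-name pattern come from this post, while the zero-padded year=/month=/day= layout and the function names are assumptions, not the layout used by the repository scripts.

```python
from datetime import date

# Prefix taken from this post; the exact partition layout below is an assumption.
DATA_PREFIX = "iot_electricity_metering"


def partition_prefix(day: date) -> str:
    """Return a Hive-style partition prefix (year=/month=/day=) for one day."""
    return (
        f"{DATA_PREFIX}/year={day.year:04d}/"
        f"month={day.month:02d}/day={day.day:02d}/"
    )


def object_key(day: date, hour: int) -> str:
    """Return the object key for the file holding one hour of meter readings."""
    if not 0 <= hour <= 23:
        raise ValueError("hour must be between 0 and 23")
    file_name = (
        f"electricity_metering_blockA1_{day.year:04d}_{day.month:02d}_"
        f"{day.day:02d}_{hour:02d}.csv"
    )
    return partition_prefix(day) + file_name


# One key per hour: 24 files for a single day of January 2022.
keys = [object_key(date(2022, 1, 15), hour) for hour in range(24)]
print(keys[19])
```

Because each partition column appears in the key as name=value, a query that filters on year, month, and day only needs to read the objects under the matching prefix.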

Deploy the solution

To deploy and test the AWS Serverless Application Model (AWS SAM)-based solution, you must meet some prerequisites. In the GitHub repository for this solution, follow the instructions in Prerequisites to prepare everything you need.

After completing the prerequisites, follow the detailed instructions in Deploy the solution.

The deployment process creates a stack that contains the resources used by this solution. The following sections describe each resource and how it’s used.

S3 buckets for the data lake, Athena outputs, and server logging

This solution delivers three S3 buckets:

One bucket called iot-analytic-bucket-{AWS Account ID}-{AWS Region}, where the CSV files containing the electricity meter data will be uploaded.
A second bucket called iot-athena-results-{AWS Account ID}-{AWS Region}, which receives the output of the Athena queries. This bucket has an event notification configured to fire for every CSV file created in it.

Each time this event fires, it invokes a Lambda function called fn-write-athena-output-to-ddb, which is described later. The event notification should have the following settings, as shown in Figure 2 that follows:

Event types: All object create events
Filters: no prefix, suffix .csv
Destination type: Lambda function
Destination: fn-write-athena-output-to-ddb

Figure 2: Event notification

A third bucket, named log-bucket-{AWS Account ID}-{AWS Region}, that receives server access logs from the other two buckets. There are two folders, one for the logs of each bucket:
iot-analytic-logs/ receives server access logs from bucket iot-analytic-bucket-{AWS Account ID}-{AWS Region}
athena-results-logs/ receives server access logs from bucket iot-athena-results-{AWS Account ID}-{AWS Region}
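To make the event notification on the Athena results bucket more concrete, the following sketch shows how a Lambda function could pull the bucket and object key out of an S3 event. It is not the code of fn-write-athena-output-to-ddb; the function name, the sample bucket name, and the sample keys are made up for illustration. The suffix check repeats the .csv filter in code, which is a defensive choice rather than a requirement, because Athena also writes a .csv.metadata file next to each result.

```python
from urllib.parse import unquote_plus


def csv_objects_from_event(event: dict) -> list[tuple[str, str]]:
    """Return (bucket, key) pairs for created objects whose key ends in .csv."""
    found = []
    for record in event.get("Records", []):
        # Only react to object-creation events.
        if not record.get("eventName", "").startswith("ObjectCreated:"):
            continue
        bucket = record["s3"]["bucket"]["name"]
        # Object keys arrive URL-encoded in S3 event notifications.
        key = unquote_plus(record["s3"]["object"]["key"])
        if key.endswith(".csv"):
            found.append((bucket, key))
    return found


# Hypothetical event with one result file and its metadata companion.
sample_event = {
    "Records": [
        {
            "eventName": "ObjectCreated:Put",
            "s3": {
                "bucket": {"name": "example-athena-results"},
                "object": {"key": "daily/query+result.csv"},
            },
        },
        {
            "eventName": "ObjectCreated:Put",
            "s3": {
                "bucket": {"name": "example-athena-results"},
                "object": {"key": "daily/query+result.csv.metadata"},
            },
        },
    ]
}

print(csv_objects_from_event(sample_event))
```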

Sample data, with customer and IoT electricity metering data, uploaded to the data lake

A sample dataset was created (as CSV files) to test this solution:

A customer information data file, associating each customer to a specific meter. This is the structure for this CSV file:
File name: customer_meter_sensor.csv
Headings: CustomerID, SensorID, SensorGroup

The attributes of each line have the following meaning:

CustomerID: ID of customer associated with a specific electricity meter
SensorID: ID of IoT sensor associated with the electricity meter
SensorGroup: Energy consumer classification associated with customer:
A: Large consumers (such as industry and shopping malls)
B: Small consumers (residential and rural properties)

You can access this GitHub repo link to view the sample file.

Several electricity meter data files were created for each day of January, 2022. Each file represents electricity consumed by customers in a specific hour of the day. The structure for those CSV files is:
File name: electricity_metering_blockA1_<YYYY>_<MM>_<DD>_<HH24>.csv
Headings: SensorID, Voltage, Current, Frequency, kWh, HourCollected

The attributes of each line have the following meaning:

SensorID: ID of IoT sensor associated with the electricity meter
Voltage: Maximum voltage supported, measured in volts
Current: Maximum electrical current supported, measured in amperes
Frequency: Operational frequency of the electrical network, measured in hertz
kWh: Electrical energy usage on that hour, measured in kilowatt-hours
HourCollected: Hour that energy usage was collected

You can access this GitHub repo link to verify the content of these sample files, grouped in folders representing each day of January, 2022.

The files are uploaded by the bash script deploy-solution.sh. After the script runs, the S3 bucket iot-analytic-bucket-{AWS Account ID}-{AWS Region} contains two prefixes with the uploaded data:

The prefix customer_meter_sensor/ contains the customer information data, represented in a single CSV file. This is a sample of this file content:

CustomerID,SensorID,SensorGroup
10027610,51246501,B
10027611,51246502,B
10027612,51246503,B
10027613,51246504,A

The prefix iot_electricity_metering/ contains the IoT electricity meter data, uploaded to prefixes representing the data partitioned by year, month, and day of month. This is a sample of one of these files:

SensorID,Voltage,Current,Frequency,kWh,HourCollected
51246501,240,10,60,0.29,19
51246502,240,10,60,0.30,19
51246503,240,10,60,0.26,19
51246504,240,20,60,0.54,19
51246505,240,10,60,0.27,19
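The two samples are linked by SensorID, which is the column the Athena queries use to join meter data with customer data. The following sketch reproduces that join locally on the excerpts above, using only the Python standard library. It illustrates the relationship between the files and is not how the solution itself processes them; the function name is hypothetical.

```python
import csv
import io

CUSTOMERS_CSV = """CustomerID,SensorID,SensorGroup
10027610,51246501,B
10027611,51246502,B
10027612,51246503,B
10027613,51246504,A
"""

METER_CSV = """SensorID,Voltage,Current,Frequency,kWh,HourCollected
51246501,240,10,60,0.29,19
51246502,240,10,60,0.30,19
51246503,240,10,60,0.26,19
51246504,240,20,60,0.54,19
51246505,240,10,60,0.27,19
"""


def join_readings(customers_csv: str, meter_csv: str) -> list[dict]:
    """Inner-join meter readings with customers on SensorID."""
    by_sensor = {
        row["SensorID"]: row for row in csv.DictReader(io.StringIO(customers_csv))
    }
    joined = []
    for reading in csv.DictReader(io.StringIO(meter_csv)):
        customer = by_sensor.get(reading["SensorID"])
        if customer is None:
            # No matching customer row in this excerpt, so the reading is dropped.
            continue
        joined.append(
            {
                "CustomerID": int(customer["CustomerID"]),
                "SensorID": int(reading["SensorID"]),
                "SensorGroup": customer["SensorGroup"],
                "Hour": int(reading["HourCollected"]),
                "kWh": float(reading["kWh"]),
            }
        )
    return joined


for row in join_readings(CUSTOMERS_CSV, METER_CSV):
    print(row)
```

Sensor 51246505 is missing from the result only because the customer excerpt above stops at four rows.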

DynamoDB table to hold the summarized data from the Athena query

The deployment creates a DynamoDB table named ElectricityMeteredByPeriod, which receives the data summarized by hour, day, and month and has the following attributes:

CustomerID: The partition key, which will receive the numerical ID of a customer associated with a specific electricity meter.
SensorID-Period: The sort key, which is a string value composed of the IoT sensor ID associated with the electricity meter and the specific period when the energy consumption was collected. This attribute can take the following value formats, depending on the period analyzed:
Electricity meter data by hour: SensorID#YYYY-MM-DDTHH
Electricity meter data by day: SensorID#YYYY-MM-DD
Electricity meter data by month: SensorID#YYYY-MM

kWh-Amount: The energy consumed in the specified period, measured in kWh.

Figure 3 that follows shows the data model of the table, representing its access pattern using sample data for one customer:

Figure 3: Data model for CustomerID, SensorID-Period, and kWh-Amount
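The three sort key formats can be produced by one small helper, sketched below. The function name and the zero-padding of month, day, and hour are assumptions made for this illustration; the formats themselves follow the list above.

```python
from typing import Optional


def sort_key(
    sensor_id: int,
    year: int,
    month: int,
    day: Optional[int] = None,
    hour: Optional[int] = None,
) -> str:
    """Build a SensorID-Period value for a month, a day, or an hour."""
    if hour is not None and day is None:
        raise ValueError("an hour can only be given together with a day")
    period = f"{year:04d}-{month:02d}"
    if day is not None:
        period += f"-{day:02d}"
    if hour is not None:
        period += f"T{hour:02d}"
    return f"{sensor_id}#{period}"


print(sort_key(51246506, 2022, 1))         # monthly item
print(sort_key(51246506, 2022, 1, 2))      # daily item
print(sort_key(51246506, 2022, 1, 2, 19))  # hourly item
```

Because every format starts with the sensor ID and then goes from the largest to the smallest time unit, the items of one sensor sort chronologically, which is what makes range conditions on the sort key useful.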

For this scenario, the sample considers only residential customers, which have only one electricity meter per customer.

However, it’s important to consider industrial customers, which can have several electricity meters per customer. You need to get energy consumption statistics by meter and by customer. You can do this by creating a global secondary index (GSI) with CustomerID as the partition key and SensorID as the sort key, while the main table would have SensorID as the partition key and Period as the sort key.

We can illustrate this scenario with another data model, considering both access patterns:

For the main table access pattern, we can have a table named ElectricitySensorByPeriod with sample data representing two industrial customers that each have two electricity meters, as shown in Figure 4 that follows:

Figure 4: Data model using SensorID as the partition key and Period as the sort key

Figure 5 that follows is an example of the global secondary index GSI-ElectricityCustomerBySensor, which enables you to retrieve electricity sensors by CustomerID:

Figure 5: Data model with CustomerID as the partition key and SensorID as the sort key
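The alternative model in Figures 4 and 5 could be expressed as the following table definition. It is a sketch only: the table and index names come from this post, but the attribute types (numeric IDs, string period), the on-demand billing mode, and the ALL projection are assumptions, and the function name is hypothetical.

```python
def sensor_table_definition() -> dict:
    """Return create_table parameters for the table and its GSI."""
    return {
        "TableName": "ElectricitySensorByPeriod",
        # Only attributes used in a key schema are declared here.
        "AttributeDefinitions": [
            {"AttributeName": "SensorID", "AttributeType": "N"},
            {"AttributeName": "Period", "AttributeType": "S"},
            {"AttributeName": "CustomerID", "AttributeType": "N"},
        ],
        # Main table: SensorID (partition key) and Period (sort key).
        "KeySchema": [
            {"AttributeName": "SensorID", "KeyType": "HASH"},
            {"AttributeName": "Period", "KeyType": "RANGE"},
        ],
        # GSI: CustomerID (partition key) and SensorID (sort key).
        "GlobalSecondaryIndexes": [
            {
                "IndexName": "GSI-ElectricityCustomerBySensor",
                "KeySchema": [
                    {"AttributeName": "CustomerID", "KeyType": "HASH"},
                    {"AttributeName": "SensorID", "KeyType": "RANGE"},
                ],
                "Projection": {"ProjectionType": "ALL"},
            }
        ],
        "BillingMode": "PAY_PER_REQUEST",
    }


definition = sensor_table_definition()
print(definition["GlobalSecondaryIndexes"][0]["IndexName"])
```

With boto3 available and credentials configured, a dictionary like this could be passed as boto3.client("dynamodb").create_table(**definition).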

Lambda functions for the solution

The AWS SAM application builds and deploys two AWS Lambda functions to automate the IoT analytical solution:

A function, called fn-run-athena-query, runs an Athena query to summarize the electricity meter data uploaded to the S3 bucket iot-analytic-bucket-{AWS Account ID}-{AWS Region}. You can see the code for this function in this GitHub repository. The function is initiated by an Amazon EventBridge rule, named RunAthenaQuerySchedule, defined with type Event Schedule.

Note: This rule is disabled. For the purposes of this post, you won’t use this rule to initiate the Lambda function fn-run-athena-query. Instead, you’ll invoke the function manually to run the Athena queries to test the solution. In a production operation, this rule initiates the Lambda function every day at 03:30 UTC.

Another function, called fn-write-athena-output-to-ddb, reads the output of an Athena query, which is written as a CSV file to the S3 bucket iot-athena-results-{AWS Account ID}-{AWS Region}. The function writes each line of the file as an item into the DynamoDB table ElectricityMeteredByPeriod. You can see the code for this function in this GitHub repo.

For more information about the implementation of the Lambda functions above, see Usage in the GitHub repository for this solution.
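As a rough idea of what a daily summary query might look like, the sketch below builds one SQL statement for a given date. The table and column names match the external tables used by this solution, but the query text, the output column names period and kwh_amount, the zero-padded partition values, and the function name are assumptions; the queries in the GitHub repository may differ.

```python
from datetime import date


def daily_summary_query(run_date: date) -> str:
    """Build a hypothetical Athena query that sums kWh per customer for one day."""
    return (
        "SELECT c.customerid, m.sensorid, "
        f"'{run_date.isoformat()}' AS period, "
        "SUM(m.kwh) AS kwh_amount\n"
        "FROM iot_electricity_metering m\n"
        "JOIN customer_meter_sensor c ON c.sensorid = m.sensorid\n"
        # Filtering on the partition columns limits the data Athena scans.
        f"WHERE m.year = '{run_date.year:04d}' "
        f"AND m.month = '{run_date.month:02d}' "
        f"AND m.day = '{run_date.day:02d}'\n"
        "GROUP BY c.customerid, m.sensorid"
    )


print(daily_summary_query(date(2022, 1, 5)))
```

In a Lambda function, a string like this would typically be submitted with the boto3 Athena client's start_query_execution call, passing the database name in QueryExecutionContext and the results bucket in ResultConfiguration.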

Database and external tables to be accessed by Athena

During setup, a database and external table definitions are created in Data Catalog, to be used by Athena in the queries called by the Lambda function fn-run-athena-query.

To verify these definitions from the Athena console:

Sign in to the AWS Management Console, go to Athena, and choose Explore the query editor.
Select the Settings tab and choose Manage.
In the Manage settings form, for Location of query result, enter the value s3://iot-athena-results-{AWS Account ID}-{AWS Region}. Choose Save.
Select the Editor tab, and at the left side of the query editor, choose iotanalyticsdb from the Database drop-down list to make it the current database.

Figure 6: Choose iotanalyticsdb for the database

Go back to the query editor window, choose Clear to erase the previous query, enter the following SQL statement, and then choose Run.

SHOW CREATE TABLE customer_meter_sensor;

You should see the following in the Query results tab:

CREATE EXTERNAL TABLE `customer_meter_sensor`(
`customerid` bigint,
`sensorid` bigint,
`sensorgroup` string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
's3://iot-analytic-bucket-{AWS Account ID}-{AWS Region}/customer_meter_sensor'
TBLPROPERTIES (
'skip.header.line.count'='1',
'transient_lastDdlTime'='1662585866')

This external table points to the customer information data in the data lake. The bucket named in the S3 URI is where the CSV files containing the electricity meter data and customer information data were uploaded.

Choose Clear to clear the query editor window again, enter the following SQL statement, and then choose Run:

SHOW CREATE TABLE iot_electricity_metering;

Below, in the Query results tab, you should see this result:

CREATE EXTERNAL TABLE `iot_electricity_metering`(
`sensorid` bigint,
`voltage` bigint,
`current` bigint,
`frequency` bigint,
`kwh` double,
`hourcollected` string)
PARTITIONED BY (
`year` string,
`month` string,
`day` string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
's3://iot-analytic-bucket-{AWS Account ID}-{AWS Region}/iot_electricity_metering'
TBLPROPERTIES (
'skip.header.line.count'='1',
'transient_lastDdlTime'='1662585867')

This external table points to the IoT electricity meter data in the data lake, and is partitioned by year, month, and day-of-month. The bucket named in the S3 URI is where the CSV files containing the electricity meter data and customer information data were uploaded.

Test the solution

Now that all components of the solution are configured, it’s time to test it with a simulation. To do that, you manually invoke the Lambda function fn-run-athena-query once for each day of January, 2022. For each invocation, the environment variable ev_date_run is set to the date being processed, which simulates the nightly run for that date. For each query output generated, another function, fn-write-athena-output-to-ddb, is invoked to store the summarized data in the DynamoDB table ElectricityMeteredByPeriod. At the end of the simulation, you’ll query this DynamoDB table to simulate statistics that customers can obtain to monitor their electricity use.
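The last step of this flow, turning one query output file into table items, can be sketched as follows. The column names in the sample file (customerid, sensorid, period, kwh_amount) are assumptions, as is the function name; the real output columns depend on the queries in the repository. The attribute names of the items match the ElectricityMeteredByPeriod table.

```python
import csv
import io
from decimal import Decimal


def items_from_athena_csv(text: str) -> list[dict]:
    """Convert the rows of a query output file into DynamoDB items."""
    items = []
    for row in csv.DictReader(io.StringIO(text)):
        items.append(
            {
                "CustomerID": int(row["customerid"]),
                "SensorID-Period": f"{row['sensorid']}#{row['period']}",
                # Decimal keeps the value exact; the boto3 resource API
                # expects Decimal rather than float for numbers.
                "kWh-Amount": Decimal(row["kwh_amount"]),
            }
        )
    return items


# Hypothetical output file; Athena quotes every value in its CSV results.
sample_output = (
    '"customerid","sensorid","period","kwh_amount"\n'
    '"10027615","51246506","2022-01-02","5.77"\n'
    '"10027615","51246506","2022-01-03","5.77"\n'
)

for item in items_from_athena_csv(sample_output):
    print(item)
```

With boto3, items like these could be written through a Table resource's batch_writer(), calling put_item(Item=item) for each one.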

To test the solution

In the AWS Cloud9 environment created when you deployed the solution, run the bash script invoke-fn-run-athena-query-dates.sh (from the GitHub repository). Before you run the script, confirm you are in the <deployment path>/iot-analytics-athena-ddb/scripts/bash directory. Running the script should generate the output files needed for this simulation and place them in the bucket iot-athena-results-{AWS Account ID}-{AWS Region}. To see examples of the files, see the topic Usage in the GitHub repository. To verify an excerpt of the output generated by this bash script, and to confirm that it finished successfully, see the description in the GitHub repository.
Every time a new query output file is generated in the bucket iot-athena-results-{AWS Account ID}-{AWS Region}, the Lambda function fn-write-athena-output-to-ddb is invoked and writes the content of the output CSV file to the DynamoDB table ElectricityMeteredByPeriod. The aggregated data in this table can be accessed by different queries as needed, so users can see their readings by hour, day, and month in a dashboard without the data being aggregated again on every visit. To verify the content of this table, go to the DynamoDB console, choose Explore items under Tables, and then select ElectricityMeteredByPeriod to see the content in the Items returned panel, as shown in Figure 7 that follows:

Figure 7: ElectricityMeteredByPeriod items returned

To simulate the statistics that a customer or the energy company can obtain for specific periods, run PartiQL queries on this table. To learn more, see Getting statistics with PartiQL for examples of queries you can use.
This is an example of a PartiQL query that you can run to obtain data from the table:

SELECT "CustomerID", "SensorID-Period", "kWh-Amount"
FROM "ElectricityMeteredByPeriod"
WHERE "CustomerID" = 10027615
AND "SensorID-Period" BETWEEN '51246506#2022-01-02' AND '51246506#2022-01-08'
AND NOT contains("SensorID-Period", 'T')

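The above query returns the daily kWh consumed by the customer with ID 10027615 during the first week of January, 2022 (January 2 through January 8). The BETWEEN range also matches hourly items for those days, because a sort key such as 51246506#2022-01-02T19 sorts between the two bounds. The NOT contains condition removes those items, so only the daily values remain.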

The result should be similar to the following:

CustomerID  SensorID-Period      kWh-Amount
----------  -------------------  ----------
10027615    51246506#2022-01-02  5.77
10027615    51246506#2022-01-03  5.77
10027615    51246506#2022-01-04  5.77
10027615    51246506#2022-01-05  5.77
10027615    51246506#2022-01-06  5.77
10027615    51246506#2022-01-07  5.77
10027615    51246506#2022-01-08  5.77
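The way the two sort key conditions in the query work together can be checked with plain string comparison, which orders these ASCII keys the same way DynamoDB does. The sketch below uses a handful of made-up sort keys for sensor 51246506; the function name is hypothetical.

```python
def daily_keys_in_range(sort_keys: list[str], low: str, high: str) -> list[str]:
    """Mimic BETWEEN low AND high combined with NOT contains(key, 'T')."""
    return [key for key in sort_keys if low <= key <= high and "T" not in key]


sample_keys = [
    "51246506#2022-01",        # monthly item, sorts before the lower bound
    "51246506#2022-01-01",     # daily item, before the range
    "51246506#2022-01-02",     # daily item, in range
    "51246506#2022-01-02T19",  # hourly item, in range, removed by the 'T' check
    "51246506#2022-01-08",     # daily item, equal to the upper bound
    "51246506#2022-01-08T00",  # hourly item, sorts after the upper bound
    "51246506#2022-01-09",     # daily item, after the range
]

selected = daily_keys_in_range(
    sample_keys, "51246506#2022-01-02", "51246506#2022-01-08"
)
print(selected)
```

Only the two daily keys inside the range survive both conditions. The hourly items of the last day are already excluded by the upper bound, so the 'T' check matters for every day in the range except the last.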

Cleaning up

To avoid unnecessary charges (storage and computing costs), you can run the bash script undeploy-solution.sh—which you can review in the GitHub repo—in the AWS Cloud9 environment created at the beginning of the solution. It’s also recommended that you delete the AWS Cloud9 environment.

For a detailed explanation of how to run the above script and what is deleted—together with guidance on how to delete the AWS Cloud9 environment—see Cleaning up from the GitHub repo main page.

Final considerations

The use of IoT by the energy industry is already a reality, mainly when discussing smart electricity meters. As mentioned in the introduction, many AWS customers that operate in this industry are already taking advantage of AWS IoT services to capture and process IoT data.

When processing IoT data, energy companies address two main objectives:

Analyze a huge amount of data collected from multiple smart electricity meters to better understand energy consumption, improve service monitoring and demand forecasting, and conduct predictive maintenance.
Share the analyzed IoT data with customers to help them track and manage their energy consumption.

The IoT analytical solution described in this post addresses both requirements:

Running Athena SQL queries on top of an S3 bucket working as a data lake, which receives a daily upload of files with IoT data from electricity meter sensors, addresses the forecasting and predictive maintenance needs of the energy company.
Storing the customer energy data summarized by hour, date, and month into DynamoDB tables provides a datastore that can be used to share this information with customers.

Additionally, you can create and embed dashboards in web pages or mobile apps to make the data available to customers. You can use Lambda functions and Amazon DynamoDB Streams to build a dynamic dashboard to give access to customer consumption data as soon as it’s written to DynamoDB.

To learn more about building dynamic dashboards using the above approach, see Part 1 and Part 2 of How to Build Dynamic Dashboards Using Lambda and DynamoDB Streams.
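As a small illustration of the DynamoDB Streams side of such a dashboard, the following sketch extracts newly written readings from a stream event. It assumes the stream is configured to include new images, and the function name and sample event are made up; the attribute names are those of the ElectricityMeteredByPeriod table.

```python
def new_readings(event: dict) -> list[dict]:
    """Return readings from INSERT and MODIFY records of a stream event."""
    readings = []
    for record in event.get("Records", []):
        if record.get("eventName") not in ("INSERT", "MODIFY"):
            continue  # REMOVE records carry no new data to display
        image = record["dynamodb"].get("NewImage")
        if image is None:
            continue  # the stream view type does not include new images
        # Stream images use DynamoDB's typed JSON: {"N": "..."} or {"S": "..."}.
        readings.append(
            {
                "CustomerID": int(image["CustomerID"]["N"]),
                "SensorID-Period": image["SensorID-Period"]["S"],
                "kWh-Amount": float(image["kWh-Amount"]["N"]),
            }
        )
    return readings


# Hypothetical event with one inserted item and one removed item.
sample_stream_event = {
    "Records": [
        {
            "eventName": "INSERT",
            "dynamodb": {
                "NewImage": {
                    "CustomerID": {"N": "10027615"},
                    "SensorID-Period": {"S": "51246506#2022-01-02"},
                    "kWh-Amount": {"N": "5.77"},
                }
            },
        },
        {"eventName": "REMOVE", "dynamodb": {"Keys": {}}},
    ]
}

print(new_readings(sample_stream_event))
```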

Conclusion

In this post, you learned how to build an automated IoT analytical solution for data from smart electricity meters. The solution runs nightly Athena queries on an S3 bucket containing the electricity meter data and writes the results to a DynamoDB table. You can extend this solution to include dynamic dashboards to enhance the user experience.

This solution can be adapted to any kind of high-volume data scenario where you need to quickly analyze data and want to share the results with millions of users.

To learn more about the resources used in this solution, visit the documentation for AWS SAM, Athena, and DynamoDB. Also, visit the GitHub repository for this solution for additional details about the code used in this post.

Share your feedback in the comments section.

About the Author

Andre Rosa is a Sr. Partner Trainer with Amazon Web Services. He has more than 20 years of experience in the IT industry, mostly dedicated to the database world. As a member of the AWS Training and Certification team, he exercises one of his passions: learning and sharing knowledge with AWS Partners as they work on projects on their journey to the cloud.

Read MoreAWS Database Blog
