Tuesday, April 16, 2024

Enable near real-time notifications from Amazon Aurora PostgreSQL by using database triggers, AWS Lambda, and Amazon SNS

In this post, we show you how to push a database DML (Data Manipulation Language) event from an Amazon Aurora PostgreSQL-Compatible Edition table out to downstream applications by using a PostgreSQL database trigger, an AWS Lambda function, and Amazon Simple Notification Service (Amazon SNS).

Aurora PostgreSQL is a PostgreSQL-compatible relational database built for the cloud that combines the performance and availability of traditional enterprise databases with the simplicity and cost-effectiveness of open-source databases.

You may need to notify downstream applications when content in your relational database tables changes. The change could be an insert, update, or delete statement on a table, and it needs to be relayed in near-real time to a Lambda function. Because Aurora PostgreSQL supports database triggers and integrates with Lambda, you can call a downstream Lambda function in near-real time whenever data manipulation is performed on the source database. For example, a new customer onboarding application might insert a new customer row into a database table. This customer lifecycle event might need to trigger a welcome email or some other back-office workflow.

You can also use the solution presented in this post with Amazon Relational Database Service (Amazon RDS) for PostgreSQL and Amazon Aurora MySQL-Compatible Edition.

Solution overview

The following diagram illustrates the solution architecture.

This architecture involves the following components:

A single database DML transaction to start the event chain – The event can be any database table insert, update, or delete, which invokes a corresponding database trigger.
An Aurora PostgreSQL table trigger – The trigger invokes a custom PL/pgSQL trigger function that makes a call to a consuming Lambda function.
A target Lambda function consumer of the database trigger – The Lambda function receives the newly inserted row from the database in its input argument. Then it publishes this to an SNS topic.
A target SNS topic – This topic is the recipient of either a summary (row key only) or the detailed row.
An SNS topic subscription – You can use this subscription to deliver notifications to the downstream actors (applications, Operations teams, and email inboxes).

We walk you through the following configuration steps:

Create Amazon Virtual Private Cloud (Amazon VPC) endpoints exposed inside the VPC, for Amazon SNS and the Lambda function.
Create an SNS topic and subscription.
Create a custom AWS Identity and Access Management (IAM) role for Lambda.
Create a database event-handling Lambda function launched in the same VPC (subnet).
Create an Amazon RDS IAM role.
Configure the Aurora PostgreSQL database.

Example use case

In our scenario, we assume we have different customer management applications, adding (inserting) different customer management business events into a relational business events table that is hosted on an Aurora PostgreSQL database.

The customer_business_events table records data from different applications, such as customer-onboard-app, customer-subscription-app, customer-payment-app, and customer-complaint-app. It records the customer in question (customer_reference_id), the category of the problem, and the reason, error, or exception (exception_reason) of the problem. The born-on business event timestamp is recorded in the event_timestamp column.

You can create the table with the following code:

-- Create a sample business events table
CREATE TABLE public.customer_business_events (
    application varchar(64) NULL,
    category varchar(64) NULL,
    customer_reference_id varchar(8) NULL,
    exception_reason varchar(128) NULL,
    event_timestamp timestamp NULL
);

The following are some examples of the insert statement that populates the customer_business_events table:

-- Examples:
-- Test data inserts in the table: customer_business_events
INSERT INTO customer_business_events VALUES ('customer-onboard-app', 'validate-optional-data', '1001', 'Missing secondary phone number', TO_TIMESTAMP('2021-07-14 14:00:00', 'YYYY-MM-DD HH24:MI:SS'));
INSERT INTO customer_business_events VALUES ('customer-onboard-app', 'validate-optional-data', '1001', 'Invalid email address', TO_TIMESTAMP('2021-07-14 14:00:01', 'YYYY-MM-DD HH24:MI:SS'));
-- ...
-- ...
INSERT INTO customer_business_events VALUES ('customer-complaint-app', 'log-customer-complaint', '1001', 'Customer logged a complaint', TO_TIMESTAMP('2021-07-14 14:14:06', 'YYYY-MM-DD HH24:MI:SS'));

Our goal is to have this table, upon insert (or update or delete), automatically invoke an event-processing function outside of the database to notify the Customer Relationship Operations team of this event for corresponding actions.

Prerequisites

This post assumes that you have an Aurora PostgreSQL (version 11.9 or above) cluster set up. For more information, see Creating an Amazon Aurora DB Cluster.

You can also use Amazon RDS for PostgreSQL (versions 12.6+, 13.2+, or 14.1+) instead of Aurora PostgreSQL.

Create VPC endpoints

We create two VPC endpoints, one for Amazon SNS and the other for Lambda, so that calls to these services stay local to our VPC (in our private subnets) instead of going through the public internet. For instructions, refer to Create a VPC endpoint service configuration for interface endpoints. The following screenshot shows our endpoints.
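If you prefer to script this step, the following is a minimal sketch of creating the two endpoints with boto3. It assumes interface endpoints with private DNS enabled and the us-east-2 Region used elsewhere in this post. The function name create_interface_endpoints and the VPC, subnet, and security group IDs you pass in are placeholders, not values from our setup.

```python
def create_interface_endpoints(ec2_client, vpc_id, subnet_ids,
                               security_group_ids, region="us-east-2"):
    """Create interface VPC endpoints for Amazon SNS and Lambda.

    ec2_client is expected to be a boto3 EC2 client. Returns a dict that
    maps the short service name to the new endpoint ID.
    """
    endpoint_ids = {}
    for service in ("sns", "lambda"):
        response = ec2_client.create_vpc_endpoint(
            VpcEndpointType="Interface",
            VpcId=vpc_id,
            ServiceName=f"com.amazonaws.{region}.{service}",
            SubnetIds=subnet_ids,
            SecurityGroupIds=security_group_ids,
            # Assumption: private DNS is wanted so the default service
            # hostnames resolve to the endpoint inside the VPC.
            PrivateDnsEnabled=True,
        )
        endpoint_ids[service] = response["VpcEndpoint"]["VpcEndpointId"]
    return endpoint_ids
```

You would call it as create_interface_endpoints(boto3.client("ec2"), vpc_id, subnet_ids, security_group_ids). Passing the client in as an argument keeps the function easy to exercise with a stub before you run it against a real account.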

Create an SNS topic and SNS topic subscription

We create the SNS topic my-rds-triggered-topic-1 and corresponding subscription for our Operations team’s email recipients, as shown in the following screenshot.
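The same step can be scripted. The sketch below creates a topic and an email subscription through a boto3 SNS client that you pass in. The helper name create_topic_with_email_subscription and the email address are illustrative assumptions. An email subscription stays pending until the recipient confirms it.

```python
def create_topic_with_email_subscription(sns_client, topic_name, email_address):
    """Create an SNS topic and subscribe one email address to it.

    sns_client is expected to be a boto3 SNS client. Returns the topic ARN
    and the subscription ARN.
    """
    topic_arn = sns_client.create_topic(Name=topic_name)["TopicArn"]
    subscription = sns_client.subscribe(
        TopicArn=topic_arn,
        Protocol="email",
        Endpoint=email_address,
        # Ask SNS to return an ARN even while confirmation is pending.
        ReturnSubscriptionArn=True,
    )
    return topic_arn, subscription["SubscriptionArn"]
```

For example, create_topic_with_email_subscription(boto3.client("sns"), "my-rds-triggered-topic-1", "ops-team@example.com") would create the topic used in this post with a hypothetical recipient address.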

Create a Lambda IAM execution role

For this example, we set up a custom IAM execution role called MyRdsTriggeredLambdaExecutionRole for Lambda. For instructions, refer to Creating a role to delegate permissions to an AWS service. The role has three policies:

An AWS managed policy for permissions to run the Lambda function in a VPC, called arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole.
A custom policy to be able to publish to our SNS topic, called MyRdsTriggeredTopicWriterPolicy. See the following code:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "TopicWriterSID",
            "Effect": "Allow",
            "Action": "sns:Publish",
            "Resource": "arn:aws:sns:us-east-2::my-rds-triggered-topic-1"
        }
    ]
}

A custom policy (MyBasicLambdaExecutionRolePolicy) with basic permissions to run the Lambda function (such as access to Amazon CloudWatch Logs). See the following code:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": "logs:CreateLogGroup",
            "Resource": "arn:aws:logs:us-east-2::log-group:/aws/lambda/my_rds_triggered_topic_writer:*"
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:us-east-2::log-group:/aws/lambda/my_rds_triggered_topic_writer:*"
        }
    ]
}

Create a Lambda function

We create the function my_rds_triggered_topic_writer and launch it in the same VPC (and subnet). For more information, refer to Configuring a Lambda function to access resources in a VPC.

When we create our Lambda function, we set up a few key configurations:

Lambda function role – We attach the custom role we created.
Amazon VPC – Because our Aurora PostgreSQL instance is inside a VPC, as is typically the case, we host our Lambda function inside our VPC.
Environment variable for the SNS topic ARN – We add the environment variable ENV_TOPIC_ARN with the value arn:aws:sns:us-east-2:123456789100:my-rds-triggered-topic-1, which points to the ARN of the target SNS topic.
Lambda function code – We have the following as the source code for the Lambda function:

import json
import os

import boto3

# Target SNS topic ARN, read from the Lambda environment
TOPIC_ARN = os.environ['ENV_TOPIC_ARN']
# SNS API client, created once and reused across invocations
client = boto3.client('sns')


def lambda_handler(event, context):
    print('Starting Lambda')

    # Print the inserted record received from PostgreSQL
    print('Printing Event record from RDS:', event)

    # Publish the record to the SNS topic
    message_subject = "Notification for row-insert in RDS"
    response = client.publish(
        TopicArn=TOPIC_ARN,
        Message=json.dumps({'default': json.dumps(event)}),
        MessageStructure='json',
        Subject=message_subject
    )
    print('Response from SNS:', response)

    print('Exiting Lambda')
    return {
        'statusCode': 200,
        'body': json.dumps('Notification sent:' + response['MessageId'])
    }

The code reads the Amazon RDS JSON event payload (a single inserted row), and prints this out. Then it publishes the result to an SNS topic.
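The nested json.dumps call in the handler is easy to misread. When MessageStructure is json, Amazon SNS expects the Message value to be a JSON object with a default key whose value is itself a string. The following sketch isolates that step in a pure function so you can inspect it without calling AWS. The helper name build_publish_kwargs is ours and is not part of the function above.

```python
import json


def build_publish_kwargs(event, topic_arn,
                         subject="Notification for row-insert in RDS"):
    """Build the keyword arguments that the handler passes to sns.publish."""
    return {
        "TopicArn": topic_arn,
        # Outer object: one entry per protocol, with "default" required.
        # Inner value: the row, serialized to a JSON string.
        "Message": json.dumps({"default": json.dumps(event)}),
        "MessageStructure": "json",
        "Subject": subject,
    }


# A row shaped like the one the database trigger sends.
sample_event = {
    "application": "customer-onboard-app",
    "category": "validate-optional-data",
    "customer_reference_id": "2001",
    "exception_reason": "Missing secondary phone number",
    "event_timestamp": "2021-07-14T14:00:00",
}
kwargs = build_publish_kwargs(
    sample_event, "arn:aws:sns:us-east-2:123456789100:my-rds-triggered-topic-1")
# Decoding twice recovers the original row.
decoded = json.loads(json.loads(kwargs["Message"])["default"])
```

Subscribers that have no protocol-specific entry, such as the email subscription in this post, receive the string stored under default.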

Create an Amazon RDS IAM role

For this example, we set up a custom role (MyRdsLambdaExecutorPolicyRole) to associate with our RDS instance. This includes a policy with permissions to invoke our Lambda function. See the following code:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowAccessToMyRDSTriggeredLambdaFunction",
            "Effect": "Allow",
            "Action": "lambda:InvokeFunction",
            "Resource": "arn:aws:lambda:us-east-2::function:my_rds_triggered_topic_writer"
        }
    ]
}
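The policy alone is not enough. The role also needs a trust relationship with Amazon RDS and must be associated with the cluster. The following is a sketch of those two steps under some assumptions: the policy above already exists as a managed policy whose ARN you pass in, the cluster identifier is a placeholder, and the Lambda feature name is the one used when associating a role with an Aurora PostgreSQL cluster. The helper names are ours.

```python
import json


def build_rds_trust_policy():
    """Trust policy that lets the Amazon RDS service assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "rds.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }
        ],
    }


def create_and_attach_role(iam_client, rds_client, role_name,
                           policy_arn, cluster_id):
    """Create the role, attach the invoke policy, and add it to the cluster.

    iam_client and rds_client are expected to be boto3 clients.
    Returns the ARN of the new role.
    """
    role = iam_client.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(build_rds_trust_policy()),
    )
    role_arn = role["Role"]["Arn"]
    iam_client.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
    rds_client.add_role_to_db_cluster(
        DBClusterIdentifier=cluster_id,
        RoleArn=role_arn,
        FeatureName="Lambda",  # assumption: feature name for Lambda access
    )
    return role_arn
```

A call might look like create_and_attach_role(boto3.client("iam"), boto3.client("rds"), "MyRdsLambdaExecutorPolicyRole", policy_arn, "my-aurora-cluster"), where the cluster name is hypothetical.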

Create an Aurora PostgreSQL database

We set up our PostgreSQL database (Aurora PostgreSQL 12.6, in our case) and launch the cluster inside our VPC, as highlighted in the following screenshot.

Install PostgreSQL extensions

We install the aws_lambda extension as shown in the following code. When you run the CREATE EXTENSION command with CASCADE, two additional schemas (aws_lambda and aws_commons) are created. We use the utilities inside these two schemas to invoke our Lambda function.

-- Create the extension for Lambda function calls
CREATE EXTENSION IF NOT EXISTS aws_lambda CASCADE;

To connect to the Aurora PostgreSQL database, you can use your preferred client tool as long as it has connectivity to the VPC. We used DBeaver.

PostgreSQL trigger function and JSON utility function

We use a utility function to convert tabular row data to JSON for convenience, as shown in the following code:

-- Utility function to convert row data to JSON
CREATE OR REPLACE FUNCTION public.my_json_converter(
    application varchar(64),
    category varchar(64),
    customer_reference_id varchar(8),
    exception_reason varchar(128),
    event_timestamp timestamp
)
RETURNS json
LANGUAGE PLPGSQL
AS
$$
DECLARE
    -- Declared as json to avoid the length cap of a varchar(300) variable
    json_payload json;
BEGIN
    SELECT json_build_object(
        'application', application,
        'category', category,
        'customer_reference_id', customer_reference_id,
        'exception_reason', exception_reason,
        'event_timestamp', event_timestamp
    ) INTO json_payload;

    RETURN json_payload;
END;
$$;
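To test the Lambda function locally, it helps to build an event of the same shape without touching the database. The Python sketch below mirrors the converter. It assumes the timestamp reaches Lambda as an ISO 8601 string, so check that against a real event in your CloudWatch logs. The helper name row_to_event is ours.

```python
import json
from datetime import datetime


def row_to_event(application, category, customer_reference_id,
                 exception_reason, event_timestamp):
    """Build a dict with the same keys as the JSON sent by the trigger."""
    return {
        "application": application,
        "category": category,
        "customer_reference_id": customer_reference_id,
        "exception_reason": exception_reason,
        # Assumption: the timestamp is serialized in ISO 8601 form.
        "event_timestamp": event_timestamp.isoformat(),
    }


event = row_to_event(
    "customer-onboard-app",
    "validate-optional-data",
    "2001",
    "Missing secondary phone number",
    datetime(2021, 7, 14, 14, 0, 0),
)
# The dict is what lambda_handler receives as its event argument.
payload_text = json.dumps(event)
```

You can pass the resulting dict straight to lambda_handler in a unit test, with the SNS client replaced by a stub.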

We use a trigger function to actually make the Lambda function call, as shown in the following code:

-- Trigger function that calls the external Lambda function
CREATE OR REPLACE FUNCTION public.my_db_trigger_function()
RETURNS TRIGGER
LANGUAGE PLPGSQL
AS
$$
DECLARE
    my_record record;
BEGIN
    RAISE NOTICE 'Start my_db_trigger_function';
    SELECT * FROM aws_lambda.invoke(
        aws_commons.create_lambda_function_arn('arn:aws:lambda:us-east-2::function:my_rds_triggered_topic_writer'),
        -- Payload: the newly inserted row, converted to JSON
        (SELECT public.my_json_converter(
            NEW.application,
            NEW.category,
            NEW.customer_reference_id,
            NEW.exception_reason,
            NEW.event_timestamp
        )),
        -- Invocation type: asynchronous
        'Event'
    ) INTO my_record;
    RAISE NOTICE 'End my_db_trigger_function';
    RETURN NEW;
END;
$$;

The aws_commons.create_lambda_function_arn function builds a Lambda function ARN structure that is compatible with the invoke API call. Because Aurora PostgreSQL doesn't need a response from the Lambda function, we make an asynchronous call (invocation type Event), which returns without waiting for the function to finish. For more information, refer to Example: Asynchronous (Event) invocation of Lambda functions.
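To see the same asynchronous behavior outside the database, you can invoke the function from Python with the Event invocation type. This is a minimal sketch that takes a boto3 Lambda client as an argument. The helper name invoke_async is ours. For an asynchronous call, Lambda acknowledges the request with HTTP status 202 and does not return the function's result.

```python
import json


def invoke_async(lambda_client, function_name, payload):
    """Queue an asynchronous invocation and report whether it was accepted."""
    response = lambda_client.invoke(
        FunctionName=function_name,
        InvocationType="Event",  # asynchronous: do not wait for the result
        Payload=json.dumps(payload).encode("utf-8"),
    )
    # 202 means the event was queued, not that the function succeeded.
    return response["StatusCode"] == 202
```

For example, invoke_async(boto3.client("lambda"), "my_rds_triggered_topic_writer", {"customer_reference_id": "2001"}) queues one test event. Because the call only confirms that the event was accepted, check the CloudWatch logs to confirm that the function itself ran successfully.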

PostgreSQL trigger

We tie the trigger function to the after-insert trigger on the customer_business_events table, as shown in the following code:

-- Trigger that runs the trigger function after each inserted row
CREATE TRIGGER trigger_insert_event
AFTER INSERT
ON customer_business_events
FOR EACH ROW
EXECUTE PROCEDURE my_db_trigger_function();

Test the solution

To test the solution, we insert a new row into the customer_business_events table:

INSERT INTO customer_business_events VALUES ('customer-onboard-app', 'validate-optional-data', '2001', 'Missing secondary phone number', now());

We can validate the insert via the following query:

SELECT * FROM customer_business_events WHERE customer_reference_id = '2001';

The insert fires the trigger on the customer_business_events table, which in turn invokes the Lambda function. The function's output appears in the CloudWatch logs. In the following screenshot, the output shows the event payload (red). The timestamp, in green, captures the time the row was inserted into the database table (the call to now()). Finally, the function publishes the result to an SNS topic, and the MessageId returned in the response is shown in blue.

The corresponding email notification delivered via the topic subscription is as follows.

Troubleshooting

If your function code raises an error, Lambda generates a JSON representation of the error. For more information, refer to AWS Lambda function errors in Python.
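For a Python function, that JSON document carries fields such as errorMessage, errorType, and stackTrace. The following sketch condenses one into a single line for quick scanning. The sample payload is made up for illustration and is not taken from our logs, and the helper name summarize_lambda_error is ours.

```python
import json


def summarize_lambda_error(payload_text):
    """Condense a Lambda error document into one readable line."""
    error = json.loads(payload_text)
    error_type = error.get("errorType", "UnknownError")
    message = error.get("errorMessage", "")
    frames = error.get("stackTrace") or []
    # The last frame is usually the line closest to the failure.
    last_frame = frames[-1].strip() if frames else "no stack trace"
    return f"{error_type}: {message} ({last_frame})"


# Hypothetical error document, shaped like a missing environment variable.
sample_error = json.dumps({
    "errorMessage": "'ENV_TOPIC_ARN'",
    "errorType": "KeyError",
    "stackTrace": ["  File \"/var/task/lambda_function.py\", line 6, in <module>\n"],
})
summary = summarize_lambda_error(sample_error)
```

Because the trigger in this post calls the function asynchronously, such errors are not returned to the database. Look for them in the function's CloudWatch logs.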

Considerations

This solution is useful when you want near-real-time latency between a row insert (or update or delete), and the corresponding downstream notification. This might be the case for critical events. Although the example use case referenced in this post is a business events table, this approach isn’t limited to just business events. In fact, any data hosted in an RDS for PostgreSQL or an Aurora PostgreSQL table is a candidate for this approach. However, you should take the following considerations into account when using this approach:

Querying the table to find recent events is not possible. For example, the table doesn’t contain a last system-change-datetime column. Additionally, querying a table can’t account for deleted events.
The table has periods of OLTP inactivity as well.
This post references a FOR EACH ROW trigger, which is triggered one time for every row that is inserted (or updated or deleted). This may cause a small but noticeable performance penalty on the database cluster, for tables with high volumes of transactions.
Using a synchronous (request response) call to the Lambda function can impact database performance, because the trigger has to wait for the function response to return. This post uses the asynchronous approach to optimize latency. For more information about Lambda function invocation types, refer to Invoking an AWS Lambda function from an RDS for PostgreSQL DB instance.
High bursts of transactions against the database table result in an equivalent number of Lambda function invocations, which may breach the concurrency limits (Region-specific) of the function, causing possible throttling. Therefore, make sure that the transactions per second on the source table are under the Lambda concurrency limits (especially for FOR EACH ROW triggers). For more information, refer to Lambda function scaling.
This framework uses Amazon SNS as one of its steps to relay the content downstream. There is a 256 KB limit on a single Amazon SNS message payload. To mitigate this, you should populate the message body with just the essential data, such as IDs, keys, and a few selected attributes, to reduce the payload size. For information pertaining to publishing large messages, refer to Publishing large messages with Amazon SNS and Amazon S3. Additionally, refer to Amazon Simple Notification Service endpoints and quotas for more about quotas and limits.
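The last two considerations lend themselves to a quick check. The following sketch estimates the Lambda concurrency that a given insert rate needs, using the usual rule of thumb of requests per second multiplied by average duration in seconds. It also falls back to a summary payload when the full row would exceed the 256 KB Amazon SNS limit. The function names and the choice of key fields are illustrative assumptions.

```python
import json

SNS_MAX_MESSAGE_BYTES = 256 * 1024  # 262,144 bytes


def estimate_concurrency(rows_per_second, avg_duration_seconds):
    """Rough number of concurrent executions for one invocation per row."""
    return rows_per_second * avg_duration_seconds


def wrapped_size(event):
    """Size in bytes of the message as the Lambda function publishes it."""
    message = json.dumps({"default": json.dumps(event)})
    return len(message.encode("utf-8"))


def choose_payload(event, key_fields, limit=SNS_MAX_MESSAGE_BYTES):
    """Return the full row if it fits, otherwise only the key fields."""
    if wrapped_size(event) <= limit:
        return event
    return {k: event[k] for k in key_fields if k in event}


# 200 inserts per second at 250 ms per invocation.
needed = estimate_concurrency(200, 0.25)

small = {"customer_reference_id": "2001", "exception_reason": "Invalid email address"}
large = {"customer_reference_id": "2001", "exception_reason": "x" * 300000}
kept = choose_payload(small, ["customer_reference_id"])
trimmed = choose_payload(large, ["customer_reference_id"])
```

Compare the estimate against the concurrency limit that applies to your account and function. The size check measures the wrapped message rather than the raw row, because the nested JSON encoding adds escape characters.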

Conclusion

In this post, we showed how you can invoke a Lambda function from a database trigger whenever a DML operation occurs on your source Aurora PostgreSQL table. Additionally, we showed how to integrate this Lambda function with an SNS topic. This allows you to notify downstream applications, in near-real time, when critical business updates are made to your relational database tables.

Although this post specifically uses Amazon RDS for PostgreSQL and Aurora PostgreSQL, you can also extend it to Amazon RDS for MySQL and Amazon Aurora MySQL-Compatible Edition. For more information, refer to Invoking a Lambda function from an Amazon Aurora MySQL DB cluster.

About the Authors

Wajid Ali Mir is a Database Consultant with the Professional Services team at Amazon Web Services. He works as a database migration specialist, helping Amazon customers migrate their on-premises database environments to AWS cloud database solutions.

Anjan Mukherjee is a Data Lake Architect at AWS, specializing in big data and analytics solutions. He helps customers build scalable, reliable, secure and high-performance applications on the AWS platform.

Read More: AWS Database Blog
