Thursday, April 18, 2024

Load RDF data into Amazon Neptune with AWS Glue

In this post, we present a design for a common technical requirement: ingesting data from multiple sources into a target Resource Description Framework (RDF) graph database. Our target is Amazon Neptune, a managed graph database service. RDF is one of two graph models supported by Neptune. The other is Labeled Property Graph (LPG). Each graph model has its merits, and each calls for a slightly different ETL design approach. We focus on RDF because it is often used to implement knowledge graphs, which draw their data from multiple, disparate sources and often require an ETL workflow to bring the data together. Additionally, performing ETL against a Neptune RDF graph presents technical challenges that we are eager to solve. These challenges are the following:

Ingesting data at scale is preferably done using a bulk loader. Neptune has a built-in bulk loader to ingest data already represented in graph form. But source data usually is not provided in a supported RDF serialization format: Turtle, N-Triples, N-Quads, or RDF/XML. Data providers commonly offer data in a format such as CSV.
To keep all the information correctly updated, we need to insert, update and remove triples from the RDF graph as part of a merge. This cannot be achieved with the bulk loader without previous transformations.
RDF, as mentioned, has several serialization formats, but there are few examples showing how to produce or consume a large RDF dataset using a parallel processing technology such as Apache Spark.

To implement the ETL workflow we demonstrate the use of AWS Glue (a serverless data integration service) and an approach to serialize and de-serialize N-Quads in Apache Spark.

Solution Overview

Globally each year organizations lose tens of billions of dollars to fraud. An RDF knowledge graph can store relationships between transactions, actors, and related entities to help find common patterns and detect fraudulent activities.

Let’s assume that there is an organization that contracts construction companies. The organization must track associations between thousands of interlinked companies so that it can review and investigate entities that may need to be excluded from contracting because of guilt by association with a company that has already been excluded. This is done to comply with public regulation or simply to avoid the risk of fraud.

To speed up the review process, the organization has built an RDF knowledge graph in Neptune, which holds all the companies and exclusion criteria information, together with other significant data.

To keep company data current, we receive data from multiple sources in a variety of formats, which cannot be directly ingested in Neptune using the bulk loader. We use a daily AWS Glue workflow to update the graph. The following diagram shows the approach described.

Every day an external data provider uploads to a designated Amazon Simple Storage Service (Amazon S3) bucket two CSV files. The first file lists new companies and changes in name or status of existing companies. The second file is an exclusion list indicating companies that are flagged with legal trouble and should not be contracted. Both files are deltas, indicating new or updated company data, rather than the full company list.
The ETL workflow implemented with AWS Glue first exports from Neptune current data about each of the companies mentioned in the daily files. The export is RDF data represented as N-Quads.
The workflow then runs a job that loads the three sources and applies business logic to determine how to merge the new data into the Neptune database. It determines the necessary inserts, updates, and deletes needed in the Neptune database.
The workflow stages in the S3 bucket two sets of changes for the Neptune database: adds (inserts and updates) and deletes. This data is RDF serialized as N-Quads.
The workflow concludes by bulk loading to the Neptune database the adds and unloading the deletes.
Afterwards, a company data analyst explores the changes in the relationship using visualizations in a Neptune Workbench notebook, a Jupyter notebook environment customized for use with Neptune.

Our pattern has a few salient design characteristics:

Bulk read – Because the AWS Glue job intends to update, not blindly insert, into the Neptune database, it must read current data from the database. Working at scale, we prefer to read current data in bulk rather than performing lots of small SPARQL queries using the SPARQL endpoint. In this post, we discuss how to use the Neptune-Export service to efficiently export the data’s current state to an S3 bucket. We assume the data in the Neptune database does not change between the time of export and ETL.
Bulk load – We write to Neptune using its bulk loader rather than its SPARQL endpoint. The bulk loader can efficiently ingest a large dataset into Neptune. Our AWS Glue workflow prepares this dataset as a file for the bulk loader, so it avoids having to connect to a Neptune endpoint and manage its own SPARQL writes, including the concurrency and error handling that would involve.
Bulk update – A bulk load appends to, but doesn’t update, the Neptune database. But what if the old value is no longer valid as a result of the job? For example, if a company changes its legal entity name, we want to add the new name and remove the old name. In our pattern, the job tracks all deletes in a file (or multiple files if the number is large) and calls Neptune’s SPARQL endpoint to issue a SPARQL UNLOAD of those deletes.
RDF serialization – The Neptune bulk loader requires the RDF data in a serialization such as Turtle, RDF/XML, N-Triples, or N-Quads. These formats are different from CSV, JSON, Parquet, and other common formats featured in AWS Glue examples. We need to produce large serialized RDF datasets efficiently. We demonstrate this using Spark for N-Quads serialization, taking advantage of N-Quads’ similarity to CSV.

We implement the solution through the following steps:

Set up AWS resources, including Neptune, a Neptune Workbench notebook, and AWS Glue resources.
In the Neptune notebook, load the initial Day Zero dataset and explore it through various SPARQL queries.
Run the Day One ETL workflow in AWS Glue (nep_glue_wf). There are four steps:
Run the AWS Glue job to perform the export of data from the Neptune database.
Run the AWS Glue job to perform the main processing of Day One.
Run the AWS Glue job to bulk load the updates to Neptune database.
Run the AWS Glue job to remove old data, via SPARQL UNLOAD, in the Neptune database.

In the Neptune notebook, explore the Day One changes through various SPARQL queries. Update the status of a few companies in the network that are “guilty by association.”

Prerequisites

For this walkthrough, you need an AWS account with permission to create resources in Neptune, AWS Glue, and related services.

Set up resources with AWS CloudFormation

We provide an AWS CloudFormation template stack you can launch to create your resources. Provide a stack name on the first page and leave the default settings for everything else. Wait for the stack to display Create Complete before moving on to the other steps.

Create your resources by launching the following CloudFormation stack:

The CloudFormation stack creates the following resources:

A Neptune database cluster
A Neptune notebook
A VPC with at least two private subnets that have routes to an Amazon S3 VPC endpoint
The Neptune-Export service
The S3 bucket nep-glue-s3bucket-{xxx}, which contains the following files:
data/source/initial.nq
data/source/company/2021-06-01/daily_company.csv
data/source/exclusion/2021-06-01/daily_exclusion_list.csv
notebook/Neptune-Glue-Blog-Post.ipynb

An AWS Glue Data Catalog database (nep_glue_db) and two connections to your Neptune cluster (neptune_connection, neptune_connection_export)
Four AWS Glue jobs to transform and load the data into your Neptune database:
neptune_export
neptune_daily_ETL, the main job to combine and transform data and prepare the update to the Neptune database
neptune_load
neptune_delete

An AWS Glue workflow (nep_glue_wf) to orchestrate the crawler and run the jobs

Explore the initial dataset using Neptune notebook

Assuming the role of graph analyst, we investigate the initial company relationships in the Neptune notebook as follows.

On the Outputs tab of the CloudFormation stack, find the value for NeptuneSagemakerNotebook and open the link. You’re redirected to the main Jupyter page.
Select the Neptune-Glue-Blog-Post.ipynb notebook. In the remaining steps, you work in that notebook.
Load the Day Zero data by running the cell under the heading Step 1 – Day Zero/Initial State Load. The Source and Load ARN parameters are set to the correct values for your environment; confirm by comparing them with the output values S3Bucket and NeptuneLoadFromS3IAMRoleArn from the CloudFormation stack.

Run the query under the heading 2a) Companies to view a summary of companies. You can observe that each company has a legal entity name, a status (Active or Dissolved), an update date, and a parent company.

Run the query under the heading 2b) Exclusions to see current exclusions for companies 3000 and 4000.

Run the query under the heading 2c) Relationships to see that there are relationships linking companies 110, 2000, and 9000. There is also a relationship between 3000 and 9500.

Run the query under the heading 2d) Relationship to Excluded to see there is a relationship between companies 3000 and 9500, and 3000 has an exclusion. As graph analyst, you might then investigate if 9500 should be excluded.
Run the query under the heading 2e) Visualization to produce a visualization of companies, their relationships, and their exclusions. Choose the Graph tab to view the visualization. The graph shows that 3000 is linked to 9500 but also has an exclusion, as we saw in our tabular result in the previous step.

Skip the cell under the heading 2f) Example of an insert to exclude a company. We come back to this after Day One.

Run an ETL workflow in AWS Glue

To run the Day One AWS Glue workflow, complete the following steps:

On the AWS Glue console, choose Workflows in the navigation pane.
From the list of workflows, choose nep_glue_wf.
On the Actions menu, choose Run.
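The same run can also be started programmatically. The following is a minimal sketch, assuming a boto3 Glue client is passed in; the helper name start_workflow is hypothetical, while start_workflow_run is the AWS Glue API operation for starting a workflow run.

```python
def start_workflow(glue_client, workflow_name="nep_glue_wf"):
    """Start a run of the named AWS Glue workflow and return its run ID."""
    response = glue_client.start_workflow_run(Name=workflow_name)
    return response["RunId"]


# With boto3 installed and AWS credentials configured, usage would look like:
#   import boto3
#   run_id = start_workflow(boto3.client("glue"))
```

Passing the client in as an argument keeps the helper easy to exercise with a stub before pointing it at a real account.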

The workflow runs four AWS Glue jobs. The next sections describe those jobs.

Neptune_export

neptune_export is a Python shell job that exports to the S3 bucket current company data and exclusions from the Neptune database. This data is used by the next job in the workflow (neptune_daily_ETL), which depends on knowledge of the current state in the Neptune database to determine how to update it.

The Neptune_export job calls the Neptune Export service, requesting a dump of company and exclusion data in RDF form. When the job is complete, that data is available as an N-Quads file in the S3 bucket. To specify which data to dump, the job issues a SPARQL CONSTRUCT query. In case the graph is very large, we filter the data by the list of companies that are present in the daily incremental file. The following query gets company data for companies in today’s list:

CONSTRUCT {?company ?p ?o} WHERE {
  ?company rdf:type octank:company .
  FILTER(?company in (COMPANY_LIST)) . # today's companies only
  {
    {
      ?company ?p ?o .
      FILTER (?p in (octank:companyID, octank:legalEntityName,
                     octank:companyStatus, octank:updateDate,
                     octank:hasParentCompany)) . # these predicates
    } UNION {
      # plus most recent exclusion
      ?company octank:hasExclusion ?o .
      {
        SELECT ?s (MAX(?ex) as ?o) WHERE {
          ?s octank:hasExclusion ?ex .
          ?ex octank:updateDate ?upd .
        } GROUP BY ?s
      } .
      BIND(octank:hasExclusion as ?p)
    }
  }
}

To create the company list, we read the daily incremental file using the AWS SDK for Pandas included with the AWS Glue Python shell with Python v3.9:

import awswrangler as wr  # AWS SDK for pandas
df = wr.s3.read_csv(path=args['s3InputPath'], usecols=['companyID'])  # companyID column only
COMPANY_LIST = df['companyID'].tolist()
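Before the query can run, the COMPANY_LIST placeholder has to be rendered as a comma-separated list of company IRIs. The following is a minimal sketch of one way to do that. It assumes the company IRI pattern used by the sample data in this post (http://octank.ca/cn/company# followed by the ID); the helper name render_company_list is hypothetical, and the actual job may build the list differently.

```python
COMPANY_IRI_PREFIX = "http://octank.ca/cn/company#"  # assumed from the sample data


def render_company_list(company_ids):
    """Render company IDs as a comma-separated list of IRIs for a SPARQL IN (...) filter."""
    seen = set()
    iris = []
    for company_id in company_ids:
        key = str(company_id).strip()
        if not key or key in seen:
            continue  # skip blanks and duplicates
        if not key.isalnum():
            # Refuse anything that could break out of the IRI brackets
            raise ValueError(f"unexpected company ID: {key!r}")
        seen.add(key)
        iris.append(f"<{COMPANY_IRI_PREFIX}{key}>")
    return ", ".join(iris)


query_fragment = "FILTER(?company in (COMPANY_LIST)) ."
rendered = query_fragment.replace("COMPANY_LIST", render_company_list([110, 1500, 110]))
print(rendered)
```

Validating each ID before substitution matters because the values come from an external file and are spliced directly into the query text.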

Daily_company_ETL

Daily_company_ETL (listed as neptune_daily_ETL among the stack resources) is an Apache Spark job that combines the two daily company CSV files with the exported Neptune data. It applies business rules to decide how to update the Neptune database. It doesn’t actually connect to the Neptune database; it reads from the export rather than querying the Neptune database directly. It also doesn’t update the Neptune database directly; it tracks any necessary adds and deletes in Amazon S3, which are processed downstream by other jobs in the workflow.

The business logic is as follows:

If a company from the daily feed is not in the Neptune database, it’s added to the Neptune database.
If a company from the daily feed is in the Neptune database but has a change (for example, a status change, or a change in legal entity name), the change is added to the Neptune database. The old value is deleted from the Neptune database.
If a new company exclusion is reported for a company that is in the Neptune database, that exclusion is added to the Neptune database’s historical record of exclusion activity for the company.
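The first two rules can be sketched in plain Python as a comparison between the exported state and the daily feed. This is an illustration only, not the job's Spark implementation: the function name plan_changes and the dictionary layout are hypothetical, and apart from Ontario Trucking the sample values are made up. Exclusions, which are append-only, would contribute to the adds list alone.

```python
def plan_changes(current, daily):
    """Compare the daily feed with the exported state and return (adds, deletes).

    Both arguments map company ID -> {predicate: value}. Each change is a
    (company_id, predicate, value) tuple.
    """
    adds, deletes = [], []
    for company_id, new_props in daily.items():
        old_props = current.get(company_id, {})
        for predicate, new_value in new_props.items():
            old_value = old_props.get(predicate)
            if old_value == new_value:
                continue  # unchanged: nothing to load or unload
            adds.append((company_id, predicate, new_value))
            if old_value is not None:
                # A changed value: the stale triple must be removed
                deletes.append((company_id, predicate, old_value))
    return adds, deletes


# Made-up sample state: company 2000 is renamed, company 1000 is new
current = {"2000": {"legalEntityName": "Old Name Inc", "companyStatus": "Active"}}
daily = {
    "2000": {"legalEntityName": "Ontario Trucking", "companyStatus": "Active"},
    "1000": {"legalEntityName": "New Co", "companyStatus": "Active"},
}
adds, deletes = plan_changes(current, daily)
print(adds)
print(deletes)
```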

The job must read and write RDF data in a scalable way using common Spark techniques. To support this goal, we serialize RDF using N-Quads. The N-Quads structure is well suited to Spark processing. It has one statement per line, where each statement is a space-separated list of subject (S), predicate (P), object (O), and, optionally, named graph (G), terminated by a period. An N-Quads file can be regarded as a CSV with no header row and a space delimiter. The following is an excerpt of the company export:

<http://octank.ca/cn/company#110> <http://octank.ca/cn/companyID> "110" .
<http://octank.ca/cn/company#110> <http://octank.ca/cn/legalEntityName> "Spinka LLC" .
<http://octank.ca/cn/company#110> <http://octank.ca/cn/companyStatus> "Active" .
<http://octank.ca/cn/company#1500> <http://octank.ca/cn/companyID> "1500" .
<http://octank.ca/cn/company#1500> <http://octank.ca/cn/legalEntityName> "Megasystems" .
<http://octank.ca/cn/company#1500> <http://octank.ca/cn/companyStatus> "Active" .

The Spark job reads an N-Quads file into a data frame with columns S, P, O, and G, using logic like the following:

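from pyspark.sql.functions import col

# Treat N-Quads as a space-delimited CSV with no header row
df = (
    spark.read.options(delimiter=' ').csv(source_path)
    .select(
        col("_c0").alias("S"),
        col("_c1").alias("P"),
        col("_c2").alias("O"),
        col("_c3").alias("G"),
    )
)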
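The idea of treating N-Quads as a space-delimited CSV can be tried without a Spark cluster. The following sketch uses Python's standard csv module on two statements from the export excerpt; the function name read_statements is hypothetical. It is only an approximation of a real N-Quads parser: literals with backslash-escaped quotes, language tags, or datatype suffixes would need extra handling.

```python
import csv
import io

sample = (
    '<http://octank.ca/cn/company#110> <http://octank.ca/cn/legalEntityName> "Spinka LLC" .\n'
    '<http://octank.ca/cn/company#110> <http://octank.ca/cn/companyStatus> "Active" .\n'
)


def read_statements(text):
    """Split each statement on spaces, treating a double-quoted literal as one field."""
    reader = csv.reader(io.StringIO(text), delimiter=" ", quotechar='"')
    return [row for row in reader if row]


rows = read_statements(sample)
for subject, predicate, obj, *rest in rows:
    # The quoted literal "Spinka LLC" stays one field despite its embedded space
    print(subject, predicate, obj, rest)
```

In these sample statements there is no named graph, so the fourth field is the terminating period rather than a graph IRI.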

The job then strips out the namespace portion of URIs in the RDF data, keeping just IDs. This makes comparison with the daily company feed, which does not use URIs, easier.

from pyspark.sql.functions import regexp_replace

ColumnList = ["S", "P", "O"]
# Regular expressions for the URI parts to strip from each column
ExpressionMap = {"P": NAMED_GRAPH_URI + "|<|>", "O": PARENT_URI + "|" + ENTITY2_URI + "|<|>|#"}

df = source_df.select(
    ColumnList[0],
    regexp_replace(ColumnList[1], ExpressionMap[ColumnList[1]], "").alias(ColumnList[1]),
    regexp_replace(ColumnList[2], ExpressionMap[ColumnList[2]], "").alias(ColumnList[2]),
)

For example, if the predicate (P) is <http://octank.ca/cn/legalEntityName>, it simplifies it to legalEntityName.
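The effect of that replacement can be reproduced on a single term with Python's re module. In this sketch the namespace value is an assumption taken from the sample data, and local_name is a hypothetical helper name.

```python
import re

NAMESPACE = "http://octank.ca/cn/"  # assumed namespace, taken from the sample data


def local_name(term, namespace=NAMESPACE):
    """Drop the angle brackets and the namespace prefix from an IRI term."""
    pattern = re.escape(namespace) + r"|<|>"
    return re.sub(pattern, "", term)


print(local_name("<http://octank.ca/cn/legalEntityName>"))
```

Escaping the namespace with re.escape keeps the dots in the URI from matching arbitrary characters.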

The job then groups the data by subject and pivots by predicate so that each record in the frame is one company:

from pyspark.sql.functions import max as max_
result_df = df.groupBy("S").pivot("P").agg(max_("O")).drop("S")
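To make the group-and-pivot step concrete, here is a plain-Python sketch of the same reshaping on a handful of triples whose namespaces have already been stripped. The function name pivot_by_subject is hypothetical; the Spark job does this with groupBy and pivot rather than dictionaries.

```python
from collections import defaultdict


def pivot_by_subject(triples):
    """Group (subject, predicate, object) triples into one record per subject."""
    records = defaultdict(dict)
    for subject, predicate, obj in triples:
        previous = records[subject].get(predicate)
        # Mirror a max() aggregation: keep the largest value when a predicate repeats
        if previous is None or obj > previous:
            records[subject][predicate] = obj
    return dict(records)


triples = [
    ("company#110", "companyID", "110"),
    ("company#110", "legalEntityName", "Spinka LLC"),
    ("company#110", "companyStatus", "Active"),
    ("company#1500", "companyID", "1500"),
    ("company#1500", "legalEntityName", "Megasystems"),
    ("company#1500", "companyStatus", "Active"),
]
records = pivot_by_subject(triples)
for subject, record in records.items():
    print(subject, record)
```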

In short, starting from raw N-Quads, the job produces an orderly data frame like in the following table, which can then be combined with other source data to implement business logic.

Company ID | Legal Entity Name | Company Status
110 | Spinka LLC | Active
1500 | Megasystems | Active

The output of the job is again N-Quads-serialized RDF. We reverse the preceding logic by adding namespaces back to form URIs and unpivoting the data frame so that each row is an N-Quads statement with S, P, O, and G. We have implemented some functions for the serialization. You can review the full logic of the job by viewing the script on the AWS Glue console.
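The reverse step can be sketched as follows. This is not the job's own serialization code: the function names are hypothetical, the namespace is assumed from the sample data, and the named graph IRI is a placeholder. The sketch handles literal objects only; an object that is itself an IRI, such as a parent company, would be written in angle brackets instead of quotes.

```python
NAMESPACE = "http://octank.ca/cn/"       # assumed, from the sample data
GRAPH_IRI = "http://octank.ca/cn/graph"  # hypothetical named graph


def escape_literal(value):
    """Escape the characters that must be escaped inside an N-Quads literal."""
    return (
        value.replace("\\", "\\\\")
        .replace('"', '\\"')
        .replace("\n", "\\n")
        .replace("\r", "\\r")
    )


def to_nquads(company_id, record, graph=GRAPH_IRI):
    """Unpivot one company record into N-Quads statements (S, P, O, G)."""
    subject = f"<{NAMESPACE}company#{company_id}>"
    lines = []
    for predicate, value in sorted(record.items()):
        lines.append(
            f'{subject} <{NAMESPACE}{predicate}> "{escape_literal(value)}" <{graph}> .'
        )
    return lines


for line in to_nquads("110", {"legalEntityName": "Spinka LLC", "companyStatus": "Active"}):
    print(line)
```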

Neptune_load

Neptune_load is a Python shell job that calls the Neptune bulk loader to load into the Neptune database the adds data in Amazon S3 that was produced by the Daily_Company_ETL job. The job invokes the loader’s HTTP-based load endpoint, passing the path in Amazon S3 of the adds file. On a given day, there might be multiple adds files. It loads each and checks the load status until the load is complete.
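The status check can be sketched as a polling loop. This assumes the loader's status response carries the state under payload.overallStatus.status, and it takes the HTTP call as a caller-supplied function so the loop itself stays free of network code. The names wait_for_load and fetch_status are hypothetical, and the polling interval and limit are arbitrary.

```python
import time

# Loader states that mean the load has not finished yet
IN_FLIGHT = {"LOAD_NOT_STARTED", "LOAD_IN_QUEUE", "LOAD_IN_PROGRESS"}


def wait_for_load(fetch_status, load_id, interval_seconds=5.0, max_polls=120):
    """Poll until the load leaves the in-flight states and return the final status.

    fetch_status is a caller-supplied function that returns the parsed JSON
    body of the loader status request for the given load ID.
    """
    for _ in range(max_polls):
        response = fetch_status(load_id)
        status = response["payload"]["overallStatus"]["status"]
        if status not in IN_FLIGHT:
            return status
        time.sleep(interval_seconds)
    raise TimeoutError(f"load {load_id} still running after {max_polls} polls")


# Stubbed status sequence standing in for real loader responses
fake_statuses = iter(["LOAD_IN_PROGRESS", "LOAD_COMPLETED"])
final = wait_for_load(
    lambda _load_id: {"payload": {"overallStatus": {"status": next(fake_statuses)}}},
    "example-load-id",
    interval_seconds=0,
)
print(final)
```

The caller still has to decide what to do with a final status other than LOAD_COMPLETED, for example failing the job so the workflow stops before the delete step.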

Several design considerations influence this job:

The Neptune loader supports several RDF serializations. The job uses N-Quads because it can be produced efficiently by a Spark job using the approach discussed previously.
The Neptune loader can be optimized in several respects. Among the optimizations we chose are:
Using OVERSUBSCRIBE parallelism to use maximum resources in the Neptune writer instance.
Loading a few large files rather than many small files. The data in our demo is small, but in general the ETL job can partition the files so that it produces a few large files.
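A load request with these choices might be assembled as in the following sketch. It builds the HTTP request without sending it. The helper name, the endpoint, bucket, and role values are placeholders, and port 8182 is assumed as the cluster port. If IAM database authentication is enabled on the cluster, the request would additionally need to be signed.

```python
import json
import urllib.request


def build_load_request(neptune_endpoint, s3_path, iam_role_arn, region):
    """Build (but do not send) a POST to the Neptune bulk loader endpoint."""
    body = {
        "source": s3_path,               # S3 path of the adds file or folder
        "format": "nquads",              # serialization produced by the ETL job
        "iamRoleArn": iam_role_arn,      # role that lets Neptune read from S3
        "region": region,
        "failOnError": "TRUE",
        "parallelism": "OVERSUBSCRIBE",  # use maximum resources on the writer
    }
    return urllib.request.Request(
        url=f"https://{neptune_endpoint}:8182/loader",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_load_request(
    "example-neptune-endpoint",
    "s3://example-bucket/adds/",
    "arn:aws:iam::123456789012:role/ExampleLoadRole",
    "us-east-1",
)
print(request.full_url)
print(request.data.decode("utf-8"))
```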

Neptune_delete

The Neptune_delete job is a Python shell job that calls the Neptune SPARQL endpoint to perform a SPARQL UNLOAD in the Neptune database of the deletes data in Amazon S3 that was produced by the Daily_Company_ETL job.

Each deletes file (there can be several) consists of RDF triples serialized as N-Quads. Unloading requires two steps: creating a presigned URL for the S3 object, and invoking SPARQL UNLOAD to unload that data. The call to the SPARQL endpoint is a POST with a form parameter called update of the following form:

UNLOAD <s3-presigned-url> FROM GRAPH <named-graph-uri>

To achieve optimal efficiency, we cap each deletes file at 10,000 triples. We avoid sending huge files because the SPARQL endpoint is transactional and not meant for bulk deletes. Additionally, the SPARQL endpoint doesn’t support Gzipped files.
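The capping and the UNLOAD call can be sketched as follows. The sketch assumes the update statement is sent as a form-encoded update field, as in the SPARQL 1.1 protocol, and that a presigned URL has already been created for each file. The function names, the example URL, and the named graph IRI are hypothetical.

```python
from urllib.parse import parse_qs, urlencode

MAX_TRIPLES_PER_FILE = 10_000  # cap stated in this post


def chunk_statements(statements, size=MAX_TRIPLES_PER_FILE):
    """Yield successive lists of at most `size` statements, one list per deletes file."""
    for start in range(0, len(statements), size):
        yield statements[start:start + size]


def build_unload_body(presigned_url, graph_iri):
    """Return the form-encoded POST body that carries the UNLOAD statement."""
    statement = f"UNLOAD <{presigned_url}> FROM GRAPH <{graph_iri}>"
    return urlencode({"update": statement})


# 25,000 placeholder statements split into files of at most 10,000 each
statements = [f"statement-{i}" for i in range(25_000)]
sizes = [len(chunk) for chunk in chunk_statements(statements)]
print(sizes)

body = build_unload_body(
    "https://example.com/deletes-0.nq?signature=abc",  # placeholder presigned URL
    "http://octank.ca/cn/graph",                       # hypothetical named graph
)
decoded = parse_qs(body)["update"][0]
print(decoded)
```

URL-encoding the whole statement matters because a presigned URL carries its own query string, which would otherwise be misread as extra form fields.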

Explore Day One relationships in the Neptune notebook

Return to the Neptune notebook to discover changes from Day One. Rerun the queries and observe the changes.

Run the cell under the heading 2a) Companies. There is a new company: 1000. The legal name of company 2000 changed to Ontario Trucking. The status of company 1500 is now Dissolved.

Run the cell under the heading 2b) Exclusions. The newly added company 1000 has an exclusion with status Not Excluded, indicating that it was removed from an exclusion before appearing in the Neptune database. Company 110 now has an exclusion. Company 3000 has two exclusions. The most recent has a status of Not Excluded. Therefore, on Day One, company 3000 was removed from the exclusion list!

Run the cell under the heading 2c) Relationships. The results are the same as on Day Zero.
Run the cell under the heading 2d) Relationship to Excluded. The results suggest that because company 110 now has an exclusion, related companies 2000 and 9000 should be investigated. Notice also that companies 3000 and 9500 don’t appear in the results this time. Company 3000 is no longer excluded!

Run the visualization cell under the heading 2e) Visualization. As graph analyst, notice the cluster of companies 110, 2000, and 9000. Company 110 is shown with an exclusion, which brings the whole cluster into question. You decide to mark companies 2000 and 9000 as excluded. You decide not to investigate the parent company 42.

Run the cell under the heading 2f) Example of an insert to exclude a company to perform a SPARQL insert. The next ETL job will have this update in its export and will consider companies 2000 and 9000 excluded.

Clean up

To avoid incurring future charges, delete the stack that you created previously.

Conclusion

In this post, we showed an ETL pattern built on AWS Glue to update RDF data in a Neptune database. The pattern uses the Neptune bulk loader, but because the loader requires serialized RDF, one of the AWS Glue job’s responsibilities is to stage the data to update in a serialized form. It uses N-Quads. The job needs to read existing Neptune data; we show how to use the Neptune-Export service to export existing data to Amazon S3 so it can be read by the AWS Glue job. Additionally, the AWS Glue job sometimes needs to replace an existing value. It assembles all the old triples to be deleted in an N-Quads file and calls Neptune SPARQL UNLOAD to perform the delete.

You may also be interested in glue-neptune tools, which you can use to load a property graph (a graph representation that Neptune supports in addition to RDF) into Neptune using AWS Glue. Additionally, Neptune supports loading data from a relational database using AWS Database Migration Service (AWS DMS).

Ask your questions in the comments.

About the Authors

Mike Havey is a Solutions Architect for AWS with over 25 years of experience building enterprise applications. Mike is the author of two books and numerous articles. His Amazon author page is https://www.amazon.com/Michael-Havey/e/B001IO9JBI.

Fabrizio Napolitano is a Principal Analytics Specialist SA. He has worked in the analytics domain for the last 20 years, and has recently and quite by surprise become a Hockey Dad after moving to Canada.

Read more: AWS Database Blog
