Tuesday, April 16, 2024
No menu items!
HomeCloud ComputingTransform SQL into SQLX for Dataform

Transform SQL into SQLX for Dataform

Introduction

Developing in SQL poses significant problems when compared to other languages and frameworks.  It’s not easy to reuse statements across different scripts, there’s no way to write tests to ensure data consistency, and dependency management requires external software solutions.  Developers will typically write thousands of lines of SQL to ensure data processing occurs in the correct order.  Additionally, documentation and metadata are afterthoughts because they need to be managed in an external catalog.

Google Cloud offers Dataform and SQLX to solve these challenges. 

Dataform is a service for data analysts to test, develop, and deploy complex SQL workflows for data transformation in BigQuery. Dataform lets you manage data transformation in the Extraction, Loading, and Transformation (ELT) process for data integration. After extracting raw data from source systems and loading into BigQuery, Dataform helps you transform it into a well-defined, tested, and documented suite of data tables.

SQLX is an open source extension of SQL and the primary tool used in Dataform. As it is an extension, every SQL file is also a valid SQLX file. SQLX brings additional features to SQL to make development faster, more reliable, and scalable. It includes functions including dependencies management, automated data quality testing, and data documentation

Teams should quickly transform their SQL into SQLX to gain the full suite of benefits that Dataform provides.  This blog contains a high-level, introductory guide demonstrating this process.

The steps in this guide use the Dataform on Google Cloud console. You can follow along or implement these steps with your own SQL scripts!

Getting Started

Here is an example SQL script we will transform into SQLX. This script takes a source table containing reddit data. The script cleans, deduplicates, and inserts the data into a new table with a partition.

code_block[StructValue([(u’code’, u’CREATE OR REPLACE TABLE reddit_stream.comments_partitionedrnPARTITION BYrn comment_daternASrnrnWITH t1 as (rnSELECTrn comment_id,rn subreddit,rn author,rn comment_text,rn CAST(total_words AS INT64) total_words,rn CAST(reading_ease_score AS FLOAT64) reading_ease_score,rn reading_ease,rn reading_grade_level,rn CAST(sentiment_score AS FLOAT64) sentiment_score,rn CAST(censored AS INT64) censored,rn CAST(positive AS INT64) positive,rn CAST(neutral AS INT64) neutral,rn CAST(negative AS INT64) negative,rn CAST(subjectivity_score AS FLOAT64) subjectivity_score,rn CAST(subjective AS INT64) subjective,rn url,rn DATE(comment_date) comment_date,rn CAST(comment_hour AS INT64) comment_hour,rn CAST(comment_year AS INT64) comment_year,rn CAST(comment_day AS INT64) comment_dayrnFROM reddit_stream.comments_streamrn)rnSELECT k.*rnFROM (rn SELECT ARRAY_AGG(row LIMIT 1)[OFFSET(0)] krn FROM t1 rowrn GROUP BY comment_idrn)’), (u’language’, u”), (u’caption’, <wagtail.wagtailcore.rich_text.RichText object at 0x3e6a595baf90>)])]

1.  Create a new SQLX file and add your SQL 

In this guide we’ll title our file as comments_partitioned.sqlx.

As you can see below, our dependency graph does not provide much information.

2. Refactor SQL to remove DDL and use only SELECT

In SQLX, you only write SELECT statements. You specify what you want the output of the script to be in the config block, like a view or a table as well as other types available. Dataform takes care of adding CREATE OR REPLACE or INSERT boilerplate statements.

3. Add a config object containing metadata

The config object will contain the output type, description, schema (dataset), tags, columns and their descriptions, and the BigQuery-related configuration. Check out the example below.

code_block[StructValue([(u’code’, u’config {rn type: “table”,rn description: “cleaned comments data and partitioned by date for faster performance”,rn schema: “demo_optimized_staging”,rn tags: [“reddit”],rn columns: {rn comment_id: “unique id for each comment”,rn subreddit: “which reddit community the comment occurred”,rn author: “which reddit user commented”,rn comment_text: “the body of text for the comment”,rn total_words: “total number of words in the comment”,rn reading_ease_score: “a float value for comment readability score”,rn reading_ease: “a plain-text english categorization of readability”,rn reading_grade_level: “a plain-text english categorization of readability by school grade level”,rn sentiment_score: “float value for sentiment of comment between -1 and 1”,rn censored: “whether the comment needed to censoring by some process upstream”,rn positive: “one-hot encoding 1 or 0 for positive”,rn neutral: “one-hot encoding 1 or 0 for neutral”,rn negative: “one-hot encoding 1 or 0 for negative”,rn subjectivity_score: “float value for comment subjectivity score”,rn subjective: “one-hot encoding 1 or 0 for subjective”,rn url: “link to the comment on reddit”,rn comment_date: “date timestamp for when the comment occurred”,rn comment_hour: “integer for hour of comment post time”,rn comment_year: “integer for year of comment post time”,rn comment_month: “integer for month of comment post time”,rn comment_day: “integer for day of comment post time”rn },rn bigquery: {rn partitionBy: “comment_date”,rn labels: {rn cost_center: “123456”rn }rn }rn}’), (u’language’, u”), (u’caption’, <wagtail.wagtailcore.rich_text.RichText object at 0x3e6a5959ec10>)])]

4. Create declarations for any source tables

In our SQL script, we directly write reddit_stream.comments_stream. In SQLX, we’ll want to utilize a declaration to create relationships between source data and tables created by Dataform. Add a new comments_stream.sqlx file to your project for this declaration:

code_block[StructValue([(u’code’, u’config {rn type: “declaration”,rn database: “my-project”,rn schema: “reddit_stream”,rn name: “comments_stream”,rn description: “A BigQuery table acting as a data sink for comments streaming in real-time.”rn}’), (u’language’, u”), (u’caption’, <wagtail.wagtailcore.rich_text.RichText object at 0x3e6a5959e5d0>)])]

We’ll utilize this declaration in the next step.

5. Add references to declarations, tables, and views.

This will help build the dependency graph.  In our SQL script, there is a single reference to the declaration. Simply replace reddit_stream.comments_stream with ${ref(“comments_stream”)}. 

Managing dependencies with the ref function has numerous advantages.

The dependency tree complexity is abstracted away. Developers simply need to use the ref function and list dependencies.

It enables us to write smaller, more reusable and more modular queries instead of thousand-line-long queries. That makes pipelines easier to debug.

You get alerted in real time about issues like missing or circular dependencies

6. Add assertions for data validation

You can define data quality tests, called assertions, directly from the config block of your SQLX file. Use assertions to check for uniqueness, null values or any custom row condition. The dependency tree adds assertions for visibility.

Here are assertions for our example:

code_block[StructValue([(u’code’, u’assertions: {rn uniqueKey: [“comment_id”],rn nonNull: [“comment_text”],rn rowConditions: [rn “total_words > 0″rn ]rn }’), (u’language’, u”), (u’caption’, <wagtail.wagtailcore.rich_text.RichText object at 0x3e6a5959c090>)])]

These assertions will pass if comment_id is a unique key, if comment_text is non-null, and if all rows have total_words greater than zero.

7. Utilize JavaScript for repeatable SQL and parameterization

Our example has a deduplication SQL block.  This is a perfect opportunity to create a JavaScript function to reference this functionality in other SQLX files.  For this scenario, we’ll create the includes folder and add a common.js file with the following contents:

code_block[StructValue([(u’code’, u’function dedupe(table, group_by_cols) {rn return `rnSELECT k.*rnFROM (rn SELECT ARRAY_AGG(row LIMIT 1)[OFFSET(0)] krn FROM ${table} rowrn GROUP BY ${group_by_cols}rn)rn `rn}rnrnmodule.exports = { dedupe };’), (u’language’, u”), (u’caption’, <wagtail.wagtailcore.rich_text.RichText object at 0x3e6a5959c1d0>)])]

Now, we can replace that code block with this function call in our SQLX file as such: 

${common.dedupe(“t1”, “comment_id”)}

In certain scenarios, you may want to use constants in your SQLX files. Let’s add a constants.js file to our includes folder and create a cost center dictionary.

code_block[StructValue([(u’code’, u’const COST_CENTERS = {rn dev: “000000”,rn stage: “123123”,rn prod: “123456”rn}rnrnmodule.exports = { COST_CENTERS }’), (u’language’, u”), (u’caption’, <wagtail.wagtailcore.rich_text.RichText object at 0x3e6a5959c810>)])]

We can use this to label our output BigQuery table with a cost center.  Here’s an example of using the constant in a SQLX config block:

code_block[StructValue([(u’code’, u’bigquery: {rn partitionBy: “comment_date”,rn labels: {rn cost_center: constants.COST_CENTERS.devrn }rn }’), (u’language’, u”), (u’caption’, <wagtail.wagtailcore.rich_text.RichText object at 0x3e6a5959c950>)])]

8. Validate the final SQLX file and compiled dependency graph

After completing the above steps, let’s have a look at the final SQLX files:

comments_stream.sqlx

code_block[StructValue([(u’code’, u’config {rn type: “declaration”,rn database: “my-project”,rn schema: “reddit_stream”,rn name: “comments_stream”,rn description: “A BigQuery table acting as a data sink for comments streaming in real-time.”rn}’), (u’language’, u”), (u’caption’, <wagtail.wagtailcore.rich_text.RichText object at 0x3e6a5959ca90>)])]

comments_partitioned.sqlx

code_block[StructValue([(u’code’, u’config {rn type: “table”,rn description: “cleaned comments data and partitioned by date for faster performance”,rn schema: “demo_optimized_staging”,rn tags: [“reddit”],rn columns: {rn comment_id: “unique id for each comment”,rn subreddit: “which reddit community the comment occurred”,rn author: “which reddit user commented”,rn comment_text: “the body of text for the comment”,rn total_words: “total number of words in the comment”,rn reading_ease_score: “a float value for comment readability score”,rn reading_ease: “a plain-text english categorization of readability”,rn reading_grade_level: “a plain-text english categorization of readability by school grade level”,rn sentiment_score: “float value for sentiment of comment between -1 and 1”,rn censored: “whether the comment needed to censoring by some process upstream”,rn positive: “one-hot encoding 1 or 0 for positive”,rn neutral: “one-hot encoding 1 or 0 for neutral”,rn negative: “one-hot encoding 1 or 0 for negative”,rn subjectivity_score: “float value for comment subjectivity score”,rn subjective: “one-hot encoding 1 or 0 for subjective”,rn url: “link to the comment on reddit”,rn comment_date: “date timestamp for when the comment occurred”,rn comment_hour: “integer for hour of comment post time”,rn comment_year: “integer for year of comment post time”,rn comment_month: “integer for month of comment post time”,rn comment_day: “integer for day of comment post time”rn },rn bigquery: {rn partitionBy: “comment_date”,rn labels: {rn cost_center: constants.COST_CENTERS.devrn }rn },rn assertions: {rn uniqueKey: [“comment_id”],rn nonNull: [“comment_text”],rn rowConditions: [rn “total_words > 0″rn ]rn }rn}rnrnWITH t1 as (rnSELECTrn comment_id,rn subreddit,rn author,rn comment_text,rn CAST(total_words AS INT64) total_words,rn CAST(reading_ease_score AS FLOAT64) reading_ease_score,rn reading_ease,rn reading_grade_level,rn CAST(sentiment_score AS FLOAT64) sentiment_score,rn CAST(censored AS INT64) censored,rn CAST(positive AS INT64) positive,rn CAST(neutral AS INT64) neutral,rn CAST(negative AS INT64) negative,rn CAST(subjectivity_score AS FLOAT64) subjectivity_score,rn CAST(subjective AS INT64) subjective,rn url,rn DATE(comment_date) comment_date,rn CAST(comment_hour AS INT64) comment_hour,rn CAST(comment_year AS INT64) comment_year,rn CAST(comment_month AS INT64) comment_month,rn CAST(comment_day AS INT64) comment_dayrnFROM ${ref(‘comments_stream’)}rnWHERE CAST(total_words AS INT64) > 0)rnrnrn${common.dedupe(“t1”, “comment_id”)}’), (u’language’, u”), (u’caption’, <wagtail.wagtailcore.rich_text.RichText object at 0x3e6a5959cdd0>)])]

Let’s validate the dependency graph and ensure the order of operations looks correct.

Now it’s easy to visualize where the source data is coming from, what output type comments_partitioned is, and what data quality tests will occur!

Next Steps

This guide outlines the first steps of transitioning legacy SQL solutions to SQLX and Dataform for improved metadata management, comprehensive data quality testing, and efficient development. Adopting Dataform streamlines the management of your cloud data warehouse processes allowing you to focus more on analytics and less on infrastructure management. For more information, check out Google Cloud’s Overview of Dataform.  Explore our official Dataform guides and Dataform sample script library for even more hands-on experiences.

Related Article

Dataform is joining Google Cloud: Deploy data transformations with SQL in BigQuery

With our acquisition of Dataform, you can now leverage software development best practices to define, document, test and deploy data tran…

Read Article

Cloud BlogRead More

RELATED ARTICLES

LEAVE A REPLY

Please enter your comment!
Please enter your name here

Most Popular

Recent Comments