Wednesday, 8 February, 2023 UTC


Introduction
Developing in SQL poses significant problems when compared to other languages and frameworks. It's not easy to reuse statements across different scripts, there's no way to write tests to ensure data consistency, and dependency management requires external software solutions. Developers will typically write thousands of lines of SQL to ensure data processing occurs in the correct order. Additionally, documentation and metadata are afterthoughts because they need to be managed in an external catalog.
Google Cloud offers Dataform and SQLX to solve these challenges.
Dataform is a service for data analysts to develop, test, and deploy complex SQL workflows for data transformation in BigQuery. Dataform lets you manage the transformation step of the Extraction, Loading, and Transformation (ELT) process for data integration. After raw data is extracted from source systems and loaded into BigQuery, Dataform helps you transform it into a well-defined, tested, and documented suite of data tables.
SQLX is an open source extension of SQL and the primary tool used in Dataform. Because it is an extension, every SQL file is also a valid SQLX file. SQLX brings additional features to SQL to make development faster, more reliable, and scalable, including dependency management, automated data quality testing, and data documentation.
Teams should quickly transform their SQL into SQLX to gain the full suite of benefits that Dataform provides. This blog contains a high-level, introductory guide demonstrating this process.
The steps in this guide use the Dataform on Google Cloud console. You can follow along or implement these steps with your own SQL scripts!
Getting Started
Here is the example SQL script we will transform into SQLX. It reads a source table containing Reddit comment data, cleans and deduplicates it, and inserts the result into a new date-partitioned table.
CREATE OR REPLACE TABLE reddit_stream.comments_partitioned
PARTITION BY
  comment_date
AS

WITH t1 as (
SELECT
  comment_id,
  subreddit,
  author,
  comment_text,
  CAST(total_words AS INT64) total_words,
  CAST(reading_ease_score AS FLOAT64) reading_ease_score,
  reading_ease,
  reading_grade_level,
  CAST(sentiment_score AS FLOAT64) sentiment_score,
  CAST(censored AS INT64) censored,
  CAST(positive AS INT64) positive,
  CAST(neutral AS INT64) neutral,
  CAST(negative AS INT64) negative,
  CAST(subjectivity_score AS FLOAT64) subjectivity_score,
  CAST(subjective AS INT64) subjective,
  url,
  DATE(comment_date) comment_date,
  CAST(comment_hour AS INT64) comment_hour,
  CAST(comment_year AS INT64) comment_year,
  CAST(comment_day AS INT64) comment_day
FROM reddit_stream.comments_stream
)
SELECT k.*
FROM (
  SELECT ARRAY_AGG(row LIMIT 1)[OFFSET(0)] k
  FROM t1 row
  GROUP BY comment_id
)

1. Create a new SQLX file and add your SQL

In this guide, we’ll name our file comments_partitioned.sqlx.
At this stage, our dependency graph does not provide much information.

2. Refactor SQL to remove DDL and use only SELECT

In SQLX, you only write SELECT statements. You specify the output type of the script, such as a table or a view, in the config block, and Dataform takes care of adding the CREATE OR REPLACE or INSERT boilerplate for you.
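With the DDL removed, the body of comments_partitioned.sqlx is simply the SELECT logic from the original script (the column casts are abbreviated here for brevity; they are unchanged):

WITH t1 as (
SELECT
  comment_id,
  subreddit,
  author,
  comment_text,
  CAST(total_words AS INT64) total_words,
  -- ...remaining casts unchanged from the original script...
  DATE(comment_date) comment_date,
  CAST(comment_hour AS INT64) comment_hour
FROM reddit_stream.comments_stream
)
SELECT k.*
FROM (
  SELECT ARRAY_AGG(row LIMIT 1)[OFFSET(0)] k
  FROM t1 row
  GROUP BY comment_id
)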

3. Add a config object containing metadata

The config object will contain the output type, description, schema (dataset), tags, columns and their descriptions, and the BigQuery-related configuration. Check out the example below.
config {
  type: "table",
  description: "cleaned comments data and partitioned by date for faster performance",
  schema: "demo_optimized_staging",
  tags: ["reddit"],
  columns: {
    comment_id: "unique id for each comment",
    subreddit: "which reddit community the comment occurred in",
    author: "which reddit user commented",
    comment_text: "the body of text for the comment",
    total_words: "total number of words in the comment",
    reading_ease_score: "a float value for comment readability score",
    reading_ease: "a plain-text english categorization of readability",
    reading_grade_level: "a plain-text english categorization of readability by school grade level",
    sentiment_score: "float value for sentiment of comment between -1 and 1",
    censored: "whether the comment needed censoring by some process upstream",
    positive: "one-hot encoding 1 or 0 for positive",
    neutral: "one-hot encoding 1 or 0 for neutral",
    negative: "one-hot encoding 1 or 0 for negative",
    subjectivity_score: "float value for comment subjectivity score",
    subjective: "one-hot encoding 1 or 0 for subjective",
    url: "link to the comment on reddit",
    comment_date: "date timestamp for when the comment occurred",
    comment_hour: "integer for hour of comment post time",
    comment_year: "integer for year of comment post time",
    comment_month: "integer for month of comment post time",
    comment_day: "integer for day of comment post time"
  },
  bigquery: {
    partitionBy: "comment_date",
    labels: {
      cost_center: "123456"
    }
  }
}

4. Create declarations for any source tables

In our SQL script, we reference reddit_stream.comments_stream directly. In SQLX, we’ll use a declaration to define the relationship between source data and the tables created by Dataform. Add a new comments_stream.sqlx file to your project for this declaration:
config {
  type: "declaration",
  database: "my-project",
  schema: "reddit_stream",
  name: "comments_stream",
  description: "A BigQuery table acting as a data sink for comments streaming in real-time."
}
We’ll utilize this declaration in the next step.
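Before moving on, it helps to see how these files are organized. A Dataform repository keeps SQLX files under a definitions folder and JavaScript helpers under an includes folder; the layout below is illustrative, assuming the default folder names, and includes the JavaScript files we add later in step 7:

definitions/
  comments_stream.sqlx         (declaration for the source table)
  comments_partitioned.sqlx    (the partitioned table we are building)
includes/
  common.js                    (shared JavaScript helpers, added in step 7)
  constants.js                 (shared constants, added in step 7)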

5. Add references to declarations, tables, and views

This will help build the dependency graph. In our SQL script, there is a single reference to the declaration. Simply replace reddit_stream.comments_stream with ${ref("comments_stream")}.
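Concretely, the only change to the query body in this step is the FROM clause:

-- before
FROM reddit_stream.comments_stream

-- after
FROM ${ref("comments_stream")}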
Managing dependencies with the ref function has numerous advantages.
  • The dependency tree complexity is abstracted away. Developers simply need to use the ref function and list dependencies.
  • It enables us to write smaller, more reusable, and more modular queries instead of thousand-line scripts, which makes pipelines easier to debug.
  • You get alerted in real time about issues like missing or circular dependencies.

6. Add assertions for data validation

You can define data quality tests, called assertions, directly in the config block of your SQLX file. Use assertions to check for uniqueness, null values, or any custom row condition. Assertions also appear in the dependency tree, giving them visibility alongside your tables and views.
Here are assertions for our example:
assertions: {
  uniqueKey: ["comment_id"],
  nonNull: ["comment_text"],
  rowConditions: [
    "total_words > 0"
  ]
}
These assertions will pass if comment_id is a unique key, if comment_text is non-null, and if all rows have total_words greater than zero.
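rowConditions accept any SQL expression that evaluates to true or false per row, so you can encode project-specific rules. As a hypothetical extension of this example, the sentiment range documented in the config block could also be enforced:

rowConditions: [
  "total_words > 0",
  "sentiment_score BETWEEN -1 AND 1"
]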

7. Utilize JavaScript for repeatable SQL and parameterization

Our example has a deduplication SQL block. This is a perfect opportunity to create a JavaScript function so the same logic can be reused from other SQLX files. For this scenario, we’ll create the includes folder and add a common.js file with the following contents:
function dedupe(table, group_by_cols) {
  return `
SELECT k.*
FROM (
  SELECT ARRAY_AGG(row LIMIT 1)[OFFSET(0)] k
  FROM ${table} row
  GROUP BY ${group_by_cols}
)
  `
}

module.exports = { dedupe };
Now we can replace that code block in our SQLX file with a single function call:
${common.dedupe("t1", "comment_id")}
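When Dataform compiles the project, this call expands back into the deduplication SQL, with the table name and grouping column substituted:

SELECT k.*
FROM (
  SELECT ARRAY_AGG(row LIMIT 1)[OFFSET(0)] k
  FROM t1 row
  GROUP BY comment_id
)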
In certain scenarios, you may want to use constants in your SQLX files. Let’s add a constants.js file to our includes folder and create a cost center dictionary.
const COST_CENTERS = {
  dev: "000000",
  stage: "123123",
  prod: "123456"
}

module.exports = { COST_CENTERS }
Files in the includes folder are referenced by their filename, so this dictionary is available as constants.COST_CENTERS. We can use it to label our output BigQuery table with a cost center. Here’s an example of using the constant in a SQLX config block:
bigquery: {
  partitionBy: "comment_date",
  labels: {
    cost_center: constants.COST_CENTERS.dev
  }
}

8. Validate the final SQLX file and compiled dependency graph

After completing the above steps, let’s have a look at the final SQLX files:
comments_stream.sqlx
config {
  type: "declaration",
  database: "my-project",
  schema: "reddit_stream",
  name: "comments_stream",
  description: "A BigQuery table acting as a data sink for comments streaming in real-time."
}
comments_partitioned.sqlx
config {
  type: "table",
  description: "cleaned comments data and partitioned by date for faster performance",
  schema: "demo_optimized_staging",
  tags: ["reddit"],
  columns: {
    comment_id: "unique id for each comment",
    subreddit: "which reddit community the comment occurred in",
    author: "which reddit user commented",
    comment_text: "the body of text for the comment",
    total_words: "total number of words in the comment",
    reading_ease_score: "a float value for comment readability score",
    reading_ease: "a plain-text english categorization of readability",
    reading_grade_level: "a plain-text english categorization of readability by school grade level",
    sentiment_score: "float value for sentiment of comment between -1 and 1",
    censored: "whether the comment needed censoring by some process upstream",
    positive: "one-hot encoding 1 or 0 for positive",
    neutral: "one-hot encoding 1 or 0 for neutral",
    negative: "one-hot encoding 1 or 0 for negative",
    subjectivity_score: "float value for comment subjectivity score",
    subjective: "one-hot encoding 1 or 0 for subjective",
    url: "link to the comment on reddit",
    comment_date: "date timestamp for when the comment occurred",
    comment_hour: "integer for hour of comment post time",
    comment_year: "integer for year of comment post time",
    comment_month: "integer for month of comment post time",
    comment_day: "integer for day of comment post time"
  },
  bigquery: {
    partitionBy: "comment_date",
    labels: {
      cost_center: constants.COST_CENTERS.dev
    }
  },
  assertions: {
    uniqueKey: ["comment_id"],
    nonNull: ["comment_text"],
    rowConditions: [
      "total_words > 0"
    ]
  }
}

WITH t1 as (
SELECT
  comment_id,
  subreddit,
  author,
  comment_text,
  CAST(total_words AS INT64) total_words,
  CAST(reading_ease_score AS FLOAT64) reading_ease_score,
  reading_ease,
  reading_grade_level,
  CAST(sentiment_score AS FLOAT64) sentiment_score,
  CAST(censored AS INT64) censored,
  CAST(positive AS INT64) positive,
  CAST(neutral AS INT64) neutral,
  CAST(negative AS INT64) negative,
  CAST(subjectivity_score AS FLOAT64) subjectivity_score,
  CAST(subjective AS INT64) subjective,
  url,
  DATE(comment_date) comment_date,
  CAST(comment_hour AS INT64) comment_hour,
  CAST(comment_year AS INT64) comment_year,
  CAST(comment_month AS INT64) comment_month,
  CAST(comment_day AS INT64) comment_day
FROM ${ref("comments_stream")}
WHERE CAST(total_words AS INT64) > 0
)

${common.dedupe("t1", "comment_id")}
Let's validate the dependency graph and ensure the order of operations looks correct.
Now it’s easy to visualize where the source data is coming from, what output type comments_partitioned is, and what data quality tests will occur!
Next Steps
This guide outlines the first steps of transitioning legacy SQL solutions to SQLX and Dataform for improved metadata management, comprehensive data quality testing, and efficient development. Adopting Dataform streamlines the management of your cloud data warehouse processes, allowing you to focus more on analytics and less on infrastructure management. For more information, check out Google Cloud’s Overview of Dataform. Explore our official Dataform guides and Dataform sample script library for even more hands-on experience.