Azure Data Factory: Extracting multiple tables with one dataset

Oct 29, 2021 12:21 PM

Personal Blog
Bryce Kerseboom
Tags: Microsoft, Azure, Data Factory, Azure SQL Database, ELT

Extract, Load, Transform (ELT)

With more and more data platforms shifting to the cloud, we also see a change in how we process our data. This change is mostly a move to ELT, where we first load everything into the cloud and then start to transform it, in contrast to the traditional ETL we previously used in on-premises environments, where we transform the data first and then load it.

Data, everywhere!

In the modern data world, more and more datasets become available to consume as sources for our data platforms. Most sources are still databases, and within a Microsoft environment these are mostly SQL Server-based databases containing many objects (tables, views, etc.).

When using Azure Data Factory (ADF) as your data orchestrator, things can become tedious quite quickly when you have to deal with many objects that need to be extracted and loaded into an Azure environment. Specifying a separate dataset in ADF for each object is a lot of manual work, and doing this for hundreds of objects, if not more, is not a productive way to spend your time.

Luckily there is a neat way to get this done with minimal effort.

Dynamically getting your objects from SQL

In this example I used two Azure SQL databases: one pre-filled with the AdventureWorksLT sample (sqldb-Source) and one completely empty (sqldb-Target). The sample data can be selected when creating the database resource in Azure.

I have recreated all the SalesLT tables from sqldb-Source in the sqldb-Target database, but under the Stg schema. If you want to follow along, you can use the following code for your convenience:

CREATE SCHEMA [Stg]
GO

CREATE TABLE [Stg].[Address](
    [AddressID] [int] NOT NULL,
    [AddressLine1] [nvarchar](60) NOT NULL,
    [AddressLine2] [nvarchar](60) NULL,
    [City] [nvarchar](30) NOT NULL,
    [StateProvince] [nvarchar](30) NOT NULL,
    [CountryRegion] [nvarchar](30) NOT NULL,
    [PostalCode] [nvarchar](15) NOT NULL,
    [rowguid] [uniqueidentifier] NOT NULL,
    [ModifiedDate] [datetime] NOT NULL
) ON [PRIMARY]
GO

CREATE TABLE [Stg].[Customer](
    [CustomerID] [int] NOT NULL,
    [NameStyle] [bit] NOT NULL,
    [Title] [nvarchar](8) NULL,
    [FirstName] [nvarchar](30) NOT NULL,
    [MiddleName] [nvarchar](30) NULL,
    [LastName] [nvarchar](30) NOT NULL,
    [Suffix] [nvarchar](10) NULL,
    [CompanyName] [nvarchar](128) NULL,
    [SalesPerson] [nvarchar](256) NULL,
    [EmailAddress] [nvarchar](50) NULL,
    [Phone] [nvarchar](24) NULL,
    [PasswordHash] [varchar](128) NOT NULL,
    [PasswordSalt] [varchar](10) NOT NULL,
    [rowguid] [uniqueidentifier] NOT NULL,
    [ModifiedDate] [datetime] NOT NULL
) ON [PRIMARY]
GO

CREATE TABLE [Stg].[CustomerAddress](
    [CustomerID] [int] NOT NULL,
    [AddressID] [int] NOT NULL,
    [AddressType] [nvarchar](30) NOT NULL,
    [rowguid] [uniqueidentifier] NOT NULL,
    [ModifiedDate] [datetime] NOT NULL
) ON [PRIMARY]
GO

CREATE TABLE [Stg].[Product](
    [ProductID] [int] NOT NULL,
    [Name] [nvarchar](50) NOT NULL,
    [ProductNumber] [nvarchar](25) NOT NULL,
    [Color] [nvarchar](15) NULL,
    [StandardCost] [money] NOT NULL,
    [ListPrice] [money] NOT NULL,
    [Size] [nvarchar](5) NULL,
    [Weight] [decimal](8, 2) NULL,
    [ProductCategoryID] [int] NULL,
    [ProductModelID] [int] NULL,
    [SellStartDate] [datetime] NOT NULL,
    [SellEndDate] [datetime] NULL,
    [DiscontinuedDate] [datetime] NULL,
    [ThumbNailPhoto] [varbinary](max) NULL,
    [ThumbnailPhotoFileName] [nvarchar](50) NULL,
    [rowguid] [uniqueidentifier] NOT NULL,
    [ModifiedDate] [datetime] NOT NULL
) ON [PRIMARY] 
GO

CREATE TABLE [Stg].[ProductCategory](
    [ProductCategoryID] [int] NOT NULL,
    [ParentProductCategoryID] [int] NULL,
    [Name] [nvarchar](30) NOT NULL,
    [rowguid] [uniqueidentifier] NOT NULL,
    [ModifiedDate] [datetime] NOT NULL
) ON [PRIMARY]
GO

CREATE TABLE [Stg].[ProductDescription](
    [ProductDescriptionID] [int] NOT NULL,
    [Description] [nvarchar](400) NOT NULL,
    [rowguid] [uniqueidentifier] NOT NULL,
    [ModifiedDate] [datetime] NOT NULL
) ON [PRIMARY]
GO

CREATE TABLE [Stg].[ProductModel](
    [ProductModelID] [int] NOT NULL,
    [Name] [nvarchar](30) NOT NULL,
    [CatalogDescription] [xml] NULL,
    [rowguid] [uniqueidentifier] NOT NULL,
    [ModifiedDate] [datetime] NOT NULL
) ON [PRIMARY]
GO

CREATE TABLE [Stg].[ProductModelProductDescription](
    [ProductModelID] [int] NOT NULL,
    [ProductDescriptionID] [int] NOT NULL,
    [Culture] [nchar](6) NOT NULL,
    [rowguid] [uniqueidentifier] NOT NULL,
    [ModifiedDate] [datetime] NOT NULL
) ON [PRIMARY]
GO

CREATE TABLE [Stg].[SalesOrderDetail](
    [SalesOrderID] [int] NOT NULL,
    [SalesOrderDetailID] [int] NOT NULL,
    [OrderQty] [smallint] NOT NULL,
    [ProductID] [int] NOT NULL,
    [UnitPrice] [money] NOT NULL,
    [UnitPriceDiscount] [money] NOT NULL,
    [LineTotal] [money] NOT NULL,
    [rowguid] [uniqueidentifier] NOT NULL,
    [ModifiedDate] [datetime] NOT NULL
) ON [PRIMARY]
GO

CREATE TABLE [Stg].[SalesOrderHeader](
    [SalesOrderID] [int] NOT NULL,
    [RevisionNumber] [tinyint] NOT NULL,
    [OrderDate] [datetime] NOT NULL,
    [DueDate] [datetime] NOT NULL,
    [ShipDate] [datetime] NULL,
    [Status] [tinyint] NOT NULL,
    [OnlineOrderFlag] [bit] NOT NULL,
    [SalesOrderNumber] [nvarchar](23) NOT NULL,
    [PurchaseOrderNumber] [nvarchar](23) NULL,
    [AccountNumber] [nvarchar](23) NULL,
    [CustomerID] [int] NOT NULL,
    [ShipToAddressID] [int] NULL,
    [BillToAddressID] [int] NULL,
    [ShipMethod] [nvarchar](50) NOT NULL,
    [CreditCardApprovalCode] [varchar](15) NULL,
    [SubTotal] [money] NOT NULL,
    [TaxAmt] [money] NOT NULL,
    [Freight] [money] NOT NULL,
    [TotalDue] [money] NOT NULL,
    [Comment] [nvarchar](max) NULL,
    [rowguid] [uniqueidentifier] NOT NULL,
    [ModifiedDate] [datetime] NOT NULL
) ON [PRIMARY]
GO

After setting up the databases and creating the necessary tables in sqldb-Target, I created an Azure Data Factory and added the following two Linked Services to it, in which LS_sqldb_target acts as our cloud data warehouse:

Linked Services
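
For reference, a minimal JSON sketch of the target Linked Service could look like the following. The server, user, and password placeholders are assumptions, and I am calling the source Linked Service LS_sqldb_source here; it follows the same pattern:

{
    "name": "LS_sqldb_target",
    "properties": {
        "type": "AzureSqlDatabase",
        "typeProperties": {
            "connectionString": "Server=tcp:<your-server>.database.windows.net,1433;Initial Catalog=sqldb-Target;User ID=<user>;Password=<password>;"
        }
    }
}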

After creating the Linked Services, create a dataset for sqldb-Target. I called it DS_target_tables and added a single String parameter within the Parameters tab, which will hold the table name (I will refer to it as Table_Name below).

This is then followed by entering the Stg schema in the Connection tab and referencing the parameter as the table name via dynamic content.
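
Put together, a sketch of the dataset JSON, assuming the parameter is named Table_Name, looks like this:

{
    "name": "DS_target_tables",
    "properties": {
        "linkedServiceName": {
            "referenceName": "LS_sqldb_target",
            "type": "LinkedServiceReference"
        },
        "parameters": {
            "Table_Name": { "type": "String" }
        },
        "type": "AzureSqlTable",
        "typeProperties": {
            "schema": "Stg",
            "table": {
                "value": "@dataset().Table_Name",
                "type": "Expression"
            }
        }
    }
}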

Now it is time to create the pipeline where all the magic is going to happen. I called it PL_copy for this example.

Open the General tab on the left side and drag & drop the Lookup activity onto the canvas, then do the same with the ForEach activity from the Iteration & Conditionals tab. Connect the two by drawing the green (on success) line from the Lookup to the ForEach.

Open the Lookup activity and select the source dataset. Continue by setting Use query to Query and click on the Add dynamic content text.

Add the following code to the content box:

SELECT TABLE_SCHEMA AS Table_Schema, TABLE_NAME AS Table_Name
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = 'SalesLT'
AND TABLE_TYPE = 'BASE TABLE'

With this we will be able to get all the base tables from the SalesLT schema of our source database; the TABLE_TYPE filter excludes the views. Also make sure to untick the First row only checkbox on the Lookup, otherwise only the first table would be returned.
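
For reference, the Lookup output will then look roughly like this, shortened here to the first two of the ten SalesLT base tables:

{
    "count": 10,
    "value": [
        { "Table_Schema": "SalesLT", "Table_Name": "Address" },
        { "Table_Schema": "SalesLT", "Table_Name": "Customer" }
    ]
}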

Now we need to use this list of tables to process them and load them all into our target database. To do this, click on the ForEach activity and go to the Settings tab. Next to Items, click on the Add dynamic content text.

Select the output of the Lookup so it is added to the content box, and append .value to it to get the actual rows from the Lookup activity.
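
Assuming the Lookup activity kept its default name (Lookup1), the resulting Items expression would be:

@activity('Lookup1').output.value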

Click on the Activities tab and on the pencil icon to add a new activity; a new canvas will open. Select Copy data from the Move & Transform category and drag & drop it onto the canvas.

Click on the Copy data activity and go to the Source tab. Continue by setting Use query to Query and click on the Add dynamic content text.

Add the following code to the content box:

SELECT * FROM @{item().Table_Schema}.@{item().Table_Name}
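
The @{...} syntax is ADF's string interpolation, so each iteration produces a plain SQL statement such as SELECT * FROM SalesLT.Address. If your source might contain object names that need quoting, a slightly safer variant wraps the identifiers in brackets:

SELECT * FROM [@{item().Table_Schema}].[@{item().Table_Name}]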

Go to the Sink tab and select the target dataset. You will see it requires the parameter that was added earlier. Click on the Add dynamic content text again and add the following code to the content box:

@item().Table_Name
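
In the published pipeline JSON, this sink shows up as a dataset reference whose parameter is filled in per iteration; a sketch, again assuming the parameter is named Table_Name:

"outputs": [
    {
        "referenceName": "DS_target_tables",
        "type": "DatasetReference",
        "parameters": {
            "Table_Name": { "value": "@item().Table_Name", "type": "Expression" }
        }
    }
]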

With this we have created a fully working pipeline! Don't forget to publish everything by clicking the Publish button, which makes sure all your changes are saved.

Click on the Debug button to test your pipeline; you will see it loop through all the tables and copy all the data from them.

You could further improve this by making the source query conditional on the latest load time, so that only new or changed rows are fetched from the source tables.
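
As a sketch of that idea, assuming you add a pipeline parameter LastLoadTime and relying on the ModifiedDate column that every SalesLT table carries, the source query could become:

SELECT *
FROM @{item().Table_Schema}.@{item().Table_Name}
WHERE ModifiedDate > '@{pipeline().parameters.LastLoadTime}'

Keep in mind that you would then also need to decide how those rows land in the target, for example truncate-and-insert versus an upsert.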

What's next?

Let's stay within Azure Data Factory for a moment and look at how we can insert data directly through a SQL Stored Procedure, which could come in handy for validating the data or for minor transformations if you do want to go for an ETL approach.