Using a Merge Task123

gravatar

Jon

An explanation and example of the Merge task.

published 01.01.13

last updated 04.27.13


Share

Tags

  • alerts
  • bids
  • BIML
  • google
  • merge
  • mist
  • Task

Introduction

So, you want to pull from a data source and store the information in a database. You already know how easy it is to pull data from a data source with Biml, so you go ahead and write the code for pulling that data. This is where you hit a snag. The data you are pulling might overlap with the data already in the database from the last pull. There are several ways you could do this, but I am going to cover using a Merge task. It is quite simple and easy to implement. After I cover the Merge task, I have included an example of how the Merge task could be useful.

Explanation

To help in the understanding of what a Merge task does, Paul Waters from Varigence has provided an explanation: “A Biml Merge task is used to declare a t-sql MERGE statement without needing to use t-sql. Therefore, when an SSIS package is created from Biml there will be a native SSIS Execute SQL Task containing a t-sql MERGE statement in the place where the Biml Merge Task was declared.” The Merge task does the hard part of writing the MERGE statement. All you have to do is supply some simple parameters in the Biml.

Task Overview

Here is the overview of how the Merge task works:

  1. Compare data from a temporary processing table with data in the destination table.
  2. Any data identified as needing to be merged in the comparison is merged into the destination table

How-To

Let’s start. The task requires two tables: 1) The processing table that temporarily holds the data to be merged into the final destination table. This table should be emptied (read truncated) after each Merge task. We will call this table the source table. 2) The final destination table. This table may already have data in it. Thus why we are merging into it. We will call this table the target table. Here is what the Biml of the Merge task looks like:

<Merge Name="Merge Alerts into dbo_GoogleAlerts" UnspecifiedColumnDefaultUsageType="CompareUpdateInsert" TargetConstraintName="DataWarehouse.dbo.GoogleAlerts.PK_GoogleAlerts">
<TableSource TableName="DataWarehouse.Process.GoogleAlerts" />
</Merge>

Parts

Let’s take a closer look at the Merge task parts.

Name: Obviously what you want the Merge to be called.

UnspecifiedColumnDefaultUsageType: For the usage here, this specifies how we want the Biml to compare the target with the source. This comparison is row by row.

The comparison will only compare columns between the two tables if they have the same names for columns. If for instance you have an identity column in the target table but not in the source table, the identity column will not be included in the comparison.

The Biml will ONLY compare the whole row IF the data in the constraint specified by the TargetConstraintName matches data in the source table. If the target constraint data for a row matches something from the source table, the Biml will compare the rest of the columns and do one of two things: 1) If all of the same named columns in a row from both tables contain the same data, nothing is done with the row from the source as it is already in the target. 2) If any of the same named columns in a row from both tables has a difference in data, the different data from the source table will be used to update the target data.

If the target constraint data for a row does not match data in a source row, the row from the source is inserted into the target.

All of the above is accomplished with simple Biml code.

TargetConstraintName: The name of the constraint to be used in the comparison of the two tables. The table on which the constraint is tells Biml what table to use as the target for the merge. Please note that Biml cannot grab constraint names from SQL Server. The constraint needs to be specified in the Biml. See the example below if you are confused.

TableSource TableName: The name of the source table that is being merged into the target.

Example

Here is an example of using the merge task to pull GoogleAlerts data into a database. Please note the ExecuteSQL task that is used to truncate the process table so it is empty next time the data needs to be loaded. Also, please note the constraint being used specifies a column (EntryId). It is this column in both tables that is used for the initial Merge comparison.

<Biml xmlns="http://schemas.varigence.com/biml.xsd">
    <Annotations>
        <Annotation>
            File: Google_Alerts.biml
            Reading an XML Google Alert feed
            DB: MS-SQL2012
            BIML: 1.6 VS2010 BIDS Helper
        </Annotation>
        <Annotation>
            Table Creation scripts:

            CREATE TABLE [dbo].[GoogleAlerts](
            [CrawlTimestamp] [nvarchar](128) NULL,
            [EntryId] [nvarchar](2048) NOT NULL,
            [Title] [nvarchar](2048) NULL,
            [Link] [nvarchar](2048) NULL,
            [Content] [nvarchar](2048) NULL,
            [AlertSearchTerm] [nvarchar](128) NULL,
            [AlertId] [bigint] IDENTITY(1,1) NOT NULL,
            CONSTRAINT [PK_GoogleAlerts] PRIMARY KEY CLUSTERED 
            (
            [EntryId] ASC
            )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
            ) ON [PRIMARY]

            CREATE TABLE [Process].[GoogleAlerts](
            [CrawlTimestamp] [nvarchar](128) NOT NULL,
            [EntryId] [nvarchar](2048) NOT NULL,
            [Title] [nvarchar](2048) NULL,
            [Link] [nvarchar](2048) NULL,
            [Content] [nvarchar](2048) NULL,
            [AlertSearchTerm] [nvarchar](128) NULL
            ) ON [PRIMARY]
        </Annotation>
    </Annotations>
    <Connections>
        <OleDbConnection
            Name="DW"
            ConnectionString="Data Source=localhost;Initial Catalog=Dashboard;Provider=SQLNCLI10.1;Integrated Security=SSPI;Connect Timeout=30;">
        </OleDbConnection>
    </Connections>
    <Databases>
        <Database Name="DataWarehouse" ConnectionName="DW" />
    </Databases>
    <Schemas>
        <Schema Name="Process" DatabaseName="DataWarehouse" />
        <Schema Name="dbo" DatabaseName="DataWarehouse" />
    </Schemas>
    <Tables>
        <Table Name="GoogleAlerts" SchemaName="DataWarehouse.dbo">
            <Columns>
                <Column Name="CrawlTimestamp" IsNullable="true" DataType="String" Length="128" />
                <Column Name="EntryId" DataType="String" Length="2048" IsNullable="false" />
                <Column Name="Title" DataType="String" Length="2048" IsNullable="true" />
                <Column Name="Link" DataType="String" Length="2048" IsNullable="true" />
                <Column Name="Content" DataType="String" Length="2048" IsNullable="true" />
                <Column Name="AlertSearchTerm" DataType="String" Length="128" IsNullable="true" />
                <Column Name="AlertId" IdentityIncrement="1" DataType="Int64" />
            </Columns>
            <Keys>
                <PrimaryKey Name="PK_GoogleAlerts">
                    <Columns>
                        <Column ColumnName="EntryId" />
                    </Columns>
                </PrimaryKey>
            </Keys>
        </Table>        
        <Table Name="GoogleAlerts" SchemaName="DataWarehouse.Process">
            <Columns>
                <Column Name="CrawlTimestamp" IsNullable="false" DataType="String" Length="128" />
                <Column Name="EntryId" DataType="String" Length="2048" IsNullable="false" />
                <Column Name="Title" DataType="String" Length="2048" IsNullable="true" />
                <Column Name="Link" DataType="String" Length="2048" IsNullable="true" />
                <Column Name="Content" DataType="String" Length="2048" IsNullable="true" />
                <Column Name="AlertSearchTerm" DataType="String" Length="128" IsNullable="true" />
            </Columns>
        </Table>
    </Tables>
    <ScriptProjects>
        <ScriptComponentProject Name="Google_Alerts">
            <AssemblyReferences>
            <AssemblyReference AssemblyPath="Microsoft.SqlServer.DTSPipelineWrap.dll" />
            <AssemblyReference AssemblyPath="Microsoft.SqlServer.DTSRuntimeWrap.dll" />
            <AssemblyReference AssemblyPath="Microsoft.SqlServer.PipelineHost.dll" />
            <AssemblyReference AssemblyPath="Microsoft.SqlServer.TxScript.dll" />
            <AssemblyReference AssemblyPath="System.dll" />
            <AssemblyReference AssemblyPath="System.Data.dll" />
            <AssemblyReference AssemblyPath="System.Windows.Forms.dll" />
            <AssemblyReference AssemblyPath="System.Xml.dll" />
        </AssemblyReferences>
        <OutputBuffers>
            <OutputBuffer Name="Alerts" IsSynchronous="false">
                <Columns>
                    <Column Name="CrawlTimestamp" DataType="String" Length="128" />
                    <Column Name="EntryId" DataType="String" Length="2048" />
                    <Column Name="Title" Length="2048" DataType="String" />
                    <Column Name="Link" DataType="String" Length="2048" />
                    <Column Name="Content" DataType="String" Length="2048" />
                    <Column Name="AlertSearchTerm" DataType="String" Length="128" />
                </Columns>
            </OutputBuffer>
        </OutputBuffers>
        <Files>
            <File Path="AssemblyInfo.cs">
                using System.Reflection;
                using System.Runtime.CompilerServices;
                [assembly: AssemblyTitle("SC_Google_Alerts")]
                [assembly: AssemblyDescription("")]
                [assembly: AssemblyConfiguration("")]
                [assembly: AssemblyCompany("")]
                [assembly: AssemblyProduct("SC_Google_Alerts")]
                [assembly: AssemblyCopyright("Copyright @ 2012")]
                [assembly: AssemblyTrademark("")]
                [assembly: AssemblyCulture("")]
                [assembly: AssemblyVersion("1.0.*")]
            </File>
            <File Path="main.cs">
                using System;
                using System.Data;
                using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
                using Microsoft.SqlServer.Dts.Runtime.Wrapper;
                using System.Xml;
                using System.Web;
                using System.Net;

                [Microsoft.SqlServer.Dts.Pipeline.SSISScriptComponentEntryPointAttribute]
                public class ScriptMain : UserComponent
                {
                    public override void CreateNewOutputRows()
                    {
                        System.Xml.XmlDocument xml_doc = new System.Xml.XmlDocument();
                        string AlertUri = "http://www.google.com/reader/public/atom/user%2F01091636185598549437%2Fstate%2Fcom.google%2Falerts%2F9073511343195502523";
                        xml_doc = GetResponse(AlertUri);

                        XmlNodeList child_nodes = xml_doc.GetElementsByTagName("entry");

                        foreach (XmlNode child in child_nodes)
                        {
                            AlertsBuffer.AddRow();
                            AlertsBuffer.CrawlTimestamp = child.Attributes["gr:crawl-timestamp-msec"].Value.ToString();
                            AlertsBuffer.EntryId = child["id"].InnerXml;
                            AlertsBuffer.Title = child["title"].InnerXml;
                            AlertsBuffer.Link = child["link"].Attributes["href"].Value.ToString();
                            AlertsBuffer.Content = child["content"].InnerXml;
                            AlertsBuffer.AlertSearchTerm = "ssis";
                        }
                    }

                    public XmlDocument GetResponse(string uri)
                    {
                        XmlDocument doc = new XmlDocument();

                        WebRequest myRequest = WebRequest.Create(new Uri(uri));
                        IWebProxy proxy = myRequest.Proxy;
                        if (proxy != null)
                        {
                            proxy.GetProxy(myRequest.RequestUri);
                        }

                        doc.Load(myRequest.GetResponse().GetResponseStream());

                        return doc;
                    }
                }
            </File>
        </Files>
        </ScriptComponentProject>
    </ScriptProjects>
    <Packages>
        <Package Name="Google_Alerts" ConstraintMode="Linear">
            <Tasks>
                <Dataflow Name="Load Data From Google Alerts">
                    <Transformations>
                        <ScriptComponentSource Name="Google_Alerts">
                            <ScriptComponentProjectReference ScriptComponentProjectName="Google_Alerts" />
                        </ScriptComponentSource>
                        <OleDbDestination Name="Put data into Process_GoogleAlerts" ConnectionName="DW">
                            <TableOutput TableName="DataWarehouse.Process.GoogleAlerts" />
                        </OleDbDestination>
                    </Transformations>
                </Dataflow>
                <Merge Name="Merge Alerts into dbo_GoogleAlerts" UnspecifiedColumnDefaultUsageType="CompareUpdateInsert" TargetConstraintName="DataWarehouse.dbo.GoogleAlerts.PK_GoogleAlerts">
                    <TableSource TableName="DataWarehouse.Process.GoogleAlerts" />
                </Merge>
                <ExecuteSQL Name="Truncate the process table" ConnectionName="DW">
                    <DirectInput>TRUNCATE TABLE Process.GoogleAlerts</DirectInput>
                </ExecuteSQL>
            </Tasks>
        </Package>
    </Packages>
</Biml>
You are not authorized to comment. A verification email has been sent to your email address. Please verify your account.

Comments

gravatar

Brian2

7:59pm 03.31.16

I like the concept but I want to use three part table name MERGE and not two part name with different schemas. Is there an easy way to do this, I have to move several hundred tables and need a simple way to do it programmatically.