Introduction
I was meeting with Julie Koesmarno earlier this week and she showed me a "dynamic" SQL script that read the source and converts columns to defaults. Since then I've presented about five accelerator sessions in Australia and was asked similar questions by the attendees. This is a good introduction to the ImportDB functionality and hopefully it is helpful as a reference.
Environment
Simple file to setup you environment. If you are using Mist just add it and set the Mist Action to live.
<Biml xmlns="http://schemas.varigence.com/biml.xsd"> <Connections> <Connection Name="Source" ConnectionString="Provider=SQLNCLI11;Server=localhost;Initial Catalog=AdventureWorks2012;Integrated Security=SSPI;" /> <Connection Name="Target" ConnectionString="Provider=SQLNCLI11;Server=localhost;Initial Catalog=AdventureWorks2012;Integrated Security=SSPI;" /> </Connections> <Databases> <Database Name="AdventureWorks2012" ConnectionName="Target" /> </Databases> <Schemas> <Schema Name="stg" DatabaseName="AdventureWorks2012" /> </Schemas> </Biml>
Import Tables
The following script will import the database schema into Mist or memory for Bidshelper and add a couple of Annotation Tags. If you look at the TransformColumn function you will see that we set a Tag called AltSelect based on the relevant ISNULL convertion required.
<#string packageSubPath = "SQL_AW_EX"; string srcSchema = "dbo"; string dstSchema = "stg"; string dstDb = "AdventureWorks2012";#> <Biml xmlns="http://schemas.varigence.com/biml.xsd"> <Tables> <!--Import Table Metadata--> <# var importResult = ((AstOleDbConnectionNode)RootNode.Connections["Source"]).ImportDB("","",ImportOptions.ExcludeForeignKey|ImportOptions.ExcludeColumnDefault|ImportOptions.ExcludeViews);#> <!--Limited import to specific schemas --> <#foreach (var tableNode in importResult.TableNodes.Where(i=> i.Name != "AWBuildVersion").OrderBy(item => item.Name)){#> <Table Name="<#=tableNode.Name#>" SchemaName="<#=dstDb#>.<#=dstSchema#>" PackageSubpath="<#=packageSubPath#>" > <Annotations> <Annotation AnnotationType="Documentation" Tag="Source">Julie_AW</Annotation> <Annotation AnnotationType="Tag" Tag="EtlPatternStg"><#=GetEtlPatternStg(tableNode)#></Annotation> <Annotation AnnotationType="Tag" Tag="SourceType">Tables</Annotation> <Annotation AnnotationType="Tag" Tag="SourceSchema"><#=tableNode.SchemaName#></Annotation> <Annotation AnnotationType="Tag" Tag="SourceTable"><#=tableNode.Name#></Annotation> <Annotation AnnotationType="Tag" Tag="HasSourcePK"><#if (tableNode.PreferredKey != null){ #>true<#}else{ #>false<#} #></Annotation> <Annotation AnnotationType="Tag" Tag="HasIdentity"><#if (tableNode.HasIdentity == true){ #>true<#}else{ #>false<#} #></Annotation> <Annotation AnnotationType="Tag" Tag="PreferredKey"><#if (tableNode.PreferredKey != null){ #>true<#}else{ #>false<#} #></Annotation> </Annotations> <Columns> <#foreach (var columnNode in tableNode.Columns.Where(c=> c.DataType.ToString() != "Binary")){var tColumn = TransformColumn(tableNode, columnNode, true);#> <#=tColumn.GetBiml()#> <#}#> <Column Name="RowLoadDate" DataType="DateTime" IsNullable="false" /> <Column Name="RowLoadKey" IsNullable="false" /> </Columns> <Keys> <#=tableNode.Keys.GetBiml()#> </Keys> <#* <Indexes> <#=tableNode.Indexes.GetBiml()#> </Indexes> *#> </Table> <#}#> </Tables> </Biml> <#+ // Transform AstTableColumnBaseNode based on specific source and destination requirments. private AstTableColumnBaseNode TransformColumn(AstTableNode table, AstTableColumnBaseNode column, bool RemoveComputed) { var dColumn = column.Duplicate() as AstTableColumnBaseNode; dColumn.EmitAllXml(); ////Remove Computed attribute and save it in am annotation. if(dColumn.Computed != null) { dColumn.AddAnnotation(AnnotationType.Tag,column.Computed.ToString(),"SourceComputed"); dColumn.Computed = null; } ////Set ScdType for Table Keys. if(table.HasIdentity == false && column.IsUsedInPrimaryKey) { dColumn.AddAnnotation(AnnotationType.Documentation,table.HasIdentity == true?"true":"false" ,"SourceTableHasIdentityColumn"); dColumn.ScdType = ScdType.Key; } else if(dColumn.IsIdentityColumn) { dColumn.ScdType = ScdType.Key; dColumn.IdentityIncrement = 0; ////Remove IdentityIncrement dColumn.AddAnnotation(AnnotationType.Documentation,"true","SourceIsIdentityColumn"); } var dataType = dColumn.DataType; switch (dataType) { case DbType.Byte: case DbType.Single: dColumn.AddAnnotation(AnnotationType.Tag,dColumn.DataType.ToString() + "(" + dColumn.Precision.ToString() + ", " + dColumn.Scale.ToString() + ")","SourceDataType"); dColumn.AddAnnotation(AnnotationType.Tag,"ISNULL([" + dColumn.Name + "], 0) AS [" + dColumn.Name + "]","AltSelect"); break; case DbType.Int16: case DbType.Int32: case DbType.Int64: dColumn.AddAnnotation(AnnotationType.Tag,dColumn.DataType.ToString() + "(" + dColumn.Precision.ToString() + ", " + dColumn.Scale.ToString() + ")","SourceDataType"); dColumn.AddAnnotation(AnnotationType.Tag,"ISNULL([" + dColumn.Name + "], -1) AS [" + dColumn.Name + "]","AltSelect"); break; case DbType.Decimal: case DbType.Double: case DbType.VarNumeric: case DbType.Currency: break; case DbType.String: case DbType.StringFixedLength: case DbType.AnsiString: case DbType.AnsiStringFixedLength: if (dColumn.Length < 7) { dColumn.AddAnnotation(AnnotationType.Tag,"ISNULL([" + dColumn.Name + "], 'UNK') AS [" + dColumn.Name + "]","AltSelect"); } else { dColumn.AddAnnotation(AnnotationType.Tag,"ISNULL([" + dColumn.Name + "], 'Unknown') AS [" + dColumn.Name + "]","AltSelect"); } dColumn.AddAnnotation(AnnotationType.Tag,dColumn.DataType.ToString() + "(" + dColumn.Precision.ToString() + ", " + dColumn.Scale.ToString() + ")","SourceDataType"); break; case DbType.Date: dColumn.AddAnnotation(AnnotationType.Tag,dColumn.DataType.ToString(),"SourceDataType"); dColumn.AddAnnotation(AnnotationType.Tag,"ISNULL([" + dColumn.Name + "], '1900-01-01') AS [" + dColumn.Name + "]","AltSelect"); break; case DbType.DateTime: case DbType.DateTime2: case DbType.DateTimeOffset: dColumn.AddAnnotation(AnnotationType.Tag,dColumn.DataType.ToString(),"SourceDataType"); dColumn.AddAnnotation(AnnotationType.Tag,"ISNULL([" + dColumn.Name + "], '1900-01-01 00:00:00') AS [" + dColumn.Name + "]","AltSelect"); break; case DbType.Guid: dColumn.AddAnnotation(AnnotationType.Tag,dColumn.DataType.ToString(),"SourceDataType"); dColumn.AddAnnotation(AnnotationType.Tag,"ISNULL([" + dColumn.Name + "], '00000000-0000-0000-0000-000000000000') AS [" + dColumn.Name + "]","AltSelect"); break; case DbType.Xml: dColumn.ScdType = ScdType.Historical; dColumn.DataType = DbType.String; dColumn.Length = -1; dColumn.AddAnnotation(AnnotationType.CodeComment,"The XML data type cannot be compared or sorted, except when using the IS NULL operator.","Note1"); dColumn.CustomType = null; break; case DbType.Object: var customType = dColumn.CustomType; switch (customType){ case "hierarchyid": dColumn.DataType = DbType.Binary; dColumn.Length = 892; dColumn.AddAnnotation(AnnotationType.Tag,dColumn.CustomType.ToString(),"SourceCustomType"); dColumn.CustomType = null; break; case "text": ////Convert depricated columns to nvarchar(max) case "ntext": dColumn.DataType = DbType.String; dColumn.Length = -1; dColumn.AddAnnotation(AnnotationType.Tag,dColumn.CustomType.ToString(),"SourceCustomType"); dColumn.AddAnnotation(AnnotationType.Tag,"ISNULL([" + dColumn.Name + "], 'Unknown') AS [" + dColumn.Name + "]","AltSelect"); dColumn.CustomType = null; break; case "timestamp": break; break; } break; default: break; } // switch return dColumn; } #> <#+ //// Determine Subject based on table name. private string GetEtlPatternStg(AstTableNode tableNode) { var etlPatternStg = ""; switch (tableNode.Name) { case "DatabaseLog": case "ErrorLog": etlPatternStg = "Ignore"; break; default: etlPatternStg = tableNode.PreferredKey != null?"ChangeTracking1":"EndOfDay"; break; } return etlPatternStg; } #> <#@ template language="C#" tier="1"#> <#@ import namespace="System.Data" #> <#@ import namespace="Varigence.Hadron.CoreLowerer.TSqlEmitter" #>
Extract Tables
Again a fairly basic BimlScript to extract source to target. At the top of the script is where we use the Annotation Tags that was set in the previous script. The rest is fairly straight forward and self explanatory. Have fun with Biml.
<#@ template language="C#" tier="3" #> <Biml xmlns="http://schemas.varigence.com/biml.xsd"> <Packages> <# foreach(var tableNode in RootNode.Tables.Where(item => item.GetTag("EtlPatternStg") != "Ignore")) { var expressionSql = ""; var sourceQuery = ""; bool isFirstExp = true; foreach (var column in tableNode.Columns.Where(item => item.Annotations["ToBeProcessed"] == null || item.Annotations["ToBeProcessed"].Text != "false")) { if (column.Name != "RowLoadDate" && column.Name != "RowLoadKey"){ if (isFirstExp) { isFirstExp = false; } else { sourceQuery += ", "; } if (column.Annotations["AltSelect"] != null) { sourceQuery += column.Annotations["AltSelect"].Text; } else { sourceQuery += "[" + column.Name + @"]"; } } } sourceQuery = "SELECT " + sourceQuery + @" FROM [" + tableNode.GetTag("SourceSchema") + "].[" + tableNode.GetTag("SourceTable") +"]"; expressionSql = sourceQuery + "\""; #> <Package Name="Load <#=tableNode.SchemaName#>_<#=tableNode.SsisSafeName#>" ConstraintMode="Linear" PackageSubpath="WP_ODS_NONBET_<#=tableNode.GetTag("EtlPatternStg")#>" ProtectionLevel="EncryptSensitiveWithUserKey"> <Connections> <Connection ConnectionName="Source" /> <Connection ConnectionName="Target" /> </Connections> <Tasks> <ExecuteSQL Name="SQL - Initialise <#=tableNode.SsisSafeScopedName#>" ForcedExecutionValueDataType="Empty" ConnectionName="Target"> <DirectInput>TRUNCATE TABLE <#=tableNode.ScopedName#></DirectInput> </ExecuteSQL> <Dataflow Name="Load Table <#=tableNode.SsisSafeScopedName#>"> <Transformations> <OleDbSource Name="OLE_SRC <#=tableNode.GetTag("SourceSchema")#>_<#=tableNode.SsisSafeName#>" ConnectionName="Source" ValidateExternalMetadata="false"> <Annotations> <Annotation AnnotationType="Tag" Tag="SourceObjectName">[<#=tableNode.SchemaName#>].[<#=tableNode.Name#>]</Annotation> <Annotation AnnotationType="Tag" Tag="SourceSafeObjectName"><#=tableNode.SchemaName#>_<#=tableNode.SsisSafeName#></Annotation> </Annotations> <DirectInput><#=sourceQuery#></DirectInput> </OleDbSource> <DerivedColumns Name="DC - Add Audit Columns"> <Columns> <Column Name="RowLoadKey" DataType="Int64" ReplaceExisting="false">@[System::ServerExecutionID]</Column> <Column Name="RowLoadDate" DataType="DateTime" ReplaceExisting="false">(DT_DBTIMESTAMP)GETDATE()</Column> </Columns> </DerivedColumns> <OleDbDestination Name="OLE_DST <#=tableNode.SsisSafeScopedName#>" ConnectionName="Target"> <TableOutput TableName="<#=tableNode.ScopedName#>" /> </OleDbDestination> </Transformations> </Dataflow> </Tasks> </Package> <# } #> </Packages> </Biml>
Comments
There are no comments yet.