Writing an Azure Function to programmatically update Azure Data Factory

Let’s imagine that we create an Azure Data Factory (ADF) with a pipeline containing a Copy Activity that populates Azure SQL Database with data from an on-premises SQL Server database. If we set the schedule to a short interval, say to run every 15 minutes over a 3-month period, we will discover that it generates a large number of executions, or slices (around 9,000). So many slices can be difficult to navigate and manage through the Azure Portal. Currently the ADF portal has only a very basic scheduler, and it is not possible to define a schedule that repeats regularly but runs only within a window, say on Mondays and Fridays from 7am to 7pm.

Thankfully, ADF, along with many other Azure resources, can be updated programmatically. An Azure Function (AF) can be used to dynamically update ADF properties, including the pipeline/activity schedule. The AF itself can be triggered from its own scheduler using the much more powerful and flexible CRON syntax.

Hence, in the example above, the ADF pipeline can instead be configured in the JSON template to repeat every 15 minutes, but with placeholder start and end dates that generate no slices. The AF can then be scheduled to run at midnight every Monday and Friday, updating the ADF start to 7am and the end to 7pm of the current day. The advantage is that new slices are generated only for the days on which the AF actually executes. Also, because you are not scheduling slices at times when data sources or sinks could be down for maintenance, you don’t flood your mailbox with failure alerts.

To do this you first have to create an AF in the Azure portal and set its trigger with a CRON schedule. You can then paste in the code that performs the update. What’s the catch, you say? Well, there is one: you will need to register the AF as an application in Azure AD so that it can authenticate itself before updating the ADF. Registration gives you the client (application) ID, client secret and tenant ID that the code below needs in order to acquire a token. An excellent article on how to do this can be found here.

The code below is sufficient to update the ADF start and end dates.
function.json contains the JSON for the AF trigger; in this case a timerTrigger, with the schedule expressed in CRON syntax. project.json lists the NuGet package references that are downloaded on demand. run.csx is the actual C# code, compiled on the fly by Roslyn and executed. Note the slightly different syntax: assemblies are referenced with #r directives ahead of the using statements.

//function.json
{
  "bindings": [
    {
      "name": "myTimer",
      "type": "timerTrigger",
      "direction": "in",
      "schedule": "0 0 5 * * 1-5"
    }
  ],
  "disabled": false
}
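
The timer trigger uses the six-field NCRONTAB format: {second} {minute} {hour} {day} {month} {day-of-week}. The schedule above fires at 05:00 every weekday (days 1–5); to get the midnight Monday-and-Friday schedule described earlier, you would use something like the excerpt below (1 = Monday, 5 = Friday).

//function.json (schedule excerpt only)
"schedule": "0 0 0 * * 1,5"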

//project.json
{
  "frameworks": {
    "net46": {
      "dependencies": {
        "Hyak.Common": "1.1.0",
        "Microsoft.Azure.Common": "2.1.0",
        "Microsoft.Azure.Common.Dependencies": "1.0.0",
        "Microsoft.Azure.Management.DataFactories": "4.9.0",
        "Microsoft.Bcl": "1.1.10",
        "Microsoft.Bcl.Async": "1.0.168",
        "Microsoft.Bcl.Build": "1.0.21",
        "Microsoft.IdentityModel.Clients.ActiveDirectory": "3.10.305231913",
        "Microsoft.Net.Http": "2.2.29"
      }
    }
  }
}
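
Of these, Microsoft.Azure.Management.DataFactories is the Data Factory (v1) management SDK and Microsoft.IdentityModel.Clients.ActiveDirectory is ADAL, which provides the AuthenticationContext used in run.csx; the remaining packages are supporting dependencies.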

//run.csx
#r "System.Runtime"
#r "System.Threading.Tasks"

using System;
using System.Net;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Microsoft.Azure;
using Microsoft.Azure.Management.DataFactories;
using Microsoft.Azure.Management.DataFactories.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory;

public static void Run(TimerInfo myTimer, TraceWriter log)
{  
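    // Endpoints for Azure AD and Azure Resource Manager, plus the IDs and secret
    // obtained when registering the AF in Azure AD. The GUIDs and secret below
    // are placeholders - substitute your own values.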
    var activeDirectoryEndpoint = "https://login.windows.net/";
    var resourceManagerEndpoint = "https://management.azure.com/";
    var windowsManagementUri = "https://management.core.windows.net/";
    var subscriptionId = "aaaaaaa-6b74-4d0d-be61-64f35116b69a";
    var activeDirectoryTenantId = "bbbbbbbb-0c93-41fa-a79a-d80f5a1ee64d";
    var clientSecret = "cccccccccQ3NcJOAsLaViXpQ1+X2aVtjbeb4Q=";
    var resourceGroupName = "DfDevRg";
    var dataFactoryName = "DfDev";
    var clientId = "ddddddd-6f17-4fec-aeb3-ba0c907bb81b";

    string[] pipelines = {"Pipeline1","Pipeline2"};
    
    // Acquire a service-to-service token from Azure AD using the client credentials flow
    var authenticationContext = new AuthenticationContext(activeDirectoryEndpoint + activeDirectoryTenantId);
    var credential = new ClientCredential(clientId: clientId, clientSecret: clientSecret);
    var result = authenticationContext.AcquireTokenAsync(resource: windowsManagementUri, clientCredential: credential).Result;
     
    if (result == null) throw new InvalidOperationException("Failed to obtain the JWT token");

    // Wrap the bearer token in credentials and create the (v1) ADF management client
    var token = result.AccessToken;
    var aadTokenCredentials = new TokenCloudCredentials(subscriptionId, token);
    var resourceManagerUri = new Uri(resourceManagerEndpoint);
    var client = new DataFactoryManagementClient(aadTokenCredentials, resourceManagerUri);
    
    // Build today's activity window. ADF slice times are UTC, so pin the values to
    // UTC rather than letting DateTime.Parse convert the "Z"-suffixed strings to local time.
    var today = DateTime.UtcNow.Date;
    var start = today.AddHours(6);              // 06:00 UTC
    var end = today.AddHours(18).AddMinutes(1); // 18:01 UTC

    // Re-point each pipeline at today's window and make sure it is not paused
    foreach (var pipelineName in pipelines)
    {
        try
        {
            log.Info("Starting updating " + pipelineName);
            var pl = client.Pipelines.Get(resourceGroupName, dataFactoryName, pipelineName);

            pl.Pipeline.Properties.Start = start;
            pl.Pipeline.Properties.End = end;
            pl.Pipeline.Properties.IsPaused = false;

            client.Pipelines.CreateOrUpdate(resourceGroupName, dataFactoryName, new PipelineCreateOrUpdateParameters()
            {
                Pipeline = pl.Pipeline
            });

            log.Info("Updated " + pipelineName + " start:" + start + " end:" + end + " pause:false");
        }
        catch (Exception e)
        {
            log.Error("Failed to update " + pipelineName + ": " + e.Message);
        }
    }
}
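
One caveat: the subscription, tenant, client ID and client secret are hardcoded above for readability. In practice you would normally store them in the Function App’s application settings, which surface as environment variables at runtime. A minimal sketch, with illustrative setting names (they are not built in):

//run.csx excerpt - read the secrets from app settings instead of hardcoding them
var subscriptionId = Environment.GetEnvironmentVariable("AdfSubscriptionId");
var activeDirectoryTenantId = Environment.GetEnvironmentVariable("AdfTenantId");
var clientId = Environment.GetEnvironmentVariable("AdfClientId");
var clientSecret = Environment.GetEnvironmentVariable("AdfClientSecret");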
