In this post, we’ll see how to use a Service Bus triggered Azure Durable Function with D365 CE to perform CRUD operations.

Requirement: When the Rate Amount (custom field) changes on a Rate (custom entity) record, update the Rate (custom field) on the related Revenue Schedule Line (custom entity) records of that Rate record.

Of course, there are multiple ways to achieve this; here, we’ll use the approach below:

Once we see the custom message posted to the queue, we’ll proceed with creating the Service Bus triggered Azure Durable Function.
See the links below for more information on Azure Durable Functions and their application patterns.
https://docs.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-overview?tabs=csharp
https://docs.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-overview?tabs=csharp#fan-in-out
We’ll use the fan-out/fan-in application pattern in this example, in which multiple functions are executed in parallel and the orchestrator waits for all of them to complete.

Below is the code for the Service Bus triggered function, which starts the orchestration function.

static CRMWebAPI crmApi = null;
static MessageReceiver receiver = null;
static string token = string.Empty;

// QueueName and AzureWebJobsServiceBus are key-value pairs in local.settings.json
[FunctionName("UpdateGlobalRatesOnRSLs")]
public static async Task Run(
	[ServiceBusTrigger("%QueueName%", Connection = "AzureWebJobsServiceBus")] Message message,
	MessageReceiver messageReceiver,
	string lockToken,
	[OrchestrationClient] DurableOrchestrationClient starter,
	ILogger log)
{
	string inputMessage = Encoding.UTF8.GetString(message.Body);
	log.LogInformation($"message - {inputMessage}");
	receiver = messageReceiver;
	token = lockToken;

	if (string.IsNullOrWhiteSpace(inputMessage))
	{
		await messageReceiver.DeadLetterAsync(lockToken, "Message content is empty.", "Message content is empty.");
		return; // nothing to process once the message is dead-lettered
	}

	RateObjectConverted rateObjectConverted = DeserializeMessage(inputMessage);

	if (rateObjectConverted.RateID == null)
	{
		log.LogInformation("Dead Lettering Message.");
		// Manually dead-letter the message instead of throwing an exception
		await messageReceiver.DeadLetterAsync(lockToken, "Missing Required Fields.", "RateID is missing.");
		log.LogInformation("Message Dead Lettered: RateID is missing.");
		return; // do not start the orchestration for an invalid message
	}

	string instanceID = await starter.StartNewAsync("UpdateGlobalRatesOnRSLs_OrchestrationFunction", rateObjectConverted);
	log.LogInformation($"Orchestration Started with ID: {instanceID}");
}

The function dead-letters the message if it is empty or the required RateID is missing, and starts the orchestration function for valid messages.

Below is the code for the orchestration function, which receives “rateObjectConverted” as its input and calls the activity function for each related record.

[FunctionName("UpdateGlobalRatesOnRSLs_OrchestrationFunction")]
public static async Task<string> RunOrchestrator([OrchestrationTrigger] DurableOrchestrationContext context, ILogger log)
{
	RateObjectConverted rateObjectConverted = context.GetInput<RateObjectConverted>();
	bool allProcessed = true;

	if (rateObjectConverted != null)
	{
		if (crmApi == null) crmApi = await GetCRMWebAPI();

		log.LogInformation($"Got CRM Api");
		var results = GetRSLs(crmApi, rateObjectConverted, log).ToList();
		log.LogInformation($"Got Revenue Schedule Lines");
		var parallelTasks = new List<Task<bool>>();
		if (results != null && results.Count > 0)
		{
			log.LogInformation($"Revenue Schedule Lines Count: {results.Count}");
			for (int i = 0; i < results.Count; i++)
			{
				var task = context.CallActivityAsync<bool>("UpdateGlobalRatesOnRSLs_ActivityFunction", results[i]);
				parallelTasks.Add(task);
			}
		}

		await Task.WhenAll(parallelTasks);
		log.LogInformation($"Tasks Completed.");
		foreach (var individualTask in parallelTasks)
		{
			if (!individualTask.Result)
			{
				allProcessed = false;
				break;
			}
		}
	}
	log.LogInformation($"All Processed: {allProcessed.ToString()}");

	if (!allProcessed)
	{
		log.LogInformation($"Message Abandon Started.");
		await receiver.AbandonAsync(token);//Manually abandon the message so that it'll be processed again(based on the number of retry attempts set on the queue)
		log.LogInformation($"Message Abandoned.");
	}
	else
	{
		log.LogInformation($"Completing the message.");
		await receiver.CompleteAsync(token);//Manually complete the message.
		log.LogInformation($"Message Completed.");
	}
	return allProcessed ? "Message Processed Successfully." : "Message Abandoned: one or more records could not be updated.";
}

The orchestration function gets the records related to the Rate record and calls the activity function for each related record found. It also abandons or completes the message based on the results from the activity functions.
The loop that calls CallActivityAsync for each record (fan-out) and the Task.WhenAll call that waits for all of them (fan-in) show the usage of the fan-out/fan-in application pattern.
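
To make the shape of the pattern easier to see, here is a minimal sketch of fan-out/fan-in using plain .NET tasks, outside of Durable Functions. The names FanOutFanInSketch, UpdateRecordAsync and ProcessAllAsync are hypothetical and only stand in for the orchestrator and activity functions above; this is an illustration under those assumptions, not the code used in this post.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class FanOutFanInSketch
{
    // Hypothetical stand-in for the activity function: pretends to update one record.
    public static async Task<bool> UpdateRecordAsync(Guid recordId)
    {
        await Task.Delay(10);          // simulate an I/O call
        return recordId != Guid.Empty; // treat an empty ID as a failed update
    }

    // Stand-in for the orchestrator: true only if every update succeeded.
    public static async Task<bool> ProcessAllAsync(IEnumerable<Guid> recordIds)
    {
        // Fan-out: start one task per record without awaiting each one in turn.
        List<Task<bool>> tasks = recordIds.Select(id => UpdateRecordAsync(id)).ToList();

        // Fan-in: wait for all tasks to finish, then aggregate their results.
        bool[] results = await Task.WhenAll(tasks);
        return results.All(ok => ok);
    }
}
```

Calling ProcessAllAsync with a list of valid IDs returns true, while a list that contains Guid.Empty returns false, which mirrors how the orchestrator above decides between completing and abandoning the message.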

Here’s the code for the activity function, which updates each record passed from the orchestration function.

[FunctionName("UpdateGlobalRatesOnRSLs_ActivityFunction")]
public static async Task<bool> DoAction([ActivityTrigger] RSLObject entity, ILogger log)
{
	if (crmApi == null) crmApi = await GetCRMWebAPI();

	var RSLDictionary = new Dictionary<string, object>();
	RSLDictionary.Add("dxc_rate", 500);//used static value for simplicity.
	try
	{
		var response = await crmApi.Update("dxc_revenueschedulelines", entity.RSLID, RSLDictionary);//await the call instead of blocking on .Result
		log.LogInformation($"Revenue Schedule Line ID:{entity.RSLID.ToString()} Updated");
		var associated = await crmApi.Associate("dxc_revenueschedulelines", entity.RSLID, "dxc_revenuescheduleline_dxc_rate", "dxc_rates", entity.rateObjectConverted.RateID.Value);//associates the records
		log.LogInformation($"RSL and Rate associated.");
		return true;
	}
	catch (Exception ex)
	{
		log.LogError($"RSL {entity.RSLID.ToString()} could not be updated or associated with rate.");
		log.LogError($"Error: {ex.Message}");
		return false;
	}
}

Below are some helper methods used in the above functions:

public static RateObjectConverted DeserializeMessage(string message)
{
	RateObjectConverted rateObjectConverted = new RateObjectConverted();

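	var rateObject = JsonConvert.DeserializeObject<RateObject>(message);
	if (rateObject == null) return rateObjectConverted;//nothing to convert, e.g. the message body was "null"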

	if (!string.IsNullOrWhiteSpace(rateObject.EndDate)) rateObjectConverted.EndDate = Convert.ToDateTime(rateObject.EndDate);

	if (!string.IsNullOrWhiteSpace(rateObject.StartDate)) rateObjectConverted.StartDate = Convert.ToDateTime(rateObject.StartDate);

	if (!string.IsNullOrWhiteSpace(rateObject.RateID)) rateObjectConverted.RateID = new Guid(rateObject.RateID);

	if (!string.IsNullOrWhiteSpace(rateObject.RevenueCodeID)) rateObjectConverted.RevenueCodeID = new Guid(rateObject.RevenueCodeID);

	if (!string.IsNullOrWhiteSpace(rateObject.RateAmount)) rateObjectConverted.RateAmount = Convert.ToDecimal(rateObject.RateAmount);

	if (!string.IsNullOrWhiteSpace(rateObject.RatePercentage)) rateObjectConverted.RatePercentage = Convert.ToDecimal(rateObject.RatePercentage);

	if (!string.IsNullOrWhiteSpace(rateObject.RateModel)) rateObjectConverted.RateModel = Convert.ToInt32(rateObject.RateModel);

	return rateObjectConverted;
}

private static IEnumerable<RSLObject> GetRSLs(CRMWebAPI api, RateObjectConverted rateObjectConverted, ILogger log)
{
	var xml = @"<fetch version='1.0' output-format='xml-platform' mapping='logical' distinct='true'>
				<entity name='dxc_revenuescheduleline'>
				<attribute name='dxc_revenueschedulelineid' />
				<attribute name='dxc_name' />    
				<attribute name='dxc_startdate' />
					<filter type='and'>
						<condition attribute='dxc_revenuecodeid' operator='eq' uitype='dxc_revenuecode' value='" + rateObjectConverted.RevenueCodeID.ToString() + @"' />
					</filter>
				</entity>
			</fetch>";

	log.LogInformation($"Revenue Schedule Line FetchXML: {xml}");

	var options = new CRMGetListOptions()
	{
		FormattedValues = true,
		FetchXml = xml
	};
	var result = api.GetList("dxc_revenueschedulelines", options);

	if (result == null || result.Result == null || result.Result.List.Count == 0)
		return new List<RSLObject>();

	return (from r in result.Result.List.Cast<IDictionary<string, object>>()
			select new RSLObject
			{
				RSLID = Guid.Parse(r["dxc_revenueschedulelineid"].ToString()),
				StartDate = r.ContainsKey("dxc_startdate") ? Convert.ToDateTime(r["dxc_startdate"]) : DateTime.UtcNow.Date,
				rateObjectConverted = rateObjectConverted
			});
}

private static async Task<CRMWebAPI> GetCRMWebAPI()
{
	var crmOrganizationUrl = Environment.GetEnvironmentVariable("CRMOrganization");
	var crmOrganizationVersionUrl = Environment.GetEnvironmentVariable("CRMOrganizationVersionUrl");
	var crmUrl = $"{crmOrganizationUrl}{crmOrganizationVersionUrl}";
	var resultAccessToken = await CRMWebApiHelper.GetAccessToken();
	var crmApi = new CRMWebAPI(crmUrl, resultAccessToken);
	return crmApi;
}
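
The RateObject, RateObjectConverted and RSLObject classes are used above but not listed in this post. The sketch below shows one possible shape for them, inferred only from how their members are used in the code (for example, RateID is compared to null and read through .Value, so it is assumed to be a nullable Guid). Which of the remaining members are nullable is an assumption.

```csharp
using System;

// Raw shape of the queue message: every field arrives as a string.
public class RateObject
{
    public string RateID { get; set; }
    public string RevenueCodeID { get; set; }
    public string StartDate { get; set; }
    public string EndDate { get; set; }
    public string RateAmount { get; set; }
    public string RatePercentage { get; set; }
    public string RateModel { get; set; }
}

// Typed version produced by DeserializeMessage.
// Nullable members stay null when the message omits the field (assumption).
public class RateObjectConverted
{
    public Guid? RateID { get; set; }
    public Guid? RevenueCodeID { get; set; }
    public DateTime? StartDate { get; set; }
    public DateTime? EndDate { get; set; }
    public decimal? RateAmount { get; set; }
    public decimal? RatePercentage { get; set; }
    public int? RateModel { get; set; }
}

// One Revenue Schedule Line plus the rate data it should be updated with.
public class RSLObject
{
    public Guid RSLID { get; set; }
    public DateTime StartDate { get; set; }
    public RateObjectConverted rateObjectConverted { get; set; }
}
```

Because RSLObject carries its own copy of rateObjectConverted, each activity function call receives everything it needs as a single input.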

The environment variables used above come from the local.settings.json file in the project, as shown below:

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet",
    "CRMOrganization": "https://<crmOrgName>.crm.dynamics.com",
    "ClientSecret": "<clientSecret>",
    "ClientId": "<clientID>",
    "TenantId": "<tenantID>",
    "AuthorityUrl": "https://login.microsoftonline.com/",
    "CRMOrganizationVersionUrl": "/api/data/v9.1/",
	"QueueName": "<QueueName>",
    "AzureWebJobsServiceBus": "<PrimaryConnectionStringOfSharedAccessPolicyOfServiceBusNamespace>"
  }
}
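
If one of these settings is missing, Environment.GetEnvironmentVariable returns null and the problem only shows up later as a malformed URL. A small helper that fails fast can make this easier to spot. The sketch below is only an illustration: SettingsSketch, GetRequiredSetting and BuildCrmUrl are hypothetical names that are not part of the code above.

```csharp
using System;

public static class SettingsSketch
{
    // Reads a setting and throws a descriptive error when it is missing or blank.
    public static string GetRequiredSetting(string name)
    {
        string value = Environment.GetEnvironmentVariable(name);
        if (string.IsNullOrWhiteSpace(value))
            throw new InvalidOperationException($"Setting '{name}' is missing or empty.");
        return value;
    }

    // Combines the organization URL and the Web API path with exactly one slash between them.
    public static string BuildCrmUrl()
    {
        string organization = GetRequiredSetting("CRMOrganization");
        string versionPath = GetRequiredSetting("CRMOrganizationVersionUrl");
        return $"{organization.TrimEnd('/')}/{versionPath.TrimStart('/')}";
    }
}
```

With the values from the settings file above, BuildCrmUrl produces the organization URL followed by /api/data/v9.1/.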

The code above uses the following namespaces:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Xrm.Tools.WebAPI;
using Xrm.Tools.WebAPI.Requests;
using Microsoft.Azure.ServiceBus.Core;
using Microsoft.Azure.ServiceBus;
using System.Text;

Use the NuGet packages shown below:

Now that we have written the code, we can follow the steps discussed here to deploy the functions to Azure: https://ajitpatra.com/2019/10/30/d365-azure-durable-function-with-d365-ce-part-3/

After deployment, let’s change the Rate field on the Rate record to “6000” as shown:

The change in the Rate field will trigger the post-update plugin that we wrote earlier, which posts the custom message to the Azure Service Bus queue.

When the message arrives in the queue, the Azure function we created is triggered. We can check the function’s logs to verify that it received all the information we passed in the message, as shown:

Hope it helps!