
Large data integration with D365F&O leveraging Azure CosmosDB and Az Functions

Subhad365
 


Hi friends, today we'll discuss a very powerful integration strategy: integrating Cosmos DB with D365F&O using Azure Functions.

Scenario: 

If you have large datasets coming into your D365F&O system and you need them processed at high speed, then plugging Cosmos DB into D365F&O with Azure Functions can be a very powerful option.


When should you use Cosmos DB:

Azure Cosmos DB natively partitions your data for high availability and scalability. Azure Cosmos DB offers 99.99% guarantees for availability, throughput, low latency, and consistency on all single-region accounts and all multi-region accounts with relaxed consistency, and 99.999% read availability on all multi-region database accounts.

  • Azure Cosmos DB has SSD backed storage with low-latency order-of-millisecond response times.
  • Azure Cosmos DB's support for consistency levels like eventual, consistent prefix, session, and bounded-staleness allows for full flexibility and a low cost-to-performance ratio. No database service offers as much flexibility in consistency levels as Azure Cosmos DB.
  • Azure Cosmos DB has a flexible data-friendly pricing model that meters storage and throughput independently.
  • Azure Cosmos DB's reserved throughput model allows you to think in terms of number of reads/writes instead of CPU/memory/IOPs of the underlying hardware.
  • Azure Cosmos DB's design lets you scale to massive request volumes in the order of trillions of requests per day.

Summing up, Cosmos DB is a good fit when the system:

  • Needs low response times
  • Needs to handle massive data throughput

Creating a Cosmos DB:


Creating a Cosmos DB is very straightforward:
Go to https://portal.azure.com and search for Cosmos DB:

In the screen that opens, choose Azure Cosmos DB for NoSQL >> fill out the form as follows:
Select the Subscription, Resource group, and Location. Optionally, you can also apply the Free Tier discount.
Click Review + Create to create/deploy a Cosmos DB account.
It will take a while to create and make your DB ready. Here are the ToDoList database and Items container that got created:
You can create records or upload a bulk file into Cosmos DB in no time:

I have prepared a JSON file beforehand to feed into Cosmos DB:
Remember: there must be an id field, and it must be unique; apart from that, Cosmos DB will accept whatever fields you keep in your JSON file. After the upload, I am querying the Cosmos DB:

It shows the records in JSON format. Alternatively, you can see all the records directly by clicking on the items themselves:
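For reference, a single stored item in the container ends up looking something like this (the id value here is just an illustration; the remaining fields follow the payload we use later):

{
    "id": "1",
    "dataAreaId": "usmf",
    "PoolId": "011",
    "PoolName": "Wholesale order pool11",
    "Processed": "No"
}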

Querying a Cosmos DB

And heck yes, you can query Cosmos DB, and even write a stored procedure too. The below screenshot shows a query execution statement where we filter on unprocessed records (Processed = "No"):
You can also create stored procedures or UDFs if you want to make things easier or more reusable.
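For example, the filter above can be written as a Cosmos DB SQL query along these lines (c is simply the container alias):

SELECT * FROM c WHERE c.Processed = "No"

The same filter can then be wrapped in a stored procedure or UDF if you want to reuse it.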
 

Solution Design



Imagine we have a record structure that fills up a table, SalesPool, which has the following fields:
Pool Id, Pool name/description, Data Area Id
For this we are maintaining a payload like this:

[
    {
        "dataAreaId": "usmf",
        "PoolId": "011",
        "PoolName": "Wholesale order pool11",
        "Processed": "No"
    },
    {
        "dataAreaId": "usmf",
        "PoolId": "012",
        "PoolName": "Wholesale order pool12",
        "Processed": "No"
    },
    {
        "dataAreaId": "usmf",
        "PoolId": "013",
        "PoolName": "Wholesale order pool13",
        "Processed": "No"
    }
]

Here we have a new column called Processed, which indicates whether the record has been processed or not.
a. We will upload a JSON file like the one above to Cosmos DB.
b. We will create an Azure function that is triggered regularly at a certain frequency.
c. This Azure function will query Azure Cosmos DB for all unprocessed records. The result will then be split across a fan-out/fan-in Azure function that works as parallel threads, processing each of these records and inserting/updating them in D365F&O.
d. Each of these child Azure functions will update Azure Cosmos DB with Processed = Yes, so that the record is not considered next time.
e. The whole execution happens outside the conventional DMF box, giving more speed, integrity and custom error-handling capability.

Fan-in/Fan-out functions:

Fan-out/fan-in refers to the pattern of executing multiple functions concurrently and then performing some aggregation on the results. The Durable Functions documentation illustrates this with a sample that backs up all or some of an app's site content into Azure Storage.
In layman's words: Azure Functions are stateless executions; they are not able to retain anything between different calls. Durable Functions allow them to retain state between calls.
This article is about leveraging Durable Functions to perform CRUD operations against D365F&O, in real time/near real time.

Start with Visual Studio and create a Durable Functions Orchestration project like this:



Step 1:
Create a class to generate the token, like this:

using System;
using System.Collections.Specialized;
using System.Net;
using System.Text;
using Azure.Identity;
using Azure.Security.KeyVault.Secrets;
using Newtonsoft.Json.Linq;

namespace DF_SalesPool_MultiThreadedDemo
{
    internal class GenerateToken
    {
        // Reads a secret from the Key Vault whose URI is kept in the VAULTURI application setting
        public string GetSecretValue(string secretName)
        {
            string kvUri = Environment.GetEnvironmentVariable("VAULTURI");

            var client = new SecretClient(new Uri(kvUri), new DefaultAzureCredential());

            KeyVaultSecret secret = client.GetSecretAsync(secretName).GetAwaiter().GetResult().Value;

            return secret.Value;
        }

        // Requests an OAuth client-credentials token for D365F&O, using values stored in Key Vault
        public string GetToken()
        {
            string responseInString = String.Empty;

            using (var wb = new WebClient())
            {
                string tokenUrl = this.GetSecretValue("tokenurluat");

                var data = new NameValueCollection();
                data["client_id"] = this.GetSecretValue("clientiduat");
                data["client_secret"] = this.GetSecretValue("secretvaluat");
                data["grant_type"] = "client_credentials";
                data["scope"] = this.GetSecretValue("scopeurl");

                var response = wb.UploadValues(tokenUrl, "POST", data);

                responseInString = this.getTokenValue(Encoding.UTF8.GetString(response));
            }

            return responseInString;
        }

        // Extracts the access_token value from the token endpoint's JSON response
        public string getTokenValue(string responsePayload)
        {
            dynamic json = JObject.Parse(responsePayload);

            string accessToken = json.access_token;

            return accessToken;
        }
    }
}


 
Here we are creating the logic to generate the token, which will then be passed on to our subsequent D365F&O calls.
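For reference, the secrets this class expects would typically look as follows (the secret names come from the code above; the value shapes shown are the usual client-credentials settings for D365F&O, so adapt them to your environment):

tokenurluat  : https://login.microsoftonline.com/<tenant-id>/oauth2/v2.0/token
clientiduat  : the client (application) ID of your Azure AD app registration
secretvaluat : the client secret of that app registration
scopeurl     : https://<your-environment>.operations.dynamics.com/.default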

Step 2:
Come back to your Azure Functions project and create another class that fetches the unprocessed records from Cosmos DB and deserializes them into a list of objects:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Newtonsoft.Json;

namespace DF_SalesPool_MultiThreadedDemo
{
    public class PayloadElements
    {
        public List<PoolRecords> PoolList { get; set; }
    }

    public class PoolRecords
    {
        [JsonProperty("id")]
        public string Id { get; set; }
        public string DataAreaId { get; set; }
        public string PoolId { get; set; }
        public string PoolName { get; set; }
        public string Processed { get; set; }

        // Queries Cosmos DB for all records with the given Processed status (e.g. "No")
        // and collects them into a PayloadElements list.
        public async Task<PayloadElements> SplitAndStore(string processedStatus)
        {
            // Connection settings read from application settings (the setting names are illustrative)
            CosmosClient cosmosClient = new CosmosClient(
                Environment.GetEnvironmentVariable("COSMOSENDPOINT"),
                Environment.GetEnvironmentVariable("COSMOSKEY"));
            Container container = cosmosClient.GetContainer("ToDoList", "Items");

            PayloadElements financialData = new PayloadElements { PoolList = new List<PoolRecords>() };

            var parameterizedQuery = new QueryDefinition(
                query: "SELECT * FROM c WHERE c.Processed = @ProcessedStatus")
                .WithParameter("@ProcessedStatus", processedStatus);

            // Query the matching items from the container
            using (FeedIterator<PoolRecords> filteredFeed =
                container.GetItemQueryIterator<PoolRecords>(parameterizedQuery))
            {
                while (filteredFeed.HasMoreResults)
                {
                    FeedResponse<PoolRecords> response = await filteredFeed.ReadNextAsync();

                    // Iterate query results and collect them in the list
                    foreach (PoolRecords item in response)
                    {
                        financialData.PoolList.Add(item);
                    }
                }
            }

            return financialData;
        }
    }
}

Step 3:
Now we are going to create the Durable Function orchestrator and its activity functions:
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace DF_SalesPool_MultiThreadedDemo
{
    public static class FunctionDemoSalesOrderPool
    {
        [FunctionName("FunctionDemoSalesOrderPool")]
        public static async Task<string> RunOrchestrator(
            [OrchestrationTrigger] IDurableOrchestrationContext context)
        {
            // Note: token acquisition is an outbound call; strictly speaking it should be wrapped
            // in an activity function to keep the orchestrator deterministic.
            GenerateToken token = new GenerateToken();
            string tokenVl = token.GetToken();

            // Orchestration input: the Processed value to filter on, e.g. "No"
            var processedFilter = context.GetInput<string>();

            // Fetch all unprocessed records from Cosmos DB
            PayloadElements output = await context.CallActivityAsync<PayloadElements>("GetListofRecords", processedFilter);
            List<PoolRecords> records = output.PoolList;

            // Fan out: one activity call per record, executed in parallel
            var parallelTasks = new List<Task<string>>();
            for (int i = 0; i < records.Count; i++)
            {
                RequestObject obj = new RequestObject();
                obj.Payload = JsonConvert.SerializeObject(records[i]);
                obj.TokenVal = tokenVl;
                Task<string> task = context.CallActivityAsync<string>("CreateRecord", obj);
                parallelTasks.Add(task);
            }

            // Fan in: wait for every record to be pushed to D365F&O
            await Task.WhenAll(parallelTasks);

            // Optionally you can process any further step: for example, in case of journals processing,
            // you can call the posting routine here
            string result = await context.CallActivityAsync<string>("FinishCall", "");

            return result;
        }

        [FunctionName(nameof(FinishCall))]
        public static string FinishCall([ActivityTrigger] string result, ILogger log)
        {
            return "Process completed.";
        }

        [FunctionName(nameof(GetListofRecords))]
        public static async Task<PayloadElements> GetListofRecords([ActivityTrigger] string processedStatus, ILogger log)
        {
            PoolRecords elements = new PoolRecords();
            return await elements.SplitAndStore(processedStatus);
        }

        [FunctionName("CreateRecord")]
        public static async Task<string> CreateRecord(
            [ActivityTrigger] RequestObject obj,
            ILogger log)
        {
            // Target D365F&O endpoint, kept as an application setting
            string URL = Environment.GetEnvironmentVariable("D365URL");

            string payload = obj.Payload;
            string token = obj.TokenVal;

            HttpWebRequest request = (HttpWebRequest)WebRequest.Create(URL);
            request.Method = "POST";
            request.ContentType = "application/json";
            // The bearer token must be added before the request body is written
            request.Headers.Add("Authorization", string.Format("Bearer {0}", token));

            using (var requestWriter = new StreamWriter(request.GetRequestStream(), Encoding.UTF8))
            {
                requestWriter.Write(payload);
            }

            string response = String.Empty;

            try
            {
                WebResponse webResponse = await request.GetResponseAsync();

                using (var responseReader = new StreamReader(webResponse.GetResponseStream()))
                {
                    response = responseReader.ReadToEnd();
                }
            }
            catch (Exception e)
            {
                log.LogError(e, "Failed to create the record in D365F&O.");
            }

            return response;
        }
    }
}
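One piece not shown above is the client function that actually kicks off the orchestration on a schedule (step b of the design). Below is a minimal sketch of such a starter, to be added inside the same class, assuming a timer trigger that runs every 15 minutes and passes "No" as the Processed filter; the function name and CRON expression are my own choices:

[FunctionName("SalesPoolTimerStart")]
public static async Task TimerStart(
    [TimerTrigger("0 */15 * * * *")] TimerInfo timerInfo,
    [DurableClient] IDurableOrchestrationClient starter,
    ILogger log)
{
    // Start a new run of the orchestrator, passing the Processed filter value as its input
    string instanceId = await starter.StartNewAsync("FunctionDemoSalesOrderPool", null, "No");
    log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
}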

Now, looking back at the orchestrator code, here is how it works.

1. First, we start with the method FunctionDemoSalesOrderPool, which is the orchestrator function. It is simply the entry point from which everything starts.
2. We fetch the unprocessed records by calling an activity function called GetListofRecords, then loop over the resulting array, calling another activity function called CreateRecord for each record.
3. I have created another class called RequestObject, which simply holds the payload and token value:
namespace DF_SalesPool_MultiThreadedDemo
{
    // Simple DTO passed from the orchestrator to the CreateRecord activity
    public class RequestObject
    {
        public string Payload { get; set; }
        public string TokenVal { get; set; }
    }
}

We pass this class as a parameter between the orchestrator and the activity, and unwrap it in the CreateRecord method:
string payload = obj.Payload;
string token = obj.TokenVal;

Lastly, call the following logic from the 'CreateRecord' function to update the record in Cosmos DB:

CosmosClient cosmosClient = new CosmosClient(EndpointUri, PrimaryKey);
Database database = await cosmosClient.CreateDatabaseIfNotExistsAsync("ToDoList");
// The partition key is assumed to be /PoolId here; the partition key property itself cannot be
// patched, so partitioning on /Processed would not work for this update.
Container container = await database.CreateContainerIfNotExistsAsync("Items", "/PoolId");

// Partial document update: set Processed = "Yes" so the record is skipped on the next run
await container.PatchItemAsync<PoolRecords>(
    recordId,                          // the Cosmos DB "id" of the item just processed
    new PartitionKey(poolId),          // its partition key value
    new List<PatchOperation> { PatchOperation.Set("/Processed", "Yes") });

Above is an example of a Set patch operation where we update the value of Processed to Yes, so that the record will not be considered again in the next run.
EndpointUri is the URL of the Cosmos DB account, which you can keep in local.settings.json; once deployed, it becomes an application setting (environment variable).
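For local development, these values can sit in local.settings.json roughly like this (the Cosmos DB setting names are assumptions matching the sketches above; once deployed, add them as application settings on the Function App):

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet",
    "VAULTURI": "https://<your-key-vault>.vault.azure.net/",
    "D365URL": "https://<your-environment>.operations.dynamics.com/data/<entity-name>",
    "COSMOSENDPOINT": "https://<your-cosmos-account>.documents.azure.com:443/",
    "COSMOSKEY": "<primary-key>"
  }
}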

With that, let me take your leave. Please let me know how it went (I am available on LinkedIn Messenger / c.subhashish@outlook.com).
I will soon be back with more such ideas on handling bulk-record CRUD operations with D365. Much love and Namaste, as always 💓💓💓
