Requirement
Recently, I had a requirement where a huge flat file having around 20K records was supposed to be debatched and processed. This file is received on a file location and an orchestration starts for each debatched message later in the processing.
I used a flat file disassembler to debatch the file; it produced 20K messages, 20K orchestrations started, and, unsurprisingly, the server struggled at 100% CPU usage.
This was not acceptable, so we needed some mechanism to ensure that only a defined number of orchestrations start at any given time.
Solution
I created a custom pipeline component(code at end) to debatch messages in a manner so that it produces a batch of messages which has defined number of records.
So, for example, incoming file has 20K records and we define batch size of 50 in pipeline properties.
The custom pipeline component will create 400 messages each having 50 records.
Next, I created a staging database, where I dump these 400 messages with status "Ready". Then a WCF-SQL receive location can poll for this message one at a time at some defined polling interval.
So, this polled message is then published to BizTalk which starts 50 orchestrations at once.
Depending on the polling interval, the next 50 orchestrations will not start until the polling interval has elapsed.
We can increase or decrease the batch size and the polling interval according to the resources available in the BizTalk environment. For example, if we set the batch size to 100 and the polling interval to 60 seconds, a new batch of 100 orchestrations will start every minute. So if 100 orchestrations do not finish processing within 1 minute, you can increase the polling interval; and if 100 orchestrations cause high CPU usage, decrease the batch size, and so on.
This is like a funnel, which controls the outflow of messages to BizTalk:
The complete flow looks like this:
1. Receive a large flat file in File receive location
2. Debatch using custom pipeline component to produce multiple messages
3. A WCF-SQL send port subscribes to these messages and saves them to database.
4. A WCF-SQL receive location polls messages from this database at a defined interval.
5. A send port subscribes to this message and sends it to a file location, where we can use the normal flat file disassembler, which will debatch each record into an individual message and start further processing.
One advantage of this approach is that we can use the File receive location of the 5th step for small files. So you can skip pre-processing done on large files.
The custom disassembler is generic and accepts any flat file; you only need to specify the document schema and header schema, as you would for the standard Flat File Disassembler.
Code:
/// <summary>
/// Disassembling pipeline component that wraps the flat file disassembler
/// (<c>FFDasmComp</c>) and regroups its output: instead of emitting one message
/// per disassembled document, it emits messages that each contain the records of
/// up to <see cref="BatchSize"/> disassembled documents under a single root element.
/// </summary>
[ComponentCategory(CategoryTypes.CATID_PipelineComponent)]
[ComponentCategory(CategoryTypes.CATID_DisassemblingParser)]
[System.Runtime.InteropServices.Guid("57D51828-C973-4a62-A534-6DB3CB3CB251")]
public class LargeFlatfileSplitter : FFDasmComp, IBaseComponent, IDisassemblerComponent, IPersistPropertyBag
{
    private string systemPropertiesNamespace = @"http://schemas.microsoft.com/BizTalk/2003/system-properties";

    private int _BatchSize;

    /// <summary>
    /// Number of disassembled documents whose records are merged into one outgoing message.
    /// Values below 1 are treated as 1 when batching.
    /// </summary>
    public int BatchSize
    {
        get { return _BatchSize; }
        set { _BatchSize = value; }
    }

    /// <summary>Returns the class identifier of this component.</summary>
    void IPersistPropertyBag.GetClassID(out Guid classID)
    {
        classID = new Guid("57D51828-C973-4a62-A534-6DB3CB3CB251");
    }

    /// <summary>
    /// Persists the base disassembler's properties and then the <c>BatchSize</c> property.
    /// </summary>
    void IPersistPropertyBag.Save
    (IPropertyBag propertyBag, bool clearDirty, bool saveAllProperties)
    {
        base.Save(propertyBag, clearDirty, saveAllProperties);
        object val = (object)BatchSize;
        propertyBag.Write("BatchSize", ref val);
    }

    public LargeFlatfileSplitter()
    {
    }

    /// <summary>Human-readable description of the component.</summary>
    public string Description
    {
        get
        {
            return "Component to batch (break) large message into multiple small messages";
        }
    }

    /// <summary>Display name of the component.</summary>
    public string Name
    {
        get
        {
            return "LargeFlatfileSplitter";
        }
    }

    /// <summary>Version string of the component.</summary>
    public string Version
    {
        get
        {
            return "1.0.0.0";
        }
    }

    /// <summary>Icon handle for the component; no icon is provided.</summary>
    public System.IntPtr Icon
    {
        get
        {
            return System.IntPtr.Zero;
        }
    }

    /// <summary>Initializes a new component instance by delegating to the base disassembler.</summary>
    public new void InitNew()
    {
        base.InitNew();
    }

    /// <summary>
    /// Loads <c>BatchSize</c> from the property bag (defaulting to 1 when it is absent)
    /// and then lets the base disassembler load its own properties.
    /// </summary>
    /// <param name="propertyBag">Property bag to read from.</param>
    /// <param name="errorLog">Passed through to the base implementation.</param>
    /// <exception cref="ApplicationException">The property bag could not be read.</exception>
    public new void Load(IPropertyBag propertyBag, int errorLog)
    {
        object val = null;
        try
        {
            propertyBag.Read("BatchSize", out val, 0);
        }
        catch (ArgumentException)
        {
            // A property that was never saved is reported with ArgumentException
            // (the convention used by BizTalk's own component templates); fall
            // through to the default instead of failing the whole load.
            val = null;
        }
        catch (Exception ex)
        {
            throw new ApplicationException("Error reading propertybag: " + ex.Message, ex);
        }

        _BatchSize = (val != null) ? (int)val : 1;

        base.Load(propertyBag, errorLog);
    }

    /// <summary>No design-time validation is performed.</summary>
    public System.Collections.IEnumerator Validate(object projectSystem)
    {
        return null;
    }

    /// <summary>Hands the incoming message to the base flat file disassembler.</summary>
    public new void Disassemble(IPipelineContext pContext, IBaseMessage pInMsg)
    {
        base.Disassemble(pContext, pInMsg);
    }

    /// <summary>
    /// Pulls up to <see cref="BatchSize"/> documents from the base disassembler and
    /// returns one message whose root element contains all of their child nodes.
    /// </summary>
    /// <param name="pContext">Pipeline context used for resource tracking and message creation.</param>
    /// <returns>
    /// The next batched message, or <c>null</c> once the base disassembler has no more documents.
    /// A final batch may contain fewer than <see cref="BatchSize"/> documents.
    /// </returns>
    /// <exception cref="ApplicationException">
    /// A disassembled document could not be read, or the batch could not be built.
    /// </exception>
    public new IBaseMessage GetNext(IPipelineContext pContext)
    {
        // A non-positive batch size would never pull a document and would silently
        // discard the whole input, so fall back to one document per message.
        int batchSize = BatchSize < 1 ? 1 : BatchSize;

        StringBuilder messageString = new StringBuilder();
        string namespaceURI = "";
        string rootElement = "Root";
        string receivePortName = "";
        int documentCount = 0;

        while (documentCount < batchSize)
        {
            IBaseMessage disassembledMessage = base.GetNext(pContext);
            if (disassembledMessage == null)
                break;

            XmlDocument originalMessageDoc = new XmlDocument();
            try
            {
                Stream originalMessageStream = disassembledMessage.BodyPart.GetOriginalDataStream();
                // Register the stream before parsing so it is tracked even when Load throws.
                pContext.ResourceTracker.AddResource(originalMessageStream);
                originalMessageDoc.Load(originalMessageStream);
            }
            catch (Exception ex)
            {
                throw new ApplicationException("Error in reading original message: " + ex.Message, ex);
            }

            try
            {
                XmlElement root = originalMessageDoc.DocumentElement;

                if (documentCount == 0)
                {
                    // The first document of the batch defines the wrapper element and
                    // the context values promoted on the outgoing message.
                    namespaceURI = root.NamespaceURI;
                    rootElement = root.Name;
                    receivePortName = System.Convert.ToString(disassembledMessage.Context.Read("ReceivePortName", "http://schemas.microsoft.com/BizTalk/2003/system-properties"));
                    messageString.Append(BuildOpeningTag(root));
                }

                // Copy every child of this document; closing the batch happens after
                // the loop so documents with several children are kept intact.
                foreach (XmlNode childNode in root.ChildNodes)
                {
                    messageString.Append(childNode.OuterXml);
                }
            }
            catch (Exception ex)
            {
                throw new ApplicationException("Error in creating output message: " + ex.Message, ex);
            }

            documentCount = documentCount + 1;
        }

        // Nothing was pulled: the base disassembler is exhausted.
        if (documentCount == 0)
            return null;

        // Emit whatever was collected, whether the batch is full or partial
        // (including a trailing batch of exactly BatchSize - 1 documents).
        messageString.Append("</" + rootElement + ">");
        return CreateOutgoingMessage(pContext, messageString.ToString(), namespaceURI, rootElement, receivePortName);
    }

    /// <summary>
    /// Builds the opening tag of the wrapper element, declaring the root's namespace
    /// on the root's own prefix (or as the default namespace when it has none).
    /// </summary>
    private static string BuildOpeningTag(XmlElement root)
    {
        string namespaceAttribute = root.Prefix.Length == 0 ? "xmlns" : "xmlns:" + root.Prefix;
        return "<" + root.Name + " " + namespaceAttribute + "='"
            + System.Security.SecurityElement.Escape(root.NamespaceURI) + "'>";
    }

    /// <summary>
    /// Creates a message with a single body part holding <paramref name="messageString"/>
    /// and promotes <c>MessageType</c> and <c>ReceivePortName</c> on its context.
    /// </summary>
    /// <param name="pContext">Pipeline context supplying the message factory and resource tracker.</param>
    /// <param name="messageString">Complete XML text of the batch.</param>
    /// <param name="namespaceURI">Namespace of the root element, used for <c>MessageType</c>.</param>
    /// <param name="rootElement">Qualified name of the root element; any prefix is ignored for <c>MessageType</c>.</param>
    /// <param name="receivePortName">Receive port name to promote.</param>
    /// <returns>The newly created message.</returns>
    /// <exception cref="ApplicationException">The message could not be created.</exception>
    private IBaseMessage CreateOutgoingMessage(IPipelineContext pContext, String messageString, string namespaceURI, string rootElement, string receivePortName)
    {
        IBaseMessage outMsg;
        try
        {
            //create outgoing message
            outMsg = pContext.GetMessageFactory().CreateMessage();
            outMsg.AddPart("Body", pContext.GetMessageFactory().CreateMessagePart(), true);

            // MessageType is "namespace#localName"; drop whatever prefix the root carries.
            int colonIndex = rootElement.IndexOf(':');
            string localName = colonIndex >= 0 ? rootElement.Substring(colonIndex + 1) : rootElement;
            outMsg.Context.Promote("MessageType", systemPropertiesNamespace, namespaceURI + "#" + localName);
            outMsg.Context.Promote("ReceivePortName", systemPropertiesNamespace, receivePortName);

            // UTF-8 preserves non-ASCII characters, which ASCII would replace with '?'.
            byte[] bufferOutgoingMessage = System.Text.Encoding.UTF8.GetBytes(messageString);
            MemoryStream bodyStream = new MemoryStream(bufferOutgoingMessage);
            // Let the pipeline dispose the stream when the message has been processed.
            pContext.ResourceTracker.AddResource(bodyStream);
            outMsg.BodyPart.Charset = "UTF-8";
            outMsg.BodyPart.Data = bodyStream;
            return outMsg;
        }
        catch (Exception ex)
        {
            throw new ApplicationException("Error in queueing outgoing messages: " + ex.Message, ex);
        }
    }
}