Sir, I have tried to provide as much material as possible so that you can reproduce the problem.
If I am doing anything wrong, please let me know.
I have tried to simulate a deadlock situation. The files are:
Graph File:
<?xml version="1.0" encoding="UTF-8"?>
<Graph name="Testing Simple Copy">
<!-- This graph demonstrates functionality of SimpleCopy component. Everything brought to SimpleCopy on port 0 is
duplicated onto all connected OUTPUT31 ports.
It also shows functionality of Trash Component. It discards everything which is sent into it. Its purpose
is debugging - shows how many records ended in it and can print incoming records, if desired (option debugPrint)
-->
<Global>
<Metadata id="MetaData5">
<Record name="Record1" type="delimited">
<Field name="Record1" type="string" delimiter="|" />
<Field name="Record2" type="string" delimiter="|" />
<Field name="Record3" type="string" delimiter="|" />
<Field name="Record4" type="string" delimiter="|\n" />
</Record>
</Metadata>
</Global>
<Phase number="0">
<Node id="INPUT1" type="DELIMITED_DATA_READER_NIO" fileURL="check.txt" DataPolicy="Strict" />
<Node id="SPLIT" type="SPLIT" OprFlag="NOThing" transformClass="trapFilterImp" />
<Node id="MERGE" type="MERGE" KeyFields="0|0" OprFlag="Duplicate" />
<Node id="OUTPUT" type="DELIMITED_DATA_WRITER_NIO" append="false" fileURL="output/orders.merged"/>
<Edge id="FINAL14" fromNode="INPUT1:0" toNode="SPLIT:0" metadata="MetaData5"/>
<Edge id="FINAL16" fromNode="SPLIT:0" toNode="MERGE:0" metadata="MetaData5"/>
<Edge id="FINAL15" fromNode="SPLIT:1" toNode="MERGE:1" metadata="MetaData5"/>
<Edge id="FINAL17" fromNode="MERGE:0" toNode="OUTPUT:0" metadata="MetaData5"/>
</Phase>
</Graph>
SPLIT component's run function:
This function reads from input port 0 and writes only to output port 0; it does not write anything to the other output ports.
Logic:
public void run() {
    boolean isDriverDifferent;
    // get all ports involved
    InputPort inPort = getInputPort(0);
    DataRecord driverRecordread = new DataRecord(inPort.getMetadata());
    driverRecordread.init();
    // int numActive;// counter of still active ports - those without EOF status
    int index=0;
    //get array of all ports defined/connected - use collection Collection - getInPorts();
    OutputPort outPorts[];
    outPorts = (OutputPort[]) getOutPorts().toArray(new OutputPort[0]);
    //create array holding incoming records
    outputRecords = new DataRecord[outPorts.length];
    // initialize array of data records (for each input port one)
    for (int i = 0; i < outPorts.length; i++) {
        outputRecords[i] = new DataRecord(outPorts[i].getMetadata());
        outputRecords[i].init();
    }
    //numActive=inPorts.length;
    int cou[] = new int[5];
    while (runIt) {
        try {
            driverRecordread = inPort.readRecord(driverRecordread);
            if(driverRecordread != null){
                //This Function Will always Return zero.
                int result_t = 0;
                outPorts[result_t].writeRecord(driverRecordread);
                //System.out.println("In Run: index" + result_t);
            }else{
                break;
            }
        } catch (IOException ex) {
            resultMsg = ex.getMessage();
            resultCode = Node.RESULT_ERROR;
            closeAllOutputPorts();
            return;
        } catch (Exception ex) {
            resultMsg = ex.getMessage();
            resultCode = Node.RESULT_FATAL_ERROR;
            return;
        }
    }
    broadcastEOF();
    if (runIt) {
        resultMsg = "OK";
    } else {
        resultMsg = "STOPPED";
    }
    resultCode = Node.RESULT_OK;
}
Data File: Check.txt
1|two|three|four|
2|two|three|four|
3|two|three|four|
....
...
10000|two|three|four|
-
Hello!
This issue is known. In a situation like this, both edges (FINAL16 and FINAL15) are buffered, so the SPLIT component can keep sending data to one or the other, and MERGE can keep reading from both.
BUT the size of the buffer (which is internal) is limited, and once it fills up, it causes the deadlock.
The solution:
a) Increase the buffer size so that it can accommodate more records.
b) Put the splitter and merger into two different phases. This ensures full buffering (on disk), so you can first process all the data in the splitter and then merge it.
c) Don't create a transformation class/node with two ports connected but the intention to send data through only one of them. If you want to crash the engine, you will definitely succeed :-)
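To illustrate why a limited buffer leads to the stall, here is a minimal sketch in plain Java that does not use the engine's classes at all. It assumes that each edge behaves like a bounded blocking queue, that the merger wants one record (or an end-of-data marker) from every input before it continues, and that the splitter sends end-of-data on both edges only after it has written everything. The class name BoundedEdgeDeadlockDemo, the method stalls, the EOF marker, and the capacities are all hypothetical and chosen only for this demonstration; the real internal buffer works on its own terms.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedEdgeDeadlockDemo {
    // Hypothetical end-of-data marker; real records here are non-negative ints.
    private static final int EOF = -1;

    /**
     * Runs a "splitter" that writes only to edge0 and a "merger" that needs
     * data from both edges. Returns true if either thread is still blocked
     * after a short wait, i.e. the pipeline has stalled.
     */
    static boolean stalls(int capacity, int records) throws InterruptedException {
        BlockingQueue<Integer> edge0 = new ArrayBlockingQueue<>(capacity);
        BlockingQueue<Integer> edge1 = new ArrayBlockingQueue<>(capacity);

        Thread splitter = new Thread(() -> {
            try {
                for (int i = 0; i < records; i++) {
                    edge0.put(i);          // blocks once edge0 is full
                }
                edge0.put(EOF);            // end-of-data is sent only at the very end
                edge1.put(EOF);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread merger = new Thread(() -> {
            try {
                int head0 = edge0.take();
                int head1 = edge1.take();  // blocks: edge1 gets nothing until the splitter finishes
                while (head0 != EOF || head1 != EOF) {
                    if (head0 != EOF) {
                        head0 = edge0.take();   // "emit" head0 and fetch the next record
                    } else {
                        head1 = edge1.take();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        splitter.start();
        merger.start();
        splitter.join(500);
        merger.join(500);
        boolean stalled = splitter.isAlive() || merger.isAlive();
        // Release the blocked threads so the demo itself can exit.
        splitter.interrupt();
        merger.interrupt();
        return stalled;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("capacity 4, 100 records, stalled: " + stalls(4, 100));
        System.out.println("capacity 128, 100 records, stalled: " + stalls(128, 100));
    }
}
```

Under these assumptions the small buffer stalls, because the splitter is blocked on the full edge while the merger is blocked on the empty one. The larger buffer finishes because it holds the entire input, which corresponds to option a) above. Option b) avoids the problem in a similar way: the merger only starts once the splitter has finished and sent end-of-data on both edges.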