Hi,
Let's assume I have a directory with 1000 files.
CloverETL reads the data from those files and, after processing, writes it to a database.
I execute CloverETL again after 10 minutes, and 10 new files were added to the directory
during this period.
I want only the 10 new files to be processed, not all 1010 files.
1. Does CloverETL support this feature? If yes, then how?
2. If I assume the file name is a sequence number, is there an easier solution?
Regards,
Asaf
-
Hello,
unfortunately, our incremental reading feature can't recognize new files at the moment, but I've created an issue (http://bug.cloveretl.com/view.php?id=3130) for this feature.
The easiest way to achieve your goal is to move successfully processed files into another folder. You can do it directly in Clover with the SystemExecute component:
<?xml version="1.0" encoding="UTF-8"?>
<Graph id="1256635798303" name="incrementalMultifile1" >
<Global>
<Metadata id="Metadata0">
<Record fieldDelimiter="|" name="recordName1" recordDelimiter="\n" type="delimited">
<Field name="field1" type="string"/>
</Record>
</Metadata>
<Metadata id="Metadata1" >
<Record fieldDelimiter="|" name="twoFields" recordDelimiter="\n" type="delimited">
<Field auto_filling="source_name" name="fileName" type="string"/>
<Field name="field2" type="string"/>
</Record>
</Metadata>
<Property fileURL="workspace.prm" id="GraphParameter0"/>
</Global>
<Phase number="0">
<Node fileURL="${DATAIN_DIR}/*" id="DATA_READER0" type="DATA_READER"/>
<Node id="SIMPLE_COPY0" type="SIMPLE_COPY"/>
<Node id="TRASH0" type="TRASH"/>
<Edge fromNode="DATA_READER0:0" id="Edge0" inPort="Port 0 (in)" metadata="Metadata1" outPort="Port 0 (output)" toNode="SIMPLE_COPY0:0"/>
<Edge fromNode="SIMPLE_COPY0:0" id="Edge1" inPort="Port 0 (in)" metadata="Metadata1" outPort="Port 0 (out)" toNode="TRASH0:0"/>
<Edge fromNode="SIMPLE_COPY0:1" id="Edge2" inPort="Port 0 (in)" metadata="Metadata1" outPort="Port 1 (out)" toNode="REFORMAT0:0"/>
</Phase>
<Phase number="1">
<Node id="REFORMAT0" type="REFORMAT">
<attr name="transform"><![CDATA[//#TL
string fileName = '';
// Transforms input record into output record.
function transform() {
if ($0.fileName.eq.fileName) return SKIP;
fileName = $0.fileName;
$0.field1 := $0.fileName;
}
]]></attr>
</Node>
<Node id="SYS_EXECUTE0" interpreter="xargs sh ${}" type="SYS_EXECUTE">
<attr name="command"><![CDATA[mv -t processed-in $@
]]></attr>
</Node>
<Edge debugMode="true" fromNode="REFORMAT0:0" id="Edge3" inPort="Port 0 (input for command)" metadata="Metadata0" outPort="Port 0 (out)" toNode="SYS_EXECUTE0:0"/>
</Phase>
</Graph>
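If you prefer to move the processed files from outside the graph (for example from the script that schedules the graph run), the same idea in plain Java could look roughly like the sketch below. The data-in and processed-in folder names are only an illustration, not values taken from the graph above:
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

// Sketch only: after a successful graph run, move every file from the input
// folder into a "processed-in" folder so the next run sees only new files.
// The folder names here are examples.
public class MoveProcessedFiles {
    public static void main(String[] args) throws Exception {
        Path inDir = Paths.get("data-in");
        Path doneDir = Paths.get("processed-in");
        Files.createDirectories(doneDir);
        File[] files = inDir.toFile().listFiles();
        if (files == null) return; // input folder does not exist
        for (File f : files) {
            if (f.isFile()) {
                Files.move(f.toPath(), doneDir.resolve(f.getName()),
                        StandardCopyOption.REPLACE_EXISTING);
            }
        }
    }
}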
or, if the file name contains an increasing number, you can add a DataGenerator component that prepares the file names to read before your graph:
<?xml version="1.0" encoding="UTF-8"?>
<Graph id="1256559327644" name="incrementalMultifile" revision="1.35">
<Global>
<Metadata id="Metadata0" >
<Record fieldDelimiter="|" name="recordName1" recordDelimiter="\n" type="delimited">
<Field name="field1" type="string"/>
</Record>
</Metadata>
<Property fileURL="workspace.prm" id="GraphParameter0"/>
</Global>
<Phase number="0">
<Node generateURL="${TRANS_DIR}/readFiles.java" id="DATA_GENERATOR0" recordsNumber="1000" type="DATA_GENERATOR"/>
<Node fileURL="port:$0.field1:source" id="DATA_READER0" type="DATA_READER"/>
<Node debugPrint="true" id="TRASH0" type="TRASH"/>
<Edge debugMode="true" fromNode="DATA_GENERATOR0:0" id="Edge2" inPort="Port 0 (input)" metadata="Metadata0" outPort="Port 0 (out)" toNode="DATA_READER0:0"/>
<Edge fromNode="DATA_READER0:0" id="Edge0" inPort="Port 0 (in)" metadata="Metadata0" outPort="Port 0 (output)" toNode="TRASH0:0"/>
</Phase>
</Graph>
with readFiles.java:
import java.io.File;
import java.io.FileFilter;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.jetel.component.DataRecordGenerate;
import org.jetel.data.DataRecord;
import org.jetel.exception.ComponentNotReadyException;
import org.jetel.exception.TransformException;

public class readFiles extends DataRecordGenerate {

    private String NUMBER_FILE;
    private final static Pattern FILE_PATTERN = Pattern.compile("\\D*(\\d+)\\D*");
    private int lastNumber = -1;
    private File numberFile;
    private File[] inFile;
    private int index;
    protected int currentNumber;

    public boolean init() throws ComponentNotReadyException {
        // Read the highest file number processed by the previous run (if any).
        NUMBER_FILE = getGraph().getGraphProperties().getStringProperty("DATATMP_DIR") + "/inc.bin";
        numberFile = new File(NUMBER_FILE);
        if (numberFile.exists()) {
            try {
                lastNumber = (new ObjectInputStream(new FileInputStream(numberFile))).readInt();
            } catch (Exception e) {
                throw new ComponentNotReadyException(e);
            }
        }
        // Collect only the files whose number is greater than the stored one.
        inFile = (new File(getGraph().getGraphProperties().getStringProperty("DATAIN_DIR")).listFiles(new FileFilter() {
            public boolean accept(File pathname) {
                if (lastNumber == -1) return true;
                Matcher fnm = FILE_PATTERN.matcher(pathname.getName());
                if (fnm.find()) {
                    currentNumber = Integer.parseInt(fnm.group(1));
                    return currentNumber > lastNumber;
                }
                return false;
            }
        }));
        index = 0;
        return super.init();
    }

    public int generate(DataRecord[] arg0) throws TransformException {
        if (index >= inFile.length) return SKIP;
        Matcher fnm = FILE_PATTERN.matcher(inFile[index].getName());
        if (fnm.find()) {
            currentNumber = Integer.parseInt(fnm.group(1));
            if (currentNumber > lastNumber) {
                lastNumber = currentNumber;
            }
            arg0[0].getField(0).setValue(inFile[index++].getAbsolutePath());
            return 0;
        }
        // File name contains no number: skip it and move on to the next file.
        index++;
        return SKIP;
    }

    public void finished() {
        // Store the highest processed number for the next run.
        try {
            if (!numberFile.exists()) {
                numberFile.createNewFile();
            }
            ObjectOutputStream writer = new ObjectOutputStream(new FileOutputStream(numberFile));
            writer.writeInt(lastNumber);
            writer.flush();
            writer.close();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        super.finished();
    }
}
-
Hi Agata,
Thanks a lot for your comprehensive response.
When I tried the DATA_GENERATOR approach I got the following error:
(org.jetel.graph.runtime.WatchDog executePhase:449) ERROR - Phase initialization failed with reason: DATA_READER ...FAILED !
Reason: The field not found for the statement: 'port:$0.field1:source'
Can you please assist with this error?
-
Hello Asaf,
you shouldn't have 'port:$0.field1:source' in the DataGenerator. This string should be used as the fileURL of the DataReader.
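For reference, these are the relevant Node definitions from the graph posted above; the generateURL attribute belongs to the DataGenerator, while the port URL is the fileURL of the DataReader:
<Node generateURL="${TRANS_DIR}/readFiles.java" id="DATA_GENERATOR0" recordsNumber="1000" type="DATA_GENERATOR"/>
<Node fileURL="port:$0.field1:source" id="DATA_READER0" type="DATA_READER"/>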