Customer Portal

Process only unprocessed files

Comments 3

  • Avatar
    avackova
    0
    Comment actions Permalink
    Hello,
    Unfortunately, our incremental reading feature can't recognize new files for the time being, but I've created an issue (http://bug.cloveretl.com/view.php?id=3130) for this feature.
    The easiest way to achieve your goal is to move successfully processed files into another folder. You can do it directly in Clover with the SystemExecute component:
    <?xml version="1.0" encoding="UTF-8"?>
    <!-- Graph "incrementalMultifile1": phase 0 reads every file matching ${DATAIN_DIR}/*; phase 1 hands each source file name to a shell command that moves the file into "processed-in". -->
    <Graph id="1256635798303" name="incrementalMultifile1" >
    <Global>
    <!-- Metadata0: single string field; used on the edge that feeds the SYS_EXECUTE command input. -->
    <Metadata id="Metadata0">
    <Record fieldDelimiter="|" name="recordName1" recordDelimiter="\n" type="delimited">
    <Field name="field1" type="string"/>
    </Record>
    </Metadata>
    <!-- Metadata1: fileName is auto-filled with "source_name" (presumably the name of the file each record was read from; confirm against the auto-filling docs); field2 carries the data. -->
    <Metadata id="Metadata1" >
    <Record fieldDelimiter="|" name="twoFields" recordDelimiter="\n" type="delimited">
    <Field auto_filling="source_name" name="fileName" type="string"/>
    <Field name="field2" type="string"/>
    </Record>
    </Metadata>
    <Property fileURL="workspace.prm" id="GraphParameter0"/>
    </Global>
    <!-- Phase 0: read all input files and copy the records to two outputs: TRASH0 (stand-in for the real processing) and REFORMAT0 in phase 1. -->
    <Phase number="0">
    <Node fileURL="${DATAIN_DIR}/*" id="DATA_READER0" type="DATA_READER"/>
    <Node id="SIMPLE_COPY0" type="SIMPLE_COPY"/>
    <Node id="TRASH0" type="TRASH"/>
    <Edge fromNode="DATA_READER0:0" id="Edge0" inPort="Port 0 (in)" metadata="Metadata1" outPort="Port 0 (output)" toNode="SIMPLE_COPY0:0"/>
    <Edge fromNode="SIMPLE_COPY0:0" id="Edge1" inPort="Port 0 (in)" metadata="Metadata1" outPort="Port 0 (out)" toNode="TRASH0:0"/>
    <Edge fromNode="SIMPLE_COPY0:1" id="Edge2" inPort="Port 0 (in)" metadata="Metadata1" outPort="Port 1 (out)" toNode="REFORMAT0:0"/>
    </Phase>
    <!-- Phase 1: runs after phase 0, so files are moved only once reading has finished. -->
    <Phase number="1">
    <!-- REFORMAT0: skips a record whose fileName equals the previously seen one, otherwise remembers it and writes it to field1. Only consecutive duplicates are suppressed. -->
    <Node id="REFORMAT0" type="REFORMAT">
    <attr name="transform"><![CDATA[//#TL
    string fileName = '';
    // Transforms input record into output record.
    function transform() {
    if ($0.fileName.eq.fileName) return SKIP;
    fileName = $0.fileName;
    $0.field1 := $0.fileName;
    }
    ]]></attr>
    </Node>
    <!-- SYS_EXECUTE0: the incoming file names are the command's input; xargs presumably turns them into the arguments ($@) of the "mv -t processed-in" script. NOTE(review): "mv -t" is a GNU coreutils option and "processed-in" is a relative path; confirm both suit the target environment. -->
    <Node id="SYS_EXECUTE0" interpreter="xargs sh ${}" type="SYS_EXECUTE">
    <attr name="command"><![CDATA[mv -t processed-in $@
    ]]></attr>
    </Node>
    <Edge debugMode="true" fromNode="REFORMAT0:0" id="Edge3" inPort="Port 0 (input for command)" metadata="Metadata0" outPort="Port 0 (out)" toNode="SYS_EXECUTE0:0"/>
    </Phase>
    </Graph>

    Or, if the file names contain an increasing number, you can add a DataGenerator component that prepares the file names to read before your graph:
    <?xml version="1.0" encoding="UTF-8"?>
    <!-- Graph "incrementalMultifile": a DATA_GENERATOR driven by readFiles.java produces file paths, and the DATA_READER reads the files named in those records. -->
    <Graph id="1256559327644" name="incrementalMultifile" revision="1.35">
    <Global>
    <!-- Metadata0: single string field; used both for the generated file paths and for the records read from the files. -->
    <Metadata id="Metadata0" >
    <Record fieldDelimiter="|" name="recordName1" recordDelimiter="\n" type="delimited">
    <Field name="field1" type="string"/>
    </Record>
    </Metadata>
    <Property fileURL="workspace.prm" id="GraphParameter0"/>
    </Global>
    <Phase number="0">
    <!-- DATA_GENERATOR0: generation logic comes from ${TRANS_DIR}/readFiles.java. recordsNumber="1000" presumably caps how many records (file paths) can be produced per run; confirm against the DataGenerator docs. -->
    <Node generateURL="${TRANS_DIR}/readFiles.java" id="DATA_GENERATOR0" recordsNumber="1000" type="DATA_GENERATOR"/>
    <!-- DATA_READER0: the "port:$0.field1:source" URL belongs here, on the reader (not on the generator); it takes the file to read from field1 of the records arriving on input port 0. -->
    <Node fileURL="port:$0.field1:source" id="DATA_READER0" type="DATA_READER"/>
    <!-- TRASH0: stand-in for the real processing; debugPrint is enabled. -->
    <Node debugPrint="true" id="TRASH0" type="TRASH"/>
    <Edge debugMode="true" fromNode="DATA_GENERATOR0:0" id="Edge2" inPort="Port 0 (input)" metadata="Metadata0" outPort="Port 0 (out)" toNode="DATA_READER0:0"/>
    <Edge fromNode="DATA_READER0:0" id="Edge0" inPort="Port 0 (in)" metadata="Metadata0" outPort="Port 0 (output)" toNode="TRASH0:0"/>
    </Phase>
    </Graph>

    with readFiles.java:
    import java.io.File;
    import java.io.FileFilter;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    import org.jetel.component.DataRecordGenerate;
    import org.jetel.data.DataRecord;
    import org.jetel.exception.ComponentNotReadyException;
    import org.jetel.exception.TransformException;


    /**
     * Generator that emits the absolute paths of not-yet-processed input files,
     * one path per generated record.
     *
     * <p>A file counts as "new" when the first run of digits in its name is
     * greater than the highest number handled by the previous run. That number
     * is persisted in {@code ${DATATMP_DIR}/inc.bin} by {@link #finished()} and
     * loaded again by {@link #init()}. On the very first run (no state file) the
     * threshold is -1, so every numbered file qualifies.
     *
     * <p>Files whose name contains no digits are never emitted. A digit run that
     * does not fit into an {@code int} makes {@link Integer#parseInt(String)}
     * throw {@link NumberFormatException}.
     */
    public class readFiles extends DataRecordGenerate {

        /** Captures the first run of digits in a file name. */
        private final static Pattern FILE_PATTERN = Pattern.compile("\\D*(\\d+)\\D*");

        /** Highest file number seen so far; -1 means no state from a previous run. */
        private int lastNumber = -1;

        /** State file holding {@link #lastNumber} between runs. */
        private File numberFile;

        /** Files selected by {@link #init()} for this run. */
        private File[] inFile;

        /** Position of the next entry of {@link #inFile} to emit. */
        private int index;

        /** Number extracted from the most recently emitted file name. */
        protected int currentNumber;

        /**
         * Loads the persisted threshold (if any) and lists the files in
         * {@code DATAIN_DIR} whose number is above it.
         *
         * @return whatever {@code super.init()} returns
         * @throws ComponentNotReadyException if the state file cannot be read or
         *         the input directory cannot be listed
         */
        public boolean init() throws ComponentNotReadyException {
            String numberFilePath =
                    getGraph().getGraphProperties().getStringProperty("DATATMP_DIR") + "/inc.bin";
            numberFile = new File(numberFilePath);
            if (numberFile.exists()) {
                try {
                    FileInputStream raw = new FileInputStream(numberFile);
                    try {
                        lastNumber = new ObjectInputStream(raw).readInt();
                    } finally {
                        // Closing the underlying stream releases the file handle even
                        // when the ObjectInputStream header check or readInt() fails.
                        raw.close();
                    }
                } catch (Exception e) {
                    throw new ComponentNotReadyException(e);
                }
            }

            File inputDir = new File(getGraph().getGraphProperties().getStringProperty("DATAIN_DIR"));
            File[] candidates = inputDir.listFiles(new FileFilter() {
                public boolean accept(File pathname) {
                    if (!pathname.isFile()) {
                        return false; // sub-directories cannot be read as data files
                    }
                    Matcher fnm = FILE_PATTERN.matcher(pathname.getName());
                    // With lastNumber == -1 every numbered file passes, because
                    // parsed numbers are never negative.
                    return fnm.find() && Integer.parseInt(fnm.group(1)) > lastNumber;
                }
            });
            if (candidates == null) {
                // listFiles() returns null when the path is not a directory or an
                // I/O error occurs; fail here instead of with a NullPointerException
                // later in generate().
                throw new ComponentNotReadyException(new java.io.FileNotFoundException(
                        "Cannot list input directory: " + inputDir.getAbsolutePath()));
            }
            inFile = candidates;
            index = 0;
            return super.init();
        }

        /**
         * Writes the path of the next selected file into field 0 of the first
         * output record and raises the threshold to that file's number.
         *
         * @param arg0 output records; only {@code arg0[0]} is written
         * @return 0 when a path was written, {@code SKIP} when no files are left
         * @throws TransformException declared by the overridden signature
         */
        public int generate(DataRecord[] arg0) throws TransformException {
            while (index < inFile.length) {
                // Always advance, so a name without digits cannot stall the
                // generator on the same entry for every subsequent call.
                File candidate = inFile[index++];
                Matcher fnm = FILE_PATTERN.matcher(candidate.getName());
                if (!fnm.find()) {
                    continue;
                }
                currentNumber = Integer.parseInt(fnm.group(1));
                if (currentNumber > lastNumber) {
                    lastNumber = currentNumber;
                }
                arg0[0].getField(0).setValue(candidate.getAbsolutePath());
                return 0;
            }
            return SKIP;
        }

        /**
         * Persists the highest file number seen so that the next run starts
         * after it, then delegates to {@code super.finished()}.
         *
         * @throws RuntimeException wrapping any failure to write the state file
         */
        public void finished() {
            try {
                // FileOutputStream creates the file when it does not exist yet.
                FileOutputStream raw = new FileOutputStream(numberFile);
                try {
                    ObjectOutputStream writer = new ObjectOutputStream(raw);
                    writer.writeInt(lastNumber);
                    writer.flush();
                } finally {
                    raw.close();
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            super.finished();
        }
    }
  • Avatar
    asaf
    0
    Comment actions Permalink
    Hi Agata,

    Thanks a lot for your comprehensive response.

    When I tried the DATA_GENERATOR approach I got the following error:
    (org.jetel.graph.runtime.WatchDog executePhase:449) ERROR - Phase initialization failed with reason: DATA_READER ...FAILED !
    Reason: The field not found for the statement: 'port:$0.field1:source'

    Can you please assist with this error?
  • Avatar
    avackova
    0
    Comment actions Permalink
    Hello asaf,
    you shouldn't have 'port:$0.field1:source' in the DataGenerator. This string should be used as fileURL in DataReader.

Please sign in to leave a comment.