Help with CSV Split

Sunnyland
Member
Posts: 56
Joined: Mon Aug 19, 2013 1:34 am

Help with CSV Split

Post by Sunnyland »

I have a large CSV file which I want to split into parts. The file has id's (packNrs) and I would like to split on these values. At this stage I have found code that splits on the values, but my issue is writing the new CSV files: basically I only get one row written before the next file begins or the script completes.

I am not strong with Node.js but understand I need to use promises. Any help on how I could make sure one file is fully written before writing the next would be great.

This is the code so far:

const fs = require('fs-extra');
const fastCsv = require('fast-csv');

async function jobArrived(s, flowElement, job) {
  const options = {headers: true, delimiters: ','}; // adjust to your CSV format

  const jobPath = await job.get(AccessLevel.ReadOnly);
  try {
    await readCSV(jobPath, options);
    await job.sendToData(Connection.Level.Success);
  } catch (e) {
    console.log(e);
    await job.sendToData(Connection.Level.Error);
  }
}

var readCSV = function(path, options) {
  const datas = {};
  return new Promise(function(resolve, reject) {
    fastCsv
      .parseFile(path, options)
      .on('data', d => {
        if (!datas[d.packNr]) datas[d.packNr] = [];
        datas[d.packNr].push(d);
      })
      .on('end', () => {
        Object.keys(datas).forEach(id => {
          // For each ID, write a new CSV file
          fastCsv
            .write(datas[id], options)
            .pipe(fs.createWriteStream(`./data-id-${id}.csv`));
        });
      });
  });
};


So basically I am reading the CSV file and separating the rows by id into arrays. I then need to save a new CSV file containing only the records with the same id. At the moment the files are saved with either just the header or only the first record. I am assuming that the promise is resolving before the writing is complete, but I am unsure how to fix that.
freddyp
Advanced member
Posts: 1008
Joined: Thu Feb 09, 2012 3:53 pm

Re: Help with CSV Split

Post by freddyp »

Remark upfront: if you show code, place it between the code tags. Presenting it like text, especially without indentation, makes it unreadable.

Whenever you see sample code on the internet that looks like this:

Code: Select all

  return new Promise(function (resolve, reject) {
    fastCsv
      .parseFile(path, options)
      .on("data", (d) => {
        if (!datas[d.packNr]) datas[d.packNr] = [];
        datas[d.packNr].push(d);
      })
      .on("end", () => {
        Object.keys(datas).forEach((id) => {
          // For each ID, write a new CSV file
          fastCsv.write(datas[id], options).pipe(fs.createWriteStream(`./data-id-${id}.csv`));
        });
      });
  });
with a .someFunction call plus .on("data") and .on("end") handlers, you can also write it like this:

Code: Select all

 let csv = await fastCsv.parseFile(jobPath, options);
But this does not catch errors, so what you really want to write is this:

Code: Select all

  const options = { headers: true, delimiters: "," }; // relative to your CSV usage
  const jobPath = await job.get(AccessLevel.ReadOnly);
  let csv; //declared before the try, so it is available after the catch
  try {
    csv = await fastCsv.parseFile(jobPath, options);
  } catch (e) {
    await job.log(LogLevel.Error, e.message);
    await job.sendToData(Connection.Level.Error);
    return;
  }
  //here csv is the object returned by fastCsv
  await job.log(LogLevel.Info, JSON.stringify(csv)); //not advised for big files
  
  //and now you start looping over the object, writing to temporary files and sending them to output
  
  //and finally you get rid of the input file
  await job.sendToNull();
There are of course many variations possible, but this is the structure that I find works well in the context of Switch and that is easy to understand.

From your description I get the impression that you are getting some output, but that should be impossible, because you are not writing the output to a temporary file, creating a childJob on the basis of it, and then sending that childJob to the output.
Check out the webinar Laurent and I gave to see how to do that: https://learning.enfocus.com/course/view.php?id=304
Sunnyland
Member
Posts: 56
Joined: Mon Aug 19, 2013 1:34 am

Re: Help with CSV Split

Post by Sunnyland »

Thanks Freddy, yes I was getting output. At this stage I wasn't too concerned about passing the files back into the flow; I just needed the separated CSV files, which I had writing to a temp location. The main problem was that the promise was resolving before fast-csv finished writing the new files, so I was getting files with either just the header text or incomplete records.

A friendly user on Stack Overflow was able to help with this by adding the code:

Code: Select all

const dataObj = Object.keys(datas);
const lg = dataObj.length;
let i = 0;
dataObj.forEach(id => {
    // For each ID, write a new CSV file
    pipeline(
        fastCsv.write(datas[id], options),
        fs.createWriteStream(`./data-id-${id}.csv`),
        e => {
            if (e) reject(e);
            i++;
            if (i === lg) resolve();
        }
    );
});
When creating these files, should I be storing them in a specific location? What is the best practice for temp files in Switch?
laurentd
Member
Posts: 137
Joined: Wed Mar 13, 2019 2:06 pm

Re: Help with CSV Split

Post by laurentd »

Here is an example of how to create and use a tmp file.
Note that the tmp file path is tmpobj.name.
Again, have a look at the webinar to learn more.

Code: Select all

const tmp = require("tmp");
const fs = require("fs");
let csvString = "csv content";

async function jobArrived(s, flowElement, job) {
  // create temp file
  const tmpobj = tmp.fileSync();
  // optional: log tmp path
  await job.log(LogLevel.Info, tmpobj.name);

  // write csv (writeFileSync is synchronous, so no await needed)
  fs.writeFileSync(tmpobj.name, csvString);

  // send csv to output
  let newJob = await job.createChild(tmpobj.name);
  let newName = job.getName(false) + ".csv";
  await newJob.sendToSingle(newName);

  // output incoming job
  await job.sendToSingle();
}
Laurent De Wilde, Solution Architect @ Enfocus
Sunnyland
Member
Posts: 56
Joined: Mon Aug 19, 2013 1:34 am

Re: Help with CSV Split

Post by Sunnyland »

Thanks for your help. Below is my final code to split a CSV file and group by the id field.

Code: Select all

const fs = require('fs-extra');
const fastCsv = require('fast-csv');
const { pipeline } = require('stream');

async function jobArrived(s, flowElement, job) {
  const options = {headers: true, delimiters: ','}; // adjust to your CSV format

  const jobPath = await job.get(AccessLevel.ReadOnly);
  try {
    let csvtemppath = await flowElement.getPropertyStringValue("FilesTempFolder");
    let csvtempfilename = await flowElement.getPropertyStringValue("FileName");

    await readCSV(jobPath, options, csvtemppath, csvtempfilename);

    let currentfiles = fs.readdirSync(csvtemppath);

    for (var i = 0; i < currentfiles.length; i++) {
      try {
        let filelocation = `${csvtemppath}/${currentfiles[i]}`;
        let newJob = await flowElement.createJob(filelocation);
        await newJob.sendToSingle(currentfiles[i]); // send the new job and name it as required
        await job.log(LogLevel.Info, "SplitCSV file - %1 routed successfully", [currentfiles[i]]);
        await fs.remove(filelocation);
        await job.log(LogLevel.Info, "SplitCSV file - %1 removed from %2 folder", [currentfiles[i], csvtemppath]);
      } catch (e) {
        console.log(e);
        await job.log(LogLevel.Error, e.message);
      }
    }
  } catch (e) {
    console.log(e);
    await job.log(LogLevel.Error, e.message);
  } finally {
    await job.sendToNull();
  }
}

var readCSV = function(path, options, csvtemppath, csvtempfilename) {
  const datas = {}; // datas['123'] = CSV rows with packNr = 123

  return new Promise(function(resolve, reject) {
    fastCsv
      .parseFile(path, options)
      .on('data', d => {
        if (!datas[d.packNr]) datas[d.packNr] = [];
        datas[d.packNr].push(d);
      })
      .on('end', () => {
        const dataObj = Object.keys(datas);
        const lg = dataObj.length;
        let i = 0;
        dataObj.forEach(packNr => {
          // For each ID, write a new CSV file
          pipeline(
            fastCsv.write(datas[packNr], options),
            fs.createWriteStream(`${csvtemppath}/${csvtempfilename}-${packNr}.csv`),
            e => {
              if (e) reject(e);
              i++;
              if (i === lg) resolve();
            }
          );
        });
      });
  });
};