I have a large CSV file which I want the split into kinds.
The file has ids (packNrs) I would like to split on. At this stage I have found code that allows me to split on these values, but my issue is writing the new CSV files — basically I am only getting one row written before the next file begins or the script completes.
I am not strong with nodejs but understand I need to use promises. Any help on how I could make sure each file is fully written before writing the next file would be great.
this is the code so far.
const fs = require('fs-extra');
const fastCsv = require('fast-csv');
// Switch entry point: fired when a job arrives at this flow element.
// Parses the incoming CSV, then routes the job to Success or Error.
async function jobArrived(s, flowElement, job) {
  // fast-csv parser settings; adjust to match the incoming CSV dialect.
  const options = { headers: true, delimiters: ',' };
  // Path to the incoming job file; read-only access is sufficient here.
  const jobPath = await job.get(AccessLevel.ReadOnly);
  try {
    await readCSV(jobPath, options);
    await job.sendToData(Connection.Level.Success);
  } catch (err) {
    console.log(err);
    await job.sendToData(Connection.Level.Error);
  }
}
/**
 * Parse the CSV at `path`, group rows by their packNr field, and write one
 * CSV file per group. Resolves only after every output file has been fully
 * flushed to disk; rejects on any parse or write error.
 *
 * BUG FIXES vs. the original:
 * - the guard checked `d.id` while the push used `d.packNr`, so the push
 *   could hit an undefined bucket — both now use `d.packNr`;
 * - `data` in the end-handler was an undeclared typo for `datas`;
 * - the promise never called resolve()/reject(), and the piped writes were
 *   fire-and-forget, so the caller continued before files were written
 *   (producing header-only / truncated files).
 *
 * @param {string} path     CSV file to read
 * @param {object} options  fast-csv parse/write options
 * @returns {Promise<void>} settles when all output files are written
 */
var readCSV = function(path, options) {
  const datas = {}; // datas['123'] = all rows whose packNr === '123'
  return new Promise(function(resolve, reject) {
    fastCsv
      .parseFile(path, options)
      .on('error', reject) // a parse failure must not leave the promise pending
      .on('data', d => {
        if (!datas[d.packNr]) datas[d.packNr] = [];
        datas[d.packNr].push(d);
      })
      .on('end', () => {
        const packNrs = Object.keys(datas);
        // An empty CSV produces no groups; resolve immediately or we hang.
        if (packNrs.length === 0) return resolve();
        let remaining = packNrs.length;
        packNrs.forEach(packNr => {
          // For each packNr, write a new CSV file. 'finish' fires only once
          // the write stream has flushed, so we count completions and resolve
          // after the last one.
          fastCsv
            .write(datas[packNr], options)
            .pipe(fs.createWriteStream(`./data-id-${packNr}.csv`))
            .on('error', reject)
            .on('finish', () => {
              remaining--;
              if (remaining === 0) resolve();
            });
        });
      });
  });
};
So basically I am reading the CSV file and separating each id into an array. I then need to save a new csv file containing only the records with the same id. At the moment files are saving with either the header or the first record. I am assuming that the promise is returning before the writing is complete but I am unsure how to fix.
Help with CSV Split
Re: Help with CSV Split
Remark upfront: if you show code, place it between the code tags. Presenting it like text, especially without indentation, makes it unreadable.
Whenever you see sample code on the internet that looks like this:
with .someFunction and .on("data") and .on("end") you can also write it like this:
But this does not catch errors, so what you really want to write is this:
There are of course many variations possible, but this is the structure that I find works well in the context of Switch and that is easy to understand.
From your description I get the impression that you are getting some output, but that is impossible because you are not writing the output to a temporary file on the basis of which you create a childJob and then you send that childJob to output.
Check out the webinar Laurent and I gave to see how to do that: https://learning.enfocus.com/course/view.php?id=304
Whenever you see sample code on the internet that looks like this:
Code: Select all
// NOTE(review): this quoted snippet contains the defects the replies address:
// - resolve()/reject() are never called, so the caller's await either hangs
//   or (with fire-and-forget writes) returns before the files are complete;
// - `data` in the end-handler is a typo for `datas`;
// - the guard checks `d.id` but the push uses `d.packNr`;
// - the piped writes are not awaited, so output files can be cut short.
return new Promise(function (resolve, reject) {
fastCsv
.parseFile(path, options)
.on("data", (d) => {
if (!datas[d.id]) datas[d.id] = [];
datas[d.packNr].push(d);
})
.on("end", () => {
Object.keys(data).forEach((id) => {
// For each ID, write a new CSV file
fastCsv.write(datas[id], options).pipe(fs.createWriteStream(`./data-id-${id}.csv`));
});
});
});
Code: Select all
let csv = await fastCsv.parseFile(jobPath, options);
Code: Select all
// Recommended structure for a Switch script body: await the parse inside a
// narrow try/catch, route the job to the Error connection and return early on
// failure, and continue with `csv` populated on success.
const options = { headers: true, delimiters: "," }; // relative to your CSV usage
const jobPath = await job.get(AccessLevel.ReadOnly);
let csv; //declared before the try, so it is available after the catch
try {
csv = await fastCsv.parseFile(jobPath, options);
} catch (e) {
await job.log(LogLevel.Error, e.message);
await job.sendToData(Connection.Level.Error);
return;
}
//here csv is the object returned by fastCsv
await job.log(LogLevel.Info, JSON.stringify(csv)); //not advised for big files
//and now you start looping over the object, writing to temporary files and sending them to output
//and finally you get rid of the input file
await job.sendToNull();
From your description I get the impression that you are getting some output, but that is impossible because you are not writing the output to a temporary file on the basis of which you create a childJob and then you send that childJob to output.
Check out the webinar Laurent and I gave to see how to do that:https://learning.enfocus.com/course/view.php?id=304
Re: Help with CSV Split
Thanks Freddy, yes I was getting output. At this stage I wasn't too concerned about passing the files back into the flow I just needed the separated CSV files which I had writing to a temp location. The main problem was that the promise was completing before fastcsv would finish writing the new files, I was either getting files with just the header text or they were incomplete.
A friendly user on stack overflow was able to help with this by adding the code below.
When creating these files should I be storing them in a specific file location? What is the best practice for temp files in switch?
A friendly user on stack overflow was able to help with this is by adding the code
Code: Select all
// NOTE(review): this quoted snippet is truncated (the forEach and Promise are
// never closed) and has two issues as pasted:
// - `datas[packNr]` references a name not defined here; the loop variable is
//   `id` — presumably it should be `datas[id]`;
// - after reject(e) the counter keeps running, so resolve() can still fire on
//   an already-rejected promise (harmless but sloppy).
// The pipeline() callback fires only after the write stream has flushed,
// which is what fixes the truncated-file problem.
const dataObj = Object.keys(datas)
const lg = dataObj.length
let i = 0
dataObj.forEach(id=> {
// For each ID, write a new CSV file
pipeline(
fastCsv.write(datas[packNr],options),
fs.createWriteStream(`./data-id-${id}.csv`),
e => {
if(e) reject(e)
i++
if(i === lg) resolve()
}
)
Re: Help with CSV Split
Thanks Freddy, yes I was getting output. At this stage I wasn't too concerned about passing the files back into the flow I just needed the separated CSV files which I had writing to a temp location. The main problem was that the promise was completing before fastcsv would finish writing the new files, I was either getting files with just the header text or they were incomplete.
A friendly user on stack overflow was able to help with this by adding the code below.
When creating these files should I be storing them in a specific file location? What is the best practice for temp files in switch?
A friendly user on stack overflow was able to help with this is by adding the code
Code: Select all
// NOTE(review): this quoted snippet is truncated (the forEach and Promise are
// never closed) and has two issues as pasted:
// - `datas[packNr]` references a name not defined here; the loop variable is
//   `id` — presumably it should be `datas[id]`;
// - after reject(e) the counter keeps running, so resolve() can still fire on
//   an already-rejected promise (harmless but sloppy).
// The pipeline() callback fires only after the write stream has flushed,
// which is what fixes the truncated-file problem.
const dataObj = Object.keys(datas)
const lg = dataObj.length
let i = 0
dataObj.forEach(id=> {
// For each ID, write a new CSV file
pipeline(
fastCsv.write(datas[packNr],options),
fs.createWriteStream(`./data-id-${id}.csv`),
e => {
if(e) reject(e)
i++
if(i === lg) resolve()
}
)
Re: Help with CSV Split
Here is an example how to create and use a tmp file.
Note the tmp file path is tmpobj.name
Again, have a look at the webinar to learn more.
Note the tmp file path is tmpobj.name
Again, have a look at the webinar to learn more.
Code: Select all
const tmp = require("tmp");
let csvString = "csv content";
// Example: write CSV content to a temp file, send it out as a child job,
// then route the incoming job as well.
// NOTE(review): `fs` is assumed to be required elsewhere in the script.
async function jobArrived(s, flowElement, job) {
  // Create a temp file; its path is tmpobj.name.
  const tmpobj = tmp.fileSync();
  // Optional: log the tmp path.
  await job.log(LogLevel.Info, tmpobj.name);
  // Write the CSV. writeFileSync is synchronous — the original `await` in
  // front of it did nothing and misleadingly suggested an async call.
  fs.writeFileSync(tmpobj.name, csvString);
  // Send the CSV to output as a child job named after the incoming job.
  let newJob = await job.createChild(tmpobj.name);
  let newName = job.getName(false) + ".csv";
  await newJob.sendToSingle(newName);
  // Output the incoming job as well.
  await job.sendToSingle();
}
Laurent De Wilde, Solution Architect @ Enfocus
Re: Help with CSV Split
Thanks for your help. Below is my final code to split a CSV file and group by the id field.
Code: Select all
const fs = require('fs-extra');
const fastCsv = require('fast-csv');
const { pipeline } = require('stream')
/**
 * Switch entry point: split the incoming CSV by packNr into temp files, send
 * each split file out as its own job, then discard the original job.
 *
 * BUG FIXES vs. the original:
 * - the bare block `{ await job.sendToNull(); }` after the catch is now an
 *   explicit `finally` — it only ran before by accident of JS block syntax;
 * - `currentfiles` was an implicit global, now a proper `const`;
 * - `fs.remove(...)` was not awaited, so removal failures escaped the
 *   surrounding try/catch;
 * - the pointless `await` on the synchronous `fs.readdirSync` is gone.
 */
async function jobArrived(s, flowElement, job) {
  // fast-csv parser settings; adjust to match the incoming CSV dialect.
  const options = { headers: true, delimiters: ',' };
  const jobPath = await job.get(AccessLevel.ReadOnly);
  try {
    // Flow-element properties: temp folder for the split files, and the
    // base name used when naming each split file.
    const csvtemppath = await flowElement.getPropertyStringValue("FilesTempFolder");
    const csvtempfilename = await flowElement.getPropertyStringValue("FileName");
    // Parse the CSV and write one file per packNr; resolves after all writes.
    await readCSV(jobPath, options, csvtemppath, csvtempfilename);
    const currentfiles = fs.readdirSync(csvtemppath);
    for (let i = 0; i < currentfiles.length; i++) {
      try {
        const filelocation = `${csvtemppath}/${currentfiles[i]}`;
        const newJob = await flowElement.createJob(filelocation);
        await newJob.sendToSingle(currentfiles[i]); //send the new job and name it as required
        await job.log(LogLevel.Info, "SplitCSV file - %1 routed successfully", [currentfiles[i]]);
        // Await the removal so a failure is reported here, not lost.
        await fs.remove(filelocation);
        await job.log(LogLevel.Info, "SplitCSV file - %1 removed from %2 folder", [currentfiles[i], csvtemppath]);
      } catch (e) {
        // Per-file failure: log it and carry on with the remaining files.
        console.log(e);
        await job.log(LogLevel.Error, e.message);
      }
    }
  } catch (e) {
    console.log(e);
    await job.log(LogLevel.Error, e.message);
  } finally {
    // Always dispose of the original job, whether splitting succeeded or not.
    await job.sendToNull();
  }
}
/**
 * Parse the CSV at `path`, group rows by packNr, and write one CSV file per
 * group into `csvtemppath`, named `<csvtempfilename>-<packNr>.csv`. Resolves
 * only after every output file has been fully flushed; rejects on any parse
 * or write error.
 *
 * BUG FIXES vs. the original:
 * - no 'error' handler on the parse stream meant a parse failure left the
 *   promise pending forever;
 * - an empty CSV produced zero groups, so resolve() was never reached and
 *   the caller hung;
 * - resolve() could fire after reject() when a later write succeeded.
 *
 * @param {string} path             CSV file to read
 * @param {object} options          fast-csv parse/write options
 * @param {string} csvtemppath      folder to write the split files into
 * @param {string} csvtempfilename  base name for each split file
 * @returns {Promise<void>}
 */
var readCSV = function(path, options, csvtemppath, csvtempfilename) {
  const datas = {}; // datas['123'] = all rows whose packNr === '123'
  return new Promise(function(resolve, reject) {
    fastCsv
      .parseFile(path, options)
      .on('error', reject) // a parse failure must settle the promise
      .on('data', d => {
        if (!datas[d.packNr]) datas[d.packNr] = [];
        datas[d.packNr].push(d);
      })
      .on('end', () => {
        const packNrs = Object.keys(datas);
        // Empty input: nothing to write, resolve immediately instead of hanging.
        if (packNrs.length === 0) return resolve();
        let done = 0;
        let failed = false;
        packNrs.forEach(packNr => {
          // For each packNr, write a new CSV file. pipeline()'s callback runs
          // only after the write stream is fully flushed and closed.
          pipeline(
            fastCsv.write(datas[packNr], options),
            fs.createWriteStream(`${csvtemppath}/${csvtempfilename}-${packNr}.csv`),
            e => {
              if (e) {
                failed = true;
                return reject(e);
              }
              done++;
              // Resolve once, and only if no write has failed.
              if (done === packNrs.length && !failed) resolve();
            }
          );
        });
      });
  });
};