The transform instance export a few properties which are also available from the user callback function:
transform.state.finished
The number of transformation callback which have been executed; was transform.finished before version 2.
transform.state.running
The number of transformation callback being run at a given time; was transform.finished before version 2.
transform.state.started
The number of transformation callback which have been initiated; was transform.finished before version 2.
Get state information
The state instance example illustrates how to accessed the state from a running instance.
import{ transform }from'stream-transform';// Generate a dataset of 5 recordsconst records ='record\n'.repeat(5).trim().split('\n');// Initialize the transformationconst transformer =transform(records,(record, callback)=>{setTimeout(()=>{const{running, started, finished}= transformer.state;callback(null,`${running}_${started}_${finished}\n`);},100);});// Get notify when done
transformer.on('end',()=>{
process.stdout.write('-------\n');const{running, started, finished}= transformer.state;
process.stdout.write(`${running}_${started}_${finished}\n`);});// Print the transformed records to the standard output
transformer.pipe(process.stdout);
The handler and event functions are bound with the context of the transformer. Thus, it is possible to access the state properties from inside the functions. Also, the Node.js stream API will call the functions associated with events with the context of the stream instance. Of course, this won't work with fat arrow functions.
The state handler example references the state properties from inside the user function and check their values.
import{ transform }from'stream-transform';import assert from'node:assert';// Generate a dataset of 5 recordsconst records ='record\n'.repeat(5).trim().split('\n');const assertions =['2_2_0','1_2_1','2_4_2','1_4_3','1_5_4',];let count =0;// Execute the transformationtransform(records,{parallel:2},function(_, callback){setTimeout(()=>{const{running, started, finished}=this.state;
assert.equal(assertions[count++],`${running}_${started}_${finished}`);callback(null,`${running}_${started}_${finished}\n`);},100);})// Get notify on error.on('end',function(){const{running, started, finished}=this.state;
process.stdout.write(`end: ${running}_${started}_${finished}\n`);})// Print the transformed records to the standard output.pipe(process.stdout);