Stream Transform user handler function

The handler function is the responsible for handling all the record transformation. It is the responsibility of the developer to write it. It can clone or modify the input records, skip them and emit multiple records.

Synchronous versus asynchronous execution

The mode is defined by the signature of transformation function. It is expected to run synchronously when it declares only one argument, the data to transform. It is expected to run asynchronously when it declares two arguments, the data to transform and the callback to be called once the transformed data is ready.

In synchronous mode, you may simply return the altered data or throw an error. In asynchronous mode, you must call the provided callback with 2 arguments, the error if any and the altered data.

Using the asynchronous mode present the advantage that more than one record may be emitted per transform callback.

Defining synchronous transformations

In the synchronous example, the transformation function is run synchronously because it only declares one argument, the data to be transformed. It is expected to return the transformed data or to throw an error.

const transform = require('stream-transform')
transform([
  ['1','2','3','4'],
  ['a','b','c','d']
], function(data){
  data.push(data.shift())
  return data.join(',')+'\n'
})
.pipe(process.stdout)

The output on the console will be:

2,3,4,1
b,c,d,a

Defining asynchronous transformations

In the asynchronous example, the transformation callback declares two arguments, the data to transform and the callback to call once the data is ready. The transformation callback is executed concurrently with a maximum of 20 parallel executions.

const transform = require('stream-transform')
transform([
  ['1','2','3','4'],
  ['a','b','c','d']
], function(data, callback){
  setImmediate(function(){
    data.push(data.shift())
    callback(null, data.join(',')+'\n')
  })
}, {
  parallel: 20
})
.pipe(process.stdout)

The output on the console will be:

2,3,4,1
b,c,d,a

Altering or cloning the provided data

The data received inside the transformation function is the original data and is not modified nor cloned. Depending on which api you choose, it may be provided in the constructor or send to the write function. If you wish to not alter the original data, it is your responsibility to send a new data in your transformation function instead of the original modified data.

Skipping records

Skipping records is easily achieved by returning null in synchronous mode or passing null to the callback handler in asynchronous mode.

Creating multiple records

Generating multiple records is only supported in asynchronous mode by providing n-arguments after the error argument instead of simply one.