CloverDX Blog on Data Integration

Introducing Rollup in CloverDX: Here's What You Need to Know

Written by Martin Janik | November 12, 2010

Rollup is a general transformation component introduced in CloverDX version 2.8. It executes rollup transformations written in Java or CTL. Rollup transformations process groups of data records: a group of M input data records may produce N output data records, where N is determined at runtime by the rollup transformation itself and need not equal M. If no group key is defined, all input data records belong to a single group.

Each group of data records may share a data record referred to as a group accumulator, which stores intermediate results on a per-group basis. A group accumulator is created and initialized when the first data record of a group is encountered, and it may be updated for each data record in the group (including the first and the last one). When the last data record of a group is encountered, processing of the group is finished and the group accumulator is disposed of. If the input data records are not sorted, the end of a group cannot be detected earlier, so every group is finished only after all input data records have been read and processed.
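
To make the accumulator lifecycle concrete for sorted input, here is a minimal plain-Java sketch. It is not CloverDX code: `SortedGroupSketch` and `Accumulator` are hypothetical names, input records are modelled as `int[] {key, value}` pairs already sorted by key, and a group is assumed to end when the key changes. Omitting the group key corresponds to giving every record the same key, so everything falls into one group.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch (not CloverDX code): one accumulator lives for exactly
 * one group of input that is already sorted by key. Records are int[] {key, value}.
 */
final class SortedGroupSketch {

    /** Mutable per-group state, standing in for the group accumulator record. */
    static final class Accumulator {
        final int key;
        int occurrences;
        long sum;

        Accumulator(int key) {
            this.key = key;
        }
    }

    /** Returns one "key:occurrences:sum" line per group, in input order. */
    static List<String> rollup(List<int[]> sortedInput) {
        List<String> output = new ArrayList<>();
        Accumulator acc = null;
        for (int i = 0; i < sortedInput.size(); i++) {
            int[] current = sortedInput.get(i);
            if (acc == null) {
                // First record of a group: create and initialize the accumulator.
                acc = new Accumulator(current[0]);
            }
            // Every record of the group, first and last included, updates it.
            acc.occurrences++;
            acc.sum += current[1];
            boolean lastInGroup = i == sortedInput.size() - 1
                    || sortedInput.get(i + 1)[0] != current[0];
            if (lastInGroup) {
                // Last record of the group: emit the result, then dispose of the accumulator.
                output.add(acc.key + ":" + acc.occurrences + ":" + acc.sum);
                acc = null;
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<int[]> input = List.of(new int[] {1, 10}, new int[] {1, 5}, new int[] {2, 7});
        System.out.println(rollup(input)); // [1:2:15, 2:1:7]
    }
}
```

Because the input is sorted, at most one accumulator exists at any time; it is created on the first record of a group and dropped right after the group's last record.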

Because Rollup is a general transformation component, it can be used instead of the Aggregate, Dedup, Normalizer and Denormalizer components. Rollup does not require sorted input data records and can therefore save the overhead of sorting them. So if you have millions of unsorted data records belonging to just a few groups, Rollup is the way to go: only one group accumulator per group has to be held until the input is exhausted. Rollup can also output intermediate results for each input data record within a group.
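
The unsorted case can be sketched the same way, again as hypothetical plain Java rather than the component's actual implementation (`UnsortedGroupSketch` and `countByKey` are illustrative names). Since no group can be finished before the input ends, one accumulator per distinct key (here just a counter in a `LinkedHashMap`) stays alive until the last record is read, so memory use grows with the number of groups rather than the number of records.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch, not the CloverDX implementation: with unsorted input,
 * one accumulator per distinct key is kept until all records have been read.
 */
final class UnsortedGroupSketch {

    /** Returns "key:count" per group, ordered by the first appearance of each key. */
    static List<String> countByKey(List<String> unsortedKeys) {
        // One live accumulator (here just a counter) per group key.
        Map<String, Integer> accumulators = new LinkedHashMap<>();
        for (String key : unsortedKeys) {
            // merge() creates the accumulator on the first record of a group
            // and updates it for every later record of that group.
            accumulators.merge(key, 1, Integer::sum);
        }
        // Input exhausted: only now can every group be finished.
        List<String> output = new ArrayList<>();
        for (Map.Entry<String, Integer> group : accumulators.entrySet()) {
            output.add(group.getKey() + ":" + group.getValue());
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(countByKey(List.of("b", "a", "b", "c", "a", "b"))); // [b:3, a:2, c:1]
    }
}
```

With six records but only three keys, the map never holds more than three accumulators, which is why few groups keep the unsorted approach cheap.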

Lifecycle of a Rollup Transformation

The lifecycle of a rollup transform is described in the org.jetel.component.rollup.RecordRollup Java interface. Here is a more readable summary:

  • The init(Properties, DataRecordMetadata, DataRecordMetadata, DataRecordMetadata[]) method is called to initialize the transform.
  • For each input data record as a current data record:
    • If the current data record belongs to a new group:
      • If requested, a group accumulator is created.
      • The initGroup(DataRecord, DataRecord) method is called to initialize processing of the group and to initialize the group accumulator (if it exists).
    • The updateGroup(DataRecord, DataRecord) method is called for the current data record and the corresponding group accumulator (if it was requested).
    • If updateGroup() returned true, the updateTransform(int, DataRecord, DataRecord, DataRecord[]) method is called repeatedly to generate output data record(s) until it returns RecordRollup.SKIP. If it returns a value less than RecordRollup.SKIP, the getMessage() method is called to retrieve an error message.
    • If the current data record is the last one in its group:
      • The finishGroup(DataRecord, DataRecord) method is called to finish the group processing.
      • If finishGroup() returned true, the transform(int, DataRecord, DataRecord, DataRecord[]) method is called repeatedly to generate output data record(s) until it returns RecordRollup.SKIP. If it returns a value less than RecordRollup.SKIP, the getMessage() method is called to retrieve an error message.
      • If the group accumulator was requested, its contents are disposed of.
  • The finished() method is called to notify that the rollup transform finished.
  • The reset() method may optionally be called to reset the transformation so that the previous two steps can be executed again.
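
The steps above can be modelled in plain Java to show how the callbacks are driven. This is a simplified, hypothetical model and not the org.jetel.component.rollup.RecordRollup interface: `SimpleRollup`, `RollupLifecycleSketch`, `CountRollup`, the `createAccumulator()` hook and the value `-1` for `SKIP` are all assumptions. Records are ordinary Java objects, the input is assumed to be sorted by group key, output goes to a single list, and the getMessage() error path is replaced by an exception.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.IntUnaryOperator;

/** Hypothetical, simplified model of the lifecycle; NOT the CloverDX RecordRollup API. */
interface SimpleRollup<I, A, O> {
    int SKIP = -1; // assumed value: "no more output records from this call"
    default void init() {}
    A createAccumulator(); // may return null when no accumulator is requested
    void initGroup(I first, A acc);
    boolean updateGroup(I current, A acc);
    boolean finishGroup(I last, A acc);
    int updateTransform(int counter, I current, A acc, List<O> out);
    int transform(int counter, I last, A acc, List<O> out);
    default void finished() {}
}

final class RollupLifecycleSketch {

    /** Drives the lifecycle over input that is already sorted by groupKey. */
    static <I, K, A, O> List<O> run(List<I> input, Function<I, K> groupKey,
                                    SimpleRollup<I, A, O> rollup) {
        List<O> out = new ArrayList<>();
        rollup.init();
        A acc = null;
        for (int i = 0; i < input.size(); i++) {
            I current = input.get(i);
            K key = groupKey.apply(current);
            boolean first = i == 0 || !Objects.equals(key, groupKey.apply(input.get(i - 1)));
            boolean last = i == input.size() - 1
                    || !Objects.equals(key, groupKey.apply(input.get(i + 1)));
            if (first) {
                acc = rollup.createAccumulator();
                rollup.initGroup(current, acc);
            }
            final A groupAcc = acc;
            if (rollup.updateGroup(current, groupAcc)) {
                drain(c -> rollup.updateTransform(c, current, groupAcc, out));
            }
            if (last) {
                if (rollup.finishGroup(current, groupAcc)) {
                    drain(c -> rollup.transform(c, current, groupAcc, out));
                }
                acc = null; // the accumulator is disposed of together with its group
            }
        }
        rollup.finished();
        return out;
    }

    /** Calls step with counter 0, 1, 2, ... until it returns SKIP. */
    private static void drain(IntUnaryOperator step) {
        for (int counter = 0; ; counter++) {
            int result = step.applyAsInt(counter);
            if (result == SimpleRollup.SKIP) {
                return;
            }
            if (result < SimpleRollup.SKIP) { // stands in for the getMessage() error path
                throw new IllegalStateException("transform failed at counter " + counter);
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "a", "b"), s -> s, new CountRollup())); // [a=2, b=1]
    }
}

/** Example callbacks: count records per group and emit one "key=count" line. */
final class CountRollup implements SimpleRollup<String, int[], String> {
    public int[] createAccumulator() { return new int[1]; }
    public void initGroup(String first, int[] acc) { acc[0] = 0; }
    public boolean updateGroup(String current, int[] acc) { acc[0]++; return false; }
    public boolean finishGroup(String last, int[] acc) { return true; }
    public int updateTransform(int counter, String current, int[] acc, List<String> out) {
        throw new IllegalStateException("not called: updateGroup always returns false");
    }
    public int transform(int counter, String last, int[] acc, List<String> out) {
        if (counter > 0) {
            return SKIP; // exactly one output record per group
        }
        out.add(last + "=" + acc[0]);
        return 0; // any value above SKIP means a record was produced
    }
}
```

In this model `transform()` emits one record for counter 0 and ends the loop by returning `SKIP` for counter 1, the same single-output pattern used by the CTL transform() function later in this post.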

The lifecycle may seem complicated, but the main idea is simple: process each group and each record within that group, and notify the user code about every relevant event. This makes it possible to implement a wide range of group-based transformations with just a single component.

Example of Rollup Transformation

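Let's look at a very simple rollup transformation graph in which a DataGenerator component feeds a Rollup component, whose output goes to a Trash component.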

The DataGenerator component is used as a source of sample data records with an integer key and a decimal value. The Rollup component computes, for each group, the number of occurrences, the sum of all decimal values and their average. The Trash component serves as /dev/null.

The rollup transform itself, written in CTL, is not complicated either:

function void initGroup(output groupAccumulator) {
    // default values are provided here
    groupAccumulator.key = $0.key;
    groupAccumulator.occurrences = 0;
    groupAccumulator.sum = 0;
}

function boolean updateGroup(output groupAccumulator) {
    groupAccumulator.occurrences++;
    groupAccumulator.sum = groupAccumulator.sum + $0.value;
    return false; // no intermediate output records will be generated
}

function boolean finishGroup(output groupAccumulator) {
    groupAccumulator.average = groupAccumulator.sum / groupAccumulator.occurrences;
    return true; // group output records will be generated
}

function integer updateTransform(integer counter, output groupAccumulator) {
    raiseError("This function will not be called at all!");
}

function integer transform(integer counter, output groupAccumulator) {
    // only a single output data record will be generated
    if (counter > 0) {
        return SKIP;
    }
    $0.* = groupAccumulator.*;
    return ALL;
}

The five functions above are required and must be implemented for the rollup transform to work correctly. All other functions in the CTL template (commented out by default) are optional. As you can see, combining these few callbacks gives you a great deal of flexibility.
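
Since rollup transformations can also be written in Java, here is a hypothetical plain-Java mirror of the accumulator arithmetic in the CTL example. It does not use the CloverDX Java API; `GroupStats` and its methods are illustrative names that map onto initGroup, updateGroup, finishGroup and transform. The scale of 2 and `HALF_UP` rounding for the average are assumptions: Java's `BigDecimal.divide` without a scale throws an `ArithmeticException` when the quotient has no terminating decimal expansion, such as 10 / 3.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

/**
 * Hypothetical plain-Java mirror of the CTL group accumulator
 * (key, occurrences, sum, average); not written against the CloverDX API.
 */
final class GroupStats {
    final int key;
    int occurrences;
    BigDecimal sum = BigDecimal.ZERO;
    BigDecimal average;

    // initGroup: defaults, with the key taken from the first record of the group
    GroupStats(int key) {
        this.key = key;
    }

    // updateGroup: called for every record of the group, first and last included
    void update(BigDecimal value) {
        occurrences++;
        sum = sum.add(value);
    }

    // finishGroup: the average is computed once the group is complete.
    // A group always contains at least one record, so occurrences > 0 here.
    // Scale 2 with HALF_UP rounding is an assumption for this sketch.
    void finish() {
        average = sum.divide(BigDecimal.valueOf(occurrences), 2, RoundingMode.HALF_UP);
    }

    // transform: the single output record for the group
    @Override
    public String toString() {
        return key + ": occurrences=" + occurrences + ", sum=" + sum + ", average=" + average;
    }

    public static void main(String[] args) {
        GroupStats group = new GroupStats(1);
        for (String value : new String[] {"3.5", "4", "2.5"}) {
            group.update(new BigDecimal(value));
        }
        group.finish();
        System.out.println(group); // 1: occurrences=3, sum=10.0, average=3.33
    }
}
```

As in the CTL version, the average is only computed when the group is finished, because the final count is not known before the group's last record.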

Do you use the Rollup component in your project? Please let us know what you use it for, or which attributes or functionality you are missing.