CloverDX Blog on Data Integration

Parallel Data Processing with CloverDX Cluster

Written by CloverDX | November 04, 2009

For the upcoming release of CloverDX 2.9, we are working on improvements in CloverDX Server which will allow transformations to run in parallel on multiple cluster nodes.

CloverDX Server already supports clustering, so multiple instances can cooperate with each other. The current stable version already implements the common cluster features: fail-over/high availability and scalability for large numbers of requests, which are load-balanced across the available cluster nodes. These features have been available since version 1.3.

The basic concept of new parallelism
A transformation may be executed automatically in parallel on several cluster nodes, according to configuration, and each of these "worker" transformations processes just its own part of the data. One "master" transformation manages the workers and gathers their tracking data, so the parallelism is transparent to the CloverDX Server client. By default, the client "sees" just one (master) execution and aggregated tracking data. Logs and tracking data are still kept for each "worker" transformation, however, so it is still possible to inspect the details of the parallel execution. The outputs of the "worker" transformations are gathered on the "master", so the client gets a single transformation output which may be processed further.
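The master/worker idea described above can be illustrated with a small sketch. It is not CloverDX code: the names `WorkerResult`, `run_worker`, and `run_master`, the node names, and the toy transformation are all hypothetical, and threads stand in for cluster nodes. It only shows a master starting one worker per data part, merging the outputs, and summing the tracking counters while keeping the per-worker details.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass


@dataclass
class WorkerResult:
    """Output and tracking data of one hypothetical worker run."""
    node: str
    output: list
    records_read: int
    records_written: int


def run_worker(node, part):
    # Stand-in transformation (an assumption): keep even values and double them.
    output = [value * 2 for value in part if value % 2 == 0]
    return WorkerResult(node, output, len(part), len(output))


def run_master(parts_by_node):
    """Run one worker per node, then merge outputs and aggregate tracking."""
    with ThreadPoolExecutor(max_workers=len(parts_by_node)) as pool:
        futures = [pool.submit(run_worker, node, part)
                   for node, part in parts_by_node.items()]
        # Results are collected in submission order, not completion order.
        results = [future.result() for future in futures]
    merged = [row for result in results for row in result.output]
    tracking = {
        "records_read": sum(r.records_read for r in results),
        "records_written": sum(r.records_written for r in results),
    }
    # Per-worker results are returned too, so details stay inspectable.
    return merged, tracking, results


merged, tracking, results = run_master({"node1": [1, 2, 3], "node2": [4, 5, 6]})
```

A client looking only at `merged` and `tracking` sees what looks like a single execution, while `results` still holds the details of each worker.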

So how do we get the parts of the input data?
A transformation can process data which is already partitioned. This is the best case, because partitioning adds no overhead. Alternatively, CloverDX Server itself can partition the input data from a single source and distribute it on the fly (during the transformation) to several cluster nodes over the network. The overhead of this operation depends on the speed of network communication and other conditions.
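To make the second case more concrete, here is a minimal sketch of splitting a single input stream into per-worker parts. The post does not say which partitioning strategy the server uses, so both strategies shown (round-robin and hashing a key) are assumptions chosen for illustration, and the function names are hypothetical. The network transfer itself is left out.

```python
from zlib import crc32


def partition_round_robin(records, worker_count):
    """Deal records out to the workers in turn; returns one list per worker."""
    parts = [[] for _ in range(worker_count)]
    for index, record in enumerate(records):
        parts[index % worker_count].append(record)
    return parts


def partition_by_key(records, worker_count, key):
    """Send all records with the same key value to the same worker."""
    parts = [[] for _ in range(worker_count)]
    for record in records:
        # crc32 is stable across runs, unlike Python's built-in hash() for strings.
        slot = crc32(str(key(record)).encode("utf-8")) % worker_count
        parts[slot].append(record)
    return parts


even_parts = partition_round_robin(list(range(7)), 3)
keyed_parts = partition_by_key([("a", 1), ("b", 2), ("a", 3)], 2, key=lambda r: r[0])
```

Round-robin gives evenly sized parts, while key-based partitioning keeps related records together, which matters when a worker needs to see all records for one key.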

Design changes in the graph
We aim to keep the transformation graph almost the same as it would be for "standalone" execution. A graph which is intended to run in parallel will therefore contain just a couple of extra components. These components will handle partitioning/departitioning of the data in case it is not already partitioned.

Scalability
The new parallelism in CloverDX Server is a giant leap for the scalability of transformations. Once a graph is designed for parallel run, the number of computers which run the transformation depends only on the cluster configuration. The graph itself stays the same. Configuration of the parallelism includes:

  • a working CloverDX Server cluster; standalone server instances won't be able to handle such execution
  • a "partitioned" sandbox (see below) with a list of locations

New sandbox types
On the server side, graphs and related files are organized in so-called sandboxes. Up to version 2.8, there was just one type: the "shared" sandbox, which contains the same files and directory structure on all cluster nodes. Version 2.9 will add two more types:

  • "local" sandbox - (locally) accessible on just one cluster node. It's intended for huge input/output data which should not be shared/replicated among multiple cluster nodes.
  • "partitioned" sandbox - each of its physical locations contains just a part of the data. It's intended as storage for partitioned input/output data of transformations which are supposed to run in parallel. The list of physical locations also specifies the nodes which will run the "worker" transformations.
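The three sandbox types can be modelled roughly as follows. This is only a sketch of the concept: the `Sandbox` class, its field names, the node ids, and the paths are hypothetical and do not reflect the actual CloverDX Server configuration format. It shows how the location list of a "partitioned" sandbox determines which nodes would run workers.

```python
from dataclasses import dataclass, field


@dataclass
class Sandbox:
    """Hypothetical model of a sandbox; not the real server configuration."""
    name: str
    kind: str  # "shared", "local", or "partitioned"
    locations: dict = field(default_factory=dict)  # node id -> local path


def worker_nodes(sandbox):
    """Nodes that would run "worker" transformations for this sandbox."""
    if sandbox.kind != "partitioned":
        raise ValueError("only a partitioned sandbox defines worker nodes")
    return list(sandbox.locations)


# Example values are invented for illustration.
orders = Sandbox(
    name="orders",
    kind="partitioned",
    locations={"node1": "/data/orders/part1", "node2": "/data/orders/part2"},
)
nodes = worker_nodes(orders)
```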

Master - worker responsibilities
The master observes all related workers, and when a transformation phase has finished on all workers, it is the master's responsibility to allow the workers to process the next phase. When any of the workers fails for any reason, it is the master's responsibility to abort all the other workers and mark the whole execution as failed. The terms "master" and "worker" have meaning only in the scope of one transformation. As of 2.9, there is no privileged node configured as "master" in the cluster, but that doesn't mean all the nodes are equal. Nodes may differ in their access to physical sources, and the configuration of sandboxes should reflect that.
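The coordination rules above can be sketched as a simple loop. This is a simplified model, not the server's implementation: the `Worker` class, the `run_phases` function, and the "FINISHED"/"FAILED" status strings are hypothetical, and the workers are called one after another here, whereas real workers would run concurrently on different nodes.

```python
class Worker:
    """Hypothetical worker that can be told to fail in a given phase."""

    def __init__(self, node, fail_in_phase=None):
        self.node = node
        self.fail_in_phase = fail_in_phase
        self.completed_phases = []
        self.aborted = False

    def run_phase(self, phase):
        if phase == self.fail_in_phase:
            raise RuntimeError(f"{self.node} failed in phase {phase}")
        self.completed_phases.append(phase)

    def abort(self):
        self.aborted = True


def run_phases(workers, phase_count):
    """Master loop: a phase must finish on all workers before the next starts."""
    for phase in range(phase_count):
        for worker in workers:
            try:
                worker.run_phase(phase)
            except RuntimeError:
                # One failure aborts every other worker and fails the whole run.
                for other in workers:
                    if other is not worker:
                        other.abort()
                return "FAILED"
    return "FINISHED"


healthy = [Worker("node1"), Worker("node2")]
healthy_status = run_phases(healthy, phase_count=2)

broken = [Worker("node1"), Worker("node2", fail_in_phase=1), Worker("node3")]
broken_status = run_phases(broken, phase_count=2)
```

In the second run, `node2` fails in phase 1, so the master aborts `node1` and `node3` and reports the whole execution as failed, even though phase 0 completed everywhere.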