A few thoughts on the existing code parallelization

A few weeks ago I worked on some old code parallelization. The whole process made me think about how efficient parallelization of the existing code in R can really be and what should be considered efficient. There is a lot of information about parallelization in R online (thanks to everybody who put time to write tutorials and overviews on that topic, they were all helpful in their own way!), but here are a few of my thoughts.

Ideally, parallelization would consist of adding a few lines of code that would distribute old procedures across processors and, thus, make them run in parallel. However, this would require that the existing code is written in the portable manners that allows such a simple addition. In R, this would most likely consist of using pply functions instead of loops, minimizing (or avoiding) using for or other loops, avoiding nested loops, and “packing” independent chunks of code into individual functions that would return results in standardize format, e.g., data frame, vector, or a list. Hence, to achieve this seamless parallelization one would either have to specifically write a code with idea of portability in mind or, in general, be an eager supporter (and implementer) of good R programming practices.

Unfortunately, every now and then there are circumstances in which having a working version of a program that performs as expected and runs reasonably fast on the given data set trumps code optimization and (some of the) good practices. And in the most of such cases, once the project is done, the code gets archived without any additional optimization. But what happens when the old code needs to be reused and it does not scale well with the new data sets? If one is lucky (although we can debate whether or not the luck has anything to do with it), pretty efficient parallelization can be achieved by adjusting one or two procedures to make them suitable for distributed processing would also be an acceptable option. But often more effort is required. And that is when one comes to the point when it has to be decided how much effort is really worth putting into existing code parallelization vs (re)writing the parallel version of the code itself?

Clearly, there is no universal answer. But there are at least three things to consider when weighting that decision: 1) who is the original code author (it is probably easier to modify your own code than somebody else’s), 2) when was the code written (how much one remembers about the code/project and are there new libraries available now that can improve the code), and 3) programming practices used in coding, code length, quality of code commenting, and the formatting style used. Generally, building on with your own code is always easier, not only because you are more familiar with your coding style, but also because you are familiar with the problem the code is trying to solve. In that case, even if you decide to rewrite the code, you may still be able to reuse large portions of the original code (just make sure that if your goal is a clean and optimized code, you will reuse only code that fits those criteria). Working with somebody else’s code could be tricky, especially if there are no standardized programming practices defined and code is not well documented. The most important thing here is to be sure to understand what each line of the code intends to do. And of course, in both cases, the new code needs to be tested and the results need to be compared to those obtained by the old code.

Now, let’s talk about some more practical parallelization issues.

R does not support parallelization by default. But as with many other topics, there is a number of great libraries to use to enable parallelization of your code. Cran r-project provides a great summary list of libraries for different types of parallel computing: implicit, explicit, hadoop, grid computing, etc. When thinking about how to parallelize the code it is very important to thing to think about where will the code run, as it impacts the way in which parallelization has to be addressed in the code. Thus, the implementation of parallel code that runs locally on a single multiprocessor computer or on a cluster of computers is probably differ, was well as implementation for unix vs windows system.

Probably the easiest way to illustrate this is using registerDoParallel function from the doParallel package. This package provides a parallel backend for the foreach package and represents a merger of doMC and doSNOW packages. By default, doParallel uses multicore functionality on Unix-like systems and snow functionality on Windows, as multicore supports multiple workers only on those operating systems that support the fork system call.

Let’s say we have an N-core processor (on a local machine) and we want to use N-1 cores to run our program.

On Unix-based systems, we could do it as:
cl <- makeCluster(detectCores() - 1)
registerDoParallel(cl)
the rest of code including foreach
#stopCluster(cl)

Where cl represents cluster size, or in this case a number of workers that will be used to execute the program.

Alternatively, we could specify the number of cores we want to use directly as:
registerDoParallel(cores = (detectCores() - 1))

Remember that multicore functionality only runs tasks on a single computer, not a cluster of computers. Also note that from the hardware perspective clusters and cores represent different things and should not be treated as equal; but in local machine environment, the execution of both examples will have the same flow.

While both versions of the code work on the Unix-based systems, on Windows machines, only the first version will work (attempting to use more than one core with parallel results in an error).

Of course, if you use the cluster version, don't forget to stop the cluster with stopCluster(cl) command. This ensures that cluster objects are not left in a corrupt state or out of sync (for example due to leftover data in the the socket connections to the cluster workers). If you're not sure that the existing clusters are in uncorrupt state, you can always call the stopCluster function first and then create another cluster.

The system where the code will be executed makes a difference in the parallelism implementation. However, it is worth keeping in mind that a clean code that is optimized for one type of parallelism will likely be easier to adjust to another type of parallelism than ad-hoc code.

There are many other things that would be interesting to discuss from the system perspective, but I am not an expert in the field and don't have knowledge to discuss in-depth issues associated with those, so I will just briefly mention two of them.

First, the majority of R packages are not written with parallelization in mind. While this does not mean that they cannot be used in parallel environment, it does impose some restrictions on parallelization. For example, let's talk about parallelization of operations on network nodes. If we want to parallelize an operation on network nodes, we can assign one network node to each processor, perform some calculations on each node (for example, count the number of its neighbors), and merge the results. However, if we want to perform a bit more complex operations on the nodes, e.g., operations that require calculations that involve other network nodes (for example, count the number of graphlets or orbits that the node is involved in), we would not be able to achieve optimal parallelization using the non-parallel tools, as the same calculation would be repeated on multiple nodes (as there is no way for nodes to share the intermediate, common results), or we may not be able to simply merge the results, as that may result in overcounting.

Second issue to consider is the dependency among the packages and overall lack of consistency among the available packages. That means that if you plan to use a package that does not include parallel implementation of desired function, you should carefully test if it performs as expected in parallel environment. For example, you should check is the package written in a way that all of its dependencies will be exported across the clusters and whether or not it requires additional, non R libraries (are those libraries installed on all clusters?)? More complex issues would include the way package deals with resource allocation, data partition, data merging, etc.

Of course, these are all issues that should be addressed even if one writes a program from scratch and does not use other packages/libraries.

Now let's go back to a few more practical notes before I finish.

I will focus on the foreach package. This package provides a looping construct for executing R code in parallel (but also sequentially). While it is a looping construct and contains for in its name, it is not a simple, copy-paste replacement for a standard for loop. The main difference between these two is in the way data is handled - forach does not solely execute code repeatedly but also merges and returns the results obtained by execution of each loop.

For example, within the standard for loop one can execute multiple commands, but in order to merge the results for each iteration, one needs to explicitly add the results into the predefined variable (e.g., vector or list):
result <- c()
for (i in 1:5){
a <- i + i
result <- c(result, a*i)
}

Within the foreach loop one can also execute multiple commands, but it also returns a value. In this sense, the foreach loop behaves similar to a function, as it expects that the last line represents the value that will be returned (or it will return null). However, foreach also combines the returned values for each iteration into a list (or other data structures, I will talk about that soon):
result_par <- foreach (i = 1:5) %dopar% {
a <- i + i
a*i
}

Therefore, when writing a foreach loop, one should keep in mind that 1) it is expected to return a single variable/object and 2)how variables/objects from each iteration should be combined.

Both of these things can be addressed pretty simple. If one needs to perform multiple calculations within the loop and wants to save multiple values, those values can be combined into a list:
results <- foreach(i = 1:100,....
{...
list(result_1, result_2, ... )
}

Depending what these results represent, one may need to write a custom function to ensure that they merge in an appropriate manner. Assuming that result_1 and result_2 from the above example represent data frames, we can create a new merging function that will combine results for each iteration in a data frame:
rbindList <- function(x,y){
list(rbind(x[[1]], y[[1]]), rbind(x[[2]], y[[2]]))
}

The only thing left to do is to specify that this function will be used to merge data in the foreach loop:
results <- foreach(i = 1:100, .combine = rbindList,....
{...
list(result_1, result_2, ... )
}

When using foreach, it is also important to ensure that all custom functions called within the foreach loop are exported using the .export option. Similarly, one needs to ensure that all packages that are required for the proper execution are also exported using .packages option:
foreach (i = 1:100, ..., .export = c("myFunction1", "myFunction2"), .packages=c("foreach", "igraph")) %dopar% {...

The foreach package also provides an option for nested for loop implementation using the %:% operator. Technically speaking, this operator turns multiple foreach loops into a single loop, creating a single stream of tasks that can all be executed in parallel. I am not going to go into details about the nested foreach loop, but will just say that some of the practices mentioned above are also applicable to it, specifically procedures to merge data (can differ between multiple loops) and the approaches on how the data will be returned from each loop(that is, in which format). This is especially important in case one wants to use more than two for loops (which probably should be avoided if possible).

Occasionally one needs to write to or read from files in the for loop. The same can be done within the foreach loops:
result_par <- foreach (i = 1:5) %dopar% {
a <- i + i
write.table(a*i, file = "test.txt", row.names = FALSE, col.names = FALSE,append=TRUE)
}

Keep in mind that disc access speed is lower than the processor speed, so disc operations will likely represent a bottleneck and slowdown the execution time. There are some cases when one needs to write data in the file. For example, if the generated results are too big to stay in the memory until all parallel operations are completed, one can decide to execute (1/N)-th of processes in parallel N times. Something like:
for(part_num in 1:5){
print((part_num -1)*500)
results_part <- foreach(j = 1:500,...) %dopar% {
...
}
write.table(results_part, file="results.txt" , sep = "\t", row.names = FALSE, col.names = FALSE, append = TRUE)
}

While the above approach solves the need for often disc access, it requires multiple parallelization calls and these calls bring some computational overhead due to distributing processes across the clusters/processors, loading the required data, etc. For that reason, one should not make assumptions about the execution speed without testing and comparing one version against the other. Similarly, when working on code parallelization it is important to keep in mind that trying to parallelize things that run efficiently sequentially can also slow down your program and make an opposite effect. This is particularly true for very short jobs, where the parallelization overhead diminishes the parallelization benefits. Again, if you're not sure if the parallelization of specific piece of code improves its performances (speedwise), you can always check the execution time using the system.time() command (and if possible, do this on the system on which you will run the code and with sufficiently big example(s)).

Posted in Parallelization | Tagged , , | 2 Comments