Last data update: 2014.03.03

MulticoreParam-class    R Documentation

Enable multi-core parallel evaluation

Description

This class is used to parameterize single-computer, multicore parallel evaluation on non-Windows computers. multicoreWorkers() chooses the number of workers based on the operating system (Windows supports only 1 core), the global user preference (options(mc.cores=...)), or the minimum of 8 and the number of detected cores (detectCores()).
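The worker-count rule above can be restated as a small base-R function. This is a sketch, not BiocParallel's source: the name sketch_multicore_workers() is hypothetical, and it assumes that options(mc.cores=...), when set, takes precedence over the detected count.

```r
library(parallel)

## Hypothetical helper restating the rule in the Description; not BiocParallel code.
sketch_multicore_workers <- function() {
    cores <- if (.Platform$OS.type == "windows") {
        1L                               # Windows: a single worker
    } else {
        n <- parallel::detectCores()
        if (is.na(n)) n <- 1L            # detectCores() can return NA
        min(8L, n)                       # at most 8 of the detected cores
    }
    ## assumption: a user-set options(mc.cores = ...) wins over the detected count
    as.integer(getOption("mc.cores", cores))
}

sketch_multicore_workers()
old <- options(mc.cores = 3L)
sketch_multicore_workers()               # 3 while the option is set
options(old)                             # restore the previous setting
```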

Usage


## constructor 
## ------------------------------------

MulticoreParam(workers = multicoreWorkers(), tasks = 0L, 
               catch.errors = TRUE, stop.on.error = TRUE,
               progressbar = FALSE, RNGseed = NULL,
               timeout = 30L * 24L * 60L * 60L,
               log = FALSE, threshold = "INFO", logdir = NA_character_, 
               resultdir = NA_character_, jobname = "BPJOB", ...)

## detect workers 
## ------------------------------------

multicoreWorkers()

Arguments

workers

integer(1) Number of workers. Defaults to the value returned by multicoreWorkers().

tasks

integer(1) The number of tasks per job. Value must be a scalar integer >= 0L.

In this documentation, a job is defined as a single call to a function such as bplapply or bpmapply. A task is one of the chunks into which the X argument is divided. When tasks == 0 (the default), X is divided as evenly as possible over the number of workers.

A tasks value > 0 specifies the exact number of tasks. Values can range from 1 (all of X sent to a single worker) to the length of X (each element of X sent to a different worker).

When the length of X is less than the number of workers, each element of X is sent to a worker and tasks is ignored.

catch.errors

DEPRECATED. logical(1) Enable the catching of errors and warnings.

stop.on.error

logical(1) Enable stop on error.

progressbar

logical(1) Enable progress bar (based on plyr:::progress_text).

RNGseed

integer(1) Seed for random number generation. When not NULL, this value is passed to parallel::clusterSetRNGStream to generate random number streams on each worker.

timeout

numeric(1) Time (in seconds) allowed for a worker to complete a task. This value is passed to base::setTimeLimit() as both the cpu and elapsed arguments. If the computation exceeds timeout, an error is thrown with the message 'reached elapsed time limit'.

log

logical(1) Enable logging.

threshold

character(1) Logging threshold as defined in futile.logger.

logdir

character(1) Log files directory. When not provided, log messages are returned to stdout.

resultdir

character(1) Job results directory. When not provided, results are returned as an R object (list) to the workspace.

jobname

character(1) Job name that is prepended to log and result files. Default is "BPJOB".

...

Additional arguments passed to makeCluster.
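The tasks and timeout arguments can be illustrated with base R alone. The two helpers below are hypothetical sketches that follow the rules stated above; BiocParallel's internal chunking and time-limit code may differ in detail. splitIndices() comes from the parallel package and setTimeLimit() from base.

```r
library(parallel)

## Hypothetical helper: split X into chunks following the 'tasks' rules above.
sketch_chunks <- function(X, workers, tasks = 0L) {
    n <- length(X)
    if (n == 0L) return(list())
    nchunks <- if (tasks == 0L) workers else tasks  # 0: one chunk per worker
    if (n < workers) nchunks <- n                    # short X: one element per worker
    nchunks <- min(nchunks, n)                       # never more chunks than elements
    ## splitIndices() cuts 1:n into nchunks contiguous, near-equal runs
    lapply(splitIndices(n, nchunks), function(i) X[i])
}

## Hypothetical helper: run FUN() under setTimeLimit(), using the same
## value for cpu and elapsed as the 'timeout' argument describes.
run_with_timeout <- function(FUN, timeout) {
    setTimeLimit(cpu = timeout, elapsed = timeout, transient = TRUE)
    on.exit(setTimeLimit(cpu = Inf, elapsed = Inf, transient = FALSE))  # always clear the limit
    FUN()
}

lengths(sketch_chunks(1:10, workers = 3))             # 3 chunks of near-equal size
length(sketch_chunks(1:10, workers = 3, tasks = 10))  # 10: one element per task
msg <- tryCatch(run_with_timeout(function() repeat sqrt(2), timeout = 0.5),
                error = conditionMessage)
msg   # a "reached ... time limit" message (elapsed or CPU, whichever fires first)
```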

Details

MulticoreParam is used for shared-memory computing. Under the hood, the cluster is created with makeCluster(..., type = "FORK") from the parallel package. If not specified, the default number of workers is determined by multicoreWorkers(), as described under Description.

A FORK transport starts workers with the mcfork function and communicates between master and workers using socket connections. Because mcfork builds on fork(), all workers run on the local machine; spreading workers across the nodes of a (Linux) cluster is not supported. FORK clusters are POSIX-based, so they are not supported on Windows. When MulticoreParam is created or used on Windows, it defaults to SerialParam, which is equivalent to using a single worker.
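The FORK mechanism can be observed directly with the parallel package. In this minimal sketch (worker count and values are arbitrary), an object created in the master before the fork is visible to the workers without being exported, illustrating the shared-memory behavior described above. The block is skipped on Windows.

```r
library(parallel)

## Skipped on Windows, where FORK clusters are unavailable.
if (.Platform$OS.type != "windows") {
    base_value <- 10                          # created in the master before forking
    cl <- makeCluster(2, type = "FORK")       # workers are forked copies of this R session
    res <- parLapply(cl, 1:2, function(i) i + base_value)  # no clusterExport() needed
    stopCluster(cl)                           # shut the forked workers down
    print(res)
}
```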

error handling:

The catch.errors field has been deprecated.

By default all computations are attempted and partial results are returned with any error messages.

  • catch.errors (DEPRECATED) determines whether errors are caught and returned with other results. When TRUE, all computations are attempted and output contains both errors and successfully completed results. When FALSE, the job is terminated as soon as the first error is hit and only the error message is returned (no results); this is the default behavior of the parent packages, e.g., parallel, snow, foreach.

  • stop.on.error A logical. When TRUE, all jobs are stopped as soon as one job fails; when FALSE, all jobs run to completion and the return value is a list of successful results along with error messages as 'conditions'.

  • The bpok(x) function returns a logical() vector that is FALSE for any jobs that threw an error. The input x is a list output from a bp*apply function such as bplapply or bpmapply.
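The behavior described for stop.on.error = FALSE and bpok() follows a familiar base-R pattern. The helpers below are hypothetical stand-ins, not BiocParallel functions: each element's error is caught and kept as a condition object, and a logical vector flags the failures.

```r
## Hypothetical stand-in for a bp*apply call with stop.on.error = FALSE.
collect_lapply <- function(X, FUN) {
    ## tryCatch(..., error = identity) returns the condition instead of stopping
    lapply(X, function(x) tryCatch(FUN(x), error = identity))
}

## Hypothetical stand-in for bpok(): FALSE where an element holds an error.
sketch_ok <- function(res) {
    !vapply(res, inherits, logical(1), what = "error")
}

res <- collect_lapply(list(1, "two", 3, 4), sqrt)
sketch_ok(res)   # TRUE FALSE TRUE TRUE
```

The same input is used with bpok() in the Examples section.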

logging:

When log = TRUE, the futile.logger package is loaded on the workers. All log messages written in the futile.logger format are captured by the logging mechanism and returned in real time (i.e., as each task completes) instead of after all jobs have finished.

Messages sent to stdout and stderr are returned to the workspace by default. When log = TRUE these are diverted to the log output. Those familiar with the outfile argument to makeCluster can think of log = FALSE as equivalent to outfile = NULL; providing a logdir is the same as providing a name for outfile except that BiocParallel writes a log file for each task.

The log output includes additional statistics such as memory use and task runtime. Memory use is computed by calling gc(reset=TRUE) before code evaluation and gc() (no reset) after. The output of the second gc() call is sent to the log file. There are many ways to track memory use; this particular approach was taken because it is consistent with how the BatchJobs package reports memory on the workers.
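The memory and timing statistics described above can be sketched as a wrapper around a single task. measure_task() is a hypothetical helper; it applies the gc(reset = TRUE) / gc() bracketing from the text and assumes system.time() as a reasonable stand-in for how the task duration is measured.

```r
## Hypothetical helper: bracket one task with gc() calls and time it.
measure_task <- function(FUN, x) {
    gc(reset = TRUE)                           # reset the "max used" counters
    duration <- system.time(value <- FUN(x))   # user / system / elapsed for the task
    memory <- gc()                             # second call: report, no reset
    list(value = value, duration = duration, memory = memory)
}

stats <- measure_task(function(n) sum(sqrt(seq_len(n))), 1e6)
stats$duration
stats$memory   # Ncells / Vcells rows, like the "Memory used" table in the log output
```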

log and result files:

Results and logs can be written to a file instead of returned to the workspace. Writing to files is done from the master as each task completes. Options can be set with the logdir and resultdir fields in the constructor or with the accessors, bplogdir and bpresultdir.
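Writing one result file per completed task can be sketched in base R. The helper write_task_result() is hypothetical, and the TASK<i>.Rda naming is an assumption taken from the comment in the Examples section; BiocParallel's own file names may differ (see bpresultdir).

```r
## Hypothetical helper: save one task's result as <prefix><i>.Rda in resultdir.
write_task_result <- function(result, i, resultdir, prefix = "TASK") {
    path <- file.path(resultdir, paste0(prefix, i, ".Rda"))
    save(result, file = path)      # the object is stored under the name 'result'
    path
}

resultdir <- tempfile("results")
dir.create(resultdir)
results <- lapply(1:3, sqrt)       # stand-in for three completed tasks
## in a real back end each file would be written as its task completes
files <- vapply(seq_along(results),
                function(i) write_task_result(results[[i]], i, resultdir),
                character(1))
list.files(resultdir)              # "TASK1.Rda" "TASK2.Rda" "TASK3.Rda"
```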

random number generation:

MulticoreParam and SnowParam use the random number generation support from the parallel package. These params are snow-derived clusters, so the arguments for multicore-derived functions such as mc.set.seed and mc.reset.stream do not apply.

Random number generation is controlled through the param argument, RNGseed which is passed to parallel::clusterSetRNGStream. clusterSetRNGStream uses the L'Ecuyer-CMRG random number generator and distributes streams to the members of a cluster. If RNGseed is not NULL it serves as the seed to the streams, otherwise the streams are set from the current seed of the master process after selecting the L'Ecuyer generator. See ?clusterSetRNGStream for more details.
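The seeding scheme can be sketched without starting a cluster. make_streams() is a hypothetical helper that mirrors the description above: it selects L'Ecuyer-CMRG, seeds it with the given value, and derives one stream per worker with parallel::nextRNGStream(). It assumes the first worker receives the seeded state itself; see ?clusterSetRNGStream for the authoritative behavior.

```r
library(parallel)

## Hypothetical helper: the per-worker seeds a cluster of n_workers would receive.
make_streams <- function(n_workers, iseed) {
    had_seed <- exists(".Random.seed", envir = .GlobalEnv, inherits = FALSE)
    old_seed <- if (had_seed) get(".Random.seed", envir = .GlobalEnv) else NULL
    on.exit({                                  # leave the master's RNG state as found
        if (had_seed) {
            assign(".Random.seed", old_seed, envir = .GlobalEnv)
        } else {
            rm(".Random.seed", envir = .GlobalEnv)
        }
    })
    RNGkind("L'Ecuyer-CMRG")
    set.seed(iseed)
    seeds <- vector("list", n_workers)
    seeds[[1L]] <- get(".Random.seed", envir = .GlobalEnv)
    for (i in seq_len(n_workers)[-1L])
        seeds[[i]] <- nextRNGStream(seeds[[i - 1L]])  # advance to the next stream
    seeds
}

s1 <- make_streams(3, 7739465)
s2 <- make_streams(3, 7739465)
identical(s1, s2)   # TRUE: the same seed reproduces the same streams
```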

Constructor

MulticoreParam(workers = multicoreWorkers(), tasks = 0L, catch.errors = TRUE, stop.on.error = TRUE, progressbar = FALSE, RNGseed = NULL, timeout = 30L * 24L * 60L * 60L, log = FALSE, threshold = "INFO", logdir = NA_character_, resultdir = NA_character_, jobname = "BPJOB", ...):

Return an object representing a FORK cluster. The cluster is not created until bpstart is called. Named arguments in ... are passed to makeCluster.

Accessors: Logging and results

In the following code, x is a MulticoreParam object.

bpprogressbar(x), bpprogressbar(x) <- value: Get or set the value to enable the text progress bar. value must be a logical(1).

bpjobname(x), bpjobname(x) <- value: Get or set the job name.

bpRNGseed(x), bpRNGseed(x) <- value: Get or set the seed for random number generation. value must be a numeric(1).

bplog(x), bplog(x) <- value: Get or set the value to enable logging. value must be a logical(1).

bpthreshold(x), bpthreshold(x) <- value: Get or set the logging threshold. value must be a character(1) string of one of the levels defined in the futile.logger package: “TRACE”, “DEBUG”, “INFO”, “WARN”, “ERROR”, or “FATAL”.

bplogdir(x), bplogdir(x) <- value: Get or set the directory for the log file. value must be a character(1) path, not a file name. The file is written out as LOGFILE.out. If no logdir is provided and bplog=TRUE, log messages are sent to stdout.

bpresultdir(x), bpresultdir(x) <- value: Get or set the directory for the result files. value must be a character(1) path, not a file name. Separate files are written for each job with the prefix JOB (e.g., JOB1, JOB2, etc.). When no resultdir is provided, the results are returned to the session as a list.

Accessors: Back-end control

In the code below x is a MulticoreParam object. See the ?BiocParallelParam man page for details on these accessors.

bpworkers(x)

bpnworkers(x)

bptasks(x), bptasks(x) <- value

bpstart(x)

bpstop(x)

bpisup(x)

bpbackend(x), bpbackend(x) <- value
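A short sketch of the back-end life cycle using the accessors listed above. It is guarded so that it runs only when BiocParallel is installed and not on Windows; the worker and task counts are arbitrary, and it relies on MulticoreParam being a reference-class object that bpstart() and bpstop() update in place.

```r
up_before <- up_after <- up_stopped <- NA
if (requireNamespace("BiocParallel", quietly = TRUE) &&
    .Platform$OS.type != "windows") {
    library(BiocParallel)
    p <- MulticoreParam(workers = 2, tasks = 4L)
    up_before <- bpisup(p)     # FALSE: the constructor does not start workers
    bpstart(p)                 # start the FORK workers
    up_after <- bpisup(p)      # TRUE while the workers are running
    bpstop(p)                  # shut the workers down
    up_stopped <- bpisup(p)
    print(c(workers = bpnworkers(p), tasks = bptasks(p)))
}
```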

Accessors: Error Handling

In the code below x is a MulticoreParam object. See the ?BiocParallelParam man page for details on these accessors.

bpcatchErrors(x), bpcatchErrors(x) <- value

bpstopOnError(x), bpstopOnError(x) <- value

Methods: Evaluation

In the code below, BPPARAM is a MulticoreParam object. Full documentation for these functions is on separate man pages: see ?bpmapply, ?bplapply, ?bpvec, ?bpiterate and ?bpaggregate.

bpmapply(FUN, ..., MoreArgs=NULL, SIMPLIFY=TRUE, USE.NAMES=TRUE, BPPARAM=bpparam())

bplapply(X, FUN, ..., BPPARAM=bpparam())

bpvec(X, FUN, ..., AGGREGATE=c, BPPARAM=bpparam())

bpiterate(ITER, FUN, ..., BPPARAM=bpparam())

bpaggregate(x, data, FUN, ..., BPPARAM=bpparam())
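A guarded sketch calling two of the evaluation methods with a MulticoreParam; it runs only when BiocParallel is installed and not on Windows, and the inputs are arbitrary. bpmapply() applies FUN element-wise over its arguments, while bpvec() passes whole chunks of X to FUN, which must return a result of the same length as its input.

```r
sums <- doubled <- NULL
if (requireNamespace("BiocParallel", quietly = TRUE) &&
    .Platform$OS.type != "windows") {
    library(BiocParallel)
    p <- MulticoreParam(workers = 2)
    ## element-wise over 1:3 and 4:6, simplified to a vector (SIMPLIFY = TRUE)
    sums <- bpmapply(function(a, b) a + b, 1:3, 4:6, BPPARAM = p)
    ## FUN receives chunks of X; results are joined with AGGREGATE = c
    doubled <- bpvec(1:6, function(v) v * 2, BPPARAM = p)
    print(sums)
    print(doubled)
}
```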

Methods: Other

In the code below x is a MulticoreParam object.

show(x): Displays the MulticoreParam object.

Author(s)

Martin Morgan mailto:mtmorgan@fhcrc.org and Valerie Obenchain

See Also

  • register for registering parameter classes for use in parallel evaluation.

  • SnowParam for computing in distributed memory

  • BatchJobsParam for computing with cluster schedulers

  • DoparParam for computing with foreach

  • SerialParam for non-parallel evaluation

Examples

## -----------------------------------------------------------------------
## Job configuration:
## -----------------------------------------------------------------------

## MulticoreParam supports shared memory computing. The object fields
## control the division of tasks, error handling, logging and 
## result format.
bpparam <- MulticoreParam()
bpparam

## By default the param is created with the maximum available workers
## determined by multicoreWorkers().
multicoreWorkers()

## Fields are modified with accessors of the same name:
bplog(bpparam) <- TRUE
bpresultdir(bpparam) <- "/myResults/"
bpparam

## -----------------------------------------------------------------------
## Logging:
## -----------------------------------------------------------------------

## When 'log == TRUE' the workers use a custom script (in BiocParallel) 
## that enables logging and access to other job statistics. Log messages 
## are returned as each job completes rather than waiting for all to finish.

## In 'fun', a value of 'x = 1' will throw a warning, 'x = 2' is ok
## and 'x = 3' throws an error. Because 'x = 1' sleeps, the warning
## should return after the error.

X <- 1:3 
fun <- function(x) {
    if (x == 1) {
        Sys.sleep(2)
        if (TRUE & c(TRUE, TRUE))  ## warning
            x 
    } else if (x == 2) { 
        x                          ## ok
    } else if (x == 3) { 
        sqrt("FOO")                ## error
    }
}

## By default logging is off. Turn it on with the bplog()<- setter
## or by specifying 'log = TRUE' in the constructor.
bpparam <- MulticoreParam(3, log = TRUE, stop.on.error = FALSE)
res <- tryCatch({
    bplapply(X, fun, BPPARAM=bpparam)
}, error=identity)
res

## When a 'logdir' location is given the messages are redirected to a file:
## Not run: 
bplogdir(bpparam) <- tempdir()
bplapply(X, fun, BPPARAM = bpparam)
list.files(bplogdir(bpparam))

## End(Not run)

## -----------------------------------------------------------------------
## Managing results:
## -----------------------------------------------------------------------

## By default results are returned as a list. When 'resultdir' is given
## files are saved in the directory specified by job, e.g., 'TASK1.Rda', 
## 'TASK2.Rda', etc.
## Not run: 
bpparam <- MulticoreParam(2, resultdir = tempdir(), stop.on.error = FALSE)
bplapply(X, fun, BPPARAM = bpparam)
list.files(bpresultdir(bpparam))

## End(Not run)

## -----------------------------------------------------------------------
## Error handling:
## -----------------------------------------------------------------------

## When 'stop.on.error' is TRUE the job is terminated as soon as an 
## error is hit. When FALSE, all computations are attempted and partial
## results are returned along with errors. In this example the number of
## 'tasks' is set to equal the length of 'X' so each element is run 
## separately. (Default behavior is to divide 'X' evenly over workers.)

## All results along with error:
bpparam <- MulticoreParam(2, tasks = 4, stop.on.error = FALSE)
res <- bptry(bplapply(list(1, "two", 3, 4), sqrt, BPPARAM = bpparam))
res

## Calling bpok() on the result list returns TRUE for elements with no error.
bpok(res)

## -----------------------------------------------------------------------
## Random number generation:
## -----------------------------------------------------------------------

## Random number generation is controlled with the 'RNGseed' field. 
## This seed is passed to parallel::clusterSetRNGStream
## which uses the L'Ecuyer-CMRG random number generator and distributes
## streams to members of the cluster.

bpparam <- MulticoreParam(3, RNGseed = 7739465)
bplapply(seq_len(bpnworkers(bpparam)), function(i) rnorm(1), BPPARAM = bpparam)

Results


R version 3.3.1 (2016-06-21) -- "Bug in Your Hair"
Copyright (C) 2016 The R Foundation for Statistical Computing
Platform: x86_64-pc-linux-gnu (64-bit)


> library(BiocParallel)
> ### Name: MulticoreParam-class
> ### Title: Enable multi-core parallel evaluation
> ### Aliases: MulticoreParam MulticoreParam-class multicoreWorkers
> ###   bpisup,MulticoreParam-method bpschedule,MulticoreParam-method
> ###   bpworkers<-,MulticoreParam,numeric-method show,MulticoreParam-method
> ### Keywords: classes methods
> 
> ### ** Examples
> 
> ## -----------------------------------------------------------------------
> ## Job configuration:
> ## -----------------------------------------------------------------------
> 
> ## MulticoreParam supports shared memory computing. The object fields
> ## control the division of tasks, error handling, logging and 
> ## result format.
> bpparam <- MulticoreParam()
> bpparam
class: MulticoreParam
  bpisup: FALSE; bpworkers: 2; bptasks: 0; bpjobname: BPJOB
  bplog: FALSE; bpthreshold: INFO; bpstopOnError: TRUE
  bptimeout: 2592000; bpprogressbar: FALSE
  bpRNGseed: 
  bplogdir: NA
  bpresultdir: NA
  cluster type: FORK
> 
> ## By default the param is created with the maximum available workers
> ## determined by multicoreWorkers().
> multicoreWorkers()
[1] 2
> 
> ## Fields are modified with accessors of the same name:
> bplog(bpparam) <- TRUE
> bpresultdir(bpparam) <- "/myResults/"
> bpparam
class: MulticoreParam
  bpisup: FALSE; bpworkers: 2; bptasks: 0; bpjobname: BPJOB
  bplog: TRUE; bpthreshold: INFO; bpstopOnError: TRUE
  bptimeout: 2592000; bpprogressbar: FALSE
  bpRNGseed: 
  bplogdir: NA
  bpresultdir: /myResults/
  cluster type: FORK
> 
> ## -----------------------------------------------------------------------
> ## Logging:
> ## -----------------------------------------------------------------------
> 
> ## When 'log == TRUE' the workers use a custom script (in BiocParallel) 
> ## that enables logging and access to other job statistics. Log messages 
> ## are returned as each job completes rather than waiting for all to finish.
> 
> ## In 'fun', a value of 'x = 1' will throw a warning, 'x = 2' is ok
> ## and 'x = 3' throws an error. Because 'x = 1' sleeps, the warning
> ## should return after the error.
> 
> X <- 1:3 
> fun <- function(x) {
+     if (x == 1) {
+         Sys.sleep(2)
+         if (TRUE & c(TRUE, TRUE))  ## warning
+             x 
+     } else if (x == 2) { 
+         x                          ## ok
+     } else if (x == 3) { 
+         sqrt("FOO")                ## error
+     }
+ }
> 
> ## By default logging is off. Turn it on with the bplog()<- setter
> ## or by specifying 'log = TRUE' in the constructor.
> bpparam <- MulticoreParam(3, log = TRUE, stop.on.error = FALSE)
> res <- tryCatch({
+     bplapply(X, fun, BPPARAM=bpparam)
+ }, error=identity)
############### LOG OUTPUT ###############
Task: 2
Node: 2
Timestamp: 2016-07-05 17:51:03
Success: TRUE
Task duration:
   user  system elapsed 
      0       0       0 
Memory used:
         used (Mb) gc trigger (Mb) max used (Mb)
Ncells 303043 16.2     592000 31.7   364772 19.5
Vcells 413274  3.2    1023718  7.9   786432  6.0
Log messages:

stderr and stdout:
character(0)
############### LOG OUTPUT ###############
Task: 3
Node: 3
Timestamp: 2016-07-05 17:51:03
Success: FALSE
Task duration:
   user  system elapsed 
  0.004   0.000   0.017 
Memory used:
         used (Mb) gc trigger (Mb) max used (Mb)
Ncells 303098 16.2     592000 31.7   364772 19.5
Vcells 413831  3.2    1023718  7.9   786432  6.0
Log messages:
ERROR [2016-07-05 17:51:03] non-numeric argument to mathematical function

stderr and stdout:
character(0)
############### LOG OUTPUT ###############
Task: 1
Node: 1
Timestamp: 2016-07-05 17:51:05
Success: TRUE
Task duration:
   user  system elapsed 
  0.004   0.000   2.004 
Memory used:
         used (Mb) gc trigger (Mb) max used (Mb)
Ncells 303100 16.2     592000 31.7   364772 19.5
Vcells 413833  3.2    1023718  7.9   786432  6.0
Log messages:
WARN [2016-07-05 17:51:05] the condition has length > 1 and only the first element will be used

stderr and stdout:
character(0)
> res
<bplist_error: BiocParallel errors
  element index: 3
  first error: non-numeric argument to mathematical function>
results and errors available as 'attr(x, "result")'
> 
> ## When a 'logdir' location is given the messages are redirected to a file:
> ## Not run: 
> ##D bplogdir(bpparam) <- tempdir()
> ##D bplapply(X, fun, BPPARAM = bpparam)
> ##D list.files(bplogdir(bpparam))
> ## End(Not run)
> 
> ## -----------------------------------------------------------------------
> ## Managing results:
> ## -----------------------------------------------------------------------
> 
> ## By default results are returned as a list. When 'resultdir' is given
> ## files are saved in the directory specified by job, e.g., 'TASK1.Rda', 
> ## 'TASK2.Rda', etc.
> ## Not run: 
> ##D bpparam <- MulticoreParam(2, resultdir = tempdir(), stop.on.error = FALSE)
> ##D bplapply(X, fun, BPPARAM = bpparam)
> ##D list.files(bpresultdir(bpparam))
> ## End(Not run)
> 
> ## -----------------------------------------------------------------------
> ## Error handling:
> ## -----------------------------------------------------------------------
> 
> ## When 'stop.on.error' is TRUE the job is terminated as soon as an 
> ## error is hit. When FALSE, all computations are attempted and partial
> ## results are returned along with errors. In this example the number of
> ## 'tasks' is set to equal the length of 'X' so each element is run 
> ## separately. (Default behavior is to divide 'X' evenly over workers.)
> 
> ## All results along with error:
> bpparam <- MulticoreParam(2, tasks = 4, stop.on.error = FALSE)
> res <- bptry(bplapply(list(1, "two", 3, 4), sqrt, BPPARAM = bpparam))
> res
[[1]]
[1] 1

[[2]]
<remote_error in FUN(...): non-numeric argument to mathematical function>
traceback() available as 'attr(x, "traceback")'

[[3]]
[1] 1.732051

[[4]]
[1] 2

> 
> ## Calling bpok() on the result list returns TRUE for elements with no error.
> bpok(res)
[1]  TRUE FALSE  TRUE  TRUE
> 
> ## -----------------------------------------------------------------------
> ## Random number generation:
> ## -----------------------------------------------------------------------
> 
> ## Random number generation is controlled with the 'RNGseed' field. 
> ## This seed is passed to parallel::clusterSetRNGStream
> ## which uses the L'Ecuyer-CMRG random number generator and distributes
> ## streams to members of the cluster.
> 
> bpparam <- MulticoreParam(3, RNGseed = 7739465)
> bplapply(seq_len(bpnworkers(bpparam)), function(i) rnorm(1), BPPARAM = bpparam)
[[1]]
[1] 0.8552377

[[2]]
[1] -0.2198241

[[3]]
[1] -0.5324814

> 