R: Enable simple network of workstations (SNOW)-style parallel...
SnowParam-class
R Documentation
Enable simple network of workstations (SNOW)-style parallel evaluation
Description
This class is used to parameterize simple network of workstations
(SNOW) parallel evaluation on one or several physical computers.
snowWorkers() chooses the number of workers from the global
user preference (options(mc.cores=...)) or, when that is unset, the
minimum of 8 and the number of detected cores (detectCores()).
workers
integer(1) Number of workers. Defaults to all cores available as
determined by detectCores. For a SOCK cluster workers
can be a character() vector of host names.
type
character(1) Type of cluster to use. Possible values are
SOCK (default) and MPI. Instead of type=FORK use
MulticoreParam.
tasks
integer(1). The number of tasks per job. value must be a
scalar integer >= 0L.
In this documentation a job is defined as a single call to a function, such
as bplapply, bpmapply etc. A task is the division of the
X argument into chunks. When tasks == 0 (default), X
is divided as evenly as possible over the number of workers.
A tasks value of > 0 specifies the exact number of tasks. Values
can range from 1 (all of X to a single worker) to the length of
X (each element of X to a different worker).
When the length of X is less than the number of workers each
element of X is sent to a worker and tasks is ignored.
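The even division performed when tasks == 0 can be pictured with
parallel::splitIndices(), a base-R helper that does this kind of chunking.
This is a sketch of the idea only, not BiocParallel's internal scheduler:

```r
## Even division of 10 elements among 3 workers, as with tasks == 0.
## parallel::splitIndices() is base R; BiocParallel's own chunking is
## analogous but not identical.
library(parallel)

chunks <- splitIndices(10, 3)
chunks                       ## 3 chunks together covering indices 1:10

## tasks == length(X): one element per task
singles <- splitIndices(10, 10)
length(singles)              ## 10
```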
catch.errors
DEPRECATED.
logical(1) Enable the catching of errors and warnings.
stop.on.error
logical(1) Enable stop on error.
progressbar
logical(1) Enable progress bar (based on plyr:::progress_text).
RNGseed
integer(1) Seed for random number generation. When not NULL,
this value is passed to parallel::clusterSetRNGStream to generate
random number streams on each worker.
timeout
numeric(1) Time (in seconds) allowed for each worker to complete a
task. This value is passed to base::setTimeLimit() as both the cpu and
elapsed arguments. If the computation exceeds timeout an
error is thrown with message 'reached elapsed time limit'.
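The underlying mechanism is base::setTimeLimit(), which can be tried
directly in plain base R. A sketch (the 0.25-second limit here is an
arbitrary choice for illustration):

```r
## Preview the error a worker raises when 'timeout' is exceeded.
## 'transient = TRUE' restores the limit after the current top-level call.
res <- tryCatch({
    setTimeLimit(elapsed = 0.25, transient = TRUE)
    Sys.sleep(1)                 ## exceeds the 0.25s elapsed limit
    "finished"
}, error = function(e) {
    setTimeLimit()               ## clear the limit before handling
    conditionMessage(e)
})
res                              ## mentions the elapsed time limit
```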
log
logical(1) Enable logging.
threshold
character(1) Logging threshold as defined in futile.logger.
logdir
character(1) Log files directory. When not provided, log
messages are returned to stdout.
resultdir
character(1) Job results directory. When not provided, results
are returned as an R object (list) to the workspace.
jobname
character(1) Job name that is prepended to log and result files.
Default is "BPJOB".
...
Additional arguments passed to makeCluster
Details
SnowParam is used for distributed memory computing and supports
2 cluster types: ‘SOCK’ (default) and ‘MPI’. The SnowParam
builds on infrastructure in the snow and parallel
packages and provides the additional features of error handling, logging
and writing out results. When not specified, the default number of
workers is determined by snowWorkers() which is
parallel::detectCores() - 2. Machines with 3 or fewer cores are assigned
a single worker.
error handling:
The catch.errors field has been deprecated.
By default all computations are attempted and partial results
are returned with any error messages.
catch.errors (DEPRECATED)
determines whether errors are caught and
returned with other results. When TRUE, all computations
are attempted and output contains both errors and successfully
completed results. When FALSE, the job is terminated as
soon as the first error is hit and only the error message is
returned (no results); this is the default behavior of the
parent packages, e.g., parallel, snow,
foreach.
stop.on.error A logical(1). When TRUE, the job stops as
soon as one task fails; when FALSE, all tasks are attempted and
the return value is a list of successful results
along with error messages as 'conditions'.
The bpok(x) function returns a logical() vector
that is FALSE for any jobs that threw an error. The input
x is a list output from a bp*apply function such as
bplapply or bpmapply.
logging:
When log = TRUE the futile.logger package is loaded on
the workers. All log messages written in the futile.logger format
are captured by the logging mechanism and returned in real time
(i.e., as each task completes) instead of after all jobs have finished.
Messages sent to stdout and stderr are returned to
the workspace by default. When log = TRUE these
are diverted to the log output. Those familiar with the outfile
argument to makeCluster can think of log = FALSE as
equivalent to outfile = NULL; providing a logdir is the
same as providing a name for outfile except that BiocParallel
writes a log file for each task.
The log output includes additional statistics such as memory use
and task runtime. Memory use is computed by calling gc(reset=TRUE)
before code evaluation and gc() (no reset) after. The output of the
second gc() call is sent to the log file. There are many ways to
track memory use - this particular approach was taken because it is
consistent with how the BatchJobs package reports memory on the
workers.
log and result files:
Results and logs can be written to a file instead of returned to
the workspace. Writing to files is done from the master as each task
completes. Options can be set with the logdir and
resultdir fields in the constructor or with the accessors,
bplogdir and bpresultdir.
random number generation:
MulticoreParam and SnowParam use the random number
generation support from the parallel package. These params are
snow-derived clusters so the arguments for multicore-derived functions
such as mc.set.seed and mc.reset.stream do not apply.
Random number generation is controlled through the param argument,
RNGseed which is passed to parallel::clusterSetRNGStream.
clusterSetRNGStream uses the L'Ecuyer-CMRG random number
generator and distributes streams to the members of a cluster. If
RNGseed is not NULL it serves as the seed to the streams,
otherwise the streams are set from the current seed of the master process
after selecting the L'Ecuyer generator. See ?clusterSetRNGStream
for more details.
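The clusterSetRNGStream mechanics can be seen with the parallel package
alone. In this sketch, drawOnCluster is a hypothetical helper for
illustration, and it assumes a local 2-worker SOCK cluster can be started:

```r
## Reproducible per-worker streams via parallel::clusterSetRNGStream(),
## the same mechanism SnowParam uses for 'RNGseed'.
library(parallel)

drawOnCluster <- function(seed) {
    cl <- makeCluster(2)                       ## local SOCK workers
    on.exit(stopCluster(cl))
    clusterSetRNGStream(cl, iseed = seed)      ## L'Ecuyer-CMRG streams
    unlist(parLapply(cl, 1:2, function(i) rnorm(1)))
}

## The same seed reproduces the same draws on each worker
identical(drawOnCluster(7739465), drawOnCluster(7739465))
```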
NOTE: The PSOCK cluster from the parallel package does not
support cluster options scriptdir and useRscript. PSOCK
is not supported because these options are needed to re-direct to an
alternate worker script located in BiocParallel.
Value
An object representing a SNOW cluster. The cluster is not
created until bpstart is called. Named arguments in ...
are passed to makeCluster.
Accessors: Logging and results
In the following code, x is a SnowParam object.
bpprogressbar(x), bpprogressbar(x) <- value:
Get or set the value to enable text progress bar.
value must be a logical(1).
bpjobname(x), bpjobname(x) <- value:
Get or set the job name.
bpRNGseed(x), bpRNGseed(x) <- value:
Get or set the seed for random number generation. value must be a
numeric(1).
bplog(x), bplog(x) <- value:
Get or set the value to enable logging. value must be a
logical(1).
bpthreshold(x), bpthreshold(x) <- value:
Get or set the logging threshold. value must be a
character(1) string of one of the levels defined in the
futile.logger package: “TRACE”, “DEBUG”,
“INFO”, “WARN”, “ERROR”, or “FATAL”.
bplogdir(x), bplogdir(x) <- value:
Get or set the directory for the log file. value must be a
character(1) path, not a file name. The file is written out as
BPLOG.out. If no logdir is provided and bplog=TRUE log
messages are sent to stdout.
bpresultdir(x), bpresultdir(x) <- value:
Get or set the directory for the result files. value must be a
character(1) path, not a file name. Separate files are written for
each job with the prefix TASK (e.g., TASK1, TASK2, etc.). When no
resultdir is provided the results are returned to the session as
list.
Accessors: Back-end control
In the code below x is a SnowParam object. See the
?BiocParallelParam man page for details on these accessors.
bpcatchErrors(x), bpcatchErrors(x) <- value
bpstopOnError(x), bpstopOnError(x) <- value
Methods: Evaluation
In the code below BPPARAM is a SnowParam object.
Full documentation for these functions is on separate man pages: see
?bpmapply, ?bplapply, ?bpvec, ?bpiterate and
?bpaggregate.
bpok(x):
Returns a logical() vector: FALSE for any jobs that resulted in
an error. x is the result list output by a BiocParallel
function such as bplapply or bpmapply.
Coercion
as(from, "SnowParam"):
Creates a SnowParam object from a SOCKcluster or
spawnedMPIcluster object. Instances created in this way
cannot be started or stopped.
Author(s)
Martin Morgan and Valerie Obenchain.
See Also
register for registering parameter classes for use in
parallel evaluation.
MulticoreParam for computing in shared memory
BatchJobsParam for computing with cluster schedulers
DoparParam for computing with foreach
SerialParam for non-parallel evaluation
Examples
## -----------------------------------------------------------------------
## Job configuration:
## -----------------------------------------------------------------------
## SnowParam supports distributed memory computing. The object fields
## control the division of tasks, error handling, logging and result
## format.
bpparam <- SnowParam()
bpparam
## Fields are modified with accessors of the same name:
bplog(bpparam) <- TRUE
bpresultdir(bpparam) <- "/myResults/"
bpparam
## -----------------------------------------------------------------------
## Logging:
## -----------------------------------------------------------------------
## When 'log == TRUE' the workers use a custom script (in BiocParallel)
## that enables logging and access to other job statistics. Log messages
## are returned as each job completes rather than waiting for all to
## finish.
## In 'fun', a value of 'x = 1' will throw a warning, 'x = 2' is ok
## and 'x = 3' throws an error. Because 'x = 1' sleeps, the warning
## should return after the error.
X <- 1:3
fun <- function(x) {
if (x == 1) {
Sys.sleep(2)
if (TRUE & c(TRUE, TRUE)) ## warning
x
} else if (x == 2) {
x ## ok
} else if (x == 3) {
sqrt("FOO") ## error
}
}
## By default logging is off. Turn it on with the bplog()<- setter
## or by specifying 'log = TRUE' in the constructor.
bpparam <- SnowParam(3, log = TRUE, stop.on.error = FALSE)
tryCatch({
bplapply(X, fun, BPPARAM = bpparam)
}, error=identity)
## When a 'logdir' location is given the messages are redirected to a
## file:
## Not run:
bplogdir(bpparam) <- tempdir()
bplapply(X, fun, BPPARAM = bpparam)
list.files(bplogdir(bpparam))
## End(Not run)
## -----------------------------------------------------------------------
## Managing results:
## -----------------------------------------------------------------------
## By default results are returned as a list. When 'resultdir' is given,
## one file is saved per job in that directory, e.g., 'TASK1.Rda',
## 'TASK2.Rda', etc.
## Not run:
bpparam <- SnowParam(2, resultdir = tempdir())
bplapply(X, fun, BPPARAM = bpparam)
list.files(bpresultdir(bpparam))
## End(Not run)
## -----------------------------------------------------------------------
## Error handling:
## -----------------------------------------------------------------------
## When 'stop.on.error' is TRUE the process returns as soon as an error
## is thrown.
## When 'stop.on.error' is FALSE all computations are attempted. Partial
## results are returned along with errors. Use bptry() to see the
## partial results.
bpparam <- SnowParam(2, stop.on.error = FALSE)
res <- bptry(bplapply(list(1, "two", 3, 4), sqrt, BPPARAM = bpparam))
res
## Calling bpok() on the result list returns TRUE for elements with no
## error.
bpok(res)
## -----------------------------------------------------------------------
## Random number generation:
## -----------------------------------------------------------------------
## Random number generation is controlled with the 'RNGseed' field.
## This seed is passed to parallel::clusterSetRNGStream
## which uses the L'Ecuyer-CMRG random number generator and distributes
## streams to members of the cluster.
bpparam <- SnowParam(3, RNGseed = 7739465)
bplapply(seq_len(bpnworkers(bpparam)), function(i) rnorm(1),
BPPARAM = bpparam)
Results
R version 3.3.1 (2016-06-21) -- "Bug in Your Hair"
Copyright (C) 2016 The R Foundation for Statistical Computing
Platform: x86_64-pc-linux-gnu (64-bit)
R is free software and comes with ABSOLUTELY NO WARRANTY.
You are welcome to redistribute it under certain conditions.
Type 'license()' or 'licence()' for distribution details.
R is a collaborative project with many contributors.
Type 'contributors()' for more information and
'citation()' on how to cite R or R packages in publications.
Type 'demo()' for some demos, 'help()' for on-line help, or
'help.start()' for an HTML browser interface to help.
Type 'q()' to quit R.
> library(BiocParallel)
> png(filename="/home/ddbj/snapshot/RGM3/R_BC/result/BiocParallel/SnowParam-class.Rd_%03d_medium.png", width=480, height=480)
> ### Name: SnowParam-class
> ### Title: Enable simple network of workstations (SNOW)-style parallel
> ### evaluation
> ### Aliases: SnowParam SnowParam-class snowWorkers
> ### bpbackend,SnowParam-method bpbackend<-,SnowParam,cluster-method
> ### bpisup,SnowParam-method bpstart,SnowParam-method
> ### bpstop,SnowParam-method bpworkers,SnowParam-method
> ### bpworkers<-,SnowParam,numeric-method
> ### bpworkers<-,SnowParam,character-method bplog,SnowParam-method
> ### bplog<-,SnowParam,logical-method bpthreshold,SnowParam-method
> ### bpthreshold<-,SnowParam,character-method bpRNGseed bpRNGseed<-
> ### bpRNGseed,SnowParam-method bpRNGseed<-,SnowParam,numeric-method
> ### bplogdir bplogdir<- bplogdir,SnowParam-method
> ### bplogdir<-,SnowParam,character-method bpresultdir bpresultdir<-
> ### bpresultdir,SnowParam-method bpresultdir<-,SnowParam,character-method
> ### coerce,SOCKcluster,SnowParam-method
> ### coerce,spawnedMPIcluster,SnowParam-method show,SnowParam-method
> ### Keywords: classes methods
>
> ### ** Examples
>
>
> ## -----------------------------------------------------------------------
> ## Job configuration:
> ## -----------------------------------------------------------------------
>
> ## SnowParam supports distributed memory computing. The object fields
> ## control the division of tasks, error handling, logging and result
> ## format.
> bpparam <- SnowParam()
> bpparam
class: SnowParam
bpisup: FALSE; bpworkers: 2; bptasks: 0; bpjobname: BPJOB
bplog: FALSE; bpthreshold: INFO; bpstopOnError: TRUE
bptimeout: 2592000; bpprogressbar: FALSE
bpRNGseed:
bplogdir: NA
bpresultdir: NA
cluster type: SOCK
>
> ## Fields are modified with accessors of the same name:
> bplog(bpparam) <- TRUE
> bpresultdir(bpparam) <- "/myResults/"
> bpparam
class: SnowParam
bpisup: FALSE; bpworkers: 2; bptasks: 0; bpjobname: BPJOB
bplog: TRUE; bpthreshold: INFO; bpstopOnError: TRUE
bptimeout: 2592000; bpprogressbar: FALSE
bpRNGseed:
bplogdir: NA
bpresultdir: /myResults/
cluster type: SOCK
>
> ## -----------------------------------------------------------------------
> ## Logging:
> ## -----------------------------------------------------------------------
>
> ## When 'log == TRUE' the workers use a custom script (in BiocParallel)
> ## that enables logging and access to other job statistics. Log messages
> ## are returned as each job completes rather than waiting for all to
> ## finish.
>
> ## In 'fun', a value of 'x = 1' will throw a warning, 'x = 2' is ok
> ## and 'x = 3' throws an error. Because 'x = 1' sleeps, the warning
> ## should return after the error.
>
> X <- 1:3
> fun <- function(x) {
+ if (x == 1) {
+ Sys.sleep(2)
+ if (TRUE & c(TRUE, TRUE)) ## warning
+ x
+ } else if (x == 2) {
+ x ## ok
+ } else if (x == 3) {
+ sqrt("FOO") ## error
+ }
+ }
>
> ## By default logging is off. Turn it on with the bplog()<- setter
> ## or by specifying 'log = TRUE' in the constructor.
> bpparam <- SnowParam(3, log = TRUE, stop.on.error = FALSE)
> tryCatch({
+ bplapply(X, fun, BPPARAM = bpparam)
+ }, error=identity)
starting worker localhost:11149
starting worker localhost:11149
starting worker localhost:11149
############### LOG OUTPUT ###############
Task: 2
Node: 2
Timestamp: 2016-07-05 17:51:08
Success: TRUE
Task duration:
user system elapsed
0 0 0
Memory used:
used (Mb) gc trigger (Mb) max used (Mb)
Ncells 304009 16.3 592000 31.7 378451 20.3
Vcells 412615 3.2 1023718 7.9 786432 6.0
Log messages:
stderr and stdout:
character(0)
############### LOG OUTPUT ###############
Task: 3
Node: 3
Timestamp: 2016-07-05 17:51:08
Success: FALSE
Task duration:
user system elapsed
0.008 0.000 0.014
Memory used:
used (Mb) gc trigger (Mb) max used (Mb)
Ncells 304032 16.3 592000 31.7 378451 20.3
Vcells 413509 3.2 1023718 7.9 786432 6.0
Log messages:
ERROR [2016-07-05 17:51:08] non-numeric argument to mathematical function
stderr and stdout:
character(0)
############### LOG OUTPUT ###############
Task: 1
Node: 1
Timestamp: 2016-07-05 17:51:10
Success: TRUE
Task duration:
user system elapsed
0.000 0.000 2.004
Memory used:
used (Mb) gc trigger (Mb) max used (Mb)
Ncells 304034 16.3 592000 31.7 378451 20.3
Vcells 413511 3.2 1023718 7.9 786432 6.0
Log messages:
WARN [2016-07-05 17:51:10] the condition has length > 1 and only the first element will be used
stderr and stdout:
character(0)
Warning message:
In if (TRUE & c(TRUE, TRUE)) x :
the condition has length > 1 and only the first element will be used
<bplist_error: BiocParallel errors
element index: 3
first error: non-numeric argument to mathematical function>
results and errors available as 'attr(x, "result")'
>
> ## When a 'logdir' location is given the messages are redirected to a
> ## file:
> ## Not run:
> ##D bplogdir(bpparam) <- tempdir()
> ##D bplapply(X, fun, BPPARAM = bpparam)
> ##D list.files(bplogdir(bpparam))
> ## End(Not run)
>
> ## -----------------------------------------------------------------------
> ## Managing results:
> ## -----------------------------------------------------------------------
>
> ## By default results are returned as a list. When 'resultdir' is given
> ## files are saved in the directory specified by job, e.g., 'TASK1.Rda',
> ## 'TASK2.Rda', etc.
> ## Not run:
> ##D bpparam <- SnowParam(2, resultdir = tempdir())
> ##D bplapply(X, fun, BPPARAM = bpparam)
> ##D list.files(bpresultdir(bpparam))
> ## End(Not run)
>
> ## -----------------------------------------------------------------------
> ## Error handling:
> ## -----------------------------------------------------------------------
>
> ## When 'stop.on.error' is TRUE the process returns as soon as an error
> ## is thrown.
>
> ## When 'stop.on.error' is FALSE all computations are attempted. Partial
> ## results are returned along with errors. Use bptry() to see the
> ## partial results
> bpparam <- SnowParam(2, stop.on.error = FALSE)
> res <- bptry(bplapply(list(1, "two", 3, 4), sqrt, BPPARAM = bpparam))
starting worker localhost:11149
starting worker localhost:11149
> res
[[1]]
[1] 1
[[2]]
<remote_error in FUN(...): non-numeric argument to mathematical function>
traceback() available as 'attr(x, "traceback")'
[[3]]
[1] 1.732051
[[4]]
[1] 2
>
> ## Calling bpok() on the result list returns TRUE for elements with no
> ## error.
> bpok(res)
[1] TRUE FALSE TRUE TRUE
>
> ## -----------------------------------------------------------------------
> ## Random number generation:
> ## -----------------------------------------------------------------------
>
> ## Random number generation is controlled with the 'RNGseed' field.
> ## This seed is passed to parallel::clusterSetRNGStream
> ## which uses the L'Ecuyer-CMRG random number generator and distributes
> ## streams to members of the cluster.
>
> bpparam <- SnowParam(3, RNGseed = 7739465)
> bplapply(seq_len(bpnworkers(bpparam)), function(i) rnorm(1),
+ BPPARAM = bpparam)
starting worker localhost:11149
starting worker localhost:11149
starting worker localhost:11149
[[1]]
[1] 0.8552377
[[2]]
[1] -0.2198241
[[3]]
[1] -0.5324814
>
> dev.off()
null device
1
>