Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .Rbuildignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
^benchmark$
^CRAN-RELEASE$
^.*\.Rproj$
^\.Rproj\.user$
Expand Down
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,8 @@ tests/testthat/Rplots.pdf
cran-comments.md
CRAN-RELEASE
release-prep.R

agent/*
data/*
scratch-files/*
notes/*
3 changes: 3 additions & 0 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ Depends:
Imports:
checkmate,
matrixStats (>= 0.52),
mirai,
mori,
parallel,
posterior (>= 1.7.0),
stats
Expand All @@ -62,3 +64,4 @@ LazyData: TRUE
Roxygen: list(markdown = TRUE)
SystemRequirements: pandoc (>= 1.12.3), pandoc-citeproc
Config/roxygen2/version: 8.0.0
RoxygenNote: 7.3.3
101 changes: 44 additions & 57 deletions R/effective_sample_sizes.R
Original file line number Diff line number Diff line change
Expand Up @@ -62,32 +62,29 @@ relative_eff.array <- function(x, ..., cores = getOption("mc.cores", 1)) {
stopifnot(length(dim(x)) == 3)
S <- prod(dim(x)[1:2]) # posterior sample size = iter * chains

if (cores == 1) {
n_eff_vec <- apply(x, 3, posterior::ess_mean)
} else {
if (!os_is_windows()) {
n_eff_list <-
parallel::mclapply(
mc.cores = cores,
X = seq_len(dim(x)[3]),
FUN = function(i) posterior::ess_mean(x[, , i, drop = TRUE])
)
} else {
cl <- parallel::makePSOCKcluster(cores)
on.exit(parallel::stopCluster(cl))
n_eff_list <-
parallel::parLapply(
cl = cl,
X = seq_len(dim(x)[3]),
fun = function(i) posterior::ess_mean(x[, , i, drop = TRUE])
)
}
n_eff_vec <- unlist(n_eff_list, use.names = FALSE)
}
# The full draws array is reused across observations, so it is broadcast via
# shared memory on a local pool. Each worker reads only its slice `x[, , i]`.
n_eff_list <- with_loo_daemons(
cores,
loo_map(
seq_len(dim(x)[3]),
relative_eff_i_array,
cores = cores,
broadcast = list(x = x)
)
)
n_eff_vec <- unlist(n_eff_list, use.names = FALSE)

return(n_eff_vec / S)
}

#' Worker computing `ess_mean()` for a single slice of a draws array
#' @noRd
#' @keywords internal
relative_eff_i_array <- function(i, x) {
posterior::ess_mean(x[, , i, drop = TRUE])
}

#' @export
#' @templateVar fn relative_eff
#' @template function
Expand All @@ -104,46 +101,36 @@ relative_eff.function <-
f_i <- validate_llfun(x) # not really an llfun, should return exp(ll) or exp(-ll)
N <- dim(data)[1]

if (cores == 1) {
n_eff_list <-
lapply(
X = seq_len(N),
FUN = function(i) {
val_i <- f_i(data_i = data[i, , drop = FALSE], draws = draws, ...)
relative_eff.default(as.vector(val_i), chain_id = chain_id, cores = 1)
}
)
} else {
if (!os_is_windows()) {
n_eff_list <-
parallel::mclapply(
X = seq_len(N),
FUN = function(i) {
val_i <- f_i(data_i = data[i, , drop = FALSE], draws = draws, ...)
relative_eff.default(as.vector(val_i), chain_id = chain_id, cores = 1)
},
mc.cores = cores
)
} else {
cl <- parallel::makePSOCKcluster(cores)
parallel::clusterExport(cl=cl, varlist=c("draws", "chain_id", "data"), envir=environment())
on.exit(parallel::stopCluster(cl))
n_eff_list <-
parallel::parLapply(
cl = cl,
X = seq_len(N),
fun = function(i) {
val_i <- f_i(data_i = data[i, , drop = FALSE], draws = draws, ...)
relative_eff.default(as.vector(val_i), chain_id = chain_id, cores = 1)
}
)
}
}
# `data` and `draws` are reused for every observation, so they are broadcast
# via shared memory on a local pool and serialized on a remote pool.
n_eff_list <- with_loo_daemons(
cores,
loo_map(
seq_len(N),
relative_eff_i_function,
f_i = f_i,
chain_id = chain_id,
re_dots = list(...),
cores = cores,
broadcast = list(data = data, draws = draws)
)
)

n_eff_vec <- unlist(n_eff_list, use.names = FALSE)
return(n_eff_vec)
}

#' Worker computing the relative effective sample size for observation `i`
#' @noRd
#' @keywords internal
relative_eff_i_function <- function(i, f_i, data, draws, chain_id, re_dots) {
val_i <- do.call(
f_i,
c(list(data_i = data[i, , drop = FALSE], draws = draws), re_dots)
)
relative_eff.default(as.vector(val_i), chain_id = chain_id, cores = 1)
}

#' @export
#' @describeIn relative_eff
#' If `x` is an object of class `"psis"`, `relative_eff()` simply returns
Expand Down
61 changes: 38 additions & 23 deletions R/importance_sampling.R
Original file line number Diff line number Diff line change
Expand Up @@ -198,29 +198,23 @@ do_importance_sampling <- function(log_ratios, r_eff, cores, method) {
stop("Incorrect IS method.")
}

if (cores == 1) {
lw_list <- lapply(seq_len(N), function(i)
is_fun(log_ratios_i = log_ratios[, i], tail_len_i = tail_len[i]))
} else {
if (!os_is_windows()) {
lw_list <- parallel::mclapply(
X = seq_len(N),
mc.cores = cores,
FUN = function(i)
is_fun(log_ratios_i = log_ratios[, i], tail_len_i = tail_len[i])
)
} else {
cl <- parallel::makePSOCKcluster(cores)
on.exit(parallel::stopCluster(cl))
lw_list <-
parallel::parLapply(
cl = cl,
X = seq_len(N),
fun = function(i)
is_fun(log_ratios_i = log_ratios[, i], tail_len_i = tail_len[i])
)
}
}
# Each observation needs a different column of `log_ratios`, but the whole
# matrix is reused across the map, so it is a broadcast object: `loo_map()`
# shares it via shared memory on a local pool (zero-copy column access) and
# falls back to serialization on a remote pool. Serial work runs as a plain
# lapply(). `with_loo_daemons()` provides a pool when this is the top-level
# call (e.g. psis()) and reuses an outer pool when called from loo().
lw_list <- with_loo_daemons(
cores,
loo_map(
seq_len(N),
do_is_i,
is_fun = is_fun,
tail_len = tail_len,
cores = cores,
broadcast = list(log_ratios = log_ratios)
)
)

log_weights <- psis_apply(lw_list, "log_weights", fun_val = numeric(S))
pareto_k <- psis_apply(lw_list, "pareto_k")
Expand All @@ -234,3 +228,24 @@ do_importance_sampling <- function(log_ratios, r_eff, cores, method) {
method = rep(method, length(pareto_k)) # Conform to other attr that exist per obs.
)
}

#' Apply an importance sampling method to a single observation
#'
#' @noRd
#' @keywords internal
#' @description
#' Worker function mapped over observations (matrix columns) by
#' [do_importance_sampling()], either serially via [lapply()] or in parallel
#' via [mirai::mirai_map()].
#' @param i Integer column index of the observation to process.
#' @param is_fun The per-observation importance sampling function to apply, one
#' of [do_psis_i()], [do_tis_i()], or [do_sis_i()].
#' @param log_ratios Matrix of log ratios (`-loglik`). May be a shared-memory
#' object created by [mori::share()] to avoid copying to each worker.
#' @param tail_len Vector of tail lengths used to fit the GPD, one per
#' observation.
#' @return The result of `is_fun` for observation `i` (a list with elements
#' such as `log_weights` and `pareto_k`).
do_is_i <- function(i, is_fun, log_ratios, tail_len) {
is_fun(log_ratios_i = log_ratios[, i], tail_len_i = tail_len[i])
}
67 changes: 19 additions & 48 deletions R/loo.R
Original file line number Diff line number Diff line change
Expand Up @@ -665,52 +665,23 @@ parallel_importance_sampling_list <- function(N, .loo_i, .llfun,
data, draws, r_eff,
save_psis, cores,
method, ...){
if (cores == 1) {
psis_list <-
lapply(
X = seq_len(N),
FUN = .loo_i,
llfun = .llfun,
data = data,
draws = draws,
r_eff = r_eff,
save_psis = save_psis,
is_method = method,
...
)
} else {
if (!os_is_windows()) {
# On Mac or Linux use mclapply() for multiple cores
psis_list <-
parallel::mclapply(
mc.cores = cores,
X = seq_len(N),
FUN = .loo_i,
llfun = .llfun,
data = data,
draws = draws,
r_eff = r_eff,
save_psis = save_psis,
is_method = method,
...
)
} else {
# On Windows use makePSOCKcluster() and parLapply() for multiple cores
cl <- parallel::makePSOCKcluster(cores)
on.exit(parallel::stopCluster(cl))
psis_list <-
parallel::parLapply(
cl = cl,
X = seq_len(N),
fun = .loo_i,
llfun = .llfun,
data = data,
draws = draws,
r_eff = r_eff,
save_psis = save_psis,
is_method = method,
...
)
}
}
# `draws` (and `data`) are reused identically for every observation, so they
# are broadcast objects: shared once via shared memory on a local pool
# (recovering the copy-on-write benefit fork gave the old mclapply() path)
# and serialized on a remote pool. A single cross-platform code path replaces
# the previous mclapply()/parLapply() branching.
with_loo_daemons(
cores,
loo_map(
seq_len(N),
.loo_i,
llfun = .llfun,
r_eff = r_eff,
save_psis = save_psis,
is_method = method,
...,
cores = cores,
broadcast = list(data = data, draws = draws)
)
)
}
27 changes: 18 additions & 9 deletions R/loo_model_weights.R
Original file line number Diff line number Diff line change
Expand Up @@ -188,15 +188,24 @@ loo_model_weights.default <-
N <- ncol(x[[1]]) # number of data points
validate_log_lik_list(x)
validate_r_eff_list(r_eff_list, K, N)
lpd_point <- matrix(NA, N, K)
elpd_loo <- rep(NA, K)
for (k in 1:K) {
r_eff_k <- r_eff_list[[k]] # possibly NULL
log_likelihood <- x[[k]]
loo_object <- loo(log_likelihood, r_eff = r_eff_k, cores = cores)
lpd_point[, k] <- loo_object$pointwise[, "elpd_loo"] #calculate log(p_k (y_i | y_-i))
elpd_loo[k] <- loo_object$estimates["elpd_loo", "Estimate"]
}
# Establish a single daemon pool for all K models so each inner loo()
# reuses it instead of spinning a pool up and down K times.
loo_objects <- with_loo_daemons(
cores,
lapply(seq_len(K), function(k) {
loo(x[[k]], r_eff = r_eff_list[[k]], cores = cores)
})
)
lpd_point <- vapply(
loo_objects,
function(o) o$pointwise[, "elpd_loo"], #calculate log(p_k (y_i | y_-i))
FUN.VALUE = numeric(N)
)
elpd_loo <- vapply(
loo_objects,
function(o) o$estimates["elpd_loo", "Estimate"],
FUN.VALUE = numeric(1)
)
} else if (is.psis_loo(x[[1]])) {
validate_psis_loo_list(x)
lpd_point <- do.call(cbind, lapply(x, function(obj) obj$pointwise[, "elpd_loo"]))
Expand Down
Loading
Loading