# Spark Backend
Based on our discussions, we have decided to use sparklyr for our Spark backend.
The following data structures need to be implemented. Currently, sparklyr only supports data frames, so we would need to call Spark primitives from the Apache Spark library by invoking the static API classes.
The following files would be created to implement a Spark backend:

- `zzz.R` will be responsible for initialization of the backend with `.onAttach` (see the sketch after this list).
- `driver.R` contains all the logic for the driver code, with methods such as `do_dmapply` and `shutdown`.
- `object.R` will contain methods such as `get_parts`, `do_collect`, and `combine`.
- `utils.R` will contain all the utility methods for handling RDDs.
- `ops.R` will contain the implementations for `rbind`, `cbind`, etc.
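
A minimal sketch of what `zzz.R` could contain, assuming the backend is activated through ddR's `useBackend()`; the package name and message text are placeholders, not the actual implementation:

```r
# zzz.R -- hypothetical sketch; .onAttach() and packageStartupMessage()
# are standard R, everything else is a placeholder.
.onAttach <- function(libname, pkgname) {
  packageStartupMessage(
    "spark.ddR loaded. Activate the backend with ddR::useBackend()."
  )
}
```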
## Data Structures
### dframe
Basic functionality such as `rbind` and `cbind` could be achieved by using `sdf_bind_rows` and `sdf_bind_cols`. `names` and `colnames` are supported out of the box by the sparklyr API.

Summary operations like `colSums`, `colMeans`, `rowSums`, `sum`, and `mean` can be implemented by checking whether the columns are numeric.
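
To illustrate the numeric-column check, here is a hedged sketch of a `colSums`-style helper over a Spark DataFrame; `sdf_col_sums` is a hypothetical name, and only the sparklyr/dplyr calls shown are existing API:

```r
library(sparklyr)
library(dplyr)

# Hypothetical helper: sum every numeric column of a Spark DataFrame.
sdf_col_sums <- function(sdf) {
  schema <- sdf_schema(sdf)  # list of (name, type) for each column
  numeric_cols <- names(Filter(
    function(col) col$type %in% c("IntegerType", "LongType", "FloatType", "DoubleType"),
    schema
  ))
  sdf %>%
    select(all_of(numeric_cols)) %>%
    summarise_all(sum) %>%   # translated to Spark SQL and run on the cluster
    collect()                # single summary row back into R
}
```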
### dlist and darray
Currently, RDD support is not available in sparklyr. This would mean accessing the `org.apache.spark.api.r.RRDD` class by invoking Java objects through sparklyr. An initial implementation of this can be found here. `darray` could potentially be built using Spark's distributed matrix API.
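
To make the JVM-invocation route concrete, here is a minimal sketch of sparklyr's Java interop; `spark_context()`, `invoke()`, and `invoke_static()` are real sparklyr functions, but the particular calls are only illustrative and do not wrap `RRDD`:

```r
library(sparklyr)

sc <- spark_connect(master = "local")

# The JVM SparkContext behind the sparklyr connection
jsc <- spark_context(sc)

# Call an instance method on a JVM object
invoke(jsc, "defaultParallelism")

# Call a static method on an arbitrary JVM class
invoke_static(sc, "java.lang.Math", "max", 1L, 5L)

spark_disconnect(sc)
```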
## Functions
### dmapply
Given below is the signature of `do_dmapply`, the backend method that `dmapply` dispatches to:
setMethod("do_dmapply",
signature(driver="DistributedR.ddR",func="function"),
function(driver,func,...,MoreArgs=list(),
output.type = c("dlist","dframe","darray","sparse_darray"),
nparts=NULL,
combine=c("default","c","rbind","cbind"))
### Combine
For `dlist` and `darray` we would like to expose `combine` with all options on Spark. For `dframe`, however, we would only expose `default` as the combine option.
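
For reference, a tiny sketch of how collected parts might be stitched together on the R side according to the `combine` argument; this helper is hypothetical, not part of ddR:

```r
# Hypothetical helper: combine a list of collected parts.
combine_parts <- function(parts, combine = c("default", "c", "rbind", "cbind")) {
  combine <- match.arg(combine)
  switch(combine,
         default = parts,
         c       = do.call(c, parts),
         rbind   = do.call(rbind, parts),
         cbind   = do.call(cbind, parts))
}

# Example: combine_parts(list(1:3, 4:6), "c") returns 1:6
```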
### Partitions (nparts)
Currently, ddR works by assembling chunks into distributed lists, matrices, and data frames through (key, chunk) pairs. These chunks are lists, matrices, or data frames that are stitched back together after data processing. This gives the user fine-grained control over the way data is partitioned across nodes.

We would want Spark to manage this instead, since by default the number of partitions matches the number of blocks in HDFS (preserving data locality) or the number of CPU cores.
We would like to keep `nparts` as `NULL` by default. This would preserve Spark's internal partitioning system.
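
A small sketch of how an explicit `nparts` could be honoured while deferring to Spark when it is `NULL`; `with_nparts` is a hypothetical helper, while `sdf_repartition()` and `sdf_num_partitions()` are real sparklyr functions:

```r
library(sparklyr)

# Hypothetical helper: repartition only when the user asks for it.
with_nparts <- function(sdf, nparts = NULL) {
  if (is.null(nparts)) {
    sdf                                        # keep Spark's own partitioning
  } else {
    sdf_repartition(sdf, partitions = nparts)  # explicit user-controlled split
  }
}

# sdf_num_partitions(with_nparts(sdf, 8))  # would report 8 partitions
```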
### lapply
Spark currently supports `lapply`; however, this operation is not lazy.