Overview - RConsortium/ddR GitHub Wiki
Architecture: The APIs below assume the following
-
There is one master R process with which the user is interacting
-
There are multiple worker R processes on which most of the processing occurs. The workers may reside on different physical machines. As an example scenario, two 12-core servers may have 1 R master and 24 R workers.
Distributed Data-structures We will provide distributed version of R’s# traditional data-structures. Let’s call them darray, dframe, and dlist corresponding to array, data.frame, and list in R.
-
dlist: distributed list
dlist(A,B,C,…, nparts, psize): construct a dlist from elements. Similar to list() convention i. A,B,C are dlists or regular R lists ii. “nparts” indicates how many partitions should be created from the input data. Default is equal to the number of workers. iii. “psize” indicates number of elements in each partition. It is optional. Default sets psize to 1/#workers of the total length of the list
- as.dlist(input, nparts, psize): convert an R list into a distributed list i. “input” is an ordinary R object ii. “nparts” indicates how many partitions should be created from the input data. Default is equal to the number of workers. iii. “psize” indicates number of elements in each partition. It is optional. Default sets psize to 1/#workers of the total length of the list
-
darray: distributed array
darray(dim, data, sparse, nparts, psize): creates distributed array. i. “dim” is the dimension of the array ii. “data” is the initial value of all elements in the array. Default is 0. iii. “sparse” logical. Indicates if array is stored as a sparse matrix. Default is false. iv. “nparts” indicates how many partitions should be created from the input data. If used data is row partitioned. Default is equal to the number of workers.
v. “psize” size of each partition as a vector specifying the number of rows and columns. If missing, the array is row divided into #partitions=#workers, i.e., each partition has dim[1]/#worker rows and dim[2] columns.
- as.darray(input, nparts, psize): convert an R array into a distributed array i. “input” is an ordinary R array ii. “nparts” indicates how many partitions should be created from the input data. If used data is row partitioned. Default is equal to the number of workers. iii. “psize” size of each partition as a vector specifying the number of rows and columns. If missing, the array is row partitioned with #partitions=#workers, i.e., each partition has dim(input)[1]/#worker rows and dim(input)[2] columns.
-
dframe: distributed data-frame
dframe(dim, nparts, psize): creates distributed data frame. i. “dim” is the dimension of the data frame. If missing, results in an empty dframe ii. “nparts” indicates how many partitions should be created from the input data. If used data is row partitioned. Default is equal to the number of workers. iii. “psize” size of each partition as a vector specifying the number of rows and columns. If missing, the array is row divided into #partitions=#workers, i.e., each partition has dim[1]/#worker rows and dim[2] columns.
- as.dframe(input, nparts, psize): convert an R data.frame into a distributed data.frame i. “input” is an ordinary R data.frame ii. “nparts” indicates how many partitions should be created from the input data. If used data is row partitioned. Default is equal to the number of workers. iii. “psize” size of each partition as a vector specifying the number of rows and columns. If missing, the array is row partitioned with #partitions=#workers, i.e., each partition has dim(input)[1]/#worker rows and dim(input)[2] columns.
-
split(A, f): split an existing distributed object. Same syntax as R’s split(), except the return type is a dlist, dframe, or darray
-
Y=parts(A): return the set of partitions as a list of distributed objects
“A” is a dlist, dframe or darray “Y” is a list. Each element in a list points to a distributed object (in some sense a filtered view of the distributed object). E.g., Y1 refers to the first partition of A.
-
collect(A): return the data-structure A at the master
“A” is a dlist, dframe or darray E.g., collect(parts(A)1) will return the first partition, collect(A) will return the whole data-structure
Q1) I don’t want to handle how data is partitioned. Should I care about arguments such as “nparts” and “psize”? Ans.) The API does not require you to specify details of the partitions. We expect most users to simply use default values wherein the runtime takes care of partitioning details. For example, users will typically create a darray by just stating the array dimensions and the content: darray(dim=c(100,100), data=22). The runtime will figure out the number of partitions and their sizes. Similarly, when data is loaded from an external source, partitioning may be implicit and dependent on the external source. Partition specific arguments are only for those users who want low-level control over their data to optimize their applications. Q2) What is the difference between dlist(x) and as.dlist(x)? Both can take a list as an argument. Ans.) dlist(x) makes 'x' the first element of the list, regardless of whether 'x' is a list or not, i.e., there is nesting of lists. as.dlist() behaves analogously to as.list(), so as.dlist(x) would satisfy length(x)==length(as.dlist(x)) if 'x' is a list. Calling as.dlist() on a matrix/array would make each cell of the matrix an element in the list. The partitioning would be carried over directly. Distributed processing Our goal is to provide easy to use functions to express distributed computations. Proposed syntax for mapply on dlists, darrays, and dframes:
-
A<-mapply(FUN,X,Y, MoreArgs=…, SIMPLIFY): apply the function FUN on each element of X and Y.
“X” and “Y” are the input distributed objects “A” is the output returned as a distributed object “FUN” is a function which acts on partitions of X “MoreArgs” are arguments made available to each invocation of FUN “SIMPLIFY” attempts to reduce the result to a darray or dframe. Default is TRUE.
Q1) How do I pass all the contents of dlist to the lapply function? How do I pass inputs that are just normal R objects, i.e., not distributed? Ans.) Use “MoreArgs”. Here is an example: #X,Y,Z are dlists A<-mapply(FUN(x,y,z1,z2), X,Y,MoreArgs=list(z1=Z,z2=c(10,10))) In the above case, FUN will be applied to each element of X and Y. Additionally, each invocation of FUN will receive the full dlist Z and the vector c(10,10).
Q2) How do I return more than one data-structure from mapply? Ans.) You will need to combine multiple results into a single list. Example code: #X, Y are dlists A<-mapply(FUN(x,y,z){return list(x,y,z)}, X,Y, MoreArgs=list(z=22))
Q3) How do I apply a function on partitions of a distributed data structure? Ans.) Use mapply on parts(X), where X is the dlist. Example code: #X is a dlist A<-mapply(FUN(x){}, parts(X))
Q4) How do I pass partition ids when working with partitions of a distributed data-structure? Ans.) Use mapply on parts(X) and pass an index counter. Example code: #X is a dlist A<-mapply(FUN(id, x){}, 1:length(parts(X), parts(X)) Q5) What if I have to pass arbitrary dlist partitions to lapply? For example, I want to write a function that takes first partition of X and second partition of Y. Ans.) Use the corresponding partitions of dlist with mapply. #X, Y are dlists A<-mapply(FUN(x,y){}, parts(X)1, parts(Y)2) Q6) Can I use lapply instead of mapply? Ans.) Yes. Lapply will internally call mapply that backends implement. Therefore, a backend only needs to support mapply. Q7) The input to mapply are a mix of distributed arrays and list. Will the output be a dlist? Ans.) It depends. The mapply function has an argument called “SIMPLIFY” which tries to reduce the output into a darray or dframe. However, this reduction is successful only if each partition of the output is well formed and can be combined into a meaningful darray or dframe. For example, it there are unequal number of rows in each output partition, the result cannot be reduced to a darray or dframe. By default mapply binds output by columns, i.e. there will be a column for each input element of X. Q8) How do I load data from files in local and distributed filesystems? Ans.) File loaders can be implemented using distributed data-structures and mapply as specified in the API. We expect contributors to write packages for loading data from different sources.
You may also want to use visual panels to communicate related information, tips or things users need to be aware of. Related articles Related articles appear here based on the labels you select. Click to edit the macro and add or change labels.
Related issues