Modin - w4111/w4111.github.io GitHub Wiki

Here are the UNIs and names of this team, as well as what each person contributed:

  • jm5530 Jacob Million
    • High level overview, Tutorial, and Example
  • mc5672 Michael Carrion
    • The Problem and Solution, Alternatives & Why Modin
  • kd2990 Kaushal Damania
    • Selinger Optimizer and Modin
  • ap4623 Abhishek Paul
    • Advanced Debugging in Modin, Modin with Cloud Storage

Modin

High Level Overview

Modin is built on research out of UC Berkeley. It is comparable to Pandas, a popular Python library for data science, but it scales better and is better suited to large data sets. While Pandas is single-threaded, Modin can run on multiple CPUs at the same time, yielding better performance for operations that permit parallel processing.
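As a sketch of this drop-in usage (assuming Modin is installed, e.g. via `pip install "modin[ray]"`), a pandas script typically needs only its import line changed:

```python
# The only change from a pandas script is the import. If Modin is not
# installed, fall back to plain pandas -- the API is the same either way.
try:
    import modin.pandas as pd  # parallel, multi-core execution
except ImportError:
    import pandas as pd        # single-threaded fallback

df = pd.DataFrame({"a": range(10), "b": range(10, 20)})
print(df["a"].sum())  # identical pandas semantics, potentially parallel execution
```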

The Problem and Solution

  • Explain the problem that it solves.
  • How does the technology solve the problem?
  • How it relates to concepts from 4111.
  • NOTE: illustrating the relationship with concepts from this class IS BY FAR THE MOST IMPORTANT ASPECT of this extra credit assignment

Remember, the more detailed and thorough you contrast the technology with the topics in class the better. We've discussed the relational model, constraints, SQL features, transactions, query execution and optimization, and recovery. Pick a subset of topics to really dive deep and compare and contrast the similarities and differences between our discussion in class and the technology.


Alternatives & Why Modin

  • What are the alternatives, and what are the pros and cons of the technology compared with alternatives? (What makes it unique?)

Alternatives –

  1. Standard/Vanilla Pandas

  2. Dask DataFrame (DaskDF):
    DaskDF extends pandas by partitioning large DataFrames into smaller pandas DataFrames and scheduling their execution across multiple cores/distributed systems. Using lazy evaluation, DaskDF constructs a task graph that executes when triggered, optimizing task dependencies and resource usage. DaskDF is part of the broader Dask ecosystem and integrates well with other Dask tools (such as Dask Distributed, Dask-ML, or Dask-Kubernetes).

  3. Koalas:
    Koalas offers a pandas-like API on top of Apache Spark, allowing users to leverage Spark’s distributed computing capabilities without having to learn Spark’s native APIs. It bridges the gap between pandas, which is optimized for single-node execution, and Spark, which excels at processing large datasets across distributed clusters. Koalas also uses lazy evaluation to optimize and defer computations until explicitly executed.

Why Modin? Pros and Cons –

  1. Partitioning and Parallelization

    Modin:
    Modin supports both row- and column-oriented partitioning. Further, Modin can reshape the partitioning as necessary for the corresponding operation, based on whether the operation is row-parallel, column-parallel, or cell-parallel (independently applied to each unit cell). This approach enables Modin to support both row-parallel and column-parallel operations efficiently, such as transpose, median, and quantile.

    DaskDF and Koalas:
    Both DaskDF and Koalas partition DataFrames row-wise. Conceptually, the DataFrame is broken into horizontal partitions along rows, each processed independently where possible. (NB: This is analogous to horizontal partitioning in relational databases!) This strategy works well for operations like row-based filters or simple transformations but is less efficient for column-based computations such as statistical aggregations.

  2. API Coverage

    Modin:
    Modin supports over 90% of the pandas API, making it one of the most comprehensive scalable pandas alternatives. This high coverage ensures that existing pandas workflows can run on Modin with minimal changes.

    DaskDF and Koalas:
    Both DaskDF and Koalas support approximately 55% of the pandas API. Specifically, they do not implement certain functionality that would deviate from the row-wise partitioning approach. For example, DaskDF doesn’t support iloc, median, quantile, or column-wise operations such as apply(axis=0).

  3. Execution Semantics

    Modin:
    Modin employs eager execution, where operations are executed immediately upon invocation. This mirrors pandas’ behavior, making it intuitive for users already familiar with pandas (and, unlike DaskDF and Koalas, requiring no code modifications).

    DaskDF and Koalas:
    Both DaskDF and Koalas use lazy evaluation, deferring computation until explicitly triggered (i.e., by calling .compute() in DaskDF or .to_pandas() in Koalas). Lazy evaluation enables optimization of task graphs or query plans but may introduce additional latency for execution. Further, this places a lot of optimization responsibility on the user, forcing them to think about when it would be useful to inspect the intermediate results or to delay doing so.
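A small sketch of the difference (the Dask lines are shown as comments since they require a separate install; names there are illustrative):

```python
try:
    import modin.pandas as pd  # eager, like pandas
except ImportError:
    import pandas as pd        # plain pandas fallback; same eager semantics

df = pd.DataFrame({"value": [5, 12, 7, 30]})
filtered = df[df["value"] > 10]  # executes immediately (eager), as in pandas
print(len(filtered))

# The equivalent DaskDF code is lazy -- nothing runs until .compute():
#   import dask.dataframe as dd
#   ddf = dd.from_pandas(df, npartitions=2)
#   lazy = ddf[ddf["value"] > 10]   # only builds a task graph
#   result = lazy.compute()         # computation is triggered here
```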

  4. Ordering Semantics

    Modin:
    By default, pandas preserves the order of the DataFrame, so users can expect a consistent, ordered view as they operate on their DataFrame. Modin preserves pandas’ ordering semantics, ensuring consistency in indexing and result order. This makes it easier to adopt Modin for workflows that rely on the precise ordering of rows or columns.

    DaskDF and Koalas:
    Neither DaskDF nor Koalas guarantees preservation of row order in a DataFrame. For example, DaskDF sorts the index for optimization purposes, which results in occasional deviations from pandas’ strict ordering semantics. Further, DaskDF does not support multi-indexing or sorting (unlike Modin, which supports both capabilities).

  5. Compatibility with Computational Frameworks

    Modin:
    Modin is designed to run on a variety of systems and to support a variety of APIs. Currently, Modin supports running on both Dask and Ray (a separate framework designed for building scalable/distributed applications). Additionally, Modin is continually expanding to support other popular data processing APIs (such as SQL).

    DaskDF and Koalas:
    DaskDF is inherently tied to the Dask framework, and Koalas is built on Apache Spark. Unfortunately, they cannot be ported to other computational frameworks.

  6. Performance

    On operations supported by all systems, Modin provides substantial speedups. Further, Modin provides substantial speedups even on operators not supported by DaskDF and Koalas. Thanks to its flexible partitioning schemes, Modin provides benefits on a variety of operations, such as joins.

Tutorial

Example

While working on Project 1, reading in data was something that needed to be considered. While our team eventually opted to use generated data, the question of efficient data intake remains. The Python Pandas library allows for user-friendly data work. Modin mirrors this library, providing the same functionality at a faster rate. While in class we discussed optimization techniques such as projection pushdown, being careful about outer-loop vs. inner-loop ordering in joins, and different optimized queries, Modin takes another approach to improving overall runtime. It works by parallelizing operations, allowing multiple operations to run simultaneously. This is useful especially when reading in large CSV files, as one might do for Project 1. Below is a tutorial of how Modin is used, and how simple it is to switch from Pandas to this library.

Tutorial

Here is a short tutorial that shows the speedup from Pandas to Modin in the context of loading disease data for Project 1 (please be sure to access it from a columbia.edu email address in order to have permission to view):

Colab Modin Disease Info Intake Example

Note that the diseases.csv file may need to be replaced if the Colab file is opened in a new runtime. The file was too large to attach to the GitHub, so feel free to reach out if needed, and I can provide the CSV file for testing as well! To recreate it, the columns are: Disease_ID, Name, and Category. A smaller version of the CSV file is attached in the repo, but this example may be too small to show major differences in runtime between Pandas and Modin.
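As a rough, self-contained sketch of the timing comparison (the file name and row count below are small stand-ins for the real diseases.csv):

```python
import csv
import time

# Build a small stand-in for diseases.csv with the columns described above.
with open("diseases_small.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["Disease_ID", "Name", "Category"])
    for i in range(1_000):
        writer.writerow([i, f"disease_{i}", f"category_{i % 10}"])

try:
    import modin.pandas as pd  # swap in Modin for a parallel read
except ImportError:
    import pandas as pd        # plain pandas fallback

start = time.perf_counter()
df = pd.read_csv("diseases_small.csv")
elapsed = time.perf_counter() - start
print(f"loaded {len(df)} rows in {elapsed:.4f}s")
```

On a file this small the two libraries perform similarly; the speedup shows up on the full-size CSV.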

Selinger Optimizer and Modin

For large datasets, vanilla pandas will throw an out-of-memory error if the dataset is larger than available memory. Modin circumvents this problem by spilling over to disk; this is called object spilling. Hence, based on the principles taught in class, we can assume that the highest cost of most operations is disk access. We can now test out the query-optimization concepts we learned on Modin (since the dominant cost is disk access).
This provides us with the opportunity to study various query plans on big data. Specifically, we will try to apply Selinger optimizer principles to Modin DataFrames and check the results.

To get concrete results we have to force our DataFrame to spill over to disk. This is not possible on Google Colab, so you will have to follow this tutorial on your own system. I am using Linux with kernel 6.12.1-arch1-1 and KDE Plasma (plasmashell 6.2.4) as the desktop environment (these factors matter, as we shall soon see).

Modin can be used with Dask, Ray, or MPI.
I will be using Modin in combination with Ray; here is why the other choices were discarded:

  1. Triggering spillover on Modin + Dask is complicated, and the details needed to make it work are not pertinent to the concepts discussed in class.
  2. Setting up MPI has many more caveats than Ray, which are likewise not pertinent to the concepts discussed in class, so we skip it.

Let's start out by importing necessary libraries
image

You may be tempted to use a DataFrame of 10+ GB; however, since this data is written to disk (and assuming you don't want a lot of writes on your SSD), I recommend reducing the object pool size limit in Ray itself, which will force objects onto disk sooner. Moreover, since other applications are active on the system, your operating system may shut down the notebook as it rapidly occupies RAM. In my case, plasmashell terminated my instance before spillover could start (other operating systems/desktop environments may react differently). Hence, placing a limit on Ray itself should help trigger spillover. Alternatively, you can use a virtual machine without a desktop environment.

Let's set up the Ray instance. Notice that we provide a directory to store Ray's spillover objects. You can provide a custom directory; otherwise, Ray will store them in a default location based on your operating system.
image
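The setup shown in the screenshot can be sketched as below. The `/tmp/ray_spill` path is illustrative, and `object_spilling_config` under `_system_config` is Ray's documented way to choose a spill directory; the memory cap is what forces spilling in this tutorial (the `ray.init` call is left commented so the sketch runs without Ray installed):

```python
import json

# Ray's object-spilling configuration: once the in-memory object store fills,
# objects are spilled to the given directory (path here is illustrative).
spill_config = json.dumps(
    {"type": "filesystem", "params": {"directory_path": "/tmp/ray_spill"}}
)

# Capping the object store makes spilling kick in sooner:
#   import ray
#   ray.init(
#       object_store_memory=10**9,  # ~1 GB cap on the in-memory object store
#       _system_config={"object_spilling_config": spill_config},
#   )
print(spill_config)
```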
Now let's create DataFrames which will spill to disk.
image image
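The DataFrame-creation step can be sketched like this (sizes are scaled far down here; the tutorial's actual frames are large enough to exceed the object-store cap, and the column names are illustrative):

```python
try:
    import modin.pandas as pd
except ImportError:
    import pandas as pd
import numpy as np

# Two synthetic DataFrames with an integer join key and a random payload.
# Scale `n` up (and the object-store cap down) to actually trigger spilling.
n = 100_000
rng = np.random.default_rng(0)
df1 = pd.DataFrame({"key": rng.integers(0, 1_000, size=n), "val": rng.random(n)})
df2 = pd.DataFrame({"key": rng.integers(0, 1_000, size=n), "val": rng.random(n)})
print(df1.shape, df2.shape)
```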

You should see Ray's message that spillover has occurred. You can check the disk directory you selected to confirm that Ray has started storing objects there.
image image

We will review the following concepts related to the Selinger optimizer using Modin:

  1. Push selections down
  2. Avoid cross joins
  3. Dynamic programming for selecting the query path

1 - Push selections down

We will consider the query below.

IMG_0069

Based on Selinger Optimizer principles, we expect that query 2 would be faster than query 1.
image image

We see that the system was not even able to complete the query, because it would require around 4 TB of space to compute the cross product followed by the selection. (This is because, for each row in df1, we need around 1 TB of space for df2.)
image
We observe that using query 2, we are not only able to compute the result, but we do it within a minute.
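The contrast between the two queries can be sketched on scaled-down data (column and key names are illustrative; query 1 is left commented out because, at full scale, it is exactly the plan that could not finish):

```python
try:
    import modin.pandas as pd
except ImportError:
    import pandas as pd
import numpy as np

rng = np.random.default_rng(0)
df1 = pd.DataFrame({"key": rng.integers(0, 100, size=10_000)})
df2 = pd.DataFrame({"key": rng.integers(0, 100, size=10_000)})

# Query 1: cross product first, select afterwards. The intermediate result has
# |df1| * |df2| rows -- this is the plan that needed ~4 TB at full scale.
#   cross = df1.merge(df2, how="cross", suffixes=("_1", "_2"))
#   result1 = cross[cross["key_1"] == cross["key_2"]]

# Query 2: push the selection down -- shrink both sides before combining.
sel1 = df1[df1["key"] == 42]
sel2 = df2[df2["key"] == 42]
result2 = sel1.merge(sel2, how="cross", suffixes=("_1", "_2"))
print(len(result2))
```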

Now consider the two query plans below for the same query:
image


Using Selinger optimizer principles we expect query 2 to be faster than query 1.

image

We observe that it took more than 2 minutes for query plan 1 to complete.

image

On the other hand, query plan 2 completed in less than 30 seconds.

2 - Avoid cross joins

image


We run the query with the same selectivity as query 2 above, adding an inner-join step. We observe that it is faster than taking the cross product.
Note: this speedup may also be because inner joins are supported natively by Modin (multiple cores can work on computing the inner join).
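A sketch of the inner-join version on the same scaled-down data (names illustrative):

```python
try:
    import modin.pandas as pd
except ImportError:
    import pandas as pd
import numpy as np

rng = np.random.default_rng(0)
df1 = pd.DataFrame({"key": rng.integers(0, 100, size=10_000), "a": rng.random(10_000)})
df2 = pd.DataFrame({"key": rng.integers(0, 100, size=10_000), "b": rng.random(10_000)})

# Inner join on the key: the engine matches rows directly instead of
# materializing the full cross product, and Modin can parallelize the join.
joined = df1.merge(df2, on="key", how="inner")
print(len(joined))
```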


3 - Dynamic programming for selecting the query path

Consider that we want to run the following query:


IMG_0074


A naive approach would be something like the one below: image image


We observe that Ray ran into a memory error. This is most likely due to computing the join between df1 and df2 first.


Let's try to create a query plan based on the Selinger optimizer.
The Selinger optimizer's dynamic programming algorithm requires some statistics to calculate the best query plan. In our case we use the following function as our cost estimate. (In a real-world scenario, replace this with statistics based on the DataFrames in question.) We know that there is only one match for df[1] == 101 and that the rest of the distribution is random; we use this fact to create our cost-estimation function.

image
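The screenshot above shows the actual function; as a hedged reconstruction of the idea (names and numbers here are illustrative), the estimator simply exploits the fact that the equality predicate matches exactly one row:

```python
# Illustrative cardinality estimator: a table filtered by the highly selective
# equality predicate is assumed to produce a single row; anything else is
# assumed to keep its full row count.
def estimate_cardinality(n_rows: int, has_selective_filter: bool) -> int:
    return 1 if has_selective_filter else n_rows

print(estimate_cardinality(1_000_000, True))   # the one matching row
print(estimate_cardinality(1_000_000, False))  # no reduction assumed
```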


Now consider the implementation of Selinger optimizer's dynamic programming algorithm

image image
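The screenshots show the tutorial's implementation; the core dynamic program can be sketched in pure Python as below. The cardinalities and the join-cost function are toy stand-ins for real statistics, and the plan is represented as nested tuples:

```python
from itertools import combinations

def selinger(tables, card, join_card):
    """Selinger-style DP: best (cost, plan) for every subset of tables."""
    # Base case: a single table costs its estimated cardinality to scan.
    best = {frozenset([t]): (card[t], t) for t in tables}
    for size in range(2, len(tables) + 1):
        for subset in combinations(tables, size):
            s = frozenset(subset)
            # Try every way of splitting the subset into left/right inputs.
            for left_size in range(1, size):
                for left in combinations(subset, left_size):
                    l = frozenset(left)
                    r = s - l
                    lc, lp = best[l]
                    rc, rp = best[r]
                    cost = lc + rc + join_card(lp, rp)
                    if s not in best or cost < best[s][0]:
                        best[s] = (cost, (lp, rp))
    return best[frozenset(tables)]

# Toy statistics: df1 is filtered down to 1 row, df2 and df3 are large.
# Any join touching the tiny df1 is cheap; joining the two big tables first
# is catastrophically expensive.
card = {"df1": 1, "df2": 1_000_000, "df3": 1_000_000}
cost, plan = selinger(
    ["df1", "df2", "df3"],
    card,
    join_card=lambda lp, rp: 1_000 if "df1" in str(lp) + str(rp) else 10**12,
)
print(cost, plan)  # the chosen plan joins the tiny df1 early
```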


Running the query plan suggested by this optimizer:


image

We observe that we are now able to run the query without running into any memory issues.

Note: the reason we call df.head() after every query is that Modin starts computing a DataFrame only after an actual call on it is made; otherwise only the structure of the DataFrame is calculated.

Conclusion: Since Modin does not have a query optimizer like a SQL database, we need to be mindful when doing joins on large datasets. Some joins that may seem impossible to execute on a single node can be executed if we plan our query properly. To this end, concepts from the Selinger optimizer prove really helpful.

Advanced Debugging in Modin

Overview

Debugging in Modin involves identifying performance bottlenecks, tracing memory usage, and optimizing distributed data processing workflows. These techniques help developers ensure Modin's operations are efficient and scalable.


Tutorials

1. Performance Debugging

Performance debugging focuses on identifying slow operations and improving execution speed.

Installation of Profiling Tools

To profile Modin workflows:

pip install modin[all] line_profiler

Example: Profiling a Workflow

import modin.pandas as pd
from line_profiler import LineProfiler

def process_data():
    df = pd.read_csv("data.csv")
    df = df[df["value"] > 10]
    return df.sum()

# Profile the process_data function
profiler = LineProfiler()
profiler.add_function(process_data)
profiler.enable_by_count()
process_data()
profiler.print_stats()

Best Practices

  • Use smaller datasets during profiling to reduce overhead.
  • Analyze the output to pinpoint operations with high execution times.

2. Memory Profiling

Memory profiling tracks memory usage to optimize workflows and prevent crashes.

Installation

Install memory_profiler:

pip install memory_profiler

Example: Profiling Memory Usage

import modin.pandas as pd
from memory_profiler import profile

@profile
def analyze_data():
    df = pd.read_csv("large_data.csv")
    return df.mean()

analyze_data()

Best Practices

  • Profile large workflows to identify memory-hungry operations.
  • Adjust the chunk size or use partitioned reads to optimize memory usage.
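The second point can be sketched with the `chunksize` option of `read_csv` (file and column names here are illustrative; note that Modin may defer `chunksize` handling to pandas):

```python
import csv

# Build a small CSV to demonstrate chunked processing.
with open("values.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["value"])
    for i in range(10_000):
        writer.writerow([i])

try:
    import modin.pandas as pd
except ImportError:
    import pandas as pd

# Stream the file in fixed-size chunks instead of holding it all in memory.
total = 0
for chunk in pd.read_csv("values.csv", chunksize=2_000):
    total += int(chunk["value"].sum())
print(total)
```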

3. Execution Plan Debugging

Understand task execution across distributed engines like Ray or Dask. This is similar to inspecting a query execution plan with the EXPLAIN command taught in class.

Using Ray Dashboard

  1. Start Modin with Ray:
    import ray
    ray.init()
  2. Open the Ray dashboard at http://127.0.0.1:8265 to visualize task execution and resource usage.

Using Dask Dashboard

  1. Start Modin with Dask by creating a Dask client (this also launches the dashboard):
    from distributed import Client
    client = Client()
  2. Access the Dask dashboard at http://127.0.0.1:8787 for detailed execution graphs.

4. Detailed Logging

Enable Modin logging to inspect internal operations.

Example: Turning on Logging

from modin.config import LogMode
LogMode.enable()
import modin.pandas as pd

# Perform operations
df = pd.read_csv("data.csv")
print(df.head())

The logs provide insights into the distributed execution of Modin operations.


Conclusion

Advanced debugging with Modin enables efficient identification of bottlenecks and better utilization of distributed systems. Leveraging performance profiling, memory tracking, execution plan visualization, and detailed logging ensures workflows are optimized for speed and scalability.

Modin with Cloud Storage

Overview

We used a cloud RDS instance for Project 1, but Modin also simplifies data processing from cloud storage systems such as AWS S3, Azure Blob Storage, and Google Cloud Storage, enabling seamless integration with distributed engines like Ray or Dask.


Tutorials

1. Accessing AWS S3 Data

  • Installation:
pip install modin s3fs
  • Example:
import modin.pandas as pd

s3_url = "s3://bucket-name/dataset.csv"
df = pd.read_csv(s3_url)
print(df.info())

2. Accessing Azure Blob Data

  • Installation:
pip install azure-storage-blob modin
  • Example:
from azure.storage.blob import BlobServiceClient
import modin.pandas as pd

blob_service_client = BlobServiceClient.from_connection_string("your_connection_string")
blob_client = blob_service_client.get_blob_client(container="container-name", blob="dataset.csv")
with open("temp_dataset.csv", "wb") as f:
    f.write(blob_client.download_blob().readall())

df = pd.read_csv("temp_dataset.csv")
print(df.head())

3. Accessing Google Cloud Storage Data

  • Installation:
pip install google-cloud-storage modin
  • Example:
from google.cloud import storage
import modin.pandas as pd

client = storage.Client()
bucket = client.get_bucket("bucket-name")
blob = bucket.blob("dataset.csv")
blob.download_to_filename("temp_dataset.csv")

df = pd.read_csv("temp_dataset.csv")
print(df.describe())

Best Practices

  • Use appropriate authentication methods for secure access (e.g., IAM roles, service accounts).
  • Avoid downloading large datasets locally; process them directly in the cloud when possible.
  • Use chunked or lazy loading for extremely large files.

Sources

  1. Modin Documentation
  2. Dask Documentation (User Interfaces)
  3. Koalas Documentation
  4. Ray Documentation
  5. Dask Documentation
