Materialize - w4111/w4111.github.io GitHub Wiki

  • sy3196
    • Sultana Yeasmin - everything

Materialize

The Problem and Solution

In multiple assignments for COMSW4111, we worked with the Iowa dataset to analyze factors such as total sales by category, customer demographics, and purchasing behavior. This dataset has provided valuable insights, but is it truly accurate? In the real world, numbers are constantly changing. For example, it's December, a time when college students are preparing for exams or gearing up for the holidays. Naturally, purchasing patterns during this period will look different from other times of the year. Additionally, as new data is constantly being generated, today's updates could significantly affect past trends.

Consider another example in quantitative finance, where analysts use financial data to predict stock market trends. Stock prices can fluctuate within seconds, so analyses need to be updated in real time. However, traditional databases answer each query from the data as it stood when the query ran: unless someone reruns the query, the result they hold stays the same even as new data arrives. Decisions may then rest on outdated information, causing companies to miss opportunities or make poor predictions. A related issue is latency. Because each result is effectively a snapshot, bringing it up to date means recomputing it, which is inefficient for complex queries over large tables. One way engineers have tried to address this is through materialized views, which cache the results of complex queries to avoid repeated calculations.

While this can improve performance for repeated queries, it introduces a significant challenge: in traditional databases, these materialized views must be refreshed explicitly, whether by hand or on a schedule, to reflect the latest data. This refresh process creates latency: even after new data arrives in the system, it is not visible in the cached views until the next refresh. This is particularly problematic in scenarios where data is changing rapidly.

The constant need for data updates, paired with the latency of traditional systems, presents a major challenge for data engineers who need to ensure that the information they are working with is both accurate and timely. Traditional systems struggle to maintain this balance and often compromise between freshness of data and system performance.

Introducing Materialize

Materialize is a cloud-based data store that combines the speed of streaming with the capabilities of a traditional data warehouse. Unlike traditional databases, Materialize provides continuous real-time updates by incrementally adjusting results as new data is added. Rather than recalculating entire views on every query ("on reads"), Materialize performs the necessary updates as new data is ingested ("on writes"), so results stay fresh without costly and time-consuming recalculation. By removing the need for manual refreshes or periodic full recalculations, it also reduces the latency present in traditional systems.
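The difference between working "on reads" and "on writes" can be sketched with a toy running total. This is only an illustration in plain Python, not Materialize code; the class names `RecomputeOnRead` and `UpdateOnWrite` are made up for the example.

```python
class RecomputeOnRead:
    """Stores raw rows and rescans all of them on every query."""

    def __init__(self):
        self.rows = []

    def insert(self, price):
        self.rows.append(price)

    def total(self):
        # Work grows with the number of rows stored.
        return sum(self.rows)


class UpdateOnWrite:
    """Maintains the answer itself, adjusting it as each row arrives."""

    def __init__(self):
        self._total = 0

    def insert(self, price):
        # Constant work per write; the stored result is always current.
        self._total += price

    def total(self):
        return self._total


on_read = RecomputeOnRead()
on_write = UpdateOnWrite()
for price in [100, 250, 75]:
    on_read.insert(price)
    on_write.insert(price)

print(on_read.total(), on_write.total())  # 425 425
```

Both classes return the same answer; they differ in when the work happens. The second does a small amount of work per insert so that reads are cheap.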

In a scenario like stock market analysis, Materialize offers a powerful advantage. Developers can query a continuously updated view of stock prices, with changes reflected almost immediately, without re-running heavy queries. Every query returns up-to-date results, so decisions are based on the most current data available.

How does it work?

Materialize is powered by the Timely Dataflow and Differential Dataflow frameworks. These frameworks allow Materialize to update only the portions of a result that have changed instead of recalculating entire datasets, which significantly reduces the computational load and allows for real-time data processing without sacrificing performance.

Timely Dataflow is designed for handling high-throughput, low-latency data streams, where data is treated as a series of events that occur over time. It ensures that computations are performed as new events arrive. This is achieved through "timed" data processing, where events are assigned timestamps, and computations are processed according to these timestamps. What’s really attractive about Timely Dataflow is its ability to excel with large volumes of constantly updating data.
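As a loose analogy for timestamp-driven processing, the sketch below buffers events that arrive out of order and releases them in timestamp order once the caller says time has advanced. It is a plain-Python toy, not the Timely Dataflow API; `TimestampedBuffer` and `advance_to` are hypothetical names.

```python
import heapq


class TimestampedBuffer:
    """Holds events until time has advanced past their timestamp."""

    def __init__(self):
        self._heap = []
        self._seq = 0  # tie-breaker so event payloads are never compared

    def push(self, timestamp, event):
        heapq.heappush(self._heap, (timestamp, self._seq, event))
        self._seq += 1

    def advance_to(self, timestamp):
        """Release, in timestamp order, every event stamped at or before `timestamp`."""
        ready = []
        while self._heap and self._heap[0][0] <= timestamp:
            ts, _, event = heapq.heappop(self._heap)
            ready.append((ts, event))
        return ready


buffer = TimestampedBuffer()
buffer.push(3, "c")
buffer.push(1, "a")
buffer.push(2, "b")

first_batch = buffer.advance_to(2)   # events at times 1 and 2
second_batch = buffer.advance_to(5)  # the remaining event at time 3
print(first_batch, second_batch)
```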

On top of that, Differential Dataflow adds the capability to perform incremental computations by only processing the changes (or "deltas") that occur when new data arrives, rather than recalculating everything from scratch. This allows Materialize to efficiently handle complex operations, such as joins and aggregations, while minimizing computational load. By combining these two frameworks, Materialize can also scale horizontally, maintaining high availability and ensuring that its data views are always up-to-date, even as data volumes grow. For instance, if new stock price data arrives, Materialize only updates the views related to the affected stocks instead of recalculating the entire dataset, so updates happen quickly and with minimal computational cost compared to traditional systems that recompute results in full.
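The idea of processing only deltas can be sketched as follows. This is a toy Python model, not how Materialize is implemented: each update is assumed to be a `(ticker, price, diff)` triple, where a `diff` of +1 inserts a record and -1 retracts it, and `AveragePriceView` is a hypothetical name.

```python
from collections import defaultdict


class AveragePriceView:
    """Toy view: average trade price per ticker, maintained from deltas."""

    def __init__(self):
        self.sums = defaultdict(float)
        self.counts = defaultdict(int)

    def apply(self, updates):
        """Apply (ticker, price, diff) updates; return the tickers that changed."""
        changed = set()
        for ticker, price, diff in updates:
            self.sums[ticker] += price * diff
            self.counts[ticker] += diff
            changed.add(ticker)
        return changed

    def average(self, ticker):
        count = self.counts.get(ticker, 0)
        return self.sums[ticker] / count if count else None


view = AveragePriceView()
view.apply([("AAPL", 100, +1), ("MSFT", 300, +1)])

# A new AAPL trade arrives: only AAPL's aggregate is touched.
changed = view.apply([("AAPL", 102, +1)])
print(changed, view.average("AAPL"), view.average("MSFT"))

# Retracting the first AAPL trade is just another delta.
view.apply([("AAPL", 100, -1)])
print(view.average("AAPL"))
```

The MSFT entry is never revisited when AAPL data arrives, which is the property the paragraph above describes.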

Relation to COMSW4111

Differential Dataflow

In Differential Dataflow, computations are expressed as dataflow graphs. These graphs consist of collections of data and operators that define how the data is transformed. Unlike traditional systems, where an entire dataset would be recalculated whenever there’s a change, Differential Dataflow only updates the parts of the data that have changed. This is achieved using operators like `map`, `reduce`, `join`, and `semijoin`, which perform the computations incrementally.

For example, in the case of intersecting lists, rather than converting lists into sets and checking for membership through iteration, Differential Dataflow can use a `semijoin` operator, which efficiently matches the elements of two collections based on keys. This operator only considers the keys present in both collections, allowing for faster computation. By adding dummy values and using operators like `distinct` to ensure uniqueness, Differential Dataflow achieves set semantics without needing to recompute the entire dataset.
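To make the list-intersection example concrete, here is a toy Python analogue of an incrementally maintained intersection. It is not Differential Dataflow's Rust API; `IncrementalIntersection` is a hypothetical class that keeps per-key counts on each side (playing the role of `distinct`) and reports only the output changes caused by each input change (playing the role of `semijoin`).

```python
from collections import Counter


class IncrementalIntersection:
    """Maintains the set intersection of two changing collections."""

    def __init__(self):
        self.left = Counter()
        self.right = Counter()

    def _present(self, key):
        # A key is in the output only if both sides currently contain it.
        return self.left[key] > 0 and self.right[key] > 0

    def update(self, side, key, diff):
        """Apply one input change; return the output changes as (key, diff) pairs."""
        before = self._present(key)
        counts = self.left if side == "left" else self.right
        counts[key] += diff
        after = self._present(key)
        if after and not before:
            return [(key, +1)]
        if before and not after:
            return [(key, -1)]
        return []

    def result(self):
        return {key for key in self.left if self._present(key)}


ix = IncrementalIntersection()
for x in [1, 2, 2, 3]:
    ix.update("left", x, +1)
for x in [2, 3, 4]:
    ix.update("right", x, +1)
print(ix.result())

# Removing one of the two 2s changes nothing; removing the second retracts 2.
no_change = ix.update("left", 2, -1)
retraction = ix.update("left", 2, -1)
print(no_change, retraction, ix.result())
```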

Figure: Understanding Differential Dataflow (source: Materialize)

Materialize, a system built on top of Differential Dataflow, takes advantage of this approach to provide real-time, incremental updates to queries. When a new piece of data arrives or an existing piece changes, Materialize processes only the changes, avoiding the need to recompute the entire dataset. This makes Materialize particularly well-suited for handling high-throughput, low-latency data streams, as well as complex queries involving joins, aggregations, or filtering.

Indexing

In class, we discussed various indexing strategies, such as primary and secondary indexes, hash indexes, and the associated cost of maintaining these indexes. The key goal in indexing is to speed up query performance by providing efficient ways to locate data, often at the cost of increased storage and maintenance overhead.

Materialize offers two types of indexing: indexing on views and indexing on materialized views. In traditional databases, primary and secondary indexes are often used to speed up query execution, with primary indexes usually linked to the primary key and secondary indexes providing alternate access paths to data. Materialize’s indexed views, which store results in memory for fast access, are similar to in-memory indexes used in relational databases, where indexes are designed to facilitate quick lookups without needing to recompute results. On the other hand, materialized views in Materialize are comparable to traditional secondary indexes or precomputed query results, which are stored in durable storage and are accessible across clusters. This allows for the quick retrieval of data without needing to reprocess complex queries, much like a traditional materialized view or a cached query result in relational databases.

Materialize employs advanced indexing techniques to optimize query performance, including storing mappings from indexed columns to primary keys rather than the entire dataset. This strategy is particularly efficient because it reduces storage requirements by focusing only on the relevant parts of the data needed to answer queries. In traditional indexing, B+ trees are commonly used to store and organize data in a way that allows for efficient range queries and point lookups. Materialize’s indexing approach is more compact, storing only the necessary mappings to quickly retrieve the relevant data, which reduces both storage costs and computational overhead.

Additionally, Materialize optimizes memory usage by allowing multiple queries to share an index instead of building redundant copies. This contrasts with systems where each query builds and maintains its own dedicated index, especially for complex queries or large datasets. Sharing keeps memory usage low while performance remains high, even with many concurrent requests.

Delta Joins and Late Materialization

Delta joins and late materialization in Materialize apply the concepts of efficient storage, indexing, and query optimization that were emphasized in our lectures.

Delta joins allow Materialize to perform joins on streaming data. Unlike traditional dataflow systems that maintain separate indexes for each join, Materialize uses shared arrangements of data (its indexes), which minimizes the need to store intermediate results. As we learned in class about physical design and storage management, the concept of using shared indexes and avoiding redundant storage is crucial for improving efficiency. This also aligns with the concept of access methods, where efficient index structures play a key role in speeding up query execution and minimizing resource consumption.
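A simplified two-input sketch of the idea: each input keeps one index on the join key, and a change on one side is joined against the other side's index to produce output changes directly, so the join result itself is never stored. This is a Python toy under those assumptions, not Materialize's implementation; `Arrangement` and `delta_join` are illustrative names, and real delta joins matter most for joins over more than two inputs.

```python
from collections import defaultdict


class Arrangement:
    """An index from join key to the rows currently holding that key."""

    def __init__(self):
        self.by_key = defaultdict(list)

    def apply(self, key, row, diff):
        if diff > 0:
            self.by_key[key].append(row)
        else:
            self.by_key[key].remove(row)

    def lookup(self, key):
        return self.by_key.get(key, [])


def delta_join(key, row, diff, own, other, row_is_left):
    """Handle one input change by probing the other side's index."""
    output = []
    for match in other.lookup(key):
        pair = (row, match) if row_is_left else (match, row)
        output.append((key, pair, diff))
    own.apply(key, row, diff)  # keep this side's index current
    return output


customers = Arrangement()  # could be shared by any query joining on customer id
orders = Arrangement()

out1 = delta_join(1, "Ada", +1, customers, orders, row_is_left=True)
out2 = delta_join(1, "order-17", +1, orders, customers, row_is_left=False)
out3 = delta_join(1, "order-18", +1, orders, customers, row_is_left=False)
# Retracting the customer retracts both joined rows.
out4 = delta_join(1, "Ada", -1, customers, orders, row_is_left=True)
print(out1, out2, out3, out4, sep="\n")
```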

By avoiding the replication of the entire dataset in an index and instead storing only the mapping between the indexed columns and the primary key, Materialize reduces storage overhead. This approach is akin to the use of narrow indexes, where indexing only the necessary attributes instead of the entire data set results in faster access times and lower memory usage. Late materialization also reduces redundancy, as it focuses on indexing primary and foreign keys rather than entire tables, echoing the best practices for minimizing data duplication in database design.
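A minimal sketch of a narrow index with late materialization, assuming a small in-memory table keyed by primary key. The rows, the column names, and the helper `ids_in_zip_under` are made up for illustration.

```python
from collections import defaultdict

# Primary storage: primary key -> full row (hypothetical sample rows).
rows_by_id = {
    101: {"zip": "10027", "price": 850000, "beds": 2},
    102: {"zip": "10027", "price": 1200000, "beds": 3},
    103: {"zip": "10025", "price": 990000, "beds": 1},
}

# Narrow index: indexed column value -> primary keys only, not whole rows.
zip_index = defaultdict(list)
for primary_key, row in rows_by_id.items():
    zip_index[row["zip"]].append(primary_key)


def ids_in_zip_under(zip_code, max_price):
    """Find listings in a zip code below a price, fetching rows as late as possible."""
    matching_ids = zip_index.get(zip_code, [])
    # Late materialization: full rows are read only for keys the index returned.
    return [pk for pk in matching_ids if rows_by_id[pk]["price"] < max_price]


print(ids_in_zip_under("10027", 1000000))  # [101]
```

The index holds one small entry per row regardless of how wide the rows are, which is the storage saving the paragraph above describes.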

Memory and Cost

Materialize’s approach to memory optimization is very different from how traditional relational databases manage memory and indexing. In the relational model, the primary focus is on efficiently organizing and querying data. Secondary storage, like disk storage, is crucial for handling large datasets that cannot be entirely held in memory. Hash indexing, as we discussed in class, uses a hash function to efficiently look up records based on key values, often stored in secondary storage when memory limits are reached. In this way, disk storage in traditional systems allows for scaling, but access times are slower than memory access.

Initially, each intermediate record in Materialize required 96 bytes of overhead due to the data structures used, including key-value offsets, row representations, and time-difference pairs. Over time, Materialize has made significant strides in reducing the overhead required to maintain SQL views. Recent improvements have lowered this overhead to between 0 and 16 bytes. These gains were achieved through several techniques. One is bespoke compression for snapshot updates: a compression scheme designed specifically for these updates. Unlike general-purpose compression algorithms, which treat all types of data the same, bespoke compression is tailored to the specific patterns and needs of the data being stored.

Materialize also reduced row representation sizes and replaced usize offsets with more compact u32 offsets. Here, u32 is a 32-bit unsigned integer, while usize is Rust's pointer-sized unsigned integer, which is 64 bits on a typical 64-bit system. Using u32 halves the space each offset takes, as long as the offsets fit in 32 bits. This relates to the way relational databases consider memory optimization through the efficient use of pointers and indexes. In traditional systems, minimizing the size of indexes is important for reducing the memory footprint, which can be seen in the design of hash indexes or B-trees, where pointer sizes are critical in ensuring that indexes do not grow excessively large.
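The saving from narrower offsets is simple arithmetic, which the following Python snippet illustrates with the standard `struct` module. The offsets are made-up values; the format codes `Q` and `I` with the `<` prefix denote fixed-size 64-bit and 32-bit unsigned integers, standing in for usize on a 64-bit machine and u32.

```python
import struct

offsets = [0, 12, 40, 41, 96]  # hypothetical byte offsets into a buffer

as_64_bit = struct.pack(f"<{len(offsets)}Q", *offsets)  # 8 bytes per offset
as_32_bit = struct.pack(f"<{len(offsets)}I", *offsets)  # 4 bytes per offset

print(len(as_64_bit), len(as_32_bit))  # 40 20

# The trade-off: a 32-bit offset cannot address positions at or past 2**32.
try:
    struct.pack("<I", 2**32)
    fits = True
except struct.error:
    fits = False
print(fits)  # False
```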

Additionally, the company employs a "spill-to-disk" strategy that has similarities to secondary storage usage in traditional databases. When memory usage becomes a constraint, Materialize offloads data to disk, ensuring continued processing without overwhelming system memory. However, unlike traditional systems, where secondary storage is more static and used for larger data volumes (such as index files), Materialize dynamically adjusts data storage in real-time, optimizing the tradeoff between performance and resource availability. This is different from the usual disk-based hash indexing in relational databases, where indexes are pre-constructed and disk I/O is a fixed cost associated with query processing.
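The general shape of a spill-to-disk strategy can be sketched as a key-value store with a memory budget. This is a generic Python illustration, not a description of Materialize's actual mechanism; `SpillingStore`, its eviction rule (oldest inserted key first), and the use of `pickle` files are all assumptions made for the example.

```python
import os
import pickle
import tempfile


class SpillingStore:
    """Keeps at most `max_in_memory` entries in RAM and moves the rest to disk."""

    def __init__(self, max_in_memory):
        self.max_in_memory = max_in_memory
        self.hot = {}          # entries held in memory
        self.spilled = set()   # keys whose values live on disk
        self.spill_dir = tempfile.mkdtemp()

    def _path(self, key):
        return os.path.join(self.spill_dir, f"{key}.pkl")

    def put(self, key, value):
        if key not in self.hot and len(self.hot) >= self.max_in_memory:
            # Over budget: move the oldest in-memory entry to disk.
            old_key, old_value = next(iter(self.hot.items()))
            with open(self._path(old_key), "wb") as f:
                pickle.dump(old_value, f)
            del self.hot[old_key]
            self.spilled.add(old_key)
        self.hot[key] = value
        self.spilled.discard(key)

    def get(self, key):
        if key in self.hot:
            return self.hot[key]       # fast path: memory
        if key in self.spilled:
            with open(self._path(key), "rb") as f:
                return pickle.load(f)  # slow path: disk read
        raise KeyError(key)


store = SpillingStore(max_in_memory=2)
store.put("a", [1, 2, 3])
store.put("b", [4, 5])
store.put("c", [6])  # "a" is spilled to make room

print(sorted(store.hot), sorted(store.spilled), store.get("a"))
```

Processing continues after the budget is exceeded; the cost is that reads of spilled keys go through the slower disk path.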

Tutorial

In this tutorial, we’ll build a real-time data pipeline that scrapes continuously updated Redfin real estate data, sends it to Kafka, and ingests it into Materialize for querying. Kafka will act as the messaging platform on localhost:9092, while Materialize will enable us to run real-time SQL queries on the incoming data. We will use the Redfin Scraper to collect real estate data and push it into Kafka, which Materialize will listen to for real-time querying.

Step 1: Install the Redfin Scraper

This package allows us to scrape real estate data from Redfin.

pip3 install -U redfin-scraper

Step 2: Download the CSV File

Download the CSV file of zip codes from the Redfin Scraper repository. This file contains a list of zip codes that will be used to fetch real estate data.

Step 3: Set Up and Run `test.py`

You will need to modify and run the test.py script to scrape real estate data. The following code demonstrates how to set up the scraper and send data to Kafka on localhost, once Kafka is running. I have disabled multiprocessing for simplicity. Be sure to install the confluent-kafka package from the terminal as well (pip3 install confluent-kafka).

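from redfin_scraper import RedfinScraper
from confluent_kafka import Producer
import json

# Kafka broker running on localhost:9092
kafka_config = {'bootstrap.servers': 'localhost:9092'}
producer = Producer(kafka_config)


def delivery_report(err, msg):
    """Called once per message to report whether delivery succeeded."""
    if err is not None:
        print(f'Delivery failed: {err}')
    else:
        print(f'Delivered to {msg.topic()} [partition {msg.partition()}]')


# The following is adapted from tests.py, but uses NY data
scraper = RedfinScraper()
scraper.setup('./zip_code_database.csv', multiprocessing=False)
scraper.scrape(city_states=['New York, NY'], zip_codes=['10027'])

# Placeholder payload: fill in the remaining fields (the queries in the
# later steps expect 'state' and 'price') from the scraper's results
scraped_data = {
    "city": "New York",
}

# Send the payload as JSON to the 'real_estate_topic' Kafka topic
producer.produce('real_estate_topic', key='new_york_listing', value=json.dumps(scraped_data), callback=delivery_report)
producer.flush()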
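The script above sends a single placeholder payload. As a rough sketch of how each scraped listing could become its own Kafka message, the helper below builds a key and a JSON value per row. The function name `listing_to_message` and the field names `address`, `city`, `state`, and `price` are assumptions for illustration; the actual column names depend on what the scraper returns.

```python
import json


def listing_to_message(listing):
    """Turn one listing (a dict) into a (key, value) pair for Kafka."""
    # Hypothetical key choice: the address, so updates to one listing share a key.
    key = str(listing["address"]).lower().replace(" ", "_")
    value = json.dumps(listing, sort_keys=True)
    return key, value


example = {"address": "123 Example St", "city": "New York", "state": "NY", "price": 950000}
key, value = listing_to_message(example)
print(key)
print(value)
```

Each pair could then be passed to `producer.produce('real_estate_topic', key=key, value=value)` in place of the single placeholder message.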

Step 5: Using Materialize

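Here we create a Kafka source so that the data can be processed with SQL queries. The following SQL creates a source from the Kafka topic, assuming a connection named kafka_connection to the broker already exists. Because the producer in Step 3 sends JSON rather than Avro, the source uses FORMAT JSON, which exposes each message as a single jsonb column named data; a view on top of it extracts typed columns. I did this process incorrectly so many times that I ran out of their free trial clusters, so I could not verify this SQL, but I will continue the tutorial for the sake of explanation.

CREATE SOURCE real_estate_raw
  FROM KAFKA CONNECTION kafka_connection (TOPIC 'real_estate_topic')
  FORMAT JSON;

CREATE VIEW real_estate_source AS
  SELECT data->>'city' AS city, data->>'state' AS state, (data->>'price')::numeric AS price
  FROM real_estate_raw;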

Step 6: Querying Time!!

Here we can run example queries to determine information about the real estate market in NY, which should be constantly updating.

SELECT city, state, price
FROM real_estate_source
WHERE price < 1000000
ORDER BY price DESC;

However, I could not check whether this updates constantly since I ran out of their free trial clusters.