Optimize Tables Not Queries - jackghm/Vertica GitHub Wiki

Disclaimer: The views and opinions expressed in this article are those of the author and do not necessarily reflect the official policy or position of my current or past employers.

The following Wiki will help you understand how Vertica tables are defined and, more importantly, how to define Vertica tables that are optimized for disk storage, query performance, and deletes.

If you find this information helpful but have a number of large tables that need optimizing, I will post another Wiki or Gist on how I go about moving large tables (billions of rows) into optimized table defs.

How I came upon this method

I came upon this method while I was the manager of the Analytics Data Warehouse team at Twitter, where we had a pretty big Vertica cluster :-) After spending a lot of time running analyze and profile on queries that took a long time to execute, I discovered that optimizing the tables was a better solution.

See this Gist, Vertica Query requests over time by user, for VSQL that shows query request times by user.

I chose a breadth of v_monitor.query_requests queries from users, tools, and jobs running SELECT, COPY, INSERT, and DELETE on very large tables. Using a "scientific" method of running several iterations of queries against the unoptimized table as a baseline, I then ran the same queries against several optimized table defs.

Note that I have used the Database Designer, but I find the following method to be more comprehensive and precise. I have also used this method on MySQL tables in order to design the best Vertica table def before loading data from MySQL into Vertica. Of course, with either method there are always questions you should ask yourself. For instance, even though a column is not an RLE candidate, should I include it in the ORDER BY so it will be sorted, since it is used in joins?

Conventions

For HP-Vertica commands using the Command Line Interface (CLI)

`vsql>` 
  • a double dash "--" starts a single line comment
  • /* a SQL comment block */
  • [optional]

Let's start by creating a simple table that we will use later.

`-- create a test schema so we do not pollute our production schemas
 vsql> create schema if not exists test;
 vsql> create table test.tmp_column_cardinality (
 table_name varchar(255) NOT NULL DEFAULT 'Unknown',
 column_name varchar(255) NOT NULL DEFAULT 'Unknown',
 column_cardinality_count INT NOT NULL DEFAULT 0,
 created_at timestamp NOT NULL DEFAULT now() );
 -- show the table definition
 vsql> select export_objects('','test.tmp_column_cardinality');`

Table definition

Let's jump in with a contrived Aggregate table definition. We will then break down how and why we defined the table the way we did (or you could read from the bottom up :-) ). This contrived table is used by the Analytics (BI/BA) team for ad-hoc queries, as well as dashboards showing "user event" counts by client and country over time.

`vsql> create schema if not exists test;
 vsql> create table test.agg_event_by_client_geo_day
 (
 date_id int NOT NULL, -- Format is YYYYMMDD
 client_id int NOT NULL,
 country_id varchar(2) NOT NULL DEFAULT '-',
 uu_day int NOT NULL DEFAULT 0, -- unique user count by day
 event_cnt_day int NOT NULL DEFAULT 0, -- event count by day
 uu_ptd int NOT NULL DEFAULT 0, -- unique user count past thirty days
 event_cnt_ptd int NOT NULL DEFAULT 0, -- event count past thirty days
 created_at timestamp NOT NULL DEFAULT now(), -- when this aggregate record was created
 batch_id int NOT NULL -- a batch identifier from the ETL/Aggregation job
 )
 PARTITION BY (date_id); 
 -- _Note_ that by partitioning at a daily granularity, we can only retain ~2.8 years of Agg data due to the Vertica 1024 ROS container limit
 -- Don't get me started on merging ROS containers. It does NOT work <-- Vertica please fix this!!!
 ALTER TABLE test.agg_event_by_client_geo_day ADD CONSTRAINT PK_agg_event_by_client_geo_day PRIMARY KEY (date_id, client_id, country_id);
 CREATE PROJECTION test.agg_event_by_client_geo_day_p1
 (
   date_id ENCODING RLE
 , client_id ENCODING RLE
 , country_id ENCODING RLE
 , uu_day 
 , event_cnt_day 
 , uu_ptd 
 , event_cnt_ptd 
 , created_at
 , batch_id ENCODING RLE)
 AS
 SELECT 
   date_id
 , client_id 
 , country_id
 , uu_day 
 , event_cnt_day 
 , uu_ptd 
 , event_cnt_ptd 
 , created_at
 , batch_id
 FROM test.agg_event_by_client_geo_day
 ORDER BY client_id, date_id, country_id, batch_id
 SEGMENTED BY hash(date_id, client_id, country_id) ALL NODES KSAFE 1;`

-- Note: Vertica will implicitly create a buddy projection offset by 1, when the first row is inserted into this table

-- Note: for Dimension tables your super PROJECTION should likely be UNSEGMENTED ALL NODES. Unsegmented means the data is replicated on all nodes in the cluster. Due to data locality, this will allow joins to be more efficient. There is no need for a hash(key) for unsegmented tables
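
For example, a small country dimension might look like the following. This is just a minimal sketch; the test.dim_country table and its columns are hypothetical and only here to illustrate an UNSEGMENTED super projection.

`vsql> create table test.dim_country
 (
 country_id varchar(2) NOT NULL,
 country_name varchar(255) NOT NULL DEFAULT 'Unknown'
 );
 CREATE PROJECTION test.dim_country_p1
 (
   country_id
 , country_name
 )
 AS
 SELECT country_id, country_name
 FROM test.dim_country
 ORDER BY country_id
 UNSEGMENTED ALL NODES; -- replicated on every node, no hash() needed`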

PROJECTION

"What" data is stored

  • The Vertica physical table definition and the column storage is managed through Projections.
  • Every Vertica table needs one projection (a Super Projection) which is defined with every column in the table

SEGMENTED

"Where" data is stored

  • Data is distributed across cluster nodes by segmenting data using a hash key
  • The goal of segmenting tables is to distribute data evenly across database nodes in the cluster, so that all nodes can participate in query execution
  • Since the Primary Key (or AK) uniquely identifies every row in a table, these columns typically make a good hash key. Until they don't ;-)

Rather than segment/hash on the Primary key of each table, segment/hash by the join key in both projections so rows joined by Primary/Foreign keys are on the same node (data locality).

For example, if you have a user table with a PK of user.user_id, and a session table with a PK of session.session_id and an FK of session.user_id, hash on user_id in both table projections.

.. CREATE PROJECTION user_p (..) AS SELECT .. SEGMENTED BY HASH(user.user_id) ALL NODES;

.. CREATE PROJECTION session_p (..) AS SELECT .. SEGMENTED BY HASH(session.user_id) ALL NODES;

Don't worry about the session table data distribution on each node. The distribution will be even across nodes as long as user.user_id is evenly distributed. That is, sessions will be evenly distributed by derivation of the users being evenly distributed.
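
Filling in the elided columns, a minimal sketch of two such co-segmented tables might look like this (the table and column names are hypothetical, and the tables are named users/sessions to avoid the reserved word USER):

`vsql> create table test.users (
 user_id int NOT NULL,
 created_at timestamp NOT NULL DEFAULT now()
 );
 create table test.sessions (
 session_id int NOT NULL,
 user_id int NOT NULL, -- FK to test.users
 started_at timestamp NOT NULL DEFAULT now()
 );
 CREATE PROJECTION test.users_p1 (user_id, created_at)
 AS SELECT user_id, created_at FROM test.users
 ORDER BY user_id
 SEGMENTED BY hash(user_id) ALL NODES KSAFE 1;
 CREATE PROJECTION test.sessions_p1 (session_id, user_id, started_at)
 AS SELECT session_id, user_id, started_at FROM test.sessions
 ORDER BY user_id, session_id
 -- same hash key as users_p1, so a user's sessions land on the same node (data locality)
 SEGMENTED BY hash(user_id) ALL NODES KSAFE 1;`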

  • There are two considerations when choosing the hash columns: a) performance, and b) getting it right the first time, which will save you the pain/time of moving your data to a new table/projection def.

  • PLEASE do NOT choose unnecessary columns in the hash() clause. I have seen people choose a singular PK column AND another 30 unnecessary columns in the hash(). There is some processing overhead with the hash() function, so the fewer the columns the better. It also helps focus the mind on what is truly needed for data distribution and data locality among related tables. For example, counting records by node to check for even distribution took 11.5 minutes on an 11TB table using the 30+ column hash() I mentioned, whereas the same check using the single INT PK column took 5 minutes. Keep in mind that Vertica has to calc the hash to determine which node a record will be stored on (i.e., write cost), and it uses the hash() for record retrieval (MPP). So reduce the cost of the hash calc if possible, by reducing the columns in the hash list.

  • The most important thing I can say about the hash() is to select the column(s) that are typically used for joins between tables. In almost every case I have seen, there is a USER entity that many analytic queries are based upon (e.g., DAU, MAU KPIs). USER ID as a hash makes for an even distribution of users across nodes in the cluster. Let's face it, the user is the center of the solar system. So all tables with a user_id should be hashed on this singular column, so that when a join occurs, you will have data locality (the same node) of the User and its related table records. This is a huge perf gain and further reduces the amount of data that needs to move across the cluster (WINNING!). But wait a minute! Shouldn't the related tables be hashed on their PK instead, in order to get an even distribution of those records?!? As it turns out, the related records will be evenly distributed by derivation of the User table rows being evenly distributed. That is, some users may have more related records than others, however the overall distribution of users will ensure that related records are in balance across the cluster nodes. I have proven this derived distribution many times with a method I have that shows the count of records by hash, by node (a rough approximation is sketched after the note below). Remind me later to share that bit of code.

-- Note: If you have a lot of tables that are related to a given column and you are considering it for a hash(), make sure that you do not have too many null values for that column, or else you will get an unbalanced distribution of data on the Vertica nodes.
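
One quick way to sanity-check the distribution (a sketch, not necessarily the author's exact method) is to sum the stored row counts per node from the v_monitor.projection_storage system table:

`-- rows stored per node for our aggregate table; the counts should be roughly equal
 -- (this sums across buddy projections, which is still fine for spotting skew)
 vsql> select node_name, sum(row_count) as rows_on_node
 from v_monitor.projection_storage
 where anchor_table_schema = 'test'
 and anchor_table_name = 'agg_event_by_client_geo_day'
 group by node_name
 order by node_name;`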

PARTITION

"How" data is stored

  • Partitioning specifies how data is organized on individual nodes
  • Partitioning attempts to introduce hot spots within the node, providing a convenient way to quickly drop data (e.g., DROP PARTITION) and reclaim disk space
  • Partitioning also improves query performance by enabling predicate pruning and parallelism across the cluster
  • In our example, the date_id makes a good PARTITION column for a number of reasons:

** Dropping a partition is a very fast operation in Vertica (vs. DELETE). DROP PARTITION simply deletes files on disk, whereas a DELETE marks individual rows as deleted and compaction only happens when a PURGE operation occurs. If we need to rerun a job we can simply drop a day partition and then re-import those day records (see the sketch after the warning below). This is a good model for idempotent jobs. Make ALL jobs idempotent NOW, or you will regret it later! A good rule of thumb is to partition on a column that matches the time frame of the batch of records you import with your job (e.g., COPY/INSERT data by day).

Warning: Vertica has an upper bound of 1024 partitions (1,024 days in our example).
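
For example, an idempotent daily job might rerun a day like this (a sketch; the COPY source path and options are hypothetical):

`-- drop the partition for the day being rerun (fast, file-level operation)
 vsql> SELECT DROP_PARTITION('test.agg_event_by_client_geo_day', 20140601);
 -- then re-import that day's records
 vsql> COPY test.agg_event_by_client_geo_day FROM '/data/agg/20140601.csv' DELIMITER ',' DIRECT;`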

Primary Key (PK)

In order to support fast bulk loading into your tables, Vertica, unlike MySQL, does not enforce Unique key constraints on INSERT. SAY WHAT?!?

So why have a PK on every Vertica table? Without a PK you can't identify and remove duplicate rows in your table. The Primary Key definition also allows you to run ANALYZE_CONSTRAINTS() to find duplicate key violations.
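
For example, you can check our aggregate table for PK violations like this (a minimal sketch):

`-- report rows that violate declared constraints (e.g., duplicate primary keys)
 vsql> SELECT ANALYZE_CONSTRAINTS('test.agg_event_by_client_geo_day');
 -- or find the duplicate keys yourself
 vsql> select date_id, client_id, country_id, count(*) as dup_count
 from test.agg_event_by_client_geo_day
 group by date_id, client_id, country_id
 having count(*) > 1;`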

The PK also gives the optimizer better information for producing query execution plans, and as mentioned, the PK is also likely the best Hash key to use for SEGMENTING your data

Note: You typically find duplicate rows in Vertica when a join fails. A fine time to find out - NOT!

Side note - Another reason why you should have aggregate tables for BI/BA queries that do count(distinct column_name).

ORDER BY And Cardinality

"How" data is ordered

  • In Vertica, the columns in your ORDER BY clause are used to help optimize query performance. Reread that last sentence
  • The ORDER BY shares some behaviors similar to a MySQL Key/Index
  • The sort order optimizes for a specific query or commonalities in a class of queries based on the query predicate
  • The ORDER BY clause specifies a projection's sort order, which localizes logically grouped values so that a disk READ can pick up many results at once
  • The best sort orders are determined by query WHERE clauses (predicates)

For example, if a projection's sort order is (x, y), and the query's WHERE clause specifies (x=1 AND y=2), all of the needed data is found together in the sort order, so the query runs almost instantaneously
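
In our aggregate table, a typical dashboard query might look like the following (a hypothetical example; its predicate columns lead the projection's sort order, so Vertica only has to read the matching runs):

`vsql> select country_id, sum(event_cnt_day) as events
 from test.agg_event_by_client_geo_day
 where client_id = 1
 and date_id between 20140601 and 20140630
 group by country_id;`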

  • In general, if you don't filter or group by a column, you don't need it in the ORDER BY clause. So in our example you'll note that we did not include a few columns. That said, we will discuss why you might include a column anyway.

  • If you do not specify a sort order, HP-Vertica uses the order in which columns are specified in the column definition as the projection's sort order. I always prefer to be explicit in definitions in the event the implicit behavior changes.

  • Low cardinality columns used in predicates should be ordered first in your ORDER BY column list. This is a VERY IMPORTANT part of optimizing your table/queries

Here is an example of how you can determine the column cardinality of a Vertica table, which is used to determine the ORDER BY column sequence/ordering. You can use this same method on a MySQL table, or other data source, for tables you plan to move to Vertica. We will use the sample tables we showed above for creating an optimized Vertica table. The 'agg_event_by_client_geo_day' table will represent the equivalent table you would optimize, so let's insert some sample data to work with.

`vsql> start transaction;
 insert into test.agg_event_by_client_geo_day values (20140601, 1, 'US', 1000, 2000, 30000, 60000, now() - RANDOM(), 1001);
 insert into test.agg_event_by_client_geo_day values (20140601, 1, 'US', 1000, 2000, 30000, 60000, now() - RANDOM(), 1001);
 insert into test.agg_event_by_client_geo_day values (20140601, 1, 'GB', 1001, 2000, 30000, 60000, now() - RANDOM(), 1001);
 insert into test.agg_event_by_client_geo_day values (20140601, 2, 'GB', 1002, 2000, 30000, 60000, now() - RANDOM(), 1002);
 insert into test.agg_event_by_client_geo_day values (20140601, 2, 'GB', 1002, 2000, 30000, 60000, now() - RANDOM(), 1002);
 insert into test.agg_event_by_client_geo_day values (20140602, 1, 'US', 1000, 2000, 30000, 60000, now() - RANDOM(), 1002);
 insert into test.agg_event_by_client_geo_day values (20140602, 1, 'US', 1000, 2000, 30000, 60000, now() - RANDOM(), 1002);
 insert into test.agg_event_by_client_geo_day values (20140602, 1, 'US', 1001, 2000, 30000, 60000, now() - RANDOM(), 1003);
 insert into test.agg_event_by_client_geo_day values (20140602, 2, 'GB', 1002, 2000, 30000, 60000, now() - RANDOM(), 1003);
 insert into test.agg_event_by_client_geo_day values (20140602, 2, 'GB', 1002, 2000, 30000, 60000, now() - RANDOM(), 1003);
 commit;`
 `-- review our sample data
 select * from test.agg_event_by_client_geo_day order by date_id;`

Now we want to use our 'tmp_column_cardinality' table we created earlier to hold the distinct counts of our table data for each column

 `-- Let's delete any data we may have inserted in previous samplings
 DELETE FROM test.tmp_column_cardinality where table_name = 'test.agg_event_by_client_geo_day';commit;
 select * from test.tmp_column_cardinality; -- should be an empty table`
 
 `-- We will _generate_ the SQL that we will then use to insert distinct row counts into our cardinality table. Using SQL to generate SQL #ProStyle ;-) If getting the cardinality count for a large table from somewhere like MySQL is a problem, then just add a predicate (where clause) to the SELECT. 
 select 'INSERT INTO test.tmp_column_cardinality (table_name, column_name, column_cardinality_count) select ' || '''test.agg_event_by_client_geo_day''' || ' as table_name, ''' || column_name || ''' as column_name, count(distinct ' || column_name ||') as column_cardinality_count FROM test.agg_event_by_client_geo_day;commit;' from v_catalog.columns where table_schema = 'test' and table_name = 'agg_event_by_client_geo_day' limit 1000;`

Note: For very large (TB+) or wide tables (lots of columns) it may not be feasible to do a distinct count. Fortunately, Vertica has a performant solution that generates approximate distinct counts very quickly. Here is an example that allows up to a 25% error tolerance. It is OK to use this method since we will get the average repetition count later, which ensures our choice of RLE and ORDER BY columns is accurate.

select 'INSERT INTO test.tmp_column_cardinality (table_name, column_name, column_cardinality_count) select ' || '''schemaName.tableName''' || ' as table_name, ''' || column_name || ''' as column_name, APPROXIMATE_COUNT_DISTINCT( ' || column_name ||',25) as column_cardinality_count FROM schemaName.tableName;' from v_catalog.columns where table_schema ilike 'schemaName' and table_name ilike 'tableName';

Take the SQL INSERT statements that were just generated from the command above and run them in vsql. NOTE that only SELECT query predicate columns (the WHERE clause) that will be used to query your table need to be optimized and included in our ORDER BY clause. Therefore, we are going to ignore the "count" columns (e.g., uu_day, event_cnt_day) and only run the following generated SQL from the above command.

 `vsql>
 INSERT INTO test.tmp_column_cardinality (table_name, column_name, column_cardinality_count) select 'test.agg_event_by_client_geo_day' as table_name, 'date_id' as column_name, count(distinct date_id) as column_cardinality_count FROM test.agg_event_by_client_geo_day;commit; 
 INSERT INTO test.tmp_column_cardinality (table_name, column_name, column_cardinality_count) select 'test.agg_event_by_client_geo_day' as table_name, 'client_id' as column_name, count(distinct client_id) as column_cardinality_count FROM test.agg_event_by_client_geo_day;commit; 
 INSERT INTO test.tmp_column_cardinality (table_name, column_name, column_cardinality_count) select 'test.agg_event_by_client_geo_day' as table_name, 'country_id' as column_name, count(distinct country_id) as column_cardinality_count FROM test.agg_event_by_client_geo_day;commit; 
 INSERT INTO test.tmp_column_cardinality (table_name, column_name, column_cardinality_count) select 'test.agg_event_by_client_geo_day' as table_name, 'created_at' as column_name, count(distinct created_at) as column_cardinality_count FROM test.agg_event_by_client_geo_day;commit; 
 INSERT INTO test.tmp_column_cardinality (table_name, column_name, column_cardinality_count) select 'test.agg_event_by_client_geo_day' as table_name, 'batch_id' as column_name, count(distinct batch_id) as column_cardinality_count FROM test.agg_event_by_client_geo_day;commit; 
 -- You can also run the following INSERT to get a comparison of the uniqueness (cardinality) of your columns

INSERT INTO test.tmp_column_cardinality (table_name, column_name, column_cardinality_count) select 'test.agg_event_by_client_geo_day' as table_name, '*' as column_name, count(1) as column_cardinality_count FROM test.agg_event_by_client_geo_day;commit;`

`-- Let's see the results of our column cardinality
 vsql> select table_name, column_name, column_cardinality_count from test.tmp_column_cardinality order by 1, 3 asc;`
`table_name                        column_name  column_cardinality_count
 --------------------------------  -----------  ------------------------
 test.agg_event_by_client_geo_day  client_id    2
 test.agg_event_by_client_geo_day  country_id   2
 test.agg_event_by_client_geo_day  date_id      2
 test.agg_event_by_client_geo_day  batch_id     3
 test.agg_event_by_client_geo_day  *            10
 test.agg_event_by_client_geo_day  created_at   10`

From these results we can see that our recommended ORDER BY column sequence should be: client_id, country_id, date_id, batch_id, created_at

HOWEVER, we need to ensure that we don't put too many columns in the ORDER BY list. There is no hard maximum on the number of columns, but for performance reasons there is an upper bound on the column list, based on what is called "the AVERAGE repetition count". Good luck finding that with the awesome search capabilities of the Vertica docs ;-)

The average repetition count of all columns in the ORDER BY clause should be greater than 10 (a magic number), but keep in mind that columns with a very low cardinality count like 1 (every row is the same) will throw off the average count. For very low cardinality columns, just include them in the ORDER BY and ENCODING RLE. OK, that's just confusing! Let's see how we determine this AVG().

`SELECT AVG(rep_count) 
 FROM (
  SELECT client_id, country_id, date_id, batch_id      --, created_at
  , COUNT(*) AS rep_count 
  FROM test.agg_event_by_client_geo_day 
  GROUP BY 1 , 2, 3, 4 
 -- , 5
 ) q;`

Using the above query we can see that our AVG() repetition count is 1.6. Clearly less than 10. Doh! But our contrived table really has too few rows to get an accurate measure of the column cardinality :-( I recommend using a reasonably large sampling, or all the data from a pre-optimized table, when doing your group by. Notes:

  • I have found that you should start with ~3 columns in your AVG/REP count query to get a decent sample set.
  • If you have columns that will always be null, do not include them in the RLE or ORDER BY (i.e., a count(distinct) == 0).
  • If you have a very large table and a lot (e.g., over 25 columns) of columns that would likely be included as RLE/ORDER BY, I would check with the users or look through query requests to see if the columns are used. If they aren't currently used, or will never be used, do not include them in RLE/ORDER BY.
  • You can make a judgement call of how many columns to include in the RLE/ORDER BY -> If you find that you could RLE a very large number of columns (i.e., the AVG/REP count is > 10) you can decide if the slight overhead of COPY/INSERT costs from SORTing is acceptable vs. the perf gains that will be had for user queries that would benefit from the columns being included in the RLE/ORDER BY.
  • If you have a lot of columns that have low cardinality (typically found in a big & wide table with raw data in an ELT system), I would start the GROUP BY clause including the columns with < 100 cardinality count. Otherwise, you are going to be running a lot of queries to get the AVG/REP count ;-)

The average repetition count of all columns in the group by should be > 10. If you find that your AVG(rep_count) is > 10 using n columns, you can keep adding columns to your AVG rep count query until you drop below 10. Once you drop below the magic number of 10, you should not include the column that pushed you below it in your ORDER BY clause.

Note: I have had very wide tables with up to 30 columns in my ORDER BY clause. Given the fact that these columns are also RLE encoded I have saved Terabytes of storage space and significantly improved write and read throughput!

In our sample it's safe to assume that with more data our ORDER BY should be: client_id, country_id, date_id, batch_id

NOTE: all of the low cardinality columns in your ORDER BY should be RLE Encoded. In fact, an ENCODING RLE column only has a benefit if it is included in the ORDER BY clause of your projection. That is, RLE columns MUST be in the ORDER BY clause since they are sorted.

HP-Vertica also recommends that you add a last column in the ORDER BY clause that is a very HIGH cardinality column. WAIT! WHAT? That runs counter to our analysis so far!?! There is always an exception to the rule. The reason for this recommendation is that Vertica needs a highly selective column when doing DELETE operations. Even though we PARTITIONed our table for faster drop partition operations, there may be times you need to do a DELETE operation, like cleaning up failed batches (batch_id).

Note: this last, high cardinality column, is not used in our AVG(Rep_count) checking, so our final ORDER BY clause should be:

ORDER BY client_id, country_id, date_id, batch_id, created_at
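
Putting it all together, the projection for our aggregate table would end up looking something like this (a sketch; the _p2 name is just illustrative, and created_at is last and not RLE encoded since it is high cardinality):

`CREATE PROJECTION test.agg_event_by_client_geo_day_p2
 (
   client_id ENCODING RLE
 , country_id ENCODING RLE
 , date_id ENCODING RLE
 , batch_id ENCODING RLE
 , uu_day
 , event_cnt_day
 , uu_ptd
 , event_cnt_ptd
 , created_at
 )
 AS
 SELECT client_id, country_id, date_id, batch_id
 , uu_day, event_cnt_day, uu_ptd, event_cnt_ptd, created_at
 FROM test.agg_event_by_client_geo_day
 ORDER BY client_id, country_id, date_id, batch_id, created_at
 SEGMENTED BY hash(date_id, client_id, country_id) ALL NODES KSAFE 1;`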

Note: Vertica will only compress OR encode a column (not both). Varchar and Char columns will benefit from compression.

While Vertica offers many encoding types, in all the testing I have done, only RLE is beneficial for PERF and storage.

Given all of this information you now know why the ORDER BY is critical to performance and storage.

Query Performance

When the distribution of data in your table changes significantly, such as after you have INSERTED or DELETED a significant amount of data, you will need to update the table statistics to improve query plans/performance. The benefit of updating statistics cannot be overstated. I have seen unfathomable improvements in query plans as a result.

Check out this late-night tweet I sent, Query Perf with updated statistics.

Note: We have a nightly maintenance job that runs ANALYZE_STATISTICS on every schema/table. My philosophy is to script myself out of a job.

But if you need to run it manually

vsql> SELECT ANALYZE_STATISTICS('schema.TableName');
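
If you want to script it across every schema/table yourself, one approach (a sketch, not necessarily the author's nightly maintenance job) is to use SQL to generate the statements from the catalog, then run the output:

`-- generate one ANALYZE_STATISTICS call per user table
 vsql> select 'SELECT ANALYZE_STATISTICS(''' || table_schema || '.' || table_name || ''');'
 from v_catalog.tables
 order by table_schema, table_name;`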

I hope people find this Wiki helpful. I wish every system I inherited was optimized/modeled correctly from the start. I'll never get back those months of my life I spent migrating databases. Please do yourself and your successor a favor and take to heart the guidance above, before you make those 3 envelopes.

Mahalo

JackG
