Vertica Node Data Distribution - jackghm/Vertica GitHub Wiki

Disclaimer: The views and opinions expressed in this article are those of the author and do not necessarily reflect the official policy or position of my current or past employers.

Understanding Vertica Node Data Distribution

I recently taught "HPE Vertica Resource Management" and "HPE Vertica Managing a Cluster" classes at the HPE Big Data Conference (BDC) 2016 and I promised to share some code that I demoed, so here you go :-)

-- create a schema for our tests and a place to write our UDFs

create schema jackg;

select node_name, node_address, node_state from nodes;

node_name node_address node_state


v_pstl_node0001 192.168.220.130 UP
v_pstl_node0002 192.168.220.131 UP
v_pstl_node0003 192.168.220.132 UP

-- Let's create a table so we can insert some rows and see which nodes hash() assigns them to

-- drop table jackg.testHashNodes cascade;

CREATE TABLE jackg.testHashNodes ( userId IDENTITY(1, 1) , userName varchar(128) not null default '' );

CREATE PROJECTION jackg.testHashNodes_p1 ( userId, userName) as select userId, userName from jackg.testHashNodes segmented by hash(userId) ALL NODES ksafe 1;

select export_objects('','jackg.testHashNodes');

/* THERE IS SO MUCH information in the table def
README! https://github.com/jackghm/Vertica/wiki/Optimize-Tables-Not-Queries */

insert into jackg.testHashNodes (userName) values ('');

select node_name, projection_name, row_count from projection_storage where projection_name ilike 'testHashNodes%' and anchor_table_schema ilike 'jackg' and row_count > 0 order by 1;

node_name projection_name row_count


v_pstl_node0002 testHashNodes_p1_b0 1
v_pstl_node0003 testHashNodes_p1_b1 1

-- After each insert check the buddy projection (ksafe 1 - offset by one if no fault groups are defined)

-- Insert another record

insert into jackg.testHashNodes (userName) values ('');

-- Now check projection_storage again

select node_name, projection_name, row_count from projection_storage where projection_name ilike 'testHashNodes%' and anchor_table_schema ilike 'jackg' and row_count > 0 order by 1;

-- results show we still hash to v_pstl_node0002 projection testHashNodes_p1_b0

node_name projection_name row_count


v_pstl_node0002 testHashNodes_p1_b0 2
v_pstl_node0003 testHashNodes_p1_b1 2

insert into jackg.testHashNodes (userName) values ('');

-- Now check projection_storage again

select node_name, projection_name, row_count from projection_storage where projection_name ilike 'testHashNodes%' and anchor_table_schema ilike 'jackg' and row_count > 0 order by 1;

-- results show we still hash to v_pstl_node0002 projection testHashNodes_p1_b0

node_name projection_name row_count


v_pstl_node0002 testHashNodes_p1_b0 3
v_pstl_node0003 testHashNodes_p1_b1 3

insert into jackg.testHashNodes (userName) values ('');

-- Now check projection_storage again

select node_name, projection_name, row_count from projection_storage where projection_name ilike 'testHashNodes%' and anchor_table_schema ilike 'jackg' and row_count > 0 order by 1;

-- results show the 4th record inserted now hashes to v_pstl_node0003 testHashNodes_p1_b0

node_name projection_name row_count


v_pstl_node0001 testHashNodes_p1_b1 1
v_pstl_node0002 testHashNodes_p1_b0 3
v_pstl_node0003 testHashNodes_p1_b0 1
v_pstl_node0003 testHashNodes_p1_b1 3

Also note that we are discovering who our buddy projection nodes are (p1_b1).

Pretty cool!

I usually use a little bash script that does all this repetitive processing to find out which nodes my buddy projections map to. I want to know the buddy nodes when I need to do parallel exports/imports while migrating very large table projections and a projection refresh is not a viable solution.

Stated differently, we can now see that

v_pstl_node0002 projection (testHashNodes_p1_b0) has its buddy projection (testHashNodes_p1_b1) on v_pstl_node0003

and now we can see from the row counts that

v_pstl_node0003 projection (testHashNodes_p1_b0) has its buddy projection (testHashNodes_p1_b1) on v_pstl_node0001 which makes sense because the hashing is done in a node ring manner (more on that later)

insert into jackg.testHashNodes (userName) values ('Jane Doe');

-- Now check projection_storage again

select node_name, projection_name, row_count from projection_storage where projection_name ilike 'testHashNodes%' and anchor_table_schema ilike 'jackg' and row_count > 0 order by 1;

node_name projection_name row_count


v_pstl_node0001 testHashNodes_p1_b0 1
v_pstl_node0001 testHashNodes_p1_b1 1
v_pstl_node0002 testHashNodes_p1_b0 3
v_pstl_node0002 testHashNodes_p1_b1 1
v_pstl_node0003 testHashNodes_p1_b0 1
v_pstl_node0003 testHashNodes_p1_b1 3

-- Now we have written enough rows to see that we are distributing our records across the entire 3 node cluster

The 5th record we inserted shows

v_pstl_node0001 projection (testHashNodes_p1_b0) has its buddy projection (testHashNodes_p1_b1) on v_pstl_node0002
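The ring pattern observed over these inserts can be written down in a few lines. This is only a sketch in Python, not anything Vertica ships: it assumes ksafe 1, no fault groups, and a buddy offset of one, which is what the row counts above showed on this cluster. `NODES` and `buddy_of` are hypothetical names chosen for illustration.

```python
# Node names in ring order, as listed by "select node_name from nodes".
NODES = ["v_pstl_node0001", "v_pstl_node0002", "v_pstl_node0003"]


def buddy_of(node, nodes=NODES, offset=1):
    """Return the node expected to hold the buddy (b1) copy of `node`'s b0 data.

    ASSUMPTION: buddies sit `offset` positions further along the ring,
    wrapping from the last node back to the first.
    """
    position = nodes.index(node)
    return nodes[(position + offset) % len(nodes)]


for node in NODES:
    print(node, "-> buddy on", buddy_of(node))
```

If fault groups are defined, or the offset differs, the observed projection_storage rows are the source of truth, not this helper.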

_Note: if you want to easily see Who's Your Buddy, you can truncate the jackg.testHashNodes table between each check of projection_storage._

For our discussion today I wanted to show you how you can determine what records live on each node (ignoring the buddy projection/node) in order to see if you have a good distribution of data across all nodes in your cluster.

See the "Why we care" section of this article for more on that.

btw, here are the results from the node dependency table (aka Who's your buddy)

I personally don't find this very useful (or even accurate on small or large clusters)

SELECT GET_NODE_DEPENDENCIES('jackg.testHashNodes');

GET_NODE_DEPENDENCIES


Deps for table testHashNodes:

011 - cnt: 1

101 - cnt: 1

110 - cnt: 1

Read the bitmask from right to left

011 are nodes 1 and 2

101 are nodes 1 and 3 -- hard to tell who is the buddy unless you assume the last node's (3) buddy is the first (1)

110 are nodes 2 and 3
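Reading the masks by eye gets tedious on bigger clusters, so here is a minimal Python sketch of the right-to-left rule just described. `nodes_from_bitmask` is a hypothetical helper name, and the only input format assumed is the plain string of 0s and 1s shown in the output above.

```python
def nodes_from_bitmask(mask):
    """Return the 1-based node numbers whose bit is set.

    The rightmost character is node 1, the next one to the left is node 2,
    and so on, matching the "read right to left" rule.
    """
    return [pos for pos, bit in enumerate(reversed(mask), start=1) if bit == "1"]


for mask in ("011", "101", "110"):
    print(mask, "-> nodes", nodes_from_bitmask(mask))
```

Note that this only tells you which nodes share data, not which one holds the primary copy and which one holds the buddy.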

So do we have data Skew? (Am I unbalanced? - Um, I mean data per Node, Not me personally)

Clearly I'm unbalanced since I spent countless hours over years figuring all this out (lacking documentation).

To answer the question we need to understand how Vertica writes records to each node.

We know the hash() function is clearly used since it is defined on our projection.

And we know a projection is the physical storage of our rows.

We also know that we should use something like a Unique Key (PK, AK, UK) for our hash(column[s]) in order to get an even distribution of data (i.e., avoiding too many, or too few, rows on any given node).

But as I mentioned in my Optimize-Tables-Not-Queries wiki, using a PK as the hash on all your tables is not always a good idea when you are joining between tables, since data locality is very important.

So if you have a lot of tables related by USER_ID and you are using surrogate keys for your hash on those tables, you're doing a lot of shuffling.

And if you don't believe me that it makes sense to hash all tables on User_ID you can check the hash data distribution on all your tables with the following methods.

"Why we care" section

Like life, we want our cluster to be in balance.

For both READS and WRITES!

If we had 300 Billion rows in our table, on our 3 node cluster, we would like to see ~100 Billion rows on each node to have a balanced projection on our cluster.

Since Vertica uses Massively Parallel Processing (MPP) to execute queries on each node, having an even distribution is important for READ performance.

Not to mention that if we were writing all 300 Billion rows to a single node we would be chewing up a lot of disk space on that node, and our overall cluster capacity for CPU, Memory, Disk IOPS, etc. would be skewed toward a single node. That's No Bueno.
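To put a number on "in balance", one simple approach is to divide each node's row count by the ideal even share. The sketch below is in Python and is only an illustration: `load_ratio_per_node` is a hypothetical helper, and the ratio is one of many possible skew measures, not something Vertica reports. A ratio of 1.0 means the node holds exactly its fair share.

```python
def load_ratio_per_node(rows_per_node):
    """Map each node to its row count divided by the ideal (even) share."""
    ideal = sum(rows_per_node.values()) / len(rows_per_node)
    return {node: rows / ideal for node, rows in rows_per_node.items()}


# 300 Billion rows spread evenly over 3 nodes: every ratio is 1.0.
balanced = {"node1": 100_000_000_000, "node2": 100_000_000_000, "node3": 100_000_000_000}
# The same rows all written to one node: that node carries 3x its share.
one_hot_node = {"node1": 300_000_000_000, "node2": 0, "node3": 0}

print(load_ratio_per_node(balanced))
print(load_ratio_per_node(one_hot_node))
```

The row counts fed into it can come straight from the projection_storage query used earlier.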

So HOW does Vertica distribute data?!?

  1. Vertica gets a 64bit signed INT hash value from your column[s].

It just uses https://en.wikipedia.org/wiki/MurmurHash

select userId, hash(userId) as userIdHashValue from jackg.testHashNodes order by 1 limit 2;

userId userIdHashValue


1 5783548743464686114
2 1618211815126016456

  2. Vertica then turns that hash INT value into what is called a SEGMENT.

It does this by keeping only the low 32 bits of the hash, which yields an unsigned 4-byte INT between 0 and 4294967295

see https://www.google.com/?ion=1&espv=2#q=what+is+the+range+of+a+4+byte+unsigned+int

So we can create a function that does what Vertica is doing under the covers using the & (bitwise AND) operator

https://my.vertica.com/docs/7.1.x/HTML/Content/Authoring/SQLReferenceManual/LanguageElements/Operators/BinaryOperators.htm

-- Let's make all this simple with a UDF

CREATE OR REPLACE FUNCTION jackg.segmentation(hashint INT) RETURN INT AS BEGIN RETURN (4294967295 & hashint); END;

Now we can return a Segment INT from a hash INT value.

select userId , hash(userId) as userIdHashValue , jackg.segmentation(hash(userId)) as SegmentIntValFromHashIntVal from jackg.testHashNodes order by 1 limit 2;

userId userIdHashValue SegmentIntValFromHashIntVal


1 5783548743464686114 2338230818
2 1618211815126016456 2079138248
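The masking step is easy to sanity check outside the database. Here is a small Python sketch that applies the same `& 4294967295` to the two hash values from the output above. It does not compute Vertica's hash() itself (the hash values are copied from the query results), and `segmentation` here is just a Python stand-in for the SQL UDF of the same name.

```python
SEGMENT_MASK = 0xFFFFFFFF  # 4294967295: keeps only the low 32 bits


def segmentation(hash_int):
    """Python equivalent of the jackg.segmentation() SQL UDF."""
    return hash_int & SEGMENT_MASK


# (userId, hash(userId)) pairs copied from the query output above.
for user_id, hash_value in [(1, 5783548743464686114), (2, 1618211815126016456)]:
    print(user_id, hash_value, segmentation(hash_value))
```

The printed segment values match the SegmentIntValFromHashIntVal column, which is a nice confirmation that the UDF really is just a 32-bit mask.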

Are we there yet? How does that tell me what node userId value of 1 is written to?

  3. A range of segment integer values is assigned to each node in a Vertica cluster.

And if the segment value for a hash falls within the range of a given node then that's where the record is written to and read from.

So let's see what segment ranges we have on our 3 node cluster

-- any projection will work to get the range

select get_projection_segments('jackg.testHashNodes_p1_b0');

get_projection_segments

v_pstl_node0001|v_pstl_node0002|v_pstl_node0003

1431655764|2863311529|4294967294

4294967295|1431655765|2863311530

So let's start with what makes visual sense from that result.

Any Segment Integer Number

between 1431655765 and 2863311529 means the record will be written/read to/from v_pstl_node0002

between 2863311530 and 4294967294 means the record will be written/read to/from v_pstl_node0003

Since the ranges wrap around the end of the integer space, node0001, in this case, needs some special logic. If the value is

between 0 and 1431655764 or >= 4294967295 the record will be written/read to/from v_pstl_node0001
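The reading of that output can be sketched in Python. One loud assumption: the second row is treated as each node's upper bound and the third row as its lower bound, which is inferred from the ranges listed above and not from any documented format for get_projection_segments. `parse_segments` and `node_for` are hypothetical helper names.

```python
RAW = """\
v_pstl_node0001|v_pstl_node0002|v_pstl_node0003
1431655764|2863311529|4294967294
4294967295|1431655765|2863311530
"""


def parse_segments(raw):
    """Return {node_name: (low, high)} from the three pipe-separated rows."""
    # ASSUMPTION: row 2 holds the upper bounds and row 3 the lower bounds.
    names, highs, lows = (line.split("|") for line in raw.strip().splitlines())
    return {n: (int(lo), int(hi)) for n, lo, hi in zip(names, lows, highs)}


def node_for(seg, ranges):
    """Find the node whose range contains `seg`, handling the wrap-around."""
    for name, (low, high) in ranges.items():
        if low <= high:
            if low <= seg <= high:
                return name
        elif seg >= low or seg <= high:  # range wraps past 4294967295 back to 0
            return name
    return "unk"


ranges = parse_segments(RAW)
for seg in (0, 1431655765, 2863311530, 4294967295):
    print(seg, "->", node_for(seg, ranges))
```

A range whose lower bound is larger than its upper bound is the wrap-around case, which is exactly the special logic node0001 needs here.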

So given this information we can write a simple SQL case statement UDF to map a Segment INT to a NodeName, which will tell us, for any given hash value, the node the record resides on.

Seriously we can do this.

-- Using the Segment Value ranges to return a Vertica NodeName

CREATE OR REPLACE FUNCTION jackg.segmentationNode(seg INT) RETURN varchar(128) AS BEGIN RETURN ( CASE WHEN (seg between 1431655765 and 2863311529) then 'v_pstl_node0002' WHEN (seg between 2863311530 and 4294967294) then 'v_pstl_node0003' WHEN ( (seg between 0 and 1431655764) or (seg >= 4294967295)) then 'v_pstl_node0001' ELSE 'unk' END); END;

So let's add to our query to get the node the records reside on

select userId , hash(userId) as userIdHashValue , jackg.segmentation(hash(userId)) as SegmentIntValFromHashIntVal , jackg.segmentationNode(jackg.segmentation(hash(userId))) as NodeNameFromSegmentIntValFromHashIntVal from jackg.testHashNodes order by 1;

userId userIdHashValue SegmentIntValFromHashIntVal NodeNameFromSegmentIntValFromHashIntVal


1 5783548743464686114 2338230818 v_pstl_node0002
2 1618211815126016456 2079138248 v_pstl_node0002
3 3883506187811235133 1468343613 v_pstl_node0002
4 5130139942120141781 3256693717 v_pstl_node0003
5 8299427032879314813 101431165 v_pstl_node0001

We can also count by node to check our distribution of hashed userId data

select jackg.segmentationNode(jackg.segmentation(hash(userId))) as NodeNameFromSegmentIntValFromHashIntVal , count(1) as rowCountByNode from jackg.testHashNodes group by 1 order by 1;

NodeNameFromSegmentIntValFromHashIntVal rowCountByNode


v_pstl_node0001 1
v_pstl_node0002 3
v_pstl_node0003 1
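For anyone who wants to replay the whole chain (hash value, then segment, then node, then count) without a cluster, here is a Python port of the two UDFs. It is a sketch only: the five hash values are copied from the query output earlier in this article because Python cannot reproduce Vertica's hash(), and the ranges are hard-coded for this particular 3 node cluster.

```python
from collections import Counter


def segmentation(hash_int):
    """Port of jackg.segmentation(): keep the low 32 bits."""
    return hash_int & 4294967295


def segmentation_node(seg):
    """Port of jackg.segmentationNode() for this article's 3 node cluster."""
    if 1431655765 <= seg <= 2863311529:
        return "v_pstl_node0002"
    if 2863311530 <= seg <= 4294967294:
        return "v_pstl_node0003"
    if 0 <= seg <= 1431655764 or seg >= 4294967295:
        return "v_pstl_node0001"
    return "unk"


# userId -> hash(userId), copied from the query results in this article.
USER_ID_HASHES = {
    1: 5783548743464686114,
    2: 1618211815126016456,
    3: 3883506187811235133,
    4: 5130139942120141781,
    5: 8299427032879314813,
}

rows_per_node = Counter(
    segmentation_node(segmentation(h)) for h in USER_ID_HASHES.values()
)
for node in sorted(rows_per_node):
    print(node, rows_per_node[node])
```

The counts come out as one row on node0001, three on node0002, and one on node0003, the same split the SQL query reports.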

And we can prove this out with the query we used to determine the physical storage of records on each node

-- ignore the buddy projection by matching only the _b0 projection

select node_name, projection_name, row_count from projection_storage where projection_name ilike 'testHashNodes_p1_b0' and anchor_table_schema ilike 'jackg' and row_count > 0 order by 1;

node_name projection_name row_count


v_pstl_node0001 testHashNodes_p1_b0 1
v_pstl_node0002 testHashNodes_p1_b0 3
v_pstl_node0003 testHashNodes_p1_b0 1

Is that cool or what?!?

Side note: manually creating the case statement function that returns the node names is a bit of a pain on large clusters, so I wrote a simple Scala app to do this (see https://github.com/jackghm/Vertica/wiki/nodeSegUDFgen-ReadMe )
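To give a feel for what such a generator does, here is a small Python sketch that builds the CREATE FUNCTION text from a dictionary of ranges. To be clear, this is not the Scala app linked above, just an illustration of the idea. `build_segmentation_node_udf` is a hypothetical name, and the ranges are typed in by hand from the get_projection_segments output.

```python
def build_segmentation_node_udf(ranges, schema="jackg"):
    """Return CREATE FUNCTION SQL mapping a segment INT to a node name.

    `ranges` maps node name -> (low, high). A pair with low > high is treated
    as the wrap-around range (ASSUMPTION: at most one node wraps).
    """
    whens = []
    for node, (low, high) in ranges.items():
        if low <= high:
            cond = f"(seg between {low} and {high})"
        else:
            cond = f"((seg between 0 and {high}) or (seg >= {low}))"
        whens.append(f"WHEN {cond} then '{node}'")
    body = " ".join(whens)
    return (
        f"CREATE OR REPLACE FUNCTION {schema}.segmentationNode(seg INT) "
        f"RETURN varchar(128) AS BEGIN RETURN ( CASE {body} ELSE 'unk' END); END;"
    )


RANGES = {
    "v_pstl_node0001": (4294967295, 1431655764),
    "v_pstl_node0002": (1431655765, 2863311529),
    "v_pstl_node0003": (2863311530, 4294967294),
}
print(build_segmentation_node_udf(RANGES))
```

The generated statement has the same WHEN branches as the hand-written UDF, only in a different order, which does not matter because the ranges never overlap.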

Everything I just showed you is also available in the Vertica SDK (see the VHash class)

but having SQL functions is handy for native vSQL queries.

https://my.vertica.com/docs/7.2.x/HTML/index.htm#Authoring/ConnectingToHPVertica/ClientJDBC/KVApi/VHash.htm?Highlight=vhash

Aside from checking data skew of existing projection hash columns,

you can use this method when planning for new projection hashing using an existing table.

Let's say our Customer Service team looks up users by their name and you want to determine if a projection with a hash() on User Name would give a good distribution across the nodes.

select jackg.segmentationNode(jackg.segmentation(hash(userName))) as NodeNameFromSegmentIntValFromHashIntVal , count(1) as userNameRowCountByNode from jackg.testHashNodes group by 1 order by 1;

NodeNameFromSegmentIntValFromHashIntVal userNameRowCountByNode


v_pstl_node0002 1
v_pstl_node0003 4

The hash(userName) was a real world scenario I ran into.

It turns out one of the regional applications was creating empty strings for user names, so most of the user records were only on one node :-(

Eventually this one node would run out of disk space.

Another use case for knowing a priori the hash-to-node affinity of records: we used this method to prehash/segment data for parallel loading and achieved 36TB+/hour of data ingestion into Vertica.
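The reason repeated values are so damaging is that a hash is deterministic: identical inputs always produce the same segment, so they always land on the same node. The Python sketch below shows just that property. Big assumption, stated loudly: `zlib.crc32` is a stand-in and is NOT Vertica's hash(), so the node each name lands on here will not match a real cluster. Only the "same value, same node" behaviour carries over.

```python
import zlib
from collections import Counter

# Segment ranges for the 3 node cluster in this article (low, high).
RANGES = {
    "v_pstl_node0001": (0, 1431655764),
    "v_pstl_node0002": (1431655765, 2863311529),
    "v_pstl_node0003": (2863311530, 4294967294),
}


def stand_in_segment(value):
    # ASSUMPTION: crc32 replaces Vertica's hash(); only determinism matters here.
    return zlib.crc32(value.encode("utf-8")) & 0xFFFFFFFF


def node_for(seg):
    for node, (low, high) in RANGES.items():
        if low <= seg <= high:
            return node
    return "v_pstl_node0001"  # 4294967295 wraps around to the first node


# Four empty user names and one real one, like the test table in this article.
user_names = ["", "", "", "", "Jane Doe"]
rows_per_node = Counter(node_for(stand_in_segment(name)) for name in user_names)
print(rows_per_node.most_common())
```

Whatever hash function is used, the four empty strings end up together, so at least 80% of this tiny table sits on a single node.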

See the video/slideshare from my Big Data Conference 2015 session

http://www.slideshare.net/jackghm/hpbigdata2015-pstl-kafka-spark-vertica

Stated differently, MPP for reads is awesome AND MPP for Loading Data also rocks!