
Distributed schema

The database distributes rows horizontally in one of four ways: by partition field, by relation (parent/child foreign key relation), round-robin, or by replicating rows to all partitions.

Partition by field

The partition field is set during table creation. A table can have only one partition field, and it cannot be nullable. For example:

CREATE TABLE mytable
(
id bigint not null primary key PARTITION,
info char
) 

Rows are partitioned by the hash value of [id]. You can check which partition group a row will be inserted into by looking at the DQL plan of the INSERT statement. For example, paste the following INSERT into the manager's query window:

insert into mytable values (0,'val1') 

Then press the "show execution plan" button, which reveals:

insert 'test.mytable' on '0' 
set @this.[id]=0, @this.[info]='val1' 

The on '0' clause in the DQL output indicates that the command will be executed on partition group 0, i.e. all nodes of that partition group will insert the row.
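A minimal model of this routing, assuming a hash-modulo scheme, is sketched below. The database's real hash function and number of partition groups are not described here, so `partition_group`, `NUM_GROUPS` and the use of CRC-32 are hypothetical stand-ins.

```python
import zlib

NUM_GROUPS = 4  # hypothetical number of partition groups in the cluster


def partition_group(value, num_groups=NUM_GROUPS):
    """Map a partition-field value to a partition group id.

    Assumption: CRC-32 of the value's text form stands in for the
    database's real (undocumented) hash function.
    """
    if value is None:
        # The partition field is NOT NULL, so a NULL can never be routed.
        raise ValueError("partition field cannot be NULL")
    return zlib.crc32(str(value).encode("utf-8")) % num_groups


# Every node of the chosen group stores the row; other groups never see it.
row = {"id": 0, "info": "val1"}
print(partition_group(row["id"]))
```

Because the mapping is deterministic, the same [id] value always lands in the same group, which is what lets later statements on that value target a single group.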

Similarly, SELECT/UPDATE/DELETE statements act on a single partition group if the partition field is used in the WHERE clause. For example:

delete from mytable
where id=0 

The DQL plan:

delete 'test.mytable(xlock_row)' on '0' 
index by '100'
with  @this.[id]=0 

When the partition field cannot be used to narrow the search, the query is sent to all partition groups. For instance, this SQL:

delete from mytable
where id>0 

In the resulting DQL plan the ON keyword is missing, which means the delete will be executed on all partition groups:

delete 'test.mytable(xlock_row)'
index by '100'
with  @this.[id]>0 
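The choice between one partition group and all of them can be sketched as below. This is a simplified model, not the optimizer's actual logic: `route`, the (column, operator, value) predicate tuples and the CRC-32 hash are assumptions for illustration.

```python
import zlib

NUM_GROUPS = 4  # hypothetical cluster size


def partition_group(value, num_groups=NUM_GROUPS):
    # Stand-in hash (assumption): CRC-32 of the value's text form.
    return zlib.crc32(str(value).encode("utf-8")) % num_groups


def route(partition_field, predicates, num_groups=NUM_GROUPS):
    """Return the partition groups a statement must be sent to.

    `predicates` is a list of (column, operator, value) tuples that are
    ANDed together in the WHERE clause.
    """
    for column, op, value in predicates:
        if column == partition_field and op == "=":
            # Equality on the partition field pins the statement to one
            # group (the plan shows: on '<group>').
            return [partition_group(value, num_groups)]
    # Any other predicate (>, <, or no WHERE at all) cannot be mapped to a
    # single group, so the statement goes everywhere (no ON in the plan).
    return list(range(num_groups))


print(route("id", [("id", "=", 0)]))  # a single-element list
print(route("id", [("id", ">", 0)]))  # every group
```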

Join tables by partition field

When tables are joined on their partition fields, either in an INNER/OUTER JOIN ... ON (...) clause or in the WHERE clause, the join is executed in parallel on each involved node: each node performs the join locally and then sends the results back to the client. For example, using the previously created table [mytable]:

select t1.* from mytable t1 INNER JOIN mytable t2 ON (t1.id = t2.id) 

This produces the following DQL:

open_table 'test.mytable' to @TT1
index by '100'
project by @this.[id],@this.[info]

open_table 'test.mytable' to @TT2
index by '100'

join_inner 'TT1','TT2' to @_esults
with  @TT1.[id]=@TT2.[id]
project by @TT1.[id],@TT1.[info],@TT2.[id]

aggregate '_esults' to @Results 

A splitter step in the DQL plan would mean that the output of one table is aggregated from all nodes and then sent to every node to perform the join. Here the splitter step is missing, so the joins are executed locally in parallel and only the matching rows are sent to the client. The query nevertheless still runs on all partitions, because nothing restricts it to a single partition group.
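Why an equi-join on the partition field can run locally is easy to check with a toy model: rows with equal [id] values hash to the same group, so every matching pair already lives on one node. The sketch below is hypothetical (in-memory lists stand in for nodes, CRC-32 for the hash) and only demonstrates that the union of the local joins equals the global join.

```python
import zlib
from collections import defaultdict

NUM_GROUPS = 3  # hypothetical cluster size


def partition_group(value):
    # Stand-in for the database's hash (assumption).
    return zlib.crc32(str(value).encode("utf-8")) % NUM_GROUPS


def distribute(rows, key):
    """Place each row on the group chosen by hashing row[key]."""
    groups = defaultdict(list)
    for row in rows:
        groups[partition_group(row[key])].append(row)
    return groups


def local_equi_join(left, right, key):
    # Nested-loop join executed on one node, using only its own rows.
    return [(l, r) for l in left for r in right if l[key] == r[key]]


mytable = [{"id": i, "info": f"val{i}"} for i in range(20)]
t1 = distribute(mytable, "id")
t2 = distribute(mytable, "id")

# Each group joins its local rows in parallel; no rows cross nodes.
colocated = [pair for g in range(NUM_GROUPS)
             for pair in local_equi_join(t1[g], t2[g], "id")]

# Reference answer computed as if all rows were on one machine.
global_join = local_equi_join(mytable, mytable, "id")
print(len(colocated) == len(global_join))  # True
```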

Let's take another join:

select t1.* from mytable t1 INNER JOIN mytable t2 ON (t1.id > t2.id) 

The DQL plan:

open_table 'test.mytable' to @_T1
index by '100'
project by @this.[id],@this.[info]

aggregate '_T1' to @#T1

splitter '#T1' to @TT1

open_table 'test.mytable' to @TT2
index by '100'

join_inner 'TT1','TT2' to @_esults
with  @TT1.[id]>@TT2.[id]
project by @TT1.[id],@TT1.[info],@TT2.[id] 

aggregate '_esults' to @Results 

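Even though both t1.id and t2.id are partition fields, the condition is not an equality, so the local join optimization cannot be applied. The plan now contains a splitter step: the rows of t1 are aggregated from all nodes and sent to every node to perform the join, which makes the join much less efficient.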
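The splitter strategy can be modeled the same way. In this hypothetical sketch the rows of t1 are first gathered from every group (the aggregate step), then a full copy is sent to each group (the splitter step), where it is joined with the local t2 rows. The result is correct, but every node now receives all of t1, which is where the extra cost comes from.

```python
import zlib
from collections import defaultdict

NUM_GROUPS = 3  # hypothetical cluster size


def partition_group(value):
    # Stand-in for the database's hash (assumption).
    return zlib.crc32(str(value).encode("utf-8")) % NUM_GROUPS


def distribute(rows, key):
    groups = defaultdict(list)
    for row in rows:
        groups[partition_group(row[key])].append(row)
    return groups


mytable = [{"id": i} for i in range(10)]
t1 = distribute(mytable, "id")
t2 = distribute(mytable, "id")

# aggregate: collect all t1 rows from every group into one set.
gathered_t1 = [row for g in range(NUM_GROUPS) for row in t1[g]]

rows_shipped = 0
results = []
for g in range(NUM_GROUPS):
    # splitter: every group receives a full copy of t1.
    rows_shipped += len(gathered_t1)
    # Local non-equi join against the rows of t2 stored on this group.
    results += [(a, b) for a in gathered_t1 for b in t2[g] if a["id"] > b["id"]]

print(len(results), rows_shipped)  # 45 30
```

With ten rows and three groups, thirty t1 rows travel over the network to produce the join; the colocated equi-join in the previous case ships none.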

Partition by parent/child relation

This is used when a parent table is partitioned on one column and linked to child table(s) that are partitioned on the parent's auto-identity column.

For example, execute the following SQL:

create database test_hierarchy_dist; 
go; 
use test_hierarchy_dist; 
go; 
create table t3
(
name char not null primary key partition,
id autobigint not null unique 
); 
go; 
create table t3_1
(
id bigint not null partition,
info char,
uid autobigint not null primary key,
constraint sk foreign key (id) references t3(id)
); 
go; 
create table t3_1_1
(
id bigint not null partition,
info2 char,
uid autobigint not null primary key,
constraint sk foreign key (id) references t3_1(uid)) 

In the example above, table [t3] is partitioned by the column [name]. The [t3] AutoBigInt column [id] generates unique values automatically and, additionally, encodes the hash value of the partition column. Therefore, the child table [t3_1] is effectively partitioned by t3(name) as well, even though it is declared as partitioned by its [id] field, which is linked to the t3(id) AutoBigInt field. Note: the foreign key constraint must be present, because it tells the optimizer that this is a parent/child relation.
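One way to picture an AutoBigInt value that "encodes the hash value" is to reserve some bits of the 64-bit integer for the partition group. The layout below (group id in the bits above position 48, a per-group counter below) is purely an assumption for illustration; the database's real encoding is not documented here. It shows why a child row routed by the parent's AutoBigInt value lands in the parent's group.

```python
import zlib
from collections import defaultdict

NUM_GROUPS = 4    # hypothetical cluster size
GROUP_SHIFT = 48  # hypothetical: bits above 48 carry the partition group


def partition_group(value):
    # Stand-in hash for ordinary partition fields such as t3.name (assumption).
    return zlib.crc32(str(value).encode("utf-8")) % NUM_GROUPS


class AutoBigInt:
    """Toy generator: unique ids that also carry a partition group."""

    def __init__(self):
        self.counters = defaultdict(int)

    def next_for_group(self, group):
        self.counters[group] += 1
        return (group << GROUP_SHIFT) | self.counters[group]


def group_of_auto_id(value):
    # Routing a child row: decode the group instead of re-hashing the value.
    return value >> GROUP_SHIFT


t3_ids = AutoBigInt()
name = "m1"
parent_group = partition_group(name)             # t3 is partitioned by name
parent_id = t3_ids.next_for_group(parent_group)  # value of t3.id for this row

# t3_1 is declared PARTITION on id, which holds the parent's AutoBigInt value.
child_group = group_of_auto_id(parent_id)
print(parent_group == child_group)  # True
```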

Next, execute the following script to populate the tables:

use test_hierarchy_dist; 
go; 
declare @i int, @name char, @r1 bigint, @r2 bigint  
begin tran 
set @i = 0 
while @i < 10000 
begin 
  set @name = concat('m',@i) 
  insert into t3(name) values(@name) 

  set @r1 = scope_identity() 
  insert into t3_1(id,info) values(@r1,concat('v',@r1)) 

  set @r2 = scope_identity() 
  insert into t3_1_1(id,info2) values(@r2,concat('z',@r2)) 
  insert into t3_1_1(id,info2) values(@r2,concat('z',@r2)) 

  set @i = @i +1 
end 
commit 

Next, check (with "show execution plan") whether the following query will be executed on a single node only; the partition is computed from the name = value condition in the WHERE clause.

use test_hierarchy_dist; 
go; 
select t3.* from t3 join t3_1 on (t3.id = t3_1.id) join t3_1_1 on (t3_1.uid = t3_1_1.id)
where t3.name = 'm1' 

The DQL:

open_table 'test_hierarchy_dist.t3' on '0'  to @TT1
index by '100'
with  @this.[name]='m1'
project by @this.[name],@this.[id]

open_table 'test_hierarchy_dist.t3_1' to @TT2
index by '101'

join_inner 'TT1','TT2' on '0'  to @TT3
with  @TT1.[id]=@TT2.[id]
project by @TT1.[name],@TT1.[id],@TT2.[uid],@TT2.[id] as u1

open_table 'test_hierarchy_dist.t3_1_1' to @TT4
index by '101'

join_inner 'TT3','TT4' on '0'  to @Results
with  @TT3.[uid]=@TT4.[id]
project by @TT3.[name],@TT3.[id],@TT3.[uid] 

We can see ON '0' on @TT1, @TT3 and @Results, and there is no splitter step, which means all commands will be executed on partition group 0. Note: @TT2 and @TT4 are the inner tables of the joins (the right-hand parameters of join_inner), so they play no role in the distribution decision and have no ON clause.
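When reviewing many plans, the two checks used above (an ON target and no splitter step) can be automated. The helper below is a hypothetical sketch that works only on plan text as printed by "show execution plan"; it is not a database API, and it simply looks for `on '<n>'` and `splitter` tokens.

```python
import re

ON_TARGET = re.compile(r"\bon '(\d+)'")


def single_partition_target(plan_text):
    """Return the partition group if the whole plan runs on one group.

    Returns None when the plan contains a splitter step, or when the
    ON targets are missing or disagree.
    """
    if re.search(r"^\s*splitter\b", plan_text, re.MULTILINE):
        return None
    targets = set(ON_TARGET.findall(plan_text))
    return int(targets.pop()) if len(targets) == 1 else None


plan = """open_table 'test_hierarchy_dist.t3' on '0'  to @TT1
index by '100'
with  @this.[name]='m1'
open_table 'test_hierarchy_dist.t3_1' to @TT2
join_inner 'TT1','TT2' on '0'  to @TT3
join_inner 'TT3','TT4' on '0'  to @Results"""

print(single_partition_target(plan))  # 0
```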

Replicated tables

Replicated tables are useful for tables that are rarely modified and usually describe some static entity referenced by its id field, for example product details by id, or OLAP dimension tables for a fact table. Since they are replicated to all nodes, every field is treated as a partition field. To define a table as replicated, do not set a PARTITION field when creating it. For instance, consider the following two tables:

create table products -- replicated table
(
product_id autobigint not null primary key,
name char
)
go;
create table orders -- partitioned by [user_id]
(
order_id autobigint not null primary key,
user_id bigint not null PARTITION,
product_id bigint not null,
ammount int,
constraint [user] index(user_id),
constraint product foreign key (product_id) references products(product_id)
) 

The following SQL query retrieves all orders for a user. Since [user_id] is the partition field and the [products] table is replicated, the query is executed on a single node.

select orders.*, products.product_id
from orders
JOIN products on (orders.product_id = products.product_id)
where user_id = 0

DQL:

open_table 'test_hierarchy_dist.orders' on '0'  to @TT1
index by '101'
with  @this.[user_id]=0
project by @this.[order_id],@this.[user_id],@this.[product_id],@this.[ammount]

open_table 'test_hierarchy_dist.products' to @TT2
index by '100'

join_inner 'TT1','TT2' on '0'  to @Results
with  @TT1.[product_id]=@TT2.[product_id]
project by @TT1.[order_id] rid,@TT1.[user_id],@TT1.[product_id],@TT1.[ammount],@TT2.[product_id] rid 
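A toy model makes the replicated-table rule concrete: orders are placed by hashing [user_id], while every group holds a full copy of products, so the group that owns user 0 can answer the join on its own. The data, cluster size and CRC-32 hash are hypothetical.

```python
import zlib

NUM_GROUPS = 3  # hypothetical cluster size


def partition_group(value):
    # Stand-in for the database's hash (assumption).
    return zlib.crc32(str(value).encode("utf-8")) % NUM_GROUPS


products = [{"product_id": p, "name": f"product-{p}"} for p in range(5)]
orders = [{"order_id": o, "user_id": o % 4, "product_id": o % 5, "ammount": 1}
          for o in range(40)]

# Replicated table: each group gets every product row.
nodes = {g: {"products": list(products), "orders": []} for g in range(NUM_GROUPS)}
# Partitioned table: each order lives only on the group of its user_id.
for order in orders:
    nodes[partition_group(order["user_id"])]["orders"].append(order)

# WHERE user_id = 0 pins the query to a single group...
target = nodes[partition_group(0)]
by_id = {p["product_id"]: p for p in target["products"]}
# ...and the join needs no other node, because all products are local.
result = [(o, by_id[o["product_id"]]) for o in target["orders"]
          if o["user_id"] == 0]
print(len(result))  # 10
```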

Shard procedure execution

It is possible to control on which node a stored procedure is executed. Based on the hash value of an input parameter, the procedure is sent to the corresponding node for execution. This is comparable to simple sharding, where queries are executed on a target shard chosen by some parameter. Note: if the distribution of the procedure's queries conflicts with the executing node (computed from the input parameter), a database error is thrown.

For example, create a stored procedure in which the hash value of the parameter [@val1] defines the target node/partition:

create procedure test (@val1 int PARTITION, @val2 int OUTPUT)
as
begin
set @val2 = @val1
select @val1
end 

The value of @v1, passed as @val1, then determines the node on which the procedure executes:

declare @v1 int = 10, @v2 int = 0
exec test @v1,@v2 output
select @v2 
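The routing and the conflict rule can be sketched as follows. `call_procedure`, its `queries` argument, `DistributionConflict` and the CRC-32 hash are hypothetical stand-ins: the executing group is picked from the PARTITION argument, and any query in the procedure body that targets a different partition group raises an error, mirroring the note above.

```python
import zlib

NUM_GROUPS = 4  # hypothetical cluster size


def partition_group(value):
    # Stand-in for the database's hash (assumption).
    return zlib.crc32(str(value).encode("utf-8")) % NUM_GROUPS


class DistributionConflict(Exception):
    """Raised when a query in the procedure needs a different partition."""


def call_procedure(partition_arg, queries):
    """Pick the executing group from the PARTITION argument.

    `queries` lists the partition-field value each query in the body is
    pinned to (None for queries that touch no partitioned data).
    """
    node = partition_group(partition_arg)
    for value in queries:
        if value is not None and partition_group(value) != node:
            raise DistributionConflict(
                f"query for value {value!r} does not belong to group {node}")
    return node


print(call_procedure(10, [None, 10]))  # group chosen by @val1 = 10
```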

Round robin partitioning

Round-robin partitioning makes sense when a table has no logical partition attribute, e.g. logging tables or fact tables.
To enable round-robin partitioning, set PARTITION on a field of the AUTOBIGINT data type.

For example, create a round-robin table:

create table test_round_robin
(
id autobigint not null primary key partition,
myfield int,
info text
) 

Next, insert 1000 rows, which will be distributed uniformly across the partitions:

declare @i int = 0
begin tran
while @i < 1000
begin
insert into test_round_robin(myfield,info) values(@i,concat('text-',@i))
set @i = @i+1
end
commit
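A plausible way to picture why an AUTOBIGINT partition field yields round-robin placement: if the generator hands out values whose partition group cycles 0, 1, 2, ..., consecutive inserts land on successive groups. The generator below, including its bit layout, is an assumption for illustration, not the database's documented algorithm; it simply shows that 1000 inserts end up evenly spread.

```python
from collections import Counter
from itertools import count

NUM_GROUPS = 4    # hypothetical cluster size
GROUP_SHIFT = 48  # hypothetical: bits above 48 of the id hold the group


class RoundRobinAutoBigInt:
    """Toy AUTOBIGINT generator that cycles through partition groups."""

    def __init__(self, num_groups=NUM_GROUPS):
        self.num_groups = num_groups
        self.seq = count(1)

    def next_id(self):
        n = next(self.seq)
        group = n % self.num_groups        # next group in the cycle
        return (group << GROUP_SHIFT) | n  # unique id that carries the group


gen = RoundRobinAutoBigInt()
placement = Counter(gen.next_id() >> GROUP_SHIFT for _ in range(1000))
print(sorted(placement.items()))  # [(0, 250), (1, 250), (2, 250), (3, 250)]
```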