CarbonData 预聚合 DataMap

简单示例

下载并解压 spark-2.2.0-bin-hadoop2.7.tgz, 并且设置好 $SPARK_HOME 。

打包好 carbon jar, 并将 assembly/target/scala-2.11/carbondata_2.11-1.3.0-SNAPSHOT-shade-hadoop2.7.2.jar 文件复制到 $SPARK_HOME/jars 目录下。

mvn clean package -DskipTests -Pspark-2.2

在新的终端启动 spark-shell ,在里面输入 :paste, 将下面的代码复制到里面并运行:

 import java.io.File
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
 import org.apache.spark.sql.CarbonSession._
 import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
 import org.apache.carbondata.core.util.path.CarbonStorePath

 val warehouse = new File("./warehouse").getCanonicalPath
 val metastore = new File("./metastore").getCanonicalPath

 val spark = SparkSession
   .builder()
   .master("local")
   .appName("preAggregateExample")
   .config("spark.sql.warehouse.dir", warehouse)
   .getOrCreateCarbonSession(warehouse, metastore)

 spark.sparkContext.setLogLevel("ERROR")

 // drop table if exists previously
 spark.sql(s"DROP TABLE IF EXISTS sales")

 // Create main table
 spark.sql(
   s"""
      | CREATE TABLE sales (
      | user_id string,
      | country string,
      | quantity int,
      | price bigint)
      | STORED BY 'carbondata'
    """.stripMargin)

 // Create pre-aggregate table on the main table
 // If main table already have data, following command 
 // will trigger one immediate load to the pre-aggregate table
 spark.sql(
   s"""
      | CREATE DATAMAP agg_sales
      | ON TABLE sales
      | USING "preaggregate"
      | AS
      | SELECT country, sum(quantity), avg(price)
      | FROM sales
      | GROUP BY country
    """.stripMargin)

  import spark.implicits._
  import org.apache.spark.sql.SaveMode
  import scala.util.Random

  // Load data to the main table, it will also
  // trigger immediate load to pre-aggregate table.
  // These two loading operation is carried out in a
  // transactional manner, meaning that the whole 
  // operation will fail if one of the loading fails
  val r = new Random()
  spark.sparkContext.parallelize(1 to 10)
   .map(x => ("ID." + r.nextInt(100000), "country" + x % 8, x % 50, x % 60))
   .toDF("user_id", "country", "quantity", "price")
   .write
   .format("carbondata")
   .option("tableName", "sales")
   .option("compress", "true")
   .mode(SaveMode.Append)
   .save()

  spark.sql(
    s"""
       |SELECT country, sum(quantity), avg(price)
       | from sales GROUP BY country
     """.stripMargin).show

  spark.stop

DataMap 管理

可以通过下面 DDL 创建 DataMap

CREATE DATAMAP [IF NOT EXISTS] datamap_name
ON TABLE main_table
USING "datamap_provider"
DMPROPERTIES ('key'='value', ...)
AS
  SELECT statement

USING 之后的字符串称为 DataMap Provider,这个版本的 CarbonData 支持以下两种 DataMap:

  1. preaggregate, 针对预聚合表. 这个 DataMap 不需要 DMPROPERTY。
  2. timeseries, 针对时间序列表. 请访问 时间序列 DataMap

可以使用下面 DDL 删除 DataMap

DROP DATAMAP [IF EXISTS] datamap_name
ON TABLE main_table

使用下面命令显示所有已经创建的 DataMaps:

SHOW DATAMAP
ON TABLE main_table

上面命令将显示出 main_table 上创建的所有 DataMaps。

预聚合 DataMap 介绍

预聚合表以 DataMaps 形式创建,在内部由 CarbonData 作为表进行管理。 只要存储要求和加载速度可以接受,用户可以创建许多预聚合 datamaps 以提高查询性能。

预聚合 datamaps 一旦被创建,CarbonData 的 SparkSQL 优化器将选择最有效的预聚合 datamaps,并重写 SQL 直接从已选定的 datamap 查询而不查询主表。 因为预聚合 datamaps 的数据大小比较小,所有用户的查询非常快。根据我们之前的经验,生产线上的 SQL 速度提高了 5 倍到 100 倍

比如,主表称为 sales ,它的定义如下:

CREATE TABLE sales (
  order_time timestamp,
  user_id string,
  sex string,
  country string,
  quantity int,
  price bigint)
STORED BY 'carbondata'

用户可以使用 Create DataMap DDL 来创建预聚合表:

CREATE DATAMAP agg_sales
ON TABLE sales
USING "preaggregate"
AS
  SELECT country, sex, sum(quantity), avg(price)
  FROM sales
  GROUP BY country, sex

预聚合表支持的函数

Function 是否支持 Rollup
SUM Yes
AVG Yes
MAX Yes
MIN Yes
COUNT Yes

预聚合表是如何被选择的

当用户提交查询时,在查询计划阶段,CarbonData 将根据关系代数(Relational Algebra)转换规则收集所有匹配的预聚合表作为候选者。然后,根据成本计算从候选表中选择用于此查询的最佳预聚合表。 为了简单起见,当前成本估算是基于预聚合表数据大小(我们认为在小表上查询速度会更快)。

对于上面创建的主表 sales 以及预聚合表 agg_sales ,下面查询

SELECT country, sex, sum(quantity), avg(price) from sales GROUP BY country, sex

SELECT sex, sum(quantity) from sales GROUP BY sex

SELECT avg(price), country from sales GROUP BY country

将会被 CarbonData 的查询计划转换成查询预聚合表 agg_sales 而不是主表 sales

然而下面的查询

SELECT user_id, country, sex, sum(quantity), avg(price) from sales GROUP BY user_id, country, sex

SELECT sex, avg(quantity) from sales GROUP BY sex

SELECT country, max(price) from sales GROUP BY country

将只能在主表 sales 上查询,因为它不满足预聚合表的查询逻辑。

数据加载

对已经加载数据的现有表,当用户创建预聚合表时,将数据加载到预聚合表由 CREATE DATAMAP 语句触发。 对于预聚合表创建之后增量数据的加载,在主表加载数据完成之后,主表会触发将数据加载到预聚合表中。

这些加载操作时事务性的,意味着只有所有表的数据加载成功,主表以及预聚合表的数据才对用户可见。如果其中一个加载出现失败,新的数据不会在所有的表中可见,就好像没有进行过加载操作一样。

数据查询

作为查询加速技术,我们不能直接查询预聚合表,查询将在主表上进行。 在执行查询计划时, CarbonData 内部将检查与主表关联的预聚合表,并相应地执行查询计划转换。

用户可以通过执行 EXPLAIN 命令来验证查询是否可以利用预聚合表,这将显示转换的逻辑计划,因此用户可以检查是否选择了预聚合表。

压缩预聚合表

在主表上运行压缩命令(ALTER TABLE COMPACT不会自动压缩在主表上创建的预聚合表。 用户需要在每个预聚合表上分别运行压缩命令来压缩它们。

预聚合表的压缩是可选的操作。如果在主表上执行压缩但未在预聚合表上执行压缩,所有的查询仍可受益于预聚合表。 为了进一步提高查询性能,对预聚合表的压缩会触发预聚合表中的段和文件的合并。

预聚合表的数据管理

在当前实现中,主表和预聚合表都需要维护数据的一致性。在主表创建预聚合表之后,不支持在主表上执行以下命令:

  1. 数据管理命令: UPDATE/DELETE/DELETE SEGMENT.
  2. 模式管理命令: ALTER TABLE DROP COLUMN, ALTER TABLE CHANGE DATATYPE, ALTER TABLE RENAME. 注意,添加新列是支持的;而删除列或者更新列的数据类型,CarbonData 将会检测其是否会影响预聚合表,如果不会,则操作允许进行;否则操作将会被拒绝,并且会抛出异常。
  3. 分区管理命令: ALTER TABLE ADD/DROP PARTITION

但是,仍然有办法在主表上执行这些操作,在当前 CarbonData 版本中,用户可以执行如下操作:

  1. 通过 DROP DATAMAP 命令删除预聚合表。
  2. 在主表上执行数据管理操作
  3. 通过 CREATE DATAMAP 命令创建预聚合表。基本上,用户可以手动触发来重新构建 datamap。