CarbonData 流式数据摄取

简单示例

下载并解压 spark-2.2.0-bin-hadoop2.7.tgz, 并且设置好 $SPARK_HOME 。

打包好 carbon jar, 并将 assembly/target/scala-2.11/carbondata_2.11-1.3.0-SNAPSHOT-shade-hadoop2.7.2.jar 文件复制到 $SPARK_HOME/jars 目录下。

mvn clean package -DskipTests -Pspark-2.2

在终端启动套接字数据服务器

 nc -lk 9099

在里面输入以下格式的 CSV 行数据

1,col1
2,col2
3,col3
4,col4
5,col5

在新的终端启动 spark-shell ,在里面输入 :paste, 将下面的代码复制到里面并运行:

 import java.io.File
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
 import org.apache.spark.sql.CarbonSession._
 import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
 import org.apache.carbondata.core.util.path.CarbonTablePath

 val warehouse = new File("./warehouse").getCanonicalPath
 val metastore = new File("./metastore").getCanonicalPath

 val spark = SparkSession
   .builder()
   .master("local")
   .appName("StreamExample")
   .config("spark.sql.warehouse.dir", warehouse)
   .getOrCreateCarbonSession(warehouse, metastore)

 spark.sparkContext.setLogLevel("ERROR")

 // drop table if exists previously
 spark.sql(s"DROP TABLE IF EXISTS carbon_table")
 // Create target carbon table and populate with initial data
 spark.sql(
   s"""
      | CREATE TABLE carbon_table (
      | col1 INT,
      | col2 STRING
      | )
      | STORED BY 'carbondata'
      | TBLPROPERTIES('streaming'='true')""".stripMargin)

 val carbonTable = CarbonEnv.getCarbonTable(Some("default"), "carbon_table")(spark)
 val tablePath = carbonTable.getTablePath

 // batch load
 var qry: StreamingQuery = null
 val readSocketDF = spark.readStream
   .format("socket")
   .option("host", "localhost")
   .option("port", 9099)
   .load()

 // Write data from socket stream to carbondata file
 qry = readSocketDF.writeStream
   .format("carbondata")
   .trigger(ProcessingTime("5 seconds"))
   .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(tablePath))
   .option("dbName", "default")
   .option("tableName", "carbon_table")
   .start()

 // start new thread to show data
 new Thread() {
   override def run(): Unit = {
     do {
       spark.sql("select * from carbon_table").show(false)
       Thread.sleep(10000)
     } while (true)
   }
 }.start()

 qry.awaitTermination()

继续在数据服务器中输入一些行数据,spark-shell 将会显示表的新数据。

创建具有 streaming 属性的表

流式表(Streaming table)只是一个普通的具有流式属性的 carbon 表,用户可以使用下面的 DDL 创建流式表。

 CREATE TABLE streaming_table (
  col1 INT,
  col2 STRING
 )
 STORED BY 'carbondata'
 TBLPROPERTIES('streaming'='true')
属性名称 默认值 描述
streaming false 是否为此表启用流式摄取功能
值范围: true, false

"DESC FORMATTED" 命令将会显示流式属性。

DESC FORMATTED streaming_table

修改流式属性

对于旧表,使用 ALTER TABLE 命令来设置流式属性。

ALTER TABLE streaming_table SET TBLPROPERTIES('streaming'='true')

获取流式锁

在流式数据摄取开始时,系统将尝试获取名为 streaming.lock 的表级锁文件。如果系统无法获取这张表的锁,将会抛出 InterruptedException。

创建流式段(streaming segment)

流式输入的数据将会被摄入到 CarbonData 表的 segment 中,这个 segment 的状态是 streaming。在 CarbonData 中称其为流式段(streaming segment)。tablestatus 文件将会记录段的状态和数据大小。用户可以使用 SHOW SEGMENTS FOR TABLE tableName 来检查段状态。

当流式段达到了最大大小,CarbonData 将段的状态从 streaming 修改成 streaming finish,并且创建一个新的流式段以便继续摄取流式数据。

属性 默认值 描述
carbon.streaming.segment.max.size 1024000000 单位: byte
流式段的最大大小。
段状态 描述
streaming 当前段正在摄取流式数据。
streaming finish 当前段已经完成流式数据的摄取,
它将以列式格式传递给另一个段(it will be handed off to a segment in the columnar format)

修改段状态

使用下面命令将段的状态从 "streaming" 转变成 "streaming finish"。如果流式应用程序正在运行,则此命令将被阻塞。

ALTER TABLE streaming_table FINISH STREAMING

将 "streaming finish" 段传递给列式段

使用下面命令手动将 "streaming finish" 段传递给列式段

ALTER TABLE streaming_table COMPACT 'streaming'

自动传递流式段

配置 carbon.streaming.auto.handoff.enabled 属性来自动传递流式段。如果这个属性的值为 true,当流式段达到最大大小,CarbonData 会将这个段的状态置为 streaming finish,并且在一个新线程自动触发将此段递交到一个列式格式的段。

属性名称 默认值 描述
carbon.streaming.auto.handoff.enabled true 是否自动触发递交操作

流数据解析器

通过 carbon.stream.parser 参数配置流数据解析器,以便在写流数据是将 InternalRow 转换成 Object[]。

属性名称 默认值 描述
carbon.stream.parser org.apache.carbondata.streaming.parser.CSVStreamParserImp 流数据解析器的类名

当前, CarbonData 支持以下两种解析器:

1. org.apache.carbondata.streaming.parser.CSVStreamParserImp: 这是默认的流数据解析器,它从 InternalRow 的第一个索引获取行数据(字符串类型)并将此字符串转换为 Object []。

2. org.apache.carbondata.streaming.parser.RowStreamParserImp: 这个流解析器自动根据 DataSet 的模式将 InternalRow 转换成 Object[], 比如:

 case class FileElement(school: Array[String], age: Int)
 case class StreamData(id: Int, name: String, city: String, salary: Float, file: FileElement)
 ...

 var qry: StreamingQuery = null
 val readSocketDF = spark.readStream
   .format("socket")
   .option("host", "localhost")
   .option("port", 9099)
   .load()
   .as[String]
   .map(_.split(","))
   .map { fields => {
     val tmp = fields(4).split("\\$")
     val file = FileElement(tmp(0).split(":"), tmp(1).toInt)
     StreamData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat, file)
   } }

 // Write data from socket stream to carbondata file
 qry = readSocketDF.writeStream
   .format("carbondata")
   .trigger(ProcessingTime("5 seconds"))
   .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
   .option("dbName", "default")
   .option("tableName", "carbon_table")
   .option(CarbonStreamParser.CARBON_STREAM_PARSER,
     CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
   .start()

 ...

如何实现自定义流解析器

如果用户需要自定义流解析器来将特定的 InternalRow 转换成 Object[],他需要实现 CarbonStreamParser 接口的 initializeparserRow 方法,比如:

 package org.XXX.XXX.streaming.parser

 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types.StructType

 class XXXStreamParserImp extends CarbonStreamParser {

   override def initialize(configuration: Configuration, structType: StructType): Unit = {
     // user can get the properties from "configuration"
   }

   override def parserRow(value: InternalRow): Array[Object] = {
     // convert InternalRow to Object[](Array[Object] in Scala) 
   }

   override def close(): Unit = {
   }
 }
   

然后将 carbon.stream.parser 属性设置成 org.XXX.XXX.streaming.parser.XXXStreamParserImp。

关闭流式表

使用下面命令将所有流式段递交给列式格式的段,并且将流属性设置为 false,这张表将会变成一张普通的表。

ALTER TABLE streaming_table COMPACT 'close_streaming'

限制

  1. 请不要将流属性从 true 设置成 false。
  2. 请不要在流式表上执行 UPDATE/DELETE 命令。
  3. 请不要在流式表上建立预聚合的 DataMap 。
  4. 请不要在有预聚合 DataMap 的表上添加流属性。
  5. 如果表包含字典列,则不支持并发数据加载。
  6. 在流数据摄取过程中,删除流式段会被阻止。
  7. 在流数据摄取过程中,删除流式表会被阻止。