下载并解压 spark-2.2.0-bin-hadoop2.7.tgz, 并且设置好 $SPARK_HOME 。
打包好 carbon jar, 并将 assembly/target/scala-2.11/carbondata_2.11-1.3.0-SNAPSHOT-shade-hadoop2.7.2.jar 文件复制到 $SPARK_HOME/jars 目录下。
mvn clean package -DskipTests -Pspark-2.2
在终端启动套接字数据服务器
nc -lk 9099
在里面输入以下格式的 CSV 行数据
1,col1
2,col2
3,col3
4,col4
5,col5
在新的终端启动 spark-shell ,在里面输入 :paste
, 将下面的代码复制到里面并运行:
import java.io.File
import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.CarbonSession._
import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
import org.apache.carbondata.core.util.path.CarbonTablePath
val warehouse = new File("./warehouse").getCanonicalPath
val metastore = new File("./metastore").getCanonicalPath
val spark = SparkSession
.builder()
.master("local")
.appName("StreamExample")
.config("spark.sql.warehouse.dir", warehouse)
.getOrCreateCarbonSession(warehouse, metastore)
spark.sparkContext.setLogLevel("ERROR")
// drop table if exists previously
spark.sql(s"DROP TABLE IF EXISTS carbon_table")
// Create target carbon table and populate with initial data
spark.sql(
s"""
| CREATE TABLE carbon_table (
| col1 INT,
| col2 STRING
| )
| STORED BY 'carbondata'
| TBLPROPERTIES('streaming'='true')""".stripMargin)
val carbonTable = CarbonEnv.getCarbonTable(Some("default"), "carbon_table")(spark)
val tablePath = carbonTable.getTablePath
// batch load
var qry: StreamingQuery = null
val readSocketDF = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9099)
.load()
// Write data from socket stream to carbondata file
qry = readSocketDF.writeStream
.format("carbondata")
.trigger(ProcessingTime("5 seconds"))
.option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(tablePath))
.option("dbName", "default")
.option("tableName", "carbon_table")
.start()
// start new thread to show data
new Thread() {
override def run(): Unit = {
do {
spark.sql("select * from carbon_table").show(false)
Thread.sleep(10000)
} while (true)
}
}.start()
qry.awaitTermination()
继续在数据服务器中输入一些行数据,spark-shell 将会显示表的新数据。
流式表(Streaming table)只是一个普通的具有流式属性的 carbon 表,用户可以使用下面的 DDL 创建流式表。
CREATE TABLE streaming_table (
col1 INT,
col2 STRING
)
STORED BY 'carbondata'
TBLPROPERTIES('streaming'='true')
属性名称 | 默认值 | 描述 |
---|---|---|
streaming | false | 是否为此表启用流式摄取功能 值范围: true, false |
"DESC FORMATTED" 命令将会显示流式属性。
DESC FORMATTED streaming_table
对于旧表,使用 ALTER TABLE 命令来设置流式属性。
ALTER TABLE streaming_table SET TBLPROPERTIES('streaming'='true')
在流式数据摄取开始时,系统将尝试获取名为 streaming.lock 的表级锁文件。如果系统无法获取这张表的锁,将会抛出 InterruptedException。
流式输入的数据将会被摄入到 CarbonData 表的 segment 中,这个 segment 的状态是 streaming。在 CarbonData 中称其为流式段(streaming segment)。tablestatus 文件将会记录段的状态和数据大小。用户可以使用 SHOW SEGMENTS FOR TABLE tableName 来检查段状态。
当流式段达到了最大大小,CarbonData 将段的状态从 streaming 修改成 streaming finish,并且创建一个新的流式段以便继续摄取流式数据。
属性 | 默认值 | 描述 |
---|---|---|
carbon.streaming.segment.max.size | 1024000000 | 单位: byte 流式段的最大大小。 |
段状态 | 描述 |
---|---|
streaming | 当前段正在摄取流式数据。 |
streaming finish | 当前段已经完成流式数据的摄取, 它将以列式格式传递给另一个段(it will be handed off to a segment in the columnar format) |
使用下面命令将段的状态从 "streaming" 转变成 "streaming finish"。如果流式应用程序正在运行,则此命令将被阻塞。
ALTER TABLE streaming_table FINISH STREAMING
使用下面命令手动将 "streaming finish" 段传递给列式段
ALTER TABLE streaming_table COMPACT 'streaming'
配置 carbon.streaming.auto.handoff.enabled 属性来自动传递流式段。如果这个属性的值为 true,当流式段达到最大大小,CarbonData 会将这个段的状态置为 streaming finish,并且在一个新线程自动触发将此段递交到一个列式格式的段。
属性名称 | 默认值 | 描述 |
---|---|---|
carbon.streaming.auto.handoff.enabled | true | 是否自动触发递交操作 |
通过 carbon.stream.parser 参数配置流数据解析器,以便在写流数据是将 InternalRow 转换成 Object[]。
属性名称 | 默认值 | 描述 |
---|---|---|
carbon.stream.parser | org.apache.carbondata.streaming.parser.CSVStreamParserImp | 流数据解析器的类名 |
当前, CarbonData 支持以下两种解析器:
1. org.apache.carbondata.streaming.parser.CSVStreamParserImp: 这是默认的流数据解析器,它从 InternalRow 的第一个索引获取行数据(字符串类型)并将此字符串转换为 Object []。
2. org.apache.carbondata.streaming.parser.RowStreamParserImp: 这个流解析器自动根据 DataSet
的模式将 InternalRow 转换成 Object[], 比如:
case class FileElement(school: Array[String], age: Int)
case class StreamData(id: Int, name: String, city: String, salary: Float, file: FileElement)
...
var qry: StreamingQuery = null
val readSocketDF = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9099)
.load()
.as[String]
.map(_.split(","))
.map { fields => {
val tmp = fields(4).split("\\$")
val file = FileElement(tmp(0).split(":"), tmp(1).toInt)
StreamData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat, file)
} }
// Write data from socket stream to carbondata file
qry = readSocketDF.writeStream
.format("carbondata")
.trigger(ProcessingTime("5 seconds"))
.option("checkpointLocation", tablePath.getStreamingCheckpointDir)
.option("dbName", "default")
.option("tableName", "carbon_table")
.option(CarbonStreamParser.CARBON_STREAM_PARSER,
CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
.start()
...
如果用户需要自定义流解析器来将特定的 InternalRow 转换成 Object[],他需要实现 CarbonStreamParser
接口的 initialize
和 parserRow
方法,比如:
package org.XXX.XXX.streaming.parser
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType
class XXXStreamParserImp extends CarbonStreamParser {
override def initialize(configuration: Configuration, structType: StructType): Unit = {
// user can get the properties from "configuration"
}
override def parserRow(value: InternalRow): Array[Object] = {
// convert InternalRow to Object[](Array[Object] in Scala)
}
override def close(): Unit = {
}
}
然后将 carbon.stream.parser 属性设置成 org.XXX.XXX.streaming.parser.XXXStreamParserImp。
使用下面命令将所有流式段递交给列式格式的段,并且将流属性设置为 false,这张表将会变成一张普通的表。
ALTER TABLE streaming_table COMPACT 'close_streaming'