A Look at How a Flink Table Schema Is Defined
Published: 2019-06-23

This post looks at how a Table Schema is defined in Flink, using the descriptor API shipped in flink-table 1.7.0.

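Examples

Defining fields and types

.withSchema(
  new Schema()
    .field("MyField1", Types.INT)     // required: specify the fields of the table (in this order)
    .field("MyField2", Types.STRING)
    .field("MyField3", Types.BOOLEAN)
)
  • field defines the name and type of each field; the fields appear in the table in the order they are declared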

Defining field attributes

.withSchema(
  new Schema()
    .field("MyField1", Types.SQL_TIMESTAMP)
      .proctime()      // optional: declares this field as a processing-time attribute
    .field("MyField2", Types.SQL_TIMESTAMP)
      .rowtime(...)    // optional: declares this field as an event-time attribute
    .field("MyField3", Types.BOOLEAN)
      .from("mf3")     // optional: original field in the input that is referenced/aliased by this field
)
  • proctime declares a processing-time attribute, rowtime declares an event-time attribute, and from names the input field that this field references or aliases

Defining Rowtime attributes

// Converts an existing LONG or SQL_TIMESTAMP field in the input into the rowtime attribute.
.rowtime(
  new Rowtime()
    .timestampsFromField("ts_field")    // required: original field name in the input
)

// Converts the assigned timestamps from a DataStream API record into the rowtime attribute
// and thus preserves the assigned timestamps from the source.
// This requires a source that assigns timestamps (e.g., Kafka 0.10+).
.rowtime(
  new Rowtime()
    .timestampsFromSource()
)

// Sets a custom timestamp extractor to be used for the rowtime attribute.
// The extractor must extend `org.apache.flink.table.sources.tsextractors.TimestampExtractor`.
.rowtime(
  new Rowtime()
    .timestampsFromExtractor(...)
)
  • timestampsFromField, timestampsFromSource, and timestampsFromExtractor select where the rowtime timestamps come from

Defining watermark strategies

// Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
// observed timestamp so far minus 1. Rows that have a timestamp equal to the max timestamp
// are not late.
.rowtime(
  new Rowtime()
    .watermarksPeriodicAscending()
)

// Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded time interval.
// Emits watermarks which are the maximum observed timestamp minus the specified delay.
.rowtime(
  new Rowtime()
    .watermarksPeriodicBounded(2000)    // delay in milliseconds
)

// Sets a built-in watermark strategy which indicates the watermarks should be preserved from the
// underlying DataStream API and thus preserves the assigned watermarks from the source.
.rowtime(
  new Rowtime()
    .watermarksFromSource()
)
  • watermarksPeriodicAscending, watermarksPeriodicBounded, and watermarksFromSource select the watermark strategy
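
The arithmetic described in the comments above (maximum observed timestamp minus 1 for ascending timestamps, minus the configured delay for bounded out-of-order timestamps) can be illustrated with a small standalone model. This is only a sketch of that arithmetic, not Flink's implementation: the class WatermarkSketch and its methods are hypothetical names, and the rule that a row is late when its timestamp is at or below the current watermark is the usual watermark convention, assumed here.

```java
/** Simplified model of the periodic watermark arithmetic; hypothetical, not a Flink class. */
final class WatermarkSketch {
    private final long delayMillis;
    private long maxTimestamp;

    private WatermarkSketch(long delayMillis) {
        this.delayMillis = delayMillis;
        // Start low enough that the initial watermark is Long.MIN_VALUE without underflow.
        this.maxTimestamp = Long.MIN_VALUE + delayMillis;
    }

    /** Ascending timestamps: watermark = max observed timestamp - 1. */
    static WatermarkSketch ascending() {
        return new WatermarkSketch(1L);
    }

    /** Bounded out-of-orderness: watermark = max observed timestamp - delay. */
    static WatermarkSketch bounded(long delayMillis) {
        return new WatermarkSketch(delayMillis);
    }

    /** Records a row timestamp and keeps track of the maximum seen so far. */
    void observe(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    long currentWatermark() {
        return maxTimestamp - delayMillis;
    }

    /** Assumed convention: a row is late if its timestamp is not above the watermark. */
    boolean isLate(long timestamp) {
        return timestamp <= currentWatermark();
    }

    public static void main(String[] args) {
        long[] timestamps = {1000L, 3000L, 2000L};
        WatermarkSketch asc = ascending();
        WatermarkSketch outOfOrder = bounded(2000L);
        for (long ts : timestamps) {
            asc.observe(ts);
            outOfOrder.observe(ts);
        }
        System.out.println("ascending watermark: " + asc.currentWatermark());        // 2999
        System.out.println("bounded watermark:   " + outOfOrder.currentWatermark()); // 1000
        System.out.println("row at max is late:  " + asc.isLate(3000L));             // false
    }
}
```

With the ascending strategy a row whose timestamp equals the current maximum is still on time, which matches the note in the comment; with a 2000 ms bound, rows may trail the maximum by just under two seconds before they count as late.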

StreamTableEnvironment.connect

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/StreamTableEnvironment.scala

abstract class StreamTableEnvironment(
    private[flink] val execEnv: StreamExecutionEnvironment,
    config: TableConfig)
  extends TableEnvironment(config) {

  //......

  def connect(connectorDescriptor: ConnectorDescriptor): StreamTableDescriptor = {
    new StreamTableDescriptor(this, connectorDescriptor)
  }

  //......
}
  • The connect method of StreamTableEnvironment creates a StreamTableDescriptor from the given ConnectorDescriptor

StreamTableDescriptor

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/descriptors/StreamTableDescriptor.scala

class StreamTableDescriptor(
    tableEnv: StreamTableEnvironment,
    connectorDescriptor: ConnectorDescriptor)
  extends ConnectTableDescriptor[StreamTableDescriptor](
    tableEnv,
    connectorDescriptor)
  with StreamableDescriptor[StreamTableDescriptor] {

  //......
}
  • StreamTableDescriptor extends ConnectTableDescriptor and mixes in StreamableDescriptor

ConnectTableDescriptor

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/descriptors/ConnectTableDescriptor.scala

abstract class ConnectTableDescriptor[D <: ConnectTableDescriptor[D]](
    private val tableEnv: TableEnvironment,
    private val connectorDescriptor: ConnectorDescriptor)
  extends TableDescriptor
  with SchematicDescriptor[D]
  with RegistrableDescriptor { this: D =>

  private var formatDescriptor: Option[FormatDescriptor] = None
  private var schemaDescriptor: Option[Schema] = None

  /**
    * Searches for the specified table source, configures it accordingly, and registers it as
    * a table under the given name.
    *
    * @param name table name to be registered in the table environment
    */
  override def registerTableSource(name: String): Unit = {
    val tableSource = TableFactoryUtil.findAndCreateTableSource(tableEnv, this)
    tableEnv.registerTableSource(name, tableSource)
  }

  /**
    * Searches for the specified table sink, configures it accordingly, and registers it as
    * a table under the given name.
    *
    * @param name table name to be registered in the table environment
    */
  override def registerTableSink(name: String): Unit = {
    val tableSink = TableFactoryUtil.findAndCreateTableSink(tableEnv, this)
    tableEnv.registerTableSink(name, tableSink)
  }

  /**
    * Searches for the specified table source and sink, configures them accordingly, and registers
    * them as a table under the given name.
    *
    * @param name table name to be registered in the table environment
    */
  override def registerTableSourceAndSink(name: String): Unit = {
    registerTableSource(name)
    registerTableSink(name)
  }

  /**
    * Specifies the format that defines how to read data from a connector.
    */
  override def withFormat(format: FormatDescriptor): D = {
    formatDescriptor = Some(format)
    this
  }

  /**
    * Specifies the resulting table schema.
    */
  override def withSchema(schema: Schema): D = {
    schemaDescriptor = Some(schema)
    this
  }

  // ----------------------------------------------------------------------------------------------

  /**
    * Converts this descriptor into a set of properties.
    */
  override def toProperties: util.Map[String, String] = {
    val properties = new DescriptorProperties()

    // this performs only basic validation
    // more validation can only happen within a factory
    if (connectorDescriptor.isFormatNeeded && formatDescriptor.isEmpty) {
      throw new ValidationException(
        s"The connector '$connectorDescriptor' requires a format description.")
    } else if (!connectorDescriptor.isFormatNeeded && formatDescriptor.isDefined) {
      throw new ValidationException(
        s"The connector '$connectorDescriptor' does not require a format description " +
          s"but '${formatDescriptor.get}' found.")
    }

    properties.putProperties(connectorDescriptor.toProperties)
    formatDescriptor.foreach(d => properties.putProperties(d.toProperties))
    schemaDescriptor.foreach(d => properties.putProperties(d.toProperties))

    properties.asMap()
  }
}
  • ConnectTableDescriptor provides the withSchema method, which takes a Schema, stores it in schemaDescriptor, and returns the descriptor itself (type D) so that calls can be chained; toProperties later merges the connector, format, and schema properties into a single map
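
The self-typed signature `D <: ConnectTableDescriptor[D]` with `this: D =>` is what lets withFormat and withSchema return the concrete descriptor type. A rough Java analogue of that pattern, together with the format check from toProperties, might look like the sketch below. Everything in it is a hypothetical stand-in: DescriptorSketch, StreamDescriptorSketch, streamOnlyOption, the plain String format and schema, the property keys, and IllegalStateException (used in place of Flink's ValidationException).

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for a self-typed descriptor; not a Flink class. */
abstract class DescriptorSketch<D extends DescriptorSketch<D>> {
    private final boolean formatNeeded;
    private String format; // null plays the role of Scala's None
    private String schema; // null plays the role of Scala's None

    DescriptorSketch(boolean formatNeeded) {
        this.formatNeeded = formatNeeded;
    }

    /** Java has no self-type annotation, so subclasses hand back `this` explicitly. */
    protected abstract D self();

    D withFormat(String format) {
        this.format = format;
        return self();
    }

    D withSchema(String schema) {
        this.schema = schema;
        return self();
    }

    /** Mirrors the basic validation: the format must be present exactly when it is needed. */
    Map<String, String> toProperties() {
        if (formatNeeded && format == null) {
            throw new IllegalStateException("The connector requires a format description.");
        } else if (!formatNeeded && format != null) {
            throw new IllegalStateException(
                "The connector does not require a format description but '" + format + "' found.");
        }
        Map<String, String> properties = new LinkedHashMap<>();
        if (format != null) {
            properties.put("format", format); // key name is illustrative only
        }
        if (schema != null) {
            properties.put("schema", schema); // key name is illustrative only
        }
        return properties;
    }
}

/** Concrete descriptor with one extra, stream-specific method (hypothetical). */
final class StreamDescriptorSketch extends DescriptorSketch<StreamDescriptorSketch> {
    StreamDescriptorSketch(boolean formatNeeded) {
        super(formatNeeded);
    }

    @Override
    protected StreamDescriptorSketch self() {
        return this;
    }

    StreamDescriptorSketch streamOnlyOption() {
        return this;
    }

    public static void main(String[] args) {
        // withSchema returns StreamDescriptorSketch, so the subclass method stays reachable.
        Map<String, String> properties = new StreamDescriptorSketch(true)
            .withFormat("json")
            .withSchema("MyField1 INT")
            .streamOnlyOption()
            .toProperties();
        System.out.println(properties);
    }
}
```

If withSchema returned the base type instead, the chained call to streamOnlyOption would not compile, which is the problem the self type avoids.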

Schema

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/descriptors/Schema.scala

class Schema extends Descriptor {

  // maps a field name to a list of properties that describe type, origin, and the time attribute
  private val tableSchema = mutable.LinkedHashMap[String, mutable.LinkedHashMap[String, String]]()

  private var lastField: Option[String] = None

  def schema(schema: TableSchema): Schema = {
    tableSchema.clear()
    lastField = None
    schema.getFieldNames.zip(schema.getFieldTypes).foreach { case (n, t) =>
      field(n, t)
    }
    this
  }

  def field(fieldName: String, fieldType: TypeInformation[_]): Schema = {
    field(fieldName, TypeStringUtils.writeTypeInfo(fieldType))
    this
  }

  def field(fieldName: String, fieldType: String): Schema = {
    if (tableSchema.contains(fieldName)) {
      throw new ValidationException(s"Duplicate field name $fieldName.")
    }

    val fieldProperties = mutable.LinkedHashMap[String, String]()
    fieldProperties += (SCHEMA_TYPE -> fieldType)

    tableSchema += (fieldName -> fieldProperties)

    lastField = Some(fieldName)
    this
  }

  def from(originFieldName: String): Schema = {
    lastField match {
      case None => throw new ValidationException("No field previously defined. Use field() before.")
      case Some(f) =>
        tableSchema(f) += (SCHEMA_FROM -> originFieldName)
        lastField = None
    }
    this
  }

  def proctime(): Schema = {
    lastField match {
      case None => throw new ValidationException("No field defined previously. Use field() before.")
      case Some(f) =>
        tableSchema(f) += (SCHEMA_PROCTIME -> "true")
        lastField = None
    }
    this
  }

  def rowtime(rowtime: Rowtime): Schema = {
    lastField match {
      case None => throw new ValidationException("No field defined previously. Use field() before.")
      case Some(f) =>
        tableSchema(f) ++= rowtime.toProperties.asScala
        lastField = None
    }
    this
  }

  final override def toProperties: util.Map[String, String] = {
    val properties = new DescriptorProperties()
    properties.putIndexedVariableProperties(
      SCHEMA,
      tableSchema.toSeq.map { case (name, props) =>
        (Map(SCHEMA_NAME -> name) ++ props).asJava
      }.asJava
    )
    properties.asMap()
  }
}
  • Schema provides the field, from, proctime, and rowtime methods for defining the schema's properties; from, proctime, and rowtime apply to the field declared by the most recent field() call and then reset lastField, so each of them must directly follow a field() call
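
The lastField bookkeeping is easy to miss when reading the Scala source, so here is a minimal Java model of it. It is a sketch under stated assumptions rather than Flink code: SchemaSketch is a hypothetical class, types are plain strings, the exception types differ from Flink's ValidationException, and the flattened key layout ("schema.<index>.<key>" with keys name, type, from, proctime) is an assumption about what putIndexedVariableProperties produces.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical model of the builder logic in Schema; not a Flink class. */
final class SchemaSketch {
    // Field name -> properties of that field, kept in declaration order.
    private final Map<String, Map<String, String>> tableSchema = new LinkedHashMap<>();
    private String lastField; // null plays the role of Scala's None

    SchemaSketch field(String fieldName, String fieldType) {
        if (tableSchema.containsKey(fieldName)) {
            throw new IllegalArgumentException("Duplicate field name " + fieldName + ".");
        }
        Map<String, String> fieldProperties = new LinkedHashMap<>();
        fieldProperties.put("type", fieldType);
        tableSchema.put(fieldName, fieldProperties);
        lastField = fieldName;
        return this;
    }

    SchemaSketch from(String originFieldName) {
        return attribute("from", originFieldName);
    }

    SchemaSketch proctime() {
        return attribute("proctime", "true");
    }

    // Attaches one attribute to the most recently declared field, then forgets that field.
    private SchemaSketch attribute(String key, String value) {
        if (lastField == null) {
            throw new IllegalStateException("No field defined previously. Use field() before.");
        }
        tableSchema.get(lastField).put(key, value);
        lastField = null;
        return this;
    }

    /** Flattens the nested map into indexed keys (assumed layout: schema.<index>.<key>). */
    Map<String, String> toProperties() {
        Map<String, String> properties = new LinkedHashMap<>();
        int index = 0;
        for (Map.Entry<String, Map<String, String>> entry : tableSchema.entrySet()) {
            String prefix = "schema." + index + ".";
            properties.put(prefix + "name", entry.getKey());
            for (Map.Entry<String, String> property : entry.getValue().entrySet()) {
                properties.put(prefix + property.getKey(), property.getValue());
            }
            index++;
        }
        return properties;
    }

    public static void main(String[] args) {
        SchemaSketch schema = new SchemaSketch()
            .field("MyField1", "TIMESTAMP").proctime()
            .field("MyField3", "BOOLEAN").from("mf3");
        schema.toProperties().forEach((key, value) -> System.out.println(key + " = " + value));

        try {
            // proctime() resets lastField, so a second attribute on the same field is rejected.
            new SchemaSketch().field("f", "TIMESTAMP").proctime().from("origin");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

One consequence worth noting: because every attribute method clears lastField, a chain such as field(...).proctime().from(...) fails in this model, and the Scala source above behaves the same way.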

Rowtime

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/descriptors/Rowtime.scala

class Rowtime extends Descriptor {

  private var timestampExtractor: Option[TimestampExtractor] = None
  private var watermarkStrategy: Option[WatermarkStrategy] = None

  def timestampsFromField(fieldName: String): Rowtime = {
    timestampExtractor = Some(new ExistingField(fieldName))
    this
  }

  def timestampsFromSource(): Rowtime = {
    timestampExtractor = Some(new StreamRecordTimestamp)
    this
  }

  def timestampsFromExtractor(extractor: TimestampExtractor): Rowtime = {
    timestampExtractor = Some(extractor)
    this
  }

  def watermarksPeriodicAscending(): Rowtime = {
    watermarkStrategy = Some(new AscendingTimestamps)
    this
  }

  def watermarksPeriodicBounded(delay: Long): Rowtime = {
    watermarkStrategy = Some(new BoundedOutOfOrderTimestamps(delay))
    this
  }

  def watermarksFromSource(): Rowtime = {
    watermarkStrategy = Some(PreserveWatermarks.INSTANCE)
    this
  }

  def watermarksFromStrategy(strategy: WatermarkStrategy): Rowtime = {
    watermarkStrategy = Some(strategy)
    this
  }

  final override def toProperties: java.util.Map[String, String] = {
    val properties = new DescriptorProperties()

    timestampExtractor.foreach(normalizeTimestampExtractor(_)
      .foreach(e => properties.putString(e._1, e._2)))
    watermarkStrategy.foreach(normalizeWatermarkStrategy(_)
      .foreach(e => properties.putString(e._1, e._2)))

    properties.asMap()
  }
}
  • Rowtime provides timestampsFromField, timestampsFromSource, and timestampsFromExtractor for defining the timestamps, and watermarksPeriodicAscending, watermarksPeriodicBounded, watermarksFromSource, and watermarksFromStrategy for defining the watermark strategy; each group writes to a single Option field, so a later call in the same group replaces an earlier one
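
Because Rowtime keeps exactly one timestamp extractor and one watermark strategy, calling two methods from the same group does not combine them; the last call wins. The sketch below models that behaviour with plain strings. RowtimeSketch, describe, and the string labels are hypothetical and chosen only for illustration; the real class stores TimestampExtractor and WatermarkStrategy instances.

```java
/** Hypothetical model of Rowtime's two independent settings; not a Flink class. */
final class RowtimeSketch {
    private String timestampExtractor; // null plays the role of Scala's None
    private String watermarkStrategy;  // null plays the role of Scala's None

    RowtimeSketch timestampsFromField(String fieldName) {
        timestampExtractor = "from-field(" + fieldName + ")";
        return this;
    }

    RowtimeSketch timestampsFromSource() {
        timestampExtractor = "from-source";
        return this;
    }

    RowtimeSketch watermarksPeriodicAscending() {
        watermarkStrategy = "periodic-ascending";
        return this;
    }

    RowtimeSketch watermarksPeriodicBounded(long delayMillis) {
        watermarkStrategy = "periodic-bounded(" + delayMillis + "ms)";
        return this;
    }

    /** Summarises the current settings; unset values are shown explicitly. */
    String describe() {
        return "timestamps=" + (timestampExtractor == null ? "<unset>" : timestampExtractor)
            + ", watermarks=" + (watermarkStrategy == null ? "<unset>" : watermarkStrategy);
    }

    public static void main(String[] args) {
        String summary = new RowtimeSketch()
            .timestampsFromField("ts_field")
            .timestampsFromSource()          // replaces the from-field extractor
            .watermarksPeriodicBounded(2000)
            .describe();
        System.out.println(summary); // timestamps=from-source, watermarks=periodic-bounded(2000ms)
    }
}
```

The two settings are independent of each other, so a typical definition makes one call from each group, for example timestampsFromField followed by watermarksPeriodicBounded.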

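Summary

  • The connect method of StreamTableEnvironment creates a StreamTableDescriptor; StreamTableDescriptor extends ConnectTableDescriptor; ConnectTableDescriptor provides the withSchema method, which takes a Schema and returns the descriptor itself for chaining
  • Schema provides the field, from, proctime, and rowtime methods for defining the schema's properties: proctime declares a processing-time attribute, rowtime declares an event-time attribute, and from names the input field that is referenced or aliased
  • Rowtime provides timestampsFromField, timestampsFromSource, and timestampsFromExtractor for defining the timestamps, and watermarksPeriodicAscending, watermarksPeriodicBounded, watermarksFromSource, and watermarksFromStrategy for defining the watermark strategy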

Reposted from: http://pgqqa.baihongyu.com/
