共计 4956 个字符,预计需要花费 13 分钟才能阅读完成。
今天就跟大家聊聊有关如何进行 flink 中的 kafka 源码分析,可能很多人都不太了解,为了让大家更加了解,丸趣 TV 小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。
最近一直在弄 flink sql 相关的东西,第一阶段的目标是从解决 kafka 的消费和写入的问题。不过也有些同学并不是很了解,今天我们来详细分析一下包的继承层次。
flink 源码如下:
public class KafkaTableSourceFactory implements StreamTableSourceFactory Row { private ConcurrentHashMap String, KafkaTableSource kafkaTableSources = new ConcurrentHashMap ();
@Override
public Map String, String requiredContext() { Map String, String context = new HashMap ();
context.put(CONNECTOR_TYPE(), KafkaConnectorDescriptor.CONNECTOR_TYPE);
context.put(CONNECTOR_PROPERTY_VERSION(),String.valueOf(KafkaConnectorDescriptor.CONNECTOR_PROPERTY_VERSION));
return context;
}
@Override
public List String supportedProperties() { List String properties = new ArrayList ();
properties.add(KafkaConnectorDescriptor.DATABASE_KEY);
properties.add(KafkaConnectorDescriptor.TABLE_KEY);
return properties;
}
@Override
public StreamTableSource Row createStreamTableSource(Map String, String properties) {
// 避免频繁的触发 是否需要加缓存
KafkaTableSource kafkaTableSource;
String dataBase = properties.get(KafkaConnectorDescriptor.DATABASE_KEY);
String table = properties.get(KafkaConnectorDescriptor.TABLE_KEY);
if (!kafkaTableSources.containsKey(dataBase + table)) { Kafka08UDMPBTableSource.Builder builder = new Kafka08UDMPBTableSource.Builder();
kafkaTableSource = builder
.cluster(dataBase)
.subject(table)
.build();
kafkaTableSources.put(dataBase + table,kafkaTableSource);
} else { kafkaTableSource = kafkaTableSources.get(dataBase + table);
}
return kafkaTableSource;
}
}
class Kafka08PBTableSource protected(topic: String,
properties: Properties,
schema: TableSchema,
typeInformation: TypeInformation[Row],
paramMap: util.LinkedHashMap[String, AnyRef],
entryClass: String)
extends KafkaTableSource(schema, topic, properties, new PBRowDeserializationSchema(typeInformation, paramMap,entryClass)) { override def createKafkaConsumer(topic: String, properties: Properties, deserializationSchema: DeserializationSchema[Row]): FlinkKafkaConsumerBase[Row] = { this.setStartupMode(StartupMode.EARLIEST)
new FlinkKafkaConsumer08(topic, deserializationSchema, properties).setStartFromEarliest()
}
}
下面用户自定义的 kafka 的 sink 类:
class Kafka08UDMPBTableSink (topic: String,
properties: Properties,
partitioner: Optional[FlinkKafkaPartitioner[Row]],
paramMap: util.LinkedHashMap[String, AnyRef],
serializationSchema: SerializationSchema[Row],
fieldNames: Array[String],
fieldTypes: Array[TypeInformation[_]]
) extends KafkaTableSink(topic, properties, partitioner.orElse(new FlinkFixedPartitioner[Row])) { override def createKafkaProducer(topic: String, properties: Properties, serializationSchema: SerializationSchema[Row], partitioner: Optional[FlinkKafkaPartitioner[Row]]): SinkFunction[Row]={ new FlinkKafkaProducer08[Row](topic, serializationSchema, properties, partitioner.orElse(new FlinkFixedPartitioner[Row]))
}
override def createSerializationSchema(rowSchema: RowTypeInfo) = serializationSchema
override def createCopy = new Kafka08UDMPBTableSink(topic, properties, this.partitioner, paramMap, serializationSchema, fieldNames, fieldTypes)
override def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): KafkaTableSink = { super.configure(this.fieldNames, this.fieldTypes)
}
override def getFieldNames: Array[String]=this.fieldNames
/** Returns the types of the table fields. */
override def getFieldTypes: Array[TypeInformation[_]]=this.fieldTypes
override def emitDataStream(dataStream: DataStream[Row]): Unit = { val kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner)
dataStream.addSink(kafkaProducer).name(TableConnectorUtil.generateRuntimeName(this.getClass, fieldNames))
}
}
public class TrackRowDeserializationSchema implements SerializationSchema Row , DeserializationSchema Row {
private static final long serialVersionUID = -2885556750743978636L;
/** Type information describing the input type. */
private TypeInformation Row typeInfo = null;
private LinkedHashMap paraMap;
private String inSchema;
private String outSchema;
private String inClass;
private String outClass;
}
public class TrackRowFormatFactory extends TableFormatFactoryBase Row
implements SerializationSchemaFactory Row , DeserializationSchemaFactory Row { public TrackRowFormatFactory() { super(TrackValidator.FORMAT_TYPE_VALUE, 1, false);
}
public TrackRowFormatFactory(String type, int version, boolean supportsSchemaDerivation) { super(type, version, supportsSchemaDerivation);
}
@Override
protected List String supportedFormatProperties() { final List String properties = new ArrayList ();
properties.add(TrackValidator.FORMAT_IN_SCHEMA);
properties.add(TrackValidator.FORMAT_IN_CLASS);
properties.add(TrackValidator.FORMAT_OUT_CLASS);
properties.add(TrackValidator.FORMAT_OUT_SCHEMA);
properties.add(TrackValidator.FORMAT_TYPE_INFORMATION);
properties.add(TrackValidator.FORMAT_TYPE_VALUE);
return properties;
}
}
看完上述内容,你们对如何进行 flink 中的 kafka 源码分析有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注丸趣 TV 行业资讯频道,感谢大家的支持。
正文完