Apache Avro数据的示例分析

71次阅读
没有评论

共计 5016 个字符,预计需要花费 13 分钟才能阅读完成。

这篇文章主要介绍 Apache Avro 数据的示例分析,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!

随着互联网高速的发展,云计算、大数据、人工智能 AI、物联网等前沿技术已然成为当今时代主流的高新技术,诸如电商网站、人脸识别、无人驾驶、智能家居、智慧城市等等,不仅方面方便了人们的衣食住行,背后更是时时刻刻有大量的数据在经过各种各样的系统平台的采集、清晰、分析,而保证数据的低时延、高吞吐、安全性就显得尤为重要,Apache Avro 本身通过 Schema 的方式序列化后进行二进制传输,一方面保证了数据的高速传输,另一方面保证了数据安全性,avro 当前在各个行业的应用越来越广泛,如何对 avro 数据进行处理解析应用就格外重要,本文将演示如果序列化生成 avro 数据,并使用 FlinkSQL 进行解析。

本文是 avro 解析的 demo,当前 FlinkSQL 仅适用于简单的 avro 数据解析, 复杂嵌套 avro 数据暂时不支持。

场景介绍

本文主要介绍以下三个重点内容:

如何序列化生成 Avro 数据

如何反序列化解析 Avro 数据

如何使用 FlinkSQL 解析 Avro 数据

前提条件

了解 avro 是什么,可参考 apache avro 官网快速入门指南

了解 avro 应用场景

操作步骤

1、新建 avro maven 工程项目,配置 pom 依赖

pom 文件内容如下:

?xml version= 1.0  encoding= UTF-8 ? 
 project xmlns= http://maven.apache.org/POM/4.0.0 
 xmlns:xsi= http://www.w3.org/2001/XMLSchema-instance 
 xsi:schemaLocation= http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd 
  modelVersion 4.0.0 /modelVersion 
  groupId com.huawei.bigdata /groupId 
  artifactId avrodemo /artifactId 
  version 1.0-SNAPSHOT /version 
  dependencies 
  dependency 
  groupId org.apache.avro /groupId 
  artifactId avro /artifactId 
  version 1.8.1 /version 
  /dependency 
  dependency 
  groupId junit /groupId 
  artifactId junit /artifactId 
  version 4.12 /version 
  /dependency 
  /dependencies 
  build 
  plugins 
  plugin 
  groupId org.apache.avro /groupId 
  artifactId avro-maven-plugin /artifactId 
  version 1.8.1 /version 
  executions 
  execution 
  phase generate-sources /phase 
  goals 
  goal schema /goal 
  /goals 
  configuration 
  sourceDirectory ${project.basedir}/src/main/avro/ /sourceDirectory 
  outputDirectory ${project.basedir}/src/main/java/ /outputDirectory 
  /configuration 
  /execution 
  /executions 
  /plugin 
  plugin 
  groupId org.apache.maven.plugins /groupId 
  artifactId maven-compiler-plugin /artifactId 
  configuration 
  source 1.6 /source 
  target 1.6 /target 
  /configuration 
  /plugin 
  /plugins 
  /build 
 /project

注意:以上 pom 文件配置了自动生成类的路径,即 project.basedir/src/main/avro/ 和 {project.basedir}/src/main/avro/ 和 project.basedir/src/main/avro/ 和 {project.basedir}/src/main/java/,这样配置之后,在执行 mvn 命令的时候,这个插件就会自动将此目录下的 avsc schema 生成类文件,并放到后者这个目录下。如果没有生成 avro 目录,手动创建一下即可。

2、定义 schema

使用 JSON 为 Avro 定义 schema。schema 由基本类型(null,boolean, int, long, float, double, bytes 和 string)和复杂类型(record, enum, array, map, union, 和 fixed)组成。例如,以下定义一个 user 的 schema,在 main 目录下创建一个 avro 目录,然后在 avro 目录下新建文件 user.avsc :

{ namespace :  lancoo.ecbdc.pre ,
  type :  record ,
  name :  User ,
  fields : [ { name :  name ,  type :  string},
 {name :  favorite_number ,  type : [ int ,  null]},
 {name :  favorite_color ,  type : [ string ,  null]}
 ]
}

3、编译 schema

点击 maven projects 项目的 compile 进行编译,会自动在创建 namespace 路径和 User 类代码

4、序列化

创建 TestUser 类,用于序列化生成数据

User user1 = new User();
user1.setName( Alyssa 
user1.setFavoriteNumber(256);
// Leave favorite col or null
// Alternate constructor
User user2 = new User( Ben , 7,  red 
// Construct via builder
User user3 = User.newBuilder()
 .setName(Charlie)
 .setFavoriteColor(blue)
 .setFavoriteNumber(null)
 .build();
// Serialize user1, user2 and user3 to disk
DatumWriter User  userDatumWriter = new SpecificDatumWriter User (User.class);
DataFileWriter User  dataFileWriter = new DataFileWriter User (userDatumWriter);
dataFileWriter.create(user1.getSchema(), new File(user_generic.avro));
dataFileWriter.append(user1);
dataFileWriter.append(user2);
dataFileWriter.append(user3);
dataFileWriter.close();

执行序列化程序后,会在项目的同级目录下生成 avro 数据

user_generic.avro 内容如下:

Objavro.schema�{type : record , name : User , namespace : lancoo.ecbdc.pre , fields :[{ name : name , type : string},{name : favorite_number , type :[ int , null]},{name : favorite_color , type :[ string , null]}]}

至此 avro 数据已经生成。

5、反序列化

通过反序列化代码解析 avro 数据

// Deserialize Users from disk
DatumReader User  userDatumReader = new SpecificDatumReader User (User.class);
DataFileReader User  dataFileReader = new DataFileReader User (new File( user_generic.avro), userDatumReader);
User user = null;
while (dataFileReader.hasNext()) { // Reuse user object by passing it to next(). This saves us from
 // allocating and garbage collecting many objects for files with
 // many items.
 user = dataFileReader.next(user);
 System.out.println(user);
}

执行反序列化代码解析 user_generic.avro

avro 数据解析成功。

6、将 user_generic.avro 上传至 hdfs 路径

hdfs dfs -mkdir -p /tmp/lztest/
hdfs dfs -put user_generic.avro /tmp/lztest/

7、配置 flinkserver

准备 avro jar 包

将 flink-sql-avro-*.jar、flink-sql-avro-confluent-registry-*.jar 放入 flinkserver lib,将下面的命令在所有 flinkserver 节点执行

cp /opt/huawei/Bigdata/FusionInsight_Flink_8.1.2/install/FusionInsight-Flink-1.12.2/flink/opt/flink-sql-avro*.jar /opt/huawei/Bigdata/FusionInsight_Flink_8.1.3/install/FusionInsight-Flink-1.12.2/flink/lib
chmod 500 flink-sql-avro*.jar
chown omm:wheel flink-sql-avro*.jar

同时重启 FlinkServer 实例,重启完成后查看 avro 包是否被上传

hdfs dfs -ls /FusionInsight_FlinkServer/8.1.2-312005/lib

8、编写 FlinkSQL

CREATE TABLE testHdfs(
 name String,
 favorite_number int,
 favorite_color String
) WITH(
  connector  =  filesystem ,
  path  =  hdfs:///tmp/lztest/user_generic.avro ,
  format  =  avro 
);CREATE TABLE KafkaTable (
 name String,
 favorite_number int,
 favorite_color String
) WITH (
  connector  =  kafka ,
  topic  =  testavro ,
  properties.bootstrap.servers  =  96.10.2.1:21005 ,
  properties.group.id  =  testGroup ,
  scan.startup.mode  =  latest-offset ,
  format  =  avro 
insert into
 KafkaTable
select
 *
 testHdfs;

保存提交任务

9、查看对应 topic 中是否有数据

FlinkSQL 解析 avro 数据成功。

以上是“Apache Avro 数据的示例分析”这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注丸趣 TV 行业资讯频道!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-03发表,共计5016字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)