共计 3157 个字符,预计需要花费 8 分钟才能阅读完成。
行业资讯
服务器
云计算
Databricks 如何使用 Spark Streaming 和 Delta Lake 对流式数据进行数据质量监控
本篇文章给大家分享的是有关 Databricks 如何使用 Spark Streaming 和 Delta Lake 对流式数据进行数据质量监控,丸趣 TV 小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着丸趣 TV 小编一起来看看吧。
丸趣 TV 小编主要对 Databricks 如何使用 Spark Streaming 和 Delta Lake 对流式数据进行数据质量监控的方法和架构进行了介绍,下面探讨了一种数据管理架构,该架构可以在数据到达时,通过主动监控和分析来检测流式数据中损坏或不良的数据,并且不会造成瓶颈。
构建流式数据分析和监控流程
在 Databricks,我们看到客户中不断涌现出许多数据处理模式,这些新模式的产生推动了可能的极限,在速度和质量问题上也不例外。为了帮助解决这一矛盾,我们开始考虑使用正确的工具,不仅可以支持所需的数据速度,还可以提供可接受的数据质量水平。Structured Streaming 和 Delta Lake 非常适合用于数据获取和存储层,因为他们能够配合创造一个具有扩展性、容错性和类实时的系统,并且具有 exactly-once 处理保证。
为企业数据质量分析找到可接受的工具要困难一些,特别是这个工具需要具有对数据质量指标的状态汇总的能力。另外,还需要能够对整个数据集进行检查(例如检测出多少比例的记录为空值),这些都会随着所提取的数据量的增加而增加计算成本。这对所有流式系统而言都是需要的,这一要求就排除了很多可用的工具。
在我们最初的解决方案中,我们选择了 Amazon 的数据质量检测工具 Deequ,因为它能提供简单而强大的 API,有对数据质量指标进行状态聚合的能力,以及对 Scala 的支持。将来,其他 Spark 原生的工具将提供额外的选择。
流式数据质量监控的实现
我们通过在 EC2 实例上运行一个小型的 Kafka producer 来模拟数据流,该实例将模拟的股票交易信息写入 Kafka topic,并使用原生的 Databricks 连接器将这些数据导入到 Delta Lake 表当中。为了展示 Spark Streaming 中数据质量检查的功能,我们选择在整个流程中实现 Deequ 的不同功能:
根据历史数据生成约束条件;
使用 foreachBatch 算子对到达的数据进行增量质量分析;
使用 foreachBatch 算子对到达的数据执行(较小的)单元测试,并将质量不佳的 batch 隔离到质量不佳记录表中;
对于每个到达的 batch,将最新的状态指标写入到 Delta 表当中;
对整个数据集定期执行(较大的)单元测试,并在 MLFlow 中跟踪结果;
根据验证结果发送通知(如通过电子邮件或 Slack);
捕获 MLFlow 中的指标以进行可视化和记录。
我们结合了 MLFlow 来跟踪一段时间内数据性能指标的质量、Delta 表的版本迭代以及结合了一个用于通知和告警的 Slack 连接器。整个流程可以用如下的图片进行表示:
由于 Spark 中具有统一的批处理 / 流式处理接口,因此我们能够在这个流程的任何位置提取报告、告警和指标,作为实时更新或批处理快照。这对于设置触发器或限制特别有用,因此,如果某个指标超过了阈值,则可以执行数据质量改善措施。还要注意的是,我们并没有对初始到达的原始数据造成影响,这些数据将立即提交到我们的 Delta 表,这意味着我们不会限制数据输入的速率。下游系统可以直接从该表中读取数据,如果超过了上述任何触发条件或质量阈值,则可能会中断。此外,我们可以轻松地创建一个排除质量不佳记录的 view 以提供一个干净的表。
在一个较高的层次,执行我们的数据质量跟踪和验证的代码如下所示:
spark.readStream.table(trades_delta).writeStream.foreachBatch {(batchDF: DataFrame, batchId: Long) =
// reassign our current state to the previous next state val stateStoreCurr = stateStoreNext
// run analysis on the current batch, aggregate with saved state val metricsResult = AnalysisRunner.run(data=batchDF, ...) // verify the validity of our current microbatch val verificationResult = VerificationSuite() .onData(batchDF) .addCheck(...).run()
// if verification fails, write batch to bad records table if (verificationResult.status != CheckStatus.Success) {...}
// write the current results into the metrics table Metric_results.write .format(delta) .mode(overwrite) .saveAsTable(deequ_metrics)}.start()
使用数据质量工具 Deequ
在 Databricks 中使用 Deequ 是相对比较容易的事情,你需要首先定义一个 analyzer,然后在 dataframe 上运行该 analyzer。例如,我们可以跟踪 Deequ 本地提供的几个相关指标检查,包括检查数量和价格是否为非负数、原始 IP 地址是否不为空以及符号字段在所有事务中的唯一性。Deequ 的 StateProvider 对象在流式数据配置中特别有用,它能允许用户将我们指标的状态保存在内存或磁盘中,并在以后汇总这些指标。这意味着每个处理的批次仅分析该批次中的数据记录,而不会分析整个表。即使随着数据大小的增长,这也可以使性能保持相对稳定,这在长时间运行的生产环境中很重要,因为生产环境需要在任意数量的数据上保持一致。
MLFlow 还可以很好地跟踪指标随时间的演变,在我们的 notebook 中,我们跟踪在 foreachBatch 代码中分析的所有 Deequ 约束作为指标,并使用 Delta 的 versionID 和时间戳作为参数。在 Databricks 的 notebook 中,集成的 MLFlow 服务对于指标跟踪特别方便。
通过使用 Structured Streaming、Delta Lake 和 Deequ,我们能够消除传统情况下数据质量和速度之间的权衡,而专注于实现两者的可接受水平。这里特别重要的是灵活性——不仅在如何处理不良记录(隔离、报错、告警等),而且在体系结构上(例如何时以及在何处执行检查?)和生态上(如何使用我们的数据?)。开源技术(如 Delta Lake、Structured Streaming 和 Deequ)是这种灵活性的关键。随着技术的发展,能够使用最新最、最强大的解决方案是提升其竞争优势的驱动力。最重要的是,你的数据的速度和质量一定不能对立,而要保持一致,尤其是在流式数据处理越来越靠近核心业务运营时。很快,这将不会是一种选择,而是一种期望和要求,我们正朝着这个未来方向一次一小步地不断前进。
以上就是 Databricks 如何使用 Spark Streaming 和 Delta Lake 对流式数据进行数据质量监控,丸趣 TV 小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注丸趣 TV 行业资讯频道。