怎么使用enrich processor

60次阅读
没有评论

共计 6183 个字符,预计需要花费 16 分钟才能阅读完成。

这篇文章主要介绍“怎么使用 enrich processor”,在日常操作中,相信很多人在怎么使用 enrich processor 问题上存在疑惑,丸趣 TV 小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”怎么使用 enrich processor”的疑惑有所帮助!接下来,请跟着丸趣 TV 小编一起来学习吧!

enrich processor 简介

ingest pipeline 可以在传入的文档被索引之前,对文档进行预处理,通过 processor 中定义的一系列规则来修改文档的内容(例如大小写转换等)。

在 Elasticsearch 7.5 版本引入了 enrich processor,可以将现有索引(source index)中的数据添加到传入的文档(incoming document)中。

比如,你可以在如下的场景中用到:

根据已知的 IP 地址识别 Web 服务或供应商。

根据产品 ID 将产品信息添加到零售订单中。

根据电子邮件地址补充联系信息。

根据用户坐标添加邮政编码。

使用 enrich processor

使用 enrich processor 有如下几个步骤:

1. 添加 enrich data:添加 document(enrich data)到一个或者多个的 source index 中,这些 document 中应包含之后要添加到 incoming documents 中的数据。

2. 创建 enrich policy:enrich policy 中应至少包含如下参数:

指定 source index 的。

指定 incoming documents 和 source index 用于匹配的属性。

指定要添加到 incoming documents 中的属性。

3. 执行 enrich policy:执行完后会自动创建相应的 enrich index,enrich index 和普通索引不同,进行了优化。

4. 在 ingest pipeline 使用 enrich processor:enrich processor 使用 enrich index 来查询。

背景说明

source index 的内容如下:

locnumcompany 广东省 A1001 腾讯上海市 B1001Bilibili 浙江省 C1001 阿里巴巴

incoming document 传入的文档如下,通过 num 字段查到对应 source index 中的 loc 的值,添加到 incoming document 中新增 enrich_loc 属性中。

numcompanyA1001 腾讯 B1001BilibiliC1001 阿里巴巴第一步:添加 enrich data

通过 _bulk API 批量添加文档到 location 索引,这些文档和普通的文档一样。

POST _bulk
{index : { _index : location}}
{loc : 广东省 , company : 腾讯 , num : A1001}
{index : { _index : location}}
{loc : 上海市 , company : Bilibili , num : B1001}
{index : { _index : location}}
{loc : 浙江省 , company : 阿里巴巴 , num : C1001}

第二步:创建 enrich policy

enrich policy 一旦创建,就不能更新或者修改。

PUT /_enrich/policy/my-policy
  match : {
  indices :  location , #source index  索引名,就是前面创建的  enrich data  对应的索引
  match_field :  num , #source index  中的属性名,用于 incoming documents  和  source index  匹配的属性,属性名一样都是  num
  enrich_fields : [loc], # 添加到  incoming documents  中的属性
 #  可选,过滤  source index  的文档,只有  loc.keyword  是上海市的  enrich data  才能将属性添加到  incoming documents  中
  query : {
  match : {
  loc.keyword :  上海市 
 }
 }
 }
}

第三步:执行 enrich policy

当创建了 enrich policy 后,你可以通过 execute enrich policy API 去执行 enrich policy。当执行 enrich policy 后,会自动创建 enrich index。

直接将 incoming document 与 source index 中的文档匹配可能会很慢且占用大量资源。为了加快处理速度,enrich processor 使用了 enrich index。enrich index 包含来自 source index 的 enrich data,enrich index 具有一些特殊属性可帮助简化它们:

它们是系统索引,这意味着它们由 Elasticsearch 在内部进行管理,仅适用于 enrich processor。

它们始终以 .enrich- * 开头。

它们是只读的,这意味着你不能直接更改它们。

它们被强制合并以便快速检索。

当 source index 中新增或者修改了数据,只需要重新执行 enrich policy 就可以更改 enrich index,从而更新 enrich processor。

通过以下命令执行 enrich policy:

PUT /_enrich/policy/my-policy/_execute

查看自动创建的 enrich index:

GET _cat/indices/.enrich*
#  返回结果
green open .enrich-my-policy-1616136526661 Vxal9lLBSlKS5lmzMpFfwQ 1 3 1 0 13.4kb 3.3kb

我感觉 enrich policy 这里有个小 bug,当删除 enrich policy 时,例如删除的 enrich policy 为 my-policy-1,会同时删除 my-policy-1 的 enrich index 和 enrich policy,但是如果原先还有个 my-policy-2(两个 enrich policy 在 - 之前是一样的),会把 my-policy-2 的 enrich index 也误删了(enrich policy 不删)。

第四步:在 ingest pipeline 使用 enrich processor

PUT _ingest/pipeline/loc-pipeline
  processors : [
 {
  enrich : {
  policy_name :  my-policy , # 引用前面创建的  enrich policy
  field :  num , # incoming document  中的属性名,用于和  source index  中的属性匹配值
 # 在 incoming document  中新增的属性, # 包含在  enrich policy  中定义的  match_field  和  enrich_fields  的值
  target_field :  enrich_loc  
 }
 }
 ]
}

验证

使用 simulate 用来调试 ingest pipeline 的效果,由于 source index 中匹配到的 loc.keyword 不是上海市,不会对这个文档进行处理:

POST _ingest/pipeline/loc-pipeline/_simulate
  docs : [
 {
  _source : {
  num :  A1001 ,
  company :  腾讯 
 }
 }
 ]
#  返回结果
  docs  : [
 {
  doc  : {
  _index  :  _index ,
  _type  :  _doc ,
  _id  :  _id ,
  _source  : {
  company  :  腾讯 ,
  num  :  A1001 
 },
  _ingest  : {
  timestamp  :  2021-03-19T06:56:45.754486259Z 
 }
 }
 }
 ]
}

这个文档的 loc.keyword 是上海市,因此会添加上 enrich data 中指定的属性:

POST _ingest/pipeline/loc-pipeline/_simulate
  docs : [
 {
  _source : {
  num :  B1001 ,
  company :  Bilibili 
 }
 }
 ]
#  返回结果
  docs  : [
 {
  doc  : {
  _index  :  _index ,
  _type  :  _doc ,
  _id  :  _id ,
  _source  : {
  company  :  Bilibili ,
  enrich_loc  : {
  loc  :  上海市 ,
  num  :  B1001 
 },
  num  :  B1001 
 },
  _ingest  : {
  timestamp  :  2021-03-19T06:56:29.393585306Z 
 }
 }
 }
 ]
}

在 simulate 调试成功之后,我们在插入文档的时候指定 ingest pipeline:

#  方式一:单条插入
POST origin-location/_doc?pipeline=loc-pipeline
  num :  A1001 ,
  company :  腾讯 
POST origin-location/_doc?pipeline=loc-pipeline
  num :  B1001 ,
  company :  Bilibili 
#  方式二:批量插入
POST _bulk?pipeline=loc-pipeline
{index :{ _index : origin-location}}
{num : A1001 , company : 腾讯}
{index :{ _index : origin-location}}
{num : B1001 , company : Bilibili}

查看插入的结果:

GET origin-location/_search
#返回结果
  took  : 12,
  timed_out  : false,
  _shards  : {
  total  : 1,
  successful  : 1,
  skipped  : 0,
  failed  : 0
 },
  hits  : {
  total  : {
  value  : 2,
  relation  :  eq 
 },
  max_score  : 1.0,
  hits  : [
 {
  _index  :  origin-location ,
  _type  :  _doc ,
  _id  :  zXxLSXgBUc4opBV-QiOv ,
  _score  : 1.0,
  _source  : {
  num  :  A1001 ,
  company  :  腾讯 
 }
 },
 {
  _index  :  origin-location ,
  _type  :  _doc ,
  _id  :  znxLSXgBUc4opBV-SCPk ,
  _score  : 1.0,
  _source  : {
  num  :  B1001 ,
  company  :  Bilibili ,
  enrich_loc  : {
  loc  :  上海市 ,
  num  :  B1001 
 }
 }
 }
 ]
 }
}

也可以指定索引默认使用的 ingest pipeline,这样就不用每次在插入文档的时候指定 ingest pipeline 了:

#  指定索引默认使用的  ingest pipeline
PUT origin-location2
  settings : {
  default_pipeline :  loc-pipeline  
 }
#  插入数据
POST _bulk
{index :{ _index : origin-location2}}
{num : A1001 , company : 腾讯}
{index :{ _index : origin-location2}}
{num : B1001 , company : Bilibili}
#  查看结果
GET origin-location2/_search
#  输出结果
  took  : 8,
  timed_out  : false,
  _shards  : {
  total  : 1,
  successful  : 1,
  skipped  : 0,
  failed  : 0
 },
  hits  : {
  total  : {
  value  : 2,
  relation  :  eq 
 },
  max_score  : 1.0,
  hits  : [
 {
  _index  :  origin-location2 ,
  _type  :  _doc ,
  _id  :  CXxPSXgBUc4opBV-oyTJ ,
  _score  : 1.0,
  _source  : {
  num  :  A1001 ,
  company  :  腾讯 
 }
 },
 {
  _index  :  origin-location2 ,
  _type  :  _doc ,
  _id  :  CnxPSXgBUc4opBV-oyTJ ,
  _score  : 1.0,
  _source  : {
  num  :  B1001 ,
  company  :  Bilibili ,
  enrich_loc  : {
  loc  :  上海市 ,
  num  :  B1001 
 }
 }
 }
 ]
 }
}

另外还可以使用 index template,通过正则表达式的方式匹配多个索引,来指定索引使用的 ingest pipeline:

#  使用  index template
PUT _template/my-template
  index_patterns : [origin-*],
  settings : {
  default_pipeline :  loc-pipeline 
 }
#  插入数据
POST _bulk
{index :{ _index : origin-location3}}
{num : A1001 , company : 腾讯}
{index :{ _index : origin-location3}}
{num : B1001 , company : Bilibili}
#  查看结果
GET origin-location3/_search
#  输出结果
  took  : 2,
  timed_out  : false,
  _shards  : {
  total  : 1,
  successful  : 1,
  skipped  : 0,
  failed  : 0
 },
  hits  : {
  total  : {
  value  : 2,
  relation  :  eq 
 },
  max_score  : 1.0,
  hits  : [
 {
  _index  :  origin-location3 ,
  _type  :  _doc ,
  _id  :  XnxVSXgBUc4opBV-1yRp ,
  _score  : 1.0,
  _source  : {
  num  :  A1001 ,
  company  :  腾讯 
 }
 },
 {
  _index  :  origin-location3 ,
  _type  :  _doc ,
  _id  :  X3xVSXgBUc4opBV-1yRp ,
  _score  : 1.0,
  _source  : {
  num  :  B1001 ,
  company  :  Bilibili ,
  enrich_loc  : {
  loc  :  上海市 ,
  num  :  B1001 
 }
 }
 }
 ]
 }
}

到此,关于“怎么使用 enrich processor”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注丸趣 TV 网站,丸趣 TV 小编会继续努力为大家带来更多实用的文章!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计6183字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)