共计 3846 个字符,预计需要花费 10 分钟才能阅读完成。
这期内容当中丸趣 TV 小编将会给大家带来有关 heka 从 kalka 中读取数据的示例分析,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
heka 从 kalka 中读取数据。
配置:
[hekad]
maxprocs = 2
[KafkaInputExample]
type = KafkaInput
topic = test
addrs = [localhost:9092][RstEncoder][LogOutput]
message_matcher = TRUE
encoder = RstEncoder
上述配置只有从 kalfka 中读取数据并显示到 console,写到 kalfka 中数据,
结果
:Timestamp: 2016-07-21 09:39:46.342093657 +0000 UTC
:Type: heka.kafka
:Hostname: master
:Pid: 0
:Uuid: 501b0a0e-63a9-4eee-b9ca-ab572c17d273
:Logger: KafkaInputExample
:Payload: {msg : Start Request , event : artemis.web.ensure-running1 , userid : 12 , extra :{ workspace-id : cN907xLngi}, time : 2015-05-06T 20:40:05.509926234Z , severity :1}
:EnvVersion:
:Severity: 7
:Fields:
| name: Key type:bytes value:
| name: Topic type:string value: test
| name: Partition type:integer value:0
| name: Offset type:integer value:8
读取出来的数据放到了 payload 中,而 fileds 中存放了读取 kalkfa 中的一些信息。那么可以使用 jsondecoder 进行解析。
[hekad]
maxprocs = 2
[KafkaInputExample]
type = KafkaInput
topic = test
addrs = [localhost:9092]
decoder= JsonDecoder
[JsonDecoder]
type = SandboxDecoder
filename = lua_decoders/json.lua
[JsonDecoder.config]
type = artemis
payload_keep = true
map_fields = true
Severity = severity
[RstEncoder][LogOutput]
message_matcher = TRUE
encoder = RstEncoder
结果如下:
:Timestamp: 2016-07-21 09:42:34 +0000 UTC
:Type: artemis
:Hostname: master
:Pid: 0
:Uuid: 3965285c-70ac-4069-a1a3-a9bcf518d3e8
:Logger: KafkaInputExample
:Payload: {msg : Start Request , event : artemis.web.ensure-running2 , userid : 11 , extra :{ workspace-id : cN907xLngi}, time : 2015-05-06T 20:40:05.509926234Z , severity :1}
:EnvVersion:
:Severity: 1
:Fields:
| name: time type:string value: 2015-05-06T 20:40:05.509926234Z
| name: msg type:string value: Start Request
| name: userid type:string value: 11
| name: event type:string value: artemis.web.ensure-running2
| name: extra.workspace-id type:string value: cN907xLngi
经过 decoder 解析之后,fileds 发生了改变,但是我们可以看到 Logger 显示的还是 KafkaInputExample,说明数据不是 decoder 产生,而是 Input 产生,只不过使用了 decoder 进行了解析,重写改写了 fields 而已。
接下来,把数据录入都 es 中吧。
[hekad]
maxprocs = 2
[KafkaInputExample]
type = KafkaInput
topic = test
addrs = [localhost:9092]
decoder= JsonDecoder
[JsonDecoder]
type = SandboxDecoder
filename = lua_decoders/json.lua
[JsonDecoder.config]
type = artemis
payload_keep = true
map_fields = true
Severity = severity
[ESJsonEncoder]
index = %{Type}-%{%Y.%m.%d}
es_index_from_timestamp = true
type_name = %{Type}
[ESJsonEncoder.field_mappings]
Timestamp = @timestamp
Severity = level
[ElasticSearchOutput]
message_matcher = TRUE
encoder = ESJsonEncoder
flush_interval = 1
导入到 es 中,也需要 json,所以使用 ESJsonEncoder,同时指定索引名字和类型。执行结果如下,
可以看到,除了 heka 中元数据 field 之外,还有 JsonDecoder 生成 field 啊,其实是截取 JsonDecoder 中的 fields 属性中拿出。注意,Payload 不解析。
:Fields:
| name: time type:string value: 2015-05-06T 20:40:05.509926234Z
| name: msg type:string value: Start Request
| name: userid type:string value: 11
| name: event type:string value: artemis.web.ensure-running2
| name: extra.workspace-id type:string value: cN907xLngi
这些 field 当然随着数据不同而不同,那么称之为 dynamic fileds。
入 es 的时候,可以指定提取哪些 dynamic fields,
fields=[Timestamp , Uuid , Type , Logger , Pid , Hostname , DynamicFields]
dynamic_fields=[msg , userid]
只要使用 dynamic_fileds,就必须要在 fields 中指定 DynamicFields。
如果没有 dynamic_fileds,那么 fields 只能列举几个固定的属性,参照官方文档即可。
完成的列子:
[hekad]
maxprocs = 2
[KafkaInputExample]
type = KafkaInput
topic = test
addrs = [localhost:9092]
decoder= JsonDecoder
[JsonDecoder]
type = SandboxDecoder
[hekad]
maxprocs = 2
[KafkaInputExample]
type = KafkaInput
topic = test
addrs = [localhost:9092]
decoder= JsonDecoder
[JsonDecoder]
type = SandboxDecoder
filename = lua_decoders/json.lua
[JsonDecoder.config]
type = artemis
payload_keep = true
map_fields = true
Severity = severity
[ESJsonEncoder]
index = %{Type}-%{%Y.%m.%d}
es_index_from_timestamp = true
type_name = %{Type}
fields=[Timestamp , Uuid , Type , Logger , Pid , Hostname , DynamicFields]
dynamic_fields=[msg , userid]
raw_bytes_fields=[Payload]
[ESJsonEncoder.field_mappings]
Timestamp = @timestamp
Severity = level
[ElasticSearchOutput]
message_matcher = TRUE
encoder = ESJsonEncoder
flush_interval = 1
结果如下,
上述就是丸趣 TV 小编为大家分享的 heka 从 kalka 中读取数据的示例分析了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注丸趣 TV 行业资讯频道。