heka从kalka中读取数据的示例分析

61次阅读
没有评论

共计 3846 个字符,预计需要花费 10 分钟才能阅读完成。

这期内容当中丸趣 TV 小编将会给大家带来有关 heka 从 kalka 中读取数据的示例分析,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

heka 从 kalka 中读取数据。

配置:

[hekad]
maxprocs = 2

[KafkaInputExample]
type = KafkaInput
topic = test
addrs = [localhost:9092][RstEncoder][LogOutput]
message_matcher = TRUE
encoder = RstEncoder

上述配置只有从 kalfka 中读取数据并显示到 console,写到 kalfka 中数据,

结果

:Timestamp: 2016-07-21 09:39:46.342093657 +0000 UTC
:Type: heka.kafka
:Hostname: master
:Pid: 0
:Uuid: 501b0a0e-63a9-4eee-b9ca-ab572c17d273
:Logger: KafkaInputExample
:Payload: {msg : Start Request , event : artemis.web.ensure-running1 , userid : 12 , extra :{ workspace-id : cN907xLngi}, time : 2015-05-06T    20:40:05.509926234Z , severity :1}
:EnvVersion: 
:Severity: 7
:Fields:
    | name: Key type:bytes value:
    | name: Topic type:string value: test
    | name: Partition type:integer value:0
    | name: Offset type:integer value:8

读取出来的数据放到了 payload 中,而 fileds 中存放了读取 kalkfa 中的一些信息。那么可以使用 jsondecoder 进行解析。

[hekad]
maxprocs = 2

[KafkaInputExample]
type = KafkaInput
topic = test
addrs = [localhost:9092]
decoder= JsonDecoder

[JsonDecoder]
type = SandboxDecoder
filename = lua_decoders/json.lua

        [JsonDecoder.config]
        type = artemis
        payload_keep = true
        map_fields = true
        Severity = severity

[RstEncoder][LogOutput]
message_matcher = TRUE
encoder = RstEncoder

结果如下:

:Timestamp: 2016-07-21 09:42:34 +0000 UTC
:Type: artemis
:Hostname: master
:Pid: 0
:Uuid: 3965285c-70ac-4069-a1a3-a9bcf518d3e8
:Logger: KafkaInputExample
:Payload: {msg : Start Request , event : artemis.web.ensure-running2 , userid : 11 , extra :{ workspace-id : cN907xLngi}, time : 2015-05-06T    20:40:05.509926234Z , severity :1}
:EnvVersion: 
:Severity: 1
:Fields:
    | name: time type:string value: 2015-05-06T    20:40:05.509926234Z
    | name: msg type:string value: Start Request
    | name: userid type:string value: 11
    | name: event type:string value: artemis.web.ensure-running2
    | name: extra.workspace-id type:string value: cN907xLngi

经过 decoder 解析之后,fileds 发生了改变,但是我们可以看到 Logger 显示的还是 KafkaInputExample,说明数据不是 decoder 产生,而是 Input 产生,只不过使用了 decoder 进行了解析,重写改写了 fields 而已。

接下来,把数据录入都 es 中吧。
[hekad]
maxprocs = 2

[KafkaInputExample]
type = KafkaInput
topic = test
addrs = [localhost:9092]
decoder= JsonDecoder

[JsonDecoder]
type = SandboxDecoder
filename = lua_decoders/json.lua

        [JsonDecoder.config]
        type = artemis
        payload_keep = true
        map_fields = true
        Severity = severity

[ESJsonEncoder]
index = %{Type}-%{%Y.%m.%d}
es_index_from_timestamp = true
type_name = %{Type}
    [ESJsonEncoder.field_mappings]
    Timestamp = @timestamp
    Severity = level

[ElasticSearchOutput]
message_matcher = TRUE
encoder = ESJsonEncoder
flush_interval = 1

导入到 es 中,也需要 json,所以使用 ESJsonEncoder,同时指定索引名字和类型。执行结果如下,

可以看到,除了 heka 中元数据 field 之外,还有 JsonDecoder 生成 field 啊,其实是截取 JsonDecoder 中的 fields 属性中拿出。注意,Payload 不解析。

:Fields:
    | name: time type:string value: 2015-05-06T    20:40:05.509926234Z
    | name: msg type:string value: Start Request
    | name: userid type:string value: 11
    | name: event type:string value: artemis.web.ensure-running2
    | name: extra.workspace-id type:string value: cN907xLngi

这些 field 当然随着数据不同而不同,那么称之为 dynamic fileds。

入 es 的时候,可以指定提取哪些 dynamic fields,

fields=[Timestamp , Uuid , Type , Logger , Pid , Hostname , DynamicFields]
dynamic_fields=[msg , userid]

只要使用 dynamic_fileds,就必须要在 fields 中指定 DynamicFields。

如果没有 dynamic_fileds,那么 fields 只能列举几个固定的属性,参照官方文档即可。

完成的列子:

[hekad]
maxprocs = 2

[KafkaInputExample]
type = KafkaInput
topic = test
addrs = [localhost:9092]
decoder= JsonDecoder

[JsonDecoder]
type = SandboxDecoder
[hekad]
maxprocs = 2

[KafkaInputExample]
type = KafkaInput
topic = test
addrs = [localhost:9092]
decoder= JsonDecoder

[JsonDecoder]
type = SandboxDecoder
filename = lua_decoders/json.lua

        [JsonDecoder.config]
        type = artemis
        payload_keep = true
        map_fields = true
        Severity = severity

[ESJsonEncoder]
index = %{Type}-%{%Y.%m.%d}
es_index_from_timestamp = true
type_name = %{Type}
fields=[Timestamp , Uuid , Type , Logger , Pid , Hostname , DynamicFields]
dynamic_fields=[msg , userid]

raw_bytes_fields=[Payload]
    [ESJsonEncoder.field_mappings]
    Timestamp = @timestamp
    Severity = level

[ElasticSearchOutput]
message_matcher = TRUE
encoder = ESJsonEncoder
flush_interval = 1

结果如下,

上述就是丸趣 TV 小编为大家分享的 heka 从 kalka 中读取数据的示例分析了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注丸趣 TV 行业资讯频道。

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计3846字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)