查询指南-Stream

大数据 2023-09-13 11:10:00
117阅读

事件流

事件流定义由流名称和一组属性组成,这些属性的名称在特定类型和流范围内是唯一可识别的。

目的

接收事件输入并接收查询处理结果输出。

语法

定义流流

属性名称属性类型,);

以下参数用于配置流定义

范围

描述

流名称

流名称

属性名称

参数名称

属性类型

参数类型(STRING、INT、LONG、DOUBLE、FLOAT、BOOL 或OBJECT)

例子

定义流TempStream

(deviceID long, roomNo int, temp double);

来源来源

源通过多种传输和各种数据格式接收事件,并将它们引导到流中进行处理。

源配置允许定义映射,将每个传入事件从其本机数据格式转换为Siddhi 事件。当未提供此类映射的自定义时,Siddhi 假定到达事件遵循基于流定义和配置的消息映射类型的预定义格式。

目的

提供一种消费外部系统事件并将其转换为流处理的方法。

语法

Query  Guide-Stream_HTTP

@source 注解的type 参数定义接收事件的源类型。

@source 注释的其他参数取决于所选的源类型,其中一些参数可以是可选的。

以下是Siddhi 支持的源类型列表:

来源类型

描述

在记忆中

允许SiddhiApp 使用来自同一JVM 上运行的其他SiddhiApp 的事件。

HTTP协议

HTTP服务

卡夫卡

订阅Kafka 主题来消费事件

传输控制协议

TCP服务

电子邮件

通过POP3 和IMAP 协议使用电子邮件

联合管理系统

订阅JMS 主题或队列以消费事件

文件

从文件中读取事件

CDC

数据库CDC日志数据

普罗米修斯

监控系统数据

内存中是Siddhi 中唯一的内置源,所有其他源类型均作为扩展实现

源图

每个@source配置可以有一个由@map注释表示的映射,它定义了传入事件格式如何转换为Siddhi事件。

@map 的type 参数定义转换传入事件时要使用的映射类型。 @map 注释的其他参数取决于所选的映射器,其中一些参数是可选的。

测绘Attributes

@attributes 是一个可选注释,与@map 一起使用来定义自定义映射

支持的源映射类型

源映射类型

描述

直通

省略Siddhi 事件的数据转换

JSON

将JSON 消息转换为Siddhi 事件

XML

将XML 消息转换为Siddhi 事件

文本

将TEXT 中的消息转换为Siddhi 事件

阿夫罗

将Avro 事件转换为Siddhi 事件

二进制

将Siddhi 特定二进制事件转换为Siddhi 事件

核心价值

将键值HashMap 转换为Siddhi 事件

CSV

将CSV 分隔符之类的事件转换为Siddhi 事件

暗示:

当没有提供@map 注解时,默认使用@map(type="passThrough"),它将消费的Siddhi 事件直接传递到流,而不进行任何数据转换。

PassThrough 是Siddhi 中唯一内置的源映射器,所有其他源映射器都作为扩展实现。

实施例1

通过公开的HTTP 服务接收JSON 消息并将它们引导到InputStream 中进行处理。在这里,HTTP 服务将通过基本身份验证进行保护,在端口8080 和context/foo 上的所有网络接口上接收事件。该服务要求JSON 消息采用JSON 映射器支持的默认数据格式,如下所示。

Query  Guide-Stream_JSON_02

HTTP 源和JSON 源映射器的配置实现了上述功能,如下所示。

Query  Guide-Stream_自定义_03

实施例2

通过公开的HTTP 服务接收JSON 消息,并将它们定向到StockStream 流中进行处理。这里,传入的JSON(如下所示)不遵循JSON 映射器支持的默认数据格式。

Query  Guide-Stream_自定义_04

配置HTTP源和自定义JSON源映射以实现上述功能如下。

Query  Guide-Stream_HTTP_05

接收器

目的

Sink 提供了一种通过将事件转换为支持的格式来将流式Siddhi 事件发布到外部系统的方法。

语法

Query  Guide-Stream_JSON_06

以下是Siddhi 支持的接收器类型列表

接收器类型

描述

在记忆中

允许SiddhiApp 使用来自同一JVM 上运行的其他SiddhiApp 的事件。

HTTP协议

将事件发布到HTTP 服务。

卡夫卡

将事件发送到Kafka 主题

传输控制协议

T 将事件发布到TCP 服务

电子邮件

通过SMTP 协议发送电子邮件

联合管理系统

将事件发布到JMS 主题或队列

文件

将事件写入文件

日志

记录流中发生的事件

普罗米修斯

发布数据到监控系统

分布式接收器

分布式接收器使用负载平衡或分区策略将事件从定义的流发布到多个端点。

任何接收器都可以用作分布式接收器。分布式接收器配置允许用户定义通用映射来转换Siddhi 事件并将其发送到所有目标端点。

目的

分布式接收器提供了一种以配置的事件格式将Siddhi 事件发布到多个端点的方法。

语法

轮询分布式接收器

以循环方式将事件发布到定义的目的地

Query  Guide-Stream_HTTP_07

分区分布式接收器

通过根据分区键对事件进行分区,将事件发布到定义的目标。

Query  Guide-Stream_HTTP_08

接收器映射器

使用@map进行映射

@payload 是一个可选注释,与@map 一起使用来定义自定义映射

配置@payload注解有两种方式

1 某些映射器(例如XML、JSON 和Test)仅接受一种输出负载

@有效负载(

“这是来自{{user}} 的测试消息。”)

2 一些映射器(例如键值)接受一系列映射值

@有效负载(

key1='mapping_1', 'key2'='用户: {{user}}')

以下是Siddhi 支持的接收器映射类型列表

类型

描述

直通

省略传出Siddhi 事件的数据转换

JSON

将Siddhi 事件转换为JSON 消息

XML

将Siddhi 事件转换为XML 消息

文本

将Siddhi 事件转换为纯文本消息

阿夫罗

将Siddhi 事件转换为Avro 事件

二进制

将Siddhi 事件转换为Siddhi 特定的二进制事件

核心价值

将Siddhi 事件转换为键值HashMap

CSV

将Siddhi 事件转换为类似CSV 分隔符分隔的事件

实施例1

Sink 通过将OutputStream 事件转换为默认格式的JSON 消息并使用POST 方法、Accept 标头以及管理员(用户名和密码)的基本身份验证发送到HTTP 端点http://localhost:8005/endpoint1 来发布这些事件。

HTTP 接收器和JSON 接收器映射器的配置实现了上述功能,如下所示。

Query  Guide-Stream_自定义_09

这将以以下格式发布JSON 消息

Query  Guide-Stream_JSON_10

实施例2

Sink 通过将StockStream 事件转换为用户定义的JSON 消息并将其发送到HTTP 端点http://localhost:8005/stocks 来发布StockStream 事件。

配置HTTP接收器和自定义JSON接收器映射以实现上述功能如下。

Query  Guide-Stream_JSON_11

这会将单个事件发布为以下格式的JSON 消息

Query  Guide-Stream_自定义_12

这也可以将多个事件一起发布为以下格式的JSON 消息

Query  Guide-Stream_JSON_13

实施例3

Sink 使用分区策略将事件从OutputStream 流发布到多个HTTP 端点。此处,事件根据关键国家/地区发送到http://localhost:8005/endpoint1 或http://localhost:8006/endpoint2。发布到两个端点时,它使用默认的JSON 映射、POST 方法以及admin 作为用户名和密码。

Query  Guide-Stream_自定义_14

这将对传出事件进行分区,并将具有相同国家/地区属性值的所有事件发布到同一端点。发布的JSON 消息将采用以下格式:

Query  Guide-Stream_HTTP_15

错误处理

Siddhi 中的错误可以在Streams 和Sink 中处理

流中的错误处理

当订阅流的Siddhi 元素抛出错误时,该错误将传播到向这些Siddhi 元素传递事件的流。默认情况下,错误会在流中记录和删除,但可以通过在相应的流定义中添加@OnError 注释来更改此行为。 @OnError 注解可以帮助用户捕获错误和相关事件,并通过发送优雅地处理它们来将它们发送到故障流。

@OnError注解以及需要指定的操作如下。

Query  Guide-Stream_自定义_16

@OnError 注释的操作参数定义了在失败场景中要执行的操作。可以为@OnError注解指定以下操作来处理错误场景。

1 日志:记录有错误的事件并删除事件。即使未定义@OnError 注释,这也是执行的默认操作。

2 STREAM:创建故障流并将事件和错误重定向到其中。创建的故障流将具有基本流中定义的所有属性,以捕获导致错误的事件,此外,它将包含包含错误信息的对象类型的_error 属性。故障流程可以参考添加!基本流前面的名称是流名称。

例子

通过将TempStream 中的错误重定向到故障流来处理错误

TempStream流和@OnError注解的配置如下

Query  Guide-Stream_JSON_17

Siddhi 将推断并自动定义TempStream 的故障流,如下所示。

Query  Guide-Stream_JSON_18

SiddhiApp 通过使用查询添加故障生成和错误处理来扩展上述用例,如下所示。

注:通过查询编写处理逻辑的细节将在后面的章节中解释

Query  Guide-Stream_HTTP_19

接收器错误处理

在某些情况下,当事件发布到外部系统时,外部系统可能会变得不可用或遇到错误。默认情况下,sink会记录并删除导致事件丢失的事件,可以通过配置@sink注解的.error参数来正常处理。

@sink注解的on.error参数可以指定如下。

y Guide-Stream_JSON_20" /> 可以为@sink注释的on.error参数指定以下操作以处理错误场景。 1日志:记录带有错误的事件,并删除该事件。这是即使在@sink注释上未定义on.error参数时执行的默认操作 2 WAIT:发布线程在后退和重试模式下等待,并且仅在重新建立连接时发送事件。在此期间,线程将不会消耗任何新消息,从而导致系统在发布到它的系统上引入背压。 3 STREAM:将具有相应错误的失败事件推送到接收器所属的相关故障流。 例子1 当系统无法连接到Kafka时,等待重试 TempStream流和带有on.error属性的@ssink Kafka注释的配置如下。 Query Guide-Stream_JSON_21 例子2 当系统无法连接到Kafka时,将事件发送到TempStream的故障流 Query Guide-Stream_自定义_22
the end
免责声明:本文不代表本站的观点和立场,如有侵权请联系本站删除!本站仅提供信息存储空间服务。