NiFi Demo

很多情况下,需要从不同的数据源中提取数据,并将数据格式进行转换,定义不同的流程,传输到对应的系统中

需求较为简单的情况下,可以通过自己写代码实现,但在数据流比较复杂后,整个流程难以维护,扩展性很差

这时候就需要引入像 NiFi、StreamSets 这类的 DFM(Data Flow Management) 数据流处理平台,这篇文章主要介绍在 NiFi 中定义一个简单的数据流

Demo

我们需要定义以下数据流,从数据的生产到消费,主要有以下步骤

生产

  1. 通过 randomuser api 获取用户数据
  2. 处理返回的数据
  3. 将处理过后的数据发送给 kafka

消费

  1. 从 kafka 获取用户数据
  2. 展开 json 数据 {"login": {"username": "anyisalin"}} -> {"login.username": "anyisalin"}
  3. 将展开后的 json 数据转化为 SQL 语句
  4. 执行 SQL

image-20190103170847022

NiFi

安装

NiFi 安装很简单,在 Linux 系统上安装好 JDK 环境,https://nifi.apache.org/download.html 下载并解压对应版本的二进制安装包,执行以下命令即可

$ cd nifi-<VERSION>

$ bin/nifi.sh start

等待启动完成后访问对应 IP 的 8080 端口即可

架构

Flowfile 是NiFi及其基于流的设计的核心,流文件是一个数据记录,它由指向其内容的指针和属性组成,并与相关事件进行关联

NiFi 架构如下,由几个组件组成

  • Web Server:NiFi 的 http 接口

  • Flow Controller:对 Processor 的流程控制、资源调度

  • FlowFile Repository:存储 Flowfile 元数据(attribute、指向内容的指针)

  • Content Repository:存储 Flowfile 内容

  • Provenance Repository:存储 Flowfile 运行时状态

NiFi Cluster Architecture Diagram

创建数据流

invoke http

首选我们要定义调用 randomuser api 获取用户数据,NiFi 中通过 Invoke HTTP 这个 Processor 发起 HTTP 请求

image-20190104101245046

这个 HTTP Endpoint 返回的资源格式如下 https://randomuser.me/api?results=50

{
"results": [
{
"gender": "male",
"name": {
"title": "mr",
"first": "آرسین",
"last": "رضایی"
},
"location": {
"street": "6802 فلسطین",
"city": "خمینیشهر",
"state": "هرمزگان",
"postcode": 87162,
"coordinates": {
"latitude": "31.5995",
"longitude": "-64.0100"
},
"timezone": {
"offset": "-12:00",
"description": "Eniwetok, Kwajalein"
}
},
"email": "آرسین.رضایی@example.com",
"login": {
"uuid": "120a95ad-7626-481e-a8d7-deeb86260ff3",
"username": "ticklishkoala843",
"password": "lakeside",
"salt": "EykxGh6P",
"md5": "ccd83cf95763b0f4a04e78b0aaf16f08",
"sha1": "3f78b70a952d75e4567d52c5b3967c5975c1323d",
"sha256": "e7f0bec78b81937aec798c4455ea0f91b7440192faa149ab031156cae46266ea"
},
"dob": {
"date": "1992-04-17T00:59:50Z",
"age": 26
},
"registered": {
"date": "2017-05-06T00:12:16Z",
"age": 1
},
"phone": "054-14099232",
"cell": "0937-683-3855",
"id": {
"name": "",
"value": null
},
"picture": {
"large": "https://randomuser.me/api/portraits/men/69.jpg",
"medium": "https://randomuser.me/api/portraits/med/men/69.jpg",
"thumbnail": "https://randomuser.me/api/portraits/thumb/men/69.jpg"
},
"nat": "IR"
}
],
"info": {
"seed": "4203aa4638365efa",
"results": 1,
"page": 1,
"version": "1.2"
}
}

SplitJson

我们只需要 results 里面的数据,所以需要进行切分

NiFi 中通过 SplitJson 来切分 JSON 数据

image-20190104101828670

定义完这两个 Processor ,可以先测试一下

将两个 Processer 连接起来

这里的 Relationships 是根据 Processor 中 Flowfile 的状态,需要定义对应状态下的 Flowfile 应该路由到哪一个 Processor

我们这里定义 Relationships,Response 数据流路由到 SplitJson

image-20190104102234538

但是在 NiFi 中,所有的 Relationships 都得定义,所以对应的 Failure、No Retry、Original、Retry 也得路由到对应的 Processor,我们这里使用 DebugFlow 来接收这些错误的数据

Automatically Terminate Relationships 指的是数据流路由到这个 Processor 后,特定状态下会被删除,一般在 Endpoint Processor 配置,因为数据流不需要再被继续路由了

image-20190104103115028

最后我们这个简单的数据流如下

image-20190104103643971

启动数据流之后,通过 NiFi Data Provenance 可以看到数据流的状态

image-20190104103836337

可以看到 JSON 数据已经被切分了

image-20190104103902591

kafka producer

完成 JSON 数据的切分过后就可以将数据存储到 Kafka 中了

我们需要创建 PublishKafka Processor,由于 Kafka 不同版本的 API 不兼容,所以 NiFi 提供了多个版本的 Processor

image-20190104104302166

我们这里使用的是 2.1.2 版本的 Kafka,所以用 PublishKafka_2_0 即可

配置好 Kafka 的连接参数

image-20190104104510785

由于 Kafka 在这个数据流中是 Endpoint Processor,数据流不需要再被路由了,所以 Automatically Terminate Relationships 都勾选上

image-20190104104600684

将 SplitJson 与 PublishKafka 连接,整个数据流就构建好了

image-20190104105243452

kafka consumer 测试

能够从 kafka 消费数据

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

{"gender":"female","name":{"title":"ms","first":"charlie","last":"smith"},"location":{"street":"2802 george st","city":"trout lake","state":"northwest territories","postcode":"M5K 7B1","coordinates":{"latitude":"-66.5974","longitude":"162.8858"},"timezone":{"offset":"-3:00","description":"Brazil, Buenos Aires, Georgetown"}},"email":"charlie.smith@example.com","login":{"uuid":"4de548e6-102f-4e57-bb03-8850da99ad55","username":"bluelion557","password":"555666","salt":"bi96vEBT","md5":"3526a37c16429aadd417953550b09a45","sha1":"acddea2bdea1eb17bd9c24bd74fff5de84a9af31","sha256":"9d2b53a41ea2cc6a235c540475b95d97ff9f875ab37fb71d627e86e98d137dfc"},"dob":{"date":"1961-11-18T05:30:31Z","age":57},"registered":{"date":"2003-01-20T03:57:43Z","age":15},"phone":"666-683-3176","cell":"297-613-3978","id":{"name":"","value":null},"picture":{"large":"https://randomuser.me/api/portraits/women/83.jpg","medium":"https://randomuser.me/api/portraits/med/women/83.jpg","thumbnail":"https://randomuser.me/api/portraits/thumb/women/83.jpg"},"nat":"CA"}

ConsumerKafka

创建 ConsumerKafka Processor

配置 Kafka 连接参数

image-20190104110620459

FlattenJson

创建 FlattenJson Processor

配置 Separator 为下划线 _

image-20190104123520294

展开的格式如下

{'gender': 'male',
'name_title': 'mr',
'name_first': 'lucas',
'name_last': 'olsen',
'location_street': '7882 mariagervej',
'location_city': 'saltum',
'location_state': 'midtjylland',
'location_postcode': 52010,
'location_coordinates_latitude': '66.7960',
'location_coordinates_longitude': '169.3742',
'location_timezone_offset': '+8:00',
'location_timezone_description': 'Beijing, Perth, Singapore, Hong Kong',
'email': 'lucas.olsen@example.com',
'login_uuid': '7e1f3118-a6aa-4ee9-a2a1-0a51ff05e210',
'login_username': 'smalllion212',
'login_password': 'oscar',
'login_salt': 'X06x1uIC',
'login_md5': 'cede0ee5c6da51ab45f30d443c0bce87',
'login_sha1': '65ae36af8670df9f5aaba7aa6ada24466f01c6a0',
'login_sha256': '14a363b54ae02f62e1e1a6a1f38e81f771ac57c0e10cbe931af332d52deea9ac',
'dob_date': '1990-12-30T04:11:02Z',
'dob_age': 28,
'registered_date': '2009-07-24T19:30:29Z',
'registered_age': 9,
'phone': '84828860',
'cell': '74092531',
'id_name': 'CPR',
'id_value': '819452-4601',
'picture_large': 'https://randomuser.me/api/portraits/men/84.jpg',
'picture_medium': 'https://randomuser.me/api/portraits/med/men/84.jpg',
'picture_thumbnail': 'https://randomuser.me/api/portraits/thumb/men/84.jpg',
'nat': 'DK'}

连接 FlattenJson 和 ConsumerKafka

ConvertJSONToSQL

首先在 MySQL 中创建库和表

表结构如下

create table users (
`gender` varchar(255),
`name_title` varchar(255),
`name_first` varchar(255),
`name_last` varchar(255),
`location_street` varchar(255),
`location_city` varchar(255),
`location_state` varchar(255),
`location_postcode` varchar(255),
`location_coordinates_latitude` varchar(255),
`location_coordinates_longitude` varchar(255),
`location_timezone_offset` varchar(255),
`location_timezone_description` varchar(255),
`email` varchar(255),
`login_uuid` varchar(255),
`login_username` varchar(255),
`login_password` varchar(255),
`login_salt` varchar(255),
`login_md5` varchar(255),
`login_sha1` varchar(255),
`login_sha256` varchar(255),
`dob_date` varchar(255),
`dob_age` int,
`registered_date` varchar(255),
`registered_age` int,
`phone` varchar(255),
`cell` varchar(255),
`id_name` varchar(255),
`id_value` varchar(255),
`picture_large` varchar(255),
`picture_medium` varchar(255),
`picture_thumbnail` varchar(255),
`nat` varchar(255)
);

创建 ConvertJSONToSQL Processor

在 NiFi 中创建并配置 MySQL service,需要指定 Connector Driver 的路径和类名(这里使用的是 MariaDB),并填写 用户名、连接地址等参数

image-20190104125536816

创建好 MySQL Service 后在 ConvertJSONToSQL 中配置对应的参数

image-20190104125940747

连接 ConvertJSONToSQL 和 FlattenJson

ExecuteSQL

创建 ExecuteSQL Processor

配置对应的数据库连接池,这里可以复用上一步创建的 MySQLConnection Service

image-20190104130509688

连接 ExecuteSQL 和 ConvertJSONToSQL

测试

最后我们的数据流如下

image-20190104130758360

查询数据库,不断的有数据插入

mysql> select count(login_username) from users;
+-----------------------+
| count(login_username) |
+-----------------------+
| 108154 |
+-----------------------+

限流

当 上游数据生产的过快,无法及时消费,就需要限流,限制上游的连接数,防止数据过载对整个数据流的影响

NiFi 可以对每一个连接的队列设置阈值

默认是 1000 个 flowfile 或者 1G 大小

image-20190104140848748

总结

了解了 Processor 和 Flowfile 的状态和流程,在 NiFi 中定义 数据流还是很容易的,NiFi 自带的 Processor 种类也非常丰富,也支持自己写对应的 Processor

在 NiFi 中定义了数据流之后,就可以很清晰的看到流式数据的路由,状态,也很容易地对现有的数据流进行扩展