StreamSets DataCollector Demo

StreamSets 有三个组件

  • StreamSets Edge:主要安装在物联网设备上,采集数据
  • StreamSets Data Collector:ETL、dataflow 工具
  • StreamSets Control Hub:管理 Data Collector 定义的 pipeline

本文主要介绍通过 StreamSets Data Collector(以下简称 SDC) 定义数据流

Demo

我们需要定义以下数据流,从数据的生产到消费,主要有以下步骤

生产

  1. 通过 randomuser api 获取用户数据
  2. 处理返回的数据
  3. 将处理过后的数据发送给 kafka

消费

  1. 从 kafka 获取用户数据
  2. 展开 json 数据 {"login": {"username": "anyisalin"}} -> {"login.username": "anyisalin"}
  3. 将展开后的 json 数据转化为 SQL 语句
  4. 执行 SQL

image-20190103170847022

StreamSets DataCollector

安装

SDC 的安装包很大,全组件的二进制 6G 左右,安装好 JDK 之后,直接启动即可

$ streamsets-datacollector-3.6.1/bin/streamsets dc

默认端口是 18630,用户名密码为 admin/admin

创建 Producer Pipeline

我们创建一个名为 user data producer 的数据流,用来生产用户数据

image-20190107104133176

HTTP Client

进入 Pipeline 后我们选择一个 Origin (一个 Pipeline 中只能有一个 Origin)

我们需要发起 HTTP 请求来获取数据,所以这里选择 HTTP Client 即可

image-20190107104314832

然后配置 HTTP 请求的地址和参数即可

image-20190107104528494

Field Pivoter

由于获取的数据格式如下

{
'results': [
{
'username': 'haha',
'field1': 'value1'
},
{
'username': 'haha',
'field1': 'value1'
},
]
}

我们只需要里面的 results,所以需要对 JSON 数据进行切分

StreamSets Data Collector 中可以通过 Field Pivoter 这个 Processor 对字段进行切分

所以我们创建 Field Pivoter 并配置即可

image-20190107104945268

Kafka Producer

最后我们需要将切分后的数据 push 到 kafka 里面

选择 Destination 为 Kafka Producer

选择 Kafka 客户端对应的版本,我们这里使用的是 2.1.1 的 kafka,所以选择 Apache Kafka 2.0.0 这个库

image-20190107105518017

并配置 Kafka Broker 参数

image-20190107105706891

Producer 测试

最后我们的 Pipeline 如下

image-20190107105826413

启动并验证

运行 Pipeline 之后,可以看到自带的监控,对记录的统计和汇总

image-20190107105908433

创建 Consumer Pipeline

我们创建一个名为 user data consumer 的数据流,用来消费刚刚生产的数据

image-20190107110650845

Kafka Consumer

选择 Kafka Consumer 作为 Origin

配置 Kafka Broker 参数

image-20190107111836335

Filed Flattener

我们需要展开 Kafka 中获取到的用户数据

展开的格式如下

{'gender': 'male',
'name_title': 'mr',
'name_first': 'lucas',
'name_last': 'olsen',
'location_street': '7882 mariagervej',
'location_city': 'saltum',
'location_state': 'midtjylland',
'location_postcode': 52010,
'location_coordinates_latitude': '66.7960',
'location_coordinates_longitude': '169.3742',
'location_timezone_offset': '+8:00',
'location_timezone_description': 'Beijing, Perth, Singapore, Hong Kong',
'email': 'lucas.olsen@example.com',
'login_uuid': '7e1f3118-a6aa-4ee9-a2a1-0a51ff05e210',
'login_username': 'smalllion212',
'login_password': 'oscar',
'login_salt': 'X06x1uIC',
'login_md5': 'cede0ee5c6da51ab45f30d443c0bce87',
'login_sha1': '65ae36af8670df9f5aaba7aa6ada24466f01c6a0',
'login_sha256': '14a363b54ae02f62e1e1a6a1f38e81f771ac57c0e10cbe931af332d52deea9ac',
'dob_date': '1990-12-30T04:11:02Z',
'dob_age': 28,
'registered_date': '2009-07-24T19:30:29Z',
'registered_age': 9,
'phone': '84828860',
'cell': '74092531',
'id_name': 'CPR',
'id_value': '819452-4601',
'picture_large': 'https://randomuser.me/api/portraits/men/84.jpg',
'picture_medium': 'https://randomuser.me/api/portraits/med/men/84.jpg',
'picture_thumbnail': 'https://randomuser.me/api/portraits/thumb/men/84.jpg',
'nat': 'DK'}

配置 Field Flattener,分隔符为 _

image-20190107112156873

JDBC Producer

最后我们要将数据插入到 MySQL 里,SDC 通过类型为 JDBC Producer 的 Destination 来处理 SQL 的转换和执行

需要上传对应的库到 SDC 中

image-20190107112539452

在 Legacy Drivers 里面配置 class

image-20190107112632307

在 Credentials 中配置用户名密码,最后在 JDBC 选项栏中配置连接参数和操作选项

image-20190107112739224

Producer 测试

最后我们启动这个 Pipeline 进行测试

登录 mysql 服务器,查看是否有数据插入

mysql> select count(*) from users;
+----------+
| count(*) |
+----------+
| 25255950 |
+----------+
1 row in set (1 min 22.44 sec)

总结

SDC 定义数据流没有 NiFi 那么灵活,缺少限流,但是界面风格相对 NiFi 好一些,监控图表也很直观