realtime event processing.0 - ijokarumawak/hdf-tutorials-ja GitHub Wiki
NiFi, SAM, Schema Registry, Supersetでリアルタイムプロセッシング
原文: REAL-TIME EVENT PROCESSING IN NIFI, SAM, SCHEMA REGISTRY AND SUPERSET
このチュートリアルは、MacおよびLinux OSユーザー向けです。
はじめに
このチュートリアルでは、視覚的なキャンバスでStream Analytics Manager(SAM)のトポロジを構築する方法を学びます。Schema Registryにスキーマを作成し、SAMとNiFiがフローにデータを引き込むために使用します。SAMトポロジがデプロイされたら、Druid上で実行されるSuperSetを使用していろいろな可視化スライスを作成する方法を学びます。
事前準備
- ローカルマシンで
/private/etc/hosts
にsandbox-hdf.hortonworks.com
を追加します。
127.0.0.1 localhost sandbox-hdf.hortonworks.com
- SAMのデモの依存関係をローカルマシンにダウンロードします。
cd ~/Downloads
wget https://github.com/hortonworks/data-tutorials/raw/master/tutorials/hdf/realtime-event-processing-in-nifi-sam-sr-superset/assets/templates.zip
unzip templates.zip
このテンプレートフォルダには、NiFiフロー、SAMトポロジ、SAM custom UDF、Schema Registryのスキーマが含まれています。
SAMのデモを実行するために、HDFのセットアップをする
- Amberiに
http://sandbox-hdf.hortonworks.com:9080
でアクセスし、admin / admin
でログインします。 - 左にあるサイドバーから“Streaming Analytics Manager (SAM)”を選択し、Summaryタブが表示されたら“Service Actions”をクリックします。“Start”ボタンをクリックして“maintenance mode”をオフにします。
- ホストマシンでターミナルを開き、SSHでHDF sandboxに入ります。“bootstrap-storage.sh drop-create”のコマンドは、SAMのメタデータを格納するためにMySQLのテーブルをリセットします。“bootstrap.sh pulls”コマンドは、SAM(Streamline)のデフォルトコンポーネント、Notifier、UDF、roleを作成します。以下のコマンドを実行します。
ssh root@localhost -p 12222
cd /usr/hdf/current/streamline
./bootstrap/bootstrap-storage.sh drop-create
./bootstrap/bootstrap.sh
- Ambariダッシュボードの左サイドバーにあるSAM Serviceに移動します。その後、“Configs” -> “Streamline Config”をクリックします。フィルタボックスで“registry.url”を検索し、“registry.url”フィールドに
http://sandbox-hdf.hortonworks.com:17788/api/v1
を入力します。
設定を保存し、“updated registry.url”と入力します。SAM Serviceを再起動します。
- Druid Serviceに移動し、“Config”、“Advanced”をクリックしてディレクトリを更新し、検索ボックスにdruid.indexer.logs.directoryと入力します。
/home/druid/logs
で設定を更新します。
- druid.storage.storageDirectoryを検索し、
/home/druid/data
で設定を更新します。
両方の設定が更新されたら、設定を保存し、“updated directories, so druid can write to them”を呼びます。
- 左サイドバーからHDFSを選択し、先ほどのSAMと同じようにサービスを起動します。
- HDFSを起動したのと同じ方法で、Storm、Ambari Metrics、Kafka、Druid、Registry、Streaming Analytics Manager(SAM)を起動します。
概要
- 概念
- Step 1: Druid ServiceにMAPBox APIキーを追加する
- Step 2: Kafkaトラックトピックが作成されていることを確認する
- Step 3: Schema Registryにトラックスキーマを作成する
- Step 4: GeoEnrich Kafka DataにNiFiフローを導入する
- Step 5: Druid SuperSetのためのデータを前処理するためにSAMトポロジをデプロイする
- Step 6: アプリケーションでData-Loaderを実行する
- Step 7: スライスを持つSuperSetダッシュボードを作成する
- まとめ
- 参考文献
- 付録: リアルタイムイベント処理のデモについてのトラブルシューティング
概念
SuperSet
SuperSetは、視覚的、直観的、対話的なデータ探索プラットフォームです。このプラットフォームは、可視化されたデータセットにて、友人やビジネスクライアントとダッシュボードを迅速に作成して共有する方法を提供します。様々な可視化オプションはデータを分析するために使用し、それについて説明します。Semantic Layerは、データストアをUIに表示する方法を制御することをユーザに許可します。このモデルは安全であり、特定の機能だけが特定のユーザによってアクセスできる複雑なルールをユーザに与えます。SuperSetは、Druidやマルチシステムと互換がある柔軟性を提供している他のデータストア(SQLAlchemy、Python ORMなど)と結合できます。
Druid
Druidは、データのビジネスインテリジェンスクエリのために開発されたオープンソースの分析データベースです。Druidは、リアルタイムなデータ取り込みを低レイテンシで行い、柔軟なデータ探索と迅速な集約を提供します。デプロイメントは、多数のペタバイトのデータに関連して、何兆ものイベントに達することがよくあります。
Stream Analytics Manager (SAM)
Stream Analytics Managerは、ストリーム処理開発者が従来の数行のコードを記述するのに比べてわずか数分でデータトポロジを構築できるドラッグ&ロップのプログラムです。今すぐに、各コンポーネントまたはプロセッサがデータの計算を実行する方法をユーザは設定および最適化できます。ウィンドウ処理機能、複数のストリーム結合、その他のデータ操作を実行できます。SAMは現在、Apache Stormと呼ばれるストリーム処理エンジンをサポートしていますが、後にSparkやFlinkなどの他のエンジンもサポートします。その際には、ユーザーが選択したいストリーム処理エンジンを選択できるようになるでしょう。
Schema Registry
Schema Registry(SR)は、RESTfulインターフェイス経由でAvroスキーマを格納および取得します。SRには、すべてのスキーマを含むバージョン履歴が保存されます。シリアライザは、スキーマを格納するKafkaクライアントに接続するために提供され、Avro形式で送信されたKafkaメッセージを取得します。
Step 1: Druid ServiceにMAPBox APIキーを追加する
Mapboxは地図でデータの可視化を作成できるサービスで、SuperSetで使用します。SuperSetでMapboxの地図の可視化機能を使用するためには、MapBox APIキーをDruidの設定として追加する必要があります。
- APIキーを取得するためには、
mapbox.com
に移動し、アカウントを作成します。その後、Mapbox Studio -> “My Access Tokens” -> “Create a new token” ->DruidSuperSetToken
という名前を付け、初期設定を保持しておきます。
Mapbox Studio
My access tokens
Create a new token: DruidSuperSetToken
- 初期設定のパラメータのままで、トークンの名前を
DruidSuperSetToken
にします。
- Ambari Druid Serviceから、Configs -> Advanced -> フィルタフィールドにて
MAPBOX_API_KEY
を検索すると、MAPBOX_API_KEYの属性が表示されます。 mapbox.comから取得したMAPBOX_API_KEYを記入してください。
- Saveをクリックし、Save Configurationに
MAPBOX_API_KEY added
を入力し、もう一度Saveを押下します。
- Druid SuperSetコンポーネントを再起動します。
Step 2: Kafkaトラックトピックが作成されていることを確認する
- hdfsユーザに切り替え、ディレクトリを作成し、すべてのユーザに権限を与えます。これにより、SAMはすべてのディレクトリにデータを書き込むことができます。
su hdfs
hdfs dfs -mkdir /apps/trucking-app
hdfs dfs -chmod 777 /apps/trucking-app
cd /usr/hdp/current/kafka-broker/bin/
./kafka-topics.sh --list --zookeeper sandbox-hdf.hortonworks.com:2181
Step 3: Schema Registryにトラックスキーマを作成する
Schema Registryにsandbox-hdf.hortonworks.com:17788
、またはAmbariのQuick Linksにある“Registry UI”からアクセスします。4つのトラックスキーマを作成していきます。
- 新しいスキーマを追加するためには“+”ボタンをクリックします。“Add New Schema”というウィンドウが表示されます。
- 表1の情報をもとに、新規のスキーマ(最初のスキーマ)に次の特性を追加します。
表1: raw-truck_events_avroスキーマ
PROPERTY | VALUE |
---|---|
Name | raw-truck_events_avro |
Desc | Raw Geo events from trucks in Kafka Topic |
Group | truck-sensors-kafka |
Browse File | raw-truck_events_avro.avsc |
ファイル参照: 先ほどダウンロードした“templates”フォルダに移動すると、"Schema"フォルダに全てのスキーマテンプレートがあります。
スキーマ情報のフィールドを入力してテンプレートをアップロードしたら、Saveをクリックします。
注釈: Groupは論理グループのようなものです。アプリケーション開発者が同じ全てのSchema Registryからスキーマを取得し、それらのプロジェクトに関連した名前でグループ化する1つの方法です。
- 表2の情報をもとに、2番目の新しいスキーマを追加します。
表2: raw-truck_speed_events_avroスキーマ
PROPERTY | VALUE |
---|---|
Name | raw-truck_speed_events_avro |
Desc | Raw Speed Events from trucks in Kafka Topic |
Group | truck-sensors-kafka |
Browse File | raw-truck_speed_events_avro.avsc |
スキーマ情報のフィールドを入力してテンプレートをアップロードしたら、Saveをクリックします。
- 表3の情報をもとに、3番目の新しいスキーマを追加します。
Table 3: truck_events_avroスキーマ
PROPERTY | VALUE |
---|---|
Name | truck_events_avro |
Desc | Schema for the kafka topic named ‘truck_events_avro’ |
Group | truck-sensors-kafka |
Browse File | truck_events_avro.avsc |
スキーマ情報のフィールドを入力してテンプレートをアップロードしたら、Saveをクリックします。
- 表4の情報をもとに、4番目の新しいスキーマを追加します。
Table 4: truck_speed_events_avroスキーマ
PROPERTY | VALUE |
---|---|
Name | truck_speed_events_avro |
Desc | Schema for the kafka topic named ‘truck_speed_events_avro’ |
Group | truck-sensors-kafka |
Browse File | truck_speed_events_avro.avsc |
Saveをクリックします。
Step 4: GeoEnrich Kafka DataにNiFiフローを導入する
- Ambari NiFi Service Summaryウィンドウの“Quick Links”からNiFiを起動するか、
http://sandbox-hdf.hortonworks.com:19090/nifi
でNiFi UIを開きます。 - “Operate panel”のNiFi upload templateボタンを使用して、先ほどダウンロードした“templates” -> “nifi”フォルダにある
Nifi_and_Schema_Registry_Integration.xml
をアップロードします。
- Templateアイコンを“components toolbar”からキャンバスにドラッグし、アップロードしたばかりのテンプレートを追加します。
- “Operate panel”の左隅にある歯車をクリックし、“Controller Services”タブを開きます。
- “HWX Schema Registry”サービスを確認します。Schema RegistryのREST API URLがHDF 3.0 Sandboxで実行されている適切なSchema Registryを指していることを確認します。
http://sandbox-hdf.hortonworks.com:17788/api/v1
であるはずです。 - “HWX Schema Registry”サービスが有効であることを確認します。“HWX Schema Registry”に依存する他のすべての関連するサービスが有効であることを確認します。下図のように有効になっていない場合は、稲妻の記号をクリックして有効にします。
- NiFiフローのルートレベルから、左下の“NiFi Flow”に示すように、NiFiフローを開始します。“Use Case 1”のプロセスグループを右クリックし、“Start”を選択します。“Data-Loader”が実行されるまで、データの登録は開始されません。
Step 5: Druid SuperSetのためのデータを前処理するためにSAMトポロジをデプロイする
- Ambari SAM Service Summaryウィンドウの“Quick Links”からStreaming Analytics Manager(SAM)を起動するか、
http://sandbox-hdf.hortonworks.com:17777/
でSAM UIを開きます。 - 左隅にあるツールの“Configuration”をクリック -> “Service Pool”リンクを選択します。
- Ambari API URLに http://sandbox-hdf.hortonworks.com:8080/api/v1/clusters/Sandbox を入力します。
- ユーザ名
admin
、パスワードadmin
でログインします。
- SAM Service PoolをAmbari API URLに指し示します。
- SAMクラスタが作成されました。
- ホーム画面に戻るために、左上にあるSAMのロゴをクリックします。
- ツールの“Configuration”をクリックし、“Environments”をクリックします。以下の表について、以下のプロパティ値を持つ新しい環境を追加するために、“+”ボタンをクリックします。
表5: Environment metadata
PROPERTY | VALUE |
---|---|
Name | HDF3_Docker_Sandbox |
Description | SAM Environment Config |
Service | Include all services |
OKをクリックして環境を作成します。
- 左隅にある“Configuration”ボタンをクリックし、次に“Application Resources”をクリックして、UDFタブを押下します。新しいUDFを追加するためには‘+’をクリックします。以下の表について、“Add UDF”ウィンドウのフィールドに下記のプロパティ値を入力し、OKをクリックします。
PROPERTY | VALUE |
---|---|
Name | ROUND |
Display Name | ROUND |
Description | Rounds a double to integer |
Type | FUNCTION |
Classname | hortonworks.hdf.sam.custom.udf.math.Round |
UDF JAR | sam-custom-udf-0.0.5.jar |
注釈: UDF JARは、以前ダウンロードした“sam”フォルダの“templates”フォルダにあります。
- 新しいUDFを追加
- ROUND UDFが追加されました
SAMのロゴを押下し、“My Applications”ページに戻ります。
- “+”ボタンをクリックして新しいアプリケーションを追加します。“Import Application”を選択し、“sam”フォルダから“IOT-Trucking-Ref-App.json”のテンプレートを選択します。
- Application Name:
IOT-Trucking-Demo
- Environment: HDF3_Docker_Sandbox
OKをクリックします。
- キャンバスに表示されるSAMトポロジから、2つのKafka Sinkを確認します。それぞれをダブルクリックし、“Security Protocol”で“PLAINTEXT”が選択されていることを確認します。Kafka BrokerのURLは、
sandbox-hdf.hortonworks.com:6667
を指しているはずです。
- Druid Sinkを確認し、ZOOKEEPER CONNECT STRING URLが
sandbox-hdf.hortonworks.com:2181
に設定されていることを確認します。それ以外であったらその値を変更します。
- SAMトポロジをデプロイするためには、右下のRunアイコンをクリックします。
- 注釈: “Are you sure want to continue with this configuration?”のウィンドウが表示され、デフォルトの設定を維持してOKをクリックします。
デモが正常にデプロイされたことを示すはずです。
Step 6: アプリケーションでData-Loaderを実行する
- hdfsユーザーを終了し、Data-Loaderを実行してデータを生成し、Kafka Topicに転送します。
exit
cd /root/Data-Loader
tar -zxvf routes.tar.gz
nohup java -cp /root/Data-Loader/stream-simulator-jar-with-dependencies.jar hortonworks.hdp.refapp.trucking.simulator.SimulationRegistrySerializerRunnerApp 20000 hortonworks.hdp.refapp.trucking.simulator.impl.domain.transport.Truck hortonworks.hdp.refapp.trucking.simulator.impl.collectors.KafkaEventSerializedWithRegistryCollector 1 /root/Data-Loader/routes/midwest/ 10000 sandbox-hdf.hortonworks.com:6667 http://sandbox-hdf.hortonworks.com:17788/api/v1 ALL_STREAMS NONSECURE &
Data-Loaderを実行させることによって、Kafkaトピックにデータが保存され、NiFiとSAMがKafkaトピックからデータを取り出します。
- nohup.outファイルにデータが取り込まれていることを確認します。
6.1 データフローとストリーム処理データを確認する
- NiFi UIに戻ると、Kafka Consumerからトラックイベントに引っ張ってきているはずです。そうでなければ、それ以前に問題があります。
- SAM UIに戻り、左上にあるSAMのロゴをクリックしてSAMのルートレベルに移動します。現在のページから移動するかどうかを尋ねるメッセージが表示されたら、OKをクリックします。次に、アプリケーションの概要を提供する必要があります。アプリケーションの詳細を表示するには、appをクリックします。
- Storm Monitorをクリックし、Stormがタプルを処理していることを確認します。
Step 7: スライスを持つSuperSetダッシュボードを作成する
- Ambariダッシュボードから、Druid Serviceをクリックします。その後、“Quick Links”ドロップダウンを押下し、SuperSetを選択します。
- 初期設定でログインしていない場合、ログインの認証情報は
admin / hadoophadoop
です。 - “Sources”、“Refresh Druid Metadata”の順にクリックします。SAMトポロジによって作成された2つのデータソースは、25分以内に表示されます。
- “Data Source”の
violation-events-cube-2
を選択すると、データの可視化を作成できる“Datasource & Chart Type”ページが表示されます。
“サンバースト図”でDriverViolationsSunburstの可視化を作成する
- Data Source & Chart Typeから、“Table View”をクリックし、Chart typeを
Sunburst
に設定します。
-
“Time”から、“Time Granularity”を
one day
に設定します。“Since”を7 days ago
に、“Until”をnow
に設定します。 -
Hierarchyを
driverName, eventType
に設定します。 -
左上にある“緑のQueryボタン”を押下してクエリを実行し、出力を確認します。Sunburstの可視化には18.49秒かかります。
-
Save asボタンをクリックして、Save asには
DriverViolationsSunburst
と入力し、add to new DashboardにはTruckDriverMonitoring
を入力して、保存をします。
“Mapbox”でDriverViolationMapの可視化を作成する
- “Table View”で
Mapbox
を選択します。 - Timeには、先ほどの設定を維持します。
- “Longitude”と“Latitude”を名前付き変数(longitudeとlatitude)に変更します。
- “Clustering Radius”を
20
に設定します。 - “GroupBy”を
latitude,longitude,route
に設定します。 - “Label”を
route
に設定します。 - “Map Style”を
Outdoors
に設定します。 - Viewportから、“Default Long field”を
-90.1
に、Latを38.7
に、Zoomを5.5
に設定します。 - クエリを実行します。
- “Save As”ボタンをクリックして、“Save as”を選択し
DriverViolationMap
と名付けます。その後、“Add slice to existing dashboard”を選択し、ドロップダウンからTruckDriverMonitoring
を選びます。“Save”を押下します。
SuperSetスライスの可視化ダッシュボードにアクセスする
- Dashboardsタブをクリックすると、ダッシュボードのリストが表示されます。
- TruckDriverMonitoringのダッシュボードを選択します。
他のSuperSetの可視化を試す(任意)
- “TruckDriverMonitoring”のダッシュボードにて、“+”ボタンで新しいスライスを作成し、他の可視化を探索します。
- 新しい可視化スライスを作成します。
まとめ
おめでとうございます!堅牢なキュー(Kafka)、データフローマネジメントエンジン(NiFi)、ストリーム処理エンジン(Storm)を使用してトラックのイベントデータを処理するSAMのデモをデプロイしました。Stream Analytics Managerを使用して複雑なデータを計算するために視覚的なデータフローを作成し、Druid SuperSetでデータを可視化する方法について学びました。
参考文献
付録: リアルタイムイベント処理のデモについてのトラブルシューティング
付録A: SAMアプリケーションのシャットダウン
- data-loaderを強制終了する。
(ps -ef | grep data-loader)
- NiFiがkafkaのキューを吐かせ、NiFiフローのデータが0になります。
- Ambariからすべてのサービスを停止します。
- HDF 3.0 Sandboxのインスタンスをシャットダウンします。
付録B: Kafkaトピックが存在しなかった場合、Kafkaトピックを作成する
- SSHでHDS 3.0 Sandboxに入ります。
ssh root@localhost -p 12222
sudo su -
- 新しいアプリケーションをデプロイするために、既にアプリケーションが存在する場合に備えて、Kafka Topicを削除してください。
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --if-exists --zookeeper sandbox-hdf.hortonworks.com:2181 --topic raw-truck_events_avro
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --if-exists --zookeeper sandbox-hdf.hortonworks.com:2181 --topic raw-truck_speed_events_avro
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --if-exists --zookeeper sandbox-hdf.hortonworks.com:2181 --topic truck_events_avro
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --if-exists --zookeeper sandbox-hdf.hortonworks.com:2181 --topic truck_speed_events_avro
- Kafka Topicを再作成する。
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --if-not-exists --zookeeper sandbox-hdf.hortonworks.com:2181 --replication-factor 1 --partition 1 --topic raw-truck_events_avro
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --if-not-exists --zookeeper sandbox-hdf.hortonworks.com:2181 --replication-factor 1 --partition 1 --topic raw-truck_speed_events_avro
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --if-not-exists --zookeeper sandbox-hdf.hortonworks.com:2181 --replication-factor 1 --partition 1 --topic truck_events_avro
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --if-not-exists --zookeeper sandbox-hdf.hortonworks.com:2181 --replication-factor 1 --partition 1 --topic truck_speed_events_avro
付録C: SAMトポロジにNotification Sinkを追加する
- 左にあるprocessorバーをNotification Sinkまでスクロールします。Notification Sinkをドラッグしてトポロジに接続します。デフォルトコネクションを受け入れます。
- Notification Sinkまでスクロールします。
- キャンバスにNotification Sinkをドラッグします。
- Notification sinkを編集し、下記のプロパティ値を追加します。
PROPERTY | VALUE |
---|---|
Username | [email protected] |
Password | StrongPassword |
Host | smtp.gmail.com |
Port | 587 |
From/To Email | [email protected] |
Subject | Driver Violation |
Message | Driver ${driverName} is speeding at ${speed_AVG} mph over the last 3 minutes |
付録D: SAMのデモの拡張機能を追加する
SAMのデモの拡張機能には、カスタムプロセッサとUDFが付属しています。これらのコンポーネントは、“Application Resources” -> “Custom Processor”または“UDF”に追加することによって、トポロジに組み込むことができます。
- SAMのデモの拡張機能をダウンロードします。
git clone https://github.com/georgevetticaden/sam-custom-extensions.git
- HDF 3.0 Sandboxにmavenをダウンロードします。
sudo wget http://repos.fedorapeople.org/repos/dchen/apache-maven/epel-apache-maven.repo -O /etc/yum.repos.d/epel-apache-maven.repo
sudo sed -i s/$releasever/6/g /etc/yum.repos.d/epel-apache-maven.repo
sudo yum install -y apache-maven
mvn --version
- maven clean packageを実行して、カスタムプロセッサとUDFのためのプロジェクトコードを独自のjarファイルにパッケージ化します。
mvn clean package -DskipTests
OKを押下します。