LOFFunction - acromusashi/acromusashi-stream-example GitHub Wiki

外れ値検出(LOF:Local Outlier Factor)機能

本ページでは外れ値検出(LOF:Local Outlier Factor)機能の利用方法について説明します。

処理内容

本機能は学習ストリーム、評価ストリームと2つのフローがあり、各々は以下のフローで動作します。

学習ストリーム

  1. TextReadBatchSpoutが学習に使用するデータを1行1データとして読み込みます。
  2. 読み込んだデータをLofPointCreatorを用いてエンティティに変換します。
  3. LofUpdaterが読み込んだデータを用いて学習を行い、InfinispanLofStateを使用して学習データをIn-Memory DBに保存します。

評価ストリーム

  1. LofDrpcClientがDRPC Serverに対して学習データを用いた外れ値スコア算出リクエストを送信します。
  2. DRPCSpoutがDRPC Serverからリクエストを受信します。
  3. リクエストをLofPointCreatorを用いてエンティティに変換します。
  4. LofQueryが学習データを用いてクラスタリングを行い、クラスタリング結果をLofDrpcClientに返却します。

準備

外れ値検出(LOF:Local Outlier Factor)機能を動作させるためには以下の準備が必要です。

  • Step1: 必要となるミドルウェアのインストール
  • Step2: LOFTopologyのデプロイ
  • Step3: LOFTopologyの設定
  • Step4: DRPC Serverプロセスの起動

Step1: 必要となるミドルウェアのインストール

下記の手順を確認し、必要ミドルウェアのインストールを行います。

Step2: LOFTopologyのデプロイ

Step2: acromusashi-stream-exampleのデプロイ を参照し、Topologyのデプロイを行います。

Step3: LOFTopologyの設定

Nimbusをインストールしたサーバの/opt/storm/conf 配下にLofTopology.yamlを配置し、下記(★項目)の設定を行います。

## KmeansTopology Config
## ★NimbusHost  
## Set StormCluster's Nimbus Host
nimbus.host        : "192.168.0.1"
## NimbusPort
nimbus.thrift.port : 6627
## WorkerProcess Number
topology.workers   : 3
## parallelismHint ThreadNum
topology.parallelismHint   :   3
## Storm Debug Flag
topology.debug     : false

## TopologyDefine
## ★Tridentの管理情報を保存するZooKeeperホスト
transactional.zookeeper.servers :
  - "192.168.0.1"
  - "192.168.0.2"
  - "192.168.0.3"

## ★Tridentの管理情報を保存するZooKeeperポート
transactional.zookeeper.port : 2181

## Spout
## ★LOF用のテストデータの配置パス&ベースファイル名
## parallelismHint設定に応じて「ベースファイル名_【TaskIndex】」のファイルが読みこまれる。
## 例)「Lof.txt_0」「Lof.txt_1」
lof.datafilepath     : "/opt/acromusashi-stream-ml/"
lof.datafilebasename : "Lof.txt"
## LOF用のテストデータを最後まで読んだ場合に最初から再読み込みを行うか?
lof.datafilereload   : true
## 1Transactionでファイルから読み込むデータ数
lof.maxbatchsize     : 50

## StateFactory
## ★学習モデルを保存するサーバアドレス。「host1:port1;host2:port2;host3:port3...」という形式で定義
lof.stateservers        : "192.168.0.1:11222;192.168.0.2:11222;192.168.0.3:11222"
## 学習モデルキャッシュ名称
lof.cachename           : "default"
## 学習モデルのマージを行う間隔
lof.merge.interval.secs : 300
## キャッシュ上に学習モデルを保存する期間
lof.lifespan.secs       : 300

## Creator
## ★投入データを分割する文字列
lof.delimeter        : " "

## Updater
## キャッシュに保存する際のベース名称
lof.state.base.name     : "Lof"
## 中間データを保持して計算を高速化するか?
## (キャッシュへの保持データ量は増大するため注意すること)
lof.has.intermediate    : true
## 学習データを受信時、常に学習モデルを更新するか?
lof.always.update.model : false
## 学習データ受信時の学習モデル更新間隔
## 有効条件:「lof.always.update.model」=false
lof.update.interval     : 100
## K値
lof.kn                  : 10
## LOF値の算出を行う最小データ数
lof.min.data.count      : 10
## 学習モデルが保持する最大データ数
lof.max.data.count      : 100
## 通知ログを出力するLOF閾値
lof.notify.threshold    : 5.0

## Query
## ★DRPCサーバアドレス
drpc.servers            :
  - "192.168.0.1"
## DRPC機能名称
lof.drpc.function : "lof"

Step4: DRPC Serverプロセスの起動

Nimbusインストールサーバにログインし、下記のコマンドを実行してDRPC Serverプロセスを起動します。

service storm-drpc start

実行

教師なし学習(KMeans++クラスタリング)機能は下記の手順で起動します。

  • Step1: LOFTopologyの起動
  • Step2: 外れ値スコア算出リクエストの送信
  • Step3: LOFTopologyの終了

Step1: LOFTopologyの起動

Nimbusインストールサーバにログインし、下記のコマンドを実行してKMeans++クラスタリングTopologyを起動します。

cd /opt/storm  
bin/storm jar acromusashi-stream-example-x.x.x.jar acromusashi.stream.example.ml.topology.LofTopology conf/LofTopology.yaml false  

Step2: 外れ値スコア算出リクエストの送信

Nimbusインストールサーバにログインし、下記のコマンドを実行してLofDrpcClientによる外れ値スコア算出リクエストを送信します。-dオプションに続けてクラスタリングを行いたいデータを入力してください。

java -classpath conf:acromusashi-stream-example-x.x.x.jar:lib/* acromusashi.stream.example.ml.client.LofDrpcClient -c conf/LofTopology.yaml -d "10,10,10,10"

コンソールに外れ値スコアが出力されることを確認します。

Step3: LOFTopologyの終了

Nimbusインストールサーバにログインし、下記のコマンドを実行してSNMPTrap受信Topologyを終了します。

cd /opt/storm  
bin/storm kill LofTopology

解説

特になし。