influxdb - taka512/memo GitHub Wiki

influxdb

1. 概要

以下の特徴があるDB

  • メトリクスのような時系列のデータが得意
  • SQLライクなクエリ言語
  • HTTP APIでのクエリの発行が可能
  • データ数は数十億レベルまで格納可能(らしい)
  • テーブル数は数百万レベルまで可能(らしい)
  • tagでの効率的なアクセス
  • データの保持ポリシーが設定可能
  • 組込のweb管理画面
  • 即時集計

2. 基礎知識

インストール

$ wget http://s3.amazonaws.com/influxdb/influxdb-latest-1.x86_64.rpm
$ sudo rpm -ivh influxdb-latest-1.x86_64.rpm

設定ファイル

/etc/opt/influxdb/influxdb.conf

ログファイル

/var/log/influxdb/influxd.log

データディレクトリ

/var/opt/influxdb/db

使用ポート

ポート 用途
8083 Web UI
8086 API
8090 Cluster
8099 Replication

用語

「DATABASE」と「MEASUREMENT」(テーブル)「SERIES」があり、「MEASUREMENT」は時系列のポイント(データ)が存在しており、ポイントは「timestamp」「fields」「tags」が含まれる。 「SERIES」は「MEASUREMENT」とtagの組み合わせっぽい

1シリーズ1バリューがお勧めらしい

MEASUREMENT

timestamp

  1. timestampには必ずプライマリーインデックスが設定される
  2. 書式はRFC3339またはエポック時間

RFC3339

"time": "2015-01-29T21:50:44Z"
"time": "2015-01-29T14:49:23-07:00"
"time": "2015-01-29T21:51:28.968422294Z"
"time": "2015-01-29T14:48:36.127798015-07:00"

エポック時間

"time": 1422568543702900257,
"precision": "n"

precisionは精度

表記 単位
n microseconds
u microseconds
s seconds
m minutes
h hours

tag

  1. tagもindexが設定される
  2. tagは常に文字列として解釈される
  3. オプショナルの指定値である
  4. tagはカーディナリティが低い方が良い(高いとデータがたくさんできる?)
  5. 高いカーディナリティのtagはfieldがふさわしいかもしれない

3. Web UIからの操作

Web UIにアクセスするとログイン画面が表示されるので「root/root」でログインする。 データベースは適当に作ると以下のように表示されるので「Explore Data」をクリックすると選択したデータベースのデータ操作画面に遷移する

url

http://localhost:8083/

管理画面

データの操作

データ操作画面は参照系のクエリは「Read Points」に入力し、更新系は「Write Data」にjsonを入力して更新

管理画面

4. CUI clientからの参照

cuiのクライアントが「/opt/influxdb/influx」に存在してCUIからの対話シェルによる操作も可能である

$ /opt/influxdb/influx
Connected to http://localhost:8086 version 0.9.0-rc24
InfluxDB shell 0.9.0-rc30
> use test
Using database test
> SELECT * FROM cpu_load_short
name: cpu_load_short
--------------------
time                    value
2009-11-10T23:00:00Z    0.64

5. クエリ

定義を見る系のクエリ

# データベース一覧
SHOW DATABASES

# series一覧を表示
SHOW MEASUREMENTS
# whereで絞込ができる
SHOW MEASUREMENTS WHERE region = 'us-west'

# seriesとtagとtag値の一覧を表示
SHOW SERIES
# seriesを指定できる
SHOW SERIES FROM cpu_load_short
#  whereで絞込ができる
SHOW SERIES FROM cpu_load_short WHERE region = 'us-west'

# seriesとtag一覧を表示
SHOW TAG KEYS

参照系クエリ

SQLに似た言語で参照できる詳しくは下記を参照

http://influxdb.com/docs/v0.9/query_language/querying_data.html

# 「cpu_idle」のデータをselect
select * from cpu_idle

# 複数series検索(結果は別々)
select * from test , test1;

# seriesの前方一致
select * from /^stats.*/i limit 10

# 条件指定
select * from response_times where value > 200

# 条件指定(不一致)
select * from nagios_checks where status != 0;

# 条件指定(and)
select * from events where customer_id = 23 and type = 'click';

# 条件指定(or)
select * from events where customer_id = 23 or type = 'click';

# 条件指定(括弧)
select * from events
where (email =~ /gmail/ or email =~ /yahoo/) and state = 'ny';

# 正規表現部分一致
select line from log_lines where line =~ /gmail/i

# 正規表現一致しない
select * from test where col !~ /gmail/i

from句の完全修飾

from句はデータベース名と保存期間を指定する事もできる

"<database>"."<retention period>".<measurement>

SELECT value FROM "test"."mypolicy".cpu_load

削除

# 1時間以上前のデータを削除
delete from response_times where time < now() - 1h

# response_timesを削除
drop series response_times

6. HTTP APIので操作

apiにリクエストを送信する事でデータの参照や更新を行う事ができる

データ操作APIのエントリポイント

# クエリを投げる
http://localhost:8086/query 

# データを書込
http://localhost:8086/write

以下のようにクエリをパラメータに記述してリクエストを送信

curl -G http://localhost:8086/query --data-urlencode "q=CREATE DATABASE test"

参照

curl -G 'http://localhost:8086/query' --data-urlencode "db=test" --data-urlencode "q=SELECT value FROM cpu_load_short"

# pretty=trueで出力が整形
curl -G 'http://localhost:8086/query?pretty=true' --data-urlencode "db=test" --data-urlencode "q=SELECT value FROM cpu_load_short"


# 「;」で区切ると複数クエリも可能
curl -G 'http://localhost:8086/query' --data-urlencode "db=test" --data-urlencode "q=SELECT value FROM cpu_load_short;SELECT * FROM cpu_load_long"

データ追加

データの追加はjsonをPOSTして行う

curl -XPOST 'http://localhost:8086/write' -d '
{
    "database": "test",
    "retentionPolicy": "default",
    "points": [
        {
            "name": "cpu_load_short",
            "tags": {
                "host": "server01",
                "region": "us-west"
            },
            "time": "2009-11-10T23:00:00Z",
            "fields": {
                "value": 0.64
            }
        }
    ]
}
'

# 複数行を同時に更新する場合
$ curl -XPOST 'http://localhost:8086/write' -d '
{
    "database": "test",
    "retentionPolicy": "default",
    "tags": {
        "host": "server01",
        "region": "us-west"
    },
    "time": "2009-11-10T23:00:00Z",
    "points": [
        {
            "name": "cpu_load_short",
            "fields": {
                "value": 0.64
            }
        },
        {
            "name": "cpu_load_short",
            "fields": {
                "value": 0.55
            },
            "time": 1422568543702900257,
            "precision": "n"
        },
        {
            "name": "network",
            "tags": {
                "direction": "in"
            },
            "fields": {
                "value": 23422
            }
        }
    ]
}
'

7. 参照処理の深淵

時刻

もっとも細かい値でミリ秒まで指定可能で形式は「YYYY-MM-DD HH:MM:SS.mmm」 もちろん「YYYY-MM-DD」指定も可能

select value from response_times where time > '2013-08-12 23:32:01.232';

相対時刻

以下のように相対時間を指定できるが「1h」とかの表記は表の通り

# 1時間前
select value from response_times where time > now() - 1h limit 1000;
表記 単位
省略 microseconds
u microseconds
s seconds
m minutes
h hours
d days
w weeks

絶対時間

# 「2014-01-01 00:00:00」と同じ
select value from response_times where time > 1388534400s

集計(GROUPBY)

# 1日前からの10分毎の数をカウント(COUNT)
select count(customerId) from customer_events 
where time > now() - 1d group by time(10m)

# ユーザ毎の数をカウント
select count(customerId), customerId from customer_events
group by time(10m), customerId
where time > now() - 1h

# fill(0)だとデータの無い時間帯はデフォルト0となる
select count(type) from events
group by time(1h) fill(0) where time > now() - 3h

# 1日前から30分毎の平均を取得(MEAN)
select mean(value) from cpu_idle 
where time > now() - 1d group by time(30m) 

# 1時間前からのユニークをidを出力
select distinct(customerId) as customerId from customer_events 
where time > now() - 1h

集計関数

http://influxdb.com/docs/v0.8/api/aggregate_functions.html

join

時刻なるべく近いものをinfluxが合算する

select hosta.value + hostb.value
from cpu_load as hosta
inner join cpu_load as hostb
where hosta.host = 'hosta.influxdb.orb' and hostb.host = 'hostb.influxdb.org';

8. Retention Policy

登録したデータを何日で削除するとかの設定を「Retention Policy」という

# ポリシー一覧表示
> SHOW RETENTION POLICIES test
name    duration        replicaN        default
default 0               1               true

# ポリシー作成
CREATE RETENTION POLICY testpolicy ON test DURATION 1h REPLICATION 1 DEFAULT

# ポリシー変更
ALTER RETENTION POLICY testpolicy ON test DURATION 2h

9 認証

認証はデフォルトでオフ

ユーザ関係のコマンド

# ユーザ一覧
SHOW USERS

# ユーザ追加
CREATE USER test WITH PASSWORD 'mypassword'

# ユーザ削除
DROP USER test

# パスワード変更
SET PASSWORD FOR test = 'mynewpassword'

権限系のコマンド

# READ権限追加
GRANT READ ON test TO test

# 権限はく奪
REVOKE ALL ON test TO test

READ|WRITE|ALLがある

10. client接続

goの場合

ソース

https://github.com/influxdb/influxdb/tree/master/client

書込例1

package main

import (
    "fmt"
    "github.com/influxdb/influxdb/client"
)

func main() {
    c, err := client.NewClient(&client.ClientConfig{
        Database: "project",
        Username: "root",
        Password: "root",
    })
    if err != nil {
        panic(err)
    }

    w := &client.Series{
        Name:    "metrics",
        Columns: []string{"value"},
        Points: [][]interface{}{
            {2.0}, {3.0},
        },
    }

    e := c.WriteSeries([]*client.Series{w})
    if e != nil {
        fmt.Printf("error: %s", e)
    }
}

書き込み例2

こいう書き方もできる

package main

import (
    "fmt"
    "github.com/influxdb/influxdb/client"
)

func main() {
    c, err := client.NewClient(&client.ClientConfig{
        Database: "project",
        Username: "root",
        Password: "root",
    })
    if err != nil {
        panic(err)
    }

    w := &client.Series{
        Name: "metrics",
    }

    p := []interface{}{}
    w.Columns = append(w.Columns, "value")
    p = append(p, 10)
    w.Points = [][]interface{}{p}
    e := c.WriteSeries([]*client.Series{w})
    if e != nil {
        fmt.Printf("error: %s", e)
}

読み込み例

package main

import (
    "fmt"
    "github.com/influxdb/influxdb/client"
)

func main() {
    c, err := client.NewClient(&client.ClientConfig{
        Database: "project",
        Username: "root",
        Password: "root",
    })
    if err != nil {
        panic(err)
    }

    results, err := c.Query("select * from metrics")
    if err != nil {
        panic(err)
    }
    for _, result := range results {
        fmt.Printf("%#v\n", result)
    }
}
$ go run main.go
&client.Series{Name:"metrics", Columns:[]string{"time", "sequence_number", "value"}, Points:[][]interface {}{[]interface {}{1.421073500695e+12, 670001, 10}, []interface {}{1.42107279922e+12, 660001, 3}, []interface {}{1.42107279922e+12, 650001, 2}, []interface {}{1.421072609614e+12, 640001, 1}}}

11. バックアップと復元

以下のコマンドで「」バックアップを取ることができる。1回目で「/tmp/snapshot」にフルバックアップができ、2回叩くと「/tmp/snapshot.0」に差分バックアップが生成される

/opt/influxdb/influxd backup /tmp/snapshot
$ ls /tmp/snapshot*
/tmp/snapshot

/opt/influxdb/influxd backup /tmp/snapshot
$ ls /tmp/snapshot*
/tmp/snapshot  /tmp/snapshot.0

復元は以下

/opt/influxdb/influxd restore -config influxdb.conf /tmp/mysnapshot

9. 参考資料

influxdbの概要

http://www.slideshare.net/sonots/influxdb-study-20140627

InfluxDB をちょっとさわってみた

http://d.hatena.ne.jp/hirose31/20140404/1396596668

influxdb-internals

https://speakerdeck.com/chobie/influxdb-internals-1

⚠️ **GitHub.com Fallback** ⚠️