Mahoutで分散レコメンド(2)

んじゃ、早速Hadoopの疑似分散環境を作ってMahoutを回してみましょう。

HadoopのセットアップとMahoutの入手

まずは利用するHadoopのセットアップ。ここは本題じゃないので要点のみ。

入力データファイルの準備

1,101,5.0
1,102,3.0
1,103,2.5
2,101,2.0
2,102,2.5
2,103,5.0
2,104,2.0
3,101,2.5
3,104,4.0
3,105,4.5
3,107,5.0
4,101,5.0
4,103,3.0
4,104,4.5
4,106,4.0
5,101,4.0
5,102,3.0
5,103,2.0
5,104,4.0
5,105,3.5
5,106,4.0

まず、評価データのファイル(prefs.txt)。前使ったのと同じファイルですね。ユーザID,アイテムID,評点:(long,long,float) っていうレコードになってます。

前回「Hadoop税」の項で説明した通り、本来こんな小さなデータをMapReduceに掛けてもあんま意味ありません。ここで挙げるのはあくまでも例ってことでお願いしますね。大規模な奴は、検証してみて上手く行ったらあらためてご紹介したいと思います。

1

次に、Mahoutに対して「誰に対するレコメンドをするのか」を指示するファイル(users.txt)。改行区切りでユーザIDを列挙します。このファイル自体省略も可能(その場合全員に対するレコメンドを計算する)なんだけど、今回はとりあえず指定してみる。で、ここで注意点。このファイルの末尾に改行をいれないこと。上記の例だと、ただ単に1と書いてあるだけのファイルじゃないと落ちます。皆さんも、くれぐれも30時間近く待った挙げ句に落ちる、しかも原因はNumberFormatExceptionなんて悲劇に遭わぬよう気をつけてください。っていうかこれショックでさ、ショックすぎて俺原因究明してパッチ送ったさ。まぁ早速取り込んでもらえたみたいで、次期バージョンv0.6では直ると思います。

さて、こんな目に遭わないためにも、最初の試行時はHadoop税がどーのとかめんどくせーこと言わずにスモールデータで試しなさいってことですね。実用じゃなくて、あくまでも試行なんだから。

あい、気を取り直して。以上をHDFSに送り込みます。

$ hadoop fs -put /path/to/users.txt input/users.txt
$ hadoop fs -put /path/to/prefs.txt input/prefs.txt

送れたかどうか確認。

$ hadoop fs -ls input
Found 2 items
-rw-r--r--   1 daisuke supergroup        231 2011-06-03 14:17 /user/daisuke/input/prefs.txt
-rw-r--r--   1 daisuke supergroup          2 2011-06-04 23:10 /user/daisuke/input/users.txt

いよいよMahout起動

$ hadoop jar \
    /path/to/mahout-core-0.5-job.jar \
    org.apache.mahout.cf.taste.hadoop.item.RecommenderJob \
    -Dmapred.output.dir=output \
    -Dmapred.input.dir=input/prefs.txt \
    --usersFile input/users.txt \
    --similarityClassname SIMILARITY_PEARSON_CORRELATION
  • DLしたMahoutのjarファイルのパス(ローカルファイルシステム上)
  • レコメンドを行うDriverのFQCN
  • 結果の出力先パス(HDFS上)
  • 入力ファイル(上記)のパス(HDFS上)
  • ユーザファイル(上記)のパス(HDFS上)
  • 類似度行列を作る際の「類似度」に何を使うか指定(ここではピアソン相関係数*1

このまま5分ほど、正座して待つ。まぁ、待機時間はマシンスペック依存だと思いますが。エラーらしきものが表示されずにプロンプトに戻ってきたら完了。結果を見よう。

$ hadoop fs -ls output
Found 2 items
drwxr-xr-x   - daisuke supergroup          0 2011-06-04 23:19 /user/daisuke/output/_logs
-rw-r--r--   1 daisuke supergroup         18 2011-06-04 23:20 /user/daisuke/output/part-r-00000

この part-r-00000 ってファイルが結果。中身を見てみよう。

$ hadoop fs -cat output/part-r-00000
1       [104:3.9098573]

ユーザ1へのお勧めは、104番のアイテムで、3.9点くらいをつけるでしょう、というお告げでした。

前回見た通り、類似度行列にNaNが入りまくっていて、他のアイテムに対するレコメンドを計算できなかったみたいですね。similarityClassnameオプションを変更して類似度行列の計算方式を変える*2と、複数のアイテムについて評点のお告げをもらえたりもします。

1       [105:3.875,104:3.7222223,106:3.6]

*1:他にもSIMILARITY_COOCCURRENCE, SIMILARITY_EUCLIDEAN_DISTANCE, SIMILARITY_LOGLIKELIHOOD, SIMILARITY_TANIMOTO_COEFFICIENT, SIMILARITY_UNCENTERED_COSINE, SIMILARITY_UNCENTERED_ZERO_ASSUMING_COSINE, SIMILARITY_CITY_BLOCK が使えるようです。まだ全部は試してません。

*2:ここではSIMILARITY_COOCCURRENCEを使ってみた。しかし、prefs.txtの特性上Co-occurrenceは恐らく適さないんですけどね。

Mahoutで分散レコメンド(1)

さて、ちょっと間があきましたが。

前回まで、いったんレコメンドを抜けてクラスタリングの世界をご紹介してみた訳ですが。あまりウケがよさそうじゃないのでレコメンドに戻ってみます。

そんな中でMahoutが一押しであるのは、スケーラビリティの確保に重点が置かれていることです。

機械学習というのは、当然、計算に基づいて結果を出すわけですが、その基礎となるデータが多ければ多いほど、確からしい結果を出してくれます。が、しかし、データが多ければ多いほど、指数的に計算量が増加する傾向があります。

Apache Mahoutで機械学習してみるべ - 都元ダイスケ IT-PRESS

という導入から紹介に入ったレコメンドですが、実はあのアルゴリズムは分散処理できません。できませんったらできません。だってMapReduceパラダイムで書いてないんだもん。

ということで、先日紹介した処理をそのままMapReduce処理に変換できれば嬉しかったんですが。残念ながら、紹介したアルゴリズムをそのまま、分散処理の恩恵が受けられるようにMapReduceパラダイムに変換することは、できないようです*1。まぁ、出来ないのか異様に難しいのか、わからないけどとにかくMahoutには実装がないです。

つまり、「分散処理の恩恵が受けられるMapReduceパラダイムとして記述できるレコメンデーションアルゴリズム」ってのが必要なんですね。というわけで、先日紹介したアルゴリズムは、まずサッパリ忘れてください。新しいの行きます。

Hadoop

新しいアルゴリズムを紹介する前に。HadoopってのはMapReduceパラダイムを使って上手い事スケーラビリティを得られるプラットフォームな訳ですが、スケーラビリティは必要ではない場合、つまり扱うデータが小さく従来の非分散レコメンドアルゴリズムでも現実的な時間内で計算が終了する場合、逆に時間を食います。

非分散レコメンドは、データが小さい状態であれば「リアルタイム処理*2」ができます。しかし、これから紹介する分散レコメンドのアルゴリズムでは、どんなにデータが小さくても少なくとも5分程度の処理時間必要、つまり「バッチ処理*3」しかできません。

バッチ処理では、「最新の評価情報にリアルタイムで追従する」ことができなくなる、というトレードオフは認識しておきましょう。

類似度行列(similarity matrix)


数学の授業で習った「行列」って覚えてますか?*4

\large\left( \begin{array}1&2&3\\4&5&6\\7&8&9\end{array}\right)\cdot\large\left( \begin{array}2\\4\\6\end{array}\right)


こんなの*5。まぁ、ビビらないで。ここで使うのはベクトルのかけ算(内積)だけです。

で、ちょっと思い出してほしいんですが。非分散レコメンドで「Similarity(類似度)」っていう概念が出てきました。ユーザ同士がどれだけ似ているかを表す値。この類似度は、ユーザだけでなく、アイテムにも適用できます*6。で、以前挙げた例では「ピアソン相関係数」っていう-1〜1の範囲の値をSimilarityとして利用しました。

アイテム(101〜107)同士のピアソン相関係数を、全組み合わせで算出して、表にしてみました。データ数が少なくて計算できない(NaN)ところが多いですが。

101 102 103 104 105 106 107
101 1.00 0.94 -0.80 0.77 -1.00 NaN NaN
102 0.94 1.00 -0.98 0.99 NaN NaN NaN
103 -0.80 -0.98 1.00 -0.86 NaN NaN NaN
104 0.77 0.99 -0.86 1.00 NaN NaN NaN
105 -1.00 NaN NaN NaN 1.00 NaN NaN
106 NaN NaN NaN NaN NaN 1.00 NaN
107 NaN NaN NaN NaN NaN NaN 1.00

まず、101vs101や107vs107など、同じアイテム同士の類似性は、完全一致ってことで1.00になってます。当然ですが。しかしこんな値は当たり前すぎて計算に含めても全く意味がないので、全部NaNにしてしまいます*7

この表の中身を行列として考えるんです*8。こういうのを類似度行列(similarity matrix)と呼ぶっぽいです。

\large\left( \begin{array}NaN&0.94&-0.80&0.77&-1.00&NaN&NaN\\0.94&NaN&-0.98&0.99&NaN&NaN&NaN\\-0.80&-0.98&NaN&-0.86&NaN&NaN&NaN\\0.77&0.99&-0.86&NaN&NaN&NaN&NaN\\-1.00&NaN&NaN&NaN&NaN&NaN&NaN\\NaN&NaN&NaN&NaN&NaN&NaN&NaN\\NaN&NaN&NaN&NaN&NaN&NaN&NaN\end{array}\right)


ちなみに今回はピアソン相関係数を使った類似度行列を作りましたが、データの性質によって、共起(co-occurence)やユーグリッド距離(Euclidean distance)など、様々な「類似度」を使うことがあります。

でだ。次にユーザの評価表を考えます。ここではユーザ1の人に対してレコメンドをしてみようと思います。ユーザ1の評価データはこんなかんじ。

ユーザ1
101 5.0
102 3.0
103 2.5
104 -
105 -
106 -
107 -

未評価の部分は0として、これも行列にするとこんなかんじ。これがユーザベクトル。

\large\left( \begin{array}5.0\\3.0\\2.5\\0\\0\\0\\0\end{array}\right)

で、類似度行列とユーザベクトルの内積をもとめまーす。

\large\left( \begin{array}NaN&0.94&-0.80&0.77&-1.00&NaN&NaN\\0.94&NaN&-0.98&0.99&NaN&NaN&NaN\\-0.80&-0.98&NaN&-0.86&NaN&NaN&NaN\\0.77&0.99&-0.86&NaN&NaN&NaN&NaN\\-1.00&NaN&NaN&NaN&NaN&NaN&NaN\\NaN&NaN&NaN&NaN&NaN&NaN&NaN\\NaN&NaN&NaN&NaN&NaN&NaN&NaN\end{array}\right)\cdot\large\left( \begin{array}5.0\\3.0\\2.5\\0\\0\\0\\0\end{array}\right)=\large\left( \begin{array}a\\b\\c\\d\\e\\f\\g\end{array}\right)

はい、もう嫌になってきましたね。俺もです。

まぁ、こんな感じで求めた行列は、a〜fが101〜107の各アイテムに対する「オススメ度*9」になっています。この値が大きいアイテムがお勧めちゅーことですね。

今回のまとめ

レコメンドを分散アルゴリズムに対応させるためには、こんな感じで数学の世界にはまり込みます。俺も何故だかは分かりませんが、上記のアルゴリズムMapReduceパラダイムで表現できるみたいです。

そろそろ理屈はお腹いっぱいですね。さて次回は実際にMahoutを使って分散レコメンドしてみます。

そういえば

Apache Mahout v0.5 リリースされましたね。おめでとうございます。

*1:ただし、「分散処理の恩恵があまり受けられない形で、先日のアルゴリズムHadoop上に無理矢理乗せることは可能です。でも意味ないですね。

*2:つまり、Webアプリにおいて、ユーザからのリクエストをトリガとして計算を走らせ、その計算結果をレスポンスで返すような感じ。

*3:リクエストとは独立したバッチで事前に計算しておいて、リクエスト時に計算結果だけを参照する。

*4:まさか、はてなTeX記法のお世話になる日が来ようとは…。

*5:俺あんま好きじゃなかったなぁ。なんか縦横無尽に掛けたり足したりするだけなんだけど、数式作ってるうちにどこを見てるんだか分かんなくなって…。

*6:アイテム同士がどれだけ似ているかを計算できる。

*7:NaNにしないと有害なのか、1.00のままでもいいのか、なんてのはよくわかりません。とりあえずMahoutん中ではNaNにしてるのですw

*8:うわ、ほんとにNaNだらけだな。最後まで計算できるか不安になってきたw

*9:ただし、この段階では非分散の時のような「予想評点」ではない。

SerializableとserialVersionUID

以前、Javaシリアライズ仕様がよくわからなくてエントリを書いた。

難解なSerializableという仕様について俺が知っていること、というか俺の理解 - 都元ダイスケ IT-PRESS

まぁ、わからないまま書いたので論点もあっちゃこっちゃ飛びながらのエントリだったわけだけど、一年経ってコメントを頂いた。

serialVersionUIDに関しては、定義をしない場合にはクラスの構造を解釈して勝手にコンパイラが生成してくれます。
つまり、クラスのフィールドを変更したら勝手に変更してくれます。
問題点としては、コンパイラが勝手に計算してくれるので、複数のコンパイラをまたぐ場合に同じ定義のはずだけれども失敗することがある(らしい)ということです。
つまり、記事に書いてあることとは逆ですね。
JavaDocに記載されているのでご参照ください。
http://java.sun.com/j2se/1.5.0/ja/docs/ja/api/java/io/Serializable.html#そもそもSun実装以外のコンパイラを使うことがあるのかという疑問はありますが。
#そして、自分は複数のコンパイラ(+VM)で失敗することを見たことがないというか、Sun実装以外のコンパイラを使用したことがない。

難解なSerializableという仕様について俺が知っていること、というか俺の理解 - 都元ダイスケ IT-PRESS

なるほどっ。ありがとうございます。デフォルトコンストラクタを定義しないとコンパイラが勝手に作ってくれるアレだね! 非staticなメンバークラスを定義した時にコンパイラが勝手にフィールドを追加したりするアレだね! と思ったので実験してみた。

import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;

public class Main implements Serializable {
 
  public static void main(String[] args) throws Exception {
    // (1)
    Constructor<Main> mainConst = Main.class.getDeclaredConstructor();
    System.out.println(mainConst);
   
    // (2-1)
    Main main = new Main();
    Child child = main.new Child(128);
    Constructor<Child> childConst = null;
    try {
      childConst = Child.class.getDeclaredConstructor(int.class);
      System.out.println("NG");
    } catch (NoSuchMethodException e) {
      System.out.println("OK");
    }
    childConst = Child.class.getDeclaredConstructor(Main.class, int.class);
    System.out.println(childConst);

    // (2-2)
    Field f = Child.class.getDeclaredField("this$0");
    Object object = f.get(child);
    System.out.println(object == main);
   
    // (3)
    Field field = Main.class.getDeclaredField("serialVersionUID");
    Object id = field.get(null);
    System.out.println(id);
  }
 

  class Child {
   
    private final int hoge;

    Child(int hoge) {
      this.hoge = hoge;
    }
  }
}

コンストラクタを1つも宣言しないと…

(1)は、Mainクラスには引数なしのコンストラクタを明示的に宣言していないけど、リフレクションでは取れる、という例。実行結果として「public Main()」みたいな文字列が表示されるので、コンストラクタは確かに存在することが分かる。もし指定した(引数なしの)コンストラクタが存在しないのならばgetDeclaredConstructorの時点でNoSuchMethodException飛ぶのがJavaの仕様である。

非staticの内部クラスを宣言すると…

(2)に先立って、まず用語整理。Mainを外包クラス(enclosing class)、Childを内部クラス(inner class)とします。それぞれのインスタンスを enclosing instance / inner instance と呼ぶ事にします。

で、(2-1)は、非static内部クラスのコンストラクタの第一引数に外包クラスが追加されている例だ。知らない人は、なぜコンパイラはこんな手を加えるのか、っての考えると面白いよ。本題からズレるのでその話は割愛。ポイントは、ChildのコンストラクタはChild(int hoge)と宣言したので Child.class.getDeclaredConstructor(int.class); でいいはずなのに、なぜか取れない。コンパイルしたバイトコードは実際は以下のような形をしているのだ。

public class Main implements Serializable {
  // 略
  class Child {
    final Main this$0;    
    Child(Main x, int hoge) {
      this$0 = x;
      this.hoge = hoge;
    }
  }
}

従って、Child.class.getDeclaredConstructor(Main.class, int.class); が正解。引き続き実行すると「Main$Child(Main,int)」と表示される。Childのコンストラクタ引数の数は、実は2個なんですね。

(2-2)は、上記コードの謎の this$0 フィールドの存在と同一性を確認しています。表示は「true」。

Serializableを実装して、serialVersionUIDを宣言しないと…!?

さて、本題は(3)です。1〜2でノリノリになってしまった。MainはSerializableを実装しているので、同じノリで自動生成してくれるんだろうなぁ、ということで Main.class.getDeclaredField("serialVersionUID"); してみた。が、

java.lang.NoSuchFieldException: serialVersionUID

くっ…。無いらしい。もっと単純にしてjadってみる。

import java.io.Serializable;
import java.lang.reflect.Field;

public class Main implements Serializable {
 
  public static void main(String[] args) throws Exception {
    Field field = Main.class.getDeclaredField("serialVersionUID");
    Object id = field.get(null);
    System.out.println(id);
  }
}
// Decompiled by Jad v1.5.8g. Copyright 2001 Pavel Kouznetsov.
// Jad home page: http://www.kpdus.com/jad.html
// Decompiler options: packimports(3)
// Source File Name:   Main.java

import java.io.PrintStream;
import java.io.Serializable;
import java.lang.reflect.Field;

public class Main
    implements Serializable
{

    public Main()
    {
    }

    public static void main(String args[])
        throws Exception
    {
        Field field = Main.getDeclaredField("serialVersionUID");
        Object obj = field.get(null);
        System.out.println(obj);
    }
}

やっぱりねぇよー。あらためて仕様を読み直す。

直列化可能クラスが明示的に serialVersionUID を宣言しない場合、直列化ランタイムは、『Java(TM) Object Serialization Specification』に記述されているように、さまざまな局面に基づいて、そのクラスのデフォルトの serialVersionUID 値を計算します。

直列化ランタイムは
直列化ランタイムは

直列化ランタイムは


うおぉ。読み漏らした。自動生成するのはコンパイラじゃなくてランタイムだっ。というわけで、コンパイラ(javacコマンド)がバイトコードに埋め込むんじゃなくて、ランタイム(javaコマンド)が頑張るんだね。

というわけで、コンパイラをSun-JDKに固定したとしても、ランタイムが異なるとアレな結果になるかもしれないっぽい。

最後に。ランタイムが未定義のserialVersionUIDにどんな値を割り当ててるのか、見る方法は分からなかった。誰か知ってますか?

僕と地豆とDDD

Domain-Driven Design: Tackling Complexity in the Heart of Software

Domain-Driven Design: Tackling Complexity in the Heart of Software

Domain-Driven Design: Tackling Complexity in the Heart of Software というソフトウェア設計に関する本がある。

この設計の考え方は、略してDDDと呼ばれたが、同時に「DDD難民」という言葉さえも生み出したらしい。ハードカバーによる560ページ、6000円を超える、洋書である。今思えば怖い物知らずだった。知らなかったとは言え、よくもまぁこの本を手にとったものだ…。

この本との出会いは、2009年9月に遡る*1。当時も相変わらず、Jiemamyの設計に頭を悩ませていた。この頃、自分は既にオブジェクト指向という考え方に傾倒していたのだが、正直なところ、現実に直面する "ソフトウェアの複雑さ" に対しては、オブジェクト指向のパワーだけでは太刀打ちできない、ということにも薄々感づき始めていた頃でもある。

そんな悩みを、コミッタ間で共有してはいたのだが、やはり思うように進まない。そんな時、「大ちゃんは、きっと(DDDの考え方が)好きだと思うよ」と、この本を薦めてくれたのが id:ashigeru である。

俺は当時、洋書なんて一冊も持っていなかった。英語と言えば、Web上で仕方なく調べるドキュメントの類がせいぜいだった。ただ、だんだん「自分の知りたい情報は、日本語で書かれていない」という事実に気づき*2始めていた自分は、やっぱり英語ももっと読めなきゃなぁ、とも考えていた。

そんな経緯で、初めて手に入れた洋書、それがDDD本であった。この本を読む目的は「DDDの考え方を身に付ける」ことと「英語の自信をつける」ことだったわけだ。

結果から言おう。DDD本は、初めて手にする洋書としてはお薦めできないw 内容が難しい上に、話の流れも難しい。なぜ突然裁判に訴えられた話が出てくるのか*3、理解するのに数時間かかった。普通だったら絶対に挫折する。自画自賛も甚だしいが、根性で読んだ俺エライわ…。かなり時間かかったけど。まぁ、その分自信も実力も付いたので、根性大好きな人には勧めてもいいかもしれないw この経験のお陰で、他の本が簡単に読める気がする。

しばらく読み進めて、id:ashigeru の思惑通りDDDが気に入った俺は、今度は上司 id:j5ik2o にこの本を買って読ませる、という暴挙に出る。この本の内容を理解して、俺と共感してくれる変態はこの人しかいないだろう、と。まぁ、思惑通り上司はこの本に魅了され、翻訳レビュー謝辞にまで名を連ねるに至った。

まさに

俺は俺で、JiemamyをDDDの考え方を参考にしながら設計し、v0.3を作ったりした。あまり知られてはいないが、Jiemamy Project では dddbase というライブラリがあったりする*4。また、著者のEric EvansがDDDのexampleとして作ったTime And Money Libraryをフォークし独自にメンテして、baseunits ライブラリとして整備したりもしてみた。このように、自分が手がけたプロダクトの設計と実装に計り知れない影響を与えたことは間違いない。

まぁ、多くの人にとって、DDD本を読む目的は英語慣れではないだろう。ただでさえ難しい設計実践の書籍である。慣れ親しんだ日本語という言語で的確に意味を掴みながら理解を進めると良いだろう。

間もなく、(色んな意味で)破壊力のあるこの書籍の邦訳版が世に出る。邦題は「エリック・エヴァンスのドメイン駆動設計」である。前述のように「オブジェクト指向が好きだが、それだけでは何か足りない」と感じている、悩める開発者にお薦めしたい。

エリック・エヴァンスのドメイン駆動設計 (IT Architects’Archive ソフトウェア開発の実践)

エリック・エヴァンスのドメイン駆動設計 (IT Architects’Archive ソフトウェア開発の実践)

一足お先に献本頂きました。どうもありがとうございます。

*1:今、Amazonの原著ページを参照したら、親切にも「お客様は、2009/9/30にこの商品を注文しました」と表示されていた。

*2:この事実を実感する第一段階は、ググったら自分が出てくる現象であるw

*3:出てくるんすよ、突然…w

*4:汎用ライブラリなので、どこで使ってみても良いかもね。

新連載、はじめました「Javaで始めるオブジェクト指向」

最近blogではApache Mahoutを中心にお送りしていますが、日経ソフトウエアでは新連載を担当させて頂くことになりました。先月までは「Javaで始めるプログラミング」でしたが、今月からは「Javaで始めるオブジェクト指向」です。前の連載の続きのようで続きじゃない、という微妙なポジションですがw

日経ソフトウエア 2011年 05月号 [雑誌]

日経ソフトウエア 2011年 05月号 [雑誌]

本文中にも書きましたが、オブジェクト指向というのは掴み所のない概念です。本来がぼんやりした存在なので、掴みづらくて当然、そして伝えづらいものです。そんなオブジェクト指向の私なりの解釈をなるべく分かりやすく皆さんにお届けできればと思っています。

どうぞよろしくお願いします。

今度はMahoutでクラスタリング(ソース編)

Mahoutシリーズを最初から読む場合はこちらApache Mahoutで機械学習してみるべ - 都元ダイスケ IT-PRESS。前回はこちら今度はMahoutでクラスタリング - 都元ダイスケ IT-PRESS

準備

まずmvnの依存設定を。以前と同じようにmahout-coreは要ります。それに加えて*1slf4jとlogback*2、そしてcommons-io*3を入れておきます。

pom.xml
    <dependency>
      <groupId>org.apache.mahout</groupId>
      <artifactId>mahout-core</artifactId>
      <version>0.4</version>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>${lib.slf4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>jcl-over-slf4j</artifactId>
      <version>${lib.slf4j.version}</version>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-core</artifactId>
      <version>${lib.logback.version}</version>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-classic</artifactId>
      <version>${lib.logback.version}</version>
    </dependency>
  
    <dependency>
      <groupId>commons-io</groupId>
      <artifactId>commons-io</artifactId>
      <version>2.0</version>
    </dependency>
  
...

  <properties>
    <lib.slf4j.version>1.6.0</lib.slf4j.version>
    <lib.logback.version>0.9.21</lib.logback.version>
  </properties>
logback.xml

で、ログ設定ファイルこんなんをsrc/main/resouces直下に置いておきましょう。

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <Target>System.out</Target>
    <layout class="ch.qos.logback.classic.PatternLayout">
      <Pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</Pattern>
    </layout>
  </appender>

  <root>
    <level value="INFO" />
    <appender-ref ref="STDOUT" />
  </root>
</configuration>

Javaソース

やっと本質的なトコはいりますよー。とりあえず、サンプルコードでは、最後の3次元ベクトルをクラスタリングしてみましょう。

まずはクラスタリング対象のベクトル群を用意します。ここでは前回の3Dベクトル9つを使います。

static final double[][] points = {
    {8, 8, 8}, {8, 7.5, 9}, {7.7, 7.5, 9.8},
    {0, 7.5, 9}, {0.1, 8, 8}, {-1, 9, 7.5},
    {9, -1, -0.8}, {7.7, -1.2, -0.1}, {8.2, 0.2, 0.2},
};


で、今回のクラスタリングには k-means clastering という手法を使います。この手法では、あらかじめ「最終的にいくつのクラスタを作るのか」、という k の値を決めなければなりません。ここでは k = 3 として、3つのクラスタを作る前提でいきます。

Mahoutのクラスタリングでは、いきなりHadoopが出て来ます。とは言え、Hadoopクラスタを組む必要はなく、standaloneで走らせることはできます。その際「クラスタリングの対象となる9つのベクトル」と「3つのクラスタ」をあらかじめHDFS上にファイルとして配置する必要があります。これを writePointsToFile と writeClustersToFile メソッドで行っています。

そしてクラスタリングの処理を実行。クラスタリングの計算は、HDFSからデータを読み込み、そして結果もHDFSに書き込みます。従って、計算後にはHDFSを読み出す処理として readClusteredPointsFromFile を実行しています。

public static void main(String args[]) throws Exception {
    int k = 3;
    List<Vector> vectors = getPoints(points);
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
   
   // HDFSにベクトルとクラスタを書き込む
    writePointsToFile(vectors, "target/input/points/file1", fs, conf);
    writeClustersToFile(k, vectors, "target/input/clusters/part-00000", fs, conf);
   
   // クラスタリングを実行
    Path pointsPath = new Path("target/input/points");
    Path clustersPath = new Path("target/input/clusters");
    Path outputPath = new Path("target/output");
    KMeansDriver.run(conf, pointsPath, clustersPath, outputPath,
            new EuclideanDistanceMeasure(), 0.001, 10, true, false);
    
    // クラスタリングの結果をHDFSから読み出し、コンソールに表示する
    readClusteredPointsFromFile(fs, conf);
}

static List<Vector> getPoints(double[][] raw) {
    List<Vector> points = new ArrayList<Vector>();
    for (double[] fr : raw) {
        Vector vec = new RandomAccessSparseVector(fr.length);
        vec.assign(fr);
        points.add(vec);
    }
    return points;
}

static void writePointsToFile(List<Vector> points, String fileName, FileSystem fs, Configuration conf)
        throws IOException {
    Path path = new Path(fileName);
    SequenceFile.Writer writer = null;
    try {
        writer = new SequenceFile.Writer(fs, conf, path, LongWritable.class, VectorWritable.class);
        long recNum = 0;
        VectorWritable vec = new VectorWritable();
        for (Vector point : points) {
            vec.set(point);
            writer.append(new LongWritable(recNum++), vec);
        }
    } finally {
        IOUtils.closeQuietly(writer);
    }
}

static void writeClustersToFile(int k, List<Vector> vectors, String fileName, FileSystem fs, Configuration conf)
        throws IOException {
    Path path = new Path(fileName);
    SequenceFile.Writer writer = null;
    try {
        writer = new SequenceFile.Writer(fs, conf, path, Text.class, Cluster.class);
        for (int i = 0; i < k; i++) {
            Vector vec = vectors.get(i);
            Cluster cluster = new Cluster(vec, i, new EuclideanDistanceMeasure());
            writer.append(new Text(cluster.getIdentifier()), cluster);
        }
    } finally {
        IOUtils.closeQuietly(writer);
    }
}

static void readClusteredPointsFromFile(FileSystem fs, Configuration conf) throws IOException {
    Path path = new Path("target/output/" + Cluster.CLUSTERED_POINTS_DIR + "/part-m-00000");
    SequenceFile.Reader reader = null;
    try {
        reader = new SequenceFile.Reader(fs, path, conf);
        IntWritable key = new IntWritable();
        WeightedVectorWritable value = new WeightedVectorWritable();
        while (reader.next(key, value)) {
            System.out.println(value.toString() + " belongs to cluster " + key.toString());
        }
    } finally {
        IOUtils.closeQuietly(reader);
    }
}


参考までに、importはこちら。同じ単純名のクラスが意外とある。

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.mahout.clustering.WeightedVectorWritable;
import org.apache.mahout.clustering.kmeans.Cluster;
import org.apache.mahout.clustering.kmeans.KMeansDriver;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

結果

クラスタリングの結果は以下の通り。それぞれのベクトルが cluster 0 〜 cluster 2 に分類されていることが分かると思います。

1.0: [8.000, 8.000, 8.000] belongs to cluster 1
1.0: [8.000, 7.500, 9.000] belongs to cluster 1
1.0: [7.700, 7.500, 9.800] belongs to cluster 1
1.0: [1:7.500, 2:9.000] belongs to cluster 2
1.0: [0.100, 8.000, 8.000] belongs to cluster 2
1.0: [-1.000, 9.000, 7.500] belongs to cluster 2
1.0: [9.000, -1.000, -0.800] belongs to cluster 0
1.0: [7.700, -1.200, -0.100] belongs to cluster 0
1.0: [8.200, 0.200, 0.200] belongs to cluster 0


参考までに、結果を出す前にだーーっと流れるログはこんな感じ。Hadoopのジョブとして動いているのが分かると思います。

22:16:11.733 [main] INFO  o.a.m.clustering.kmeans.KMeansDriver - Input: target/input/points Clusters In: target/input/clusters Out: target/output Distance: org.apache.mahout.common.distance.EuclideanDistanceMeasure

22:16:11.738 [main] INFO  o.a.m.clustering.kmeans.KMeansDriver - convergence: 0.0010 max Iterations: 10 num Reduce Tasks: org.apache.mahout.math.VectorWritable Input Vectors: {}
22:16:11.739 [main] INFO  o.a.m.clustering.kmeans.KMeansDriver - K-Means Iteration 1
22:16:11.768 [main] INFO  o.a.hadoop.metrics.jvm.JvmMetrics - Initializing JVM Metrics with processName=JobTracker, sessionId=
22:16:11.878 [main] INFO  org.apache.mahout.common.HadoopUtil - Deleting target/output/clusters-1
22:16:11.885 [main] WARN  org.apache.hadoop.mapred.JobClient - Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
22:16:12.497 [main] INFO  o.a.h.m.lib.input.FileInputFormat - Total input paths to process : 1
22:16:12.781 [main] INFO  org.apache.hadoop.mapred.JobClient - Running job: job_local_0001
22:16:12.787 [Thread-14] INFO  o.a.h.m.lib.input.FileInputFormat - Total input paths to process : 1
22:16:12.880 [Thread-14] INFO  org.apache.hadoop.mapred.MapTask - io.sort.mb = 100
22:16:17.532 [main] INFO  org.apache.hadoop.mapred.JobClient -  map 0% reduce 0%
22:16:17.534 [Thread-14] INFO  org.apache.hadoop.mapred.MapTask - data buffer = 79691776/99614720
22:16:17.535 [Thread-14] INFO  org.apache.hadoop.mapred.MapTask - record buffer = 262144/327680
22:16:17.646 [Thread-14] INFO  org.apache.hadoop.mapred.MapTask - Starting flush of map output
22:16:18.047 [Thread-14] INFO  org.apache.hadoop.mapred.MapTask - Finished spill 0
22:16:18.051 [Thread-14] INFO  org.apache.hadoop.mapred.TaskRunner - Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
22:16:18.055 [Thread-14] INFO  o.a.hadoop.mapred.LocalJobRunner - 
22:16:18.055 [Thread-14] INFO  org.apache.hadoop.mapred.TaskRunner - Task 'attempt_local_0001_m_000000_0' done.
22:16:18.064 [Thread-14] INFO  o.a.hadoop.mapred.LocalJobRunner - 
22:16:18.072 [Thread-14] INFO  org.apache.hadoop.mapred.Merger - Merging 1 sorted segments
22:16:18.087 [Thread-14] INFO  org.apache.hadoop.mapred.Merger - Down to the last merge-pass, with 1 segments left of total size: 239 bytes
22:16:18.087 [Thread-14] INFO  o.a.hadoop.mapred.LocalJobRunner - 
22:16:18.184 [Thread-14] INFO  org.apache.hadoop.mapred.TaskRunner - Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
22:16:18.185 [Thread-14] INFO  o.a.hadoop.mapred.LocalJobRunner - 
22:16:18.186 [Thread-14] INFO  org.apache.hadoop.mapred.TaskRunner - Task attempt_local_0001_r_000000_0 is allowed to commit now
22:16:18.190 [Thread-14] INFO  o.a.h.m.l.output.FileOutputCommitter - Saved output of task 'attempt_local_0001_r_000000_0' to target/output/clusters-1
22:16:18.191 [Thread-14] INFO  o.a.hadoop.mapred.LocalJobRunner - reduce > reduce
22:16:18.192 [Thread-14] INFO  org.apache.hadoop.mapred.TaskRunner - Task 'attempt_local_0001_r_000000_0' done.
22:16:18.535 [main] INFO  org.apache.hadoop.mapred.JobClient -  map 100% reduce 100%
22:16:18.535 [main] INFO  org.apache.hadoop.mapred.JobClient - Job complete: job_local_0001
22:16:18.537 [main] INFO  org.apache.hadoop.mapred.JobClient - Counters: 13
22:16:18.537 [main] INFO  org.apache.hadoop.mapred.JobClient -   Clustering
22:16:18.538 [main] INFO  org.apache.hadoop.mapred.JobClient -     Converged Clusters=1
22:16:18.538 [main] INFO  org.apache.hadoop.mapred.JobClient -   FileSystemCounters
22:16:18.538 [main] INFO  org.apache.hadoop.mapred.JobClient -     FILE_BYTES_READ=2741232
22:16:18.539 [main] INFO  org.apache.hadoop.mapred.JobClient -     FILE_BYTES_WRITTEN=2792502
22:16:18.539 [main] INFO  org.apache.hadoop.mapred.JobClient -   Map-Reduce Framework
22:16:18.539 [main] INFO  org.apache.hadoop.mapred.JobClient -     Reduce input groups=3
22:16:18.540 [main] INFO  org.apache.hadoop.mapred.JobClient -     Combine output records=3
22:16:18.540 [main] INFO  org.apache.hadoop.mapred.JobClient -     Map input records=9
22:16:18.541 [main] INFO  org.apache.hadoop.mapred.JobClient -     Reduce shuffle bytes=0
22:16:18.541 [main] INFO  org.apache.hadoop.mapred.JobClient -     Reduce output records=3
22:16:18.541 [main] INFO  org.apache.hadoop.mapred.JobClient -     Spilled Records=6
22:16:18.542 [main] INFO  org.apache.hadoop.mapred.JobClient -     Map output bytes=675
22:16:18.542 [main] INFO  org.apache.hadoop.mapred.JobClient -     Combine input records=9
22:16:18.543 [main] INFO  org.apache.hadoop.mapred.JobClient -     Map output records=9
22:16:18.543 [main] INFO  org.apache.hadoop.mapred.JobClient -     Reduce input records=3
22:16:18.547 [main] INFO  o.a.m.clustering.kmeans.KMeansDriver - K-Means Iteration 2
22:16:18.548 [main] INFO  o.a.hadoop.metrics.jvm.JvmMetrics - Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
22:16:18.576 [main] INFO  org.apache.mahout.common.HadoopUtil - Deleting target/output/clusters-2
22:16:18.578 [main] WARN  org.apache.hadoop.mapred.JobClient - Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
22:16:19.072 [main] INFO  o.a.h.m.lib.input.FileInputFormat - Total input paths to process : 1
22:16:19.630 [main] INFO  org.apache.hadoop.mapred.JobClient - Running job: job_local_0002
22:16:19.632 [Thread-28] INFO  o.a.h.m.lib.input.FileInputFormat - Total input paths to process : 1
22:16:20.622 [Thread-28] INFO  org.apache.hadoop.mapred.MapTask - io.sort.mb = 100
22:16:20.719 [main] INFO  org.apache.hadoop.mapred.JobClient -  map 0% reduce 0%
22:16:22.272 [Thread-28] INFO  org.apache.hadoop.mapred.MapTask - data buffer = 79691776/99614720
22:16:22.273 [Thread-28] INFO  org.apache.hadoop.mapred.MapTask - record buffer = 262144/327680
22:16:22.321 [Thread-28] INFO  org.apache.hadoop.mapred.MapTask - Starting flush of map output
22:16:22.323 [Thread-28] INFO  org.apache.hadoop.mapred.MapTask - Finished spill 0
22:16:22.326 [Thread-28] INFO  org.apache.hadoop.mapred.TaskRunner - Task:attempt_local_0002_m_000000_0 is done. And is in the process of commiting
22:16:22.327 [Thread-28] INFO  o.a.hadoop.mapred.LocalJobRunner - 
22:16:22.327 [Thread-28] INFO  org.apache.hadoop.mapred.TaskRunner - Task 'attempt_local_0002_m_000000_0' done.
22:16:22.358 [Thread-28] INFO  o.a.hadoop.mapred.LocalJobRunner - 
22:16:22.360 [Thread-28] INFO  org.apache.hadoop.mapred.Merger - Merging 1 sorted segments
22:16:22.360 [Thread-28] INFO  org.apache.hadoop.mapred.Merger - Down to the last merge-pass, with 1 segments left of total size: 239 bytes
22:16:22.361 [Thread-28] INFO  o.a.hadoop.mapred.LocalJobRunner - 
22:16:22.428 [Thread-28] INFO  org.apache.hadoop.mapred.TaskRunner - Task:attempt_local_0002_r_000000_0 is done. And is in the process of commiting
22:16:22.429 [Thread-28] INFO  o.a.hadoop.mapred.LocalJobRunner - 
22:16:22.430 [Thread-28] INFO  org.apache.hadoop.mapred.TaskRunner - Task attempt_local_0002_r_000000_0 is allowed to commit now
22:16:22.434 [Thread-28] INFO  o.a.h.m.l.output.FileOutputCommitter - Saved output of task 'attempt_local_0002_r_000000_0' to target/output/clusters-2
22:16:22.435 [Thread-28] INFO  o.a.hadoop.mapred.LocalJobRunner - reduce > reduce
22:16:22.436 [Thread-28] INFO  org.apache.hadoop.mapred.TaskRunner - Task 'attempt_local_0002_r_000000_0' done.
22:16:23.265 [main] INFO  org.apache.hadoop.mapred.JobClient -  map 100% reduce 100%
22:16:23.266 [main] INFO  org.apache.hadoop.mapred.JobClient - Job complete: job_local_0002
22:16:23.266 [main] INFO  org.apache.hadoop.mapred.JobClient - Counters: 12
22:16:23.267 [main] INFO  org.apache.hadoop.mapred.JobClient -   FileSystemCounters
22:16:23.267 [main] INFO  org.apache.hadoop.mapred.JobClient -     FILE_BYTES_READ=5484503
22:16:23.267 [main] INFO  org.apache.hadoop.mapred.JobClient -     FILE_BYTES_WRITTEN=5583630
22:16:23.267 [main] INFO  org.apache.hadoop.mapred.JobClient -   Map-Reduce Framework
22:16:23.268 [main] INFO  org.apache.hadoop.mapred.JobClient -     Reduce input groups=3
22:16:23.268 [main] INFO  org.apache.hadoop.mapred.JobClient -     Combine output records=3
22:16:23.268 [main] INFO  org.apache.hadoop.mapred.JobClient -     Map input records=9
22:16:23.269 [main] INFO  org.apache.hadoop.mapred.JobClient -     Reduce shuffle bytes=0
22:16:23.269 [main] INFO  org.apache.hadoop.mapred.JobClient -     Reduce output records=3
22:16:23.269 [main] INFO  org.apache.hadoop.mapred.JobClient -     Spilled Records=6
22:16:23.269 [main] INFO  org.apache.hadoop.mapred.JobClient -     Map output bytes=675
22:16:23.269 [main] INFO  org.apache.hadoop.mapred.JobClient -     Combine input records=9
22:16:23.269 [main] INFO  org.apache.hadoop.mapred.JobClient -     Map output records=9
22:16:23.270 [main] INFO  org.apache.hadoop.mapred.JobClient -     Reduce input records=3
22:16:23.273 [main] INFO  o.a.m.clustering.kmeans.KMeansDriver - K-Means Iteration 3
22:16:23.274 [main] INFO  o.a.hadoop.metrics.jvm.JvmMetrics - Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
22:16:23.289 [main] INFO  org.apache.mahout.common.HadoopUtil - Deleting target/output/clusters-3
22:16:23.291 [main] WARN  org.apache.hadoop.mapred.JobClient - Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
22:16:23.496 [main] INFO  o.a.h.m.lib.input.FileInputFormat - Total input paths to process : 1
22:16:24.679 [Thread-41] INFO  o.a.h.m.lib.input.FileInputFormat - Total input paths to process : 1
22:16:24.690 [main] INFO  org.apache.hadoop.mapred.JobClient - Running job: job_local_0003
22:16:24.729 [Thread-41] INFO  org.apache.hadoop.mapred.MapTask - io.sort.mb = 100
22:16:25.043 [Thread-41] INFO  org.apache.hadoop.mapred.MapTask - data buffer = 79691776/99614720
22:16:25.044 [Thread-41] INFO  org.apache.hadoop.mapred.MapTask - record buffer = 262144/327680
22:16:25.101 [Thread-41] INFO  org.apache.hadoop.mapred.MapTask - Starting flush of map output
22:16:25.103 [Thread-41] INFO  org.apache.hadoop.mapred.MapTask - Finished spill 0
22:16:25.106 [Thread-41] INFO  org.apache.hadoop.mapred.TaskRunner - Task:attempt_local_0003_m_000000_0 is done. And is in the process of commiting
22:16:25.107 [Thread-41] INFO  o.a.hadoop.mapred.LocalJobRunner - 
22:16:25.107 [Thread-41] INFO  org.apache.hadoop.mapred.TaskRunner - Task 'attempt_local_0003_m_000000_0' done.
22:16:25.113 [Thread-41] INFO  o.a.hadoop.mapred.LocalJobRunner - 
22:16:25.114 [Thread-41] INFO  org.apache.hadoop.mapred.Merger - Merging 1 sorted segments
22:16:25.115 [Thread-41] INFO  org.apache.hadoop.mapred.Merger - Down to the last merge-pass, with 1 segments left of total size: 239 bytes
22:16:25.115 [Thread-41] INFO  o.a.hadoop.mapred.LocalJobRunner - 
22:16:25.190 [Thread-41] INFO  org.apache.hadoop.mapred.TaskRunner - Task:attempt_local_0003_r_000000_0 is done. And is in the process of commiting
22:16:25.191 [Thread-41] INFO  o.a.hadoop.mapred.LocalJobRunner - 
22:16:25.191 [Thread-41] INFO  org.apache.hadoop.mapred.TaskRunner - Task attempt_local_0003_r_000000_0 is allowed to commit now
22:16:25.195 [Thread-41] INFO  o.a.h.m.l.output.FileOutputCommitter - Saved output of task 'attempt_local_0003_r_000000_0' to target/output/clusters-3
22:16:25.196 [Thread-41] INFO  o.a.hadoop.mapred.LocalJobRunner - reduce > reduce
22:16:25.196 [Thread-41] INFO  org.apache.hadoop.mapred.TaskRunner - Task 'attempt_local_0003_r_000000_0' done.
22:16:25.702 [main] INFO  org.apache.hadoop.mapred.JobClient -  map 100% reduce 100%
22:16:25.703 [main] INFO  org.apache.hadoop.mapred.JobClient - Job complete: job_local_0003
22:16:25.704 [main] INFO  org.apache.hadoop.mapred.JobClient - Counters: 13
22:16:25.704 [main] INFO  org.apache.hadoop.mapred.JobClient -   Clustering
22:16:25.705 [main] INFO  org.apache.hadoop.mapred.JobClient -     Converged Clusters=3
22:16:25.705 [main] INFO  org.apache.hadoop.mapred.JobClient -   FileSystemCounters
22:16:25.705 [main] INFO  org.apache.hadoop.mapred.JobClient -     FILE_BYTES_READ=8227859
22:16:25.706 [main] INFO  org.apache.hadoop.mapred.JobClient -     FILE_BYTES_WRITTEN=8374758
22:16:25.706 [main] INFO  org.apache.hadoop.mapred.JobClient -   Map-Reduce Framework
22:16:25.706 [main] INFO  org.apache.hadoop.mapred.JobClient -     Reduce input groups=3
22:16:25.706 [main] INFO  org.apache.hadoop.mapred.JobClient -     Combine output records=3
22:16:25.707 [main] INFO  org.apache.hadoop.mapred.JobClient -     Map input records=9
22:16:25.707 [main] INFO  org.apache.hadoop.mapred.JobClient -     Reduce shuffle bytes=0
22:16:25.707 [main] INFO  org.apache.hadoop.mapred.JobClient -     Reduce output records=3
22:16:25.708 [main] INFO  org.apache.hadoop.mapred.JobClient -     Spilled Records=6
22:16:25.709 [main] INFO  org.apache.hadoop.mapred.JobClient -     Map output bytes=675
22:16:25.709 [main] INFO  org.apache.hadoop.mapred.JobClient -     Combine input records=9
22:16:25.710 [main] INFO  org.apache.hadoop.mapred.JobClient -     Map output records=9
22:16:25.710 [main] INFO  org.apache.hadoop.mapred.JobClient -     Reduce input records=3
22:16:25.713 [main] INFO  o.a.m.clustering.kmeans.KMeansDriver - Clustering data
22:16:25.714 [main] INFO  o.a.m.clustering.kmeans.KMeansDriver - Running Clustering
22:16:25.714 [main] INFO  o.a.m.clustering.kmeans.KMeansDriver - Input: target/input/points Clusters In: target/output/clusters-3 Out: target/output/clusteredPoints Distance: org.apache.mahout.common.distance.EuclideanDistanceMeasure@343a9d95
22:16:25.714 [main] INFO  o.a.m.clustering.kmeans.KMeansDriver - convergence: 0.0010 Input Vectors: org.apache.mahout.math.VectorWritable
22:16:25.714 [main] INFO  o.a.hadoop.metrics.jvm.JvmMetrics - Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
22:16:25.730 [main] INFO  org.apache.mahout.common.HadoopUtil - Deleting target/output/clusteredPoints
22:16:25.732 [main] WARN  org.apache.hadoop.mapred.JobClient - Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
22:16:25.932 [main] INFO  o.a.h.m.lib.input.FileInputFormat - Total input paths to process : 1
22:16:26.259 [main] INFO  org.apache.hadoop.mapred.JobClient - Running job: job_local_0004
22:16:26.270 [Thread-54] INFO  o.a.h.m.lib.input.FileInputFormat - Total input paths to process : 1
22:16:26.404 [Thread-54] INFO  org.apache.hadoop.mapred.TaskRunner - Task:attempt_local_0004_m_000000_0 is done. And is in the process of commiting
22:16:26.405 [Thread-54] INFO  o.a.hadoop.mapred.LocalJobRunner - 
22:16:26.405 [Thread-54] INFO  org.apache.hadoop.mapred.TaskRunner - Task attempt_local_0004_m_000000_0 is allowed to commit now
22:16:26.410 [Thread-54] INFO  o.a.h.m.l.output.FileOutputCommitter - Saved output of task 'attempt_local_0004_m_000000_0' to target/output/clusteredPoints
22:16:26.411 [Thread-54] INFO  o.a.hadoop.mapred.LocalJobRunner - 
22:16:26.411 [Thread-54] INFO  org.apache.hadoop.mapred.TaskRunner - Task 'attempt_local_0004_m_000000_0' done.
22:16:27.261 [main] INFO  org.apache.hadoop.mapred.JobClient -  map 100% reduce 0%
22:16:27.261 [main] INFO  org.apache.hadoop.mapred.JobClient - Job complete: job_local_0004
22:16:27.262 [main] INFO  org.apache.hadoop.mapred.JobClient - Counters: 5
22:16:27.262 [main] INFO  org.apache.hadoop.mapred.JobClient -   FileSystemCounters
22:16:27.262 [main] INFO  org.apache.hadoop.mapred.JobClient -     FILE_BYTES_READ=5484682
22:16:27.262 [main] INFO  org.apache.hadoop.mapred.JobClient -     FILE_BYTES_WRITTEN=5581897
22:16:27.262 [main] INFO  org.apache.hadoop.mapred.JobClient -   Map-Reduce Framework
22:16:27.263 [main] INFO  org.apache.hadoop.mapred.JobClient -     Map input records=9
22:16:27.263 [main] INFO  org.apache.hadoop.mapred.JobClient -     Spilled Records=0
22:16:27.263 [main] INFO  org.apache.hadoop.mapred.JobClient -     Map output records=9

*1:以下は俺の趣味なので、必須のライブラリではありませんが。

*2:ログ出力の俺好み設定ファイルをこっちで作っているからです。無くてもよいです。その場合、以下のlogback.xmlは不要です。ただし、さらにその下に示すログ出力は別の表記に変わります。

*3:IOUtil.closeQuietlyのためだけに入ってます。

今度はMahoutでクラスタリング

Mahoutシリーズを最初から読む場合はこちらApache Mahoutで機械学習してみるべ - 都元ダイスケ IT-PRESS

さて、前回まではMahoutで「協調フィルタリングによるレコメンデーション」を解説してきました。まだレコメンドの処理をHadoopで分散させたりしていませんし、そのほかにもレコメンドにはまだまだ奥が深い世界があるのですが、ひとまずはここまでにしておきましょう。(分散レコメンドの話は、またしばらくしてから書きます。)

で、Mahoutは「協調フィルタリングライブラリ」じゃありません。機械学習ライブラリです。機械学習にも色々あるのですが、Mahoutでは大きく分けて以下の3つをサポートしています。


そして今日からはクラスタリングの話。

クラスタリング

クラスタリングとは何でしょうか。複数の要素を「クラスタ」と呼ばれるいくつかの集まりに分類することです。同じクラスタに属する要素同士は似ていて、異なるクラスタに属する要素は似ていない状態です。

例えば 1, 2, 30, 5, 35, 32, 4, 31 という数字の要素があったとして、それを「1, 2, 5, 4」と「30, 35, 32, 31」に分けるのが狭義のクラスタリングです。値が近い同士を同じクラスタに属させたわけですね。これを下のように数直線で表してみました。

上の例では、「要素」というのは「ただの数字」でした。次に、2次元平面(XYグラフ)上の点を要素として分類してみましょう。座標とそのグラフは以下の通り。下の表では既に「人間の感覚による答え」を出してしまいましたが、これはA〜C群にクラスタリングできます。グラフ上で距離が近い同士を同じクラスタに属させたわけです。(さらにここでは、整数じゃなくて小数でも大丈夫、ということがわかります。)

A群 (1, 1) (2, 1) (1, 1.8) (1.3, 1.2)
B群 (8.2, 6.6) (9.1, 7) (9, 7.4) (7.9, 7.7)
C群 (2, 9) (2, 7.7) (2.5, 7.8) (2, 7)

では、要素を3次元空間上の点としたらどうでしょうか? 座標とグラフは以下の通り。3次元グラフは視点が動いていないと見づらいので動画です。これも、座標を見ているだけでは関連性はなかなか見えて来ませんが、グラフにすると一目瞭然ですね。これもa〜c群にクラスタリングできるのです。(さらにここでは、要素の値は負数でも大丈夫ということがわかります。)

a群 (8, 8, 8) (8, 7.5, 9) (7.7, 7.5, 9.8)
b群 (0, 7.5, 9) (0.1, 8, 8) (-1, 9, 7.5)
c群 (9, -1, -0.8) (7.7, -1.2, -0.1) (8.2, 0.2, 0.2)

ここまで考えると、最初の数直線の世界というのは実は「1次元」だったのです。4次元以上は図示するのが難しいので説明は省きますが、この勢いで、いくらでも多次元の要素をクラスタリングできるのが感覚的に分かると思います。多次元の要素であっても、その2つの要素の距離を数値として出すことができるのです。これを「ユークリッド距離」と呼びます。

さらに、要素として今まで扱ってきた「括弧に囲まれた固定数個の数値の順序付きリスト」を「ベクトル」と呼びます。おっと、数学の時間に昔習いましたかね。あの矢印の奴。あれです。ただし、今はあんまり矢印として考えずに、やはりX次元座標上の一点とイメージしておいた方が良いと思います。

さてさて、狭義のクラスタリングはこんな感じ。で、広義には、「ある要素をベクトルに変換する」こともクラスタリングの分野に入ります。これを「ベクトル化」とか「vectorize」と呼んだりします。

例えば、大量の本をクラスタリングしたい場合、本自体は固定数個の数値の順序付きリストではありません。この本の分類の根拠になるような情報をベクトルの形で表現しないとクラスタリングはできないのです。

まぁ、ベクトル化の話はまた今度ということで。今日はベクトル化が終わった後の、狭義のクラスタリングです。ソースいきたいところですが、前置きが長くなっちまいましたので、続きは明日。