MapReduceとは?
今日のデータ駆動の市場では、アルゴリズムやアプリケーションが人、プロセス、システム、組織に関するデータを24時間365日収集し、膨大な量のデータを生成しています。しかし、問題となるのは、この大量のデータを有意義な知見を犠牲にすることなく高速かつ効率的に処理する方法です。
そこで役立つのが、MapReduceプログラミングモデルです。MapReduceは、Googleが検索結果の分析のために最初に使用したものであり、テラバイト規模のデータを分割して並列処理し、より迅速に結果を得ることができます。
MapReduceとは?
MapReduceは、Hadoopフレームワーク内のプログラミングモデル(パターン)であり、Hadoopファイルシステム(HDFS)に格納されたビッグデータにアクセスするために使用されます。これは、Hadoopフレームワークの機能に不可欠な主要コンポーネントです。
MapReduceは、ペタバイト規模のデータを小さなチャンクに分割し、Hadoopの汎用サーバーで同時に処理することで、並行処理を容易にします。最後に、複数のサーバーからのすべてのデータを集約して、統合された出力をアプリケーションに返します。
たとえば、安価な汎用サーバーを2万台使用し、各サーバーが256MBのデータブロックを持つHadoopクラスターでは、同時に約5TBのデータを処理できます。これにより、同じ大規模データセットの逐次処理と比較して、処理時間が短縮されます。
MapReduceを使用すると、アプリケーションまたはロジックのある場所にデータを送信するのではなく、データがすでに存在するサーバー上でロジックが実行され、処理が迅速化されます。データアクセスとストレージはディスクベースです。入力は通常、構造化データ、半構造化データ、または非構造化データを含むファイルとして格納され、出力もファイルに格納されます。
MapReduceは、HDFSに格納されたデータを取得するために使用できる唯一の方法でしたが、現在は状況が変わりました。今日では、HiveやPigなどのクエリベースのシステムがあり、SQLに似たステートメントを使用してHDFSからデータを取得するために使用されています。ただし、これらは通常、MapReduceモデルを使用して記述されたジョブとともに実行されます。これは、MapReduceに独自のメリットがあるためです。
MapReduceの仕組み
MapReduceの要となるのが、MapとReduceという関数です。2つは順番に並べられています。
- Map関数は、ディスクからの入力を<キー, 値>ペアとして処理し、中間セットとなる<キー, 値>ペアを出力として生成します。
- また、Reduce関数は<キー, 値>ペアを入力とし、<キー, 値>ペアを出力として生成します。
キーと値のタイプは、ユースケースによって異なります。すべての入力と出力はHDFSに保存されます。Map関数は初期データをフィルタリングしてソートするための必須ステップですが、Reduce関数はオプションです。
<k1, v1> -> Map() -> list(<k2, v2>)
<k2, list(v2)> -> Reduce() -> list(<k3, v3>)
MapperとReducerは、Map関数とReduce関数をそれぞれ実行するHadoopサーバーです。これらは、同一サーバーでも異なるサーバーでもかまいません。
Map
入力データは、まず小さなブロックに分割されます。続いて、各ブロックが処理のためにMapperに割り当てられます。
たとえば、ファイルに100個の処理対象レコードがある場合、100のMapperがそれぞれ1つのレコードを処理するように同時に実行することも、50のMapperがそれぞれ2つのレコードを処理するように同時に実行することも可能です。Hadoopフレームワークは、処理されるデータのサイズと各Mapperサーバーで使用可能なメモリブロックに基づいて、使用するMapperの数を決定します。
Reduce
すべてのMapperが処理を完了した後、フレームワークは結果をシャッフルしてソートしてから、Reducerに渡します。Reducerは、Mapperが実行している間は開始できません。同じキーを持つすべてのmap出力値は、単一のReducerに割り当てられ、Reducerはそのキーの値を集計します。
CombineとPartition
MapとReduceの間には、2つの中間ステップがあります。
Combineはオプションのプロセスです。Combinerは、各Mapperサーバーで個別に実行されるReducerです。各Mapperのデータを単純な形式にさらに縮小してから下流に渡します。
これにより、作業データが少なくなるので、シャッフルとソートが容易になります。多くの場合、reduce関数の累積関数と連想関数のため、CombinerクラスはReducerクラス自体に設定されますが、必要に応じてCombinerを別のクラスにすることも可能です。
Partitionは、Mapperから得られた<キー, 値>ペアを別の<キー, 値>ペアに変換し、Reducerに供給するプロセスです。データをReducerに提示する方法を決定し、特定のReducerに割り当てます。
デフォルトのPartitionerは、Mapperから出力されるキーのハッシュ値を決定し、このハッシュ値に基づいてパーティションを割り当てます。Reducerと同じ数のパーティションがあります。したがって、パーティショニングが完了すると、各パーティションからのデータは特定のReducerに送られます。
MapReduceの例
支払いを処理するために毎日100万件の要求を受け取る電子商取引システムを例にとって考えてみましょう。これらの要求の最中に、「支払いゲートウェイによって支払いが拒否されました」、「在庫切れ」、「無効な住所」など、いくつかの例外が投げられることがあります。開発者は、過去4日間のログを分析して、どの例外が何回投げられたかを把握したいと考えています。
ユースケースの例
目的は、エラーが発生しやすいユースケースを分離し、適切な処置をとることです。たとえば、同じ支払いゲートウェイが頻繁に例外を投げる場合、サービスの信頼性が低いのでしょうか、それともインターフェイスの記述に問題があるのでしょうか? 「在庫切れ」の例外が頻繁に投げられる場合、在庫計算サービスを改善する必要があるのでしょうか、それとも特定の製品の在庫を増やす必要があるのでしょうか?
開発者は、適切な質問をして、適切な行動方針を決定できます。MapReduceは、数百万のレコードを持つ大規模なログで、このような分析を実行するのに適したプログラミングモデルです。複数のMapperが、これらのログを同時に処理できます。ログサイズとMapperサーバーで処理に使用可能なメモリブロックに基づいて、1つのMapperが1日分のログまたはそのサブセットを処理できます。
Map
単純化するために、Hadoopフレームワークが4つのMapper(Mapper 1、Mapper 2、Mapper 3、Mapper 4)だけを実行していると仮定します。
Mapperに入力される値は、ログファイルの1レコードです。キーは、「ファイル名+行番号」のようなテキスト文字列にすることができます。Mapperは、ログファイルの各レコードを処理して、キーと値のペアを生成します。ここでは値を「1」とします。Mapperの出力は次のようになります。
Mapper 1 -> , , , ,
Mapper 2 -> , , ,
Mapper 3 -> , , , ,
Mapper 4 -> , , ,
各MapperでCombiner(Combiner 1...Combiner 4)が1つ実行され、各Combinerがそれぞれの例外(Reducerと同じ関数)のカウントを計算すると仮定すると、Combiner1への入力は次のようになります。
, , , ,
Combine
Combiner 1の出力は次のようになります。
, ,
ほかのCombinerからの出力は次のようになります。
Combiner 2:
Combiner 3:
Combiner 4:
Partition
この後、Partitionerは、CombinerからのデータをReducerに割り当てます。また、データはReducer向けにソートされます。
Reducerへの入力は次のようになります。
Reducer 1: {3,2,3,1}
Reducer 2: {1,2,1,1}
Reducer 3: {1,1,2}
関与するCombinerがない場合、Reducerへの入力は以下のようになります。
Reducer 1: {1,1,1,1,1,1,1,1,1}
Reducer 2: {1,1,1,1,1}
Reducer 3: {1,1,1,1}
これは単純な例ですが、テラバイト規模のデータが関与する場合、 Combinerプロセスによる帯域幅の改善は非常に大きなものになります。
Reduce
次に、各Reducerは例外の総数を次のように計算します。
Reducer 1:
Reducer 2:
Reducer 3:
データは、例外Aがほかに比べて頻繁に投げられ、一層の注意が必要であることを示しています。数週間分または数か月分のデータを一緒に処理する必要がある場合、MapReduceプログラムの潜在性が真に発揮される可能性があります。
MapReduceの実装方法
MapReduceプログラムは、Javaに限定されるものではありません。C、C++、Python、Ruby、Perlなどでも記述できます。典型的なMapReduceジョブのmain機能は次のようになります。
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(ExceptionCount.class);
conf.setJobName("exceptioncount");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(Map.class);
conf.setReducerClass(Reduce.class);
conf.setCombinerClass(Reduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
MapReduceクラス名、Map、ReduceおよびCombinerクラス、入力および出力タイプ、入力および出力ファイルのパスなど、すべてのパラメーターはmain関数で定義されています。MapperクラスはMapReduceBaseを拡張し、Mapperインターフェイスを実装します。ReducerクラスはMapReduceBaseを拡張し、Reducerインターフェイスを実装します。
詳細なコード例については、Hadoopチュートリアルを参照してください。
TalendのMapReduceチュートリアル
MapReduceはビッグデータの問題を解決するための俊敏で弾力のあるアプローチですが、その固有の複雑さのために、開発者が専門知識を得るのに時間がかかります。MapReduceを使用してビッグデータセットを処理するには、高いスキルを持つ人材と堅牢なインフラストラクチャが必要です。
ここで役立つのがTalendのデータ統合ソリューションです。Hive、Pig、Flume、Kafka、HBaseなど、Hadoopエコシステムで使用されるさまざまなツールを統合するためのフレームワークを提供します。Talend Studioが提供するUIベースの環境では、HDFSを使用してデータのロードや抽出を実行できます。
Talend Studioの紹介ビデオはこちらで視聴できます。
MapReduceの場合、具体的には、TalendStudioを使用してHadoopクラスターで実行可能なジョブを簡単に作成し、MapperやReducerクラスなどのパラメーター、入力と出力の形式などを設定できます。
Talend MapReduceジョブを作成すると(Apache Hadoopジョブの定義とは異なります)、ビッグデータクラスターでネイティブに実行されるサービス、実行可能ファイル、またはスタンドアロンのジョブとして展開できます。1つ以上のHadoopMapReduceジョブが生成され、次にジョブがMapReduceアルゴリズムを実行します。
MapReduceジョブを実行する前に、Hadoop接続を構成する必要があります。Talendを使用してMapReduceジョブをセットアップする方法の詳細については、チュートリアルを参照してください。
MapReduceを活用してビッグデータの問題を解決
MapReduceプログラミングパラダイムは、並列化によって解決可能な複雑な問題で使用できます。
MapReduceを使用することで、ソーシャルメディアサイトは、過去1か月間の新規登録者数を国別に把握して、地域ごとの人気拡大を測定できます。貿易会社は、バッチの突合せ処理を高速化でき、取引中断を引き起こすシナリオも特定できます。さらに、検索エンジンがページビューを把握したり、マーケティング担当者がセンチメント分析を実行する場合にも、MapReduceを使用できます。
Talend Studioの試用版を今すぐダウンロードして、MapReduceの詳細をご確認ください。また、上記のようなユースケースを試すこともできます。