Apache Sparkのパーティショニングの
基礎知識
Apache SparkのResilient Distributed Dataset(RDD)は、サイズが大きすぎて1つのノードに収まらないため、複数のノードに分割する必要があるさまざまなデータの集合です。Apache Sparkは自動的にRDDをパーティションに分割し、複数のノードに分散します。この操作は遅延評価され(たとえば、アクションがトリガーされるまで実行を開始しないことで管理性が高まり、計算量が低減するため、結果的に最適化と速度が向上します)、変換はDirected Acyclic Graph(DAG)として格納されます。したがって、RDDに対して何らかのアクションが実行されると、Apache SparkがDAGを再計算します。
Apache Sparkのパーティションの特性を理解しておくことで、パフォーマンスの向上、デバッグ、およびエラー処理が容易になります。
以下に、パーティション分割の基本情報をいくつか紹介します。
- Sparkクラスター内の各ノードには、1つ以上のパーティションが含まれています。
- Sparkで使用されるパーティションの数は設定が可能で、少なすぎると同時実行性の低下、データの偏り(データスキュー)、不適切なリソース利用の原因となり、多すぎるとタスクスケジューリングの所要時間が実際の実行時間より長くなるなどの問題が発生します。デフォルトでは、すべてのexecutorノード上のコアの総数に設定されています。
- Sparkのパーティションが複数のマシンにまたがることはありません。
- 同じパーティション内のタプルは、同じマシン上にあることが保証されています。
- Sparkはパーティションごとに1つのタスクを割り当て、各Workerは一度に1つのタスクを処理できます。
Apache Sparkでのハッシュパーティショニングとレンジパーティショニング
Apache Sparkは「ハッシュパーティショニング」と「レンジパーティショニング」の2種類のパーティショニングをサポートしています。データ内のキーの分散方法または配列方法や、データに対して実行するアクションに応じて、適切な手法を選択します。以下のような多くの要因が、パーティショニングの選択に影響を与えます。
- 利用可能なリソース — タスクを実行できるコアの数。
- 外部のデータソース — ローカルコレクション、Cassandraテーブル、またはHDFSファイルのサイズによってパーティション数が決まります。
- RDDの派生に使用される変換 — RDDが別のRDDから派生する際にパーティションの数を決定するためのルールが多数存在します。
Apache Sparkの使用に際しては、留意すべき点がいくつかあります。このブログでは、ビジネスデータ、そのデータのキー、Spark処理における物理リソース、そして何よりもネットワーク、CPU、メモリーを完全に認識しておくことの重要性について説明します。
Apache Sparkのパーティショニングでよく見られる問題には次のようなものがあります。
データの偏り(データスキュー)とシャッフルブロック
Apache Sparkのデフォルトのパーティション分割ではデータの偏りが発生し、その結果、データ集約操作中のシャッフルや単一のエグゼキューターのメモリー不足に関連した問題が発生する可能性があります。
データスキューの例
この例では、「key-a」のパーティション内のデータ量が大きいため、Exec-5のタスクの完了には他の5つのタスクよりもはるかに長い時間を要します。もう1つの重要なのは、Sparkのシャッフルブロックは2GB以下でなければならないという点です(内部的にByteBuffer抽象化ではMAX_SIZEが2GBに設定されているため)。たとえば、集約、結合、キャッシュ操作などの操作を実行している場合、Sparkシャッフルが発生し、パーティションの数が少ないことやデータスキューが原因でシャッフルブロックの問題が発生する可能性があります。したがって、シャッフルによるMAX_SIZE制限の違反に関連するエラーが発生し始めた場合には、データの偏りが原因であることがわかります。
賢明なパーティショニング
では、どうすればデータの偏りとシャッフルブロックを回避できるのでしょうか。非常に重要なのは、メモリープレッシャーを管理し、エグゼキューターのノードでリソースをフル活用できるよう、賢明な方法でパーティション分割を行うことです。そのためには、データのサイズ、タイプ、分散方法を常に把握しておく必要があります。覚えておくべきベストプラクティスは次のとおりです。
- ドライバーに負荷をかけないように、またエグゼキューター上でタスクが適切に実行されるように、reduceByKeyやaggregateByKeyなどのアクションの正しい演算子を理解し選択します。
- いくつかの大規模で分割不可なファイルでデータを受け取った場合、InputFormatによるパーティション分割では各パーティションに大量のレコードを配置される可能性があります。しかし、利用可能なすべてのコアを活用するのに十分なパーティションは生成されません。この場合、データのロード後に多数のパーティションを使用する再パーティションを呼び出すことで、後続の操作でクラスターのCPUをより多く利用できるようになります。
- また、データが偏っている場合は、負荷を均等に分散できる適切なキーを使用して再パーティション化することも推奨されます。
Talendは、選択された適切なキーに基づいて、再パーティション化に必要なtPartitionコンポーネントを提供します。
最適なパーティション数を算出するには
Apache Sparkは、RDDの各パーティションに対して1つの同時タスクしか実行できず、その最大数はクラスター内のコア数(またはその2~3倍)になります。したがって、「最適な」数のパーティションを選択するためには、一般的に最低でも並列処理用のエクゼキューターの数と同数のパーティションが必要です。この数値を算出するには、sc.defaultParallelismを呼び出します。パーティションの最大サイズは、最終的にはエグゼキューターの利用可能なメモリーによって決まります。
適切な再パーティション化キーを使用してデータを均等に分散することが不可能な場合もあります。そこで、新しい「偽の」キーを追加し、現在のキーと一緒に使用することでデータを均等に分散させるソルトなどの方法を使用します。次に例を示します。
- 大きなRDDにランダムな要素を追加して、「ソルトキー=実際の結合キー+ 1からNの間の値を取るランダムな偽のキー(Nは分散レベル)」のように新しい結合キーを作成する
- 直積(1-N)を使用して小さいRDDにランダムな要素を追加して、エントリー数を増やし、新しい結合キーを作成する
- 新しい結合キーでRDDを結合する(これにより、ランダムなシード値の設定によって均等に分散されるようになります)
- 結合キーからランダムな偽のキーを削除して、結合の最終結果を取得する
上記の例では、ルックアップデータセットの偽のキーは直積(1-N)になり、メインデータセットの場合は各行のソースデータセットにランダムキー(1-N)になります。Nは分散レベルです。
TalendとApache Spark
Talend Studioは、ネイティブコードを生成するグラフィカルなツールとウィザードを提供しているので、Apache SparkやSpark Streamingの使用を開始し、さらにはデータを適切にパーティション化できます。上記の方法を実行するには、TalendのtMapコンポーネントを使用します。また、データを熟知している場合には、TalendのtPartitionコンポーネントを使用して、再パーティション化のニーズに対応できます。TalendのtMapコンポーネントを使用すれば、必要に応じてソルト処理や乱数処理を実行して、偏ったキーやNULL値を含んだデータに関する懸念を解消できます。
短いブログ記事でしたが、Apache Sparkのパーティション分割と処理について、何か新しい基礎知識を得られたでしょうか。TalendとApache Sparkの連携によってビッグデータ処理を高速化し拡張させる方法については、Talendのソリューションのページをご覧ください。
参考文献:
https://issues.apache.org/jira/browse/SPARK-6235
https://0x0fff.com/spark-architecture
https://www.youtube.com/watch?v=WyfHUNnMutg
http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-1/
https://stackoverflow.com/questions/40373577/skewed-dataset-join-in-spark