Wissenswertes über die Partitionierung mit Apache Spark
eine Einführung
Die Resilient Distributed Datasets (RDD) von Apache Spark sind Sammlungen verschiedenartiger Daten, die so groß sind, dass sie nicht auf einen einzelnen Node passen und deshalb am besten durch Partitionierung auf mehrere Nodes verteilt werden. Apache Spark nimmt die Partitionierung von RDDs und ihre Verteilung auf Nodes automatisch vor. Die RDDs werden “faul“ ausgewertet, d. h., ihre Ausführung beginnt erst dann, wenn mindestens eine Aktion angefordert wird. Dieses Prinzip sorgt für bessere Verwaltbarkeit, schont Rechenressourcen und erhöht so die Optimierung und Geschwindigkeit der Verarbeitung. Die Transformationen werden als DAGs (Directed acyclic graphs, gerichtete azyklische Graphen) gespeichert. Jede Aktion mit einem RDD führt somit dazu, dass Apache Spark den DAG neu berechnet.
Partitionen in Apache Spark weisen bestimmte Eigenschaften auf. Nur wenn Sie diese verstehen, können Sie die Leistung, die Fehlerbehebung und Fehlerbehandlung in diesem Zusammenhang optimieren.
Das Wichtigste, was über Partitionen zu sagen ist, lässt sich wie folgt zusammenfassen:
- Jeder Node in einem Spark-Cluster enthält eine oder mehrere Partitionen.
- Die Anzahl der Partitionen in Spark ist konfigurierbar. Jedoch sind sowohl zu wenige als auch zu viele Partitionen von Nachteil: Zu wenige sind ungünstig, weil dadurch die Gleichzeitigkeit (Parallelität) abnimmt, die Daten in den Partitionen ungleich verteilt werden (Data Skewing) und Ressourcen falsch ausgelastet werden. Zu viele Partitionen sind nachteilig, weil die Planung von Aufgaben dann länger dauert als ihre Ausführung. Standardmäßig wird die Anzahl der Partitionen entsprechend der Anzahl an Kernen gewählt, die in allen Nodes zusammengenommen vorhanden sind.
- Partitionen in Spark erstrecken sich nicht über mehrere Maschinen.
- Tupel, die zu derselben Partition gehören, befinden sich deshalb in jedem Fall auf derselben Maschine.
- Spark weist eine Aufgabe pro Partition zu und jeder Worker kann gleichzeitig nur eine Aufgabe verarbeiten.
Hash-Partitionierung vs. Bereichspartitionierung in Apache Spark
Apache Spark unterstützt zwei Arten der Partionierung: “Hash-Partitionierung” und “Bereichspartitionierung“. Welche Methode für Sie geeignet ist, hängt davon ab, wie Ihre Daten verteilt oder sequenziert sind und welche Aktion Sie mit den Daten ausführen möchten. In diese Entscheidung fließen zahlreiche Faktoren ein, darunter z. B.:
- Verfügbare Ressourcen — Anzahl der verfügbaren Kerne für die Ausführung der Aufgabe.
- Externe Datenquellen — Die Anzahl der Partitionen hängt von der Größe der lokalen Sammlungen, der Cassandra-Tabelle oder der HDFS-Datei ab.
- Zur Ableitung eines RDD verwendete Transformationen — Um bei der Ableitung eines RDD aus einem anderen RDD die Anzahl der Partitionen festzulegen, gibt es eine Reihe von Regeln.
Wie man sieht, sind beim Arbeiten mit Apache Spark zahlreiche Aspekte zu beachten. In diesem Blog möchte ich erläutern, warum es so wichtig ist, sich die Zusammenhänge zwischen den Geschäftsdaten, den Schlüsseln in diesen Daten und den vorhandenen physischen Ressourcen einerseits und der Verarbeitung in Spark andererseits klarzumachen, wobei unter den Ressourcen das Netzwerk, die CPU und der Speicher herausstechen.
Betrachten wir zunächst einige typische Schwierigkeiten, die bei der Partitionierung mit Apache Spark auftreten können:
Ungleiche Datenverteilung (Datenschiefe) und Shuffle-Blöcke
Bei der Verarbeitung mit der Standardpartitionierung von Apache Spark kann es zur ungleichen Datenverteilung (Datenschiefe) kommen, die wiederum zu Problemen mit Shuffles im Verlauf von Aggregationsvorgängen oder mit einzelnen Executors führen, denen nicht genügend Speicher zur Verfügung steht.
Beispiel für Datenschiefe
Wie wir hier sehen, befinden sich bei „Key A“ deutlich mehr Daten in der Partition. Deshalb nimmt die Ausführung der Aufgaben auf Exec-5 mehr Zeit in Anspruch als die der anderen fünf Aufgaben. Außerdem sollten Sie nicht vergessen, dass Spark Shuffle Blocks nicht größer als 2 GB sein können (intern, weil die MAX_SIZE der ByteBuffer Abstraktion auf 2 GB eingestellt ist). Wenn Sie beispielsweise eine Aggregation, einen Join oder Cache-Aufgaben ausführen, kommt es zu einem Spark Shuffle. Selbst eine geringe Anzahl von Partitionierungen oder ein geringfügiger Datenversatz kann hier massive Shuffle Block-Probleme nach sich ziehen. Sollten Sie also aufgrund eines Shuffle eine Fehlermeldung mit Verweis auf einen Verstoß gegen MAX_SIZE Limits erhalten, wissen Sie, dass das an Datenversatz liegen kann.
Mit Umsicht partitionieren
Wie also lassen sich Datenschiefe und übergroße Shuffle-Blöcke vermeiden? Durch umsichtiges Partitionieren. Damit reduzieren Sie zum einen die Speicherbeanspruchung, zum anderen sorgen Sie für eine vollständige Ressourcenauslastung auf den Executor-Nodes. Machen Sie sich also zuerst ein Bild von Ihren Daten – dem Umfang, den Datentypen und der Verteilung. Folgende Best Practices sind dabei hilfreich:
- Wählen Sie für Aktionen wie reduceByKey oder aggregateByKey die richtigen Operatoren je nach Wirkungsweise aus, damit der Treiber nicht belastet wird und die Aufgaben auf den Executors ordnungsgemäß ausgeführt werden.
- Wenn als Ergebnis wenige große, nicht aufteilbare Dateien zurückgegeben werden, liegt es wahrscheinlich daran, dass InputFormat zwar zahlreiche Datensätze pro Partition erzeugt hat, jedoch nicht genügend Partitionen, um die verfügbaren Kerne auszunutzen. In diesem Fall erreichen Sie durch einen erneuten Aufruf der Partitionierung mit InputFormat und eine großen Anzahl Partitionen nach dem Laden der Daten, dass bei den anschließenden Operationen eine größere Anzahl von CPUs des Clusters genutzt wird.
- Wenn die Daten außerdem ungleich verteilt sind (Datenschiefe), wird empfohlen, bei der Neupartitionierung einen entsprechenden Schlüssel für eine gleichmäßigere Lastverteilung anzugeben.
Talend bietet eine Komponente tPartition an, mit der Sie eine Neupartitionierung anhand von entsprechenden Schlüsseln vornehmen können, die Sie selbst auswählen.
Wie lässt sich die richtige Anzahl von Partitionen ermitteln?
Apache Spark kann zu einem Zeitpunkt immer nur eine Aufgabe pro RDD-Partition ausführen. Insgesamt sind so viele Aufgaben ausführbar, wie Kerne im Cluster vorhanden sind (und wahrscheinlich auch das 2-3-Fache dessen). Um Gleichzeitigkeit/Parallelität zu erreichen, wäre die “richtige“ Anzahl von Partitionen damit mindestens so groß wie die Anzahl der Executors. Diesen Wert können Sie durch Aufruf von sc.defaultParallelism berechnen. Letztlich hängt die maximale Größe einer Partition davon ab, wie groß der für den Executor verfügbare Speicherbereich ist.
In manchen Fällen ist nicht feststellbar, welcher Neupartitionierungs-Schlüssel “richtig“ im Sinne einer gleichmäßigen Datenverteilung ist. Hierfür stehen dann Verfahren wie das Salting zur Verfügung, bei dem für eine gleichmäßigere Verteilung zusätzlich zum aktuellen Schlüssel und gleichzeitig mit diesem ein neuer “Pseudo“-Schlüssel verwendet wird. Hier ein Beispiel:
- Sie fügen einem großen RDD ein Zufallselement hinzu und erstellen damit nach folgender Formel einen neuen Join-Schlüssel: “Salting-Schlüssel = tatsächlicher Join-Schlüssel und Pseudo-Zufallsschlüssel, wobei der Pseudo-Schlüssel einen Wert zwischen 1 und N annehmen kann und N der Grad der Verteilung ist“.
- Sie fügen mithilfe eines kartesischen Produkts (1-N) einem kleinen RDD ein Zufallselement hinzu, mit dem Sie die Anzahl der Einträge erhöhen, und erstellen einen neuen Join-Schlüssel.
- Mit einem neuen Join-Schlüssel verbinden Sie die RDDs, die nun aufgrund des zufälligen Startwerts besser verteilt sind.
- Sie entfernen den Pseudo-Zufallsschlüssel wieder aus dem Join-Schlüssel und erhalten so das Endergebnis der Join-Operation.
Für das obige Beispiel gilt: Der Pseudo-Schlüssel ist im Lookup-Datensatz ein kartesisches Produkt (1-N); im Haupt-Datensatz ist er ein Zufallsschlüssel (1-N), jeweils bezogen auf den Quell-Datensatz in den einzelnen Zeilen, wobei N den Grad der Verteilung angibt.
Talend und Apache Spark
Talend Studio umfasst außerdem grafische Tools und Assistenten zum Generieren von nativem Code. Dies ist zugleich Ihr richtiger Einstieg in die Arbeit mit Apache Spark, Spark Streaming und sogar in die richtige Partitionierung Ihrer Daten. Die oben dargestellten Verfahren lassen sich mit der Komponente tMap von Talend realisieren. Eine eventuell notwendige Neupartitionierung führen Sie mit der Komponente tPartition in Talend durch, wenn Sie mit Ihren Daten gut vertraut sind, und ein eventuelles Salting oder Zufallswert-Verfahren führen Sie je nachdem entweder bei Verdacht auf ungleich verteilte Schlüssel oder auf Nullwerte durch.
Ich hoffe, dass ich Ihnen mit diesem kurzen Blogbeitrag etwas Neues über die Grundlagen der Partitionierung und Verarbeitung mit Apache Spark vermitteln konnte. Weitere Informationen dazu, wie Sie mit Talend und Apache Spark die Geschwindigkeit und den Umfang Ihrer Big-Data-Verarbeitung steigern, finden Sie auf unserer Seite mit Lösungen.
Quellenangaben:
https://issues.apache.org/jira/browse/SPARK-6235
https://0x0fff.com/spark-architecture
https://www.youtube.com/watch?v=WyfHUNnMutg
http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-1/
https://stackoverflow.com/questions/40373577/skewed-dataset-join-in-spark
Sind Sie bereit, mit Talend durchzustarten?
Weitere Artikel zu diesem Thema
- 5 Möglichkeiten, um Ihre Big Data zu optimieren
- Apache Hadoop: Definition, Aufbau und Anwendungsbeispiele
- Was ist MapReduce?
- Die Zukunft von Big Data – Definition und Anwendung
- Einsteigerleitfaden Batchverarbeitung
- Big Data im Finanzwesen: Ihr Guide für die Finanzdaten-Analyse
- Big Data im Marketing
- Smart Retailing: Big Data in E-Commerce und Einzelhandel