Introduzione al partizionamento di Apache Spark

Cosa è necessario sapere

I set di dati RDD (Resilient Distributed Datasets) di Apache Spark sono raccolte di dati di varia natura, dalle dimensioni talmente grandi da non poter rientrare in un unico nodo e che devono pertanto essere ripartiti tra vari nodi. Apache Spark partiziona automaticamente i dati RDD e distribuisce le partizioni tra diversi nodi. La valutazione di questi dati è lenta (l'esecuzione non viene avviata fino a quando non viene attivata un'azione che ne incrementa la gestibilità e ne riduce l'elaborazione, migliorando, di conseguenza, ottimizzazione e velocità) e le trasformazioni vengono memorizzate sotto forma di grafi aciclici diretti (DAG). Ne consegue che ogni azione eseguita sui dati RDD comporta una rielaborazione dei DAG da parte di Apache Spark.

È importante comprendere le caratteristiche delle partizioni di Apache Spark, in modo da poter migliorare prestazioni, debugging e gestione degli errori.

Ecco alcuni degli elementi fondamentali della procedura di partizionamento:

  • Ogni nodo in un cluster Spark contiene una o più partizioni.
  • Il numero di partizioni utilizzate in Spark è configurabile; è sconsigliabile disporre di un numero troppo basso di partizioni (in quanto limita la concorrenza ed è causa di asimmetrie nei dati e di un uso improprio delle risorse), così come di un numero troppo elevato (in quanto la pianificazione delle attività richiederebbe più tempo della loro effettiva esecuzione). Per impostazione predefinita, tale numero è definito dal numero totale di core di tutti i nodi esecutori.
  • In Spark, le partizioni non si estendono su più macchine.
  • Le tuple di una stessa partizione sono garantite sulla medesima macchina.
  • Spark assegna un'attività per partizione e ogni operatore può elaborare una sola attività per volta.

Partizionamento hash e partizionamento range in Apache Spark

Apache Spark supporta due tipologie di partizionamento, il "partizionamento hash" e il "partizionamento range". La tecnica di partizionamento verrà scelta sulla base della distribuzione delle chiavi nei dati o della loro sequenza, così come del tipo di azione che si desidera eseguire sui dati. Sono diversi i fattori che determinano la scelta del metodo di partizionamento, come:

  • Risorse disponibili — Numero di core su cui l'attività può essere eseguita.
  • Sorgenti dati esterne — Le dimensioni delle raccolte locali, la tabella Cassandra o il file HDFS determinano il numero di partizioni.
  • Trasformazioni utilizzate per ricavare i dati RDD — Una serie di regole consente di determinare il numero di partizioni quando un set di dati RDD viene ricavato da un altro set di dati RDD.

Come si può notare, sono molteplici gli aspetti da tenere presente quando si lavora con Apache Spark. In questo blog, vorrei sottolineare l'importanza di essere pienamente consapevoli dei propri dati di business, delle relative chiavi e delle risorse fisiche coinvolte nell'elaborazione Spark, in modo particolare di rete, CPU e memoria.

Analizziamo ora alcuni comuni insidie che possono presentarsi quando si lavora con il partizionamento di Apache Spark:

Dati asimmetrici e blocchi shuffle

L'elaborazione dei dati con il partizionamento predefinito di Apache Spark potrebbe portare a un'asimmetria dei dati che, a sua volta, può causare problemi di shuffle durante le operazioni di aggregazione o di memoria insufficiente da parte di un singolo esecutore.


Esempio di asimmetria dei dati

Come vediamo qui, "key-a" presenta una quantità maggiore di dati nella partizione, di conseguenza le attività in Exec-5 richiederanno più tempo per essere completate rispetto alle altre cinque attività. Un'altra cosa importante da ricordare è che i blocchi shuffle di Spark non possono superare i 2 GB (internamente, in quanto l'astrazione ByteBuffer ha il valore MAX_SIZE impostato su 2 GB). Ad esempio, durante l'esecuzione di operazioni di aggregazione, unione o cache, viene implementato lo shuffle di Spark; inoltre, la disponibilità di un numero limitato di partizioni o la presenza di asimmetrie nei dati può causare un problema di blocchi shuffle elevati. Di conseguenza, se iniziate a riscontrare un errore di superamento dei limiti MAX_SIZE dovuto a shuffle, sappiate che ciò potrebbe essere collegato alla presenza di asimmetrie nei dati.

Partizionamento intelligente

Cosa fare per evitare asimmetrie nei dati e blocchi shuffle? Semplice: ripartire i dati in modo intelligente. È fondamentale effettuare partizionamenti intelligenti per poter ottimizzare l'uso della memoria così come assicurare un impiego completo delle risorse sui nodi esecutori. Bisogna sempre conoscere i dati — dimensione, tipo e distribuzione. Ecco qualche best practice da tenere a mente:

  • Conoscere e saper scegliere i giusti operatori per azioni quali reduceByKey o aggregateByKey, in modo che il driver non venga messo sotto pressione e le attività vengano eseguite correttamente sugli esecutori.
  • Se i dati arrivano suddivisi in pochi file di grandi dimensioni non separabili, il partizionamento dettato da InputFormat potrebbe inserire un numero elevato di record in ciascuna partizione e non generare un numero di partizioni sufficiente per sfruttare al meglio tutti i core disponibili. In tal caso, l'esecuzione di un partizionamento con un elevato numero di partizioni dopo il caricamento dei dati consentirà alle operazioni che vengono eseguite successivamente di sfruttare maggiormente la CPU del cluster.
  • Inoltre, in caso di dati asimmetrici, è consigliabile eseguire il partizionamento con una chiave appropriata, in grado di suddividere il carico in modo omogeneo.

Talend offre un componente tPartition per esigenze di partizionamento basate su chiavi appropriate scelte dall'utente.

Come si ottiene il numero giusto di partizioni?

Apache Spark è in grado di eseguire una sola attività per ogni singola partizione di un set di dati RDD, fino al numero massimo di core presenti nel cluster (e forse fino a 2-3 volte di più). Di conseguenza, per quanto concerne la scelta della "giusta" quantità di partizioni, in genere, si punta a un numero almeno equivalente al numero degli esecutori, per garantire il parallelismo. È possibile calcolare questo valore mediante l'operazione sc.defaultParallelism. Le dimensioni massime di una partizione sono in ultima analisi dettate dalla disponibilità di memoria di ogni singolo esecutore.

Vi sono inoltre casi in cui non è possibile determinare quale sia la chiave di partizionamento più appropriata da utilizzare per garantire una distribuzione uniforme dei dati. In tali situazioni, è possibile adottare un metodo detto "salting", che comporta l'aggiunga di una chiave "falsa" e l'uso simultaneo della chiave effettiva per ottenere una migliore distribuzione dei dati. Ecco un esempio:

  • Aggiungere un elemento casuale a un set di dati RDD di grandi dimensioni e creare una nuova chiave join insieme a esso, come "chiave salting = chiave join effettiva + chiave falsa casuale, dove la chiave falsa ha un valore compreso tra 1 e N, con N corrispondente al livello di distribuzione"
  • Aggiungere un elemento casuale a un set di dati RDD di piccole dimensioni utilizzando un prodotto Cartesiano (1-N), per incrementare il numero di voci e creare una nuova chiave join
  • Unire i set di dati RDD su una nuova chiave join che verrà ora distribuita in modo più uniforme grazie al processo di seeding casuale
  • Rimuovere la chiave falsa casuale dalla chiave join per ottenere il risultato finale dell'unione

Nell'esempio di cui sopra, la chiave falsa nel set di dati di riferimento sarà un prodotto cartesiano (1-N), mentre per il set di dati principale sarà una chiave casuale (1-N) per il set di dati sorgente di ciascuna riga, con N corrispondente al livello di distribuzione.

Talend e Apache Spark

Talend Studio offre strumenti grafici e wizard per generare codice nativo, permettendo di iniziare subito a lavorare con database Apache Spark, Spark Streaming e di ripartire i dati in modo corretto. Le tecniche illustrate sopra possono essere implementate utilizzando il componente tMap di Talend. È inoltre possibile soddisfare esigenze di partizionamento con il componente tPartition di Talend, se si conoscono bene i dati, e utilizzare il componente tMap per eseguire procedure di salting e utilizzare tecniche di numerazione casuale in caso di problemi di dati con chiavi asimmetriche o di valore null.

Mi auguro che questo breve post di blog vi abbia fornito qualche utile informazione di base sul partizionamento di Apache Spark. Per approfondire le vostre conoscenze e scoprire come Talend e Apache Spark possono essere utilizzati insieme per velocizzare e far crescere le vostre attività di elaborazione dei big data, visitate la nostra pagina delle soluzioni.

Riferimenti:

https://issues.apache.org/jira/browse/SPARK-6235

https://0x0fff.com/spark-architecture

https://www.youtube.com/watch?v=WyfHUNnMutg

http://blog.cloudera.com/blog/2015/05/working-with-apache-spark-or-how-i-learned-to-stop-worrying-and-love-the-shuffle/

http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-1/

https://stackoverflow.com/questions/40373577/skewed-dataset-join-in-spark

Sei pronto a iniziare con Talend?