Hello everyone! I hope you are all doing great and have read my last blog. If not, then kindly have a look and let me know your comments on it. Today we are going to discuss a few techniques through which we can handle data skewness in Apache Spark.
Datasets are not always easy and straightforward to handle in a Big Data environment; we often have to handle them in multiple ways. Before proceeding further, let's try to understand a few points regarding data skewness.
What do you mean by Data Skewness?
Data is skewed when the mean, the median, and the mode are not equal to each other (in a normal distribution, all three coincide). In simple words, your dataset is skewed when its records are not distributed evenly.
Why do we care about Data Skewness?
Data skew is a serious problem in a distributed processing environment. It occurs when the data is not evenly divided among the key tuples emitted from the map phase, which leads to inconsistent processing times. In Apache Spark, skew shows up as partitions with an uneven distribution of records; this hampers the benefit of parallelism and eventually degrades the execution time of tasks.
Have you ever been amazed, while performing a join, that the majority of your tasks complete rapidly but Spark spends most of the time (90% of the total) finishing the last few? If you have experienced this, it means you are dealing with a badly skewed dataset.
I have listed a few ways down below:
- Repartition
- Salting Technique
- Isolated Salting
- Isolated Map Join
- Iterative Broadcast Join
A) Repartition:
As mentioned among the adverse effects of data skewness above, we may end up with an irregular distribution of data among the partitions; in other words, most of the rows sit on a small number of partitions, while the majority of the partitions remain nearly empty. If you don't solve this issue, it can severely hurt application performance. To deal with skewness, you should try to repartition your data based on a key (for RDDs) or a column (for DataFrames), which will distribute the data more evenly. You can even use the primary key of the DataFrame.
How to increase the partition count?
- In Spark SQL, increase the value of spark.sql.shuffle.partitions
- In regular Spark applications, use rdd.repartition() or rdd.coalesce()
```python
from pyspark.sql import SQLContext
from pyspark.sql import Row

sqlcontext = SQLContext(sc)
rdd = sc.textFile('/user/cloudera/sampledata/test.txt')
rdd2 = rdd.map(lambda x: x.split(',')).map(lambda x: Row(key=x[0], value=x))
dataframe = sqlcontext.createDataFrame(rdd2)
dataframe.registerTempTable("df")
sqlcontext.sql("SET spark.sql.shuffle.partitions = 5")
sqlcontext.sql("SELECT * FROM df DISTRIBUTE BY key, value")
```
Note: “DISTRIBUTE BY” doesn't guarantee that data will be distributed evenly between partitions; it all depends on the hash of the expression by which we distribute. In most cases, with bigger data samples, this trick can mitigate the skewness problem. Repartitioning helps us fight the skewness issue to some extent, but not completely.
B) Salting Technique:
In this technique we add a random "salt" to the key. On reading this line, most of you probably have a question in mind: "what the heck is Rahul talking about, right?". Let me explain it with an example. Consider a dataset consisting of 4 unique keys, where the 1st key has 2000 records, the 2nd has 500 records, and the 3rd and 4th have 50 and 30 records respectively. If partitioning is based on the original keys, we will observe an imbalanced distribution of data across the partitions. To curb this situation, we modify the original keys into salted keys whose hash partitioning distributes the records properly among the partitions. This is called the salting technique.
Normal Key: “Foo”
Salted Key: “Foo” + Random Integer
In the salting technique there are two stages of aggregation:
Stage 1: Convert the keys into salted keys, prepare (salted key, value) tuples using a map transformation, and then perform the required operations on the salted keys.
Stage 2: Once we have the desired results under the salted keys, perform the operations again to obtain results for the original, unsalted keys.
Through this technique we can reduce the processing time to a great extent while performing joins or aggregations on skewed data. For a code example, kindly check out the Python Example and Scala Example.
C) Isolated Salting Technique:
It is similar to the salting technique we discussed above; the only difference is that we apply salt to only a subset of the keys (the heavily skewed ones), whereas in plain salting we salt all the keys. If you are using the 'Isolated Salting' technique, you should further filter to isolate your subset of salted keys in map joins.
D) Isolated Map Join Technique:
In this technique we try to reduce data shuffling by using a map join. I am assuming you are aware of map joins; if not, then let me know in the comments and I will try to cover it in an upcoming blog. Just a glimpse: "In a map join, the mapper takes care of the joining rather than the reducer, which eventually speeds up the process." In the isolated map join technique we filter out the isolated (skewed) keys, use a map join on those, and perform a normal reduce-side join on the rest of the data.
E) Iterative Broadcast Join:
In this method we divide the smaller table into "passes". We then broadcast one pass at a time and join it to the larger dataset. Once a pass has been joined, its broadcast partition is cleared from memory. This process repeats until all the passes are processed. The iterative broadcast join can be used to process skewed data while maintaining parallelism, because the default join types struggle with skewed data. For more information on this, kindly have a look at this video.
That's it for today's post. If you found it useful, then kindly like and share it with your friends, and let me know your doubts in the comments. Thank you!