Memory consumption for Spark SQL with aggregation function


I have a Hive table with a few bigint and string columns, about 38 million rows and roughly 1 GB in total size. The test environment is a small standalone cluster of 4 worker nodes, each with 8 GB of memory, running Spark 1.4. In the spark-sql shell, I tried to execute this SQL:

create table bla as select user_id, brand_id, sum(cnt) foo from ... group by user_id, brand_id

I ran it several times, and each time the job got stuck in the first stage with a few tasks hanging.

The GC report seems to indicate that there is not enough memory to store transient objects and the process is waiting for a full GC to finish.

GC output from one hanging node:

4572.821: [GC [PSYoungGen: 1707488K->595392K(1963136K)] 5871286K->5321918K(7555584K), 1.2474880 secs] [Times: user=16.32 sys=0.43, real=1.25 secs]
4575.891: [GC [PSYoungGen: 1739840K->559488K(1970304K)] 6466366K->5882382K(7562752K), 0.9950000 secs] [Times: user=12.69 sys=0.72, real=1.00 secs]
4576.886: [Full GC

My question is: does this SQL really need more than 32 GB of memory to run against 1 GB of data? Or are there some quick optimization settings that I have missed? The SQL runs fine on a smaller number of records, and even on the full data if I avoid sum/avg in the query; queries like

select user_id, brand_id, count(cnt) 

or

select user_id, brand_id, concat(xx) 

all work, and the memory footprint on each node never exceeds 1 or 2 GB after GC.

Second question: the initial stage uses 10 partitions; how can I change the parallelism?

Thanks, Wanchun

To your specific questions:

  • How much memory Spark needs depends on the data and on the operations you perform on it. In your particular case, the larger the number of unique GROUP BY aggregates, the greater the memory overhead. Note that JVM data structures are, in general, memory inefficient; see this presentation for details. Last but not least, be careful when comparing sizes: is the table 1 GB on disk? Are you using an optimized format with automatic compression, such as Parquet? (A quick way to check is sketched after this list.)

  • There are two ways to change the parallelism. First, you can change the number of input files. Second, you can explicitly repartition the data using repartition(numPartitions); the method is in the RDD docs.
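
A minimal sketch of the Parquet comparison mentioned in the first point, assuming a HiveContext named ctx and a source table called src (both names are placeholders, not from the original question): write a Parquet copy of the table and compare its on-disk size with the original layout.

// Load the Hive table as a DataFrame, then write it back out as Parquet,
// which is columnar and compressed by default.
val src = ctx.table("src")
src.write.parquet("/tmp/src_parquet")   // DataFrameWriter is available from Spark 1.4 on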

You can repartition programmatically based on the number of existing partitions to achieve a desired level of parallelism or data file granularity, e.g.,

val df = ctx.jsonFile("file://...")
df.repartition(10 * df.rdd.partitions.size)

By using df.rdd.partitions.size you can adjust the number of partitions dynamically based on cluster size, e.g., making the number of partitions, say, 4x the number of executor CPUs.
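
As a sketch of that rule of thumb, sc.defaultParallelism is used below as a rough stand-in for the total number of executor cores; the 4x multiplier is only illustrative.

// Pick a partition count relative to the cluster rather than hard-coding it.
val targetPartitions = 4 * sc.defaultParallelism
val repartitioned = df.repartition(targetPartitions)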

If you are experiencing memory problems, increasing the number of partitions is a good starting point for finding a processing pipeline that works within the constraints of your cluster.
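
One more knob worth checking: since your aggregation goes through Spark SQL, the number of partitions after the shuffle is controlled by spark.sql.shuffle.partitions (default 200). A rough sketch of raising it from the Scala shell follows; 400 is just an example value, and in the spark-sql CLI you would use a SET statement instead.

// ctx is the SQLContext/HiveContext backing the shell; tune the value to your data.
ctx.setConf("spark.sql.shuffle.partitions", "400")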

There are other memory-related optimizations:

  • If you run out of memory during the reduce phase, you may want to increase spark.shuffle.memoryFraction (see the sketch after this list).

  • If your operations do not benefit from caching, you can reduce spark.storage.memoryFraction.
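
Both fractions are read when the executors start, so as a sketch they would be set on the SparkConf (or passed with --conf or in spark-defaults.conf) before the context is created; the values below are illustrative only.

import org.apache.spark.SparkConf

// Defaults in Spark 1.4 are 0.2 for the shuffle fraction and 0.6 for storage;
// shrink storage and grow shuffle only if you are not relying on caching.
val conf = new SparkConf()
  .set("spark.shuffle.memoryFraction", "0.4")
  .set("spark.storage.memoryFraction", "0.4")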

More on configuring Spark.

