Memory consumption for Spark SQL with aggregation function
I have a Hive table with a few bigint or string columns, about 38 million rows and roughly 1 GB in total size. The test environment is a small standalone cluster with 4 worker nodes, each with 8 GB of memory, running Spark 1.4. In the spark-sql shell I tried to execute this SQL:
create table bla as select user_id, brand_id, sum(cnt) from foo group by user_id, brand_id
Several times the job got stuck in the first stage, with a few tasks hanging.
The GC report seems to indicate that there is not enough memory to store transient objects and that the process is waiting for a full GC to finish.
GC output from one hanging node:
4572.821: [GC [PSYoungGen: 1707488K->595392K(1963136K)] 5871286K->5321918K(7555584K), 1.2474880 secs] [Times: user=16.32 sys=0.43, real=1.25 secs]
4575.891: [GC [PSYoungGen: 1739840K->559488K(1970304K)] 6466366K->5882382K(7562752K), 0.9950000 secs] [Times: user=12.69 sys=0.72, real=1.00 secs]
4576.886: [Full GC
My question is: does this SQL need more than 32 GB of memory to run against 1 GB of data? Or is there a quick optimization setting that I missed? The SQL runs fine on a smaller number of records, and even on the full data if I avoid sum/avg in the query; queries like
select user_id, brand_id, count(cnt)
or
select user_id, brand_id, concat(xx)
all work, and the memory footprint on each node never exceeds 1 or 2 GB after GC.
Second question: the initial stage uses only 10 partitions; how can I change the parallelism?
Thanks, Wanchun
To your specific questions:
How much memory Spark needs depends on your data and the operations you perform on it. In this particular case, the larger the number of unique group by keys, the greater the memory overhead of the aggregates. Note that JVM data structures are, in general, memory inefficient; see this presentation for details. Last but not least, you have to be careful when comparing sizes: is the table 1 GB on disk? Are you using an optimized format with automatic compression, such as Parquet? A quick way to gauge the group by cardinality is sketched below.
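This is only a sketch, reusing the table and column names from your query and assuming a HiveContext named ctx; it counts the distinct group keys, which is roughly the number of aggregation buffers the query has to hold at once:

// Count the distinct (user_id, brand_id) pairs; this is the number of
// groups the aggregation has to keep in memory while it runs.
val groups = ctx.sql(
  "select count(*) from (select distinct user_id, brand_id from foo) t")
groups.show()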
There are two ways to change the parallelism. First, you can change the number of input files. Second, you can explicitly repartition the data using repartition(numPartitions); the method is described in the RDD documentation.
You can repartition programmatically, based on the number of existing partitions, to achieve the desired level of parallelism or data file granularity, e.g.,
val df = ctx.jsonFile("file://...")
val repartitioned = df.repartition(10 * df.rdd.partitions.size)
By using df.rdd.partitions.size you can adjust the number of partitions dynamically based on the cluster size, e.g., make the number of partitions, say, 4x the number of executor CPUs, as in the sketch below.
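As a rough sketch of the 4x-the-executor-CPUs idea: on a standalone cluster sc.defaultParallelism typically equals the total number of executor cores, so you can derive a target partition count from it (the multiplier is something you would tune):

// Derive a partition count from the cores Spark sees on the cluster.
val targetPartitions = 4 * sc.defaultParallelism
// repartition returns a new DataFrame; the original df is left unchanged.
val repartitioned = df.repartition(targetPartitions)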
If you are experiencing memory problems, increasing the number of partitions is a good starting point for finding a processing pipeline that works within the constraints of your cluster.
There are other memory-related optimizations:
If you run out of memory during the reduce phase, you may want to increase spark.shuffle.memoryFraction.
If your operations do not benefit from caching, reduce spark.storage.memoryFraction.
A sketch of how to set both follows below.
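For example (Spark 1.x setting names; the values are illustrative only, not tuned recommendations), both settings can be passed on the command line or set on a SparkConf before the context is created:

// Illustrative values only; tune them for your workload.
// On the command line:
//   spark-sql --conf spark.shuffle.memoryFraction=0.4 --conf spark.storage.memoryFraction=0.3
// Or programmatically, before the SparkContext is created:
val conf = new org.apache.spark.SparkConf()
  .set("spark.shuffle.memoryFraction", "0.4")  // more room for shuffle/aggregation buffers
  .set("spark.storage.memoryFraction", "0.3")  // less memory reserved for cached data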
See the Spark configuration documentation for more.