apache spark - How to perform basic statistics on a JSON file to explore my numeric and non-numeric variables?
I've imported a JSON file which has the following schema:
sqlContext.read.json("filename").printSchema

root
 |-- col: long (nullable = true)
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- crate: string (nullable = true)
 |    |    |-- mlrate: string (nullable = true)
 |    |    |-- nrout: string (nullable = true)
 |    |    |-- up: string (nullable = true)
 |-- ifam: string (nullable = true)
 |-- ktm: long (nullable = true)
I'm new to Spark and I want to perform basic statistics like:
- getting min, max, mean, median and std of the numeric variables
- getting the value frequencies of the non-numeric variables.

My questions are:
- How can I change the type of variables in the schema from 'string' to 'numeric'? (crate, mlrate and nrout should be numeric variables)
- How can I get these basic statistics?
How can I change the type of variables in the schema from 'string' to 'numeric'? (crate, mlrate and nrout should be numeric variables)
You can create a schema manually and apply it to the existing RDD. I assume your data is stored in a df variable and has the same structure as the example from your previous question:
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

val schema = StructType(
  StructField("col", LongType, true) ::
  StructField("data", ArrayType(StructType(
    StructField("crate", IntegerType, true) ::
    StructField("mlrate", IntegerType, true) ::
    StructField("nrout", IntegerType, true) ::
    StructField("up", IntegerType, true) ::
    Nil
  ), true), true) ::
  StructField("ifam", StringType, true) ::
  StructField("ktm", LongType, true) ::
  Nil
)

def convert(row: Row) = {
  val col = row.get(0)
  val data: Seq[Row] = row.getSeq(1)
  val rowData = data.map(r => Row.fromSeq(r.toSeq.map {
    case v: String => v.toInt
    case _ => null
  })).toList
  val ifam = row.get(2)
  val ktm = row.get(3)
  Row(col, rowData, ifam, ktm)
}

val updatedDF = sqlContext.applySchema(df.rdd.map(convert), schema)
updatedDF.printSchema
and the expected output:
root
 |-- col: long (nullable = true)
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- crate: integer (nullable = true)
 |    |    |-- mlrate: integer (nullable = true)
 |    |    |-- nrout: integer (nullable = true)
 |    |    |-- up: integer (nullable = true)
 |-- ifam: string (nullable = true)
 |-- ktm: long (nullable = true)
You can adjust the numeric type (DecimalType, DoubleType) according to your requirements.
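For example, here is a minimal sketch of the same conversion with DoubleType, using createDataFrame (the non-deprecated replacement for applySchema since Spark 1.3); doubleSchema, convertDouble and doubleDF are illustrative names, not part of your original code:

// Sketch only: same structure as above, but the nested fields are parsed as Double
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

val doubleSchema = StructType(
  StructField("col", LongType, true) ::
  StructField("data", ArrayType(StructType(
    StructField("crate", DoubleType, true) ::
    StructField("mlrate", DoubleType, true) ::
    StructField("nrout", DoubleType, true) ::
    StructField("up", DoubleType, true) ::
    Nil
  ), true), true) ::
  StructField("ifam", StringType, true) ::
  StructField("ktm", LongType, true) ::
  Nil
)

def convertDouble(row: Row) = {
  val data: Seq[Row] = row.getSeq(1)
  val rowData = data.map(r => Row.fromSeq(r.toSeq.map {
    case v: String => v.toDouble   // parse the strings as Double instead of Int
    case _ => null
  })).toList
  Row(row.get(0), rowData, row.get(2), row.get(3))
}

val doubleDF = sqlContext.createDataFrame(df.rdd.map(convertDouble), doubleSchema)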
Getting min, max, mean, median and std of numeric variables

How can I get these basic statistics?
The simplest way to obtain statistics for the numeric variables is to use the describe method:
updatedDF.describe().show
and get a nicely formatted output:
+-------+----+-------------+
|summary| col|          ktm|
+-------+----+-------------+
|  count|   2|            2|
|   mean|21.5| 1.4300064E12|
| stddev| 0.5|         null|
|    min|  21|1430006400000|
|    max|  22|1430006400000|
+-------+----+-------------+
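If you only care about some of the columns, describe also accepts column names; a small usage note, assuming the same updatedDF as above:

// Restrict the summary to specific columns
updatedDF.describe("col", "ktm").show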
If you need an output you can access programmatically, you can use org.apache.spark.sql.functions:
import org.apache.spark.sql.functions._

df.agg(
  min("ktm").alias("ktm_min"),
  max("ktm").alias("ktm_max"),
  mean("ktm").alias("ktm_mean")).show
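Since agg returns a DataFrame with a single row, you can also read the values back into plain Scala variables; a sketch, assuming the column positions match the aggregations above:

// agg returns a one-row DataFrame; read the values by position
val stats = df.agg(
  min("ktm").alias("ktm_min"),
  max("ktm").alias("ktm_max"),
  mean("ktm").alias("ktm_mean")).first()

val ktmMin  = stats.getLong(0)   // min("ktm") keeps the original Long type
val ktmMax  = stats.getLong(1)
val ktmMean = stats.getDouble(2) // mean always yields a Double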
Neither of the above will work on the array field though. To work with these you'll need a UDF or to flatten the structure first.
val flattenedSchema = StructType(
  StructField("col", LongType, true) ::
  StructField("crate", IntegerType, true) ::
  StructField("mlrate", IntegerType, true) ::
  StructField("nrout", IntegerType, true) ::
  StructField("up", IntegerType, true) ::
  StructField("ifam", StringType, true) ::
  StructField("ktm", LongType, true) ::
  Nil
)

def flatten(row: Row) = {
  val col = row.get(0)
  val data: Seq[Row] = row.getSeq(1)
  val ifam = row.get(2)
  val ktm = row.get(3)

  data.map(dat => {
    val crate = dat.get(0)
    val mlrate = dat.get(1)
    val nrout = dat.get(2)
    val up = dat.get(3)
    Row(col, crate, mlrate, nrout, up, ifam, ktm)
  })
}

val updatedFlatDF = sqlContext.
  applySchema(updatedDF.rdd.flatMap(flatten), flattenedSchema)

updatedFlatDF.describe().show
Now you can get stats for each field:
+-------+----+------------------+------------------+------------------+----+-------------+
|summary| col|             crate|            mlrate|             nrout|  up|          ktm|
+-------+----+------------------+------------------+------------------+----+-------------+
|  count|  12|                12|                12|                12|   0|           12|
|   mean|21.5|2.1666666666666665|             31.75|2.3333333333333335|null| 1.4300064E12|
| stddev| 0.5|1.2133516482134201|2.5535922410074345| 3.223179934302286|null|         null|
|    min|  21|                 0|                30|                 0|null|1430006400000|
|    max|  22|                 5|                38|                 8|null|1430006400000|
+-------+----+------------------+------------------+------------------+----+-------------+
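As an alternative to writing flatten by hand, you can let the DataFrame API unnest the array; a sketch, assuming a Spark version where explode is available in org.apache.spark.sql.functions (1.4+), with explodedDF as an illustrative name:

import sqlContext.implicits._
import org.apache.spark.sql.functions.explode

// One row per element of the data array, then pull the struct fields up a level
val explodedDF = updatedDF
  .select($"col", explode($"data").alias("dat"), $"ifam", $"ktm")
  .select($"col", $"dat.crate", $"dat.mlrate", $"dat.nrout", $"dat.up", $"ifam", $"ktm")

explodedDF.describe().show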
Getting min, max, mean, median and std of numeric variables
Getting quantiles, including the median, is usually far too expensive for large datasets. If you really have to compute the median, my answer to How to find the median using Spark may be useful. It is written in Python but is pretty easy to implement in Scala as well. A little less comprehensive answer in Scala has been provided by Eugene Zhulenev here.
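For the other part of your question (value frequencies of the non-numeric variables), a plain groupBy/count is usually enough; a minimal sketch on the flattened DataFrame from above:

import org.apache.spark.sql.functions.desc

// Frequency table for a non-numeric column, most common values first
updatedFlatDF
  .groupBy("ifam")
  .count()
  .orderBy(desc("count"))
  .show()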
Edit:

If you want to convert nrout to a date you can replace rowData inside convert with this:
val rowData = data.map(dat => {
  val crate = dat.getString(0).toInt
  val mlrate = dat.getString(1).toInt
  val nrout = java.sql.Timestamp.valueOf(dat.getString(2))
  val up = dat.getString(3).toInt
  Row(crate, mlrate, nrout, up)
})
and adjust the schema:
structfield("nrout", timestamptype, true)