apache spark - How to perform basic statistics on a JSON file to explore my numeric and non-numeric variables?


I've imported a JSON file which has the following schema:

sqlContext.read.json("filename").printSchema

root
 |-- col: long (nullable = true)
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- crate: string (nullable = true)
 |    |    |-- mlrate: string (nullable = true)
 |    |    |-- nrout: string (nullable = true)
 |    |    |-- up: string (nullable = true)
 |-- ifam: string (nullable = true)
 |-- ktm: long (nullable = true)

I'm new to Spark and I want to perform basic statistics like:

  • getting min, max, mean, median and std of numeric variables
  • getting value frequencies of non-numeric variables.

My questions:

  • How to change the type of variables in the schema from 'string' to 'numeric'? (crate, mlrate and nrout should be numeric variables)
  • How to do basic statistics?

How to change the type of variables in the schema from 'string' to 'numeric'? (crate, mlrate and nrout should be numeric variables)

You can create a schema manually and apply it to an existing RDD. I assume your data is stored in a df variable and has the same structure as the example from your previous question:

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

// Target schema: the fields inside the data array become integers
val schema = StructType(
    StructField("col", LongType, true) ::
    StructField("data", ArrayType(StructType(
        StructField("crate", IntegerType, true) ::
        StructField("mlrate", IntegerType, true) ::
        StructField("nrout", IntegerType, true) ::
        StructField("up", IntegerType, true) ::
        Nil
    ), true), true) ::
    StructField("ifam", StringType, true) ::
    StructField("ktm", LongType, true) ::
    Nil
)

// Convert every string inside the data array to Int, keep the other fields as-is
def convert(row: Row) = {
    val col = row.get(0)
    val data: Seq[Row] = row.getSeq(1)
    val rowData = data.map(r => Row.fromSeq(r.toSeq.map {
        case v: String => v.toInt
        case _ => null
    })).toList
    val ifam = row.get(2)
    val ktm = row.get(3)
    Row(col, rowData, ifam, ktm)
}

val updatedDF = sqlContext.applySchema(df.rdd.map(convert), schema)
updatedDF.printSchema

And the expected output:

root
 |-- col: long (nullable = true)
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- crate: integer (nullable = true)
 |    |    |-- mlrate: integer (nullable = true)
 |    |    |-- nrout: integer (nullable = true)
 |    |    |-- up: integer (nullable = true)
 |-- ifam: string (nullable = true)
 |-- ktm: long (nullable = true)

You can adjust the numeric type (DecimalType, DoubleType) according to your requirements.
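
For example, a minimal sketch of a DoubleType variant of the inner struct (assuming all the values parse cleanly as doubles); the matching change inside convert would be v.toDouble instead of v.toInt:

import org.apache.spark.sql.types._

// Sketch: same inner element type, but with DoubleType instead of IntegerType
val dataElement = StructType(
    StructField("crate", DoubleType, true) ::
    StructField("mlrate", DoubleType, true) ::
    StructField("nrout", DoubleType, true) ::
    StructField("up", DoubleType, true) ::
    Nil
)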

Getting min, max, mean, median and std of numeric variables. How to do basic statistics?

The simplest way to obtain statistics for the numeric variables is to use the describe method:

updatedDF.describe().show

And get a nicely formatted output:

+-------+----+-------------+
|summary| col|          ktm|
+-------+----+-------------+
|  count|   2|            2|
|   mean|21.5| 1.4300064E12|
| stddev| 0.5|         null|
|    min|  21|1430006400000|
|    max|  22|1430006400000|
+-------+----+-------------+

If you need output that you can access programmatically, you can use org.apache.spark.sql.functions:

import org.apache.spark.sql.functions._

df.agg(
    min("ktm").alias("ktm_min"),
    max("ktm").alias("ktm_max"),
    mean("ktm").alias("ktm_mean")).show
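
If you want the results back as plain Scala values instead of a printed table, one option (a sketch reusing the aggregation above) is to collect the single result row:

// Sketch: pull the aggregates back as Scala values
val aggRow = df.agg(
    min("ktm").alias("ktm_min"),
    max("ktm").alias("ktm_max"),
    mean("ktm").alias("ktm_mean")).first()

val ktmMin  = aggRow.getLong(0)    // min/max of a long column stay long
val ktmMax  = aggRow.getLong(1)
val ktmMean = aggRow.getDouble(2)  // mean is always a double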

None of the above will work on the array field though. To work with these you'll need a UDF or to flatten your structure first.

val flattenedSchema = StructType(
    StructField("col", LongType, true) ::
    StructField("crate", IntegerType, true) ::
    StructField("mlrate", IntegerType, true) ::
    StructField("nrout", IntegerType, true) ::
    StructField("up", IntegerType, true) ::
    StructField("ifam", StringType, true) ::
    StructField("ktm", LongType, true) ::
    Nil
)

// Emit one flat row per element of the data array
def flatten(row: Row) = {
    val col = row.get(0)
    val data: Seq[Row] = row.getSeq(1)
    val ifam = row.get(2)
    val ktm = row.get(3)

    data.map(dat => {
        val crate = dat.get(0)
        val mlrate = dat.get(1)
        val nrout = dat.get(2)
        val up = dat.get(3)
        Row(col, crate, mlrate, nrout, up, ifam, ktm)
    })
}

val updatedFlatDF = sqlContext.
    applySchema(updatedDF.rdd.flatMap(flatten), flattenedSchema)

updatedFlatDF.describe().show

Now you can get stats for each field:

+-------+----+------------------+------------------+------------------+----+-------------+
|summary| col|             crate|            mlrate|             nrout|  up|          ktm|
+-------+----+------------------+------------------+------------------+----+-------------+
|  count|  12|                12|                12|                12|   0|           12|
|   mean|21.5|2.1666666666666665|             31.75|2.3333333333333335|null| 1.4300064E12|
| stddev| 0.5|1.2133516482134201|2.5535922410074345| 3.223179934302286|null|         null|
|    min|  21|                 0|                30|                 0|null|1430006400000|
|    max|  22|                 5|                38|                 8|null|1430006400000|
+-------+----+------------------+------------------+------------------+----+-------------+
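
As for the other part of the question, value frequencies of non-numeric variables, a simple groupBy / count does the job. A minimal sketch for the ifam column (ordering by the count is optional; desc comes from the functions import used above):

updatedFlatDF
    .groupBy("ifam")
    .count()
    .orderBy(desc("count"))
    .show()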

Getting min, max, mean, median and std of numeric variables

Getting quantiles, including the median, is usually far too expensive for large datasets. If you really have to compute a median you may find my answer to How to find median using Spark useful. It is written in Python but is pretty easy to implement in Scala as well. A little bit less comprehensive answer in Scala has been provided by Eugene Zhulenev here.
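
If you happen to be on a newer Spark release (2.0 or later, which is an assumption since the code above uses the 1.x API), there is also a built-in approximate quantile method. A minimal sketch, with an arbitrary 0.01 relative error:

// Sketch: approximate median of mlrate (Spark 2.0+ only)
val Array(medianMlrate) =
    updatedFlatDF.stat.approxQuantile("mlrate", Array(0.5), 0.01)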

Edit:

If you want to convert nrout to a date you can replace rowData inside convert with this:

val rowData = data.map(dat => {
    val crate = dat.getString(0).toInt
    val mlrate = dat.getString(1).toInt
    val nrout = java.sql.Timestamp.valueOf(dat.getString(2))
    val up = dat.getString(3).toInt
    Row(crate, mlrate, nrout, up)
})

And adjust the schema:

structfield("nrout", timestamptype, true) 
