java - Creating an RDD after retrieving data from Cassandra DB
I'm using Cassandra in a Spark project, and I wrote this code to retrieve data from the DB:
ResultSet results = session.execute("SELECT * FROM foo.test");
ArrayList<String> supportList = new ArrayList<String>();
for (Row row : results) {
    supportList.add(row.getString("firstColumn") + "," + row.getString("secondColumn"));
}
JavaRDD<String> input = sparkContext.parallelize(supportList);
JavaPairRDD<String, Double> tuple = input.mapToPair(new PairFunction<String, String, Double>() {
    public Tuple2<String, Double> call(String x) {
        String[] parts = x.split(",");
        return new Tuple2<String, Double>(parts[0], (double) (new Random().nextInt(30) + 1));
    }
});
It works, but I want to know if there is a prettier way to write the code above. What I want to achieve is this:
In Scala I can retrieve the data and fill an RDD in this way:
val dataRDD = sc.cassandraTable[TableColumnNames]("keyspace", "table")
How can I write the same thing in Java, without using a support list or other "nasty" things?
UPDATE
JavaRDD<String> cassandraRowsRDD = javaFunctions(javaSparkContext).cassandraTable("keyspace", "table")
    .map(new Function<CassandraRow, String>() {
        @Override
        public String call(CassandraRow cassandraRow) throws Exception {
            return cassandraRow.toString();
        }
    });
I'm getting the following exception on the line -> public String call(CassandraRow cassandraRow):
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
    at org.apache.spark.rdd.RDD.map(RDD.scala:286)
    at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:89)
    at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:46)
    at org.sparkexamples.cassandraExample.main.KMeans.executeQuery(KMeans.java:271)
    at org.sparkexamples.cassandraExample.main.KMeans.main(KMeans.java:67)
Caused by: java.io.NotSerializableException: org.sparkexamples.cassandraExample.main.KMeans
Serialization stack:
    - object not serializable (class: org.sparkexamples.cassandraExample.main.KMeans, value: org.sparkexamples.cassandraExample.main.KMeans@3015db78)
    - field (class: org.sparkexamples.cassandraExample.main.KMeans$2, name: this$0, type: class org.sparkexamples.cassandraExample.main.KMeans)
    - object (class org.sparkexamples.cassandraExample.main.KMeans$2, org.sparkexamples.cassandraExample.main.KMeans$2@5dbf5634)
    - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
    - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 7 more
Thanks in advance.
Have a look at this answer: RDD not serializable Cassandra/Spark connector Java API.
The problem may be that the class surrounding the code block you've shown is not serializable.
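As a minimal sketch of one way to fix it: instead of an anonymous inner class (which keeps a hidden this$0 reference to the enclosing, non-serializable class), the mapping function can be moved into a static nested class. The class and method names (KMeans, executeQuery) are only taken from the stack trace for illustration, and the connector call simply mirrors the one in your update.

import java.io.Serializable;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import com.datastax.spark.connector.japi.CassandraRow;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;

public class KMeans {

    // A static nested class has no implicit reference to the enclosing KMeans
    // instance (no this$0 field), so Spark can serialize it on its own.
    private static class RowToString implements Function<CassandraRow, String>, Serializable {
        @Override
        public String call(CassandraRow cassandraRow) throws Exception {
            return cassandraRow.toString();
        }
    }

    public static JavaRDD<String> executeQuery(JavaSparkContext javaSparkContext) {
        return javaFunctions(javaSparkContext)
                .cassandraTable("keyspace", "table")
                .map(new RowToString());
    }
}

Alternatively, making the enclosing class implement java.io.Serializable (and keeping only serializable fields in it) also gets rid of the error, but the static nested class keeps the closure small and avoids shipping the whole outer object to the executors.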