java - Creating an RDD after retrieving data from Cassandra DB


I'm using Cassandra in a Spark project, and I wrote this code to retrieve data from the DB:

    // Pull the rows with the Java driver, then hand them to Spark via a support list.
    ResultSet results = session.execute("SELECT * FROM foo.test");
    ArrayList<String> supportList = new ArrayList<String>();
    for (Row row : results) {
        supportList.add(row.getString("firstColumn") + "," + row.getString("secondColumn"));
    }
    JavaRDD<String> input = sparkContext.parallelize(supportList);
    JavaPairRDD<String, Double> tuple = input.mapToPair(new PairFunction<String, String, Double>() {
        public Tuple2<String, Double> call(String x) {
            String[] parts = x.split(",");
            // The value must be a Double, not a String, to match the pair RDD's type.
            return new Tuple2<String, Double>(parts[0], (double) (new Random().nextInt(30) + 1));
        }
    });

It works, but I want to know if there is a prettier way to write the above code. What I want to achieve is:

  • In Scala I can retrieve and fill an RDD in this way:

    val dataRDD = sc.cassandraTable[TableColumnNames]("keyspace", "table")

  • How can I write the same thing in Java, without using a support list or other "nasty" things?

UPDATE

    JavaRDD<String> cassandraRowsRDD = javaFunctions(javaSparkContext).cassandraTable("keyspace", "table")
            .map(new Function<CassandraRow, String>() {
                @Override
                public String call(CassandraRow cassandraRow) throws Exception {
                    return cassandraRow.toString();
                }
            });

On the line public String call(CassandraRow cassandraRow) I'm getting this exception:

    Exception in thread "main" org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
        at org.apache.spark.rdd.RDD.map(RDD.scala:286)
        at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:89)
        at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:46)
        at org.sparkexamples.cassandraExample.main.KMeans.executeQuery(KMeans.java:271)
        at org.sparkexamples.cassandraExample.main.KMeans.main(KMeans.java:67)
    Caused by: java.io.NotSerializableException: org.sparkexamples.cassandraExample.main.KMeans
    Serialization stack:
        - object not serializable (class: org.sparkexamples.cassandraExample.main.KMeans, value: org.sparkexamples.cassandraExample.main.KMeans@3015db78)
        - field (class: org.sparkexamples.cassandraExample.main.KMeans$2, name: this$0, type: class org.sparkexamples.cassandraExample.main.KMeans)
        - object (class org.sparkexamples.cassandraExample.main.KMeans$2, org.sparkexamples.cassandraExample.main.KMeans$2@5dbf5634)
        - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
        - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
        ... 7 more

Thanks in advance.

Have a look at this answer: RDD not serializable Cassandra/Spark connector java API.

The problem may be that the class surrounding the code block you've shown is not serializable. Spark must serialize the map function to ship it to executors, and an anonymous inner class keeps a hidden this$0 reference to its enclosing instance (here, KMeans), so Spark tries to serialize the whole enclosing object too, as the serialization stack above shows.
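A minimal sketch of one way around this, assuming Spark's 1.x Java API and the connector's japi package; the KMeansQuery class name is hypothetical, standing in for the asker's KMeans class. Moving the function into a static nested class removes the hidden this$0 field, so only the function itself is serialized:

    import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;

    import com.datastax.spark.connector.japi.CassandraRow;

    public class KMeansQuery {  // hypothetical name, stands in for the asker's KMeans

        // A static nested class carries no hidden this$0 reference to the
        // enclosing instance, so Spark only serializes the function itself.
        // Spark's Function interface already extends java.io.Serializable.
        public static class RowToString implements Function<CassandraRow, String> {
            @Override
            public String call(CassandraRow cassandraRow) throws Exception {
                return cassandraRow.toString();
            }
        }

        public static JavaRDD<String> executeQuery(JavaSparkContext javaSparkContext) {
            return javaFunctions(javaSparkContext)
                    .cassandraTable("keyspace", "table")
                    .map(new RowToString());
        }
    }

Alternatively, making the enclosing class implement java.io.Serializable (and marking any non-serializable fields transient) also works, but pulling the function out into its own class keeps the serialized closure small.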

