Convert Clojure vector to flambo SQL row
I'm working on a function to convert a vector to an SQL Row, so that I can further convert it to a data frame and save it as a table using SQLContext in Apache Spark. I'm developing in Clojure and got lost along the way. I thought of implementing the solution thus:
- For each RDD (vector), convert it to rows
- Convert the rows to a data frame
- Save the data frame as a table
- Use the SQLContext to query particular information in the table

And how do I convert the result of the query into an RDD again for further analysis?
;; Collect the RDD as vectors and pretty-print them
(defn assign-ecom []
  (let [rdd-fields (-> (:rdd @transformed-rdd)
                       (f/map #(sql/row->vec %))
                       f/collect)]
    (clojure.pprint/pprint rdd-fields)))
I'm using the flambo v0.60 API to abstract the Apache Spark functions, and I welcome any suggestion on how to go about solving this problem.
Here's a link to the flambo row->vec docs:
I assume you already have a Spark context (sc) and SQL context (sql-ctx). First, let's import the stuff we'll need:
(import org.apache.spark.sql.RowFactory)
(import org.apache.spark.sql.types.StructType)
(import org.apache.spark.sql.types.StructField)
(import org.apache.spark.sql.types.Metadata)
(import org.apache.spark.sql.types.DataTypes)
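If you don't already have those two handles, here's a minimal, purely illustrative setup; the local[*] master, the app name, and the direct SQLContext construction are my assumptions, not something from the question:

;; Hypothetical setup in case sc / sql-ctx don't exist yet (local mode)
(require '[flambo.conf :as conf]
         '[flambo.api :as f]
         '[flambo.sql :as sql])

(def sc
  (f/spark-context
   (-> (conf/spark-conf)
       (conf/master "local[*]")          ;; assumption: local experimentation
       (conf/app-name "vec-to-row-demo"))))

;; SQLContext accepts a JavaSparkContext in Spark 1.3+
(def sql-ctx (org.apache.spark.sql.SQLContext. sc))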
For each RDD (vector), convert it to rows
;; Vector to row conversion
(defn vec->row [v]
  (RowFactory/create (into-array Object v)))

;; Example data
(def rows
  (-> (f/parallelize sc [["foo" 1] ["bar" 2]])
      (f/map vec->row)))
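As a quick sanity check (my own illustrative snippet, not from the answer), vec->row also works on a plain vector at the REPL:

;; Returns a Spark SQL Row; its string form looks like [baz,3]
(vec->row ["baz" 3])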
Convert the rows to a data frame
;; Define the schema
(def schema
  (StructType.
   (into-array StructField
               [(StructField. "k" DataTypes/StringType false (Metadata/empty))
                (StructField. "v" DataTypes/IntegerType false (Metadata/empty))])))

;; Create the data frame
(def df (.createDataFrame sql-ctx rows schema))

;; See if it works
(.show df)
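To confirm the schema was applied as declared, the standard DataFrame inspection method works via interop (the expected output below is sketched from the schema above):

(.printSchema df)
;; root
;;  |-- k: string (nullable = false)
;;  |-- v: integer (nullable = false)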
Save the data frame as a table
(.registerTempTable df "df")
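Note that registerTempTable only registers the frame for the lifetime of this SQLContext. Assuming Spark 1.3+, you can confirm the registration with tableNames:

;; List the tables registered in this SQLContext
(seq (.tableNames sql-ctx))
;; => ("df")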
Use the SQLContext to query particular information in the table
(def df-keys (.sql sql-ctx "SELECT UPPER(k) k FROM df"))

;; Check the results
(.show df-keys)
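Any Spark SQL expression works the same way; for example, a hypothetical filter pulling out only particular rows:

;; Select only the rows where v is greater than 1
(def df-filtered (.sql sql-ctx "SELECT k, v FROM df WHERE v > 1"))
(.show df-filtered)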
And how do I convert the result of the query into an RDD again for further analysis?
(.toJavaRDD df-keys)
Or, if you want vectors:
(f/map (.toJavaRDD df-keys) sql/row->vec)
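And if the further analysis should happen driver-side, collecting turns the result back into plain Clojure data (a sketch using the same flambo helpers; only do this for small results):

;; Bring the query result back to the driver as Clojure vectors
(-> (.toJavaRDD df-keys)
    (f/map sql/row->vec)
    f/collect)
;; => something like (["FOO"] ["BAR"])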