org.apache.sysml.runtime.instructions.spark.utils

Class RDDConverterUtilsExt

  • java.lang.Object
    • org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt


  • public class RDDConverterUtilsExt
    extends Object
    NOTE: These are experimental converter utils. Once thoroughly tested, they can be moved to RDDConverterUtils.
    • Constructor Detail

      • RDDConverterUtilsExt

        public RDDConverterUtilsExt()
    • Method Detail

      • coordinateMatrixToBinaryBlock

        public static org.apache.spark.api.java.JavaPairRDD<MatrixIndexes,MatrixBlock> coordinateMatrixToBinaryBlock(org.apache.spark.api.java.JavaSparkContext sc,
                                                                                                                     org.apache.spark.mllib.linalg.distributed.CoordinateMatrix input,
                                                                                                                     MatrixCharacteristics mcIn,
                                                                                                                     boolean outputEmptyBlocks)
                                                                                                              throws DMLRuntimeException
        Example usage:
        
         import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt
         import org.apache.sysml.runtime.matrix.MatrixCharacteristics
         import org.apache.spark.api.java.JavaSparkContext
         import org.apache.spark.mllib.linalg.distributed.MatrixEntry
         import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix
         // Load space-separated (i, j, value) triples and drop explicit zeros
         val matRDD = sc.textFile("ratings.text")
           .map(_.split(" "))
           .map(x => new MatrixEntry(x(0).toLong, x(1).toLong, x(2).toDouble))
           .filter(_.value != 0)
           .cache()
         // The converter expects 1-based indices, so no entry may sit in row/column 0
         require(matRDD.filter(x => x.i == 0 || x.j == 0).count == 0, "Expected 1-based ratings file")
         val nnz = matRDD.count
         // With 1-based indices, the largest index equals the dimension
         val numRows = matRDD.map(_.i).max
         val numCols = matRDD.map(_.j).max
         val coordinateMatrix = new CoordinateMatrix(matRDD, numRows, numCols)
         // 1000 x 1000 binary blocks; nnz records the number of non-zeros
         val mc = new MatrixCharacteristics(numRows, numCols, 1000, 1000, nnz)
         val binBlocks = RDDConverterUtilsExt.coordinateMatrixToBinaryBlock(new JavaSparkContext(sc), coordinateMatrix, mc, true)
         
        Parameters:
        sc - Java Spark context
        input - coordinate matrix
        mcIn - matrix characteristics
        outputEmptyBlocks - if true, inject empty blocks if necessary
        Returns:
        matrix as JavaPairRDD<MatrixIndexes, MatrixBlock>
        Throws:
        DMLRuntimeException - if an error occurs during the conversion
      • allocateDenseOrSparse

        public static MatrixBlock allocateDenseOrSparse(int rlen,
                                                        int clen,
                                                        boolean isSparse)
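        Example usage (a minimal spark-shell sketch, assuming the method simply pre-allocates an empty rlen x clen MatrixBlock in the requested dense or sparse internal representation):

         import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt
         import org.apache.sysml.runtime.matrix.data.MatrixBlock
         // Assumed behavior: both blocks start out empty; the flag only selects
         // the internal (dense vs. sparse) representation
         val denseBlock: MatrixBlock = RDDConverterUtilsExt.allocateDenseOrSparse(10, 5, false)
         val sparseBlock: MatrixBlock = RDDConverterUtilsExt.allocateDenseOrSparse(10, 5, true)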
      • addIDToDataFrame

        public static org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> addIDToDataFrame(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> df,
                                                                                              org.apache.spark.sql.SparkSession sparkSession,
                                                                                              String nameOfCol)
        Adds element indices as a new column to the DataFrame.
        Parameters:
        df - input data frame
        sparkSession - the Spark Session
        nameOfCol - name of index column
        Returns:
        new data frame
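        Example usage (a minimal spark-shell sketch; the column name "id" is an arbitrary illustrative choice):

         import spark.implicits._
         import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt
         val df = Seq(1.2, 3.4, 2.2).toDF("value")
         // Appends an index column named "id" to the input DataFrame
         val dfWithID = RDDConverterUtilsExt.addIDToDataFrame(df, spark, "id")
         dfWithID.show()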
      • stringDataFrameToVectorDataFrame

        public static org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> stringDataFrameToVectorDataFrame(org.apache.spark.sql.SparkSession sparkSession,
                                                                                                              org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> inputDF)
                                                                                                       throws DMLRuntimeException
        Converts a DataFrame of comma-separated string rows to a DataFrame of ml.linalg.Vector rows.

        Example input rows:
        ((1.2, 4.3, 3.4))
        (1.2, 3.4, 2.2)
        [[1.2, 34.3, 1.2, 1.25]]
        [1.2, 3.4]

        Parameters:
        sparkSession - Spark Session
        inputDF - DataFrame of comma-separated string rows to convert
        Returns:
        DataFrame of ml.linalg.Vector rows
        Throws:
        DMLRuntimeException - if an error occurs during the conversion
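        Example usage (a minimal spark-shell sketch; the column name "features" is an arbitrary illustrative choice, and the rows follow the accepted formats listed above):

         import spark.implicits._
         import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt
         // Each row is one comma-separated string, optionally wrapped in (..) or [..]
         val inputDF = Seq("(1.2, 3.4, 2.2)", "[1.2, 34.3, 1.2, 1.25]").toDF("features")
         val vectorDF = RDDConverterUtilsExt.stringDataFrameToVectorDataFrame(spark, inputDF)
         vectorDF.show(false)   // rows are now ml.linalg.Vector values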

Copyright © 2017 The Apache Software Foundation. All rights reserved.