systemml-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [systemml] mboehm7 commented on a change in pull request #909: [SYSTEMDS-???] [WIP] Distinct values estimation function
Date Sun, 31 May 2020 17:17:52 GMT

mboehm7 commented on a change in pull request #909:
URL: https://github.com/apache/systemml/pull/909#discussion_r432964409



##########
File path: src/main/java/org/apache/sysds/common/Builtins.java
##########
@@ -174,6 +174,8 @@
 	TANH("tanh", false),
 	TRACE("trace", false),
 	TYPEOF("typeOf", false),
+	COUNT_DISTINCT("countDistinct",false),
+	COUNT_DISTINCT_ESTIMATE_KMV("countDistinctEstimateKMV",false),

Review comment:
       I like `countDistinct`, but maybe we should use `countDistinctApprox` to stay consistent with Spark's existing naming (well, they use `approxCountDistinct`, but "count" should go first).

##########
File path: src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixEstimator.java
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.matrix.data;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.sysds.api.DMLException;
+import org.apache.sysds.runtime.instructions.cp.AggregateUnaryCPInstruction.AUType;
+import org.apache.sysds.runtime.matrix.operators.EstimatorOperator;
+import org.apache.sysds.utils.Hash;
+import org.apache.sysds.utils.Hash.HashType;
+
+/**
+ * This class contains estimation operations for matrix block.
+ */
+public class LibMatrixEstimator {
+
+	// ------------------------------
+	// Logging parameters:
+	// local debug flag
+	private static final boolean LOCAL_DEBUG = true;
+	// DEBUG/TRACE for details
+	private static final Level LOCAL_DEBUG_LEVEL = Level.DEBUG;
+
+	private static final Log LOG = LogFactory.getLog(LibMatrixEstimator.class.getName());
+
+	static {
+		// for internal debugging only
+		if(LOCAL_DEBUG) {
+			Logger.getLogger("org.apache.sysds.runtime.matrix.data").setLevel(LOCAL_DEBUG_LEVEL);
+		}
+	}
+	// ------------------------------
+
+	// public enum EstimatorType {
+	// 	NUM_DISTINCT_COUNT, // Baseline naive implementation, iterate though, add to hashMap.
+	// 	NUM_DISTINCT_KMV, // K-Minimum Values algorithm.
+	// 	NUM_DISTINCT_HYPER_LOG_LOG // HyperLogLog algorithm.
+	// }
+
+	static public int minimumSize = 64000;
+
+	private LibMatrixEstimator() {
+		// Prevent instantiation via private constructor.
+	}
+
+	/**
+	 * Public method to count the number of distinct values inside a matrix. Depending on which EstimatorOperator
+	 * selected it either gets the absolute number or a estimated value.
+	 * 
+	 * TODO: support counting num distinct in rows, or columns axis.

Review comment:
       Yes; besides that, also add a TODO for distributed Spark operations.

##########
File path: src/main/java/org/apache/sysds/runtime/util/DataConverter.java
##########
@@ -456,6 +456,23 @@ public static MatrixBlock convertToMatrixBlock( double[][] data ) {
 		return mb;
 	}
 
+
+	/**
+	 * Converts an Integer matrix to an MatrixBlock (FP32)
+	 * 
+	 * @param data Int matrix input that is converted to double MatrixBlock
+	 * @return The matrixBlock constructed.
+	 */
+	public static MatrixBlock convertToMatrixBlock(int[][] data){
+		double[][] input = new double[data.length][];
+		int index = 0;
+		for(int[] x: data){
+			input[index] = Arrays.stream(x).asDoubleStream().toArray();

Review comment:
       If you want to use streams here, use `Arrays.setAll(input[index], i -> data[index][i])`

##########
File path: src/test/java/org/apache/sysds/test/component/matrix/MatrixEstimatorTest.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.matrix;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.sysds.api.DMLException;
+import org.apache.sysds.runtime.instructions.cp.AggregateUnaryCPInstruction.AUType;
+import org.apache.sysds.runtime.matrix.data.LibMatrixEstimator;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.operators.EstimatorOperator;
+import org.apache.sysds.runtime.util.DataConverter;
+import org.apache.sysds.test.TestUtils;
+import org.apache.sysds.utils.Hash.HashType;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(value = Parameterized.class)
+public class MatrixEstimatorTest {
+
+	private static AUType[] esT = new AUType[] {
+		// The different types of Estimators
+		AUType.COUNT_DISTINCT,
+		// EstimatorType.NUM_DISTINCT_KMV,
+		// EstimatorType.NUM_DISTINCT_HYPER_LOG_LOG
+	};
+
+	@Parameters
+	public static Collection<Object[]> data() {
+		ArrayList<Object[]> tests = new ArrayList<>();
+		ArrayList<MatrixBlock> inputs = new ArrayList<>();
+		ArrayList<Long> actualUnique = new ArrayList<>();
+
+		// single value matrix.
+		inputs.add(DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrix(1, 1, 0.0, 100.0, 1, 7)));
+		actualUnique.add(1L);
+		inputs.add(DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrix(1, 100, 0.0, 100.0, 1, 7)));
+		actualUnique.add(100L);
+		inputs.add(DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrix(100, 1, 0.0, 100.0, 1, 7)));
+		actualUnique.add(100L);
+		// inputs.add(DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrix(100, 100, 0.0, 100.0, 1, 7)));
+		// actualUnique.add(10000L);
+		// inputs.add(DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrix(1024, 1024, 0.0, 100.0, 1, 7)));
+		// actualUnique.add(1024L * 1024L);
+
+		inputs.add(DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrixIntV(5000, 5000, 1, 100, 1, 8)));
+		actualUnique.add(99L);
+		inputs.add(DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrixIntV(1024, 10240, 1, 100, 1, 7)));
+		actualUnique.add(99L);
+		inputs.add(DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrixIntV(10240, 1024, 1, 100, 1, 7)));
+		actualUnique.add(99L);
+
+		// inputs.add(
+		// DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrixIntV(1024, 1024, 1000001, 1000100, 1, 8)));
+		// actualUnique.add(99L);
+		// inputs.add(
+		// DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrixIntV(1024, 10240, 1000001, 1000100, 1, 7)));
+		// actualUnique.add(99L);
+
+		inputs.add(DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrixIntV(1024, 10241, 1, 1500, 1, 7)));
+		actualUnique.add(1499L);
+
+		inputs.add(DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrixIntV(1024, 10241, 0, 3000, 1, 7)));
+		actualUnique.add(3000L);
+		inputs.add(DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrixIntV(1024, 10241, 0, 6000, 1, 7)));
+		actualUnique.add(6000L);
+		// inputs.add(DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrixIntV(10240, 10241, 0, 10000, 1,
+		// 7)));
+		// actualUnique.add(10000L);
+		// inputs.add(DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrixIntV(10240, 10241, 0, 100000, 1,
+		// 7)));
+		// actualUnique.add(100000L);
+		// inputs.add(DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrixIntV(10240, 10241, 0, 1000000, 1,
+		// 7)));
+		// actualUnique.add(1000000L);
+
+		for(AUType et : esT) {
+			for(HashType ht : HashType.values()) {
+				if(ht == HashType.ExpHash && et == AUType.COUNT_DISTINCT_ESTIMATE_KMV) {
+
+					String errorMessage = "Invalid hashing configuration using " + HashType.ExpHash + " and "
+						+ AUType.COUNT_DISTINCT_ESTIMATE_KMV;
+					tests.add(new Object[] {et, inputs.get(0), actualUnique.get(0), ht, DMLException.class,
+						errorMessage, 0.0});
+				}
+				else if(et == AUType.COUNT_DISTINCT_ESTIMATE_HYPER_LOG_LOG) {
+					tests.add(new Object[] {et, inputs.get(0), actualUnique.get(0), ht, NotImplementedException.class,
+						"HyperLogLog not implemented", 0.0});
+				}
+				else {
+					if(et == AUType.COUNT_DISTINCT) {
+
+						for(int i = 0; i < inputs.size(); i++) {
+							tests.add(new Object[] {et, inputs.get(i), actualUnique.get(i), ht, null, null, 0.0001});
+						}
+					}
+					else {
+						for(int i = 0; i < inputs.size(); i++) {
+							// allowing the estimate to be 30% off
+							tests.add(new Object[] {et, inputs.get(i), actualUnique.get(i), ht, null, null, 0.3});
+						}
+					}
+				}
+			}
+
+		}
+
+		return tests;
+	}
+
+	@Parameterized.Parameter
+	public AUType et;
+	@Parameterized.Parameter(1)
+	public MatrixBlock in;
+	@Parameterized.Parameter(2)
+	public long nrUnique;
+	@Parameterized.Parameter(3)
+	public HashType ht;
+
+	// Exception handling
+	@Parameterized.Parameter(4)
+	public Class<? extends Exception> expectedException;
+	@Parameterized.Parameter(5)
+	public String expectedExceptionMsg;
+
+	@Rule
+	public ExpectedException thrown = ExpectedException.none();
+
+	// allowing the estimate to be within 20% of target.
+	@Parameterized.Parameter(6)
+	public double epsilon;
+
+	@Test
+	public void testEstimation() {
+
+		// setup expected exception
+		if(expectedException != null) {
+			thrown.expect(expectedException);
+			thrown.expectMessage(expectedExceptionMsg);
+		}
+
+		Integer out = 0;
+		EstimatorOperator op = new EstimatorOperator(et, ht);
+		try {
+			out = LibMatrixEstimator.estimateDistinctValues(in, op);
+		}
+		catch(DMLException e) {
+			System.out.println(e.getMessage());
+			throw e;
+		}
+		catch(NotImplementedException e) {
+			throw e;
+		}
+		catch(Exception e) {
+			e.printStackTrace();
+			Assert.assertTrue(
+				"EXCEPTION: " + e.getMessage() + " PARAMETERS: " + et + " , hashing: " + ht + " & input size:"
+					+ in.getNumRows() + "," + in.getNumColumns(),
+				false);
+		}
+
+		int count = out;
+		boolean success = Math.abs(nrUnique - count) <= nrUnique * epsilon;
+		Assert.assertTrue(et + " estimated " + count + " unique values, actual:" + nrUnique + " with eps of " + epsilon
+			+ " , hashing: " + ht + " & input size:" + in.getNumRows() + "," + in.getNumColumns(), success);
+

Review comment:
       Drop the blank lines before the closing `}`.

##########
File path: src/main/java/org/apache/sysds/hops/AggUnaryOp.java
##########
@@ -102,7 +102,10 @@ else if ((_op == AggOp.SUM    && (_direction == Direction.RowCol || _direction =
 					 || (_op == AggOp.MIN    && (_direction == Direction.RowCol || _direction == Direction.Row || _direction == Direction.Col))
 					 || (_op == AggOp.MEAN   && (_direction == Direction.RowCol || _direction == Direction.Row || _direction == Direction.Col))
 					 || (_op == AggOp.VAR    && (_direction == Direction.RowCol || _direction == Direction.Row || _direction == Direction.Col))
-					 || (_op == AggOp.PROD   && (_direction == Direction.RowCol))){
+					 || (_op == AggOp.PROD   && (_direction == Direction.RowCol))
+					 || (_op == AggOp.COUNT_DISTINCT && (_direction == Direction.RowCol))
+					 || (_op == AggOp.COUNT_DISTINCT_ESTIMATE_KMV && (_direction == Direction.RowCol))

Review comment:
       Really? You support these on the GPU too?

##########
File path: src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixEstimator.java
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.matrix.data;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.sysds.api.DMLException;
+import org.apache.sysds.runtime.instructions.cp.AggregateUnaryCPInstruction.AUType;
+import org.apache.sysds.runtime.matrix.operators.EstimatorOperator;
+import org.apache.sysds.utils.Hash;
+import org.apache.sysds.utils.Hash.HashType;
+
+/**
+ * This class contains estimation operations for matrix block.
+ */
+public class LibMatrixEstimator {
+
+	// ------------------------------
+	// Logging parameters:
+	// local debug flag
+	private static final boolean LOCAL_DEBUG = true;
+	// DEBUG/TRACE for details
+	private static final Level LOCAL_DEBUG_LEVEL = Level.DEBUG;
+
+	private static final Log LOG = LogFactory.getLog(LibMatrixEstimator.class.getName());
+
+	static {
+		// for internal debugging only
+		if(LOCAL_DEBUG) {
+			Logger.getLogger("org.apache.sysds.runtime.matrix.data").setLevel(LOCAL_DEBUG_LEVEL);
+		}
+	}
+	// ------------------------------
+
+	// public enum EstimatorType {
+	// 	NUM_DISTINCT_COUNT, // Baseline naive implementation, iterate though, add to hashMap.
+	// 	NUM_DISTINCT_KMV, // K-Minimum Values algorithm.
+	// 	NUM_DISTINCT_HYPER_LOG_LOG // HyperLogLog algorithm.
+	// }
+
+	static public int minimumSize = 64000;
+
+	private LibMatrixEstimator() {
+		// Prevent instantiation via private constructor.
+	}
+
+	/**
+	 * Public method to count the number of distinct values inside a matrix. Depending on which EstimatorOperator
+	 * selected it either gets the absolute number or a estimated value.
+	 * 
+	 * TODO: support counting num distinct in rows, or columns axis.
+	 * 
+	 * @param in  the input matrix to count number distinct values in
+	 * @param op  the selected operator to use
+	 * @return the distinct count
+	 */
+	public static int estimateDistinctValues(MatrixBlock in, EstimatorOperator op) {
+		// set output to correct size.
+		
+		// TODO: If the MatrixBlock type is CompressedMatrix, simply read the vaules from the ColGroups.
+
+		if(op.hashType == HashType.ExpHash && op.operatorType == AUType.COUNT_DISTINCT_ESTIMATE_KMV) {
+			throw new DMLException(
+				"Invalid hashing configuration using " + HashType.ExpHash + " and " + AUType.COUNT_DISTINCT_ESTIMATE_KMV);
+		}
+
+		if(op.operatorType == AUType.COUNT_DISTINCT_ESTIMATE_HYPER_LOG_LOG)
+			throw new NotImplementedException("HyperLogLog not implemented");
+
+		// Just use naive implementation if the size is small.
+		if(in.getNumRows() * in.getNumColumns() < minimumSize) {
+			return CountDistinctValuesNaive(in);

Review comment:
       Why? 64000 values is a lot, and in distributed operations we only see blocks of 1k rows.

##########
File path: src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateUnaryCPInstruction.java
##########
@@ -152,6 +169,23 @@ else if( input1.getDataType().isMatrix() || input1.getDataType().isFrame() ) {
 				ec.setScalarOutput(output_name, new StringObject(Explain.explain(li)));
 				break;
 			}
+			case COUNT_DISTINCT:
+			case COUNT_DISTINCT_ESTIMATE_KMV:
+			case COUNT_DISTINCT_ESTIMATE_HYPER_LOG_LOG: {
+				System.out.print(ec.getVariables().keySet());
+				if( !ec.getVariables().keySet().contains(input1.getName()) )
+					throw new DMLRuntimeException("Variable '" + input1.getName() + "' does not exist.");
+
+				MatrixBlock input = ec.getMatrixInput(input1.getName());
+				
+				EstimatorOperator op = new EstimatorOperator(_type);
+				int res = LibMatrixEstimator.estimateDistinctValues(input, op);
+				if (res == 0)
+					throw new DMLRuntimeException("Imposible estimate of distinct values");

Review comment:
       Maybe check the validity inside `LibMatrixEstimator` instead, so we can use it safely in other places in the system too.

##########
File path: src/main/java/org/apache/sysds/lops/PartialAggregate.java
##########
@@ -340,6 +338,17 @@ else if( dir == Direction.Col )
 					return "uaktrace";
 				break;
 			}
+
+			case COUNT_DISTINCT: {
+				if(dir == Direction.RowCol )
+					return "countDistinct";
+				break;
+			}
+			case COUNT_DISTINCT_ESTIMATE_KMV: {
+				if(dir == Direction.RowCol )
+					return "countDistinctEstimateKMV";

Review comment:
       Could we find some shorter opcodes and start them with `ua` (for unary aggregate)?

##########
File path: src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateUnaryCPInstruction.java
##########
@@ -152,6 +169,23 @@ else if( input1.getDataType().isMatrix() || input1.getDataType().isFrame() ) {
 				ec.setScalarOutput(output_name, new StringObject(Explain.explain(li)));
 				break;
 			}
+			case COUNT_DISTINCT:
+			case COUNT_DISTINCT_ESTIMATE_KMV:
+			case COUNT_DISTINCT_ESTIMATE_HYPER_LOG_LOG: {
+				System.out.print(ec.getVariables().keySet());

Review comment:
       Remove these debugging outputs.

##########
File path: src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixEstimator.java
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.matrix.data;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.sysds.api.DMLException;
+import org.apache.sysds.runtime.instructions.cp.AggregateUnaryCPInstruction.AUType;
+import org.apache.sysds.runtime.matrix.operators.EstimatorOperator;
+import org.apache.sysds.utils.Hash;
+import org.apache.sysds.utils.Hash.HashType;
+
+/**
+ * This class contains estimation operations for matrix block.
+ */
+public class LibMatrixEstimator {
+
+	// ------------------------------
+	// Logging parameters:
+	// local debug flag
+	private static final boolean LOCAL_DEBUG = true;
+	// DEBUG/TRACE for details
+	private static final Level LOCAL_DEBUG_LEVEL = Level.DEBUG;
+
+	private static final Log LOG = LogFactory.getLog(LibMatrixEstimator.class.getName());
+
+	static {
+		// for internal debugging only
+		if(LOCAL_DEBUG) {
+			Logger.getLogger("org.apache.sysds.runtime.matrix.data").setLevel(LOCAL_DEBUG_LEVEL);

Review comment:
       Maybe better to set just this single class to the debug level.

##########
File path: src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixEstimator.java
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.matrix.data;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.sysds.api.DMLException;
+import org.apache.sysds.runtime.instructions.cp.AggregateUnaryCPInstruction.AUType;
+import org.apache.sysds.runtime.matrix.operators.EstimatorOperator;
+import org.apache.sysds.utils.Hash;
+import org.apache.sysds.utils.Hash.HashType;
+
+/**
+ * This class contains estimation operations for matrix block.
+ */
+public class LibMatrixEstimator {
+
+	// ------------------------------
+	// Logging parameters:
+	// local debug flag
+	private static final boolean LOCAL_DEBUG = true;
+	// DEBUG/TRACE for details
+	private static final Level LOCAL_DEBUG_LEVEL = Level.DEBUG;
+
+	private static final Log LOG = LogFactory.getLog(LibMatrixEstimator.class.getName());
+
+	static {
+		// for internal debugging only
+		if(LOCAL_DEBUG) {
+			Logger.getLogger("org.apache.sysds.runtime.matrix.data").setLevel(LOCAL_DEBUG_LEVEL);
+		}
+	}
+	// ------------------------------
+
+	// public enum EstimatorType {
+	// 	NUM_DISTINCT_COUNT, // Baseline naive implementation, iterate though, add to hashMap.
+	// 	NUM_DISTINCT_KMV, // K-Minimum Values algorithm.
+	// 	NUM_DISTINCT_HYPER_LOG_LOG // HyperLogLog algorithm.
+	// }
+
+	static public int minimumSize = 64000;

Review comment:
       `public static int`

##########
File path: src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixEstimator.java
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.matrix.data;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.sysds.api.DMLException;
+import org.apache.sysds.runtime.instructions.cp.AggregateUnaryCPInstruction.AUType;
+import org.apache.sysds.runtime.matrix.operators.EstimatorOperator;
+import org.apache.sysds.utils.Hash;
+import org.apache.sysds.utils.Hash.HashType;
+
+/**
+ * This class contains estimation operations for matrix block.
+ */
+public class LibMatrixEstimator {
+
+	// ------------------------------
+	// Logging parameters:
+	// local debug flag
+	private static final boolean LOCAL_DEBUG = true;
+	// DEBUG/TRACE for details
+	private static final Level LOCAL_DEBUG_LEVEL = Level.DEBUG;
+
+	private static final Log LOG = LogFactory.getLog(LibMatrixEstimator.class.getName());
+
+	static {
+		// for internal debugging only
+		if(LOCAL_DEBUG) {
+			Logger.getLogger("org.apache.sysds.runtime.matrix.data").setLevel(LOCAL_DEBUG_LEVEL);
+		}
+	}
+	// ------------------------------
+
+	// public enum EstimatorType {
+	// 	NUM_DISTINCT_COUNT, // Baseline naive implementation, iterate though, add to hashMap.
+	// 	NUM_DISTINCT_KMV, // K-Minimum Values algorithm.
+	// 	NUM_DISTINCT_HYPER_LOG_LOG // HyperLogLog algorithm.
+	// }
+
+	static public int minimumSize = 64000;
+
+	private LibMatrixEstimator() {
+		// Prevent instantiation via private constructor.
+	}
+
+	/**
+	 * Public method to count the number of distinct values inside a matrix. Depending on which EstimatorOperator
+	 * selected it either gets the absolute number or a estimated value.
+	 * 
+	 * TODO: support counting num distinct in rows, or columns axis.
+	 * 
+	 * @param in  the input matrix to count number distinct values in
+	 * @param op  the selected operator to use
+	 * @return the distinct count
+	 */
+	public static int estimateDistinctValues(MatrixBlock in, EstimatorOperator op) {
+		// set output to correct size.
+		
+		// TODO: If the MatrixBlock type is CompressedMatrix, simply read the vaules from the ColGroups.
+
+		if(op.hashType == HashType.ExpHash && op.operatorType == AUType.COUNT_DISTINCT_ESTIMATE_KMV) {
+			throw new DMLException(
+				"Invalid hashing configuration using " + HashType.ExpHash + " and " + AUType.COUNT_DISTINCT_ESTIMATE_KMV);
+		}
+
+		if(op.operatorType == AUType.COUNT_DISTINCT_ESTIMATE_HYPER_LOG_LOG)
+			throw new NotImplementedException("HyperLogLog not implemented");
+
+		// Just use naive implementation if the size is small.
+		if(in.getNumRows() * in.getNumColumns() < minimumSize) {
+			return CountDistinctValuesNaive(in);
+		}
+
+		switch(op.operatorType) {
+			case COUNT_DISTINCT:
+				return CountDistinctValuesNaive(in);
+			case COUNT_DISTINCT_ESTIMATE_KMV:
+				return CountDistinctValuesKVM(in, op);
+			case COUNT_DISTINCT_ESTIMATE_HYPER_LOG_LOG:
+				return CountDistinctHyperLogLog(in);
+			default:
+				throw new DMLException("Invalid or not implemented Estimator Type");
+		}
+	}
+
+	/**
+	 * Naive implementation of counting Distinct values. Benefit Precise, but Uses memory, on the scale of input.
+	 * 
+	 * @param in  the input matrix to count number distinct values in
+	 * @return the distinct count
+	 */
+	private static int CountDistinctValuesNaive(MatrixBlock in) {
+		// Make a hash set to contain all the distinct.
+		// Memory usage scale linear with number of distinct values.
+
+		Set<Double> distinct = new HashSet<Double>();
+
+		for(int c = 0; c < in.getNumColumns(); c++) {
+			for(int r = 0; r < in.getNumRows(); r++) {
+				distinct.add(in.getValue(r, c));

Review comment:
       Avoid `getValue` and add branches for dense and sparse data access.

##########
File path: src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixEstimator.java
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.matrix.data;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.sysds.api.DMLException;
+import org.apache.sysds.runtime.instructions.cp.AggregateUnaryCPInstruction.AUType;
+import org.apache.sysds.runtime.matrix.operators.EstimatorOperator;
+import org.apache.sysds.utils.Hash;
+import org.apache.sysds.utils.Hash.HashType;
+
+/**
+ * This class contains estimation operations for matrix block.
+ */
+public class LibMatrixEstimator {
+
+	// ------------------------------
+	// Logging parameters:
+	// local debug flag
+	private static final boolean LOCAL_DEBUG = true;
+	// DEBUG/TRACE for details
+	private static final Level LOCAL_DEBUG_LEVEL = Level.DEBUG;
+
+	private static final Log LOG = LogFactory.getLog(LibMatrixEstimator.class.getName());
+
+	static {
+		// for internal debugging only
+		if(LOCAL_DEBUG) {
+			Logger.getLogger("org.apache.sysds.runtime.matrix.data").setLevel(LOCAL_DEBUG_LEVEL);
+		}
+	}
+	// ------------------------------
+
+	// public enum EstimatorType {
+	// 	NUM_DISTINCT_COUNT, // Baseline naive implementation, iterate though, add to hashMap.
+	// 	NUM_DISTINCT_KMV, // K-Minimum Values algorithm.
+	// 	NUM_DISTINCT_HYPER_LOG_LOG // HyperLogLog algorithm.
+	// }
+
+	static public int minimumSize = 64000;
+
+	private LibMatrixEstimator() {
+		// Prevent instantiation via private constructor.
+	}
+
+	/**
+	 * Public method to count the number of distinct values inside a matrix. Depending on which EstimatorOperator
+	 * selected it either gets the absolute number or a estimated value.
+	 * 
+	 * TODO: support counting num distinct in rows, or columns axis.
+	 * 
+	 * @param in  the input matrix to count number distinct values in
+	 * @param op  the selected operator to use
+	 * @return the distinct count
+	 */
+	public static int estimateDistinctValues(MatrixBlock in, EstimatorOperator op) {
+		// set output to correct size.
+		
+		// TODO: If the MatrixBlock type is CompressedMatrix, simply read the vaules from the ColGroups.
+
+		if(op.hashType == HashType.ExpHash && op.operatorType == AUType.COUNT_DISTINCT_ESTIMATE_KMV) {
+			throw new DMLException(
+				"Invalid hashing configuration using " + HashType.ExpHash + " and " + AUType.COUNT_DISTINCT_ESTIMATE_KMV);
+		}
+
+		if(op.operatorType == AUType.COUNT_DISTINCT_ESTIMATE_HYPER_LOG_LOG)
+			throw new NotImplementedException("HyperLogLog not implemented");
+
+		// Just use naive implementation if the size is small.
+		if(in.getNumRows() * in.getNumColumns() < minimumSize) {
+			return CountDistinctValuesNaive(in);
+		}
+
+		switch(op.operatorType) {
+			case COUNT_DISTINCT:
+				return CountDistinctValuesNaive(in);
+			case COUNT_DISTINCT_ESTIMATE_KMV:
+				return CountDistinctValuesKVM(in, op);
+			case COUNT_DISTINCT_ESTIMATE_HYPER_LOG_LOG:
+				return CountDistinctHyperLogLog(in);
+			default:
+				throw new DMLException("Invalid or not implemented Estimator Type");
+		}
+	}
+
+	/**
+	 * Naive implementation of counting Distinct values. Benefit Precise, but Uses memory, on the scale of input.
+	 * 
+	 * @param in  the input matrix to count number distinct values in
+	 * @return the distinct count
+	 */
+	private static int CountDistinctValuesNaive(MatrixBlock in) {
+		// Make a hash set to contain all the distinct.
+		// Memory usage scale linear with number of distinct values.
+
+		Set<Double> distinct = new HashSet<Double>();
+
+		for(int c = 0; c < in.getNumColumns(); c++) {
+			for(int r = 0; r < in.getNumRows(); r++) {
+				distinct.add(in.getValue(r, c));
+			}
+		}
+
+		return distinct.size();
+
+		// debugging
+		// LOG.debug("Distinct actual array: " + Arrays.toString(distinct.toArray()));
+		// LOG.debug("Number destinct: " + out);
+
+	}
+
+	/**
+	 * KMV synopsis(for k minimum values) Distinct-Value Estimation
+	 * 
+	 * Kevin S. Beyer, Peter J. Haas, Berthold Reinwald, Yannis Sismanis, Rainer Gemulla: On synopses for distinct‐value
+	 * estimation under multiset operations. SIGMOD 2007
+	 * 
+	 * @param in
+	 * @return the distinct count
+	 */
+	private static int CountDistinctValuesKVM(MatrixBlock in, EstimatorOperator op) {
+
+		// D is the number of possible distinct values in the MatrixBlock.
+		// As a default we set this number to numRows * numCols
+		int D = in.getNumRows() * in.getNumColumns();
+
+		// To ensure that the likelihood to hash to the same value we need O(D^2) bits
+		// to hash to assign.
+		// This is handled by our custom bit array class.
+		long tmp = (long) D * (long) D;
+		LOG.debug("M not forced to int size: " + tmp);
+		int M = (tmp > (long) Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) tmp;
+
+		// the estimator is asymptotically unbiased as k becomes large
+		// memory scales with k.
+		// D >> k >> 0
+		int k = 1024;
+
+		PriorityQueue<Integer> smallestHashes = new PriorityQueue<>(k, Collections.reverseOrder());
+
+		// cache for common elements.
+		Set<Integer> cache = new HashSet<>();
+		// int maxCache = 512;
+
+		// int gccCount = 0;
+
+		for(int c = 0; c < in.getNumColumns(); c++) {
+			for(int r = 0; r < in.getNumRows(); r++) {
+				double input = in.getValue(r, c);

Review comment:
       Again, use separate dense or sparse branches.

##########
File path: src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixEstimator.java
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.matrix.data;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.sysds.api.DMLException;
+import org.apache.sysds.runtime.instructions.cp.AggregateUnaryCPInstruction.AUType;
+import org.apache.sysds.runtime.matrix.operators.EstimatorOperator;
+import org.apache.sysds.utils.Hash;
+import org.apache.sysds.utils.Hash.HashType;
+
+/**
+ * This class contains estimation operations for matrix block.
+ */
+public class LibMatrixEstimator {
+
+	// ------------------------------
+	// Logging parameters:
+	// local debug flag
+	private static final boolean LOCAL_DEBUG = true;
+	// DEBUG/TRACE for details
+	private static final Level LOCAL_DEBUG_LEVEL = Level.DEBUG;
+
+	private static final Log LOG = LogFactory.getLog(LibMatrixEstimator.class.getName());
+
+	static {
+		// for internal debugging only
+		if(LOCAL_DEBUG) {
+			Logger.getLogger("org.apache.sysds.runtime.matrix.data").setLevel(LOCAL_DEBUG_LEVEL);
+		}
+	}
+	// ------------------------------
+
+	// public enum EstimatorType {
+	// 	NUM_DISTINCT_COUNT, // Baseline naive implementation, iterate though, add to hashMap.
+	// 	NUM_DISTINCT_KMV, // K-Minimum Values algorithm.
+	// 	NUM_DISTINCT_HYPER_LOG_LOG // HyperLogLog algorithm.
+	// }
+
+	static public int minimumSize = 64000;
+
+	private LibMatrixEstimator() {
+		// Prevent instantiation via private constructor.
+	}
+
+	/**
+	 * Public method to count the number of distinct values inside a matrix. Depending on which EstimatorOperator
+	 * selected it either gets the absolute number or a estimated value.
+	 * 
+	 * TODO: support counting num distinct in rows, or columns axis.
+	 * 
+	 * @param in  the input matrix to count number distinct values in
+	 * @param op  the selected operator to use
+	 * @return the distinct count
+	 */
+	public static int estimateDistinctValues(MatrixBlock in, EstimatorOperator op) {
+		// set output to correct size.
+		
+		// TODO: If the MatrixBlock type is CompressedMatrix, simply read the vaules from the ColGroups.
+
+		if(op.hashType == HashType.ExpHash && op.operatorType == AUType.COUNT_DISTINCT_ESTIMATE_KMV) {
+			throw new DMLException(
+				"Invalid hashing configuration using " + HashType.ExpHash + " and " + AUType.COUNT_DISTINCT_ESTIMATE_KMV);
+		}
+
+		if(op.operatorType == AUType.COUNT_DISTINCT_ESTIMATE_HYPER_LOG_LOG)
+			throw new NotImplementedException("HyperLogLog not implemented");
+
+		// Just use naive implementation if the size is small.
+		if(in.getNumRows() * in.getNumColumns() < minimumSize) {
+			return CountDistinctValuesNaive(in);
+		}
+
+		switch(op.operatorType) {
+			case COUNT_DISTINCT:
+				return CountDistinctValuesNaive(in);
+			case COUNT_DISTINCT_ESTIMATE_KMV:
+				return CountDistinctValuesKVM(in, op);
+			case COUNT_DISTINCT_ESTIMATE_HYPER_LOG_LOG:
+				return CountDistinctHyperLogLog(in);
+			default:
+				throw new DMLException("Invalid or not implemented Estimator Type");
+		}
+	}
+
+	/**
+	 * Naive implementation of counting Distinct values. Benefit Precise, but Uses memory, on the scale of input.
+	 * 
+	 * @param in  the input matrix to count number distinct values in
+	 * @return the distinct count
+	 */
+	private static int CountDistinctValuesNaive(MatrixBlock in) {
+		// Make a hash set to contain all the distinct.
+		// Memory usage scale linear with number of distinct values.
+
+		Set<Double> distinct = new HashSet<Double>();
+
+		for(int c = 0; c < in.getNumColumns(); c++) {
+			for(int r = 0; r < in.getNumRows(); r++) {
+				distinct.add(in.getValue(r, c));
+			}
+		}
+
+		return distinct.size();
+
+		// debugging
+		// LOG.debug("Distinct actual array: " + Arrays.toString(distinct.toArray()));
+		// LOG.debug("Number destinct: " + out);
+
+	}
+
+	/**
+	 * KMV synopsis(for k minimum values) Distinct-Value Estimation
+	 * 
+	 * Kevin S. Beyer, Peter J. Haas, Berthold Reinwald, Yannis Sismanis, Rainer Gemulla: On synopses for distinct‐value
+	 * estimation under multiset operations. SIGMOD 2007
+	 * 
+	 * @param in
+	 * @return the distinct count
+	 */
+	private static int CountDistinctValuesKVM(MatrixBlock in, EstimatorOperator op) {
+
+		// D is the number of possible distinct values in the MatrixBlock.
+		// As a default we set this number to numRows * numCols
+		int D = in.getNumRows() * in.getNumColumns();
+
+		// To ensure that the likelihood to hash to the same value we need O(D^2) bits
+		// to hash to assign.
+		// This is handled by our custom bit array class.
+		long tmp = (long) D * (long) D;
+		LOG.debug("M not forced to int size: " + tmp);
+		int M = (tmp > (long) Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) tmp;
+
+		// the estimator is asymptotically unbiased as k becomes large
+		// memory scales with k.
+		// D >> k >> 0
+		int k = 1024;
+
+		PriorityQueue<Integer> smallestHashes = new PriorityQueue<>(k, Collections.reverseOrder());
+
+		// cache for common elements.
+		Set<Integer> cache = new HashSet<>();

Review comment:
       Positioning this structure as a cache is somewhat misleading, as you seem to use it for containment checks to avoid duplicates in the priority queue. Maybe create a private class that covers both under a simple PriorityQueue that also ensures uniqueness.

##########
File path: src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixEstimator.java
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.matrix.data;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.sysds.api.DMLException;
+import org.apache.sysds.runtime.instructions.cp.AggregateUnaryCPInstruction.AUType;
+import org.apache.sysds.runtime.matrix.operators.EstimatorOperator;
+import org.apache.sysds.utils.Hash;
+import org.apache.sysds.utils.Hash.HashType;
+
+/**
+ * This class contains estimation operations for matrix block.
+ */
+public class LibMatrixEstimator {
+
+	// ------------------------------
+	// Logging parameters:
+	// local debug flag
+	private static final boolean LOCAL_DEBUG = true;
+	// DEBUG/TRACE for details
+	private static final Level LOCAL_DEBUG_LEVEL = Level.DEBUG;
+
+	private static final Log LOG = LogFactory.getLog(LibMatrixEstimator.class.getName());
+
+	static {
+		// for internal debugging only
+		if(LOCAL_DEBUG) {
+			Logger.getLogger("org.apache.sysds.runtime.matrix.data").setLevel(LOCAL_DEBUG_LEVEL);
+		}
+	}
+	// ------------------------------
+
+	// public enum EstimatorType {
+	// 	NUM_DISTINCT_COUNT, // Baseline naive implementation, iterate though, add to hashMap.
+	// 	NUM_DISTINCT_KMV, // K-Minimum Values algorithm.
+	// 	NUM_DISTINCT_HYPER_LOG_LOG // HyperLogLog algorithm.
+	// }
+
+	static public int minimumSize = 64000;
+
+	private LibMatrixEstimator() {
+		// Prevent instantiation via private constructor.
+	}
+
+	/**
+	 * Public method to count the number of distinct values inside a matrix. Depending on which EstimatorOperator
+	 * selected it either gets the absolute number or a estimated value.
+	 * 
+	 * TODO: support counting num distinct in rows, or columns axis.
+	 * 
+	 * @param in  the input matrix to count number distinct values in
+	 * @param op  the selected operator to use
+	 * @return the distinct count
+	 */
+	public static int estimateDistinctValues(MatrixBlock in, EstimatorOperator op) {
+		// set output to correct size.
+		
+		// TODO: If the MatrixBlock type is CompressedMatrix, simply read the vaules from the ColGroups.
+
+		if(op.hashType == HashType.ExpHash && op.operatorType == AUType.COUNT_DISTINCT_ESTIMATE_KMV) {
+			throw new DMLException(
+				"Invalid hashing configuration using " + HashType.ExpHash + " and " + AUType.COUNT_DISTINCT_ESTIMATE_KMV);
+		}
+
+		if(op.operatorType == AUType.COUNT_DISTINCT_ESTIMATE_HYPER_LOG_LOG)
+			throw new NotImplementedException("HyperLogLog not implemented");
+
+		// Just use naive implementation if the size is small.
+		if(in.getNumRows() * in.getNumColumns() < minimumSize) {
+			return CountDistinctValuesNaive(in);
+		}
+
+		switch(op.operatorType) {
+			case COUNT_DISTINCT:
+				return CountDistinctValuesNaive(in);
+			case COUNT_DISTINCT_ESTIMATE_KMV:
+				return CountDistinctValuesKVM(in, op);
+			case COUNT_DISTINCT_ESTIMATE_HYPER_LOG_LOG:
+				return CountDistinctHyperLogLog(in);
+			default:
+				throw new DMLException("Invalid or not implemented Estimator Type");
+		}
+	}
+
+	/**
+	 * Naive implementation of counting Distinct values. Benefit Precise, but Uses memory, on the scale of input.
+	 * 
+	 * @param in  the input matrix to count number distinct values in
+	 * @return the distinct count
+	 */
+	private static int CountDistinctValuesNaive(MatrixBlock in) {
+		// Make a hash set to contain all the distinct.
+		// Memory usage scale linear with number of distinct values.
+
+		Set<Double> distinct = new HashSet<Double>();
+
+		for(int c = 0; c < in.getNumColumns(); c++) {
+			for(int r = 0; r < in.getNumRows(); r++) {
+				distinct.add(in.getValue(r, c));
+			}
+		}
+
+		return distinct.size();
+
+		// debugging
+		// LOG.debug("Distinct actual array: " + Arrays.toString(distinct.toArray()));
+		// LOG.debug("Number destinct: " + out);
+
+	}
+
+	/**
+	 * KMV synopsis(for k minimum values) Distinct-Value Estimation
+	 * 
+	 * Kevin S. Beyer, Peter J. Haas, Berthold Reinwald, Yannis Sismanis, Rainer Gemulla: On synopses for distinct‐value
+	 * estimation under multiset operations. SIGMOD 2007
+	 * 
+	 * @param in
+	 * @return the distinct count
+	 */
+	private static int CountDistinctValuesKVM(MatrixBlock in, EstimatorOperator op) {

Review comment:
       Besides the extensions for row/col aggregates, we should also make a TODO for multi-threaded operations, which would require explicit partitioning or a thread-safe priority queue.

##########
File path: src/main/java/org/apache/sysds/runtime/util/DataConverter.java
##########
@@ -456,6 +456,23 @@ public static MatrixBlock convertToMatrixBlock( double[][] data ) {
 		return mb;
 	}
 
+
+	/**
+	 * Converts an Integer matrix to an MatrixBlock (FP32)
+	 * 
+	 * @param data Int matrix input that is converted to double MatrixBlock
+	 * @return The matrixBlock constructed.
+	 */
+	public static MatrixBlock convertToMatrixBlock(int[][] data){
+		double[][] input = new double[data.length][];
+		int index = 0;
+		for(int[] x: data){
+			input[index] = Arrays.stream(x).asDoubleStream().toArray();
+			index++;
+		}
+		return convertToMatrixBlock(input);

Review comment:
       Hm, maybe create the matrix block first and use append, to avoid the unnecessary second allocation and copy.

##########
File path: src/test/java/org/apache/sysds/test/component/matrix/MatrixEstimatorTest.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.matrix;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.sysds.api.DMLException;
+import org.apache.sysds.runtime.instructions.cp.AggregateUnaryCPInstruction.AUType;
+import org.apache.sysds.runtime.matrix.data.LibMatrixEstimator;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.operators.EstimatorOperator;
+import org.apache.sysds.runtime.util.DataConverter;
+import org.apache.sysds.test.TestUtils;
+import org.apache.sysds.utils.Hash.HashType;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(value = Parameterized.class)
+public class MatrixEstimatorTest {
+
+	private static AUType[] esT = new AUType[] {
+		// The different types of Estimators
+		AUType.COUNT_DISTINCT,
+		// EstimatorType.NUM_DISTINCT_KMV,
+		// EstimatorType.NUM_DISTINCT_HYPER_LOG_LOG
+	};
+
+	@Parameters
+	public static Collection<Object[]> data() {
+		ArrayList<Object[]> tests = new ArrayList<>();
+		ArrayList<MatrixBlock> inputs = new ArrayList<>();
+		ArrayList<Long> actualUnique = new ArrayList<>();
+
+		// single value matrix.
+		inputs.add(DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrix(1, 1, 0.0, 100.0, 1, 7)));
+		actualUnique.add(1L);
+		inputs.add(DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrix(1, 100, 0.0, 100.0, 1, 7)));
+		actualUnique.add(100L);
+		inputs.add(DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrix(100, 1, 0.0, 100.0, 1, 7)));
+		actualUnique.add(100L);
+		// inputs.add(DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrix(100, 100, 0.0, 100.0, 1, 7)));
+		// actualUnique.add(10000L);
+		// inputs.add(DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrix(1024, 1024, 0.0, 100.0, 1, 7)));
+		// actualUnique.add(1024L * 1024L);
+
+		inputs.add(DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrixIntV(5000, 5000, 1, 100, 1, 8)));
+		actualUnique.add(99L);
+		inputs.add(DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrixIntV(1024, 10240, 1, 100, 1, 7)));
+		actualUnique.add(99L);
+		inputs.add(DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrixIntV(10240, 1024, 1, 100, 1, 7)));
+		actualUnique.add(99L);
+
+		// inputs.add(
+		// DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrixIntV(1024, 1024, 1000001, 1000100, 1, 8)));
+		// actualUnique.add(99L);
+		// inputs.add(
+		// DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrixIntV(1024, 10240, 1000001, 1000100, 1, 7)));
+		// actualUnique.add(99L);
+
+		inputs.add(DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrixIntV(1024, 10241, 1, 1500, 1, 7)));
+		actualUnique.add(1499L);
+
+		inputs.add(DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrixIntV(1024, 10241, 0, 3000, 1, 7)));
+		actualUnique.add(3000L);
+		inputs.add(DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrixIntV(1024, 10241, 0, 6000, 1, 7)));
+		actualUnique.add(6000L);
+		// inputs.add(DataConverter.convertToMatrixBlock(TestUtils.generateTestMatrixIntV(10240, 10241, 0, 10000, 1,

Review comment:
       Drop all commented code.

##########
File path: src/main/java/org/apache/sysds/utils/Hash.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.utils;
+
+import java.util.Random;
+
+import org.apache.commons.lang.NotImplementedException;
+
+/**
+ * A class containing different hashing functions.
+ */
+public class Hash {
+
+	public enum HashType {
+		StandardJava, LinearHash, ExpHash
+	}
+
+	static public int hash(Object o, HashType ht) {
+		int hashcode = o.hashCode();
+		if(hashcode == 0) {
+			// Special case handling if 0 input.
+			hashcode = Integer.hashCode(13241);
+		}
+		switch(ht) {
+			case StandardJava:
+				return hashcode;
+			case LinearHash:
+				return linearHash(hashcode);
+			case ExpHash:
+				return expHash(hashcode);
+			default:
+				throw new NotImplementedException("Not Implemented hashing combination");
+		}
+	}
+
+	// 32 random int values.
+	// generated values:
+	public static void main(String[] args) {
+		Random r = new Random(1324121);
+		for(int x = 0; x < 32; x++) {
+			System.out.println(String.format("0x%08X,", r.nextInt()));
+		}
+	}
+
+	private static int[] a = {0x21ae4036, 0x32435171, 0xac3338cf, 0xea97b40c, 0x0e504b22, 0x9ff9a4ef, 0x111d014d,

Review comment:
       Please place members at the top of the class.

##########
File path: src/test/java/org/apache/sysds/test/component/misc/UtilHash.java
##########
@@ -0,0 +1,87 @@
+package org.apache.sysds.test.component.misc;

Review comment:
       License missing.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



Mime
View raw message