Feat: Add APPROX_MOST_FREQUENT Aggregation Function

This commit is contained in:
FearfulTomcat27 2025-05-28 10:01:56 +08:00 committed by GitHub
parent a70575f6fd
commit 4342166741
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
42 changed files with 2869 additions and 10 deletions

15
LICENSE
View File

@ -270,3 +270,18 @@ The following files include code modified from Apache Kafka project.
Project page: https://github.com/apache/kafka
License: https://github.com/apache/kafka/blob/trunk/LICENSE
--------------------------------------------------------------------------------
The following files include code modified from Stream-Lib project.
./iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/approximate/Counter.java
./iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/approximate/DoublyLinkedList.java
./iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/approximate/ExternalizableUtil.java
./iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/approximate/ITopK.java
./iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/approximate/ListNode2.java
./iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/approximate/Pair.java
./iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/approximate/StreamSummary.java
Project page: https://github.com/addthis/stream-lib
License: https://github.com/addthis/stream-lib/blob/master/LICENSE.txt

View File

@ -4165,6 +4165,51 @@ public class IoTDBTableAggregationIT {
DATABASE_NAME);
}
@Test
public void approxMostFrequentTest() {
String[] expectedHeader = buildHeaders(7);
String[] retArray =
new String[] {
"{\"50000\":8},{\"30.0\":8},{\"55.0\":8},{\"true\":12},{\"0xcafebabe55\":8},{\"1727158540000\":12},{\"20240924\":20},"
};
tableResultSetEqualTest(
"select approx_most_frequent(s2, 1, 10), approx_most_frequent(s3, 1, 10), approx_most_frequent(s4, 1, 10), approx_most_frequent(s5, 1, 10), approx_most_frequent(s8, 1, 10), approx_most_frequent(s9, 1, 10), approx_most_frequent(s10, 1, 10) from table1",
expectedHeader,
retArray,
DATABASE_NAME);
expectedHeader = new String[] {"time", "province", "_col2"};
retArray =
new String[] {
"2024-09-24T06:15:30.000Z,beijing,{},",
"2024-09-24T06:15:31.000Z,beijing,{\"31000\":2},",
"2024-09-24T06:15:35.000Z,beijing,{\"35000\":2},",
"2024-09-24T06:15:36.000Z,beijing,{},",
"2024-09-24T06:15:40.000Z,beijing,{\"40000\":2},",
"2024-09-24T06:15:41.000Z,beijing,{},",
"2024-09-24T06:15:46.000Z,beijing,{\"46000\":2},",
"2024-09-24T06:15:50.000Z,beijing,{\"50000\":4},",
"2024-09-24T06:15:51.000Z,beijing,{},",
"2024-09-24T06:15:55.000Z,beijing,{},",
"2024-09-24T06:15:30.000Z,shanghai,{},",
"2024-09-24T06:15:31.000Z,shanghai,{\"31000\":2},",
"2024-09-24T06:15:35.000Z,shanghai,{\"35000\":2},",
"2024-09-24T06:15:36.000Z,shanghai,{},",
"2024-09-24T06:15:40.000Z,shanghai,{\"40000\":2},",
"2024-09-24T06:15:41.000Z,shanghai,{},",
"2024-09-24T06:15:46.000Z,shanghai,{\"46000\":2},",
"2024-09-24T06:15:50.000Z,shanghai,{\"50000\":4},",
"2024-09-24T06:15:51.000Z,shanghai,{},",
"2024-09-24T06:15:55.000Z,shanghai,{},",
};
tableResultSetEqualTest(
"SELECT time,province,approx_most_frequent(s2, 1, 10) from table1 group by 1,2 order by 2,1",
expectedHeader,
retArray,
DATABASE_NAME);
}
@Test
public void exceptionTest() {
tableAssertTestFail(
@ -4211,6 +4256,14 @@ public class IoTDBTableAggregationIT {
"select approx_count_distinct(province, 'test') from table1",
"701: Second argument of Aggregate functions [approx_count_distinct] should be numberic type and do not use expression",
DATABASE_NAME);
tableAssertTestFail(
"select approx_most_frequent(province, -10, 100) from table1",
"701: The second and third argument must be greater than 0, but got k=-10, capacity=100",
DATABASE_NAME);
tableAssertTestFail(
"select approx_most_frequent(province, 'test', 100) from table1",
"701: The second and third argument of 'approx_most_frequent' function must be numeric literal",
DATABASE_NAME);
}
// ==================================================================

View File

@ -0,0 +1,57 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.SpaceSavingStateFactory;
import com.google.gson.Gson;
import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.file.metadata.statistics.Statistics;
import org.apache.tsfile.utils.Binary;
import java.nio.charset.StandardCharsets;
public abstract class AbstractApproxMostFrequentAccumulator<T> implements TableAccumulator {
protected SpaceSavingStateFactory.SingleSpaceSavingState<T> state =
SpaceSavingStateFactory.createSingleState();
@Override
public void evaluateIntermediate(ColumnBuilder columnBuilder) {
columnBuilder.writeBinary(new Binary(state.getSpaceSaving().serialize()));
}
@Override
public void evaluateFinal(ColumnBuilder columnBuilder) {
columnBuilder.writeBinary(
new Binary(new Gson().toJson(state.getSpaceSaving().getBuckets()), StandardCharsets.UTF_8));
}
@Override
public boolean hasFinalResult() {
return false;
}
@Override
public void addStatistics(Statistics[] statistics) {
throw new UnsupportedOperationException(
"ApproxMostFrequentAccumulator does not support statistics");
}
@Override
public void reset() {
state.getSpaceSaving().reset();
}
}

View File

@ -22,6 +22,11 @@ package org.apache.iotdb.db.queryengine.execution.operator.source.relational.agg
import org.apache.iotdb.common.rpc.thrift.TAggregationType;
import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer;
import org.apache.iotdb.db.queryengine.execution.aggregation.VarianceAccumulator;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.BinaryGroupedApproxMostFrequentAccumulator;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.BlobGroupedApproxMostFrequentAccumulator;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.BooleanGroupedApproxMostFrequentAccumulator;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.DoubleGroupedApproxMostFrequentAccumulator;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.FloatGroupedApproxMostFrequentAccumulator;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedAccumulator;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedApproxCountDistinctAccumulator;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedAvgAccumulator;
@ -41,6 +46,8 @@ import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggr
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedSumAccumulator;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedUserDefinedAggregateAccumulator;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedVarianceAccumulator;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.IntGroupedApproxMostFrequentAccumulator;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.LongGroupedApproxMostFrequentAccumulator;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.UpdateMemory;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.hash.MarkDistinctHash;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
@ -57,6 +64,7 @@ import org.apache.tsfile.file.metadata.statistics.Statistics;
import org.apache.tsfile.read.common.block.column.IntColumn;
import org.apache.tsfile.read.common.type.Type;
import org.apache.tsfile.read.common.type.TypeFactory;
import org.apache.tsfile.write.UnSupportedDataTypeException;
import java.util.Arrays;
import java.util.List;
@ -246,6 +254,8 @@ public class AccumulatorFactory {
inputDataTypes.get(0), VarianceAccumulator.VarianceType.VAR_POP);
case APPROX_COUNT_DISTINCT:
return new GroupedApproxCountDistinctAccumulator(inputDataTypes.get(0));
case APPROX_MOST_FREQUENT:
return getGroupedApproxMostFrequentAccumulator(inputDataTypes.get(0));
default:
throw new IllegalArgumentException("Invalid Aggregation function: " + aggregationType);
}
@ -313,11 +323,63 @@ public class AccumulatorFactory {
inputDataTypes.get(0), VarianceAccumulator.VarianceType.VAR_POP);
case APPROX_COUNT_DISTINCT:
return new ApproxCountDistinctAccumulator(inputDataTypes.get(0));
case APPROX_MOST_FREQUENT:
return getApproxMostFrequentAccumulator(inputDataTypes.get(0));
default:
throw new IllegalArgumentException("Invalid Aggregation function: " + aggregationType);
}
}
public static GroupedAccumulator getGroupedApproxMostFrequentAccumulator(TSDataType type) {
switch (type) {
case BOOLEAN:
return new BooleanGroupedApproxMostFrequentAccumulator();
case INT32:
case DATE:
return new IntGroupedApproxMostFrequentAccumulator();
case INT64:
case TIMESTAMP:
return new LongGroupedApproxMostFrequentAccumulator();
case FLOAT:
return new FloatGroupedApproxMostFrequentAccumulator();
case DOUBLE:
return new DoubleGroupedApproxMostFrequentAccumulator();
case TEXT:
case STRING:
return new BinaryGroupedApproxMostFrequentAccumulator();
case BLOB:
return new BlobGroupedApproxMostFrequentAccumulator();
default:
throw new UnSupportedDataTypeException(
String.format("Unsupported data type in APPROX_COUNT_DISTINCT Aggregation: %s", type));
}
}
public static TableAccumulator getApproxMostFrequentAccumulator(TSDataType type) {
switch (type) {
case BOOLEAN:
return new BooleanApproxMostFrequentAccumulator();
case INT32:
case DATE:
return new IntApproxMostFrequentAccumulator();
case INT64:
case TIMESTAMP:
return new LongApproxMostFrequentAccumulator();
case FLOAT:
return new FloatApproxMostFrequentAccumulator();
case DOUBLE:
return new DoubleApproxMostFrequentAccumulator();
case TEXT:
case STRING:
return new BinaryApproxMostFrequentAccumulator();
case BLOB:
return new BlobApproxMostFrequentAccumulator();
default:
throw new UnSupportedDataTypeException(
String.format("Unsupported data type in APPROX_COUNT_DISTINCT Aggregation: %s", type));
}
}
public static boolean isMultiInputAggregation(TAggregationType aggregationType) {
switch (aggregationType) {
case MAX_BY:

View File

@ -14,6 +14,9 @@
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.HyperLogLog;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.HyperLogLogStateFactory;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.enums.TSDataType;
@ -24,7 +27,7 @@ import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.write.UnSupportedDataTypeException;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.HyperLogLog.DEFAULT_STANDARD_ERROR;
import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.HyperLogLog.DEFAULT_STANDARD_ERROR;
public class ApproxCountDistinctAccumulator implements TableAccumulator {
private static final long INSTANCE_SIZE =

View File

@ -0,0 +1,130 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.Counter;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.SpaceSaving;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.SpaceSavingStateFactory;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import java.nio.ByteBuffer;
import java.util.List;
public class BinaryApproxMostFrequentAccumulator
extends AbstractApproxMostFrequentAccumulator<Binary> {
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(BinaryApproxMostFrequentAccumulator.class);
public BinaryApproxMostFrequentAccumulator() {}
@Override
public long getEstimatedSize() {
return INSTANCE_SIZE
+ RamUsageEstimator.shallowSizeOfInstance(BinaryApproxMostFrequentAccumulator.class)
+ state.getEstimatedSize();
}
@Override
public TableAccumulator copy() {
return new BinaryApproxMostFrequentAccumulator();
}
@Override
public void addInput(Column[] arguments, AggregationMask mask) {
int maxBuckets = arguments[1].getInt(0);
int capacity = arguments[2].getInt(0);
if (maxBuckets <= 0 || capacity <= 0) {
throw new SemanticException(
"The second and third argument must be greater than 0, but got k="
+ maxBuckets
+ ", capacity="
+ capacity);
}
SpaceSaving<Binary> spaceSaving = getOrCreateSpaceSaving(state, maxBuckets, capacity);
int positionCount = mask.getPositionCount();
Column valueColumn = arguments[0];
if (mask.isSelectAll()) {
for (int i = 0; i < valueColumn.getPositionCount(); i++) {
if (!valueColumn.isNull(i)) {
spaceSaving.add(valueColumn.getBinary(i));
}
}
} else {
int[] selectedPositions = mask.getSelectedPositions();
int position;
for (int i = 0; i < positionCount; i++) {
position = selectedPositions[i];
if (!valueColumn.isNull(position)) {
spaceSaving.add(valueColumn.getBinary(position));
}
}
}
}
@Override
public void addIntermediate(Column argument) {
for (int i = 0; i < argument.getPositionCount(); i++) {
if (!argument.isNull(i)) {
SpaceSaving<Binary> current =
new SpaceSaving<>(
argument.getBinary(i).getValues(),
BinaryApproxMostFrequentAccumulator::serializeBucket,
BinaryApproxMostFrequentAccumulator::deserializeBucket,
BinaryApproxMostFrequentAccumulator::calculateKeyByte);
state.merge(current);
}
}
}
public static SpaceSaving<Binary> getOrCreateSpaceSaving(
SpaceSavingStateFactory.SingleSpaceSavingState<Binary> state, int maxBuckets, int capacity) {
SpaceSaving<Binary> spaceSaving = state.getSpaceSaving();
if (spaceSaving == null) {
spaceSaving =
new SpaceSaving<>(
maxBuckets,
capacity,
BinaryApproxMostFrequentAccumulator::serializeBucket,
BinaryApproxMostFrequentAccumulator::deserializeBucket,
BinaryApproxMostFrequentAccumulator::calculateKeyByte);
state.setSpaceSaving(spaceSaving);
}
return spaceSaving;
}
public static void serializeBucket(Binary key, long count, ByteBuffer output) {
ReadWriteIOUtils.write(key, output);
ReadWriteIOUtils.write(count, output);
}
public static void deserializeBucket(ByteBuffer input, SpaceSaving<Binary> spaceSaving) {
Binary key = ReadWriteIOUtils.readBinary(input);
long count = ReadWriteIOUtils.readLong(input);
spaceSaving.add(key, count);
}
public static int calculateKeyByte(List<Counter<Binary>> counters) {
return counters.stream().mapToInt(counter -> counter.getItem().getLength()).sum();
}
}

View File

@ -0,0 +1,39 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
import com.google.gson.Gson;
import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BytesUtils;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.stream.Collectors;
public class BlobApproxMostFrequentAccumulator extends BinaryApproxMostFrequentAccumulator {
@Override
public void evaluateFinal(ColumnBuilder columnBuilder) {
Map<Binary, Long> buckets = state.getSpaceSaving().getBuckets();
Map<String, Long> formatedBuckets =
buckets.entrySet().stream()
.collect(
Collectors.toMap(
entry -> BytesUtils.parseBlobByteArrayToString(entry.getKey().getValues()),
Map.Entry::getValue));
columnBuilder.writeBinary(
new Binary(new Gson().toJson(formatedBuckets), StandardCharsets.UTF_8));
}
}

View File

@ -0,0 +1,129 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.Counter;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.SpaceSaving;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.SpaceSavingStateFactory;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import java.nio.ByteBuffer;
import java.util.List;
public class BooleanApproxMostFrequentAccumulator
extends AbstractApproxMostFrequentAccumulator<Boolean> {
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(BooleanApproxMostFrequentAccumulator.class);
public BooleanApproxMostFrequentAccumulator() {}
@Override
public long getEstimatedSize() {
return INSTANCE_SIZE
+ RamUsageEstimator.shallowSizeOfInstance(BooleanApproxMostFrequentAccumulator.class)
+ state.getEstimatedSize();
}
@Override
public TableAccumulator copy() {
return new BooleanApproxMostFrequentAccumulator();
}
@Override
public void addInput(Column[] arguments, AggregationMask mask) {
int maxBuckets = arguments[1].getInt(0);
int capacity = arguments[2].getInt(0);
if (maxBuckets <= 0 || capacity <= 0) {
throw new SemanticException(
"The second and third argument must be greater than 0, but got k="
+ maxBuckets
+ ", capacity="
+ capacity);
}
SpaceSaving<Boolean> spaceSaving = getOrCreateSpaceSaving(state, maxBuckets, capacity);
int positionCount = mask.getPositionCount();
Column valueColumn = arguments[0];
if (mask.isSelectAll()) {
for (int i = 0; i < valueColumn.getPositionCount(); i++) {
if (!valueColumn.isNull(i)) {
spaceSaving.add(valueColumn.getBoolean(i));
}
}
} else {
int[] selectedPositions = mask.getSelectedPositions();
int position;
for (int i = 0; i < positionCount; i++) {
position = selectedPositions[i];
if (!valueColumn.isNull(position)) {
spaceSaving.add(valueColumn.getBoolean(position));
}
}
}
}
@Override
public void addIntermediate(Column argument) {
for (int i = 0; i < argument.getPositionCount(); i++) {
if (!argument.isNull(i)) {
SpaceSaving<Boolean> current =
new SpaceSaving<>(
argument.getBinary(i).getValues(),
BooleanApproxMostFrequentAccumulator::serializeBucket,
BooleanApproxMostFrequentAccumulator::deserializeBucket,
BooleanApproxMostFrequentAccumulator::calculateKeyByte);
state.merge(current);
}
}
}
public static SpaceSaving<Boolean> getOrCreateSpaceSaving(
SpaceSavingStateFactory.SingleSpaceSavingState<Boolean> state, int maxBuckets, int capacity) {
SpaceSaving<Boolean> spaceSaving = state.getSpaceSaving();
if (spaceSaving == null) {
spaceSaving =
new SpaceSaving<>(
maxBuckets,
capacity,
BooleanApproxMostFrequentAccumulator::serializeBucket,
BooleanApproxMostFrequentAccumulator::deserializeBucket,
BooleanApproxMostFrequentAccumulator::calculateKeyByte);
state.setSpaceSaving(spaceSaving);
}
return spaceSaving;
}
public static void serializeBucket(Boolean key, long count, ByteBuffer output) {
ReadWriteIOUtils.write(key, output);
ReadWriteIOUtils.write(count, output);
}
public static void deserializeBucket(ByteBuffer input, SpaceSaving<Boolean> spaceSaving) {
boolean key = ReadWriteIOUtils.readBoolean(input);
long count = ReadWriteIOUtils.readLong(input);
spaceSaving.add(key, count);
}
public static int calculateKeyByte(List<Counter<Boolean>> counters) {
return counters.stream().mapToInt(counter -> 1).sum();
}
}

View File

@ -0,0 +1,129 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.Counter;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.SpaceSaving;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.SpaceSavingStateFactory;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import java.nio.ByteBuffer;
import java.util.List;
public class DoubleApproxMostFrequentAccumulator
extends AbstractApproxMostFrequentAccumulator<Double> {
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(DoubleApproxMostFrequentAccumulator.class);
public DoubleApproxMostFrequentAccumulator() {}
@Override
public long getEstimatedSize() {
return INSTANCE_SIZE
+ RamUsageEstimator.shallowSizeOfInstance(DoubleApproxMostFrequentAccumulator.class)
+ state.getEstimatedSize();
}
@Override
public TableAccumulator copy() {
return new DoubleApproxMostFrequentAccumulator();
}
@Override
public void addInput(Column[] arguments, AggregationMask mask) {
int maxBuckets = arguments[1].getInt(0);
int capacity = arguments[2].getInt(0);
if (maxBuckets <= 0 || capacity <= 0) {
throw new SemanticException(
"The second and third argument must be greater than 0, but got k="
+ maxBuckets
+ ", capacity="
+ capacity);
}
SpaceSaving<Double> spaceSaving = getOrCreateSpaceSaving(state, maxBuckets, capacity);
int positionCount = mask.getPositionCount();
Column valueColumn = arguments[0];
if (mask.isSelectAll()) {
for (int i = 0; i < valueColumn.getPositionCount(); i++) {
if (!valueColumn.isNull(i)) {
spaceSaving.add(valueColumn.getDouble(i));
}
}
} else {
int[] selectedPositions = mask.getSelectedPositions();
int position;
for (int i = 0; i < positionCount; i++) {
position = selectedPositions[i];
if (!valueColumn.isNull(position)) {
spaceSaving.add(valueColumn.getDouble(position));
}
}
}
}
@Override
public void addIntermediate(Column argument) {
for (int i = 0; i < argument.getPositionCount(); i++) {
if (!argument.isNull(i)) {
SpaceSaving<Double> current =
new SpaceSaving<>(
argument.getBinary(i).getValues(),
DoubleApproxMostFrequentAccumulator::serializeBucket,
DoubleApproxMostFrequentAccumulator::deserializeBucket,
DoubleApproxMostFrequentAccumulator::calculateKeyByte);
state.merge(current);
}
}
}
public static SpaceSaving<Double> getOrCreateSpaceSaving(
SpaceSavingStateFactory.SingleSpaceSavingState<Double> state, int maxBuckets, int capacity) {
SpaceSaving<Double> spaceSaving = state.getSpaceSaving();
if (spaceSaving == null) {
spaceSaving =
new SpaceSaving<>(
maxBuckets,
capacity,
DoubleApproxMostFrequentAccumulator::serializeBucket,
DoubleApproxMostFrequentAccumulator::deserializeBucket,
DoubleApproxMostFrequentAccumulator::calculateKeyByte);
state.setSpaceSaving(spaceSaving);
}
return spaceSaving;
}
public static void serializeBucket(Double key, long count, ByteBuffer output) {
ReadWriteIOUtils.write(key, output);
ReadWriteIOUtils.write(count, output);
}
public static void deserializeBucket(ByteBuffer input, SpaceSaving<Double> spaceSaving) {
double key = ReadWriteIOUtils.readDouble(input);
long count = ReadWriteIOUtils.readLong(input);
spaceSaving.add(key, count);
}
public static int calculateKeyByte(List<Counter<Double>> counters) {
return counters.stream().mapToInt(counter -> Double.BYTES).sum();
}
}

View File

@ -0,0 +1,129 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.Counter;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.SpaceSaving;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.SpaceSavingStateFactory;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import java.nio.ByteBuffer;
import java.util.List;
public class FloatApproxMostFrequentAccumulator
extends AbstractApproxMostFrequentAccumulator<Float> {
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(FloatApproxMostFrequentAccumulator.class);
public FloatApproxMostFrequentAccumulator() {}
@Override
public long getEstimatedSize() {
return INSTANCE_SIZE
+ RamUsageEstimator.shallowSizeOfInstance(FloatApproxMostFrequentAccumulator.class)
+ state.getEstimatedSize();
}
@Override
public TableAccumulator copy() {
return new FloatApproxMostFrequentAccumulator();
}
@Override
public void addInput(Column[] arguments, AggregationMask mask) {
int maxBuckets = arguments[1].getInt(0);
int capacity = arguments[2].getInt(0);
if (maxBuckets <= 0 || capacity <= 0) {
throw new SemanticException(
"The second and third argument must be greater than 0, but got k="
+ maxBuckets
+ ", capacity="
+ capacity);
}
SpaceSaving<Float> spaceSaving = getOrCreateSpaceSaving(state, maxBuckets, capacity);
int positionCount = mask.getPositionCount();
Column valueColumn = arguments[0];
if (mask.isSelectAll()) {
for (int i = 0; i < valueColumn.getPositionCount(); i++) {
if (!valueColumn.isNull(i)) {
spaceSaving.add(valueColumn.getFloat(i));
}
}
} else {
int[] selectedPositions = mask.getSelectedPositions();
int position;
for (int i = 0; i < positionCount; i++) {
position = selectedPositions[i];
if (!valueColumn.isNull(position)) {
spaceSaving.add(valueColumn.getFloat(position));
}
}
}
}
@Override
public void addIntermediate(Column argument) {
for (int i = 0; i < argument.getPositionCount(); i++) {
if (!argument.isNull(i)) {
SpaceSaving<Float> current =
new SpaceSaving<>(
argument.getBinary(i).getValues(),
FloatApproxMostFrequentAccumulator::serializeBucket,
FloatApproxMostFrequentAccumulator::deserializeBucket,
FloatApproxMostFrequentAccumulator::calculateKeyByte);
state.merge(current);
}
}
}
public static SpaceSaving<Float> getOrCreateSpaceSaving(
SpaceSavingStateFactory.SingleSpaceSavingState<Float> state, int maxBuckets, int capacity) {
SpaceSaving<Float> spaceSaving = state.getSpaceSaving();
if (spaceSaving == null) {
spaceSaving =
new SpaceSaving<>(
maxBuckets,
capacity,
FloatApproxMostFrequentAccumulator::serializeBucket,
FloatApproxMostFrequentAccumulator::deserializeBucket,
FloatApproxMostFrequentAccumulator::calculateKeyByte);
state.setSpaceSaving(spaceSaving);
}
return spaceSaving;
}
public static void serializeBucket(Float key, long count, ByteBuffer output) {
ReadWriteIOUtils.write(key, output);
ReadWriteIOUtils.write(count, output);
}
public static void deserializeBucket(ByteBuffer input, SpaceSaving<Float> spaceSaving) {
float key = ReadWriteIOUtils.readFloat(input);
long count = ReadWriteIOUtils.readLong(input);
spaceSaving.add(key, count);
}
public static int calculateKeyByte(List<Counter<Float>> counters) {
return counters.stream().mapToInt(counter -> Float.BYTES).sum();
}
}

View File

@ -0,0 +1,129 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.Counter;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.SpaceSaving;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.SpaceSavingStateFactory;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import java.nio.ByteBuffer;
import java.util.List;
public class IntApproxMostFrequentAccumulator
extends AbstractApproxMostFrequentAccumulator<Integer> {
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(IntApproxMostFrequentAccumulator.class);
public IntApproxMostFrequentAccumulator() {}
@Override
public long getEstimatedSize() {
return INSTANCE_SIZE
+ RamUsageEstimator.shallowSizeOfInstance(IntApproxMostFrequentAccumulator.class)
+ state.getEstimatedSize();
}
@Override
public TableAccumulator copy() {
return new IntApproxMostFrequentAccumulator();
}
@Override
public void addInput(Column[] arguments, AggregationMask mask) {
int maxBuckets = arguments[1].getInt(0);
int capacity = arguments[2].getInt(0);
if (maxBuckets <= 0 || capacity <= 0) {
throw new SemanticException(
"The second and third argument must be greater than 0, but got k="
+ maxBuckets
+ ", capacity="
+ capacity);
}
SpaceSaving<Integer> spaceSaving = getOrCreateSpaceSaving(state, maxBuckets, capacity);
int positionCount = mask.getPositionCount();
Column valueColumn = arguments[0];
if (mask.isSelectAll()) {
for (int i = 0; i < valueColumn.getPositionCount(); i++) {
if (!valueColumn.isNull(i)) {
spaceSaving.add(valueColumn.getInt(i));
}
}
} else {
int[] selectedPositions = mask.getSelectedPositions();
int position;
for (int i = 0; i < positionCount; i++) {
position = selectedPositions[i];
if (!valueColumn.isNull(position)) {
spaceSaving.add(valueColumn.getInt(position));
}
}
}
}
@Override
public void addIntermediate(Column argument) {
for (int i = 0; i < argument.getPositionCount(); i++) {
if (!argument.isNull(i)) {
SpaceSaving<Integer> current =
new SpaceSaving<>(
argument.getBinary(i).getValues(),
IntApproxMostFrequentAccumulator::serializeBucket,
IntApproxMostFrequentAccumulator::deserializeBucket,
IntApproxMostFrequentAccumulator::calculateKeyByte);
state.merge(current);
}
}
}
public static SpaceSaving<Integer> getOrCreateSpaceSaving(
SpaceSavingStateFactory.SingleSpaceSavingState<Integer> state, int maxBuckets, int capacity) {
SpaceSaving<Integer> spaceSaving = state.getSpaceSaving();
if (spaceSaving == null) {
spaceSaving =
new SpaceSaving<>(
maxBuckets,
capacity,
IntApproxMostFrequentAccumulator::serializeBucket,
IntApproxMostFrequentAccumulator::deserializeBucket,
IntApproxMostFrequentAccumulator::calculateKeyByte);
state.setSpaceSaving(spaceSaving);
}
return spaceSaving;
}
public static void serializeBucket(Integer key, long count, ByteBuffer output) {
ReadWriteIOUtils.write(key, output);
ReadWriteIOUtils.write(count, output);
}
public static void deserializeBucket(ByteBuffer input, SpaceSaving<Integer> spaceSaving) {
int key = ReadWriteIOUtils.readInt(input);
long count = ReadWriteIOUtils.readLong(input);
spaceSaving.add(key, count);
}
public static int calculateKeyByte(List<Counter<Integer>> counters) {
return counters.stream().mapToInt(counter -> Integer.BYTES).sum();
}
}

View File

@ -0,0 +1,128 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.Counter;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.SpaceSaving;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.SpaceSavingStateFactory;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import java.nio.ByteBuffer;
import java.util.List;
public class LongApproxMostFrequentAccumulator extends AbstractApproxMostFrequentAccumulator<Long> {
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(LongApproxMostFrequentAccumulator.class);
public LongApproxMostFrequentAccumulator() {}
@Override
public long getEstimatedSize() {
return INSTANCE_SIZE
+ RamUsageEstimator.shallowSizeOfInstance(LongApproxMostFrequentAccumulator.class)
+ state.getEstimatedSize();
}
@Override
public TableAccumulator copy() {
return new LongApproxMostFrequentAccumulator();
}
@Override
public void addInput(Column[] arguments, AggregationMask mask) {
int maxBuckets = arguments[1].getInt(0);
int capacity = arguments[2].getInt(0);
if (maxBuckets <= 0 || capacity <= 0) {
throw new SemanticException(
"The second and third argument must be greater than 0, but got k="
+ maxBuckets
+ ", capacity="
+ capacity);
}
SpaceSaving<Long> spaceSaving = getOrCreateSpaceSaving(state, maxBuckets, capacity);
int positionCount = mask.getPositionCount();
Column valueColumn = arguments[0];
if (mask.isSelectAll()) {
for (int i = 0; i < valueColumn.getPositionCount(); i++) {
if (!valueColumn.isNull(i)) {
spaceSaving.add(valueColumn.getLong(i));
}
}
} else {
int[] selectedPositions = mask.getSelectedPositions();
int position;
for (int i = 0; i < positionCount; i++) {
position = selectedPositions[i];
if (!valueColumn.isNull(position)) {
spaceSaving.add(valueColumn.getLong(position));
}
}
}
}
@Override
public void addIntermediate(Column argument) {
for (int i = 0; i < argument.getPositionCount(); i++) {
if (!argument.isNull(i)) {
SpaceSaving<Long> current =
new SpaceSaving<>(
argument.getBinary(i).getValues(),
LongApproxMostFrequentAccumulator::serializeBucket,
LongApproxMostFrequentAccumulator::deserializeBucket,
LongApproxMostFrequentAccumulator::calculateKeyByte);
state.merge(current);
}
}
}
public static SpaceSaving<Long> getOrCreateSpaceSaving(
SpaceSavingStateFactory.SingleSpaceSavingState<Long> state, int maxBuckets, int capacity) {
SpaceSaving<Long> spaceSaving = state.getSpaceSaving();
if (spaceSaving == null) {
spaceSaving =
new SpaceSaving<>(
maxBuckets,
capacity,
LongApproxMostFrequentAccumulator::serializeBucket,
LongApproxMostFrequentAccumulator::deserializeBucket,
LongApproxMostFrequentAccumulator::calculateKeyByte);
state.setSpaceSaving(spaceSaving);
}
return spaceSaving;
}
public static void serializeBucket(Long key, long count, ByteBuffer output) {
ReadWriteIOUtils.write(key, output);
ReadWriteIOUtils.write(count, output);
}
public static void deserializeBucket(ByteBuffer input, SpaceSaving<Long> spaceSaving) {
long key = ReadWriteIOUtils.readLong(input);
long count = ReadWriteIOUtils.readLong(input);
spaceSaving.add(key, count);
}
public static int calculateKeyByte(List<Counter<Long>> counters) {
return counters.stream().mapToInt(counter -> Long.BYTES).sum();
}
}

View File

@ -0,0 +1,21 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate;
import java.nio.ByteBuffer;
public interface ApproxMostFrequentBucketDeserializer<K> {
void deserialize(ByteBuffer input, SpaceSaving<K> spaceSaving);
}

View File

@ -0,0 +1,21 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate;
import java.nio.ByteBuffer;
public interface ApproxMostFrequentBucketSerializer<K> {
void serialize(K key, long count, ByteBuffer output);
}

View File

@ -0,0 +1,71 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
public class Counter<T> implements Externalizable {
protected ListNode2<StreamSummary<T>.Bucket> bucketNode;
protected T item;
protected long count;
protected long error;
/** For de-serialization */
public Counter() {}
public Counter(ListNode2<StreamSummary<T>.Bucket> bucket, T item) {
this.bucketNode = bucket;
this.count = 0;
this.error = 0;
this.item = item;
}
public T getItem() {
return item;
}
public long getCount() {
return count;
}
public long getError() {
return error;
}
@Override
public String toString() {
return item + ":" + count + ':' + error;
}
@SuppressWarnings("unchecked")
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
item = (T) in.readObject();
count = in.readLong();
error = in.readLong();
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(item);
out.writeLong(count);
out.writeLong(error);
}
}

View File

@ -0,0 +1,173 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
public class DoublyLinkedList<T> implements Iterable<T> {
protected int size;
protected ListNode2<T> tail;
protected ListNode2<T> head;
/** Append to head of list */
public ListNode2<T> add(T value) {
ListNode2<T> node = new ListNode2<T>(value);
if (size++ == 0) {
tail = node;
} else {
node.prev = head;
head.next = node;
}
head = node;
return node;
}
/** Prepend to tail of list */
public ListNode2<T> enqueue(T value) {
ListNode2<T> node = new ListNode2<T>(value);
if (size++ == 0) {
head = node;
} else {
node.next = tail;
tail.prev = node;
}
tail = node;
return node;
}
public void add(ListNode2<T> node) {
node.prev = head;
node.next = null;
if (size++ == 0) {
tail = node;
} else {
head.next = node;
}
head = node;
}
public ListNode2<T> addAfter(ListNode2<T> node, T value) {
ListNode2<T> newNode = new ListNode2<T>(value);
addAfter(node, newNode);
return newNode;
}
public void addAfter(ListNode2<T> node, ListNode2<T> newNode) {
newNode.next = node.next;
newNode.prev = node;
node.next = newNode;
if (newNode.next == null) {
head = newNode;
} else {
newNode.next.prev = newNode;
}
size++;
}
public void remove(ListNode2<T> node) {
if (node == tail) {
tail = node.next;
} else {
node.prev.next = node.next;
}
if (node == head) {
head = node.prev;
} else {
node.next.prev = node.prev;
}
size--;
}
public int size() {
return size;
}
@Override
public Iterator<T> iterator() {
return new DoublyLinkedListIterator(this);
}
protected class DoublyLinkedListIterator implements Iterator<T> {
protected DoublyLinkedList<T> list;
protected ListNode2<T> itr;
protected int length;
public DoublyLinkedListIterator(DoublyLinkedList<T> list) {
this.length = list.size;
this.list = list;
this.itr = list.tail;
}
@Override
public boolean hasNext() {
return itr != null;
}
@Override
public T next() {
if (length != list.size) {
throw new ConcurrentModificationException();
}
T next = itr.value;
itr = itr.next;
return next;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
public T first() {
return tail == null ? null : tail.getValue();
}
public T last() {
return head == null ? null : head.getValue();
}
public ListNode2<T> head() {
return head;
}
public ListNode2<T> tail() {
return tail;
}
public boolean isEmpty() {
return size == 0;
}
@SuppressWarnings("unchecked")
public T[] toArray() {
T[] a = (T[]) new Object[size];
int i = 0;
for (T v : this) {
a[i++] = v;
}
return a;
}
}

View File

@ -0,0 +1,31 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate;
import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectOutputStream;
public class ExternalizableUtil {
public static byte[] toBytes(Externalizable o) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(baos);
o.writeExternal(out);
out.flush();
return baos.toByteArray();
}
}

View File

@ -12,7 +12,7 @@
* limitations under the License.
*/
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate;
import org.apache.iotdb.commons.exception.IoTDBRuntimeException;

View File

@ -12,7 +12,7 @@
* limitations under the License.
*/
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.HyperLogLogBigArray;

View File

@ -0,0 +1,43 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate;
import java.util.List;
public interface ITopK<T> {
/**
* offer a single element to the top.
*
* @param element - the element to add to the top
* @return false if the element was already in the top
*/
boolean offer(T element);
/**
* offer a single element to the top and increment the count for that element by incrementCount.
*
* @param element - the element to add to the top
* @param incrementCount - the increment count for the given count
* @return false if the element was already in the top
*/
boolean offer(T element, int incrementCount);
/**
* @param k
* @return top k elements offered (may be an approximation)
*/
List<T> peek(int k);
}

View File

@ -0,0 +1,42 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate;
public class ListNode2<T> {
protected T value;
protected ListNode2<T> prev;
protected ListNode2<T> next;
public ListNode2(T value) {
this.value = value;
}
public ListNode2<T> getPrev() {
return prev;
}
public ListNode2<T> getNext() {
return next;
}
public T getValue() {
return value;
}
public void setValue(T value) {
this.value = value;
}
}

View File

@ -0,0 +1,56 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate;
public class Pair<T1, T2> {
public final T1 left;
public final T2 right;
public Pair(T1 left, T2 right) {
this.left = left;
this.right = right;
}
@Override
public final int hashCode() {
int hashCode = 31 + (left == null ? 0 : left.hashCode());
return 31 * hashCode + (right == null ? 0 : right.hashCode());
}
@Override
public final boolean equals(Object o) {
if (!(o instanceof Pair)) {
return false;
}
Pair that = (Pair) o;
// handles nulls properly
return equal(left, that.left) && equal(right, that.right);
}
// From Apache Licensed guava:
private boolean equal(Object a, Object b) {
return a == b || (a != null && a.equals(b));
}
@Override
public String toString() {
return "(" + left + "," + right + ")";
}
public static <X, Y> Pair<X, Y> create(X x, Y y) {
return new Pair<X, Y>(x, y);
}
}

View File

@ -0,0 +1,147 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate;
import com.google.common.collect.ImmutableMap;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import static java.lang.Math.toIntExact;
import static org.apache.tsfile.utils.RamUsageEstimator.shallowSizeOfInstance;
public class SpaceSaving<K> {
private static final long INSTANCE_SIZE = shallowSizeOfInstance(SpaceSaving.class);
private static final long STREAM_SUMMARY_SIZE = shallowSizeOfInstance(StreamSummary.class);
private static final long LIST_NODE2_SIZE = shallowSizeOfInstance(ListNode2.class);
private static final long COUNTER_SIZE = shallowSizeOfInstance(Counter.class);
private StreamSummary<K> streamSummary;
private final int maxBuckets;
private final int capacity;
// Used to serialize and deserialize buckets for different types of keys
private final ApproxMostFrequentBucketSerializer<K> serializer;
private final ApproxMostFrequentBucketDeserializer<K> deserializer;
// Used to calculate the size of keys in bytes
private final SpaceSavingByteCalculator<K> calculator;
public SpaceSaving(
int maxBuckets,
int capacity,
ApproxMostFrequentBucketSerializer<K> serializer,
ApproxMostFrequentBucketDeserializer<K> deserializer,
SpaceSavingByteCalculator<K> calculator) {
this.streamSummary = new StreamSummary<>(capacity);
this.maxBuckets = maxBuckets;
this.capacity = capacity;
this.serializer = serializer;
this.deserializer = deserializer;
this.calculator = calculator;
}
public SpaceSaving(
byte[] bytes,
ApproxMostFrequentBucketSerializer<K> serializer,
ApproxMostFrequentBucketDeserializer<K> deserializer,
SpaceSavingByteCalculator<K> calculator) {
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
this.maxBuckets = ReadWriteIOUtils.readInt(byteBuffer);
this.capacity = ReadWriteIOUtils.readInt(byteBuffer);
int counterSize = ReadWriteIOUtils.readInt(byteBuffer);
this.streamSummary = new StreamSummary<>(capacity);
this.serializer = serializer;
this.deserializer = deserializer;
this.calculator = calculator;
for (int i = 0; i < counterSize; i++) {
this.deserializer.deserialize(byteBuffer, this);
}
}
public static long getDefaultEstimatedSize() {
return INSTANCE_SIZE
+ STREAM_SUMMARY_SIZE
// Long.BYTES as a proxy for the size of K
+ 50 * (LIST_NODE2_SIZE + COUNTER_SIZE + Long.BYTES);
}
public long getEstimatedSize() {
return INSTANCE_SIZE
+ STREAM_SUMMARY_SIZE
// Long.BYTES as a proxy for the size of K
+ streamSummary.size() * (LIST_NODE2_SIZE + +COUNTER_SIZE + Long.BYTES);
}
public interface BucketConsumer<K> {
void process(K key, long value);
}
public void add(K key) {
streamSummary.offer(key);
}
public void add(K key, long incrementCount) {
streamSummary.offer(key, toIntExact(incrementCount));
}
public void merge(SpaceSaving<K> other) {
List<Counter<K>> counters = other.streamSummary.topK(capacity);
for (Counter<K> counter : counters) {
streamSummary.offer(counter.getItem(), toIntExact(counter.getCount()));
}
}
public void forEachBucket(BucketConsumer<K> consumer) {
List<Counter<K>> counters = streamSummary.topK(maxBuckets);
for (Counter<K> counter : counters) {
consumer.process(counter.getItem(), counter.getCount());
}
}
public Map<K, Long> getBuckets() {
ImmutableMap.Builder<K, Long> buckets = ImmutableMap.builder();
forEachBucket(buckets::put);
return buckets.buildOrThrow();
}
public byte[] serialize() {
List<Counter<K>> counters = streamSummary.topK(capacity);
// Calculate the size of the keys
int keyBytesSize = calculator.calculateBytes(counters);
// maxBucket + capacity + counterSize + keySize + countSize
int estimatedTotalBytes =
Integer.BYTES
+ Integer.BYTES
+ Integer.BYTES
+ keyBytesSize
+ counters.size() * (Long.BYTES + Long.BYTES);
ByteBuffer byteBuffer = ByteBuffer.allocate(estimatedTotalBytes);
ReadWriteIOUtils.write(maxBuckets, byteBuffer);
ReadWriteIOUtils.write(capacity, byteBuffer);
ReadWriteIOUtils.write(counters.size(), byteBuffer);
// Serialize key and counts.
for (Counter<K> counter : counters) {
this.serializer.serialize(counter.getItem(), counter.getCount(), byteBuffer);
}
return byteBuffer.array();
}
public void reset() {
this.streamSummary = new StreamSummary<>(capacity);
}
}

View File

@ -0,0 +1,21 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate;
import java.util.List;
public interface SpaceSavingByteCalculator<K> {
int calculateBytes(List<Counter<K>> counters);
}

View File

@ -0,0 +1,90 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.SpaceSavingBigArray;
import org.apache.tsfile.utils.RamUsageEstimator;
import static java.util.Objects.requireNonNull;
import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.SpaceSaving.getDefaultEstimatedSize;
public class SpaceSavingStateFactory {
public static <T> SingleSpaceSavingState<T> createSingleState() {
return new SingleSpaceSavingState<T>();
}
public static <T> GroupedSpaceSavingState<T> createGroupedState() {
return new GroupedSpaceSavingState<T>();
}
public static class SingleSpaceSavingState<T> {
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(SingleSpaceSavingState.class);
private SpaceSaving<T> spaceSaving;
public SpaceSaving<T> getSpaceSaving() {
return spaceSaving;
}
public void setSpaceSaving(SpaceSaving<T> value) {
this.spaceSaving = value;
}
public long getEstimatedSize() {
return spaceSaving == null
? INSTANCE_SIZE + getDefaultEstimatedSize()
: INSTANCE_SIZE + spaceSaving.getEstimatedSize();
}
public void merge(SpaceSaving<T> other) {
if (this.spaceSaving == null) {
setSpaceSaving(other);
} else {
spaceSaving.merge(other);
}
}
}
public static class GroupedSpaceSavingState<T> {
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(GroupedSpaceSavingState.class);
private SpaceSavingBigArray<T> spaceSavings = new SpaceSavingBigArray<>();
public SpaceSavingBigArray<T> getSpaceSavings() {
return spaceSavings;
}
public void setSpaceSavings(SpaceSavingBigArray<T> value) {
requireNonNull(value, "value is null");
this.spaceSavings = value;
}
public long getEstimatedSize() {
return INSTANCE_SIZE + spaceSavings.sizeOf();
}
public void merge(int groupId, SpaceSaving<T> spaceSaving) {
SpaceSaving<T> existingSpaceSaving = spaceSavings.get(groupId, spaceSaving);
if (!existingSpaceSaving.equals(spaceSaving)) {
existingSpaceSaving.merge(spaceSaving);
}
}
public boolean isEmpty() {
return spaceSavings.isEmpty();
}
}
}

View File

@ -0,0 +1,294 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate;
import java.io.ByteArrayInputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
/**
* Based on the <i>Space-Saving</i> algorithm and the <i>Stream-Summary</i> data structure as
* described in: <i>Efficient Computation of Frequent and Top-k Elements in Data Streams</i> by
* Metwally, Agrawal, and Abbadi
*
* @param <T> type of data in the stream to be summarized
*/
public class StreamSummary<T> implements ITopK<T>, Externalizable {
protected class Bucket {
protected DoublyLinkedList<Counter<T>> counterList;
private long count;
public Bucket(long count) {
this.count = count;
this.counterList = new DoublyLinkedList<Counter<T>>();
}
}
protected int capacity;
private HashMap<T, ListNode2<Counter<T>>> counterMap;
protected DoublyLinkedList<Bucket> bucketList;
/**
* @param capacity maximum size (larger capacities improve accuracy)
*/
public StreamSummary(int capacity) {
this.capacity = capacity;
counterMap = new HashMap<T, ListNode2<Counter<T>>>();
bucketList = new DoublyLinkedList<Bucket>();
}
public int getCapacity() {
return capacity;
}
/**
* Algorithm: <i>Space-Saving</i>
*
* @param item stream element (<i>e</i>)
* @return false if item was already in the stream summary, true otherwise
*/
@Override
public boolean offer(T item) {
return offer(item, 1);
}
/**
* Algorithm: <i>Space-Saving</i>
*
* @param item stream element (<i>e</i>)
* @return false if item was already in the stream summary, true otherwise
*/
@Override
public boolean offer(T item, int incrementCount) {
return offerReturnAll(item, incrementCount).left;
}
/**
* @param item stream element (<i>e</i>)
* @return item dropped from summary if an item was dropped, null otherwise
*/
public T offerReturnDropped(T item, int incrementCount) {
return offerReturnAll(item, incrementCount).right;
}
/**
* @param item stream element (<i>e</i>)
* @return Pair<isNewItem, itemDropped> where isNewItem is the return value of offer() and
* itemDropped is null if no item was dropped
*/
public Pair<Boolean, T> offerReturnAll(T item, int incrementCount) {
ListNode2<Counter<T>> counterNode = counterMap.get(item);
boolean isNewItem = (counterNode == null);
T droppedItem = null;
if (isNewItem) {
if (size() < capacity) {
counterNode =
bucketList
.enqueue(new Bucket(0))
.getValue()
.counterList
.add(new Counter<T>(bucketList.tail(), item));
} else {
Bucket min = bucketList.first();
counterNode = min.counterList.tail();
Counter<T> counter = counterNode.getValue();
droppedItem = counter.item;
counterMap.remove(droppedItem);
counter.item = item;
counter.error = min.count;
}
counterMap.put(item, counterNode);
}
incrementCounter(counterNode, incrementCount);
return new Pair<Boolean, T>(isNewItem, droppedItem);
}
protected void incrementCounter(ListNode2<Counter<T>> counterNode, int incrementCount) {
Counter<T> counter = counterNode.getValue(); // count_i
ListNode2<Bucket> oldNode = counter.bucketNode;
Bucket bucket = oldNode.getValue(); // Let Bucket_i be the bucket of count_i
bucket.counterList.remove(counterNode); // Detach count_i from Bucket_i's child-list
counter.count = counter.count + incrementCount;
// Finding the right bucket for count_i
// Because we allow a single call to increment count more than once, this may not be the
// adjacent bucket.
ListNode2<Bucket> bucketNodePrev = oldNode;
ListNode2<Bucket> bucketNodeNext = bucketNodePrev.getNext();
while (bucketNodeNext != null) {
Bucket bucketNext =
bucketNodeNext.getValue(); // Let Bucket_i^+ be Bucket_i's neighbor of larger value
if (counter.count == bucketNext.count) {
bucketNext.counterList.add(counterNode); // Attach count_i to Bucket_i^+'s child-list
break;
} else if (counter.count > bucketNext.count) {
bucketNodePrev = bucketNodeNext;
bucketNodeNext = bucketNodePrev.getNext(); // Continue hunting for an appropriate bucket
} else {
// A new bucket has to be created
bucketNodeNext = null;
}
}
if (bucketNodeNext == null) {
Bucket bucketNext = new Bucket(counter.count);
bucketNext.counterList.add(counterNode);
bucketNodeNext = bucketList.addAfter(bucketNodePrev, bucketNext);
}
counter.bucketNode = bucketNodeNext;
// Cleaning up
if (bucket.counterList.isEmpty()) // If Bucket_i's child-list is empty
{
bucketList.remove(oldNode); // Detach Bucket_i from the Stream-Summary
}
}
@Override
public List<T> peek(int k) {
List<T> topK = new ArrayList<T>(k);
for (ListNode2<Bucket> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
Bucket b = bNode.getValue();
for (Counter<T> c : b.counterList) {
if (topK.size() == k) {
return topK;
}
topK.add(c.item);
}
}
return topK;
}
public List<Counter<T>> topK(int k) {
List<Counter<T>> topK = new ArrayList<Counter<T>>(k);
for (ListNode2<Bucket> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
Bucket b = bNode.getValue();
for (Counter<T> c : b.counterList) {
if (topK.size() == k) {
return topK;
}
topK.add(c);
}
}
return topK;
}
/**
* @return number of items stored
*/
public int size() {
return counterMap.size();
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append('[');
for (ListNode2<Bucket> bNode = bucketList.head(); bNode != null; bNode = bNode.getPrev()) {
Bucket b = bNode.getValue();
sb.append('{');
sb.append(b.count);
sb.append(":[");
for (Counter<T> c : b.counterList) {
sb.append('{');
sb.append(c.item);
sb.append(':');
sb.append(c.error);
sb.append("},");
}
if (b.counterList.size() > 0) {
sb.deleteCharAt(sb.length() - 1);
}
sb.append("]},");
}
if (bucketList.size() > 0) {
sb.deleteCharAt(sb.length() - 1);
}
sb.append(']');
return sb.toString();
}
@SuppressWarnings("unchecked")
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
this.bucketList = new DoublyLinkedList<Bucket>();
this.capacity = in.readInt();
int size = in.readInt();
this.counterMap = new HashMap<T, ListNode2<Counter<T>>>(size);
Bucket currentBucket = null;
ListNode2<Bucket> currentBucketNode = null;
for (int i = 0; i < size; i++) {
Counter<T> c = (Counter<T>) in.readObject();
if (currentBucket == null || c.count != currentBucket.count) {
currentBucket = new Bucket(c.count);
currentBucketNode = bucketList.add(currentBucket);
}
c.bucketNode = currentBucketNode;
counterMap.put(c.item, currentBucket.counterList.add(c));
}
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
out.writeInt(this.capacity);
out.writeInt(this.size());
for (ListNode2<Bucket> bNode = bucketList.tail(); bNode != null; bNode = bNode.getNext()) {
Bucket b = bNode.getValue();
for (Counter<T> c : b.counterList) {
out.writeObject(c);
}
}
}
/** For de-serialization */
public StreamSummary() {}
/**
* For de-serialization
*
* @param bytes
* @throws IOException
* @throws ClassNotFoundException
*/
public StreamSummary(byte[] bytes) throws IOException, ClassNotFoundException {
fromBytes(bytes);
}
public void fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
readExternal(new ObjectInputStream(new ByteArrayInputStream(bytes)));
}
public byte[] toBytes() throws IOException {
return ExternalizableUtil.toBytes(this);
}
}

View File

@ -0,0 +1,67 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.SpaceSavingStateFactory;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.SpaceSavingBigArray;
import com.google.gson.Gson;
import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.utils.Binary;
import java.nio.charset.StandardCharsets;
public abstract class AbstractGroupedApproxMostFrequentAccumulator<T>
implements GroupedAccumulator {
protected final SpaceSavingStateFactory.GroupedSpaceSavingState<T> state =
SpaceSavingStateFactory.createGroupedState();
public AbstractGroupedApproxMostFrequentAccumulator() {}
@Override
public void setGroupCount(long groupCount) {
state.getSpaceSavings().ensureCapacity(groupCount);
}
@Override
public void evaluateIntermediate(int groupId, ColumnBuilder columnBuilder) {
columnBuilder.writeBinary(new Binary(state.getSpaceSavings().get(groupId).serialize()));
}
@Override
public void evaluateFinal(int groupId, ColumnBuilder columnBuilder) {
Binary result =
new Binary(
new Gson().toJson(state.getSpaceSavings().get(groupId).getBuckets()),
StandardCharsets.UTF_8);
columnBuilder.writeBinary(result);
}
@Override
public void prepareFinal() {}
@Override
public void reset() {
state.getSpaceSavings().reset();
}
public SpaceSavingBigArray<T> getOrCreateSpaceSaving(
SpaceSavingStateFactory.GroupedSpaceSavingState<T> state) {
if (state.isEmpty()) {
state.setSpaceSavings(new SpaceSavingBigArray<>());
}
return state.getSpaceSavings();
}
}

View File

@ -0,0 +1,105 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AggregationMask;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.BinaryApproxMostFrequentAccumulator;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.SpaceSaving;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.SpaceSavingBigArray;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.RamUsageEstimator;
public class BinaryGroupedApproxMostFrequentAccumulator
extends AbstractGroupedApproxMostFrequentAccumulator<Binary> {
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(BinaryGroupedApproxMostFrequentAccumulator.class);
@Override
public long getEstimatedSize() {
return INSTANCE_SIZE + state.getEstimatedSize();
}
@Override
public void addInput(int[] groupIds, Column[] arguments, AggregationMask mask) {
int maxBuckets = arguments[1].getInt(0);
int capacity = arguments[2].getInt(0);
if (maxBuckets <= 0 || capacity <= 0) {
throw new SemanticException(
"The second and third argument must be greater than 0, but got k="
+ maxBuckets
+ ", capacity="
+ capacity);
}
SpaceSavingBigArray<Binary> spaceSavingBigArray = getOrCreateSpaceSaving(state);
Column column = arguments[0];
int positionCount = mask.getPositionCount();
if (mask.isSelectAll()) {
for (int i = 0; i < positionCount; i++) {
int groupId = groupIds[i];
SpaceSaving<Binary> spaceSaving =
spaceSavingBigArray.get(
groupId,
maxBuckets,
capacity,
BinaryApproxMostFrequentAccumulator::serializeBucket,
BinaryApproxMostFrequentAccumulator::deserializeBucket,
BinaryApproxMostFrequentAccumulator::calculateKeyByte);
if (!column.isNull(i)) {
spaceSaving.add(column.getBinary(i));
}
}
} else {
int[] selectedPositions = mask.getSelectedPositions();
int position;
int groupId;
for (int i = 0; i < positionCount; i++) {
position = selectedPositions[i];
groupId = groupIds[position];
SpaceSaving<Binary> spaceSaving =
spaceSavingBigArray.get(
groupId,
maxBuckets,
capacity,
BinaryApproxMostFrequentAccumulator::serializeBucket,
BinaryApproxMostFrequentAccumulator::deserializeBucket,
BinaryApproxMostFrequentAccumulator::calculateKeyByte);
if (!column.isNull(position)) {
spaceSaving.add(column.getBinary(position));
}
}
}
}
@Override
public void addIntermediate(int[] groupIds, Column argument) {
for (int i = 0; i < groupIds.length; i++) {
if (!argument.isNull(i)) {
SpaceSaving<Binary> current =
new SpaceSaving<>(
argument.getBinary(i).getValues(),
BinaryApproxMostFrequentAccumulator::serializeBucket,
BinaryApproxMostFrequentAccumulator::deserializeBucket,
BinaryApproxMostFrequentAccumulator::calculateKeyByte);
state.merge(groupIds[i], current);
}
}
}
}

View File

@ -0,0 +1,40 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped;
import com.google.gson.Gson;
import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BytesUtils;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.stream.Collectors;
public class BlobGroupedApproxMostFrequentAccumulator
extends BinaryGroupedApproxMostFrequentAccumulator {
@Override
public void evaluateFinal(int groupId, ColumnBuilder columnBuilder) {
Map<Binary, Long> buckets = state.getSpaceSavings().get(groupId).getBuckets();
Map<String, Long> formatedBuckets =
buckets.entrySet().stream()
.collect(
Collectors.toMap(
entry -> BytesUtils.parseBlobByteArrayToString(entry.getKey().getValues()),
Map.Entry::getValue));
columnBuilder.writeBinary(
new Binary(new Gson().toJson(formatedBuckets), StandardCharsets.UTF_8));
}
}

View File

@ -0,0 +1,104 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AggregationMask;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.BooleanApproxMostFrequentAccumulator;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.SpaceSaving;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.SpaceSavingBigArray;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.utils.RamUsageEstimator;
public class BooleanGroupedApproxMostFrequentAccumulator
extends AbstractGroupedApproxMostFrequentAccumulator<Boolean> {
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(BooleanGroupedApproxMostFrequentAccumulator.class);
@Override
public long getEstimatedSize() {
return INSTANCE_SIZE + state.getEstimatedSize();
}
@Override
public void addInput(int[] groupIds, Column[] arguments, AggregationMask mask) {
int maxBuckets = arguments[1].getInt(0);
int capacity = arguments[2].getInt(0);
if (maxBuckets <= 0 || capacity <= 0) {
throw new SemanticException(
"The second and third argument must be greater than 0, but got k="
+ maxBuckets
+ ", capacity="
+ capacity);
}
SpaceSavingBigArray<Boolean> spaceSavingBigArray = getOrCreateSpaceSaving(state);
Column column = arguments[0];
int positionCount = mask.getPositionCount();
if (mask.isSelectAll()) {
for (int i = 0; i < positionCount; i++) {
int groupId = groupIds[i];
SpaceSaving<Boolean> spaceSaving =
spaceSavingBigArray.get(
groupId,
maxBuckets,
capacity,
BooleanApproxMostFrequentAccumulator::serializeBucket,
BooleanApproxMostFrequentAccumulator::deserializeBucket,
BooleanApproxMostFrequentAccumulator::calculateKeyByte);
if (!column.isNull(i)) {
spaceSaving.add(column.getBoolean(i));
}
}
} else {
int[] selectedPositions = mask.getSelectedPositions();
int position;
int groupId;
for (int i = 0; i < positionCount; i++) {
position = selectedPositions[i];
groupId = groupIds[position];
SpaceSaving<Boolean> spaceSaving =
spaceSavingBigArray.get(
groupId,
maxBuckets,
capacity,
BooleanApproxMostFrequentAccumulator::serializeBucket,
BooleanApproxMostFrequentAccumulator::deserializeBucket,
BooleanApproxMostFrequentAccumulator::calculateKeyByte);
if (!column.isNull(position)) {
spaceSaving.add(column.getBoolean(position));
}
}
}
}
@Override
public void addIntermediate(int[] groupIds, Column argument) {
for (int i = 0; i < groupIds.length; i++) {
if (!argument.isNull(i)) {
SpaceSaving<Boolean> current =
new SpaceSaving<>(
argument.getBinary(i).getValues(),
BooleanApproxMostFrequentAccumulator::serializeBucket,
BooleanApproxMostFrequentAccumulator::deserializeBucket,
BooleanApproxMostFrequentAccumulator::calculateKeyByte);
state.merge(groupIds[i], current);
}
}
}
}

View File

@ -0,0 +1,104 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AggregationMask;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.DoubleApproxMostFrequentAccumulator;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.SpaceSaving;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.SpaceSavingBigArray;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.utils.RamUsageEstimator;
public class DoubleGroupedApproxMostFrequentAccumulator
extends AbstractGroupedApproxMostFrequentAccumulator<Double> {
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(DoubleGroupedApproxMostFrequentAccumulator.class);
@Override
public long getEstimatedSize() {
return INSTANCE_SIZE + state.getEstimatedSize();
}
@Override
public void addInput(int[] groupIds, Column[] arguments, AggregationMask mask) {
int maxBuckets = arguments[1].getInt(0);
int capacity = arguments[2].getInt(0);
if (maxBuckets <= 0 || capacity <= 0) {
throw new SemanticException(
"The second and third argument must be greater than 0, but got k="
+ maxBuckets
+ ", capacity="
+ capacity);
}
SpaceSavingBigArray<Double> spaceSavingBigArray = getOrCreateSpaceSaving(state);
Column column = arguments[0];
int positionCount = mask.getPositionCount();
if (mask.isSelectAll()) {
for (int i = 0; i < positionCount; i++) {
int groupId = groupIds[i];
SpaceSaving<Double> spaceSaving =
spaceSavingBigArray.get(
groupId,
maxBuckets,
capacity,
DoubleApproxMostFrequentAccumulator::serializeBucket,
DoubleApproxMostFrequentAccumulator::deserializeBucket,
DoubleApproxMostFrequentAccumulator::calculateKeyByte);
if (!column.isNull(i)) {
spaceSaving.add(column.getDouble(i));
}
}
} else {
int[] selectedPositions = mask.getSelectedPositions();
int position;
int groupId;
for (int i = 0; i < positionCount; i++) {
position = selectedPositions[i];
groupId = groupIds[position];
SpaceSaving<Double> spaceSaving =
spaceSavingBigArray.get(
groupId,
maxBuckets,
capacity,
DoubleApproxMostFrequentAccumulator::serializeBucket,
DoubleApproxMostFrequentAccumulator::deserializeBucket,
DoubleApproxMostFrequentAccumulator::calculateKeyByte);
if (!column.isNull(position)) {
spaceSaving.add(column.getDouble(position));
}
}
}
}
@Override
public void addIntermediate(int[] groupIds, Column argument) {
for (int i = 0; i < groupIds.length; i++) {
if (!argument.isNull(i)) {
SpaceSaving<Double> current =
new SpaceSaving<>(
argument.getBinary(i).getValues(),
DoubleApproxMostFrequentAccumulator::serializeBucket,
DoubleApproxMostFrequentAccumulator::deserializeBucket,
DoubleApproxMostFrequentAccumulator::calculateKeyByte);
state.merge(groupIds[i], current);
}
}
}
}

View File

@ -0,0 +1,104 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AggregationMask;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.FloatApproxMostFrequentAccumulator;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.SpaceSaving;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.SpaceSavingBigArray;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.utils.RamUsageEstimator;
public class FloatGroupedApproxMostFrequentAccumulator
extends AbstractGroupedApproxMostFrequentAccumulator<Float> {
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(FloatGroupedApproxMostFrequentAccumulator.class);
@Override
public long getEstimatedSize() {
return INSTANCE_SIZE + state.getEstimatedSize();
}
@Override
public void addInput(int[] groupIds, Column[] arguments, AggregationMask mask) {
int maxBuckets = arguments[1].getInt(0);
int capacity = arguments[2].getInt(0);
if (maxBuckets <= 0 || capacity <= 0) {
throw new SemanticException(
"The second and third argument must be greater than 0, but got k="
+ maxBuckets
+ ", capacity="
+ capacity);
}
SpaceSavingBigArray<Float> spaceSavingBigArray = getOrCreateSpaceSaving(state);
Column column = arguments[0];
int positionCount = mask.getPositionCount();
if (mask.isSelectAll()) {
for (int i = 0; i < positionCount; i++) {
int groupId = groupIds[i];
SpaceSaving<Float> spaceSaving =
spaceSavingBigArray.get(
groupId,
maxBuckets,
capacity,
FloatApproxMostFrequentAccumulator::serializeBucket,
FloatApproxMostFrequentAccumulator::deserializeBucket,
FloatApproxMostFrequentAccumulator::calculateKeyByte);
if (!column.isNull(i)) {
spaceSaving.add(column.getFloat(i));
}
}
} else {
int[] selectedPositions = mask.getSelectedPositions();
int position;
int groupId;
for (int i = 0; i < positionCount; i++) {
position = selectedPositions[i];
groupId = groupIds[position];
SpaceSaving<Float> spaceSaving =
spaceSavingBigArray.get(
groupId,
maxBuckets,
capacity,
FloatApproxMostFrequentAccumulator::serializeBucket,
FloatApproxMostFrequentAccumulator::deserializeBucket,
FloatApproxMostFrequentAccumulator::calculateKeyByte);
if (!column.isNull(position)) {
spaceSaving.add(column.getFloat(position));
}
}
}
}
@Override
public void addIntermediate(int[] groupIds, Column argument) {
for (int i = 0; i < groupIds.length; i++) {
if (!argument.isNull(i)) {
SpaceSaving<Float> current =
new SpaceSaving<>(
argument.getBinary(i).getValues(),
FloatApproxMostFrequentAccumulator::serializeBucket,
FloatApproxMostFrequentAccumulator::deserializeBucket,
FloatApproxMostFrequentAccumulator::calculateKeyByte);
state.merge(groupIds[i], current);
}
}
}
}

View File

@ -15,8 +15,8 @@
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AggregationMask;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.HyperLogLog;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.HyperLogLogStateFactory;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.HyperLogLog;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.HyperLogLogStateFactory;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.HyperLogLogBigArray;
import org.apache.tsfile.block.column.Column;
@ -26,7 +26,7 @@ import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.write.UnSupportedDataTypeException;
import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.HyperLogLog.DEFAULT_STANDARD_ERROR;
import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.HyperLogLog.DEFAULT_STANDARD_ERROR;
public class GroupedApproxCountDistinctAccumulator implements GroupedAccumulator {
private static final long INSTANCE_SIZE =

View File

@ -0,0 +1,104 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AggregationMask;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.IntApproxMostFrequentAccumulator;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.SpaceSaving;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.SpaceSavingBigArray;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.utils.RamUsageEstimator;
public class IntGroupedApproxMostFrequentAccumulator
extends AbstractGroupedApproxMostFrequentAccumulator<Integer> {
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(IntGroupedApproxMostFrequentAccumulator.class);
@Override
public long getEstimatedSize() {
return INSTANCE_SIZE + state.getEstimatedSize();
}
@Override
public void addInput(int[] groupIds, Column[] arguments, AggregationMask mask) {
int maxBuckets = arguments[1].getInt(0);
int capacity = arguments[2].getInt(0);
if (maxBuckets <= 0 || capacity <= 0) {
throw new SemanticException(
"The second and third argument must be greater than 0, but got k="
+ maxBuckets
+ ", capacity="
+ capacity);
}
SpaceSavingBigArray<Integer> spaceSavingBigArray = getOrCreateSpaceSaving(state);
Column column = arguments[0];
int positionCount = mask.getPositionCount();
if (mask.isSelectAll()) {
for (int i = 0; i < positionCount; i++) {
int groupId = groupIds[i];
SpaceSaving<Integer> spaceSaving =
spaceSavingBigArray.get(
groupId,
maxBuckets,
capacity,
IntApproxMostFrequentAccumulator::serializeBucket,
IntApproxMostFrequentAccumulator::deserializeBucket,
IntApproxMostFrequentAccumulator::calculateKeyByte);
if (!column.isNull(i)) {
spaceSaving.add(column.getInt(i));
}
}
} else {
int[] selectedPositions = mask.getSelectedPositions();
int position;
int groupId;
for (int i = 0; i < positionCount; i++) {
position = selectedPositions[i];
groupId = groupIds[position];
SpaceSaving<Integer> spaceSaving =
spaceSavingBigArray.get(
groupId,
maxBuckets,
capacity,
IntApproxMostFrequentAccumulator::serializeBucket,
IntApproxMostFrequentAccumulator::deserializeBucket,
IntApproxMostFrequentAccumulator::calculateKeyByte);
if (!column.isNull(position)) {
spaceSaving.add(column.getInt(position));
}
}
}
}
@Override
public void addIntermediate(int[] groupIds, Column argument) {
for (int i = 0; i < groupIds.length; i++) {
if (!argument.isNull(i)) {
SpaceSaving<Integer> current =
new SpaceSaving<>(
argument.getBinary(i).getValues(),
IntApproxMostFrequentAccumulator::serializeBucket,
IntApproxMostFrequentAccumulator::deserializeBucket,
IntApproxMostFrequentAccumulator::calculateKeyByte);
state.merge(groupIds[i], current);
}
}
}
}

View File

@ -0,0 +1,104 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AggregationMask;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.LongApproxMostFrequentAccumulator;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.SpaceSaving;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.SpaceSavingBigArray;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.utils.RamUsageEstimator;
public class LongGroupedApproxMostFrequentAccumulator
extends AbstractGroupedApproxMostFrequentAccumulator<Long> {
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(LongGroupedApproxMostFrequentAccumulator.class);
@Override
public long getEstimatedSize() {
return INSTANCE_SIZE + state.getEstimatedSize();
}
@Override
public void addInput(int[] groupIds, Column[] arguments, AggregationMask mask) {
int maxBuckets = arguments[1].getInt(0);
int capacity = arguments[2].getInt(0);
if (maxBuckets <= 0 || capacity <= 0) {
throw new SemanticException(
"The second and third argument must be greater than 0, but got k="
+ maxBuckets
+ ", capacity="
+ capacity);
}
SpaceSavingBigArray<Long> spaceSavingBigArray = getOrCreateSpaceSaving(state);
Column column = arguments[0];
int positionCount = mask.getPositionCount();
if (mask.isSelectAll()) {
for (int i = 0; i < positionCount; i++) {
int groupId = groupIds[i];
SpaceSaving<Long> spaceSaving =
spaceSavingBigArray.get(
groupId,
maxBuckets,
capacity,
LongApproxMostFrequentAccumulator::serializeBucket,
LongApproxMostFrequentAccumulator::deserializeBucket,
LongApproxMostFrequentAccumulator::calculateKeyByte);
if (!column.isNull(i)) {
spaceSaving.add(column.getLong(i));
}
}
} else {
int[] selectedPositions = mask.getSelectedPositions();
int position;
int groupId;
for (int i = 0; i < positionCount; i++) {
position = selectedPositions[i];
groupId = groupIds[position];
SpaceSaving<Long> spaceSaving =
spaceSavingBigArray.get(
groupId,
maxBuckets,
capacity,
LongApproxMostFrequentAccumulator::serializeBucket,
LongApproxMostFrequentAccumulator::deserializeBucket,
LongApproxMostFrequentAccumulator::calculateKeyByte);
if (!column.isNull(position)) {
spaceSaving.add(column.getLong(position));
}
}
}
}
@Override
public void addIntermediate(int[] groupIds, Column argument) {
for (int i = 0; i < groupIds.length; i++) {
if (!argument.isNull(i)) {
SpaceSaving<Long> current =
new SpaceSaving<>(
argument.getBinary(i).getValues(),
LongApproxMostFrequentAccumulator::serializeBucket,
LongApproxMostFrequentAccumulator::deserializeBucket,
LongApproxMostFrequentAccumulator::calculateKeyByte);
state.merge(groupIds[i], current);
}
}
}
}

View File

@ -14,7 +14,7 @@
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.HyperLogLog;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.HyperLogLog;
import static org.apache.tsfile.utils.RamUsageEstimator.shallowSizeOf;
import static org.apache.tsfile.utils.RamUsageEstimator.shallowSizeOfInstance;

View File

@ -0,0 +1,93 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.ApproxMostFrequentBucketDeserializer;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.ApproxMostFrequentBucketSerializer;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.SpaceSaving;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.SpaceSavingByteCalculator;
import static org.apache.tsfile.utils.RamUsageEstimator.shallowSizeOf;
import static org.apache.tsfile.utils.RamUsageEstimator.shallowSizeOfInstance;
public class SpaceSavingBigArray<T> {
private static final long INSTANCE_SIZE = shallowSizeOfInstance(SpaceSavingBigArray.class);
private final ObjectBigArray<SpaceSaving<T>> array;
private long sizeOfSpaceSaving;
public SpaceSavingBigArray() {
array = new ObjectBigArray<>();
}
public long sizeOf() {
return INSTANCE_SIZE + shallowSizeOf(array) + sizeOfSpaceSaving;
}
public SpaceSaving<T> get(long index) {
return array.get(index);
}
public SpaceSaving<T> get(
long index,
int maxBuckets,
int capacity,
ApproxMostFrequentBucketSerializer<T> serializer,
ApproxMostFrequentBucketDeserializer<T> deserializer,
SpaceSavingByteCalculator<T> calculator) {
return get(
index, new SpaceSaving<T>(maxBuckets, capacity, serializer, deserializer, calculator));
}
public SpaceSaving<T> get(long index, SpaceSaving<T> spaceSaving) {
SpaceSaving<T> result = array.get(index);
if (result == null) {
set(index, spaceSaving);
return spaceSaving;
}
return result;
}
public void set(long index, SpaceSaving<T> spaceSaving) {
updateRetainedSize(index, spaceSaving);
array.set(index, spaceSaving);
}
public boolean isEmpty() {
return sizeOfSpaceSaving == 0;
}
public void ensureCapacity(long length) {
array.ensureCapacity(length);
}
public void updateRetainedSize(long index, SpaceSaving<T> value) {
SpaceSaving<T> spaceSaving = array.get(index);
if (spaceSaving != null) {
sizeOfSpaceSaving -= spaceSaving.getEstimatedSize();
}
if (value != null) {
sizeOfSpaceSaving += value.getEstimatedSize();
}
}
public void reset() {
array.forEach(
item -> {
if (item != null) {
item.reset();
}
});
}
}

View File

@ -661,7 +661,14 @@ public class TableMetadataImpl implements Metadata {
"Second argument of Aggregate functions [%s] should be numberic type and do not use expression",
functionName));
}
break;
case SqlConstant.APPROX_MOST_FREQUENT:
if (argumentTypes.size() != 3) {
throw new SemanticException(
String.format(
"Aggregation functions [%s] should only have three arguments", functionName));
}
break;
case SqlConstant.COUNT:
break;
default:
@ -695,6 +702,8 @@ public class TableMetadataImpl implements Metadata {
case SqlConstant.VAR_POP:
case SqlConstant.VAR_SAMP:
return DOUBLE;
case SqlConstant.APPROX_MOST_FREQUENT:
return STRING;
default:
// ignore
}

View File

@ -273,6 +273,7 @@ import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.GroupingSe
import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.QualifiedName.mapIdentifier;
import static org.apache.iotdb.db.utils.TimestampPrecisionUtils.currPrecision;
import static org.apache.iotdb.db.utils.constant.SqlConstant.APPROX_COUNT_DISTINCT;
import static org.apache.iotdb.db.utils.constant.SqlConstant.APPROX_MOST_FREQUENT;
import static org.apache.iotdb.db.utils.constant.SqlConstant.FIRST_AGGREGATION;
import static org.apache.iotdb.db.utils.constant.SqlConstant.FIRST_BY_AGGREGATION;
import static org.apache.iotdb.db.utils.constant.SqlConstant.LAST_AGGREGATION;
@ -2986,11 +2987,20 @@ public class AstBuilder extends RelationalSqlBaseVisitor<Node> {
throw new SemanticException(
"The second argument of 'approx_count_distinct' function must be a literal");
}
} else if (name.toString().equalsIgnoreCase(APPROX_MOST_FREQUENT)) {
if (!isNumericLiteral(arguments.get(1)) || !isNumericLiteral(arguments.get(2))) {
throw new SemanticException(
"The second and third argument of 'approx_most_frequent' function must be numeric literal");
}
}
return new FunctionCall(getLocation(ctx), name, distinct, arguments);
}
public boolean isNumericLiteral(Expression expression) {
return expression instanceof LongLiteral || expression instanceof DoubleLiteral;
}
@Override
public Node visitDateBinGapFill(RelationalSqlParser.DateBinGapFillContext ctx) {
TimeDuration timeDuration = DateTimeUtils.constructTimeDuration(ctx.timeDuration().getText());

View File

@ -79,6 +79,7 @@ public class SqlConstant {
public static final String COUNT_TIME_HEADER = "count_time(*)";
public static final String APPROX_COUNT_DISTINCT = "approx_count_distinct";
public static final String APPROX_MOST_FREQUENT = "approx_most_frequent";
// names of scalar functions
public static final String DIFF = "diff";

View File

@ -58,7 +58,7 @@ public enum TableBuiltinAggregationFunction {
VAR_POP("var_pop"),
VAR_SAMP("var_samp"),
APPROX_COUNT_DISTINCT("approx_count_distinct"),
;
APPROX_MOST_FREQUENT("approx_most_frequent");
private final String functionName;

View File

@ -285,7 +285,8 @@ enum TAggregationType {
MIN,
MAX,
COUNT_ALL,
APPROX_COUNT_DISTINCT
APPROX_COUNT_DISTINCT,
APPROX_MOST_FREQUENT
}
struct TShowConfigurationTemplateResp {