From aa55f22b102d99eeffe7dab747447eb5251db693 Mon Sep 17 00:00:00 2001 From: apete Date: Sun, 8 Dec 2024 07:01:34 +0100 Subject: [PATCH] clustering and r/w --- CHANGELOG.md | 37 ++++ src/main/java/org/ojalgo/array/Array1D.java | 145 +------------ src/main/java/org/ojalgo/array/ArrayR032.java | 4 +- src/main/java/org/ojalgo/array/ArrayR064.java | 4 +- src/main/java/org/ojalgo/array/ArrayR256.java | 4 +- src/main/java/org/ojalgo/array/ArrayZ008.java | 4 +- src/main/java/org/ojalgo/array/ArrayZ016.java | 4 +- src/main/java/org/ojalgo/array/ArrayZ032.java | 4 +- src/main/java/org/ojalgo/array/ArrayZ064.java | 4 +- .../java/org/ojalgo/array/NumberList.java | 9 + .../java/org/ojalgo/array/ScalarArray.java | 4 +- .../org/ojalgo/concurrent/MultiviewSet.java | 27 +-- .../ojalgo/concurrent/ProcessingService.java | 8 +- .../java/org/ojalgo/data/DataProcessors.java | 33 ++- .../java/org/ojalgo/data/batch/BatchNode.java | 20 +- .../data/cluster/ClusteringAlgorithm.java | 11 + .../data/cluster/GeneralisedKMeans.java | 111 ++++++++++ .../ojalgo/data/cluster/GreedyClustering.java | 82 +++++++ .../java/org/ojalgo/data/cluster/Point.java | 192 +++++++++++++++++ .../data/cluster/PointDistanceCache.java | 104 +++++++++ .../ojalgo/data/cluster/RandomClustering.java | 60 ++++++ .../domain/finance/series/DataFetcher.java | 4 +- .../domain/finance/series/DataSource.java | 12 +- .../data/domain/finance/series/DatePrice.java | 10 +- .../finance/series/FinanceDataReader.java | 16 +- .../java/org/ojalgo/netio/BasicParser.java | 4 +- .../org/ojalgo/netio/DataInterpreter.java | 120 ++++++++++- .../java/org/ojalgo/netio/DataReader.java | 8 +- .../java/org/ojalgo/netio/FromFileReader.java | 131 ++++++++---- .../ManagedReader.java} | 33 ++- .../ManagedWriter.java} | 22 +- .../MappedReader.java} | 26 +-- .../MappedWriter.java} | 22 +- .../QueuedReader.java} | 58 +++-- .../QueuedWriter.java} | 42 ++-- .../org/ojalgo/netio/ReaderWriterBuilder.java | 29 ++- .../ScoredDualWriter.java} | 5 +- 
.../java/org/ojalgo/netio/SegmentedFile.java | 43 ++++ .../SequencedReader.java} | 44 ++-- .../java/org/ojalgo/netio/ShardedFile.java | 20 +- .../ShardedWriter.java} | 45 ++-- .../function => netio}/SupplierIterator.java | 12 +- .../org/ojalgo/netio/TextLineInterpreter.java | 10 +- .../java/org/ojalgo/netio/TextLineReader.java | 13 +- .../java/org/ojalgo/netio/TextLineWriter.java | 5 +- .../java/org/ojalgo/netio/ToFileWriter.java | 142 ++++++++----- .../java/org/ojalgo/random/SampleSet.java | 18 +- src/main/java/org/ojalgo/type/ObjectPool.java | 3 +- .../ojalgo/type/function/AutoConsumer.java | 116 ---------- .../ojalgo/type/function/AutoSupplier.java | 143 ------------- src/test/java/org/ojalgo/TestUtils.java | 4 + .../org/ojalgo/data/cluster/BasicTest.java | 72 +++++++ .../ojalgo/data/cluster/ClusterTests.java} | 18 +- .../org/ojalgo/data/cluster/KaggleTest.java | 127 +++++++++++ .../domain/finance/series/DataParserTest.java | 5 +- .../org/ojalgo/netio/DataInterpreterTest.java | 87 ++++++++ .../org/ojalgo/netio/SegmentedFileTest.java | 2 +- .../service/ServiceIntegrationTest.java | 2 +- src/test/resources/kaggle/Mall_Customers.csv | 201 ++++++++++++++++++ 59 files changed, 1805 insertions(+), 740 deletions(-) create mode 100644 src/main/java/org/ojalgo/data/cluster/ClusteringAlgorithm.java create mode 100644 src/main/java/org/ojalgo/data/cluster/GeneralisedKMeans.java create mode 100644 src/main/java/org/ojalgo/data/cluster/GreedyClustering.java create mode 100644 src/main/java/org/ojalgo/data/cluster/Point.java create mode 100644 src/main/java/org/ojalgo/data/cluster/PointDistanceCache.java create mode 100644 src/main/java/org/ojalgo/data/cluster/RandomClustering.java rename src/main/java/org/ojalgo/{type/function/ManagedSupplier.java => netio/ManagedReader.java} (67%) rename src/main/java/org/ojalgo/{type/function/ManagedConsumer.java => netio/ManagedWriter.java} (74%) rename src/main/java/org/ojalgo/{type/function/MappedSupplier.java => 
netio/MappedReader.java} (70%) rename src/main/java/org/ojalgo/{type/function/MappedConsumer.java => netio/MappedWriter.java} (72%) rename src/main/java/org/ojalgo/{type/function/QueuedSupplier.java => netio/QueuedReader.java} (68%) rename src/main/java/org/ojalgo/{type/function/QueuedConsumer.java => netio/QueuedWriter.java} (72%) rename src/main/java/org/ojalgo/{type/function/ScoredDualConsumer.java => netio/ScoredDualWriter.java} (91%) rename src/main/java/org/ojalgo/{type/function/SequencedSupplier.java => netio/SequencedReader.java} (57%) rename src/main/java/org/ojalgo/{type/function/ShardedConsumer.java => netio/ShardedWriter.java} (70%) rename src/main/java/org/ojalgo/{type/function => netio}/SupplierIterator.java (87%) delete mode 100644 src/main/java/org/ojalgo/type/function/AutoConsumer.java delete mode 100644 src/main/java/org/ojalgo/type/function/AutoSupplier.java create mode 100644 src/test/java/org/ojalgo/data/cluster/BasicTest.java rename src/{main/java/org/ojalgo/type/function/AutoFunctional.java => test/java/org/ojalgo/data/cluster/ClusterTests.java} (71%) create mode 100644 src/test/java/org/ojalgo/data/cluster/KaggleTest.java create mode 100644 src/test/java/org/ojalgo/netio/DataInterpreterTest.java create mode 100644 src/test/resources/kaggle/Mall_Customers.csv diff --git a/CHANGELOG.md b/CHANGELOG.md index 8d304fa70..927c86a5a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,43 @@ Added / Changed / Deprecated / Fixed / Removed / Security > Corresponds to changes in the `develop` branch since the last release +### Added + +#### org.ojalgo.data + +- New package `org.ojalgo.data.cluster` with k-means and greedy clustering algorithms implemented, as well as generalisations, specialisations and combinations of those. +- `DataProcessors` now has a method `Transformation2D newRowsTransformer(Function)` to complement the existing `newColumnsTransformer`. 
+ +#### org.ojalgo.random + +- `SampleSet` gained a couple of methods; `getMidrange()` and `getRange()`. + +### Changed + +#### org.ojalgo.array + +- Sorting is no longer parallel/multi-threaded. The previous implementations made use of the common `ForkJoinPool`. + +#### org.ojalgo.netio + +- The `FromFileReader` and `ToFileWriter` interfaces and their implementations used to extend and delegate to code in the `org.ojalgo.type.function` package. Much of what was in that package has been moved to and merged with stuff in the `org.ojalgo.netio` package. +- The `FromFileReader.Builder` and `ToFileWriter.Builder` builders now use generic "file" types. They used to be implemented in terms of Java's `File`, but can now be anything like `Path` or ojAlgo's own `SegmentedFile`, `ShardedFile` or `InMemoryFile`. +- The `DataInterpreter` gained some additional standard interpreters, as well as utilities to convert back and forth between `byte[]`. + +#### org.ojalgo.random + +- `SampleSet` no longer makes use of parallel/multi-threaded sorting – to avoid making use of the common `ForkJoinPool`. + +#### org.ojalgo.type + +- The `AutoSupplier` and `AutoConsumer` interfaces are removed. They used to provide abstract/generalised functionality for `FromFileReader` and `ToFileWriter` in the `org.ojalgo.netio` package. All features and functionality still exist, but in terms of the more specific/concrete `FromFileReader` and `ToFileWriter`. If you directly referenced any of the various utility methods in `AutoSupplier` or `AutoConsumer` they're now gone. They primarily existed so that `FromFileReader` and `ToFileWriter` could access them (from another package). The features and functionality they provided are now available through other classes in the `org.ojalgo.netio` package – like `FromFileReader.Builder` and `ToFileWriter.Builder`. + +### Fixed + +#### org.ojalgo.data + +- In `DataProcessors`, the `CENTER_AND_SCALE` transformation didn't do exactly what the documentation said it would. 
That's been fixed. + ## [55.0.2] – 2024-11-30 ### Added diff --git a/src/main/java/org/ojalgo/array/Array1D.java b/src/main/java/org/ojalgo/array/Array1D.java index ab57ab2d9..6fd7ffda4 100644 --- a/src/main/java/org/ojalgo/array/Array1D.java +++ b/src/main/java/org/ojalgo/array/Array1D.java @@ -24,9 +24,6 @@ import java.math.BigDecimal; import java.util.AbstractList; import java.util.RandomAccess; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.RecursiveAction; import org.ojalgo.ProgrammingError; import org.ojalgo.function.BinaryFunction; @@ -124,132 +121,6 @@ public Array1D wrap(final BasicArray array) { } - static final class QuickAscendingSorter extends RecursiveAction { - - private static final long serialVersionUID = 1L; - - private final long high; - private final long low; - private final Array1D myArray; - - private QuickAscendingSorter(final Array1D array, final long low, final long high) { - super(); - myArray = array; - this.low = low; - this.high = high; - } - - QuickAscendingSorter(final Array1D array) { - this(array, 0L, array.count() - 1L); - } - - @Override - protected void compute() { - - long i = low, j = high; - - double pivot = myArray.doubleValue(low + (high - low) / 2); - - while (i <= j) { - - while (myArray.doubleValue(i) < pivot) { - i++; - } - while (myArray.doubleValue(j) > pivot) { - j--; - } - - if (i <= j) { - myArray.exchange(i, j); - i++; - j--; - } - } - - QuickAscendingSorter tmpPartL = null; - QuickAscendingSorter tmpPartH = null; - - if (low < j) { - tmpPartL = new QuickAscendingSorter(myArray, low, j); - tmpPartL.fork(); - } - if (i < high) { - tmpPartH = new QuickAscendingSorter(myArray, i, high); - tmpPartH.fork(); - } - if (tmpPartL != null) { - tmpPartL.join(); - } - if (tmpPartH != null) { - tmpPartH.join(); - } - } - - } - - static final class QuickDescendingSorter extends RecursiveAction { - - private static final long serialVersionUID = 1L; - - 
private final long high; - private final long low; - private final Array1D myArray; - - private QuickDescendingSorter(final Array1D array, final long low, final long high) { - super(); - myArray = array; - this.low = low; - this.high = high; - } - - QuickDescendingSorter(final Array1D array) { - this(array, 0L, array.count() - 1L); - } - - @Override - protected void compute() { - - long i = low, j = high; - - double pivot = myArray.doubleValue(low + (high - low) / 2); - - while (i <= j) { - - while (myArray.doubleValue(i) > pivot) { - i++; - } - while (myArray.doubleValue(j) < pivot) { - j--; - } - - if (i <= j) { - myArray.exchange(i, j); - i++; - j--; - } - } - - QuickDescendingSorter tmpPartL = null; - QuickDescendingSorter tmpPartH = null; - - if (low < j) { - tmpPartL = new QuickDescendingSorter(myArray, low, j); - tmpPartL.fork(); - } - if (i < high) { - tmpPartH = new QuickDescendingSorter(myArray, i, high); - tmpPartH.fork(); - } - if (tmpPartL != null) { - tmpPartL.join(); - } - if (tmpPartH != null) { - tmpPartH.join(); - } - } - - } - public static final Factory C128 = Array1D.factory(ArrayC128.FACTORY); public static final Factory H256 = Array1D.factory(ArrayH256.FACTORY); public static final Factory Q128 = Array1D.factory(ArrayQ128.FACTORY); @@ -674,13 +545,7 @@ public void sortAscending() { } else { - //this.sortAscending(0L, this.count() - 1L); - - try { - ForkJoinPool.commonPool().submit(new QuickAscendingSorter(this)).get(); - } catch (InterruptedException | ExecutionException exception) { - exception.printStackTrace(); - } + this.sortAscending(0L, this.count() - 1L); } } @@ -693,13 +558,7 @@ public void sortDescending() { } else { - //this.sortDescending(0L, this.count() - 1L); - - try { - ForkJoinPool.commonPool().submit(new QuickDescendingSorter(this)).get(); - } catch (InterruptedException | ExecutionException exception) { - exception.printStackTrace(); - } + this.sortDescending(0L, this.count() - 1L); } } diff --git 
a/src/main/java/org/ojalgo/array/ArrayR032.java b/src/main/java/org/ojalgo/array/ArrayR032.java index 8a8cf4ec5..b516bcf6f 100644 --- a/src/main/java/org/ojalgo/array/ArrayR032.java +++ b/src/main/java/org/ojalgo/array/ArrayR032.java @@ -217,13 +217,13 @@ public short shortValue(final int index) { @Override public void sortAscending() { - Arrays.parallelSort(data); + Arrays.sort(data); } @Override public void sortDescending() { CorePrimitiveOperation.negate(data, 0, data.length, 1, data); - Arrays.parallelSort(data); + Arrays.sort(data); CorePrimitiveOperation.negate(data, 0, data.length, 1, data); } diff --git a/src/main/java/org/ojalgo/array/ArrayR064.java b/src/main/java/org/ojalgo/array/ArrayR064.java index 2afa84776..534d27b37 100644 --- a/src/main/java/org/ojalgo/array/ArrayR064.java +++ b/src/main/java/org/ojalgo/array/ArrayR064.java @@ -221,13 +221,13 @@ public short shortValue(final int index) { @Override public void sortAscending() { - Arrays.parallelSort(data); + Arrays.sort(data); } @Override public void sortDescending() { CorePrimitiveOperation.negate(data, 0, data.length, 1, data); - Arrays.parallelSort(data); + Arrays.sort(data); CorePrimitiveOperation.negate(data, 0, data.length, 1, data); } diff --git a/src/main/java/org/ojalgo/array/ArrayR256.java b/src/main/java/org/ojalgo/array/ArrayR256.java index 22742329d..a48110458 100644 --- a/src/main/java/org/ojalgo/array/ArrayR256.java +++ b/src/main/java/org/ojalgo/array/ArrayR256.java @@ -131,12 +131,12 @@ public short shortValue(final int index) { @Override public void sortAscending() { - Arrays.parallelSort(data); + Arrays.sort(data); } @Override public void sortDescending() { - Arrays.parallelSort(data, Comparator.reverseOrder()); + Arrays.sort(data, Comparator.reverseOrder()); } @Override diff --git a/src/main/java/org/ojalgo/array/ArrayZ008.java b/src/main/java/org/ojalgo/array/ArrayZ008.java index 35bc3979c..6eb10c944 100644 --- a/src/main/java/org/ojalgo/array/ArrayZ008.java +++ 
b/src/main/java/org/ojalgo/array/ArrayZ008.java @@ -154,13 +154,13 @@ public void set(final int index, final long value) { @Override public void sortAscending() { - Arrays.parallelSort(data); + Arrays.sort(data); } @Override public void sortDescending() { CorePrimitiveOperation.negate(data, 0, data.length, 1, data); - Arrays.parallelSort(data); + Arrays.sort(data); CorePrimitiveOperation.negate(data, 0, data.length, 1, data); } diff --git a/src/main/java/org/ojalgo/array/ArrayZ016.java b/src/main/java/org/ojalgo/array/ArrayZ016.java index fa79be314..d170efb94 100644 --- a/src/main/java/org/ojalgo/array/ArrayZ016.java +++ b/src/main/java/org/ojalgo/array/ArrayZ016.java @@ -159,13 +159,13 @@ public short shortValue(final int index) { @Override public void sortAscending() { - Arrays.parallelSort(data); + Arrays.sort(data); } @Override public void sortDescending() { CorePrimitiveOperation.negate(data, 0, data.length, 1, data); - Arrays.parallelSort(data); + Arrays.sort(data); CorePrimitiveOperation.negate(data, 0, data.length, 1, data); } diff --git a/src/main/java/org/ojalgo/array/ArrayZ032.java b/src/main/java/org/ojalgo/array/ArrayZ032.java index c6e5fe4dd..726818e05 100644 --- a/src/main/java/org/ojalgo/array/ArrayZ032.java +++ b/src/main/java/org/ojalgo/array/ArrayZ032.java @@ -164,13 +164,13 @@ public short shortValue(final int index) { @Override public void sortAscending() { - Arrays.parallelSort(data); + Arrays.sort(data); } @Override public void sortDescending() { CorePrimitiveOperation.negate(data, 0, data.length, 1, data); - Arrays.parallelSort(data); + Arrays.sort(data); CorePrimitiveOperation.negate(data, 0, data.length, 1, data); } diff --git a/src/main/java/org/ojalgo/array/ArrayZ064.java b/src/main/java/org/ojalgo/array/ArrayZ064.java index 9e90efd2c..a9df22637 100644 --- a/src/main/java/org/ojalgo/array/ArrayZ064.java +++ b/src/main/java/org/ojalgo/array/ArrayZ064.java @@ -164,13 +164,13 @@ public short shortValue(final int index) { @Override public 
void sortAscending() { - Arrays.parallelSort(data); + Arrays.sort(data); } @Override public void sortDescending() { CorePrimitiveOperation.negate(data, 0, data.length, 1, data); - Arrays.parallelSort(data); + Arrays.sort(data); CorePrimitiveOperation.negate(data, 0, data.length, 1, data); } diff --git a/src/main/java/org/ojalgo/array/NumberList.java b/src/main/java/org/ojalgo/array/NumberList.java index c985edb97..51df0ecd3 100644 --- a/src/main/java/org/ojalgo/array/NumberList.java +++ b/src/main/java/org/ojalgo/array/NumberList.java @@ -109,6 +109,15 @@ public boolean add(final double element) { return true; } + public boolean add(final float element) { + + this.ensureCapacity(); + + myStorage.set(myActualCount++, element); + + return true; + } + @Override public void add(final int index, final N element) { diff --git a/src/main/java/org/ojalgo/array/ScalarArray.java b/src/main/java/org/ojalgo/array/ScalarArray.java index 5b370de71..29fcfeddf 100644 --- a/src/main/java/org/ojalgo/array/ScalarArray.java +++ b/src/main/java/org/ojalgo/array/ScalarArray.java @@ -87,12 +87,12 @@ public short shortValue(final int index) { @Override public final void sortAscending() { - Arrays.parallelSort(data); + Arrays.sort(data); } @Override public final void sortDescending() { - Arrays.parallelSort(data, Comparator.reverseOrder()); + Arrays.sort(data, Comparator.reverseOrder()); } @Override diff --git a/src/main/java/org/ojalgo/concurrent/MultiviewSet.java b/src/main/java/org/ojalgo/concurrent/MultiviewSet.java index 6aaeae7a7..532596b38 100644 --- a/src/main/java/org/ojalgo/concurrent/MultiviewSet.java +++ b/src/main/java/org/ojalgo/concurrent/MultiviewSet.java @@ -21,12 +21,12 @@ */ package org.ojalgo.concurrent; +import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingDeque; import 
java.util.concurrent.PriorityBlockingQueue; /** @@ -99,23 +99,10 @@ boolean remove(final Object entry) { } private final Set myCommonSet = ConcurrentHashMap.newKeySet(); - private final Collection myViews = new LinkedBlockingDeque<>(); - private final boolean myRemoveFromViews; + private final Collection myViews = new ArrayList<>(); public MultiviewSet() { - this(false); //TODO Not decided what the default should be - - } - - /** - * @param removeFromViews Switch if each and every call to {@link #remove(Object)} should also explicitly - * call {@link PrioritisedView#remove(Object)} on each of the views. This is (probably) - * innefficient, and is unnecessary as a call to {@link PrioritisedView#poll()} will assert that - * the returned instance did exist in the main {@link Set}. - */ - public MultiviewSet(final boolean removeFromViews) { super(); - myRemoveFromViews = removeFromViews; } /** @@ -167,16 +154,10 @@ public PrioritisedView newView(final Comparator comparator) { } /** - * Remove an entry from the common {@link Set} and all {@link Queue}:s. + * Remove an entry from the common {@link Set}. 
*/ public boolean remove(final T entry) { - boolean retVal = myCommonSet.remove(entry); - if (myRemoveFromViews && retVal) { - for (MultiviewSet.PrioritisedView view : myViews) { - view.remove(entry); - } - } - return retVal; + return myCommonSet.remove(entry); } public int size() { diff --git a/src/main/java/org/ojalgo/concurrent/ProcessingService.java b/src/main/java/org/ojalgo/concurrent/ProcessingService.java index ede57be6a..454ae521d 100644 --- a/src/main/java/org/ojalgo/concurrent/ProcessingService.java +++ b/src/main/java/org/ojalgo/concurrent/ProcessingService.java @@ -12,7 +12,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Function; @@ -191,7 +191,7 @@ public void process(final Collection work, final int parallelis int concurrency = Math.min(work.size(), parallelism); - Queue queue = new LinkedBlockingDeque<>(work); + Queue queue = new LinkedTransferQueue<>(work); List> tasks = new ArrayList<>(concurrency); for (int i = 0; i < concurrency; i++) { @@ -277,7 +277,7 @@ public > R reduceCombineable( int concurrency = Math.min(work.size(), parallelism); - Queue queue = new LinkedBlockingDeque<>(work); + Queue queue = new LinkedTransferQueue<>(work); List> tasks = new ArrayList<>(concurrency); for (int i = 0; i < concurrency; i++) { @@ -333,7 +333,7 @@ public > R reduceMergeable(final C int concurrency = Math.min(work.size(), parallelism); - Queue queue = new LinkedBlockingDeque<>(work); + Queue queue = new LinkedTransferQueue<>(work); List> tasks = new ArrayList<>(concurrency); for (int i = 0; i < concurrency; i++) { diff --git a/src/main/java/org/ojalgo/data/DataProcessors.java b/src/main/java/org/ojalgo/data/DataProcessors.java index 232469095..e8b6f62d4 100644 --- 
a/src/main/java/org/ojalgo/data/DataProcessors.java +++ b/src/main/java/org/ojalgo/data/DataProcessors.java @@ -36,6 +36,7 @@ import org.ojalgo.structure.Access1D; import org.ojalgo.structure.Access2D; import org.ojalgo.structure.Access2D.ColumnView; +import org.ojalgo.structure.Access2D.RowView; import org.ojalgo.structure.Factory2D; import org.ojalgo.structure.Mutate2D; import org.ojalgo.structure.Transformation2D; @@ -50,26 +51,26 @@ public class DataProcessors { /** - * Variables centered so that their average will be 0.0 + * Variables (columns) centered so that their average will be 0.0 */ public static final Transformation2D CENTER = DataProcessors.newColumnsTransformer(ss -> SUBTRACT.by(ss.getMean())); /** - * Variables will be centered around 0.0 AND scaled to be [-1.0,1.0]. The minimum value will be - * transformed to -1.0 and the maximum to +1.0. + * Variables (columns) will be transformed to be [-1.0,1.0]. The minimum value will be transformed to -1.0 + * and the maximum to +1.0. The midrange will be transformed to 0.0. */ public static final Transformation2D CENTER_AND_SCALE = DataProcessors - .newColumnsTransformer(ss -> SUBTRACT.by(ss.getMean()).andThen(DIVIDE.by((ss.getMaximum() - ss.getMinimum()) / TWO))); + .newColumnsTransformer(ss -> SUBTRACT.by(ss.getMidrange()).andThen(DIVIDE.by(ss.getRange() / TWO))); /** - * Variables scaled to be within [-1.0,1.0] (divide by largest magnitude regardless of sign). If all - * values are positive the range will within [0.0,1.0]. If all are negative the range will be within + * Variables (columns) scaled to be within [-1.0,1.0] (divide by largest magnitude regardless of sign). If + * all values are positive the range will be within [0.0,1.0]. If all are negative the range will be within * [-1.0,0.0] */ public static final Transformation2D SCALE = DataProcessors.newColumnsTransformer(ss -> DIVIDE.by(ss.getLargest())); /** - * Will normalise each variable - replace each value with its standard score. 
+ * Will normalise each variable (columns) - replace each value with its standard score. */ public static final Transformation2D STANDARD_SCORE = DataProcessors .newColumnsTransformer(ss -> SUBTRACT.by(ss.getMean()).andThen(DIVIDE.by(ss.getStandardDeviation()))); @@ -244,7 +245,7 @@ public static > M covariances(final Factory2D *

* The constants {@link #CENTER}, {@link #SCALE}, {@link #CENTER_AND_SCALE} and {@link #STANDARD_SCORE} * are predefined {@link Transformation2D} instances created by calling this method. - * + * * @param definition A {@link Function} that will create a {@link UnaryFunction} from a {@link SampleSet} * to be applied to each column * @return A {@link Transformation2D} that will apply a {@link UnaryFunction} to each column @@ -265,4 +266,20 @@ public > void transform(final T tr }; } + public static Transformation2D newRowsTransformer(final Function> definition) { + + return new Transformation2D<>() { + + public > void transform(final T transformable) { + SampleSet sampleSet = SampleSet.make(); + for (RowView view : transformable.rows()) { + sampleSet.swap(view); + UnaryFunction modifier = definition.apply(sampleSet); + transformable.modifyRow(view.row(), modifier); + } + } + + }; + } + } diff --git a/src/main/java/org/ojalgo/data/batch/BatchNode.java b/src/main/java/org/ojalgo/data/batch/BatchNode.java index f3005ea67..45784a4dd 100644 --- a/src/main/java/org/ojalgo/data/batch/BatchNode.java +++ b/src/main/java/org/ojalgo/data/batch/BatchNode.java @@ -41,8 +41,6 @@ import org.ojalgo.netio.FromFileReader; import org.ojalgo.netio.ShardedFile; import org.ojalgo.netio.ToFileWriter; -import org.ojalgo.type.function.AutoConsumer; -import org.ojalgo.type.function.AutoSupplier; import org.ojalgo.type.function.TwoStepMapper; import org.ojalgo.type.management.MBeanUtils; import org.ojalgo.type.management.Throughput; @@ -218,7 +216,7 @@ public static BatchNode newInstance(final File directory, final DataInter private final IntSupplier myParallelism; private final ProcessingService myProcessor; private final int myQueueCapacity; - private transient Function> myReaderFactory = null; + private transient Function> myReaderFactory = null; private final Throughput myReaderManager; private final ShardedFile myShards; private final Throughput myWriterManger; @@ -250,7 +248,7 @@ public 
void dispose() { myShards.delete(); } - public AutoConsumer newWriter() { + public ToFileWriter newWriter() { return ToFileWriter.newBuilder(myShards).queue(myQueueCapacity).parallelism(myParallelism).statistics(myWriterManger).build(myDistributor, shard -> DataWriter.of(shard, myInterpreter)); } @@ -277,7 +275,7 @@ public void processAll(final Supplier> processorFactory) { * Similar to {@link #processMergeable(Supplier, Consumer)} but the {@code processor} is called with the * aggregator instance itself rather than its extracted results. This corresponds to * {@link TwoStepMapper#Combineable} rather than {@link TwoStepMapper#Mergeable}. - * + * * @see #processMergeable(Supplier, Consumer) */ public > void processCombineable(final Supplier aggregatorFactory, final Consumer processor) { @@ -367,7 +365,7 @@ public > R reduceByMerging(final Supp * {@link TwoStepMapper#merge(Object)} - you can only use this if merging partial (sub)results is * possible. Use a constructor or factory method that produce instances of that type as the argument to * this method. - * + * * @deprecated v54 Use {@link #reduceByMerging(Supplier)} instead */ @Deprecated @@ -375,7 +373,7 @@ public > R reduceMapped(final Supplie return this.reduceByMerging(aggregatorFactory); } - private Function> getReaderFactory() { + private Function> getReaderFactory() { if (myReaderFactory == null) { Function> baseReader = file -> DataReader.of(file, myInterpreter); myReaderFactory = file -> FromFileReader.newBuilder(file).parallelism(1).queue(myQueueCapacity / myParallelism.getAsInt()) @@ -386,7 +384,7 @@ private Function> getReaderFactory() { private void process(final File shard, final Consumer consumer) { - try (AutoSupplier reader = this.newReader(shard)) { + try (FromFileReader reader = this.newReader(shard)) { T item = null; while ((item = reader.read()) != null) { @@ -403,7 +401,7 @@ private > void processAggregators(final File sh A aggregator = aggregatorFactory.get(); // It's a ThreadLocal... 
- try (AutoSupplier reader = this.newReader(shard)) { + try (FromFileReader reader = this.newReader(shard)) { T item = null; while ((item = reader.read()) != null) { @@ -423,7 +421,7 @@ private > void processResults(final File shard, A aggregator = aggregatorFactory.get(); // It's a ThreadLocal... - try (AutoSupplier reader = this.newReader(shard)) { + try (FromFileReader reader = this.newReader(shard)) { T item = null; while ((item = reader.read()) != null) { @@ -439,7 +437,7 @@ private > void processResults(final File shard, } } - AutoSupplier newReader(final File file) { + FromFileReader newReader(final File file) { return this.getReaderFactory().apply(file); } diff --git a/src/main/java/org/ojalgo/data/cluster/ClusteringAlgorithm.java b/src/main/java/org/ojalgo/data/cluster/ClusteringAlgorithm.java new file mode 100644 index 000000000..41b0285fe --- /dev/null +++ b/src/main/java/org/ojalgo/data/cluster/ClusteringAlgorithm.java @@ -0,0 +1,11 @@ +package org.ojalgo.data.cluster; + +import java.util.Collection; +import java.util.List; +import java.util.Set; + +public interface ClusteringAlgorithm { + + List> cluster(final Collection input); + +} diff --git a/src/main/java/org/ojalgo/data/cluster/GeneralisedKMeans.java b/src/main/java/org/ojalgo/data/cluster/GeneralisedKMeans.java new file mode 100644 index 000000000..46c2e9ef6 --- /dev/null +++ b/src/main/java/org/ojalgo/data/cluster/GeneralisedKMeans.java @@ -0,0 +1,111 @@ +package org.ojalgo.data.cluster; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.function.Function; +import java.util.function.ToDoubleBiFunction; + +import org.ojalgo.type.context.NumberContext; + +/** + * Contains the outline of the k-means algorithm, but designed for customisation. + *

+ */ +public final class GeneralisedKMeans implements ClusteringAlgorithm { + + private static final NumberContext ACCURACY = NumberContext.of(4); + + private final Function, T> myCentroidUpdater; + private final ToDoubleBiFunction myDistanceCalculator; + private final Function, List> myCentroidInitialiser; + + /** + * You have to configure how distances are measured and how centroids are derived. + * + * @param centroidInitialiser The initialisation function should return a list of k centroids. This + * function determines 'K'. + * @param centroidUpdater The update function should return a new centroid based on a collection of points + * (the set of items in a cluster). + * @param distanceCalculator A function that calculates the distance between two points. + */ + public GeneralisedKMeans(final Function, List> centroidInitialiser, + final Function, T> centroidUpdater, + final ToDoubleBiFunction distanceCalculator) { + + super(); + + myCentroidInitialiser = centroidInitialiser; + myCentroidUpdater = centroidUpdater; + myDistanceCalculator = distanceCalculator; + } + + @Override + public List> cluster(final Collection input) { + + List centroids = myCentroidInitialiser.apply(input); + + int k = centroids.size(); + int maxIterations = Math.max(5, Math.min((int) Math.round(Math.sqrt(input.size())), 50)); + + List> clusters = new ArrayList<>(k); + for (int i = 0; i < k; i++) { + clusters.add(i, new HashSet<>()); + } + + int iterations = 0; + boolean converged = false; + do { + + converged = true; + + for (Set cluster : clusters) { + cluster.clear(); + } + + for (T point : input) { + + int bestCluster = 0; + double minDistance = Double.MAX_VALUE; + + for (int i = 0; i < k; i++) { + double distance = myDistanceCalculator.applyAsDouble(centroids.get(i), point); + if (distance < minDistance) { + minDistance = distance; + bestCluster = i; + } + } + + clusters.get(bestCluster).add(point); + } + + Set cluster; + T oldCenter; + T newCenter; + for (int i = 0; i < k; i++) { + + 
cluster = clusters.get(i); + + if (!cluster.isEmpty()) { + + oldCenter = centroids.get(i); + newCenter = myCentroidUpdater.apply(cluster); + + converged &= ACCURACY.isZero(myDistanceCalculator.applyAsDouble(oldCenter, newCenter)); + + centroids.set(i, newCenter); + } + } + + } while (++iterations < maxIterations && !converged); + + return clusters; + } + +} diff --git a/src/main/java/org/ojalgo/data/cluster/GreedyClustering.java b/src/main/java/org/ojalgo/data/cluster/GreedyClustering.java new file mode 100644 index 000000000..f95ed7064 --- /dev/null +++ b/src/main/java/org/ojalgo/data/cluster/GreedyClustering.java @@ -0,0 +1,82 @@ +package org.ojalgo.data.cluster; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.function.Function; +import java.util.function.ToDoubleBiFunction; + +/** + * Greedy clustering algorithm. Assigns each item to the nearest centroid, creating new centroids as needed. + * Will only pass through the data once. The centroids are recalculated as the clusters are updated. + */ +public final class GreedyClustering implements ClusteringAlgorithm { + + private final List myCentroids = new ArrayList<>(); + private final Function, T> myCentroidUpdater; + private final ToDoubleBiFunction myDistanceCalculator; + private final double myDistanceThreshold; + + /** + * @param centroidUpdater The update function should return a new centroid based on a collection of points + * (the set of items in a cluster). + * @param distanceCalculator A function that calculates the distance between two points. + * @param distanceThreshold The maximum distance between a point and a centroid for the point to be + * assigned to that cluster. The points are always assigned to the nearest centroid among the + * already existing clusters. This threshold determines when a new cluster should be created. 
+ */ + public GreedyClustering(final Function, T> centroidUpdater, final ToDoubleBiFunction distanceCalculator, + final double distanceThreshold) { + + super(); + + myCentroidUpdater = centroidUpdater; + myDistanceCalculator = distanceCalculator; + myDistanceThreshold = distanceThreshold; + } + + @Override + public List> cluster(final Collection input) { + + List> clusters = new ArrayList<>(); + myCentroids.clear(); + + for (T point : input) { + + int indexOfBestExisting = -1; + double minDistance = Double.MAX_VALUE; + + for (int i = 0; i < myCentroids.size(); i++) { + T centroid = myCentroids.get(i); + + double distance = myDistanceCalculator.applyAsDouble(point, centroid); + if (distance <= myDistanceThreshold && distance < minDistance) { + minDistance = distance; + indexOfBestExisting = i; + } + } + + if (indexOfBestExisting >= 0) { + + Set cluster = clusters.get(indexOfBestExisting); + cluster.add(point); + myCentroids.set(indexOfBestExisting, myCentroidUpdater.apply(cluster)); + + } else { + + Set newCluster = new HashSet<>(); + newCluster.add(point); + clusters.add(newCluster); + myCentroids.add(point); + } + } + + return clusters; + } + + List getCentroids() { + return myCentroids; + } +} diff --git a/src/main/java/org/ojalgo/data/cluster/Point.java b/src/main/java/org/ojalgo/data/cluster/Point.java new file mode 100644 index 000000000..61f8a02d7 --- /dev/null +++ b/src/main/java/org/ojalgo/data/cluster/Point.java @@ -0,0 +1,192 @@ +package org.ojalgo.data.cluster; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; + +import org.ojalgo.function.constant.PrimitiveMath; + +public final class Point implements Comparable { + + /** + * Primarily used when constructing test cases and similar. For real world applications you should use + * {@link Point#convert(List, Function)} instead. 
+ */ + public static final class Factory { + + private final int myDimensions; + private final AtomicInteger myNextID = new AtomicInteger(); + + public Factory(final int dimensions) { + super(); + myDimensions = dimensions; + } + + public Point newPoint(final float... coordinates) { + if (coordinates.length != myDimensions) { + throw new IllegalArgumentException(); + } + return new Point(myNextID.getAndIncrement(), coordinates); + } + + public void reset() { + myNextID.set(0); + } + + } + + /** + * Essentially works like this: + *
 <ol> + *
 <li>Calculate, and store, distances between all the points (to enable statistical analysis, and speed + * up the following steps) + *
 <li>Perform statistical analysis of the distances to determine a suitable distance threshold (to get + * the threshold needed for greedy clustering) + *
 <li>Perform greedy clustering to get an initial set of centroids + *
 <li>Filter out centroids/clusters corresponding to extremely small clusters (This determines the 'k') + *
 <li>Perform k-means clustering to refine the clusters and centroids + * </ol>
+ */ + public static List> cluster(final Collection input) { + + PointDistanceCache cache = new PointDistanceCache(); + cache.setup(input, Point::distance); + + GeneralisedKMeans clusterar = new GeneralisedKMeans<>(cache::initialiser, cache::centroid, cache::distance); + + return clusterar.cluster(input); + } + + /** + * Converts a list of objects to a list of points using the provided converter to derive the coordinates. + * There will be one point for each object in the input list, at matching positions. Further the point id + * will be the index of the object in the input list. + * + * @param The type of the objects in the input list + * @param input The list of objects to convert + * @param converter The function to convert the objects to coordinates + * @return A list of points + */ + public static List convert(final List input, final Function converter) { + int size = input.size(); + List points = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + points.add(new Point(i, converter.apply(input.get(i)))); + } + return points; + } + + public static Point mean(final Collection points) { + + double[] sum = null; + int length = 0; + for (Point point : points) { + if (sum == null) { + length = point.coordinates.length; + sum = new double[length]; + } + for (int i = 0; i < length; i++) { + sum[i] += point.coordinates[i]; + } + } + + float[] retVal = new float[length]; + + for (int i = 0; i < length; i++) { + retVal[i] = (float) (sum[i] / points.size()); + } + + return new Point(-1, retVal); + } + + public static Point.Factory newFactory(final int dimensions) { + return new Point.Factory(dimensions); + } + + /** + * Greedy algorithm. The distance measurement is the same as for k-means ({@link #distance(Point)}) and + * the threshold must match that. 
+ */ + public static ClusteringAlgorithm newGreedyClusterer(final double distanceThreshold) { + return new GreedyClustering<>(Point::mean, Point::distance, distanceThreshold); + } + + /** + * Standard k-means clustering + */ + public static ClusteringAlgorithm newKMeansClusterer(final int k) { + RandomClustering initialiser = new RandomClustering<>(k); + return new GeneralisedKMeans<>(initialiser::centroids, Point::mean, Point::distance); + } + + public static Point of(final int id, final float... coordinates) { + return new Point(id, coordinates); + } + + public final float[] coordinates; + public final int id; + + Point(final int id, final float[] coordinates) { + super(); + this.id = id; + this.coordinates = coordinates; + } + + @Override + public int compareTo(final Point ref) { + return Integer.compare(id, ref.id); + } + + /** + * The sum of the squared differences between the coordinates of this and the other point. (Not the + * Euclidean distance. This is the squared Euclidean distance.) 
+ */ + public double distance(final Point other) { + + double retVal = PrimitiveMath.ZERO; + + int limit = Math.min(coordinates.length, other.coordinates.length); + + float diff; + for (int i = 0; i < limit; i++) { + diff = coordinates[i] - other.coordinates[i]; + retVal += diff * diff; + } + + return retVal; + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof Point)) { + return false; + } + Point other = (Point) obj; + if (id != other.id || !Arrays.equals(coordinates, other.coordinates)) { + return false; + } + return true; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + id; + result = prime * result + Arrays.hashCode(coordinates); + return result; + } + + @Override + public String toString() { + return Arrays.toString(coordinates); + } + +} diff --git a/src/main/java/org/ojalgo/data/cluster/PointDistanceCache.java b/src/main/java/org/ojalgo/data/cluster/PointDistanceCache.java new file mode 100644 index 000000000..8337418d9 --- /dev/null +++ b/src/main/java/org/ojalgo/data/cluster/PointDistanceCache.java @@ -0,0 +1,104 @@ +package org.ojalgo.data.cluster; + +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.function.ToDoubleBiFunction; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.ojalgo.array.ArrayR064; +import org.ojalgo.array.NumberList; +import org.ojalgo.function.constant.PrimitiveMath; +import org.ojalgo.random.SampleSet; + +final class PointDistanceCache { + + private double[][] myDistances; + private final SampleSet mySampleSet = SampleSet.make(); + private final NumberList myValues = NumberList.factory(ArrayR064.FACTORY).make(); + + PointDistanceCache() { + super(); + } + + private double distance(final int i, final int j) { + if (i == j) { + return PrimitiveMath.ZERO; + } else if (i < j) { + return myDistances[j][i]; + } else { + 
return myDistances[i][j]; + } + } + + Point centroid(final Collection cluster) { + + Point retVal = null; + double minSum = Double.POSITIVE_INFINITY; + + for (Point candidate : cluster) { + + double sum = PrimitiveMath.ZERO; + + for (Point member : cluster) { + sum += this.distance(candidate, member); + } + + if (sum < minSum) { + minSum = sum; + retVal = candidate; + } + } + + return retVal; + } + + double distance(final Point point1, final Point point2) { + return this.distance(point1.id, point2.id); + } + + double getThreshold() { + return mySampleSet.swap(myValues).getMedian(); + } + + List initialiser(final Collection input) { + + GreedyClustering greedy = new GreedyClustering<>(this::centroid, this::distance, this.getThreshold()); + List> clusters = greedy.cluster(input); + List centroids = greedy.getCentroids(); + + double total = myDistances.length; + double largest = clusters.stream().mapToInt(Set::size).max().orElse(0); + + return IntStream.range(0, centroids.size()).filter(i -> { + double size = clusters.get(i).size(); + return size > 1D && size / total > 0.01D && size / largest > 0.02D; + }).mapToObj(centroids::get).collect(Collectors.toList()); + } + + void setup(final Collection input, final ToDoubleBiFunction distanceCalculator) { + + int nbPoints = input.size(); + + if (myDistances == null || myDistances.length != nbPoints) { + myDistances = new double[nbPoints][]; + for (int i = 0; i < nbPoints; i++) { + myDistances[i] = new double[i]; + } + } + + myValues.clear(); + + for (Point pointR : input) { + int row = pointR.id; + for (Point pointC : input) { + int col = pointC.id; + if (row > col) { + myValues.add(myDistances[row][col] = distanceCalculator.applyAsDouble(pointR, pointC)); + } + } + } + } + +} diff --git a/src/main/java/org/ojalgo/data/cluster/RandomClustering.java b/src/main/java/org/ojalgo/data/cluster/RandomClustering.java new file mode 100644 index 000000000..c1f498bac --- /dev/null +++ 
b/src/main/java/org/ojalgo/data/cluster/RandomClustering.java @@ -0,0 +1,60 @@ +package org.ojalgo.data.cluster; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; + +/** + * Randomly assigns each item to one of k clusters. + */ +final class RandomClustering implements ClusteringAlgorithm { + + private static final Random RANDOM = new Random(); + + private final int myK; + + RandomClustering(final int k) { + super(); + myK = k; + } + + @Override + public List> cluster(final Collection input) { + + List> clusters = new ArrayList<>(myK); + for (int i = 0; i < myK; i++) { + clusters.add(new HashSet<>()); + } + + for (T item : input) { + int index = RANDOM.nextInt(myK); + clusters.get(index).add(item); + } + + return clusters; + } + + List centroids(final Collection input) { + + List centroids = new ArrayList<>(myK); + + do { + + centroids.clear(); + List> clusters = this.cluster(input); + + for (Set cluster : clusters) { + if (!cluster.isEmpty()) { + T centroid = cluster.iterator().next(); + centroids.add(centroid); + } + } + + } while (centroids.size() != myK); + + return centroids; + } +} diff --git a/src/main/java/org/ojalgo/data/domain/finance/series/DataFetcher.java b/src/main/java/org/ojalgo/data/domain/finance/series/DataFetcher.java index a6cc38cb7..ad15dbb8b 100644 --- a/src/main/java/org/ojalgo/data/domain/finance/series/DataFetcher.java +++ b/src/main/java/org/ojalgo/data/domain/finance/series/DataFetcher.java @@ -23,16 +23,16 @@ import java.io.InputStream; +import org.ojalgo.netio.FromFileReader; import org.ojalgo.netio.TextLineReader; import org.ojalgo.netio.TextLineReader.Parser; import org.ojalgo.type.CalendarDateUnit; -import org.ojalgo.type.function.AutoSupplier; public interface DataFetcher { InputStream getInputStream(); - default AutoSupplier getReader(final Parser parser) { + default FromFileReader getReader(final Parser parser) { 
InputStream stream = this.getInputStream(); TextLineReader reader = new TextLineReader(stream); return reader.withFilteredParser(parser); diff --git a/src/main/java/org/ojalgo/data/domain/finance/series/DataSource.java b/src/main/java/org/ojalgo/data/domain/finance/series/DataSource.java index 2dac7994d..44e2531e2 100644 --- a/src/main/java/org/ojalgo/data/domain/finance/series/DataSource.java +++ b/src/main/java/org/ojalgo/data/domain/finance/series/DataSource.java @@ -37,6 +37,7 @@ import org.ojalgo.array.DenseArray; import org.ojalgo.netio.BasicLogger; import org.ojalgo.netio.BasicParser; +import org.ojalgo.netio.FromFileReader; import org.ojalgo.series.BasicSeries; import org.ojalgo.series.CalendarDateSeries; import org.ojalgo.series.SimpleSeries; @@ -44,7 +45,6 @@ import org.ojalgo.type.CalendarDate; import org.ojalgo.type.CalendarDateUnit; import org.ojalgo.type.PrimitiveNumber; -import org.ojalgo.type.function.AutoSupplier; import org.ojalgo.type.keyvalue.KeyValue; public final class DataSource implements FinanceData { @@ -131,7 +131,7 @@ public static Coordinated coordinated(final CalendarDateUnit resolution) { /** * All data downloaders/fetchers are deprecated. They will be removed in a future release, and most likely * they're already broken. - * + * * @deprecated */ @Deprecated @@ -142,7 +142,7 @@ public static DataSource newAlphaVantage(final String symbol, final CalendarDate /** * All data downloaders/fetchers are deprecated. They will be removed in a future release, and most likely * they're already broken. - * + * * @deprecated */ @Deprecated @@ -163,7 +163,7 @@ public static DataSource newFileReader(final File file, fi /** * All data downloaders/fetchers are deprecated. They will be removed in a future release, and most likely * they're already broken. - * + * * @deprecated */ @Deprecated @@ -176,7 +176,7 @@ public static DataSource newIEXTrading(final String symbol) { /** * All data downloaders/fetchers are deprecated. 
They will be removed in a future release, and most likely * they're already broken. - * + * * @deprecated */ @Deprecated @@ -251,7 +251,7 @@ public KeyValue> getHistoricalData() { List value = new ArrayList<>(); - try (AutoSupplier reader = myFetcher.getReader(myParser)) { + try (FromFileReader reader = myFetcher.getReader(myParser)) { reader.forEach(value::add); } catch (final Exception cause) { BasicLogger.error(cause, "Fetch problem for {}!", myFetcher.getClass().getSimpleName()); diff --git a/src/main/java/org/ojalgo/data/domain/finance/series/DatePrice.java b/src/main/java/org/ojalgo/data/domain/finance/series/DatePrice.java index 228992baa..5f63e6150 100644 --- a/src/main/java/org/ojalgo/data/domain/finance/series/DatePrice.java +++ b/src/main/java/org/ojalgo/data/domain/finance/series/DatePrice.java @@ -105,6 +105,7 @@ protected DatePrice(final LocalDate key) { date = key; } + @Override public int compareTo(final PrimitiveNumber reference) { int retVal = 0; @@ -120,10 +121,12 @@ public int compareTo(final PrimitiveNumber reference) { return retVal; } + @Override public boolean containsKey(final Object key) { return date.equals(key); } + @Override public boolean containsValue(final Object value) { if (value instanceof Comparable) { return NumberDefinition.doubleValue((Comparable) value) == this.getPrice(); @@ -132,6 +135,7 @@ public boolean containsValue(final Object value) { } } + @Override public final double doubleValue() { return this.getPrice(); } @@ -155,6 +159,7 @@ public boolean equals(final Object obj) { return true; } + @Override public PrimitiveNumber get(final Object key) { if (date.equals(key)) { return this; @@ -163,6 +168,7 @@ public PrimitiveNumber get(final Object key) { } } + @Override public final LocalDate getKey() { return date; } @@ -173,9 +179,10 @@ public final LocalDate getKey() { public int hashCode() { final int prime = 31; int result = 1; - return prime * result + ((date == null) ? 
0 : date.hashCode()); + return prime * result + (date == null ? 0 : date.hashCode()); } + @Override public Set keySet() { return Collections.singleton(date); } @@ -184,4 +191,5 @@ public Set keySet() { public final String toString() { return this.getKey() + ": " + this.getPrice(); } + } diff --git a/src/main/java/org/ojalgo/data/domain/finance/series/FinanceDataReader.java b/src/main/java/org/ojalgo/data/domain/finance/series/FinanceDataReader.java index e473919c2..4832f1bd2 100644 --- a/src/main/java/org/ojalgo/data/domain/finance/series/FinanceDataReader.java +++ b/src/main/java/org/ojalgo/data/domain/finance/series/FinanceDataReader.java @@ -29,13 +29,13 @@ import java.util.ArrayList; import java.util.List; +import org.ojalgo.netio.FromFileReader; import org.ojalgo.netio.InMemoryFile; import org.ojalgo.netio.TextLineReader; import org.ojalgo.series.BasicSeries; import org.ojalgo.series.SimpleSeries; import org.ojalgo.type.CalendarDateUnit; import org.ojalgo.type.PrimitiveNumber; -import org.ojalgo.type.function.AutoSupplier; import org.ojalgo.type.keyvalue.KeyValue; public final class FinanceDataReader implements FinanceData, DataFetcher { @@ -92,22 +92,24 @@ public static String toSymbol(final String fileName) { myResolution = resolution; } + @Override public KeyValue> getHistoricalData() { return KeyValue.of(this.getSymbol(), this.getHistoricalPrices()); } + @Override public List getHistoricalPrices() { List retVal = new ArrayList<>(); if (myFile != null) { - try (TextLineReader reader = TextLineReader.of(myFile); AutoSupplier supplier = reader.withFilteredParser(myParser)) { + try (TextLineReader reader = TextLineReader.of(myFile); FromFileReader supplier = reader.withFilteredParser(myParser)) { supplier.forEach(retVal::add); } catch (Exception cause) { throw new RuntimeException(cause); } } else if (myInMemoryFile != null) { - try (TextLineReader reader = TextLineReader.of(myInMemoryFile); AutoSupplier supplier = reader.withFilteredParser(myParser)) { + try 
(TextLineReader reader = TextLineReader.of(myInMemoryFile); FromFileReader supplier = reader.withFilteredParser(myParser)) { supplier.forEach(retVal::add); } catch (Exception cause) { throw new RuntimeException(cause); @@ -119,6 +121,7 @@ public List getHistoricalPrices() { return retVal; } + @Override public InputStream getInputStream() { if (myFile != null) { try { @@ -133,18 +136,19 @@ public InputStream getInputStream() { } } + @Override public BasicSeries getPriceSeries() { BasicSeries retVal = new SimpleSeries<>(); if (myFile != null) { - try (TextLineReader reader = TextLineReader.of(myFile); AutoSupplier supplier = reader.withFilteredParser(myParser)) { + try (TextLineReader reader = TextLineReader.of(myFile); FromFileReader supplier = reader.withFilteredParser(myParser)) { supplier.forEach(dp -> retVal.put(dp.date, dp)); } catch (Exception cause) { throw new RuntimeException(cause); } } else if (myInMemoryFile != null) { - try (TextLineReader reader = TextLineReader.of(myInMemoryFile); AutoSupplier supplier = reader.withFilteredParser(myParser)) { + try (TextLineReader reader = TextLineReader.of(myInMemoryFile); FromFileReader supplier = reader.withFilteredParser(myParser)) { supplier.forEach(dp -> retVal.put(dp.date, dp)); } catch (Exception cause) { throw new RuntimeException(cause); @@ -156,10 +160,12 @@ public BasicSeries getPriceSeries() { return retVal; } + @Override public CalendarDateUnit getResolution() { return myResolution; } + @Override public String getSymbol() { if (myFile != null) { return FinanceDataReader.toSymbol(myFile.getName()); diff --git a/src/main/java/org/ojalgo/netio/BasicParser.java b/src/main/java/org/ojalgo/netio/BasicParser.java index bcacf32a0..db9bc1448 100644 --- a/src/main/java/org/ojalgo/netio/BasicParser.java +++ b/src/main/java/org/ojalgo/netio/BasicParser.java @@ -48,7 +48,7 @@ public interface BasicParser extends TextLineReader.Parser { default void parse(final File file, final boolean skipHeader, final Consumer 
consumer) { try (TextLineReader supplier = TextLineReader.of(file)) { - this.parse(supplier, skipHeader, consumer); + this.parse(supplier::read, skipHeader, consumer); } catch (IOException cause) { throw new RuntimeException(cause); } @@ -73,7 +73,7 @@ default void parse(final File file, final Consumer consumer) { default void parse(final Reader reader, final boolean skipHeader, final Consumer consumer) { try (TextLineReader supplier = new TextLineReader(reader)) { - this.parse(supplier, skipHeader, consumer); + this.parse(supplier::read, skipHeader, consumer); } catch (IOException cause) { throw new RuntimeException(cause); } diff --git a/src/main/java/org/ojalgo/netio/DataInterpreter.java b/src/main/java/org/ojalgo/netio/DataInterpreter.java index 6691f8e0b..d050668cd 100644 --- a/src/main/java/org/ojalgo/netio/DataInterpreter.java +++ b/src/main/java/org/ojalgo/netio/DataInterpreter.java @@ -36,7 +36,61 @@ public interface DataInterpreter extends DataReader.Deserializer, DataWriter.Serializer { - DataInterpreter STRING = new DataInterpreter<>() { + DataInterpreter BYTES = new DataInterpreter<>() { + + public byte[] deserialize(final DataInput input) throws IOException { + int length = input.readInt(); + byte[] retVal = new byte[length]; + for (int i = 0; i < length; i++) { + retVal[i] = input.readByte(); + } + return retVal; + } + + public void serialize(final byte[] data, final DataOutput output) throws IOException { + int length = data.length; + output.writeInt(length); + for (int i = 0; i < length; i++) { + output.writeByte(data[i]); + } + } + + }; + + DataInterpreter STRING_BYTES = new DataInterpreter<>() { + + public String deserialize(final DataInput input) throws IOException { + return new String(BYTES.deserialize(input)); + } + + public void serialize(final String data, final DataOutput output) throws IOException { + int length = data.length(); + output.writeInt(length); + output.writeBytes(data); + } + + }; + + DataInterpreter STRING_CHARS = new 
DataInterpreter<>() { + + public String deserialize(final DataInput input) throws IOException { + int length = input.readInt(); + StringBuilder builder = new StringBuilder(length); + for (int i = 0; i < length; i++) { + builder.append(input.readChar()); + } + return builder.toString(); + } + + public void serialize(final String data, final DataOutput output) throws IOException { + int length = data.length(); + output.writeInt(length); + output.writeChars(data); + } + + }; + + DataInterpreter STRING_UTF = new DataInterpreter<>() { public String deserialize(final DataInput input) throws IOException { return input.readUTF(); @@ -48,6 +102,12 @@ public void serialize(final String data, final DataOutput output) throws IOExcep }; + /** + * @deprecated v56 Use one of the other alternatives + */ + @Deprecated + DataInterpreter STRING = STRING_UTF; + static > DataInterpreter> newIDX(final DenseArray.Factory denseArray) { ArrayAnyD.Factory factory = ArrayAnyD.factory(denseArray); @@ -136,6 +196,64 @@ public void serialize(final EntryPair.KeyedPrimitive> data, fin }; } + static byte toByte(final byte[] bytes) { + return bytes[0]; + } + + static byte[] toBytes(final byte value) { + return new byte[] { value }; + } + + static byte[] toBytes(final char value) { + return new byte[] { (byte) (value >>> 8 & 0xFF), (byte) (value >>> 0 & 0xFF) }; + } + + static byte[] toBytes(final double value) { + return DataInterpreter.toBytes(Double.doubleToLongBits(value)); + } + + static byte[] toBytes(final float value) { + return DataInterpreter.toBytes(Float.floatToIntBits(value)); + } + + static byte[] toBytes(final int value) { + return new byte[] { (byte) (value >>> 24 & 0xFF), (byte) (value >>> 16 & 0xFF), (byte) (value >>> 8 & 0xFF), (byte) (value >>> 0 & 0xFF) }; + } + + static byte[] toBytes(final long value) { + return new byte[] { (byte) (value >>> 56 & 0xFF), (byte) (value >>> 48 & 0xFF), (byte) (value >>> 40 & 0xFF), (byte) (value >>> 32 & 0xFF), + (byte) (value >>> 24 & 0xFF), 
(byte) (value >>> 16 & 0xFF), (byte) (value >>> 8 & 0xFF), (byte) (value >>> 0 & 0xFF) }; + } + + static byte[] toBytes(final short value) { + return new byte[] { (byte) (value >>> 8 & 0xFF), (byte) (value >>> 0 & 0xFF) }; + } + + static char toChar(final byte[] bytes) { + return (char) ((bytes[0] & 0xFF) << 8 | bytes[1] & 0xFF); + } + + static double toDouble(final byte[] bytes) { + return Double.longBitsToDouble(DataInterpreter.toLong(bytes)); + } + + static float toFloat(final byte[] bytes) { + return Float.intBitsToFloat(DataInterpreter.toInt(bytes)); + } + + static int toInt(final byte[] bytes) { + return (bytes[0] & 0xFF) << 24 | (bytes[1] & 0xFF) << 16 | (bytes[2] & 0xFF) << 8 | bytes[3] & 0xFF; + } + + static long toLong(final byte[] bytes) { + return (long) (bytes[0] & 0xFF) << 56 | (long) (bytes[1] & 0xFF) << 48 | (long) (bytes[2] & 0xFF) << 40 | (long) (bytes[3] & 0xFF) << 32 + | (long) (bytes[4] & 0xFF) << 24 | (long) (bytes[5] & 0xFF) << 16 | (long) (bytes[6] & 0xFF) << 8 | bytes[7] & 0xFF; + } + + static short toShort(final byte[] bytes) { + return (short) ((bytes[0] & 0xFF) << 8 | bytes[1] & 0xFF); + } + default DataReader newReader(final File file) { return DataReader.of(file, this); } diff --git a/src/main/java/org/ojalgo/netio/DataReader.java b/src/main/java/org/ojalgo/netio/DataReader.java index c377a7067..297f7f299 100644 --- a/src/main/java/org/ojalgo/netio/DataReader.java +++ b/src/main/java/org/ojalgo/netio/DataReader.java @@ -24,6 +24,7 @@ import java.io.BufferedInputStream; import java.io.DataInput; import java.io.DataInputStream; +import java.io.EOFException; import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -43,11 +44,14 @@ public interface Deserializer extends Function { /** * Will return null on EOF */ + @Override default T apply(final DataInput input) { try { return this.deserialize(input); - } catch (IOException cause) { + } catch (EOFException cause) { return null; + } catch (IOException cause) { + throw 
new RuntimeException(cause); } } @@ -83,10 +87,12 @@ public DataReader(final InputStream inputStream, final DataReader.Deserializer extends AutoSupplier, Closeable { +/** + * Essentially just a {@link Supplier}, but assumed to be reading from a file or some other source of data, + * and therefore extends {@link Closeable}. + */ +@FunctionalInterface +public interface FromFileReader extends Iterable, Closeable { - public static final class Builder extends ReaderWriterBuilder { + public static final class Builder extends ReaderWriterBuilder> { - Builder(final File[] files) { + Builder(final F[] files) { super(files); } - public AutoSupplier build(final Function> factory) { + @SuppressWarnings("resource") + public FromFileReader build(final Function> factory) { - File[] files = this.getFiles(); + F[] files = this.getFiles(); - LinkedBlockingDeque queue = new LinkedBlockingDeque<>(this.getQueueCapacity()); + BlockingQueue queue = this.newQueue(this.getQueueCapacity()); - AutoSupplier single; + FromFileReader single = FromFileReader.empty(); if (files.length == 1) { - single = AutoSupplier.queued(this.getExecutor(), queue, factory.apply(files[0])); + single = new QueuedReader<>(this.getExecutor(), queue, factory.apply(files[0])); } else { - LinkedBlockingDeque containers = new LinkedBlockingDeque<>(files.length); + BlockingQueue containers = new LinkedTransferQueue<>(); Collections.addAll(containers, files); - AutoSupplier[] readers = (AutoSupplier[]) new AutoSupplier[this.getParallelism()]; + FromFileReader[] readers = (FromFileReader[]) new FromFileReader[this.getParallelism()]; for (int i = 0; i < readers.length; i++) { - readers[i] = AutoSupplier.sequenced(containers, factory); + readers[i] = new SequencedReader<>(containers, factory); } - single = AutoSupplier.queued(this.getExecutor(), queue, readers); + single = new QueuedReader<>(this.getExecutor(), queue, readers); } if (this.isStatisticsCollector()) { - return 
AutoSupplier.managed(this.getStatisticsCollector(), single); + return new ManagedReader<>(this.getStatisticsCollector(), single); } else { return single; } @@ -132,6 +147,10 @@ static T deserializeObjectFromFile(final File file) { } } + static FromFileReader empty() { + return () -> null; + } + static InputStream input(final File file) { try { @@ -157,43 +176,79 @@ static InputStream input(final File file, final OperatorWithException Builder newBuilder(final F... file) { + return new Builder<>(file); + } + + static Builder newBuilder(final File file) { + return new Builder<>(new File[] { file }); + } + + static Builder newBuilder(final Path file) { + return new Builder<>(new Path[] { file }); } - static Builder newBuilder(final ShardedFile shards) { - return new Builder(shards.shards()); + static Builder newBuilder(final SegmentedFile segmented) { + return new Builder<>(segmented.getSegments()); + } + + static Builder newBuilder(final ShardedFile sharded) { + return new Builder<>(sharded.shards()); + } + + @Override + default void close() throws IOException { + // Default implementation does nothing } /** - * A factory that produce readers that read items from the supplied sources. (You have a collection of - * files and want to read through them all using 1 or more readers.) + * Behaves similar to {@link BlockingQueue#drainTo(Collection, int)} except that returning 0 means there + * are no more items to read. */ - static Supplier> newFactory(final Function> factory, final Collection sources) { + default int drainTo(final Collection container, final int maxElements) { - BlockingQueue work = new LinkedBlockingDeque<>(sources); + int retVal = 0; - return () -> AutoSupplier.sequenced(work, factory); + T item = null; + while (retVal < maxElements && (item = this.read()) != null) { + container.add(item); + retVal++; + } + + return retVal; } - static Supplier> newFactory(final Function> factory, final S... 
sources) { + /** + * Similar to {@link #forEach(Consumer)} but processes items in batches. Will extract up to batchSize + * items before calling the action, and then repeat until no more items are available. + */ + default void forEachInBacthes(final int batchSize, final Consumer action) { - BlockingQueue work = new LinkedBlockingDeque<>(); - Collections.addAll(work, sources); + List batch = new ArrayList<>(batchSize); - return () -> AutoSupplier.sequenced(work, factory); + while (this.drainTo(batch, batchSize) > 0) { + batch.forEach(action); + batch.clear(); + } } - default void close() throws IOException { - try { - AutoSupplier.super.close(); - } catch (Exception cause) { - if (cause instanceof IOException) { - throw (IOException) cause; - } else { - throw new RuntimeException(cause); - } - } + @Override + default Iterator iterator() { + return new SupplierIterator<>(this); + } + + default FromFileReader map(final Function mapper) { + return new MappedReader<>(this, mapper); + } + + /** + * Returning null indicates that there are no more items to read. That's the same behaviour as + * {@link BufferedReader#readLine()}. All implementations must return null precisely once. + */ + T read(); + + default Stream stream() { + return StreamSupport.stream(Spliterators.spliteratorUnknownSize(this.iterator(), Spliterator.ORDERED | Spliterator.NONNULL), false); } } diff --git a/src/main/java/org/ojalgo/type/function/ManagedSupplier.java b/src/main/java/org/ojalgo/netio/ManagedReader.java similarity index 67% rename from src/main/java/org/ojalgo/type/function/ManagedSupplier.java rename to src/main/java/org/ojalgo/netio/ManagedReader.java index 6afd2bacc..86dca6f8c 100644 --- a/src/main/java/org/ojalgo/type/function/ManagedSupplier.java +++ b/src/main/java/org/ojalgo/netio/ManagedReader.java @@ -19,31 +19,44 @@ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * SOFTWARE. 
*/ -package org.ojalgo.type.function; +package org.ojalgo.netio; -import java.util.function.Supplier; +import java.io.IOException; +import java.util.Collection; import org.ojalgo.type.management.Throughput; -final class ManagedSupplier implements AutoSupplier { +final class ManagedReader implements FromFileReader { private final Throughput myManager; - private final Supplier mySupplier; + private final FromFileReader myReader; - ManagedSupplier(final Throughput manager, final Supplier supplier) { + ManagedReader(final Throughput manager, final FromFileReader reader) { super(); myManager = manager; - mySupplier = supplier; + myReader = reader; } - public void close() throws Exception { - if (mySupplier instanceof AutoCloseable) { - ((AutoCloseable) mySupplier).close(); + @Override + public void close() throws IOException { + myReader.close(); + } + + @Override + public int drainTo(final Collection container, final int maxElements) { + + int retVal = myReader.drainTo(container, maxElements); + + if (retVal != 0) { + myManager.add(retVal); } + + return retVal; } + @Override public T read() { - T retVal = mySupplier.get(); + T retVal = myReader.read(); if (retVal != null) { myManager.increment(); } diff --git a/src/main/java/org/ojalgo/type/function/ManagedConsumer.java b/src/main/java/org/ojalgo/netio/ManagedWriter.java similarity index 74% rename from src/main/java/org/ojalgo/type/function/ManagedConsumer.java rename to src/main/java/org/ojalgo/netio/ManagedWriter.java index cb190264b..3577e700a 100644 --- a/src/main/java/org/ojalgo/type/function/ManagedConsumer.java +++ b/src/main/java/org/ojalgo/netio/ManagedWriter.java @@ -19,32 +19,32 @@ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * SOFTWARE. 
*/ -package org.ojalgo.type.function; +package org.ojalgo.netio; -import java.util.function.Consumer; +import java.io.IOException; import org.ojalgo.type.management.Throughput; -final class ManagedConsumer implements AutoConsumer { +final class ManagedWriter implements ToFileWriter { - private final Consumer myConsumer; private final Throughput myManager; + private final ToFileWriter myWriter; - ManagedConsumer(final Throughput manager, final Consumer consumer) { + ManagedWriter(final Throughput manager, final ToFileWriter writer) { super(); myManager = manager; - myConsumer = consumer; + myWriter = writer; } - public void close() throws Exception { - if (myConsumer instanceof AutoCloseable) { - ((AutoCloseable) myConsumer).close(); - } + @Override + public void close() throws IOException { + myWriter.close(); } + @Override public void write(final T item) { myManager.increment(); - myConsumer.accept(item); + myWriter.write(item); } } diff --git a/src/main/java/org/ojalgo/type/function/MappedSupplier.java b/src/main/java/org/ojalgo/netio/MappedReader.java similarity index 70% rename from src/main/java/org/ojalgo/type/function/MappedSupplier.java rename to src/main/java/org/ojalgo/netio/MappedReader.java index 3d5174abd..1a7e999dd 100644 --- a/src/main/java/org/ojalgo/type/function/MappedSupplier.java +++ b/src/main/java/org/ojalgo/netio/MappedReader.java @@ -19,41 +19,41 @@ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * SOFTWARE. 
*/ -package org.ojalgo.type.function; +package org.ojalgo.netio; +import java.io.IOException; import java.util.function.Function; import java.util.function.Predicate; -import java.util.function.Supplier; -final class MappedSupplier implements AutoSupplier { +final class MappedReader implements FromFileReader { - private final Function myMapper; - private final Supplier mySupplier; private final Predicate myFilter; + private final Function myMapper; + private final FromFileReader myReader; - MappedSupplier(final Supplier supplier, final Function mapper) { + MappedReader(final FromFileReader supplier, final Function mapper) { this(supplier, in -> true, mapper); } - MappedSupplier(final Supplier supplier, final Predicate filter, final Function mapper) { + MappedReader(final FromFileReader reader, final Predicate filter, final Function mapper) { super(); - mySupplier = supplier; + myReader = reader; myFilter = filter; myMapper = mapper; } - public void close() throws Exception { - if (mySupplier instanceof AutoCloseable) { - ((AutoCloseable) mySupplier).close(); - } + @Override + public void close() throws IOException { + myReader.close(); } + @Override public OUT read() { IN unmapped = null; OUT retVal = null; - while ((unmapped = mySupplier.get()) != null && (!myFilter.test(unmapped) || (retVal = myMapper.apply(unmapped)) == null)) { + while ((unmapped = myReader.read()) != null && (!myFilter.test(unmapped) || (retVal = myMapper.apply(unmapped)) == null)) { // Read until we get a non-null item that passes the test and mapping works (return not-null) } diff --git a/src/main/java/org/ojalgo/type/function/MappedConsumer.java b/src/main/java/org/ojalgo/netio/MappedWriter.java similarity index 72% rename from src/main/java/org/ojalgo/type/function/MappedConsumer.java rename to src/main/java/org/ojalgo/netio/MappedWriter.java index 21fc8f87e..ccee64ef0 100644 --- a/src/main/java/org/ojalgo/type/function/MappedConsumer.java +++ 
b/src/main/java/org/ojalgo/netio/MappedWriter.java @@ -19,30 +19,30 @@ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * SOFTWARE. */ -package org.ojalgo.type.function; +package org.ojalgo.netio; -import java.util.function.Consumer; +import java.io.IOException; import java.util.function.Function; -final class MappedConsumer implements AutoConsumer { +final class MappedWriter implements ToFileWriter { - private final Consumer myConsumer; + private final ToFileWriter myWriter; private final Function myMapper; - MappedConsumer(final Function mapper, final Consumer consumer) { + MappedWriter(final Function mapper, final ToFileWriter writer) { super(); myMapper = mapper; - myConsumer = consumer; + myWriter = writer; } - public void close() throws Exception { - if (myConsumer instanceof AutoCloseable) { - ((AutoCloseable) myConsumer).close(); - } + @Override + public void close() throws IOException { + myWriter.close(); } + @Override public void write(final IN item) { - myConsumer.accept(myMapper.apply(item)); + myWriter.write(myMapper.apply(item)); } } diff --git a/src/main/java/org/ojalgo/type/function/QueuedSupplier.java b/src/main/java/org/ojalgo/netio/QueuedReader.java similarity index 68% rename from src/main/java/org/ojalgo/type/function/QueuedSupplier.java rename to src/main/java/org/ojalgo/netio/QueuedReader.java index 07b8dd5e5..67e481a2a 100644 --- a/src/main/java/org/ojalgo/type/function/QueuedSupplier.java +++ b/src/main/java/org/ojalgo/netio/QueuedReader.java @@ -19,32 +19,33 @@ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * SOFTWARE. 
*/ -package org.ojalgo.type.function; +package org.ojalgo.netio; +import java.io.IOException; import java.util.Collection; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.function.Supplier; -final class QueuedSupplier implements AutoSupplier { +final class QueuedReader implements FromFileReader { static final class Worker implements Runnable { private final BlockingQueue myQueue; - private final Supplier mySupplier; + private final FromFileReader myReader; - Worker(final BlockingQueue queue, final Supplier reader) { + Worker(final BlockingQueue queue, final FromFileReader reader) { super(); myQueue = queue; - mySupplier = reader; + myReader = reader; } + @Override public void run() { try { T item = null; - while ((item = mySupplier.get()) != null) { + while ((item = myReader.read()) != null) { myQueue.put(item); } } catch (InterruptedException cause) { @@ -56,39 +57,60 @@ public void run() { private final Future[] myFutures; private final BlockingQueue myQueue; - private final Supplier[] mySuppliers; + private final FromFileReader[] myReaders; - QueuedSupplier(final ExecutorService executor, final BlockingQueue queue, final Supplier... suppliers) { + /** + * Multiple suppliers supply to a queue, then you get from that queue. There will be 1 thread (executor + * task) per supplier. + */ + QueuedReader(final ExecutorService executor, final BlockingQueue queue, final FromFileReader... 
readers) { super(); myQueue = queue; - mySuppliers = suppliers; + myReaders = readers; - myFutures = new Future[suppliers.length]; - for (int i = 0; i < suppliers.length; i++) { - myFutures[i] = executor.submit(new Worker<>(queue, suppliers[i])); + myFutures = new Future[readers.length]; + for (int i = 0; i < readers.length; i++) { + myFutures[i] = executor.submit(new Worker<>(queue, readers[i])); } } @Override - public void close() throws Exception { + public void close() throws IOException { try { for (int i = 0; i < myFutures.length; i++) { myFutures[i].get(); - if (mySuppliers[i] instanceof AutoCloseable) { - ((AutoCloseable) mySuppliers[i]).close(); - } + myReaders[i].close(); } } catch (InterruptedException | ExecutionException cause) { throw new RuntimeException(cause); } } + @Override public int drainTo(final Collection container, final int maxElements) { - return myQueue.drainTo(container, maxElements); + + int drained = myQueue.drainTo(container, maxElements); + + if (drained == 0) { + + T single = this.read(); + + if (single != null) { + container.add(single); + return 1; + } else { + return 0; + } + + } else { + + return drained; + } } + @Override public T read() { T retVal = myQueue.poll(); diff --git a/src/main/java/org/ojalgo/type/function/QueuedConsumer.java b/src/main/java/org/ojalgo/netio/QueuedWriter.java similarity index 72% rename from src/main/java/org/ojalgo/type/function/QueuedConsumer.java rename to src/main/java/org/ojalgo/netio/QueuedWriter.java index 393794123..f0f6139bc 100644 --- a/src/main/java/org/ojalgo/type/function/QueuedConsumer.java +++ b/src/main/java/org/ojalgo/netio/QueuedWriter.java @@ -19,42 +19,37 @@ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * SOFTWARE. 
*/ -package org.ojalgo.type.function; +package org.ojalgo.netio; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.function.Consumer; -final class QueuedConsumer implements AutoConsumer { +final class QueuedWriter implements ToFileWriter { static final class Worker implements Runnable { - private final Consumer myConsumer; - private final QueuedConsumer myParent; + private final ToFileWriter myConsumer; + private final QueuedWriter myParent; - Worker(final QueuedConsumer parent, final Consumer consumer) { + Worker(final QueuedWriter parent, final ToFileWriter consumer) { super(); myParent = parent; myConsumer = consumer; } + @Override public void run() { List batchContainer = myParent.newBatchContainer(); while (myParent.drainTo(batchContainer) != 0 || myParent.isMoreToCome()) { if (batchContainer.size() != 0) { - if (myConsumer instanceof AutoConsumer) { - ((AutoConsumer) myConsumer).writeBatch(batchContainer); - } else { - for (T item : batchContainer) { - myConsumer.accept(item); - } - } + myConsumer.writeBatch(batchContainer); batchContainer.clear(); } else { try { @@ -70,44 +65,43 @@ public void run() { private volatile boolean myActive; private final int myBatchSize; - private final Consumer[] myConsumers; + private final ToFileWriter[] myWriters; private final Future[] myFutures; private final BlockingQueue myQueue; - QueuedConsumer(final ExecutorService executor, final BlockingQueue queue, final Consumer... consumers) { + QueuedWriter(final ExecutorService executor, final BlockingQueue queue, final ToFileWriter... 
writers) { super(); myQueue = queue; - myConsumers = consumers; + myWriters = writers; - myBatchSize = Math.max(3, queue.remainingCapacity() / (2 + consumers.length)); + myBatchSize = Math.max(3, queue.remainingCapacity() / (2 + writers.length)); myActive = true; - myFutures = new Future[consumers.length]; - for (int i = 0; i < consumers.length; i++) { - myFutures[i] = executor.submit(new Worker<>(this, consumers[i])); + myFutures = new Future[writers.length]; + for (int i = 0; i < writers.length; i++) { + myFutures[i] = executor.submit(new Worker<>(this, writers[i])); } } @Override - public void close() throws Exception { + public void close() throws IOException { myActive = false; try { for (int i = 0; i < myFutures.length; i++) { myFutures[i].get(); - if (myConsumers[i] instanceof AutoCloseable) { - ((AutoCloseable) myConsumers[i]).close(); - } + myWriters[i].close(); } } catch (InterruptedException | ExecutionException cause) { throw new RuntimeException(cause); } } + @Override public void write(final T item) { try { myQueue.put(item); diff --git a/src/main/java/org/ojalgo/netio/ReaderWriterBuilder.java b/src/main/java/org/ojalgo/netio/ReaderWriterBuilder.java index 0c1453c21..2a4cb6108 100644 --- a/src/main/java/org/ojalgo/netio/ReaderWriterBuilder.java +++ b/src/main/java/org/ojalgo/netio/ReaderWriterBuilder.java @@ -21,8 +21,11 @@ */ package org.ojalgo.netio; -import java.io.File; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.LinkedTransferQueue; +import java.util.concurrent.SynchronousQueue; import java.util.function.IntSupplier; import org.ojalgo.concurrent.DaemonPoolExecutor; @@ -30,7 +33,7 @@ import org.ojalgo.type.management.MBeanUtils; import org.ojalgo.type.management.Throughput; -abstract class ReaderWriterBuilder> { +abstract class ReaderWriterBuilder> { private static volatile ExecutorService EXECUTOR = null; @@ -46,13 +49,13 @@ 
static ExecutorService executor() { } private ExecutorService myExecutor = null; - private final File[] myFiles; + private final F[] myFiles; private String myManagerName = null; private IntSupplier myParallelism = Parallelism.CORES.limit(32); private int myQueueCapacity = 1024; private Throughput myStatisticsCollector = null; - ReaderWriterBuilder(final File[] files) { + ReaderWriterBuilder(final F[] files) { super(); myFiles = files; } @@ -62,6 +65,9 @@ public B executor(final ExecutorService executor) { return (B) this; } + /** + * Will create a JMX bean, with the given name, that keeps track of the reader/writer's throughput. + */ public B manager(final String name) { myManagerName = name; return (B) this; @@ -93,7 +99,7 @@ ExecutorService getExecutor() { return myExecutor; } - File[] getFiles() { + F[] getFiles() { return myFiles; } @@ -114,6 +120,17 @@ Throughput getStatisticsCollector() { } boolean isStatisticsCollector() { - return (myStatisticsCollector != null || myManagerName != null); + return myStatisticsCollector != null || myManagerName != null; } + + BlockingQueue newQueue(final int capacity) { + if (capacity == 0) { + return new SynchronousQueue<>(); + } else if (capacity == Integer.MAX_VALUE) { + return new LinkedTransferQueue<>(); + } else { + return new LinkedBlockingQueue<>(capacity); + } + } + } diff --git a/src/main/java/org/ojalgo/type/function/ScoredDualConsumer.java b/src/main/java/org/ojalgo/netio/ScoredDualWriter.java similarity index 91% rename from src/main/java/org/ojalgo/type/function/ScoredDualConsumer.java rename to src/main/java/org/ojalgo/netio/ScoredDualWriter.java index b57e84c74..7037b8867 100644 --- a/src/main/java/org/ojalgo/type/function/ScoredDualConsumer.java +++ b/src/main/java/org/ojalgo/netio/ScoredDualWriter.java @@ -19,14 +19,15 @@ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * SOFTWARE. 
*/ -package org.ojalgo.type.function; +package org.ojalgo.netio; import org.ojalgo.type.keyvalue.EntryPair; import org.ojalgo.type.keyvalue.KeyValue; @FunctionalInterface -public interface ScoredDualConsumer extends AutoConsumer>> { +public interface ScoredDualWriter extends ToFileWriter>> { + @Override default void write(final EntryPair.KeyedPrimitive> item) { KeyValue.Dual key = item.getKey(); this.write(key.first, key.second, item.floatValue()); diff --git a/src/main/java/org/ojalgo/netio/SegmentedFile.java b/src/main/java/org/ojalgo/netio/SegmentedFile.java index 5f6c86e2e..3bf2f6d98 100644 --- a/src/main/java/org/ojalgo/netio/SegmentedFile.java +++ b/src/main/java/org/ojalgo/netio/SegmentedFile.java @@ -27,8 +27,13 @@ import java.io.UncheckedIOException; import java.nio.channels.FileChannel; import java.nio.channels.FileChannel.MapMode; +import java.util.Collections; import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedTransferQueue; +import java.util.function.Function; import java.util.function.IntSupplier; +import java.util.function.Supplier; import org.ojalgo.OjAlgoUtils; import org.ojalgo.concurrent.Parallelism; @@ -245,6 +250,36 @@ public void close() { } } + public FromFileReader newDataReader(final DataReader.Deserializer deserializer) { + return FromFileReader.newBuilder(this).build(segment -> this.newDataReader(segment, deserializer)); + } + + public DataReader newDataReader(final Segment segment, final DataReader.Deserializer deserializer) { + try { + return new DataReader<>(new ByteBufferBackedInputStream(myFileChannel.map(MapMode.READ_ONLY, segment.offset, segment.size)), deserializer); + } catch (IOException cause) { + throw new RuntimeException(cause); + } + } + + /** + * Each reader instantiated by this factory will read from the segments in sequence, until all of them are + * done. 
The idea is that you can create multiple readers and have them work in parallel (each segment + * will only be read once by one of the readers). If you only instantiate 1 reader, then maybe you + * shouldn't have created file segments in the first place. + */ + public Supplier> newSequencedFactory(final Function> factory) { + + BlockingQueue work = new LinkedTransferQueue<>(); + Collections.addAll(work, mySegments); + + return () -> new SequencedReader<>(work, factory); + } + + public FromFileReader newTextLineReader() { + return FromFileReader.newBuilder(this).build(this::newTextLineReader); + } + /** * Call this once for each file segment, and use the returned {@link TextLineReader} to read the file * segment. The {@link TextLineReader} is not thread safe, and should only be used by a single thread. @@ -261,8 +296,16 @@ public TextLineReader newTextLineReader(final Segment segment) { } } + public FromFileReader newTextLineReader(final TextLineReader.Parser parser) { + return FromFileReader.newBuilder(this).build(segment -> this.newTextLineReader(segment).withParser(parser)); + } + public List segments() { return List.of(mySegments); } + Segment[] getSegments() { + return mySegments; + } + } diff --git a/src/main/java/org/ojalgo/type/function/SequencedSupplier.java b/src/main/java/org/ojalgo/netio/SequencedReader.java similarity index 57% rename from src/main/java/org/ojalgo/type/function/SequencedSupplier.java rename to src/main/java/org/ojalgo/netio/SequencedReader.java index 29b21cfcb..cd247964a 100644 --- a/src/main/java/org/ojalgo/type/function/SequencedSupplier.java +++ b/src/main/java/org/ojalgo/netio/SequencedReader.java @@ -19,19 +19,30 @@ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * SOFTWARE. 
*/ -package org.ojalgo.type.function; +package org.ojalgo.netio; +import java.io.File; +import java.io.IOException; import java.util.concurrent.BlockingQueue; import java.util.function.Function; -import java.util.function.Supplier; -final class SequencedSupplier implements AutoSupplier { +final class SequencedReader implements FromFileReader { - private Supplier myCurrent; - private final Function> myFactory; + private FromFileReader myCurrent; + private final Function> myFactory; private final BlockingQueue mySources; - SequencedSupplier(final BlockingQueue sources, final Function> factory) { + /** + * Create an {@link AutoSupplier} that will supply items from the containers, one after the other, until + * all containers are empty. You can create multiple such suppliers sharing the same queue of containers. + * + * @param The type of some sort of item container (maybe a {@link File}) + * @param The supplier item type (what do the files contain?) + * @param sources A set of item containers (could be a set of {@link File}:s) + * @param factory A factory method that can take one of the "containers" and return an item supplier. + * @return A sequenced supplier. 
+ */ + SequencedReader(final BlockingQueue sources, final Function> factory) { super(); @@ -41,34 +52,33 @@ final class SequencedSupplier implements AutoSupplier { } @Override - public void close() throws Exception { - if ((myCurrent != null) && (myCurrent instanceof AutoCloseable)) { - ((AutoCloseable) myCurrent).close(); + public void close() throws IOException { + if (myCurrent != null) { + myCurrent.close(); } } + @Override public T read() { if (myCurrent == null) { return null; } - T retVal = myCurrent.get(); + T retVal = myCurrent.read(); if (retVal == null) { - if (myCurrent instanceof AutoCloseable) { - try { - ((AutoCloseable) myCurrent).close(); - } catch (Exception cause) { - throw new RuntimeException(cause); - } + try { + myCurrent.close(); + } catch (Exception cause) { + throw new RuntimeException(cause); } this.nextSupplier(); if (myCurrent != null) { - retVal = myCurrent.get(); + retVal = myCurrent.read(); } } diff --git a/src/main/java/org/ojalgo/netio/ShardedFile.java b/src/main/java/org/ojalgo/netio/ShardedFile.java index c0ac19dcb..80182017c 100644 --- a/src/main/java/org/ojalgo/netio/ShardedFile.java +++ b/src/main/java/org/ojalgo/netio/ShardedFile.java @@ -25,8 +25,13 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedTransferQueue; +import java.util.function.Function; import java.util.function.LongFunction; +import java.util.function.Supplier; import org.ojalgo.type.format.NumberStyle; import org.ojalgo.type.keyvalue.EntryPair; @@ -155,7 +160,20 @@ public int hashCode() { final int prime = 31; int result = 1; result = prime * result + numberOfShards; - return prime * result + ((single == null) ? 0 : single.hashCode()); + return prime * result + (single == null ? 
0 : single.hashCode()); + } + + /** + * Each reader instantiated by this factory will read from the shards in sequence, until all of them are + * done. The idea is that you can create multiple readers and have them work in parallel (each shard will + * only be read once by one of the readers). + */ + public Supplier> newSequencedFactory(final Function> factory) { + + BlockingQueue work = new LinkedTransferQueue<>(); + Collections.addAll(work, myShards); + + return () -> new SequencedReader<>(work, factory); } public File shard(final int index) { diff --git a/src/main/java/org/ojalgo/type/function/ShardedConsumer.java b/src/main/java/org/ojalgo/netio/ShardedWriter.java similarity index 70% rename from src/main/java/org/ojalgo/type/function/ShardedConsumer.java rename to src/main/java/org/ojalgo/netio/ShardedWriter.java index 45f209f9f..db780791b 100644 --- a/src/main/java/org/ojalgo/type/function/ShardedConsumer.java +++ b/src/main/java/org/ojalgo/netio/ShardedWriter.java @@ -19,43 +19,44 @@ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * SOFTWARE. 
*/ -package org.ojalgo.type.function; +package org.ojalgo.netio; -import java.util.function.Consumer; +import java.io.IOException; import java.util.function.ToIntFunction; import org.ojalgo.function.special.PowerOf2; -abstract class ShardedConsumer implements AutoConsumer { +abstract class ShardedWriter implements ToFileWriter { - static final class GeneralShardedConsumer extends ShardedConsumer { + static final class GeneralShardedConsumer extends ShardedWriter { - private final Consumer[] myConsumers; + private final ToFileWriter[] myWriters; private final ToIntFunction myDistributor; private final int myNumberOfShards; - GeneralShardedConsumer(final ToIntFunction distributor, final Consumer[] consumers) { + GeneralShardedConsumer(final ToIntFunction distributor, final ToFileWriter[] consumers) { super(consumers); - myConsumers = consumers; + myWriters = consumers; myDistributor = distributor; myNumberOfShards = consumers.length; } + @Override public void write(final T item) { - myConsumers[Math.abs(myDistributor.applyAsInt(item) % myNumberOfShards)].accept(item); + myWriters[Math.abs(myDistributor.applyAsInt(item) % myNumberOfShards)].write(item); } } - static final class PowerOf2ShardedConsumer extends ShardedConsumer { + static final class PowerOf2ShardedConsumer extends ShardedWriter { - private final Consumer[] myConsumers; + private final ToFileWriter[] myWriters; private final ToIntFunction myDistributor; private final int myIndexMask; - PowerOf2ShardedConsumer(final ToIntFunction distributor, final Consumer[] consumers) { + PowerOf2ShardedConsumer(final ToIntFunction distributor, final ToFileWriter[] consumers) { super(consumers); @@ -63,18 +64,19 @@ static final class PowerOf2ShardedConsumer extends ShardedConsumer { throw new IllegalArgumentException("The number of consumers must be a power of 2!"); } - myConsumers = consumers; + myWriters = consumers; myDistributor = distributor; myIndexMask = consumers.length - 1; } + @Override public void write(final 
T item) { - myConsumers[myDistributor.applyAsInt(item) & myIndexMask].accept(item); + myWriters[myDistributor.applyAsInt(item) & myIndexMask].write(item); } } - static ShardedConsumer of(final ToIntFunction distributor, final Consumer[] consumers) { + static ShardedWriter of(final ToIntFunction distributor, final ToFileWriter[] consumers) { if (PowerOf2.isPowerOf2(consumers.length)) { return new PowerOf2ShardedConsumer<>(distributor, consumers); } else { @@ -82,20 +84,19 @@ static ShardedConsumer of(final ToIntFunction distributor, final Consu } } - private final Consumer[] myConsumers; + private final ToFileWriter[] myWriters; - ShardedConsumer(final Consumer[] consumers) { + ShardedWriter(final ToFileWriter[] consumers) { super(); - myConsumers = consumers; + myWriters = consumers; } - public void close() throws Exception { - for (Consumer consumer : myConsumers) { - if (consumer instanceof AutoCloseable) { - ((AutoCloseable) consumer).close(); - } + @Override + public void close() throws IOException { + for (ToFileWriter consumer : myWriters) { + consumer.close(); } } diff --git a/src/main/java/org/ojalgo/type/function/SupplierIterator.java b/src/main/java/org/ojalgo/netio/SupplierIterator.java similarity index 87% rename from src/main/java/org/ojalgo/type/function/SupplierIterator.java rename to src/main/java/org/ojalgo/netio/SupplierIterator.java index 58d2e378b..a37a6a653 100644 --- a/src/main/java/org/ojalgo/type/function/SupplierIterator.java +++ b/src/main/java/org/ojalgo/netio/SupplierIterator.java @@ -19,7 +19,7 @@ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * SOFTWARE. 
*/ -package org.ojalgo.type.function; +package org.ojalgo.netio; import java.util.Iterator; import java.util.NoSuchElementException; @@ -27,24 +27,26 @@ final class SupplierIterator implements Iterator { private transient T myNext; - private final AutoSupplier mySupplier; + private final FromFileReader mySupplier; - SupplierIterator(final AutoSupplier supplier) { + SupplierIterator(final FromFileReader supplier) { super(); mySupplier = supplier; - myNext = mySupplier.get(); + myNext = mySupplier.read(); } + @Override public boolean hasNext() { return myNext != null; } + @Override public T next() { if (myNext == null) { throw new NoSuchElementException(); } T retVal = myNext; - myNext = mySupplier.get(); + myNext = mySupplier.read(); return retVal; } diff --git a/src/main/java/org/ojalgo/netio/TextLineInterpreter.java b/src/main/java/org/ojalgo/netio/TextLineInterpreter.java index f51d266a1..ff8591fbf 100644 --- a/src/main/java/org/ojalgo/netio/TextLineInterpreter.java +++ b/src/main/java/org/ojalgo/netio/TextLineInterpreter.java @@ -25,25 +25,23 @@ import java.io.InputStream; import java.io.OutputStream; -import org.ojalgo.type.function.AutoConsumer; -import org.ojalgo.type.function.AutoSupplier; import org.ojalgo.type.function.OperatorWithException; public interface TextLineInterpreter extends TextLineReader.Parser, TextLineWriter.Formatter { - default AutoSupplier newReader(final File file) { + default FromFileReader newReader(final File file) { return TextLineReader.of(file).withParser(this); } - default AutoSupplier newReader(final File file, final OperatorWithException filter) { + default FromFileReader newReader(final File file, final OperatorWithException filter) { return TextLineReader.of(file, filter).withParser(this); } - default AutoConsumer newWriter(final File file) { + default ToFileWriter newWriter(final File file) { return TextLineWriter.of(file).withFormatter(this); } - default AutoConsumer newWriter(final File file, final OperatorWithException 
filter) { + default ToFileWriter newWriter(final File file, final OperatorWithException filter) { return TextLineWriter.of(file, filter).withFormatter(this); } diff --git a/src/main/java/org/ojalgo/netio/TextLineReader.java b/src/main/java/org/ojalgo/netio/TextLineReader.java index 2b45b66f9..1cc35c697 100644 --- a/src/main/java/org/ojalgo/netio/TextLineReader.java +++ b/src/main/java/org/ojalgo/netio/TextLineReader.java @@ -30,7 +30,6 @@ import java.io.UnsupportedEncodingException; import java.util.function.Predicate; -import org.ojalgo.type.function.AutoSupplier; import org.ojalgo.type.function.OperatorWithException; public final class TextLineReader implements FromFileReader { @@ -104,19 +103,19 @@ public String read() { /** * The filter is {@link TextLineReader#isLineOK(String)} */ - public AutoSupplier withFilteredParser(final Parser parser) { - return AutoSupplier.mapped(this, TextLineReader::isLineOK, parser::parse); + public FromFileReader withFilteredParser(final Parser parser) { + return new MappedReader<>(this, TextLineReader::isLineOK, parser::parse); } /** * The filter could for instance be {@link TextLineReader#isLineOK(String)} */ - public AutoSupplier withFilteredParser(final Predicate filter, final Parser parser) { - return AutoSupplier.mapped(this, filter, parser::parse); + public FromFileReader withFilteredParser(final Predicate filter, final Parser parser) { + return new MappedReader<>(this, filter, parser::parse); } - public AutoSupplier withParser(final Parser parser) { - return AutoSupplier.mapped(this, parser::parse); + public FromFileReader withParser(final Parser parser) { + return new MappedReader<>(this, parser::parse); } } diff --git a/src/main/java/org/ojalgo/netio/TextLineWriter.java b/src/main/java/org/ojalgo/netio/TextLineWriter.java index a83e3b361..0d311f08a 100644 --- a/src/main/java/org/ojalgo/netio/TextLineWriter.java +++ b/src/main/java/org/ojalgo/netio/TextLineWriter.java @@ -28,7 +28,6 @@ import java.io.OutputStreamWriter; 
import java.io.UnsupportedEncodingException; -import org.ojalgo.type.function.AutoConsumer; import org.ojalgo.type.function.OperatorWithException; public final class TextLineWriter implements ToFileWriter { @@ -180,8 +179,8 @@ public CSVLineBuilder newCSVLineBuilder(final String delimiter) { return new CSVLineBuilder(this, delimiter); } - public AutoConsumer withFormatter(final Formatter formatter) { - return AutoConsumer.mapped(formatter::format, this); + public ToFileWriter withFormatter(final Formatter formatter) { + return new MappedWriter<>(formatter::format, this); } @Override diff --git a/src/main/java/org/ojalgo/netio/ToFileWriter.java b/src/main/java/org/ojalgo/netio/ToFileWriter.java index 6900d7768..6b3b89001 100644 --- a/src/main/java/org/ojalgo/netio/ToFileWriter.java +++ b/src/main/java/org/ojalgo/netio/ToFileWriter.java @@ -28,36 +28,42 @@ import java.io.ObjectOutputStream; import java.io.OutputStream; import java.io.Serializable; +import java.nio.file.Path; import java.util.Arrays; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.ToIntFunction; import java.util.zip.GZIPOutputStream; import java.util.zip.ZipOutputStream; -import org.ojalgo.type.function.AutoConsumer; import org.ojalgo.type.keyvalue.EntryPair.KeyedPrimitive; -public interface ToFileWriter extends AutoConsumer, Closeable { +/** + * Essentially just a {@link Consumer}, but assumed to be writing to a file or similar, and therefore extends + * {@link Closeable}. 
+ */ +@FunctionalInterface +public interface ToFileWriter extends Closeable { - public static final class Builder extends ReaderWriterBuilder { + public static final class Builder extends ReaderWriterBuilder> { - Builder(final File[] files) { + Builder(final F[] files) { super(files); } - public AutoConsumer build(final Function> factory) { + public ToFileWriter build(final Function> factory) { return this.build(Object::hashCode, factory); } @SuppressWarnings("resource") - public AutoConsumer build(final ToIntFunction distributor, final Function> factory) { + public ToFileWriter build(final ToIntFunction distributor, final Function> factory) { - File[] files = this.getFiles(); + F[] files = this.getFiles(); - AutoConsumer[] shards = (AutoConsumer[]) new AutoConsumer[files.length]; + ToFileWriter[] shards = (ToFileWriter[]) new ToFileWriter[files.length]; for (int i = 0; i < shards.length; i++) { shards[i] = factory.apply(files[i]); } @@ -66,98 +72,113 @@ public AutoConsumer build(final ToIntFunction distributor, final Funct int parallelism = this.getParallelism(); ExecutorService executor = this.getExecutor(); - int numberOfShards = shards.length; - int numberOfQueues = Math.max(1, Math.min(parallelism, numberOfShards)); - int capacityPerQueue = Math.max(3, queueCapacity / numberOfQueues); + int nbShards = shards.length; + int nbQueues = Math.max(1, Math.min(parallelism, nbShards)); + int capacityPerQueue = Math.max(0, queueCapacity / nbQueues); + + ToFileWriter single; + + if (nbShards == 1) { - AutoConsumer single; + BlockingQueue queue = this.newQueue(capacityPerQueue); - if (numberOfShards == 1) { + single = new QueuedWriter<>(executor, queue, shards[0]); - LinkedBlockingQueue queue = new LinkedBlockingQueue<>(capacityPerQueue); - single = AutoConsumer.queued(executor, queue, shards[0]); + } else if (nbQueues == 1) { - } else if (numberOfQueues == 1) { + BlockingQueue queue = this.newQueue(capacityPerQueue); + ToFileWriter consumer = 
ShardedWriter.of(distributor, shards); - LinkedBlockingQueue queue = new LinkedBlockingQueue<>(capacityPerQueue); - AutoConsumer consumer = AutoConsumer.sharded(distributor, shards); - single = AutoConsumer.queued(executor, queue, consumer); + single = new QueuedWriter<>(executor, queue, consumer); - } else if (numberOfQueues == numberOfShards) { + } else if (nbQueues == nbShards) { - AutoConsumer[] queuedWriters = (AutoConsumer[]) new AutoConsumer[numberOfQueues]; + ToFileWriter[] queuedWriters = (ToFileWriter[]) new ToFileWriter[nbQueues]; - for (int q = 0; q < numberOfQueues; q++) { - LinkedBlockingQueue queue = new LinkedBlockingQueue<>(capacityPerQueue); - queuedWriters[q] = AutoConsumer.queued(executor, queue, shards[q]); + for (int q = 0; q < nbQueues; q++) { + BlockingQueue queue = this.newQueue(capacityPerQueue); + queuedWriters[q] = new QueuedWriter<>(executor, queue, shards[q]); } - single = AutoConsumer.sharded(distributor, queuedWriters); + single = ShardedWriter.of(distributor, queuedWriters); } else { - int candidateShardsPerQueue = numberOfShards / numberOfQueues; - while (candidateShardsPerQueue * numberOfQueues < numberOfShards) { + int candidateShardsPerQueue = nbShards / nbQueues; + while (candidateShardsPerQueue * nbQueues < nbShards) { candidateShardsPerQueue++; } int shardsPerQueue = candidateShardsPerQueue; - AutoConsumer[] queuedWriters = (AutoConsumer[]) new AutoConsumer[numberOfQueues]; + ToFileWriter[] queuedWriters = (ToFileWriter[]) new ToFileWriter[nbQueues]; - ToIntFunction toQueueDistributor = item -> Math.abs(distributor.applyAsInt(item) % numberOfShards) / shardsPerQueue; - ToIntFunction toShardDistributor = item -> Math.abs(distributor.applyAsInt(item) % numberOfShards) % shardsPerQueue; + ToIntFunction toQueueDistributor = item -> Math.abs(distributor.applyAsInt(item) % nbShards) / shardsPerQueue; + ToIntFunction toShardDistributor = item -> Math.abs(distributor.applyAsInt(item) % nbShards) % shardsPerQueue; - for (int q = 0; q 
< numberOfQueues; q++) { + for (int q = 0; q < nbQueues; q++) { int offset = q * shardsPerQueue; - AutoConsumer[] shardWriters = (AutoConsumer[]) new AutoConsumer[shardsPerQueue]; - Arrays.fill(shardWriters, AutoConsumer.NULL); - for (int b = 0; b < shardsPerQueue && offset + b < numberOfShards; b++) { + ToFileWriter[] shardWriters = (ToFileWriter[]) new ToFileWriter[shardsPerQueue]; + Arrays.fill(shardWriters, ToFileWriter.NULL); + for (int b = 0; b < shardsPerQueue && offset + b < nbShards; b++) { shardWriters[b] = shards[offset + b]; } - LinkedBlockingQueue queue = new LinkedBlockingQueue<>(capacityPerQueue); - AutoConsumer writer = AutoConsumer.sharded(toShardDistributor, shardWriters); - queuedWriters[q] = AutoConsumer.queued(executor, queue, writer); + BlockingQueue queue = this.newQueue(capacityPerQueue); + ToFileWriter writer = ShardedWriter.of(toShardDistributor, shardWriters); + + queuedWriters[q] = new QueuedWriter<>(executor, queue, writer); } - single = AutoConsumer.sharded(toQueueDistributor, queuedWriters); + single = ShardedWriter.of(toQueueDistributor, queuedWriters); } if (this.isStatisticsCollector()) { - return AutoConsumer.managed(this.getStatisticsCollector(), single); + return new ManagedWriter<>(this.getStatisticsCollector(), single); } else { return single; } } - public AutoConsumer> buildMapped(final Function> factory) { + public ToFileWriter> buildMapped(final Function> factory) { Function, T> mapper = KeyedPrimitive::getKey; ToIntFunction> distributor = KeyedPrimitive::intValue; - Function>> mappedFactory = file -> AutoConsumer.mapped(mapper, factory.apply(file)); + Function>> mappedFactory = file -> new MappedWriter<>(mapper, factory.apply(file)); return this.build(distributor, mappedFactory); } } + ToFileWriter NULL = item -> { + throw new IllegalStateException("NULL!"); + }; + /** * Make sure this directory exists, create if necessary */ static void mkdirs(final File dir) { - if (!dir.exists() && (!dir.mkdirs() && !dir.exists())) { + 
if (!dir.exists() && !dir.mkdirs() && !dir.exists()) { throw new RuntimeException("Failed to create " + dir.getAbsolutePath()); } } - static Builder newBuilder(final File... file) { - return new Builder(file); + static Builder newBuilder(final F... file) { + return new Builder<>(file); } - static Builder newBuilder(final ShardedFile shards) { - return new Builder(shards.shards()); + static Builder newBuilder(final File file) { + return new Builder<>(new File[] { file }); + } + + static Builder newBuilder(final Path file) { + return new Builder<>(new Path[] { file }); + } + + static Builder newBuilder(final ShardedFile sharded) { + return new Builder<>(sharded.shards()); } static OutputStream output(final File file) { @@ -189,15 +210,26 @@ static void serializeObjectToFile(final T object, final } } + @Override default void close() throws IOException { - try { - AutoConsumer.super.close(); - } catch (Exception cause) { - if (cause instanceof IOException) { - throw (IOException) cause; - } else { - throw new RuntimeException(cause); - } + // Default implementation does nothing + } + + /** + * Write the item to the consumer. + * + * @param item The item to be written + */ + void write(T item); + + /** + * Write the batch (collection of items) to the consumer. + * + * @param batch The batch to be written + */ + default void writeBatch(final Iterable batch) { + for (T item : batch) { + this.write(item); } } diff --git a/src/main/java/org/ojalgo/random/SampleSet.java b/src/main/java/org/ojalgo/random/SampleSet.java index c59688d85..246bb390b 100644 --- a/src/main/java/org/ojalgo/random/SampleSet.java +++ b/src/main/java/org/ojalgo/random/SampleSet.java @@ -294,6 +294,13 @@ public double getMedian() { return this.getQuartile2(); } + /** + * The mean of the highest and lowest values. 
(Max + Min) / 2 + */ + public double getMidrange() { + return (this.getMaximum() + this.getMinimum()) / TWO; + } + /** * min(value) */ @@ -353,6 +360,13 @@ public double getQuartile3() { return myQuartile3; } + /** + * The difference between the highest and lowest values. Max - Min + */ + public double getRange() { + return this.getMaximum() - this.getMinimum(); + } + /** * min(abs(value)) */ @@ -560,10 +574,10 @@ Access1D getSamples() { if (mySortedCopy == null || mySortedCopy.length < nbSamples || mySortedCopy.length == 0) { mySortedCopy = samples.toRawCopy1D(); - Arrays.parallelSort(mySortedCopy); + Arrays.sort(mySortedCopy); } else if (mySortedCopy[0] == Double.POSITIVE_INFINITY) { FillMatchingSingle.fill(mySortedCopy, samples); - Arrays.parallelSort(mySortedCopy, 0, nbSamples); + Arrays.sort(mySortedCopy, 0, nbSamples); } return mySortedCopy; diff --git a/src/main/java/org/ojalgo/type/ObjectPool.java b/src/main/java/org/ojalgo/type/ObjectPool.java index 7a68cc8a5..a42ad7018 100644 --- a/src/main/java/org/ojalgo/type/ObjectPool.java +++ b/src/main/java/org/ojalgo/type/ObjectPool.java @@ -23,6 +23,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.LinkedTransferQueue; import org.ojalgo.ProgrammingError; @@ -33,7 +34,7 @@ public abstract class ObjectPool { public ObjectPool() { super(); - myObjects = new LinkedBlockingQueue<>(); + myObjects = new LinkedTransferQueue<>(); myLimited = false; } diff --git a/src/main/java/org/ojalgo/type/function/AutoConsumer.java b/src/main/java/org/ojalgo/type/function/AutoConsumer.java deleted file mode 100644 index e748f5038..000000000 --- a/src/main/java/org/ojalgo/type/function/AutoConsumer.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Copyright 1997-2024 Optimatika - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software 
without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ -package org.ojalgo.type.function; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.function.ToIntFunction; - -import org.ojalgo.type.management.MBeanUtils; -import org.ojalgo.type.management.Throughput; - -/** - * Utilities for {@link AutoCloseable} {@link Consumer}:s - * - * @author apete - */ -@FunctionalInterface -public interface AutoConsumer extends AutoCloseable, Consumer, AutoFunctional { - - AutoConsumer NULL = item -> { - throw new IllegalStateException("NULL!"); - }; - - /** - * Will create a JMX bean, with the given name, that keeps track of the consumer's throughput. - */ - static AutoConsumer managed(final String name, final Consumer consumer) { - - Throughput manager = new Throughput(); - - MBeanUtils.register(manager, name); - - return new ManagedConsumer<>(manager, consumer); - } - - /** - * If you want that throughput manager to be registered as a JMX bean, that's up to you. 
- */ - static AutoConsumer managed(final Throughput manager, final Consumer consumer) { - return new ManagedConsumer<>(manager, consumer); - } - - /** - * Map/transform and then consume - */ - static AutoConsumer mapped(final Function mapper, final Consumer consumer) { - return new MappedConsumer<>(mapper, consumer); - } - - /** - * Put on the queue, and then the consumers work off that queue. There will be 1 thread (executor task) - * per consumer. - */ - static AutoConsumer queued(final ExecutorService executor, final BlockingQueue queue, final Consumer... consumers) { - return new QueuedConsumer<>(executor, queue, consumers); - } - - /** - * Distribute to 1 of the consumers - */ - static AutoConsumer sharded(final ToIntFunction distributor, final Consumer... consumers) { - return ShardedConsumer.of(distributor, consumers); - } - - /** - * @see #write(Object) - */ - @Override - default void accept(final T item) { - this.write(item); - } - - @Override - default void close() throws Exception { - // Default implementation does nothing - } - - /** - * Write the item to the consumer. - * - * @param item The item to be written - */ - void write(T item); - - /** - * Write the batch (collection of items) to the consumer. 
- * - * @param batch The batch to be written - */ - default void writeBatch(final Iterable batch) { - for (T item : batch) { - this.write(item); - } - } -} diff --git a/src/main/java/org/ojalgo/type/function/AutoSupplier.java b/src/main/java/org/ojalgo/type/function/AutoSupplier.java deleted file mode 100644 index 5e3d8d004..000000000 --- a/src/main/java/org/ojalgo/type/function/AutoSupplier.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Copyright 1997-2024 Optimatika - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ -package org.ojalgo.type.function; - -import java.io.File; -import java.util.Collection; -import java.util.Iterator; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.function.Predicate; -import java.util.function.Supplier; - -import org.ojalgo.type.management.MBeanUtils; -import org.ojalgo.type.management.Throughput; - -/** - * Utilities for {@link AutoCloseable} {@link Supplier}:s - * - * @author apete - */ -@FunctionalInterface -public interface AutoSupplier extends AutoCloseable, Supplier, AutoFunctional, Iterable { - - static AutoSupplier empty() { - return () -> null; - } - - /** - * Will create a JMX bean, with the given name, that keeps track of the supplier's throughput. - */ - static AutoSupplier managed(final String name, final Supplier supplier) { - - Throughput manager = new Throughput(); - - MBeanUtils.register(manager, name); - - return new ManagedSupplier<>(manager, supplier); - } - - /** - * If you want that throughput manager to be registered as a JMX bean, that's up to you. - */ - static AutoSupplier managed(final Throughput manager, final Supplier supplier) { - return new ManagedSupplier<>(manager, supplier); - } - - /** - * Get something and map/transform before returning it - */ - static AutoSupplier mapped(final Supplier supplier, final Function mapper) { - return new MappedSupplier<>(supplier, mapper); - } - - /** - * Get something, that passes the test, and map/transform before returning it - */ - static AutoSupplier mapped(final Supplier supplier, final Predicate filter, final Function mapper) { - return new MappedSupplier<>(supplier, filter, mapper); - } - - /** - * Multiple suppliers supply to a queue, then you get from that queue. There will be 1 thread (executor - * task) per supplier. - */ - static AutoSupplier queued(final ExecutorService executor, final BlockingQueue queue, final Supplier... 
suppliers) { - return new QueuedSupplier<>(executor, queue, suppliers); - } - - static AutoSupplier sequenced(final BlockingQueue> sources) { - return new SequencedSupplier<>(sources, s -> s); - } - - /** - * Create an {@link AutoSupplier} that will supply items from the containers, one after the other, until - * all containers are empty. You can create multiple such suppliers sharing the same queue of containers. - * - * @param The type of some sort of item container (maybe a {@link File}) - * @param The supplier item type (what do the files contain?) - * @param sources A set of item containers (could be a set of {@link File}:s) - * @param factory A factory method that can take one of the "containers" and return an item supplier. - * @return A sequenced supplier. - */ - static AutoSupplier sequenced(final BlockingQueue sources, final Function> factory) { - return new SequencedSupplier<>(sources, factory); - } - - @Override - default void close() throws Exception { - // Default implementation does nothing - } - - default int drainTo(final Collection container, final int maxElements) { - - int retVal = 0; - - T item = null; - while (retVal < maxElements && (item = this.get()) != null) { - container.add(item); - retVal++; - } - - return retVal; - } - - @Override - default T get() { - return this.read(); - } - - @Override - default Iterator iterator() { - return new SupplierIterator<>(this); - } - - default void processAll(final Consumer processor) { - for (T item : this) { - processor.accept(item); - } - } - - T read(); -} diff --git a/src/test/java/org/ojalgo/TestUtils.java b/src/test/java/org/ojalgo/TestUtils.java index 1437530ec..12146753a 100644 --- a/src/test/java/org/ojalgo/TestUtils.java +++ b/src/test/java/org/ojalgo/TestUtils.java @@ -74,6 +74,10 @@ public abstract class TestUtils /* extends Assertions */ { private static final NumberContext EQUALS = NumberContext.of(12); + public static void assertArrayEquals(final byte[] expected, final byte[] actual) { + 
Assertions.assertArrayEquals(expected, actual); + } + public static void assertBounds(final Comparable lower, final Access1D values, final Comparable upper, final NumberContext precision) { for (ElementView1D tmpValue : values.elements()) { TestUtils.assertBounds(lower, tmpValue.get(), upper, precision); diff --git a/src/test/java/org/ojalgo/data/cluster/BasicTest.java b/src/test/java/org/ojalgo/data/cluster/BasicTest.java new file mode 100644 index 000000000..5bb0b57b9 --- /dev/null +++ b/src/test/java/org/ojalgo/data/cluster/BasicTest.java @@ -0,0 +1,72 @@ +package org.ojalgo.data.cluster; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import org.junit.jupiter.api.Test; +import org.ojalgo.TestUtils; + +public class BasicTest extends ClusterTests { + + static final int K = 2; + + final Set cluster1; + final Set cluster2; + + final List points = new ArrayList<>(); + + BasicTest() { + + super(); + + Point.Factory factory = Point.newFactory(K); + cluster1 = Set.of(factory.newPoint(1, 0), factory.newPoint(1, 4), factory.newPoint(1, 3), factory.newPoint(1, 1), factory.newPoint(1, 2)); + cluster2 = Set.of(factory.newPoint(9, 0), factory.newPoint(9, -4), factory.newPoint(9, -3), factory.newPoint(9, -1), factory.newPoint(9, -2)); + + points.addAll(cluster1); + points.addAll(cluster2); + } + + @Test + void testAuto() { + + List> autoconfig = Point.cluster(points); + if (DEBUG) { + this.printClusters("autoconfig", autoconfig); + } + TestUtils.assertEquals(K, autoconfig.size()); + TestUtils.assertTrue(autoconfig.contains(cluster1)); + TestUtils.assertTrue(autoconfig.contains(cluster2)); + } + + @Test + void testGready() { + + double threshold = 18.0; + // max sum of squares within cluster is 4^2 = 16.0 + // min sum of squares between clusters is 8^2 = 64.0 + List> greedy = Point.newGreedyClusterer(threshold).cluster(points); + + if (DEBUG) { + this.printClusters("greedy", greedy); + } + TestUtils.assertEquals(K, greedy.size()); + 
TestUtils.assertTrue(greedy.contains(cluster1)); + TestUtils.assertTrue(greedy.contains(cluster2)); + } + + @Test + void testKMeans() { + + List> kmeans = Point.newKMeansClusterer(K).cluster(points); + + if (DEBUG) { + this.printClusters("k-means", kmeans); + } + TestUtils.assertEquals(K, kmeans.size()); + TestUtils.assertTrue(kmeans.contains(cluster1)); + TestUtils.assertTrue(kmeans.contains(cluster2)); + } + +} diff --git a/src/main/java/org/ojalgo/type/function/AutoFunctional.java b/src/test/java/org/ojalgo/data/cluster/ClusterTests.java similarity index 71% rename from src/main/java/org/ojalgo/type/function/AutoFunctional.java rename to src/test/java/org/ojalgo/data/cluster/ClusterTests.java index b1b9d3ce6..0e281c446 100644 --- a/src/main/java/org/ojalgo/type/function/AutoFunctional.java +++ b/src/test/java/org/ojalgo/data/cluster/ClusterTests.java @@ -19,8 +19,22 @@ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * SOFTWARE. */ -package org.ojalgo.type.function; +package org.ojalgo.data.cluster; -interface AutoFunctional { +import java.util.List; +import java.util.Set; + +import org.ojalgo.netio.BasicLogger; + +abstract class ClusterTests { + + static final boolean DEBUG = false; + + void printClusters(final String label, final List> clusters) { + BasicLogger.debug(label); + for (int i = 0; i < clusters.size(); i++) { + BasicLogger.debug("Cluster " + (i + 1) + ": " + clusters.get(i)); + } + } } diff --git a/src/test/java/org/ojalgo/data/cluster/KaggleTest.java b/src/test/java/org/ojalgo/data/cluster/KaggleTest.java new file mode 100644 index 000000000..cedcc5d60 --- /dev/null +++ b/src/test/java/org/ojalgo/data/cluster/KaggleTest.java @@ -0,0 +1,127 @@ +package org.ojalgo.data.cluster; + +import java.io.InputStream; +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +import 
org.junit.jupiter.api.Test; +import org.ojalgo.TestUtils; +import org.ojalgo.netio.ASCII; +import org.ojalgo.netio.BasicLogger; +import org.ojalgo.netio.FromFileReader; +import org.ojalgo.netio.TextLineReader; + +public class KaggleTest extends ClusterTests { + + static final class MallCustomer { + + static boolean filter(final String line) { + return TextLineReader.isLineOK(line) && ASCII.isDigit(line.charAt(0)); + } + + static MallCustomer parse(final String line) { + + String[] parts = line.split(","); + + if (parts.length != 5 && !ASCII.isDigit(parts[0].charAt(0))) { + throw new IllegalArgumentException("Invalid input line: " + line); + + } + + int customerID = Integer.parseInt(parts[0]); + boolean gender = "Male".equals(parts[1]); + int age = Integer.parseInt(parts[2]); + int annualIncome = Integer.parseInt(parts[3]); + int spendingScore = Integer.parseInt(parts[4]); + + return new MallCustomer(customerID, gender, age, annualIncome, spendingScore); + } + + /** + * Age + */ + int age; + /** + * Annual Income (k$) + */ + int annualIncome; + /** + * CustomerID + */ + int customerID; + /** + * Gender (Male=true=10, Female=false=0) + */ + boolean gender; + /** + * Spending Score (1-100) + */ + int spendingScore; + + MallCustomer(final int customerID, final boolean gender, final int age, final int annualIncome, final int spendingScore) { + super(); + this.customerID = customerID; + this.gender = gender; + this.age = age; + this.annualIncome = annualIncome; + this.spendingScore = spendingScore; + } + + } + + static void describe(final Collection cluster) { + BasicLogger.debug("Size: {}", cluster.size()); + BasicLogger.debug("Average: {}", Point.mean(cluster)); + BasicLogger.debug(); + + } + + /** + * https://www.kaggle.com/datasets/vjchoudhary7/customer-segmentation-tutorial-in-python/data + *

+ * https://www.kaggleusercontent.com/kf/211731631/eyJhbGciOiJkaXIiLCJlbmMiOiJBMTI4Q0JDLUhTMjU2In0..bGWxHGVeU7-W8E4h6GAXQw.h1RcWyg0CmbXxHKtbuR6FL-cDjxmYvJMt8iFY-KATMuSqNdK9HJVII1ENb3Z5X97bVxta8DTis-mVM5-kWSUbwLzZqZjr7_YqKAYLVrEyjBcuX1WAvUSDp6m0PYGw014rhbvLBlLSSiykj3H10yx_5aBmBhSFnuLlvAx3ff5e2Wv0A2HaDXTzwibTR8fKx8KlWukMimniO951pWB2075HD4H30FKKiLaSQ0AgsA2gOj3qFiU14xG5hzlpzA_SVehLzUX4TAKuXtLCnQEpaNDdpkfY_X91JiD_zdyLzRLh7rwXoazpkFIZmBkAzemWn4STlDFDP5F-aLaLxZbb8eimZMdT7FFWOk0IYU-9bw9FxtswaQWpZTOzwsfmIhpsIqKgMZObPAIQxKsi1QN09NXiWfGRa_GNxa2sVU9XDRK0ObkGoAUaVJT9kHgJ5hIGDnI3QOLlP59BRR-AGMbbEEVey_bTGLJbSj0dd3USjdYs4oZTzXFlpIOCrZEogJm87vAErgpCRh2WKielomIJcDUIdTKeGNwLatj8l9c-LiIrHEdKncuSBXEJAPAAC34534hG4ImZ-oZxdTk1Rhyrsqhq3RNvftm9brn5okAtQbP1jm9FzLSJajGvCuNXeGCgGY_9Tpgg7W2j4xbwH9PuUZ512xa7RDNUkRBKe0fEC_HBjU.FfPHSdytJZY5KYiVkmnkag/__results___files/__results___15_0.png + */ + @Test + void testMallCustomerSegmentation() { + + try (InputStream input = TestUtils.getResource("kaggle", "Mall_Customers.csv"); + FromFileReader reader = new TextLineReader(input).withFilteredParser(MallCustomer::filter, MallCustomer::parse)) { + + List customers = reader.stream().collect(Collectors.toList()); + + Function converter = customer -> new float[] { customer.gender ? 
10 : 0, customer.age, customer.annualIncome, + customer.spendingScore }; + + List points = Point.convert(customers, converter); + + TestUtils.assertEquals(200, points.size()); + + List> clusters = Point.cluster(points); + + TestUtils.assertEquals(5, clusters.size()); + TestUtils.assertEquals(200, clusters.stream().mapToInt(Set::size).sum()); + + clusters.sort(Comparator.comparing(Set::size).reversed()); + + if (DEBUG) { + BasicLogger.debug(); + BasicLogger.debug("Complete data set"); + BasicLogger.debug("============================================"); + KaggleTest.describe(points); + BasicLogger.debug("Clusters (ordered by decreasing size)"); + BasicLogger.debug("============================================"); + for (Set cluster : clusters) { + KaggleTest.describe(cluster); + } + } + + } catch (Exception cause) { + throw new RuntimeException(cause); + } + } + +} diff --git a/src/test/java/org/ojalgo/data/domain/finance/series/DataParserTest.java b/src/test/java/org/ojalgo/data/domain/finance/series/DataParserTest.java index db85ea5f0..eaed053fb 100644 --- a/src/test/java/org/ojalgo/data/domain/finance/series/DataParserTest.java +++ b/src/test/java/org/ojalgo/data/domain/finance/series/DataParserTest.java @@ -30,8 +30,8 @@ import org.junit.jupiter.api.Test; import org.ojalgo.TestUtils; import org.ojalgo.function.constant.PrimitiveMath; +import org.ojalgo.netio.FromFileReader; import org.ojalgo.netio.TextLineReader; -import org.ojalgo.type.function.AutoSupplier; public class DataParserTest extends FinanceSeriesTests { @@ -39,6 +39,7 @@ static class ResultsConsumer implements Consumer { List data = new ArrayList<>(); + @Override public void accept(final DP parsed) { data.add(parsed); } @@ -72,7 +73,7 @@ static void compareWithDetectingParser(final File file, final ResultsConsumer DatePriceParser parser = new DatePriceParser(); // try (TextLineReader reader = TextLineReader.of(file); AutoSupplier supplier = reader.withFilteredParser(BasicParser::isLineOK, parser)) { - 
try (TextLineReader reader = TextLineReader.of(file); AutoSupplier supplier = reader.withParser(parser)) { + try (TextLineReader reader = TextLineReader.of(file); FromFileReader supplier = reader.withParser(parser)) { ResultsConsumer collector = new ResultsConsumer<>(); diff --git a/src/test/java/org/ojalgo/netio/DataInterpreterTest.java b/src/test/java/org/ojalgo/netio/DataInterpreterTest.java new file mode 100644 index 000000000..84fba78a8 --- /dev/null +++ b/src/test/java/org/ojalgo/netio/DataInterpreterTest.java @@ -0,0 +1,87 @@ +package org.ojalgo.netio; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import org.junit.jupiter.api.Test; +import org.ojalgo.TestUtils; + +public class DataInterpreterTest extends NetioTests { + + private static void doSequenceTest(final DataInterpreter interpreter) throws IOException { + + InMemoryFile file = new InMemoryFile(); + + try (DataWriter writer = new DataWriter<>(file.newOutputStream(), interpreter)) { + writer.write("apete"); + writer.write("abc"); + writer.write("123"); + } + + try (DataReader reader = new DataReader<>(file.newInputStream(), interpreter)) { + TestUtils.assertEquals("apete", reader.read()); + TestUtils.assertEquals("abc", reader.read()); + TestUtils.assertEquals("123", reader.read()); + } + } + + private static void doSingleTest(final DataInterpreter interpreter) throws IOException { + + String data = "Hello, World!"; + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + interpreter.serialize(data, dos); + + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + DataInputStream dis = new DataInputStream(bais); + String result = interpreter.deserialize(dis); + + TestUtils.assertEquals(data, result); + } + + @Test + public void testBytesInterpreter() throws IOException { + + byte[] data = { 1, 
2, 3, 4, 5 }; + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + DataInterpreter.BYTES.serialize(data, dos); + + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + DataInputStream dis = new DataInputStream(bais); + byte[] result = DataInterpreter.BYTES.deserialize(dis); + + TestUtils.assertArrayEquals(data, result); + } + + @Test + public void testStringBytesInterpreter() throws IOException { + DataInterpreterTest.doSingleTest(DataInterpreter.STRING_BYTES); + } + + @Test + public void testStringCharsInterpreter() throws IOException { + DataInterpreterTest.doSingleTest(DataInterpreter.STRING_CHARS); + } + + @Test + public void testStringInterpreters() throws IOException { + + DataInterpreterTest.doSequenceTest(DataInterpreter.STRING_UTF); + + DataInterpreterTest.doSequenceTest(DataInterpreter.STRING_CHARS); + + DataInterpreterTest.doSequenceTest(DataInterpreter.STRING_BYTES); + } + + @Test + public void testStringUtfInterpreter() throws IOException { + DataInterpreterTest.doSingleTest(DataInterpreter.STRING_UTF); + } + +} diff --git a/src/test/java/org/ojalgo/netio/SegmentedFileTest.java b/src/test/java/org/ojalgo/netio/SegmentedFileTest.java index 0ced25709..f3b50897f 100644 --- a/src/test/java/org/ojalgo/netio/SegmentedFileTest.java +++ b/src/test/java/org/ojalgo/netio/SegmentedFileTest.java @@ -53,7 +53,7 @@ public void testReading1BRC() { try (TextLineReader reader = segmentedFile.newTextLineReader(segment)) { - reader.processAll(line -> { + reader.forEach(line -> { TestUtils.assertEquals(LINES[counter.getAndIncrement()], line); }); } diff --git a/src/test/java/org/ojalgo/optimisation/service/ServiceIntegrationTest.java b/src/test/java/org/ojalgo/optimisation/service/ServiceIntegrationTest.java index e18720ec0..213d86525 100644 --- a/src/test/java/org/ojalgo/optimisation/service/ServiceIntegrationTest.java +++ 
b/src/test/java/org/ojalgo/optimisation/service/ServiceIntegrationTest.java @@ -11,7 +11,7 @@ @Tag("network") public class ServiceIntegrationTest { - private static final String HOST = "http://16.16.99.66:8080"; + private static final String HOST = "https://optimatika-boot-services-969062758986.europe-north1.run.app"; // private static final String HOST = "http://localhost:8080"; // private static final String HOST = "http://test-service.optimatika.se:8080"; diff --git a/src/test/resources/kaggle/Mall_Customers.csv b/src/test/resources/kaggle/Mall_Customers.csv new file mode 100644 index 000000000..ac8f77460 --- /dev/null +++ b/src/test/resources/kaggle/Mall_Customers.csv @@ -0,0 +1,201 @@ +CustomerID,Gender,Age,Annual Income (k$),Spending Score (1-100) +1,Male,19,15,39 +2,Male,21,15,81 +3,Female,20,16,6 +4,Female,23,16,77 +5,Female,31,17,40 +6,Female,22,17,76 +7,Female,35,18,6 +8,Female,23,18,94 +9,Male,64,19,3 +10,Female,30,19,72 +11,Male,67,19,14 +12,Female,35,19,99 +13,Female,58,20,15 +14,Female,24,20,77 +15,Male,37,20,13 +16,Male,22,20,79 +17,Female,35,21,35 +18,Male,20,21,66 +19,Male,52,23,29 +20,Female,35,23,98 +21,Male,35,24,35 +22,Male,25,24,73 +23,Female,46,25,5 +24,Male,31,25,73 +25,Female,54,28,14 +26,Male,29,28,82 +27,Female,45,28,32 +28,Male,35,28,61 +29,Female,40,29,31 +30,Female,23,29,87 +31,Male,60,30,4 +32,Female,21,30,73 +33,Male,53,33,4 +34,Male,18,33,92 +35,Female,49,33,14 +36,Female,21,33,81 +37,Female,42,34,17 +38,Female,30,34,73 +39,Female,36,37,26 +40,Female,20,37,75 +41,Female,65,38,35 +42,Male,24,38,92 +43,Male,48,39,36 +44,Female,31,39,61 +45,Female,49,39,28 +46,Female,24,39,65 +47,Female,50,40,55 +48,Female,27,40,47 +49,Female,29,40,42 +50,Female,31,40,42 +51,Female,49,42,52 +52,Male,33,42,60 +53,Female,31,43,54 +54,Male,59,43,60 +55,Female,50,43,45 +56,Male,47,43,41 +57,Female,51,44,50 +58,Male,69,44,46 +59,Female,27,46,51 +60,Male,53,46,46 +61,Male,70,46,56 +62,Male,19,46,55 +63,Female,67,47,52 +64,Female,54,47,59 +65,Male,63,48,51 
+66,Male,18,48,59 +67,Female,43,48,50 +68,Female,68,48,48 +69,Male,19,48,59 +70,Female,32,48,47 +71,Male,70,49,55 +72,Female,47,49,42 +73,Female,60,50,49 +74,Female,60,50,56 +75,Male,59,54,47 +76,Male,26,54,54 +77,Female,45,54,53 +78,Male,40,54,48 +79,Female,23,54,52 +80,Female,49,54,42 +81,Male,57,54,51 +82,Male,38,54,55 +83,Male,67,54,41 +84,Female,46,54,44 +85,Female,21,54,57 +86,Male,48,54,46 +87,Female,55,57,58 +88,Female,22,57,55 +89,Female,34,58,60 +90,Female,50,58,46 +91,Female,68,59,55 +92,Male,18,59,41 +93,Male,48,60,49 +94,Female,40,60,40 +95,Female,32,60,42 +96,Male,24,60,52 +97,Female,47,60,47 +98,Female,27,60,50 +99,Male,48,61,42 +100,Male,20,61,49 +101,Female,23,62,41 +102,Female,49,62,48 +103,Male,67,62,59 +104,Male,26,62,55 +105,Male,49,62,56 +106,Female,21,62,42 +107,Female,66,63,50 +108,Male,54,63,46 +109,Male,68,63,43 +110,Male,66,63,48 +111,Male,65,63,52 +112,Female,19,63,54 +113,Female,38,64,42 +114,Male,19,64,46 +115,Female,18,65,48 +116,Female,19,65,50 +117,Female,63,65,43 +118,Female,49,65,59 +119,Female,51,67,43 +120,Female,50,67,57 +121,Male,27,67,56 +122,Female,38,67,40 +123,Female,40,69,58 +124,Male,39,69,91 +125,Female,23,70,29 +126,Female,31,70,77 +127,Male,43,71,35 +128,Male,40,71,95 +129,Male,59,71,11 +130,Male,38,71,75 +131,Male,47,71,9 +132,Male,39,71,75 +133,Female,25,72,34 +134,Female,31,72,71 +135,Male,20,73,5 +136,Female,29,73,88 +137,Female,44,73,7 +138,Male,32,73,73 +139,Male,19,74,10 +140,Female,35,74,72 +141,Female,57,75,5 +142,Male,32,75,93 +143,Female,28,76,40 +144,Female,32,76,87 +145,Male,25,77,12 +146,Male,28,77,97 +147,Male,48,77,36 +148,Female,32,77,74 +149,Female,34,78,22 +150,Male,34,78,90 +151,Male,43,78,17 +152,Male,39,78,88 +153,Female,44,78,20 +154,Female,38,78,76 +155,Female,47,78,16 +156,Female,27,78,89 +157,Male,37,78,1 +158,Female,30,78,78 +159,Male,34,78,1 +160,Female,30,78,73 +161,Female,56,79,35 +162,Female,29,79,83 +163,Male,19,81,5 +164,Female,31,81,93 +165,Male,50,85,26 +166,Female,36,85,75 
+167,Male,42,86,20 +168,Female,33,86,95 +169,Female,36,87,27 +170,Male,32,87,63 +171,Male,40,87,13 +172,Male,28,87,75 +173,Male,36,87,10 +174,Male,36,87,92 +175,Female,52,88,13 +176,Female,30,88,86 +177,Male,58,88,15 +178,Male,27,88,69 +179,Male,59,93,14 +180,Male,35,93,90 +181,Female,37,97,32 +182,Female,32,97,86 +183,Male,46,98,15 +184,Female,29,98,88 +185,Female,41,99,39 +186,Male,30,99,97 +187,Female,54,101,24 +188,Male,28,101,68 +189,Female,41,103,17 +190,Female,36,103,85 +191,Female,34,103,23 +192,Female,32,103,69 +193,Male,33,113,8 +194,Female,38,113,91 +195,Female,47,120,16 +196,Female,35,120,79 +197,Female,45,126,28 +198,Male,32,126,74 +199,Male,32,137,18 +200,Male,30,137,83