New Features

- Added: addOrGet for sets.
- Added: Async API which allows to easily execute Iterables/Collections
offthread without the complexity.
This commit is contained in:
Speiger 2022-04-07 00:04:52 +02:00
parent 29c4d253cf
commit 6f31fc5abb
15 changed files with 1168 additions and 1 deletions

View File

@ -1,5 +1,9 @@
# Changelog of versions
### Version 0.6.0
- Added: addOrGet for sets.
- Added: Async API which allows to easily execute Iterables/Collections offthread without the complexity.
### Version 0.5.3
- Added: OrderedMap/Set
- Added: Deprecation to Functions that are specific to Ordered interfaces in the SortedMap/Set

View File

@ -158,6 +158,7 @@ public class GlobalVariables
//Final Classes
addClassMapper("ARRAY_LIST", "ArrayList");
addClassMapper("ASYNC_BUILDER", "AsyncBuilder");
addClassMapper("LINKED_LIST", "LinkedList");
addAbstractMapper("IMMUTABLE_LIST", "Immutable%sList");
addClassMapper("ARRAY_FIFO_QUEUE", "ArrayFIFOQueue");
@ -191,6 +192,7 @@ public class GlobalVariables
addAbstractMapper("ABSTRACT_LIST", "Abstract%sList");
addAbstractBiMapper("ABSTRACT_MAP", "Abstract%sMap", "2");
addClassMapper("SUB_LIST", "SubList");
addAbstractMapper("BASE_TASK", "Base%sTask");
//Helper Classes
addClassMapper("LISTS", "Lists");
@ -233,6 +235,7 @@ public class GlobalVariables
addClassMapper("STACK", "Stack");
addClassMapper("SUPPLIER", "Supplier");
addAbstractMapper("SINGLE_UNARY_OPERATOR", "%1$s%1$sUnaryOperator");
addClassMapper("TASK", "Task");
addBiClassMapper("UNARY_OPERATOR", "UnaryOperator", "");
if(type.isObject())
{

View File

@ -21,6 +21,7 @@ import speiger.src.collections.PACKAGE.lists.ARRAY_LIST;
import speiger.src.collections.PACKAGE.sets.SET;
import speiger.src.collections.PACKAGE.sets.LINKED_HASH_SET;
#endif
import speiger.src.collections.PACKAGE.utils.ASYNC_BUILDER;
import speiger.src.collections.PACKAGE.utils.SPLIT_ITERATORS;
import speiger.src.collections.PACKAGE.utils.ITERABLES;
import speiger.src.collections.PACKAGE.utils.ITERATORS;
@ -89,6 +90,16 @@ public interface ITERABLE KEY_GENERIC_TYPE extends Iterable<CLASS_TYPE>
@Override
default SPLIT_ITERATOR KEY_GENERIC_TYPE spliterator() { return SPLIT_ITERATORS.createUnknownSplititerator(iterator(), 0); }
/**
* Creates a Async Builder for moving work of the thread.
* It is not designed to split the work to multithreaded work, so using this keep it singlethreaded, but it allows to be moved to another thread.
* @see ASYNC_BUILDER
* @return a AsyncBuilder
*/
default ASYNC_BUILDER KEY_GENERIC_TYPE asAsync() {
return new ASYNC_BUILDERBRACES(this);
}
/**
* A Helper function to reduce the usage of Streams and allows to convert a Iterable to something else.
* @param mapper the mapping function

View File

@ -0,0 +1,92 @@
package speiger.src.collections.PACKAGE.functions;
import java.util.concurrent.RunnableFuture;
#if !TYPE_OBJECT
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
#endif
/**
*
* A Type Specific Task interface that allows you to keep track of the task that is currently running.<br>
* It extends Runnable future and supports said functions but also provides quality of life functions like:<br>
*
* - isSuccesfull: which allows to detect if the task was completed properly and not interrupted or crashed.
* - pause/resume: which allows to pause/resume the task at any moment, making it easier to create thread-safe actions.
* @Type(T)
*/
public interface TASK KEY_GENERIC_TYPE extends RunnableFuture<CLASS_TYPE> {
/**
* Helper function to detect if the task is currently paused.
* @return true if paused
*/
public boolean isPaused();
/**
* Pauses the task, which lets the thread finish without completing the task.
* Tasks are written in the way where they can pause without any issues.
* This won't be instant, as this function is applied asynchronous and doesn't check if the thread paused.
* So make sure it had the time to pause.
*/
public void pause();
/**
* Pauses the task, which lets the thread finish without completing the task.
* Tasks are written in the way where they can pause without any issues.
* This won't be instant, as this function is applied asynchronous.
* It will await the pausing of the task.
*/
public void awaitPausing();
/**
* Continues the task if it wasn't already completed.
* This is done by resubmitting the task to the executor provided.
*/
public void resume();
/**
* Quality of life function that allows to detect if no cancellation/exception was applied to this task and it completed on its own.
* @return true if it was properly completed
*/
public boolean isSuccessful();
#if !TYPE_OBJECT
/**
* A Type Specific get method that allows to reduce (un)boxing of primtives.
*
* Waits if necessary for the computation to complete, and then
* retrieves its result.
*
* @return the computed result as primitive
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
*/
public KEY_TYPE GET_KEY() throws InterruptedException, ExecutionException;
/**
* Waits if necessary for at most the given time for the computation
* to complete, and then retrieves its result, if available.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the computed result as primitive
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an exception
* @throws InterruptedException if the current thread was interrupted while waiting
* @throws TimeoutException if the wait timed out
*/
public KEY_TYPE GET_KEY(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
@Override
@Deprecated
public default CLASS_TYPE get() throws InterruptedException, ExecutionException { return KEY_TO_OBJ(GET_KEY()); }
@Override
@Deprecated
public default CLASS_TYPE get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return KEY_TO_OBJ(GET_KEY(timeout, unit)); }
#endif
}

View File

@ -258,6 +258,43 @@ public class AVL_TREE_SET KEY_GENERIC_TYPE extends ABSTRACT_SET KEY_GENERIC_TYPE
return true;
}
#if TYPE_OBJECT
@Override
public KEY_TYPE addOrGet(KEY_TYPE o) {
validate(o);
if(tree == null) {
tree = first = last = new EntryBRACES(o, null);
size++;
return o;
}
int compare = 0;
Entry KEY_GENERIC_TYPE parent = tree;
while(true) {
if((compare = compare(o, parent.key)) == 0) return parent.key;
if(compare < 0) {
if(parent.left == null) break;
parent = parent.left;
}
else if(compare > 0) {
if(parent.right == null) break;
parent = parent.right;
}
}
Entry KEY_GENERIC_TYPE adding = new EntryBRACES(o, parent);
if(compare < 0) {
parent.left = adding;
if(parent == first) first = adding;
}
else {
parent.right = adding;
if(parent == last) last = adding;
}
fixAfterInsertion(adding);
size++;
return o;
}
#endif
@Override
public boolean addAndMoveToFirst(KEY_TYPE o) { throw new UnsupportedOperationException(); }

View File

@ -14,6 +14,11 @@ import speiger.src.collections.PACKAGE.collections.ITERATOR;
*/
public abstract class ABSTRACT_SET KEY_GENERIC_TYPE extends ABSTRACT_COLLECTION KEY_GENERIC_TYPE implements SET KEY_GENERIC_TYPE
{
#if TYPE_OBJECT
@Override
public KEY_TYPE addOrGet(KEY_TYPE o) { throw new UnsupportedOperationException(); }
#endif
@Override
public abstract ITERATOR KEY_GENERIC_TYPE iterator();
@Override

View File

@ -132,6 +132,17 @@ public class ARRAY_SET KEY_GENERIC_TYPE extends ABSTRACT_SET KEY_GENERIC_TYPE im
return false;
}
#if TYPE_OBJECT
@Override
public KEY_TYPE addOrGet(KEY_TYPE o) {
int index = findIndex(o);
if(index != -1) return data[index];
if(data.length == size) data = Arrays.copyOf(data, size == 0 ? 2 : size * 2);
data[size++] = o;
return o;
}
#endif
@Override
public boolean addAndMoveToFirst(KEY_TYPE o) {
int index = findIndex(o);

View File

@ -241,6 +241,10 @@ public class IMMUTABLE_HASH_SET KEY_GENERIC_TYPE extends ABSTRACT_SET KEY_GENERI
@Override
public boolean add(KEY_TYPE o) { throw new UnsupportedOperationException(); }
#if TYPE_OBJECT
@Override
public KEY_TYPE addOrGet(KEY_TYPE o) { throw new UnsupportedOperationException(); }
#endif
@Override
@Primitive
public boolean addAll(Collection<? extends CLASS_TYPE> c) { throw new UnsupportedOperationException(); }

View File

@ -277,6 +277,30 @@ public class CUSTOM_HASH_SET KEY_GENERIC_TYPE extends ABSTRACT_SET KEY_GENERIC_T
return true;
}
#if TYPE_OBJECT
@Override
public KEY_TYPE addOrGet(KEY_TYPE o) {
if(strategy.equals(o, EMPTY_KEY_VALUE)) {
if(containsNull) return EMPTY_KEY_VALUE;
containsNull = true;
onNodeAdded(nullIndex);
}
else {
int pos = HashUtil.mix(strategy.hashCode(o)) & mask;
KEY_TYPE current = keys[pos];
if(!strategy.equals(current, EMPTY_KEY_VALUE)) {
if(strategy.equals(current, o)) return current;
while(!strategy.equals((current = keys[pos = (++pos & mask)]), EMPTY_KEY_VALUE))
if(strategy.equals(current, o)) return current;
}
keys[pos] = o;
onNodeAdded(pos);
}
if(size++ >= maxFill) rehash(HashUtil.arraySize(size+1, loadFactor));
return o;
}
#endif
@Override
@Primitive
public boolean addAll(Collection<? extends CLASS_TYPE> c) {

View File

@ -235,6 +235,30 @@ public class HASH_SET KEY_GENERIC_TYPE extends ABSTRACT_SET KEY_GENERIC_TYPE imp
return true;
}
#if TYPE_OBJECT
@Override
public KEY_TYPE addOrGet(KEY_TYPE o) {
if(KEY_EQUALS_NULL(o)) {
if(containsNull) return null;
containsNull = true;
onNodeAdded(nullIndex);
}
else {
int pos = HashUtil.mix(KEY_TO_HASH(o)) & mask;
KEY_TYPE current = keys[pos];
if(KEY_EQUALS_NOT_NULL(current)) {
if(KEY_EQUALS(current, o)) return current;
while(KEY_EQUALS_NOT_NULL((current = keys[pos = (++pos & mask)])))
if(KEY_EQUALS(current, o)) return current;
}
keys[pos] = o;
onNodeAdded(pos);
}
if(size++ >= maxFill) rehash(HashUtil.arraySize(size+1, loadFactor));
return o;
}
#endif
@Override
@Primitive
public boolean addAll(Collection<? extends CLASS_TYPE> c) {

View File

@ -258,6 +258,43 @@ public class RB_TREE_SET KEY_GENERIC_TYPE extends ABSTRACT_SET KEY_GENERIC_TYPE
return true;
}
#if TYPE_OBJECT
@Override
public KEY_TYPE addOrGet(KEY_TYPE o) {
validate(o);
if(tree == null) {
tree = first = last = new EntryBRACES(o, null);
size++;
return o;
}
int compare = 0;
Entry KEY_GENERIC_TYPE parent = tree;
while(true) {
if((compare = compare(o, parent.key)) == 0) return parent.key;
if(compare < 0) {
if(parent.left == null) break;
parent = parent.left;
}
else if(compare > 0) {
if(parent.right == null) break;
parent = parent.right;
}
}
Entry KEY_GENERIC_TYPE adding = new EntryBRACES(o, parent);
if(compare < 0) {
parent.left = adding;
if(parent == first) first = adding;
}
else {
parent.right = adding;
if(parent == last) last = adding;
}
fixAfterInsertion(adding);
size++;
return o;
}
#endif
@Override
public boolean addAndMoveToFirst(KEY_TYPE o) { throw new UnsupportedOperationException(); }

View File

@ -54,6 +54,15 @@ public interface SET KEY_GENERIC_TYPE extends Set<CLASS_TYPE>, COLLECTION KEY_GE
return COLLECTION.super.remove(o);
}
#else
/**
* A Helper method that allows to add a element or getting the already present implement.
* Allowing to make unique references reuseable.
* @param o the element to add
* @return either the inserted element or the present element.
*/
public KEY_TYPE addOrGet(KEY_TYPE o);
#endif
#if !TYPE_BOOLEAN
/**

View File

@ -0,0 +1,882 @@
package speiger.src.collections.PACKAGE.utils;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
#if !TYPE_OBJECT
import speiger.src.collections.PACKAGE.functions.CONSUMER;
#else
import java.util.function.BiFunction;
#endif
import speiger.src.collections.PACKAGE.collections.ITERABLE;
import speiger.src.collections.PACKAGE.collections.COLLECTION;
import speiger.src.collections.PACKAGE.collections.ITERATOR;
import speiger.src.collections.PACKAGE.functions.TASK;
import speiger.src.collections.PACKAGE.functions.function.PREDICATE;
import speiger.src.collections.PACKAGE.functions.function.TO_OBJECT_FUNCTION;
import speiger.src.collections.PACKAGE.functions.function.UNARY_OPERATOR;
import speiger.src.collections.PACKAGE.lists.ARRAY_LIST;
import speiger.src.collections.PACKAGE.lists.LIST;
#if !TYPE_BOOLEAN
import speiger.src.collections.PACKAGE.sets.SET;
import speiger.src.collections.PACKAGE.sets.LINKED_HASH_SET;
#endif
#if !TYPE_BOOLEAN
import speiger.src.collections.booleans.utils.BooleanAsyncBuilder;
import speiger.src.collections.booleans.utils.BooleanAsyncBuilder.BaseBooleanTask;
#endif
#if !TYPE_OBJECT
import speiger.src.collections.objects.utils.ObjectAsyncBuilder;
import speiger.src.collections.objects.utils.ObjectAsyncBuilder.BaseObjectTask;
#endif
#if !TYPE_INT
import speiger.src.collections.ints.utils.IntAsyncBuilder;
import speiger.src.collections.ints.utils.IntAsyncBuilder.BaseIntTask;
#endif
import speiger.src.collections.utils.SanityChecks;
/**
*
*
* The Async API allows you to process collections on a different thread without having to deal with the Multithreading complexity. <br>
* It uses the Lightweight Stream Replace API to do its work, which can make it faster then sequential streams. <br>
* This feature isn't designed to do Multithreading work, but to allow moving work off the main thread into a worker thread. <br>
* So anything executed on this is still Singlethreaded. <br>
* <br>
* How it works is you create the AsyncBuilder using a Iterable of the Type. <br>
* Then select things you want to use on the Iterable. <br>
* - For example: map/filter/peek/limit/etc <br>
* <br>
* After that you select your action the Iterable should be applied on. <br>
* - For Example: forEach/findFirst/matchAny/etc <br>
* <br>
* Then optionally a custom Executor or callback can be applied <br>
* At the end either execute or join can be called to start the task. <br>
* - execute will return the Task reference so that the task can be traced back. <br>
* - join will await the completion of the task and return the return value <br>
* <br>
* During Construction a couple Disposable Builder Objects will be created. <br>
* These will be only used during construction. <br>
* <br>
* The Task Object is also pause-able/Interruptable at any moment during the Processing. <br>
*
* A small example
* <pre>
* public void processFiles(ObjectCollection<String> potentialFiles) {
* potentialFiles.asAsync()
* .map(Paths::get).filter(Files::exists) //Modifies the collection (Optional)
* .forEach(Files::delete) //Creates the action (Required)
* .callback(T -> {}} //Callback on completion (Optional)
* .execute() //Starts the task. (Required)
* }
* </pre>
* @author Speiger
*
* @Type(T)
*/
public class ASYNC_BUILDER KEY_GENERIC_TYPE
{
ITERABLE KEY_GENERIC_TYPE iterable;
BASE_TASK KEY_GENERIC_TYPE task;
/**
* Main Constructor that uses a Iterable to build a Offthread Task.
* @param iterable
*/
public ASYNC_BUILDER(ITERABLE KEY_GENERIC_TYPE iterable) {
this.iterable = iterable;
}
/**
* Helper constructor.
* @param task
*/
public ASYNC_BUILDER(BASE_TASK KEY_GENERIC_TYPE task) {
this.task = task;
}
/**
* Maps the elements to something else
* @param mapper the mapping function
* @param <E> The return type.
* @return a new Builder Object with the mapped Iterable
*/
public <E> ObjectAsyncBuilder<E> map(TO_OBJECT_FUNCTION KKS_GENERIC_TYPE<E> mapper) {
return new ObjectAsyncBuilder<>(ITERABLES.map(iterable, mapper));
}
/**
* Maps the elements to something else
* @param mapper the flatMapping function
* @param <V> The return type supplier.
* @param <E> The return type.
* @return a new Builder Object with the mapped Iterable
*/
public <E, V extends Iterable<E>> ObjectAsyncBuilder<E> flatMap(TO_OBJECT_FUNCTION KKS_GENERIC_TYPE<V> mapper) {
return new ObjectAsyncBuilder<>(ITERABLES.flatMap(iterable, mapper));
}
/**
* Maps the elements to something else
* @param mapper the flatMapping function
* @param <E> The return type.
* @return a new Builder Object with the mapped Iterable
*/
public <E> ObjectAsyncBuilder<E> arrayflatMap(TO_OBJECT_FUNCTION KKS_GENERIC_TYPE<E[]> mapper) {
return new ObjectAsyncBuilder<>(ITERABLES.arrayFlatMap(iterable, mapper));
}
/**
* Filters out the unwanted elements out of the Iterable
* @param filter the elements that should be kept
* @return Self with a filter applied
*/
public ASYNC_BUILDER KEY_GENERIC_TYPE filter(PREDICATE KEY_GENERIC_TYPE filter) {
iterable = ITERABLES.filter(iterable, filter);
return this;
}
/**
* Removes duplicated elements out of the Iterable
* @return Self with a deduplicator applied
*/
public ASYNC_BUILDER KEY_GENERIC_TYPE distinct() {
iterable = ITERABLES.distinct(iterable);
return this;
}
/**
* Limits how many elements are inside of the Iterable
* @param limit how many elements should max be iterated through
* @return self with a limiter applied
*/
public ASYNC_BUILDER KEY_GENERIC_TYPE limit(long limit) {
iterable = ITERABLES.limit(iterable, limit);
return this;
}
/**
* Allows to preview elements before they are processed
* @param action the action that should be applied
* @return self with a preview applied
*/
public ASYNC_BUILDER KEY_GENERIC_TYPE peek(CONSUMER KEY_GENERIC_TYPE action) {
iterable = ITERABLES.peek(iterable, action);
return this;
}
/**
* Iterates over the Iterable with a desired action
* @param action that should be applied
* @return a new Builder with the forEach action applied.
*/
public ObjectAsyncBuilder<Void> forEach(CONSUMER KEY_GENERIC_TYPE action) {
return new ObjectAsyncBuilder<>(new ForEachTask<>(iterable.iterator(), action));
}
/**
* Reduces the elements inside of the Iterable down to one element
* @param operator that reduces the elements.
* @return self with the reduce action applied
*/
public ASYNC_BUILDER KEY_GENERIC_TYPE reduce(UNARY_OPERATOR KEY_KEY_GENERIC_TYPE operator) {
task = new SimpleReduceTaskBRACES(iterable.iterator(), operator);
return this;
}
#if TYPE_OBJECT
/**
* Reduces the elements inside of the Iterable down to one element using a identity element.
* @param <E> the return type
* @param identity the element the reduce function should start with
* @param operator that reduces the elements.
* @return a new Builder with the reduce function applied
*/
public <KEY_SPECIAL_TYPE> ASYNC_BUILDER<KEY_SPECIAL_TYPE> reduce(KEY_SPECIAL_TYPE identity, BiFunction<KEY_SPECIAL_TYPE, KEY_TYPE, KEY_SPECIAL_TYPE> operator) {
return new ASYNC_BUILDERBRACES(new ReduceTaskBRACES(iterable.iterator(), operator, identity));
}
#else
/**
* Reduces the elements inside of the Iterable down to one element using a identity element.
* @param identity the element the reduce function should start with
* @param operator that reduces the elements.
* @return a new Builder with the reduce function applied
*/
public ASYNC_BUILDER reduce(KEY_TYPE identity, UNARY_OPERATOR KEY_KEY_GENERIC_TYPE operator) {
return new ASYNC_BUILDERBRACES(new ReduceTaskBRACES(iterable.iterator(), operator, identity));
}
#endif
/**
* Pours all elements into a List that can be later
* @return a new Builder with the pour function applied
*/
public ObjectAsyncBuilder<LIST KEY_GENERIC_TYPE> pourAsList() {
return pour(new ARRAY_LISTBRACES());
}
#if !TYPE_BOOLEAN
/**
* Pours all elements into a Set that can be later
* @return a new Builder with the pour function applied
*/
public ObjectAsyncBuilder<SET KEY_GENERIC_TYPE> pourAsSet() {
return pour(new LINKED_HASH_SETBRACES());
}
#endif
/**
* Pours all elements into a collection that can be later
* @param <E> the return type
* @param collection the collection the elements
* @return a new Builder with the pour function applied
*/
public <E extends COLLECTION KEY_GENERIC_TYPE> ObjectAsyncBuilder<E> pour(E collection) {
return new ObjectAsyncBuilder<>(new CollectTask<>(iterable.iterator(), collection));
}
/**
* Searches through the elements of the Iterable to find if the desired element is present.
* @param filter that decides the desired elements
* @return a new Builder with the matchAny function applied
*/
public BooleanAsyncBuilder matchAny(PREDICATE KEY_GENERIC_TYPE filter) {
return new BooleanAsyncBuilder(new MatchTaskBRACES(iterable.iterator(), filter, 0));
}
/**
* Searches through the elements of the Iterable to find if unwanted elements are present.
* @param filter that decides the unwanted elements
* @return a new Builder with the matchNone function applied
*/
public BooleanAsyncBuilder matchNone(PREDICATE KEY_GENERIC_TYPE filter) {
return new BooleanAsyncBuilder(new MatchTaskBRACES(iterable.iterator(), filter, 1));
}
/**
* Searches through the elements of the Iterable to find if all the desired elements are present.
* @param filter that decides the desired elements
* @return a new Builder with the matchAll function applied
*/
public BooleanAsyncBuilder matchAll(PREDICATE KEY_GENERIC_TYPE filter) {
return new BooleanAsyncBuilder(new MatchTaskBRACES(iterable.iterator(), filter, 2));
}
/**
* Searches through the elements of the Iterable to find if the desired element.
* If not present it will return the default value of the type
* @param filter that decides the desired elements
* @return self with the findFirst function applied
*/
public ASYNC_BUILDER KEY_GENERIC_TYPE findFirst(PREDICATE KEY_GENERIC_TYPE filter) {
task = new FindFirstTaskBRACES(iterable.iterator(), filter);
return this;
}
/**
* Counts all desired elements inside the Iterable
* @param filter that decides the desired elements
* @return a new Builder with the count function applied
*/
public IntAsyncBuilder count(PREDICATE KEY_GENERIC_TYPE filter) {
return new IntAsyncBuilder(new CountTaskBRACES(iterable.iterator(), filter));
}
/**
* Optional way to add a custom executor that runs this offthread task.
* Can only be set after the action was decided on.
* @param executor that executes the task, defaults to {@link SanityChecks#invokeAsyncTask(Runnable) }
* @return self with the executor set
*/
public ASYNC_BUILDER KEY_GENERIC_TYPE executor(@Nonnull Executor executor) {
if(task == null) throw new IllegalStateException("Action is missing");
task.withExecutor(executor);
return this;
}
/**
* Optional way to set a callback that allows to compute actions after the task was completed.
* The state of the task has to be validated by the callback.
* @param callback that should be notified after completion of the task
* @return self with the callback set
*/
public ASYNC_BUILDER KEY_GENERIC_TYPE callback(@Nullable Consumer<TASK KEY_GENERIC_TYPE> callback) {
if(task == null) throw new IllegalStateException("Action is missing");
task.withCallback(callback);
return this;
}
/**
* Starts the Execution of the task without awaiting its result
* @return the task object that allow to trace it.
*/
public TASK KEY_GENERIC_TYPE execute() {
BASE_TASK KEY_GENERIC_TYPE toRun = task;
toRun.begin();
task = null;
return toRun;
}
/**
* Starts the Execution of the task and will await its completion, returning the result.
* @return the result of the task provided.
* @throws ExecutionException if the task threw a exception
* @throws InterruptedException if the caller thread was interrupted
*/
public KEY_TYPE join() throws ExecutionException, InterruptedException {
BASE_TASK KEY_GENERIC_TYPE toRun = task;
task = null;
toRun.begin();
return toRun.GET_KEY();
}
/**
* Starts the Execution of the task and will await its completion with a timeout, returning the result.
* @param timeout of how long the thread should wait the task to be completed
* @param unit of the desired waiting time.
* @return the result of the provided task
* @throws InterruptedException if the caller thread was interrupted
* @throws ExecutionException if the task threw a exception
* @throws TimeoutException if the timeout was reached before the task was finished
*/
public KEY_TYPE join(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
BASE_TASK KEY_GENERIC_TYPE toRun = task;
task = null;
toRun.begin();
return toRun.GET_KEY(timeout, unit);
}
#if !TYPE_OBJECT
private static class ReduceTask extends BASE_TASK
{
ITERATOR KEY_GENERIC_TYPE iter;
UNARY_OPERATOR KEY_KEY_GENERIC_TYPE operator;
KEY_TYPE value;
public ReduceTask(ITERATOR KEY_GENERIC_TYPE iter, UNARY_OPERATOR KEY_KEY_GENERIC_TYPE operator, KEY_TYPE value) {
this.iter = iter;
this.operator = operator;
this.value = value;
}
@Override
protected boolean execute() throws Exception {
while(shouldRun() && iter.hasNext()) {
value = operator.apply(value, iter.NEXT());
}
if(!iter.hasNext()) {
setResult(value);
return true;
}
return false;
}
@Override
protected void onCompletion() {
super.onCompletion();
iter = null;
operator = null;
#if TYPE_OBJECT
value = null;
#endif
}
}
#else
private static class ReduceTask<T, E> extends BaseObjectTask<E>
{
ObjectIterator<T> iter;
BiFunction<E, T, E> operator;
E value;
public ReduceTask(ITERATOR KEY_GENERIC_TYPE iter, BiFunction<E, T, E> operator, E value) {
this.iter = iter;
this.operator = operator;
this.value = value;
}
@Override
protected boolean execute() throws Exception {
while(shouldRun() && iter.hasNext()) {
value = operator.apply(value, iter.NEXT());
}
if(!iter.hasNext()) {
setResult(value);
return true;
}
return false;
}
@Override
protected void onCompletion() {
super.onCompletion();
iter = null;
operator = null;
#if TYPE_OBJECT
value = null;
#endif
}
}
#endif
private static class SimpleReduceTask KEY_GENERIC_TYPE extends BASE_TASK KEY_GENERIC_TYPE
{
ITERATOR KEY_GENERIC_TYPE iter;
UNARY_OPERATOR KEY_KEY_GENERIC_TYPE operator;
boolean first = true;
KEY_TYPE value;
public SimpleReduceTask(ITERATOR KEY_GENERIC_TYPE iter, UNARY_OPERATOR KEY_KEY_GENERIC_TYPE operator) {
this.iter = iter;
this.operator = operator;
}
@Override
protected boolean execute() throws Exception {
while(shouldRun() && iter.hasNext()) {
if(first) {
first = false;
value = iter.NEXT();
}
else {
value = operator.APPLY_VALUE(value, iter.NEXT());
}
}
if(!iter.hasNext()) {
setResult(value);
return true;
}
return false;
}
@Override
protected void onCompletion() {
super.onCompletion();
iter = null;
operator = null;
#if TYPE_OBJECT
value = null;
#endif
}
}
private static class MatchTask KEY_GENERIC_TYPE extends BaseBooleanTask
{
ITERATOR KEY_GENERIC_TYPE iter;
PREDICATE KEY_GENERIC_TYPE filter;
int type;
public MatchTask(ITERATOR KEY_GENERIC_TYPE iter, PREDICATE KEY_GENERIC_TYPE filter, int type) {
this.iter = iter;
this.filter = filter;
this.type = type;
if(type < 0 || type > 2) throw new IllegalArgumentException("Type is not allowed has to be between 0-2");
}
@Override
protected boolean execute() throws Exception {
switch(type) {
case 0:
while(shouldRun() && iter.hasNext()) {
#if TYPE_OBJECT
if(filter.getBoolean(iter.NEXT())) {
#else
if(filter.GET_VALUE(iter.NEXT())) {
#endif
setResult(true);
return true;
}
}
break;
case 1:
while(shouldRun() && iter.hasNext()) {
#if TYPE_OBJECT
if(filter.getBoolean(iter.NEXT())) {
#else
if(filter.GET_VALUE(iter.NEXT())) {
#endif
setResult(false);
return true;
}
}
break;
case 2:
while(shouldRun() && iter.hasNext()) {
#if TYPE_OBJECT
if(!filter.getBoolean(iter.NEXT())) {
#else
if(!filter.GET_VALUE(iter.NEXT())) {
#endif
setResult(false);
return true;
}
}
break;
}
if(!iter.hasNext()) {
setResult(type >= 1);
return true;
}
return false;
}
@Override
protected void onCompletion() {
super.onCompletion();
iter = null;
filter = null;
}
}
private static class FindFirstTask KEY_GENERIC_TYPE extends BASE_TASK KEY_GENERIC_TYPE
{
ITERATOR KEY_GENERIC_TYPE iter;
PREDICATE KEY_GENERIC_TYPE filter;
public FindFirstTask(ITERATOR KEY_GENERIC_TYPE iter, PREDICATE KEY_GENERIC_TYPE filter) {
this.iter = iter;
this.filter = filter;
}
@Override
protected boolean execute() throws Exception {
while(shouldRun() && iter.hasNext()) {
KEY_TYPE entry = iter.NEXT();
#if TYPE_OBJECT
if(filter.getBoolean(iter.NEXT())) {
#else
if(filter.GET_VALUE(iter.NEXT())) {
#endif
setResult(entry);
return true;
}
}
return !iter.hasNext();
}
@Override
protected void onCompletion() {
super.onCompletion();
iter = null;
filter = null;
}
}
private static class CountTask KEY_GENERIC_TYPE extends BaseIntTask
{
ITERATOR KEY_GENERIC_TYPE iter;
PREDICATE KEY_GENERIC_TYPE filter;
int counted = 0;
public CountTask(ITERATOR KEY_GENERIC_TYPE iter, PREDICATE KEY_GENERIC_TYPE filter) {
this.iter = iter;
this.filter = filter;
}
@Override
protected boolean execute() throws Exception {
while(shouldRun() && iter.hasNext()) {
if(filter.TEST_VALUE(iter.NEXT())) {
counted++;
}
}
if(!iter.hasNext())
{
setResult(counted);
return true;
}
return false;
}
@Override
protected void onCompletion() {
super.onCompletion();
iter = null;
filter = null;
}
}
private static class CollectTask KSS_GENERIC_TYPE<V, T extends COLLECTION KEY_SPECIAL_GENERIC_TYPE<V>> extends BaseObjectTask<T>
{
ITERATOR KEY_SPECIAL_GENERIC_TYPE<V> iter;
T collection;
public CollectTask(ITERATOR KEY_SPECIAL_GENERIC_TYPE<V> iter, T collection) {
this.iter = iter;
this.collection = collection;
}
@Override
protected boolean execute() throws Exception {
while(shouldRun() && iter.hasNext()) {
collection.add(iter.NEXT());
}
if(!iter.hasNext()) {
setResult(collection);
collection = null;
return true;
}
return false;
}
@Override
protected void onCompletion() {
super.onCompletion();
iter = null;
}
}
private static class ForEachTask<T> extends BaseObjectTask<Void>
{
ITERATOR KEY_GENERIC_TYPE iter;
CONSUMER KEY_GENERIC_TYPE listener;
public ForEachTask(ITERATOR KEY_GENERIC_TYPE iter, CONSUMER KEY_GENERIC_TYPE listener) {
this.iter = iter;
this.listener = listener;
}
@Override
protected boolean execute() throws Exception {
while(shouldRun() && iter.hasNext()) {
listener.accept(iter.NEXT());
}
return !iter.hasNext();
}
@Override
protected void onCompletion() {
super.onCompletion();
iter = null;
listener = null;
}
}
/**
* Base Task of the Actions that can be performed.
* Allows to simplify the actions that get executed.
* @Type(T)
*/
public abstract static class BASE_TASK KEY_GENERIC_TYPE implements TASK KEY_GENERIC_TYPE
{
private static final int CREATED = 0;
private static final int RUNNING = 1;
private static final int PAUSING = 2;
private static final int PAUSED = 3;
private static final int FINISHING = 4;
private static final int FINISHED = 5;
private static final int EXCEPTIONALLY = 6;
private static final int CANCELLED = 7;
private volatile WaitNode waiter;
private volatile int state = CREATED;
Consumer<TASK KEY_GENERIC_TYPE> callback;
Executor executor = SanityChecks::invokeAsyncTask;
KEY_TYPE result;
Throwable excpetion;
void withCallback(Consumer<TASK KEY_GENERIC_TYPE> callback) {
this.callback = callback;
}
void withExecutor(Executor executor) {
this.executor = executor;
}
void begin() {
executor.execute(this);
}
protected abstract boolean execute() throws Exception;
protected void onCompletion() {
if(callback != null) {
callback.accept(this);
callback = null;
}
executor = null;
}
protected void setResult(KEY_TYPE result) {
this.result = result;
}
@Override
public void run() {
state = RUNNING;
try {
if(execute()) {
state = FINISHING;
finishCompletion(FINISHED);
}
else if(state == PAUSING) {
state = PAUSED;
}
}
catch(Exception e) {
state = EXCEPTIONALLY;
this.excpetion = e;
finishCompletion(EXCEPTIONALLY);
}
}
private void finishCompletion(int nextState) {
WaitNode current = waiter;
waiter = null;
while(current != null) {
Thread t = current.thread;
if (t != null) {
current.thread = null;
LockSupport.unpark(t);
}
WaitNode next = current.next;
if (next == null)
break;
current.next = null;
current = next;
}
state = nextState;
onCompletion();
}
@Override
public boolean cancel(boolean cancelIfRunnning) {
if(state == RUNNING && !cancelIfRunnning) return false;
state = CANCELLED;
finishCompletion(CANCELLED);
return true;
}
@Override
public KEY_TYPE GET_KEY() throws InterruptedException, ExecutionException {
int s = state;
return report(s <= FINISHING ? awaitDone(false, 0L) : s);
}
@Override
public KEY_TYPE GET_KEY(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null) throw new NullPointerException();
int s = state;
if (s <= FINISHING && (s = awaitDone(true, unit.toNanos(timeout))) <= FINISHING) throw new TimeoutException();
return report(s);
}
private KEY_TYPE report(int s) throws ExecutionException {
if (s == FINISHED) return result;
if (s >= CANCELLED) throw new CancellationException();
throw new ExecutionException(excpetion);
}
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
while(true) {
if(Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if(s > FINISHING) {
if(q != null) q.thread = null;
return s;
}
else if(s == FINISHING) Thread.yield();
else if(q == null) q = new WaitNode();
else if(!queued) {
q.next = waiter;
waiter = q;
queued = true;
}
else if(timed) {
nanos = deadline - System.nanoTime();
if(nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else LockSupport.park(this);
}
}
private void removeWaiter(WaitNode node) {
if(node == null) return;
node.thread = null;
retry:
while(true) {
for(WaitNode prev = null, current = waiter, next = null; current != null; current = next) {
next = current.next;
if(current.thread != null) prev = current;
else if(prev != null) {
prev.next = next;
if(prev.thread == null) continue retry; //Previous element got removed which means another thread was editing this while we were editing.
}
else if(waiter == current) {
waiter = next;
continue retry;
}
}
break;
}
}
@Override
public boolean isCancelled() { return state >= CANCELLED; }
@Override
public boolean isDone() { return state >= FINISHING; }
@Override
public boolean isPaused() { return state == PAUSED; }
@Override
public boolean isSuccessful() { return state == FINISHED; }
protected boolean shouldRun() { return state == RUNNING; }
@Override
public void pause() {
if(state == PAUSED || state == PAUSING || state >= FINISHING) return;
state = PAUSING;
}
@Override
public void awaitPausing() {
if(state == PAUSED) return;
pause();
if(state == PAUSING) {
while(state == PAUSING) {
Thread.yield();
}
}
}
@Override
public void resume() {
if(state != PAUSED && state != PAUSING) return;
if(state == PAUSING) {
while(state == PAUSING) {
Thread.yield();
}
}
state = RUNNING;
executor.execute(this);
}
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() {
thread = Thread.currentThread();
}
}
}
}

View File

@ -207,6 +207,9 @@ public class SETS
#endif
@Override
public boolean add(KEY_TYPE o) { throw new UnsupportedOperationException(); }
#if TYPE_OBJECT
public KEY_TYPE addOrGet(KEY_TYPE o) { throw new UnsupportedOperationException(); }
#endif
@Override
public ITERATOR KEY_GENERIC_TYPE iterator()
{
@ -234,6 +237,9 @@ public class SETS
#if !TYPE_OBJECT
@Override
public boolean remove(KEY_TYPE o) { throw new UnsupportedOperationException(); }
#else
@Override
public KEY_TYPE addOrGet(KEY_TYPE o) { throw new UnsupportedOperationException(); }
#endif
@Override
public EmptySet KEY_GENERIC_TYPE copy() { return this; }
@ -404,6 +410,11 @@ public class SETS
s = c;
}
#if TYPE_OBJECT
@Override
public KEY_TYPE addOrGet(KEY_TYPE o) { throw new UnsupportedOperationException(); }
#endif
@Override
public SET KEY_GENERIC_TYPE copy() { return s.copy(); }
@ -676,6 +687,11 @@ public class SETS
s = c;
}
#if TYPE_OBJECT
@Override
public KEY_TYPE addOrGet(KEY_TYPE o) { synchronized(mutex) { return s.addOrGet(o); } }
#endif
@Override
public SET KEY_GENERIC_TYPE copy() { synchronized(mutex) { return s.copy(); } }

View File

@ -104,6 +104,14 @@ public class SanityChecks
getPool().execute(task);
}
/**
* A Helper method to start a Async Task. This method will not await the finalization of said task
* @param task the Task to invoke
*/
public static void invokeAsyncTask(Runnable task) {
getPool().execute(task);
}
/**
* Helper method to control what ForkJoinPool is being used for any given task.
* @note this method is not thread-save. It is only there to provide control over how Library specific Threaded tasks are handled.