package speiger.src.collections.PACKAGE.utils; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.LockSupport; import java.util.function.Consumer; #if !TYPE_OBJECT import speiger.src.collections.PACKAGE.functions.CONSUMER; import speiger.src.collections.PACKAGE.functions.COMPARATOR; #else import java.util.function.IntFunction; import java.util.function.BiFunction; import java.util.Comparator; #iterate #argument MAPPER Predicate ToByteFunction ToShortFunction ToIntFunction ToLongFunction ToFloatFunction ToDoubleFunction #argument BUILDER BooleanAsyncBuilder ByteAsyncBuilder ShortAsyncBuilder IntAsyncBuilder LongAsyncBuilder FloatAsyncBuilder DoubleAsyncBuilder #argument PACKAGE booleans bytes shorts ints longs floats doubles #argument CHECK BOOLEAN_ASYNC_MODULE BYTE_ASYNC_MODULE SHORT_ASYNC_MODULE INT_ASYNC_MODULE LONG_ASYNC_MODULE FLOAT_ASYNC_MODULE DOUBLE_ASYNC_MODULE #if CHECK import speiger.src.collections.objects.functions.function.MAPPER; import speiger.src.collections.PACKAGE.utils.BUILDER; #endif #enditerate #endif import speiger.src.collections.PACKAGE.collections.ITERABLE; #if OBJECT_ASYNC_MODULE import speiger.src.collections.PACKAGE.collections.COLLECTION; #endif import speiger.src.collections.PACKAGE.collections.ITERATOR; import speiger.src.collections.PACKAGE.functions.TASK; #if !TYPE_OBJECT import speiger.src.collections.PACKAGE.functions.function.PREDICATE; #endif #if OBJECT_ASYNC_MODULE import speiger.src.collections.PACKAGE.functions.function.TO_OBJECT_FUNCTION; #endif import speiger.src.collections.PACKAGE.functions.function.UNARY_OPERATOR; #if ARRAY_LIST_FEATURE || LINKED_LIST_FEATURE import speiger.src.collections.PACKAGE.lists.LIST; #if ARRAY_LIST_FEATURE import speiger.src.collections.PACKAGE.lists.ARRAY_LIST; #else if LINKED_LIST_FEATURE import speiger.src.collections.PACKAGE.lists.LINKED_LIST; #endif #endif #if !TYPE_BOOLEAN && OBJECT_ASYNC_MODULE #if SET_MODULE #if LINKED_SET_FEATURE || LINKED_CUSTOM_SET_FEATURE || SET_FEATURE || CUSTOM_SET_FEATURE || RB_TREE_SET_FEATURE || AVL_TREE_SET_FEATURE || ARRAY_SET_FEATURE import speiger.src.collections.PACKAGE.sets.SET; #if LINKED_SET_FEATURE import speiger.src.collections.PACKAGE.sets.LINKED_HASH_SET; #else if LINKED_CUSTOM_SET_FEATURE import speiger.src.collections.PACKAGE.sets.LINKED_CUSTOM_HASH_SET; #else if SET_FEATURE import speiger.src.collections.PACKAGE.sets.HASH_SET; #else if CUSTOM_SET_FEATURE import speiger.src.collections.PACKAGE.sets.CUSTOM_HASH_SET; #else if RB_TREE_SET_FEATURE import speiger.src.collections.PACKAGE.sets.RB_TREE_SET; #else if AVL_TREE_SET_FEATURE import speiger.src.collections.PACKAGE.sets.AVL_TREE_SET; #else if ARRAY_SET_FEATURE import speiger.src.collections.PACKAGE.sets.ARRAY_SET; #endif #endif #endif #endif #if !TYPE_BOOLEAN && BOOLEAN_ASYNC_MODULE #if !TYPE_OBJECT import speiger.src.collections.booleans.utils.BooleanAsyncBuilder; #endif import speiger.src.collections.booleans.utils.BooleanAsyncBuilder.BaseBooleanTask; #endif #if !TYPE_OBJECT && OBJECT_ASYNC_MODULE import speiger.src.collections.objects.utils.ObjectAsyncBuilder; import speiger.src.collections.objects.utils.ObjectAsyncBuilder.BaseObjectTask; #endif #if !TYPE_INT && INT_ASYNC_MODULE #if !TYPE_OBJECT import speiger.src.collections.ints.utils.IntAsyncBuilder; #endif import speiger.src.collections.ints.utils.IntAsyncBuilder.BaseIntTask; #endif import speiger.src.collections.utils.ISizeProvider; import speiger.src.collections.utils.SanityChecks; /** * * * The Async API allows you to process collections on a different thread without having to deal with the Multithreading complexity.
* It uses the Lightweight Stream Replace API to do its work, which can make it faster then sequential streams.
* This feature isn't designed to do Multithreading work, but to allow moving work off the main thread into a worker thread.
* So anything executed on this is still Singlethreaded.
*
* How it works is you create the AsyncBuilder using a Iterable of the Type.
* Then select things you want to use on the Iterable.
* - For example: map/filter/peek/limit/etc
*
* After that you select your action the Iterable should be applied on.
* - For Example: forEach/findFirst/matchAny/etc
*
* Then optionally a custom Executor or callback can be applied
* At the end either execute or join can be called to start the task.
* - execute will return the Task reference so that the task can be traced back.
* - join will await the completion of the task and return the return value
*
* During Construction a couple Disposable Builder Objects will be created.
* These will be only used during construction.
*
* The Task Object is also pause-able/Interruptable at any moment during the Processing.
* * A small example *

 * public void processFiles(ObjectCollection<String> potentialFiles) {
 * 	potentialFiles.asAsync()
 * 		.map(Paths::get).filter(Files::exists) //Modifies the collection (Optional)
 * 		.forEach(Files::delete) //Creates the action (Required)
 * 		.onCompletion(T -> {}} //Callback on completion (Optional)
 * 		.execute() //Starts the task. (Required)
 * }
 * 
* @author Speiger * * @Type(T) */ public class ASYNC_BUILDER KEY_GENERIC_TYPE { ITERABLE KEY_GENERIC_TYPE iterable; BASE_TASK KEY_GENERIC_TYPE task; /** * Main Constructor that uses a Iterable to build a Offthread Task. * @param iterable that should be processed */ public ASYNC_BUILDER(ITERABLE KEY_GENERIC_TYPE iterable) { this.iterable = iterable; } /** * Helper constructor. * @param task that had been build */ public ASYNC_BUILDER(BASE_TASK KEY_GENERIC_TYPE task) { this.task = task; } /** * Helper function that automatically wraps a Iterable into a AsyncBuilder since it forces this collections Iterable. * @param iterable that should be wrapped * @Type(T) * @return a AsyncBuilder with the iterable wrapped */ public static GENERIC_KEY_BRACES ASYNC_BUILDER KEY_GENERIC_TYPE of(Iterable iterable) { return new ASYNC_BUILDERBRACES(ITERABLES.wrap(iterable)); } #if ARRAY_LIST_FEATURE /** * Helper function that automatically wraps a array into a AsyncBuilder since it forces this collections Iterable. * @param values that should be wrapped * @Type(T) * @return a AsyncBuilder with the values wrapped */ public static GENERIC_KEY_BRACES ASYNC_BUILDER KEY_GENERIC_TYPE of(KEY_TYPE...values) { return new ASYNC_BUILDERBRACES(ARRAY_LIST.wrap(values)); } #endif #if OBJECT_ASYNC_MODULE /** * Maps the elements to something else * @param mapper the mapping function * @param The return type. * @return a new Builder Object with the mapped Iterable */ public ObjectAsyncBuilder map(TO_OBJECT_FUNCTION KKS_GENERIC_TYPE mapper) { return new ObjectAsyncBuilder<>(ITERABLES.map(iterable, mapper)); } /** * Maps the elements to something else * @param mapper the flatMapping function * @param The return type supplier. * @param The return type. * @return a new Builder Object with the mapped Iterable */ public > ObjectAsyncBuilder flatMap(TO_OBJECT_FUNCTION KKS_GENERIC_TYPE mapper) { return new ObjectAsyncBuilder<>(ITERABLES.flatMap(iterable, mapper)); } /** * Maps the elements to something else * @param mapper the flatMapping function * @param The return type. * @return a new Builder Object with the mapped Iterable */ public ObjectAsyncBuilder arrayflatMap(TO_OBJECT_FUNCTION KKS_GENERIC_TYPE mapper) { return new ObjectAsyncBuilder<>(ITERABLES.arrayFlatMap(iterable, mapper)); } #endif #if TYPE_OBJECT #iterate #argument BUILDER BooleanAsyncBuilder ByteAsyncBuilder ShortAsyncBuilder IntAsyncBuilder LongAsyncBuilder FloatAsyncBuilder DoubleAsyncBuilder #argument MAPPER Predicate ToByteFunction ToShortFunction ToIntFunction ToLongFunction ToFloatFunction ToDoubleFunction #argument TYPE Boolean Byte Short Int Long Float Double #argument CHECK BOOLEAN_ASYNC_MODULE BYTE_ASYNC_MODULE SHORT_ASYNC_MODULE INT_ASYNC_MODULE LONG_ASYNC_MODULE FLOAT_ASYNC_MODULE DOUBLE_ASYNC_MODULE #if CHECK /** * Maps the elements to something else * @param mapper the mapping function * @return a new Builder Object with the mapped Iterable */ public BUILDER mapToTYPE(MAPPER mapper) { return new BUILDER(ITERABLES.mapToTYPE(iterable, mapper)); } #endif #enditerate #endif /** * Filters out the unwanted elements out of the Iterable * @param filter the elements that should be kept * @return Self with a filter applied */ public ASYNC_BUILDER KEY_GENERIC_TYPE filter(PREDICATE KEY_GENERIC_TYPE filter) { iterable = ITERABLES.filter(iterable, filter); return this; } /** * Removes duplicated elements out of the Iterable * @return Self with a deduplicator applied */ public ASYNC_BUILDER KEY_GENERIC_TYPE distinct() { iterable = ITERABLES.distinct(iterable); return this; } /** * Repeats the elements inside of the Iterable * @param repeats the amount of times the elements should be repeated * @return self with a repeater applied */ public ASYNC_BUILDER KEY_GENERIC_TYPE repeat(int repeats) { iterable = ITERABLES.repeat(iterable, repeats); return this; } /** * Limits how many elements are inside of the Iterable * @param limit how many elements should max be iterated through * @return self with a limiter applied */ public ASYNC_BUILDER KEY_GENERIC_TYPE limit(long limit) { iterable = ITERABLES.limit(iterable, limit); return this; } /** * Sorts the elements inside of the Iterable. * This operation is heavily hurting performance because it rebuilds the entire iterator and then sorts it, and this will affect the pausing feature. * @param sorter that sorts the elements. * @return self with a sorter applied */ public ASYNC_BUILDER KEY_GENERIC_TYPE sorted(COMPARATOR KEY_GENERIC_TYPE sorter) { iterable = ITERABLES.sorted(iterable, sorter); return this; } /** * Allows to preview elements before they are processed * @param action the action that should be applied * @return self with a preview applied */ public ASYNC_BUILDER KEY_GENERIC_TYPE peek(CONSUMER KEY_GENERIC_TYPE action) { iterable = ITERABLES.peek(iterable, action); return this; } #if OBJECT_ASYNC_MODULE /** * Iterates over the Iterable with a desired action * @param action that should be applied * @return a new Builder with the forEach action applied. */ public ObjectAsyncBuilder forEach(CONSUMER KEY_GENERIC_TYPE action) { return new ObjectAsyncBuilder<>(new ForEachTask<>(iterable.iterator(), action)); } #endif /** * Reduces the elements inside of the Iterable down to one element * @param operator that reduces the elements. * @return self with the reduce action applied */ public ASYNC_BUILDER KEY_GENERIC_TYPE reduce(UNARY_OPERATOR KEY_KEY_GENERIC_TYPE operator) { task = new SimpleReduceTaskBRACES(iterable.iterator(), operator); return this; } #if TYPE_OBJECT /** * Reduces the elements inside of the Iterable down to one element using a identity element. * @param the return type * @param identity the element the reduce function should start with * @param operator that reduces the elements. * @return a new Builder with the reduce function applied */ public ASYNC_BUILDER reduce(KEY_SPECIAL_TYPE identity, BiFunction operator) { return new ASYNC_BUILDERBRACES(new ReduceTaskBRACES(iterable.iterator(), operator, identity)); } #else /** * Reduces the elements inside of the Iterable down to one element using a identity element. * @param identity the element the reduce function should start with * @param operator that reduces the elements. * @return a new Builder with the reduce function applied */ public ASYNC_BUILDER reduce(KEY_TYPE identity, UNARY_OPERATOR KEY_KEY_GENERIC_TYPE operator) { return new ASYNC_BUILDERBRACES(new ReduceTaskBRACES(iterable.iterator(), operator, identity)); } #endif #if OBJECT_ASYNC_MODULE #if TYPE_OBJECT /** * Pours all elements of the Iterable down into a Array. * @param action creates the final array that should be copied into. * @return a new Builder with the ToArray function applied */ public ObjectAsyncBuilder TO_ARRAY(IntFunction action) { return new ObjectAsyncBuilder<>(new ArrayTaskBRACES(iterable, action)); } #else /** * Pours all elements of the Iterable down into a Array. * @return a new Builder with the ToArray function applied */ public ObjectAsyncBuilder TO_ARRAY() { return new ObjectAsyncBuilder<>(new ArrayTaskBRACES(iterable)); } #endif #if ARRAY_LIST_FEATURE || LINKED_LIST_FEATURE /** * Pours all elements into a List that can be later * @return a new Builder with the pour function applied */ public ObjectAsyncBuilder pourAsList() { #if ARRAY_LIST_FEATURE return pour(new ARRAY_LISTBRACES()); #else return pour(new LINKED_LISTBRACES()); #endif } #endif #if !TYPE_BOOLEAN && SET_MODULE #if LINKED_SET_FEATURE || LINKED_CUSTOM_SET_FEATURE || SET_FEATURE || CUSTOM_SET_FEATURE || RB_TREE_SET_FEATURE || AVL_TREE_SET_FEATURE || ARRAY_SET_FEATURE /** * Pours all elements into a Set that can be later * @return a new Builder with the pour function applied */ public ObjectAsyncBuilder pourAsSet() { #if LINKED_SET_FEATURE return pour(new LINKED_HASH_SETBRACES()); #else if LINKED_CUSTOM_SET_FEATURE return pour(new LINKED_CUSTOM_HASH_SETBRACES(STRATEGY.normalStrategy())); #else if SET_FEATURE return pour(new HASH_SETBRACES()); #else if CUSTOM_SET_FEATURE return pour(new CUSTOM_HASH_SETBRACES(STRATEGY.normalStrategy())); #else if RB_TREE_SET_FEATURE return pour(new RB_Tree_SETBRACES()); #else if AVL_TREE_SET_FEATURE return pour(new AVL_Tree_SETBRACES()); #else if ARRAY_SET_FEATURE return pour(new ARRAY_SETBRACES()); #endif } #endif #endif /** * Pours all elements into a collection that can be later * @param the return type * @param collection the collection the elements * @return a new Builder with the pour function applied */ public ObjectAsyncBuilder pour(E collection) { return new ObjectAsyncBuilder<>(new CollectTask<>(iterable.iterator(), collection)); } #endif #if BOOLEAN_ASYNC_MODULE /** * Searches through the elements of the Iterable to find if the desired element is present. * @param filter that decides the desired elements * @return a new Builder with the matchAny function applied */ public BooleanAsyncBuilder matchAny(PREDICATE KEY_GENERIC_TYPE filter) { return new BooleanAsyncBuilder(new MatchTaskBRACES(iterable.iterator(), filter, 0)); } /** * Searches through the elements of the Iterable to find if unwanted elements are present. * @param filter that decides the unwanted elements * @return a new Builder with the matchNone function applied */ public BooleanAsyncBuilder matchNone(PREDICATE KEY_GENERIC_TYPE filter) { return new BooleanAsyncBuilder(new MatchTaskBRACES(iterable.iterator(), filter, 1)); } /** * Searches through the elements of the Iterable to find if all the desired elements are present. * @param filter that decides the desired elements * @return a new Builder with the matchAll function applied */ public BooleanAsyncBuilder matchAll(PREDICATE KEY_GENERIC_TYPE filter) { return new BooleanAsyncBuilder(new MatchTaskBRACES(iterable.iterator(), filter, 2)); } #endif /** * Searches through the elements of the Iterable to find if the desired element. * If not present it will return the default value of the type * @param filter that decides the desired elements * @return self with the findFirst function applied */ public ASYNC_BUILDER KEY_GENERIC_TYPE findFirst(PREDICATE KEY_GENERIC_TYPE filter) { task = new FindFirstTaskBRACES(iterable.iterator(), filter); return this; } #if INT_ASYNC_MODULE /** * Counts all desired elements inside the Iterable * @param filter that decides the desired elements * @return a new Builder with the count function applied */ public IntAsyncBuilder count(PREDICATE KEY_GENERIC_TYPE filter) { return new IntAsyncBuilder(new CountTaskBRACES(iterable.iterator(), filter)); } #endif /** * Optional way to add a custom executor that runs this offthread task. * Can only be set after the action was decided on. * @param executor that executes the task, defaults to {@link SanityChecks#invokeAsyncTask(Runnable) } * @return self with the executor set * @note has to be NonNull */ public ASYNC_BUILDER KEY_GENERIC_TYPE executor(Executor executor) { if(task == null) throw new IllegalStateException("Action is missing"); task.withExecutor(executor); return this; } /** * Optional way to set a callback that allows to compute actions after the task was completed. * The state of the task has to be validated by the callback. * @param callback that should be notified after completion of the task * @return self with the callback set * @note can be null */ public ASYNC_BUILDER KEY_GENERIC_TYPE onCompletion(Consumer callback) { if(task == null) throw new IllegalStateException("Action is missing"); task.withCallback(callback); return this; } /** * Starts the Execution of the task without awaiting its result * @return the task object that allow to trace it. */ public TASK KEY_GENERIC_TYPE execute() { BASE_TASK KEY_GENERIC_TYPE toRun = task; toRun.begin(); task = null; return toRun; } /** * Starts the Execution of the task and will await its completion, returning the result. * @return the result of the task provided. * @throws ExecutionException if the task threw a exception * @throws InterruptedException if the caller thread was interrupted */ public KEY_TYPE join() throws ExecutionException, InterruptedException { BASE_TASK KEY_GENERIC_TYPE toRun = task; task = null; toRun.begin(); return toRun.GET_KEY(); } /** * Starts the Execution of the task and will await its completion with a timeout, returning the result. * @param timeout of how long the thread should wait the task to be completed * @param unit of the desired waiting time. * @return the result of the provided task * @throws InterruptedException if the caller thread was interrupted * @throws ExecutionException if the task threw a exception * @throws TimeoutException if the timeout was reached before the task was finished */ public KEY_TYPE join(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { BASE_TASK KEY_GENERIC_TYPE toRun = task; task = null; toRun.begin(); return toRun.GET_KEY(timeout, unit); } #if !TYPE_OBJECT private static class ReduceTask extends BASE_TASK { ITERATOR KEY_GENERIC_TYPE iter; UNARY_OPERATOR KEY_KEY_GENERIC_TYPE operator; KEY_TYPE value; public ReduceTask(ITERATOR KEY_GENERIC_TYPE iter, UNARY_OPERATOR KEY_KEY_GENERIC_TYPE operator, KEY_TYPE value) { this.iter = iter; this.operator = operator; this.value = value; } @Override protected boolean execute() throws Exception { while(shouldRun() && iter.hasNext()) { value = operator.apply(value, iter.NEXT()); } if(!iter.hasNext()) { setResult(value); return true; } return false; } @Override protected void onCompletion() { super.onCompletion(); iter = null; operator = null; #if TYPE_OBJECT value = null; #endif } } #else private static class ReduceTask extends BaseObjectTask { ObjectIterator iter; BiFunction operator; E value; public ReduceTask(ITERATOR KEY_GENERIC_TYPE iter, BiFunction operator, E value) { this.iter = iter; this.operator = operator; this.value = value; } @Override protected boolean execute() throws Exception { while(shouldRun() && iter.hasNext()) { value = operator.apply(value, iter.NEXT()); } if(!iter.hasNext()) { setResult(value); return true; } return false; } @Override protected void onCompletion() { super.onCompletion(); iter = null; operator = null; #if TYPE_OBJECT value = null; #endif } } #endif private static class SimpleReduceTask KEY_GENERIC_TYPE extends BASE_TASK KEY_GENERIC_TYPE { ITERATOR KEY_GENERIC_TYPE iter; UNARY_OPERATOR KEY_KEY_GENERIC_TYPE operator; boolean first = true; KEY_TYPE value; public SimpleReduceTask(ITERATOR KEY_GENERIC_TYPE iter, UNARY_OPERATOR KEY_KEY_GENERIC_TYPE operator) { this.iter = iter; this.operator = operator; } @Override protected boolean execute() throws Exception { while(shouldRun() && iter.hasNext()) { if(first) { first = false; value = iter.NEXT(); } else { value = operator.APPLY_VALUE(value, iter.NEXT()); } } if(!iter.hasNext()) { setResult(value); return true; } return false; } @Override protected void onCompletion() { super.onCompletion(); iter = null; operator = null; #if TYPE_OBJECT value = null; #endif } } #if BOOLEAN_ASYNC_MODULE private static class MatchTask KEY_GENERIC_TYPE extends BaseBooleanTask { ITERATOR KEY_GENERIC_TYPE iter; PREDICATE KEY_GENERIC_TYPE filter; int type; public MatchTask(ITERATOR KEY_GENERIC_TYPE iter, PREDICATE KEY_GENERIC_TYPE filter, int type) { this.iter = iter; this.filter = filter; this.type = type; if(type < 0 || type > 2) throw new IllegalArgumentException("Type is not allowed has to be between 0-2"); } @Override protected boolean execute() throws Exception { switch(type) { case 0: while(shouldRun() && iter.hasNext()) { if(filter.test(iter.NEXT())) { setResult(true); return true; } } break; case 1: while(shouldRun() && iter.hasNext()) { if(filter.test(iter.NEXT())) { setResult(false); return true; } } break; case 2: while(shouldRun() && iter.hasNext()) { if(!filter.test(iter.NEXT())) { setResult(false); return true; } } break; } if(!iter.hasNext()) { setResult(type >= 1); return true; } return false; } @Override protected void onCompletion() { super.onCompletion(); iter = null; filter = null; } } #endif private static class FindFirstTask KEY_GENERIC_TYPE extends BASE_TASK KEY_GENERIC_TYPE { ITERATOR KEY_GENERIC_TYPE iter; PREDICATE KEY_GENERIC_TYPE filter; public FindFirstTask(ITERATOR KEY_GENERIC_TYPE iter, PREDICATE KEY_GENERIC_TYPE filter) { this.iter = iter; this.filter = filter; } @Override protected boolean execute() throws Exception { while(shouldRun() && iter.hasNext()) { KEY_TYPE entry = iter.NEXT(); if(filter.test(iter.NEXT())) { setResult(entry); return true; } } return !iter.hasNext(); } @Override protected void onCompletion() { super.onCompletion(); iter = null; filter = null; } } #if INT_ASYNC_MODULE private static class CountTask KEY_GENERIC_TYPE extends BaseIntTask { ITERATOR KEY_GENERIC_TYPE iter; PREDICATE KEY_GENERIC_TYPE filter; int counted = 0; public CountTask(ITERATOR KEY_GENERIC_TYPE iter, PREDICATE KEY_GENERIC_TYPE filter) { this.iter = iter; this.filter = filter; } @Override protected boolean execute() throws Exception { while(shouldRun() && iter.hasNext()) { if(filter.test(iter.NEXT())) { counted++; } } if(!iter.hasNext()) { setResult(counted); return true; } return false; } @Override protected void onCompletion() { super.onCompletion(); iter = null; filter = null; } } #endif #if OBJECT_ASYNC_MODULE private static class CollectTask KSS_GENERIC_TYPE> extends BaseObjectTask { ITERATOR KEY_SPECIAL_GENERIC_TYPE iter; T collection; public CollectTask(ITERATOR KEY_SPECIAL_GENERIC_TYPE iter, T collection) { this.iter = iter; this.collection = collection; } @Override protected boolean execute() throws Exception { while(shouldRun() && iter.hasNext()) { collection.add(iter.NEXT()); } if(!iter.hasNext()) { setResult(collection); collection = null; return true; } return false; } @Override protected void onCompletion() { super.onCompletion(); iter = null; } } private static class ArrayTask KEY_GENERIC_TYPE extends BaseObjectTask { ITERATOR KEY_GENERIC_TYPE iter; COLLECTIONS.CollectionWrapper KEY_GENERIC_TYPE wrapper; #if TYPE_OBJECT IntFunction builder; public ArrayTask(ITERABLE KEY_GENERIC_TYPE iterable, IntFunction builder) { this.builder = builder; #else public ArrayTask(ITERABLE KEY_GENERIC_TYPE iterable) { #endif iter = iterable.iterator(); ISizeProvider prov = ISizeProvider.of(iterable); int size = prov == null ? -1 : prov.size(); wrapper = size < 0 ? COLLECTIONS.wrapper() : COLLECTIONS.wrapper(size); } @Override protected boolean execute() throws Exception { while(shouldRun() && iter.hasNext()) { wrapper.add(iter.NEXT()); } if(!iter.hasNext()) { #if TYPE_OBJECT setResult(wrapper.TO_ARRAY(builder)); #else setResult(wrapper.TO_ARRAY()); #endif wrapper = null; return true; } return false; } @Override protected void onCompletion() { super.onCompletion(); iter = null; #if TYPE_OBJECT builder = null; #endif } } private static class ForEachTask extends BaseObjectTask { ITERATOR KEY_GENERIC_TYPE iter; CONSUMER KEY_GENERIC_TYPE listener; public ForEachTask(ITERATOR KEY_GENERIC_TYPE iter, CONSUMER KEY_GENERIC_TYPE listener) { this.iter = iter; this.listener = listener; } @Override protected boolean execute() throws Exception { while(shouldRun() && iter.hasNext()) { listener.accept(iter.NEXT()); } return !iter.hasNext(); } @Override protected void onCompletion() { super.onCompletion(); iter = null; listener = null; } } #endif /** * Base Task of the Actions that can be performed. * Allows to simplify the actions that get executed. * @Type(T) */ public abstract static class BASE_TASK KEY_GENERIC_TYPE implements TASK KEY_GENERIC_TYPE { private static final int CREATED = 0; private static final int RUNNING = 1; private static final int PAUSING = 2; private static final int PAUSED = 3; private static final int FINISHING = 4; private static final int FINISHED = 5; private static final int EXCEPTIONALLY = 6; private static final int CANCELLED = 7; private volatile WaitNode waiter; private volatile int state = CREATED; Consumer callback; Executor executor = SanityChecks::invokeAsyncTask; KEY_TYPE result; Throwable excpetion; void withCallback(Consumer callback) { this.callback = callback; } void withExecutor(Executor executor) { this.executor = executor; } void begin() { executor.execute(this); } protected abstract boolean execute() throws Exception; protected void onCompletion() { if(callback != null) { callback.accept(this); callback = null; } executor = null; } protected void setResult(KEY_TYPE result) { this.result = result; } @Override public void run() { state = RUNNING; try { if(execute()) { state = FINISHING; finishCompletion(FINISHED); } else if(state == PAUSING) { state = PAUSED; } } catch(Exception e) { state = EXCEPTIONALLY; this.excpetion = e; finishCompletion(EXCEPTIONALLY); } } private void finishCompletion(int nextState) { WaitNode current = waiter; waiter = null; while(current != null) { Thread t = current.thread; if (t != null) { current.thread = null; LockSupport.unpark(t); } WaitNode next = current.next; if (next == null) break; current.next = null; current = next; } state = nextState; onCompletion(); } @Override public boolean cancel(boolean cancelIfRunnning) { if(state == RUNNING && !cancelIfRunnning) return false; state = CANCELLED; finishCompletion(CANCELLED); return true; } @Override public KEY_TYPE GET_KEY() throws InterruptedException, ExecutionException { int s = state; return report(s <= FINISHING ? awaitDone(false, 0L) : s); } @Override public KEY_TYPE GET_KEY(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= FINISHING && (s = awaitDone(true, unit.toNanos(timeout))) <= FINISHING) throw new TimeoutException(); return report(s); } private KEY_TYPE report(int s) throws ExecutionException { if (s == FINISHED) return result; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException(excpetion); } private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; while(true) { if(Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if(s > FINISHING) { if(q != null) q.thread = null; return s; } else if(s == FINISHING) Thread.yield(); else if(q == null) q = new WaitNode(); else if(!queued) { q.next = waiter; waiter = q; queued = true; } else if(timed) { nanos = deadline - System.nanoTime(); if(nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } } private void removeWaiter(WaitNode node) { if(node == null) return; node.thread = null; retry: while(true) { for(WaitNode prev = null, current = waiter, next = null; current != null; current = next) { next = current.next; if(current.thread != null) prev = current; else if(prev != null) { prev.next = next; if(prev.thread == null) continue retry; //Previous element got removed which means another thread was editing this while we were editing. } else if(waiter == current) { waiter = next; continue retry; } } break; } } @Override public boolean isCancelled() { return state >= CANCELLED; } @Override public boolean isDone() { return state >= FINISHING; } @Override public boolean isPaused() { return state == PAUSED; } @Override public boolean isSuccessful() { return state == FINISHED; } protected boolean shouldRun() { return state == RUNNING; } @Override public void pause() { if(state == PAUSED || state == PAUSING || state >= FINISHING) return; state = PAUSING; } @Override public void awaitPausing() { if(state == PAUSED) return; pause(); if(state == PAUSING) { while(state == PAUSING) { Thread.yield(); } } } @Override public void resume() { if(state != PAUSED && state != PAUSING) return; if(state == PAUSING) { while(state == PAUSING) { Thread.yield(); } } state = RUNNING; executor.execute(this); } static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } } } }