Class LinkedTransferQueue<E>
- Type Parameters:
E
- the type of elements held in this collection.
- All Implemented Interfaces:
Serializable
,Iterable<E>
,Collection<E>
,BlockingQueue<E>
,Queue<E>
,TransferQueue<E>
TransferQueue
based on linked nodes.
This queue orders elements FIFO (first-in-first-out) with respect
to any given producer. The head of the queue is that
element that has been on the queue the longest time for some
producer. The tail of the queue is that element that has
been on the queue the shortest time for some producer.
Beware that, unlike in most collections, the size
method
is NOT a constant-time operation. Because of the
asynchronous nature of these queues, determining the current number
of elements requires a traversal of the elements, and so may report
inaccurate results if this collection is modified during traversal.
Additionally, the bulk operations addAll
,
removeAll
, retainAll
, containsAll
,
equals
, and toArray
are not guaranteed
to be performed atomically. For example, an iterator operating
concurrently with an addAll
operation might view only some
of the added elements.
This class and its iterator implement all of the
optional methods of the Collection
and Iterator
interfaces.
Memory consistency effects: As with other concurrent
collections, actions in a thread prior to placing an object into a
LinkedTransferQueue
happen-before
actions subsequent to the access or removal of that element from
the LinkedTransferQueue
in another thread.
This class is a member of the
Java Collections Framework.
TODO: Do NOT remove this class. It's referenced from DataStructures
.
-
Nested Class Summary
Nested ClassesModifier and TypeClassDescription(package private) final class
(package private) static final class
Queue nodes. -
Field Summary
FieldsModifier and TypeFieldDescriptionprivate static final int
private static final int
The number of times to spin before blocking when a node is preceded by another node that is apparently spinning.private static final int
The number of times to spin (with randomly interspersed calls to Thread.yield) on multiprocessor before blocking when a node is apparently the first waiter in the queue.(package private) LinkedTransferQueue.Node
head of the queue; null until first enqueueprivate static final long
private static final boolean
True if on multiprocessorprivate static final int
private static final long
(package private) static final int
The maximum number of estimated removal failures (sweepVotes) to tolerate before sweeping through the queue unlinking cancelled nodes that were not unlinked upon initial removal.private int
The number of apparent failures to unsplice removed nodesprivate static final long
private static final int
private LinkedTransferQueue.Node
tail of the queue; null until first appendprivate static final long
private static final int
private static final sun.misc.Unsafe
-
Constructor Summary
ConstructorsConstructorDescriptionCreates an initially emptyLinkedTransferQueue
.LinkedTransferQueue
(Collection<? extends E> c) Creates aLinkedTransferQueue
initially containing the elements of the given collection, added in traversal order of the collection's iterator. -
Method Summary
Modifier and TypeMethodDescriptionboolean
Inserts the specified element at the tail of this queue.private E
awaitMatch
(LinkedTransferQueue.Node s, LinkedTransferQueue.Node pred, E e, boolean timed, long nanos) Spins/yields/blocks until node s is matched or caller gives up.private boolean
private boolean
casSweepVotes
(int cmp, int val) (package private) static <E> E
private boolean
boolean
Returnstrue
if this queue contains the specified element.private int
countOfMode
(boolean data) Traverses and counts unmatched nodes of the given mode.int
drainTo
(Collection<? super E> c) int
drainTo
(Collection<? super E> c, int maxElements) private boolean
Main implementation of remove(Object)private E
Returns the item in the first unmatched node with isData; or null if none.private LinkedTransferQueue.Node
firstOfMode
(boolean isData) Returns the first unmatched node of the given mode, or null if none.(package private) static sun.misc.Unsafe
Returns a sun.misc.Unsafe.int
Returns an estimate of the number of consumers waiting to dequeue elements viatake
orpoll
.boolean
Returnstrue
if there is at least one consumer waiting to dequeue an element viatake
orpoll
.boolean
isEmpty()
Returnstrue
if this queue contains no elements.iterator()
Returns an iterator over the elements in this queue in proper sequence.boolean
Inserts the specified element at the tail of this queue.boolean
Inserts the specified element at the tail of this queue.peek()
poll()
void
Inserts the specified element at the tail of this queue.private void
Reconstitutes the Queue instance from a stream (that is, deserializes it).int
Always returnsInteger.MAX_VALUE
because aLinkedTransferQueue
is not capacity constrained.boolean
Removes a single instance of the specified element from this queue, if it is present.int
size()
Returns the number of elements in this queue.private static int
spinsFor
(LinkedTransferQueue.Node pred, boolean haveData) Returns spin/yield value for a node with given predecessor and data mode.(package private) final LinkedTransferQueue.Node
Returns the successor of p, or the head node if p.next has been linked to self, which will only be true if traversing with a stale pointer that is now off the list.private void
sweep()
Unlinks matched (typically cancelled) nodes encountered in a traversal from head.take()
void
Transfers the element to a consumer, waiting if necessary to do so.private LinkedTransferQueue.Node
tryAppend
(LinkedTransferQueue.Node s, boolean haveData) Tries to append node s as tail.boolean
tryTransfer
(E e) Transfers the element to a waiting consumer immediately, if possible.boolean
tryTransfer
(E e, long timeout, TimeUnit unit) Transfers the element to a consumer if it is possible to do so before the timeout elapses.(package private) final void
Unsplices (now or later) the given deleted/cancelled node with the given predecessor.private void
Saves the state to a stream (that is, serializes it).private E
Implements all queuing methods.Methods inherited from class java.util.AbstractQueue
addAll, clear, element, remove
Methods inherited from class java.util.AbstractCollection
containsAll, removeAll, retainAll, toArray, toArray, toString
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface java.util.Collection
addAll, clear, containsAll, equals, hashCode, parallelStream, removeAll, removeIf, retainAll, spliterator, stream, toArray, toArray, toArray
-
Field Details
-
serialVersionUID
private static final long serialVersionUID- See Also:
-
MP
private static final boolean MPTrue if on multiprocessor -
FRONT_SPINS
private static final int FRONT_SPINSThe number of times to spin (with randomly interspersed calls to Thread.yield) on multiprocessor before blocking when a node is apparently the first waiter in the queue. See above for explanation. Must be a power of two. The value is empirically derived -- it works pretty well across a variety of processors, numbers of CPUs, and OSes.- See Also:
-
CHAINED_SPINS
private static final int CHAINED_SPINSThe number of times to spin before blocking when a node is preceded by another node that is apparently spinning. Also serves as an increment to FRONT_SPINS on phase changes, and as base average frequency for yielding during spins. Must be a power of two.- See Also:
-
SWEEP_THRESHOLD
static final int SWEEP_THRESHOLDThe maximum number of estimated removal failures (sweepVotes) to tolerate before sweeping through the queue unlinking cancelled nodes that were not unlinked upon initial removal. See above for explanation. The value must be at least two to avoid useless sweeps when removing trailing nodes.- See Also:
-
head
head of the queue; null until first enqueue -
tail
tail of the queue; null until first append -
sweepVotes
private transient volatile int sweepVotesThe number of apparent failures to unsplice removed nodes -
NOW
private static final int NOW- See Also:
-
ASYNC
private static final int ASYNC- See Also:
-
SYNC
private static final int SYNC- See Also:
-
TIMED
private static final int TIMED- See Also:
-
UNSAFE
private static final sun.misc.Unsafe UNSAFE -
headOffset
private static final long headOffset -
tailOffset
private static final long tailOffset -
sweepVotesOffset
private static final long sweepVotesOffset
-
-
Constructor Details
-
LinkedTransferQueue
public LinkedTransferQueue()Creates an initially emptyLinkedTransferQueue
. -
LinkedTransferQueue
Creates aLinkedTransferQueue
initially containing the elements of the given collection, added in traversal order of the collection's iterator.- Parameters:
c
- the collection of elements to initially contain- Throws:
NullPointerException
- if the specified collection or any of its elements are null
-
-
Method Details
-
casTail
-
casHead
-
casSweepVotes
private boolean casSweepVotes(int cmp, int val) -
cast
-
xfer
Implements all queuing methods. See above for explanation.- Parameters:
e
- the item or null for takehaveData
- true if this is a put, else a takehow
- NOW, ASYNC, SYNC, or TIMEDnanos
- timeout in nanosecs, used only if mode is TIMED- Returns:
- an item if matched, else e
- Throws:
NullPointerException
- if haveData mode but e is null
-
tryAppend
Tries to append node s as tail.- Parameters:
s
- the node to appendhaveData
- true if appending in data mode- Returns:
- null on failure due to losing race with append in different mode, else s's predecessor, or s itself if no predecessor
-
awaitMatch
private E awaitMatch(LinkedTransferQueue.Node s, LinkedTransferQueue.Node pred, E e, boolean timed, long nanos) Spins/yields/blocks until node s is matched or caller gives up.- Parameters:
s
- the waiting nodepred
- the predecessor of s, or s itself if it has no predecessor, or null if unknown (the null case does not occur in any current calls but may in possible future extensions)e
- the comparison value for checking matchtimed
- if true, wait only until timeout elapsesnanos
- timeout in nanosecs, used only if timed is true- Returns:
- matched item, or e if unmatched on interrupt or timeout
-
spinsFor
Returns spin/yield value for a node with given predecessor and data mode. See above for explanation. -
succ
Returns the successor of p, or the head node if p.next has been linked to self, which will only be true if traversing with a stale pointer that is now off the list. -
firstOfMode
Returns the first unmatched node of the given mode, or null if none. Used by methods isEmpty, hasWaitingConsumer. -
firstDataItem
Returns the item in the first unmatched node with isData; or null if none. Used by peek. -
countOfMode
private int countOfMode(boolean data) Traverses and counts unmatched nodes of the given mode. Used by methods size and getWaitingConsumerCount. -
unsplice
Unsplices (now or later) the given deleted/cancelled node with the given predecessor.- Parameters:
pred
- a node that was at one time known to be the predecessor of s, or null or s itself if s is/was at heads
- the node to be unspliced
-
sweep
private void sweep()Unlinks matched (typically cancelled) nodes encountered in a traversal from head. -
findAndRemove
Main implementation of remove(Object) -
put
Inserts the specified element at the tail of this queue. As the queue is unbounded, this method will never block.- Specified by:
put
in interfaceBlockingQueue<E>
- Throws:
NullPointerException
- if the specified element is null
-
offer
Inserts the specified element at the tail of this queue. As the queue is unbounded, this method will never block or returnfalse
.- Specified by:
offer
in interfaceBlockingQueue<E>
- Returns:
true
(as specified byBlockingQueue.offer
)- Throws:
NullPointerException
- if the specified element is null
-
offer
Inserts the specified element at the tail of this queue. As the queue is unbounded, this method will never returnfalse
.- Specified by:
offer
in interfaceBlockingQueue<E>
- Specified by:
offer
in interfaceQueue<E>
- Returns:
true
(as specified byQueue.offer(E)
)- Throws:
NullPointerException
- if the specified element is null
-
add
Inserts the specified element at the tail of this queue. As the queue is unbounded, this method will never throwIllegalStateException
or returnfalse
.- Specified by:
add
in interfaceBlockingQueue<E>
- Specified by:
add
in interfaceCollection<E>
- Specified by:
add
in interfaceQueue<E>
- Overrides:
add
in classAbstractQueue<E>
- Returns:
true
(as specified byCollection.add(E)
)- Throws:
NullPointerException
- if the specified element is null
-
tryTransfer
Transfers the element to a waiting consumer immediately, if possible.More precisely, transfers the specified element immediately if there exists a consumer already waiting to receive it (in
take()
or timedpoll
), otherwise returningfalse
without enqueuing the element.- Specified by:
tryTransfer
in interfaceTransferQueue<E>
- Parameters:
e
- the element to transfer- Returns:
true
if the element was transferred, elsefalse
- Throws:
NullPointerException
- if the specified element is null
-
transfer
Transfers the element to a consumer, waiting if necessary to do so.More precisely, transfers the specified element immediately if there exists a consumer already waiting to receive it (in
take()
or timedpoll
), else inserts the specified element at the tail of this queue and waits until the element is received by a consumer.- Specified by:
transfer
in interfaceTransferQueue<E>
- Parameters:
e
- the element to transfer- Throws:
NullPointerException
- if the specified element is nullInterruptedException
- if interrupted while waiting, in which case the element is not enqueued.
-
tryTransfer
Transfers the element to a consumer if it is possible to do so before the timeout elapses.More precisely, transfers the specified element immediately if there exists a consumer already waiting to receive it (in
take()
or timedpoll
), else inserts the specified element at the tail of this queue and waits until the element is received by a consumer, returningfalse
if the specified wait time elapses before the element can be transferred.- Specified by:
tryTransfer
in interfaceTransferQueue<E>
- Parameters:
e
- the element to transfertimeout
- how long to wait before giving up, in units ofunit
unit
- aTimeUnit
determining how to interpret thetimeout
parameter- Returns:
true
if successful, orfalse
if the specified waiting time elapses before completion, in which case the element is not enqueued.- Throws:
NullPointerException
- if the specified element is nullInterruptedException
- if interrupted while waiting, in which case the element is not enqueued.
-
take
- Specified by:
take
in interfaceBlockingQueue<E>
- Throws:
InterruptedException
-
poll
- Specified by:
poll
in interfaceBlockingQueue<E>
- Throws:
InterruptedException
-
poll
-
drainTo
- Specified by:
drainTo
in interfaceBlockingQueue<E>
-
drainTo
- Specified by:
drainTo
in interfaceBlockingQueue<E>
-
iterator
Returns an iterator over the elements in this queue in proper sequence. The elements will be returned in order from first (head) to last (tail).The returned iterator is a "weakly consistent" iterator that will never throw
ConcurrentModificationException
, and guarantees to traverse elements as they existed upon construction of the iterator, and may (but is not guaranteed to) reflect any modifications subsequent to construction.- Specified by:
iterator
in interfaceCollection<E>
- Specified by:
iterator
in interfaceIterable<E>
- Specified by:
iterator
in classAbstractCollection<E>
- Returns:
- an iterator over the elements in this queue in proper sequence
-
peek
-
isEmpty
public boolean isEmpty()Returnstrue
if this queue contains no elements.- Specified by:
isEmpty
in interfaceCollection<E>
- Overrides:
isEmpty
in classAbstractCollection<E>
- Returns:
true
if this queue contains no elements
-
hasWaitingConsumer
public boolean hasWaitingConsumer()Description copied from interface:TransferQueue
Returnstrue
if there is at least one consumer waiting to dequeue an element viatake
orpoll
. The return value represents a momentary state of affairs.- Specified by:
hasWaitingConsumer
in interfaceTransferQueue<E>
- Returns:
true
if there is at least one waiting consumer
-
size
public int size()Returns the number of elements in this queue. If this queue contains more thanInteger.MAX_VALUE
elements, returnsInteger.MAX_VALUE
.Beware that, unlike in most collections, this method is NOT a constant-time operation. Because of the asynchronous nature of these queues, determining the current number of elements requires an O(n) traversal.
- Specified by:
size
in interfaceCollection<E>
- Specified by:
size
in classAbstractCollection<E>
- Returns:
- the number of elements in this queue
-
getWaitingConsumerCount
public int getWaitingConsumerCount()Description copied from interface:TransferQueue
Returns an estimate of the number of consumers waiting to dequeue elements viatake
orpoll
. The return value is an approximation of a momentary state of affairs, that may be inaccurate if consumers have completed or given up waiting. The value may be useful for monitoring and heuristics, but not for synchronization control. Implementations of this method are likely to be noticeably slower than those forTransferQueue.hasWaitingConsumer()
.- Specified by:
getWaitingConsumerCount
in interfaceTransferQueue<E>
- Returns:
- the number of consumers waiting to dequeue elements
-
remove
Removes a single instance of the specified element from this queue, if it is present. More formally, removes an elemente
such thato.equals(e)
, if this queue contains one or more such elements. Returnstrue
if this queue contained the specified element (or equivalently, if this queue changed as a result of the call).- Specified by:
remove
in interfaceBlockingQueue<E>
- Specified by:
remove
in interfaceCollection<E>
- Overrides:
remove
in classAbstractCollection<E>
- Parameters:
o
- element to be removed from this queue, if present- Returns:
true
if this queue changed as a result of the call
-
contains
Returnstrue
if this queue contains the specified element. More formally, returnstrue
if and only if this queue contains at least one elemente
such thato.equals(e)
.- Specified by:
contains
in interfaceBlockingQueue<E>
- Specified by:
contains
in interfaceCollection<E>
- Overrides:
contains
in classAbstractCollection<E>
- Parameters:
o
- object to be checked for containment in this queue- Returns:
true
if this queue contains the specified element
-
remainingCapacity
public int remainingCapacity()Always returnsInteger.MAX_VALUE
because aLinkedTransferQueue
is not capacity constrained.- Specified by:
remainingCapacity
in interfaceBlockingQueue<E>
- Returns:
Integer.MAX_VALUE
(as specified byBlockingQueue.remainingCapacity
)
-
writeObject
Saves the state to a stream (that is, serializes it).- Parameters:
s
- the stream- Throws:
IOException
-
readObject
Reconstitutes the Queue instance from a stream (that is, deserializes it).- Parameters:
s
- the stream- Throws:
IOException
ClassNotFoundException
-
getUnsafe
static sun.misc.Unsafe getUnsafe()Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package. Replace with a simple call to Unsafe.getUnsafe when integrating into a jdk.- Returns:
- a sun.misc.Unsafe
-