/*
|
* Copyright (C) 2014 The Android Open Source Project
|
*
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
* you may not use this file except in compliance with the License.
|
* You may obtain a copy of the License at
|
*
|
* http://www.apache.org/licenses/LICENSE-2.0
|
*
|
* Unless required by applicable law or agreed to in writing, software
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* See the License for the specific language governing permissions and
|
* limitations under the License.
|
*/
|
|
package com.android.camera.async;
|
|
import java.util.ArrayList;
|
import java.util.List;
|
import java.util.NoSuchElementException;
|
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
import javax.annotation.Nonnull;
|
|
/**
|
* A {@link BufferQueue} implementation useful for thread-safe producer-consumer
|
* interactions.<br>
|
* Unlike a regular {@link java.util.concurrent.BlockingQueue}, this allows
|
* closing the queue from either the producer or consumer side and enables
|
* precise accounting of objects which are never read by the consumer. Notably,
|
* this enables cleanly shutting down producer-consumer interactions without
|
* leaking managed resources which might otherwise be left dangling in the
|
* queue.
|
*/
|
public class ConcurrentBufferQueue<T> implements BufferQueue<T>, BufferQueueController<T>,
|
SafeCloseable {
|
/**
|
* A callback to be invoked with all of the elements of the sequence which
|
* are added but never retrieved via {@link #getNext}.
|
*/
|
public static interface UnusedElementProcessor<T> {
|
/**
|
* Implementations should properly close the discarded element, if
|
* necessary.
|
*/
|
public void process(T element);
|
}
|
|
/**
|
* An entry can either be a {@link T} or a special "poison-pill" marker
|
* indicating that the sequence has been closed.
|
*/
|
private static class Entry<T> {
|
private final T mValue;
|
private final boolean mClosing;
|
|
private Entry(T value, boolean closing) {
|
mValue = value;
|
mClosing = closing;
|
}
|
|
public boolean isClosingMarker() {
|
return mClosing;
|
}
|
|
public T getValue() {
|
return mValue;
|
}
|
}
|
/**
|
* Lock used for mQueue modification and mClosed.
|
*/
|
private final Object mLock;
|
/**
|
* The queue in which to store elements of the sequence as they arrive.
|
*/
|
private final BlockingQueue<Entry<T>> mQueue;
|
/**
|
* Whether this sequence is closed.
|
*/
|
private final AtomicBoolean mClosed;
|
/**
|
* The callback to use to process all elements which are discarded by the
|
* queue.
|
*/
|
private final UnusedElementProcessor<T> mUnusedElementProcessor;
|
|
public ConcurrentBufferQueue(UnusedElementProcessor<T> unusedElementProcessor) {
|
mUnusedElementProcessor = unusedElementProcessor;
|
mLock = new Object();
|
mQueue = new LinkedBlockingQueue<>();
|
mClosed = new AtomicBoolean();
|
}
|
|
public ConcurrentBufferQueue() {
|
// Instantiate with a DiscardedElementProcessor which does nothing.
|
this(new UnusedElementProcessor<T>() {
|
@Override
|
public void process(T element) {
|
}
|
});
|
}
|
|
@Override
|
public void close() {
|
List<Entry<T>> remainingElements = new ArrayList<>();
|
synchronized (mLock) {
|
// Mark as closed so that no more threads wait in getNext().
|
// Any additional calls to close() will return immediately.
|
boolean alreadyClosed = mClosed.getAndSet(true);
|
if (alreadyClosed) {
|
return;
|
}
|
|
mQueue.drainTo(remainingElements);
|
|
// Keep feeding any currently-waiting consumer threads "poison pill"
|
// {@link Entry}s indicating that the sequence has ended so they
|
// wake up. When no more threads are waiting for another value from
|
// mQueue, the call to peek() from this thread will see a value.
|
// Note that this also ensures that there is a poison pill in the
|
// queue
|
// to keep waking-up any threads which manage to block in getNext()
|
// even after marking mClosed.
|
while (mQueue.peek() == null) {
|
mQueue.add(makeClosingMarker());
|
}
|
}
|
|
for (Entry<T> entry : remainingElements) {
|
if (!entry.isClosingMarker()) {
|
mUnusedElementProcessor.process(entry.getValue());
|
}
|
}
|
}
|
|
@Override
|
public void update(@Nonnull T element) {
|
boolean closed = false;
|
synchronized (mLock) {
|
closed = mClosed.get();
|
if (!closed) {
|
mQueue.add(makeEntry(element));
|
}
|
}
|
if (closed) {
|
mUnusedElementProcessor.process(element);
|
}
|
}
|
|
private T doWithNextEntry(Entry<T> nextEntry) throws BufferQueueClosedException {
|
if (nextEntry.isClosingMarker()) {
|
// Always keep a poison-pill in the queue to avoid a race condition
|
// in which a thread reaches the mQueue.take() call after close().
|
mQueue.add(nextEntry);
|
throw new BufferQueueClosedException();
|
} else {
|
return nextEntry.getValue();
|
}
|
}
|
|
@Override
|
public T getNext() throws InterruptedException, BufferQueueClosedException {
|
Entry<T> nextEntry = mQueue.take();
|
return doWithNextEntry(nextEntry);
|
}
|
|
@Override
|
public T getNext(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException,
|
BufferQueueClosedException {
|
Entry<T> nextEntry = mQueue.poll(timeout, unit);
|
if (nextEntry == null) {
|
throw new TimeoutException();
|
}
|
return doWithNextEntry(nextEntry);
|
}
|
|
@Override
|
public T peekNext() {
|
Entry<T> nextEntry = mQueue.peek();
|
if (nextEntry == null) {
|
return null;
|
} else if (nextEntry.isClosingMarker()) {
|
return null;
|
} else {
|
return nextEntry.getValue();
|
}
|
}
|
|
@Override
|
public void discardNext() {
|
try {
|
Entry<T> nextEntry = mQueue.remove();
|
if (nextEntry.isClosingMarker()) {
|
// Always keep a poison-pill in the queue to avoid a race
|
// condition in which a thread reaches the mQueue.take() call
|
// after close().
|
mQueue.add(nextEntry);
|
} else {
|
mUnusedElementProcessor.process(nextEntry.getValue());
|
}
|
} catch (NoSuchElementException e) {
|
// If the queue is already empty, do nothing.
|
return;
|
}
|
}
|
|
@Override
|
public boolean isClosed() {
|
return mClosed.get();
|
}
|
|
private Entry makeEntry(T value) {
|
return new Entry(value, false);
|
}
|
|
private Entry makeClosingMarker() {
|
return new Entry(null, true);
|
}
|
}
|