/*
|
* Copyright (C) 2018 The Android Open Source Project
|
*
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
* you may not use this file except in compliance with the License.
|
* You may obtain a copy of the License at
|
*
|
* http://www.apache.org/licenses/LICENSE-2.0
|
*
|
* Unless required by applicable law or agreed to in writing, software
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* See the License for the specific language governing permissions and
|
* limitations under the License.
|
*/
|
|
package com.android.server.wm;
|
|
import android.os.Process;
|
import android.os.SystemClock;
|
import android.util.Slog;
|
|
import com.android.internal.annotations.VisibleForTesting;
|
|
import java.util.ArrayList;
|
import java.util.function.Predicate;
|
|
/**
|
* The common threading logic for persisters to use so that they can run in the same threads.
|
* Methods in this class are synchronized on its instance, so caller could also synchronize on
|
* its instance to perform modifications in items.
|
*/
|
class PersisterQueue {
|
private static final String TAG = "PersisterQueue";
|
private static final boolean DEBUG = false;
|
|
/** When not flushing don't write out files faster than this */
|
private static final long INTER_WRITE_DELAY_MS = 500;
|
|
/**
|
* When not flushing delay this long before writing the first file out. This gives the next task
|
* being launched a chance to load its resources without this occupying IO bandwidth.
|
*/
|
private static final long PRE_TASK_DELAY_MS = 3000;
|
|
/** The maximum number of entries to keep in the queue before draining it automatically. */
|
private static final int MAX_WRITE_QUEUE_LENGTH = 6;
|
|
/** Special value for mWriteTime to mean don't wait, just write */
|
private static final long FLUSH_QUEUE = -1;
|
|
/** An {@link WriteQueueItem} that doesn't do anything. Used to trigger {@link
|
* Listener#onPreProcessItem}. */
|
static final WriteQueueItem EMPTY_ITEM = () -> { };
|
|
private final long mInterWriteDelayMs;
|
private final long mPreTaskDelayMs;
|
private final LazyTaskWriterThread mLazyTaskWriterThread;
|
private final ArrayList<WriteQueueItem> mWriteQueue = new ArrayList<>();
|
|
private final ArrayList<Listener> mListeners = new ArrayList<>();
|
|
/**
|
* Value determines write delay mode as follows: < 0 We are Flushing. No delays between writes
|
* until the image queue is drained and all tasks needing persisting are written to disk. There
|
* is no delay between writes. == 0 We are Idle. Next writes will be delayed by
|
* #PRE_TASK_DELAY_MS. > 0 We are Actively writing. Next write will be at this time. Subsequent
|
* writes will be delayed by #INTER_WRITE_DELAY_MS.
|
*/
|
private long mNextWriteTime = 0;
|
|
PersisterQueue() {
|
this(INTER_WRITE_DELAY_MS, PRE_TASK_DELAY_MS);
|
}
|
|
/** Used for tests to reduce waiting time. */
|
@VisibleForTesting
|
PersisterQueue(long interWriteDelayMs, long preTaskDelayMs) {
|
if (interWriteDelayMs < 0 || preTaskDelayMs < 0) {
|
throw new IllegalArgumentException("Both inter-write delay and pre-task delay need to"
|
+ "be non-negative. inter-write delay: " + interWriteDelayMs
|
+ "ms pre-task delay: " + preTaskDelayMs);
|
}
|
mInterWriteDelayMs = interWriteDelayMs;
|
mPreTaskDelayMs = preTaskDelayMs;
|
mLazyTaskWriterThread = new LazyTaskWriterThread("LazyTaskWriterThread");
|
}
|
|
synchronized void startPersisting() {
|
if (!mLazyTaskWriterThread.isAlive()) {
|
mLazyTaskWriterThread.start();
|
}
|
}
|
|
/** Stops persisting thread. Should only be used in tests. */
|
@VisibleForTesting
|
void stopPersisting() throws InterruptedException {
|
if (!mLazyTaskWriterThread.isAlive()) {
|
return;
|
}
|
|
synchronized (this) {
|
mLazyTaskWriterThread.interrupt();
|
}
|
mLazyTaskWriterThread.join();
|
}
|
|
synchronized void addItem(WriteQueueItem item, boolean flush) {
|
mWriteQueue.add(item);
|
|
if (flush || mWriteQueue.size() > MAX_WRITE_QUEUE_LENGTH) {
|
mNextWriteTime = FLUSH_QUEUE;
|
} else if (mNextWriteTime == 0) {
|
mNextWriteTime = SystemClock.uptimeMillis() + mPreTaskDelayMs;
|
}
|
notify();
|
}
|
|
synchronized <T extends WriteQueueItem> T findLastItem(Predicate<T> predicate, Class<T> clazz) {
|
for (int i = mWriteQueue.size() - 1; i >= 0; --i) {
|
WriteQueueItem writeQueueItem = mWriteQueue.get(i);
|
if (clazz.isInstance(writeQueueItem)) {
|
T item = clazz.cast(writeQueueItem);
|
if (predicate.test(item)) {
|
return item;
|
}
|
}
|
}
|
|
return null;
|
}
|
|
/**
|
* Updates the last item found in the queue that matches the given item, or adds it to the end
|
* of the queue if no such item is found.
|
*/
|
synchronized <T extends WriteQueueItem> void updateLastOrAddItem(T item, boolean flush) {
|
final T itemToUpdate = findLastItem(item::matches, (Class<T>) item.getClass());
|
if (itemToUpdate == null) {
|
addItem(item, flush);
|
} else {
|
itemToUpdate.updateFrom(item);
|
}
|
|
yieldIfQueueTooDeep();
|
}
|
|
/**
|
* Removes all items with which given predicate returns {@code true}.
|
*/
|
synchronized <T extends WriteQueueItem> void removeItems(Predicate<T> predicate,
|
Class<T> clazz) {
|
for (int i = mWriteQueue.size() - 1; i >= 0; --i) {
|
WriteQueueItem writeQueueItem = mWriteQueue.get(i);
|
if (clazz.isInstance(writeQueueItem)) {
|
T item = clazz.cast(writeQueueItem);
|
if (predicate.test(item)) {
|
if (DEBUG) Slog.d(TAG, "Removing " + item + " from write queue.");
|
mWriteQueue.remove(i);
|
}
|
}
|
}
|
}
|
|
synchronized void flush() {
|
mNextWriteTime = FLUSH_QUEUE;
|
notifyAll();
|
do {
|
try {
|
wait();
|
} catch (InterruptedException e) {
|
}
|
} while (mNextWriteTime == FLUSH_QUEUE);
|
}
|
|
void yieldIfQueueTooDeep() {
|
boolean stall = false;
|
synchronized (this) {
|
if (mNextWriteTime == FLUSH_QUEUE) {
|
stall = true;
|
}
|
}
|
if (stall) {
|
Thread.yield();
|
}
|
}
|
|
void addListener(Listener listener) {
|
mListeners.add(listener);
|
}
|
|
@VisibleForTesting
|
boolean removeListener(Listener listener) {
|
return mListeners.remove(listener);
|
}
|
|
private void processNextItem() throws InterruptedException {
|
// This part is extracted into a method so that the GC can clearly see the end of the
|
// scope of the variable 'item'. If this part was in the loop in LazyTaskWriterThread, the
|
// last item it processed would always "leak".
|
// See https://b.corp.google.com/issues/64438652#comment7
|
|
// If mNextWriteTime, then don't delay between each call to saveToXml().
|
final WriteQueueItem item;
|
synchronized (this) {
|
if (mNextWriteTime != FLUSH_QUEUE) {
|
// The next write we don't have to wait so long.
|
mNextWriteTime = SystemClock.uptimeMillis() + mInterWriteDelayMs;
|
if (DEBUG) {
|
Slog.d(TAG, "Next write time may be in " + mInterWriteDelayMs
|
+ " msec. (" + mNextWriteTime + ")");
|
}
|
}
|
|
while (mWriteQueue.isEmpty()) {
|
if (mNextWriteTime != 0) {
|
mNextWriteTime = 0; // idle.
|
notify(); // May need to wake up flush().
|
}
|
// Make sure we exit this thread correctly when interrupted before going to
|
// indefinite wait.
|
if (Thread.currentThread().isInterrupted()) {
|
throw new InterruptedException();
|
}
|
if (DEBUG) Slog.d(TAG, "LazyTaskWriter: waiting indefinitely.");
|
wait();
|
// Invariant: mNextWriteTime is either FLUSH_QUEUE or PRE_WRITE_DELAY_MS
|
// from now.
|
}
|
item = mWriteQueue.remove(0);
|
|
long now = SystemClock.uptimeMillis();
|
if (DEBUG) {
|
Slog.d(TAG, "LazyTaskWriter: now=" + now + " mNextWriteTime=" + mNextWriteTime
|
+ " mWriteQueue.size=" + mWriteQueue.size());
|
}
|
while (now < mNextWriteTime) {
|
if (DEBUG) {
|
Slog.d(TAG, "LazyTaskWriter: waiting " + (mNextWriteTime - now));
|
}
|
wait(mNextWriteTime - now);
|
now = SystemClock.uptimeMillis();
|
}
|
|
// Got something to do.
|
}
|
|
item.process();
|
}
|
|
interface WriteQueueItem<T extends WriteQueueItem<T>> {
|
void process();
|
|
default void updateFrom(T item) {}
|
|
default boolean matches(T item) {
|
return false;
|
}
|
}
|
|
interface Listener {
|
/**
|
* Called before {@link PersisterQueue} tries to process next item.
|
*
|
* Note if the queue is empty, this callback will be called before the indefinite wait. This
|
* will be called once when {@link PersisterQueue} starts the internal thread before the
|
* indefinite wait.
|
*
|
* This callback is called w/o locking the instance of {@link PersisterQueue}.
|
*
|
* @param queueEmpty {@code true} if the queue is empty, which indicates {@link
|
* PersisterQueue} is likely to enter indefinite wait; or {@code false} if there is still
|
* item to process.
|
*/
|
void onPreProcessItem(boolean queueEmpty);
|
}
|
|
private class LazyTaskWriterThread extends Thread {
|
|
private LazyTaskWriterThread(String name) {
|
super(name);
|
}
|
|
@Override
|
public void run() {
|
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
|
try {
|
while (true) {
|
final boolean probablyDone;
|
synchronized (PersisterQueue.this) {
|
probablyDone = mWriteQueue.isEmpty();
|
}
|
|
for (int i = mListeners.size() - 1; i >= 0; --i) {
|
mListeners.get(i).onPreProcessItem(probablyDone);
|
}
|
|
processNextItem();
|
}
|
} catch (InterruptedException e) {
|
Slog.e(TAG, "Persister thread is exiting. Should never happen in prod, but"
|
+ "it's OK in tests.");
|
}
|
}
|
}
|
}
|