/**
 *     Ambient - A music player for the Android platform
 Copyright (C) 2007 Martin Vysny
 
 This program is free software: you can redistribute it and/or modify
 it under the terms of the GNU General Public License as published by
 the Free Software Foundation, either version 3 of the License, or
 (at your option) any later version.
 
 This program is distributed in the hope that it will be useful,
 but WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 GNU General Public License for more details.

 You should have received a copy of the GNU General Public License
 along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
18  
19  package sk.baka.ambient;
20  
21  import java.util.Map;
22  import java.util.concurrent.Callable;
23  import java.util.concurrent.ConcurrentHashMap;
24  import java.util.concurrent.ConcurrentMap;
25  import java.util.concurrent.ExecutorService;
26  import java.util.concurrent.Executors;
27  import java.util.concurrent.Future;
28  import java.util.concurrent.RejectedExecutionException;
29  import java.util.concurrent.ThreadFactory;
30  import java.util.concurrent.atomic.AtomicBoolean;
31  import java.util.concurrent.atomic.AtomicInteger;
32  import java.util.concurrent.locks.ReadWriteLock;
33  import java.util.concurrent.locks.ReentrantReadWriteLock;
34  
35  import sk.baka.ambient.commons.GuardedBy;
36  import sk.baka.ambient.commons.NullArgumentException;
37  import sk.baka.ambient.commons.ThreadSafe;
38  import android.util.Log;
39  
40  /***
41   * <p>
42   * Able to execute multiple long operations running simultaneously in the
43   * background. Accepts {@link Runnable} task implementations. The tasks will run
44   * in low-priority daemon thread. Thread-safe.
45   * </p>
46   * <p>
47   * Tasks are differentiated based on a task type object. At most a single
48   * instance of a task of given type may be submitted - multiple submissions of
49   * tasks having the same task type will do nothing unless there is no such task
50   * in executor's queue or already being executed. The task type object must
51   * comply the contract for a {@link Map} key.
52   * </p>
53   * <p>
54   * A task must periodically check for its {@link Thread#isInterrupted()
55   * interrupt} status. If it is interrupted, it should terminate ASAP. It may do
56   * so by throwing an Exception - exceptions are not reported when the process is
57   * interrupted.
58   * </p>
59   * <p>
60   * Uncaught exceptions thrown by tasks are reported using
61   * {@link AmbientApplication#error(Class, boolean, String, Throwable)}.
62   * </p>
63   * <p>
64   * The executor initially starts in offline mode. Use
65   * {@link #setOffline(boolean)} to change the mode.
66   * </p>
67   * <p>
68   * There is no support for mutually exclusive tasks. If you need this
69   * functionality you have to implement it yourself, for example by synchronizing
70   * on the same object instance.
71   * </p>
72   * 
73   * @author Martin Vysny
74   */
75  @ThreadSafe
76  public final class BackgroundOpExecutor implements ThreadFactory,
77  		IBackgroundTask {
78  	/***
79  	 * Describes a single task.
80  	 * 
81  	 * @author Martin Vysny
82  	 */
83  	private static class TaskInfo {
84  		/***
85  		 * The current task name.
86  		 */
87  		public volatile String name;
88  		/***
89  		 * Current progress of the task.
90  		 */
91  		public volatile int progress = 0;
92  		/***
93  		 * The maximum progress.
94  		 */
95  		public volatile int maxProgress = 100;
96  
97  		/***
98  		 * The future of the task.
99  		 */
100 		public volatile Future<Void> future;
101 	}
102 
103 	/***
104 	 * Contains set of scheduled or executing tasks. Maps the task type to the
105 	 * task name.
106 	 */
107 	private final ConcurrentMap<Object, TaskInfo> scheduledTasks = new ConcurrentHashMap<Object, TaskInfo>();
108 
109 	/***
110 	 * <p>
111 	 * Schedules given task for execution. Does nothing if the task is rejected
112 	 * to be submitted. Task may be rejected for these reasons:
113 	 * </p>
114 	 * <ul>
115 	 * <li>it is already scheduled for execution or executed</li>
116 	 * <li>Processing of online task was requested while in offline mode</li>
117 	 * </ul>
118 	 * 
119 	 * @param task
120 	 *            the task to schedule, must not be <code>null</code>.
121 	 * @param taskType
122 	 *            the task type, must not be <code>null</code>.
123 	 * @param online
124 	 *            if <code>true</code> then this task processes some online
125 	 *            content.
126 	 * @param name
127 	 *            the displayable task name. A grammar construct of <code>name +
128 	 *            "failed"</code> should be a meaningful text.
129 	 * @return a {@link Future} instance if the task was scheduled,
130 	 *         <code>null</code> if it was rejected.
131 	 */
132 	public Future<Void> schedule(final Callable<Void> task,
133 			final Object taskType, final boolean online, final String name) {
134 		if (name == null) {
135 			throw new NullArgumentException("name");
136 		}
137 		if (taskType == null) {
138 			throw new NullArgumentException("taskType");
139 		}
140 		final TaskInfo info = new TaskInfo();
141 		info.name = name;
142 		if (scheduledTasks.putIfAbsent(taskType, info) != null) {
143 			return null;
144 		}
145 		// okay, we have a new task. Schedule it.
146 		boolean scheduled = false;
147 		executorLock.readLock().lock();
148 		try {
149 			final ExecutorService executor;
150 			if (online) {
151 				executor = onlineExecutor;
152 				if (executor == null) {
153 					return null;
154 				}
155 			} else {
156 				executor = offlineExecutor;
157 			}
158 			try {
159 				info.future = executor.submit(new ProtectedRunnable(task, info,
160 						taskType), null);
161 			} catch (RejectedExecutionException ex) {
162 				// no luck. we are probably terminating, or someone switched to
163 				// offline mode.
164 				return null;
165 			}
166 			scheduled = true;
167 			return info.future;
168 		} finally {
169 			if (!scheduled) {
170 				scheduledTasks.remove(taskType);
171 			}
172 			executorLock.readLock().unlock();
173 		}
174 	}
175 
176 	private final AtomicBoolean offline = new AtomicBoolean(true);
177 
178 	/***
179 	 * Checks if this task is already scheduled.
180 	 * 
181 	 * @param taskType
182 	 *            the task handle to check, must not be <code>null</code>.
183 	 * @return <code>true</code> if it is scheduled.
184 	 */
185 	public boolean isScheduledOrExecuted(final Object taskType) {
186 		if (taskType == null) {
187 			throw new NullArgumentException("taskType");
188 		}
189 		return scheduledTasks.containsKey(taskType);
190 	}
191 
192 	private final ReadWriteLock executorLock = new ReentrantReadWriteLock();
193 
194 	/***
195 	 * The online tasks executor.
196 	 */
197 	@GuardedBy("executorLock")
198 	private ExecutorService onlineExecutor = null;
199 
200 	/***
201 	 * Executes tasks that does not require Internet access.
202 	 */
203 	@GuardedBy("executorLock")
204 	private ExecutorService offlineExecutor = Executors
205 			.newCachedThreadPool(this);
206 
207 	/***
208 	 * Sets the online/offline state of the executor. During the offline period
209 	 * the executor rejects online tasks.
210 	 * 
211 	 * @param offline
212 	 *            if <code>true</code> then the executor sets itself as offline
213 	 *            and all pending online tasks are terminated.
214 	 */
215 	public void setOffline(final boolean offline) {
216 		if (!this.offline.compareAndSet(!offline, offline)) {
217 			return;
218 		}
219 		executorLock.writeLock().lock();
220 		try {
221 			if (offline) {
222 				// terminate the online task executor
223 				onlineExecutor.shutdownNow();
224 				onlineExecutor = null;
225 			} else {
226 				if (offlineExecutor.isShutdown()) {
227 					return;
228 				}
229 				onlineExecutor = Executors.newCachedThreadPool(this);
230 			}
231 		} finally {
232 			executorLock.writeLock().unlock();
233 		}
234 	}
235 
236 	private final AtomicInteger threadId = new AtomicInteger();
237 
238 	public Thread newThread(Runnable r) {
239 		final Thread result = new Thread(r, "backgroundOp-"
240 				+ threadId.getAndIncrement());
241 		result.setPriority(Thread.MIN_PRIORITY);
242 		result.setDaemon(true);
243 		return result;
244 	}
245 
246 	/***
247 	 * Immediately stops all running/pending tasks.
248 	 */
249 	public void stopAllTasks() {
250 		executorLock.writeLock().lock();
251 		try {
252 			offlineExecutor.shutdownNow();
253 			offlineExecutor = Executors.newCachedThreadPool(this);
254 			if (onlineExecutor != null) {
255 				onlineExecutor.shutdownNow();
256 				onlineExecutor = Executors.newCachedThreadPool(this);
257 			}
258 		} finally {
259 			executorLock.writeLock().unlock();
260 		}
261 	}
262 
263 	/***
264 	 * Immediately terminates the executor and all pending tasks. The executor
265 	 * will reject all submitted tasks from now on.
266 	 */
267 	public void terminate() {
268 		executorLock.readLock().lock();
269 		try {
270 			offlineExecutor.shutdownNow();
271 			if (onlineExecutor != null) {
272 				onlineExecutor.shutdownNow();
273 			}
274 		} finally {
275 			executorLock.readLock().unlock();
276 		}
277 	}
278 
279 	private final String failed = AmbientApplication.getInstance().getString(
280 			R.string.failed);
281 
282 	/***
283 	 * This class wraps given runnable and takes care of counting
284 	 * scheduled/running tasks instances and handling exceptions.
285 	 * 
286 	 * @author Martin Vysny
287 	 */
288 	private class ProtectedRunnable implements Runnable {
289 		private final Callable<Void> r;
290 		private final TaskInfo task;
291 		private final Object taskType;
292 
293 		/***
294 		 * Creates new instance.
295 		 * 
296 		 * @param r
297 		 *            the wrapped runnable.
298 		 * @param task
299 		 *            this task info.
300 		 * @param taskType
301 		 *            this task type.
302 		 */
303 		public ProtectedRunnable(final Callable<Void> r, final TaskInfo task,
304 				final Object taskType) {
305 			this.r = r;
306 			this.task = task;
307 			this.taskType = taskType;
308 		}
309 
310 		public void run() {
311 			// synchronize on the executor instance to prevent reporting
312 			// decreasing numbers. This could happen when one thread increments
313 			// the value and enters the fireEvent method after another thread
314 			// incremented the value but still did not entered the fireEvent.
315 			synchronized (BackgroundOpExecutor.this) {
316 				taskInfo.set(task);
317 				fireEvent();
318 			}
319 			try {
320 				r.call();
321 			} catch (final Throwable e) {
322 				if (Thread.currentThread().isInterrupted()) {
323 					Log.i(r.getClass().getSimpleName(), "Interrupted: "
324 							+ e.getMessage(), e);
325 				} else {
326 					AmbientApplication.getInstance().error(r.getClass(), true,
327 							task.name + " " + failed, e);
328 				}
329 			} finally {
330 				scheduledTasks.remove(taskType);
331 				taskInfo.set(null);
332 				// synchronize on the executor instance to prevent reporting
333 				// increasing numbers
334 				fireEvent();
335 			}
336 		}
337 	}
338 
339 	private final ThreadLocal<TaskInfo> taskInfo = new ThreadLocal<TaskInfo>();
340 
341 	private synchronized void fireEvent() {
342 		String randomName = null;
343 		int taskCount = 0;
344 		int progress = 0;
345 		int maxProgress = 0;
346 		for (final TaskInfo info : scheduledTasks.values()) {
347 			taskCount++;
348 			if (randomName == null) {
349 				randomName = info.name;
350 			}
351 			progress += info.progress;
352 			maxProgress += info.maxProgress;
353 		}
354 		if (maxProgress <= 0) {
355 			maxProgress = 100;
356 		}
357 		AmbientApplication.getInstance().getBus().getInvocator(
358 				IBackgroundTask.class, true).backgroundTask(taskCount,
359 				randomName, progress, maxProgress);
360 	}
361 
362 	/***
363 	 * Sets a progress of the current task. Must be invoked from a task
364 	 * {@link Runnable}.
365 	 * 
366 	 * @param taskCount
367 	 *            unused, ignored.
368 	 * @param name
369 	 *            a new name for the task.
370 	 * @param progress
371 	 *            current task progress.
372 	 * @param maxProgress
373 	 *            maximum progress.
374 	 */
375 	public void backgroundTask(int taskCount, String name, int progress,
376 			int maxProgress) {
377 		final TaskInfo info = taskInfo.get();
378 		if (info == null) {
379 			throw new IllegalStateException("Must be invoked from a task");
380 		}
381 		info.name = name;
382 		info.progress = progress < 0 ? 0 : progress;
383 		info.maxProgress = maxProgress < info.progress ? info.progress
384 				: maxProgress;
385 		fireEvent();
386 	}
387 
388 	/***
389 	 * Cancels given task. Does nothing if no such task is running.
390 	 * 
391 	 * @param taskType
392 	 *            the task type, must not be <code>null</code>.
393 	 */
394 	public void cancel(final Object taskType) {
395 		final TaskInfo info = scheduledTasks.get(taskType);
396 		if (info == null) {
397 			return;
398 		}
399 		info.future.cancel(true);
400 	}
401 }