1 /***
2 * Ambient - A music player for the Android platform
3 Copyright (C) 2007 Martin Vysny
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 package sk.baka.ambient;
20
21 import java.util.Map;
22 import java.util.concurrent.Callable;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.Future;
28 import java.util.concurrent.RejectedExecutionException;
29 import java.util.concurrent.ThreadFactory;
30 import java.util.concurrent.atomic.AtomicBoolean;
31 import java.util.concurrent.atomic.AtomicInteger;
32 import java.util.concurrent.locks.ReadWriteLock;
33 import java.util.concurrent.locks.ReentrantReadWriteLock;
34
35 import sk.baka.ambient.commons.GuardedBy;
36 import sk.baka.ambient.commons.NullArgumentException;
37 import sk.baka.ambient.commons.ThreadSafe;
38 import android.util.Log;
39
40 /***
 * <p>
 * Able to execute multiple long operations running simultaneously in the
 * background. Accepts {@link Callable} task implementations. The tasks run
 * in low-priority daemon threads. Thread-safe.
 * </p>
 * <p>
 * Tasks are differentiated based on a task type object. At most a single
 * task of a given type may be scheduled at any time - submitting a task does
 * nothing while another task of the same type is still waiting in the
 * executor's queue or is being executed. The task type object must comply
 * with the contract for a {@link Map} key.
 * </p>
53 * <p>
54 * A task must periodically check for its {@link Thread#isInterrupted()
55 * interrupt} status. If it is interrupted, it should terminate ASAP. It may do
56 * so by throwing an Exception - exceptions are not reported when the process is
57 * interrupted.
58 * </p>
59 * <p>
60 * Uncaught exceptions thrown by tasks are reported using
61 * {@link AmbientApplication#error(Class, boolean, String, Throwable)}.
62 * </p>
63 * <p>
64 * The executor initially starts in offline mode. Use
65 * {@link #setOffline(boolean)} to change the mode.
66 * </p>
67 * <p>
68 * There is no support for mutually exclusive tasks. If you need this
69 * functionality you have to implement it yourself, for example by synchronizing
70 * on the same object instance.
71 * </p>
72 *
73 * @author Martin Vysny
74 */
75 @ThreadSafe
76 public final class BackgroundOpExecutor implements ThreadFactory,
77 IBackgroundTask {
78 /***
79 * Describes a single task.
80 *
81 * @author Martin Vysny
82 */
83 private static class TaskInfo {
84 /***
85 * The current task name.
86 */
87 public volatile String name;
88 /***
89 * Current progress of the task.
90 */
91 public volatile int progress = 0;
92 /***
93 * The maximum progress.
94 */
95 public volatile int maxProgress = 100;
96
97 /***
98 * The future of the task.
99 */
100 public volatile Future<Void> future;
101 }
102
/**
 * Contains the set of scheduled or executing tasks. Maps the task type to
 * the {@link TaskInfo} describing that task.
 */
107 private final ConcurrentMap<Object, TaskInfo> scheduledTasks = new ConcurrentHashMap<Object, TaskInfo>();
108
109 /***
110 * <p>
111 * Schedules given task for execution. Does nothing if the task is rejected
112 * to be submitted. Task may be rejected for these reasons:
113 * </p>
114 * <ul>
115 * <li>it is already scheduled for execution or executed</li>
116 * <li>Processing of online task was requested while in offline mode</li>
117 * </ul>
118 *
119 * @param task
120 * the task to schedule, must not be <code>null</code>.
121 * @param taskType
122 * the task type, must not be <code>null</code>.
123 * @param online
124 * if <code>true</code> then this task processes some online
125 * content.
126 * @param name
127 * the displayable task name. A grammar construct of <code>name +
128 * "failed"</code> should be a meaningful text.
129 * @return a {@link Future} instance if the task was scheduled,
130 * <code>null</code> if it was rejected.
131 */
132 public Future<Void> schedule(final Callable<Void> task,
133 final Object taskType, final boolean online, final String name) {
134 if (name == null) {
135 throw new NullArgumentException("name");
136 }
137 if (taskType == null) {
138 throw new NullArgumentException("taskType");
139 }
140 final TaskInfo info = new TaskInfo();
141 info.name = name;
142 if (scheduledTasks.putIfAbsent(taskType, info) != null) {
143 return null;
144 }
145
146 boolean scheduled = false;
147 executorLock.readLock().lock();
148 try {
149 final ExecutorService executor;
150 if (online) {
151 executor = onlineExecutor;
152 if (executor == null) {
153 return null;
154 }
155 } else {
156 executor = offlineExecutor;
157 }
158 try {
159 info.future = executor.submit(new ProtectedRunnable(task, info,
160 taskType), null);
161 } catch (RejectedExecutionException ex) {
162
163
164 return null;
165 }
166 scheduled = true;
167 return info.future;
168 } finally {
169 if (!scheduled) {
170 scheduledTasks.remove(taskType);
171 }
172 executorLock.readLock().unlock();
173 }
174 }
175
176 private final AtomicBoolean offline = new AtomicBoolean(true);
177
178 /***
179 * Checks if this task is already scheduled.
180 *
181 * @param taskType
182 * the task handle to check, must not be <code>null</code>.
183 * @return <code>true</code> if it is scheduled.
184 */
185 public boolean isScheduledOrExecuted(final Object taskType) {
186 if (taskType == null) {
187 throw new NullArgumentException("taskType");
188 }
189 return scheduledTasks.containsKey(taskType);
190 }
191
192 private final ReadWriteLock executorLock = new ReentrantReadWriteLock();
193
194 /***
195 * The online tasks executor.
196 */
197 @GuardedBy("executorLock")
198 private ExecutorService onlineExecutor = null;
199
/**
 * Executes tasks that do not require Internet access.
 */
203 @GuardedBy("executorLock")
204 private ExecutorService offlineExecutor = Executors
205 .newCachedThreadPool(this);
206
207 /***
208 * Sets the online/offline state of the executor. During the offline period
209 * the executor rejects online tasks.
210 *
211 * @param offline
212 * if <code>true</code> then the executor sets itself as offline
213 * and all pending online tasks are terminated.
214 */
215 public void setOffline(final boolean offline) {
216 if (!this.offline.compareAndSet(!offline, offline)) {
217 return;
218 }
219 executorLock.writeLock().lock();
220 try {
221 if (offline) {
222
223 onlineExecutor.shutdownNow();
224 onlineExecutor = null;
225 } else {
226 if (offlineExecutor.isShutdown()) {
227 return;
228 }
229 onlineExecutor = Executors.newCachedThreadPool(this);
230 }
231 } finally {
232 executorLock.writeLock().unlock();
233 }
234 }
235
236 private final AtomicInteger threadId = new AtomicInteger();
237
238 public Thread newThread(Runnable r) {
239 final Thread result = new Thread(r, "backgroundOp-"
240 + threadId.getAndIncrement());
241 result.setPriority(Thread.MIN_PRIORITY);
242 result.setDaemon(true);
243 return result;
244 }
245
246 /***
247 * Immediately stops all running/pending tasks.
248 */
249 public void stopAllTasks() {
250 executorLock.writeLock().lock();
251 try {
252 offlineExecutor.shutdownNow();
253 offlineExecutor = Executors.newCachedThreadPool(this);
254 if (onlineExecutor != null) {
255 onlineExecutor.shutdownNow();
256 onlineExecutor = Executors.newCachedThreadPool(this);
257 }
258 } finally {
259 executorLock.writeLock().unlock();
260 }
261 }
262
263 /***
264 * Immediately terminates the executor and all pending tasks. The executor
265 * will reject all submitted tasks from now on.
266 */
267 public void terminate() {
268 executorLock.readLock().lock();
269 try {
270 offlineExecutor.shutdownNow();
271 if (onlineExecutor != null) {
272 onlineExecutor.shutdownNow();
273 }
274 } finally {
275 executorLock.readLock().unlock();
276 }
277 }
278
279 private final String failed = AmbientApplication.getInstance().getString(
280 R.string.failed);
281
282 /***
283 * This class wraps given runnable and takes care of counting
284 * scheduled/running tasks instances and handling exceptions.
285 *
286 * @author Martin Vysny
287 */
288 private class ProtectedRunnable implements Runnable {
289 private final Callable<Void> r;
290 private final TaskInfo task;
291 private final Object taskType;
292
293 /***
294 * Creates new instance.
295 *
296 * @param r
297 * the wrapped runnable.
298 * @param task
299 * this task info.
300 * @param taskType
301 * this task type.
302 */
303 public ProtectedRunnable(final Callable<Void> r, final TaskInfo task,
304 final Object taskType) {
305 this.r = r;
306 this.task = task;
307 this.taskType = taskType;
308 }
309
310 public void run() {
311
312
313
314
315 synchronized (BackgroundOpExecutor.this) {
316 taskInfo.set(task);
317 fireEvent();
318 }
319 try {
320 r.call();
321 } catch (final Throwable e) {
322 if (Thread.currentThread().isInterrupted()) {
323 Log.i(r.getClass().getSimpleName(), "Interrupted: "
324 + e.getMessage(), e);
325 } else {
326 AmbientApplication.getInstance().error(r.getClass(), true,
327 task.name + " " + failed, e);
328 }
329 } finally {
330 scheduledTasks.remove(taskType);
331 taskInfo.set(null);
332
333
334 fireEvent();
335 }
336 }
337 }
338
339 private final ThreadLocal<TaskInfo> taskInfo = new ThreadLocal<TaskInfo>();
340
341 private synchronized void fireEvent() {
342 String randomName = null;
343 int taskCount = 0;
344 int progress = 0;
345 int maxProgress = 0;
346 for (final TaskInfo info : scheduledTasks.values()) {
347 taskCount++;
348 if (randomName == null) {
349 randomName = info.name;
350 }
351 progress += info.progress;
352 maxProgress += info.maxProgress;
353 }
354 if (maxProgress <= 0) {
355 maxProgress = 100;
356 }
357 AmbientApplication.getInstance().getBus().getInvocator(
358 IBackgroundTask.class, true).backgroundTask(taskCount,
359 randomName, progress, maxProgress);
360 }
361
362 /***
363 * Sets a progress of the current task. Must be invoked from a task
364 * {@link Runnable}.
365 *
366 * @param taskCount
367 * unused, ignored.
368 * @param name
369 * a new name for the task.
370 * @param progress
371 * current task progress.
372 * @param maxProgress
373 * maximum progress.
374 */
375 public void backgroundTask(int taskCount, String name, int progress,
376 int maxProgress) {
377 final TaskInfo info = taskInfo.get();
378 if (info == null) {
379 throw new IllegalStateException("Must be invoked from a task");
380 }
381 info.name = name;
382 info.progress = progress < 0 ? 0 : progress;
383 info.maxProgress = maxProgress < info.progress ? info.progress
384 : maxProgress;
385 fireEvent();
386 }
387
388 /***
389 * Cancels given task. Does nothing if no such task is running.
390 *
391 * @param taskType
392 * the task type, must not be <code>null</code>.
393 */
394 public void cancel(final Object taskType) {
395 final TaskInfo info = scheduledTasks.get(taskType);
396 if (info == null) {
397 return;
398 }
399 info.future.cancel(true);
400 }
401 }