pdf_thread.cpp

Go to the documentation of this file.
00001 /*
00002  * File Name: pdf_thread.cpp
00003  */
00004 
00005 /*
00006  * This file is part of uds-plugin-pdf.
00007  *
00008  * uds-plugin-pdf is free software: you can redistribute it and/or modify
00009  * it under the terms of the GNU General Public License as published by
00010  * the Free Software Foundation, either version 2 of the License, or
00011  * (at your option) any later version.
00012  *
00013  * uds-plugin-pdf is distributed in the hope that it will be useful,
00014  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00015  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
00016  * GNU General Public License for more details.
00017  *
00018  * You should have received a copy of the GNU General Public License
00019  * along with this program. If not, see <http://www.gnu.org/licenses/>.
00020  */
00021 
00022 /**
00023  * Copyright (C) 2008 iRex Technologies B.V.
00024  * All rights reserved.
00025  */
00026 
00027 #include "pdf_thread.h"
00028 
00029 namespace pdf
00030 {
00031 
00032 // Note: the two sides acquire and signal in the following sequence:
00033 // non_static_thread_func     |  abort_current_task
00034 //   notify_end               |    running_task_mutex
00035 //   running_task_mutex       |    abort_and_wait
00036 // The task itself also provides the mechanism needed to avoid waiting
00037 // for a signal that will never be emitted.
00038 // TODO: To improve performance, it is necessary to provide both
00039 // abort and abort_and_wait.
00040 
00041 Thread::Thread()
00042     : thread(NULL)
00043     , thread_cmd(CMD_NONE)
00044     , running_task(0)
00045 {
00046 }
00047 
00048 Thread::~Thread()
00049 {
00050 }
00051 
00052 gpointer Thread::thread_func(gpointer args)
00053 {
00054     Thread* thiz = reinterpret_cast<Thread *>(args);
00055     return thiz->non_static_thread_func();
00056 }
00057 
00058 // Worker thread
00059 gpointer Thread::non_static_thread_func()
00060 {
00061     while (true)
00062     {
00063         Task *task = 0;
00064         {
00065             ScopeMutex m(&queue_mutex);
00066             while (task_queue.empty() &&
00067                    thread_cmd == CMD_NONE)
00068             {
00069                 queue_cond.wait(queue_mutex.get_gmutex());
00070             }
00071 
00072             if (thread_cmd != CMD_NONE)
00073             {
00074                 break;
00075             }
00076 
00077             // Get task from task queue
00078             task = task_queue.front();
00079             task_queue.pop_front();
00080 
00081             // About executing task, update running_task variable.
00082             ScopeMutex r(&running_task_mutex);
00083             running_task = task;
00084         }
00085 
00086         task->execute();
00087         task->notify_end();
00088 
00089         // Task end(aborted or finished) reset running_task.
00090         {
00091             ScopeMutex lock(&running_task_mutex);
00092             running_task = 0;
00093         }
00094 
00095         /// Memory leak here. When the task is aborted, the task will not
00096         /// be removed. So we must release the task here.
00097         /// Make it the only entry to release the task.
00098         if (!task->is_paused())
00099         {
00100             delete task;
00101         }
00102     }
00103 
00104     return 0;
00105 }
00106 
00107 void Thread::cancel_tasks(void* user_data)
00108 {
00109     clear_all(user_data);
00110 
00111     ScopeMutex r(&running_task_mutex);
00112     if (running_task != 0 &&
00113         running_task->get_user_data() == user_data)
00114     {
00115         running_task->abort_and_wait(cancel_task_mutex);
00116     }
00117 }
00118 
00119 void Thread::clear_all(void* user_data, TaskType t)
00120 {
00121     ScopeMutex m(&queue_mutex);
00122 
00123     TaskQueueIter idx = task_queue.begin();
00124     while (idx != task_queue.end())
00125     {
00126         bool is_remove = false;
00127         if (user_data == 0)
00128         {
00129             is_remove = true;
00130         }
00131         else if (user_data == (*idx)->get_user_data())
00132         {
00133             if (t == TASK_INVALID || (t == (*idx)->type))
00134             {
00135                 is_remove = true;
00136             }
00137         }
00138 
00139         if (is_remove)
00140         {
00141             delete *idx;
00142             idx = task_queue.erase(idx);
00143         }
00144         else
00145         {
00146             idx++;
00147         }
00148     }
00149 
00150     if (user_data == 0)
00151     {
00152         task_queue.clear();
00153     }
00154 }
00155 
00156 bool Thread::start()
00157 {
00158     if (thread != NULL)
00159     {
00160         // The thread has been started.
00161         return false;
00162     }
00163 
00164     // Reset the thread cmd. TODO: Replace this variable by a boolean.
00165     thread_cmd = CMD_NONE;
00166     
00167     thread = g_thread_create(thread_func, this, TRUE, NULL);
00168     return thread != NULL;
00169 }
00170 
00171 /// TODO, still a problem here. running task must be associated with
00172 /// document. Can not use cancle all tasks directly.
00173 void Thread::stop(bool cancel_all_tasks)
00174 {
00175     if (thread == NULL)
00176     {
00177         return;
00178     }
00179 
00180     thread_cmd = cancel_all_tasks ? CMD_TERMINATE : CMD_STOP;
00181 
00182     {
00183         ScopeMutex r(&running_task_mutex);
00184         if (running_task != 0)
00185         {
00186             if (cancel_all_tasks)
00187             {
00188                 running_task->abort_and_wait(cancel_task_mutex);
00189             }
00190         }
00191     }
00192 
00193     queue_cond.signal();
00194 
00195     // Wait for worker thread to die.
00196     g_thread_join(thread);
00197 
00198     clear_all();
00199 
00200     // Set the thread to be NULL
00201     thread = NULL;
00202 }
00203 
00204 bool Thread::append_task(Task* new_task)
00205 {
00206     if (thread_cmd == CMD_NONE)
00207     {
00208         // Append task to the end of the task queue.
00209         {
00210             ScopeMutex m(&queue_mutex);
00211             task_queue.push_back(new_task);
00212         }
00213 
00214         // Tells the worker thread that a new task is available.
00215         queue_cond.signal();
00216         return true;
00217     }
00218 
00219     return false;
00220 }
00221 
00222 bool Thread::prepend_task(Task* new_task, bool abort_current)
00223 {
00224     if (thread_cmd == CMD_NONE)
00225     {
00226         // Insert task to the beginning of the task queue.
00227         ScopeMutex m(&queue_mutex);
00228         task_queue.push_front(new_task);
00229 
00230         // Tells the worker thread that a new task is available.
00231         queue_cond.signal();
00232 
00233         if (abort_current)
00234         {
00235             abort_current_task(new_task);
00236         }
00237 
00238         return true;
00239     }
00240 
00241     return false;
00242 }
00243 
00244 bool Thread::abort_current_task(Task *new_task)
00245 {
00246     ScopeMutex r(&running_task_mutex);
00247     if (running_task != 0)
00248     {
00249         switch (new_task->get_type())
00250         {
00251         case TASK_RENDER:
00252             {
00253                 if (running_task->get_type() == TASK_SEARCH)
00254                 {
00255                     // pause running task and push it next to the first task
00256                     running_task->pause();
00257                     if (!task_queue.empty())
00258                     {
00259                         TaskQueueIter idx = task_queue.begin();
00260                         task_queue.insert(++idx, running_task);
00261                     }
00262                     else
00263                     {
00264                         task_queue.push_front(running_task);
00265                     }
00266                 }
00267                 else
00268                 {
00269                     running_task->abort();
00270                 }
00271             }
00272             break;
00273         case TASK_SEARCH:
00274             {
00275                 // search should not abort render task
00276                 if (running_task->get_type() != TASK_RENDER)
00277                 {
00278                     running_task->abort();
00279                 }
00280             }
00281             break;
00282         default:
00283             break;
00284         }
00285 
00286         return true;
00287     }
00288 
00289     // Woker thread is waiting for a task
00290     return false;
00291 }
00292 
00293 bool Thread::abort_task(void* user_data, TaskType t, unsigned int id)
00294 {
00295     {
00296         ScopeMutex r(&running_task_mutex);
00297         if (running_task != 0 &&
00298             running_task->get_type() == t &&
00299             running_task->get_user_data() == user_data &&
00300             running_task->get_id() == id)
00301         {
00302             // if running task is the one, abort it
00303             running_task->abort();
00304             return true;
00305         }
00306     }
00307 
00308     // search the task in queue
00309     ScopeMutex m(&queue_mutex);
00310     TaskQueueIter idx = task_queue.begin();
00311     while (idx != task_queue.end())
00312     {
00313         if ((*idx)->get_type() == t &&
00314             (*idx)->get_user_data() == user_data &&
00315             (*idx)->get_id() == id)
00316         {
00317             delete *idx;
00318             task_queue.erase(idx);
00319             return true;
00320         }
00321         idx++;
00322     }
00323 
00324     return false;
00325 }
00326 
00327 }
00328 
Generated by  doxygen 1.6.2-20100208