/*
|
* thread_pool.cpp - Thread Pool
|
*
|
* Copyright (c) 2017 Intel Corporation
|
*
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
* you may not use this file except in compliance with the License.
|
* You may obtain a copy of the License at
|
*
|
* http://www.apache.org/licenses/LICENSE-2.0
|
*
|
* Unless required by applicable law or agreed to in writing, software
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* See the License for the specific language governing permissions and
|
* limitations under the License.
|
*
|
* Author: Wind Yuan <feng.yuan@intel.com>
|
*/
|
|
#include "thread_pool.h"
|
|
#define XCAM_POOL_MIN_THREADS 2
|
#define XCAM_POOL_MAX_THREADS 1024
|
|
namespace XCam {
|
|
class UserThread
|
: public Thread
|
{
|
public:
|
UserThread (const SmartPtr<ThreadPool> &pool, const char *name)
|
: Thread (name)
|
, _pool (pool)
|
{}
|
|
protected:
|
virtual bool started ();
|
virtual void stopped ();
|
virtual bool loop ();
|
|
private:
|
SmartPtr<ThreadPool> _pool;
|
};
|
|
bool
|
UserThread::started ()
|
{
|
XCAM_ASSERT (_pool.ptr ());
|
SmartLock lock (_pool->_mutex);
|
return true;
|
}
|
|
void
|
UserThread::stopped ()
|
{
|
XCAM_LOG_DEBUG ("thread(%s, %p) stopped", XCAM_STR(get_name ()), this);
|
}
|
|
bool
|
UserThread::loop ()
|
{
|
XCAM_ASSERT (_pool.ptr ());
|
{
|
SmartLock lock (_pool->_mutex);
|
if (!_pool->_running)
|
return false;
|
}
|
|
SmartPtr<ThreadPool::UserData> data = _pool->_data_queue.pop ();
|
if (!data.ptr ()) {
|
XCAM_LOG_DEBUG ("user thread(%s) get null data, need stop", XCAM_STR (_pool->get_name ()));
|
return false;
|
}
|
|
{
|
SmartLock lock (_pool->_mutex);
|
XCAM_ASSERT (_pool->_free_threads > 0);
|
--_pool->_free_threads;
|
}
|
|
bool ret = _pool->dispatch (data);
|
|
if (ret) {
|
SmartLock lock (_pool->_mutex);
|
++_pool->_free_threads;
|
}
|
return ret;
|
}
|
|
bool
|
ThreadPool::dispatch (const SmartPtr<ThreadPool::UserData> &data)
|
{
|
XCAM_FAIL_RETURN (
|
ERROR, data.ptr(), true,
|
"ThreadPool(%s) dispatch NULL data", XCAM_STR (get_name ()));
|
XCamReturn err = data->run ();
|
data->done (err);
|
return true;
|
}
|
|
ThreadPool::ThreadPool (const char *name)
|
: _name (NULL)
|
, _min_threads (XCAM_POOL_MIN_THREADS)
|
, _max_threads (XCAM_POOL_MIN_THREADS)
|
, _allocated_threads (0)
|
, _free_threads (0)
|
, _running (false)
|
{
|
if (name)
|
_name = strndup (name, XCAM_MAX_STR_SIZE);
|
}
|
|
ThreadPool::~ThreadPool ()
|
{
|
stop ();
|
|
xcam_mem_clear (_name);
|
}
|
|
bool
|
ThreadPool::set_threads (uint32_t min, uint32_t max)
|
{
|
XCAM_FAIL_RETURN (
|
ERROR, !_running, false,
|
"ThreadPool(%s) set threads failed, need stop the pool first", XCAM_STR(get_name ()));
|
|
if (min < XCAM_POOL_MIN_THREADS)
|
min = XCAM_POOL_MIN_THREADS;
|
if (max > XCAM_POOL_MAX_THREADS)
|
max = XCAM_POOL_MAX_THREADS;
|
|
if (min > max)
|
min = max;
|
|
_min_threads = min;
|
_max_threads = max;
|
return true;
|
}
|
|
bool
|
ThreadPool::is_running ()
|
{
|
SmartLock locker(_mutex);
|
return _running;
|
}
|
|
XCamReturn
|
ThreadPool::start ()
|
{
|
SmartLock locker(_mutex);
|
if (_running)
|
return XCAM_RETURN_NO_ERROR;
|
|
_free_threads = 0;
|
_allocated_threads = 0;
|
_data_queue.resume_pop ();
|
|
for (uint32_t i = 0; i < _min_threads; ++i) {
|
XCamReturn ret = create_user_thread_unsafe ();
|
XCAM_FAIL_RETURN (
|
ERROR, xcam_ret_is_ok (ret), ret,
|
"thread pool(%s) start failed by creating user thread", XCAM_STR (get_name()));
|
}
|
|
XCAM_ASSERT (_allocated_threads == _min_threads);
|
|
_running = true;
|
return XCAM_RETURN_NO_ERROR;
|
}
|
|
XCamReturn
|
ThreadPool::stop ()
|
{
|
UserThreadList threads;
|
{
|
SmartLock locker(_mutex);
|
if (!_running)
|
return XCAM_RETURN_NO_ERROR;
|
|
_running = false;
|
threads = _thread_list;
|
_thread_list.clear ();
|
}
|
|
for (UserThreadList::iterator i = threads.begin (); i != threads.end (); ++i)
|
{
|
SmartPtr<UserThread> t = *i;
|
XCAM_ASSERT (t.ptr ());
|
t->emit_stop ();
|
}
|
|
_data_queue.pause_pop ();
|
_data_queue.clear ();
|
|
for (UserThreadList::iterator i = threads.begin (); i != threads.end (); ++i)
|
{
|
SmartPtr<UserThread> t = *i;
|
XCAM_ASSERT (t.ptr ());
|
t->stop ();
|
}
|
|
{
|
SmartLock locker(_mutex);
|
_free_threads = 0;
|
_allocated_threads = 0;
|
}
|
|
return XCAM_RETURN_NO_ERROR;
|
}
|
|
XCamReturn
|
ThreadPool::create_user_thread_unsafe ()
|
{
|
char name[256];
|
snprintf (name, 255, "%s-%d", XCAM_STR (get_name()), _allocated_threads);
|
SmartPtr<UserThread> thread = new UserThread (this, name);
|
XCAM_ASSERT (thread.ptr ());
|
XCAM_FAIL_RETURN (
|
ERROR, thread.ptr () && thread->start (), XCAM_RETURN_ERROR_THREAD,
|
"ThreadPool(%s) create user thread failed by starting error", XCAM_STR (get_name()));
|
|
_thread_list.push_back (thread);
|
|
++_allocated_threads;
|
++_free_threads;
|
XCAM_ASSERT (_free_threads <= _allocated_threads);
|
|
return XCAM_RETURN_NO_ERROR;
|
}
|
|
XCamReturn
|
ThreadPool::queue (const SmartPtr<UserData> &data)
|
{
|
XCAM_ASSERT (data.ptr ());
|
{
|
SmartLock locker (_mutex);
|
if (!_running)
|
return XCAM_RETURN_ERROR_THREAD;
|
}
|
|
if (!_data_queue.push (data))
|
return XCAM_RETURN_ERROR_THREAD;
|
|
do {
|
SmartLock locker(_mutex);
|
if (!_running) {
|
_data_queue.erase (data);
|
return XCAM_RETURN_ERROR_THREAD;
|
}
|
|
if (_allocated_threads >= _max_threads)
|
break;
|
|
if (!_free_threads)
|
break;
|
|
XCamReturn err = create_user_thread_unsafe ();
|
if (!xcam_ret_is_ok (err) && _allocated_threads) {
|
XCAM_LOG_WARNING ("thread pool(%s) create new thread failed but queue data can continue");
|
break;
|
}
|
|
XCAM_FAIL_RETURN (
|
ERROR, xcam_ret_is_ok (err), err,
|
"thread pool(%s) queue data failed by creating user thread", XCAM_STR (get_name()));
|
|
} while (0);
|
|
return XCAM_RETURN_NO_ERROR;
|
}
|
|
}
|