/*
|
* task_service.h
|
*
|
* Copyright (c) 2021 Rockchip Electronics Co., Ltd.
|
*
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
* you may not use this file except in compliance with the License.
|
* You may obtain a copy of the License at
|
*
|
* http://www.apache.org/licenses/LICENSE-2.0
|
*
|
* Unless required by applicable law or agreed to in writing, software
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* See the License for the specific language governing permissions and
|
* limitations under the License.
|
*
|
* Author: Cody Xie <cody.xie@rock-chips.com>
|
*/
|
#ifndef XCORE_TASK_SERVICE_H
|
#define XCORE_TASK_SERVICE_H
|
|
#include <algorithm>
|
#include <atomic>
|
#include <cassert>
|
#include <chrono>
|
#include <condition_variable>
|
#include <cstdint>
|
#include <deque>
|
#include <functional>
|
#include <iostream>
|
#include <mutex>
|
#include <thread>
|
#include <utility>
|
|
#include "task_traits.h"
|
|
#define DEBUG 1
|
#ifndef TASK_LOG
|
#define TASK_LOG(format, ...) \
|
printf("DEBUG: " #format "\n", ##__VA_ARGS__)
|
#endif
|
|
namespace XCam {
|
|
// SERVICE CONCEPTS:
|
// * A aynchronous threaded service to execute specific tasks that deal with
|
// INPUTS and OUTPUTS parameters
|
// * Service will calculate the MAX_PROCESS_TIME of the task consumed
|
// * Support run ONCE/SYNC/ASYNC on a task
|
// * ASYNC: Run task in a threaded loop
|
// * SYNC: Call ENQUEUE/DEQUEUE synchronous by user
|
// * ONCE: RunOnce for one param input
|
// * Support ENQUEUE/DEQUEUE methods
|
// * ENQUEUE will signal loop immediately to run task
|
// * DEQUEUE will wait until results not empty and the MAY_BLOCK is true
|
// * DEQUEUE may return empty param if no result and the MAY_BLOCK is false
|
// * Support buffer overrun/underrun behaviors on
|
// * BUSY (fifo full)
|
// * IDLE (fifo empty for longer than 2 period)
|
// * LATE (execution time longer than 1 period)
|
// * ERROR (process error)
|
// * Support config and config change event ???
|
// to negotiate with other service ???
|
//
|
// * Users can ENQUEUE new params, DEQUEUE PROCESSED/ERROR/SKIPPED params
|
// * Users can trigger START, STOP the service
|
// * Users can FLUSH the proccessing queue, mark param to SKIPPED
|
// * Users can specific the MAXIMUM PROCESS TIME
|
//
|
// * Parameters are UNIQUE, FIFO even on ERROR/LATE happened
|
// * Parameters have a BUFFER POOL, with MAX_PARAM_COUNT
|
// * Parameters can be ALLOCATED inside or outside of Service
|
// * Last result will be HOLD A COPY if current task failed
|
//
|
// * Task is a FUNCTOR object
|
// * Task does NOT OWN the param
|
// * Task may HOLD several input buffers
|
// * Task may return process result
|
// * SUCCESS - returned, successful processed
|
// * SKIPPED - returned, but this param skipped
|
// * FAILED - returned, but failed in processing
|
// * AGAIN - required more inputs
|
// * Task inputs and outputs may have different id
|
// * Task outputs may have inputs param
|
//
|
|
template <typename T, class Container = std::deque<ServiceParam<T>>>
|
class TaskService {
|
public:
|
typedef TaskService<T> value_type;
|
typedef ServiceTask<T> task_type;
|
typedef ServiceParam<T> param_type;
|
|
TaskService() = delete;
|
TaskService(const TaskService<T>&) = delete;
|
const TaskService<T>& operator=(const TaskService<T>&) = delete;
|
|
explicit TaskService(std::unique_ptr<ServiceTask<T>> task,
|
const bool may_block_ = default_may_block,
|
const uint8_t max_param_count = default_max_param_count,
|
const TaskDuration max_process_time = default_process_time)
|
: task_(std::move(task)),
|
may_block_(default_may_block),
|
max_param_count_(max_param_count),
|
max_process_time_(max_process_time),
|
started_(false) {
|
assert(task_.get() != nullptr);
|
allocServiceParam();
|
}
|
|
virtual ~TaskService() {
|
stop();
|
clear();
|
}
|
|
void loop() {
|
while (started_) {
|
std::unique_lock<std::mutex> in_lock(in_mutex_);
|
auto wait_ret = in_cond_.wait_for(in_lock, max_process_time_, [this]() {
|
return (!started_ || (!in_params_.empty() &&
|
in_params_.front().state == ParamState::kReadyForProcess));
|
});
|
if (wait_ret == false) {
|
// IDLE
|
continue;
|
}
|
if (!started_) break;
|
|
auto param = std::move(in_params_.front());
|
in_params_.pop_front();
|
in_lock.unlock();
|
param.state = ParamState::kProcessing;
|
const auto start = std::chrono::steady_clock::now();
|
|
auto ret = (*task_)(param);
|
if (ret == TaskResult::kSuccess) {
|
param.state = ParamState::kProcessedSuccess;
|
} else {
|
param.state = ParamState::kProcessedError;
|
TASK_LOG("task processs failed");
|
#ifdef USE_TASK_CALLBACK
|
callbacks_.onError();
|
#endif
|
}
|
|
{
|
std::lock_guard<std::mutex> out_lock(out_mutex_);
|
// TODO(Cody): if we have alllocated params in front,
|
// this will block the result until these params are
|
// dequeued. if we use push_front, we break the sequence
|
// of params
|
out_params_.push_back(std::move(param));
|
out_cond_.notify_one();
|
}
|
|
const auto end = std::chrono::steady_clock::now();
|
|
const std::chrono::duration<double, std::milli> elapsed = end - start;
|
if (elapsed >= max_process_time_) {
|
// TODO(Cody): match the frame id ?
|
TASK_LOG("params processs elapsed %lf exceeds %lf", elapsed.count(),
|
max_process_time_.count());
|
#if USE_TASK_CALLBACK
|
callbacks_.onLate();
|
#endif
|
}
|
}
|
}
|
|
void start() {
|
if (started_) {
|
return;
|
}
|
started_ = true;
|
executor_ = std::thread([this] { loop(); });
|
// executor_.detach();
|
#if USE_TASK_CALLBACK
|
callbacks_.onStarted();
|
#endif
|
}
|
|
void stop() {
|
if (started_) {
|
started_ = false;
|
executor_.join();
|
}
|
#if USE_TASK_CALLBACK
|
callbacks_.onStoped();
|
#endif
|
}
|
|
void allocServiceParam() noexcept {
|
std::lock_guard<std::mutex> lock(out_mutex_);
|
while (out_params_.size() < max_param_count_) {
|
// TODO(Cody): Must add try block if -fexceptions enabled
|
// Ensure new object does not throw any exceptions
|
ServiceParam<T> param;
|
param.state = ParamState::kAllocated;
|
param.unique_id = -1;
|
param.payload = std::make_shared<T>();
|
out_params_.push_back(std::move(param));
|
out_cond_.notify_one();
|
}
|
}
|
|
void enqueue(ServiceParam<T>& input) {
|
std::lock_guard<std::mutex> lock(in_mutex_);
|
// TODO(Cody): deal with busy case
|
#if 0
|
if (in_params_.size() == max_param_count_) {
|
TASK_LOG("input params exceeds max count!");
|
#if USE_TASK_CALLBACK
|
callbacks_.onFull(input);
|
#endif
|
return;
|
}
|
#endif
|
input.state = ParamState::kReadyForProcess;
|
in_params_.push_back(input);
|
in_cond_.notify_one();
|
}
|
|
void enqueue(ServiceParam<T>&& input) {
|
std::lock_guard<std::mutex> lock(in_mutex_);
|
// TODO(Cody): deal with busy case
|
#if 0
|
if (in_params_.size() == max_param_count_) {
|
TASK_LOG("input params exceeds max count!");
|
#if USE_TASK_CALLBACK
|
callbacks_.onFull(input);
|
#endif
|
return;
|
}
|
#endif
|
input.state = ParamState::kReadyForProcess;
|
in_params_.push_back(std::move(input));
|
in_cond_.notify_one();
|
}
|
|
ServiceParam<T> dequeue() {
|
std::unique_lock<std::mutex> lock(out_mutex_);
|
out_cond_.wait(lock, [this]() {
|
if ((!out_params_.empty() &&
|
(out_params_.front().state == ParamState::kAllocated ||
|
out_params_.front().state == ParamState::kProcessedSuccess ||
|
out_params_.front().state == ParamState::kProcessedError))) {
|
return true;
|
} else if (!may_block_) {
|
ServiceParam<T> empty = {
|
.state = ParamState::kNull,
|
.unique_id = -1,
|
.payload = std::shared_ptr<T>(nullptr),
|
};
|
out_params_.push_front(std::move(empty));
|
return true;
|
} else {
|
return false;
|
}
|
});
|
auto p = std::move(out_params_.front());
|
out_params_.pop_front();
|
return p;
|
}
|
|
void flush() {
|
std::lock(in_mutex_, out_mutex_);
|
std::lock_guard<std::mutex> in_lock(in_mutex_, std::adopt_lock);
|
std::lock_guard<std::mutex> out_lock(in_mutex_, std::adopt_lock);
|
if (in_params_.empty()) {
|
return;
|
}
|
auto paramIt =
|
std::remove_if(in_params_.begin(), in_params_.end(), [](const ServiceParam<T>& p) {
|
return (p.state != ParamState::kReadyForProcess) &&
|
(p.state != ParamState::kAllocated);
|
});
|
for (auto param = paramIt; param != in_params_.end(); param++) {
|
param.state = ParamState::kAllocated;
|
param.unique_id = -1;
|
out_params_.push_back(param);
|
out_cond_.notify_one();
|
}
|
}
|
|
void clear() {
|
std::lock(in_mutex_, out_mutex_);
|
std::lock_guard<std::mutex> in_lock(in_mutex_, std::adopt_lock);
|
std::lock_guard<std::mutex> out_lock(in_mutex_, std::adopt_lock);
|
in_params_.clear();
|
out_params_.clear();
|
}
|
|
const uint8_t getMaxParamCount() { return max_param_count_; }
|
void setMaxParamCount(const uint8_t count) { max_param_count_ = count; }
|
|
const TaskDuration getMaxProceedTime() { return max_process_time_; }
|
void setMaxProceedTime(const TaskDuration duration) { max_process_time_ = duration; }
|
void setMaxProceedTimeByFps(const int fps) {
|
max_process_time_ = TaskDuration(std::chrono::milliseconds(1000 / fps));
|
}
|
|
// TODO(Cody): add support link to other services
|
#if 0
|
template <typename O>
|
friend void link(TaskService<O>* other);
|
template <typename O>
|
friend void unlink(TaskService<O>* other);
|
#endif
|
|
private:
|
uint8_t max_param_count_;
|
TaskDuration max_process_time_;
|
bool may_block_;
|
bool started_;
|
|
std::mutex in_mutex_;
|
std::condition_variable in_cond_;
|
std::mutex out_mutex_;
|
std::condition_variable out_cond_;
|
std::unique_ptr<ServiceTask<T>> task_;
|
std::thread executor_;
|
|
Container in_params_;
|
Container out_params_;
|
|
#if USE_TASK_CALLBACK
|
TaskCallbacks callbacks_;
|
#endif
|
};
|
|
}; // namespace XCam
|
|
#endif // ALGOS_AEIS_TASK_SERVICE_H
|