/* * task_service.h * * Copyright (c) 2021 Rockchip Electronics Co., Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * * Author: Cody Xie */ #ifndef XCORE_TASK_SERVICE_H #define XCORE_TASK_SERVICE_H #include #include #include #include #include #include #include #include #include #include #include #include #include "task_traits.h" #define DEBUG 1 #ifndef TASK_LOG #define TASK_LOG(format, ...) \ printf("DEBUG: " #format "\n", ##__VA_ARGS__) #endif namespace XCam { // SERVICE CONCEPTS: // * A aynchronous threaded service to execute specific tasks that deal with // INPUTS and OUTPUTS parameters // * Service will calculate the MAX_PROCESS_TIME of the task consumed // * Support run ONCE/SYNC/ASYNC on a task // * ASYNC: Run task in a threaded loop // * SYNC: Call ENQUEUE/DEQUEUE synchronous by user // * ONCE: RunOnce for one param input // * Support ENQUEUE/DEQUEUE methods // * ENQUEUE will signal loop immediately to run task // * DEQUEUE will wait until results not empty and the MAY_BLOCK is true // * DEQUEUE may return empty param if no result and the MAY_BLOCK is false // * Support buffer overrun/underrun behaviors on // * BUSY (fifo full) // * IDLE (fifo empty for longer than 2 period) // * LATE (execution time longer than 1 period) // * ERROR (process error) // * Support config and config change event ??? // to negotiate with other service ??? // // * Users can ENQUEUE new params, DEQUEUE PROCESSED/ERROR/SKIPPED params // * Users can trigger START, STOP the service // * Users can FLUSH the proccessing queue, mark param to SKIPPED // * Users can specific the MAXIMUM PROCESS TIME // // * Parameters are UNIQUE, FIFO even on ERROR/LATE happened // * Parameters have a BUFFER POOL, with MAX_PARAM_COUNT // * Parameters can be ALLOCATED inside or outside of Service // * Last result will be HOLD A COPY if current task failed // // * Task is a FUNCTOR object // * Task does NOT OWN the param // * Task may HOLD several input buffers // * Task may return process result // * SUCCESS - returned, successful processed // * SKIPPED - returned, but this param skipped // * FAILED - returned, but failed in processing // * AGAIN - required more inputs // * Task inputs and outputs may have different id // * Task outputs may have inputs param // template >> class TaskService { public: typedef TaskService value_type; typedef ServiceTask task_type; typedef ServiceParam param_type; TaskService() = delete; TaskService(const TaskService&) = delete; const TaskService& operator=(const TaskService&) = delete; explicit TaskService(std::unique_ptr> task, const bool may_block_ = default_may_block, const uint8_t max_param_count = default_max_param_count, const TaskDuration max_process_time = default_process_time) : task_(std::move(task)), may_block_(default_may_block), max_param_count_(max_param_count), max_process_time_(max_process_time), started_(false) { assert(task_.get() != nullptr); allocServiceParam(); } virtual ~TaskService() { stop(); clear(); } void loop() { while (started_) { std::unique_lock in_lock(in_mutex_); auto wait_ret = in_cond_.wait_for(in_lock, max_process_time_, [this]() { return (!started_ || (!in_params_.empty() && in_params_.front().state == ParamState::kReadyForProcess)); }); if (wait_ret == false) { // IDLE continue; } if (!started_) break; auto param = std::move(in_params_.front()); in_params_.pop_front(); in_lock.unlock(); param.state = ParamState::kProcessing; const auto start = std::chrono::steady_clock::now(); auto ret = (*task_)(param); if (ret == TaskResult::kSuccess) { param.state = ParamState::kProcessedSuccess; } else { param.state = ParamState::kProcessedError; TASK_LOG("task processs failed"); #ifdef USE_TASK_CALLBACK callbacks_.onError(); #endif } { std::lock_guard out_lock(out_mutex_); // TODO(Cody): if we have alllocated params in front, // this will block the result until these params are // dequeued. if we use push_front, we break the sequence // of params out_params_.push_back(std::move(param)); out_cond_.notify_one(); } const auto end = std::chrono::steady_clock::now(); const std::chrono::duration elapsed = end - start; if (elapsed >= max_process_time_) { // TODO(Cody): match the frame id ? TASK_LOG("params processs elapsed %lf exceeds %lf", elapsed.count(), max_process_time_.count()); #if USE_TASK_CALLBACK callbacks_.onLate(); #endif } } } void start() { if (started_) { return; } started_ = true; executor_ = std::thread([this] { loop(); }); // executor_.detach(); #if USE_TASK_CALLBACK callbacks_.onStarted(); #endif } void stop() { if (started_) { started_ = false; executor_.join(); } #if USE_TASK_CALLBACK callbacks_.onStoped(); #endif } void allocServiceParam() noexcept { std::lock_guard lock(out_mutex_); while (out_params_.size() < max_param_count_) { // TODO(Cody): Must add try block if -fexceptions enabled // Ensure new object does not throw any exceptions ServiceParam param; param.state = ParamState::kAllocated; param.unique_id = -1; param.payload = std::make_shared(); out_params_.push_back(std::move(param)); out_cond_.notify_one(); } } void enqueue(ServiceParam& input) { std::lock_guard lock(in_mutex_); // TODO(Cody): deal with busy case #if 0 if (in_params_.size() == max_param_count_) { TASK_LOG("input params exceeds max count!"); #if USE_TASK_CALLBACK callbacks_.onFull(input); #endif return; } #endif input.state = ParamState::kReadyForProcess; in_params_.push_back(input); in_cond_.notify_one(); } void enqueue(ServiceParam&& input) { std::lock_guard lock(in_mutex_); // TODO(Cody): deal with busy case #if 0 if (in_params_.size() == max_param_count_) { TASK_LOG("input params exceeds max count!"); #if USE_TASK_CALLBACK callbacks_.onFull(input); #endif return; } #endif input.state = ParamState::kReadyForProcess; in_params_.push_back(std::move(input)); in_cond_.notify_one(); } ServiceParam dequeue() { std::unique_lock lock(out_mutex_); out_cond_.wait(lock, [this]() { if ((!out_params_.empty() && (out_params_.front().state == ParamState::kAllocated || out_params_.front().state == ParamState::kProcessedSuccess || out_params_.front().state == ParamState::kProcessedError))) { return true; } else if (!may_block_) { ServiceParam empty = { .state = ParamState::kNull, .unique_id = -1, .payload = std::shared_ptr(nullptr), }; out_params_.push_front(std::move(empty)); return true; } else { return false; } }); auto p = std::move(out_params_.front()); out_params_.pop_front(); return p; } void flush() { std::lock(in_mutex_, out_mutex_); std::lock_guard in_lock(in_mutex_, std::adopt_lock); std::lock_guard out_lock(in_mutex_, std::adopt_lock); if (in_params_.empty()) { return; } auto paramIt = std::remove_if(in_params_.begin(), in_params_.end(), [](const ServiceParam& p) { return (p.state != ParamState::kReadyForProcess) && (p.state != ParamState::kAllocated); }); for (auto param = paramIt; param != in_params_.end(); param++) { param.state = ParamState::kAllocated; param.unique_id = -1; out_params_.push_back(param); out_cond_.notify_one(); } } void clear() { std::lock(in_mutex_, out_mutex_); std::lock_guard in_lock(in_mutex_, std::adopt_lock); std::lock_guard out_lock(in_mutex_, std::adopt_lock); in_params_.clear(); out_params_.clear(); } const uint8_t getMaxParamCount() { return max_param_count_; } void setMaxParamCount(const uint8_t count) { max_param_count_ = count; } const TaskDuration getMaxProceedTime() { return max_process_time_; } void setMaxProceedTime(const TaskDuration duration) { max_process_time_ = duration; } void setMaxProceedTimeByFps(const int fps) { max_process_time_ = TaskDuration(std::chrono::milliseconds(1000 / fps)); } // TODO(Cody): add support link to other services #if 0 template friend void link(TaskService* other); template friend void unlink(TaskService* other); #endif private: uint8_t max_param_count_; TaskDuration max_process_time_; bool may_block_; bool started_; std::mutex in_mutex_; std::condition_variable in_cond_; std::mutex out_mutex_; std::condition_variable out_cond_; std::unique_ptr> task_; std::thread executor_; Container in_params_; Container out_params_; #if USE_TASK_CALLBACK TaskCallbacks callbacks_; #endif }; }; // namespace XCam #endif // ALGOS_AEIS_TASK_SERVICE_H