// Copyright 2019 Fuzhou Rockchip Electronics Co., Ltd. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "flow.h" #include #include #include #include "buffer.h" #include "key_string.h" #include "utils.h" namespace easymedia { class FlowCoroutine { public: FlowCoroutine(Flow* f, Model sync_model, FunctionProcess func, float inter); ~FlowCoroutine(); void Bind(std::vector& in, std::vector& out); bool Start(); void RunOnce(); int GetCachedBufferCnt(); private: void WhileRun(); void WhileRunSleep(); void SyncFetchInput(MediaBufferVector& in); void ASyncFetchInputCommon(MediaBufferVector& in); void ASyncFetchInputAtomic(MediaBufferVector& in); void SendNullBufferDown(Flow::FlowMap& fm, const MediaBufferVector& in, std::list& flows); void SendBufferDown(Flow::FlowMap& fm, const MediaBufferVector& in, std::list& flows, bool process_ret); void SendBufferDownFromDeque(Flow::FlowMap& fm, const MediaBufferVector& in, std::list& flows, bool process_ret); size_t OutputHoldRelated(Flow::FlowMap& fm, std::shared_ptr& out_buffer, const MediaBufferVector& input_vector); Flow* flow; Model model; float interval; std::vector in_slots; std::vector out_slots; std::thread* th; FunctionProcess th_run; MediaBufferVector in_vector; decltype(&FlowCoroutine::SyncFetchInput) fetch_input_func; decltype(&FlowCoroutine::SendBufferDown) send_down_func; public: void SetMarkName(std::string s) { name = s; } void SetExpectProcessTime(int time) { expect_process_time = time; } std::string name; int expect_process_time; // ms }; FlowCoroutine::FlowCoroutine(Flow* f, Model sync_model, FunctionProcess func, float inter) : flow(f), model(sync_model), interval(inter), th(nullptr), th_run(func), expect_process_time(0) { } FlowCoroutine::~FlowCoroutine() { if (th) { th->join(); delete th; } LOG("%s quit\n", name.c_str()); } void FlowCoroutine::Bind(std::vector& in, std::vector& out) { assert(in_slots.size() == 0 && out_slots.size() == 0 && "flow coroutine binded"); in_slots = in; out_slots = out; } bool FlowCoroutine::Start() { bool need_thread = false; auto func = &FlowCoroutine::WhileRun; switch (model) { case Model::ASYNCCOMMON: need_thread = true; fetch_input_func = &FlowCoroutine::ASyncFetchInputCommon; send_down_func = &FlowCoroutine::SendBufferDownFromDeque; break; case Model::ASYNCATOMIC: need_thread = true; fetch_input_func = &FlowCoroutine::ASyncFetchInputAtomic; send_down_func = &FlowCoroutine::SendBufferDown; func = &FlowCoroutine::WhileRunSleep; break; case Model::SYNC: fetch_input_func = &FlowCoroutine::SyncFetchInput; send_down_func = &FlowCoroutine::SendBufferDown; break; default: LOG("invalid model %d\n", (int)model); return false; } in_vector.resize(in_slots.size()); if (need_thread) { th = new std::thread(func, this); if (!th) { errno = ENOMEM; return false; } } return true; } #ifndef NDEBUG static void check_consume_time(const char* name, int expect, int exactly) { if (exactly > expect) { LOG("%s, expect consume %d ms, however %d ms\n", name, expect, exactly); } } #endif void FlowCoroutine::RunOnce() { bool ret = true; (this->*fetch_input_func)(in_vector); if (flow->GetRunTimesRemaining()) { #ifndef NDEBUG { AutoDuration ad; #endif ret = (*th_run)(flow, in_vector); #ifndef NDEBUG if (expect_process_time > 0) check_consume_time(name.c_str(), expect_process_time, (int)(ad.Get() / 1000)); } #endif // DEBUG } for (int idx : out_slots) { auto& fm = flow->downflowmap[idx]; std::list flows; fm.list_mtx.read_lock(); flows = fm.flows; fm.list_mtx.unlock(); (this->*send_down_func)(fm, in_vector, flows, ret); } for (auto& buffer : in_vector) { buffer.reset(); } } void FlowCoroutine::WhileRun() { prctl(PR_SET_NAME, this->name.c_str()); LOGD("flow-name %s\n", this->name.c_str()); while (!flow->quit) { RunOnce(); } } void FlowCoroutine::WhileRunSleep() { int64_t times = 0; AutoDuration ad; assert(interval > 0); prctl(PR_SET_NAME, this->name.c_str()); LOGD("flow-name %s\n", this->name.c_str()); while (!flow->quit) { if (times == 0) { ad.Reset(); } RunOnce(); ++times; int64_t remain = (interval * times - ad.Get()) / 1000; if (remain > 0) { msleep((int)remain); } if (times >= 10000) { times = 0; } } } void FlowCoroutine::SyncFetchInput(MediaBufferVector& in) { int i = 0; for (int idx : in_slots) { auto& buffer = flow->v_input[idx].cached_buffer; in[i++] = buffer; buffer.reset(); } } void FlowCoroutine::ASyncFetchInputCommon(MediaBufferVector& in) { flow->cond_mtx.lock(); bool empty = true; for (size_t i = 0; i < in_slots.size(); i++) { int idx = in_slots[i]; auto& input = flow->v_input[idx]; auto& v = input.cached_buffers; if (!v.empty()) { empty = false; } } if (empty) { flow->cond_mtx.wait(); } flow->cond_mtx.unlock(); for (size_t i = 0; i < in_slots.size(); i++) { int idx = in_slots[i]; auto& input = flow->v_input[idx]; auto& v = input.cached_buffers; if (v.empty()) { continue; } if (!flow->enable) { in.assign(in_slots.size(), nullptr); break; } AutoLockMutex _alm(input.mtx); assert(!v.empty()); in[i] = v.front(); v.pop_front(); } } void FlowCoroutine::ASyncFetchInputAtomic(MediaBufferVector& in) { int i = 0; for (int idx : in_slots) { std::shared_ptr buffer; auto& input = flow->v_input[idx]; input.spin_mtx.lock(); buffer = input.cached_buffer; input.spin_mtx.unlock(); in[i++] = buffer; } } void FlowCoroutine::SendNullBufferDown(Flow::FlowMap& fm, const MediaBufferVector& in, std::list& flows) { std::shared_ptr nullbuffer; if (fm.hold_input != HoldInputMode::NONE) { auto empty_result = std::make_shared(); if (empty_result && OutputHoldRelated(fm, empty_result, in) > 0) { nullbuffer = empty_result; } } for (auto& f : flows) { f.flow->SendInput(nullbuffer, f.index_of_in); } } void FlowCoroutine::SendBufferDown(Flow::FlowMap& fm, const MediaBufferVector& in, std::list& flows, bool process_ret) { if (!process_ret) { SendNullBufferDown(fm, in, flows); return; } for (auto& f : flows) { OutputHoldRelated(fm, fm.cached_buffer, in); f.flow->SendInput(fm.cached_buffer, f.index_of_in); } fm.cached_buffer.reset(); } void FlowCoroutine::SendBufferDownFromDeque(Flow::FlowMap& fm, const MediaBufferVector& in, std::list& flows, bool process_ret) { if (!process_ret) { SendNullBufferDown(fm, in, flows); return; } if (fm.cached_buffers.empty()) { return; } for (auto& buffer : fm.cached_buffers) { OutputHoldRelated(fm, buffer, in); for (auto& f : flows) { f.flow->SendInput(buffer, f.index_of_in); } } fm.cached_buffers.clear(); } size_t FlowCoroutine::OutputHoldRelated(Flow::FlowMap& fm, std::shared_ptr& out_buffer, const MediaBufferVector& input_vector) { if (fm.hold_input == HoldInputMode::HOLD_INPUT) { return FlowOutputHoldInput(out_buffer, input_vector); } else if (fm.hold_input == HoldInputMode::INHERIT_FORM_INPUT) { return FlowOutputInheritFromInput(out_buffer, input_vector); } return 0; } int FlowCoroutine::GetCachedBufferCnt() { int cnt = 0; for (auto inv : in_vector) { if (inv) { cnt++; } } return cnt; } DEFINE_REFLECTOR(Flow) DEFINE_FACTORY_COMMON_PARSE(Flow) DEFINE_PART_FINAL_EXPOSE_PRODUCT(Flow, Flow) const FunctionProcess Flow::void_transaction00 = void_transaction<0, 0>; Flow::Flow() : out_slot_num(0), input_slot_num(0), down_flow_num(0), waite_down_flow(true), event_handler2_(nullptr), event_callback_(nullptr), enable(true), quit(false), event_handler_(nullptr), play_video_handler_(nullptr), play_audio_handler_(nullptr), user_handler_(nullptr), user_callback_(nullptr), out_handler_(nullptr), out_callback_(nullptr), run_times(-1) { } Flow::~Flow() { StopAllThread(); } void Flow::StopAllThread() { enable = false; quit = true; cond_mtx.lock(); cond_mtx.notify(); cond_mtx.unlock(); for (auto& coroutine : coroutines) { coroutine.reset(); } } bool Flow::IsAllBuffEmpty() { #ifndef NDEBUG int i = 0; for (auto& input : v_input) { LOG("#FLOW v_input-%d cached_buffers size:%zu\n", i, input.cached_buffers.size()); LOG("#FLOW v_input-%d cached_buffer :%s\n", i++, input.cached_buffer ? "NotNull" : "Null"); } i = 0; for (auto& coroutin : coroutines) { LOG("#FLOW coroutin-%d in_vector size:%d\n", i++, coroutin->GetCachedBufferCnt()); } i = 0; for (auto& fm : downflowmap) { LOG("#FLOW downflowmap-%d cached_buffers size:%zu\n", i, fm.cached_buffers.size()); LOG("#FLOW downflowmap-%d cached_buffer : %s\n", i++, fm.cached_buffer ? "NotNull" : "Null"); } #endif for (auto& input : v_input) { if (input.cached_buffers.size() || input.cached_buffer) { return false; } } for (auto& coroutin : coroutines) { if (coroutin->GetCachedBufferCnt()) { return false; } } for (auto& fm : downflowmap) { if (fm.cached_buffers.size() || fm.cached_buffer) { return false; } } return true; } void Flow::StartStream() { waite_down_flow = false; if (source_start_cond_mtx) { source_start_cond_mtx->lock(); source_start_cond_mtx->notify(); source_start_cond_mtx->unlock(); } } int Flow::SetRunTimes(int _run_times) { run_times = _run_times; LOG("Flow:%s set run_times to %d\n", flow_tag.c_str(), run_times); return run_times; } int Flow::GetRunTimesRemaining() { int remaining_value = run_times; if (run_times > 0) { run_times--; } return remaining_value; } void Flow::DumpBase(std::string& dump_info) { int idx = 0; char str_line[1024] = {0}; dump_info = ""; sprintf(str_line, "#Dump Flow(%s) base info:\r\n", GetFlowTag()); dump_info.append(str_line); memset(str_line, 0, sizeof(str_line)); sprintf(str_line, " InSlotNum: %d\r\n", input_slot_num); dump_info.append(str_line); memset(str_line, 0, sizeof(str_line)); sprintf(str_line, " OutSlotNum: %d\r\n", out_slot_num); dump_info.append(str_line); for (auto& input : v_input) { memset(str_line, 0, sizeof(str_line)); sprintf(str_line, " ->Input[%d] info:\r\n", idx); dump_info.append(str_line); memset(str_line, 0, sizeof(str_line)); sprintf(str_line, " Valid: %s\r\n", input.valid ? "True" : "False"); dump_info.append(str_line); memset(str_line, 0, sizeof(str_line)); if (input.thread_model == Model::ASYNCCOMMON) { sprintf(str_line, " ThreadMode: ASYNCCOMMON\r\n"); } else if (input.thread_model == Model::ASYNCATOMIC) { sprintf(str_line, " ThreadMode: ASYNCATOMIC\r\n"); } else if (input.thread_model == Model::SYNC) { sprintf(str_line, " ThreadMode: SYNC\r\n"); } else { sprintf(str_line, " ThreadMode: NONE\r\n"); } dump_info.append(str_line); memset(str_line, 0, sizeof(str_line)); if (input.mode_when_full == InputMode::BLOCKING) { sprintf(str_line, " InputMode: BLOCKING\r\n"); } else if (input.mode_when_full == InputMode::DROPCURRENT) { sprintf(str_line, " InputMode: DROPCURRENT\r\n"); } else if (input.mode_when_full == InputMode::DROPFRONT) { sprintf(str_line, " InputMode: DROPFRONT\r\n"); } else { sprintf(str_line, " InputMode: NONE\r\n"); } dump_info.append(str_line); memset(str_line, 0, sizeof(str_line)); sprintf(str_line, " BufferCnt: current:%d, max:%d\r\n", input.cached_buffers.size(), input.max_cache_num); dump_info.append(str_line); } idx = 0; for (auto& fm : downflowmap) { memset(str_line, 0, sizeof(str_line)); sprintf(str_line, " ->FlowMap[%d] info:\r\n", idx); dump_info.append(str_line); memset(str_line, 0, sizeof(str_line)); sprintf(str_line, " Valid: %s\r\n", fm.valid ? "True" : "False"); dump_info.append(str_line); memset(str_line, 0, sizeof(str_line)); sprintf(str_line, " BufferCnt: %d\r\n", fm.cached_buffers.size()); dump_info.append(str_line); dump_info.append(" NextFlow: "); for (auto& nflow : fm.flows) { dump_info.append(nflow.flow->GetFlowTag()); dump_info.append(" "); } dump_info.append("\r\n"); } } static bool check_slots(std::vector& slots, const char* debugstr) { if (slots.empty()) { return true; } std::sort(slots.begin(), slots.end()); auto iend = std::unique(slots.begin(), slots.end()); if (iend != slots.end()) { LOG("%s slot duplicate :", debugstr); while (iend != slots.end()) { LOG(" %d", *iend); iend++; } LOG("\n"); return false; } return true; } Flow::FlowMap::FlowMap(FlowMap&& fm) { if (fm.valid) { LOG("Flow::FlowMap is not copyable and moveable after inited\n"); assert(0); } } void Flow::FlowMap::Init(Model m, HoldInputMode hold_in) { assert(!valid); valid = true; if (m == Model::ASYNCCOMMON) { set_output_behavior = &FlowMap::SetOutputToQueueBehavior; } else { set_output_behavior = &FlowMap::SetOutputBehavior; } hold_input = hold_in; } void Flow::FlowMap::SetOutputBehavior(const std::shared_ptr& output) { cached_buffer = output; } void Flow::FlowMap::SetOutputToQueueBehavior(const std::shared_ptr& output) { cached_buffers.push_back(output); } Flow::Input::Input(Input&& in) { if (in.valid) { LOG("Flow::Input is not copyable and moveable after inited\n"); assert(0); } } void Flow::Input::Init(Flow* f, Model m, int mcn, InputMode im, bool f_block, std::shared_ptr fc) { assert(!valid); valid = true; flow = f; thread_model = m; fetch_block = f_block; max_cache_num = mcn; mode_when_full = im; switch (m) { case Model::ASYNCCOMMON: send_input_behavior = &Input::ASyncSendInputCommonBehavior; break; case Model::ASYNCATOMIC: send_input_behavior = &Input::ASyncSendInputAtomicBehavior; break; case Model::SYNC: send_input_behavior = &Input::SyncSendInputBehavior; coroutine = fc; break; default: break; } switch (im) { case InputMode::BLOCKING: async_full_behavior = &Input::ASyncFullBlockingBehavior; break; case InputMode::DROPFRONT: async_full_behavior = &Input::ASyncFullDropFrontBehavior; break; case InputMode::DROPCURRENT: async_full_behavior = &Input::ASyncFullDropCurrentBehavior; break; default: break; } } bool Flow::SetAsSource(const std::vector& output_slots, FunctionProcess f, const std::string& mark) { source_start_cond_mtx = std::make_shared(); if (!source_start_cond_mtx) { return false; } SlotMap sm; sm.input_slots = std::vector{0}; sm.output_slots = output_slots; sm.process = f; sm.thread_model = Model::SYNC; sm.mode_when_full = InputMode::DROPFRONT; if (!InstallSlotMap(sm, mark, 0)) { LOG("Fail to InstallSlotMap, read %s\n", mark.c_str()); return false; } return true; } bool Flow::InstallSlotMap(SlotMap& map, const std::string& mark, int exp_process_time) { LOGD("%s, thread_model=%d, mode_when_full=%d\n", mark.c_str(), map.thread_model, map.mode_when_full); // parameters validity check auto& in_slots = map.input_slots; if (in_slots.size() > 1 && map.thread_model == Model::SYNC) { LOG("More than 1 input to flow, can not set sync input\n"); return false; } if (!check_slots(in_slots, "input")) { return false; } bool ret = true; for (int i : in_slots) { if (i >= (int)v_input.size()) { continue; } if (v_input[i].valid) { LOG("input slot %d has been set\n", i); ret = false; } } if (!ret) { return false; } auto& out_slots = map.output_slots; if (!check_slots(out_slots, "output")) { return false; } for (int i : out_slots) { if (i >= (int)downflowmap.size()) { continue; } if (downflowmap[i].valid) { LOG("output slot %d has been set\n", i); ret = false; } } if (!ret) { return false; } auto c = std::make_shared(this, map.thread_model, map.process, map.interval); if (!c) { errno = ENOMEM; return false; } c->Bind(in_slots, out_slots); coroutines.push_back(c); if (!in_slots.empty()) { int max_idx = in_slots[in_slots.size() - 1]; if ((int)v_input.size() <= max_idx) { v_input.resize(max_idx + 1); } for (size_t i = 0; i < in_slots.size(); i++) { v_input[in_slots[i]].Init( this, map.thread_model, (map.thread_model == Model::ASYNCCOMMON) ? map.input_maxcachenum[i] : 0, map.mode_when_full, (map.thread_model == Model::ASYNCCOMMON && map.fetch_block.size() > i) ? map.fetch_block[i] : true, c); input_slot_num++; } } if (!out_slots.empty()) { int max_idx = out_slots[out_slots.size() - 1]; if ((int)downflowmap.size() <= max_idx) { downflowmap.resize(max_idx + 1); } for (size_t i = 0; i < out_slots.size(); i++) { downflowmap[out_slots[i]].Init(map.thread_model, map.hold_input.size() > i ? map.hold_input[i] : HoldInputMode::NONE); out_slot_num++; } } c->SetMarkName(mark); c->SetExpectProcessTime(exp_process_time); c->Start(); return true; } void Flow::FlowMap::AddFlow(std::shared_ptr flow, int index) { AutoLockMutex _lg(list_mtx); auto i = std::find(flows.begin(), flows.end(), flow); if (i != flows.end()) { LOG("repeatedly add, update index\n"); i->index_of_in = index; return; } // TODO: sort by sync type in downflow flows.emplace_back(flow, index); } void Flow::FlowMap::RemoveFlow(std::shared_ptr flow) { AutoLockMutex _lg(list_mtx); flows.remove_if([&flow](FlowInputMap& fm) { return fm == flow; }); } bool Flow::AddDownFlow(std::shared_ptr down, int out_slot_index, int in_slot_index_of_down) { if (out_slot_num <= 0 || (int)downflowmap.size() != out_slot_num) { LOG("Uncompleted or final flow\n"); return false; } if (out_slot_index >= (int)downflowmap.size()) { LOG("output slot index exceed max\n"); return false; } if (this == down.get()) { LOG("can not set self loop flow\n"); return false; } downflowmap[out_slot_index].AddFlow(down, in_slot_index_of_down); if (source_start_cond_mtx) { source_start_cond_mtx->lock(); down_flow_num++; source_start_cond_mtx->notify(); source_start_cond_mtx->unlock(); } return true; } void Flow::RemoveDownFlow(std::shared_ptr down) { if (out_slot_num <= 0 || (int)downflowmap.size() != out_slot_num) { return; } // if (down->down_flow_num > 0) // LOG("the removing flow has down flows, remove them first\n"); for (auto& dm : downflowmap) { if (!dm.valid) { continue; } dm.RemoveFlow(down); if (source_start_cond_mtx) { source_start_cond_mtx->lock(); down_flow_num--; source_start_cond_mtx->notify(); source_start_cond_mtx->unlock(); } } } void Flow::SendInput(std::shared_ptr& input, int in_slot_index) { if (in_slot_index < 0 || in_slot_index >= input_slot_num) { errno = EINVAL; LOG("ERROR: Input slot[%d] is vaild!\n", in_slot_index); return; } if (enable) { auto& in = v_input[in_slot_index]; CALL_MEMBER_FN(in, in.send_input_behavior)(input); } } bool Flow::SetOutput(const std::shared_ptr& output, int out_slot_index) { if (out_slot_index < 0 || out_slot_index >= out_slot_num) { errno = EINVAL; LOG("ERROR: Output slot[%d] is vaild!\n", out_slot_index); return false; } if (out_callback_ && output) { out_callback_(out_handler_, output); } if (enable) { auto& out = downflowmap[out_slot_index]; CALL_MEMBER_FN(out, out.set_output_behavior)(output); return true; } return false; } bool Flow::ParseWrapFlowParams(const char* param, std::map& flow_params, std::list& sub_param_list) { sub_param_list = ParseFlowParamToList(param); if (sub_param_list.empty()) { return false; } if (!parse_media_param_map(sub_param_list.front().c_str(), flow_params)) { return false; } sub_param_list.pop_front(); if (flow_params[KEY_NAME].empty()) { LOG("missing key name\n"); return false; } return true; } void Flow::RegisterEventHandler(std::shared_ptr flow, EventHook proc) { event_handler_.reset(new EventHandler()); if (event_handler_) { event_handler_->RegisterEventHook(flow, proc); } } void Flow::UnRegisterEventHandler() { if (event_handler_) { event_handler_->UnRegisterEventHook(); event_handler_.reset(nullptr); } } void Flow::NotifyToEventHandler(EventParamPtr param, int type) { if (event_handler_) { MessagePtr msg = std::make_shared(this, param, type); event_handler_->NotifyToEventHandler(msg); event_handler_->SignalEventHook(); } } void Flow::NotifyToEventHandler(int id, int type) { if (event_handler_) { EventParamPtr event_param = std::make_shared(id, 0); MessagePtr msg = std::make_shared(this, event_param, type); event_handler_->NotifyToEventHandler(msg); event_handler_->SignalEventHook(); } } void Flow::EventHookWait() { if (event_handler_) { event_handler_->EventHookWait(); } } MessagePtr Flow::GetEventMessage() { if (event_handler_) { return event_handler_->GetEventMessage(); } return nullptr; } EventParamPtr Flow::GetEventParam(MessagePtr msg) { if (msg) { return msg->GetEventParam(); } return nullptr; } void Flow::Input::SyncSendInputBehavior(std::shared_ptr& input) { cached_buffer = input; coroutine->RunOnce(); cached_buffer.reset(); } void Flow::Input::ASyncSendInputCommonBehavior(std::shared_ptr& input) { mtx.lock(); if (max_cache_num > 0 && max_cache_num <= (int)cached_buffers.size()) { bool ret = (this->*async_full_behavior)(flow->enable); if (!ret) { mtx.unlock(); return; } } cached_buffers.push_back(input); mtx.unlock(); AutoLockMutex _alm(flow->cond_mtx); flow->cond_mtx.notify(); } void Flow::Input::ASyncSendInputAtomicBehavior(std::shared_ptr& input) { AutoLockMutex _alm(spin_mtx); cached_buffer = input; } bool Flow::Input::ASyncFullBlockingBehavior(volatile bool& pred) { #ifndef NDEBUG AutoDuration ad; #endif do { mtx.unlock(); msleep(5); if (max_cache_num > (int)cached_buffers.size()) { mtx.lock(); break; } mtx.lock(); } while (pred); #ifndef NDEBUG if (ad.Get() > 100000 /*ms*/) LOG("WARN: Flow[%s]: Input[block mode]: block too long(%.2fms) > 5ms\n", flow ? flow->GetFlowTag() : "Name is null", ad.Get() / 1000.0); #endif return pred; } bool Flow::Input::ASyncFullDropFrontBehavior(volatile bool& pred _UNUSED) { LOG("WARN: Flow[%s]: Input: drop front buffer!\n", flow ? flow->GetFlowTag() : "Name is null"); cached_buffers.pop_front(); return true; } bool Flow::Input::ASyncFullDropCurrentBehavior(volatile bool& pred _UNUSED) { LOG("WARN: Flow[%s]: Input: drop current buffer!\n", flow ? flow->GetFlowTag() : "Name Is Null"); return false; } std::string gen_datatype_rule(std::map& params) { std::string rule; std::string value; CHECK_EMPTY_SETERRNO_RETURN(, value, params, KEY_INPUTDATATYPE, , ""); PARAM_STRING_APPEND(rule, KEY_INPUTDATATYPE, value); CHECK_EMPTY_SETERRNO_RETURN(, value, params, KEY_OUTPUTDATATYPE, , ""); PARAM_STRING_APPEND(rule, KEY_OUTPUTDATATYPE, value); return std::move(rule); } Model GetModelByString(const std::string& model) { static std::map model_map = { {KEY_ASYNCCOMMON, Model::ASYNCCOMMON}, {KEY_ASYNCATOMIC, Model::ASYNCATOMIC}, {KEY_SYNC, Model::SYNC}}; auto it = model_map.find(model); if (it != model_map.end()) { return it->second; } return Model::NONE; } InputMode GetInputModelByString(const std::string& in_model) { static std::map in_model_map = {{KEY_BLOCKING, InputMode::BLOCKING}, {KEY_DROPFRONT, InputMode::DROPFRONT}, {KEY_DROPCURRENT, InputMode::DROPCURRENT}}; auto it = in_model_map.find(in_model); if (it != in_model_map.end()) { return it->second; } return InputMode::NONE; } void ParseParamToSlotMap(std::map& params, SlotMap& sm, int& input_maxcachenum) { float fps = 0.0f; std::string& fps_str = params[KEY_FPS]; if (!fps_str.empty()) { fps = std::stof(fps_str); if (fps > 0.0f) { sm.interval = 1000.0f / fps; } } sm.thread_model = GetModelByString(params[KEK_THREAD_SYNC_MODEL]); sm.mode_when_full = GetInputModelByString(params[KEK_INPUT_MODEL]); std::string& cache_num_str = params[KEY_INPUT_CACHE_NUM]; int cache_num = -1; if (!cache_num_str.empty()) { cache_num = std::stoi(cache_num_str); if (cache_num <= 0) { LOG("warning, input cache num = %d\n", cache_num); } input_maxcachenum = cache_num; } } size_t FlowOutputHoldInput(std::shared_ptr& out_buffer, const MediaBufferVector& input_vector) { assert(out_buffer); size_t i = 0; for (; i < input_vector.size(); i++) { out_buffer->SetRelatedSPtr(input_vector[i], i); } return i; } size_t FlowOutputInheritFromInput(std::shared_ptr& out_buffer, const MediaBufferVector& input_vector) { assert(out_buffer); size_t ret = 0; auto& vec = out_buffer->GetRelatedSPtrs(); for (size_t i = 0; i < input_vector.size(); i++) { auto& input_vec = input_vector[i]->GetRelatedSPtrs(); ret += input_vec.size(); vec.insert(vec.end(), input_vec.begin(), input_vec.end()); } return ret; } std::string JoinFlowParam(const std::string& flow_param, size_t num_elem, ...) { std::string ret; ret.append(flow_param); va_list va; va_start(va, num_elem); while (num_elem--) { const std::string& elem_param = va_arg(va, const std::string); ret.append(1, FLOW_PARAM_SEPARATE_CHAR).append(elem_param); } va_end(va); return std::move(ret); } std::list ParseFlowParamToList(const char* param) { std::list separate_list; if (!parse_media_param_list(param, separate_list, FLOW_PARAM_SEPARATE_CHAR) || separate_list.size() < 2) { separate_list.clear(); } return std::move(separate_list); } } // namespace easymedia