hc
2023-02-13 e440ec23c5a540cdd3f7464e8779219be6fd3d95
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
/*
 * Copyright 2020 Rockchip Electronics Co. LTD
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * src author: <mediapipe-team@google.com>
 * new author: modified by <rimon.xu@rock-chips.com>
 *       date: 2020-03-15
 *        ref: https://github.com/google/mediapipe
 */
 
#ifndef SRC_RT_TASK_TASK_GRAPH_RTTHREADPOOL_H_
#define SRC_RT_TASK_TASK_GRAPH_RTTHREADPOOL_H_
 
#include <deque>
#include <functional>
#include <string>
#include <vector>
#include <map>
 
#include "rt_header.h"              // NOLINT
#include "RTThreadOptions.h"        // NOLINT
#include "rt_thread.h"
#include "rt_mutex.h"
 
typedef enum _RTThreadPoolMode {
    RT_THREAD_POOL_RANDOM_MODE,
    RT_THREAD_POOL_ASSIGN_MODE,
} RTThreadPoolMode;
 
// A thread pool consists of a set of threads that sit around waiting
// for callbacks to appear on a queue.  When that happens, one of the
// threads pulls a callback off the queue and runs it.
//
// The thread pool is shut down when the pool is destroyed.
//
// Sample usage:
//
// {
//   RTThreadPool pool("testpool", num_workers);
//   pool.startWorkers();
//   for (INT32 i = 0; i < N; ++i) {
//     pool.schedule([i]() { DoWork(i); });
//   }
// }
//
class RTThreadPool {
 public:
    // Create a thread pool that provides a concurrency of "numThreads"
    // threads. I.e., if "numThreads" items are added, they are all
    // guaranteed to run concurrently without excessive delay.
    // It has an effectively infinite maximum queue length.
    // If numThreads is 1, the callbacks are run in FIFO order.
    explicit RTThreadPool(INT32 numThreads);
    RTThreadPool(const RTThreadPool&) = delete;
    RTThreadPool& operator=(const RTThreadPool&) = delete;
 
    // Like the RTThreadPool(INT32 numThreads) constructor, except that
    // it also associates "namePrefix" with each of the threads
    // in the thread pool.
    RTThreadPool(const std::string& namePrefix, INT32 numThreads,
                RTThreadPoolMode mode = RT_THREAD_POOL_RANDOM_MODE);
 
    // Create a thread pool that creates and can use up to "numThreads"
    // threads.  Any standard thread options, such as stack size, should
    // be passed via "threadOptions".  "namePrefix" specifies the
    // thread name prefix.
    RTThreadPool(const RTThreadOptions& threadOptions,
               const std::string& namePrefix, INT32 numThreads,
               RTThreadPoolMode mode = RT_THREAD_POOL_RANDOM_MODE);
 
    // Waits for closures (if any) to complete. May be called without
    // having called startWorkers().
    ~RTThreadPool();
 
    // REQUIRES: startWorkers has not been called
    // Actually start the worker threads.
    void startWorkers();
 
    // REQUIRES: startWorkers has been called
    // Add specified callback to queue of pending callbacks.  Eventually a
    // thread will pull this callback off the queue and execute it.
    void schedule(std::function<void()> callback, INT32 lockThreadId = 0);
 
    // Provided for debugging and testing only.
    INT32 getNumThreads() const;
 
    // Standard thread options.  Use this accessor to get them.
    const RTThreadOptions& getThreadOptions() const;
 
 private:
    class RTWorkerThread;
    typedef struct RTAssignWorker {
        RTWorkerThread *worker;
        std::deque<std::function<void()>> tasks;
        RtMutex mutex;
        RtCondition condition;
    } RTAssignWorker;
 
    void  runWorker(INT32 threadId);
    void  randomAndRunWorker();
    void  assignAndRunWorker(INT32 threadId);
    void  randomScheduleToWorker(std::function<void()> callback);
    void  assignScheduleToWorker(std::function<void()> callback, INT32 threadId);
    void  setThreadCount(INT32 numThreads);
 
    std::string mNamePrefix;
    /* thread id and worker */
    std::map<int, RTAssignWorker *> mWorkers;
    INT32 mNumThreads;
 
    RtMutex mMutex;
    RtCondition mCondition;
    bool mStopped = false;
 
    std::deque<std::function<void()>> mTasks;
 
    RTThreadOptions mThreadOptions;
    RTThreadPoolMode mMode;
};
 
namespace internal {
 
// Creates name for thread in a thread pool based on provided prefix and
// thread id. Length of the resulting name is guaranteed to be less or equal
// to 15. Name or thread id can be truncated to achieve that, see truncation
// samples below:
// namePrefix, 1234       -> namePrefix/123
// namePrefix, 1234567    -> namePrefix/123
// namePrefix_long, 1234  -> namePrefix_lon
std::string createThreadName(const std::string& prefix, INT32 threadId);
 
}  // namespace internal
 
#endif  // SRC_RT_TASK_TASK_GRAPH_RTTHREADPOOL_H_