hc
2023-02-13 e440ec23c5a540cdd3f7464e8779219be6fd3d95
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
#include "tcp_server.h"
 
#include <atomic>
#include <net/if.h>
#include <pthread.h>
#include <signal.h>
 
#ifdef LOG_TAG
    #undef LOG_TAG
#endif
#define LOG_TAG "aiqtool"
 
TCPServer::~TCPServer()
{
    SaveExit();
}
 
void TCPServer::SaveExit()
{
    quit_.store(true, std::memory_order_release);
    if (accept_thread_ && accept_thread_->joinable())
        accept_thread_->join();
    std::for_each(recv_threads_.begin(), recv_threads_.end(), [](const std::unique_ptr<std::thread>& thrd) {
        if (thrd && thrd->joinable())
            thrd->join();
    });
    close(sockfd);
    sockfd = -1;
}
 
int TCPServer::Send(int cilent_socket, char* buff, int size)
{
    return send(cilent_socket, buff, size, 0);
}
 
int TCPServer::Recvieve(int cilent_socket)
{
    sigset_t set;
    sigemptyset(&set);
    sigaddset(&set, SIGQUIT);
    sigaddset(&set, SIGINT);
    sigaddset(&set, SIGTERM);
    pthread_sigmask(SIG_BLOCK, &set, NULL);
 
    std::thread::id threadID = std::this_thread::get_id();
    LOG_DEBUG("TCPServer::Recvieve enter %d\n", cilent_socket);
    char buffer[MAXPACKETSIZE];
    int size = sizeof(buffer);
    struct timeval interval = {3, 0};
    setsockopt(cilent_socket, SOL_SOCKET, SO_RCVTIMEO, (char*)&interval, sizeof(struct timeval));
    while (!quit_.load()) {
        int length = recv(cilent_socket, buffer, size, 0);
        if (length == 0) {
            LOG_DEBUG("socket recvieve exit\n");
            break;
        } else if (length < 0 && errno == EAGAIN) {
            // LOG_INFO("socket recvieve failed\n");
            continue;
        } else if (length < 0) {
            break;
        }
        LOG_DEBUG("socket recvieve length: %d\n", length);
 
        if (callback_) {
            callback_(cilent_socket, buffer, length);
        }
    }
 
    LOG_DEBUG("TCPServer::Recvieve exit %d\n", cilent_socket);
    close(cilent_socket);
    cilent_socket = -1;
    //
    recv_threads_finished_id_.push_back(threadID);
    LOG_DEBUG("TCPServer::recv_threads_finished_id_ len: %d\n", recv_threads_finished_id_.size());
    return 0;
}
 
void TCPServer::Accepted()
{
    LOG_INFO("TCPServer::Accepted\n");
    sigset_t set;
    sigemptyset(&set);
    sigaddset(&set, SIGQUIT);
    sigaddset(&set, SIGINT);
    sigaddset(&set, SIGTERM);
    pthread_sigmask(SIG_BLOCK, &set, NULL);
 
    struct timeval interval = {1, 0};
    setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, (char*)&interval, sizeof(struct timeval));
    int reuseTrue = 1;
    setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &reuseTrue, sizeof(int));
    while (!quit_) {
        int cilent_socket;
        socklen_t sosize = sizeof(clientAddress);
        cilent_socket = accept(sockfd, (struct sockaddr*)&clientAddress, &sosize);
 
        if (cilent_socket < 0) {
            if (errno != EAGAIN && errno != EINTR) {
                LOG_ERROR("Error socket accept failed %d %d\n", cilent_socket, errno);
                break;
            }
            continue;
        }
        LOG_DEBUG("socket accept ip %s\n", inet_ntoa(clientAddress.sin_addr));
 
        recv_threads_.push_back(
            std::unique_ptr<std::thread>(new std::thread(&TCPServer::Recvieve, this, cilent_socket)));
        LOG_DEBUG("socket accept close\n");
    }
    close(sockfd);
    sockfd = -1;
    exited_.store(true, std::memory_order_release);
    LOG_DEBUG("socket accept exit\n");
}
 
int TCPServer::Process(int port)
{
    exited_.store(false, std::memory_order_release);
    LOG_DEBUG("TCPServer::Process\n");
    int opt = 1;
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        LOG_ERROR("Failed to create socket with tunner");
        exited_.store(true, std::memory_order_release);
        return -1;
    }
 
    memset(&serverAddress, 0, sizeof(serverAddress));
    if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
        LOG_ERROR("Error setsockopt\n");
        exited_.store(true, std::memory_order_release);
        return -1;
    }
 
    serverAddress.sin_family = AF_INET;
    serverAddress.sin_addr.s_addr = htonl(INADDR_ANY);
    serverAddress.sin_port = htons(port);
    if ((::bind(sockfd, (struct sockaddr*)&serverAddress, sizeof(serverAddress))) < 0) {
        LOG_ERROR("Error bind\n");
        exited_.store(true, std::memory_order_release);
        return -1;
    }
    if (listen(sockfd, 5) < 0) {
        LOG_ERROR("Error listen\n");
        exited_.store(true, std::memory_order_release);
        return -1;
    }
 
    if (accept_thread_) {
        // SaveExit();
    }
    quit_.store(false, std::memory_order_release);
    accept_thread_ = std::unique_ptr<std::thread>(new std::thread(&TCPServer::Accepted, this));
 
    return 0;
}