#include "tcp_server.h"

// NOTE(review): the names of the system headers could not be recovered from
// the reviewed copy of this file. The list below covers every call made in
// this translation unit; reconcile it with the original include list.
#include <arpa/inet.h>
#include <netinet/in.h>
#include <signal.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <unistd.h>

#include <cerrno>
#include <cstring>
#include <memory>
#include <thread>

#ifdef LOG_TAG
#undef LOG_TAG
#endif
#define LOG_TAG "aiqtool"

namespace {

/// Adds SIGQUIT, SIGINT and SIGTERM to the calling thread's blocked-signal
/// mask. Called at the top of every worker thread entry point.
void BlockTerminationSignals() {
    sigset_t blocked;
    sigemptyset(&blocked);
    sigaddset(&blocked, SIGQUIT);
    sigaddset(&blocked, SIGINT);
    sigaddset(&blocked, SIGTERM);
    pthread_sigmask(SIG_BLOCK, &blocked, nullptr);
}

/// Sets SO_RCVTIMEO on `fd` so blocking accept()/recv() calls return
/// periodically and the caller's loop can re-check its quit flag.
void SetReceiveTimeout(int fd, time_t seconds) {
    struct timeval interval;
    interval.tv_sec = seconds;
    interval.tv_usec = 0;
    setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &interval, sizeof(interval));
}

}  // namespace

/// Stops all worker threads and closes the listening socket.
TCPServer::~TCPServer() {
    SaveExit();
}

/// Requests shutdown, waits for the accept thread and every receive thread to
/// finish, then closes the listening socket if it is still open.
///
/// Safe to call repeatedly. Because it joins the worker threads, it blocks
/// until their accept()/recv() timeouts let them observe the quit flag.
void TCPServer::SaveExit() {
    quit_.store(true, std::memory_order_release);

    if (accept_thread_ && accept_thread_->joinable()) {
        accept_thread_->join();
    }

    // The accept thread has been joined above, so nothing appends to
    // recv_threads_ any more while it is walked here.
    for (const auto& worker : recv_threads_) {
        if (worker && worker->joinable()) {
            worker->join();
        }
    }
    // Every entry is joined; release the thread objects so the list does not
    // keep growing across restarts.
    recv_threads_.clear();

    // The accept thread normally closes the socket itself and stores -1.
    if (sockfd >= 0) {
        close(sockfd);
    }
    sockfd = -1;
}

/// Sends `size` bytes from `buff` on `cilent_socket`.
///
/// @return the result of send(): the number of bytes queued (which may be
///         fewer than `size`), or -1 on error with errno set.
int TCPServer::Send(int cilent_socket, char* buff, int size) {
    return send(cilent_socket, buff, size, 0);
}

/// Receive loop executed on a dedicated thread for one accepted connection.
///
/// Reads up to MAXPACKETSIZE bytes at a time and forwards each chunk to
/// callback_ (when set). The loop ends when the quit flag is raised, the peer
/// closes the connection, or recv() fails with an unrecoverable error; the
/// client socket is closed before returning.
///
/// @param cilent_socket connected socket descriptor; closed by this function.
/// @return always 0.
int TCPServer::Recvieve(int cilent_socket) {
    BlockTerminationSignals();

    LOG_DEBUG("TCPServer::Recvieve enter %d\n", cilent_socket);

    char buffer[MAXPACKETSIZE];
    // A 3 s receive timeout bounds how long the loop can go without
    // re-checking quit_.
    SetReceiveTimeout(cilent_socket, 3);

    while (!quit_.load()) {
        const int length = static_cast<int>(recv(cilent_socket, buffer, sizeof(buffer), 0));
        if (length == 0) {
            // Orderly shutdown by the peer.
            LOG_DEBUG("socket recvieve exit\n");
            break;
        }
        if (length < 0) {
            // Timeout (EAGAIN) or interruption (EINTR): nothing was read, try
            // again. EINTR is retried to match the accept loop's handling.
            if (errno == EAGAIN || errno == EINTR) {
                continue;
            }
            break;
        }

        LOG_DEBUG("socket recvieve length: %d\n", length);
        if (callback_) {
            callback_(cilent_socket, buffer, length);
        }
    }

    LOG_DEBUG("TCPServer::Recvieve exit %d\n", cilent_socket);
    close(cilent_socket);

    // Recording of finished thread ids is currently disabled; only the size of
    // the (unchanged) list is logged. size() is cast because the format is %d.
    LOG_DEBUG("TCPServer::recv_threads_finished_id_ len: %d\n",
              static_cast<int>(recv_threads_finished_id_.size()));
    return 0;
}

/// Accept loop executed on the accept thread started by Process().
///
/// Accepts connections on the listening socket and starts one Recvieve()
/// thread per client. Leaves the loop when the quit flag is raised or accept()
/// fails with anything other than EAGAIN/EINTR, then closes the listening
/// socket and marks the server as exited.
void TCPServer::Accepted() {
    LOG_INFO("TCPServer::Accepted\n");

    BlockTerminationSignals();

    // A 1 s timeout makes accept() return EAGAIN periodically so quit_ is
    // polled. SO_REUSEADDR is not set here: it only matters before bind(),
    // which Process() has already performed.
    SetReceiveTimeout(sockfd, 1);

    while (!quit_.load()) {
        socklen_t sosize = sizeof(clientAddress);
        const int cilent_socket =
            accept(sockfd, reinterpret_cast<struct sockaddr*>(&clientAddress), &sosize);
        if (cilent_socket < 0) {
            if (errno != EAGAIN && errno != EINTR) {
                LOG_ERROR("Error socket accept failed %d %d\n", cilent_socket, errno);
                break;
            }
            continue;
        }

        LOG_DEBUG("socket accept ip %s\n", inet_ntoa(clientAddress.sin_addr));
        recv_threads_.push_back(std::unique_ptr<std::thread>(
            new std::thread(&TCPServer::Recvieve, this, cilent_socket)));
        LOG_DEBUG("socket accept close\n");
    }

    close(sockfd);
    sockfd = -1;
    exited_.store(true, std::memory_order_release);
    LOG_DEBUG("socket accept exit\n");
}

/// Creates the listening socket on `port` (all IPv4 interfaces) and starts the
/// accept thread.
///
/// If a previous accept thread exists, the previous server instance is shut
/// down first via SaveExit(); replacing a still-joinable std::thread would
/// otherwise call std::terminate.
///
/// @param port TCP port to listen on (host byte order).
/// @return 0 on success; -1 if socket creation, option setup, bind or listen
///         fails. On failure the socket is closed and exited_ is set to true.
int TCPServer::Process(int port) {
    if (accept_thread_) {
        // Must run before the new socket is created, because SaveExit()
        // closes whatever sockfd currently refers to.
        SaveExit();
    }

    exited_.store(false, std::memory_order_release);
    LOG_DEBUG("TCPServer::Process\n");

    // Shared failure path: release the descriptor and publish the exited state.
    const auto abandon = [this]() {
        if (sockfd >= 0) {
            close(sockfd);
            sockfd = -1;
        }
        exited_.store(true, std::memory_order_release);
        return -1;
    };

    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        LOG_ERROR("Failed to create socket with tunner");
        return abandon();
    }

    // Socket option names are distinct integers, not bit flags, so each option
    // needs its own setsockopt() call.
    int opt = 1;
    if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) != 0 ||
        setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) != 0) {
        LOG_ERROR("Error setsockopt\n");
        return abandon();
    }

    memset(&serverAddress, 0, sizeof(serverAddress));
    serverAddress.sin_family = AF_INET;
    serverAddress.sin_addr.s_addr = htonl(INADDR_ANY);
    serverAddress.sin_port = htons(static_cast<uint16_t>(port));

    if (::bind(sockfd, reinterpret_cast<struct sockaddr*>(&serverAddress),
               sizeof(serverAddress)) < 0) {
        LOG_ERROR("Error bind\n");
        return abandon();
    }

    if (listen(sockfd, 5) < 0) {
        LOG_ERROR("Error listen\n");
        return abandon();
    }

    quit_.store(false, std::memory_order_release);
    accept_thread_.reset(new std::thread(&TCPServer::Accepted, this));
    return 0;
}