#include "tcp_server.h"
|
|
#include <atomic>
|
#include <net/if.h>
|
#include <pthread.h>
|
#include <signal.h>
|
|
#ifdef LOG_TAG
|
#undef LOG_TAG
|
#endif
|
#define LOG_TAG "aiqtool"
|
|
TCPServer::~TCPServer()
|
{
|
SaveExit();
|
}
|
|
void TCPServer::SaveExit()
|
{
|
quit_.store(true, std::memory_order_release);
|
if (accept_thread_ && accept_thread_->joinable())
|
accept_thread_->join();
|
std::for_each(recv_threads_.begin(), recv_threads_.end(), [](const std::unique_ptr<std::thread>& thrd) {
|
if (thrd && thrd->joinable())
|
thrd->join();
|
});
|
close(sockfd);
|
sockfd = -1;
|
}
|
|
int TCPServer::Send(int cilent_socket, char* buff, int size)
|
{
|
return send(cilent_socket, buff, size, 0);
|
}
|
|
int TCPServer::Recvieve(int cilent_socket)
|
{
|
sigset_t set;
|
sigemptyset(&set);
|
sigaddset(&set, SIGQUIT);
|
sigaddset(&set, SIGINT);
|
sigaddset(&set, SIGTERM);
|
pthread_sigmask(SIG_BLOCK, &set, NULL);
|
|
std::thread::id threadID = std::this_thread::get_id();
|
LOG_DEBUG("TCPServer::Recvieve enter %d\n", cilent_socket);
|
char buffer[MAXPACKETSIZE];
|
int size = sizeof(buffer);
|
struct timeval interval = {3, 0};
|
setsockopt(cilent_socket, SOL_SOCKET, SO_RCVTIMEO, (char*)&interval, sizeof(struct timeval));
|
while (!quit_.load()) {
|
int length = recv(cilent_socket, buffer, size, 0);
|
if (length == 0) {
|
LOG_DEBUG("socket recvieve exit\n");
|
break;
|
} else if (length < 0 && errno == EAGAIN) {
|
// LOG_INFO("socket recvieve failed\n");
|
continue;
|
} else if (length < 0) {
|
break;
|
}
|
LOG_DEBUG("socket recvieve length: %d\n", length);
|
|
if (callback_) {
|
callback_(cilent_socket, buffer, length);
|
}
|
}
|
|
LOG_DEBUG("TCPServer::Recvieve exit %d\n", cilent_socket);
|
close(cilent_socket);
|
cilent_socket = -1;
|
//
|
recv_threads_finished_id_.push_back(threadID);
|
LOG_DEBUG("TCPServer::recv_threads_finished_id_ len: %d\n", recv_threads_finished_id_.size());
|
return 0;
|
}
|
|
void TCPServer::Accepted()
|
{
|
LOG_INFO("TCPServer::Accepted\n");
|
sigset_t set;
|
sigemptyset(&set);
|
sigaddset(&set, SIGQUIT);
|
sigaddset(&set, SIGINT);
|
sigaddset(&set, SIGTERM);
|
pthread_sigmask(SIG_BLOCK, &set, NULL);
|
|
struct timeval interval = {1, 0};
|
setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, (char*)&interval, sizeof(struct timeval));
|
int reuseTrue = 1;
|
setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &reuseTrue, sizeof(int));
|
while (!quit_) {
|
int cilent_socket;
|
socklen_t sosize = sizeof(clientAddress);
|
cilent_socket = accept(sockfd, (struct sockaddr*)&clientAddress, &sosize);
|
|
if (cilent_socket < 0) {
|
if (errno != EAGAIN && errno != EINTR) {
|
LOG_ERROR("Error socket accept failed %d %d\n", cilent_socket, errno);
|
break;
|
}
|
continue;
|
}
|
LOG_DEBUG("socket accept ip %s\n", inet_ntoa(clientAddress.sin_addr));
|
|
recv_threads_.push_back(
|
std::unique_ptr<std::thread>(new std::thread(&TCPServer::Recvieve, this, cilent_socket)));
|
LOG_DEBUG("socket accept close\n");
|
}
|
close(sockfd);
|
sockfd = -1;
|
exited_.store(true, std::memory_order_release);
|
LOG_DEBUG("socket accept exit\n");
|
}
|
|
int TCPServer::Process(int port)
|
{
|
exited_.store(false, std::memory_order_release);
|
LOG_DEBUG("TCPServer::Process\n");
|
int opt = 1;
|
sockfd = socket(AF_INET, SOCK_STREAM, 0);
|
if (sockfd < 0) {
|
LOG_ERROR("Failed to create socket with tunner");
|
exited_.store(true, std::memory_order_release);
|
return -1;
|
}
|
|
memset(&serverAddress, 0, sizeof(serverAddress));
|
if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
|
LOG_ERROR("Error setsockopt\n");
|
exited_.store(true, std::memory_order_release);
|
return -1;
|
}
|
|
serverAddress.sin_family = AF_INET;
|
serverAddress.sin_addr.s_addr = htonl(INADDR_ANY);
|
serverAddress.sin_port = htons(port);
|
if ((::bind(sockfd, (struct sockaddr*)&serverAddress, sizeof(serverAddress))) < 0) {
|
LOG_ERROR("Error bind\n");
|
exited_.store(true, std::memory_order_release);
|
return -1;
|
}
|
if (listen(sockfd, 5) < 0) {
|
LOG_ERROR("Error listen\n");
|
exited_.store(true, std::memory_order_release);
|
return -1;
|
}
|
|
if (accept_thread_) {
|
// SaveExit();
|
}
|
quit_.store(false, std::memory_order_release);
|
accept_thread_ = std::unique_ptr<std::thread>(new std::thread(&TCPServer::Accepted, this));
|
|
return 0;
|
}
|