/* * Copyright (c) 2019-2021 Rockchip Eletronics Co., Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "MessageParser.hpp" #define BUFFER_MAX_SIZE (1024 * 512) #ifndef LOGE #define LOGE printf #endif // #define AIQ_MSGPARSER_CHECK_HASH #define CALC_32BIT_LITTLE(array) \ ((array[0] & 0xff) | ((array[1] & 0xff) << 8) | ((array[2] & 0xff) << 16) | \ ((array[3] & 0xff) << 24)) namespace RkMSG { MessageParser::MessageParser() : is_running(false) {} int MessageParser::stop() { is_running = false; notify_wakeup(); return 0; } MessageParser::~MessageParser() { this->stop(); if (this->proc_thread && this->proc_thread->joinable()) { this->proc_thread->join(); this->proc_thread.reset(); this->proc_thread = nullptr; } if (raw_stream.size()) { raw_stream.clear(); } } int MessageParser::notify_wakeup() { std::unique_lock lck(this->proc_mutex); this->proc_cond.notify_all(); return 0; } int MessageParser::freePacket(void *packet, MessageType type) { if (!packet) { return 0; } if (type == RKAIQ_MESSAGE_NEW) { RkAiqSocketPacket_t *newpkt = (RkAiqSocketPacket_t *)packet; if (newpkt->data) { free(newpkt->data); } free(newpkt); } else if (type == RKAIQ_MESSAGE_OLD) { RkAiqSocketPacket *newpkt = (RkAiqSocketPacket *)packet; if (newpkt->data) { free(newpkt->data); } free(newpkt); } return 0; } void *MessageParser::clonePacket(void *from, MessageType type) { if (type == RKAIQ_MESSAGE_NEW) { RkAiqSocketPacket_t *temp = (RkAiqSocketPacket_t *)from; RkAiqSocketPacket_t *opkt = (RkAiqSocketPacket_t *)malloc(sizeof(RkAiqSocketPacket_t)); if (!opkt) { return nullptr; } memcpy(opkt, temp, sizeof(RkAiqSocketPacket_t)); opkt->data = (uint8_t *)malloc(temp->payload_size); if (!opkt->data) { free(opkt); return nullptr; } memcpy(opkt->data, (uint8_t *)&(temp->data), temp->payload_size); return opkt; } else if (type == RKAIQ_MESSAGE_OLD) { RkAiqSocketPacket *temp = (RkAiqSocketPacket *)from; RkAiqSocketPacket *opkt = (RkAiqSocketPacket *)malloc(sizeof(RkAiqSocketPacket)); if (!opkt) { return nullptr; } memcpy(opkt, temp, sizeof(RkAiqSocketPacket)); opkt->data = (char *)malloc(temp->dataSize); if (!opkt->data) { free(opkt); return nullptr; } memcpy(opkt->data, temp->data, temp->dataSize); return opkt; } return nullptr; } void MessageParser::process() { while (is_running) { std::unique_lock lck(proc_mutex); while (raw_stream.size() <= 0 && is_running) { proc_cond.wait(lck); } if (!is_running) { break; } // Found full packet: deal and call then erease // Found non full packet: wait // Found nothing: clear raw stream size_t start_index = 0; size_t end_index = 0; RkAiqSocketPacket *old_pkt = nullptr; RkAiqSocketPacket_t *new_pkt = nullptr; // Check if a new packet, if so, copy it and erase all ahead data new_pkt = findValidSection(&raw_stream[0], raw_stream.size(), &start_index, &end_index); // if found full packet if (new_pkt && mCallBackFunc) { mCallBackFunc(pri, new_pkt, RKAIQ_MESSAGE_NEW); } if ((start_index >= 0) && (end_index > 0)) { raw_stream.erase(raw_stream.begin(), raw_stream.begin() + end_index); } start_index = 0; end_index = 0; // Check if a new packet, if so, copy it and erase all ahead data old_pkt = findValidSection2(&raw_stream[0], raw_stream.size(), &start_index, &end_index); // if found full packet if (old_pkt && mCallBackFunc) { mCallBackFunc(pri, old_pkt, RKAIQ_MESSAGE_OLD); freePacket(old_pkt, RKAIQ_MESSAGE_OLD); } if ((start_index >= 0) && (end_index > 0)) { raw_stream.erase(raw_stream.begin(), raw_stream.begin() + end_index); } if (!new_pkt && !old_pkt && is_running) { proc_cond.wait(lck); } } LOGE("MessageParser %s loop exit!\n", __func__); } int MessageParser::pushRawData(const uint8_t *data, size_t size) { { int erase_section = 0; const std::lock_guard lock(proc_mutex); // reach max buffer size, erase old if (size > RKAIQ_RAW_STREAM_MAX_SIZE) { erase_section = RKAIQ_RAW_STREAM_MAX_SIZE; } else { erase_section = size; } // do need erase if (raw_stream.size() >= RKAIQ_RAW_STREAM_MAX_SIZE) { raw_stream.erase(raw_stream.begin(), raw_stream.begin() + erase_section); } raw_stream.insert(raw_stream.end(), data, data + size); } notify_wakeup(); return 0; } uint8_t *MessageParser::bit_stream_find(uint8_t *data, int size, const uint8_t *dst, int len) { int start_pos = -1; if (!data || !size || !dst || !len) { return NULL; } if (size < len) { return NULL; } for (start_pos = 0; start_pos < size - len; start_pos++) { if (0 == memcmp(data + start_pos, dst, len)) { return data + start_pos; } } return NULL; } // 1. find valid erase // 2. find error packet // 3. if crashed erase header then search again // 4. erase ahead of first valid data // 5. return null with non zero start en,then packet error RkAiqSocketPacket_t *MessageParser::findValidSection(uint8_t *buffer, int len, size_t *start_of, size_t *end_of) { RkAiqSocketPacket_t *aiq_pkt; uint8_t *start_pos = NULL; size_t skip_size = 0; size_t remain_size = 0; *start_of = 0; *end_of = 0; start_pos = bit_stream_find(buffer, len, RKAIQ_SOCKET_DATA_HEADER, RKAIQ_SOCKET_DATA_HEADER_LEN); // Found valid start if (NULL == start_pos) { return nullptr; } // Calculate data size skip_size = start_pos - buffer; remain_size = len - skip_size; // Check if contains packet information if (remain_size < (int)sizeof(RkAiqSocketPacket_t)) { LOGE("Not a complete packet [%d], wait more...\n", len); return nullptr; } /* * +---------------------------------------------------------------------+ * |<--------------------------VALID DATA SIZE-------------------------->| * +-------------------+------------------------------+------------------+ * |<---HEADER-SIZE--->|<--------PAYLOAD-SIZE-------->|<---HASH-SIZE---->| * +-------------------+------------------------------+------------------+ * | HEADER DATA | REAL DATA/CMD | VERIFY DATA | * +-------------------+------------------------------+------------------+ * * */ // Found complete packet header, then parse packet info aiq_pkt = (RkAiqSocketPacket_t *)start_pos; // Assume Single packet, check if data all present if (remain_size < aiq_pkt->packet_size) { return nullptr; } *start_of = start_pos - buffer; *end_of = *start_of + aiq_pkt->payload_size + RKAIQ_SOCKET_DATA_EXTRA_SIZE; #ifdef AIQ_MSGPARSER_CHECK_HASH #endif return (RkAiqSocketPacket_t *)clonePacket(aiq_pkt, RKAIQ_MESSAGE_NEW); } RkAiqSocketPacket *MessageParser::findValidSection2(uint8_t *buffer, int len, size_t *start_of, size_t *end_of) { RkAiqSocketPacket *aiq_pkt; uint8_t *start_pos = NULL; size_t skip_size = 0; size_t remain_size = 0; size_t pkt_size = 0; *start_of = 0; *end_of = 0; start_pos = bit_stream_find(buffer, len, RKAIQ_SOCKET_OLD_HEADER, RKAIQ_SOCKET_OLD_HEADER_LEN); // Found valid start if (NULL == start_pos) { return nullptr; } // Calculate data size skip_size = start_pos - buffer; remain_size = len - skip_size; // Check if contains packet information if (remain_size < (int)sizeof(RkAiqSocketPacket_t)) { LOGE("Not a complete packet [%d], wait more...\n", len); return nullptr; } // Found complete packet header, then parse packet info aiq_pkt = (RkAiqSocketPacket *)start_pos; pkt_size = CALC_32BIT_LITTLE(aiq_pkt->packetSize); // Assume Single packet, check if data all present if (remain_size < pkt_size) { return nullptr; } char *tmpArray = (char *)start_pos; int packetSize = (tmpArray[2] & 0xff) | ((tmpArray[3] & 0xff) << 8) | ((tmpArray[4] & 0xff) << 16) | ((tmpArray[5] & 0xff) << 24); if (packetSize >= 102400) { LOGE("MessageParser %s: packetSize error!\n", __func__); return nullptr; } char *receivedPacket = (char *)malloc(packetSize); memset(receivedPacket, 0, packetSize); memcpy(receivedPacket, tmpArray, packetSize); // parse data RkAiqSocketPacket receivedData; int offset = 0; offset += 2; // packetSize memcpy(receivedData.packetSize, receivedPacket + offset, 4); offset += 4; // command id memcpy((void *)&(receivedData.commandID), receivedPacket + offset, sizeof(int)); offset += sizeof(int); // command id memcpy((void *)&(receivedData.commandResult), receivedPacket + offset, sizeof(int)); offset += sizeof(int); // data size memcpy((void *)&(receivedData.dataSize), receivedPacket + offset, sizeof(unsigned int)); offset += sizeof(unsigned int); // data receivedData.data = (char *)start_pos + offset; offset += receivedData.dataSize; // data hash memcpy((void *)&(receivedData.dataHash), receivedPacket + offset, sizeof(unsigned int)); free(receivedPacket); receivedPacket = NULL; // hash check unsigned int dataHash = MurMurHash(receivedData.data, receivedData.dataSize); // got wrong, discard these data if (dataHash != receivedData.dataHash) { *start_of = 0; *end_of = 0; return nullptr; } *start_of = start_pos - buffer; *end_of = *start_of + pkt_size; return (RkAiqSocketPacket *)clonePacket(&receivedData, RKAIQ_MESSAGE_OLD); } unsigned int MessageParser::MurMurHash(const void *key, int len) { const unsigned int m = 0x5bd1e995; const int r = 24; const int seed = 97; unsigned int h = seed ^ len; // Mix 4 bytes at a time into the hash const unsigned char *data = (const unsigned char *)key; while (len >= 4) { unsigned int k = *(unsigned int *)data; k *= m; k ^= k >> r; k *= m; h *= m; h ^= k; data += 4; len -= 4; } // Handle the last few bytes of the input array switch (len) { case 3: h ^= data[2] << 16; case 2: h ^= data[1] << 8; case 1: h ^= data[0]; h *= m; }; // Do a few final mixes of the hash to ensure the last few // bytes are well-incorporated. h ^= h >> 13; h *= m; h ^= h >> 15; return h; } int MessageParser::start() { const std::lock_guard lock(proc_mutex); if (is_running) { return -1; } is_running = true; proc_thread = std::make_shared(&RkMSG::MessageParser::process, this); return 0; } } // namespace RkMSG