// Copyright 2018 The Fuchsia Authors. All rights reserved.
|
// Use of this source code is governed by a BSD-style license that can be
|
// found in the LICENSE file.
|
|
#include "lib/fidl/cpp/internal/message_reader.h"
|
|
#include <lib/async/default.h>
|
#include <lib/fidl/cpp/message_buffer.h>
|
#include <lib/fidl/epitaph.h>
|
#include <zircon/assert.h>
|
|
namespace fidl {
|
namespace internal {
|
namespace {
|
|
constexpr zx_signals_t kSignals = ZX_CHANNEL_READABLE | ZX_CHANNEL_PEER_CLOSED;
|
|
// |Canary| is a stack-allocated object that observes when a |MessageReader| is
|
// destroyed or unbound from the current channel.
|
//
|
// Because |WaitAndDispatchOneMessageUntil| can be called re-entrantly, we can
|
// be in a state where there are N nested calls to |ReadAndDispatchMessage| on
|
// the stack. While dispatching any of those messages, the client can destroy
|
// the |MessageReader| or unbind it from the current channel. When that happens
|
// we need to stop reading messages from the channel and unwind the stack
|
// safely.
|
//
|
// The |Canary| works by storing a pointer to its |should_stop_| field in the
|
// |MessageReader|. Upon destruction or unbinding, the |MessageReader| writes
|
// |true| into |should_stop_|. When we unwind the stack, the |Canary| forwards
|
// that value to the next |Canary| on the stack.
|
class Canary {
|
public:
|
explicit Canary(bool** should_stop_slot)
|
: should_stop_slot_(should_stop_slot),
|
previous_should_stop_(*should_stop_slot_),
|
should_stop_(false) {
|
*should_stop_slot_ = &should_stop_;
|
}
|
|
~Canary() {
|
if (should_stop_) {
|
// If we should stop, we need to propagate that information to the
|
// |Canary| higher up the stack, if any. We also cannot touch
|
// |*should_stop_slot_| because the |MessageReader| might have been
|
// destroyed (or bound to another channel).
|
if (previous_should_stop_)
|
*previous_should_stop_ = should_stop_;
|
} else {
|
// Otherwise, the |MessageReader| was not destroyed and is still bound to
|
// the same channel. We need to restore the previous |should_stop_|
|
// pointer so that a |Canary| further up the stack can still be informed
|
// about whether to stop.
|
*should_stop_slot_ = previous_should_stop_;
|
}
|
}
|
|
// Whether the |ReadAndDispatchMessage| that created the |Canary| should stop
|
// after dispatching the current message.
|
bool should_stop() const { return should_stop_; }
|
|
private:
|
bool** should_stop_slot_;
|
bool* previous_should_stop_;
|
bool should_stop_;
|
};
|
|
} // namespace
|
|
static_assert(std::is_standard_layout<MessageReader>::value,
|
"We need offsetof to work");
|
|
MessageReader::MessageReader(MessageHandler* message_handler)
|
: wait_{{ASYNC_STATE_INIT},
|
&MessageReader::CallHandler,
|
ZX_HANDLE_INVALID,
|
kSignals},
|
dispatcher_(nullptr),
|
should_stop_(nullptr),
|
message_handler_(message_handler),
|
error_handler_(nullptr) {}
|
|
MessageReader::~MessageReader() {
|
Stop();
|
if (dispatcher_)
|
async_cancel_wait(dispatcher_, &wait_);
|
}
|
|
zx_status_t MessageReader::Bind(zx::channel channel,
|
async_dispatcher_t* dispatcher) {
|
if (is_bound())
|
Unbind();
|
if (!channel)
|
return ZX_OK;
|
channel_ = std::move(channel);
|
if (dispatcher) {
|
dispatcher_ = dispatcher;
|
} else {
|
dispatcher_ = async_get_default_dispatcher();
|
}
|
ZX_ASSERT_MSG(dispatcher_ != nullptr,
|
"either |dispatcher| must be non-null, or "
|
"|async_get_default_dispatcher| must "
|
"be configured to return a non-null value");
|
wait_.object = channel_.get();
|
zx_status_t status = async_begin_wait(dispatcher_, &wait_);
|
if (status != ZX_OK)
|
Unbind();
|
return status;
|
}
|
|
zx::channel MessageReader::Unbind() {
|
if (!is_bound())
|
return zx::channel();
|
Stop();
|
async_cancel_wait(dispatcher_, &wait_);
|
wait_.object = ZX_HANDLE_INVALID;
|
dispatcher_ = nullptr;
|
zx::channel channel = std::move(channel_);
|
if (message_handler_)
|
message_handler_->OnChannelGone();
|
return channel;
|
}
|
|
void MessageReader::Reset() {
|
Unbind();
|
error_handler_ = nullptr;
|
}
|
|
zx_status_t MessageReader::TakeChannelAndErrorHandlerFrom(
|
MessageReader* other) {
|
zx_status_t status = Bind(other->Unbind(), other->dispatcher_);
|
if (status != ZX_OK)
|
return status;
|
error_handler_ = std::move(other->error_handler_);
|
return ZX_OK;
|
}
|
|
zx_status_t MessageReader::WaitAndDispatchOneMessageUntil(zx::time deadline) {
|
if (!is_bound())
|
return ZX_ERR_BAD_STATE;
|
zx_signals_t pending = ZX_SIGNAL_NONE;
|
zx_status_t status = channel_.wait_one(kSignals, deadline, &pending);
|
if (status == ZX_ERR_TIMED_OUT)
|
return status;
|
if (status != ZX_OK) {
|
NotifyError(status);
|
return status;
|
}
|
|
if (pending & ZX_CHANNEL_READABLE) {
|
MessageBuffer buffer;
|
return ReadAndDispatchMessage(&buffer);
|
}
|
|
ZX_DEBUG_ASSERT(pending & ZX_CHANNEL_PEER_CLOSED);
|
NotifyError(ZX_ERR_PEER_CLOSED);
|
return ZX_ERR_PEER_CLOSED;
|
}
|
|
void MessageReader::CallHandler(async_dispatcher_t* dispatcher,
|
async_wait_t* wait, zx_status_t status,
|
const zx_packet_signal_t* signal) {
|
static_assert(offsetof(MessageReader, wait_) == 0,
|
"The wait must be the first member for this cast to be valid.");
|
reinterpret_cast<MessageReader*>(wait)->OnHandleReady(dispatcher, status,
|
signal);
|
}
|
|
void MessageReader::OnHandleReady(async_dispatcher_t* dispatcher,
|
zx_status_t status,
|
const zx_packet_signal_t* signal) {
|
if (status != ZX_OK) {
|
NotifyError(status);
|
return;
|
}
|
|
if (signal->observed & ZX_CHANNEL_READABLE) {
|
MessageBuffer buffer;
|
for (uint64_t i = 0; i < signal->count; i++) {
|
status = ReadAndDispatchMessage(&buffer);
|
// If ReadAndDispatchMessage returns ZX_ERR_STOP, that means the message
|
// handler has destroyed this object and we need to unwind without
|
// touching |this|.
|
if (status == ZX_ERR_SHOULD_WAIT)
|
break;
|
if (status != ZX_OK)
|
return;
|
}
|
status = async_begin_wait(dispatcher, &wait_);
|
if (status != ZX_OK) {
|
NotifyError(status);
|
}
|
return;
|
}
|
|
ZX_DEBUG_ASSERT(signal->observed & ZX_CHANNEL_PEER_CLOSED);
|
// Notice that we don't notify an error until we've drained all the messages
|
// out of the channel.
|
NotifyError(ZX_ERR_PEER_CLOSED);
|
}
|
|
zx_status_t MessageReader::ReadAndDispatchMessage(MessageBuffer* buffer) {
|
Message message = buffer->CreateEmptyMessage();
|
zx_status_t status = message.Read(channel_.get(), 0);
|
if (status == ZX_ERR_SHOULD_WAIT)
|
return status;
|
if (status != ZX_OK) {
|
NotifyError(status);
|
return status;
|
}
|
|
if (message.has_header() && message.ordinal() == FIDL_EPITAPH_ORDINAL) {
|
// This indicates the message is an epitaph, and that any epitaph-friendly
|
// error handlers should be invoked. Note the epitaph error is stored in
|
// the header's reserved word.
|
|
// TODO(FIDL-322): Use a different error code to distinguish remote encoding
|
// errors from local ones.
|
if (message.bytes().actual() != sizeof(fidl_epitaph_t)) {
|
NotifyError(ZX_ERR_INVALID_ARGS);
|
return ZX_ERR_INVALID_ARGS;
|
}
|
fidl_epitaph_t* epitaph = message.GetBytesAs<fidl_epitaph_t>();
|
NotifyError(epitaph->hdr.reserved0);
|
return ZX_ERR_PEER_CLOSED;
|
}
|
|
if (!message_handler_)
|
return ZX_OK;
|
Canary canary(&should_stop_);
|
status = message_handler_->OnMessage(std::move(message));
|
if (canary.should_stop())
|
return ZX_ERR_STOP;
|
if (status != ZX_OK)
|
NotifyError(status);
|
return status;
|
}
|
|
zx_status_t MessageReader::Close(zx_status_t epitaph_value) {
|
if (!is_bound()) {
|
return ZX_ERR_BAD_STATE;
|
}
|
|
zx_status_t status = fidl_epitaph_write(channel_.get(), epitaph_value);
|
if (status != ZX_OK) {
|
return status;
|
}
|
Unbind();
|
return ZX_OK;
|
}
|
|
void MessageReader::NotifyError(zx_status_t epitaph_value) {
|
Unbind();
|
if (error_handler_) {
|
error_handler_(epitaph_value);
|
}
|
}
|
|
void MessageReader::Stop() {
|
if (should_stop_) {
|
*should_stop_ = true;
|
should_stop_ = nullptr;
|
}
|
}
|
|
} // namespace internal
|
} // namespace fidl
|