// Copyright 2015 The Chromium Authors. All rights reserved.
|
// Use of this source code is governed by a BSD-style license that can be
|
// found in the LICENSE file.
|
|
#include <stddef.h>
|
#include <stdint.h>
|
|
#include <memory>
|
|
#include "base/bind.h"
|
#include "base/location.h"
|
#include "base/logging.h"
|
#include "base/macros.h"
|
#include "base/message_loop/message_loop.h"
|
#include "base/run_loop.h"
|
#include "build/build_config.h"
|
#include "mojo/core/embedder/embedder.h"
|
#include "mojo/core/test/mojo_test_base.h"
|
#include "mojo/core/test_utils.h"
|
#include "mojo/public/c/system/data_pipe.h"
|
#include "mojo/public/c/system/functions.h"
|
#include "mojo/public/c/system/message_pipe.h"
|
#include "mojo/public/cpp/system/message_pipe.h"
|
#include "mojo/public/cpp/system/simple_watcher.h"
|
#include "testing/gtest/include/gtest/gtest.h"
|
|
namespace mojo {
|
namespace core {
|
namespace {
|
|
const uint32_t kSizeOfOptions =
|
static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions));
|
|
// In various places, we have to poll (since, e.g., we can't yet wait for a
|
// certain amount of data to be available). This is the maximum number of
|
// iterations (separated by a short sleep).
|
// TODO(vtl): Get rid of this.
|
const size_t kMaxPoll = 100;
|
|
// Used in Multiprocess test.
|
const size_t kMultiprocessCapacity = 37;
|
const char kMultiprocessTestData[] = "hello i'm a string that is 36 bytes";
|
const int kMultiprocessMaxIter = 5;
|
|
// TODO(rockot): There are many uses of ASSERT where EXPECT would be more
|
// appropriate. Fix this.
|
|
class DataPipeTest : public test::MojoTestBase {
|
public:
|
DataPipeTest()
|
: producer_(MOJO_HANDLE_INVALID), consumer_(MOJO_HANDLE_INVALID) {}
|
|
~DataPipeTest() override {
|
if (producer_ != MOJO_HANDLE_INVALID)
|
CHECK_EQ(MOJO_RESULT_OK, MojoClose(producer_));
|
if (consumer_ != MOJO_HANDLE_INVALID)
|
CHECK_EQ(MOJO_RESULT_OK, MojoClose(consumer_));
|
}
|
|
MojoResult ReadEmptyMessageWithHandles(MojoHandle pipe,
|
MojoHandle* out_handles,
|
uint32_t num_handles) {
|
std::vector<uint8_t> bytes;
|
std::vector<ScopedHandle> handles;
|
MojoResult rv = ReadMessageRaw(MessagePipeHandle(pipe), &bytes, &handles,
|
MOJO_READ_MESSAGE_FLAG_NONE);
|
if (rv == MOJO_RESULT_OK) {
|
CHECK_EQ(0u, bytes.size());
|
CHECK_EQ(num_handles, handles.size());
|
for (size_t i = 0; i < num_handles; ++i)
|
out_handles[i] = handles[i].release().value();
|
}
|
return rv;
|
}
|
|
MojoResult Create(const MojoCreateDataPipeOptions* options) {
|
return MojoCreateDataPipe(options, &producer_, &consumer_);
|
}
|
|
MojoResult WriteData(const void* elements,
|
uint32_t* num_bytes,
|
bool all_or_none = false) {
|
MojoWriteDataOptions options;
|
options.struct_size = sizeof(options);
|
options.flags = all_or_none ? MOJO_WRITE_DATA_FLAG_ALL_OR_NONE
|
: MOJO_WRITE_DATA_FLAG_NONE;
|
return MojoWriteData(producer_, elements, num_bytes, &options);
|
}
|
|
MojoResult ReadData(void* elements,
|
uint32_t* num_bytes,
|
bool all_or_none = false,
|
bool peek = false) {
|
MojoReadDataFlags flags = MOJO_READ_DATA_FLAG_NONE;
|
if (all_or_none)
|
flags |= MOJO_READ_DATA_FLAG_ALL_OR_NONE;
|
if (peek)
|
flags |= MOJO_READ_DATA_FLAG_PEEK;
|
|
MojoReadDataOptions options;
|
options.struct_size = sizeof(options);
|
options.flags = flags;
|
return MojoReadData(consumer_, &options, elements, num_bytes);
|
}
|
|
MojoResult QueryData(uint32_t* num_bytes) {
|
MojoReadDataOptions options;
|
options.struct_size = sizeof(options);
|
options.flags = MOJO_READ_DATA_FLAG_QUERY;
|
return MojoReadData(consumer_, &options, nullptr, num_bytes);
|
}
|
|
MojoResult DiscardData(uint32_t* num_bytes, bool all_or_none = false) {
|
MojoReadDataFlags flags = MOJO_READ_DATA_FLAG_DISCARD;
|
if (all_or_none)
|
flags |= MOJO_READ_DATA_FLAG_ALL_OR_NONE;
|
MojoReadDataOptions options;
|
options.struct_size = sizeof(options);
|
options.flags = flags;
|
return MojoReadData(consumer_, &options, nullptr, num_bytes);
|
}
|
|
MojoResult BeginReadData(const void** elements, uint32_t* num_bytes) {
|
return MojoBeginReadData(consumer_, nullptr, elements, num_bytes);
|
}
|
|
MojoResult EndReadData(uint32_t num_bytes_read) {
|
return MojoEndReadData(consumer_, num_bytes_read, nullptr);
|
}
|
|
MojoResult BeginWriteData(void** elements, uint32_t* num_bytes) {
|
return MojoBeginWriteData(producer_, nullptr, elements, num_bytes);
|
}
|
|
MojoResult EndWriteData(uint32_t num_bytes_written) {
|
return MojoEndWriteData(producer_, num_bytes_written, nullptr);
|
}
|
|
MojoResult CloseProducer() {
|
MojoResult rv = MojoClose(producer_);
|
producer_ = MOJO_HANDLE_INVALID;
|
return rv;
|
}
|
|
MojoResult CloseConsumer() {
|
MojoResult rv = MojoClose(consumer_);
|
consumer_ = MOJO_HANDLE_INVALID;
|
return rv;
|
}
|
|
MojoHandle producer_, consumer_;
|
|
private:
|
DISALLOW_COPY_AND_ASSIGN(DataPipeTest);
|
};
|
|
TEST_F(DataPipeTest, Basic) {
|
const MojoCreateDataPipeOptions options = {
|
kSizeOfOptions, // |struct_size|.
|
MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
|
static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
|
1000 * sizeof(int32_t) // |capacity_num_bytes|.
|
};
|
|
ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
|
|
// We can write to a data pipe handle immediately.
|
int32_t elements[10] = {};
|
uint32_t num_bytes = 0;
|
|
num_bytes = static_cast<uint32_t>(arraysize(elements) * sizeof(elements[0]));
|
|
elements[0] = 123;
|
elements[1] = 456;
|
num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
|
ASSERT_EQ(MOJO_RESULT_OK, WriteData(&elements[0], &num_bytes));
|
|
// Now wait for the other side to become readable.
|
MojoHandleSignalsState state;
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &state));
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
|
state.satisfied_signals);
|
|
elements[0] = -1;
|
elements[1] = -1;
|
ASSERT_EQ(MOJO_RESULT_OK, ReadData(&elements[0], &num_bytes));
|
ASSERT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
|
ASSERT_EQ(elements[0], 123);
|
ASSERT_EQ(elements[1], 456);
|
}
|
|
// Tests creation of data pipes with various (valid) options.
|
TEST_F(DataPipeTest, CreateAndMaybeTransfer) {
|
MojoCreateDataPipeOptions test_options[] = {
|
// Default options.
|
{},
|
// Trivial element size, non-default capacity.
|
{kSizeOfOptions, // |struct_size|.
|
MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
|
1, // |element_num_bytes|.
|
1000}, // |capacity_num_bytes|.
|
// Nontrivial element size, non-default capacity.
|
{kSizeOfOptions, // |struct_size|.
|
MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
|
4, // |element_num_bytes|.
|
4000}, // |capacity_num_bytes|.
|
// Nontrivial element size, default capacity.
|
{kSizeOfOptions, // |struct_size|.
|
MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
|
100, // |element_num_bytes|.
|
0} // |capacity_num_bytes|.
|
};
|
for (size_t i = 0; i < arraysize(test_options); i++) {
|
MojoHandle producer_handle, consumer_handle;
|
MojoCreateDataPipeOptions* options = i ? &test_options[i] : nullptr;
|
ASSERT_EQ(MOJO_RESULT_OK,
|
MojoCreateDataPipe(options, &producer_handle, &consumer_handle));
|
ASSERT_EQ(MOJO_RESULT_OK, MojoClose(producer_handle));
|
ASSERT_EQ(MOJO_RESULT_OK, MojoClose(consumer_handle));
|
}
|
}
|
|
TEST_F(DataPipeTest, SimpleReadWrite) {
|
const MojoCreateDataPipeOptions options = {
|
kSizeOfOptions, // |struct_size|.
|
MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
|
static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
|
1000 * sizeof(int32_t) // |capacity_num_bytes|.
|
};
|
|
ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
|
MojoHandleSignalsState hss;
|
|
int32_t elements[10] = {};
|
uint32_t num_bytes = 0;
|
|
// Try reading; nothing there yet.
|
num_bytes = static_cast<uint32_t>(arraysize(elements) * sizeof(elements[0]));
|
ASSERT_EQ(MOJO_RESULT_SHOULD_WAIT, ReadData(elements, &num_bytes));
|
|
// Query; nothing there yet.
|
num_bytes = 0;
|
ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
|
ASSERT_EQ(0u, num_bytes);
|
|
// Discard; nothing there yet.
|
num_bytes = static_cast<uint32_t>(5u * sizeof(elements[0]));
|
ASSERT_EQ(MOJO_RESULT_SHOULD_WAIT, DiscardData(&num_bytes));
|
|
// Read with invalid |num_bytes|.
|
num_bytes = sizeof(elements[0]) + 1;
|
ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, ReadData(elements, &num_bytes));
|
|
// Write two elements.
|
elements[0] = 123;
|
elements[1] = 456;
|
num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
|
ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes));
|
// It should have written everything (even without "all or none").
|
ASSERT_EQ(2u * sizeof(elements[0]), num_bytes);
|
|
// Wait.
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
|
hss.satisfied_signals);
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
|
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
|
hss.satisfiable_signals);
|
|
// Query.
|
// TODO(vtl): It's theoretically possible (though not with the current
|
// implementation/configured limits) that not all the data has arrived yet.
|
// (The theoretically-correct assertion here is that |num_bytes| is |1 * ...|
|
// or |2 * ...|.)
|
num_bytes = 0;
|
ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
|
ASSERT_EQ(2 * sizeof(elements[0]), num_bytes);
|
|
// Read one element.
|
elements[0] = -1;
|
elements[1] = -1;
|
num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
|
ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes));
|
ASSERT_EQ(1u * sizeof(elements[0]), num_bytes);
|
ASSERT_EQ(123, elements[0]);
|
ASSERT_EQ(-1, elements[1]);
|
|
// Query.
|
// TODO(vtl): See previous TODO. (If we got 2 elements there, however, we
|
// should get 1 here.)
|
num_bytes = 0;
|
ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
|
ASSERT_EQ(1 * sizeof(elements[0]), num_bytes);
|
|
// Peek one element.
|
elements[0] = -1;
|
elements[1] = -1;
|
num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
|
ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, false, true));
|
ASSERT_EQ(1u * sizeof(elements[0]), num_bytes);
|
ASSERT_EQ(456, elements[0]);
|
ASSERT_EQ(-1, elements[1]);
|
|
// Query. Still has 1 element remaining.
|
num_bytes = 0;
|
ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
|
ASSERT_EQ(1 * sizeof(elements[0]), num_bytes);
|
|
// Try to read two elements, with "all or none".
|
elements[0] = -1;
|
elements[1] = -1;
|
num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
|
ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE,
|
ReadData(elements, &num_bytes, true, false));
|
ASSERT_EQ(-1, elements[0]);
|
ASSERT_EQ(-1, elements[1]);
|
|
// Try to read two elements, without "all or none".
|
elements[0] = -1;
|
elements[1] = -1;
|
num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
|
ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, false, false));
|
ASSERT_EQ(1u * sizeof(elements[0]), num_bytes);
|
ASSERT_EQ(456, elements[0]);
|
ASSERT_EQ(-1, elements[1]);
|
|
// Query.
|
num_bytes = 0;
|
ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
|
ASSERT_EQ(0u, num_bytes);
|
}
|
|
// Note: The "basic" waiting tests test that the "wait states" are correct in
|
// various situations; they don't test that waiters are properly awoken on state
|
// changes. (For that, we need to use multiple threads.)
|
TEST_F(DataPipeTest, BasicProducerWaiting) {
|
// Note: We take advantage of the fact that current for current
|
// implementations capacities are strict maximums. This is not guaranteed by
|
// the API.
|
|
const MojoCreateDataPipeOptions options = {
|
kSizeOfOptions, // |struct_size|.
|
MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
|
static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
|
2 * sizeof(int32_t) // |capacity_num_bytes|.
|
};
|
Create(&options);
|
MojoHandleSignalsState hss;
|
|
// Never readable. Already writable.
|
hss = GetSignalsState(producer_);
|
ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
|
ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
|
hss.satisfiable_signals);
|
|
// Write two elements.
|
int32_t elements[2] = {123, 456};
|
uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
|
ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
|
ASSERT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
|
|
// Wait for data to become available to the consumer.
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
|
hss.satisfied_signals);
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
|
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
|
hss.satisfiable_signals);
|
|
// Peek one element.
|
elements[0] = -1;
|
elements[1] = -1;
|
num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
|
ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true, true));
|
ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
|
ASSERT_EQ(123, elements[0]);
|
ASSERT_EQ(-1, elements[1]);
|
|
// Read one element.
|
elements[0] = -1;
|
elements[1] = -1;
|
num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
|
ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true, false));
|
ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
|
ASSERT_EQ(123, elements[0]);
|
ASSERT_EQ(-1, elements[1]);
|
|
// Try writing, using a two-phase write.
|
void* buffer = nullptr;
|
num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
|
ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&buffer, &num_bytes));
|
EXPECT_TRUE(buffer);
|
ASSERT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(elements[0])));
|
|
static_cast<int32_t*>(buffer)[0] = 789;
|
ASSERT_EQ(MOJO_RESULT_OK,
|
EndWriteData(static_cast<uint32_t>(1u * sizeof(elements[0]))));
|
|
// Read one element, using a two-phase read.
|
const void* read_buffer = nullptr;
|
num_bytes = 0u;
|
ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes));
|
EXPECT_TRUE(read_buffer);
|
// The two-phase read should be able to read at least one element.
|
ASSERT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(elements[0])));
|
ASSERT_EQ(456, static_cast<const int32_t*>(read_buffer)[0]);
|
ASSERT_EQ(MOJO_RESULT_OK,
|
EndReadData(static_cast<uint32_t>(1u * sizeof(elements[0]))));
|
|
// Write one element.
|
elements[0] = 123;
|
num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
|
ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes));
|
ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
|
|
// Close the consumer.
|
CloseConsumer();
|
|
// It should now be never-writable.
|
hss = MojoHandleSignalsState();
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WaitForSignals(producer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
|
ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
|
ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
|
}
|
|
TEST_F(DataPipeTest, PeerClosedProducerWaiting) {
|
const MojoCreateDataPipeOptions options = {
|
kSizeOfOptions, // |struct_size|.
|
MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
|
static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
|
2 * sizeof(int32_t) // |capacity_num_bytes|.
|
};
|
ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
|
MojoHandleSignalsState hss;
|
|
// Close the consumer.
|
CloseConsumer();
|
|
// It should be signaled.
|
hss = MojoHandleSignalsState();
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WaitForSignals(producer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
|
ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
|
ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
|
}
|
|
TEST_F(DataPipeTest, PeerClosedConsumerWaiting) {
|
const MojoCreateDataPipeOptions options = {
|
kSizeOfOptions, // |struct_size|.
|
MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
|
static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
|
2 * sizeof(int32_t) // |capacity_num_bytes|.
|
};
|
ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
|
MojoHandleSignalsState hss;
|
|
// Close the producer.
|
CloseProducer();
|
|
// It should be signaled.
|
hss = MojoHandleSignalsState();
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
|
ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
|
ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
|
}
|
|
TEST_F(DataPipeTest, BasicConsumerWaiting) {
|
const MojoCreateDataPipeOptions options = {
|
kSizeOfOptions, // |struct_size|.
|
MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
|
static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
|
1000 * sizeof(int32_t) // |capacity_num_bytes|.
|
};
|
ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
|
MojoHandleSignalsState hss;
|
|
// Never writable.
|
hss = MojoHandleSignalsState();
|
ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
|
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_WRITABLE, &hss));
|
EXPECT_EQ(0u, hss.satisfied_signals);
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
|
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
|
hss.satisfiable_signals);
|
|
// Write two elements.
|
int32_t elements[2] = {123, 456};
|
uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
|
ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
|
|
// Wait for readability.
|
hss = MojoHandleSignalsState();
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
|
hss.satisfied_signals);
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
|
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
|
hss.satisfiable_signals);
|
|
// Discard one element.
|
num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
|
ASSERT_EQ(MOJO_RESULT_OK, DiscardData(&num_bytes, true));
|
ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
|
|
// Should still be readable.
|
hss = MojoHandleSignalsState();
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
|
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
|
hss.satisfiable_signals);
|
|
// Peek one element.
|
elements[0] = -1;
|
elements[1] = -1;
|
num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
|
ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true, true));
|
ASSERT_EQ(456, elements[0]);
|
ASSERT_EQ(-1, elements[1]);
|
|
// Should still be readable.
|
hss = MojoHandleSignalsState();
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
|
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
|
hss.satisfiable_signals);
|
|
// Read one element.
|
elements[0] = -1;
|
elements[1] = -1;
|
num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
|
ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true));
|
ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
|
ASSERT_EQ(456, elements[0]);
|
ASSERT_EQ(-1, elements[1]);
|
|
// Write one element.
|
elements[0] = 789;
|
elements[1] = -1;
|
num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
|
ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
|
|
// Waiting should now succeed.
|
hss = MojoHandleSignalsState();
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
|
hss.satisfied_signals);
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
|
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
|
hss.satisfiable_signals);
|
|
// Close the producer.
|
CloseProducer();
|
|
// Should still be readable.
|
hss = MojoHandleSignalsState();
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
|
EXPECT_TRUE(hss.satisfied_signals & (MOJO_HANDLE_SIGNAL_READABLE |
|
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE));
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
|
hss.satisfiable_signals);
|
|
// Wait for the peer closed signal.
|
hss = MojoHandleSignalsState();
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
|
MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
hss.satisfied_signals);
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
|
hss.satisfiable_signals);
|
|
// Read one element.
|
elements[0] = -1;
|
elements[1] = -1;
|
num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
|
ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true));
|
ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
|
ASSERT_EQ(789, elements[0]);
|
ASSERT_EQ(-1, elements[1]);
|
|
// Should be never-readable.
|
hss = MojoHandleSignalsState();
|
ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
|
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
|
ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
|
ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
|
}
|
|
TEST_F(DataPipeTest, ConsumerNewDataReadable) {
|
const MojoCreateDataPipeOptions create_options = {
|
kSizeOfOptions, // |struct_size|.
|
MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
|
static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
|
1000 * sizeof(int32_t) // |capacity_num_bytes|.
|
};
|
EXPECT_EQ(MOJO_RESULT_OK, Create(&create_options));
|
|
int32_t elements[2] = {123, 456};
|
uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
|
EXPECT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
|
|
// The consumer handle should appear to be readable and have new data.
|
EXPECT_EQ(MOJO_RESULT_OK,
|
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE));
|
EXPECT_TRUE(GetSignalsState(consumer_).satisfied_signals &
|
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE);
|
|
// Now try to read a minimum of 6 elements.
|
int32_t read_elements[6];
|
uint32_t num_read_bytes = sizeof(read_elements);
|
MojoReadDataOptions read_options;
|
read_options.struct_size = sizeof(read_options);
|
read_options.flags = MOJO_READ_DATA_FLAG_ALL_OR_NONE;
|
EXPECT_EQ(
|
MOJO_RESULT_OUT_OF_RANGE,
|
MojoReadData(consumer_, &read_options, read_elements, &num_read_bytes));
|
|
// The consumer should still appear to be readable but not with new data.
|
EXPECT_TRUE(GetSignalsState(consumer_).satisfied_signals &
|
MOJO_HANDLE_SIGNAL_READABLE);
|
EXPECT_FALSE(GetSignalsState(consumer_).satisfied_signals &
|
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE);
|
|
// Write four more elements.
|
EXPECT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
|
EXPECT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
|
|
// The consumer handle should once again appear to be readable.
|
EXPECT_EQ(MOJO_RESULT_OK,
|
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE));
|
|
// Try again to read a minimum of 6 elements. Should succeed this time.
|
EXPECT_EQ(MOJO_RESULT_OK, MojoReadData(consumer_, &read_options,
|
read_elements, &num_read_bytes));
|
|
// And now the consumer is unreadable.
|
EXPECT_FALSE(GetSignalsState(consumer_).satisfied_signals &
|
MOJO_HANDLE_SIGNAL_READABLE);
|
EXPECT_FALSE(GetSignalsState(consumer_).satisfied_signals &
|
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE);
|
}
|
|
// Test with two-phase APIs and also closing the producer with an active
|
// consumer waiter.
|
TEST_F(DataPipeTest, ConsumerWaitingTwoPhase) {
|
const MojoCreateDataPipeOptions options = {
|
kSizeOfOptions, // |struct_size|.
|
MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
|
static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
|
1000 * sizeof(int32_t) // |capacity_num_bytes|.
|
};
|
ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
|
MojoHandleSignalsState hss;
|
|
// Write two elements.
|
int32_t* elements = nullptr;
|
void* buffer = nullptr;
|
// Request room for three (but we'll only write two).
|
uint32_t num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
|
ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&buffer, &num_bytes));
|
EXPECT_TRUE(buffer);
|
EXPECT_GE(num_bytes, static_cast<uint32_t>(3u * sizeof(elements[0])));
|
elements = static_cast<int32_t*>(buffer);
|
elements[0] = 123;
|
elements[1] = 456;
|
ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(2u * sizeof(elements[0])));
|
|
// Wait for readability.
|
hss = MojoHandleSignalsState();
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
|
hss.satisfied_signals);
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
|
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
|
hss.satisfiable_signals);
|
|
// Read one element.
|
// Two should be available, but only read one.
|
const void* read_buffer = nullptr;
|
ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes));
|
EXPECT_TRUE(read_buffer);
|
ASSERT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
|
const int32_t* read_elements = static_cast<const int32_t*>(read_buffer);
|
ASSERT_EQ(123, read_elements[0]);
|
ASSERT_EQ(MOJO_RESULT_OK, EndReadData(1u * sizeof(elements[0])));
|
|
// Should still be readable.
|
hss = MojoHandleSignalsState();
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
|
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
|
hss.satisfiable_signals);
|
|
// Read one element.
|
// Request three, but not in all-or-none mode.
|
read_buffer = nullptr;
|
num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
|
ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes));
|
EXPECT_TRUE(read_buffer);
|
ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
|
read_elements = static_cast<const int32_t*>(read_buffer);
|
ASSERT_EQ(456, read_elements[0]);
|
ASSERT_EQ(MOJO_RESULT_OK, EndReadData(1u * sizeof(elements[0])));
|
|
// Close the producer.
|
CloseProducer();
|
|
// Should be never-readable.
|
hss = MojoHandleSignalsState();
|
ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
|
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
|
ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
|
ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
|
}
|
|
// Tests that data pipes aren't writable/readable during two-phase writes/reads.
|
TEST_F(DataPipeTest, BasicTwoPhaseWaiting) {
|
const MojoCreateDataPipeOptions options = {
|
kSizeOfOptions, // |struct_size|.
|
MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
|
static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
|
1000 * sizeof(int32_t) // |capacity_num_bytes|.
|
};
|
ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
|
MojoHandleSignalsState hss;
|
|
// It should be writable.
|
hss = GetSignalsState(producer_);
|
ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
|
ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
|
hss.satisfiable_signals);
|
|
uint32_t num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
|
void* write_ptr = nullptr;
|
ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
|
EXPECT_TRUE(write_ptr);
|
EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t)));
|
|
// At this point, it shouldn't be writable.
|
hss = GetSignalsState(producer_);
|
ASSERT_EQ(0u, hss.satisfied_signals);
|
ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
|
hss.satisfiable_signals);
|
|
// It shouldn't be readable yet either (we'll wait later).
|
hss = GetSignalsState(consumer_);
|
ASSERT_EQ(0u, hss.satisfied_signals);
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
|
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
|
hss.satisfiable_signals);
|
|
static_cast<int32_t*>(write_ptr)[0] = 123;
|
ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(1u * sizeof(int32_t)));
|
|
// It should immediately be writable again.
|
hss = GetSignalsState(producer_);
|
ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
|
ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
|
hss.satisfiable_signals);
|
|
// It should become readable.
|
hss = MojoHandleSignalsState();
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
|
hss.satisfied_signals);
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
|
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
|
hss.satisfiable_signals);
|
|
// Start another two-phase write and check that it's readable even in the
|
// middle of it.
|
num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
|
write_ptr = nullptr;
|
ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
|
EXPECT_TRUE(write_ptr);
|
EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t)));
|
|
// It should be readable.
|
hss = MojoHandleSignalsState();
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
|
hss.satisfied_signals);
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
|
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
|
hss.satisfiable_signals);
|
|
// End the two-phase write without writing anything.
|
ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(0u));
|
|
// Start a two-phase read.
|
num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
|
const void* read_ptr = nullptr;
|
ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes));
|
EXPECT_TRUE(read_ptr);
|
ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(int32_t)), num_bytes);
|
|
// At this point, it should still be writable.
|
hss = GetSignalsState(producer_);
|
ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
|
ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
|
hss.satisfiable_signals);
|
|
// But not readable.
|
hss = GetSignalsState(consumer_);
|
ASSERT_EQ(0u, hss.satisfied_signals);
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
|
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
|
hss.satisfiable_signals);
|
|
// End the two-phase read without reading anything.
|
ASSERT_EQ(MOJO_RESULT_OK, EndReadData(0u));
|
|
// It should be readable again.
|
hss = GetSignalsState(consumer_);
|
ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
|
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
|
hss.satisfiable_signals);
|
}
|
|
void Seq(int32_t start, size_t count, int32_t* out) {
|
for (size_t i = 0; i < count; i++)
|
out[i] = start + static_cast<int32_t>(i);
|
}
|
|
TEST_F(DataPipeTest, AllOrNone) {
|
const MojoCreateDataPipeOptions options = {
|
kSizeOfOptions, // |struct_size|.
|
MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
|
static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
|
10 * sizeof(int32_t) // |capacity_num_bytes|.
|
};
|
ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
|
MojoHandleSignalsState hss;
|
|
// Try writing more than the total capacity of the pipe.
|
uint32_t num_bytes = 20u * sizeof(int32_t);
|
int32_t buffer[100];
|
Seq(0, arraysize(buffer), buffer);
|
ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, WriteData(buffer, &num_bytes, true));
|
|
// Should still be empty.
|
num_bytes = ~0u;
|
ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
|
ASSERT_EQ(0u, num_bytes);
|
|
// Write some data.
|
num_bytes = 5u * sizeof(int32_t);
|
Seq(100, arraysize(buffer), buffer);
|
ASSERT_EQ(MOJO_RESULT_OK, WriteData(buffer, &num_bytes, true));
|
ASSERT_EQ(5u * sizeof(int32_t), num_bytes);
|
|
// Wait for data.
|
// TODO(vtl): There's no real guarantee that all the data will become
|
// available at once (except that in current implementations, with reasonable
|
// limits, it will). Eventually, we'll be able to wait for a specified amount
|
// of data to become available.
|
hss = MojoHandleSignalsState();
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
|
hss.satisfied_signals);
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
|
MOJO_HANDLE_SIGNAL_PEER_CLOSED | MOJO_HANDLE_SIGNAL_PEER_REMOTE,
|
hss.satisfiable_signals);
|
|
// Half full.
|
num_bytes = 0u;
|
ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
|
ASSERT_EQ(5u * sizeof(int32_t), num_bytes);
|
|
// Try writing more than the available capacity of the pipe, but less than the
|
// total capacity.
|
num_bytes = 6u * sizeof(int32_t);
|
Seq(200, arraysize(buffer), buffer);
|
ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, WriteData(buffer, &num_bytes, true));
|
|
// Try reading too much.
|
num_bytes = 11u * sizeof(int32_t);
|
memset(buffer, 0xab, sizeof(buffer));
|
ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, ReadData(buffer, &num_bytes, true));
|
int32_t expected_buffer[100];
|
memset(expected_buffer, 0xab, sizeof(expected_buffer));
|
ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
|
|
// Try discarding too much.
|
num_bytes = 11u * sizeof(int32_t);
|
ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, DiscardData(&num_bytes, true));
|
|
// Just a little.
|
num_bytes = 2u * sizeof(int32_t);
|
Seq(300, arraysize(buffer), buffer);
|
ASSERT_EQ(MOJO_RESULT_OK, WriteData(buffer, &num_bytes, true));
|
ASSERT_EQ(2u * sizeof(int32_t), num_bytes);
|
|
// Just right.
|
num_bytes = 3u * sizeof(int32_t);
|
Seq(400, arraysize(buffer), buffer);
|
ASSERT_EQ(MOJO_RESULT_OK, WriteData(buffer, &num_bytes, true));
|
ASSERT_EQ(3u * sizeof(int32_t), num_bytes);
|
|
// TODO(vtl): Hack (see also the TODO above): We can't currently wait for a
|
// specified amount of data to be available, so poll.
|
for (size_t i = 0; i < kMaxPoll; i++) {
|
num_bytes = 0u;
|
ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
|
if (num_bytes >= 10u * sizeof(int32_t))
|
break;
|
|
test::Sleep(test::EpsilonDeadline());
|
}
|
ASSERT_EQ(10u * sizeof(int32_t), num_bytes);
|
|
// Read half.
|
num_bytes = 5u * sizeof(int32_t);
|
memset(buffer, 0xab, sizeof(buffer));
|
ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes, true));
|
ASSERT_EQ(5u * sizeof(int32_t), num_bytes);
|
memset(expected_buffer, 0xab, sizeof(expected_buffer));
|
Seq(100, 5, expected_buffer);
|
ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
|
|
// Try reading too much again.
|
num_bytes = 6u * sizeof(int32_t);
|
memset(buffer, 0xab, sizeof(buffer));
|
ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, ReadData(buffer, &num_bytes, true));
|
memset(expected_buffer, 0xab, sizeof(expected_buffer));
|
ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
|
|
// Try discarding too much again.
|
num_bytes = 6u * sizeof(int32_t);
|
ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, DiscardData(&num_bytes, true));
|
|
// Discard a little.
|
num_bytes = 2u * sizeof(int32_t);
|
ASSERT_EQ(MOJO_RESULT_OK, DiscardData(&num_bytes, true));
|
ASSERT_EQ(2u * sizeof(int32_t), num_bytes);
|
|
// Three left.
|
num_bytes = 0u;
|
ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
|
ASSERT_EQ(3u * sizeof(int32_t), num_bytes);
|
|
// Close the producer, then test producer-closed cases.
|
CloseProducer();
|
|
// Wait.
|
hss = MojoHandleSignalsState();
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
hss.satisfied_signals);
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
hss.satisfiable_signals);
|
|
// Try reading too much; "failed precondition" since the producer is closed.
|
num_bytes = 4u * sizeof(int32_t);
|
memset(buffer, 0xab, sizeof(buffer));
|
ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
|
ReadData(buffer, &num_bytes, true));
|
memset(expected_buffer, 0xab, sizeof(expected_buffer));
|
ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
|
|
// Try discarding too much; "failed precondition" again.
|
num_bytes = 4u * sizeof(int32_t);
|
ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, DiscardData(&num_bytes, true));
|
|
// Read a little.
|
num_bytes = 2u * sizeof(int32_t);
|
memset(buffer, 0xab, sizeof(buffer));
|
ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes, true));
|
ASSERT_EQ(2u * sizeof(int32_t), num_bytes);
|
memset(expected_buffer, 0xab, sizeof(expected_buffer));
|
Seq(400, 2, expected_buffer);
|
ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
|
|
// Discard the remaining element.
|
num_bytes = 1u * sizeof(int32_t);
|
ASSERT_EQ(MOJO_RESULT_OK, DiscardData(&num_bytes, true));
|
ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
|
|
// Empty again.
|
num_bytes = ~0u;
|
ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
|
ASSERT_EQ(0u, num_bytes);
|
}
|
|
// Tests that |ProducerWriteData()| and |ConsumerReadData()| writes and reads,
|
// respectively, as much as possible, even if it may have to "wrap around" the
|
// internal circular buffer. (Note that the two-phase write and read need not do
|
// this.)
|
TEST_F(DataPipeTest, WrapAround) {
|
unsigned char test_data[1000];
|
for (size_t i = 0; i < arraysize(test_data); i++)
|
test_data[i] = static_cast<unsigned char>(i);
|
|
const MojoCreateDataPipeOptions options = {
|
kSizeOfOptions, // |struct_size|.
|
MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
|
1u, // |element_num_bytes|.
|
100u // |capacity_num_bytes|.
|
};
|
|
ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
|
MojoHandleSignalsState hss;
|
|
// Write 20 bytes.
|
uint32_t num_bytes = 20u;
|
ASSERT_EQ(MOJO_RESULT_OK, WriteData(&test_data[0], &num_bytes, true));
|
ASSERT_EQ(20u, num_bytes);
|
|
// Wait for data.
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
|
EXPECT_TRUE(hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE);
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
|
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
|
hss.satisfiable_signals);
|
|
// Read 10 bytes.
|
unsigned char read_buffer[1000] = {0};
|
num_bytes = 10u;
|
ASSERT_EQ(MOJO_RESULT_OK, ReadData(read_buffer, &num_bytes, true));
|
ASSERT_EQ(10u, num_bytes);
|
ASSERT_EQ(0, memcmp(read_buffer, &test_data[0], 10u));
|
|
// Check that a two-phase write can now only write (at most) 80 bytes. (This
|
// checks an implementation detail; this behavior is not guaranteed.)
|
void* write_buffer_ptr = nullptr;
|
num_bytes = 0u;
|
ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_buffer_ptr, &num_bytes));
|
EXPECT_TRUE(write_buffer_ptr);
|
ASSERT_EQ(80u, num_bytes);
|
ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(0));
|
|
size_t total_num_bytes = 0;
|
while (total_num_bytes < 90) {
|
// Wait to write.
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WaitForSignals(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, &hss));
|
ASSERT_EQ(hss.satisfied_signals, MOJO_HANDLE_SIGNAL_WRITABLE);
|
ASSERT_EQ(hss.satisfiable_signals, MOJO_HANDLE_SIGNAL_WRITABLE |
|
MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
MOJO_HANDLE_SIGNAL_PEER_REMOTE);
|
|
// Write as much as we can.
|
num_bytes = 100;
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WriteData(&test_data[20 + total_num_bytes], &num_bytes, false));
|
total_num_bytes += num_bytes;
|
}
|
|
ASSERT_EQ(90u, total_num_bytes);
|
|
num_bytes = 0;
|
ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
|
ASSERT_EQ(100u, num_bytes);
|
|
// Check that a two-phase read can now only read (at most) 90 bytes. (This
|
// checks an implementation detail; this behavior is not guaranteed.)
|
const void* read_buffer_ptr = nullptr;
|
num_bytes = 0;
|
ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &num_bytes));
|
EXPECT_TRUE(read_buffer_ptr);
|
ASSERT_EQ(90u, num_bytes);
|
ASSERT_EQ(MOJO_RESULT_OK, EndReadData(0));
|
|
// Read as much as possible. We should read 100 bytes.
|
num_bytes =
|
static_cast<uint32_t>(arraysize(read_buffer) * sizeof(read_buffer[0]));
|
memset(read_buffer, 0, num_bytes);
|
ASSERT_EQ(MOJO_RESULT_OK, ReadData(read_buffer, &num_bytes));
|
ASSERT_EQ(100u, num_bytes);
|
ASSERT_EQ(0, memcmp(read_buffer, &test_data[10], 100u));
|
}
|
|
// Tests the behavior of writing (simple and two-phase), closing the producer,
|
// then reading (simple and two-phase).
|
TEST_F(DataPipeTest, WriteCloseProducerRead) {
|
const char kTestData[] = "hello world";
|
const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
|
|
const MojoCreateDataPipeOptions options = {
|
kSizeOfOptions, // |struct_size|.
|
MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
|
1u, // |element_num_bytes|.
|
1000u // |capacity_num_bytes|.
|
};
|
ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
|
|
// Write some data, so we'll have something to read.
|
uint32_t num_bytes = kTestDataSize;
|
ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes, false));
|
ASSERT_EQ(kTestDataSize, num_bytes);
|
|
// Write it again, so we'll have something left over.
|
num_bytes = kTestDataSize;
|
ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes, false));
|
ASSERT_EQ(kTestDataSize, num_bytes);
|
|
// Start two-phase write.
|
void* write_buffer_ptr = nullptr;
|
num_bytes = 0u;
|
ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_buffer_ptr, &num_bytes));
|
EXPECT_TRUE(write_buffer_ptr);
|
EXPECT_GT(num_bytes, 0u);
|
|
// TODO(vtl): (See corresponding TODO in TwoPhaseAllOrNone.)
|
for (size_t i = 0; i < kMaxPoll; i++) {
|
num_bytes = 0u;
|
ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
|
if (num_bytes >= 2u * kTestDataSize)
|
break;
|
|
test::Sleep(test::EpsilonDeadline());
|
}
|
ASSERT_EQ(2u * kTestDataSize, num_bytes);
|
|
// Start two-phase read.
|
const void* read_buffer_ptr = nullptr;
|
num_bytes = 0u;
|
ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &num_bytes));
|
EXPECT_TRUE(read_buffer_ptr);
|
ASSERT_EQ(2u * kTestDataSize, num_bytes);
|
|
// Close the producer.
|
CloseProducer();
|
|
// The consumer can finish its two-phase read.
|
ASSERT_EQ(0, memcmp(read_buffer_ptr, kTestData, kTestDataSize));
|
ASSERT_EQ(MOJO_RESULT_OK, EndReadData(kTestDataSize));
|
|
// And start another.
|
read_buffer_ptr = nullptr;
|
num_bytes = 0u;
|
ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &num_bytes));
|
EXPECT_TRUE(read_buffer_ptr);
|
ASSERT_EQ(kTestDataSize, num_bytes);
|
}
|
|
// Tests the behavior of interrupting a two-phase read and write by closing the
|
// consumer.
|
TEST_F(DataPipeTest, TwoPhaseWriteReadCloseConsumer) {
|
const char kTestData[] = "hello world";
|
const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
|
|
const MojoCreateDataPipeOptions options = {
|
kSizeOfOptions, // |struct_size|.
|
MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
|
1u, // |element_num_bytes|.
|
1000u // |capacity_num_bytes|.
|
};
|
ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
|
MojoHandleSignalsState hss;
|
|
// Write some data, so we'll have something to read.
|
uint32_t num_bytes = kTestDataSize;
|
ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes));
|
ASSERT_EQ(kTestDataSize, num_bytes);
|
|
// Start two-phase write.
|
void* write_buffer_ptr = nullptr;
|
num_bytes = 0u;
|
ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_buffer_ptr, &num_bytes));
|
EXPECT_TRUE(write_buffer_ptr);
|
ASSERT_GT(num_bytes, kTestDataSize);
|
|
// Wait for data.
|
// TODO(vtl): (See corresponding TODO in AllOrNone.)
|
hss = MojoHandleSignalsState();
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
|
hss.satisfied_signals);
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
|
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
|
hss.satisfiable_signals);
|
|
// Start two-phase read.
|
const void* read_buffer_ptr = nullptr;
|
num_bytes = 0u;
|
ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &num_bytes));
|
EXPECT_TRUE(read_buffer_ptr);
|
ASSERT_EQ(kTestDataSize, num_bytes);
|
|
// Close the consumer.
|
CloseConsumer();
|
|
// Wait for producer to know that the consumer is closed.
|
hss = MojoHandleSignalsState();
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WaitForSignals(producer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
|
ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
|
ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
|
|
// Actually write some data. (Note: Premature freeing of the buffer would
|
// probably only be detected under ASAN or similar.)
|
memcpy(write_buffer_ptr, kTestData, kTestDataSize);
|
// Note: Even though the consumer has been closed, ending the two-phase
|
// write will report success.
|
ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(kTestDataSize));
|
|
// But trying to write should result in failure.
|
num_bytes = kTestDataSize;
|
ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, WriteData(kTestData, &num_bytes));
|
|
// As will trying to start another two-phase write.
|
write_buffer_ptr = nullptr;
|
num_bytes = 0u;
|
ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
|
BeginWriteData(&write_buffer_ptr, &num_bytes));
|
}
|
|
// Tests the behavior of "interrupting" a two-phase write by closing both the
|
// producer and the consumer.
|
TEST_F(DataPipeTest, TwoPhaseWriteCloseBoth) {
|
const uint32_t kTestDataSize = 15u;
|
|
const MojoCreateDataPipeOptions options = {
|
kSizeOfOptions, // |struct_size|.
|
MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
|
1u, // |element_num_bytes|.
|
1000u // |capacity_num_bytes|.
|
};
|
ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
|
|
// Start two-phase write.
|
void* write_buffer_ptr = nullptr;
|
uint32_t num_bytes = 0u;
|
ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_buffer_ptr, &num_bytes));
|
EXPECT_TRUE(write_buffer_ptr);
|
ASSERT_GT(num_bytes, kTestDataSize);
|
}
|
|
// Tests the behavior of writing, closing the producer, and then reading (with
|
// and without data remaining).
|
TEST_F(DataPipeTest, WriteCloseProducerReadNoData) {
|
const char kTestData[] = "hello world";
|
const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
|
|
const MojoCreateDataPipeOptions options = {
|
kSizeOfOptions, // |struct_size|.
|
MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
|
1u, // |element_num_bytes|.
|
1000u // |capacity_num_bytes|.
|
};
|
ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
|
MojoHandleSignalsState hss;
|
|
// Write some data, so we'll have something to read.
|
uint32_t num_bytes = kTestDataSize;
|
ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes));
|
ASSERT_EQ(kTestDataSize, num_bytes);
|
|
// Close the producer.
|
CloseProducer();
|
|
// Wait. (Note that once the consumer knows that the producer is closed, it
|
// must also know about all the data that was sent.)
|
hss = MojoHandleSignalsState();
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
|
hss.satisfied_signals);
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
|
hss.satisfiable_signals);
|
|
// Peek that data.
|
char buffer[1000];
|
num_bytes = static_cast<uint32_t>(sizeof(buffer));
|
ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes, false, true));
|
ASSERT_EQ(kTestDataSize, num_bytes);
|
ASSERT_EQ(0, memcmp(buffer, kTestData, kTestDataSize));
|
|
// Read that data.
|
memset(buffer, 0, 1000);
|
num_bytes = static_cast<uint32_t>(sizeof(buffer));
|
ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes));
|
ASSERT_EQ(kTestDataSize, num_bytes);
|
ASSERT_EQ(0, memcmp(buffer, kTestData, kTestDataSize));
|
|
// A second read should fail.
|
num_bytes = static_cast<uint32_t>(sizeof(buffer));
|
ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, ReadData(buffer, &num_bytes));
|
|
// A two-phase read should also fail.
|
const void* read_buffer_ptr = nullptr;
|
num_bytes = 0u;
|
ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
|
BeginReadData(&read_buffer_ptr, &num_bytes));
|
|
// Ditto for discard.
|
num_bytes = 10u;
|
ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, DiscardData(&num_bytes));
|
}
|
|
// Test that during a two phase read the memory stays valid even if more data
|
// comes in.
|
TEST_F(DataPipeTest, TwoPhaseReadMemoryStable) {
|
const char kTestData[] = "hello world";
|
const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
|
|
const MojoCreateDataPipeOptions options = {
|
kSizeOfOptions, // |struct_size|.
|
MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
|
1u, // |element_num_bytes|.
|
1000u // |capacity_num_bytes|.
|
};
|
ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
|
MojoHandleSignalsState hss;
|
|
// Write some data.
|
uint32_t num_bytes = kTestDataSize;
|
ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes));
|
ASSERT_EQ(kTestDataSize, num_bytes);
|
|
// Wait for the data.
|
hss = MojoHandleSignalsState();
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
|
hss.satisfied_signals);
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
|
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
|
hss.satisfiable_signals);
|
|
// Begin a two-phase read.
|
const void* read_buffer_ptr = nullptr;
|
uint32_t read_buffer_size = 0u;
|
ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &read_buffer_size));
|
|
// Write more data.
|
const char kExtraData[] = "bye world";
|
const uint32_t kExtraDataSize = static_cast<uint32_t>(sizeof(kExtraData));
|
num_bytes = kExtraDataSize;
|
ASSERT_EQ(MOJO_RESULT_OK, WriteData(kExtraData, &num_bytes));
|
ASSERT_EQ(kExtraDataSize, num_bytes);
|
|
// Close the producer.
|
CloseProducer();
|
|
// Wait. (Note that once the consumer knows that the producer is closed, it
|
// must also have received the extra data).
|
hss = MojoHandleSignalsState();
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
|
hss.satisfiable_signals);
|
|
// Read the two phase memory to check it's still valid.
|
ASSERT_EQ(0, memcmp(read_buffer_ptr, kTestData, kTestDataSize));
|
EndReadData(read_buffer_size);
|
}
|
|
// Test that two-phase reads/writes behave correctly when given invalid
|
// arguments.
|
TEST_F(DataPipeTest, TwoPhaseMoreInvalidArguments) {
|
const MojoCreateDataPipeOptions options = {
|
kSizeOfOptions, // |struct_size|.
|
MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
|
static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
|
10 * sizeof(int32_t) // |capacity_num_bytes|.
|
};
|
ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
|
MojoHandleSignalsState hss;
|
|
// No data.
|
uint32_t num_bytes = 1000u;
|
ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
|
ASSERT_EQ(0u, num_bytes);
|
|
// Try "ending" a two-phase write when one isn't active.
|
ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
|
EndWriteData(1u * sizeof(int32_t)));
|
|
// Wait a bit, to make sure that if a signal were (incorrectly) sent, it'd
|
// have time to propagate.
|
test::Sleep(test::EpsilonDeadline());
|
|
// Still no data.
|
num_bytes = 1000u;
|
ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
|
ASSERT_EQ(0u, num_bytes);
|
|
// Try ending a two-phase write with an invalid amount (too much).
|
num_bytes = 0u;
|
void* write_ptr = nullptr;
|
ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
|
ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
|
EndWriteData(num_bytes + static_cast<uint32_t>(sizeof(int32_t))));
|
|
// But the two-phase write still ended.
|
ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, EndWriteData(0u));
|
|
// Wait a bit (as above).
|
test::Sleep(test::EpsilonDeadline());
|
|
// Still no data.
|
num_bytes = 1000u;
|
ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
|
ASSERT_EQ(0u, num_bytes);
|
|
// Try ending a two-phase write with an invalid amount (not a multiple of the
|
// element size).
|
num_bytes = 0u;
|
write_ptr = nullptr;
|
ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
|
EXPECT_GE(num_bytes, 1u);
|
ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, EndWriteData(1u));
|
|
// But the two-phase write still ended.
|
ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, EndWriteData(0u));
|
|
// Wait a bit (as above).
|
test::Sleep(test::EpsilonDeadline());
|
|
// Still no data.
|
num_bytes = 1000u;
|
ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
|
ASSERT_EQ(0u, num_bytes);
|
|
// Now write some data, so we'll be able to try reading.
|
int32_t element = 123;
|
num_bytes = 1u * sizeof(int32_t);
|
ASSERT_EQ(MOJO_RESULT_OK, WriteData(&element, &num_bytes));
|
|
// Wait for data.
|
// TODO(vtl): (See corresponding TODO in AllOrNone.)
|
hss = MojoHandleSignalsState();
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
|
hss.satisfied_signals);
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
|
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
|
hss.satisfiable_signals);
|
|
// One element available.
|
num_bytes = 0u;
|
ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
|
ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
|
|
// Try "ending" a two-phase read when one isn't active.
|
ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, EndReadData(1u * sizeof(int32_t)));
|
|
// Still one element available.
|
num_bytes = 0u;
|
ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
|
ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
|
|
// Try ending a two-phase read with an invalid amount (too much).
|
num_bytes = 0u;
|
const void* read_ptr = nullptr;
|
ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes));
|
ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
|
EndReadData(num_bytes + static_cast<uint32_t>(sizeof(int32_t))));
|
|
// Still one element available.
|
num_bytes = 0u;
|
ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
|
ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
|
|
// Try ending a two-phase read with an invalid amount (not a multiple of the
|
// element size).
|
num_bytes = 0u;
|
read_ptr = nullptr;
|
ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes));
|
ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
|
ASSERT_EQ(123, static_cast<const int32_t*>(read_ptr)[0]);
|
ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, EndReadData(1u));
|
|
// Still one element available.
|
num_bytes = 0u;
|
ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
|
ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
|
}
|
|
// Test that a producer can be sent over a MP.
|
TEST_F(DataPipeTest, SendProducer) {
|
const char kTestData[] = "hello world";
|
const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
|
|
const MojoCreateDataPipeOptions options = {
|
kSizeOfOptions, // |struct_size|.
|
MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
|
1u, // |element_num_bytes|.
|
1000u // |capacity_num_bytes|.
|
};
|
ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
|
MojoHandleSignalsState hss;
|
|
// Write some data.
|
uint32_t num_bytes = kTestDataSize;
|
ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes));
|
ASSERT_EQ(kTestDataSize, num_bytes);
|
|
// Wait for the data.
|
hss = MojoHandleSignalsState();
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
|
hss.satisfied_signals);
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
|
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
|
hss.satisfiable_signals);
|
|
// Check the data.
|
const void* read_buffer = nullptr;
|
num_bytes = 0u;
|
ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes));
|
ASSERT_EQ(0, memcmp(read_buffer, kTestData, kTestDataSize));
|
EndReadData(num_bytes);
|
|
// Now send the producer over a MP so that it's serialized.
|
MojoHandle pipe0, pipe1;
|
ASSERT_EQ(MOJO_RESULT_OK, MojoCreateMessagePipe(nullptr, &pipe0, &pipe1));
|
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WriteMessageRaw(MessagePipeHandle(pipe0), nullptr, 0, &producer_, 1,
|
MOJO_WRITE_MESSAGE_FLAG_NONE));
|
producer_ = MOJO_HANDLE_INVALID;
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WaitForSignals(pipe1, MOJO_HANDLE_SIGNAL_READABLE, &hss));
|
ASSERT_EQ(MOJO_RESULT_OK, ReadEmptyMessageWithHandles(pipe1, &producer_, 1));
|
|
// Write more data.
|
const char kExtraData[] = "bye world";
|
const uint32_t kExtraDataSize = static_cast<uint32_t>(sizeof(kExtraData));
|
num_bytes = kExtraDataSize;
|
ASSERT_EQ(MOJO_RESULT_OK, WriteData(kExtraData, &num_bytes));
|
ASSERT_EQ(kExtraDataSize, num_bytes);
|
|
// Wait for it.
|
hss = MojoHandleSignalsState();
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
|
hss.satisfied_signals);
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE |
|
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
|
hss.satisfiable_signals);
|
|
// Check the second write.
|
num_bytes = 0u;
|
ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes));
|
ASSERT_EQ(0, memcmp(read_buffer, kExtraData, kExtraDataSize));
|
EndReadData(num_bytes);
|
|
ASSERT_EQ(MOJO_RESULT_OK, MojoClose(pipe0));
|
ASSERT_EQ(MOJO_RESULT_OK, MojoClose(pipe1));
|
}
|
|
// Ensures that if a data pipe consumer whose producer has closed is passed over
|
// a message pipe, the deserialized dispatcher is also marked as having a closed
|
// peer.
|
TEST_F(DataPipeTest, ConsumerWithClosedProducerSent) {
|
const MojoCreateDataPipeOptions options = {
|
kSizeOfOptions, // |struct_size|.
|
MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
|
static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
|
1000 * sizeof(int32_t) // |capacity_num_bytes|.
|
};
|
|
ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
|
|
// We can write to a data pipe handle immediately.
|
int32_t data = 123;
|
uint32_t num_bytes = sizeof(data);
|
ASSERT_EQ(MOJO_RESULT_OK, WriteData(&data, &num_bytes));
|
ASSERT_EQ(MOJO_RESULT_OK, CloseProducer());
|
|
// Now wait for the other side to become readable and to see the peer closed.
|
MojoHandleSignalsState state;
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &state));
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
|
state.satisfied_signals);
|
ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
|
state.satisfiable_signals);
|
|
// Now send the consumer over a MP so that it's serialized.
|
MojoHandle pipe0, pipe1;
|
ASSERT_EQ(MOJO_RESULT_OK, MojoCreateMessagePipe(nullptr, &pipe0, &pipe1));
|
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WriteMessageRaw(MessagePipeHandle(pipe0), nullptr, 0, &consumer_, 1,
|
MOJO_WRITE_MESSAGE_FLAG_NONE));
|
consumer_ = MOJO_HANDLE_INVALID;
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WaitForSignals(pipe1, MOJO_HANDLE_SIGNAL_READABLE, &state));
|
ASSERT_EQ(MOJO_RESULT_OK, ReadEmptyMessageWithHandles(pipe1, &consumer_, 1));
|
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WaitForSignals(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &state));
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
|
state.satisfied_signals);
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE,
|
state.satisfiable_signals);
|
|
int32_t read_data;
|
ASSERT_EQ(MOJO_RESULT_OK, ReadData(&read_data, &num_bytes));
|
ASSERT_EQ(sizeof(read_data), num_bytes);
|
ASSERT_EQ(data, read_data);
|
|
ASSERT_EQ(MOJO_RESULT_OK, MojoClose(pipe0));
|
ASSERT_EQ(MOJO_RESULT_OK, MojoClose(pipe1));
|
}
|
|
bool WriteAllData(MojoHandle producer,
|
const void* elements,
|
uint32_t num_bytes) {
|
for (size_t i = 0; i < kMaxPoll; i++) {
|
// Write as much data as we can.
|
uint32_t write_bytes = num_bytes;
|
MojoResult result =
|
MojoWriteData(producer, elements, &write_bytes, nullptr);
|
if (result == MOJO_RESULT_OK) {
|
num_bytes -= write_bytes;
|
elements = static_cast<const uint8_t*>(elements) + write_bytes;
|
if (num_bytes == 0)
|
return true;
|
} else {
|
EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, result);
|
}
|
|
MojoHandleSignalsState hss = MojoHandleSignalsState();
|
EXPECT_EQ(MOJO_RESULT_OK, test::MojoTestBase::WaitForSignals(
|
producer, MOJO_HANDLE_SIGNAL_WRITABLE, &hss));
|
EXPECT_TRUE(hss.satisfied_signals & MOJO_HANDLE_SIGNAL_WRITABLE);
|
EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED |
|
MOJO_HANDLE_SIGNAL_PEER_REMOTE,
|
hss.satisfiable_signals);
|
}
|
|
return false;
|
}
|
|
// If |expect_empty| is true, expect |consumer| to be empty after reading.
|
bool ReadAllData(MojoHandle consumer,
|
void* elements,
|
uint32_t num_bytes,
|
bool expect_empty) {
|
for (size_t i = 0; i < kMaxPoll; i++) {
|
// Read as much data as we can.
|
uint32_t read_bytes = num_bytes;
|
MojoResult result = MojoReadData(consumer, nullptr, elements, &read_bytes);
|
if (result == MOJO_RESULT_OK) {
|
num_bytes -= read_bytes;
|
elements = static_cast<uint8_t*>(elements) + read_bytes;
|
if (num_bytes == 0) {
|
if (expect_empty) {
|
// Expect no more data.
|
test::Sleep(test::TinyDeadline());
|
MojoReadDataOptions options;
|
options.struct_size = sizeof(options);
|
options.flags = MOJO_READ_DATA_FLAG_QUERY;
|
MojoReadData(consumer, &options, nullptr, &num_bytes);
|
EXPECT_EQ(0u, num_bytes);
|
}
|
return true;
|
}
|
} else {
|
EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, result);
|
}
|
|
MojoHandleSignalsState hss = MojoHandleSignalsState();
|
EXPECT_EQ(MOJO_RESULT_OK, test::MojoTestBase::WaitForSignals(
|
consumer, MOJO_HANDLE_SIGNAL_READABLE, &hss));
|
// Peer could have become closed while we're still waiting for data.
|
EXPECT_TRUE(MOJO_HANDLE_SIGNAL_READABLE & hss.satisfied_signals);
|
EXPECT_TRUE(hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE);
|
EXPECT_TRUE(hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_PEER_CLOSED);
|
}
|
|
return num_bytes == 0;
|
}
|
|
#if !defined(OS_IOS)
|
|
TEST_F(DataPipeTest, Multiprocess) {
|
const uint32_t kTestDataSize =
|
static_cast<uint32_t>(sizeof(kMultiprocessTestData));
|
const MojoCreateDataPipeOptions options = {
|
kSizeOfOptions, // |struct_size|.
|
MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
|
1, // |element_num_bytes|.
|
kMultiprocessCapacity // |capacity_num_bytes|.
|
};
|
ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
|
|
RunTestClient("MultiprocessClient", [&](MojoHandle server_mp) {
|
// Send some data before serialising and sending the data pipe over.
|
// This is the first write so we don't need to use WriteAllData.
|
uint32_t num_bytes = kTestDataSize;
|
ASSERT_EQ(MOJO_RESULT_OK, WriteData(kMultiprocessTestData, &num_bytes,
|
MOJO_WRITE_DATA_FLAG_ALL_OR_NONE));
|
ASSERT_EQ(kTestDataSize, num_bytes);
|
|
// Send child process the data pipe.
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WriteMessageRaw(MessagePipeHandle(server_mp), nullptr, 0,
|
&consumer_, 1, MOJO_WRITE_MESSAGE_FLAG_NONE));
|
|
// Send a bunch of data of varying sizes.
|
uint8_t buffer[100];
|
int seq = 0;
|
for (int i = 0; i < kMultiprocessMaxIter; ++i) {
|
for (uint32_t size = 1; size <= kMultiprocessCapacity; size++) {
|
for (unsigned int j = 0; j < size; ++j)
|
buffer[j] = seq + j;
|
EXPECT_TRUE(WriteAllData(producer_, buffer, size));
|
seq += size;
|
}
|
}
|
|
// Write the test string in again.
|
ASSERT_TRUE(WriteAllData(producer_, kMultiprocessTestData, kTestDataSize));
|
|
// Swap ends.
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WriteMessageRaw(MessagePipeHandle(server_mp), nullptr, 0,
|
&producer_, 1, MOJO_WRITE_MESSAGE_FLAG_NONE));
|
|
// Receive the consumer from the other side.
|
producer_ = MOJO_HANDLE_INVALID;
|
MojoHandleSignalsState hss = MojoHandleSignalsState();
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WaitForSignals(server_mp, MOJO_HANDLE_SIGNAL_READABLE, &hss));
|
ASSERT_EQ(MOJO_RESULT_OK,
|
ReadEmptyMessageWithHandles(server_mp, &consumer_, 1));
|
|
// Read the test string twice. Once for when we sent it, and once for the
|
// other end sending it.
|
for (int i = 0; i < 2; ++i) {
|
EXPECT_TRUE(ReadAllData(consumer_, buffer, kTestDataSize, i == 1));
|
EXPECT_EQ(0, memcmp(buffer, kMultiprocessTestData, kTestDataSize));
|
}
|
|
WriteMessage(server_mp, "quit");
|
|
// Don't have to close the consumer here because it will be done for us.
|
});
|
}
|
|
DEFINE_TEST_CLIENT_TEST_WITH_PIPE(MultiprocessClient, DataPipeTest, client_mp) {
|
const uint32_t kTestDataSize =
|
static_cast<uint32_t>(sizeof(kMultiprocessTestData));
|
|
// Receive the data pipe from the other side.
|
MojoHandle consumer = MOJO_HANDLE_INVALID;
|
MojoHandleSignalsState hss = MojoHandleSignalsState();
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WaitForSignals(client_mp, MOJO_HANDLE_SIGNAL_READABLE, &hss));
|
ASSERT_EQ(MOJO_RESULT_OK,
|
ReadEmptyMessageWithHandles(client_mp, &consumer, 1));
|
|
// Read the initial string that was sent.
|
int32_t buffer[100];
|
EXPECT_TRUE(ReadAllData(consumer, buffer, kTestDataSize, false));
|
EXPECT_EQ(0, memcmp(buffer, kMultiprocessTestData, kTestDataSize));
|
|
// Receive the main data and check it is correct.
|
int seq = 0;
|
uint8_t expected_buffer[100];
|
for (int i = 0; i < kMultiprocessMaxIter; ++i) {
|
for (uint32_t size = 1; size <= kMultiprocessCapacity; ++size) {
|
for (unsigned int j = 0; j < size; ++j)
|
expected_buffer[j] = seq + j;
|
EXPECT_TRUE(ReadAllData(consumer, buffer, size, false));
|
EXPECT_EQ(0, memcmp(buffer, expected_buffer, size));
|
|
seq += size;
|
}
|
}
|
|
// Swap ends.
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WriteMessageRaw(MessagePipeHandle(client_mp), nullptr, 0, &consumer,
|
1, MOJO_WRITE_MESSAGE_FLAG_NONE));
|
|
// Receive the producer from the other side.
|
MojoHandle producer = MOJO_HANDLE_INVALID;
|
hss = MojoHandleSignalsState();
|
ASSERT_EQ(MOJO_RESULT_OK,
|
WaitForSignals(client_mp, MOJO_HANDLE_SIGNAL_READABLE, &hss));
|
ASSERT_EQ(MOJO_RESULT_OK,
|
ReadEmptyMessageWithHandles(client_mp, &producer, 1));
|
|
// Write the test string one more time.
|
EXPECT_TRUE(WriteAllData(producer, kMultiprocessTestData, kTestDataSize));
|
|
// We swapped ends, so close the producer.
|
EXPECT_EQ(MOJO_RESULT_OK, MojoClose(producer));
|
|
// Wait to receive a "quit" message before exiting.
|
EXPECT_EQ("quit", ReadMessage(client_mp));
|
}
|
|
DEFINE_TEST_CLIENT_TEST_WITH_PIPE(WriteAndCloseProducer, DataPipeTest, h) {
|
MojoHandle p;
|
std::string message = ReadMessageWithHandles(h, &p, 1);
|
|
// Write some data to the producer and close it.
|
uint32_t num_bytes = static_cast<uint32_t>(message.size());
|
EXPECT_EQ(MOJO_RESULT_OK,
|
MojoWriteData(p, message.data(), &num_bytes, nullptr));
|
EXPECT_EQ(num_bytes, static_cast<uint32_t>(message.size()));
|
|
// Close the producer before quitting.
|
EXPECT_EQ(MOJO_RESULT_OK, MojoClose(p));
|
|
// Wait for a quit message.
|
EXPECT_EQ("quit", ReadMessage(h));
|
}
|
|
DEFINE_TEST_CLIENT_TEST_WITH_PIPE(ReadAndCloseConsumer, DataPipeTest, h) {
|
MojoHandle c;
|
std::string expected_message = ReadMessageWithHandles(h, &c, 1);
|
|
// Wait for the consumer to become readable.
|
EXPECT_EQ(MOJO_RESULT_OK, WaitForSignals(c, MOJO_HANDLE_SIGNAL_READABLE));
|
|
// Drain the consumer and expect to find the given message.
|
uint32_t num_bytes = static_cast<uint32_t>(expected_message.size());
|
std::vector<char> bytes(expected_message.size());
|
EXPECT_EQ(MOJO_RESULT_OK, MojoReadData(c, nullptr, bytes.data(), &num_bytes));
|
EXPECT_EQ(num_bytes, static_cast<uint32_t>(bytes.size()));
|
|
std::string message(bytes.data(), bytes.size());
|
EXPECT_EQ(expected_message, message);
|
|
EXPECT_EQ(MOJO_RESULT_OK, MojoClose(c));
|
|
// Wait for a quit message.
|
EXPECT_EQ("quit", ReadMessage(h));
|
}
|
|
TEST_F(DataPipeTest, SendConsumerAndCloseProducer) {
|
// Create a new data pipe.
|
MojoHandle p, c;
|
EXPECT_EQ(MOJO_RESULT_OK, MojoCreateDataPipe(nullptr, &p, &c));
|
|
RunTestClient("WriteAndCloseProducer", [&](MojoHandle producer_client) {
|
RunTestClient("ReadAndCloseConsumer", [&](MojoHandle consumer_client) {
|
const std::string kMessage = "Hello, world!";
|
WriteMessageWithHandles(producer_client, kMessage, &p, 1);
|
WriteMessageWithHandles(consumer_client, kMessage, &c, 1);
|
|
WriteMessage(consumer_client, "quit");
|
});
|
|
WriteMessage(producer_client, "quit");
|
});
|
}
|
|
DEFINE_TEST_CLIENT_TEST_WITH_PIPE(CreateAndWrite, DataPipeTest, h) {
|
const MojoCreateDataPipeOptions options = {
|
kSizeOfOptions, // |struct_size|.
|
MOJO_CREATE_DATA_PIPE_FLAG_NONE, // |flags|.
|
1, // |element_num_bytes|.
|
kMultiprocessCapacity // |capacity_num_bytes|.
|
};
|
|
MojoHandle p, c;
|
ASSERT_EQ(MOJO_RESULT_OK, MojoCreateDataPipe(&options, &p, &c));
|
|
const std::string kMessage = "Hello, world!";
|
WriteMessageWithHandles(h, kMessage, &c, 1);
|
|
// Write some data to the producer and close it.
|
uint32_t num_bytes = static_cast<uint32_t>(kMessage.size());
|
EXPECT_EQ(MOJO_RESULT_OK,
|
MojoWriteData(p, kMessage.data(), &num_bytes, nullptr));
|
EXPECT_EQ(num_bytes, static_cast<uint32_t>(kMessage.size()));
|
EXPECT_EQ(MOJO_RESULT_OK, MojoClose(p));
|
|
// Wait for a quit message.
|
EXPECT_EQ("quit", ReadMessage(h));
|
}
|
|
TEST_F(DataPipeTest, CreateInChild) {
|
RunTestClient("CreateAndWrite", [&](MojoHandle child) {
|
MojoHandle c;
|
std::string expected_message = ReadMessageWithHandles(child, &c, 1);
|
|
// Wait for the consumer to become readable.
|
EXPECT_EQ(MOJO_RESULT_OK, WaitForSignals(c, MOJO_HANDLE_SIGNAL_READABLE));
|
|
// Drain the consumer and expect to find the given message.
|
uint32_t num_bytes = static_cast<uint32_t>(expected_message.size());
|
std::vector<char> bytes(expected_message.size());
|
EXPECT_EQ(MOJO_RESULT_OK,
|
MojoReadData(c, nullptr, bytes.data(), &num_bytes));
|
EXPECT_EQ(num_bytes, static_cast<uint32_t>(bytes.size()));
|
|
std::string message(bytes.data(), bytes.size());
|
EXPECT_EQ(expected_message, message);
|
|
EXPECT_EQ(MOJO_RESULT_OK, MojoClose(c));
|
WriteMessage(child, "quit");
|
});
|
}
|
|
DEFINE_TEST_CLIENT_TEST_WITH_PIPE(DataPipeStatusChangeInTransitClient,
|
DataPipeTest,
|
parent) {
|
// This test verifies that peer closure is detectable through various
|
// mechanisms when it races with handle transfer.
|
|
MojoHandle handles[6];
|
EXPECT_EQ("o_O", ReadMessageWithHandles(parent, handles, 6));
|
MojoHandle* producers = &handles[0];
|
MojoHandle* consumers = &handles[3];
|
|
// Wait on producer 0
|
EXPECT_EQ(MOJO_RESULT_OK,
|
WaitForSignals(producers[0], MOJO_HANDLE_SIGNAL_PEER_CLOSED));
|
|
// Wait on consumer 0
|
EXPECT_EQ(MOJO_RESULT_OK,
|
WaitForSignals(consumers[0], MOJO_HANDLE_SIGNAL_PEER_CLOSED));
|
|
base::MessageLoop message_loop;
|
|
// Wait on producer 1 and consumer 1 using SimpleWatchers.
|
{
|
base::RunLoop run_loop;
|
int count = 0;
|
auto callback = base::Bind(
|
[](base::RunLoop* loop, int* count, MojoResult result) {
|
EXPECT_EQ(MOJO_RESULT_OK, result);
|
if (++*count == 2)
|
loop->Quit();
|
},
|
&run_loop, &count);
|
SimpleWatcher producer_watcher(FROM_HERE,
|
SimpleWatcher::ArmingPolicy::AUTOMATIC,
|
base::SequencedTaskRunnerHandle::Get());
|
SimpleWatcher consumer_watcher(FROM_HERE,
|
SimpleWatcher::ArmingPolicy::AUTOMATIC,
|
base::SequencedTaskRunnerHandle::Get());
|
producer_watcher.Watch(Handle(producers[1]), MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
callback);
|
consumer_watcher.Watch(Handle(consumers[1]), MOJO_HANDLE_SIGNAL_PEER_CLOSED,
|
callback);
|
run_loop.Run();
|
EXPECT_EQ(2, count);
|
}
|
|
// Wait on producer 2 by polling with MojoWriteData.
|
MojoResult result;
|
do {
|
uint32_t num_bytes = 0;
|
result = MojoWriteData(producers[2], nullptr, &num_bytes, nullptr);
|
} while (result == MOJO_RESULT_OK);
|
EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
|
|
// Wait on consumer 2 by polling with MojoReadData.
|
do {
|
char byte;
|
uint32_t num_bytes = 1;
|
result = MojoReadData(consumers[2], nullptr, &byte, &num_bytes);
|
} while (result == MOJO_RESULT_SHOULD_WAIT);
|
EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
|
|
for (size_t i = 0; i < 6; ++i)
|
CloseHandle(handles[i]);
|
}
|
|
TEST_F(DataPipeTest, StatusChangeInTransit) {
|
MojoHandle producers[6];
|
MojoHandle consumers[6];
|
for (size_t i = 0; i < 6; ++i)
|
CreateDataPipe(&producers[i], &consumers[i], 1);
|
|
RunTestClient("DataPipeStatusChangeInTransitClient", [&](MojoHandle child) {
|
MojoHandle handles[] = {producers[0], producers[1], producers[2],
|
consumers[3], consumers[4], consumers[5]};
|
|
// Send 3 producers and 3 consumers, and let their transfer race with their
|
// peers' closure.
|
WriteMessageWithHandles(child, "o_O", handles, 6);
|
|
for (size_t i = 0; i < 3; ++i)
|
CloseHandle(consumers[i]);
|
for (size_t i = 3; i < 6; ++i)
|
CloseHandle(producers[i]);
|
});
|
}
|
|
#endif // !defined(OS_IOS)
|
|
} // namespace
|
} // namespace core
|
} // namespace mojo
|