/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
|
|
#include "tensorflow/core/distributed_runtime/rpc/grpc_session.h"
|
|
#include <memory>
|
|
#include "tensorflow/core/common_runtime/device.h"
|
#include "tensorflow/core/debug/debug_io_utils.h"
|
#include "tensorflow/core/distributed_runtime/rpc/grpc_testlib.h"
|
#include "tensorflow/core/framework/graph.pb.h"
|
#include "tensorflow/core/framework/op.h"
|
#include "tensorflow/core/framework/summary.pb.h"
|
#include "tensorflow/core/framework/tensor_testutil.h"
|
#include "tensorflow/core/graph/default_device.h"
|
#include "tensorflow/core/graph/graph.h"
|
#include "tensorflow/core/graph/testlib.h"
|
#include "tensorflow/core/lib/core/status_test_util.h"
|
#include "tensorflow/core/lib/io/path.h"
|
#include "tensorflow/core/lib/strings/strcat.h"
|
#include "tensorflow/core/platform/env.h"
|
#include "tensorflow/core/platform/init_main.h"
|
#include "tensorflow/core/platform/logging.h"
|
#include "tensorflow/core/platform/test.h"
|
#include "tensorflow/core/protobuf/debug.pb.h"
|
#include "tensorflow/core/protobuf/rewriter_config.pb.h"
|
#include "tensorflow/core/public/session.h"
|
#include "tensorflow/core/util/port.h"
|
|
namespace tensorflow {
|
namespace {
|
|
// Builds SessionOptions whose device_count map requests `num_cpus` "CPU"
// devices and `num_gpus` "GPU" devices. All other options keep their defaults.
SessionOptions Devices(int num_cpus, int num_gpus) {
  SessionOptions session_options;
  auto* device_count = session_options.config.mutable_device_count();
  (*device_count)["CPU"] = num_cpus;
  (*device_count)["GPU"] = num_gpus;
  return session_options;
}
|
|
// Populates `graph_def` with a three-node graph computing c = MatMul(a, b),
// where a is the 1x2 float constant {1, 2} and b is the 2x1 float constant
// {2, 1}.
//
// On return, node_names[0], node_names[1] and node_names[2] hold the names of
// a, b and c respectively.
void CreateGraphDef(GraphDef* graph_def, string node_names[3]) {
  Graph graph(OpRegistry::Global());

  // Prepare both constant values before touching the graph.
  Tensor a_tensor(DT_FLOAT, TensorShape({1, 2}));
  test::FillValues<float>(&a_tensor, {1.0, 2.0});

  Tensor b_tensor(DT_FLOAT, TensorShape({2, 1}));
  test::FillValues<float>(&b_tensor, {2.0, 1.0});

  // Nodes are added in the order a, b, c.
  Node* const a = test::graph::Constant(&graph, a_tensor);
  Node* const b = test::graph::Constant(&graph, b_tensor);

  // c = a * b (neither operand transposed).
  Node* const c = test::graph::Matmul(&graph, a, b, false, false);

  node_names[0] = a->name();
  node_names[1] = b->name();
  node_names[2] = c->name();

  test::graph::ToGraphDef(&graph, graph_def);
}
|
|
// Asserts that "val" is a single float tensor. The only float is
|
// "expected_val".
|
void IsSingleFloatValue(const Tensor& val, float expected_val) {
|
ASSERT_EQ(val.dtype(), DT_FLOAT);
|
ASSERT_EQ(val.NumElements(), 1);
|
ASSERT_EQ(val.flat<float>()(0), expected_val);
|
}
|
|
// Builds SessionOptions for connecting to the gRPC server at `target` (given
// without a scheme), using the supplied `placement_period`.
//
// Graph optimizations are set to level L0 and constant folding is switched
// OFF (presumably so the watched constant nodes are left untouched -- confirm).
SessionOptions Options(const string& target, int placement_period) {
  SessionOptions options;

  // NOTE(mrry): GrpcSession requires a grpc:// scheme prefix in the target
  // string.
  options.target = strings::StrCat("grpc://", target);

  auto& config = options.config;
  config.set_placement_period(placement_period);

  auto* graph_options = config.mutable_graph_options();
  graph_options->mutable_optimizer_options()->set_opt_level(
      OptimizerOptions::L0);
  graph_options->mutable_rewrite_options()->set_constant_folding(
      RewriterConfig::OFF);

  return options;
}
|
|
std::unique_ptr<Session> NewRemote(const SessionOptions& options) {
|
return std::unique_ptr<Session>(CHECK_NOTNULL(NewSession(options)));
|
}
|
|
class GrpcSessionDebugTest : public ::testing::Test {
|
protected:
|
void SetUp() override { CreateDumpDir(); }
|
|
void TearDown() override { DeleteDumpDir(); }
|
|
void DeleteDumpDir() {
|
if (Env::Default()->IsDirectory(dump_dir_).ok()) {
|
int64 undeleted_files = 0;
|
int64 undeleted_dirs = 0;
|
ASSERT_TRUE(
|
Env::Default()
|
->DeleteRecursively(dump_dir_, &undeleted_files, &undeleted_dirs)
|
.ok());
|
ASSERT_EQ(0, undeleted_files);
|
ASSERT_EQ(0, undeleted_dirs);
|
}
|
}
|
|
const string GetDebugURL() { return debug_url_; }
|
|
void LoadTensorDumps(const string& subdir, std::vector<Tensor>* tensors) {
|
const string dirpath = io::JoinPath(dump_dir_, subdir);
|
if (!(Env::Default()->IsDirectory(dirpath).ok())) {
|
return;
|
}
|
|
std::vector<string> filenames;
|
TF_ASSERT_OK(Env::Default()->GetChildren(dirpath, &filenames));
|
|
for (const string& filename : filenames) {
|
Event event;
|
TF_ASSERT_OK(ReadEventFromFile(io::JoinPath(dirpath, filename), &event));
|
if (event.summary().value().size() == 1) {
|
Tensor tensor;
|
ASSERT_TRUE(tensor.FromProto(event.summary().value(0).tensor()));
|
tensors->push_back(tensor);
|
}
|
}
|
}
|
|
private:
|
void CreateDumpDir() {
|
char dir_template[] = "/tmp/tfdbg_grpc_sessions_XXXXXX";
|
dump_dir_ = mkdtemp(dir_template);
|
debug_url_ = strings::StrCat("file://", dump_dir_);
|
}
|
|
string dump_dir_;
|
string debug_url_;
|
};
|
|
// Runs c = MatMul(a, b) six times through a remote session while varying the
// set of watched tensors, and checks the tensors dumped through the file://
// debug URL after each run.
TEST_F(GrpcSessionDebugTest, FileDebugURL) {
  GraphDef graph;
  string node_names[3];
  CreateGraphDef(&graph, node_names);

  std::unique_ptr<test::TestCluster> cluster;
  TF_CHECK_OK(test::TestCluster::MakeTestCluster(Devices(1, 0), 2, &cluster));

  auto session = NewRemote(Options(cluster->targets()[0], 1));
  TF_CHECK_OK(session->Create(graph));

  // Watch schedule:
  //   iteration 0    : no watch
  //   iterations 1-2 : watch node_names[0]
  //   iterations 3-4 : watch node_names[0] and node_names[1]
  //   iteration 5    : no watch
  for (size_t i = 0; i < 6; ++i) {
    size_t num_watches = 0;
    if (i >= 1 && i < 5) {
      num_watches = (i >= 3) ? 2 : 1;
    }

    // Debug options are only materialized when there is something to watch.
    RunOptions options;
    for (size_t w = 0; w < num_watches; ++w) {
      DebugTensorWatch* watch =
          options.mutable_debug_options()->add_debug_tensor_watch_opts();
      watch->set_node_name(node_names[w]);
      watch->set_output_slot(0);
      watch->add_debug_ops("DebugIdentity");
      watch->add_debug_urls(GetDebugURL());
    }

    RunMetadata metadata;
    std::vector<Tensor> outputs;
    TF_CHECK_OK(
        session->Run(options, {}, {node_names[2]}, {}, &outputs, &metadata));
    ASSERT_EQ(1, outputs.size());
    // [1, 2] x [2, 1]^T = 1*2 + 2*1 = 4.
    IsSingleFloatValue(outputs[0], 4.0);

    // Dumps are looked up in the "n" subdirectory of the first device's path.
    const string dump_subdir =
        io::JoinPath(DebugNodeKey::DeviceNameToDevicePath(
                         cluster->devices()[0].name()),
                     "n");
    std::vector<Tensor> dumped_tensors;
    LoadTensorDumps(dump_subdir, &dumped_tensors);

    if (num_watches == 0) {
      ASSERT_EQ(0, dumped_tensors.size());
      continue;
    }

    if (num_watches == 1) {
      // Only `a` was watched: expect its 1x2 value {1, 2}.
      ASSERT_EQ(1, dumped_tensors.size());
      ASSERT_EQ(TensorShape({1, 2}), dumped_tensors[0].shape());
      ASSERT_EQ(1.0, dumped_tensors[0].flat<float>()(0));
      ASSERT_EQ(2.0, dumped_tensors[0].flat<float>()(1));
    } else {
      ASSERT_EQ(2, dumped_tensors.size());
    }

    // Remove this run's dumps so the next iteration only counts its own.
    DeleteDumpDir();
  }

  TF_CHECK_OK(session->Close());
}
|
|
// Sets the device of the node called `name` in `graph` to `dev`.
//
// Only the first node with a matching name is updated. If no node matches,
// the process is terminated through LOG(FATAL).
void SetDevice(GraphDef* graph, const string& name, const string& dev) {
  // The generated protobuf accessors (node_size(), node(i), mutable_node(i))
  // use `int`; an int index avoids the signed/unsigned comparison and the
  // implicit narrowing that a size_t index would cause.
  for (int i = 0; i < graph->node_size(); ++i) {
    if (graph->node(i).name() == name) {
      graph->mutable_node(i)->set_device(dev);
      return;
    }
  }
  LOG(FATAL) << "Name '" << name << "' not found.";
}
|
|
// Places a string constant `a` and its Identity `b` on every pair of devices
// in the cluster, runs `b` with a debug watch on `a`, and checks both the
// fetched output and the tensor dumped through the file:// debug URL.
TEST_F(GrpcSessionDebugTest, MultiDevices_String) {
  std::unique_ptr<test::TestCluster> cluster;
  TF_CHECK_OK(test::TestCluster::MakeTestCluster(Devices(1, 1), 2, &cluster));
  auto session = NewRemote(Options(cluster->targets()[0], 1000));

  // b = a, where a is a 2x2 string tensor filled with "hello, world".
  Graph graph(OpRegistry::Global());
  Tensor a_tensor(DT_STRING, TensorShape({2, 2}));
  for (size_t i = 0; i < 4; ++i) {
    a_tensor.flat<string>()(i) = "hello, world";
  }
  Node* a = test::graph::Constant(&graph, a_tensor);
  Node* b = test::graph::Identity(&graph, a);

  GraphDef def;
  test::graph::ToGraphDef(&graph, &def);

  // In this test, we force each node (a, b) on every possible device.
  // We test all possible cases.
  for (const auto& a_dev : cluster->devices()) {
    for (const auto& b_dev : cluster->devices()) {
      LOG(INFO) << "a: " << a_dev.name() << " b: " << b_dev.name();
      SetDevice(&def, a->name(), a_dev.name());
      SetDevice(&def, b->name(), b_dev.name());

      Status s = session->Create(def);
      if (!s.ok()) {
        // CUDA and SYCL devices do not have an Identity op for strings
        LOG(ERROR) << "Error: " << s;
        ASSERT_TRUE((a_dev.device_type() == DEVICE_GPU) ||
                    (a_dev.device_type() == DEVICE_SYCL) ||
                    (b_dev.device_type() == DEVICE_GPU) ||
                    (b_dev.device_type() == DEVICE_SYCL));
        ASSERT_FALSE(s.ok());
        continue;
      }

      // Graph creation succeeded: run `b` while watching output 0 of `a`.
      std::vector<Tensor> outputs;

      RunOptions options;
      DebugTensorWatch* watch =
          options.mutable_debug_options()->add_debug_tensor_watch_opts();
      watch->set_node_name(a->name());
      watch->set_output_slot(0);
      watch->add_debug_ops("DebugIdentity");
      watch->add_debug_urls(GetDebugURL());

      RunMetadata metadata;
      TF_CHECK_OK(
          session->Run(options, {}, {b->name()}, {}, &outputs, &metadata));
      ASSERT_EQ(1, outputs.size());
      ASSERT_EQ(outputs[0].dtype(), DT_STRING);
      ASSERT_EQ(outputs[0].NumElements(), 4);
      for (size_t i = 0; i < outputs[0].NumElements(); ++i) {
        EXPECT_EQ(outputs[0].flat<string>()(i), "hello, world");
      }
      TF_CHECK_OK(session->Close());

      // The watched tensor is looked up under the path of the device that
      // hosts `a`.
      const string dump_subdir = io::JoinPath(
          DebugNodeKey::DeviceNameToDevicePath(a_dev.name()), "n");
      std::vector<Tensor> dumped_tensors;
      LoadTensorDumps(dump_subdir, &dumped_tensors);
      ASSERT_EQ(1, dumped_tensors.size());
      ASSERT_EQ(TensorShape({2, 2}), dumped_tensors[0].shape());
      for (size_t i = 0; i < 4; ++i) {
        ASSERT_EQ("hello, world", dumped_tensors[0].flat<string>()(i));
      }

      // Start the next device combination with an empty dump directory.
      DeleteDumpDir();
    }
  }
}
|
|
} // namespace
|
} // namespace tensorflow
|