kicad/thirdparty/pybind11/tests/test_iostream.cpp

119 lines
3.3 KiB
C++

/*
tests/test_iostream.cpp -- Usage of scoped_output_redirect
Copyright (c) 2017 Henry F. Schreiner
All rights reserved. Use of this source code is governed by a
BSD-style license that can be found in the LICENSE file.
*/
#if defined(_MSC_VER) && _MSC_VER < 1910 // VS 2015's MSVC
# pragma warning(disable: 4702) // unreachable code in system header (xatomic.h(382))
#endif
#include <pybind11/iostream.h>
#include "pybind11_tests.h"
#include <atomic>
#include <iostream>
#include <thread>
void noisy_function(std::string msg, bool flush) {
std::cout << msg;
if (flush)
std::cout << std::flush;
}
void noisy_funct_dual(std::string msg, std::string emsg) {
std::cout << msg;
std::cerr << emsg;
}
// object to manage C++ thread
// simply repeatedly write to std::cerr until stopped
// redirect is called at some point to test the safety of scoped_estream_redirect
struct TestThread {
TestThread() : t_{nullptr}, stop_{false} {
auto thread_f = [this] {
while (!stop_) {
std::cout << "x" << std::flush;
std::this_thread::sleep_for(std::chrono::microseconds(50));
} };
t_ = new std::thread(std::move(thread_f));
}
~TestThread() {
delete t_;
}
void stop() { stop_ = true; }
void join() {
py::gil_scoped_release gil_lock;
t_->join();
}
void sleep() {
py::gil_scoped_release gil_lock;
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
std::thread * t_;
std::atomic<bool> stop_;
};
TEST_SUBMODULE(iostream, m) {
add_ostream_redirect(m);
// test_evals
m.def("captured_output_default", [](std::string msg) {
py::scoped_ostream_redirect redir;
std::cout << msg << std::flush;
});
m.def("captured_output", [](std::string msg) {
py::scoped_ostream_redirect redir(std::cout, py::module_::import("sys").attr("stdout"));
std::cout << msg << std::flush;
});
m.def("guard_output", &noisy_function,
py::call_guard<py::scoped_ostream_redirect>(),
py::arg("msg"), py::arg("flush")=true);
m.def("captured_err", [](std::string msg) {
py::scoped_ostream_redirect redir(std::cerr, py::module_::import("sys").attr("stderr"));
std::cerr << msg << std::flush;
});
m.def("noisy_function", &noisy_function, py::arg("msg"), py::arg("flush") = true);
m.def("dual_guard", &noisy_funct_dual,
py::call_guard<py::scoped_ostream_redirect, py::scoped_estream_redirect>(),
py::arg("msg"), py::arg("emsg"));
m.def("raw_output", [](std::string msg) {
std::cout << msg << std::flush;
});
m.def("raw_err", [](std::string msg) {
std::cerr << msg << std::flush;
});
m.def("captured_dual", [](std::string msg, std::string emsg) {
py::scoped_ostream_redirect redirout(std::cout, py::module_::import("sys").attr("stdout"));
py::scoped_ostream_redirect redirerr(std::cerr, py::module_::import("sys").attr("stderr"));
std::cout << msg << std::flush;
std::cerr << emsg << std::flush;
});
py::class_<TestThread>(m, "TestThread")
.def(py::init<>())
.def("stop", &TestThread::stop)
.def("join", &TestThread::join)
.def("sleep", &TestThread::sleep);
}