使用 Wangle 来构建 modern C++ 服务器
原文地址: https://medium.com/hacker-daily/writing-high-performance-servers-in-modern-c-7cd00926828#.j5bditvpp
首先,感谢我第一篇博客的反馈 – Starting a tech startup with C++
我将展示如何用 48 行代码构建一个 C++ 高性能异步服务器。
我在之前的文章里提到我能够用 Facebook 的 Wangle 在一天内构建一个数据库引擎的原型。这篇文章将讲讲我怎么做到。在本文结尾,你能够完成一个基于 wangle 的高性能 C++ 服务器。这篇文章也将成为 Wangle 的 README.md 里面的教程。
我将展示怎么使用 modern C++ 写一个 echo server,(echo server)在分布式系统中相当于 “hello world”. 服务器响应每条 message 的方式是将该 message 原封返回。我们也将写一个客户端来向 echo server 发送 messages. 可以在这里找到源代码。
Wangle 是构建异步,事件驱动的 modern C++ 服务的 client/server 应用框架。 Wangle 的基本抽象是 Pipeline . 一旦完全理解了这种抽象,你讲能够写出各种复杂的 modern C++ 服务。另一个重要的抽象是 Service , Service 是 pipeline 的高级版本。但是,本文不讨论 Service.
Pipeline
Pipeline 是 Wangle 里最重要的也是最强大的抽象。Pipeline 为自定义 requests 和 responses 如何被 Services 处理提供了无与伦比的灵活性。
Pipeline 是一系列的 request/response handlers. 我认为这里的 Pipeline 就像制造业工厂的生产线一样。在一条按顺序工作的生产线上,每个工人收到一个物件,完成一个工序,把它给到下一个工人那里,直到完成制造。这个比喻其实不恰当,它只能朝一个方向前进而不能反向将制成品变回原材料,而 Pipeline 可以这么反方向做。
一个 Wangle handler 可以处理下游(处理响应)也可以处理上游(处理请求)事件。只要将所有的 handlers 连在一起,那就可以非常灵活地将原始数据流(raw data stream)转换成需要的信息类型(message type, 比如 class)或者反方向,将需要的信息类型转换成原始数据流。
举个🌰, 在 echo service 的 pipeline 里,我们可以建立一个包含下面几个 handler 的 pipeline.
-
Handler 1
上游(Upstream):从 socket 里接收一个原始的二进制数据,将其读进一个零拷贝字节的缓冲器,然后发送到 handler 2.
下游(Downstream):接收零拷贝字节的缓冲器的数据,将其内容从 socket 发送。 -
Handler 2
上游:接收来自 handler 1 的零拷贝字节的缓冲器,将字节缓冲器解码成 string,并将 string 发送到 handler 3.
下游:接收来自 handler 3 的 string, 将 std::string 编码成零拷贝字节的缓冲器,并发送到 handler 1. -
Handler 3
上游:接收来自 handler 2 的 string,并通过 Pipeline 发送回客户端。这里 handler 3 将 string 发送回 handler 2.
下游:从上游接收 std::string 并传递给 handler 2.
这里有一个重要的地方就是每一个 handler 能且只能做一件事。如果你有一个 handler 处理多于一个函数,像直接将原始数据流解码成 string,那么你需要将其分解成几个 handler. 这对于最大化可维护性和灵活性很重要。
handler 是线程不安全的,所以千万不要加任何不由 mutex/atomic lock 等守护的共享态。如果想要线程安全的容器,请使用 Folly 无锁数据结构,它是 Wangle 的依赖,而且速度超快,引入它也是很简单的。
现在不要担心 — 当看到实际代码时,它自然就清晰多了。
echo server
下面展示怎么用 Wangle 来写第一个 C++ echo server. 请先安装 Wangle. 不过 Wangle 不能在 OSX 上 build,所以我推荐使用带图形界面的 Ubuntu 14.04 来安装 Wangle.
这里是 echo handler,接收 string,通过 stdout 打印,并在 Pipeline 里发送回下游。加入一个行定界符很重要,因为我们的 Pipeline 将使用行解码器(line decoder)— 将字节缓冲器分解成行分割的字节缓冲器。
EchoHandler.cpp
// the main logic of our echo server; receives a string and writes it straight
// back
class EchoHandler : public HandlerAdapter<std::string> {
public:
virtual void read(Context* ctx, std::string msg) override {
std::cout << "handling " << msg << std::endl;
write(ctx, msg + "\r\n");
}
};
这是 Pipeline 里的 final handler. 我们需要新建一个 PipelineFactory ,并在这里定义如何处理请求和响应。
EchoPipelineFactory.cpp
// where we define the chain of handlers for each messeage received
class EchoPipelineFactory : public PipelineFactory<EchoPipeline> {
public:
EchoPipeline::Ptr newPipeline(std::shared_ptr<AsyncTransportWrapper> sock) {
auto pipeline = EchoPipeline::create();
pipeline->addBack(AsyncSocketHandler(sock));
pipeline->addBack(LineBasedFrameDecoder(8192));
pipeline->addBack(StringCodec());
pipeline->addBack(EchoHandler());
pipeline->finalize();
return pipeline;
}
};
一定要严格按照上面的顺序来将几个 handler 加入到 pipeline
这里的 Pipeline 有 4 个 handlers :
-
AsyncSocketHandler:
上游:从 socket 中读取原始数据流,并将其变成零拷贝字节缓冲器。
下游:将零拷贝字节缓冲器的内容从下属的 socket 发出。 -
LineBasedFrameDecoder:
上游:接收零拷贝字节缓冲器,并在行尾分割。
上游:将零拷贝字节缓冲器发送给 AsyncSocketHandler -
StringCodec:
上游:接收零拷贝字节缓冲器并解码成 std::string 发送给 EchoHandler.
下游:接收 std::string 并编码成零拷贝字节缓冲器,发送给 LineBasedFrameDecoder. -
EchoHandler:
上游:接收 std::stirng 并将其写入到 pipeline,Pipeline 将把 message 发送到下游。
下游:接收 std::string 并转发给 StringCodec.
那么现在要做就是将 Pipeline factory 塞进 ServerBootStrap, 好了。绑定端口并等它停止。
EchoServer.cpp
#include <gflags/gflags.h>
#include <wangle/bootstrap/ServerBootstrap.h>
#include <wangle/channel/AsyncSocketHandler.h>
#include <wangle/codec/LineBasedFrameDecoder.h>
#include <wangle/codec/StringCodec.h>
using namespace folly;
using namespace wangle;
DEFINE_int32(port, 8080, "echo server port");
typedef Pipeline<IOBufQueue&, std::string> EchoPipeline;
// the main logic of our echo server; receives a string and writes it straight
// back
class EchoHandler : public HandlerAdapter<std::string> {
public:
virtual void read(Context* ctx, std::string msg) override {
std::cout << "handling " << msg << std::endl;
write(ctx, msg + "\r\n");
}
};
// where we define the chain of handlers for each messeage received
class EchoPipelineFactory : public PipelineFactory<EchoPipeline> {
public:
EchoPipeline::Ptr newPipeline(std::shared_ptr<AsyncTransportWrapper> sock) {
auto pipeline = EchoPipeline::create();
pipeline->addBack(AsyncSocketHandler(sock));
pipeline->addBack(LineBasedFrameDecoder(8192));
pipeline->addBack(StringCodec());
pipeline->addBack(EchoHandler());
pipeline->finalize();
return pipeline;
}
};
int main(int argc, char** argv) {
google::ParseCommandLineFlags(&argc, &argv, true);
ServerBootstrap<EchoPipeline> server;
server.childPipeline(std::make_shared<EchoPipelineFactory>());
server.bind(FLAGS_port);
server.waitForStop();
return 0;
}
这样使用 48 行代码就完成了一个高性能异步的 C++ 服务器。
Echo Client
echo client 和 echo server 的代码很相似。
EchoClientHandler.cpp
// the handler for receiving messages back from the server
class EchoHandler : public HandlerAdapter<std::string> {
public:
virtual void read(Context* ctx, std::string msg) override {
std::cout << "received back: " << msg;
}
virtual void readException(Context* ctx, exception_wrapper e) override {
std::cout << exceptionStr(e) << std::endl;
close(ctx);
}
virtual void readEOF(Context* ctx) override {
std::cout << "EOF received :(" << std::endl;
close(ctx);
}
};
这里重写 readException 和 readEOF 函数。还有一些其他的函数可以重写。如果需要处理特定的事件,重写相应的虚函数就行了。
这里是 client 的 Pipeline factory. 这里和 EventBaseHandler 分离出的 server 端的 pipeline factory 基本一致, EventBaseHandler 从事件循环线程中处理写入数据。
EchoClientPipeline.cpp
// chains the handlers together to define the response pipeline
class EchoPipelineFactory : public PipelineFactory<EchoPipeline> {
public:
EchoPipeline::Ptr newPipeline(std::shared_ptr<AsyncTransportWrapper> sock) {
auto pipeline = EchoPipeline::create();
pipeline->addBack(AsyncSocketHandler(sock));
pipeline->addBack(
EventBaseHandler()); // ensure we can write from any thread
pipeline->addBack(LineBasedFrameDecoder(8192, false));
pipeline->addBack(StringCodec());
pipeline->addBack(EchoHandler());
pipeline->finalize();
return pipeline;
}
};
完整的 client 端的代码如下:
EchoClient.cpp
#include <gflags/gflags.h>
#include <iostream>
#include <wangle/bootstrap/ClientBootstrap.h>
#include <wangle/channel/AsyncSocketHandler.h>
#include <wangle/channel/EventBaseHandler.h>
#include <wangle/codec/LineBasedFrameDecoder.h>
#include <wangle/codec/StringCodec.h>
using namespace folly;
using namespace wangle;
DEFINE_int32(port, 8080, "echo server port");
DEFINE_string(host, "::1", "echo server address");
typedef Pipeline<folly::IOBufQueue&, std::string> EchoPipeline;
// the handler for receiving messages back from the server
class EchoHandler : public HandlerAdapter<std::string> {
public:
virtual void read(Context* ctx, std::string msg) override {
std::cout << "received back: " << msg;
}
virtual void readException(Context* ctx, exception_wrapper e) override {
std::cout << exceptionStr(e) << std::endl;
close(ctx);
}
virtual void readEOF(Context* ctx) override {
std::cout << "EOF received :(" << std::endl;
close(ctx);
}
};
// chains the handlers together to define the response pipeline
class EchoPipelineFactory : public PipelineFactory<EchoPipeline> {
public:
EchoPipeline::Ptr newPipeline(std::shared_ptr<AsyncTransportWrapper> sock) {
auto pipeline = EchoPipeline::create();
pipeline->addBack(AsyncSocketHandler(sock));
pipeline->addBack(
EventBaseHandler()); // ensure we can write from any thread
pipeline->addBack(LineBasedFrameDecoder(8192, false));
pipeline->addBack(StringCodec());
pipeline->addBack(EchoHandler());
pipeline->finalize();
return pipeline;
}
};
int main(int argc, char** argv) {
google::ParseCommandLineFlags(&argc, &argv, true);
ClientBootstrap<EchoPipeline> client;
client.group(std::make_shared<wangle::IOThreadPoolExecutor>(1));
client.pipelineFactory(std::make_shared<EchoPipelineFactory>());
auto pipeline = client.connect(SocketAddress(FLAGS_host, FLAGS_port)).get();
try {
while (true) {
std::string line;
std::getline(std::cin, line);
if (line == "") {
break;
}
pipeline->write(line + "\r\n").get();
if (line == "bye") {
pipeline->close();
break;
}
}
} catch (const std::exception& e) {
std::cout << exceptionStr(e) << std::endl;
}
return 0;
}
它在一个 loop 中读取 stdin 的输入并写入到 Pipeline, 在响应被处理前它都是阻塞的。
Summary
上述就是使用 Wangle 来写一个基本的高性能 modern C++ 服务器。你应该了解 Wangle 的基础并有信心来用 C++ 写自己的服务。强烈推荐深入理解 Wangle 里的 Service 抽象并用来构建复杂的 Server.