Page MenuHomePhabricator

output-streaming-v2.patch

Authored By
tstarling
Apr 9 2015, 1:16 AM
Size
12 KB
Referenced Files
None
Subscribers
None

output-streaming-v2.patch

diff --git a/hphp/runtime/base/execution-context.cpp b/hphp/runtime/base/execution-context.cpp
index f3a9a5b..11146ba 100644
--- a/hphp/runtime/base/execution-context.cpp
+++ b/hphp/runtime/base/execution-context.cpp
@@ -214,6 +214,7 @@ static void safe_stdout(const void *ptr, size_t size) {
}
void ExecutionContext::writeStdout(const char *s, int len) {
+ fflush(stdout);
if (m_stdout == nullptr) {
if (s_stdout_color) {
safe_stdout(s_stdout_color, strlen(s_stdout_color));
@@ -228,6 +229,14 @@ void ExecutionContext::writeStdout(const char *s, int len) {
}
}
+void ExecutionContext::writeTransport(const char *s, int len) {
+ if (m_transport) {
+ m_transport->sendRaw((void*)s, len, 200, false, true);
+ } else {
+ writeStdout(s, len);
+ }
+}
+
size_t ExecutionContext::getStdoutBytesWritten() const {
return m_stdoutBytesWritten;
}
@@ -240,10 +249,10 @@ void ExecutionContext::write(const char *s, int len) {
obFlush();
}
}
+ if (m_implicitFlush) flush();
} else {
- writeStdout(s, len);
+ writeTransport(s, len);
}
- if (m_implicitFlush) flush();
}
///////////////////////////////////////////////////////////////////////////////
@@ -306,7 +315,7 @@ void ExecutionContext::obClean(int handler_flag) {
bool ExecutionContext::obFlush() {
assert(m_protectedLevel >= 0);
- if ((int)m_buffers.size() <= m_protectedLevel) {
+ if (m_buffers.empty()) {
return false;
}
@@ -352,17 +361,19 @@ bool ExecutionContext::obFlush() {
}
str = tout.toString();
} catch (...) {
- writeStdout(str.data(), str.size());
+ writeTransport(str.data(), str.size());
throw;
}
}
- writeStdout(str.data(), str.size());
+ writeTransport(str.data(), str.size());
return true;
}
void ExecutionContext::obFlushAll() {
- while (obFlush()) { obEnd();}
+ do {
+ obFlush();
+ } while (obEnd());
}
bool ExecutionContext::obEnd() {
@@ -431,20 +442,16 @@ Array ExecutionContext::obGetHandlers() {
}
void ExecutionContext::flush() {
- if (m_buffers.empty()) {
- fflush(stdout);
- } else if (RuntimeOption::EnableEarlyFlush && m_protectedLevel &&
- (m_transport == nullptr ||
- (m_transport->getHTTPVersion() == "1.1" &&
- m_transport->getMethod() != Transport::Method::HEAD))) {
+ // No need to do anything if m_buffers is empty, since we already did an
+ // unbuffered write to the FD or transport
+ if (!m_buffers.empty() &&
+ RuntimeOption::EnableEarlyFlush && m_protectedLevel &&
+ (m_transport == nullptr ||
+ (m_transport->getHTTPVersion() == "1.1" &&
+ m_transport->getMethod() != Transport::Method::HEAD))) {
StringBuffer &oss = m_buffers.front().oss;
if (!oss.empty()) {
- if (m_transport) {
- m_transport->sendRaw((void*)oss.data(), oss.size(), 200, false, true);
- } else {
- writeStdout(oss.data(), oss.size());
- fflush(stdout);
- }
+ writeTransport(oss.data(), oss.size());
oss.clear();
}
}
diff --git a/hphp/runtime/base/execution-context.h b/hphp/runtime/base/execution-context.h
index dc82eb2..d74c32a 100644
--- a/hphp/runtime/base/execution-context.h
+++ b/hphp/runtime/base/execution-context.h
@@ -170,6 +170,11 @@ public:
void writeStdout(const char* s, int len);
size_t getStdoutBytesWritten() const;
+ /**
+ * Write to the transport, or to stdout if there is no transport.
+ */
+ void writeTransport(const char* s, int len);
+
using PFUNC_STDOUT = void (*)(const char* s, int len, void* data);
void setStdout(PFUNC_STDOUT func, void* data);
diff --git a/hphp/runtime/base/program-functions.cpp b/hphp/runtime/base/program-functions.cpp
index 1036b59..bb4cd23 100644
--- a/hphp/runtime/base/program-functions.cpp
+++ b/hphp/runtime/base/program-functions.cpp
@@ -1879,7 +1879,7 @@ void hphp_session_init() {
ExecutionContext *hphp_context_init() {
ExecutionContext *context = g_context.getNoCheck();
- context->obStart();
+ context->obStart(uninit_null(), RuntimeOption::ServerChunkSize);
context->obProtect(true);
return context;
}
diff --git a/hphp/runtime/base/runtime-option.cpp b/hphp/runtime/base/runtime-option.cpp
index 7343d60..a3a01d0 100644
--- a/hphp/runtime/base/runtime-option.cpp
+++ b/hphp/runtime/base/runtime-option.cpp
@@ -179,6 +179,7 @@ int RuntimeOption::GzipMaxCompressionLevel = 9;
std::string RuntimeOption::ForceCompressionURL;
std::string RuntimeOption::ForceCompressionCookie;
std::string RuntimeOption::ForceCompressionParam;
+int64_t RuntimeOption::ServerChunkSize = 64000;
bool RuntimeOption::EnableKeepAlive = true;
bool RuntimeOption::ExposeHPHP = true;
bool RuntimeOption::ExposeXFBServer = false;
@@ -1132,6 +1133,8 @@ void RuntimeOption::Load(IniSetting::Map& ini, Hdf& config,
Config::Bind(ForceCompressionParam, ini,
server["ForceCompression"]["Param"]);
+ Config::Bind(ServerChunkSize, ini, server["ChunkSize"], 64000);
+
Config::Bind(EnableKeepAlive, ini, server["EnableKeepAlive"], true);
Config::Bind(ExposeHPHP, ini, server["ExposeHPHP"], true);
Config::Bind(ExposeXFBServer, ini, server["ExposeXFBServer"], false);
diff --git a/hphp/runtime/base/runtime-option.h b/hphp/runtime/base/runtime-option.h
index 57a9aae..4a0abc7 100644
--- a/hphp/runtime/base/runtime-option.h
+++ b/hphp/runtime/base/runtime-option.h
@@ -132,6 +132,7 @@ public:
static std::vector<std::string> ServerWarmupRequests;
static boost::container::flat_set<std::string> ServerHighPriorityEndPoints;
static bool ServerExitOnBindFail;
+ static int64_t ServerChunkSize;
static int PageletServerThreadCount;
static bool PageletServerThreadRoundRobin;
static int PageletServerThreadDropCacheTimeoutSeconds;
diff --git a/hphp/runtime/server/fastcgi/fastcgi-session.cpp b/hphp/runtime/server/fastcgi/fastcgi-session.cpp
index 00bd7fe..e88e97f 100644
--- a/hphp/runtime/server/fastcgi/fastcgi-session.cpp
+++ b/hphp/runtime/server/fastcgi/fastcgi-session.cpp
@@ -323,12 +323,27 @@ void FastCGISession::readErr(const folly::AsyncSocketException&) noexcept {
ioStop();
}
-void FastCGISession::writeErr(size_t,
- const folly::AsyncSocketException&) noexcept {
+void FastCGISession::WriteCallback::writeErr(
+ size_t bytesWritten,
+ const folly::AsyncSocketException& ex
+) noexcept {
+ m_session->writeErr(m_bufferSize, bytesWritten, ex);
+ delete this;
+}
+
+void FastCGISession::WriteCallback::writeSuccess() noexcept {
+ m_session->writeSuccess(m_bufferSize);
+ delete this;
+}
+
+void FastCGISession::writeErr(size_t bufferSize, size_t,
+ const folly::AsyncSocketException&) noexcept {
+ discardWriteBuffer(bufferSize);
ioStop();
}
-void FastCGISession::writeSuccess() noexcept {
+void FastCGISession::writeSuccess(size_t bufferSize) noexcept {
+ discardWriteBuffer(bufferSize);
if (--m_eventCount == 0 && m_shutdown) {
// If we were terminating and this was the last pending event then trigger
// the delete.
@@ -336,16 +351,55 @@ void FastCGISession::writeSuccess() noexcept {
}
}
+void FastCGISession::addWriteBuffer(size_t bufferSize) noexcept {
+ Lock lock(&m_writeSync);
+ m_writeQueueSize += bufferSize;
+}
+
+void FastCGISession::discardWriteBuffer(size_t bufferSize) noexcept {
+ Lock lock(&m_writeSync);
+ size_t oldSize = m_writeQueueSize;
+ m_writeQueueSize -= bufferSize;
+ if (oldSize >= m_maxWriteQueueSize &&
+ m_writeQueueSize < m_maxWriteQueueSize) {
+ m_writeSync.notify();
+ }
+}
+
////////////////////////////////////////////////////////////////////////////////
-void FastCGISession::onStdOut(std::unique_ptr<IOBuf> chain) {
+const int k_maxEventQueueSize = 8;
+
+void FastCGISession::blockingWriteStdOut(std::unique_ptr<IOBuf> chain) {
+ long maxWait = RuntimeOption::ConnectionTimeoutSeconds;
+ if (maxWait <= 0) {
+ maxWait = 50; // this was the default read timeout in LibEventServer
+ }
+
+ {
+ Lock lock(&m_writeSync);
+ while (m_writeQueueSize >= m_maxWriteQueueSize) {
+ m_writeSync.wait(maxWait);
+ }
+ }
+
// FastCGITransport doesn't run in the same event base. Calling into internal
// functions here is unsafe from other threads so we enqueue the work for the
// event base.
+
folly::MoveWrapper<std::unique_ptr<IOBuf>> chain_wrapper(std::move(chain));
- m_eventBase->runInEventBaseThread([this, chain_wrapper]() mutable {
+ auto callback = [this, chain_wrapper]() mutable {
writeStream(fcgi::STDOUT, std::move(*chain_wrapper));
- });
+ };
+
+ // Queueing the work unconditionally can lead to the queue size growing to
+ // hundreds of thousands of entries, if events are not dequeued fast enough.
+ // So clear the queue periodically if it gets too large.
+ if (m_eventBase->getNotificationQueueSize() > k_maxEventQueueSize) {
+ m_eventBase->runInEventBaseThreadAndWait(callback);
+ } else {
+ m_eventBase->runInEventBaseThread(callback);
+ }
}
void FastCGISession::onComplete() {
@@ -432,7 +486,16 @@ void FastCGISession::enqueueWrite(std::unique_ptr<IOBuf> chain) {
return;
}
++m_eventCount;
- m_sock->writeChain(this, std::move(chain));
+
+ uint64_t length = chain->computeChainDataLength();
+ addWriteBuffer(length);
+
+ // Create a callback object to track the chain length, so that we can
+ // account for it when the chain is destroyed. AsyncSocket::destroy() will
+ // call all the pending callbacks before it deletes itself, so the session
+ // pointer should remain valid until the callback is called.
+ WriteCallback * callback = new WriteCallback(this, length);
+ m_sock->writeChain(callback, std::move(chain));
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/hphp/runtime/server/fastcgi/fastcgi-session.h b/hphp/runtime/server/fastcgi/fastcgi-session.h
index 2c9109d..3373fe7 100644
--- a/hphp/runtime/server/fastcgi/fastcgi-session.h
+++ b/hphp/runtime/server/fastcgi/fastcgi-session.h
@@ -178,7 +178,6 @@ private:
struct FastCGISession
: public folly::wangle::ManagedConnection
, private folly::AsyncSocket::ReadCallback
- , private folly::AsyncSocket::WriteCallback
{
FastCGISession(
folly::EventBase* evBase,
@@ -211,8 +210,29 @@ private:
void readErr(const folly::AsyncSocketException&) noexcept override;
// Async write callbacks
- void writeErr(size_t, const folly::AsyncSocketException&) noexcept override;
- void writeSuccess() noexcept override;
+ void writeErr(size_t bufferSize, size_t bytesWritten,
+ const folly::AsyncSocketException& ex) noexcept;
+ void writeSuccess(size_t bufferSize) noexcept;
+
+ struct WriteCallback : folly::AsyncSocket::WriteCallback {
+ WriteCallback(FastCGISession* session,
+ size_t bufferSize)
+ : m_session(session), m_bufferSize(bufferSize)
+ {}
+
+ void writeErr(size_t bytesWritten,
+ const folly::AsyncSocketException& ex) noexcept override;
+ void writeSuccess() noexcept override;
+
+ FastCGISession* m_session;
+ size_t m_bufferSize;
+ };
+
+ // Add a buffer to m_writeQueueSize.
+ void addWriteBuffer(size_t bufferSize) noexcept;
+
+ // Remove a buffer from m_writeQueueSize and wake up any blocked writers.
+ void discardWriteBuffer(size_t bufferSize) noexcept;
public:
// Callbacks to send data back to webserver for FastCGITransport. Ideally
@@ -220,7 +240,7 @@ public:
//
// NB: FastCGITransport runs in its own thread and these callbacks need to
// explicitly place their work onto the event base thread!
- void onStdOut(std::unique_ptr<folly::IOBuf> chain);
+ void blockingWriteStdOut(std::unique_ptr<folly::IOBuf> chain);
void onComplete();
private:
@@ -295,7 +315,7 @@ private:
//
// writeEndRequest and writeUnknownType send discrete records
//
- // writeStream will write either FCGI_STDIN or FCGI_STDOUT streams
+ // writeStream will write either FCGI_STDERR or FCGI_STDOUT streams
// Respond to the webserver with a requested capability
void writeCapability(const std::string& key); // Send FCGI_GET_VALUES_RESULT
@@ -376,6 +396,13 @@ private:
// transport may try to call us to write data until it completes.
size_t m_eventCount{0};
+ // The size of the write queue currently active inside m_session->m_sock
+ size_t m_writeQueueSize{0};
+ size_t m_maxWriteQueueSize{131072};
+
+ // A condition variable to support blocking write emulation
+ Synchronizable m_writeSync;
+
//////////////////////////////////////////////////////////////////////////////
// Request data- transport and flags
//
diff --git a/hphp/runtime/server/fastcgi/fastcgi-transport.cpp b/hphp/runtime/server/fastcgi/fastcgi-transport.cpp
index b21feeb..30eea4c 100644
--- a/hphp/runtime/server/fastcgi/fastcgi-transport.cpp
+++ b/hphp/runtime/server/fastcgi/fastcgi-transport.cpp
@@ -173,7 +173,7 @@ void FastCGITransport::sendImpl(const void *data, int size, int code,
}
m_txBuf.append(data, size);
- m_session->onStdOut(m_txBuf.move()); // session will handle locking
+ m_session->blockingWriteStdOut(m_txBuf.move());
if (eom) {
onSendEndImpl();

File Metadata

Mime Type
text/x-diff
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
107010
Default Alt Text
output-streaming-v2.patch (12 KB)

Event Timeline