diff --git a/event/hevent.c b/event/hevent.c index 25bfb8b4f..b1a95f8b1 100644 --- a/event/hevent.c +++ b/event/hevent.c @@ -77,6 +77,9 @@ void hio_init(hio_t* io) { // write_queue_init(&io->write_queue, 4); hrecursive_mutex_init(&io->write_mutex); + io->sendfile_fd = -1; + io->sendfile_offset = 0; + io->sendfile_remain = 0; } void hio_ready(hio_t* io) { @@ -752,7 +755,7 @@ void hio_set_max_write_bufsize(hio_t* io, uint32_t size) { } size_t hio_write_bufsize(hio_t* io) { - return io->write_bufsize; + return io->write_bufsize + io->sendfile_remain; } int hio_read_once (hio_t* io) { diff --git a/event/hevent.h b/event/hevent.h index 1650b22a5..ac3dc3910 100644 --- a/event/hevent.h +++ b/event/hevent.h @@ -175,6 +175,10 @@ struct hio_s { char* hostname; // for hssl_set_sni_hostname // context void* ctx; // for hio_context / hio_set_context + // sendfile + int sendfile_fd; // file descriptor for sendfile, -1 when inactive + off_t sendfile_offset; // current offset in file + size_t sendfile_remain; // remaining bytes to send // private: #if defined(EVENT_POLL) || defined(EVENT_KQUEUE) int event_index[2]; // for poll,kqueue diff --git a/event/hloop.h b/event/hloop.h index 3e268fc3b..b7c950af5 100644 --- a/event/hloop.h +++ b/event/hloop.h @@ -380,6 +380,15 @@ HV_EXPORT int hio_read_remain(hio_t* io); HV_EXPORT int hio_write (hio_t* io, const void* buf, size_t len); HV_EXPORT int hio_sendto (hio_t* io, const void* buf, size_t len, struct sockaddr* addr); +// NOTE: hio_sendfile uses zero-copy sendfile(2) on Linux, sendfile(2) on macOS/FreeBSD. +// Falls back to read+write for SSL connections and unsupported platforms. +// @param in_fd: file descriptor of the file to send from +// @param offset: starting offset in the file +// @param length: number of bytes to send +// @return 0 on success (async operation started), -1 on error +// hwrite_cb is called as data is sent. When complete, write_queue is empty. +HV_EXPORT int hio_sendfile(hio_t* io, int in_fd, off_t offset, size_t length); + // NOTE: hio_close is thread-safe, hio_close_async will be called actually in other thread. // hio_del(io, HV_RDWR) => close => hclose_cb HV_EXPORT int hio_close (hio_t* io); diff --git a/event/nio.c b/event/nio.c index 337299ac0..92259343c 100644 --- a/event/nio.c +++ b/event/nio.c @@ -7,6 +7,15 @@ #include "herr.h" #include "hthread.h" +#ifdef OS_LINUX +#include +#endif +#if defined(OS_DARWIN) || defined(OS_FREEBSD) +#include +#include +#include +#endif + static void __connect_timeout_cb(htimer_t* timer) { hio_t* io = (hio_t*)timer->privdata; if (io) { @@ -304,6 +313,60 @@ static int __nio_write(hio_t* io, const void* buf, int len, struct sockaddr* add return nwrite; } +// Platform-abstracted sendfile: returns bytes sent, -1 on error +static ssize_t __nio_sendfile_sys(int out_fd, int in_fd, off_t* offset, size_t count) { +#ifdef OS_LINUX + return sendfile(out_fd, in_fd, offset, count); +#elif defined(OS_DARWIN) + off_t len = count; + int ret = sendfile(in_fd, out_fd, *offset, &len, NULL, 0); + if (ret == 0 || (ret == -1 && errno == EAGAIN && len > 0)) { + *offset += len; + return (ssize_t)len; + } + return -1; +#elif defined(OS_FREEBSD) + off_t sbytes = 0; + int ret = sendfile(in_fd, out_fd, *offset, count, NULL, &sbytes, 0); + if (ret == 0 || (ret == -1 && errno == EAGAIN && sbytes > 0)) { + *offset += sbytes; + return (ssize_t)sbytes; + } + return -1; +#else + // Fallback: pread + write (sends one chunk per call to integrate with event loop) + char buf[65536]; + size_t to_read = count < sizeof(buf) ? count : sizeof(buf); + ssize_t nread = pread(in_fd, buf, to_read, *offset); + if (nread <= 0) return nread; + ssize_t total_written = 0; + while (total_written < nread) { + ssize_t nwrite = write(out_fd, buf + total_written, nread - total_written); + if (nwrite < 0) { + if (total_written > 0) { + *offset += total_written; + return total_written; + } + return -1; + } + if (nwrite == 0) break; + total_written += nwrite; + } + *offset += total_written; + return total_written; +#endif +} + +// Try sendfile for the current io, called with write_mutex held +// Returns: > 0 bytes sent, 0 if nothing sent, < 0 on error +static ssize_t __nio_sendfile(hio_t* io) { + ssize_t nsent = __nio_sendfile_sys(io->fd, io->sendfile_fd, &io->sendfile_offset, io->sendfile_remain); + if (nsent > 0) { + io->sendfile_remain -= nsent; + } + return nsent; +} + static void nio_read(hio_t* io) { // printd("nio_read fd=%d\n", io->fd); void* buf; @@ -361,6 +424,10 @@ static void nio_write(hio_t* io) { hrecursive_mutex_lock(&io->write_mutex); write: if (write_queue_empty(&io->write_queue)) { + // Check for pending sendfile after write queue is drained + if (io->sendfile_fd >= 0 && io->sendfile_remain > 0) { + goto do_sendfile; + } hrecursive_mutex_unlock(&io->write_mutex); if (io->close) { io->close = 0; @@ -407,6 +474,33 @@ static void nio_write(hio_t* io) { } hrecursive_mutex_unlock(&io->write_mutex); return; + +do_sendfile: + { + ssize_t nsent = __nio_sendfile(io); + if (nsent < 0) { + err = socket_errno(); + if (err == EAGAIN || err == EINTR) { + hrecursive_mutex_unlock(&io->write_mutex); + return; + } + io->error = err; + goto write_error; + } + if (nsent == 0 && (io->io_type & HIO_TYPE_SOCK_STREAM)) { + goto disconnect; + } + int complete = (io->sendfile_remain == 0); + hrecursive_mutex_unlock(&io->write_mutex); + if (nsent > 0) { + __write_cb(io, NULL, nsent); + } + if (complete) { + io->sendfile_fd = -1; + } + return; + } + write_error: disconnect: hrecursive_mutex_unlock(&io->write_mutex); @@ -426,9 +520,10 @@ static void hio_handle_events(hio_t* io) { } if ((io->events & HV_WRITE) && (io->revents & HV_WRITE)) { - // NOTE: del HV_WRITE, if write_queue empty + // NOTE: del HV_WRITE, if write_queue empty and no pending sendfile hrecursive_mutex_lock(&io->write_mutex); - if (write_queue_empty(&io->write_queue)) { + if (write_queue_empty(&io->write_queue) && + !(io->sendfile_fd >= 0 && io->sendfile_remain > 0)) { hio_del(io, HV_WRITE); } hrecursive_mutex_unlock(&io->write_mutex); @@ -590,6 +685,83 @@ int hio_sendto (hio_t* io, const void* buf, size_t len, struct sockaddr* addr) { return hio_write4(io, buf, len, addr ? addr : io->peeraddr); } +int hio_sendfile (hio_t* io, int in_fd, off_t offset, size_t length) { + if (io->closed) { + hloge("hio_sendfile called but fd[%d] already closed!", io->fd); + return -1; + } + if (in_fd < 0) { + hloge("hio_sendfile invalid file descriptor: %d", in_fd); + return -1; + } + if (length == 0) return 0; + + // SSL: fall back to read + hio_write (sendfile cannot bypass SSL encryption) + // NOTE: hio_write is non-blocking and queues data internally, + // so this won't block the event loop even for large files. + if (io->io_type == HIO_TYPE_SSL) { + char buf[65536]; + off_t cur_offset = offset; + size_t remaining = length; + while (remaining > 0) { + size_t to_read = remaining < sizeof(buf) ? remaining : sizeof(buf); + ssize_t nread = pread(in_fd, buf, to_read, cur_offset); + if (nread < 0) { + hloge("hio_sendfile pread error: %s", strerror(errno)); + return -1; + } + if (nread == 0) { + hlogw("hio_sendfile: unexpected EOF at offset %lld", (long long)cur_offset); + break; + } + int nwrite = hio_write(io, buf, nread); + if (nwrite < 0) return nwrite; + cur_offset += nread; + remaining -= nread; + } + return 0; + } + + hrecursive_mutex_lock(&io->write_mutex); + + io->sendfile_fd = in_fd; + io->sendfile_offset = offset; + io->sendfile_remain = length; + + // If write queue is empty, try sendfile immediately + if (write_queue_empty(&io->write_queue)) { + ssize_t nsent = __nio_sendfile(io); + if (nsent < 0) { + int err = socket_errno(); + if (err != EAGAIN && err != EINTR) { + io->error = err; + io->sendfile_fd = -1; + hrecursive_mutex_unlock(&io->write_mutex); + hio_close_async(io); + return -1; + } + } + if (nsent > 0) { + int complete = (io->sendfile_remain == 0); + hrecursive_mutex_unlock(&io->write_mutex); + __write_cb(io, NULL, nsent); + if (complete) { + io->sendfile_fd = -1; + return 0; + } + hrecursive_mutex_lock(&io->write_mutex); + } + } + + // If still remaining, register for writable event + if (io->sendfile_remain > 0) { + hio_add(io, hio_handle_events, HV_WRITE); + } + + hrecursive_mutex_unlock(&io->write_mutex); + return 0; +} + int hio_close (hio_t* io) { if (io->closed) return 0; if (io->destroy == 0 && hv_gettid() != io->loop->tid) { @@ -611,6 +783,8 @@ int hio_close (hio_t* io) { return 0; } io->closed = 1; + io->sendfile_fd = -1; + io->sendfile_remain = 0; hrecursive_mutex_unlock(&io->write_mutex); hio_done(io); diff --git a/event/overlapio.c b/event/overlapio.c index 61f24937f..da563fe71 100644 --- a/event/overlapio.c +++ b/event/overlapio.c @@ -387,6 +387,29 @@ int hio_sendto (hio_t* io, const void* buf, size_t len, struct sockaddr* addr) { return hio_write4(io, buf, len, addr ? addr : io->peeraddr); } +int hio_sendfile (hio_t* io, int in_fd, off_t offset, size_t length) { + if (io->closed) return -1; + if (in_fd < 0) return -1; + if (length == 0) return 0; + // NOTE: Windows fallback uses read + hio_write. + // hio_write is non-blocking (queues via IOCP), so this won't block. + char buf[65536]; + off_t cur_offset = offset; + size_t remaining = length; + while (remaining > 0) { + size_t to_read = remaining < sizeof(buf) ? remaining : sizeof(buf); + if (_lseeki64(in_fd, cur_offset, SEEK_SET) < 0) return -1; + ssize_t nread = _read(in_fd, buf, (unsigned int)to_read); + if (nread < 0) return -1; + if (nread == 0) break; // EOF + int nwrite = hio_write(io, buf, nread); + if (nwrite < 0) return nwrite; + cur_offset += nread; + remaining -= nread; + } + return 0; +} + int hio_close (hio_t* io) { if (io->closed) return 0; io->closed = 1; diff --git a/http/server/HttpHandler.cpp b/http/server/HttpHandler.cpp index 005967328..f17f69f7b 100644 --- a/http/server/HttpHandler.cpp +++ b/http/server/HttpHandler.cpp @@ -659,6 +659,20 @@ int HttpHandler::defaultLargeFileHandler(const std::string &filepath) { // forbidden to send large file resp->content_length = 0; resp->status_code = HTTP_STATUS_FORBIDDEN; + } else if (service->limit_rate < 0 && file->fp && fileno(file->fp) >= 0) { + // unlimited: use zero-copy sendfile + int filefd = fileno(file->fp); + size_t length = resp->content_length; + writer->EndHeaders(); + writer->onwrite = [this](HBuf* buf) { + if (writer->isWriteComplete()) { + resp->content_length = 0; + writer->End(); + closeFile(); + } + }; + hio_sendfile(io, filefd, 0, length); + return HTTP_STATUS_UNFINISHED; } else { size_t bufsize = 40960; // 40K file->buf.resize(bufsize);