Skip to content

Un-equal behavior of onData/OnDrain in http_t on server and client sides #25

@lsergiy

Description

@lsergiy

I built small test app starting HTTP servers on different interfaces on dedicated server thread and calling them from different client thread. Server side and client side have same http_t cli parameter with is used to receive and send data from/to network socket. I am naive enough to expect these code on client and server side to bee very similar to each other. This seems impossible because http_t::onData and http_t::onDrain are not called on server side at all. This means I cannot receive big data on server in asynchronous manner. I had to read data on server using synchronous auto data = cli.read(); code. On client side, both synchronous and asynchronous approaches work. On server side - not. I would be happy to use http_t::onData and http_t::onDrain on server side to accumulate big data in a same way I do on client.
Here is my test code:

#pragma warning( push )
#pragma GCC diagnostic ignored "-Wdeprecated"
    // notice: warning suppression here os applied only due to latest Apple's clang on OSX
    #include <nodepp/nodepp.h>
    #include <nodepp/http.h>
    #include <nodepp/date.h>
#pragma warning( pop )

using namespace nodepp;

struct server_settings_t {
    std::string str_listen_address_;
    int n_listen_port_;
};
typedef std::vector < server_settings_t >  vec_server_settings_t;

static void stat_http_server( const vec_server_settings_t & vec_server_settings, const bool & is_continue_wait ) {
    typedef std::vector < tcp_t > vec_servers_t;
    vec_servers_t vec_servers;
    for( const server_settings_t & server_settings : vec_server_settings ) {
        std::string str_listen_address = server_settings.str_listen_address_;
        int n_listen_port = server_settings.n_listen_port_;
        std::string str_server_url = std::format( "http://{}:{}", str_listen_address, n_listen_port );
        console::log( std::format( "server thread will start server for URL {}...", str_server_url ) );
        vec_servers.emplace_back( http::server( [ =, &is_continue_wait ] ( http_t cli ) {
            console::log( std::format( "server side has client connected as {}", s2s( cli.headers["Host"] ) ) );
            console::log( std::format( "start of handler proc {} {}", s2s( cli.path ), cli.get_fd() ) );
            //
            const auto arr_header_keys = cli.headers.keys();
            for( const auto k : arr_header_keys )
                console::log( std::format( "   - server side header \"{}\" = \"{}\"", s2s( k ), s2s( cli.headers[k] ) ) );
            //
            auto data = cli.read();
            console::log( std::format( console::log( std::format( "direct - server side has data: {}", s2s( data ) ) ) );
            //
            // cli.onData( [&] ( string_t a_chunk ) {
            //     console::log( std::format( "onData - server side received data chunk: {}", s2s( a_chunk ) ) );
            //     // if( p_is_continue_wait != nullptr )
            //     //     (*p_is_continue_wait) = false;
            // } );
            cli.onDrain( [=] () {
                console::log( std::format( "--> server side connection drained" ) );
                //
                // cli.write_header( 200, header_t( {
                //     { "content-type", "text/html" }
                // } ) );
                // cli.write( s2s( std::format( "!!! This is reply from server, {}", s2s( date::fulltime() ) ) ) );
                // cli.close();
            } );
            cli.onClose( [&] () {
                console::log( std::format( "--> server side connection closed" ) );
                // if( p_is_continue_wait != nullptr )
                //     (*p_is_continue_wait) = false;
            });
            //
            cli.write_header( 200, header_t( {
                { "content-type", "text/html" }
            } ) );
            cli.write( s2s( std::format( "!!! This is reply from server, {}", s2s( date::fulltime() ) ) ) );
            cli.close();
            //
            console::log( std::format( "end of handler proc {} {}", s2s( cli.path ), cli.get_fd() ) );
        } ) );
        tcp_t & server = vec_servers.back();
        server.listen( s2s( str_listen_address ), n_listen_port, [&] ( socket_t server ) {
            console::log( std::format( "Server listener started at {}", str_server_url ) );
        } );
    }
    //
    console::log( std::format( "Server thread will wait..." ) );
    while( is_continue_wait )
        process::next();
    console::log( std::format( "End of server thread" ) );
}

static void stat_http_client( const char * str_connect_host, const int n_connect_port, bool * p_is_continue_wait ) {
    std::string str_connect_url = std::format( "http://{}:{}", str_connect_host, n_connect_port );
    console::log( std::format( "start of client thread for {}", str_connect_url ) );
    fetch_t args;
    args.method = "GET"; // "GET"; // "POST";
    args.url = s2s( str_connect_url );
    // args.headers = header_t( {
    //     { "Host", url::host( args.url ) }
    // } );
    args.body = s2s( std::format( "!!! This is request body from client, {}", s2s( date::fulltime() ) ) );
    http::fetch( args )
        .then( [&] ( http_t cli ) {
            console::log( std::format( "client side did connected to {}", str_connect_url ) );
            //
            const auto arr_header_keys = cli.headers.keys();
            for( const auto k : arr_header_keys )
                console::log( std::format( "   - client side header \"{}\" = \"{}\"", s2s( k ), s2s( cli.headers[k] ) ) );
            //
            // auto data = cli.read();
            // console::log( std::format( "direct - client side has data: {}", s2s( data ) ) );
            //
            cli.onData( [&] ( string_t a_chunk ) {
                console::log( std::format( "onData - client side received data chunk: {}", s2s( a_chunk ) ) );
                if( p_is_continue_wait != nullptr )
                    (*p_is_continue_wait) = false;
            } );
            cli.onDrain( [&] () {
                console::log( std::format( "--> client side connection drained" ) );
            } );
            cli.onClose( [&] () {
                console::log( std::format( "--> client side connection closed" ) );
                if( p_is_continue_wait != nullptr )
                    (*p_is_continue_wait) = false;
            });
            stream::pipe( cli );
            cli.write( s2s( std::format( "!!! This is data from client, {}", s2s( date::fulltime() ) ) ) );
        } )
        .fail( [&] ( except_t err ) {
            console::log( std::format( "client fetch error: {}", err.what() ) );
            if( p_is_continue_wait != nullptr )
                (*p_is_continue_wait) = false;
        } );
    // std::this_thread::sleep_for( std::chrono::seconds( 1 ) );
    if( p_is_continue_wait != nullptr ) {
        console::log( std::format( "client thread for {} will wait...", str_connect_url ) );
        while( (*p_is_continue_wait) )
            process::next();
    }
    console::log( std::format( "end of client thread for {}", str_connect_url ) );
}

TEST( test_network_layer, test_nodepp_http_00 ) { // notice: this is gtest test
    static const int n_port = 8000;
    bool is_continue_wait = true;
    std::jthread server_thread( [&] () {
        stat_http_server( {
            { "[::1]", n_port },
            { "127.0.0.1", n_port }
        }, is_continue_wait );
    } );
    std::this_thread::sleep_for( std::chrono::milliseconds( 500 ) );
    // stat_http_client( "127.0.0.1", n_port, nullptr );
    // stat_http_client( "[::1]", n_port, nullptr );
    stat_http_client( "127.0.0.1", n_port, nullptr );
    stat_http_client( "[::1]", n_port, &is_continue_wait );
}

You can notice, server side is a bit different and I had to commend asynchronous code and replace it to make test working.
Additionally, please suggest a way to to pipe-stream big memory buffer when:

  • answering from server
  • calling from client with huge POST body

It would be also nice to figure out HTTP method on server side using some easy way like presence of "methid" header or some property of some data structure, I was unable to figure out if this already done in Nodepp?

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions