# # # patch "README.nuskool" # from [37607bce4fb4f81331a1878c8739294d3bbabf8f] # to [8f46f9b68913ac1bd66c0983266afa1f6355700b] # # patch "cmd_netsync.cc" # from [2ef7eec07debb123c3f5e05fc4f0602a6fd15b19] # to [b7569a40bb72b7e7191c234cb1644bb7c0ca79d8] # # patch "cmd_scgi.cc" # from [c628910886e8e6a8be7a957cb3a8d0e83cf7629d] # to [5c90b4a62489274184aa09143475be6b0023a930] # # patch "gsync.cc" # from [9a94a20ce0017c3c268935924fa044b2b467f6d3] # to [fb99b07bf594689a1ff6dec788432e8abb172b2a] # # patch "http.cc" # from [852d451ec80ab2d935e7a369bb7125481a57a779] # to [fa7a0679195ed9d461ab9ea0826b1516ad262644] # # patch "http.hh" # from [c340ecd627d001bf0b3375a8a6e202509711f58f] # to [4e2113742fb5f3d34d0ee800aca448262194e35c] # # patch "http_client.cc" # from [d93bba1032bf5b94648705eeee7b58d38e0901dd] # to [4aed572910a76dd286a7dc66780746c15d36f551] # # patch "http_client.hh" # from [0d36dc926a20df01f225f45e143f671c4e2520f6] # to [4d2456596cb320d884f93e29b25010b1cdaa576a] # ============================================================ --- README.nuskool 37607bce4fb4f81331a1878c8739294d3bbabf8f +++ README.nuskool 8f46f9b68913ac1bd66c0983266afa1f6355700b @@ -38,3 +38,36 @@ 2. 
run the gsync client and specify the $ mtn --db client.db gsync http://localhost:8008 '*' --debug + + +Here's a quick tour through the nuskool code base: + +* cmd_scgi.cc (gserve): server command to start gsync server + (process_request): called to handle each inbound http request and return a + suitable response + +* cmd_netsync.cc (gsync): client command to push/pull/sync using gsync; + selection of http channel is currently hard-coded here + +* gsync.{cc,hh} (run_gsync_protocol): main protocol driver called by gsync + command above + (channel): declaration of generic channel + +* http_client.{cc,hh}: http request/response execution; implementation of raw + and json channels + +* http.{cc,hh}: generic http request/response io and connection handling + +* json_msgs.{cc,hh}: json messages used with gsync protocol; transcoding between + existing revision formats and equivalent json representations for transport + over http + +* json_io.{cc,hh}: json message printing and parsing + + +TODO + +* replace various connection options (--bind, --http, etc.) with nice urls +* do something rsync or netsync like to handle certs after rev transmission +* see if this can be made to go as fast as netsync +* possibly remove the overly verbose and repetitive json stuff ============================================================ --- cmd_netsync.cc 2ef7eec07debb123c3f5e05fc4f0602a6fd15b19 +++ cmd_netsync.cc b7569a40bb72b7e7191c234cb1644bb7c0ca79d8 @@ -540,7 +540,8 @@ CMD(gsync, "gsync", "", CMD_REF(network) netsync_connection_info info; extract_client_connection_info(app.opts, app.lua, db, keys, args, info); - // how do we select between json and basic_io? + // FIXME how do we select between json_channel with json messages and + // raw_channel with old basic_io messages? 
http_client h(app.opts, app.lua, info); run_gsync_protocol(app.lua, db, json_channel(h), ============================================================ --- cmd_scgi.cc c628910886e8e6a8be7a957cb3a8d0e83cf7629d +++ cmd_scgi.cc 5c90b4a62489274184aa09143475be6b0023a930 @@ -12,6 +12,8 @@ #include #include +#include + #include "app_state.hh" #include "cmd.hh" #include "constants.hh" @@ -50,6 +52,7 @@ using boost::lexical_cast; using std::vector; using boost::lexical_cast; +using boost::shared_ptr; // SCGI interface is pretty straightforward // @@ -270,90 +273,187 @@ do_cmd(database & db, json_io::json_obje } -void -process_json_request(database & db, - http::request const & request, http::response & response) +struct request_handler { - json_io::input_source in(request.body, "json"); - json_io::tokenizer tok(in); - json_io::parser p(tok); - json_io::json_object_t obj = p.parse_object(); + string method; + http::header_map headers; + request_handler(string const & method) : method(method) {} + virtual ~request_handler() {} + virtual void execute(database & db, + http::request const & request, + http::response & response) const = 0; - if (static_cast(obj)) - { - transaction_guard guard(db); - L(FL("read JSON object")); + bool verify_method(http::request const & request) const + { + return method == request.method; + } - json_io::json_value_t res = do_cmd(db, obj); // process json request + bool verify_headers(http::request const & request) const + { + for (http::header_iterator i = headers.begin(); i != headers.end(); ++i) + { + http::header_iterator j = request.headers.find(i->first); + if (j == request.headers.end()) return false; + if (j->second != i->second) return false; + } + return true; + } - if (static_cast(res)) - { - json_io::printer out_data; - res->write(out_data); - L(FL("sending JSON %d-byte response") % (out_data.buf.size())); +}; - response.version = http::version; - response.status_code = 200; - response.status_message = "OK"; - 
response.headers["Connection"] = "close"; - response.headers["Status"] = "200 OK"; - response.headers["Content-Length"] = lexical_cast(out_data.buf.size()); - response.headers["Content-Type"] = "application/jsonrequest"; - response.body = out_data.buf; - } - } - else - { - // FIXME: do something better for reporting errors from the server - std::cerr << "parse error" << std::endl; - } -} +typedef map > handler_map; +struct json_handler : public request_handler +{ + json_handler() : request_handler(http::post) + { + headers["Content-Type"] = "application/jsonrequest"; + headers["Accept"] = "application/jsonrequest"; + } + + void execute(database & db, + http::request const & request, http::response & response) const + { + json_io::input_source in(request.body, "json"); + json_io::tokenizer tok(in); + json_io::parser p(tok); + json_io::json_object_t obj = p.parse_object(); + + if (static_cast(obj)) + { + transaction_guard guard(db); + L(FL("read JSON object")); + + json_io::json_value_t res = do_cmd(db, obj); // process json request + + if (static_cast(res)) + { + json_io::printer out_data; + res->write(out_data); + L(FL("sending JSON %d-byte response") % (out_data.buf.size())); + + response.status = http::status::ok; + response.headers["Content-Type"] = "application/jsonrequest"; + response.body = out_data.buf; + } + // else ?!? 
+ } + else + { + response.status = http::status::bad_request; + } + } +}; + +struct inquire_handler : public request_handler +{ + inquire_handler() : request_handler(http::post) + { + headers["Content-Type"] = "text/plain"; + headers["Accept"] = "text/plain"; + } + + void execute(database & db, + http::request const & request, http::response & response) const + { + } +}; + + void -process_request(database & db, http::connection & connection) +process_request(database & db, http::connection & connection, + handler_map const & handlers) { http::request request; http::response response; + // 411 Length Required -- this should be in the reader if (connection.read(request)) { try { - if (request.method == http::post && - request.headers["Content-Type"] == "application/jsonrequest") - process_json_request(db, request, response); + // note that the following uri's may be prefixed with a scgi mount + // point such as "/monotone" from the lighttpd.conf. the + // strip-request-uri option sounds like it could help with this + // but doesn't seem to work and apache doesn't seem to have any + // configurable way of removing the mount point. this should + // possibly be using PATH_INFO instead of REQUEST_URI or should be + // stripping the prefix as specified in the service url. + + string uri = request.uri; + L(FL("checking uri: %s") % uri); + if (uri.find("/monotone") != string::npos) + { + // FIXME: this assumes the scgi mount point is /monotone! 
+ uri = uri.substr(9); + L(FL("removed uri prefix: %s") % uri); + } + + size_t pos = uri.find_last_of('/'); + string id; + if (pos != string::npos && pos != 0) + { + id = uri.substr(pos+1); + uri = uri.substr(0, pos); + L(FL("split uri: %s + %s") % uri % id); + } + + handler_map::const_iterator h = handlers.find(uri); + + // FIXME make handler_map a std::multimap with url as the key + // + // (1) lookup set of handlers for a given url + // (2) remove handlers not matching method + // return method_not_allowed if no remaining handlers + // (3) remove handlers not matching headers + // return not_acceptable if no remaining handlers + // + // if more than one handler remains return internal_server_error + + if (h != handlers.end()) + { + request_handler const & handler = *(h->second); + + if (!handler.verify_method(request)) + { + response.status = http::status::method_not_allowed; + } + else if (!handler.verify_headers(request)) + { + response.status = http::status::not_acceptable; + } + else + handler.execute(db, request, response); + } else - I(false); + { + response.status = http::status::not_found; + } } catch (gserve_error & e) { std::cerr << "gserve error -- " << e.msg << std::endl; - response.version = connection.version(); - response.status_code = 500; - response.status_message = "Internal Server Error"; - response.headers["Status"] = "500 Internal Server Error"; + response.status = http::status::internal_server_error; } catch (recoverable_failure & e) { std::cerr << "recoverable failure -- " << e.what() << std::endl; - response.version = connection.version(); - response.status_code = 500; - response.status_message = "Internal Server Error"; - response.headers["Status"] = "500 Internal Server Error"; + response.status = http::status::internal_server_error; } } else { - std::cerr << "bad request" << std::endl; + response.status = http::status::bad_request; + } - response.version = connection.version(); - response.status_code = 400; - response.status_message = 
"Bad Request"; - response.headers["Status"] = "400 Bad Request"; - } + response.version = connection.version(); + response.headers["Status"] = + lexical_cast(response.status.code) + " " + response.status.message; + response.headers["Content-Length"] = + lexical_cast(response.body.size()); + // Connection: close ?!? connection.write(response); - } @@ -400,6 +500,36 @@ CMD_NO_WORKSPACE(gserve, // C else if (!app.opts.bind_stdio) W(F("The --no-transport-auth option is usually only used in combination with --stdio")); + handler_map handlers; + handlers["/"] = shared_ptr(new json_handler); + + handlers["/inquire"] = shared_ptr(new inquire_handler); + + // POST / Accept/Content-Type: application/jsonrequest + + // POST /inquire Accept/Content-Type: text/plain + // POST /descendants Accept/Content-Type: text/plain + + // GET /revision/... Accept: text/plain + // PUT /revision/... Content-Type: text/plain + + // GET /revision/... Accept: multipart/mixed; boundary=... + // PUT /revision/... Content-Type: multipart/mixed; boundary=... + + // GET /data/... Accept: application/octet-stream + // PUT /data/... Content-Type: application/octet-stream + + // GET /delta/.-. Accept: application/octet-stream + // PUT /delta/.-. Content-Type: application/octet-stream + + // GET /certs/... Accept: application/octet-stream + // PUT /certs/... Content-Type: application/octet-stream + + // GET /key/... Accept: application/octet-stream + // PUT /key/... 
Content-Type: application/octet-stream + + // allow multipart GET/PUT revisions that include all associated file data + // if (app.opts.bind_stdio) // process_request(type, db, std::cin, std::cout); // else @@ -456,12 +586,12 @@ CMD_NO_WORKSPACE(gserve, // C if (app.opts.bind_http) { http::connection connection(io); - process_request(db, connection); + process_request(db, connection, handlers); } else { scgi::connection connection(io); - process_request(db, connection); + process_request(db, connection, handlers); } stream.close(); } ============================================================ --- gsync.cc 9a94a20ce0017c3c268935924fa044b2b467f6d3 +++ gsync.cc fb99b07bf594689a1ff6dec788432e8abb172b2a @@ -441,10 +441,10 @@ run_gsync_protocol(lua_hooks & lua, data // always received before child revisions. if (pushing) - push_full_revs(db, ch, outbound_revs, dryrun); + push_revs(db, ch, outbound_revs, dryrun); if (pulling) - pull_full_revs(db, ch, inbound_revs, dryrun); + pull_revs(db, ch, inbound_revs, dryrun); } #ifdef BUILD_UNIT_TESTS ============================================================ --- http.cc 852d451ec80ab2d935e7a369bb7125481a57a779 +++ http.cc fa7a0679195ed9d461ab9ea0826b1516ad262644 @@ -19,8 +19,6 @@ using boost::lexical_cast; using boost::lexical_cast; -typedef http::header_map::const_iterator header_iterator; - namespace http { @@ -61,12 +59,12 @@ namespace http { bool good = read(r.version, " ") && - read(r.status_code, " ") && - read(r.status_message, "\r\n"); + read(r.status.code, " ") && + read(r.status.message, "\r\n"); if (good) L(FL("read http response: %s %s %s") - % r.version % r.status_code % r.status_message); + % r.version % r.status.code % r.status.message); return good && read_headers(r) && read_body(r); } @@ -74,10 +72,10 @@ namespace http void connection::write(response const & r) { - L(FL("write http response: %s %s %s") % r.version % r.status_code % r.status_message); + L(FL("write http response: %s %s %s") % r.version % 
r.status.code % r.status.message); write(r.version, " "); - write(r.status_code, " "); - write(r.status_message, "\r\n"); + write(r.status.code, " "); + write(r.status.message, "\r\n"); write_headers(r); write_body(r); @@ -149,7 +147,11 @@ namespace http connection::read_body(message & m) { if (m.headers.find("Content-Length") == m.headers.end()) - return false; + { + L(FL("missing content length header")); + // FIXME return 411 Length Required here + return false; + } size_t length = lexical_cast(m.headers["Content-Length"]); L(FL("reading http body: %d bytes") % length); ============================================================ --- http.hh c340ecd627d001bf0b3375a8a6e202509711f58f +++ http.hh 4e2113742fb5f3d34d0ee800aca448262194e35c @@ -15,12 +15,40 @@ namespace http namespace http { + static std::string const version("HTTP/1.1"); + static std::string const post("POST"); static std::string const get("GET"); static std::string const put("PUT"); + namespace status + { + struct value + { + value() : code(0), message("") {} + value(size_t code, std::string const message) : code(code), message(message) {} + size_t code; + std::string message; + + bool operator==(value const & other) const + { + return code == other.code; + } + }; + + static const value ok(200, "OK"); + + static const value bad_request(400, "Bad Request"); + static const value not_found(404, "Not Found"); + static const value method_not_allowed(405, "Method Not Allowed"); + static const value not_acceptable(406, "Not Acceptable"); + + static const value internal_server_error(500, "Internal Server Error"); + } + typedef std::map header_map; + typedef header_map::const_iterator header_iterator; struct message { @@ -46,8 +74,7 @@ namespace http struct response : public message { std::string version; - size_t status_code; - std::string status_message; + status::value status; }; class connection ============================================================ --- http_client.cc 
d93bba1032bf5b94648705eeee7b58d38e0901dd +++ http_client.cc 4aed572910a76dd286a7dc66780746c15d36f551 @@ -99,6 +99,9 @@ http_client::execute(http::request const // the uri in this request is relative to the server uri and needs to be adjusted http::connection connection(*io); + + P(F("http request: %s %s %s") % request.method % request.uri % request.version); + connection.write(request); if (io->good()) @@ -110,8 +113,12 @@ http_client::execute(http::request const if (io->eof()) L(FL("connection is eof")); + // FIXME: this should really attempt to pipeline several requests + I(connection.read(response)); + P(F("http response: %s %d %s") % response.version % response.status.code % response.status.message); + if (io->good()) L(FL("connection is good")); if (io->bad()) @@ -144,12 +151,155 @@ http_client::execute(http::request const open = false; } - E(response.status_code == 200, origin::network, + E(response.status == http::status::ok, origin::network, + // E(response.status_code == 200, origin::network, F("request failed: %s %d %s") - % response.version % response.status_code % response.status_message); + % response.version % response.status.code % response.status.message); } ///////////////////////////////////////////////////////////////////// +// raw_channel adaptor +///////////////////////////////////////////////////////////////////// + +void +raw_channel::inquire_about_revs(set const & query_set, + set & theirs) const +{ + theirs.clear(); + ostringstream request_data; + + for (set::const_iterator i = query_set.begin(); + i != query_set.end(); ++i) + request_data << *i << "\n"; + + string body = request_data.str(); + + http::request request; + http::response response; + + request.method = http::post; + request.uri = client.resolve("/inquire"); + request.version = http::version; + request.headers["Content-Type"] = "text/plain"; + request.headers["Content-Length"] = lexical_cast(body.size()); + + request.body = body; + + client.execute(request, response); + + 
istringstream response_data(response.body); + while (response_data.good()) + { + string rev; + std::getline(response_data, rev); + theirs.insert(revision_id(decode_hexenc(rev, origin::network), origin::network)); + } +} + +void +raw_channel::get_descendants(set const & common_revs, + vector & inbound_revs) const +{ + inbound_revs.clear(); + ostringstream request_data; + + for (set::const_iterator i = common_revs.begin(); + i != common_revs.end(); ++i) + request_data << *i << "\n"; + + string body = request_data.str(); + + http::request request; + http::response response; + + request.method = http::post; + request.uri = client.resolve("/descendants"); + request.version = http::version; + request.headers["Content-Type"] = "text/plain"; + request.headers["Content-Length"] = lexical_cast(body.size()); + //request.headers["Accept"] = "text/plain"; + //request.headers["Accept-Encoding"] = "identity"; + // FIXME: put a real host value in here (lighttpd seems to require a host header) + request.headers["Host"] = "localhost"; + request.body = body; + + client.execute(request, response); + + istringstream response_data(response.body); + while (response_data.good()) + { + string rev; + std::getline(response_data, rev); + inbound_revs.push_back(revision_id(decode_hexenc(rev, origin::network), origin::network)); + } +} + +void +raw_channel::push_full_rev(revision_id const & rid, + revision_t const & rev, + vector const & data_records, + vector const & delta_records) const +{ + // file data records + // file delta records + // revision text + + // name args length\n + // data +} + +void +raw_channel::pull_full_rev(revision_id const & rid, + revision_t & rev, + vector & data_records, + vector & delta_records) const +{ + // file data records + // file delta records + // revision text + + // name args length\n + // data +} + +void +raw_channel::push_file_data(file_id const & id, + file_data const & data) const +{ +} + +void +raw_channel::push_file_delta(file_id const & old_id, + 
file_id const & new_id, + file_delta const & delta) const +{ +} + +void +raw_channel::push_rev(revision_id const & rid, + revision_t const & rev) const +{ +} + +void +raw_channel::pull_rev(revision_id const & rid, revision_t & rev) const +{ +} + +void +raw_channel::pull_file_data(file_id const & id, + file_data & data) const +{ +} + +void +raw_channel::pull_file_delta(file_id const & old_id, + file_id const & new_id, + file_delta & delta) const +{ +} + +///////////////////////////////////////////////////////////////////// // json_channel adaptor ///////////////////////////////////////////////////////////////////// ============================================================ --- http_client.hh 0d36dc926a20df01f225f45e143f671c4e2520f6 +++ http_client.hh 4d2456596cb320d884f93e29b25010b1cdaa576a @@ -46,6 +46,46 @@ struct http_client void execute(http::request const & request, http::response & response); }; +class raw_channel + : public channel +{ + http_client & client; +public: + raw_channel(http_client & c) + : client(c) + { }; + + virtual void inquire_about_revs(std::set const & query_set, + std::set & theirs) const; + virtual void get_descendants(std::set const & common_revs, + std::vector & inbound_revs) const; + + virtual void push_full_rev(revision_id const & rid, + revision_t const & rev, + std::vector const & data_records, + std::vector const & delta_records) const; + + virtual void pull_full_rev(revision_id const & rid, + revision_t & rev, + std::vector & data_records, + std::vector & delta_records) const; + + virtual void push_file_data(file_id const & id, + file_data const & data) const; + virtual void push_file_delta(file_id const & old_id, + file_id const & new_id, + file_delta const & delta) const; + + virtual void push_rev(revision_id const & rid, revision_t const & rev) const; + virtual void pull_rev(revision_id const & rid, revision_t & rev) const; + + virtual void pull_file_data(file_id const & id, + file_data & data) const; + virtual void 
pull_file_delta(file_id const & old_id, + file_id const & new_id, + file_delta & delta) const; +}; + class json_channel : public channel {