===================================================================
Index: lua_hooks.cc
--- lua_hooks.cc ff4053ed166f572c179628ec23c9ef263484e234
+++ lua_hooks.cc 72ce1775c14495adca13b80403bd6a3c669f7726
@@ -543,6 +543,116 @@ lua_hooks::hook_get_netsync_key(utf8 con
   return exec_ok;
 }
 
+#if defined(RSE) /* netsync-hooks */
+/*
+ * Netsync data-processing hooks.  Each wrapper calls the Lua function of
+ * the same name with its string arguments (in declaration order) and
+ * extracts one string return value.  It returns true and copies that
+ * value into `result' only when the whole call chain reports ok();
+ * otherwise it returns false and leaves `result' untouched.  The callers
+ * in netsync.cc treat a false return like "accept".
+ */
+bool lua_hooks::hook_netsync_process_data_epoch(string session_id, string branch, string epoch, string &result)
+{
+  string res;
+  bool exec_ok = Lua(st)
+    .func("hook_netsync_process_data_epoch")
+    .push_str(session_id)
+    .push_str(branch)
+    .push_str(epoch)
+    .call(3, 1)
+    .extract_str(res)
+    .ok();
+  if (exec_ok)
+    result = res;
+  return exec_ok;
+}
+
+bool lua_hooks::hook_netsync_process_data_key(string session_id, string keyid, string keydata, string &result)
+{
+  string res;
+  bool exec_ok = Lua(st)
+    .func("hook_netsync_process_data_key")
+    .push_str(session_id)
+    .push_str(keyid)
+    .push_str(keydata)
+    .call(3, 1)
+    .extract_str(res)
+    .ok();
+  if (exec_ok)
+    result = res;
+  return exec_ok;
+}
+
+bool lua_hooks::hook_netsync_process_data_cert(string session_id, string revid, string keyid, string name, string value, string &result)
+{
+  string res;
+  bool exec_ok = Lua(st)
+    .func("hook_netsync_process_data_cert")
+    .push_str(session_id)
+    .push_str(revid)
+    .push_str(keyid)
+    .push_str(name)
+    .push_str(value)
+    .call(5, 1)
+    .extract_str(res)
+    .ok();
+  if (exec_ok)
+    result = res;
+  return exec_ok;
+}
+
+bool lua_hooks::hook_netsync_process_data_revision(string session_id, string revid, string revdata, string &result)
+{
+  string res;
+  bool exec_ok = Lua(st)
+    .func("hook_netsync_process_data_revision")
+    .push_str(session_id)
+    .push_str(revid)
+    .push_str(revdata)
+    .call(3, 1)
+    .extract_str(res)
+    .ok();
+  if (exec_ok)
+    result = res;
+  return exec_ok;
+}
+
+bool lua_hooks::hook_netsync_process_data_file(string session_id, string fileid, string filedata, string &result)
+{
+  string res;
+  bool exec_ok = Lua(st)
+    .func("hook_netsync_process_data_file")
+    .push_str(session_id)
+    .push_str(fileid)
+    .push_str(filedata)
+    .call(3, 1)
+    .extract_str(res)
+    .ok();
+  if (exec_ok)
+    result = res;
+  return exec_ok;
+}
+
+/* Id order is old (base) first, then new: netsync.cc passes hbase, hident. */
+bool lua_hooks::hook_netsync_process_data_delta(string session_id, string oldfileid, string newfileid, string delta, string &result)
+{
+  string res;
+  bool exec_ok = Lua(st)
+    .func("hook_netsync_process_data_delta")
+    .push_str(session_id)
+    .push_str(oldfileid)
+    .push_str(newfileid)
+    .push_str(delta)
+    .call(4, 1)
+    .extract_str(res)
+    .ok();
+  if (exec_ok)
+    result = res;
+  return exec_ok;
+}
+#endif
+
 static void
 push_uri(uri const & u, Lua & ll)
 {
===================================================================
Index: lua_hooks.hh
--- lua_hooks.hh 0f79afa4f23d64a221b23ef4b39766528fec8a14
+++ lua_hooks.hh 28a8dbaefd1d5370c22b198e94074c8e72bf9935
@@ -73,6 +73,15 @@ public:
                             globish const & include,
                             globish const & exclude,
                             rsa_keypair_id & k);
+#if defined(RSE) /* netsync-hooks */
+  /* Each returns true, with the hook's answer in `result', only if the Lua call succeeded. */
+  bool hook_netsync_process_data_epoch(std::string session_id, std::string branch, std::string epoch, std::string &result);
+  bool hook_netsync_process_data_key(std::string session_id, std::string keyid, std::string keydata, std::string &result);
+  bool hook_netsync_process_data_cert(std::string session_id, std::string revid, std::string keyid, std::string name, std::string value, std::string & result);
+  bool hook_netsync_process_data_revision(std::string session_id, std::string revid, std::string revdata, std::string & result);
+  bool hook_netsync_process_data_file(std::string session_id, std::string fileid, std::string filedata, std::string & result);
+  bool hook_netsync_process_data_delta(std::string session_id, std::string oldfileid, std::string newfileid, std::string delta, std::string & result);
+#endif
   bool hook_get_netsync_connect_command(uri const & u,
                                         globish const & include_pattern,
                                         globish const & exclude_pattern,
=================================================================== Index: monotone.texi --- monotone.texi 11dbcb7685ca2369e9131eb7a4c2194f6b5619ae +++ monotone.texi 31fc60cfe3b2ef897169b5a8dad54baa6f2c862c @@ -8129,6 +8129,84 @@ @subsection Event Notifications and Trig @end table +@item hook_netsync_process_data_epoch(@var{session_id}, @var{branch}, @var{epoch}) + +Called by Monotone to check whether received "epoch" data can be +processed (stored into the repository). By default no such Lua callback +function is defined and processing is granted. The return value of +the Lua callback function controls the further processing of the +data: "@code{accept}" accepts data (data is stored into repository), +"@code{ignore}" ignores data (data is silently skipped and not stored +into repository), "@code{abort:@var{message}}" aborts the session and commits +the transaction (already received data is still stored into repository), +"@code{rollback:@var{message}}" aborts the session and rolls back the transaction +(no received data is stored into repository). + +@item hook_netsync_process_data_key(@var{session_id}, @var{keyid}, @var{keydata}) + +Called by Monotone to check whether received (public) "key" data can be +processed (stored into the repository). By default no such Lua callback +function is defined and processing is granted. The return value of +the Lua callback function controls the further processing of the +data: "@code{accept}" accepts data (data is stored into repository), +"@code{ignore}" ignores data (data is silently skipped and not stored +into repository), "@code{abort:@var{message}}" aborts the session and commits +the transaction (already received data is still stored into repository), +"@code{rollback:@var{message}}" aborts the session and rolls back the transaction +(no received data is stored into repository). 
+ +@item hook_netsync_process_data_file(@var{session_id}, @var{file_id}, @var{filedata}) + +Called by Monotone to check whether received "file" data can be +processed (stored into the repository). By default no such Lua callback +function is defined and processing is granted. The return value of +the Lua callback function controls the further processing of the +data: "@code{accept}" accepts data (data is stored into repository), +"@code{ignore}" ignores data (data is silently skipped and not stored +into repository), "@code{abort:@var{message}}" aborts the session and commits +the transaction (already received data is still stored into repository), +"@code{rollback:@var{message}}" aborts the session and rolls back the transaction +(no received data is stored into repository). + +@item hook_netsync_process_data_delta(@var{session_id}, @var{file_old_id}, @var{file_new_id}, @var{delta}) + +Called by Monotone to check whether received "delta" data can be +processed (stored into the repository). By default no such Lua callback +function is defined and processing is granted. The return value of +the Lua callback function controls the further processing of the +data: "@code{accept}" accepts data (data is stored into repository), +"@code{ignore}" ignores data (data is silently skipped and not stored +into repository), "@code{abort:@var{message}}" aborts the session and commits +the transaction (already received data is still stored into repository), +"@code{rollback:@var{message}}" aborts the session and rolls back the transaction +(no received data is stored into repository). + +@item hook_netsync_process_data_revision(@var{session_id}, @var{revid}, @var{revdata}) + +Called by Monotone to check whether received "revision" data can be +processed (stored into the repository). By default no such Lua callback +function is defined and processing is granted. 
The return value of +the Lua callback function controls the further processing of the +data: "@code{accept}" accepts data (data is stored into repository), +"@code{ignore}" ignores data (data is silently skipped and not stored +into repository), "@code{abort:@var{message}}" aborts the session and commits +the transaction (already received data is still stored into repository), +"@code{rollback:@var{message}}" aborts the session and rolls back the transaction +(no received data is stored into repository). + +@item hook_netsync_process_data_cert(@var{session_id}, @var{revid}, @var{keyid}, @var{name}, @var{value}) + +Called by Monotone to check whether received "cert" data can be +processed (stored into the repository). By default no such Lua callback +function is defined and processing is granted. The return value of +the Lua callback function controls the further processing of the +data: "@code{accept}" accepts data (data is stored into repository), +"@code{ignore}" ignores data (data is silently skipped and not stored +into repository), "@code{abort:@var{message}}" aborts the session and commits +the transaction (already received data is still stored into repository), +"@code{rollback:@var{message}}" aborts the session and rolls back the transaction +(no received data is stored into repository). 
+
 @item note_netsync_revision_received (@var{new_id}, @var{revision}, @var{certs}, @var{session_id})
 
 Called by monotone after the revision @var{new_id} is received through
===================================================================
Index: netsync.cc
--- netsync.cc 08f811440a008298f385e7d0cbd907e60c1421d3
+++ netsync.cc bcf57d0335ae40bd296c92427d70c40f80a2b8c8
@@ -286,6 +286,63 @@ struct netsync_error
   netsync_error(string const & s): msg(s) {}
 };
 
+#if defined(RSE) /* netsync-hooks */
+struct session; /* forward declaration; tag matches the struct definition below */
+typedef enum {
+  db_queue_type_key,
+  db_queue_type_file,
+  db_queue_type_delta,
+  db_queue_type_revision,
+  db_queue_type_cert
+} db_queue_type;
+class db_queue_obj { /* base class for one database write queued by a session */
+  public:
+    db_queue_type type; /* assigned by each subclass constructor */
+    virtual ~db_queue_obj() {}
+    virtual bool put(session *session) = 0; /* carry out the queued write */
+};
+class db_queue_key: public db_queue_obj {
+  private:
+    rsa_keypair_id my_pub_id;
+    base64<rsa_pub_key> my_pub_encoded; /* NOTE(review): template argument restored from a stripped archive copy; confirm */
+  public:
+    db_queue_key(rsa_keypair_id const &pub_id, base64<rsa_pub_key> const &pub_encoded);
+    bool put(session *session);
+};
+class db_queue_revision: public db_queue_obj {
+  private:
+    revision_id my_new_id;
+    revision_data my_rev;
+  public:
+    db_queue_revision(revision_id const &new_id, revision_data const &rev);
+    bool put(session *session);
+};
+class db_queue_cert: public db_queue_obj {
+  private:
+    revision<cert> my_cert;
+  public:
+    db_queue_cert(revision<cert> const &cert);
+    bool put(session *session);
+};
+class db_queue_file: public db_queue_obj {
+  private:
+    file_id my_id;
+    file_data my_dat;
+  public:
+    db_queue_file(file_id const &id, file_data const &dat);
+    bool put(session *session);
+};
+class db_queue_delta: public db_queue_obj {
+  private:
+    file_id my_old_id;
+    file_id my_new_id;
+    file_delta my_del;
+  public:
+    db_queue_delta(file_id const &old_id, file_id const &new_id, file_delta const &del);
+    bool put(session *session);
+};
+#endif
+
 struct
 session:
   public refiner_callbacks,
@@ -296,6 +353,10 @@ session:
   globish our_include_pattern;
   globish our_exclude_pattern;
   globish_matcher our_matcher;
+#if defined(RSE) /* netsync-hooks */
+  transaction_guard &guard; /* bound in the constructor; used by db_queue_commit() */
+  vector<db_queue_obj *> db_queue; /* pending writes, in arrival order */
+#endif
   app_state & app;
 
   string peer_id;
@@ -394,6 +455,9 @@ session:
           protocol_voice voice,
           globish const & our_include_pattern,
           globish const & our_exclude_pattern,
+#if defined(RSE) /* netsync-hooks */
+          transaction_guard &guard,
+#endif
           app_state & app,
           string const & peer,
           shared_ptr sock,
@@ -474,13 +538,28 @@ session:
                         string const & signature);
   bool process_refine_cmd(refinement_type ty, merkle_node const & node);
   bool process_done_cmd(netcmd_item_type type, size_t n_items);
+#if defined(RSE) /* netsync-hooks */
   bool process_data_cmd(netcmd_item_type type,
                         id const & item,
+                        string const & dat,
+                        transaction_guard & guard);
+#else
+  bool process_data_cmd(netcmd_item_type type,
+                        id const & item,
                         string const & dat);
+#endif
+#if defined(RSE) /* netsync-hooks */
   bool process_delta_cmd(netcmd_item_type type,
                          id const & base,
                          id const & ident,
+                         delta const & del,
+                         transaction_guard & guard);
+#else
+  bool process_delta_cmd(netcmd_item_type type,
+                         id const & base,
+                         id const & ident,
                          delta const & del);
+#endif
   bool process_usher_cmd(utf8 const & msg);
 
   // The incoming dispatcher.
@@ -504,13 +583,116 @@ session:
   bool process(transaction_guard & guard);
 
   bool initiated_by_server;
+
+#if defined(RSE) /* netsync-hooks */
+  bool put_key(rsa_keypair_id const &pub_id, base64<rsa_pub_key> const &pub_encoded) { /* queue a key write */
+    db_queue_flush();
+    db_queue.push_back(new db_queue_key(pub_id, pub_encoded));
+    return true;
+  }
+  bool put_revision(revision_id const &new_id, revision_data const &rev) { /* queue a revision write */
+    db_queue_flush();
+    db_queue.push_back(new db_queue_revision(new_id, rev));
+    return true;
+  }
+  bool put_file(file_id const &id, file_data const &dat) { /* queue a file write */
+    db_queue_flush();
+    db_queue.push_back(new db_queue_file(id, dat));
+    return true;
+  }
+  bool put_file_version(file_id const &old_id, file_id const &new_id, file_delta const &del) { /* queue a delta write */
+    db_queue_flush();
+    db_queue.push_back(new db_queue_delta(old_id, new_id, del));
+    return true;
+  }
+  bool put_revision_cert(revision<cert> const &cert) { /* certs are queued without flushing first */
+    db_queue.push_back(new db_queue_cert(cert));
+    return true;
+  }
+  void db_queue_flush(void) { /* commit the queue if its newest entry is a cert */
+    if (!db_queue.empty())
+      if (db_queue.back()->type == db_queue_type_cert)
+        db_queue_commit();
+  }
+  bool db_queue_commit(void) { /* write every queued entry in order, then free the queue */
+    vector<db_queue_obj *>::iterator iter;
+    for (iter = db_queue.begin(); iter != db_queue.end(); iter++)
+      (*iter)->put(this);
+    db_queue_rollback(); /* deletes the entries and empties the queue; a bare clear() leaked them */
+    size_t sz = cmd.encoded_size();
+    guard.maybe_checkpoint(sz);
+    return true;
+  }
+  bool db_queue_rollback(void) { /* discard queued entries without writing them */
+    while (!db_queue.empty()) { delete db_queue.back(); db_queue.pop_back(); }
+    return true;
+  }
+#endif
 };
+
+#if defined(RSE) /* netsync-hook */
+db_queue_key::db_queue_key(rsa_keypair_id const &pub_id, base64<rsa_pub_key> const &pub_encoded) {
+  type = db_queue_type_key;
+  my_pub_id = pub_id;
+  my_pub_encoded = pub_encoded;
+}
+bool db_queue_key::put(session *session) { /* store the key; record it as written if the database accepted it */
+  bool rc = session->app.db.put_key(my_pub_id, my_pub_encoded);
+  if (rc)
+    session->written_keys.push_back(my_pub_id);
+  return rc;
+}
+db_queue_revision::db_queue_revision(revision_id const &new_id, revision_data const &rev) {
+  type = db_queue_type_revision;
+  my_new_id = new_id;
+  my_rev = rev;
+}
+bool db_queue_revision::put(session *session) {
+  bool rc = session->app.db.put_revision(my_new_id, my_rev);
+  if (rc)
+    session->written_revisions.push_back(my_new_id);
+  return rc;
+}
+db_queue_cert::db_queue_cert(revision<cert> const &cert) {
+  type = db_queue_type_cert;
+  my_cert = cert;
+}
+bool db_queue_cert::put(session *session) {
+  bool rc = session->app.db.put_revision_cert(my_cert);
+  if (rc)
+    session->written_certs.push_back(my_cert.inner());
+  return rc;
+}
+db_queue_file::db_queue_file(file_id const &id, file_data const &dat) {
+  type = db_queue_type_file;
+  my_id = id;
+  my_dat = dat;
+}
+bool db_queue_file::put(session *session) { /* always reports true; put_file's result is not inspected */
+  session->app.db.put_file(my_id, my_dat);
+  return true;
+}
+db_queue_delta::db_queue_delta(file_id const &old_id, file_id const &new_id, file_delta const &del) {
+  type = db_queue_type_delta;
+  my_old_id = old_id;
+  my_new_id = new_id;
+  my_del = del;
+}
+bool db_queue_delta::put(session *session) { /* always reports true; put_file_version's result is not inspected */
+  session->app.db.put_file_version(my_old_id, my_new_id, my_del);
+  return true;
+}
+#endif
+
 size_t session::session_count = 0;
 
 session::session(protocol_role role,
                  protocol_voice voice,
                  globish const & our_include_pattern,
                  globish const & our_exclude_pattern,
+#if defined(RSE) /* netsync-hooks */
+                 transaction_guard &guard,
+#endif
                  app_state & app,
                  string const & peer,
                  shared_ptr sock,
@@ -520,6 +702,9 @@ session::session(protocol_role role,
   our_include_pattern(our_include_pattern),
   our_exclude_pattern(our_exclude_pattern),
   our_matcher(our_include_pattern, our_exclude_pattern),
+#if defined(RSE) /* netsync-hooks */
+  guard(guard),
+#endif
   app(app),
   peer_id(peer),
   str(sock),
@@ -559,6 +744,9 @@ session::~session()
 
 session::~session()
 {
+#if defined(RSE) /* netsync-hook */
+  db_queue_commit(); /* write out whatever is still queued when the session is destroyed */
+#endif
   if (protocol_state == confirmed_state)
     error_code = no_error;
   else if (error_code == no_transfer &&
@@ -1878,10 +2066,18 @@ session::load_data(netcmd_item_type type
     }
 }
 
+#if defined(RSE) /* netsync-hooks */
 bool
 session::process_data_cmd(netcmd_item_type type,
                           id const & item,
+                          string const & dat,
+                          transaction_guard & guard)
+#else
+bool
+session::process_data_cmd(netcmd_item_type type,
+                          id const & item,
                           string const & dat)
+#endif
 {
   hexenc hitem;
   encode_hexenc(item, hitem);
@@ -1912,6 +2108,28 @@ session::process_data_cmd(netcmd_item_ty
         i = epochs.find(branch);
         if (i == epochs.end())
           {
+#if defined(RSE) /* netsync-hooks */
+            /* process optional Lua hook "hook_netsync_process_data_epoch" */
+            std::ostringstream session_oss;
+            session_oss << session_id;
+            string result;
+            if (app.lua.hook_netsync_process_data_epoch(
+                    session_oss.str(),
+                    string(branch()),
+                    string((epoch.inner())()),
+                    result
+                )) {
+                if (result.compare("ignore") == 0)
+                    break; /* leaves the switch without setting the epoch */
+                else if (result.compare(0, 9, "rollback:") == 0) {
+                    db_queue_rollback();
+                    error(not_permitted, result.substr(9));
+                }
+                else if (result.compare(0, 6, "abort:") == 0)
+                    error(not_permitted, result.substr(6));
+                /* else "accept" */
+            }
+#endif
             L(FL("branch %s has no epoch; setting epoch to %s") % branch % epoch);
             app.db.set_epoch(branch, epoch);
           }
@@ -1951,8 +2169,32 @@ session::process_data_cmd(netcmd_item_ty
           throw bad_decode(F("hash check failed for public key '%s' (%s);"
                              " wanted '%s' got '%s'")
                            % hitem % keyid % hitem % tmp);
+#if defined(RSE) /* netsync-hooks */
+        /* process optional Lua hook "hook_netsync_process_data_key" */
+        std::ostringstream session_oss;
+        session_oss << session_id;
+        string result;
+        if (app.lua.hook_netsync_process_data_key(
+                session_oss.str(),
+                string(keyid()),
+                string(pub()),
+                result
+            )) {
+            if (result.compare("ignore") == 0)
+                break; /* leaves the switch; the key is not queued */
+            else if (result.compare(0, 9, "rollback:") == 0) {
+                db_queue_rollback();
+                error(not_permitted, result.substr(9));
+            }
+            else if (result.compare(0, 6, "abort:") == 0)
+                error(not_permitted, result.substr(6));
+            /* else "accept" */
+        }
+        put_key(keyid, pub);
+#else
         if (app.db.put_key(keyid, pub))
           written_keys.push_back(keyid);
+#endif
       }
       break;
 
@@ -1964,34 +2206,120 @@ session::process_data_cmd(netcmd_item_ty
         cert_hash_code(c, tmp);
         if (! (tmp == hitem))
           throw bad_decode(F("hash check failed for revision cert '%s'") % hitem);
+#if defined(RSE) /* netsync-hooks */
+        /* process optional Lua hook "hook_netsync_process_data_cert" */
+        std::ostringstream session_oss;
+        session_oss << session_id;
+        revision<cert> rc = revision<cert>(c);
+        cert_value cv;
+        decode_base64(rc.inner().value, cv); /* the hook is handed the decoded cert value */
+        string result;
+        if (app.lua.hook_netsync_process_data_cert(
+                session_oss.str(),
+                string(rc.inner().ident()),
+                string(rc.inner().key()),
+                string(rc.inner().name()),
+                string(cv()),
+                result
+            )) {
+            if (result.compare("ignore") == 0)
+                break; /* leaves the switch; the cert is not queued */
+            else if (result.compare(0, 9, "rollback:") == 0) {
+                db_queue_rollback();
+                error(not_permitted, result.substr(9));
+            }
+            else if (result.compare(0, 6, "abort:") == 0)
+                error(not_permitted, result.substr(6));
+            /* else "accept" */
+        }
+        put_revision_cert(revision<cert>(c));
+#else
         if (app.db.put_revision_cert(revision(c)))
           written_certs.push_back(c);
+#endif
       }
       break;
 
     case revision_item:
       {
         L(FL("received revision '%s'") % hitem);
+#if defined(RSE) /* netsync-hooks */
+        /* process optional Lua hook "hook_netsync_process_data_revision" */
+        std::ostringstream session_oss;
+        session_oss << session_id;
+        string result;
+        if (app.lua.hook_netsync_process_data_revision(
+                session_oss.str(),
+                string(hitem()),
+                dat,
+                result
+            )) {
+            if (result.compare("ignore") == 0)
+                break; /* leaves the switch; the revision is not queued */
+            else if (result.compare(0, 9, "rollback:") == 0) {
+                db_queue_rollback();
+                error(not_permitted, result.substr(9));
+            }
+            else if (result.compare(0, 6, "abort:") == 0)
+                error(not_permitted, result.substr(6));
+            /* else "accept" */
+        }
+        put_revision(revision_id(hitem), revision_data(dat));
+#else
         if (app.db.put_revision(revision_id(hitem), revision_data(dat)))
           written_revisions.push_back(revision_id(hitem));
+#endif
       }
       break;
 
     case file_item:
       {
         L(FL("received file '%s'") % hitem);
+#if defined(RSE) /* netsync-hooks */
+        /* process optional Lua hook "hook_netsync_process_data_file" */
+        std::ostringstream session_oss;
+        session_oss << session_id;
+        string result;
+        if (app.lua.hook_netsync_process_data_file(
+                session_oss.str(),
+                string(hitem()),
+                dat,
+                result
+            )) {
+            if (result.compare("ignore") == 0)
+                break; /* leaves the switch; the file is not queued */
+            else if (result.compare(0, 9, "rollback:") == 0) {
+                db_queue_rollback();
+                error(not_permitted, result.substr(9));
+            }
+            else if (result.compare(0, 6, "abort:") == 0)
+                error(not_permitted, result.substr(6));
+            /* else "accept" */
+        }
+        put_file(file_id(hitem), file_data(dat));
+#else
         app.db.put_file(file_id(hitem), file_data(dat));
+#endif
       }
       break;
     }
   return true;
 }
 
+#if defined(RSE) /* netsync-hooks */
 bool
 session::process_delta_cmd(netcmd_item_type type,
                            id const & base,
                            id const & ident,
+                           delta const & del,
+                           transaction_guard & guard)
+#else
+bool
+session::process_delta_cmd(netcmd_item_type type,
+                           id const & base,
+                           id const & ident,
                            delta const & del)
+#endif
 {
   string typestr;
   netcmd_item_type_to_string(type, typestr);
@@ -2008,7 +2336,32 @@ session::process_delta_cmd(netcmd_item_t
     case file_item:
       {
         file_id src_file(hbase), dst_file(hident);
+#if defined(RSE) /* netsync-hooks */
+        /* process optional Lua hook "hook_netsync_process_data_delta" */
+        std::ostringstream session_oss;
+        session_oss << session_id;
+        string result;
+        if (app.lua.hook_netsync_process_data_delta(
+                session_oss.str(),
+                string(hbase()),
+                string(hident()),
+                string(del()),
+                result
+            )) {
+            if (result.compare("ignore") == 0)
+                break; /* leaves the switch; the delta is not queued */
+            else if (result.compare(0, 9, "rollback:") == 0) {
+                db_queue_rollback();
+                error(not_permitted, result.substr(9));
+            }
+            else if (result.compare(0, 6, "abort:") == 0)
+                error(not_permitted, result.substr(6));
+            /* else "accept" */
+        }
+        put_file_version(src_file, dst_file, file_delta(del));
+#else
         app.db.put_file_version(src_file, dst_file, file_delta(del));
+#endif
       }
       break;
 
@@ -2199,7 +2552,11 @@ session::dispatch_payload(netcmd const &
         id item;
         string dat;
         cmd.read_data_cmd(type, item, dat);
+#if defined(RSE) /* netsync-hooks */
+        return process_data_cmd(type, item, dat, guard);
+#else
         return process_data_cmd(type, item, dat);
+#endif
       }
       break;
 
@@ -2213,7 +2570,11 @@ session::dispatch_payload(netcmd const &
         id base, ident;
         delta del;
         cmd.read_delta_cmd(type, base, ident, del);
+#if defined(RSE) /* netsync-hooks */
+        return process_delta_cmd(type, base, ident, del, guard);
+#else
         return process_delta_cmd(type, base, ident, del);
+#endif
       }
       break;
 
@@ -2303,7 +2664,9 @@ bool session::process(transaction_guard
         W(F("input buffer for peer %s is overfull "
             "after netcmd dispatch") % peer_id);
 
+#if !defined(RSE) /* netsync-hook */ /* with RSE the checkpoint is taken in db_queue_commit() instead */
       guard.maybe_checkpoint(sz);
+#endif
 
       if (!ret)
         L(FL("finishing processing with '%d' packet")
@@ -2402,6 +2765,9 @@ call_server(protocol_role role,
 
   session sess(role, client_voice,
                include_pattern, exclude_pattern,
+#if defined(RSE) /* netsync-hooks */
+               guard,
+#endif
                app, address(), server);
 
   while (true)
@@ -2567,6 +2933,9 @@ handle_new_connection(Netxx::Address & a
                       globish const & include_pattern,
                       globish const & exclude_pattern,
                       map > & sessions,
+#if defined(RSE) /* netsync-hooks */
+                      transaction_guard &guard,
+#endif
                       app_state & app)
 {
   L(FL("accepting new connection on %s : %s")
@@ -2593,6 +2962,9 @@ handle_new_connection(Netxx::Address & a
 
       shared_ptr sess(new session(role, server_voice,
                                   include_pattern, exclude_pattern,
+#if defined(RSE) /* netsync-hooks */
+                                  guard,
+#endif
                                   app,
                                   lexical_cast(client), str));
       sess->begin_service();
@@ -2863,6 +3235,9 @@ serve_connections(protocol_role role,
 
               shared_ptr sess(new session(role, client_voice,
                                           inc, exc,
+#if defined(RSE) /* netsync-hooks */
+                                          *guard,
+#endif
                                           app, addr(), server, true));
 
               sessions.insert(make_pair(server->get_socketfd(), sess));
@@ -2902,7 +3277,11 @@ serve_connections(protocol_role role,
               else if (fd == server)
                 handle_new_connection(addr, server, timeout, role,
                                       include_pattern, exclude_pattern,
+#if defined(RSE) /* netsync-hooks */
+                                      sessions, *guard, app);
+#else
                                       sessions, app);
+#endif
 
               // or an existing session woke up
               else
@@ -3285,8 +3664,14 @@ run_netsync_protocol(protocol_voice voic
       if (app.opts.bind_stdio)
         {
           shared_ptr str(new Netxx::PipeStream(0,1));
+#if defined(RSE) /* netsync-hooks */
+          transaction_guard guard(app.db); /* NOTE(review): never commit()ed in this hunk; confirm its destructor does not roll back the session's writes */
+#endif
           shared_ptr sess(new session(role, server_voice,
                                       include_pattern, exclude_pattern,
+#if defined(RSE) /* netsync-hooks */
+                                      guard,
+#endif
                                       app, "stdio", str));
           serve_single_connection(sess,constants::netsync_timeout_seconds);
         }