gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnunet-scheme] 13/49: dht/client: Send PUT messages.


From: gnunet
Subject: [gnunet-scheme] 13/49: dht/client: Send PUT messages.
Date: Sat, 25 Dec 2021 22:59:50 +0100

This is an automated email from the git hooks/post-receive script.

maxime-devos pushed a commit to branch master
in repository gnunet-scheme.

commit 008e48e11c24b9a82e725c55e00c7e0279e162af
Author: Maxime Devos <maximedevos@telenet.be>
AuthorDate: Mon Sep 20 12:28:52 2021 +0200

    dht/client: Send PUT messages.
    
    * gnu/gnunet/dht/client.scm
      (<server>)[new-put-operations,new-put-operation-trigger]: New
      fields.
      (<put>): New record type.
      (canonical-block-type): Extract from ...
      (start-get!): ... here.
      (put!): New procedure.
      (connect): Set new '<server>' fields and 'reconnect' arguments.
      (reconnect)[error-handler]: Pass new arguments when reconnecting.
      (process-new-put-operations): New fiber.
    * examples/web.scm: Insert and retrieve something.
---
 examples/web.scm          | 13 +++++++
 gnu/gnunet/dht/client.scm | 89 +++++++++++++++++++++++++++++++++++++++++------
 2 files changed, 91 insertions(+), 11 deletions(-)

diff --git a/examples/web.scm b/examples/web.scm
index be4fa7c..dde6446 100644
--- a/examples/web.scm
+++ b/examples/web.scm
@@ -6,6 +6,10 @@
 ;; without any warranty.
 
 (use-modules (fibers)
+            (rnrs bytevectors)
+            (gnu extractor enum)
+            (gnu gnunet block)
+            (gnu gnunet utils bv-slice)
             (gnu gnunet config db)
             (gnu gnunet config fs)
             (rnrs hashtables)
@@ -36,6 +40,15 @@
   (define server (open-server impl `(#:port 8089)))
   (define (url-handler* request body)
     (url-handler nse-server request body))
+  ;; TODO: Form to start GET and PUT requests?
+  ;; For now, hard code the data to insert.
+  (dht:put! dht-server
+           (symbol-value block-type block:test)
+           (bv-slice/read-write (make-bytevector 64))
+           (bv-slice/read-write #vu8(#xde #xad #xbe #xef)))
+  (dht:start-get! dht-server
+                 (symbol-value block-type block:test)
+                 (bv-slice/read-write (make-bytevector 64)) pk)
   (let loop ()
     (let-values (((client request body)
                  (read-client impl server)))
diff --git a/gnu/gnunet/dht/client.scm b/gnu/gnunet/dht/client.scm
index 0a03ad1..25aa880 100644
--- a/gnu/gnunet/dht/client.scm
+++ b/gnu/gnunet/dht/client.scm
@@ -63,7 +63,8 @@
          (only (gnu gnunet netstruct syntactic)
                read% sizeof set%! select)
          (only (gnu gnunet utils bv-slice)
-               slice-length slice/read-only make-slice/read-write slice-copy!)
+               slice-length slice/read-only make-slice/read-write slice-copy!
+               slice-slice)
          (only (rnrs base)
                and >= = quote * + - define begin ... let*
                quote case else values apply let cond if >
@@ -87,6 +88,13 @@
              ;; responsible for processing the new get operations.
              (immutable new-get-operaton-trigger
                         server-new-get-operation-trigger)
+             ;; Hash table from new <put> to #true.  These put operations
+             ;; are not yet sent to the service, and not yet queued for
+             ;; sending.
+             (immutable new-put-operations
+                        server-new-put-operations)
+             (immutable new-put-operation-trigger
+                        server-new-put-operation-trigger)
              ;; Atomic box holding an unsigned 64-bit integer.
              (immutable next-unique-id/box server-next-unique-id/box)))
 
@@ -100,6 +108,12 @@
              (immutable type get:type)
              (immutable options get:options)))
 
+    (define-record-type (<put> %make-put put?)
+      (fields (immutable server put:server)
+             (immutable inserted put:inserted) ; thunk
+             ;; bytevector slice (/:msg:dht:client:put)
+             (immutable message put:message)))
+
     (define (send-get! mq get)
       "Send a GET message for @var{get}."
       (pk 'new get)
@@ -134,6 +148,17 @@
             expected
             (loop actual)))))
 
+    (define (canonical-block-type type)
+      "Return the numeric value of the block type @var{type}
+(a @code{block-type?} or in-bounds integer)."
+      (cond ((integer? type)
+            (unless (and (<= 0 type (- (expt 2 32) 1)))
+              (error "block type out of bounds"))
+            type)
+           (#t
+            (assert (block-type? type))
+            (value->index type))))
+
     (define* (start-get! server type key found
                         #:key (desired-replication-level 3))
       "Perform an asynchronous GET operation on the DHT, and return a handle
@@ -141,26 +166,48 @@ to control the GET operation.  Search for a block of type @var{type} (a
 @code{block-type} or its numeric value) and key @var{key}, a readable bytevector
 slice.  Call @var{found} on every search result."
       ;; TODO: options, xquery ...
-      (define canonical-type
-       (cond ((integer? type)
-              (unless (and (<= 0 type (- (expt 2 32) 1)))
-                (error "block type out of bounds")
-                type))
-             (#t
-              (assert (block-type? type))
-              (value->index type))))
       (unless (= (slice-length key) (sizeof /hashcode:512 '()))
        (error "length of key incorrect"))
       (define handle (%make-get server found (slice/read-only key)
                                (fresh-id server)
                                desired-replication-level
-                               type
+                               (canonical-block-type type)
                                0)) ; TODO
       (hashq-set! (server-new-get-operations server) handle #t)
       ;; Asynchronuously process the new get request.
       (trigger-condition! (server-new-get-operation-trigger server))
       handle)
 
+    (define* (put! server type key data #:key (desired-replication-level 3)
+                  (confirmed values))
+      "Perform an asynchronous PUT operation on the DHT, inserting @var{data}
+(a readable bytevector slice) under @var{key} (a readable bytevector slice
+holding a @code{/hashcode:512}).  The block type is @var{type} (a
+@code{block-type} or its numeric value).
+
+TODO expiration, replication, confirm ..."
+      ;; Prepare the message to send.
+      (define put-message
+       (make-slice/read-write (+ (sizeof /:msg:dht:client:put '())
+                                 (slice-length data))))
+      (define meta (slice-slice put-message 0
+                               (sizeof /:msg:dht:client:put '())))
+      (set%! /:msg:dht:client:put '(header type) meta
+            (value->index (symbol-value message-type msg:dht:client:put)))
+      (set%! /:msg:dht:client:put '(header size) meta (slice-length 
put-message))
+      (set%! /:msg:dht:client:put '(type) meta (pk 'can (canonical-block-type 
type)))
+      (set%! /:msg:dht:client:put '(option) meta 0) ; TODO
+      (set%! /:msg:dht:client:put '(desired-replication-level) meta
+            desired-replication-level)
+      (set%! /:msg:dht:client:put '(expiration) meta 0) ; TODO
+      ;; Copy data to insert into the DHT.
+      (slice-copy! data
+                  (slice-slice put-message (sizeof /:msg:dht:client:put '())))
+      (define handle (%make-put server confirmed put-message))
+      (hashq-set! (server-new-put-operations server) handle #t)
+      (trigger-condition! (server-new-put-operation-trigger server))
+      handle)
+
     (define-syntax-rule (well-formed?/path-length slice type (field ...) 
compare)
       "Verify the TYPE message in @var{slice}, which has @var{field ...} ...
 (e.g. one or more of get-path-length or put-path-length) and corresponding
@@ -193,15 +240,20 @@ even if not connected.  This is an idempotent operation."
       (define request-close-condition (make-condition))
       (define new-get-operation-trigger (make-repeated-condition))
       (define new-get-operations (make-hash-table))
+      (define new-put-operation-trigger (make-repeated-condition))
+      (define new-put-operations (make-hash-table))
       (reconnect new-get-operations new-get-operation-trigger
+                new-put-operations new-put-operation-trigger
                 request-close?/box request-close-condition config
                 #:spawn spawn)
       (%make-server request-close?/box request-close-condition
                    new-get-operations new-get-operation-trigger
+                   new-put-operations new-put-operation-trigger
                    ;; Any ‘small’ exact natural number will do.
                    (make-atomic-box 0)))
 
     (define* (reconnect new-get-operations new-get-operation-trigger
+                       new-put-operations new-put-operation-trigger
                        request-close?/box request-close-condition config
                        #:key (spawn spawn-fiber)
                        #:rest rest)
@@ -253,7 +305,9 @@ even if not connected.  This is an idempotent operation."
          ((input:regular-end-of-file input:premature-end-of-file)
           (signal-condition! mq-closed)
           (unless (atomic-box-ref request-close?/box)
-            (apply reconnect new-get-operations new-get-operation-trigger
+            (apply reconnect
+                   new-get-operations new-get-operation-trigger
+                   new-put-operations new-put-operation-trigger
                    request-close?/box request-close-condition
                    config rest)))
          ((connection:interrupted)
@@ -282,9 +336,22 @@ to the DHT service."
        ;; TODO reconnection, closing queues and cancelling get operations,
        ;; processing answers ...
        (process-new-get-operations))
+      ;; TODO: remove duplication with process-new-get-operations
+      (define (process-new-put-operations)
+       (await-trigger! new-put-operation-trigger)
+       ;; Extract the latest new put operations
+       (define new (hash-map->list (lambda (put _) put) new-put-operations))
+       ;; And remove them from the hash table
+       (for-each (lambda (put) (hashq-remove! new-put-operations put)) new)
+       ;; and (asynchronously) send the PUT messages
+       (for-each (lambda (put) (send-message! mq (put:message put))) new)
+       ;; TODO notify-sent callbacks, closing queues, cancelling put operations,
+       ;; processing answers ...
+       (process-new-put-operations))
       (define mq (connect/fibers config "dht" handlers error-handler
                                 #:spawn spawn))
       (spawn request-close-handler)
       (spawn process-new-get-operations)
+      (spawn process-new-put-operations)
       ;; TODO: use new-get-operations
       'todo)))

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]