gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnunet-go] branch master updated: Improved transport and module code.


From: gnunet
Subject: [gnunet-go] branch master updated: Improved transport and module code.
Date: Tue, 07 Jun 2022 09:32:34 +0200

This is an automated email from the git hooks/post-receive script.

bernd-fix pushed a commit to branch master
in repository gnunet-go.

The following commit(s) were added to refs/heads/master by this push:
     new 7a6ae8f  Improved transport and module code.
7a6ae8f is described below

commit 7a6ae8f61ee7efde161db98462259ea9bbb23386
Author: Bernd Fix <brf@hoi-polloi.org>
AuthorDate: Tue Jun 7 09:31:22 2022 +0200

    Improved transport and module code.
---
 .../main.go                                        |   8 +-
 src/gnunet/cmd/peer_mockup/main.go                 |  19 +-
 src/gnunet/config/config.go                        |  30 ++-
 src/gnunet/config/config_test.go                   |  19 +-
 src/gnunet/config/gnunet-config.json               | 119 ++++++------
 src/gnunet/core/core.go                            | 202 ++++++++++++++++----
 src/gnunet/core/core_test.go                       | 203 ++++++++++++++++-----
 src/gnunet/core/peer.go                            |  49 +----
 src/gnunet/core/peer_test.go                       |  23 ++-
 src/gnunet/go.mod                                  |   2 +
 src/gnunet/go.sum                                  |   2 +
 src/gnunet/message/msg_hello.go                    |   2 +-
 src/gnunet/modules.go                              |  84 ---------
 src/gnunet/service/dht/blocks/hello.go             |   2 +-
 src/gnunet/service/dht/module.go                   |  18 +-
 src/gnunet/service/dht/routingtable.go             |  57 ++++--
 src/gnunet/service/dht/routingtable_test.go        |   8 +-
 src/gnunet/service/gns/module.go                   |  18 ++
 src/gnunet/service/module.go                       |  36 ++++
 src/gnunet/service/namecache/module.go             |  26 ++-
 src/gnunet/service/revocation/module.go            |  18 +-
 src/gnunet/transport/endpoint.go                   | 168 ++++++++++++-----
 src/gnunet/transport/reader_writer.go              |   5 +-
 src/gnunet/transport/transport.go                  | 123 +++++++------
 src/gnunet/util/address.go                         |  21 ++-
 src/gnunet/util/database.go                        |   4 +-
 src/gnunet/util/misc.go                            |  80 ++++++--
 src/gnunet/util/peer_id.go                         |  26 ++-
 28 files changed, 907 insertions(+), 465 deletions(-)

diff --git a/src/gnunet/cmd/gnunet-service-dht-test-go/main.go 
b/src/gnunet/cmd/gnunet-service-dht-go/main.go
similarity index 94%
rename from src/gnunet/cmd/gnunet-service-dht-test-go/main.go
rename to src/gnunet/cmd/gnunet-service-dht-go/main.go
index cd2bd1a..3b525fd 100644
--- a/src/gnunet/cmd/gnunet-service-dht-test-go/main.go
+++ b/src/gnunet/cmd/gnunet-service-dht-go/main.go
@@ -83,16 +83,12 @@ func main() {
 
        // instantiate core service
        ctx, cancel := context.WithCancel(context.Background())
-       var local *core.Peer
-       if local, err = core.NewLocalPeer(config.Cfg.Local); err != nil {
-               logger.Printf(logger.ERROR, "[dht] No local peer: %s\n", 
err.Error())
-               return
-       }
        var c *core.Core
-       if c, err = core.NewCore(ctx, local); err != nil {
+       if c, err = core.NewCore(ctx, config.Cfg.Local); err != nil {
                logger.Printf(logger.ERROR, "[dht] core failed: %s\n", 
err.Error())
                return
        }
+       defer c.Shutdown()
 
        // start a new DHT service
        dht := dht.NewService(ctx, c)
diff --git a/src/gnunet/cmd/peer_mockup/main.go 
b/src/gnunet/cmd/peer_mockup/main.go
index 58f4baf..96f62da 100644
--- a/src/gnunet/cmd/peer_mockup/main.go
+++ b/src/gnunet/cmd/peer_mockup/main.go
@@ -22,13 +22,19 @@ var (
        // configuration for local node
        localCfg = &config.NodeConfig{
                PrivateSeed: "YGoe6XFH3XdvFRl+agx9gIzPTvxA229WFdkazEMdcOs=",
-               Endpoints: []string{
-                       "udp:127.0.0.1:2086",
+               Endpoints: []*config.EndpointConfig{
+                       {
+                               ID:      "local",
+                               Network: "udp",
+                               Address: "127.0.0.1",
+                               Port:    2086,
+                               TTL:     86400,
+                       },
                },
        }
        // configuration for remote node
        remoteCfg  = "3GXXMNb5YpIUO7ejIR2Yy0Cf5texuLfDjHkXcqbPxkc="
-       remoteAddr = "udp:172.17.0.5:2086"
+       remoteAddr = "udp://172.17.0.5:2086"
 
        // top-level variables used accross functions
        local  *core.Peer // local peer (with private key)
@@ -50,14 +56,11 @@ func main() {
        flag.Parse()
 
        // setup peer and core instances
-       if local, err = core.NewLocalPeer(localCfg); err != nil {
-               fmt.Println("local failed: " + err.Error())
-               return
-       }
-       if c, err = core.NewCore(ctx, local); err != nil {
+       if c, err = core.NewCore(ctx, localCfg); err != nil {
                fmt.Println("core failed: " + err.Error())
                return
        }
+       local = c.Peer()
        if remote, err = core.NewPeer(remoteCfg); err != nil {
                fmt.Println("remote failed: " + err.Error())
                return
diff --git a/src/gnunet/config/config.go b/src/gnunet/config/config.go
index 41a65f0..3687a3e 100644
--- a/src/gnunet/config/config.go
+++ b/src/gnunet/config/config.go
@@ -20,6 +20,7 @@ package config
 
 import (
        "encoding/json"
+       "fmt"
        "io/ioutil"
        "reflect"
        "regexp"
@@ -32,10 +33,26 @@ import (
 // Configuration for local node
 //----------------------------------------------------------------------
 
+// EndpointConfig holds parameters for local network listeners.
+type EndpointConfig struct {
+       ID      string `json:"id"`      // endpoint identifier
+       Network string `json:"network"` // network protocol to use on endpoint
+       Address string `json:"address"` // address to listen on
+       Port    int    `json:"port"`    // port for listening to network
+       TTL     int    `json:"ttl"`     // time-to-live for address (in seconds)
+}
+
+// Addr returns an address string for endpoint configuration; it does NOT
+// handle special cases like UPNP and such.
+func (c *EndpointConfig) Addr() string {
+       return fmt.Sprintf("%s://%s:%d", c.Network, c.Address, c.Port)
+}
+
 // NodeConfig holds parameters for the local node instance
 type NodeConfig struct {
-       PrivateSeed string   `json:"privateSeed"` // Node private key seed 
(base64)
-       Endpoints   []string `json:"endpoints"`   // list of endpoints available
+       Name        string            `json:"name"`        // (short) name for 
local node
+       PrivateSeed string            `json:"privateSeed"` // Node private key 
seed (base64)
+       Endpoints   []*EndpointConfig `json:"endpoints"`   // list of endpoints 
available
 }
 
 //----------------------------------------------------------------------
@@ -139,9 +156,16 @@ func ParseConfig(fileName string) (err error) {
        if err != nil {
                return
        }
+       return ParseConfigBytes(file, true)
+}
+
+// ParseConfigBytes reads a configuration from binary data. The data is
+// a JSON-encoded content. If 'subst' is true, the configuration strings
+// are subsituted
+func ParseConfigBytes(data []byte, subst bool) (err error) {
        // unmarshal to Config data structure
        Cfg = new(Config)
-       if err = json.Unmarshal(file, Cfg); err == nil {
+       if err = json.Unmarshal(data, Cfg); err == nil {
                // process all string-based config settings and apply
                // string substitutions.
                applySubstitutions(Cfg, Cfg.Env)
diff --git a/src/gnunet/config/config_test.go b/src/gnunet/config/config_test.go
index 3afd3a7..d40bc19 100644
--- a/src/gnunet/config/config_test.go
+++ b/src/gnunet/config/config_test.go
@@ -20,6 +20,7 @@ package config
 
 import (
        "encoding/json"
+       "io/ioutil"
        "testing"
 
        "github.com/bfix/gospel/logger"
@@ -27,14 +28,18 @@ import (
 
 func TestConfigRead(t *testing.T) {
        logger.SetLogLevel(logger.WARN)
-       if err := ParseConfig("./gnunet-config.json"); err != nil {
+
+       // read configuration file
+       data, err := ioutil.ReadFile("./gnunet-config.json")
+       if err != nil {
+               t.Fatal(err)
+       }
+       // parse configuration
+       if err := ParseConfigBytes(data, false); err != nil {
                t.Fatal(err)
        }
-       if testing.Verbose() {
-               data, err := json.Marshal(Cfg)
-               if err != nil {
-                       t.Fatal(err)
-               }
-               t.Log("cfg=" + string(data))
+       // write configuration
+       if _, err = json.Marshal(Cfg); err != nil {
+               t.Fatal(err)
        }
 }
diff --git a/src/gnunet/config/gnunet-config.json 
b/src/gnunet/config/gnunet-config.json
index 941cf21..82606d7 100644
--- a/src/gnunet/config/gnunet-config.json
+++ b/src/gnunet/config/gnunet-config.json
@@ -1,58 +1,65 @@
 {
-       "local": {
-               "privateSeed": "YGoe6XFH3XdvFRl+agx9gIzPTvxA229WFdkazEMdcOs=",
-               "endpoints": [
-                       "r5n+ip+udp:127.0.0.1:6666"
-               ]
-       },
-       "bootstrap": {
-               "nodes": [
-                       
"gnunet://hello/7KTBJ90340HF1Q2GB0A57E2XJER4FDHX8HP5GHEB9125VPWPD27G/BNMDFN6HJCPWSPNBSEC06MC1K8QN1Z2DHRQSRXDTFR7FTBD4JHNBJ2RJAAEZ31FWG1Q3PMN3PXGZQ3Q7NTNEKQZFA7TE2Y46FM8E20R/1653499308?r5n%2Bip%2Budp%3A127.0.0.1%3A7654"
-               ]
-       },
-       "environ": {
-               "TMP": "/tmp",
-               "RT_SYS": "${TMP}/gnunet-system-runtime"
-       },
-       "dht": {
-               "service": {
-                       "socket": "${RT_SYS}/gnunet-service-dht.sock",
-                       "params": {
-                               "perm": "0770"
-                       }
-               },
-               "storage": "dht_file_store+/var/lib/gnunet/dht/store",
-               "cache": "dht_file_cache+/var/lib/gnunet/dht/cache+1000"
-       },
-       "gns": {
-               "service": {
-                       "socket": "${RT_SYS}/gnunet-service-gns-go.sock",
-                       "params": {
-                               "perm": "0770"
-                       }
-               },
-               "dhtReplLevel": 10,
-               "maxDepth": 250
-       },
-       "namecache": {
-               "service": {
-                       "socket": "${RT_SYS}/gnunet-service-namecache.sock",
-                       "params": {
-                               "perm": "0770"
-                       }
-               },
-               "storage": "dht_file_cache:/var/lib/gnunet/namecache:1000"
-       },
-       "revocation": {
-               "service": {
-                       "socket": "${RT_SYS}/gnunet-service-revocation-go.sock",
-                       "params": {
-                               "perm": "0770"
-                       }
-               },
-               "storage": "redis:localhost:6397::15"
-       },
-       "rpc": {
-               "endpoint": "tcp:127.0.0.1:80"
-       }
+    "bootstrap": {
+        "nodes": [
+            
"gnunet://hello/7KTBJ90340HF1Q2GB0A57E2XJER4FDHX8HP5GHEB9125VPWPD27G/BNMDFN6HJCPWSPNBSEC06MC1K8QN1Z2DHRQSRXDTFR7FTBD4JHNBJ2RJAAEZ31FWG1Q3PMN3PXGZQ3Q7NTNEKQZFA7TE2Y46FM8E20R/1653499308?r5n%2Bip%2Budp%3A127.0.0.1%3A7654"
+        ]
+    },
+    "local": {
+        "name": "ygng",
+        "privateSeed": "YGoe6XFH3XdvFRl+agx9gIzPTvxA229WFdkazEMdcOs=",
+        "endpoints": [
+            {
+                "id": "test",
+                "network": "ip+udp",
+                "address": "upnp:192.168.178.1",
+                "port": 6666,
+                "ttl": 86400
+            }
+        ]
+    },
+    "environ": {
+        "TMP": "/tmp",
+        "RT_SYS": "${TMP}/gnunet-system-runtime"
+    },
+    "dht": {
+        "service": {
+            "socket": "${RT_SYS}/gnunet-service-dht.sock",
+            "params": {
+                "perm": "0770"
+            }
+        },
+        "storage": "dht_file_store+/var/lib/gnunet/dht/store",
+        "cache": "dht_file_cache+/var/lib/gnunet/dht/cache+1000"
+    },
+    "gns": {
+        "service": {
+            "socket": "${RT_SYS}/gnunet-service-gns-go.sock",
+            "params": {
+                "perm": "0770"
+            }
+        },
+        "dhtReplLevel": 10,
+        "maxDepth": 250
+    },
+    "namecache": {
+        "service": {
+            "socket": "${RT_SYS}/gnunet-service-namecache.sock",
+            "params": {
+                "perm": "0770"
+            }
+        },
+        "storage": "dht_file_cache:/var/lib/gnunet/namecache:1000"
+    },
+    "revocation": {
+        "service": {
+            "socket": "${RT_SYS}/gnunet-service-revocation-go.sock",
+            "params": {
+                "perm": "0770"
+            }
+        },
+        "storage": "redis:localhost:6397::15"
+    },
+    "rpc": {
+        "endpoint": "tcp:127.0.0.1:80"
+    }
 }
\ No newline at end of file
diff --git a/src/gnunet/core/core.go b/src/gnunet/core/core.go
index c3bf355..7582891 100644
--- a/src/gnunet/core/core.go
+++ b/src/gnunet/core/core.go
@@ -20,16 +20,34 @@ package core
 
 import (
        "context"
+       "errors"
+       "gnunet/config"
        "gnunet/message"
        "gnunet/service/dht/blocks"
        "gnunet/transport"
        "gnunet/util"
        "net"
+       "strings"
        "time"
+)
 
-       "github.com/bfix/gospel/data"
+//----------------------------------------------------------------------
+// Core-related error codes
+var (
+       ErrCoreNoUpnpDyn  = errors.New("no dynamic port with UPnP")
+       ErrCoreNoEndpAddr = errors.New("no endpoint for address")
 )
 
+//----------------------------------------------------------------------
+// EndpointRef is a reference to an endpoint instance managed by core.
+type EndpointRef struct {
+       id     string             // endpoint identifier in configuration
+       ep     transport.Endpoint // reference to endpoint
+       addr   *util.Address      // public endpoint address
+       upnpId string             // UPNP identifier (empty if unused)
+}
+
+//----------------------------------------------------------------------
 // Core service
 type Core struct {
        // local peer instance
@@ -41,31 +59,91 @@ type Core struct {
        // reference to transport implementation
        trans *transport.Transport
 
-       // registered listeners
+       // registered signal listeners
        listeners map[string]*Listener
 
        // list of known peers with addresses
        peers *util.PeerAddrList
+
+       // List of registered endpoints
+       endpoints map[string]*EndpointRef
 }
 
 //----------------------------------------------------------------------
 
 // NewCore creates and runs a new core instance.
-func NewCore(ctx context.Context, local *Peer) (c *Core, err error) {
+func NewCore(ctx context.Context, node *config.NodeConfig) (c *Core, err 
error) {
+
+       // instantiate peer
+       var peer *Peer
+       if peer, err = NewLocalPeer(node); err != nil {
+               return
+       }
        // create new core instance
        incoming := make(chan *transport.TransportMessage)
        c = &Core{
-               local:     local,
+               local:     peer,
                incoming:  incoming,
                listeners: make(map[string]*Listener),
-               trans:     transport.NewTransport(ctx, incoming),
+               trans:     transport.NewTransport(ctx, node.Name, incoming),
                peers:     util.NewPeerAddrList(),
+               endpoints: make(map[string]*EndpointRef),
        }
        // add all local peer endpoints to transport.
-       for _, addr := range local.addrList {
-               if _, err = c.trans.AddEndpoint(ctx, addr); err != nil {
+       for _, epCfg := range node.Endpoints {
+               var (
+                       upnpId string             // upnp identifier
+                       local  *util.Address      // local address
+                       remote *util.Address      // remote address
+                       ep     transport.Endpoint // endpoint reference
+               )
+               // handle special addresses:
+               if strings.HasPrefix(epCfg.Address, "upnp:") {
+                       // don't allow dynamic port assignment
+                       if epCfg.Port == 0 {
+                               err = ErrCoreNoUpnpDyn
+                               return
+                       }
+                       // handle UPNP port forwarding
+                       protocol := transport.EpProtocol(epCfg.Network)
+                       var localA, remoteA string
+                       if upnpId, remoteA, localA, err = 
c.trans.ForwardOpen(protocol, epCfg.Address[5:], epCfg.Port); err != nil {
+                               return
+                       }
+                       // parse local and remote addresses
+                       if local, err = util.ParseAddress(epCfg.Network + "://" 
+ localA); err != nil {
+                               return
+                       }
+                       if remote, err = util.ParseAddress(epCfg.Network + 
"://" + remoteA); err != nil {
+                               return
+                       }
+               } else {
+                       // direct address specification:
+                       if local, err = util.ParseAddress(epCfg.Addr()); err != 
nil {
+                               return
+                       }
+                       remote = local
+                       upnpId = ""
+               }
+               // add endpoint for address
+               if ep, err = c.trans.AddEndpoint(ctx, local); err != nil {
                        return
                }
+               // if port is set to 0, replace it with port assigned 
dynamically.
+               // only applies to direct listening addresses!
+               if epCfg.Port == 0 && local == remote {
+                       addr := ep.Address()
+                       if remote, err = util.ParseAddress(addr.Network() + 
"://" + addr.String()); err != nil {
+                               return
+                       }
+               }
+               // save endpoint reference
+               c.endpoints[epCfg.ID] = &EndpointRef{
+                       id:     epCfg.ID,
+                       ep:     ep,
+                       addr:   remote,
+                       upnpId: upnpId,
+               }
        }
        // run message pump
        go func() {
@@ -77,32 +155,31 @@ func NewCore(ctx context.Context, local *Peer) (c *Core, 
err error) {
                                var ev *Event
 
                                // inspect message for peer state events
-                               m, err := tm.Message()
-                               if err == nil {
-                                       switch msg := m.(type) {
-                                       case *message.HelloMsg:
-                                               // keep peer addresses
-                                               for _, addr := range 
msg.Addresses {
-                                                       a := &util.Address{
-                                                               Netw:    
addr.Transport,
-                                                               Address: 
addr.Address,
-                                                               Expires: 
addr.ExpireOn,
-                                                       }
-                                                       c.Learn(ctx, 
msg.PeerID, a)
+                               switch msg := tm.Msg.(type) {
+                               case *message.HelloMsg:
+                                       // keep peer addresses
+                                       for _, addr := range msg.Addresses {
+                                               a := &util.Address{
+                                                       Netw:    addr.Transport,
+                                                       Address: addr.Address,
+                                                       Expires: addr.ExpireOn,
                                                }
-                                               // generate EV_CONNECT event
-                                               ev = new(Event)
-                                               ev.ID = EV_CONNECT
-                                               ev.Peer = tm.Peer
-                                               ev.Msg = msg
-                                               c.dispatch(ev)
+                                               c.Learn(ctx, msg.PeerID, a)
+                                       }
+                                       // generate EV_CONNECT event
+                                       ev = &Event{
+                                               ID:   EV_CONNECT,
+                                               Peer: tm.Peer,
+                                               Msg:  msg,
                                        }
+                                       c.dispatch(ev)
                                }
                                // generate EV_MESSAGE event
-                               ev = new(Event)
-                               ev.ID = EV_MESSAGE
-                               ev.Peer = tm.Peer
-                               ev.Msg, _ = tm.Message()
+                               ev = &Event{
+                                       ID:   EV_MESSAGE,
+                                       Peer: tm.Peer,
+                                       Msg:  tm.Msg,
+                               }
                                c.dispatch(ev)
 
                        // wait for termination
@@ -114,29 +191,63 @@ func NewCore(ctx context.Context, local *Peer) (c *Core, 
err error) {
        return
 }
 
+// Shutdown all core-related processes.
+func (c *Core) Shutdown() {
+       c.trans.Shutdown()
+       c.local.Shutdown()
+}
+
 //----------------------------------------------------------------------
 
 // Send is a function that allows the local peer to send a protocol
 // message to a remote peer.
 func (c *Core) Send(ctx context.Context, peer *util.PeerID, msg 
message.Message) error {
-       // TODO: select best endpoint protocol for transport; now fixed to UDP
-       netw := "udp"
-       addr := c.peers.Get(peer.String(), netw)
-       payload, err := data.Marshal(msg)
-       if err != nil {
-               return err
+       // TODO: select best endpoint protocol for transport; now fixed to 
IP+UDP
+       netw := "ip+udp"
+       addrs := c.peers.Get(peer.String(), netw)
+       if len(addrs) == 0 {
+               return ErrCoreNoEndpAddr
        }
-       tm := transport.NewTransportMessage(c.PeerID(), payload)
+       // TODO: select best address; curently selects first
+       addr := addrs[0]
+
+       // select best endpoint for transport
+       var ep transport.Endpoint
+       for _, epCfg := range c.endpoints {
+               if epCfg.addr.Network() == netw {
+                       if ep == nil {
+                               ep = epCfg.ep
+                       }
+                       // TODO: compare endpoints, select better one:
+                       // if ep.Better(epCfg.ep) {
+                       //     ep = epCfg.ep
+                       // }
+               }
+       }
+       // check we have an endpoint to send on
+       if ep == nil {
+               return ErrCoreNoEndpAddr
+       }
+       // assemble transport message
+       tm := transport.NewTransportMessage(c.PeerID(), msg)
+       // send on transport
        return c.trans.Send(ctx, addr, tm)
 }
 
 // Learn a (new) address for peer
 func (c *Core) Learn(ctx context.Context, peer *util.PeerID, addr 
*util.Address) (err error) {
        if c.peers.Add(peer.String(), addr) == 1 {
+               // we added a previously unknown peer: send a HELLO
+
+               // collect endpoint addresses
+               addrList := make([]*util.Address, 0)
+               for _, epRef := range c.endpoints {
+                       addrList = append(addrList, epRef.addr)
+               }
                // new peer id: send HELLO message to newly added peer
                node := c.local
                var hello *blocks.HelloBlock
-               hello, err = node.HelloData(time.Hour)
+               hello, err = node.HelloData(time.Hour, addrList)
                if err != nil {
                        return
                }
@@ -150,11 +261,28 @@ func (c *Core) Learn(ctx context.Context, peer 
*util.PeerID, addr *util.Address)
        return
 }
 
+// Addresses returns the list of listening endpoint addresses
+func (c *Core) Addresses() (list []*util.Address, err error) {
+       for _, epRef := range c.endpoints {
+               list = append(list, epRef.addr)
+       }
+       return
+}
+
+//----------------------------------------------------------------------
+
+// Peer returns the local peer
+func (c *Core) Peer() *Peer {
+       return c.local
+}
+
 // PeerID returns the peer id of the local node.
 func (c *Core) PeerID() *util.PeerID {
        return c.local.GetID()
 }
 
+//----------------------------------------------------------------------
+
 // TryConnect is a function which allows the local peer to attempt the
 // establishment of a connection to another peer using an address.
 // When the connection attempt is successful, information on the new
diff --git a/src/gnunet/core/core_test.go b/src/gnunet/core/core_test.go
index 102abf1..d8a6277 100644
--- a/src/gnunet/core/core_test.go
+++ b/src/gnunet/core/core_test.go
@@ -20,23 +20,149 @@ package core
 
 import (
        "context"
+       "encoding/hex"
        "gnunet/config"
        "gnunet/util"
        "testing"
        "time"
 )
 
-var (
-       peer1Cfg = &config.NodeConfig{
-               PrivateSeed: "iYK1wSi5XtCP774eNFk1LYXqKlOPEpwKBw+2/bMkE24=",
-               Endpoints:   []string{"udp://127.0.0.1:20861"},
+//----------------------------------------------------------------------
+// Two node GNUnet (smallest and simplest network)
+//----------------------------------------------------------------------
+
+// TestCoreSimple test a two node network
+func TestCoreSimple(t *testing.T) {
+
+       var (
+               peer1Cfg = &config.NodeConfig{
+                       Name:        "p1",
+                       PrivateSeed: 
"iYK1wSi5XtCP774eNFk1LYXqKlOPEpwKBw+2/bMkE24=",
+                       Endpoints: []*config.EndpointConfig{
+                               {
+                                       ID:      "p1",
+                                       Network: "ip+udp",
+                                       Address: "127.0.0.1",
+                                       Port:    0,
+                                       TTL:     86400,
+                               },
+                       },
+               }
+
+               peer2Cfg = &config.NodeConfig{
+                       Name:        "p2",
+                       PrivateSeed: 
"Bv9umksEO51jjWWrOGEH+4r8wl9Vi+LItpdBpTOi2PE=",
+                       Endpoints: []*config.EndpointConfig{
+                               {
+                                       ID:      "p2",
+                                       Network: "ip+udp",
+                                       Address: "127.0.0.1",
+                                       Port:    20862,
+                                       TTL:     86400,
+                               },
+                       },
+               }
+       )
+
+       // setup execution context
+       ctx, cancel := context.WithCancel(context.Background())
+       defer func() {
+               cancel()
+               time.Sleep(time.Second)
+       }()
+
+       // create and run nodes
+       node1, err := NewTestNode(t, ctx, peer1Cfg)
+       if err != nil {
+               t.Fatal(err)
+       }
+       node2, err := NewTestNode(t, ctx, peer2Cfg)
+       if err != nil {
+               t.Fatal(err)
        }
 
-       peer2Cfg = &config.NodeConfig{
-               PrivateSeed: "Bv9umksEO51jjWWrOGEH+4r8wl9Vi+LItpdBpTOi2PE=",
-               Endpoints:   []string{"udp://127.0.0.1:20862"},
+       // learn peer addresses (triggers HELLO)
+       list, err := node2.core.Addresses()
+       if err != nil {
+               t.Fatal(err)
        }
-)
+       for _, addr := range list {
+               node1.Learn(ctx, node2.peer.GetID(), util.NewAddressWrap(addr))
+       }
+
+       // wait for 5 seconds
+       time.Sleep(5 * time.Second)
+}
+
+//----------------------------------------------------------------------
+// Two node GNUnet both running locally, but exchanging messages over
+// the internet (UPNP router required).
+//----------------------------------------------------------------------
+
+// TestCoreSimple test a two node network
+func TestCoreUPNP(t *testing.T) {
+
+       // configuration data
+       var (
+               peer1Cfg = &config.NodeConfig{
+                       Name:        "p1",
+                       PrivateSeed: 
"iYK1wSi5XtCP774eNFk1LYXqKlOPEpwKBw+2/bMkE24=",
+                       Endpoints: []*config.EndpointConfig{
+                               {
+                                       ID:      "p1",
+                                       Network: "ip+udp",
+                                       Address: "upnp:",
+                                       Port:    2086,
+                                       TTL:     86400,
+                               },
+                       },
+               }
+               peer2Cfg = &config.NodeConfig{
+                       Name:        "p2",
+                       PrivateSeed: 
"Bv9umksEO51jjWWrOGEH+4r8wl9Vi+LItpdBpTOi2PE=",
+                       Endpoints: []*config.EndpointConfig{
+                               {
+                                       ID:      "p2",
+                                       Network: "ip+udp",
+                                       Address: "upnp:",
+                                       Port:    1080,
+                                       TTL:     86400,
+                               },
+                       },
+               }
+       )
+
+       // setup execution context
+       ctx, cancel := context.WithCancel(context.Background())
+       defer func() {
+               cancel()
+               time.Sleep(time.Second)
+       }()
+
+       // create and run nodes
+       node1, err := NewTestNode(t, ctx, peer1Cfg)
+       if err != nil {
+               t.Fatal(err)
+       }
+       defer node1.Shutdown()
+       node2, err := NewTestNode(t, ctx, peer2Cfg)
+       if err != nil {
+               t.Fatal(err)
+       }
+       defer node2.Shutdown()
+
+       // learn peer addresses (triggers HELLO)
+       list, err := node2.core.Addresses()
+       if err != nil {
+               t.Fatal(err)
+       }
+       for _, addr := range list {
+               node1.Learn(ctx, node2.peer.GetID(), util.NewAddressWrap(addr))
+       }
+
+       // sleep a bit
+       time.Sleep(3 * time.Second)
+}
 
 //----------------------------------------------------------------------
 // create and run a node with given spec
@@ -50,9 +176,15 @@ type TestNode struct {
        addr *util.Address
 }
 
+func (n *TestNode) Shutdown() {
+       n.core.Shutdown()
+}
+
 func (n *TestNode) Learn(ctx context.Context, peer *util.PeerID, addr 
*util.Address) {
        n.t.Logf("[%d] Learning %s for %s", n.id, addr.StringAll(), 
peer.String())
-       n.core.Learn(ctx, peer, addr)
+       if err := n.core.Learn(ctx, peer, addr); err != nil {
+               n.t.Log("Learn: " + err.Error())
+       }
 }
 
 func NewTestNode(t *testing.T, ctx context.Context, cfg *config.NodeConfig) 
(node *TestNode, err error) {
@@ -62,18 +194,25 @@ func NewTestNode(t *testing.T, ctx context.Context, cfg 
*config.NodeConfig) (nod
        node.t = t
        node.id = util.NextID()
 
+       // create core service
+       if node.core, err = NewCore(ctx, cfg); err != nil {
+               return
+       }
+       node.peer = node.core.Peer()
+
        // create peer object
        if node.peer, err = NewLocalPeer(cfg); err != nil {
                return
        }
        t.Logf("[%d] Node %s starting", node.id, node.peer.GetID())
+       t.Logf("[%d]   --> %s", node.id, 
hex.EncodeToString(node.peer.GetID().Key))
 
-       // create core service
-       if node.core, err = NewCore(ctx, node.peer); err != nil {
-               return
+       list, err := node.core.Addresses()
+       if err != nil {
+               t.Fatal(err)
        }
-       for _, addr := range node.core.trans.Endpoints() {
-               s := addr.Network() + ":" + addr.String()
+       for _, addr := range list {
+               s := addr.Network() + "://" + addr.String()
                if node.addr, err = util.ParseAddress(s); err != nil {
                        continue
                }
@@ -82,7 +221,7 @@ func NewTestNode(t *testing.T, ctx context.Context, cfg 
*config.NodeConfig) (nod
 
        // register as event listener
        incoming := make(chan *Event)
-       node.core.Register("test", NewListener(incoming, nil))
+       node.core.Register(cfg.Name, NewListener(incoming, nil))
 
        // heart beat
        tick := time.NewTicker(5 * time.Minute)
@@ -100,6 +239,7 @@ func NewTestNode(t *testing.T, ctx context.Context, cfg 
*config.NodeConfig) (nod
                                        t.Logf("[%d] <<< Peer %s diconnected", 
node.id, ev.Peer)
                                case EV_MESSAGE:
                                        t.Logf("[%d] <<< Msg from %s of type 
%d", node.id, ev.Peer, ev.Msg.Header().MsgType)
+                                       t.Logf("[%d] <<<    --> %s", node.id, 
ev.Msg.String())
                                }
 
                        // handle termination signal
@@ -115,36 +255,3 @@ func NewTestNode(t *testing.T, ctx context.Context, cfg 
*config.NodeConfig) (nod
        }()
        return
 }
-
-//----------------------------------------------------------------------
-// Two node GNUnet (smallest and simplest network)
-//----------------------------------------------------------------------
-
-// TestCoreSimple test a two node network
-func TestCoreSimple(t *testing.T) {
-
-       // setup execution context
-       ctx, cancel := context.WithCancel(context.Background())
-       defer func() {
-               cancel()
-               time.Sleep(time.Second)
-       }()
-
-       // create and run nodes
-       node1, err := NewTestNode(t, ctx, peer1Cfg)
-       if err != nil {
-               t.Fatal(err)
-       }
-       node2, err := NewTestNode(t, ctx, peer2Cfg)
-       if err != nil {
-               t.Fatal(err)
-       }
-
-       // learn peer addresses (triggers HELLO)
-       for _, addr := range node2.core.trans.Endpoints() {
-               node1.Learn(ctx, node2.peer.GetID(), util.NewAddressWrap(addr))
-       }
-
-       // wait for 5 seconds
-       time.Sleep(5 * time.Second)
-}
diff --git a/src/gnunet/core/peer.go b/src/gnunet/core/peer.go
index 2b6fe74..86ed43a 100644
--- a/src/gnunet/core/peer.go
+++ b/src/gnunet/core/peer.go
@@ -48,7 +48,6 @@ type Peer struct {
        prv      *ed25519.PrivateKey      // node private key (long-term 
signing key)
        pub      *ed25519.PublicKey       // node public key (=identifier)
        idString string                   // node identifier as string
-       addrList []*util.Address          // list of addresses associated with 
node
        ephPrv   *ed25519.PrivateKey      // ephemeral signing key
        ephMsg   *message.EphemeralKeyMsg // ephemeral signing key message
 }
@@ -73,16 +72,6 @@ func NewLocalPeer(cfg *config.NodeConfig) (p *Peer, err 
error) {
        if err != nil {
                return
        }
-       // set the endpoint addresses for local node
-       p.addrList = make([]*util.Address, len(cfg.Endpoints))
-       var addr *util.Address
-       for i, a := range cfg.Endpoints {
-               if addr, err = util.ParseAddress(a); err != nil {
-                       return
-               }
-               addr.Expires = util.NewAbsoluteTime(time.Now().Add(12 * 
time.Hour))
-               p.addrList[i] = addr
-       }
        return
 }
 
@@ -98,36 +87,24 @@ func NewPeer(peerID string) (p *Peer, err error) {
        p.prv = nil
        p.pub = ed25519.NewPublicKeyFromBytes(data)
        p.idString = util.EncodeBinaryToString(p.pub.Bytes())
-       p.addrList = make([]*util.Address, 0)
        return
 }
 
+// Shutdown peer-related processes.
+func (p *Peer) Shutdown() {}
+
 //----------------------------------------------------------------------
 //----------------------------------------------------------------------
 
-// Address returns a peer address for the given transport protocol
-func (p *Peer) Address(transport string) *util.Address {
-       for _, addr := range p.addrList {
-               // skip expired entries
-               if addr.Expires.Expired() {
-                       continue
-               }
-               // filter by transport protocol
-               if len(transport) > 0 && transport != addr.Netw {
-                       continue
-               }
-               return addr
-       }
-       return nil
-}
-
-// HelloData returns the current HELLO data for the peer
-func (p *Peer) HelloData(ttl time.Duration) (h *blocks.HelloBlock, err error) {
+// HelloData returns the current HELLO data for the peer. The list of listening
+// endpoint addresses re passed in from core to reflect the actual active
+// endpoints.
+func (p *Peer) HelloData(ttl time.Duration, a []*util.Address) (h 
*blocks.HelloBlock, err error) {
        // assemble HELLO data
        h = new(blocks.HelloBlock)
        h.PeerID = p.GetID()
        h.Expire = util.NewAbsoluteTime(time.Now().Add(ttl))
-       h.SetAddresses(p.addrList)
+       h.SetAddresses(a)
 
        // sign data
        err = h.Sign(p.prv)
@@ -171,16 +148,6 @@ func (p *Peer) GetIDString() string {
        return p.idString
 }
 
-// GetAddressList returns a list of addresses associated with this peer.
-func (p *Peer) GetAddressList() []*util.Address {
-       return p.addrList
-}
-
-// AddAddress adds a new address for a node.
-func (p *Peer) AddAddress(a *util.Address) {
-       p.addrList = append(p.addrList, a)
-}
-
 // Sign a message with the (long-term) private key.
 func (p *Peer) Sign(msg []byte) (*ed25519.EdSignature, error) {
        if p.prv == nil {
diff --git a/src/gnunet/core/peer_test.go b/src/gnunet/core/peer_test.go
index 28b328f..70a5c5f 100644
--- a/src/gnunet/core/peer_test.go
+++ b/src/gnunet/core/peer_test.go
@@ -21,6 +21,7 @@ package core
 import (
        "gnunet/config"
        "gnunet/service/dht/blocks"
+       "gnunet/util"
        "testing"
        "time"
 )
@@ -29,8 +30,13 @@ import (
 var (
        cfg = &config.NodeConfig{
                PrivateSeed: "YGoe6XFH3XdvFRl+agx9gIzPTvxA229WFdkazEMdcOs=",
-               Endpoints: []string{
-                       "r5n+ip+udp://127.0.0.1:6666",
+               Endpoints: []*config.EndpointConfig{
+                       {
+                               ID:      "test",
+                               Network: "r5n+ip+udp",
+                               Address: "127.0.0.1",
+                               Port:    6666,
+                       },
                },
        }
        TTL = 6 * time.Hour
@@ -44,8 +50,17 @@ func TestPeerHello(t *testing.T) {
                t.Fatal(err)
        }
 
-       // get HELLO data for the node
-       h, err := node.HelloData(TTL)
+       // get HELLO data for the node:
+       // This hack will only work for direct listening addresses
+       addrList := make([]*util.Address, 0)
+       for _, epRef := range cfg.Endpoints {
+               addr, err := util.ParseAddress(epRef.Addr())
+               if err != nil {
+                       t.Fatal(err)
+               }
+               addrList = append(addrList, addr)
+       }
+       h, err := node.HelloData(TTL, addrList)
 
        // convert to URL and back
        u := h.URL()
diff --git a/src/gnunet/go.mod b/src/gnunet/go.mod
index ad203ca..eb17b30 100644
--- a/src/gnunet/go.mod
+++ b/src/gnunet/go.mod
@@ -15,9 +15,11 @@ require (
 require (
        github.com/cespare/xxhash/v2 v2.1.2 // indirect
        github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // 
indirect
+       github.com/huin/goupnp v1.0.0 // indirect
        golang.org/x/mod v0.4.2 // indirect
        golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 // indirect
        golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
+       golang.org/x/text v0.3.7 // indirect
        golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2 // indirect
        golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
 )
diff --git a/src/gnunet/go.sum b/src/gnunet/go.sum
index f2baf8e..17c2d00 100644
--- a/src/gnunet/go.sum
+++ b/src/gnunet/go.sum
@@ -11,6 +11,7 @@ github.com/go-sql-driver/mysql v1.6.0 
h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfC
 github.com/go-sql-driver/mysql v1.6.0/go.mod 
h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
 github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
 github.com/gorilla/mux v1.8.0/go.mod 
h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
+github.com/huin/goupnp v1.0.0 h1:wg75sLpL6DZqwHQN6E1Cfk6mtfzS45z8OV+ic+DtHRo=
 github.com/huin/goupnp v1.0.0/go.mod 
h1:n9v9KO1tAxYH82qOn+UTIFQDmx5n1Zxd/ClZDMX7Bnc=
 github.com/huin/goutil v0.0.0-20170803182201-1ca381bf3150/go.mod 
h1:PpLOETDnJ0o3iZrZfqZzyLl6l7F3c6L1oWn7OICBi6o=
 github.com/mattn/go-sqlite3 v1.14.13 
h1:1tj15ngiFfcZzii7yd82foL+ks+ouQcj8j/TPq3fk1I=
@@ -55,6 +56,7 @@ golang.org/x/text v0.3.3/go.mod 
h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
+golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
 golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod 
h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod 
h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
 golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2 
h1:BonxutuHCTL0rBDnZlKjpGIQFTjyUVTexFOdWkB6Fg0=
diff --git a/src/gnunet/message/msg_hello.go b/src/gnunet/message/msg_hello.go
index 18fe9d5..25ef98a 100644
--- a/src/gnunet/message/msg_hello.go
+++ b/src/gnunet/message/msg_hello.go
@@ -59,7 +59,7 @@ func NewHelloAddress(a *util.Address) *HelloAddress {
 // String returns a human-readable representation of the message.
 func (a *HelloAddress) String() string {
        return fmt.Sprintf("Address{%s,expire=%s}",
-               util.AddressString(a.Transport, a.Address), a.ExpireOn)
+               util.URI(a.Transport, a.Address), a.ExpireOn)
 }
 
 // HelloMsg is a message send by peers to announce their presence
diff --git a/src/gnunet/modules.go b/src/gnunet/modules.go
deleted file mode 100644
index e47699f..0000000
--- a/src/gnunet/modules.go
+++ /dev/null
@@ -1,84 +0,0 @@
-// This file is part of gnunet-go, a GNUnet-implementation in Golang.
-// Copyright (C) 2019, 2020 Bernd Fix  >Y<
-//
-// gnunet-go is free software: you can redistribute it and/or modify it
-// under the terms of the GNU Affero General Public License as published
-// by the Free Software Foundation, either version 3 of the License,
-// or (at your option) any later version.
-//
-// gnunet-go is distributed in the hope that it will be useful, but
-// WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-// Affero General Public License for more details.
-//
-// You should have received a copy of the GNU Affero General Public License
-// along with this program.  If not, see <http://www.gnu.org/licenses/>.
-//
-// SPDX-License-Identifier: AGPL3.0-or-later
-
-//======================================================================
-// Standalone (all-in-one) implementation of GNUnet:
-// -------------------------------------------------
-// Instead of running GNUnet services like GNS or DHT in separate
-// processes communicating (exchanging messages) with each other over
-// Unix Domain Sockets, the standalone implementation combines all
-// service modules into a single binary running go-routines to
-// concurrently performing their tasks.
-//======================================================================
-
-package gnunet
-
-import (
-       "gnunet/service/dht"
-       "gnunet/service/gns"
-       "gnunet/service/namecache"
-       "gnunet/service/revocation"
-       "net/rpc"
-)
-
-// Instances holds a list of all GNUnet service modules
-type Instances struct {
-       GNS        *gns.Module
-       Namecache  *namecache.NamecacheModule
-       DHT        *dht.Module
-       Revocation *revocation.Module
-}
-
-// Register modules for JSON-RPC
-func (inst Instances) Register() {
-       rpc.Register(inst.GNS)
-       rpc.Register(inst.Namecache)
-       rpc.Register(inst.DHT)
-       rpc.Register(inst.Revocation)
-}
-
-// Local reference to instance list. The list is initialized
-// by core.
-var (
-       Modules Instances
-)
-
-/* TODO: implement
-// Initialize instance list and link module functions as required.
-// This function is called by core on start-up.
-func Init(ctx context.Context) {
-
-       // Namecache (no calls to other modules)
-       Modules.Namecache = namecache.NewModule(ctx, c)
-
-       // DHT (no calls to other modules)
-       Modules.DHT = dht.NewModule(ctx, c)
-
-       // Revocation (no calls to other modules)
-       Modules.Revocation = revocation.NewModule(ctx, c)
-
-       // GNS (calls Namecache, DHT and Identity)
-       gns := gns.NewModule(ctx, c)
-       Modules.GNS = gns
-       gns.LookupLocal = Modules.Namecache.Get
-       gns.StoreLocal = Modules.Namecache.Put
-       gns.LookupRemote = Modules.DHT.Get
-       gns.RevocationQuery = Modules.Revocation.Query
-       gns.RevocationRevoke = Modules.Revocation.Revoke
-}
-*/
diff --git a/src/gnunet/service/dht/blocks/hello.go 
b/src/gnunet/service/dht/blocks/hello.go
index 77fc2ae..eb3bf2a 100644
--- a/src/gnunet/service/dht/blocks/hello.go
+++ b/src/gnunet/service/dht/blocks/hello.go
@@ -177,7 +177,7 @@ func (h *HelloBlock) URL() string {
                if i > 0 {
                        u += "&"
                }
-               u += url.QueryEscape(a.String())
+               u += url.QueryEscape(a.URI())
        }
        return u
 }
diff --git a/src/gnunet/service/dht/module.go b/src/gnunet/service/dht/module.go
index 5f04d54..3339aa2 100644
--- a/src/gnunet/service/dht/module.go
+++ b/src/gnunet/service/dht/module.go
@@ -81,7 +81,7 @@ func NewModule(ctx context.Context, c *core.Core) (m *Module) 
{
 
 //----------------------------------------------------------------------
 
-// Get a block from the DHT
+// Get a block from the DHT ["dht:get"]
 func (nc *Module) Get(ctx context.Context, query blocks.Query) (block 
blocks.Block, err error) {
 
        // check if we have the requested block in cache or permanent storage.
@@ -100,7 +100,7 @@ func (nc *Module) Get(ctx context.Context, query 
blocks.Query) (block blocks.Blo
        return nil, nil
 }
 
-// Put a block into the DHT
+// Put a block into the DHT ["dht:put"]
 func (nc *Module) Put(ctx context.Context, key blocks.Query, block 
blocks.Block) error {
        return nil
 }
@@ -142,6 +142,20 @@ func (m *Module) heartbeat(ctx context.Context) {
 
 //----------------------------------------------------------------------
 
+// Export functions
+func (m *Module) Export(fcn map[string]any) {
+       // add exported functions from module
+       fcn["dht:get"] = m.Get
+       fcn["dht:put"] = m.Put
+}
+
+// Import functions
+func (m *Module) Import(fcm map[string]any) {
+       // nothing to import now.
+}
+
+//----------------------------------------------------------------------
+
 // RPC returns the route and handler function for a JSON-RPC request
 func (m *Module) RPC() (string, func(http.ResponseWriter, *http.Request)) {
        return "/gns/", func(wrt http.ResponseWriter, req *http.Request) {
diff --git a/src/gnunet/service/dht/routingtable.go 
b/src/gnunet/service/dht/routingtable.go
index 0078b71..2933e6a 100644
--- a/src/gnunet/service/dht/routingtable.go
+++ b/src/gnunet/service/dht/routingtable.go
@@ -101,7 +101,7 @@ type RoutingTable struct {
        ref       *PeerAddress              // reference address for distance
        buckets   []*Bucket                 // list of buckets
        list      map[*PeerAddress]struct{} // keep list of peers
-       rwlock    sync.RWMutex              // lock for write operations
+       mtx       sync.RWMutex              // lock for write operations
        l2nse     float64                   // log2 of estimated network size
        inProcess bool                      // flag if Process() is running
 }
@@ -131,8 +131,8 @@ func NewRoutingTable(ref *PeerAddress) *RoutingTable {
 // Returns true if the entry was added, false otherwise.
 func (rt *RoutingTable) Add(p *PeerAddress) bool {
        // ensure one write and no readers
-       rt.rwlock.Lock()
-       defer rt.rwlock.Unlock()
+       rt.lock(false)
+       defer rt.unlock(false)
 
        // check if peer is already known
        if _, ok := rt.list[p]; ok {
@@ -153,8 +153,8 @@ func (rt *RoutingTable) Add(p *PeerAddress) bool {
 // Returns true if the entry was removed, false otherwise.
 func (rt *RoutingTable) Remove(p *PeerAddress) bool {
        // ensure one write and no readers
-       rt.rwlock.Lock()
-       defer rt.rwlock.Unlock()
+       rt.lock(false)
+       defer rt.unlock(false)
 
        // compute distance (bucket index) and remove entry from bucket
        _, idx := p.Distance(rt.ref)
@@ -170,10 +170,15 @@ func (rt *RoutingTable) Remove(p *PeerAddress) bool {
 //----------------------------------------------------------------------
 
 // Process a function f in the locked context of a routing table
-func (rt *RoutingTable) Process(f func() error) error {
-       // ensure one write and no readers
-       rt.rwlock.Lock()
-       defer rt.rwlock.Unlock()
+func (rt *RoutingTable) Process(f func() error, readonly bool) error {
+       // handle locking
+       rt.lock(readonly)
+       rt.inProcess = true
+       defer func() {
+               rt.inProcess = false
+               rt.unlock(readonly)
+       }()
+       // call function in unlocked context
        return f()
 }
 
@@ -184,8 +189,8 @@ func (rt *RoutingTable) Process(f func() error) error {
 // SelectClosestPeer for a given peer address and bloomfilter.
 func (rt *RoutingTable) SelectClosestPeer(p *PeerAddress, bf *PeerBloomFilter) 
(n *PeerAddress) {
        // no writer allowed
-       rt.rwlock.RLock()
-       defer rt.rwlock.RUnlock()
+       rt.mtx.RLock()
+       defer rt.mtx.RUnlock()
 
        // find closest address
        var dist *math.Int
@@ -204,8 +209,8 @@ func (rt *RoutingTable) SelectClosestPeer(p *PeerAddress, 
bf *PeerBloomFilter) (
 // included in the bloomfilter)
 func (rt *RoutingTable) SelectRandomPeer(bf *PeerBloomFilter) *PeerAddress {
        // no writer allowed
-       rt.rwlock.RLock()
-       defer rt.rwlock.RUnlock()
+       rt.mtx.RLock()
+       defer rt.mtx.RUnlock()
 
        // select random entry from list
        if size := len(rt.list); size > 0 {
@@ -279,11 +284,35 @@ func (rt *RoutingTable) heartbeat(ctx context.Context) {
                        }
                }
                return nil
-       }); err != nil {
+       }, false); err != nil {
                logger.Println(logger.ERROR, "[dht] RT heartbeat: "+err.Error())
        }
 }
 
+//----------------------------------------------------------------------
+
+// lock with given mode (if not in processing function)
+func (rt *RoutingTable) lock(readonly bool) {
+       if !rt.inProcess {
+               if readonly {
+                       rt.mtx.RLock()
+               } else {
+                       rt.mtx.Lock()
+               }
+       }
+}
+
+// lock with given mode (if not in processing function)
+func (rt *RoutingTable) unlock(readonly bool) {
+       if !rt.inProcess {
+               if readonly {
+                       rt.mtx.RUnlock()
+               } else {
+                       rt.mtx.Unlock()
+               }
+       }
+}
+
 //======================================================================
 // Routing table buckets
 //======================================================================
diff --git a/src/gnunet/service/dht/routingtable_test.go 
b/src/gnunet/service/dht/routingtable_test.go
index 659f9d4..33c4b7f 100644
--- a/src/gnunet/service/dht/routingtable_test.go
+++ b/src/gnunet/service/dht/routingtable_test.go
@@ -45,8 +45,12 @@ type Entry struct {
 var (
        cfg = &config.NodeConfig{
                PrivateSeed: "YGoe6XFH3XdvFRl+agx9gIzPTvxA229WFdkazEMdcOs=",
-               Endpoints: []string{
-                       "r5n+ip+udp://127.0.0.1:6666",
+               Endpoints: []*config.EndpointConfig{
+                       {
+                               Network: "r5n+ip+udp",
+                               Address: "127.0.0.1",
+                               Port:    6666,
+                       },
                },
        }
 )
diff --git a/src/gnunet/service/gns/module.go b/src/gnunet/service/gns/module.go
index 4878aa0..93f9bca 100644
--- a/src/gnunet/service/gns/module.go
+++ b/src/gnunet/service/gns/module.go
@@ -98,6 +98,7 @@ type Module struct {
        RevocationRevoke func(ctx context.Context, rd *revocation.RevData) 
(success bool, err error)
 }
 
+// NewModule instantiates a new GNS module.
 func NewModule(ctx context.Context, c *core.Core) (m *Module) {
        m = &Module{
                ModuleImpl: *service.NewModuleImpl(),
@@ -128,6 +129,23 @@ func (m *Module) event(ctx context.Context, ev 
*core.Event) {
 
 //----------------------------------------------------------------------
 
+// Export functions
+func (m *Module) Export(fcn map[string]any) {
+       // add exported functions from module
+}
+
+// Import functions
+func (m *Module) Import(fcn map[string]any) {
+       // resolve imports from other modules
+       m.LookupLocal = fcn["namecache:get"].(func(ctx context.Context, query 
*blocks.GNSQuery) (*blocks.GNSBlock, error))
+       m.StoreLocal = fcn["namecache:put"].(func(ctx context.Context, query 
*blocks.GNSQuery, block *blocks.GNSBlock) error)
+       m.LookupRemote = fcn["dht:get"].(func(ctx context.Context, query 
blocks.Query) (blocks.Block, error))
+       m.RevocationQuery = fcn["rev:query"].(func(ctx context.Context, zkey 
*crypto.ZoneKey) (valid bool, err error))
+       m.RevocationRevoke = fcn["rev:revoke"].(func(ctx context.Context, rd 
*revocation.RevData) (success bool, err error))
+}
+
+//----------------------------------------------------------------------
+
 // Resolve a GNS name with multiple labels. If pkey is not nil, the name
 // is interpreted as "relative to current zone".
 func (m *Module) Resolve(
diff --git a/src/gnunet/service/module.go b/src/gnunet/service/module.go
index 5f95975..4109f16 100644
--- a/src/gnunet/service/module.go
+++ b/src/gnunet/service/module.go
@@ -26,7 +26,43 @@ import (
 )
 
 // Module is an interface for GNUnet service modules (workers).
+//
+// Modules can call other GNUnet services; these services can be used by
+// sending messages to the respective service socket (the default way) or by
+// calling the module functions directly (if the other module is compiled
+// along with the calling module into one binary). The latter method requires
+// calls to m.Export() and m.Import() to link the modules together (see
+// example):
+//
+//    // create module instances
+//    gnsMod = gns.NewModule(ctx, core)
+//    dhtMod = dht.NewModule(ctx, core)
+//    ncMod = namecache.NewModule(ctx, core)
+//    revMod = revocation.NewModule(ctx, core)
+//
+//    // export module functions
+//    fcn := make(map[string]any)
+//    gnsMod.Export(fcn)
+//    dhtMod.Export(fcn)
+//    ncMod.Export(fcn)
+//    revMod.Export(fcn)
+//
+//    // import (link) module functions
+//    gnsMod.Import(fcn)
+//    dhtMod.Import(fcn)
+//    ncMod.Import(fcn)
+//    revMod.Import(fcn)
+//
+// Exported and imported module function are identified by name defined in the
+// Export() function. Import() functions that access functions in other modules
+// need to use the same name for linking.
 type Module interface {
+       // Export functions by name
+       Export(map[string]any)
+
+       // Import functions by name
+       Import(map[string]any)
+
        // RPC returns the route and handler for JSON-RPC requests
        RPC() (string, func(http.ResponseWriter, *http.Request))
 
diff --git a/src/gnunet/service/namecache/module.go 
b/src/gnunet/service/namecache/module.go
index 9d5bca1..b9aaad0 100644
--- a/src/gnunet/service/namecache/module.go
+++ b/src/gnunet/service/namecache/module.go
@@ -35,23 +35,39 @@ import (
 //----------------------------------------------------------------------
 
 // Namecache handles the transient storage of GNS blocks under the query key.
-type NamecacheModule struct {
+type Module struct {
        service.ModuleImpl
 
        cache service.DHTStore // transient block cache
 }
 
 // NewModule creates a new module instance.
-func NewModule(ctx context.Context, c *core.Core) (m *NamecacheModule) {
-       m = &NamecacheModule{
+func NewModule(ctx context.Context, c *core.Core) (m *Module) {
+       m = &Module{
                ModuleImpl: *service.NewModuleImpl(),
        }
        m.cache, _ = service.NewDHTStore(config.Cfg.Namecache.Storage)
        return
 }
 
+//----------------------------------------------------------------------
+
+// Export functions
+func (m *Module) Export(fcn map[string]any) {
+       // add exported functions from module
+       fcn["namecache:get"] = m.Get
+       fcn["namecache:put"] = m.Put
+}
+
+// Import functions
+func (m *Module) Import(fcm map[string]any) {
+       // nothing to import now.
+}
+
+//----------------------------------------------------------------------
+
 // Get an entry from the cache if available.
-func (m *NamecacheModule) Get(ctx context.Context, query *blocks.GNSQuery) 
(block *blocks.GNSBlock, err error) {
+func (m *Module) Get(ctx context.Context, query *blocks.GNSQuery) (block 
*blocks.GNSBlock, err error) {
        var b blocks.Block
        b, err = m.cache.Get(query)
        err = blocks.Unwrap(b, block)
@@ -59,6 +75,6 @@ func (m *NamecacheModule) Get(ctx context.Context, query 
*blocks.GNSQuery) (bloc
 }
 
 // Put entry into the cache.
-func (m *NamecacheModule) Put(ctx context.Context, query *blocks.GNSQuery, 
block *blocks.GNSBlock) error {
+func (m *Module) Put(ctx context.Context, query *blocks.GNSQuery, block 
*blocks.GNSBlock) error {
        return m.cache.Put(query, block)
 }
diff --git a/src/gnunet/service/revocation/module.go 
b/src/gnunet/service/revocation/module.go
index 1f0ab48..eade16b 100644
--- a/src/gnunet/service/revocation/module.go
+++ b/src/gnunet/service/revocation/module.go
@@ -98,8 +98,22 @@ func (m *Module) event(ctx context.Context, ev *core.Event) {
 
 //----------------------------------------------------------------------
 
+// Export functions
+func (m *Module) Export(fcn map[string]any) {
+       // add exported functions from module
+       fcn["rev:query"] = m.Query
+       fcn["rev:revoke"] = m.Revoke
+}
+
+// Import functions
+func (m *Module) Import(fcm map[string]any) {
+       // nothing to import now.
+}
+
+//----------------------------------------------------------------------
+
 // Query return true if the pkey is valid (not revoked) and false
-// if the pkey has been revoked.
+// if the pkey has been revoked ["rev:query"]
 func (m *Module) Query(ctx context.Context, zkey *crypto.ZoneKey) (valid bool, 
err error) {
        // fast check first: is the key in the bloomfilter?
        data := zkey.Bytes()
@@ -118,7 +132,7 @@ func (m *Module) Query(ctx context.Context, zkey 
*crypto.ZoneKey) (valid bool, e
        return false, nil
 }
 
-// Revoke a key with given revocation data
+// Revoke a key with given revocation data ["rev:revoke"]
 func (m *Module) Revoke(ctx context.Context, rd *RevData) (success bool, err 
error) {
        // verify the revocation data
        diff, rc := rd.Verify(true)
diff --git a/src/gnunet/transport/endpoint.go b/src/gnunet/transport/endpoint.go
index b54ee4e..5dabfa2 100644
--- a/src/gnunet/transport/endpoint.go
+++ b/src/gnunet/transport/endpoint.go
@@ -30,6 +30,10 @@ import (
 var (
        ErrEndpNotAvailable     = errors.New("no endpoint for address 
available")
        ErrEndpProtocolMismatch = errors.New("transport protocol mismatch")
+       ErrEndpProtocolUnknown  = errors.New("unknown transport protocol")
+       ErrEndpExists           = errors.New("endpoint exists")
+       ErrEndpNoAddress        = errors.New("no address for endpoint")
+       ErrEndpNoConnection     = errors.New("no connection on endpoint")
 )
 
 // Endpoint represents a local endpoint that can send and receive messages.
@@ -83,9 +87,12 @@ type PaketEndpoint struct {
 func (ep *PaketEndpoint) Run(ctx context.Context, hdlr chan *TransportMessage) 
(err error) {
        // create listener
        var lc net.ListenConfig
-       if ep.conn, err = lc.ListenPacket(ctx, ep.addr.Network(), 
ep.addr.String()); err != nil {
+       xproto := ep.addr.Network()
+       if ep.conn, err = lc.ListenPacket(ctx, EpProtocol(xproto), 
ep.addr.String()); err != nil {
                return
        }
+       // use the actual listening address
+       ep.addr = util.NewAddress(xproto, ep.conn.LocalAddr().String())
 
        // run watch dog for termination
        go func() {
@@ -95,27 +102,15 @@ func (ep *PaketEndpoint) Run(ctx context.Context, hdlr 
chan *TransportMessage) (
        // run go routine to handle messages from clients
        go func() {
                for {
-                       // read next message from packet
-                       n, _, err := ep.conn.ReadFrom(ep.buf)
+                       // read next message
+                       tm, err := ep.read()
                        if err != nil {
                                break
                        }
-                       rdr := bytes.NewBuffer(util.Clone(ep.buf[:n]))
-                       msg, err := ReadMessageDirect(rdr, ep.buf)
-                       if err != nil {
-                               break
-                       }
-                       // check for transport message
-                       if msg.Header().MsgType == message.DUMMY {
-                               // set transient attributes
-                               tm := msg.(*TransportMessage)
-                               tm.endp = ep.id
-                               tm.conn = 0
-                               // send to handler
-                               go func() {
-                                       hdlr <- tm
-                               }()
-                       }
+                       // send transport message to handler
+                       go func() {
+                               hdlr <- tm
+                       }()
                }
                // connection ended.
                ep.conn.Close()
@@ -123,29 +118,73 @@ func (ep *PaketEndpoint) Run(ctx context.Context, hdlr 
chan *TransportMessage) (
        return
 }
 
+// Read a transport message from endpoint based on extended protocol
+func (ep *PaketEndpoint) read() (tm *TransportMessage, err error) {
+       // read next packet (assuming that it contains one complete message)
+       var n int
+       if n, _, err = ep.conn.ReadFrom(ep.buf); err != nil {
+               return
+       }
+       // parse transport message based on extended protocol
+       var (
+               peer *util.PeerID
+               msg  message.Message
+       )
+       switch ep.addr.Network() {
+       case "ip+udp":
+               // parse peer id and message in sequence
+               peer = util.NewPeerID(ep.buf[:32])
+               rdr := bytes.NewBuffer(util.Clone(ep.buf[32:n]))
+               if msg, err = ReadMessageDirect(rdr, ep.buf); err != nil {
+                       return
+               }
+       default:
+               panic(ErrEndpProtocolUnknown)
+       }
+       // return transport message
+       return &TransportMessage{
+               Peer: peer,
+               Msg:  msg,
+       }, nil
+}
+
 // Send message to address from endpoint
 func (ep *PaketEndpoint) Send(ctx context.Context, addr net.Addr, msg 
*TransportMessage) (err error) {
+       // check for valid connection
+       if ep.conn == nil {
+               return ErrEndpNoConnection
+       }
+       // resolve target address
        var a *net.UDPAddr
-       a, err = net.ResolveUDPAddr(addr.Network(), addr.String())
+       a, err = net.ResolveUDPAddr(EpProtocol(addr.Network()), addr.String())
+
+       // get message content (TransportMessage)
        var buf []byte
        if buf, err = msg.Bytes(); err != nil {
                return
        }
+       // handle extended protocol:
+       switch ep.addr.Network() {
+       case "ip+udp":
+               // no modifications required
+
+       default:
+               // unknown protocol
+               return ErrEndpProtocolUnknown
+       }
        _, err = ep.conn.WriteTo(buf, a)
        return
 }
 
 // Address returms the
 func (ep *PaketEndpoint) Address() net.Addr {
-       if ep.conn != nil {
-               return ep.conn.LocalAddr()
-       }
        return ep.addr
 }
 
 // CanSendTo returns true if the endpoint can sent to address
-func (ep *PaketEndpoint) CanSendTo(addr net.Addr) bool {
-       return epMode(addr.Network()) == "packet"
+func (ep *PaketEndpoint) CanSendTo(addr net.Addr) (ok bool) {
+       ok = EpProtocol(addr.Network()) == EpProtocol(ep.addr.Network())
+       return
 }
 
 // ID returns the endpoint identifier
@@ -153,6 +192,7 @@ func (ep *PaketEndpoint) ID() int {
        return ep.id
 }
 
+// create a new packet endpoint for protcol and address
 func newPacketEndpoint(addr net.Addr) (ep *PaketEndpoint, err error) {
        // check for matching protocol
        if epMode(addr.Network()) != "packet" {
@@ -185,9 +225,13 @@ type StreamEndpoint struct {
 func (ep *StreamEndpoint) Run(ctx context.Context, hdlr chan 
*TransportMessage) (err error) {
        // create listener
        var lc net.ListenConfig
-       if ep.listener, err = lc.Listen(ctx, ep.addr.Network(), 
ep.addr.String()); err != nil {
+       xproto := ep.addr.Network()
+       if ep.listener, err = lc.Listen(ctx, EpProtocol(xproto), 
ep.addr.String()); err != nil {
                return
        }
+       // get actual listening address
+       ep.addr = util.NewAddress(xproto, ep.listener.Addr().String())
+
        // run watch dog for termination
        go func() {
                <-ctx.Done()
@@ -206,21 +250,14 @@ func (ep *StreamEndpoint) Run(ctx context.Context, hdlr 
chan *TransportMessage)
                        go func() {
                                for {
                                        // read next message from connection
-                                       msg, err := ReadMessage(ctx, conn, 
ep.buf)
+                                       tm, err := ep.read(ctx, conn)
                                        if err != nil {
                                                break
                                        }
-                                       // check for transport message
-                                       if msg.Header().MsgType == 
message.DUMMY {
-                                               // set transient attributes
-                                               tm := msg.(*TransportMessage)
-                                               tm.endp = ep.id
-                                               tm.conn = session
-                                               // send to handler
-                                               go func() {
-                                                       hdlr <- tm
-                                               }()
-                                       }
+                                       // send transport message to handler
+                                       go func() {
+                                               hdlr <- tm
+                                       }()
                                }
                                // connection ended.
                                conn.Close()
@@ -231,6 +268,34 @@ func (ep *StreamEndpoint) Run(ctx context.Context, hdlr 
chan *TransportMessage)
        return
 }
 
+// Read a transport message from endpoint based on extended protocol
+func (ep *StreamEndpoint) read(ctx context.Context, conn net.Conn) (tm 
*TransportMessage, err error) {
+       // parse transport message based on extended protocol
+       var (
+               peer *util.PeerID
+               msg  message.Message
+       )
+       switch ep.addr.Network() {
+       case "ip+udp":
+               // parse peer id
+               peer = util.NewPeerID(nil)
+               if _, err = conn.Read(peer.Key); err != nil {
+                       return
+               }
+               // read next message from connection
+               if msg, err = ReadMessage(ctx, conn, ep.buf); err != nil {
+                       break
+               }
+       default:
+               panic(ErrEndpProtocolUnknown)
+       }
+       // return transport message
+       return &TransportMessage{
+               Peer: peer,
+               Msg:  msg,
+       }, nil
+}
+
 // Send message to address from endpoint
 func (ep *StreamEndpoint) Send(ctx context.Context, addr net.Addr, msg 
*TransportMessage) error {
        return nil
@@ -238,9 +303,6 @@ func (ep *StreamEndpoint) Send(ctx context.Context, addr 
net.Addr, msg *Transpor
 
 // Address returns the actual listening endpoint address
 func (ep *StreamEndpoint) Address() net.Addr {
-       if ep.listener != nil {
-               return ep.listener.Addr()
-       }
        return ep.addr
 }
 
@@ -254,6 +316,7 @@ func (ep *StreamEndpoint) ID() int {
        return ep.id
 }
 
+// create a new endpoint based on extended protocol and address
 func newStreamEndpoint(addr net.Addr) (ep *StreamEndpoint, err error) {
        // check for matching protocol
        if epMode(addr.Network()) != "stream" {
@@ -270,10 +333,29 @@ func newStreamEndpoint(addr net.Addr) (ep 
*StreamEndpoint, err error) {
        return
 }
 
+//----------------------------------------------------------------------
+// derive endpoint mode (packet/stream) and transport protocol from
+// net.Adddr.Network() strings
+//----------------------------------------------------------------------
+
+// EpProtocol returns the transport protocol for a given network string
+// that can include extended protocol information like "r5n+ip+udp"
+func EpProtocol(netw string) string {
+       switch netw {
+       case "udp", "udp4", "udp6", "ip+udp":
+               return "udp"
+       case "tcp", "tcp4", "tcp6":
+               return "tcp"
+       case "unix":
+               return "unix"
+       }
+       return ""
+}
+
 // epMode returns the endpoint mode (packet or stream) for a given network
 func epMode(netw string) string {
-       switch netw {
-       case "udp", "udp4", "udp6", "r5n+ip+udp":
+       switch EpProtocol(netw) {
+       case "udp":
                return "packet"
        case "tcp", "unix":
                return "stream"
diff --git a/src/gnunet/transport/reader_writer.go 
b/src/gnunet/transport/reader_writer.go
index 2e5f14a..db3527e 100644
--- a/src/gnunet/transport/reader_writer.go
+++ b/src/gnunet/transport/reader_writer.go
@@ -113,10 +113,7 @@ func ReadMessage(ctx context.Context, rdr io.ReadCloser, 
buf []byte) (msg messag
        if err = get(4, int(mh.MsgSize)-4); err != nil {
                return nil, err
        }
-       // handle transport message case
-       if mh.MsgType == message.DUMMY {
-               msg = NewTransportMessage(nil, nil)
-       } else if msg, err = message.NewEmptyMessage(mh.MsgType); err != nil {
+       if msg, err = message.NewEmptyMessage(mh.MsgType); err != nil {
                return nil, err
        }
        if msg == nil {
diff --git a/src/gnunet/transport/transport.go 
b/src/gnunet/transport/transport.go
index 14def98..d1e8249 100644
--- a/src/gnunet/transport/transport.go
+++ b/src/gnunet/transport/transport.go
@@ -25,6 +25,8 @@ import (
        "gnunet/message"
        "gnunet/util"
        "net"
+
+       "github.com/bfix/gospel/network"
 )
 
 // Trnsport layer error codes
@@ -41,32 +43,19 @@ var (
 // Msg is the exchanged GNUnet message. The packet itself satisfies the
 // message.Message interface.
 type TransportMessage struct {
-       Hdr     *message.Header ``         // message header
-       Peer    *util.PeerID    ``         // remote peer
-       Payload []byte          `size:"*"` // GNUnet message
-
-       // package-local attributes (transient)
-       msg  message.Message
-       endp int // id of endpoint (incoming message)
-       conn int // id of connection (optional, incoming message)
-}
-
-func (msg *TransportMessage) Header() *message.Header {
-       return msg.Hdr
-}
-
-func (msg *TransportMessage) Message() (m message.Message, err error) {
-       if m = msg.msg; m == nil {
-               rdr := bytes.NewBuffer(msg.Payload)
-               m, err = ReadMessageDirect(rdr, nil)
-       }
-       return
+       Peer *util.PeerID    // remote peer
+       Msg  message.Message // GNUnet message
 }
 
 // Bytes returns the binary representation of a transport message
 func (msg *TransportMessage) Bytes() ([]byte, error) {
        buf := new(bytes.Buffer)
-       err := WriteMessageDirect(buf, msg)
+       // serialize peer id
+       if _, err := buf.Write(msg.Peer.Key); err != nil {
+               return nil, err
+       }
+       // serialize message
+       err := WriteMessageDirect(buf, msg.Msg)
        return buf.Bytes(), err
 }
 
@@ -76,21 +65,13 @@ func (msg *TransportMessage) String() string {
 }
 
 // NewTransportMessage creates a message suitable for transfer
-func NewTransportMessage(peer *util.PeerID, payload []byte) (tm 
*TransportMessage) {
+func NewTransportMessage(peer *util.PeerID, msg message.Message) (tm 
*TransportMessage) {
        if peer == nil {
                peer = util.NewPeerID(nil)
        }
-       msize := 0
-       if payload != nil {
-               msize = len(payload)
-       }
        tm = &TransportMessage{
-               Hdr: &message.Header{
-                       MsgSize: uint16(36 + msize),
-                       MsgType: message.DUMMY,
-               },
-               Peer:    peer,
-               Payload: payload,
+               Peer: peer,
+               Msg:  msg,
        }
        return
 }
@@ -100,28 +81,41 @@ func NewTransportMessage(peer *util.PeerID, payload 
[]byte) (tm *TransportMessag
 // Transport enables network-oriented (like IP, UDP, TCP or UDS)
 // message exchange on multiple endpoints.
 type Transport struct {
-       incoming  chan *TransportMessage // messages as received from the 
network
-       endpoints map[int]Endpoint       // list of available endpoints
+       incoming  chan *TransportMessage   // messages as received from the 
network
+       endpoints *util.Map[int, Endpoint] // list of available endpoints
+       upnp      *network.PortMapper      // UPnP mapper (optional)
 }
 
 // NewTransport creates and runs a new transport layer implementation.
-func NewTransport(ctx context.Context, ch chan *TransportMessage) (t 
*Transport) {
+func NewTransport(ctx context.Context, tag string, ch chan *TransportMessage) 
(t *Transport) {
        // create transport instance
+       mngr, err := network.NewPortMapper(tag)
+       if err != nil {
+               mngr = nil
+       }
        return &Transport{
                incoming:  ch,
-               endpoints: make(map[int]Endpoint),
+               endpoints: util.NewMap[int, Endpoint](),
+               upnp:      mngr,
+       }
+}
+
+// Shutdown transport-related processes
+func (t *Transport) Shutdown() {
+       if t.upnp != nil {
+               t.upnp.Close()
        }
 }
 
 // Send a message over suitable endpoint
 func (t *Transport) Send(ctx context.Context, addr net.Addr, msg 
*TransportMessage) (err error) {
-       for _, ep := range t.endpoints {
+       // use the first endpoint able to handle address
+       return t.endpoints.ProcessRange(func(_ int, ep Endpoint) error {
                if ep.CanSendTo(addr) {
-                       err = ep.Send(ctx, addr, msg)
-                       break
+                       return ep.Send(ctx, addr, msg)
                }
-       }
-       return
+               return nil
+       }, true)
 }
 
 //----------------------------------------------------------------------
@@ -130,22 +124,45 @@ func (t *Transport) Send(ctx context.Context, addr 
net.Addr, msg *TransportMessa
 
 // AddEndpoint instantiates and run a new endpoint handler for the
 // given address (must map to a network interface).
-func (t *Transport) AddEndpoint(ctx context.Context, addr net.Addr) (a 
net.Addr, err error) {
-       // register endpoint
-       var ep Endpoint
+func (t *Transport) AddEndpoint(ctx context.Context, addr *util.Address) (ep 
Endpoint, err error) {
+       // check for valid address
+       if addr == nil {
+               err = ErrEndpNoAddress
+               return
+       }
+       // check if endpoint is already available
+       as := addr.Network() + "://" + addr.String()
+       if err = t.endpoints.ProcessRange(func(_ int, ep Endpoint) error {
+               ae := ep.Address().Network() + "://" + ep.Address().String()
+               if as == ae {
+                       return ErrEndpExists
+               }
+               return nil
+       }, true); err != nil {
+               return
+       }
+       // register new endpoint
        if ep, err = NewEndpoint(addr); err != nil {
                return
        }
-       t.endpoints[ep.ID()] = ep
+       // add endpoint to list and run it
+       t.endpoints.Put(ep.ID(), ep)
        ep.Run(ctx, t.incoming)
-       return ep.Address(), nil
+       return
 }
 
-// Endpoints returns a list of listening addresses managed by transport.
-func (t *Transport) Endpoints() (list []net.Addr) {
-       list = make([]net.Addr, 0)
-       for _, ep := range t.endpoints {
-               list = append(list, ep.Address())
-       }
-       return
+//----------------------------------------------------------------------
+// UPnP handling
+//----------------------------------------------------------------------
+
+// ForwardOpen returns a local address for listening that will receive traffic
+// from a port forward handled by UPnP on the router.
+func (t *Transport) ForwardOpen(protocol, param string, port int) (id, local, 
remote string, err error) {
+       // no parameters currently defined, so just do the assignment.
+       return t.upnp.Assign(protocol, port)
+}
+
+// ForwardClose closes a specific port forwarding
+func (t *Transport) ForwardClose(id string) error {
+       return t.upnp.Unassign(id)
 }
diff --git a/src/gnunet/util/address.go b/src/gnunet/util/address.go
index 106e671..a56e95f 100644
--- a/src/gnunet/util/address.go
+++ b/src/gnunet/util/address.go
@@ -34,11 +34,11 @@ type Address struct {
 }
 
 // NewAddress returns a new Address for the given transport and specs
-func NewAddress(transport string, addr []byte) *Address {
+func NewAddress(transport string, addr string) *Address {
        return &Address{
                Netw:    transport,
                Options: 0,
-               Address: Clone(addr),
+               Address: Clone([]byte(addr)),
                Expires: AbsoluteTimeNever(),
        }
 }
@@ -61,7 +61,7 @@ func ParseAddress(s string) (addr *Address, err error) {
                err = fmt.Errorf("invalid address format: '%s'", s)
                return
        }
-       addr = NewAddress(p[0], []byte(strings.Trim(p[1], "/")))
+       addr = NewAddress(p[0], strings.Trim(p[1], "/"))
        return
 }
 
@@ -91,8 +91,11 @@ func (a *Address) Network() string {
 
 //----------------------------------------------------------------------
 
-// AddressString returns a string representaion of an address.
-func AddressString(network string, addr []byte) string {
+// URI returns a string representaion of an address.
+func (a *Address) URI() string {
+       return URI(a.Netw, a.Address)
+}
+func URI(network string, addr []byte) string {
        return network + "://" + string(addr)
 }
 
@@ -151,13 +154,13 @@ func (a *PeerAddrList) Add(id string, addr *Address) 
(mode int) {
                        list = append(list, addr)
                        a.list.Put(id, list)
                        return nil
-               })
+               }, false)
        }
        return
 }
 
 // Get address for peer
-func (a *PeerAddrList) Get(id string, transport string) *Address {
+func (a *PeerAddrList) Get(id string, transport string) (res []*Address) {
        list, ok := a.list.Get(id)
        if ok {
                for _, addr := range list {
@@ -171,10 +174,10 @@ func (a *PeerAddrList) Get(id string, transport string) 
*Address {
                                // skip other transports
                                continue
                        }
-                       return addr
+                       res = append(res, addr)
                }
        }
-       return nil
+       return
 }
 
 // Delete a list entry by key.
diff --git a/src/gnunet/util/database.go b/src/gnunet/util/database.go
index a1198fd..f1d1e5f 100644
--- a/src/gnunet/util/database.go
+++ b/src/gnunet/util/database.go
@@ -121,7 +121,7 @@ func (p *dbPool) remove(key string) error {
                        p.insts.Delete(key)
                }
                return
-       })
+       }, false)
 }
 
 // Connect to a SQL database (various types and flavors):
@@ -180,6 +180,6 @@ func (p *dbPool) Connect(spec string) (db *DbConn, err 
error) {
                db = new(DbConn)
                db.conn, err = inst.db.Conn(p.ctx)
                return err
-       })
+       }, false)
        return
 }
diff --git a/src/gnunet/util/misc.go b/src/gnunet/util/misc.go
index 7240757..4443737 100644
--- a/src/gnunet/util/misc.go
+++ b/src/gnunet/util/misc.go
@@ -70,43 +70,87 @@ func NewMap[K comparable, V any]() *Map[K, V] {
        }
 }
 
+//----------------------------------------------------------------------
+
 // Process a function in the locked map context. Calls
-// to other map functions in 'f' will use additional locks.
-func (m *Map[K, V]) Process(f func() error) error {
-       m.mtx.Lock()
-       defer m.mtx.Unlock()
+// to other map functions in 'f' will skip their locks.
+func (m *Map[K, V]) Process(f func() error, readonly bool) error {
+       // handle locking
+       m.lock(readonly)
+       m.inProcess = true
+       defer func() {
+               m.inProcess = false
+               m.unlock(readonly)
+       }()
+       // function call in unlocked environment
+       return f()
+}
+
+// Process a ranged function in the locked map context. Calls
+// to other map functions in 'f' will skip their locks.
+func (m *Map[K, V]) ProcessRange(f func(key K, value V) error, readonly bool) 
error {
+       // handle locking
+       m.lock(readonly)
        m.inProcess = true
-       err := f()
-       m.inProcess = false
-       return err
+       defer func() {
+               m.inProcess = false
+               m.unlock(readonly)
+       }()
+       // range over map and call function.
+       for key, value := range m.list {
+               if err := f(key, value); err != nil {
+                       return err
+               }
+       }
+       return nil
 }
 
+//----------------------------------------------------------------------
+
 // Put value into map under given key.
 func (m *Map[K, V]) Put(key K, value V) {
-       if !m.inProcess {
-               m.mtx.Lock()
-               defer m.mtx.Unlock()
-       }
+       m.lock(false)
+       defer m.unlock(false)
        m.list[key] = value
 }
 
 // Get value with iven key from map.
 func (m *Map[K, V]) Get(key K) (value V, ok bool) {
-       if !m.inProcess {
-               m.mtx.RLock()
-               defer m.mtx.RUnlock()
-       }
+       m.lock(true)
+       defer m.unlock(true)
        value, ok = m.list[key]
        return
 }
 
 // Delete key/value pair from map.
 func (m *Map[K, V]) Delete(key K) {
+       m.lock(false)
+       defer m.unlock(false)
+       delete(m.list, key)
+}
+
+//----------------------------------------------------------------------
+
+// lock with given mode (if not in processing function)
+func (m *Map[K, V]) lock(readonly bool) {
        if !m.inProcess {
-               m.mtx.Lock()
-               defer m.mtx.Unlock()
+               if readonly {
+                       m.mtx.RLock()
+               } else {
+                       m.mtx.Lock()
+               }
+       }
+}
+
+// lock with given mode (if not in processing function)
+func (m *Map[K, V]) unlock(readonly bool) {
+       if !m.inProcess {
+               if readonly {
+                       m.mtx.RUnlock()
+               } else {
+                       m.mtx.Unlock()
+               }
        }
-       delete(m.list, key)
 }
 
 //----------------------------------------------------------------------
diff --git a/src/gnunet/util/peer_id.go b/src/gnunet/util/peer_id.go
index a8202a1..a74958d 100644
--- a/src/gnunet/util/peer_id.go
+++ b/src/gnunet/util/peer_id.go
@@ -25,23 +25,19 @@ type PeerID struct {
        Key []byte `size:"32"`
 }
 
-// NewPeerID creates a new object from the data.
-func NewPeerID(data []byte) *PeerID {
-       if data == nil {
-               data = make([]byte, 32)
-       } else {
-               size := len(data)
-               if size > 32 {
-                       data = data[:32]
-               } else if size < 32 {
-                       buf := make([]byte, 32)
-                       CopyAlignedBlock(buf, data)
-                       data = buf
-               }
+// NewPeerID creates a new peer id from data.
+func NewPeerID(data []byte) (p *PeerID) {
+       p = &PeerID{
+               Key: make([]byte, 32),
        }
-       return &PeerID{
-               Key: data,
+       if data != nil {
+               if len(data) < 32 {
+                       CopyAlignedBlock(p.Key, data)
+               } else {
+                       copy(p.Key, data[:32])
+               }
        }
+       return
 }
 
 // Equals returns true if two peer IDs match.

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]