[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[gnunet-go] branch master updated: RC1 for milestone 2 (NGI Assure)
From: gnunet
Subject: [gnunet-go] branch master updated: RC1 for milestone 2 (NGI Assure)
Date: Mon, 25 Jul 2022 13:44:56 +0200
This is an automated email from the git hooks/post-receive script.
bernd-fix pushed a commit to branch master
in repository gnunet-go.
The following commit(s) were added to refs/heads/master by this push:
new 835c8e8 RC1 for milestone 2 (NGI Assure)
835c8e8 is described below
commit 835c8e8b45487c1276426034b5afb915853b6eb1
Author: Bernd Fix <brf@hoi-polloi.org>
AuthorDate: Mon Jul 25 13:43:57 2022 +0200
RC1 for milestone 2 (NGI Assure)
---
src/gnunet/cmd/gnunet-service-dht-go/main.go | 8 +
src/gnunet/config/config.go | 17 +-
src/gnunet/core/core.go | 32 +--
src/gnunet/core/peer.go | 4 +-
src/gnunet/crypto/hash.go | 28 ++-
src/gnunet/crypto/signature.go | 11 +
src/gnunet/go.mod | 4 +-
src/gnunet/go.sum | 4 +-
src/gnunet/message/msg_dht_p2p.go | 279 +++++++++++----------
src/gnunet/service/dht/blocks/filters.go | 80 ++++--
src/gnunet/service/dht/blocks/filters_test.go | 24 ++
src/gnunet/service/dht/blocks/generic.go | 44 ++--
src/gnunet/service/dht/blocks/gns.go | 6 +-
src/gnunet/service/dht/blocks/hello.go | 10 +-
src/gnunet/service/dht/local.go | 86 +++++++
src/gnunet/service/dht/messages.go | 211 +++++++++++-----
src/gnunet/service/dht/module.go | 147 +++++++++--
src/gnunet/service/dht/path/elements.go | 129 ++++++++++
src/gnunet/service/dht/path/handling.go | 263 +++++++++++++++++++
src/gnunet/service/dht/path/handling_test.go | 133 ++++++++++
src/gnunet/service/dht/resulthandler.go | 59 ++++-
src/gnunet/service/dht/routingtable.go | 126 ++++++----
src/gnunet/service/dht/routingtable_test.go | 8 +-
src/gnunet/service/module.go | 10 +
src/gnunet/service/namecache/module.go | 10 +-
src/gnunet/service/store/database.go | 12 +-
src/gnunet/service/store/dhtstore_test.go | 32 ++-
src/gnunet/service/store/store.go | 9 +-
src/gnunet/service/store/store_fs.go | 119 ++++++---
src/gnunet/service/store/store_fs_meta.go | 17 +-
src/gnunet/transport/endpoint.go | 10 +-
src/gnunet/transport/reader_writer.go | 32 +--
src/gnunet/transport/transport.go | 25 +-
src/gnunet/util/address.go | 12 +-
src/gnunet/util/map.go | 107 +++++---
src/gnunet/util/misc.go | 24 ++
src/gnunet/util/peer.go | 75 ++++--
.../{crypto/signature.go => util/peer_test.go} | 23 +-
38 files changed, 1700 insertions(+), 530 deletions(-)
diff --git a/src/gnunet/cmd/gnunet-service-dht-go/main.go
b/src/gnunet/cmd/gnunet-service-dht-go/main.go
index 8e23ac2..d8244d9 100644
--- a/src/gnunet/cmd/gnunet-service-dht-go/main.go
+++ b/src/gnunet/cmd/gnunet-service-dht-go/main.go
@@ -23,6 +23,7 @@ import (
"flag"
"os"
"os/signal"
+ "runtime"
"strings"
"syscall"
"time"
@@ -185,6 +186,13 @@ loop:
// handle heart beat
case now := <-tick.C:
logger.Println(logger.INFO, "[dht] Heart beat at
"+now.String())
+ // print some system statistics
+ logger.Printf(logger.INFO, "[dht] Number of Go
routines: %15d", runtime.NumGoroutine())
+ mem := new(runtime.MemStats)
+ runtime.ReadMemStats(mem)
+ logger.Printf(logger.INFO, "[dht] Allocated
heap: %15d", mem.HeapAlloc)
+ logger.Printf(logger.INFO, "[dht] Idle
heap: %15d", mem.HeapIdle)
+ logger.Printf(logger.INFO, "[dht] Total
allocation: %15d", mem.TotalAlloc)
}
}
diff --git a/src/gnunet/config/config.go b/src/gnunet/config/config.go
index c4d3722..cfbf705 100644
--- a/src/gnunet/config/config.go
+++ b/src/gnunet/config/config.go
@@ -212,15 +212,16 @@ func applySubstitutions(x interface{}, env
map[string]string) {
switch fld.Kind() {
case reflect.String:
// check for substitution
- s, _ := fld.Interface().(string)
- for {
- s1 := substString(s, env)
- if s1 == s {
- break
+ if s, ok := fld.Interface().(string);
ok {
+ for {
+ s1 := substString(s,
env)
+ if s1 == s {
+ break
+ }
+
logger.Printf(logger.DBG, "[config] %s --> %s\n", s, s1)
+ fld.SetString(s1)
+ s = s1
}
- logger.Printf(logger.DBG,
"[config] %s --> %s\n", s, s1)
- fld.SetString(s1)
- s = s1
}
case reflect.Struct:
diff --git a/src/gnunet/core/core.go b/src/gnunet/core/core.go
index 128d154..a117643 100644
--- a/src/gnunet/core/core.go
+++ b/src/gnunet/core/core.go
@@ -23,6 +23,7 @@ import (
"encoding/hex"
"errors"
"gnunet/config"
+ "gnunet/crypto"
"gnunet/message"
"gnunet/transport"
"gnunet/util"
@@ -165,13 +166,13 @@ func (c *Core) pump(ctx context.Context) {
select {
// get (next) message from transport
case tm := <-c.incoming:
- logger.Printf(logger.DBG, "[core] Message received from
%s: %s", tm.Peer, transport.Dump(tm.Msg, "json"))
+ logger.Printf(logger.DBG, "[core] Message received from
%s: %s", tm.Peer, util.Dump(tm.Msg, "json"))
// check if peer is already connected (has an entry in
PeerAddrist)
- _, connected := c.connected.Get(tm.Peer.String())
+ _, connected := c.connected.Get(tm.Peer.String(), 0)
if !connected {
// no: mark connected
- c.connected.Put(tm.Peer.String(), true)
+ c.connected.Put(tm.Peer.String(), true, 0)
// generate EV_CONNECT event
c.dispatch(&Event{
ID: EV_CONNECT,
@@ -259,6 +260,11 @@ func (c *Core) Learn(ctx context.Context, peer
*util.PeerID, addrs []*util.Addre
// learn all addresses for peer
newPeer = false
for _, addr := range addrs {
+ // filter out addresses we can't handle (including local
addresses)
+ if !transport.CanHandleAddress(addr) {
+ continue
+ }
+ // learn address
logger.Printf(logger.INFO, "[core] Learning %s for %s (expires
%s)", addr.URI(), peer, addr.Expires)
newPeer = (c.peers.Add(peer, addr) == 1) || newPeer
}
@@ -287,17 +293,8 @@ func (c *Core) PeerID() *util.PeerID {
//----------------------------------------------------------------------
-// Signable interface for objects that can get signed by peer
-type Signable interface {
- // SignedData returns the byte array to be signed
- SignedData() []byte
-
- // SetSignature returns the signature to the signable object
- SetSignature(*util.PeerSignature) error
-}
-
// Sign a signable object with private peer key
-func (c *Core) Sign(obj Signable) error {
+func (c *Core) Sign(obj crypto.Signable) error {
sd := obj.SignedData()
logger.Printf(logger.DBG, "[core] Signing data '%s'",
hex.EncodeToString(sd))
sig, err := c.local.prv.EdSign(sd)
@@ -315,6 +312,7 @@ func (c *Core) Sign(obj Signable) error {
// When the connection attempt is successful, information on the new
// peer is offered through the PEER_CONNECTED signal.
func (c *Core) TryConnect(peer *util.PeerID, addr net.Addr) error {
+ // TODO:
return nil
}
@@ -322,7 +320,9 @@ func (c *Core) TryConnect(peer *util.PeerID, addr net.Addr)
error {
// connection to a peer P. Underlays are usually limited in the number
// of active connections. With this function the DHT can indicate to the
// underlay which connections should preferably be preserved.
-func (c *Core) Hold(peer *util.PeerID) {}
+func (c *Core) Hold(peer *util.PeerID) {
+ // TODO:
+}
// Drop is a function which tells the underlay to drop the connection to a
// peer P. This function is only there for symmetry and used during the
@@ -333,7 +333,9 @@ func (c *Core) Hold(peer *util.PeerID) {}
// DROP() also does not imply that the underlay must close the connection:
// it merely removes the preference to preserve the connection that was
// established by HOLD().
-func (c *Core) Drop(peer *util.PeerID) {}
+func (c *Core) Drop(peer *util.PeerID) {
+ // TODO:
+}
//----------------------------------------------------------------------
// Event listener and event dispatch.
diff --git a/src/gnunet/core/peer.go b/src/gnunet/core/peer.go
index 5b0c3d0..cf67272 100644
--- a/src/gnunet/core/peer.go
+++ b/src/gnunet/core/peer.go
@@ -143,9 +143,7 @@ func (p *Peer) PubKey() *ed25519.PublicKey {
// GetID returns the node ID (public key) in binary format
func (p *Peer) GetID() *util.PeerID {
- return &util.PeerID{
- Data: util.Clone(p.pub.Bytes()),
- }
+ return util.NewPeerID(p.pub.Bytes())
}
// GetIDString returns the string representation of the public key of the node.
diff --git a/src/gnunet/crypto/hash.go b/src/gnunet/crypto/hash.go
index 437dcb2..49e57ff 100644
--- a/src/gnunet/crypto/hash.go
+++ b/src/gnunet/crypto/hash.go
@@ -28,7 +28,7 @@ import (
// HashCode is the result of a 512-bit hash function (SHA-512)
type HashCode struct {
- Bits []byte `size:"64"`
+ Bits []byte `size:"(Size))"`
}
// Equals tests if two hash results are equal.
@@ -36,9 +36,16 @@ func (hc *HashCode) Equals(n *HashCode) bool {
return bytes.Equal(hc.Bits, n.Bits)
}
+// Size of binary data
+func (hc *HashCode) Size() uint {
+ return 64
+}
+
// Clone the hash code
func (hc *HashCode) Clone() *HashCode {
- return NewHashCode(hc.Bits)
+ return &HashCode{
+ Bits: util.Clone(hc.Bits),
+ }
}
// String returns a hex-representation of the hash code
@@ -47,13 +54,18 @@ func (hc *HashCode) String() string {
}
// NewHashCode creates a new (initialized) hash value
-func NewHashCode(buf []byte) *HashCode {
- hc := &HashCode{
- Bits: make([]byte, 64),
- }
- if buf != nil {
- util.CopyAlignedBlock(hc.Bits, buf)
+func NewHashCode(data []byte) *HashCode {
+ hc := new(HashCode)
+ size := hc.Size()
+ v := make([]byte, size)
+ if data != nil && len(data) > 0 {
+ if uint(len(data)) < size {
+ util.CopyAlignedBlock(v, data)
+ } else {
+ copy(v, data[:size])
+ }
}
+ hc.Bits = v
return hc
}
diff --git a/src/gnunet/crypto/signature.go b/src/gnunet/crypto/signature.go
index 4bc5aac..1e0acbb 100644
--- a/src/gnunet/crypto/signature.go
+++ b/src/gnunet/crypto/signature.go
@@ -18,8 +18,19 @@
package crypto
+import "gnunet/util"
+
// SignaturePurpose is the GNUnet data structure used as header for signed
data.
type SignaturePurpose struct {
Size uint32 `order:"big"` // How many bytes are signed?
Purpose uint32 `order:"big"` // Signature purpose
}
+
+// Signable interface for objects that can get signed by peer
+type Signable interface {
+ // SignedData returns the byte array to be signed
+ SignedData() []byte
+
+ // SetSignature stores the signature in the signable object
+ SetSignature(*util.PeerSignature) error
+}
diff --git a/src/gnunet/go.mod b/src/gnunet/go.mod
index 5109da4..8a5db53 100644
--- a/src/gnunet/go.mod
+++ b/src/gnunet/go.mod
@@ -3,7 +3,7 @@ module gnunet
go 1.18
require (
- github.com/bfix/gospel v1.2.15
+ github.com/bfix/gospel v1.2.17
github.com/go-redis/redis/v8 v8.11.5
github.com/go-sql-driver/mysql v1.6.0
github.com/gorilla/mux v1.8.0
@@ -23,3 +23,5 @@ require (
golang.org/x/text v0.3.7 // indirect
golang.org/x/tools v0.1.11 // indirect
)
+
+// replace github.com/bfix/gospel v1.2.17 => ../gospel
diff --git a/src/gnunet/go.sum b/src/gnunet/go.sum
index 5a08cee..22c7e09 100644
--- a/src/gnunet/go.sum
+++ b/src/gnunet/go.sum
@@ -1,5 +1,5 @@
-github.com/bfix/gospel v1.2.15 h1:f0t8dvihSXWvhnXDI2q7FCtG7LHg5qImjEWdzIN/luY=
-github.com/bfix/gospel v1.2.15/go.mod
h1:cdu63bA9ZdfeDoqZ+vnWOcbY9Puwdzmf5DMxMGMznRI=
+github.com/bfix/gospel v1.2.17 h1:Stvm+OiCA2GIWIhI/HKc6uaLDMtrJNxXgw/g+v9witw=
+github.com/bfix/gospel v1.2.17/go.mod
h1:cdu63bA9ZdfeDoqZ+vnWOcbY9Puwdzmf5DMxMGMznRI=
github.com/cespare/xxhash/v2 v2.1.2
h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
github.com/cespare/xxhash/v2 v2.1.2/go.mod
h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f
h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
diff --git a/src/gnunet/message/msg_dht_p2p.go
b/src/gnunet/message/msg_dht_p2p.go
index 55bb71d..ccc0406 100644
--- a/src/gnunet/message/msg_dht_p2p.go
+++ b/src/gnunet/message/msg_dht_p2p.go
@@ -27,11 +27,11 @@ import (
"gnunet/crypto"
"gnunet/enums"
"gnunet/service/dht/blocks"
+ "gnunet/service/dht/path"
"gnunet/util"
"time"
"github.com/bfix/gospel/crypto/ed25519"
- "github.com/bfix/gospel/data"
"github.com/bfix/gospel/logger"
)
@@ -39,90 +39,6 @@ import (
// DHT-P2P is a next-generation implementation of the R5N DHT.
//======================================================================
-// shared path element data across types
-type pathElementData struct {
- Expiration util.AbsoluteTime // expiration date
- BlockHash *crypto.HashCode // block hash
- PeerPredecessor *util.PeerID // predecessor peer
- PeerSuccessor *util.PeerID // successor peer
-}
-
-// helper type for signature creation/verification
-type pathElementSignedData struct {
- Size uint16 `order:"big"` // size of signed data
- Purpose uint16 `order:"big"` // signature purpose
(SIG_DHT_HOP)
- Elem *pathElementData `` // path element data
-}
-
-// PathElement is the full-fledged data assembly for a path element in
-// PUT/GET pathes. It is assembled programatically (on generation[1] and
-// verification[2]) and not transferred in messages directly.
-//
-// [1] spe = &PathElement{...}
-// core.Sign(spe)
-// msg.putpath[i] = spe.Wire()
-//
-// [2] pe = &PathElement{...,Signature: wire.sig}
-// if !pe.Verify(peerId) { ... }
-//
-type PathElement struct {
- pathElementData
- Signature *util.PeerSignature // signature
-}
-
-// NewPathElement creates a new path element from data
-func NewPathElement(key *crypto.HashCode, pred, succ *util.PeerID)
*PathElement {
- return &PathElement{
- pathElementData: pathElementData{
- Expiration: util.AbsoluteTimeNow().Add(12 *
time.Hour),
- BlockHash: key,
- PeerPredecessor: pred,
- PeerSuccessor: succ,
- },
- Signature: nil,
- }
-}
-
-// PathElementWire is the data stored and retrieved from messages
-type PathElementWire struct {
- Predecessor *util.PeerID // peer id of predecessor
- Signature *util.PeerSignature // path signature
-}
-
-// Size returns the size of a path element in wire format
-func (pew *PathElementWire) Size() uint16 {
- return 96
-}
-
-// SignedData gets the data to be signed by peer ('Signable' interface)
-func (pe *PathElement) SignedData() []byte {
- sd := &pathElementSignedData{
- Size: 80,
- Purpose: uint16(enums.SIG_DHT_HOP),
- Elem: &(pe.pathElementData),
- }
- buf, err := data.Marshal(sd)
- if err != nil {
- logger.Println(logger.ERROR, "can't serialize path element for
signature")
- return nil
- }
- return buf
-}
-
-// SetSignature stores the generated signature.
-func (pe *PathElement) SetSignature(sig *util.PeerSignature) error {
- pe.Signature = sig
- return nil
-}
-
-// Wire returns the path element suitable for inclusion into messages
-func (pe *PathElement) Wire() *PathElementWire {
- return &PathElementWire{
- Predecessor: pe.PeerPredecessor,
- Signature: pe.Signature,
- }
-}
-
//----------------------------------------------------------------------
// DHT-P2P-GET messages are used to request information from other
// peers in the DHT.
@@ -171,7 +87,7 @@ func (m *DHTP2PGetMsg) Header() *Header {
return &Header{m.MsgSize, m.MsgType}
}
-// Clone message
+// Update message (forwarding)
func (m *DHTP2PGetMsg) Update(pf *blocks.PeerFilter, rf blocks.ResultFilter,
hop uint16) *DHTP2PGetMsg {
buf := rf.Bytes()
ns := uint16(len(buf))
@@ -197,39 +113,39 @@ func (m *DHTP2PGetMsg) Update(pf *blocks.PeerFilter, rf
blocks.ResultFilter, hop
// DHTP2PPutMsg wire layout
type DHTP2PPutMsg struct {
- MsgSize uint16 `order:"big"` // total size of message
- MsgType uint16 `order:"big"` // DHT_P2P_PUT (146)
- BType uint32 `order:"big"` // block type
- Flags uint16 `order:"big"` // processing flags
- HopCount uint16 `order:"big"` // message hops
- ReplLvl uint16 `order:"big"` // replication level
- PathL uint16 `order:"big"` // path length
- Expiration util.AbsoluteTime `` // expiration date
- PeerFilter *blocks.PeerFilter `` // peer bloomfilter
- Key *crypto.HashCode `` // query key to block
- Origin []byte `size:"(PESize)"` // truncated origin (if
TRUNCATED flag set)
- PutPath []*PathElementWire `size:"PathL"` // PUT path
- LastSig []byte `size:"(PESize)"` // signature of last
hop (if RECORD_ROUTE flag is set)
- Block []byte `size:"*"` // block data
+ MsgSize uint16 `order:"big"` // total size of
message
+ MsgType uint16 `order:"big"` // DHT_P2P_PUT (146)
+ BType uint32 `order:"big"` // block type
+ Flags uint16 `order:"big"` // processing flags
+ HopCount uint16 `order:"big"` // message hops
+ ReplLvl uint16 `order:"big"` // replication level
+ PathL uint16 `order:"big"` // path length
+ Expiration util.AbsoluteTime `` // expiration date
+ PeerFilter *blocks.PeerFilter `` // peer bloomfilter
+ Key *crypto.HashCode `` // query key to block
+ TruncOrigin []byte `size:"(PESize)"` // truncated origin
(if TRUNCATED flag set)
+ PutPath []*path.Entry `size:"PathL"` // PUT path
+ LastSig []byte `size:"(PESize)"` // signature of last
hop (if RECORD_ROUTE flag is set)
+ Block []byte `size:"*"` // block data
}
// NewDHTP2PPutMsg creates an empty new DHTP2PPutMsg
func NewDHTP2PPutMsg() *DHTP2PPutMsg {
return &DHTP2PPutMsg{
- MsgSize: 218, // total size without
path and block data
- MsgType: DHT_P2P_PUT, // DHT_P2P_PUT (146)
- BType: 0, // block type
- Flags: 0, // processing flags
- HopCount: 0, // message hops
- ReplLvl: 0, // replication level
- PathL: 0, // no PUT path
- Expiration: util.AbsoluteTimeNever(), // expiration date
- PeerFilter: blocks.NewPeerFilter(), // peer bloom filter
- Key: crypto.NewHashCode(nil), // query key
- Origin: nil, // no truncated path
- PutPath: make([]*PathElementWire, 0), // empty PUT path
- LastSig: nil, // no signature from
last hop
- Block: nil, // no block data
+ MsgSize: 218, // total size without
path and block data
+ MsgType: DHT_P2P_PUT, // DHT_P2P_PUT (146)
+ BType: 0, // block type
+ Flags: 0, // processing flags
+ HopCount: 0, // message hops
+ ReplLvl: 0, // replication level
+ PathL: 0, // no PUT path
+ Expiration: util.AbsoluteTimeNever(), // expiration date
+ PeerFilter: blocks.NewPeerFilter(), // peer bloom filter
+ Key: crypto.NewHashCode(nil), // query key
+ TruncOrigin: nil, // no truncated path
+ PutPath: make([]*path.Entry, 0), // empty PUT path
+ LastSig: nil, // no signature from
last hop
+ Block: nil, // no block data
}
}
@@ -238,24 +154,117 @@ func (m *DHTP2PPutMsg) PESize(field string) uint {
switch field {
case "Origin":
if m.Flags&enums.DHT_RO_TRUNCATED != 0 {
- return 32
+ return util.NewPeerID(nil).Size()
}
case "LastSig":
if m.Flags&enums.DHT_RO_RECORD_ROUTE != 0 {
- return 64
+ return util.NewPeerSignature(nil).Size()
}
}
return 0
}
-// AddPutPath adds an element to the PUT path
-func (m *DHTP2PPutMsg) AppendPutPath(pe *PathElement) {
- pew := pe.Wire()
- m.PutPath = append(m.PutPath, pew)
- m.PathL++
- m.MsgSize += pew.Size()
+//----------------------------------------------------------------------
+
+// Update message (forwarding)
+func (m *DHTP2PPutMsg) Update(p *path.Path, pf *blocks.PeerFilter, hop uint16)
*DHTP2PPutMsg {
+ msg := NewDHTP2PPutMsg()
+ msg.Flags = m.Flags
+ msg.HopCount = hop
+ msg.PathL = m.PathL
+ msg.Expiration = m.Expiration
+ msg.PeerFilter = pf
+ msg.Key = m.Key.Clone()
+ msg.TruncOrigin = m.TruncOrigin
+ msg.PutPath = util.Clone(m.PutPath)
+ msg.LastSig = m.LastSig
+ msg.Block = util.Clone(m.Block)
+ msg.SetPath(p)
+ return msg
+}
+
+//----------------------------------------------------------------------
+// Path handling (get/set path in message)
+//----------------------------------------------------------------------
+
+// Path returns the current path from message
+func (m *DHTP2PPutMsg) Path(sender *util.PeerID) *path.Path {
+ // create a "real" path list from message data
+ pth := path.NewPath(crypto.Hash(m.Block), m.Expiration)
+
+ // return empty path if recording is switched off
+ if m.Flags&enums.DHT_RO_RECORD_ROUTE == 0 {
+ return pth
+ }
+
+ // handle truncate origin
+ if m.Flags&enums.DHT_RO_TRUNCATED == 1 {
+ if m.TruncOrigin == nil || len(m.TruncOrigin) == 0 {
+ logger.Printf(logger.WARN, "[path] truncated but no
origin - flag reset")
+ m.Flags &^= enums.DHT_RO_TRUNCATED
+ } else {
+ pth.TruncOrigin = util.NewPeerID(m.TruncOrigin)
+ pth.Flags |= path.PathTruncated
+ }
+ }
+
+ // copy path elements
+ pth.List = util.Clone(m.PutPath)
+ pth.NumList = uint16(len(pth.List))
+
+ // handle last hop signature
+ if m.LastSig == nil || len(m.LastSig) == 0 {
+ logger.Printf(logger.WARN, "[path] - last hop signature
missing - path reset")
+ return path.NewPath(crypto.Hash(m.Block), m.Expiration)
+ }
+ pth.Flags |= path.PathLastHop
+ pth.LastSig = util.NewPeerSignature(m.LastSig)
+ pth.LastHop = sender
+ return pth
+}
+
+// SetPath sets the path in the message; corrects the message size accordingly
+func (m *DHTP2PPutMsg) SetPath(p *path.Path) {
+
+ // return if recording is switched off (don't touch path)
+ if m.Flags&enums.DHT_RO_RECORD_ROUTE == 0 {
+ return
+ }
+ // compute old path size
+ var pes uint
+ if len(m.PutPath) > 0 {
+ pes = m.PutPath[0].Size()
+ }
+ oldSize := uint(len(m.PutPath))*pes + m.PESize("Origin") +
m.PESize("LastSig")
+ // if no new path is defined,...
+ if p == nil {
+ // ... remove existing path
+ m.TruncOrigin = nil
+ m.PutPath = nil
+ m.LastSig = nil
+ m.PathL = 0
+ m.Flags &^= enums.DHT_RO_TRUNCATED
+ m.MsgSize -= uint16(oldSize)
+ return
+ }
+ // adjust message size
+ m.MsgSize += uint16(p.Size() - oldSize)
+
+ // transfer path data
+ if p.TruncOrigin != nil {
+ // truncated path
+ m.Flags |= enums.DHT_RO_TRUNCATED
+ m.TruncOrigin = p.TruncOrigin.Bytes()
+ }
+ m.PutPath = util.Clone(p.List)
+ m.PathL = uint16(len(m.PutPath))
+ if p.LastSig != nil {
+ m.LastSig = p.LastSig.Bytes()
+ }
}
+//----------------------------------------------------------------------
+
// String returns a human-readable representation of the message.
func (m *DHTP2PPutMsg) String() string {
return fmt.Sprintf("DHTP2PPutMsg{btype=%s,hops=%d,flags=%d}",
@@ -274,19 +283,19 @@ func (m *DHTP2PPutMsg) Header() *Header {
// DHTP2PResultMsg wire layout
type DHTP2PResultMsg struct {
- MsgSize uint16 `order:"big"` // total size of message
- MsgType uint16 `order:"big"` // DHT_P2P_RESULT (148)
- BType uint32 `order:"big"` // Block type of result
- Reserved uint32 `order:"big"` // Reserved for further
use
- PutPathL uint16 `order:"big"` // size of PUTPATH field
- GetPathL uint16 `order:"big"` // size of GETPATH field
- Expires util.AbsoluteTime `` // expiration date
- Query *crypto.HashCode `` // Query key for block
- Origin []byte `size:"(PESize)"` // truncated origin (if
TRUNCATED flag set)
- PutPath []*PathElementWire `size:"PutPathL"` // PUTPATH
- GetPath []*PathElementWire `size:"GetPathL"` // GETPATH
- LastSig []byte `size:"(PESize)"` // signature of last hop
(if RECORD_ROUTE flag is set)
- Block []byte `size:"*"` // block data
+ MsgSize uint16 `order:"big"` // total size of message
+ MsgType uint16 `order:"big"` // DHT_P2P_RESULT (148)
+ BType uint32 `order:"big"` // Block type of result
+ Reserved uint32 `order:"big"` // Reserved for further use
+ PutPathL uint16 `order:"big"` // size of PUTPATH field
+ GetPathL uint16 `order:"big"` // size of GETPATH field
+ Expires util.AbsoluteTime `` // expiration date
+ Query *crypto.HashCode `` // Query key for block
+ Origin []byte `size:"(PESize)"` // truncated origin (if
TRUNCATED flag set)
+ PutPath []*path.Entry `size:"PutPathL"` // PUTPATH
+ GetPath []*path.Entry `size:"GetPathL"` // GETPATH
+ LastSig []byte `size:"(PESize)"` // signature of last hop
(if RECORD_ROUTE flag is set)
+ Block []byte `size:"*"` // block data
}
// NewDHTP2PResultMsg creates a new empty DHTP2PResultMsg
diff --git a/src/gnunet/service/dht/blocks/filters.go
b/src/gnunet/service/dht/blocks/filters.go
index 0d194cc..26af0b3 100644
--- a/src/gnunet/service/dht/blocks/filters.go
+++ b/src/gnunet/service/dht/blocks/filters.go
@@ -43,6 +43,13 @@ func NewPeerFilter() *PeerFilter {
}
}
+// NewPeerFilterFromBytes creates a peer filter from data.
+func NewPeerFilterFromBytes(data []byte) *PeerFilter {
+ return &PeerFilter{
+ BF: NewBloomFilterFromBytes(data),
+ }
+}
+
// Add peer id to the filter
func (pf *PeerFilter) Add(p *util.PeerID) {
pf.BF.Add(p.Data)
@@ -112,42 +119,78 @@ type ResultFilter interface {
}
//----------------------------------------------------------------------
-// Dummy result filter
-// [Additional filters (per block type) are defined in corresponding files]
+// Generic result filter:
+// Filter duplicate blocks (identical hash value over content)
//----------------------------------------------------------------------
-// PassResultFilter is a dummy result filter with no state.
-type PassResultFilter struct{}
+// GenericResultFilter is a result filter that records blocks in a bloom filter.
+type GenericResultFilter struct {
+ bf *BloomFilter
+}
+
+// NewGenericResultFilter creates a new empty result bloom filter
+func NewGenericResultFilter() *GenericResultFilter {
+ return &GenericResultFilter{
+ bf: NewBloomFilter(128),
+ }
+}
// Add a block to the result filter.
-func (rf *PassResultFilter) Add(Block) {
+func (rf *GenericResultFilter) Add(b Block) {
+ rf.bf.Add(b.Bytes())
}
// Contains returns true if entry (binary representation) is filtered
-func (rf *PassResultFilter) Contains(Block) bool {
- return false
+func (rf *GenericResultFilter) Contains(b Block) bool {
+ return rf.bf.Contains(b.Bytes())
}
// Bytes returns the binary representation of a result filter
-func (rf *PassResultFilter) Bytes() (buf []byte) {
- return
+func (rf *GenericResultFilter) Bytes() (buf []byte) {
+ return rf.bf.Bytes()
}
// Merge two result filters
-func (rf *PassResultFilter) Merge(ResultFilter) bool {
+func (rf *GenericResultFilter) Merge(t ResultFilter) bool {
+ // check for correct type
+ trf, ok := t.(*GenericResultFilter)
+ if !ok {
+ return false
+ }
+ // check for identical mutator (if any)
+ if !bytes.Equal(rf.bf.mInput, trf.bf.mInput) {
+ return false
+ }
+ // check for same size
+ if len(rf.bf.Bits) != len(trf.bf.Bits) {
+ return false
+ }
+ // merge bloomfilters
+ for i := range rf.bf.Bits {
+ rf.bf.Bits[i] ^= trf.bf.Bits[i]
+ }
return true
}
// Compare two result filters
-func (rf *PassResultFilter) Compare(t ResultFilter) int {
- if _, ok := t.(*PassResultFilter); ok {
+func (rf *GenericResultFilter) Compare(t ResultFilter) int {
+ trf, ok := t.(*GenericResultFilter)
+ if !ok {
+ return CMP_DIFFER
+ }
+ // check for identical mutator (if any)
+ if !bytes.Equal(rf.bf.mInput, trf.bf.mInput) {
+ return CMP_DIFFER
+ }
+ // check for identical bits
+ if bytes.Equal(rf.bf.Bits, trf.bf.Bits) {
return CMP_SAME
}
- return CMP_DIFFER
+ return CMP_MERGE
}
//======================================================================
-// Generic bllom filter with mutator
+// Generic bloom filter with mutator
//======================================================================
// BloomFilter is a space-efficient probabilistic datastructure to test if
@@ -171,6 +214,15 @@ func NewBloomFilter(n int) *BloomFilter {
}
}
+// NewBloomFilterFromBytes creates a new filter from data
+func NewBloomFilterFromBytes(data []byte) *BloomFilter {
+ return &BloomFilter{
+ Bits: util.Clone(data),
+ mInput: nil,
+ mData: nil,
+ }
+}
+
// SetMutator to define a mutator for randomization. If 'm' is nil,
// the mutator is removed from the filter (use with care!)
func (bf *BloomFilter) SetMutator(m any) {
diff --git a/src/gnunet/service/dht/blocks/filters_test.go
b/src/gnunet/service/dht/blocks/filters_test.go
index ef1331c..533bab8 100644
--- a/src/gnunet/service/dht/blocks/filters_test.go
+++ b/src/gnunet/service/dht/blocks/filters_test.go
@@ -21,6 +21,8 @@ package blocks
import (
"bytes"
"crypto/rand"
+ "encoding/base64"
+ "gnunet/util"
"sort"
"testing"
)
@@ -108,3 +110,25 @@ func TestBloomfilter(t *testing.T) {
t.Logf("FAILED with %d false-positives", count)
}
}
+
+func TestBFCase1(t *testing.T) {
+ senderS := "83JF73PZ69ZFVCHH9VDEGY673EH4H3B4Y4XRV8XB3PQHP8SFN220"
+ pfS := "AAAAABAACAAQAAAAACAAgAAAAIAAAACAAAAAAAAABAAQAAAADAAAAABA" +
+
"AAAAAAAAAAAQAAAAAAAAAAAACAAIAAAAAACAAABAAAAAAIgEAAAABAAACAAAAA" +
+ "EAAAAAAAAAAEABAAAAAAAAFIAAEAAAAAAAAAAAAABAAIAAAAAAAAA="
+
+ // decode sender
+ buf, err := util.DecodeStringToBinary(senderS, 32)
+ if err != nil {
+ t.Fatal(err)
+ }
+ sender := util.NewPeerID(buf)
+
+ // decode peer filter
+ if buf, err = base64.StdEncoding.DecodeString(pfS); err != nil {
+ t.Fatal(err)
+ }
+ pf := NewPeerFilterFromBytes(buf)
+ rc := pf.Contains(sender)
+ t.Logf("contains? %v", rc)
+}
diff --git a/src/gnunet/service/dht/blocks/generic.go
b/src/gnunet/service/dht/blocks/generic.go
index 2962025..c67213e 100644
--- a/src/gnunet/service/dht/blocks/generic.go
+++ b/src/gnunet/service/dht/blocks/generic.go
@@ -39,11 +39,14 @@ type Query interface {
Key() *crypto.HashCode
// Type returns the requested block type
- Type() uint16
+ Type() enums.BlockType
// Flags returns the query flags
Flags() uint16
+ // Params holds additional info for queries
+ Params() util.ParameterSet
+
// Verify the integrity of a retrieved block (optional). Override in
// custom query types to implement block-specific integrity checks
// (see GNSQuery for example).
@@ -60,12 +63,12 @@ type Query interface {
// DHT Block interface
type Block interface {
- // Data returns the DHT block data (unstructured without type and
+ // Bytes returns the DHT block data (unstructured without type and
// expiration information.
- Data() []byte
+ Bytes() []byte
// Return the block type
- Type() uint16
+ Type() enums.BlockType
// Expire returns the block expiration
Expire() util.AbsoluteTime
@@ -82,7 +85,7 @@ type Block interface {
// Unwrap (raw) block to a specific block type
func Unwrap(blk Block, obj interface{}) error {
- return data.Unmarshal(obj, blk.Data())
+ return data.Unmarshal(obj, blk.Bytes())
}
//----------------------------------------------------------------------
@@ -95,13 +98,13 @@ type GenericQuery struct {
key *crypto.HashCode
// block type requested
- btype uint16
+ btype enums.BlockType
// query flags
flags uint16
// Params holds additional query parameters
- Params util.ParameterSet
+ params util.ParameterSet
}
// Key interface method implementation
@@ -110,7 +113,7 @@ func (q *GenericQuery) Key() *crypto.HashCode {
}
// Type returns the requested block type
-func (q *GenericQuery) Type() uint16 {
+func (q *GenericQuery) Type() enums.BlockType {
return q.btype
}
@@ -119,6 +122,11 @@ func (q *GenericQuery) Flags() uint16 {
return q.flags
}
+// Params holds additional info for queries
+func (q *GenericQuery) Params() util.ParameterSet {
+ return q.params
+}
+
// Verify interface method implementation
func (q *GenericQuery) Verify(b Block) error {
// no verification, no errors ;)
@@ -137,12 +145,12 @@ func (q *GenericQuery) String() string {
}
// NewGenericQuery creates a simple Query from hash code.
-func NewGenericQuery(key []byte, btype enums.BlockType, flags uint16)
*GenericQuery {
+func NewGenericQuery(key *crypto.HashCode, btype enums.BlockType, flags
uint16) *GenericQuery {
return &GenericQuery{
- key: crypto.NewHashCode(key),
- btype: uint16(btype),
+ key: key,
+ btype: btype,
flags: flags,
- Params: make(util.ParameterSet),
+ params: make(util.ParameterSet),
}
}
@@ -151,17 +159,17 @@ func NewGenericQuery(key []byte, btype enums.BlockType,
flags uint16) *GenericQu
// GenericBlock is the block in simple binary representation
type GenericBlock struct {
block []byte // block data
- btype uint16 // block type
+ btype enums.BlockType // block type
expire util.AbsoluteTime // expiration date
}
-// Data interface method implementation
-func (b *GenericBlock) Data() []byte {
+// Bytes returns the binary representation
+func (b *GenericBlock) Bytes() []byte {
return b.block
}
// Type returns the block type
-func (b *GenericBlock) Type() uint16 {
+func (b *GenericBlock) Type() enums.BlockType {
return b.btype
}
@@ -186,7 +194,7 @@ func (b *GenericBlock) Verify() (bool, error) {
func NewGenericBlock(buf []byte) *GenericBlock {
return &GenericBlock{
block: util.Clone(buf),
- btype: uint16(enums.BLOCK_TYPE_ANY), // unknown block type
- expire: util.AbsoluteTimeNever(), // never expires
+ btype: enums.BLOCK_TYPE_ANY, // unknown block type
+ expire: util.AbsoluteTimeNever(), // never expires
}
}
diff --git a/src/gnunet/service/dht/blocks/gns.go
b/src/gnunet/service/dht/blocks/gns.go
index 0225003..9668202 100644
--- a/src/gnunet/service/dht/blocks/gns.go
+++ b/src/gnunet/service/dht/blocks/gns.go
@@ -104,7 +104,7 @@ func NewGNSQuery(zkey *crypto.ZoneKey, label string)
*GNSQuery {
if err != nil {
logger.Printf(logger.ERROR, "[NewGNSQuery] failed: %s",
err.Error())
}
- gq := crypto.Hash(pd.Bytes()).Bits
+ gq := crypto.Hash(pd.Bytes())
return &GNSQuery{
GenericQuery: *NewGenericQuery(gq,
enums.BLOCK_TYPE_GNS_NAMERECORD, 0),
Zone: zkey,
@@ -141,8 +141,8 @@ type GNSBlock struct {
data []byte // decrypted data
}
-// Data block interface implementation
-func (b *GNSBlock) Data() []byte {
+// Bytes returns the binary representation of the block
+func (b *GNSBlock) Bytes() []byte {
buf, _ := data.Marshal(b)
return buf
}
diff --git a/src/gnunet/service/dht/blocks/hello.go
b/src/gnunet/service/dht/blocks/hello.go
index c884fd2..32f803f 100644
--- a/src/gnunet/service/dht/blocks/hello.go
+++ b/src/gnunet/service/dht/blocks/hello.go
@@ -198,12 +198,12 @@ func (h *HelloBlock) finalize() (err error) {
}
// Return the block type
-func (h *HelloBlock) Type() uint16 {
- return uint16(enums.BLOCK_TYPE_DHT_URL_HELLO)
+func (h *HelloBlock) Type() enums.BlockType {
+ return enums.BLOCK_TYPE_DHT_URL_HELLO
}
-// Data returns the raw block data
-func (h *HelloBlock) Data() []byte {
+// Bytes returns the raw block data
+func (h *HelloBlock) Bytes() []byte {
buf, err := data.Marshal(h)
if err != nil {
logger.Println(logger.ERROR, "[hello] Failed to serialize HELLO
block: "+err.Error())
@@ -375,6 +375,8 @@ func (bh *HelloBlockHandler) FilterResult(b Block, key
*crypto.HashCode, rf Resu
return RF_LAST
}
+//----------------------------------------------------------------------
+// HELLO result filter
//----------------------------------------------------------------------
// HelloResultFilter is a result filter implementation for HELLO blocks
diff --git a/src/gnunet/service/dht/local.go b/src/gnunet/service/dht/local.go
new file mode 100644
index 0000000..1e6b100
--- /dev/null
+++ b/src/gnunet/service/dht/local.go
@@ -0,0 +1,86 @@
+// This file is part of gnunet-go, a GNUnet-implementation in Golang.
+// Copyright (C) 2019-2022 Bernd Fix >Y<
+//
+// gnunet-go is free software: you can redistribute it and/or modify it
+// under the terms of the GNU Affero General Public License as published
+// by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// gnunet-go is distributed in the hope that it will be useful, but
+// WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+// SPDX-License-Identifier: AGPL3.0-or-later
+
+package dht
+
+import (
+ "errors"
+ "gnunet/enums"
+ "gnunet/service/dht/blocks"
+ "gnunet/service/store"
+
+ "github.com/bfix/gospel/logger"
+ "github.com/bfix/gospel/math"
+)
+
+// getHelloCache looks up the best HELLO block for the given address in the
+// HELLO cache of the routing table.
+//
+// A block found is returned wrapped in a DHT entry together with the
+// distance reported by the routing table. If no block is found, the entry
+// is nil and the reported distance is passed on unchanged; if the block is
+// contained in the result filter, both results are nil.
+func (m *Module) getHelloCache(label string, addr *PeerAddress, rf blocks.ResultFilter) (entry *store.DHTEntry, dist *math.Int) {
+	logger.Printf(logger.DBG, "[%s] GET message for HELLO: check cache", label)
+
+	// ask the routing table for the best cached HELLO.
+	// NOTE(review): 'best' is an interface value; if BestHello returns a
+	// concrete pointer type, a nil pointer would not compare equal to nil
+	// here -- confirm against the routing table implementation.
+	var best blocks.Block
+	best, dist = m.rtable.BestHello(addr, rf)
+
+	switch {
+	case best == nil:
+		// nothing cached: keep the distance as reported
+	case rf.Contains(best):
+		// block is already known to the requester: skip it
+		logger.Printf(logger.DBG, "[%s] GET message for HELLO: matching DHT block is filtered", label)
+		dist = nil
+	default:
+		entry = &store.DHTEntry{Blk: best}
+	}
+	return entry, dist
+}
+
+// getLocalStorage tries to find the requested block in local storage.
+//
+// An exact match from the DHT store is reported with distance zero unless
+// it is contained in the result filter. If there is no (unfiltered) exact
+// match, or if the query carries the DHT_RO_FIND_APPROXIMATE flag, the
+// store is asked for an approximate match that is not contained in the
+// result filter.
+//
+// On failure the error is logged and returned; entry and distance are nil
+// in that case.
+func (m *Module) getLocalStorage(label string, query blocks.Query, rf blocks.ResultFilter) (entry *store.DHTEntry, dist *math.Int, err error) {
+
+	// query DHT store for exact match (9.4.3.3c)
+	if entry, err = m.store.Get(query); err != nil {
+		logger.Printf(logger.ERROR, "[%s] Failed to get DHT block from storage: %s", label, err.Error())
+		return nil, nil, err
+	}
+	if entry != nil {
+		dist = math.ZERO
+		// check if we are filtered out
+		if rf.Contains(entry.Blk) {
+			logger.Printf(logger.DBG, "[%s] matching DHT block is filtered", label)
+			entry = nil
+			dist = nil
+		}
+	}
+	// if we have no exact match, find approximate block if requested
+	if entry == nil || query.Flags()&enums.DHT_RO_FIND_APPROXIMATE != 0 {
+		// no exact match: find approximate (9.4.3.3b)
+		// entries contained in the result filter are excluded
+		excluded := func(e *store.DHTEntry) bool {
+			return rf.Contains(e.Blk)
+		}
+		var d any
+		// check the storage error first, so that it is not replaced by
+		// the less specific "no approx distance" error below.
+		if entry, d, err = m.store.GetApprox(query, excluded); err == nil {
+			var ok bool
+			if dist, ok = d.(*math.Int); !ok {
+				err = errors.New("no approx distance")
+			}
+		}
+		if err != nil {
+			logger.Printf(logger.ERROR, "[%s] Failed to get (approx.) DHT block from storage: %s", label, err.Error())
+			return nil, nil, err
+		}
+	}
+	return
+}
diff --git a/src/gnunet/service/dht/messages.go
b/src/gnunet/service/dht/messages.go
index 38f0753..deb8461 100644
--- a/src/gnunet/service/dht/messages.go
+++ b/src/gnunet/service/dht/messages.go
@@ -23,6 +23,8 @@ import (
"gnunet/enums"
"gnunet/message"
"gnunet/service/dht/blocks"
+ "gnunet/service/dht/path"
+ "gnunet/service/store"
"gnunet/transport"
"gnunet/util"
@@ -40,11 +42,18 @@ func (m *Module) HandleMessage(ctx context.Context, sender
*util.PeerID, msgIn m
// assemble log label
label := "dht"
if v := ctx.Value("label"); v != nil {
- if s, _ := v.(string); len(s) > 0 {
+ if s, ok := v.(string); ok && len(s) > 0 {
label = "dht-" + s
}
}
logger.Printf(logger.INFO, "[%s] message received from %s", label,
sender)
+ local := m.core.PeerID()
+
+ // check for local message
+ if sender.Equals(local) {
+ logger.Printf(logger.WARN, "[%s] dropping local message
received: %s", label, util.Dump(msgIn, "json"))
+ return false
+ }
// process message
switch msg := msgIn.(type) {
@@ -54,11 +63,10 @@ func (m *Module) HandleMessage(ctx context.Context, sender
*util.PeerID, msgIn m
// DHT-P2P GET
//--------------------------------------------------------------
logger.Printf(logger.INFO, "[%s] Handling DHT-P2P-GET message",
label)
- query := blocks.NewGenericQuery(msg.Query.Bits,
enums.BlockType(msg.BType), msg.Flags)
+ query := blocks.NewGenericQuery(msg.Query,
enums.BlockType(msg.BType), msg.Flags)
- var block blocks.Block
+ var entry *store.DHTEntry
var dist *math.Int
- var err error
//--------------------------------------------------------------
// validate query (based on block type requested) (9.4.3.1)
@@ -79,83 +87,79 @@ func (m *Module) HandleMessage(ctx context.Context, sender
*util.PeerID, msgIn m
if !msg.PeerFilter.Contains(sender) {
logger.Printf(logger.WARN, "[%s] sender not in peer
filter", label)
}
- // parse result filter
- var rf blocks.ResultFilter = new(blocks.PassResultFilter)
+ // parse result filter ...
+ var rf blocks.ResultFilter
if msg.ResFilter != nil && len(msg.ResFilter) > 0 {
if blockHdlr != nil {
rf = blockHdlr.ParseResultFilter(msg.ResFilter)
} else {
logger.Printf(logger.WARN, "[%s] unknown result
filter implementation -- skipped", label)
}
+ } else {
+ // ... or create a new one
+ if blockHdlr != nil {
+ rf = blockHdlr.SetupResultFilter(128,
util.RndUInt32())
+ } else {
+ logger.Printf(logger.WARN, "[%s] using default
result filter", label)
+ rf = blocks.NewGenericResultFilter()
+ }
}
// clone peer filter
pf := msg.PeerFilter.Clone()
//----------------------------------------------------------
// check if we need to respond (and how) (9.4.3.3)
- addr := NewQueryAddress(msg.Query)
- closest := m.rtable.IsClosestPeer(nil, addr, msg.PeerFilter)
+ addr := NewQueryAddress(query.Key())
+ closest := m.rtable.IsClosestPeer(nil, addr, msg.PeerFilter, 0)
demux := int(msg.Flags)&enums.DHT_RO_DEMULTIPLEX_EVERYWHERE != 0
approx := int(msg.Flags)&enums.DHT_RO_FIND_APPROXIMATE != 0
// actions
doResult := closest || (demux && approx)
doForward := !closest || (demux && !approx)
- logger.Printf(logger.DBG, "[dht] GET message: closest=%v,
demux=%v, approx=%v --> result=%v, forward=%v",
- closest, demux, approx, doResult, doForward)
+ logger.Printf(logger.DBG, "[%s] GET message: closest=%v,
demux=%v, approx=%v --> result=%v, forward=%v",
+ label, closest, demux, approx, doResult, doForward)
//------------------------------------------------------
// query for a HELLO? (9.4.3.3a)
- if msg.BType == uint32(enums.BLOCK_TYPE_DHT_URL_HELLO) {
- logger.Println(logger.DBG, "[dht] GET message for
HELLO: check cache")
- // find best cached HELLO
- block, dist = m.rtable.BestHello(addr, rf)
+ if btype == enums.BLOCK_TYPE_DHT_URL_HELLO {
+ // try to find result in HELLO cache
+ entry, dist = m.getHelloCache(label, addr, rf)
}
//--------------------------------------------------------------
- // find the closest block that has that is not filtered/ by the
result
+ // find the closest block that is not filtered by the result
// filter (in case we did not find an appropriate block in
cache).
if doResult {
// save best-match values from cache
- blockCache := block
+ entryCache := entry
distCache := dist
-
- // query DHT store for exact match (9.4.3.3c)
- if block, err = m.Get(ctx, query); err != nil {
- logger.Printf(logger.ERROR, "[%s] Failed to get
DHT block from storage: %s", label, err.Error())
- return true
- }
- // if block is filtered, skip it
- if rf.Contains(block) {
- logger.Println(logger.DBG, "[dht] GET message
for HELLO: matching DHT block is filtered")
- block = nil
- }
- // if we have no exact match, find approximate block if
requested
- if block == nil || approx {
- // no exact match: find approximate (9.4.3.3b)
- match := func(b blocks.Block) bool {
- return rf.Contains(b)
+ dist = nil
+
+ // if we don't have an exact match, try storage lookup
+ if entryCache == nil || (distCache != nil &&
!distCache.Equals(math.ZERO)) {
+ // get entry from local storage
+ var err error
+ if entry, dist, err = m.getLocalStorage(label,
query, rf); err != nil {
+ entry = nil
+ dist = nil
}
- block, dist, err = m.GetApprox(ctx, query,
match)
- if err != nil {
- logger.Printf(logger.ERROR, "[%s]
Failed to get (approx.) DHT block from storage: %s", label, err.Error())
- return true
+ // if we have a block from cache, check if it
is better than the
+ // block found in the DHT
+ if entryCache != nil && dist != nil &&
distCache.Cmp(dist) < 0 {
+ entry = entryCache
+ dist = distCache
}
}
- // if we have a block from cache, check if it is better
than the
- // block found in the DHT
- if blockCache != nil && distCache.Cmp(dist) < 0 {
- block = blockCache
- }
// if we have a block, send it as response
- if block != nil {
+ if entry != nil {
logger.Printf(logger.INFO, "[%s] sending DHT
result message to caller", label)
- if err := m.sendResult(ctx, query, block,
back); err != nil {
+ if err := m.sendResult(ctx, query, entry.Blk,
back); err != nil {
logger.Printf(logger.ERROR, "[%s]
Failed to send DHT result message: %s", label, err.Error())
}
}
}
// check if we need to forward message based on filter result
- if block != nil && blockHdlr != nil {
- switch blockHdlr.FilterResult(block, query.Key(), rf,
msg.XQuery) {
+ if entry != nil && blockHdlr != nil {
+ switch blockHdlr.FilterResult(entry.Blk, query.Key(),
rf, msg.XQuery) {
case blocks.RF_LAST:
// no need for further results
case blocks.RF_MORE:
@@ -167,14 +171,13 @@ func (m *Module) HandleMessage(ctx context.Context,
sender *util.PeerID, msgIn m
}
if doForward {
// build updated GET message
- pf.Add(m.core.PeerID())
+ pf.Add(local)
msgOut := msg.Update(pf, rf, msg.HopCount+1)
// forward to number of peers
numForward := m.rtable.ComputeOutDegree(msg.ReplLevel,
msg.HopCount)
- key := NewQueryAddress(query.Key())
for n := 0; n < numForward; n++ {
- if p := m.rtable.SelectClosestPeer(key, pf); p
!= nil {
+ if p := m.rtable.SelectClosestPeer(addr, pf,
0); p != nil {
// forward message to peer
logger.Printf(logger.INFO, "[%s]
forward DHT get message to %s", label, p.String())
if err := back.Send(ctx, msgOut); err
!= nil {
@@ -183,7 +186,7 @@ func (m *Module) HandleMessage(ctx context.Context, sender
*util.PeerID, msgIn m
pf.Add(p.Peer)
// create open get-forward result
handler
rh := NewForwardResultHandler(msg, rf,
back)
- logger.Printf(logger.INFO, "[%s]
DHT-P2P-GET task #%d started", label, rh.ID())
+ logger.Printf(logger.INFO, "[%s]
DHT-P2P-GET task #%d (%s) started", label, rh.ID(), rh.Key())
m.reshdlrs.Add(rh)
} else {
break
@@ -198,6 +201,13 @@ func (m *Module) HandleMessage(ctx context.Context, sender
*util.PeerID, msgIn m
//----------------------------------------------------------
logger.Printf(logger.INFO, "[%s] Handling DHT-P2P-PUT message",
label)
+ // assemble query and entry
+ query := blocks.NewGenericQuery(msg.Key,
enums.BlockType(msg.BType), msg.Flags)
+ entry := &store.DHTEntry{
+ Blk: blocks.NewGenericBlock(msg.Block),
+ Path: nil,
+ }
+
//--------------------------------------------------------------
// check if request is expired (9.3.2.1)
if msg.Expiration.Expired() {
@@ -226,27 +236,93 @@ func (m *Module) HandleMessage(ctx context.Context,
sender *util.PeerID, msgIn m
logger.Printf(logger.INFO, "[%s] No validator defined
for block type %s", label, btype.String())
blockHdlr = nil
}
+ // clone peer filter
+ pf := msg.PeerFilter.Clone()
+
+ //----------------------------------------------------------
+ // check if we need to respond (and how)
+ addr := NewQueryAddress(msg.Key)
+ closest := m.rtable.IsClosestPeer(nil, addr, msg.PeerFilter, 0)
+ demux := int(msg.Flags)&enums.DHT_RO_DEMULTIPLEX_EVERYWHERE != 0
+ logger.Printf(logger.DBG, "[%s] PUT message: closest=%v,
demux=%v", label, closest, demux)
+
//--------------------------------------------------------------
// check if sender is in peer filter (9.3.2.5)
if !msg.PeerFilter.Contains(sender) {
logger.Printf(logger.WARN, "[%s] Sender not in peer
filter", label)
}
//--------------------------------------------------------------
- // check if route is recorded (9.3.2.6)
- /*
- withPath := msg.Flags&enums.DHT_RO_RECORD_ROUTE != 0
- if withPath {
- spe := message.NewPathElement(msg.Key, sender,
p)
- m.core.Sign(spe)
- msg.AppendPutPath(spe)
- }
- */
- //--------------------------------------------------------------
// verify PUT path (9.3.2.7)
- if msg.PathL > 0 {
+ // 'entry.Path' will be used as path in stored and forwarded
messages.
+ // The resulting path is always valid; it is truncated/reset on
+ // signature failure.
+ entry.Path = msg.Path(sender)
+ entry.Path.Verify(local)
+ //--------------------------------------------------------------
+ // store locally if we are closest peer or demux is set
(9.3.2.8)
+ if closest || demux {
+ // store in local storage
+ if err := m.store.Put(query, entry); err != nil {
+ logger.Printf(logger.ERROR, "[%s] failed to
store DHT entry: %s", label, err.Error())
+ }
}
+ //--------------------------------------------------------------
+ // if the put is for a HELLO block, add the sender to the
+ // routing table (9.3.2.9)
+ if btype == enums.BLOCK_TYPE_DHT_HELLO {
+ // get addresses from HELLO block
+ hello, err := blocks.ParseHelloFromBytes(msg.Block)
+ if err != nil {
+ logger.Printf(logger.ERROR, "[%s] failed to
parse HELLO block: %s", label, err.Error())
+ } else {
+ // check state of bucket for given address
+ if m.rtable.Check(NewPeerAddress(sender)) == 0 {
+ // we could add the sender to the
routing table
+ for _, addr := range hello.Addresses() {
+ if
transport.CanHandleAddress(addr) {
+ // try to connect to
peer (triggers EV_CONNECTED on success)
+
m.core.TryConnect(sender, addr)
+ }
+ }
+ }
+ }
+ }
+
+ //--------------------------------------------------------------
+ // check if we need to forward
+ if !closest || demux {
+ // add local node to filter
+ pf.Add(local)
+
+ // forward to computed number of peers
+ numForward := m.rtable.ComputeOutDegree(msg.ReplLvl,
msg.HopCount)
+ for n := 0; n < numForward; n++ {
+ if p := m.rtable.SelectClosestPeer(addr, pf,
0); p != nil {
+ // check if route is recorded (9.3.2.6)
+ var pp *path.Path
+ if msg.Flags&enums.DHT_RO_RECORD_ROUTE
!= 0 {
+ // yes: add path element
+ pp = entry.Path.Clone()
+ pe := pp.NewElement(sender,
local, p.Peer)
+ pp.Add(pe)
+ }
+ // build updated PUT message
+ msgOut := msg.Update(pp, pf,
msg.HopCount+1)
+
+ // forward message to peer
+ logger.Printf(logger.INFO, "[%s]
forward DHT put message to %s", label, p.String())
+ if err := back.Send(ctx, msgOut); err
!= nil {
+ logger.Printf(logger.ERROR,
"[%s] Failed to forward DHT put message: %s", label, err.Error())
+ }
+ // add forward node to filter
+ pf.Add(p.Peer)
+ } else {
+ break
+ }
+ }
+ }
logger.Printf(logger.INFO, "[%s] Handling DHT-P2P-PUT message
done", label)
case *message.DHTP2PResultMsg:
@@ -257,18 +333,21 @@ func (m *Module) HandleMessage(ctx context.Context,
sender *util.PeerID, msgIn m
// check task list for handler
key := msg.Query.String()
+ logger.Printf(logger.DBG, "[%s] DHT-P2P-RESULT key = %s",
label, key)
handled := false
if list, ok := m.reshdlrs.Get(key); ok {
for _, rh := range list {
logger.Printf(logger.DBG, "[%s] Task #%d for
DHT-P2P-RESULT found", label, rh.ID())
// handle the message
go rh.Handle(ctx, msg)
+ handled = true
}
return true
}
if !handled {
logger.Printf(logger.WARN, "[%s] DHT-P2P-RESULT not
processed (no handler)", label)
}
+ logger.Printf(logger.INFO, "[%s] Handling DHT-P2P-RESULT
message done", label)
return handled
case *message.DHTP2PHelloMsg:
@@ -279,16 +358,16 @@ func (m *Module) HandleMessage(ctx context.Context,
sender *util.PeerID, msgIn m
// verify integrity of message
if ok, err := msg.Verify(sender); !ok || err != nil {
- logger.Println(logger.WARN, "[dht] Received invalid
DHT_P2P_HELLO message")
+ logger.Printf(logger.WARN, "[%s] Received invalid
DHT_P2P_HELLO message", label)
if err != nil {
- logger.Println(logger.ERROR, "[dht] -->
"+err.Error())
+ logger.Printf(logger.ERROR, "[%s] --> %s",
label, err.Error())
}
return false
}
- // keep peer addresses in core for transport
+ // keep peer addresses in core for transports
aList, err := msg.Addresses()
if err != nil {
- logger.Println(logger.ERROR, "[dht] Failed to parse
addresses from DHT_P2P_HELLO message")
+ logger.Printf(logger.ERROR, "[%s] Failed to parse
addresses from DHT_P2P_HELLO message", label)
return false
}
if newPeer := m.core.Learn(ctx, sender, aList); newPeer {
@@ -297,7 +376,7 @@ func (m *Module) HandleMessage(ctx context.Context, sender
*util.PeerID, msgIn m
if msgOut, err = m.getHello(); err != nil {
return false
}
- logger.Printf(logger.INFO, "[dht] Sending HELLO to %s:
%s", sender, msgOut)
+ logger.Printf(logger.INFO, "[%s] Sending HELLO to %s:
%s", label, sender, msgOut)
err = m.core.Send(ctx, sender, msgOut)
// no error if the message might have been sent
if err == transport.ErrEndpMaybeSent {
@@ -373,7 +452,7 @@ func (m *Module) sendResult(ctx context.Context, query
blocks.Query, blk blocks.
out.BType = uint32(query.Type())
out.Expires = blk.Expire()
out.Query = query.Key()
- out.Block = blk.Data()
+ out.Block = blk.Bytes()
out.MsgSize += uint16(len(out.Block))
// send message
return back.Send(ctx, out)
diff --git a/src/gnunet/service/dht/module.go b/src/gnunet/service/dht/module.go
index ed28827..6207f94 100644
--- a/src/gnunet/service/dht/module.go
+++ b/src/gnunet/service/dht/module.go
@@ -23,11 +23,11 @@ import (
"errors"
"gnunet/config"
"gnunet/core"
+ "gnunet/crypto"
"gnunet/message"
"gnunet/service"
"gnunet/service/dht/blocks"
"gnunet/service/store"
- "gnunet/transport"
"gnunet/util"
gmath "math"
"time"
@@ -40,6 +40,26 @@ import (
// "DHT" implementation
//======================================================================
+//----------------------------------------------------------------------
+// Responder for local message handling
+//----------------------------------------------------------------------
+
+// LocalResponder is the responder used for locally issued requests. It
+// holds the channel on which result blocks are handed to the local caller.
+type LocalResponder struct {
+ ch chan blocks.Block // out-going channel for incoming blocks
+}
+
+// Send is a no-op: the message is discarded and no error is returned.
+// NOTE(review): nothing in this type writes to 'ch', so results are
+// presumably not delivered to the caller yet -- confirm.
+func (lr *LocalResponder) Send(ctx context.Context, msg message.Message) error
{
+ return nil
+}
+
+// Receiver returns the fixed receiver name "@".
+func (lr *LocalResponder) Receiver() string {
+ return "@"
+}
+
+// Close closes the result channel.
+// NOTE(review): a second call would close an already closed channel.
+func (lr *LocalResponder) Close() {
+ close(lr.ch)
+}
+
//----------------------------------------------------------------------
// Put and get blocks into/from a DHT.
//----------------------------------------------------------------------
@@ -82,26 +102,112 @@ func NewModule(ctx context.Context, c *core.Core, cfg
*config.DHTConfig) (m *Mod
return
}
+//----------------------------------------------------------------------
+// DHT methods for local use
//----------------------------------------------------------------------
-// Get a block from the DHT ["dht:get"]
-func (m *Module) Get(ctx context.Context, query blocks.Query) (block
blocks.Block, err error) {
- return m.store.Get(query)
+// Get blocks from the DHT ["dht:get"]
+// Locally request blocks for a given query. The res channel is meant to
+// deliver the returned results to the caller; it is closed as soon as the
+// request context is done (the query timed out or the parent context was
+// cancelled).
+//
+// The optional query parameters "xquery" ([]byte) and "timeout"
+// (time.Duration, defaults to 10 minutes) are used if present.
+//
+// NOTE(review): the request is handed to HandleMessage with the local peer
+// as sender; HandleMessage appears to drop messages whose sender is the
+// local peer -- confirm that local requests are actually processed.
+func (m *Module) Get(ctx context.Context, query blocks.Query) (res chan
blocks.Block) {
+ // get the block handler for given block type to construct an empty
+ // result filter. If no handler is defined, the default
+ // GenericResultFilter is used.
+ var rf blocks.ResultFilter = new(blocks.GenericResultFilter)
+ blockHdlr, ok := blocks.BlockHandlers[query.Type()]
+ if ok {
+ // create result filter
+ rf = blockHdlr.SetupResultFilter(128, util.RndUInt32())
+ } else {
+ logger.Println(logger.WARN, "[dht] unknown result filter
implementation -- skipped")
+ }
+ // get additional query parameters
+ xquery, ok := util.GetParam[[]byte](query.Params(), "xquery")
+
+ // assemble a new GET message
+ msg := message.NewDHTP2PGetMsg()
+ msg.BType = uint32(query.Type())
+ msg.Flags = query.Flags()
+ msg.HopCount = 0
+ msg.ReplLevel = 10
+ msg.PeerFilter = blocks.NewPeerFilter()
+ msg.ResFilter = rf.Bytes()
+ msg.RfSize = uint16(len(msg.ResFilter))
+ msg.XQuery = xquery
+ // account for the variable-sized parts (result filter and xquery)
+ msg.MsgSize += msg.RfSize + uint16(len(xquery))
+
+ // compose a response channel and handler
+ res = make(chan blocks.Block)
+ hdlr := &LocalResponder{
+ ch: res,
+ }
+ // time-out handling
+ ttl, ok := util.GetParam[time.Duration](query.Params(), "timeout")
+ if !ok {
+ // defaults to 10 minutes
+ ttl = 600 * time.Second
+ }
+ lctx, cancel := context.WithTimeout(ctx, ttl)
+
+ // send message
+ self := m.core.PeerID()
+ msg.PeerFilter.Add(self)
+ go m.HandleMessage(lctx, self, msg, hdlr)
+ // close the result channel and release the context once it is done
+ go func() {
+ <-lctx.Done()
+ hdlr.Close()
+ cancel()
+ }()
+ return res
}
// GetApprox returns the first block not excluded ["dht:getapprox"]
-func (m *Module) GetApprox(ctx context.Context, query blocks.Query, excl
func(blocks.Block) bool) (block blocks.Block, dist *math.Int, err error) {
- var d any
- block, d, err = m.store.GetApprox(query, excl)
- dist, _ = d.(*math.Int)
+// The store reports the key of the entry found; the distance returned is
+// the distance between that key and the key of the query. If the store
+// fails or does not report a usable key, an error is returned and no
+// distance is computed.
+func (m *Module) GetApprox(ctx context.Context, query blocks.Query, excl func(*store.DHTEntry) bool) (entry *store.DHTEntry, dist *math.Int, err error) {
+	var val any
+	if entry, val, err = m.store.GetApprox(query, excl); err != nil {
+		return
+	}
+	hc, ok := val.(*crypto.HashCode)
+	if !ok {
+		// no usable key: bail out instead of building an address
+		// from a nil hash code
+		err = errors.New("no approx result")
+		return
+	}
+	asked := NewQueryAddress(query.Key())
+	found := NewQueryAddress(hc)
+	dist, _ = found.Distance(asked)
 	return
 }
// Put a block into the DHT ["dht:put"]
-func (m *Module) Put(ctx context.Context, key blocks.Query, block
blocks.Block) error {
- return m.store.Put(key, block)
+// A DHT-P2P-PUT message is assembled from the query and the block and
+// passed to HandleMessage in a separate goroutine; the result of that
+// processing is not reported back, so Put always returns nil. The optional
+// query parameter "expire" (util.AbsoluteTime) sets the expiration of the
+// block; without it the block never expires.
+//
+// NOTE(review): HandleMessage is called with a nil sender and a nil
+// responder, while it appears to call methods on both -- confirm that
+// this cannot fail for local requests.
+func (m *Module) Put(ctx context.Context, query blocks.Query, block
blocks.Block) error {
+ // get additional query parameters
+ expire, ok := util.GetParam[util.AbsoluteTime](query.Params(), "expire")
+ if !ok {
+ expire = util.AbsoluteTimeNever()
+ }
+ // assemble a new PUT message
+ msg := message.NewDHTP2PPutMsg()
+ msg.BType = uint32(query.Type())
+ msg.Flags = query.Flags()
+ msg.HopCount = 0
+ msg.PeerFilter = blocks.NewPeerFilter()
+ msg.ReplLvl = 10
+ msg.Expiration = expire
+ msg.Block = block.Bytes()
+ msg.Key = query.Key().Clone()
+ // a locally created request has no path yet
+ msg.TruncOrigin = nil
+ msg.PutPath = nil
+ msg.LastSig = nil
+ msg.MsgSize += uint16(len(msg.Block))
+
+ // send message
+ go m.HandleMessage(ctx, nil, msg, nil)
+
+ return nil
}
+//----------------------------------------------------------------------
+// Event handling
//----------------------------------------------------------------------
// Filter returns the event filter for the module
@@ -133,31 +239,32 @@ func (m *Module) event(ctx context.Context, ev
*core.Event) {
// New peer connected:
case core.EV_CONNECT:
// Add peer to routing table
- logger.Printf(logger.INFO, "[dht] Peer %s connected", ev.Peer)
- m.rtable.Add(NewPeerAddress(ev.Peer))
+ logger.Printf(logger.INFO, "[dht-event] Peer %s connected",
ev.Peer)
+ m.rtable.Add(NewPeerAddress(ev.Peer), "dht-event")
// Peer disconnected:
case core.EV_DISCONNECT:
// Remove peer from routing table
- logger.Printf(logger.INFO, "[dht] Peer %s disconnected",
ev.Peer)
- m.rtable.Remove(NewPeerAddress(ev.Peer))
+ logger.Printf(logger.INFO, "[dht-event] Peer %s disconnected",
ev.Peer)
+ m.rtable.Remove(NewPeerAddress(ev.Peer), 0)
// Message received.
case core.EV_MESSAGE:
- logger.Printf(logger.INFO, "[dht] Message received: %s",
ev.Msg.String())
+ logger.Printf(logger.INFO, "[dht-event] Message received: %s",
ev.Msg.String())
// check if peer is in routing table (connected peer)
if !m.rtable.Contains(NewPeerAddress(ev.Peer)) {
- logger.Printf(logger.WARN, "[dht] message %d from
unregistered peer -- discarded", ev.Msg.Header().MsgType)
+ logger.Printf(logger.WARN, "[dht-event] message %d from
unregistered peer -- discarded", ev.Msg.Header().MsgType)
return
}
// process message
if !m.HandleMessage(ctx, ev.Peer, ev.Msg, ev.Resp) {
- logger.Println(logger.WARN, "[dht] Message NOT
handled!")
+ logger.Println(logger.WARN, "[dht-event] Message NOT
handled!")
}
}
}
+//----------------------------------------------------------------------
// Heartbeat handler for periodic tasks
func (m *Module) heartbeat(ctx context.Context) {
// run heartbeat for routing table
@@ -167,6 +274,10 @@ func (m *Module) heartbeat(ctx context.Context) {
m.reshdlrs.Cleanup()
}
+//----------------------------------------------------------------------
+// HELLO handling
+//----------------------------------------------------------------------
+
// Send the currently active HELLO to given network address
func (m *Module) SendHello(ctx context.Context, addr *util.Address) (err
error) {
// get (buffered) HELLO
@@ -217,7 +328,7 @@ func (m *Module) getHello() (msg *message.DHTP2PHelloMsg,
err error) {
logger.Println(logger.ERROR, err.Error())
return
}
- logger.Println(logger.DBG, "[dht] New HELLO:
"+transport.Dump(msg, "hex"))
+ logger.Println(logger.DBG, "[dht] New HELLO: "+util.Dump(msg,
"hex"))
return
}
// we have a valid HELLO for re-use.
diff --git a/src/gnunet/service/dht/path/elements.go
b/src/gnunet/service/dht/path/elements.go
new file mode 100644
index 0000000..5e26e57
--- /dev/null
+++ b/src/gnunet/service/dht/path/elements.go
@@ -0,0 +1,129 @@
+// This file is part of gnunet-go, a GNUnet-implementation in Golang.
+// Copyright (C) 2019-2022 Bernd Fix >Y<
+//
+// gnunet-go is free software: you can redistribute it and/or modify it
+// under the terms of the GNU Affero General Public License as published
+// by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// gnunet-go is distributed in the hope that it will be useful, but
+// WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+// SPDX-License-Identifier: AGPL3.0-or-later
+
+package path
+
+import (
+ "encoding/hex"
+ "errors"
+ "fmt"
+ "gnunet/crypto"
+ "gnunet/enums"
+ "gnunet/util"
+
+ "github.com/bfix/gospel/data"
+ "github.com/bfix/gospel/logger"
+)
+
+// Error values
+var (
+ ErrPathNoSig = errors.New("missing signature for path element
verification")
+)
+
+//----------------------------------------------------------------------
+// Entry is an element of the path list: the peer that signed a path
+// element and the signature it created.
+type Entry struct {
+ Signer *util.PeerID // path element signer
+ Signature *util.PeerSignature // path element signature
+}
+
+// Size returns the size of a path element in wire format (the sum of the
+// sizes of signer and signature).
+func (e *Entry) Size() uint {
+ return e.Signer.Size() + e.Signature.Size()
+}
+
+// String returns a human-readable representation
+func (e *Entry) String() string {
+ s := hex.EncodeToString(e.Signature.Bytes())
+ num := len(s)
+ if num > 16 {
+ s = s[:8] + ".." + s[num-8:]
+ }
+ return fmt.Sprintf("(%s,%s)", e.Signer.String(), s)
+}
+
+//----------------------------------------------------------------------
+// elementData is the path element data shared across types; it is the
+// part of an element that is included in the signed data.
+type elementData struct {
+ Expiration util.AbsoluteTime // expiration date
+ BlockHash *crypto.HashCode // block hash
+ PeerPredecessor *util.PeerID // predecessor peer
+ PeerSuccessor *util.PeerID // successor peer
+}
+
+// elementSignedData is a helper type for signature creation/verification:
+// it is the structure that gets serialized as the data to be signed.
+type elementSignedData struct {
+ Size uint16 `order:"big"` // size of signed data
+ Purpose uint16 `order:"big"` // signature purpose (SIG_DHT_HOP)
+ Elem *elementData `` // path element data
+}
+
+//----------------------------------------------------------------------
+// Element is the full-fledged data assembly for a path element in
+// PUT/GET paths. It is assembled programmatically (on generation[1] and
+// verification[2]) and not transferred in messages directly.
+//
+// [1] spe = &Element{...}
+// core.Sign(spe)
+// msg.putpath[i] = spe.Wire()
+//
+// [2] pe = &Element{...,Signature: wire.sig}
+// if !pe.Verify(peerId) { ... }
+//
+type Element struct {
+ elementData
+ Entry
+}
+
+// SignedData gets the data to be signed by peer ('Signable' interface)
+func (pe *Element) SignedData() []byte {
+ sd := &elementSignedData{
+ Size: 80,
+ Purpose: uint16(enums.SIG_DHT_HOP),
+ Elem: &(pe.elementData),
+ }
+ buf, err := data.Marshal(sd)
+ if err != nil {
+ logger.Println(logger.ERROR, "can't serialize path element for
signature")
+ return nil
+ }
+ return buf
+}
+
+// SetSignature stores the generated signature in the element. It always
+// returns nil.
+func (pe *Element) SetSignature(sig *util.PeerSignature) error {
+ pe.Signature = sig
+ return nil
+}
+
+// Wire returns the path element suitable for inclusion into messages:
+// a pointer to the embedded Entry (signer and signature) of the element.
+func (pe *Element) Wire() *Entry {
+ return &(pe.Entry)
+}
+
+// Verify checks the signature of a path element against its signed data
+// using the signer of the element. If the signature argument is nil, the
+// signature stored with the element is used; if that is missing too,
+// ErrPathNoSig is returned.
+func (pe *Element) Verify(sig *util.PeerSignature) (bool, error) {
+	switch {
+	case sig != nil:
+		// explicit signature given by the caller
+	case pe.Signature != nil:
+		// fall back to the signature stored with the element
+		sig = pe.Signature
+	default:
+		return false, ErrPathNoSig
+	}
+	return pe.Signer.Verify(pe.SignedData(), sig)
+}
diff --git a/src/gnunet/service/dht/path/handling.go
b/src/gnunet/service/dht/path/handling.go
new file mode 100644
index 0000000..412f646
--- /dev/null
+++ b/src/gnunet/service/dht/path/handling.go
@@ -0,0 +1,263 @@
+// This file is part of gnunet-go, a GNUnet-implementation in Golang.
+// Copyright (C) 2019-2022 Bernd Fix >Y<
+//
+// gnunet-go is free software: you can redistribute it and/or modify it
+// under the terms of the GNU Affero General Public License as published
+// by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// gnunet-go is distributed in the hope that it will be useful, but
+// WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+// SPDX-License-Identifier: AGPL3.0-or-later
+
+package path
+
+import (
+ "bytes"
+ "encoding/hex"
+ "fmt"
+ "gnunet/crypto"
+ "gnunet/util"
+
+ "github.com/bfix/gospel/data"
+ "github.com/bfix/gospel/logger"
+)
+
+//----------------------------------------------------------------------
+// Path handling
+//----------------------------------------------------------------------
+
+// path flags: bit mask values for Path.Flags. The values must be
+// distinct, non-zero bits because they are tested with '&' and set
+// with '|=' (a zero-valued flag could never be detected).
+const (
+	PathTruncated = 1 << iota // truncated origin is present
+	PathLastHop               // last hop signature and peer id are present
+)
+
+// Path is the complete list of verified hops a message travelled.
+// It also keeps the associated block hash and expiration time of
+// the request for signature verification purposes.
+//
+// Optional fields are tagged with `opt:"(IsUsed)"`; the name in the tag
+// must match the IsUsed method exactly.
+type Path struct {
+	Flags       uint32              `order:"big"`    // flags
+	BlkHash     *crypto.HashCode    ``               // block hash value
+	Expire      util.AbsoluteTime   ``               // expiration time
+	TruncOrigin *util.PeerID        `opt:"(IsUsed)"` // truncated origin (optional)
+	NumList     uint16              `order:"big"`    // number of list entries
+	List        []*Entry            `size:"NumList"` // list of path entries
+	LastSig     *util.PeerSignature `opt:"(IsUsed)"` // last hop signature (optional)
+	LastHop     *util.PeerID        `opt:"(IsUsed)"` // last hop peer id (optional)
+}
+
+// IsUsed checks if an optional field is used
+func (p *Path) IsUsed(field string) bool {
+ switch field {
+ case "TruncOrigin":
+ return p.Flags&PathTruncated != 0
+ case "LastSig", "LastHop":
+ return p.Flags&PathLastHop != 0
+ }
+ return false
+}
+
+// NewPath returns a new, empty path for the given block hash and
+// expiration time: no flags, no truncated origin, no last hop and an
+// empty (non-nil) list of entries.
+func NewPath(bh *crypto.HashCode, expire util.AbsoluteTime) *Path {
+	p := new(Path) // all other fields keep their zero values
+	p.BlkHash = bh
+	p.Expire = expire
+	p.List = make([]*Entry, 0)
+	return p
+}
+
+// NewPathFromBytes reconstructs a path instance from binary data. The layout
+// of the data must match with the layout used in Path.Bytes().
+// For empty input no path is created (both results are nil).
+func NewPathFromBytes(buf []byte) (path *Path, err error) {
+	// len() of a nil slice is 0, so a single check covers both cases
+	if len(buf) == 0 {
+		return
+	}
+	path = new(Path)
+	// 'path' already is a pointer to the struct to be filled
+	err = data.Unmarshal(path, buf)
+	return
+}
+
+// Size of the binary representation (in message). It sums the sizes of
+// the truncated origin (if set), the list entries and the last hop
+// signature and peer id (if a last hop signature is set).
+func (p *Path) Size() uint {
+	var size uint
+	if p.TruncOrigin != nil {
+		size += p.TruncOrigin.Size()
+	}
+	// the size of the first entry is used for all entries; an empty list
+	// (e.g. a fresh path from NewPath) contributes nothing and must not
+	// be indexed.
+	if len(p.List) > 0 {
+		size += uint(p.NumList) * p.List[0].Size()
+	}
+	if p.LastSig != nil {
+		size += p.LastSig.Size() + p.LastHop.Size()
+	}
+	return size
+}
+
+// Bytes returns a binary representation of the path. A serialization
+// error is logged; the result of the marshaller is returned unchanged
+// in that case.
+func (p *Path) Bytes() []byte {
+	buf, err := data.Marshal(p)
+	if err != nil {
+		// the signature has no error result, so at least leave a trace
+		logger.Printf(logger.ERROR, "[path] can't serialize path: %s", err.Error())
+	}
+	return buf
+}
+
+// Clone returns a copy of the path instance. All fields are copied as
+// they are (pointers still refer to the same objects); only the list of
+// entries is duplicated with util.Clone.
+func (p *Path) Clone() *Path {
+	dup := *p
+	dup.List = util.Clone(p.List)
+	return &dup
+}
+
+// NewElement creates a new (unsigned) path element for the given
+// predecessor, signer and successor. Expiration and block hash are
+// taken from the path.
+func (p *Path) NewElement(pred, signer, succ *util.PeerID) *Element {
+	// data covered by the signature
+	signed := elementData{
+		Expiration:      p.Expire,
+		BlockHash:       p.BlkHash,
+		PeerPredecessor: pred,
+		PeerSuccessor:   succ,
+	}
+	// wire part; the signature is set later
+	wire := Entry{
+		Signer:    signer,
+		Signature: nil,
+	}
+	return &Element{
+		elementData: signed,
+		Entry:       wire,
+	}
+}
+
+// Add a new path element with signature (append to path). The signature
+// and signer of the new element become the "last hop" data of the path;
+// a previously stored last hop signature is moved into the list of
+// entries, together with the predecessor of the new element as signer.
+func (p *Path) Add(elem *Element) {
+	// move the previous last hop (if any) into the list
+	if prev := p.LastSig; prev != nil {
+		p.List = append(p.List, &Entry{
+			Signer:    elem.PeerPredecessor,
+			Signature: prev,
+		})
+		p.NumList++
+	}
+	// the new element is the last hop now
+	p.LastSig, p.LastHop = elem.Signature, elem.Signer
+	p.Flags |= PathLastHop
+}
+
+// Verify path: process list entries from right to left (decreasing index).
+// If an invalid signature is encountered, the path is truncated; only checked
+// elements up to this point are included in the path (left trim).
+// The method does not return a state; if the verification fails, the path is
+// corrected (truncated or deleted) and would always verify OK.
+//
+// The signature checked in iteration i belongs to the hop *after* list
+// entry i (the last hop for i == len(List)-1); entry i only supplies the
+// predecessor. The signature of the first list entry is therefore checked
+// in an extra iteration with i == -1, where the predecessor is the
+// truncated origin (if set) or the zero peer id.
+func (p *Path) Verify(local *util.PeerID) {
+	// origin returns the predecessor of the first hop on the path.
+	origin := func() *util.PeerID {
+		if p.TruncOrigin != nil {
+			return p.TruncOrigin
+		}
+		return util.NewPeerID(nil)
+	}
+	// reset removes all path data (last hop, truncated origin, entries).
+	reset := func() {
+		p.LastSig = nil
+		p.LastHop = nil
+		p.TruncOrigin = nil
+		p.Flags = 0
+		p.List = make([]*Entry, 0)
+		p.NumList = 0
+	}
+
+	num := len(p.List)
+	if num == 0 {
+		// no elements: last hop signature available?
+		if p.LastSig == nil {
+			// no: nothing to verify
+			return
+		}
+		// check last hop signature
+		pe := p.NewElement(origin(), p.LastHop, local)
+		if ok, err := pe.Verify(p.LastSig); err != nil || !ok {
+			reset()
+		}
+		return
+	}
+
+	// process list of path elements, starting with the last hop
+	signer, sig, succ := p.LastHop, p.LastSig, local
+	for i := num - 1; i >= -1; i-- {
+		// predecessor of the hop to be checked
+		var pred *util.PeerID
+		if i == -1 {
+			pred = origin()
+		} else {
+			pred = p.List[i].Signer
+		}
+		pe := p.NewElement(pred, signer, succ)
+		if ok, err := pe.Verify(sig); err != nil || !ok {
+			// we need to truncate:
+			logger.Printf(logger.WARN, "[path] Truncating path (invalid signature at hop %d)", i)
+
+			// the last hop signature failed -> reset path
+			if i == num-1 {
+				reset()
+				return
+			}
+			// trim list: the failed signer becomes the truncated origin,
+			// only the entries right of it are kept.
+			p.Flags |= PathTruncated
+			p.TruncOrigin = signer
+			list := make([]*Entry, num-2-i)
+			copy(list, p.List[i+2:])
+			p.List = list
+			// keep the serialized entry count in sync with the list
+			p.NumList = uint16(len(list))
+			return
+		}
+		// check next path element
+		succ, signer = signer, pred
+		if i >= 0 {
+			sig = p.List[i].Signature
+		}
+	}
+}
+
+// String returs a uman-readbale representation
+func (p *Path) String() string {
+ buf := new(bytes.Buffer)
+ s := "0"
+ if p.TruncOrigin != nil {
+ s = p.TruncOrigin.String()
+ }
+ buf.WriteString(fmt.Sprintf("{to=%s, (%d)[", s, len(p.List)))
+ for _, e := range p.List {
+ buf.WriteString(e.String())
+ }
+ s = "0"
+ if p.LastSig != nil {
+ s = hex.EncodeToString(p.LastSig.Bytes())
+ }
+ num := len(s)
+ if num > 16 {
+ s = s[:8] + ".." + s[num-8:]
+ }
+ buf.WriteString(fmt.Sprintf("], ls=%s}", s))
+ return buf.String()
+}
diff --git a/src/gnunet/service/dht/path/handling_test.go
b/src/gnunet/service/dht/path/handling_test.go
new file mode 100644
index 0000000..a456dd4
--- /dev/null
+++ b/src/gnunet/service/dht/path/handling_test.go
@@ -0,0 +1,133 @@
+// This file is part of gnunet-go, a GNUnet-implementation in Golang.
+// Copyright (C) 2019-2022 Bernd Fix >Y<
+//
+// gnunet-go is free software: you can redistribute it and/or modify it
+// under the terms of the GNU Affero General Public License as published
+// by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// gnunet-go is distributed in the hope that it will be useful, but
+// WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+// SPDX-License-Identifier: AGPL3.0-or-later
+
+package path
+
+import (
+ "gnunet/crypto"
+ "gnunet/util"
+ "testing"
+
+ "github.com/bfix/gospel/crypto/ed25519"
+)
+
+// hop is a test peer on a path: a peer id together with the private
+// key used to sign path elements on behalf of that peer.
+type hop struct {
+ peerid *util.PeerID
+ seckey *ed25519.PrivateKey
+}
+
+func newHop() *hop {
+ h := new(hop)
+ var pk *ed25519.PublicKey
+ pk, h.seckey = ed25519.NewKeypair()
+ h.peerid = util.NewPeerID(pk.Bytes())
+ return h
+}
+
+func sign(sd []byte, pk *ed25519.PrivateKey) (sig *util.PeerSignature, err
error) {
+ var s *ed25519.EdSignature
+ if s, err = pk.EdSign(sd); err != nil {
+ return
+ }
+ sig = util.NewPeerSignature(s.Bytes())
+ return
+}
+
+// GenerateTestPath builds a signed path over n freshly generated hops.
+// The first n-1 hops each sign a path element (the first one with the
+// zero peer id as predecessor); the last hop does not sign and is
+// returned as the local peer.
+func GenerateTestPath(n int) (pth *Path, local *util.PeerID, err error) {
+	// create hops
+	hops := make([]*hop, n)
+	for idx := 0; idx < len(hops); idx++ {
+		hops[idx] = newHop()
+	}
+	// start with empty path
+	pth = NewPath(crypto.NewHashCode(nil), util.AbsoluteTimeNever())
+
+	// build path: every hop but the last signs and appends its element
+	pred := util.NewPeerID(nil)
+	for i := 0; i < n-1; i++ {
+		signer, succ := hops[i], hops[i+1]
+		pe := pth.NewElement(pred, signer.peerid, succ.peerid)
+		pe.Signature, err = sign(pe.SignedData(), signer.seckey)
+		if err != nil {
+			return
+		}
+		pth.Add(pe)
+		pred = signer.peerid
+	}
+	local = hops[n-1].peerid
+	return
+}
+
+// TestPathSimple checks that verifying a correctly signed path leaves
+// it unchanged (compared by its string representation).
+func TestPathSimple(t *testing.T) {
+
+ n := 10
+
+ pth, local, err := GenerateTestPath(n)
+ if err != nil {
+ t.Fatal(err)
+ }
+ // representation before and after verification must match
+ ps1 := pth.String()
+ pth.Verify(local)
+ ps2 := pth.String()
+ if ps1 != ps2 {
+ t.Fatal("path mismatch")
+ }
+}
+
+// TestPathBadElemSig invalidates the signature of each list entry in
+// turn (on a fresh path every time) and checks that a second
+// verification does not change the result of the first one.
+func TestPathBadElemSig(t *testing.T) {
+
+ n := 10
+
+ for f := 0; ; f++ {
+ pth, local, err := GenerateTestPath(n)
+ if err != nil {
+ t.Fatal(err)
+ }
+ // stop once all list entries have been tested
+ if f >= len(pth.List) {
+ break
+ }
+ // invalidate signature
+ pth.List[f].Signature = util.NewPeerSignature(nil)
+ // first verification corrects the path; second one must be a no-op
+ pth.Verify(local)
+ ps3 := pth.String()
+ pth.Verify(local)
+ ps4 := pth.String()
+ if ps3 != ps4 {
+ t.Fatal("truncated path mismatch")
+ }
+ }
+}
+
+// TestPathBadLastSig invalidates the last hop signature and checks
+// that a second verification does not change the result of the first.
+func TestPathBadLastSig(t *testing.T) {
+
+ n := 10
+
+ pth, local, err := GenerateTestPath(n)
+ if err != nil {
+ t.Fatal(err)
+ }
+ // invalidate signature
+ pth.LastSig = util.NewPeerSignature(nil)
+ // first verification corrects the path; second one must be a no-op
+ pth.Verify(local)
+ ps3 := pth.String()
+ pth.Verify(local)
+ ps4 := pth.String()
+ if ps3 != ps4 {
+ t.Fatal("truncated path mismatch")
+ }
+}
diff --git a/src/gnunet/service/dht/resulthandler.go
b/src/gnunet/service/dht/resulthandler.go
index 6564abd..faab93e 100644
--- a/src/gnunet/service/dht/resulthandler.go
+++ b/src/gnunet/service/dht/resulthandler.go
@@ -67,10 +67,10 @@ type ResultHandler interface {
// Compare return values
//nolint:stylecheck // allow non-camel-case in constants
const (
- RHC_SAME = blocks.CMP_SAME // the two result handlers are the same
- RHC_MERGE = blocks.CMP_MERGE // the two result handlers can be merged
- RHC_DIFFER = blocks.CMP_DIFFER // the two result handlers are different
- RHC_SIBL = blocks.CMP_1 // the two result handlers are siblings
+ RHC_SAME = blocks.CMP_SAME // the two result handlers are the same
+ RHC_MERGE = blocks.CMP_MERGE // the two result handlers can be merged
+ RHC_DIFFER = blocks.CMP_DIFFER // the two result handlers are different
+ RHC_REPLACE = blocks.CMP_1 // the two result handlers are siblings
)
//----------------------------------------------------------------------
@@ -119,13 +119,21 @@ func (t *GenericResultHandler) Done() bool {
// Compare two handlers
func (t *GenericResultHandler) Compare(h *GenericResultHandler) int {
- if t.key.Equals(h.key) ||
+ // check if base attributes differ
+ if !t.key.Equals(h.key) ||
t.btype != h.btype ||
t.flags != h.flags ||
!bytes.Equal(t.xQuery, h.xQuery) {
+ logger.Printf(logger.DBG, "[grh] base fields differ")
return RHC_DIFFER
}
- return t.resFilter.Compare(h.resFilter)
+ // compare result filters; if they are different, replace
+ // the old filter with the new one
+ rc := t.resFilter.Compare(h.resFilter)
+ if rc == RHC_DIFFER {
+ rc = RHC_REPLACE
+ }
+ return rc
}
// Merge two result handlers that are the same except for result filter
@@ -133,6 +141,16 @@ func (t *GenericResultHandler) Merge(a
*GenericResultHandler) bool {
return t.resFilter.Merge(a.resFilter)
}
+// Proceed returns true if the message is to be processed in derived
+// implementations. The block of the message is checked against the
+// result filter: a block already contained is rejected, a new block is
+// added to the filter and accepted.
+func (t *GenericResultHandler) Proceed(ctx context.Context, msg *message.DHTP2PResultMsg) bool {
+	block := blocks.NewGenericBlock(msg.Block)
+	// already seen: filtered out
+	if t.resFilter.Contains(block) {
+		return false
+	}
+	// remember block for subsequent results
+	t.resFilter.Add(block)
+	return true
+}
+
//----------------------------------------------------------------------
// Result handler for forwarded GET requests
//----------------------------------------------------------------------
@@ -159,6 +177,11 @@ func NewForwardResultHandler(msgIn message.Message, rf
blocks.ResultFilter, back
// Handle incoming DHT-P2P-RESULT message
func (t *ForwardResultHandler) Handle(ctx context.Context, msg
*message.DHTP2PResultMsg) bool {
+ // don't send result if it is filtered out
+ if !t.Proceed(ctx, msg) {
+ logger.Printf(logger.DBG, "[dht-task-%d] result filtered out --
already known", t.id)
+ return false
+ }
// send result message back to originator (result forwarding).
logger.Printf(logger.INFO, "[dht-task-%d] sending result back to
originator", t.id)
if err := t.resp.Send(ctx, msg); err != nil && err !=
transport.ErrEndpMaybeSent {
@@ -173,10 +196,12 @@ func (t *ForwardResultHandler) Compare(h ResultHandler)
int {
// check for correct handler type
ht, ok := h.(*ForwardResultHandler)
if !ok {
+ logger.Println(logger.DBG, "[frh] can't compare apples with
oranges")
return RHC_DIFFER
}
// check for same recipient
if ht.resp.Receiver() != t.resp.Receiver() {
+ logger.Printf(logger.DBG, "[frh] recipients differ: %s -- %s",
ht.resp.Receiver(), t.resp.Receiver())
return RHC_DIFFER
}
// check generic handler data
@@ -238,6 +263,11 @@ func NewDirectResultHandler(msgIn message.Message, rf
blocks.ResultFilter, hdlr
// Handle incoming DHT-P2P-RESULT message
func (t *DirectResultHandler) Handle(ctx context.Context, msg
*message.DHTP2PResultMsg) bool {
+ // don't send result if it is filtered out
+ if !t.Proceed(ctx, msg) {
+ logger.Printf(logger.DBG, "[dht-task-%d] result filtered out --
already known", t.id)
+ return false
+ }
// check for correct message type and handler function
if t.hdlr != nil {
logger.Printf(logger.INFO, "[dht-task-%d] handling result
message", t.id)
@@ -289,7 +319,7 @@ func NewResultHandlerList() *ResultHandlerList {
func (t *ResultHandlerList) Add(hdlr ResultHandler) bool {
// get current list of handlers for key
key := hdlr.Key()
- list, ok := t.list.Get(key)
+ list, ok := t.list.Get(key, 0)
modified := false
if !ok {
list = make([]ResultHandler, 0)
@@ -300,18 +330,23 @@ func (t *ResultHandlerList) Add(hdlr ResultHandler) bool {
switch h.Compare(hdlr) {
case RHC_SAME:
// already in list; no need to add again
+ logger.Println(logger.DBG, "[rhl] SAME")
return false
case RHC_MERGE:
// merge the two result handlers
+ oldMod := modified
modified = h.Merge(hdlr) || modified
+ logger.Printf(logger.DBG, "[rhl] MERGE (%v --
%v)", oldMod, modified)
break loop
- case RHC_SIBL:
+ case RHC_REPLACE:
// replace the old handler with the new one
+ logger.Println(logger.DBG, "[rhl] REPLACE")
list[i] = hdlr
modified = true
break loop
case RHC_DIFFER:
// try next
+ logger.Println(logger.DBG, "[rhl] DIFFER")
}
}
}
@@ -319,18 +354,18 @@ func (t *ResultHandlerList) Add(hdlr ResultHandler) bool {
// append new handler to list
list = append(list, hdlr)
}
- t.list.Put(key, list)
+ t.list.Put(key, list, 0)
return true
}
// Get handler list for given key
func (t *ResultHandlerList) Get(key string) ([]ResultHandler, bool) {
- return t.list.Get(key)
+ return t.list.Get(key, 0)
}
// Cleanup removes expired tasks from list
func (t *ResultHandlerList) Cleanup() {
- err := t.list.ProcessRange(func(key string, list []ResultHandler) error
{
+ err := t.list.ProcessRange(func(key string, list []ResultHandler, pid
int) error {
var newList []ResultHandler
changed := false
for _, rh := range list {
@@ -341,7 +376,7 @@ func (t *ResultHandlerList) Cleanup() {
}
}
if changed {
- t.list.Put(key, newList)
+ t.list.Put(key, newList, pid)
}
return nil
}, false)
diff --git a/src/gnunet/service/dht/routingtable.go
b/src/gnunet/service/dht/routingtable.go
index fe9f956..98f0819 100644
--- a/src/gnunet/service/dht/routingtable.go
+++ b/src/gnunet/service/dht/routingtable.go
@@ -107,7 +107,7 @@ type RoutingTable struct {
buckets []*Bucket // list of buckets
list *util.Map[string, *PeerAddress] // keep list of peers
l2nse float64 // log2 of estimated
network size
- inProcess bool // flag if Process()
is running
+ inProcess map[int]struct{} // flag if Process()
is running
cfg *config.RoutingConfig // routing parameters
helloCache *util.Map[string, *blocks.HelloBlock] // HELLO block cache
}
@@ -120,7 +120,7 @@ func NewRoutingTable(ref *PeerAddress, cfg
*config.RoutingConfig) *RoutingTable
list: util.NewMap[string, *PeerAddress](),
buckets: make([]*Bucket, 512),
l2nse: -1,
- inProcess: false,
+ inProcess: make(map[int]struct{}),
cfg: cfg,
helloCache: util.NewMap[string, *blocks.HelloBlock](),
}
@@ -137,33 +137,51 @@ func NewRoutingTable(ref *PeerAddress, cfg
*config.RoutingConfig) *RoutingTable
// Add new peer address to routing table.
// Returns true if the entry was added, false otherwise.
-func (rt *RoutingTable) Add(p *PeerAddress) bool {
+func (rt *RoutingTable) Add(p *PeerAddress, label string) bool {
k := p.String()
- logger.Printf(logger.DBG, "[RT] Add(%s)", k)
+ logger.Printf(logger.DBG, "[%s] Add(%s)", label, k)
// check if peer is already known
- if px, ok := rt.list.Get(k); ok {
- logger.Println(logger.DBG, "[RT] --> already known")
+ if px, ok := rt.list.Get(k, 0); ok {
+ logger.Printf(logger.DBG, "[%s] --> already known", label)
px.lastSeen = util.AbsoluteTimeNow()
return false
}
-
// compute distance (bucket index) and insert address.
_, idx := p.Distance(rt.ref)
if rt.buckets[idx].Add(p) {
- logger.Println(logger.DBG, "[RT] --> entry added")
+ logger.Printf(logger.DBG, "[%s] --> entry added", label)
p.lastUsed = util.AbsoluteTimeNow()
- rt.list.Put(k, p)
+ rt.list.Put(k, p, 0)
return true
}
// Full bucket: we did not add the address to the routing table.
- logger.Println(logger.DBG, "[RT] --> bucket full -- discarded")
+ logger.Printf(logger.DBG, "[%s] --> bucket[%d] full -- discarded",
label, idx)
return false
}
+// Check returns 1 if the peer address is in the routing table. Otherwise
+// it returns 0 if the corresponding k-bucket has free space and -1 if
+// it has not. For a known peer the "last seen" timestamp is refreshed.
+func (rt *RoutingTable) Check(p *PeerAddress) int {
+	// check if peer is already known
+	if known, ok := rt.list.Get(p.String(), 0); ok {
+		known.lastSeen = util.AbsoluteTimeNow()
+		return 1
+	}
+	// unknown peer: inspect the bucket selected by the distance
+	// (bucket index) to our reference address
+	if _, idx := p.Distance(rt.ref); rt.buckets[idx].FreeSpace() > 0 {
+		return 0
+	}
+	return -1
+}
+
// Remove peer address from routing table.
// Returns true if the entry was removed, false otherwise.
-func (rt *RoutingTable) Remove(p *PeerAddress) bool {
+func (rt *RoutingTable) Remove(p *PeerAddress, pid int) bool {
k := p.String()
logger.Printf(logger.DBG, "[RT] Remove(%s)", k)
@@ -177,9 +195,9 @@ func (rt *RoutingTable) Remove(p *PeerAddress) bool {
// remove from internal list
logger.Println(logger.DBG, "[RT] --> entry removed from
internal lists only")
}
- rt.list.Delete(k)
+ rt.list.Delete(k, 0)
// delete from HELLO cache
- rt.helloCache.Delete(p.Peer.String())
+ rt.helloCache.Delete(p.Peer.String(), pid)
return rc
}
@@ -189,10 +207,10 @@ func (rt *RoutingTable) Contains(p *PeerAddress) bool {
logger.Printf(logger.DBG, "[RT] Contains(%s)?", k)
// check for peer in internal list
- px, ok := rt.list.Get(k)
+ px, ok := rt.list.Get(k, 0)
if !ok {
logger.Println(logger.DBG, "[RT] --> NOT found in current
list:")
- _ = rt.list.ProcessRange(func(key string, val *PeerAddress)
error {
+ _ = rt.list.ProcessRange(func(key string, val *PeerAddress, _
int) error {
logger.Printf(logger.DBG, "[RT] * %s", val)
return nil
}, true)
@@ -206,16 +224,17 @@ func (rt *RoutingTable) Contains(p *PeerAddress) bool {
//----------------------------------------------------------------------
// Process a function f in the locked context of a routing table
-func (rt *RoutingTable) Process(f func() error, readonly bool) error {
+func (rt *RoutingTable) Process(f func(pid int) error, readonly bool) error {
// handle locking
- rt.lock(readonly)
- rt.inProcess = true
+ rt.lock(readonly, 0)
+ pid := util.NextID()
+ rt.inProcess[pid] = struct{}{}
defer func() {
- rt.inProcess = false
- rt.unlock(readonly)
+ delete(rt.inProcess, pid)
+ rt.unlock(readonly, 0)
}()
// call function in unlocked context
- return f()
+ return f(pid)
}
//----------------------------------------------------------------------
@@ -223,10 +242,10 @@ func (rt *RoutingTable) Process(f func() error, readonly
bool) error {
//----------------------------------------------------------------------
// SelectClosestPeer for a given peer address and peer filter.
-func (rt *RoutingTable) SelectClosestPeer(p *PeerAddress, pf
*blocks.PeerFilter) (n *PeerAddress) {
+func (rt *RoutingTable) SelectClosestPeer(p *PeerAddress, pf
*blocks.PeerFilter, pid int) (n *PeerAddress) {
// no writer allowed
- rt.RLock()
- defer rt.RUnlock()
+ rt.lock(true, pid)
+ defer rt.unlock(true, pid)
// find closest peer in routing table
var dist *math.Int
@@ -245,15 +264,15 @@ func (rt *RoutingTable) SelectClosestPeer(p *PeerAddress,
pf *blocks.PeerFilter)
// SelectRandomPeer returns a random address from table (that is not
// included in the bloomfilter)
-func (rt *RoutingTable) SelectRandomPeer(pf *blocks.PeerFilter) (p
*PeerAddress) {
+func (rt *RoutingTable) SelectRandomPeer(pf *blocks.PeerFilter, pid int) (p
*PeerAddress) {
// no writer allowed
- rt.RLock()
- defer rt.RUnlock()
+ rt.lock(true, pid)
+ defer rt.unlock(true, pid)
// select random entry from list
var ok bool
for {
- if _, p, ok = rt.list.GetRandom(); !ok {
+ if _, p, ok = rt.list.GetRandom(pid); !ok {
return nil
}
if !pf.Contains(p.Peer) {
@@ -268,19 +287,19 @@ func (rt *RoutingTable) SelectRandomPeer(pf
*blocks.PeerFilter) (p *PeerAddress)
// SelectPeer selects a neighbor depending on the number of hops parameter.
// If hops < NSE this function MUST return SelectRandomPeer() and
// SelectClosestpeer() otherwise.
-func (rt *RoutingTable) SelectPeer(p *PeerAddress, hops int, bf
*blocks.PeerFilter) *PeerAddress {
+func (rt *RoutingTable) SelectPeer(p *PeerAddress, hops int, bf
*blocks.PeerFilter, pid int) *PeerAddress {
if float64(hops) < rt.l2nse {
- return rt.SelectRandomPeer(bf)
+ return rt.SelectRandomPeer(bf, pid)
}
- return rt.SelectClosestPeer(p, bf)
+ return rt.SelectClosestPeer(p, bf, pid)
}
// IsClosestPeer returns true if p is the closest peer for k. Peers with a
// positive test in the Bloom filter are not considered. If p is nil, our
// reference address is used.
-func (rt *RoutingTable) IsClosestPeer(p, k *PeerAddress, pf
*blocks.PeerFilter) bool {
+func (rt *RoutingTable) IsClosestPeer(p, k *PeerAddress, pf
*blocks.PeerFilter, pid int) bool {
// get closest peer in routing table
- n := rt.SelectClosestPeer(k, pf)
+ n := rt.SelectClosestPeer(k, pf, pid)
// check SELF?
if p == nil {
// if no peer in routing table found
@@ -326,24 +345,22 @@ func (rt *RoutingTable) heartbeat(ctx context.Context) {
// check for dead or expired peers
logger.Println(logger.DBG, "[dht] RT heartbeat...")
timeout := util.NewRelativeTime(time.Duration(rt.cfg.PeerTTL) *
time.Second)
- if err := rt.Process(func() error {
- return rt.list.ProcessRange(func(k string, p *PeerAddress)
error {
- // check if we can/need to drop a peer
- drop := timeout.Compare(p.lastSeen.Elapsed()) < 0
- if drop || timeout.Compare(p.lastUsed.Elapsed()) < 0 {
- logger.Printf(logger.DBG, "[RT] removing %v:
%v, %v", p, p.lastSeen.Elapsed(), p.lastUsed.Elapsed())
- rt.Remove(p)
- }
- return nil
- }, false)
+ if err := rt.list.ProcessRange(func(k string, p *PeerAddress, pid int)
error {
+ // check if we can/need to drop a peer
+ drop := timeout.Compare(p.lastSeen.Elapsed()) < 0
+ if drop || timeout.Compare(p.lastUsed.Elapsed()) < 0 {
+ logger.Printf(logger.DBG, "[RT] removing %v: %v, %v",
p, p.lastSeen.Elapsed(), p.lastUsed.Elapsed())
+ rt.Remove(p, pid)
+ }
+ return nil
}, false); err != nil {
logger.Println(logger.ERROR, "[dht] RT heartbeat failed:
"+err.Error())
}
// drop expired entries from the HELLO cache
- _ = rt.helloCache.ProcessRange(func(key string, val *blocks.HelloBlock)
error {
+ _ = rt.helloCache.ProcessRange(func(key string, val *blocks.HelloBlock,
pid int) error {
if val.Expires.Expired() {
- rt.helloCache.Delete(key)
+ rt.helloCache.Delete(key, pid)
}
return nil
}, false)
@@ -356,7 +373,7 @@ func (rt *RoutingTable) heartbeat(ctx context.Context) {
func (rt *RoutingTable) BestHello(addr *PeerAddress, rf blocks.ResultFilter)
(hb *blocks.HelloBlock, dist *math.Int) {
// iterate over cached HELLOs to find (best) match first
- _ = rt.helloCache.ProcessRange(func(key string, val *blocks.HelloBlock)
error {
+ _ = rt.helloCache.ProcessRange(func(key string, val *blocks.HelloBlock,
_ int) error {
// check if block is excluded by result filter
if !rf.Contains(val) {
// check for better match
@@ -374,19 +391,19 @@ func (rt *RoutingTable) BestHello(addr *PeerAddress, rf
blocks.ResultFilter) (hb
// CacheHello adds a HELLO block to the list of cached entries.
func (rt *RoutingTable) CacheHello(hb *blocks.HelloBlock) {
- rt.helloCache.Put(hb.PeerID.String(), hb)
+ rt.helloCache.Put(hb.PeerID.String(), hb, 0)
}
// GetHello returns a HELLO block for key k (if available)
func (rt *RoutingTable) GetHello(k string) (*blocks.HelloBlock, bool) {
- return rt.helloCache.Get(k)
+ return rt.helloCache.Get(k, 0)
}
//----------------------------------------------------------------------
// lock with given mode (if not in processing function)
-func (rt *RoutingTable) lock(readonly bool) {
- if !rt.inProcess {
+func (rt *RoutingTable) lock(readonly bool, pid int) {
+ if _, ok := rt.inProcess[pid]; !ok {
if readonly {
rt.RLock()
} else {
@@ -396,8 +413,8 @@ func (rt *RoutingTable) lock(readonly bool) {
}
// lock with given mode (if not in processing function)
-func (rt *RoutingTable) unlock(readonly bool) {
- if !rt.inProcess {
+func (rt *RoutingTable) unlock(readonly bool, pid int) {
+ if _, ok := rt.inProcess[pid]; !ok {
if readonly {
rt.RUnlock()
} else {
@@ -441,6 +458,11 @@ func (b *Bucket) Add(p *PeerAddress) bool {
return false
}
+// FreeSpace returns the number of empty slots in bucket, computed as
+// numK minus the current number of entries in the bucket list.
+func (b *Bucket) FreeSpace() int {
+ return numK - len(b.list)
+}
+
// Remove peer address from the bucket.
// Returns true if entry is removed (found), false otherwise.
func (b *Bucket) Remove(p *PeerAddress) bool {
diff --git a/src/gnunet/service/dht/routingtable_test.go
b/src/gnunet/service/dht/routingtable_test.go
index 16f39de..42e2349 100644
--- a/src/gnunet/service/dht/routingtable_test.go
+++ b/src/gnunet/service/dht/routingtable_test.go
@@ -95,13 +95,13 @@ func TestRT(t *testing.T) {
// actions:
connected := func(task *Entry, e int64, msg string) {
- rt.Add(task.addr)
+ rt.Add(task.addr, "test")
task.online = true
task.last = e
t.Logf("[%6d] %s %s\n", e, task.addr, msg)
}
disconnected := func(task *Entry, e int64, msg string) {
- rt.Remove(task.addr)
+ rt.Remove(task.addr, 0)
task.online = false
task.last = e
t.Logf("[%6d] %s %s\n", e, task.addr, msg)
@@ -140,10 +140,10 @@ func TestRT(t *testing.T) {
// execute some routing functions on remaining table
k := genRemotePeer()
pf := blocks.NewPeerFilter()
- n := rt.SelectClosestPeer(k, pf)
+ n := rt.SelectClosestPeer(k, pf, 0)
t.Logf("Closest: %s -> %s\n", k, n)
- n = rt.SelectRandomPeer(pf)
+ n = rt.SelectRandomPeer(pf, 0)
t.Logf("Random: %s\n", n)
}
diff --git a/src/gnunet/service/module.go b/src/gnunet/service/module.go
index 7311c4f..8835fab 100644
--- a/src/gnunet/service/module.go
+++ b/src/gnunet/service/module.go
@@ -24,6 +24,7 @@ import (
"time"
)
+//----------------------------------------------------------------------
// Module is an interface for GNUnet service modules (workers).
//
// Modules can call other GNUnet services; these services can be used by
@@ -55,6 +56,7 @@ import (
// Exported and imported module function are identified by name defined in the
// Export() function. Import() functions that access functions in other modules
// need to use the same name for linking.
+//
type Module interface {
// Export functions by name
Export(map[string]any)
@@ -69,6 +71,10 @@ type Module interface {
Filter() *core.EventFilter
}
+//----------------------------------------------------------------------
+// Event handling
+//----------------------------------------------------------------------
+
// EventHandler is a function prototype for event handling
type EventHandler func(context.Context, *core.Event)
@@ -78,6 +84,10 @@ type Heartbeat func(context.Context)
// CtxKey is a value-context key
type CtxKey string
+//----------------------------------------------------------------------
+// Generic module implementation
+//----------------------------------------------------------------------
+
// ModuleImpl is an event-handling type used by Module implementations.
type ModuleImpl struct {
// channel for core events.
diff --git a/src/gnunet/service/namecache/module.go
b/src/gnunet/service/namecache/module.go
index 42d38a2..642355b 100644
--- a/src/gnunet/service/namecache/module.go
+++ b/src/gnunet/service/namecache/module.go
@@ -69,15 +69,15 @@ func (m *Module) Import(fcm map[string]any) {
// Get an entry from the cache if available.
func (m *Module) Get(ctx context.Context, query *blocks.GNSQuery) (block
*blocks.GNSBlock, err error) {
- var b blocks.Block
- if b, err = m.cache.Get(query); err != nil {
+ var e *store.DHTEntry
+ if e, err = m.cache.Get(query); err != nil {
return
}
- err = blocks.Unwrap(b, block)
+ err = blocks.Unwrap(e.Blk, block)
return
}
// Put entry into the cache.
-func (m *Module) Put(ctx context.Context, query *blocks.GNSQuery, block
*blocks.GNSBlock) error {
- return m.cache.Put(query, block)
+func (m *Module) Put(ctx context.Context, query *blocks.GNSQuery, entry
*store.DHTEntry) error {
+ return m.cache.Put(query, entry)
}
diff --git a/src/gnunet/service/store/database.go
b/src/gnunet/service/store/database.go
index 2b94122..1f25d80 100644
--- a/src/gnunet/service/store/database.go
+++ b/src/gnunet/service/store/database.go
@@ -108,9 +108,9 @@ type dbPool struct {
// remove a database instance from the pool based on its connect string.
func (p *dbPool) remove(key string) error {
- return p.insts.Process(func() (err error) {
+ return p.insts.Process(func(pid int) (err error) {
// get pool entry
- pe, ok := p.insts.Get(key)
+ pe, ok := p.insts.Get(key, pid)
if !ok {
return nil
}
@@ -119,7 +119,7 @@ func (p *dbPool) remove(key string) error {
if pe.refs == 0 {
// no more refs: close database
err = pe.db.Close()
- p.insts.Delete(key)
+ p.insts.Delete(key, pid)
}
return
}, false)
@@ -138,10 +138,10 @@ func (p *dbPool) remove(key string) error {
// information required to log into the database (e.g.
// "[user[:passwd]@][proto[(addr)]]/dbname[?param1=value1&...]").
func (p *dbPool) Connect(spec string) (db *DBConn, err error) {
- err = p.insts.Process(func() error {
+ err = p.insts.Process(func(pid int) error {
// check if we have a connection to this database.
db = new(DBConn)
- inst, ok := p.insts.Get(spec)
+ inst, ok := p.insts.Get(spec, pid)
if !ok {
inst = new(DBPoolEntry)
inst.refs = 0
@@ -174,7 +174,7 @@ func (p *dbPool) Connect(spec string) (db *DBConn, err
error) {
return ErrSQLInvalidDatabaseSpec
}
// save database in pool
- p.insts.Put(spec, inst)
+ p.insts.Put(spec, inst, pid)
}
// increment reference count
inst.refs++
diff --git a/src/gnunet/service/store/dhtstore_test.go
b/src/gnunet/service/store/dhtstore_test.go
index 14da6ff..ba5fae2 100644
--- a/src/gnunet/service/store/dhtstore_test.go
+++ b/src/gnunet/service/store/dhtstore_test.go
@@ -25,6 +25,7 @@ import (
"gnunet/service/dht/blocks"
"gnunet/util"
"math/rand"
+ "os"
"testing"
)
@@ -38,13 +39,21 @@ const (
// each block from storage and checks for matching hash.
func TestDHTFilesStore(t *testing.T) {
// test configuration
+ path := "/tmp/dht-store"
+ defer func() {
+ os.RemoveAll(path)
+ }()
+
cfg := make(util.ParameterSet)
cfg["mode"] = "file"
cfg["cache"] = false
- cfg["path"] = "/var/lib/gnunet/dht/store"
+ cfg["path"] = path
cfg["maxGB"] = 10
// create file store
+ if _, err := os.Stat(path); err != nil {
+ os.MkdirAll(path, 0755)
+ }
fs, err := NewFileStore(cfg)
if err != nil {
t.Fatal(err)
@@ -58,27 +67,30 @@ func TestDHTFilesStore(t *testing.T) {
size := 1024 + rand.Intn(62000)
buf := make([]byte, size)
rand.Read(buf)
- val := blocks.NewGenericBlock(buf)
+ blk := blocks.NewGenericBlock(buf)
// generate associated key
- k := crypto.Hash(buf).Bits
+ k := crypto.Hash(buf)
key := blocks.NewGenericQuery(k, enums.BLOCK_TYPE_ANY, 0)
- // store block
+ // store entry
+ val := &DHTEntry{
+ Blk: blk,
+ }
if err := fs.Put(key, val); err != nil {
- t.Fatal(err)
+ t.Fatalf("[%d] %s", i, err)
}
// remember key
keys = append(keys, key)
}
// Second round: retrieve blocks and check
- for _, key := range keys {
+ for i, key := range keys {
// get block
val, err := fs.Get(key)
if err != nil {
- t.Fatal(err)
+ t.Fatalf("[%d] %s", i, err)
}
- buf := val.Data()
+ buf := val.Blk.Bytes()
// re-create key
k := crypto.Hash(buf)
@@ -91,3 +103,7 @@ func TestDHTFilesStore(t *testing.T) {
}
}
}
+
+func TestDHTEntryStore(t *testing.T) {
+ // pth, sender, local := path.GenerateTestPath(10)
+}
diff --git a/src/gnunet/service/store/store.go
b/src/gnunet/service/store/store.go
index d5ef05d..ed55547 100644
--- a/src/gnunet/service/store/store.go
+++ b/src/gnunet/service/store/store.go
@@ -25,6 +25,7 @@ import (
"errors"
"fmt"
"gnunet/service/dht/blocks"
+ "gnunet/service/dht/path"
"gnunet/util"
redis "github.com/go-redis/redis/v8"
@@ -70,8 +71,14 @@ type Store[K, V any] interface {
// Types for custom store requirements
//------------------------------------------------------------
+// DHTEntry to be stored/retrieved
+type DHTEntry struct {
+ Blk blocks.Block
+ Path *path.Path
+}
+
// DHTStore for DHT queries and blocks
-type DHTStore Store[blocks.Query, blocks.Block]
+type DHTStore Store[blocks.Query, *DHTEntry]
// KVStore for key/value string pairs
type KVStore Store[string, string]
diff --git a/src/gnunet/service/store/store_fs.go
b/src/gnunet/service/store/store_fs.go
index a33c317..bbbe7cc 100644
--- a/src/gnunet/service/store/store_fs.go
+++ b/src/gnunet/service/store/store_fs.go
@@ -22,10 +22,11 @@ import (
"encoding/hex"
"fmt"
"gnunet/service/dht/blocks"
+ "gnunet/service/dht/path"
"gnunet/util"
- "io/ioutil"
"os"
+ "github.com/bfix/gospel/data"
"github.com/bfix/gospel/logger"
"github.com/bfix/gospel/math"
)
@@ -101,7 +102,7 @@ func (s *FileStore) Close() (err error) {
}
// Put block into storage under given key
-func (s *FileStore) Put(query blocks.Query, block blocks.Block) (err error) {
+func (s *FileStore) Put(query blocks.Query, entry *DHTEntry) (err error) {
// check for free space
if !s.cache {
if int(s.totalSize>>30) > s.maxSpace {
@@ -111,29 +112,18 @@ func (s *FileStore) Put(query blocks.Query, block
blocks.Block) (err error) {
}
// get parameters
btype := query.Type()
- expire := block.Expire()
+ expire := entry.Blk.Expire()
+ blkSize := len(entry.Blk.Bytes())
- // get path and filename from key
- path, fname := s.expandPath(query.Key().Bits)
- // make sure the path exists
- if err = os.MkdirAll(path, 0755); err != nil {
+ // write entry to file for storage
+ if err = s.writeEntry(query.Key().Bits, entry); err != nil {
return
}
- // write to file for storage
- var fp *os.File
- bd := block.Data()
- if fp, err = os.Create(path + "/" + fname); err == nil {
- defer fp.Close()
- // write block data
- if _, err = fp.Write(bd); err != nil {
- return
- }
- }
// compile metadata
now := util.AbsoluteTimeNow()
meta := &FileMetadata{
key: query.Key().Bits,
- size: uint64(len(bd)),
+ size: uint64(blkSize),
btype: btype,
expires: expire,
stored: now,
@@ -156,7 +146,7 @@ func (s *FileStore) Put(query blocks.Query, block
blocks.Block) (err error) {
}
// Get block with given key from storage
-func (s *FileStore) Get(query blocks.Query) (block blocks.Block, err error) {
+func (s *FileStore) Get(query blocks.Query) (entry *DHTEntry, err error) {
// check if we have metadata for the query
key := query.Key().Bits
btype := query.Type()
@@ -173,14 +163,14 @@ func (s *FileStore) Get(query blocks.Query) (block
blocks.Block, err error) {
if err = s.meta.Used(key, btype); err != nil {
return
}
- return s.readBlock(query.Key().Bits)
+ return s.readEntry(key)
}
// GetApprox returns the best-matching value with given key from storage
// that is not excluded
-func (s *FileStore) GetApprox(query blocks.Query, excl func(blocks.Block)
bool) (block blocks.Block, key any, err error) {
+func (s *FileStore) GetApprox(query blocks.Query, excl func(*DHTEntry) bool)
(entry *DHTEntry, key any, err error) {
var bestKey []byte
- var bestBlk blocks.Block
+ var bestEntry *DHTEntry
var bestDist *math.Int
// distance function
dist := func(a, b []byte) *math.Int {
@@ -194,32 +184,32 @@ func (s *FileStore) GetApprox(query blocks.Query, excl
func(blocks.Block) bool)
check := func(md *FileMetadata) {
// check for better match
d := dist(md.key, query.Key().Bits)
+ var entry *DHTEntry
if bestKey == nil || d.Cmp(bestDist) < 0 {
// we might have a match. check block for exclusion
- block, err = s.readBlock(md.key)
- if err != nil {
- logger.Printf(logger.ERROR, "[dhtstore] failed
to retrieve blok for %s", hex.EncodeToString(md.key))
+ if entry, err = s.readEntry(md.key); err != nil {
+ logger.Printf(logger.ERROR, "[dhtstore] failed
to retrieve block for %s", hex.EncodeToString(md.key))
return
}
- if excl(block) {
+ if excl(entry) {
return
}
// remember best match
bestKey = md.key
- bestBlk = block
+ bestEntry = entry
bestDist = d
}
}
if err = s.meta.Traverse(check); err != nil {
return
}
- if bestBlk != nil {
+ if bestEntry != nil {
// mark the block as newly used
- if err = s.meta.Used(bestKey, bestBlk.Type()); err != nil {
+ if err = s.meta.Used(bestKey, bestEntry.Blk.Type()); err != nil
{
return
}
}
- return bestBlk, bestDist, nil
+ return bestEntry, bestDist, nil
}
// Get a list of all stored block keys (generic query).
@@ -227,24 +217,75 @@ func (s *FileStore) List() ([]blocks.Query, error) {
return nil, ErrStoreNoList
}
-// read block from storage for given key
-func (s *FileStore) readBlock(key []byte) (block blocks.Block, err error) {
+//----------------------------------------------------------------------
+
+type entryLayout struct {
+ SizeBlk uint16 `order:"big"` // size of block data
+ SizePth uint16 `order:"big"` // size of path data
+ Block []byte `size:"SizeBlk"` // block data
+ Path []byte `size:"SizePth"` // path data
+}
+
+// read entry from storage for given key
+func (s *FileStore) readEntry(key []byte) (entry *DHTEntry, err error) {
// get path and filename from key
- path, fname := s.expandPath(key)
- // read file content (block data)
+ folder, fname := s.expandPath(key)
+
+ // open file for reading
+ var file *os.File
+ if file, err = os.Open(folder + "/" + fname); err != nil {
+ return
+ }
+ defer file.Close()
+
+ // get file size
+ fi, _ := file.Stat()
+ size := int(fi.Size())
+
+ // read data
+ val := new(entryLayout)
+ if err = data.UnmarshalStream(file, val, size); err != nil {
+ return
+ }
+ // assemble entry
+ entry = new(DHTEntry)
+ entry.Blk = blocks.NewGenericBlock(val.Block)
+ entry.Path, err = path.NewPathFromBytes(val.Path)
+ return
+}
+
+// write entry to storage for given key
+func (s *FileStore) writeEntry(key []byte, entry *DHTEntry) (err error) {
+ // get folder and filename from key
+ folder, fname := s.expandPath(key)
+ // make sure the folder exists
+ if err = os.MkdirAll(folder, 0755); err != nil {
+ return
+ }
+ // create file for entry content (block and path data)
var file *os.File
- if file, err = os.Open(path + "/" + fname); err != nil {
+ if file, err = os.Create(folder + "/" + fname); err != nil {
return
}
defer file.Close()
- // read block data
- var data []byte
- if data, err = ioutil.ReadAll(file); err == nil {
- block = blocks.NewGenericBlock(data)
+
+ // assemble and write entry
+ val := new(entryLayout)
+ val.Block = entry.Blk.Bytes()
+ val.SizeBlk = uint16(len(val.Block))
+ if entry.Path != nil {
+ val.Path = entry.Path.Bytes()
+ val.SizePth = uint16(len(val.Path))
+ } else {
+ val.Path = nil
+ val.SizePth = 0
}
+ err = data.MarshalStream(file, val)
return
}
+//----------------------------------------------------------------------
+
// expandPath returns the full path to the file for given key.
func (s *FileStore) expandPath(key []byte) (string, string) {
h := hex.EncodeToString(key)
diff --git a/src/gnunet/service/store/store_fs_meta.go
b/src/gnunet/service/store/store_fs_meta.go
index 414921c..5710286 100644
--- a/src/gnunet/service/store/store_fs_meta.go
+++ b/src/gnunet/service/store/store_fs_meta.go
@@ -21,6 +21,7 @@ package store
import (
"database/sql"
_ "embed"
+ "gnunet/enums"
"gnunet/util"
"os"
)
@@ -34,7 +35,7 @@ import (
type FileMetadata struct {
key []byte // storage key
size uint64 // size of file
- btype uint16 // block type
+ btype enums.BlockType // block type
stored util.AbsoluteTime // time added to store
expires util.AbsoluteTime // expiration time
lastUsed util.AbsoluteTime // time last used
@@ -91,16 +92,18 @@ func (db *FileMetaDB) Store(md *FileMetadata) (err error) {
}
// Get block metadata from database
-func (db *FileMetaDB) Get(key []byte, btype uint16) (md *FileMetadata, err
error) {
+func (db *FileMetaDB) Get(key []byte, btype enums.BlockType) (md
*FileMetadata, err error) {
md = new(FileMetadata)
md.key = util.Clone(key)
md.btype = btype
stmt := "select size,stored,expires,lastUsed,usedCount from meta where
qkey=? and btype=?"
row := db.conn.QueryRow(stmt, key, btype)
var st, exp, lu uint64
- if err = row.Scan(&md.size, &st, &exp, &lu, &md.usedCount); err ==
sql.ErrNoRows {
- md = nil
- err = nil
+ if err = row.Scan(&md.size, &st, &exp, &lu, &md.usedCount); err != nil {
+ if err == sql.ErrNoRows {
+ md = nil
+ err = nil
+ }
} else {
md.stored.Val = st * 1000000
md.expires.Val = exp * 1000000
@@ -110,13 +113,13 @@ func (db *FileMetaDB) Get(key []byte, btype uint16) (md
*FileMetadata, err error
}
// Drop metadata for block from database
-func (db *FileMetaDB) Drop(key []byte, btype uint16) error {
+func (db *FileMetaDB) Drop(key []byte, btype enums.BlockType) error {
_, err := db.conn.Exec("delete from meta where qkey=? and btype=?",
key, btype)
return err
}
// Used a block from store: increment usage count and lastUsed time.
-func (db *FileMetaDB) Used(key []byte, btype uint16) error {
+func (db *FileMetaDB) Used(key []byte, btype enums.BlockType) error {
_, err := db.conn.Exec("update meta set
usedCount=usedCount+1,lastUsed=unixepoch() where qkey=? and btype=?", key,
btype)
return err
}
diff --git a/src/gnunet/transport/endpoint.go b/src/gnunet/transport/endpoint.go
index a2f54d7..83a3cc8 100644
--- a/src/gnunet/transport/endpoint.go
+++ b/src/gnunet/transport/endpoint.go
@@ -234,14 +234,12 @@ func (ep *PaketEndpoint) CanSendTo(addr net.Addr) (ok
bool) {
// try to convert addr to compatible type
switch ep.netw {
case "udp", "udp4", "udp6":
- var ua *net.UDPAddr
var err error
- if ua, err = net.ResolveUDPAddr(ep.netw,
addr.String()); err != nil {
+ if _, err = net.ResolveUDPAddr(ep.netw, addr.String());
err != nil {
ok = false
}
- logger.Printf(logger.DBG, "[pkt_ep] %s + %v -> %v
(%v)", ep.netw, addr, ua, ok)
default:
- logger.Printf(logger.DBG, "[pkt_ep] unknown network
%s", ep.netw)
+ logger.Printf(logger.WARN, "[pkt_ep] unknown network
%s", ep.netw)
ok = false
}
} else {
@@ -309,7 +307,7 @@ func (ep *StreamEndpoint) Run(ctx context.Context, hdlr
chan *Message) (err erro
return
}
session := util.NextID()
- ep.conns.Put(session, conn)
+ ep.conns.Put(session, conn, 0)
go func() {
for {
// read next message from connection
@@ -324,7 +322,7 @@ func (ep *StreamEndpoint) Run(ctx context.Context, hdlr
chan *Message) (err erro
}
// connection ended.
conn.Close()
- ep.conns.Delete(session)
+ ep.conns.Delete(session, 0)
}()
}
}()
diff --git a/src/gnunet/transport/reader_writer.go
b/src/gnunet/transport/reader_writer.go
index 13028fe..ee76783 100644
--- a/src/gnunet/transport/reader_writer.go
+++ b/src/gnunet/transport/reader_writer.go
@@ -19,10 +19,7 @@
package transport
import (
- "bytes"
"context"
- "encoding/hex"
- "encoding/json"
"errors"
"fmt"
"gnunet/message"
@@ -42,21 +39,16 @@ func WriteMessage(ctx context.Context, wrt io.WriteCloser,
msg message.Message)
// convert message to binary data
var buf []byte
if buf, err = data.Marshal(msg); err != nil {
- return err
+ return
}
// check message header size and packet size
mh, err := message.GetMsgHeader(buf)
if err != nil {
- return err
+ return
}
if len(buf) != int(mh.MsgSize) {
return errors.New("WriteMessage: message size mismatch")
}
- // watch dog for write operation
- go func() {
- <-ctx.Done()
- wrt.Close()
- }()
// perform write operation
var n int
if n, err = wrt.Write(buf); err != nil {
@@ -119,26 +111,6 @@ func ReadMessage(ctx context.Context, rdr io.ReadCloser,
buf []byte) (msg messag
return
}
-//----------------------------------------------------------------------
-// Dump message
-func Dump(msg message.Message, format string) string {
- switch format {
- case "json":
- buf, err := json.Marshal(msg)
- if err != nil {
- return err.Error()
- }
- return string(buf)
- case "hex":
- buf := new(bytes.Buffer)
- if err := WriteMessageDirect(buf, msg); err != nil {
- return err.Error()
- }
- return hex.EncodeToString(buf.Bytes())
- }
- return "unknown message dump format"
-}
-
//----------------------------------------------------------------------
// helper for wrapped ReadCloser/WriteCloser (close is nop)
//----------------------------------------------------------------------
diff --git a/src/gnunet/transport/transport.go
b/src/gnunet/transport/transport.go
index bc4b632..805aac2 100644
--- a/src/gnunet/transport/transport.go
+++ b/src/gnunet/transport/transport.go
@@ -25,6 +25,7 @@ import (
"gnunet/message"
"gnunet/util"
"net"
+ "strings"
"github.com/bfix/gospel/network"
)
@@ -128,7 +129,7 @@ func (t *Transport) Shutdown() {
func (t *Transport) Send(ctx context.Context, addr net.Addr, msg *Message)
(err error) {
// select best endpoint able to handle address
var bestEp Endpoint
- err = t.endpoints.ProcessRange(func(_ int, ep Endpoint) error {
+ err = t.endpoints.ProcessRange(func(_ int, ep Endpoint, _ int) error {
if ep.CanSendTo(addr) {
if bestEp == nil {
bestEp = ep
@@ -139,7 +140,7 @@ func (t *Transport) Send(ctx context.Context, addr
net.Addr, msg *Message) (err
// }
}
return nil
- }, true)
+ }, false)
if err != nil {
return
}
@@ -160,7 +161,7 @@ func (t *Transport) AddEndpoint(ctx context.Context, addr
*util.Address) (ep End
}
// check if endpoint is already available
as := addr.Network() + "://" + addr.String()
- if err = t.endpoints.ProcessRange(func(_ int, ep Endpoint) error {
+ if err = t.endpoints.ProcessRange(func(_ int, ep Endpoint, _ int) error
{
ae := ep.Address().Network() + "://" + ep.Address().String()
if as == ae {
return ErrEndpExists
@@ -174,7 +175,7 @@ func (t *Transport) AddEndpoint(ctx context.Context, addr
*util.Address) (ep End
return
}
// add endpoint to list and run it
- t.endpoints.Put(ep.ID(), ep)
+ t.endpoints.Put(ep.ID(), ep, 0)
err = ep.Run(ctx, t.incoming)
return
}
@@ -203,3 +204,19 @@ func (t *Transport) ForwardClose(id string) error {
}
return t.upnp.Unassign(id)
}
+
+//----------------------------------------------------------------------
+// Helper functions
+//----------------------------------------------------------------------
+
+// CanHandleAddress returns true, if a given address can be handled by the
+// transport framework
+func CanHandleAddress(addr *util.Address) bool {
+ // filter out local addresses
+ s := addr.String()
+ if idx := strings.LastIndex(s, ":"); idx != -1 {
+ s = s[:idx]
+ }
+ ip := net.ParseIP(s)
+ return !(ip == nil || ip.IsLoopback())
+}
diff --git a/src/gnunet/util/address.go b/src/gnunet/util/address.go
index 35e0030..bf6f562 100644
--- a/src/gnunet/util/address.go
+++ b/src/gnunet/util/address.go
@@ -117,9 +117,9 @@ func (a *PeerAddrList) Add(peer *PeerID, addr *Address)
(mode int) {
mode = 0
if !addr.Expires.Expired() {
// run add operation
- _ = a.list.Process(func() error {
+ _ = a.list.Process(func(pid int) error {
id := peer.String()
- list, ok := a.list.Get(id)
+ list, ok := a.list.Get(id, pid)
if !ok {
list = make([]*Address, 0)
mode = 1
@@ -132,7 +132,7 @@ func (a *PeerAddrList) Add(peer *PeerID, addr *Address)
(mode int) {
mode = 2
}
list = append(list, addr)
- a.list.Put(id, list)
+ a.list.Put(id, list, pid)
return nil
}, false)
}
@@ -142,7 +142,7 @@ func (a *PeerAddrList) Add(peer *PeerID, addr *Address)
(mode int) {
// Get address for peer
func (a *PeerAddrList) Get(peer *PeerID, transport string) (res []*Address) {
id := peer.String()
- list, ok := a.list.Get(id)
+ list, ok := a.list.Get(id, 0)
if ok {
for _, addr := range list {
// check for expired address.
@@ -163,12 +163,12 @@ func (a *PeerAddrList) Get(peer *PeerID, transport
string) (res []*Address) {
// Delete a list entry by key.
func (a *PeerAddrList) Delete(peer *PeerID) {
- a.list.Delete(peer.String())
+ a.list.Delete(peer.String(), 0)
}
// Contains checks if a peer is contained in the list. Does not check
// for expired entries.
func (a *PeerAddrList) Contains(peer *PeerID) (ok bool) {
- _, ok = a.list.Get(peer.String())
+ _, ok = a.list.Get(peer.String(), 0)
return
}
diff --git a/src/gnunet/util/map.go b/src/gnunet/util/map.go
index adfcc0e..ab48089 100644
--- a/src/gnunet/util/map.go
+++ b/src/gnunet/util/map.go
@@ -23,23 +23,66 @@ import (
"sync"
)
+//----------------------------------------------------------------------
+// ID list of active map processes:
+// An active process (wrapped in a 'Process()' or 'ProcessRange()'
+// call) locks (and unlocks) map access only once around a process, so
+// calls to map methods from within a process are safe (no lock/unlock
+// required).
+//----------------------------------------------------------------------
+
+// PIDList is a thread-safe list of active process IDs
+type PIDList struct {
+ sync.RWMutex
+ list map[int]struct{}
+}
+
+// NewPIDList creates a new PID list instance
+func NewPIDList() *PIDList {
+ return &PIDList{
+ list: make(map[int]struct{}),
+ }
+}
+
+// Add pid to list
+func (pl *PIDList) Add(pid int) {
+ pl.Lock()
+ defer pl.Unlock()
+ pl.list[pid] = struct{}{}
+}
+
+// Remove pid from list
+func (pl *PIDList) Remove(pid int) {
+ pl.Lock()
+ defer pl.Unlock()
+ delete(pl.list, pid)
+}
+
+// Contains returns true if 'pid' is a list element
+func (pl *PIDList) Contains(pid int) bool {
+ pl.RLock()
+ defer pl.RUnlock()
+ _, ok := pl.list[pid]
+ return ok
+}
+
//----------------------------------------------------------------------
// Thread-safe map implementation
//----------------------------------------------------------------------
-// Map keys to values
+// Map comparable keys to values of any type
type Map[K comparable, V any] struct {
sync.RWMutex
list map[K]V
- inProcess bool
+ inProcess *PIDList
}
// NewMap allocates a new mapping.
func NewMap[K comparable, V any]() *Map[K, V] {
return &Map[K, V]{
list: make(map[K]V),
- inProcess: false,
+ inProcess: NewPIDList(),
}
}
@@ -47,31 +90,33 @@ func NewMap[K comparable, V any]() *Map[K, V] {
// Process a function in the locked map context. Calls
// to other map functions in 'f' will skip their locks.
-func (m *Map[K, V]) Process(f func() error, readonly bool) error {
+func (m *Map[K, V]) Process(f func(pid int) error, readonly bool) error {
// handle locking
- m.lock(readonly)
- m.inProcess = true
+ m.lock(readonly, 0)
+ pid := NextID()
+ m.inProcess.Add(pid)
defer func() {
- m.inProcess = false
- m.unlock(readonly)
+ m.inProcess.Remove(pid)
+ m.unlock(readonly, 0)
}()
// function call in locked environment (nested map calls skip locking)
- return f()
+ return f(pid)
}
// Process a ranged function in the locked map context. Calls
// to other map functions in 'f' will skip their locks.
-func (m *Map[K, V]) ProcessRange(f func(key K, value V) error, readonly bool)
error {
+func (m *Map[K, V]) ProcessRange(f func(key K, value V, pid int) error,
readonly bool) error {
// handle locking
- m.lock(readonly)
- m.inProcess = true
+ m.lock(readonly, 0)
+ pid := NextID()
+ m.inProcess.Add(pid)
defer func() {
- m.inProcess = false
- m.unlock(readonly)
+ m.inProcess.Remove(pid)
+ m.unlock(readonly, 0)
}()
// range over map and call function.
for key, value := range m.list {
- if err := f(key, value); err != nil {
+ if err := f(key, value, pid); err != nil {
return err
}
}
@@ -86,24 +131,24 @@ func (m *Map[K, V]) Size() int {
}
// Put value into map under given key.
-func (m *Map[K, V]) Put(key K, value V) {
- m.lock(false)
- defer m.unlock(false)
+func (m *Map[K, V]) Put(key K, value V, pid int) {
+ m.lock(false, pid)
+ defer m.unlock(false, pid)
m.list[key] = value
}
// Get value with given key from map.
-func (m *Map[K, V]) Get(key K) (value V, ok bool) {
- m.lock(true)
- defer m.unlock(true)
+func (m *Map[K, V]) Get(key K, pid int) (value V, ok bool) {
+ m.lock(true, pid)
+ defer m.unlock(true, pid)
value, ok = m.list[key]
return
}
// GetRandom returns a random map entry.
-func (m *Map[K, V]) GetRandom() (key K, value V, ok bool) {
- m.lock(true)
- defer m.unlock(true)
+func (m *Map[K, V]) GetRandom(pid int) (key K, value V, ok bool) {
+ m.lock(true, pid)
+ defer m.unlock(true, pid)
ok = false
if size := m.Size(); size > 0 {
@@ -120,17 +165,17 @@ func (m *Map[K, V]) GetRandom() (key K, value V, ok bool)
{
}
// Delete key/value pair from map.
-func (m *Map[K, V]) Delete(key K) {
- m.lock(false)
- defer m.unlock(false)
+func (m *Map[K, V]) Delete(key K, pid int) {
+ m.lock(false, pid)
+ defer m.unlock(false, pid)
delete(m.list, key)
}
//----------------------------------------------------------------------
// lock with given mode (if not in processing function)
-func (m *Map[K, V]) lock(readonly bool) {
- if !m.inProcess {
+func (m *Map[K, V]) lock(readonly bool, pid int) {
+ if !m.inProcess.Contains(pid) {
if readonly {
m.RLock()
} else {
@@ -140,8 +185,8 @@ func (m *Map[K, V]) lock(readonly bool) {
}
// unlock with given mode (if not in processing function)
-func (m *Map[K, V]) unlock(readonly bool) {
- if !m.inProcess {
+func (m *Map[K, V]) unlock(readonly bool, pid int) {
+ if !m.inProcess.Contains(pid) {
if readonly {
m.RUnlock()
} else {
diff --git a/src/gnunet/util/misc.go b/src/gnunet/util/misc.go
index 1768a96..562bdaa 100644
--- a/src/gnunet/util/misc.go
+++ b/src/gnunet/util/misc.go
@@ -19,7 +19,11 @@
package util
import (
+ "encoding/hex"
+ "encoding/json"
"strings"
+
+ "github.com/bfix/gospel/data"
)
//----------------------------------------------------------------------
@@ -80,3 +84,23 @@ func StripPathRight(s string) string {
}
return s
}
+
+//----------------------------------------------------------------------
+// Dump instance
+func Dump(obj any, format string) string {
+ switch format {
+ case "json":
+ buf, err := json.Marshal(obj)
+ if err != nil {
+ return err.Error()
+ }
+ return string(buf)
+ case "hex":
+ buf, err := data.Marshal(obj)
+ if err != nil {
+ return err.Error()
+ }
+ return hex.EncodeToString(buf)
+ }
+ return "unknown message dump format"
+}
diff --git a/src/gnunet/util/peer.go b/src/gnunet/util/peer.go
index f880040..62ae2e2 100644
--- a/src/gnunet/util/peer.go
+++ b/src/gnunet/util/peer.go
@@ -20,6 +20,8 @@ package util
import (
"bytes"
+
+ "github.com/bfix/gospel/crypto/ed25519"
)
//----------------------------------------------------------------------
@@ -28,34 +30,54 @@ import (
// PeerPublicKey is the binary representation of an Ed25519 public key
type PeerPublicKey struct {
- Data []byte `size:"32"` // Ed25519 public key data
+ Data []byte `size:"(Size)"` // Ed25519 public key data
}
// NewPeerPublicKey creates a key instance from binary data
func NewPeerPublicKey(data []byte) *PeerPublicKey {
- pk := &PeerPublicKey{
- Data: make([]byte, 32),
- }
- if data != nil {
- if len(data) < 32 {
- CopyAlignedBlock(pk.Data, data)
+ pk := new(PeerPublicKey)
+ size := pk.Size()
+ v := make([]byte, size)
+ if data != nil && len(data) > 0 {
+ if uint(len(data)) < size {
+ CopyAlignedBlock(v, data)
} else {
- copy(pk.Data, data[:32])
+ copy(v, data[:size])
}
}
+ pk.Data = v
return pk
}
+// Size returns the length of the binary data
+func (pk *PeerPublicKey) Size() uint {
+ return 32
+}
+
+// Verify peer signature
+func (pk *PeerPublicKey) Verify(data []byte, sig *PeerSignature) (bool, error)
{
+ xpk := ed25519.NewPublicKeyFromBytes(pk.Data)
+ xsig, err := ed25519.NewEdSignatureFromBytes(sig.Data)
+ if err != nil {
+ return false, err
+ }
+ return xpk.EdVerify(data, xsig)
+}
+
//----------------------------------------------------------------------
// Peer identifier:
//----------------------------------------------------------------------
// PeerID is a wrapped PeerPublicKey
-type PeerID PeerPublicKey
+type PeerID struct {
+ PeerPublicKey
+}
// NewPeerID creates a new peer id from data.
func NewPeerID(data []byte) (p *PeerID) {
- return (*PeerID)(NewPeerPublicKey(data))
+ return &PeerID{
+ PeerPublicKey: *NewPeerPublicKey(data),
+ }
}
// Equals returns true if two peer IDs match.
@@ -73,22 +95,37 @@ func (p *PeerID) Bytes() []byte {
return Clone(p.Data)
}
+//----------------------------------------------------------------------
+// Peer signature (EdDSA signature)
//----------------------------------------------------------------------
// PeerSignature is an EdDSA signature from the peer
type PeerSignature struct {
- Data []byte `size:"64"`
+ Data []byte `size:"(Size)"`
}
// NewPeerSignature is an EdDSA signature with the private peer key
func NewPeerSignature(data []byte) *PeerSignature {
- var v []byte
- if data == nil {
- v = make([]byte, 64)
- } else {
- v = Clone(data)
- }
- return &PeerSignature{
- Data: v,
+ s := new(PeerSignature)
+ size := s.Size()
+ v := make([]byte, size)
+ if data != nil && len(data) > 0 {
+ if uint(len(data)) < size {
+ CopyAlignedBlock(v, data)
+ } else {
+ copy(v, data[:size])
+ }
}
+ s.Data = v
+ return s
+}
+
+// Size returns the length of the binary data
+func (s *PeerSignature) Size() uint {
+ return 64
+}
+
+// Bytes returns the binary representation of a peer signature.
+func (s *PeerSignature) Bytes() []byte {
+ return Clone(s.Data)
}
diff --git a/src/gnunet/crypto/signature.go b/src/gnunet/util/peer_test.go
similarity index 68%
copy from src/gnunet/crypto/signature.go
copy to src/gnunet/util/peer_test.go
index 4bc5aac..d0c3464 100644
--- a/src/gnunet/crypto/signature.go
+++ b/src/gnunet/util/peer_test.go
@@ -16,10 +16,23 @@
//
// SPDX-License-Identifier: AGPL3.0-or-later
-package crypto
+package util
-// SignaturePurpose is the GNUnet data structure used as header for signed
data.
-type SignaturePurpose struct {
- Size uint32 `order:"big"` // How many bytes are signed?
- Purpose uint32 `order:"big"` // Signature purpose
+import (
+ "encoding/base64"
+ "testing"
+
+ "github.com/bfix/gospel/crypto/ed25519"
+)
+
+func TestPeerIDString(t *testing.T) {
+ seed := "iYK1wSi5XtCP774eNFk1LYXqKlOPEpwKBw+2/bMkE24="
+ data, err := base64.StdEncoding.DecodeString(seed)
+ if err != nil {
+ t.Fatal(err)
+ }
+ prv := ed25519.NewPrivateKeyFromSeed(data)
+ pub := prv.Public()
+ id := NewPeerID(pub.Bytes())
+ t.Log(id)
}
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [gnunet-go] branch master updated: RC1 for milestone 2 (NGI Assure),
gnunet <=