gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnunet-go] branch master updated: Heartbeat handling in modules added.


From: gnunet
Subject: [gnunet-go] branch master updated: Heartbeat handling in modules added.
Date: Sat, 04 Jun 2022 11:33:35 +0200

This is an automated email from the git hooks/post-receive script.

bernd-fix pushed a commit to branch master
in repository gnunet-go.

The following commit(s) were added to refs/heads/master by this push:
     new de57742  Heartbeat handling in modules added.
de57742 is described below

commit de577428a69a0002c3194afdf0562cf5a4dc1bdc
Author: Bernd Fix <brf@hoi-polloi.org>
AuthorDate: Sat Jun 4 11:32:09 2022 +0200

    Heartbeat handling in modules added.
---
 src/gnunet/cmd/peer_mockup/main.go          |   2 +-
 src/gnunet/cmd/revoke-zonekey/main.go       |   2 +-
 src/gnunet/go.mod                           |   4 +-
 src/gnunet/go.sum                           |   4 +-
 src/gnunet/service/dht/module.go            |  22 ++++-
 src/gnunet/service/dht/routingtable.go      | 120 ++++++++++++++++++++--------
 src/gnunet/service/dht/routingtable_test.go |   3 +-
 src/gnunet/service/gns/module.go            |   2 +-
 src/gnunet/service/module.go                |  23 +++++-
 src/gnunet/service/revocation/module.go     |   2 +-
 src/gnunet/util/time.go                     |  29 ++++++-
 11 files changed, 162 insertions(+), 51 deletions(-)

diff --git a/src/gnunet/cmd/peer_mockup/main.go 
b/src/gnunet/cmd/peer_mockup/main.go
index 4288fb1..58f4baf 100644
--- a/src/gnunet/cmd/peer_mockup/main.go
+++ b/src/gnunet/cmd/peer_mockup/main.go
@@ -71,7 +71,7 @@ func main() {
 
        // handle messages coming from network
        module := service.NewModuleImpl()
-       listener := module.Run(ctx, process, nil)
+       listener := module.Run(ctx, process, nil, 0, nil)
        c.Register("mockup", listener)
 
        if !asServer {
diff --git a/src/gnunet/cmd/revoke-zonekey/main.go 
b/src/gnunet/cmd/revoke-zonekey/main.go
index 298a7e4..aab9602 100644
--- a/src/gnunet/cmd/revoke-zonekey/main.go
+++ b/src/gnunet/cmd/revoke-zonekey/main.go
@@ -299,7 +299,7 @@ func main() {
                        }
                }
                // update elapsed time
-               rd.T.Add(util.AbsoluteTimeNow().Diff(startTime))
+               rd.T.Add(startTime.Elapsed())
                rd.Last = last
 
                log.Println("Writing revocation data to file...")
diff --git a/src/gnunet/go.mod b/src/gnunet/go.mod
index 7383eaf..ad203ca 100644
--- a/src/gnunet/go.mod
+++ b/src/gnunet/go.mod
@@ -3,7 +3,7 @@ module gnunet
 go 1.18
 
 require (
-       github.com/bfix/gospel v1.2.11
+       github.com/bfix/gospel v1.2.14
        github.com/go-redis/redis/v8 v8.11.5
        github.com/go-sql-driver/mysql v1.6.0
        github.com/gorilla/mux v1.8.0
@@ -21,5 +21,3 @@ require (
        golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2 // indirect
        golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
 )
-
-replace github.com/bfix/gospel v1.2.11 => /vault/prj/libs/Go/Gospel
diff --git a/src/gnunet/go.sum b/src/gnunet/go.sum
index ea3c328..f2baf8e 100644
--- a/src/gnunet/go.sum
+++ b/src/gnunet/go.sum
@@ -1,5 +1,5 @@
-github.com/bfix/gospel v1.2.11 h1:z/c6MFNq/lz4mO8+PK60a3NvH+lbTKAlLCShuFFZUvg=
-github.com/bfix/gospel v1.2.11/go.mod 
h1:cdu63bA9ZdfeDoqZ+vnWOcbY9Puwdzmf5DMxMGMznRI=
+github.com/bfix/gospel v1.2.14 h1:lIdagJvkebG+uYbVdfK6XbT1udnq/ezd/Gi54EaMtV0=
+github.com/bfix/gospel v1.2.14/go.mod 
h1:cdu63bA9ZdfeDoqZ+vnWOcbY9Puwdzmf5DMxMGMznRI=
 github.com/cespare/xxhash/v2 v2.1.2 
h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
 github.com/cespare/xxhash/v2 v2.1.2/go.mod 
h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
 github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f 
h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
diff --git a/src/gnunet/service/dht/module.go b/src/gnunet/service/dht/module.go
index d369f3f..5f04d54 100644
--- a/src/gnunet/service/dht/module.go
+++ b/src/gnunet/service/dht/module.go
@@ -26,6 +26,7 @@ import (
        "gnunet/service"
        "gnunet/service/dht/blocks"
        "net/http"
+       "time"
 )
 
 //======================================================================
@@ -72,7 +73,7 @@ func NewModule(ctx context.Context, c *core.Core) (m *Module) 
{
                rtable:     rt,
        }
        // register as listener for core events
-       listener := m.Run(ctx, m.event, m.Filter())
+       listener := m.Run(ctx, m.event, m.Filter(), 15*time.Minute, m.heartbeat)
        c.Register("dht", listener)
 
        return
@@ -119,9 +120,24 @@ func (m *Module) Filter() *core.EventFilter {
        return f
 }
 
-// Event handler
-func (nc *Module) event(ctx context.Context, ev *core.Event) {
+// Event handler for infrastructure signals
+func (m *Module) event(ctx context.Context, ev *core.Event) {
+       switch ev.ID {
+       // New peer connected:
+       case core.EV_CONNECT:
+               // Add peer to routing table
 
+       }
+
+}
+
+// Heartbeat handler for periodic tasks
+func (m *Module) heartbeat(ctx context.Context) {
+       // update the estimated network size
+       m.rtable.l2nse = m.core.L2NSE()
+
+       // run heartbeat for routing table
+       m.rtable.heartbeat(ctx)
 }
 
 //----------------------------------------------------------------------
diff --git a/src/gnunet/service/dht/routingtable.go 
b/src/gnunet/service/dht/routingtable.go
index 895a1b2..0078b71 100644
--- a/src/gnunet/service/dht/routingtable.go
+++ b/src/gnunet/service/dht/routingtable.go
@@ -20,12 +20,15 @@ package dht
 
 import (
        "bytes"
+       "context"
        "crypto/sha512"
        "encoding/hex"
        "gnunet/util"
        "math/rand"
        "sync"
+       "time"
 
+       "github.com/bfix/gospel/logger"
        "github.com/bfix/gospel/math"
 )
 
@@ -48,7 +51,10 @@ const (
 // PeerAddress is the identifier for a peer in the DHT network.
 // It is the SHA-512 hash of the PeerID (public Ed25519 key).
 type PeerAddress struct {
-       addr [sizeAddr]byte
+       addr      [sizeAddr]byte    // hash value as bytes
+       connected bool              // is peer connected?
+       lastSeen  util.AbsoluteTime // time the peer was last seen
+       lastUsed  util.AbsoluteTime // time the peer was last used
 }
 
 // NewPeerAddress returns the DHT address of a peer.
@@ -57,6 +63,8 @@ func NewPeerAddress(peer *util.PeerID) *PeerAddress {
        h := rtHash()
        h.Write(peer.Key)
        copy(r.addr[:], h.Sum(nil))
+       r.lastSeen = util.AbsoluteTimeNow()
+       r.lastUsed = util.AbsoluteTimeNow()
        return r
 }
 
@@ -90,36 +98,51 @@ func (addr *PeerAddress) Distance(p *PeerAddress) 
(*math.Int, int) {
 // distance to the reference address, so smaller index means
 // "nearer" to the reference address.
 type RoutingTable struct {
-       ref     *PeerAddress          // reference address for distance
-       buckets []*Bucket             // list of buckets
-       list    map[*PeerAddress]bool // keep list of peers
-       rwlock  sync.RWMutex          // lock for write operations
-       l2nse   float64               // log2 of estimated network size
+       ref       *PeerAddress              // reference address for distance
+       buckets   []*Bucket                 // list of buckets
+       list      map[*PeerAddress]struct{} // keep list of peers
+       rwlock    sync.RWMutex              // lock for write operations
+       l2nse     float64                   // log2 of estimated network size
+       inProcess bool                      // flag if Process() is running
 }
 
 // NewRoutingTable creates a new routing table for the reference address.
 func NewRoutingTable(ref *PeerAddress) *RoutingTable {
-       rt := new(RoutingTable)
-       rt.ref = ref
-       rt.list = make(map[*PeerAddress]bool)
-       rt.buckets = make([]*Bucket, numBuckets)
+       // create routing table
+       rt := &RoutingTable{
+               ref:       ref,
+               list:      make(map[*PeerAddress]struct{}),
+               buckets:   make([]*Bucket, numBuckets),
+               l2nse:     0.,
+               inProcess: false,
+       }
+       // fill buckets
        for i := range rt.buckets {
                rt.buckets[i] = NewBucket(numK)
        }
        return rt
 }
 
+//----------------------------------------------------------------------
+// Peer management
+//----------------------------------------------------------------------
+
 // Add new peer address to routing table.
 // Returns true if the entry was added, false otherwise.
-func (rt *RoutingTable) Add(p *PeerAddress, connected bool) bool {
+func (rt *RoutingTable) Add(p *PeerAddress) bool {
        // ensure one write and no readers
        rt.rwlock.Lock()
        defer rt.rwlock.Unlock()
 
+       // check if peer is already known
+       if _, ok := rt.list[p]; ok {
+               return false
+       }
+
        // compute distance (bucket index) and insert address.
        _, idx := p.Distance(rt.ref)
-       if rt.buckets[idx].Add(p, connected) {
-               rt.list[p] = true
+       if rt.buckets[idx].Add(p) {
+               rt.list[p] = struct{}{}
                return true
        }
        // Full bucket: we did not add the address to the routing table.
@@ -139,11 +162,23 @@ func (rt *RoutingTable) Remove(p *PeerAddress) bool {
                delete(rt.list, p)
                return true
        }
+       // remove from internal list
+       delete(rt.list, p)
        return false
 }
 
 //----------------------------------------------------------------------
-// routing functions
+
+// Process a function f in the locked context of a routing table
+func (rt *RoutingTable) Process(f func() error) error {
+       // ensure one write and no readers
+       rt.rwlock.Lock()
+       defer rt.rwlock.Unlock()
+       return f()
+}
+
+//----------------------------------------------------------------------
+// Routing functions
 //----------------------------------------------------------------------
 
 // SelectClosestPeer for a given peer address and bloomfilter.
@@ -160,6 +195,8 @@ func (rt *RoutingTable) SelectClosestPeer(p *PeerAddress, 
bf *PeerBloomFilter) (
                        n = k
                }
        }
+       // mark peer as used
+       n.lastUsed = util.AbsoluteTimeNow()
        return
 }
 
@@ -175,6 +212,8 @@ func (rt *RoutingTable) SelectRandomPeer(bf 
*PeerBloomFilter) *PeerAddress {
                idx := rand.Intn(size)
                for k := range rt.list {
                        if idx == 0 {
+                               // mark peer as used
+                               k.lastUsed = util.AbsoluteTimeNow()
                                return k
                        }
                        idx--
@@ -221,33 +260,50 @@ func (rt *RoutingTable) ComputeOutDegree(repl, hop int) 
int {
        return 1 + int(rm1/(rt.l2nse+rm1*hf))
 }
 
+//----------------------------------------------------------------------
+
+// Heartbeat handler for periodic tasks
+func (rt *RoutingTable) heartbeat(ctx context.Context) {
+
+       // check for dead or expired peers
+       timeout := util.NewRelativeTime(3 * time.Hour)
+       if err := rt.Process(func() error {
+               for addr := range rt.list {
+                       if addr.connected {
+                               continue
+                       }
+                       // check if we can/need to drop a peer
+                       drop := timeout.Compare(addr.lastSeen.Elapsed()) < 0
+                       if drop || timeout.Compare(addr.lastUsed.Elapsed()) < 0 
{
+                               rt.Remove(addr)
+                       }
+               }
+               return nil
+       }); err != nil {
+               logger.Println(logger.ERROR, "[dht] RT heartbeat: "+err.Error())
+       }
+}
+
 //======================================================================
 // Routing table buckets
 //======================================================================
 
-// PeerEntry in a k-Bucket: use routing specific attributes
-// for book-keeping
-type PeerEntry struct {
-       addr      *PeerAddress // peer address
-       connected bool         // is peer connected?
-}
-
 // Bucket holds peer entries with approx. same distance from node
 type Bucket struct {
-       list   []*PeerEntry
+       list   []*PeerAddress
        rwlock sync.RWMutex
 }
 
 // NewBucket creates a new entry list of given size
 func NewBucket(n int) *Bucket {
        return &Bucket{
-               list: make([]*PeerEntry, 0, n),
+               list: make([]*PeerAddress, 0, n),
        }
 }
 
 // Add peer address to the bucket if there is free space.
 // Returns true if entry is added, false otherwise.
-func (b *Bucket) Add(p *PeerAddress, connected bool) bool {
+func (b *Bucket) Add(p *PeerAddress) bool {
        // only one writer and no readers
        b.rwlock.Lock()
        defer b.rwlock.Unlock()
@@ -255,11 +311,7 @@ func (b *Bucket) Add(p *PeerAddress, connected bool) bool {
        // check for free space in bucket
        if len(b.list) < numK {
                // append entry at the end
-               pe := &PeerEntry{
-                       addr:      p,
-                       connected: connected,
-               }
-               b.list = append(b.list, pe)
+               b.list = append(b.list, p)
                return true
        }
        return false
@@ -273,7 +325,7 @@ func (b *Bucket) Remove(p *PeerAddress) bool {
        defer b.rwlock.Unlock()
 
        for i, pe := range b.list {
-               if pe.addr.Equals(p) {
+               if pe.Equals(p) {
                        // found entry: remove it
                        b.list = append(b.list[:i], b.list[i+1:]...)
                        return true
@@ -289,16 +341,16 @@ func (b *Bucket) SelectClosestPeer(p *PeerAddress, bf 
*PeerBloomFilter) (n *Peer
        b.rwlock.RLock()
        defer b.rwlock.RUnlock()
 
-       for _, pe := range b.list {
+       for _, addr := range b.list {
                // skip addresses in bloomfilter
-               if bf.Contains(pe.addr) {
+               if bf.Contains(addr) {
                        continue
                }
                // check for shorter distance
-               if d, _ := p.Distance(pe.addr); n == nil || d.Cmp(dist) < 0 {
+               if d, _ := p.Distance(addr); n == nil || d.Cmp(dist) < 0 {
                        // remember best match
                        dist = d
-                       n = pe.addr
+                       n = addr
                }
        }
        return
diff --git a/src/gnunet/service/dht/routingtable_test.go 
b/src/gnunet/service/dht/routingtable_test.go
index 2579356..659f9d4 100644
--- a/src/gnunet/service/dht/routingtable_test.go
+++ b/src/gnunet/service/dht/routingtable_test.go
@@ -87,7 +87,8 @@ func TestRT(t *testing.T) {
 
        // actions:
        connected := func(task *Entry, e int64, msg string) {
-               rt.Add(task.addr, true)
+               task.addr.connected = true
+               rt.Add(task.addr)
                task.online = true
                task.last = e
                t.Logf("[%6d] %s %s\n", e, task.addr, msg)
diff --git a/src/gnunet/service/gns/module.go b/src/gnunet/service/gns/module.go
index 31ad6c7..4878aa0 100644
--- a/src/gnunet/service/gns/module.go
+++ b/src/gnunet/service/gns/module.go
@@ -103,7 +103,7 @@ func NewModule(ctx context.Context, c *core.Core) (m 
*Module) {
                ModuleImpl: *service.NewModuleImpl(),
        }
        // register as listener for core events
-       listener := m.Run(ctx, m.event, m.Filter())
+       listener := m.Run(ctx, m.event, m.Filter(), 0, nil)
        c.Register("gns", listener)
 
        return
diff --git a/src/gnunet/service/module.go b/src/gnunet/service/module.go
index 98307d6..5f95975 100644
--- a/src/gnunet/service/module.go
+++ b/src/gnunet/service/module.go
@@ -22,6 +22,7 @@ import (
        "context"
        "gnunet/core"
        "net/http"
+       "time"
 )
 
 // Module is an interface for GNUnet service modules (workers).
@@ -36,6 +37,9 @@ type Module interface {
 // EventHandler is a function prototype for event handling
 type EventHandler func(context.Context, *core.Event)
 
+// Heartbeat is a function prototype for periodic tasks
+type Heartbeat func(context.Context)
+
 // ModuleImpl is an event-handling type used by Module implementations.
 type ModuleImpl struct {
        ch chan *core.Event // channel for core events.
@@ -49,9 +53,19 @@ func NewModuleImpl() (m *ModuleImpl) {
 }
 
 // Run event handling loop
-func (m *ModuleImpl) Run(ctx context.Context, hdlr EventHandler, filter 
*core.EventFilter) (listener *core.Listener) {
+func (m *ModuleImpl) Run(
+       ctx context.Context,
+       hdlr EventHandler, filter *core.EventFilter,
+       pulse time.Duration, heartbeat Heartbeat,
+) (listener *core.Listener) {
        // listener for registration
        listener = core.NewListener(m.ch, filter)
+
+       // if no heartbeat handler is defined, set pulse to near flatline.
+       if heartbeat == nil {
+               pulse = 365 * 24 * time.Hour // once a year
+       }
+       tick := time.Tick(pulse)
        // run event loop
        go func() {
                for {
@@ -63,6 +77,13 @@ func (m *ModuleImpl) Run(ctx context.Context, hdlr 
EventHandler, filter *core.Ev
                        // wait for terminate signal
                        case <-ctx.Done():
                                return
+
+                       // handle heartbeat
+                       case <-tick:
+                               // check for defined heartbeat handler
+                               if heartbeat != nil {
+                                       heartbeat(ctx)
+                               }
                        }
                }
        }()
diff --git a/src/gnunet/service/revocation/module.go 
b/src/gnunet/service/revocation/module.go
index 37b57ab..1f0ab48 100644
--- a/src/gnunet/service/revocation/module.go
+++ b/src/gnunet/service/revocation/module.go
@@ -74,7 +74,7 @@ func NewModule(ctx context.Context, c *core.Core) (m *Module) 
{
                return nil
        }
        // register as listener for core events
-       listener := m.Run(ctx, m.event, m.Filter())
+       listener := m.Run(ctx, m.event, m.Filter(), 0, nil)
        c.Register("gns", listener)
        return m
 }
diff --git a/src/gnunet/util/time.go b/src/gnunet/util/time.go
index 70b91e1..53589b8 100644
--- a/src/gnunet/util/time.go
+++ b/src/gnunet/util/time.go
@@ -81,16 +81,28 @@ func (t AbsoluteTime) Add(d time.Duration) AbsoluteTime {
        }
 }
 
+// Elapsed time since 't'. Return 0 if 't' is in the future.
+func (t AbsoluteTime) Elapsed() RelativeTime {
+       dt, elapsed := t.Diff(AbsoluteTimeNow())
+       if !elapsed {
+               dt = NewRelativeTime(0)
+       }
+       return dt
+}
+
 // Diff returns the relative time between two absolute times;
-// the ordering of the absolute times doesn't matter.
-func (t AbsoluteTime) Diff(t2 AbsoluteTime) RelativeTime {
+// returns true if t2 is after t1.
+func (t AbsoluteTime) Diff(t2 AbsoluteTime) (dt RelativeTime, elapsed bool) {
        var d uint64
        if t.Compare(t2) == 1 {
                d = t.Val - t2.Val
+               elapsed = false
        } else {
                d = t2.Val - t.Val
+               elapsed = true
        }
-       return RelativeTime{d}
+       dt = RelativeTime{d}
+       return
 }
 
 // Expired returns true if the timestamp is in the past.
@@ -150,3 +162,14 @@ func (t RelativeTime) String() string {
 func (t RelativeTime) Add(t2 RelativeTime) {
        t.Val += t2.Val
 }
+
+// Compare two durations
+func (t RelativeTime) Compare(t2 RelativeTime) int {
+       switch {
+       case t.Val < t2.Val:
+               return -1
+       case t.Val > t2.Val:
+               return 1
+       }
+       return 0
+}

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]