gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r18219 - in gnunet-update: gnunet_update test


From: gnunet
Subject: [GNUnet-SVN] r18219 - in gnunet-update: gnunet_update test
Date: Sat, 19 Nov 2011 19:52:25 +0100

Author: harsha
Date: 2011-11-19 19:52:25 +0100 (Sat, 19 Nov 2011)
New Revision: 18219

Modified:
   gnunet-update/gnunet_update/install.py
   gnunet-update/gnunet_update/package.py
   gnunet-update/gnunet_update/util.py
   gnunet-update/test/test_util.py
Log:
install time metadata signature validation and install manifest

Modified: gnunet-update/gnunet_update/install.py
===================================================================
--- gnunet-update/gnunet_update/install.py      2011-11-19 14:35:25 UTC (rev 
18218)
+++ gnunet-update/gnunet_update/install.py      2011-11-19 18:52:25 UTC (rev 
18219)
@@ -28,16 +28,19 @@
 import getopt
 import platform
 import subprocess
+import tempfile
+import shutil
 
 import util
 from metadata import Metadata
 from dependency import Dependency
+from config import GnunetUpdateConfig
 
 
 def usage():
     """Print helpful usage information."""    
     print """
-Usage arguments: [options] <metadata_file> <package_file> </install/location>
+Usage arguments: [options] <package_file> </install/location>
 This script tries to install the contents in the package file in the given 
 location.
 
@@ -65,9 +68,9 @@
    search for it while linking
  """)
 
-def get_installed_deps():
+def get_available_libs():
     """Finds from `ldconfig -v' the installed deps."""
-    global installed_deps
+    global available_libs
     proc = subprocess.Popen(["ldconfig", "-p"], stdout=subprocess.PIPE)
     (ldconfig_output, ldconfig_err) = proc.communicate()
     proc.stdout.close()
@@ -91,30 +94,57 @@
             usage()
             sys.exit(2)
     
-    if len(args) != 3:
+    if len(args) != 2:
         print "Incorrect number of arguments"
         usage()
         sys.exit(1)
     
-    metadata = Metadata()
-    metadata.read_from_file(args[0])
-    package_tarfile = tarfile.open(args[1],'r')
-    install_dir = args[2]
-    
+    config = GnunetUpdateConfig() # Configuration
+    package_tarfile = tarfile.open(args[0],'r')
+    install_dir = args[1]
+    # Check for the metadata file and its signature in the given tarfile
     try: 
         metadata_tarinfo = package_tarfile.getmember("metadata.dat")
-    except KeyError, err:
-        print err
-        print "Metadata not found in the given tarfile. Quitting"
+        metadata_sig_tarinfo = package_tarfile.getmember("metadata.dat.asc")
+    except KeyError as no_file:
+        print no_file + " not found in the given tarfile. Quitting"
         package_tarfile.close()
         sys.exit(2)
-    
+
+    # Temporary directory for package extraction
+    temp_dir = tempfile.mkdtemp()
+    package_tarfile.extract(metadata_tarinfo, temp_dir)
+    package_tarfile.extract(metadata_sig_tarinfo, temp_dir)
+
+
+    # Verify metadata signature
+    metadata_fd = open(os.path.join(temp_dir, metadata_tarinfo.name), "rb")
+    metadata_sig_fd = open(os.path.join(temp_dir, metadata_sig_tarinfo.name), 
"rb")
+
+    sig = util.gpg_verify_sign(metadata_fd, 
+                               metadata_sig_fd,
+                               config.get('SECURITY', 'PGP_SIGN_KEY'),
+                               detached=True)
+    metadata_sig_fd.close()
+    metadata_fd.close()
+
+    if sig[0].status is not None:
+        print "Error verifying the signature of metadata: " + sig[0].status[2]
+        shutil.rmtree(temp_dir)
+        package_tarfile.close()
+        sys.exit(2)
+
+    metadata = Metadata()
+    metadata.read_from_file(os.path.join(temp_dir, metadata_tarinfo.name))
+    shutil.rmtree(temp_dir)
+
     #check whether the system and machine architecture match
     host_system = platform.system()
     host_machine = platform.machine()
     
     if metadata.system != host_system or metadata.machine != host_machine:
         print "The given package is not suited for this platform."
+        package_tarfile.close()
         sys.exit(1)
     
     #Platform check is done; now unpack the tarfile into destination directory
@@ -123,27 +153,33 @@
     except OSError:             # Given directory not present 
         os.mkdir(install_dir, 0755)
     
-    installed_deps = get_installed_deps()    # already available dependencies
+    available_libs = get_available_libs()    # already available dependencies
     new_binary_objects = metadata.binary_objects # To be installed objects
-    installed_deps.sort(key=(lambda dep: dep.name))
+    available_libs.sort(key=(lambda dep: dep.name))
 
+    installed_files = list()    # List of files that are installed from package
+
     needed_deps = metadata.dependencies
 
+    # FIXME: Add filtering in config
     dep_filter = ["/lib/ld-linux.so.2"]
     to_be_installed_deps = [(dep) for dep in needed_deps 
-                            if dep not in installed_deps and 
+                            if dep not in available_libs and 
                             dep.name not in dep_filter]
 
     #FIXME: Security warning! Perhaps we should examin the contents of tarfile
     #before extracting
     for member in package_tarfile.getmembers():
-        if ("metadata.dat" == member.name or "install-prefix" == member.name 
or 
-                 member.name.startswith("dependencies/")):
+        if ("metadata.dat" == member.name or 
+            "metadata.dat.asc" == member.name or
+            "install-prefix" == member.name or 
+            member.name.startswith("dependencies/")):
             continue
         elif member.name.startswith("install-prefix"):
             # Remove the `install-prefix' directory
             member.name = member.name.replace("install-prefix/","",1)
         package_tarfile.extract(member, install_dir)
+        installed_files.append(member.name)
     
     # Install the needed dependencies from tarfile
     dep_dir = install_dir + "/lib/gnunet-deps"
@@ -159,18 +195,40 @@
         # Remove the `dependencies/' in member.name
         dep_tarinfo.name = dep_tarinfo.name.replace("dependencies/","",1)
         package_tarfile.extract(dep_tarinfo, "./")
+        installed_files.append(os.path.join(dep_dir, dep_tarinfor.name))
+        # Check the hash of the extracted file
+        if sha512_hexdigest(dep_tarinfo.name) != dep.hash:
+            print (dep_tarinfo.name + 
+                   " not matched with the expected hash " + dep.hash)
+            print "Given package contains code not signed by trusted packager"
+            print "Installation failed due to security reasons."
+            package_tarfile.close()
+            os.chdir(orig_working_dir)
+            shutil.rmtree(install_dir)
+            sys.exit(0)
+        
         # Generate symbolic link from dep.name to dep.realname
         # NOTE: Available only on Unix type systems!
         if os.path.exists(dep.name):
             os.remove(dep.name)
         os.symlink(dep.realname, dep.name)
+        installed_files.append(os.path.join(dep_dir, dep.name))
 
+    package_tarfile.close()
+
     # run ldconfig -n in the dep_dir
     proc = subprocess.Popen(["ldconfig", "-n"])
     proc.wait()
 
     os.chdir(orig_working_dir)
-    package_tarfile.close()
+    if not os.path.exists(os.path.join(install_dir, "share/gnunet-update/")):
+        os.makedirs(os.path.join(install_dir, "share/gnunet-update/"))
+    install_manifest_fd = open(os.path.join(install_dir,
+                                            
"share/gnunet-update/install-manifest"),
+                               "wb") 
+    map((lambda name: install_manifest_fd.write(name + '\n')),
+        installed_files)
+    install_manifest_fd.close()
 
     print "Installation Successful!"
     print "GNUNET has been installed at: " + install_dir

Modified: gnunet-update/gnunet_update/package.py
===================================================================
--- gnunet-update/gnunet_update/package.py      2011-11-19 14:35:25 UTC (rev 
18218)
+++ gnunet-update/gnunet_update/package.py      2011-11-19 18:52:25 UTC (rev 
18219)
@@ -206,8 +206,9 @@
     #generate the metadata file and add it to tar
     metadata_file = metadata.write_to_file(package_file + ".meta")
     #generate the metadata file signature
+    metadata_sig_file = metadata_file + ".asc"
     metadata_fd = open(metadata_file, "rb")
-    metadata_sig_fd = open(metadata_file + ".asc", "wb")
+    metadata_sig_fd = open(metadata_sig_file, "wb")
     skey_passphrase = config.get('SECURITY', 'PGP_SIGN_KEY_PASSWORD')
     if skey_passphrase is None:
         # FIXME: Hide the characters while typing password
@@ -223,6 +224,10 @@
     metadata_sig_fd.close()
 
     tar_file.add(metadata_file, "metadata.dat")
+    tar_file.add(metadata_sig_file, "metadata.dat.asc")
+
+    os.remove(metadata_file)
+    os.remove(metadata_sig_file)
     
     print "Here are the dependencies:"
     for dep in dependencies:

Modified: gnunet-update/gnunet_update/util.py
===================================================================
--- gnunet-update/gnunet_update/util.py 2011-11-19 14:35:25 UTC (rev 18218)
+++ gnunet-update/gnunet_update/util.py 2011-11-19 18:52:25 UTC (rev 18219)
@@ -106,3 +106,27 @@
                         gpgme.SIG_MODE_CLEAR if detached is False
                         else gpgme.SIG_MODE_DETACH)
     return new_sigs
+
+def gpg_verify_sign(plain_fd, sign_fd, key_fpr, detached=False):
+    """Verifys whether the signature is valid for the given input.
+    
+    plain_fd: File like object pointing to the original file for which the
+              given signature is made if the signature is detached; in case of
+              undetached signatures it should point to an empty file into
+              which the original content is restored from the signature
+    sign_fd: File like object pointing to the signature
+    key_fpr: The fingerprint of the key that should be used for verifying
+    detached: If detached is false, then the original content is restored from
+              the signature in to the file pointed by plain_fd
+    """
+    ctx = gpgme.Context()
+    ctx.armor = True
+    key = ctx.get_key(key_fpr.replace(' ',''))
+    if detached is False:
+        new_sigs = ctx.verify(sign_fd, None, plain_fd)
+    else:
+        new_sigs = ctx.verify(sign_fd, plain_fd, None)
+
+
+
+    return new_sigs

Modified: gnunet-update/test/test_util.py
===================================================================
--- gnunet-update/test/test_util.py     2011-11-19 14:35:25 UTC (rev 18218)
+++ gnunet-update/test/test_util.py     2011-11-19 18:52:25 UTC (rev 18219)
@@ -150,13 +150,18 @@
         # Now verify the signature
         signature.seek(0)
         plaintext = StringIO()
-        sigs = ctx.verify(signature, None, plaintext)
+        # sigs = ctx.verify(signature, None, plaintext)
+        sigs = util.gpg_verify_sign(plaintext,
+                                    signature,
+                                    key_fpr,
+                                    detached=False)
         self.assertEqual(plaintext.getvalue(), self.sample_test_data)
         self.assertEqual(len(sigs),1)
         self.assertEqual(sigs[0].fpr, key_fpr)
         self.assertEqual(sigs[0].status, None)
         self.assertEqual(sigs[0].wrong_key_usage, False)
-        # Verify detached signature
+
+        # Verifying detached signature
         plaintext.seek(0)
         signature = StringIO()
         sigs = util.gpg_sign_file(plaintext,
@@ -171,11 +176,26 @@
         # Now verify the signature
         signature.seek(0)
         plaintext.seek(0)
-        sigs = ctx.verify(signature, plaintext, None)
+        # sigs = ctx.verify(signature, plaintext, None)
+        sigs = util.gpg_verify_sign(plaintext,
+                                    signature,
+                                    key_fpr,
+                                    detached=True)
         self.assertEqual(len(sigs),1)
         self.assertEqual(sigs[0].fpr, key_fpr)
         self.assertEqual(sigs[0].status, None)
         self.assertEqual(sigs[0].wrong_key_usage, False)
+
+        # verify whether bad signature is caught
+        signature.seek(0)
+        plaintext = StringIO("This text should raise an error")
+        sigs = util.gpg_verify_sign(plaintext,
+                                    signature,
+                                    key_fpr,
+                                    detached=True)
+        self.assertEqual(len(sigs),1)
+        self.assertEqual(sigs[0].status[2], 'Bad signature')
+        self.assertEqual(sigs[0].wrong_key_usage, False)
         shutil.rmtree(temp_gpghome);
 
 if __name__ == '__main__':




reply via email to

[Prev in Thread] Current Thread [Next in Thread]