[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[PATCH v9 01/14] iotests: do a light delinting
From: |
John Snow |
Subject: |
[PATCH v9 01/14] iotests: do a light delinting |
Date: |
Tue, 24 Mar 2020 19:20:50 -0400 |
This doesn't fix everything in here, but it does help clean up the
pylint report considerably.
This should be 100% style changes only; the intent is to make pylint
more useful by working on establishing a baseline for iotests that we
can gate against in the future.
Signed-off-by: John Snow <address@hidden>
Reviewed-by: Philippe Mathieu-Daudé <address@hidden>
Reviewed-by: Max Reitz <address@hidden>
---
tests/qemu-iotests/iotests.py | 83 ++++++++++++++++++-----------------
1 file changed, 43 insertions(+), 40 deletions(-)
diff --git a/tests/qemu-iotests/iotests.py b/tests/qemu-iotests/iotests.py
index 7bc4934cd2..886ae962ae 100644
--- a/tests/qemu-iotests/iotests.py
+++ b/tests/qemu-iotests/iotests.py
@@ -16,11 +16,9 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
-import errno
import os
import re
import subprocess
-import string
import unittest
import sys
import struct
@@ -35,7 +33,7 @@
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
from qemu import qtest
-assert sys.version_info >= (3,6)
+assert sys.version_info >= (3, 6)
faulthandler.enable()
@@ -141,11 +139,11 @@ def qemu_img_log(*args):
return result
def img_info_log(filename, filter_path=None, imgopts=False, extra_args=[]):
- args = [ 'info' ]
+ args = ['info']
if imgopts:
args.append('--image-opts')
else:
- args += [ '-f', imgfmt ]
+ args += ['-f', imgfmt]
args += extra_args
args.append(filename)
@@ -224,7 +222,7 @@ def cmd(self, cmd):
# quit command is in close(), '\n' is added automatically
assert '\n' not in cmd
cmd = cmd.strip()
- assert cmd != 'q' and cmd != 'quit'
+ assert cmd not in ('q', 'quit')
self._p.stdin.write(cmd + '\n')
self._p.stdin.flush()
return self._read_output()
@@ -246,10 +244,8 @@ def qemu_nbd_early_pipe(*args):
sys.stderr.write('qemu-nbd received signal %i: %s\n' %
(-exitcode,
' '.join(qemu_nbd_args + ['--fork'] + list(args))))
- if exitcode == 0:
- return exitcode, ''
- else:
- return exitcode, subp.communicate()[0]
+
+ return exitcode, subp.communicate()[0] if exitcode else ''
def qemu_nbd_popen(*args):
'''Run qemu-nbd in daemon mode and return the parent's exit code'''
@@ -313,7 +309,7 @@ def filter_qmp(qmsg, filter_fn):
items = qmsg.items()
for k, v in items:
- if isinstance(v, list) or isinstance(v, dict):
+ if isinstance(v, (dict, list)):
qmsg[k] = filter_qmp(v, filter_fn)
else:
qmsg[k] = filter_fn(k, v)
@@ -324,7 +320,7 @@ def filter_testfiles(msg):
return msg.replace(prefix, 'TEST_DIR/PID-')
def filter_qmp_testfiles(qmsg):
- def _filter(key, value):
+ def _filter(_key, value):
if is_str(value):
return filter_testfiles(value)
return value
@@ -350,7 +346,7 @@ def filter_imgfmt(msg):
return msg.replace(imgfmt, 'IMGFMT')
def filter_qmp_imgfmt(qmsg):
- def _filter(key, value):
+ def _filter(_key, value):
if is_str(value):
return filter_imgfmt(value)
return value
@@ -361,7 +357,7 @@ def log(msg, filters=[], indent=None):
If indent is provided, JSON serializable messages are pretty-printed.'''
for flt in filters:
msg = flt(msg)
- if isinstance(msg, dict) or isinstance(msg, list):
+ if isinstance(msg, (dict, list)):
# Python < 3.4 needs to know not to add whitespace when
pretty-printing:
separators = (', ', ': ') if indent is None else (',', ': ')
# Don't sort if it's already sorted
@@ -372,14 +368,14 @@ def log(msg, filters=[], indent=None):
print(msg)
class Timeout:
- def __init__(self, seconds, errmsg = "Timeout"):
+ def __init__(self, seconds, errmsg="Timeout"):
self.seconds = seconds
self.errmsg = errmsg
def __enter__(self):
signal.signal(signal.SIGALRM, self.timeout)
signal.setitimer(signal.ITIMER_REAL, self.seconds)
return self
- def __exit__(self, type, value, traceback):
+ def __exit__(self, exc_type, value, traceback):
signal.setitimer(signal.ITIMER_REAL, 0)
return False
def timeout(self, signum, frame):
@@ -388,7 +384,7 @@ def timeout(self, signum, frame):
def file_pattern(name):
return "{0}-{1}".format(os.getpid(), name)
-class FilePaths(object):
+class FilePaths:
"""
FilePaths is an auto-generated filename that cleans itself up.
@@ -535,11 +531,11 @@ def pause_drive(self, drive, event=None):
self.pause_drive(drive, "write_aio")
return
self.qmp('human-monitor-command',
- command_line='qemu-io %s "break %s bp_%s"' % (drive,
event, drive))
+ command_line='qemu-io %s "break %s bp_%s"' % (drive, event,
drive))
def resume_drive(self, drive):
self.qmp('human-monitor-command',
- command_line='qemu-io %s "remove_break bp_%s"' % (drive,
drive))
+ command_line='qemu-io %s "remove_break bp_%s"' % (drive,
drive))
def hmp_qemu_io(self, drive, cmd):
'''Write to a given drive using an HMP command'''
@@ -550,8 +546,8 @@ def flatten_qmp_object(self, obj, output=None, basestr=''):
if output is None:
output = dict()
if isinstance(obj, list):
- for i in range(len(obj)):
- self.flatten_qmp_object(obj[i], output, basestr + str(i) + '.')
+ for i, atom in enumerate(obj):
+ self.flatten_qmp_object(atom, output, basestr + str(i) + '.')
elif isinstance(obj, dict):
for key in obj:
self.flatten_qmp_object(obj[key], output, basestr + key + '.')
@@ -709,9 +705,7 @@ def get_bitmap(self, node_name, bitmap_name,
recording=None, bitmaps=None):
for bitmap in bitmaps[node_name]:
if bitmap.get('name', '') == bitmap_name:
- if recording is None:
- return bitmap
- elif bitmap.get('recording') == recording:
+ if recording is None or bitmap.get('recording') == recording:
return bitmap
return None
@@ -762,12 +756,13 @@ def assert_block_path(self, root, path, expected_node,
graph=None):
assert node is not None, 'Cannot follow path %s%s' % (root, path)
try:
- node_id = next(edge['child'] for edge in graph['edges'] \
- if edge['parent'] == node['id']
and
- edge['name'] == child_name)
+ node_id = next(edge['child'] for edge in graph['edges']
+ if (edge['parent'] == node['id'] and
+ edge['name'] == child_name))
+
+ node = next(node for node in graph['nodes']
+ if node['id'] == node_id)
- node = next(node for node in graph['nodes'] \
- if node['id'] == node_id)
except StopIteration:
node = None
@@ -785,6 +780,12 @@ def assert_block_path(self, root, path, expected_node,
graph=None):
class QMPTestCase(unittest.TestCase):
'''Abstract base class for QMP test cases'''
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ # Many users of this class set a VM property we rely on heavily
+ # in the methods below.
+ self.vm = None
+
def dictpath(self, d, path):
'''Traverse a path in a nested dict'''
for component in path.split('/'):
@@ -830,7 +831,7 @@ def assert_qmp(self, d, path, value):
else:
self.assertEqual(result, value,
'"%s" is "%s", expected "%s"'
- % (path, str(result), str(value)))
+ % (path, str(result), str(value)))
def assert_no_active_block_jobs(self):
result = self.vm.qmp('query-block-jobs')
@@ -840,15 +841,15 @@ def assert_has_block_node(self, node_name=None,
file_name=None):
"""Issue a query-named-block-nodes and assert node_name and/or
file_name is present in the result"""
def check_equal_or_none(a, b):
- return a == None or b == None or a == b
+ return a is None or b is None or a == b
assert node_name or file_name
result = self.vm.qmp('query-named-block-nodes')
for x in result["return"]:
if check_equal_or_none(x.get("node-name"), node_name) and \
check_equal_or_none(x.get("file"), file_name):
return
- self.assertTrue(False, "Cannot find %s %s in result:\n%s" % \
- (node_name, file_name, result))
+ self.fail("Cannot find %s %s in result:\n%s" %
+ (node_name, file_name, result))
def assert_json_filename_equal(self, json_filename, reference):
'''Asserts that the given filename is a json: filename and that its
@@ -897,13 +898,13 @@ def wait_until_completed(self, drive='drive0',
check_offset=True, wait=60.0,
self.assert_qmp(event, 'data/error', error)
self.assert_no_active_block_jobs()
return event
- elif event['event'] == 'JOB_STATUS_CHANGE':
+ if event['event'] == 'JOB_STATUS_CHANGE':
self.assert_qmp(event, 'data/id', drive)
def wait_ready(self, drive='drive0'):
- '''Wait until a block job BLOCK_JOB_READY event'''
- f = {'data': {'type': 'mirror', 'device': drive } }
- event = self.vm.event_wait(name='BLOCK_JOB_READY', match=f)
+ """Wait until a BLOCK_JOB_READY event, and return the event."""
+ f = {'data': {'type': 'mirror', 'device': drive}}
+ return self.vm.event_wait(name='BLOCK_JOB_READY', match=f)
def wait_ready_and_cancel(self, drive='drive0'):
self.wait_ready(drive=drive)
@@ -932,7 +933,7 @@ def pause_wait(self, job_id='job0'):
for job in result['return']:
if job['device'] == job_id:
found = True
- if job['paused'] == True and job['busy'] == False:
+ if job['paused'] and not job['busy']:
return job
break
assert found
@@ -1029,8 +1030,8 @@ def qemu_pipe(*args):
universal_newlines=True)
exitcode = subp.wait()
if exitcode < 0:
- sys.stderr.write('qemu received signal %i: %s\n' % (-exitcode,
- ' '.join(args)))
+ sys.stderr.write('qemu received signal %i: %s\n' %
+ (-exitcode, ' '.join(args)))
return subp.communicate()[0]
def supported_formats(read_only=False):
@@ -1062,6 +1063,7 @@ def func_wrapper(test_case: QMPTestCase, *args, **kwargs):
if usf_list:
test_case.case_skip('{}: formats {} are not
whitelisted'.format(
test_case, usf_list))
+ return None
else:
return func(test_case, *args, **kwargs)
return func_wrapper
@@ -1073,6 +1075,7 @@ def skip_if_user_is_root(func):
def func_wrapper(*args, **kwargs):
if os.getuid() == 0:
case_notrun('{}: cannot be run as root'.format(args[0]))
+ return None
else:
return func(*args, **kwargs)
return func_wrapper
--
2.21.1