自动化痛点
异步接口需要定时获取结果【未想到最优解决方式】
运行速度慢
容错性差,维护性差
不能很好的兼容多进程方式运行
多进程下运行session块被重复运行,初始化代码被多次初始化覆盖删除,多个session的文件锁
多进程下api层单例所遇问题
多进程下token的设计
多进程下数据共享问题
解决:
1、运行速度慢,我们可以采用多进程方式下去执行自动化,有pytest提供的常用插件pytest-xsdit 插件,插件用法就是一条命令这里就不过多解释了
采用此插件就能很好的解决多进程吗?答案是肯定是不是的:相信大家都知道session是一个进程是一个会话,那多个进程相比是有多个会话了?我们来验证我们的想法
目录结构为
1 2 3 4 5 6 7 8 9 comm __init__.py testcase test_*.py export allure html conftest.py run.py
conftest.py
pytest-xdist自动分配了四个进程来执行我们的自动化,注:pytest-xdist的auto会根据电脑的内核自动分配进程【明明我的电脑是8核,明天就提刀去联想官方店,希望给我一个完美的解释】
由此可见,我们session级别的conftest被执行多次,说明session确实会存在问题,那要如何解决呢?
看到了pytest-xdist官方解决方案
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import jsonimport pytestfrom filelock import FileLock@pytest.fixture(scope="session" ) def session_data (tmp_path_factory, worker_id ): if worker_id == "master" : return produce_expensive_data() root_tmp_dir = tmp_path_factory.getbasetemp().parent fn = root_tmp_dir / "data.json" with FileLock(str (fn) + ".lock" ): if fn.is_file(): data = json.loads(fn.read_text()) else : data = produce_expensive_data() fn.write_text(json.dumps(data)) return data
看到官方给的解决方案其实是采用FileLock文件锁的方式进行解决,亲测,完全可以解决此问题
2、第二种多进程方案multprocessing Pool
自动化是按照模块进行多进程执行,有多少模块启动多少个进程
为什么要用apply_async呢?因为他是非阻塞异步的, 他不会等待子进程执行完毕, 主进程会继续执行, 他会根据系统调度来进行进程切换
有不懂的吗?
不懂的话自己去敲代码吧,算了解释一下吧,我们来看一个栗子【比较好吃的那种】
首先我们看一下apply()阻塞异步
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import timefrom multiprocessing import Pooldef doIt (num ): print ("Process num is : %s" % num) time.sleep(1 ) print ('process %s end' % num) if __name__ == '__main__' : print ('mainProcess start' ) start_time = time.time() pool = Pool(3 ) print ('Child start' ) for i in range (3 ): pool.apply(doIt, [i]) print ('mainProcess done time:%s s' % (time.time() - start_time))
执行结果 我们可以看到, 主进程开始执行之后, 创建的三个子进程也随即开始执行, 主进程被阻塞, 这里跟上一篇文章介绍的join()
很类似, 而且接下来三个子进程是一个接一个按顺序地执行, 等到子进程全部执行完毕之后, 主进程就会继续执行, 打印出最后一句,
接下来我们看看apply_asyn()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import timefrom multiprocessing import Pooldef doIt (num ): print ("Process num is : %s" % num) time.sleep(1 ) print ('process %s end' % num) if __name__ == '__main__' : print ('mainProcess start' ) start_time = time.time() pool = Pool(3 ) print ('Child start' ) for i in range (3 ): pool.apply_async(doIt, [i]) print ('mainProcess done time:%s s' % (time.time() - start_time))
OMG发生了什么,怎么之有一个主进程执行了?我的子进程呢?
我们来看看运行结果 , 可以看出来, 截图的第一句是上一个程序的执行消耗时间, 最后一句是使用apply_async() 所消耗的时间, 在这里, 主进程没有被阻塞, 验证了他是非阻塞的, 子进程没有执行, 验证了他是根据系统调度完成的, 为什么会这样呢? 原因是, 进程的切换时操作系统控制的, 我们首先运行的是主进程, 而CPU运行得又很快, 快到还没等系统调度到子线程, 主线程就已经运行完毕了, 并且退出程序. 所以子进程就没有运行了.
那我 调用了apply_async() 是不是就不能运行子进程了吗? 肯定可以呀!!小老弟,想啥呢??
还记得**join()**告诉主进程老子要运行子进程就好呀, 还记得 join()的作用吗, 他可以阻塞主进程, 等待所有子进程结束之后再运行, 我们在
1 print('mainProcess done time:%s s' % (time.time() - start_time))
执行之前加入两句代码
1 2 pool.close() pool.join()
我们先要 close()一下子进程, 而且要先在 join() 前运行它,不逼逼赖赖,上代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import timefrom multiprocessing import Pooldef doIt (num ): print ("Process num is : %s" % num) time.sleep(1 ) print ('process %s end' % num) if __name__ == '__main__' : print ('mainProcess start' ) start_time = time.time() pool = Pool(3 ) print ('Child start' ) for i in range (3 ): pool.apply_async(doIt, [i]) pool.close() pool.join() print ('mainProcess done time:%s s' % (time.time() - start_time))
这是什么速度,这也太快了,比**apply()**快了六倍?
我们看看加入这两句的运行结果, 首先我们可以看到即使是使用了非阻塞主进程的apply_async() 也能让子进程运行了, 在这里子进程按顺序交替运行了, CPU在执行第一个子进程的时候, 还没等第一个子进程结束, 系统调度到了按顺序调度到了第二个子进程, 以此类推, 一直调度运行子进程, 一个接一个地结束子进程的运行, 最后运行主进程, 而且我们可以看到使用apply_async()的执行效力会更高, 你看一下他们各自执行结果最后一句的执行消耗时间就知道了, 这也是官方推荐我们使用apply_async()的主要原因吧
还有一个非常重要的原因win上multprocessing和linux下的multprocessing的区别,相当于说为什么windows下需要freeze_support() Windows下面的multiprocessing跟Linux下面略有不同,Linux下面的multiprocessing基于fork,fork之后所有的本地变量都复制一份,因此可以使用任意的全局变量; 在Windows下面,多进程是通过启动新进程完成的,所有的全局变量都是重新初始化的,在运行过程中动态生成、修改过的全局变量是不能使用的。
multiprocessing内部使用pickling传递map的参数到不同的进程,当传递一个函数或类时,pickling将函数或者类用所在模块+函数/类名的方式表示, 如果对端的Python进程无法在对应的模块中找到相应的函数或者类,就会出错。 当你在Interactive Console当中创建函数的时候,这个函数是动态添加到__main__模块中的,在重新启动的新进程当中不存在,所以会出错。 当不在Console中,而是在独立Python文件中运行时,你会遇到另一个问题:由于你下面调用multiprocessing的代码没有保护, 在新进程加载这个模块的时候会重新执行这段代码,创建出新的multiprocessing池,无限调用下去。
好了扯了一大堆没有用的言归正传 那我们自己实现的多进程是不是也会有conftest的session问题呢?答案是肯定的,当然有了怎么会没有,自己去按照思路去实现一下吧
那既然官方解释是说通过FileLock来实现,那我们是不是也可以自己实现一个FileLock呢?最笨方法的那种,话不多说,说干就干
conftest.py修改为
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import pytestfrom loguru import loggerdef log (): logger.add("aa.log" , encoding="UTF-8" ) logger.info("我是一个fixture,session级别的" ) @pytest.fixture(scope="session" ) def fixture_print (request ): import inspect call_name = inspect.stack()[1 ].function with open ("FileLock.lock" , "r+" , encoding="UTF-8" ) as f: if call_name in f.read(): return False else : f.write(call_name) request.addfinalizer(log) return True
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 import jsonimport osimport pytestfrom random import randomfrom filelock import FileLock@pytest.fixture(scope="session" ) def test (tmp_path_factory, worker_id ): if worker_id == "master" : """ 【自定义代码块】 这里就写你要本身应该要做的操作,比如:登录请求、新增数据、清空数据库历史数据等等 """ token = str (random()) print ("fixture:请求登录接口,获取token" , token) os.environ['token' ] = token return token root_tmp_dir = tmp_path_factory.getbasetemp().parent fn = root_tmp_dir / "data.json" with FileLock(str (fn) + ".lock" ): if fn.is_file(): token = json.loads(fn.read_text()) print (f"读取缓存文件,token 是{token} " ) else : """ 【自定义代码块】 跟上面 if 的代码块一样就行 """ token = str (random()) print ("fixture:请求登录接口,获取token" , token) fn.write_text(json.dumps(token)) print (f"首次执行,token 是{token} " ) os.environ['token' ] = token return token
main.py修改为:
结果:意外的惊喜,session只运行了一次
注:我们只需要限制pytest的hook函数执行conftest的session,千万不能限制自定义的函数去调用session,否在会出现异常
session问题二:如果我有多个session呢?你看这代码,得到pytest的hook都是call_fixture_func啊这要怎么搞? call_fixture_limit.py限制一下就好了啊,注:采用a+方式去写入文件的时候,需要调整指针
1 2 3 4 5 6 7 8 9 10 11 12 13 def call_limit (): """限制多进程下fixture重复调用""" import inspect call_name = inspect.stack()[1 ].function with open ("FileLock.lock" , "a+" , encoding="UTF-8" ) as f: f.seek(0 ) CALL_LIST = f.read().split("," ) if call_name in CALL_LIST: return False else : f.seek(2 ) f.write(f"{call_name} ," ) return True
问题三:多进程下执行失败会出现Cache问题,如何解决呢?不要慌,不是在报错就是在报错的路上 1 2 3 4 def run (index ): import time pytest.main(["-v" , f"{index} " , "--cache-clear" , "--alluredir=reports/allure" ]) time.sleep(1 )
官方提供了专门的清除缓存的命令,是不是很银杏,但是用了这个命令又有问题了,有人就问了,这会有什么问题,怎么那么多问题,是的不要慌
采用了–cache-clear后呢会出现–lf命令的问题,因为lf失败重跑他是在缓存中去收集的,现在你把我缓存清掉了我怎么跑是吧,就很头疼了,所以呢可以提供一种解决思路,那就是手动去控制是否需要清除,如果不重跑呢?就清除,如果重跑呢?就不清除了,当然了有更好的解决方案可以提供一下给我
问题四:多进程下多个session级别的fixtures文件之间互相调用问题,都说了不是在报错就是在报错的路上 问题:多个session级别的fixtures下会存在拿不到已经调用的fixtures文件的result,导致了获取不到值直接报错
解决方案:可以采用os.envircon来进行数据共享,记住,os.envircon只能设置字符串,一定要进行解析
问题五:多进程下数据共享问题,会导致数据共享反射无法生效 问题:参数化反射无法生效
解决方案:
可以写入整个系统的环境变量,例如os.environ[key]=value
思路就是重写object对象和metaclass元类的__setattr__魔术方法和__getattribute__方法
话不多说,完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 import osclass ParamsPoolMetaClass (type ): def __new__ (cls, *args, **kwargs ): return super ().__new__(cls, *args, **kwargs) def __getattribute__ (self, item ): try : result = super ().__getattribute__(item) except (AttributeError, RecursionError) as AE: try : result = eval (os.environ.get(item.upper())) except (SyntaxError, TypeError) as SE: result = os.environ.get(item.upper()) return result def __setattr__ (self, key, value ): super ().__setattr__(key, value) os.environ[key] = str (value) class TestParamsPool (metaclass=ParamsPoolMetaClass ): def __getattribute__ (self, item ): try : result = super ().__getattribute__(item) except (AttributeError, RecursionError) as AE: try : result = eval (os.environ.get(item.upper())) except (SyntaxError, TypeError) as SE: result = os.environ.get(item.upper()) return result def __setattr__ (self, key: str , value ): super ().__setattr__(key, value) os.environ[key.upper()] = str (value)
多进程下API层单例问题 疑问:不就是多进程吗?我直接一个multprocessing.Pool()一顿操作就好了,不就是把我的代码给不同得人一起跑吗?还会有那么多花里胡哨的问题?你搁这搁这搁这呢?
如你所愿啊,多进程下单例确实会有问题,虽然是单例,但是在多进程下单例是无法挡住的,会重新申请开辟地址并且返回对象,啊这么神奇
那么如何解决呢?当然是加锁了,铐起来,叉出去,获取了枷锁是不是就跑不了了。【又不是人人都是武松,只是想不想的问题】
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 import threadingdef synchronized (func ): func.__lock__ = threading.Lock() def lock_func (*args, **kwargs ): with func.__lock__: return func(*args, **kwargs) return lock_func class Singleton (object ): isstance = None @synchronized def __new__ (cls, *args, **kwargs ): if cls.isstance is None : cls.isstance = super ().__new__(cls) return cls.isstance def printf (self ): print (id (self)) if __name__ == '__main__' : from multiprocessing import Pool p = Pool(4 ) for i in range (4 ): p.apply_async(Singleton().printf) p.close() p.join()
运行结果:发现id都一样,是不是就实现了单例,这玩意竟然那么神奇
3、多进程下Token的设计 还是可以基于单例的思想进行设计Token,Token类去继承单例类,实例化一个Token对象,这样在整个过程采用的Token都是相同不会出现Token问题
4、用例执行过慢,维护性差 疑问:为什么要选择多进程,为什么不选择多线程呢?是不是大家都有这种疑问 其实多进程和多线程都可以,问题是,多线程的bug无法解决啊,所以曲线救国,那就用高资源来换取bug吧