自动化痛点

  • 异步接口需要定时获取结果【未想到最优解决方式】
  • 运行速度慢
  • 容错性差,维护性差
  • 不能很好的兼容多进程方式运行
  • 多进程下运行session块被重复运行,初始化代码被多次初始化覆盖删除,多个session的文件锁
  • 多进程下api层单例所遇问题
  • 多进程下token的设计
  • 多进程下数据共享问题

解决:

1、运行速度慢,我们可以采用多进程方式下去执行自动化,有pytest提供的常用插件pytest-xsdit 插件,插件用法就是一条命令这里就不过多解释了

1
pytest -n 2 dir

采用此插件就能很好的解决多进程吗?答案是肯定是不是的:相信大家都知道session是一个进程是一个会话,那多个进程相比是有多个会话了?我们来验证我们的想法

目录结构为

1
2
3
4
5
6
7
8
9
comm
__init__.py
testcase
test_*.py
export
allure
html
conftest.py
run.py

conftest.py

image-20210927143425947

pytest-xdist自动分配了四个进程来执行我们的自动化,注:pytest-xdist的auto会根据电脑的内核自动分配进程【明明我的电脑是8核,明天就提刀去联想官方店,希望给我一个完美的解释】

image-20210927143220236

由此可见,我们session级别的conftest被执行多次,说明session确实会存在问题,那要如何解决呢?

image-20210927144512429

看到了pytest-xdist官方解决方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import json

import pytest
from filelock import FileLock


@pytest.fixture(scope="session")
def session_data(tmp_path_factory, worker_id):
if worker_id == "master":
# not executing in with multiple workers, just produce the data and let
# pytest's fixture caching do its job
return produce_expensive_data()

# get the temp directory shared by all workers
root_tmp_dir = tmp_path_factory.getbasetemp().parent

fn = root_tmp_dir / "data.json"
with FileLock(str(fn) + ".lock"):
if fn.is_file():
data = json.loads(fn.read_text())
else:
data = produce_expensive_data()
fn.write_text(json.dumps(data))
return data

看到官方给的解决方案其实是采用FileLock文件锁的方式进行解决,亲测,完全可以解决此问题

2、第二种多进程方案multprocessing Pool

image-20210927145749854

image-20210927145740377

自动化是按照模块进行多进程执行,有多少模块启动多少个进程

为什么要用apply_async呢?因为他是非阻塞异步的, 他不会等待子进程执行完毕, 主进程会继续执行, 他会根据系统调度来进行进程切换

有不懂的吗?

不懂的话自己去敲代码吧,算了解释一下吧,我们来看一个栗子【比较好吃的那种】

首先我们看一下apply()阻塞异步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import time
from multiprocessing import Pool


def doIt(num):
print("Process num is : %s" % num)
time.sleep(1)
print('process %s end' % num)


if __name__ == '__main__':
print('mainProcess start')
# 记录一下开始执行的时间
start_time = time.time()
# 创建三个子进程
pool = Pool(3)
print('Child start')
for i in range(3):
pool.apply(doIt, [i])
print('mainProcess done time:%s s' % (time.time() - start_time))

image-20210927151339788

执行结果 我们可以看到, 主进程开始执行之后, 创建的三个子进程也随即开始执行, 主进程被阻塞, 这里跟上一篇文章介绍的join() 很类似, 而且接下来三个子进程是一个接一个按顺序地执行, 等到子进程全部执行完毕之后, 主进程就会继续执行, 打印出最后一句,

接下来我们看看apply_asyn()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import time
from multiprocessing import Pool


def doIt(num):
print("Process num is : %s" % num)
time.sleep(1)
print('process %s end' % num)


if __name__ == '__main__':
print('mainProcess start')
# 记录一下开始执行的时间
start_time = time.time()
# 创建三个子进程
pool = Pool(3)
print('Child start')
for i in range(3):
pool.apply_async(doIt, [i])
print('mainProcess done time:%s s' % (time.time() - start_time))

image-20210927151549546

OMG发生了什么,怎么之有一个主进程执行了?我的子进程呢?

我们来看看运行结果 , 可以看出来, 截图的第一句是上一个程序的执行消耗时间, 最后一句是使用apply_async() 所消耗的时间, 在这里, 主进程没有被阻塞, 验证了他是非阻塞的, 子进程没有执行, 验证了他是根据系统调度完成的, 为什么会这样呢?
原因是, 进程的切换时操作系统控制的, 我们首先运行的是主进程, 而CPU运行得又很快, 快到还没等系统调度到子线程, 主线程就已经运行完毕了, 并且退出程序. 所以子进程就没有运行了.

那我 调用了apply_async() 是不是就不能运行子进程了吗? 肯定可以呀!!小老弟,想啥呢??

还记得**join()**告诉主进程老子要运行子进程就好呀, 还记得 join()的作用吗, 他可以阻塞主进程, 等待所有子进程结束之后再运行, 我们在

1
print('mainProcess done time:%s s' % (time.time() - start_time))

执行之前加入两句代码

1
2
pool.close()
pool.join()

我们先要 close()一下子进程, 而且要先在 join() 前运行它,不逼逼赖赖,上代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import time
from multiprocessing import Pool


def doIt(num):
print("Process num is : %s" % num)
time.sleep(1)
print('process %s end' % num)


if __name__ == '__main__':
print('mainProcess start')
# 记录一下开始执行的时间
start_time = time.time()
# 创建三个子进程
pool = Pool(3)
print('Child start')
for i in range(3):
pool.apply_async(doIt, [i])
pool.close()
pool.join()
print('mainProcess done time:%s s' % (time.time() - start_time))

image-20210927152537833

这是什么速度,这也太快了,比**apply()**快了六倍?

我们看看加入这两句的运行结果, 首先我们可以看到即使是使用了非阻塞主进程的apply_async() 也能让子进程运行了, 在这里子进程按顺序交替运行了,
CPU在执行第一个子进程的时候, 还没等第一个子进程结束, 系统调度到了按顺序调度到了第二个子进程, 以此类推, 一直调度运行子进程, 一个接一个地结束子进程的运行, 最后运行主进程, 而且我们可以看到使用apply_async()的执行效力会更高, 你看一下他们各自执行结果最后一句的执行消耗时间就知道了, 这也是官方推荐我们使用apply_async()的主要原因吧

还有一个非常重要的原因win上multprocessing和linux下的multprocessing的区别,相当于说为什么windows下需要freeze_support()

Windows下面的multiprocessing跟Linux下面略有不同,Linux下面的multiprocessing基于fork,fork之后所有的本地变量都复制一份,因此可以使用任意的全局变量;
在Windows下面,多进程是通过启动新进程完成的,所有的全局变量都是重新初始化的,在运行过程中动态生成、修改过的全局变量是不能使用的。

multiprocessing内部使用pickling传递map的参数到不同的进程,当传递一个函数或类时,pickling将函数或者类用所在模块+函数/类名的方式表示,
如果对端的Python进程无法在对应的模块中找到相应的函数或者类,就会出错。
当你在Interactive Console当中创建函数的时候,这个函数是动态添加到__main__模块中的,在重新启动的新进程当中不存在,所以会出错。
当不在Console中,而是在独立Python文件中运行时,你会遇到另一个问题:由于你下面调用multiprocessing的代码没有保护,
在新进程加载这个模块的时候会重新执行这段代码,创建出新的multiprocessing池,无限调用下去。

image-20210927153035952

好了扯了一大堆没有用的言归正传

那我们自己实现的多进程是不是也会有conftest的session问题呢?答案是肯定的,当然有了怎么会没有,自己去按照思路去实现一下吧

那既然官方解释是说通过FileLock来实现,那我们是不是也可以自己实现一个FileLock呢?最笨方法的那种,话不多说,说干就干

conftest.py修改为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import pytest
from loguru import logger

def log():
logger.add("aa.log", encoding="UTF-8")
logger.info("我是一个fixture,session级别的")

@pytest.fixture(scope="session")
def fixture_print(request):
import inspect
call_name = inspect.stack()[1].function
with open("FileLock.lock", "r+", encoding="UTF-8") as f:
if call_name in f.read():
return False
else:
f.write(call_name)
request.addfinalizer(log)
# log()
return True
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import json
import os
import pytest
from random import random
from filelock import FileLock

@pytest.fixture(scope="session")
def test(tmp_path_factory, worker_id):
# 如果是单机运行 则运行这里的代码块【不可删除、修改】
if worker_id == "master":
"""
【自定义代码块】
这里就写你要本身应该要做的操作,比如:登录请求、新增数据、清空数据库历史数据等等
"""
token = str(random())
print("fixture:请求登录接口,获取token", token)
os.environ['token'] = token

# 如果测试用例有需要,可以返回对应的数据,比如 token
return token

# 如果是分布式运行
# 获取所有子节点共享的临时目录,无需修改【不可删除、修改】
root_tmp_dir = tmp_path_factory.getbasetemp().parent
# 【不可删除、修改】
fn = root_tmp_dir / "data.json"
# 【不可删除、修改】
with FileLock(str(fn) + ".lock"):
# 【不可删除、修改】
if fn.is_file():
# 缓存文件中读取数据,像登录操作的话就是 token 【不可删除、修改】
token = json.loads(fn.read_text())
print(f"读取缓存文件,token 是{token} ")
else:
"""
【自定义代码块】
跟上面 if 的代码块一样就行
"""
token = str(random())
print("fixture:请求登录接口,获取token", token)
# 【不可删除、修改】
fn.write_text(json.dumps(token))
print(f"首次执行,token 是{token} ")

# 最好将后续需要保留的数据存在某个地方,比如这里是 os 的环境变量
os.environ['token'] = token
return token

main.py修改为:

image-20210927154308215

结果:意外的惊喜,session只运行了一次

image-20210927154802299

注:我们只需要限制pytest的hook函数执行conftest的session,千万不能限制自定义的函数去调用session,否在会出现异常

session问题二:如果我有多个session呢?你看这代码,得到pytest的hook都是call_fixture_func啊这要怎么搞?

call_fixture_limit.py限制一下就好了啊,注:采用a+方式去写入文件的时候,需要调整指针

1
2
3
4
5
6
7
8
9
10
11
12
13
def call_limit():
"""限制多进程下fixture重复调用"""
import inspect
call_name = inspect.stack()[1].function
with open("FileLock.lock", "a+", encoding="UTF-8") as f:
f.seek(0)
CALL_LIST = f.read().split(",")
if call_name in CALL_LIST:
return False
else:
f.seek(2)
f.write(f"{call_name},")
return True

image-20210927173555822

问题三:多进程下执行失败会出现Cache问题,如何解决呢?不要慌,不是在报错就是在报错的路上

1
2
3
4
def run(index):
import time
pytest.main(["-v", f"{index}", "--cache-clear", "--alluredir=reports/allure"])
time.sleep(1)

官方提供了专门的清除缓存的命令,是不是很银杏,但是用了这个命令又有问题了,有人就问了,这会有什么问题,怎么那么多问题,是的不要慌

采用了–cache-clear后呢会出现–lf命令的问题,因为lf失败重跑他是在缓存中去收集的,现在你把我缓存清掉了我怎么跑是吧,就很头疼了,所以呢可以提供一种解决思路,那就是手动去控制是否需要清除,如果不重跑呢?就清除,如果重跑呢?就不清除了,当然了有更好的解决方案可以提供一下给我

问题四:多进程下多个session级别的fixtures文件之间互相调用问题,都说了不是在报错就是在报错的路上

问题:多个session级别的fixtures下会存在拿不到已经调用的fixtures文件的result,导致了获取不到值直接报错

解决方案:可以采用os.envircon来进行数据共享,记住,os.envircon只能设置字符串,一定要进行解析

问题五:多进程下数据共享问题,会导致数据共享反射无法生效

问题:参数化反射无法生效

解决方案:

可以写入整个系统的环境变量,例如os.environ[key]=value

思路就是重写object对象和metaclass元类的__setattr__魔术方法和__getattribute__方法

话不多说,完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import os


class ParamsPoolMetaClass(type):

def __new__(cls, *args, **kwargs):
return super().__new__(cls, *args, **kwargs)

def __getattribute__(self, item):
try:
result = super().__getattribute__(item)
except (AttributeError, RecursionError) as AE:
try:
result = eval(os.environ.get(item.upper()))
except (SyntaxError, TypeError) as SE:
result = os.environ.get(item.upper())

return result

def __setattr__(self, key, value):
super().__setattr__(key, value)
os.environ[key] = str(value)


class TestParamsPool(metaclass=ParamsPoolMetaClass):

def __getattribute__(self, item):
try:
result = super().__getattribute__(item)
except (AttributeError, RecursionError) as AE:
try:
result = eval(os.environ.get(item.upper()))
except (SyntaxError, TypeError) as SE:
result = os.environ.get(item.upper())
return result

def __setattr__(self, key: str, value):
super().__setattr__(key, value)
os.environ[key.upper()] = str(value)

多进程下API层单例问题

疑问:不就是多进程吗?我直接一个multprocessing.Pool()一顿操作就好了,不就是把我的代码给不同得人一起跑吗?还会有那么多花里胡哨的问题?你搁这搁这搁这呢?

如你所愿啊,多进程下单例确实会有问题,虽然是单例,但是在多进程下单例是无法挡住的,会重新申请开辟地址并且返回对象,啊这么神奇

那么如何解决呢?当然是加锁了,铐起来,叉出去,获取了枷锁是不是就跑不了了。【又不是人人都是武松,只是想不想的问题】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import threading


def synchronized(func):
func.__lock__ = threading.Lock()

def lock_func(*args, **kwargs):
with func.__lock__:
return func(*args, **kwargs)

return lock_func


class Singleton(object):
isstance = None

@synchronized
def __new__(cls, *args, **kwargs):
if cls.isstance is None:
cls.isstance = super().__new__(cls)
return cls.isstance
def printf(self):
print(id(self))

if __name__ == '__main__':
from multiprocessing import Pool

p = Pool(4)
for i in range(4):
p.apply_async(Singleton().printf)
p.close()
p.join()

运行结果:发现id都一样,是不是就实现了单例,这玩意竟然那么神奇

image-20210927161824853

3、多进程下Token的设计

还是可以基于单例的思想进行设计Token,Token类去继承单例类,实例化一个Token对象,这样在整个过程采用的Token都是相同不会出现Token问题

4、用例执行过慢,维护性差

疑问:为什么要选择多进程,为什么不选择多线程呢?是不是大家都有这种疑问

其实多进程和多线程都可以,问题是,多线程的bug无法解决啊,所以曲线救国,那就用高资源来换取bug吧