multiprocessing.Lock
使用操作系统提供的信号量对象实现。在Linux上,子级只是通过继承了父级的信号量句柄os.fork
。这不是信号量的副本。它实际上继承了父级具有的相同句柄,可以继承文件描述符的相同方式。另一方面,Windows不支持os.fork
,因此必须使Windows处于“腌制”状态Lock
。它multiprocessing.Lock
使用WindowsDuplicateHandle
API通过创建对象内部使用的Windows信号灯的重复句柄来实现此目的,该API指出:
重复的句柄引用与原始句柄相同的对象。因此,对对象的任何更改都会通过两个手柄反映出来
该DuplicateHandle
API允许您将重复句柄的所有权交给子进程,以便子进程在取消选择之后实际上可以使用它。通过创建由孩子拥有的重复句柄,您可以有效地“共享”锁对象。
这是中的信号量对象 multiprocessing/synchronize.py
class SemLock(object):
def __init__(self, kind, value, maxvalue):
sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
debug('created semlock with handle %s' % sl.handle)
self._make_methods()
if sys.platform != 'win32':
def _after_fork(obj):
obj._semlock._after_fork()
register_after_fork(self, _after_fork)
def _make_methods(self):
self.acquire = self._semlock.acquire
self.release = self._semlock.release
self.__enter__ = self._semlock.__enter__
self.__exit__ = self._semlock.__exit__
def __getstate__(self): # This is called when you try to pickle the `Lock`.
assert_spawning(self)
sl = self._semlock
return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue)
def __setstate__(self, state): # This is called when unpickling a `Lock`
self._semlock = _multiprocessing.SemLock._rebuild(*state)
debug('recreated blocker with handle %r' % state[0])
self._make_methods()
请注意assert_spawning
in中的调用__getstate__
,该调用在腌制对象时被调用。这是如何实现的:
#
# Check that the current thread is spawning a child process
#
def assert_spawning(self):
if not Popen.thread_is_spawning():
raise RuntimeError(
'%s objects should only be shared between processes'
' through inheritance' % type(self).__name__
)
该函数可以确保您Lock
通过调用来“继承” the函数thread_is_spawning
。在Linux上,该方法仅返回False
:
@staticmethod
def thread_is_spawning():
return False
这是因为Linux不需要腌制即可继承Lock
,因此,如果__getstate__
实际上是在Linux上调用它,则我们一定不能继承。在Windows上,还有更多操作:
def dump(obj, file, protocol=None):
ForkingPickler(file, protocol).dump(obj)
class Popen(object):
'''
Start a subprocess to run the code of a process object
'''
_tls = thread._local()
def __init__(self, process_obj):
...
# send information to child
prep_data = get_preparation_data(process_obj._name)
to_child = os.fdopen(wfd, 'wb')
Popen._tls.process_handle = int(hp)
try:
dump(prep_data, to_child, HIGHEST_PROTOCOL)
dump(process_obj, to_child, HIGHEST_PROTOCOL)
finally:
del Popen._tls.process_handle
to_child.close()
@staticmethod
def thread_is_spawning():
return getattr(Popen._tls, 'process_handle', None) is not None
在这里,thread_is_spawning
返回True
如果Popen._tls
对象具有process_handle
的属性。我们可以看到process_handle
在中创建了属性__init__
,然后使用将要继承的数据从父级传递给子级dump
,然后删除了该属性。所以thread_is_spawning
只会在True
期间__init__
。根据这个python- ideas邮件列表线程,实际上这是一个人为限制,它被添加来模拟与os.fork
Linux上相同的行为。Windows实际上 可以 支持Lock
随时通过传递,因为DuplicateHandle
可以随时运行。
以上所有内容均适用于该Queue
对象,因为它在Lock
内部使用。
我想说,继承Lock
对象比使用a更可取Manager.Lock()
,因为当使用a时Manager.Lock
,Lock
必须通过IPC将每个对的调用都通过IPC发送到Manager
进程,这比使用Lock
调用内部的共享要慢得多。处理。不过,这两种方法都是完全有效的。
最后,可以使用/关键字参数将a传递给aLock
的所有成员,Pool
而无需使用a :Manager``initializer``initargs
lock = None
def initialize_lock(l):
global lock
lock = l
def scenario_1_pool_no_manager(jobfunc, args, ncores):
"""Runs a pool of processes WITHOUT a Manager for the lock and queue.
"""
lock = mp.Lock()
mypool = mp.Pool(ncores, initializer=initialize_lock, initargs=(lock,))
queue = mp.Queue()
iterator = make_iterator(args, queue)
mypool.imap(jobfunc, iterator) # Don't pass lock. It has to be used as a global in the child. (This means `jobfunc` would need to be re-written slightly.
mypool.close()
mypool.join()
return read_queue(queue)
之所以可行,是因为传递的参数传递给在中运行的对象initargs
的__init__
方法,因此它们最终被继承而不是被腌制。Process``Pool