您好, 欢迎来到 !    登录 | 注册 | | 设为首页 | 收藏本站

为什么我可以将实例方法传递给multiprocessing.Process,而不是multiprocessing.Pool?

为什么我可以将实例方法传递给multiprocessing.Process,而不是multiprocessing.Pool?

pickle模块通常不能腌制实例方法

>>> import pickle
>>> class A(object):
...  def z(self): print "hi"
... 
>>> a = A()
>>> pickle.dumps(a.z)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python2.7/pickle.py", line 1374, in dumps
    Pickler(file, protocol).dump(obj)
  File "/usr/local/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/local/lib/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
  File "/usr/local/lib/python2.7/copy_reg.py", line 70, in _reduce_ex
    raise TypeError, "can't pickle %s objects" % base.__name__
TypeError: can't pickle instancemethod objects

但是,该multiprocessing模块具有一个自定义Pickler,该自定义添加了一些代码以启用此功能

#
# Try making some callable types picklable
#

from pickle import Pickler
class ForkingPickler(Pickler):
    dispatch = Pickler.dispatch.copy()

    @classmethod
    def register(cls, type, reduce):
        def dispatcher(self, obj):
            rv = reduce(obj)
            self.save_reduce(obj=obj, *rv)
        cls.dispatch[type] = dispatcher

def _reduce_method(m):
    if m.im_self is None:
        return getattr, (m.im_class, m.im_func.func_name)
    else:
        return getattr, (m.im_self, m.im_func.func_name)
ForkingPickler.register(type(ForkingPickler.save), _reduce_method)

您可以使用copy_reg模块复制此文件,以查看它是否可以自己运行:

>>> import copy_reg
>>> def _reduce_method(m):
...     if m.im_self is None:
...         return getattr, (m.im_class, m.im_func.func_name)
...     else:
...         return getattr, (m.im_self, m.im_func.func_name)
... 
>>> copy_reg.pickle(type(a.z), _reduce_method)
>>> pickle.dumps(a.z)
"c__builtin__\ngetattr\np0\n(ccopy_reg\n_reconstructor\np1\n(c__main__\nA\np2\nc__builtin__\nobject\np3\nNtp4\nRp5\nS'z'\np6\ntp7\nRp8\n."

当您用来Process.start在Windows上生成新进程时,它会使用以下自定义来腌制传递给子进程的所有参数ForkingPickler

#
# Windows
#

else:
    # snip...
    from pickle import load, HIGHEST_PROTOCOL

    def dump(obj, file, protocol=None):
        ForkingPickler(file, protocol).dump(obj)

    #
    # We define a Popen class similar to the one from subprocess, but
    # whose constructor takes a process object as its argument.
    #

    class Popen(object):
        '''
        Start a subprocess to run the code of a process object
        '''
        _tls = thread._local()

        def __init__(self, process_obj):
            # create pipe for communication with child
            rfd, wfd = os.pipe()

            # get handle for read end of the pipe and make it inheritable
            ...
            # start process
            ...

            # set attributes of self
            ...

            # send information to child
            prep_data = get_preparation_data(process_obj._name)
            to_child = os.fdopen(wfd, 'wb')
            Popen._tls.process_handle = int(hp)
            try:
                dump(prep_data, to_child, HIGHEST_PROTOCOL)
                dump(process_obj, to_child, HIGHEST_PROTOCOL)
            finally:
                del Popen._tls.process_handle
                to_child.close()

请注意“发送信息给孩子”部分。它使用的dump功能ForkingPickler用来腌制数据,这意味着您的实例方法可以被腌制。

现在,使用on方法multiprocessing.Pool方法发送给子进程时,它是使用amultiprocessing.Pipe来腌制数据。在python 2.7中,它multiprocessing.Pipe是用C实现的,并且pickle_dumps直接调用,因此它没有利用ForkingPickler。这意味着腌制实例方法不起作用。

但是,如果您使用类型而不是customcopy_reg注册instancemethod类型Pickler,则 所有 对酸洗的尝试都会受到影响。因此,您甚至可以通过Pool以下方式使用它来启用酸洗实例方法

import multiprocessing
import copy_reg
import types

def _reduce_method(m):
    if m.im_self is None:
        return getattr, (m.im_class, m.im_func.func_name)
    else:
        return getattr, (m.im_self, m.im_func.func_name)
copy_reg.pickle(types.MethodType, _reduce_method)

def test1():
    print("Hello, world 1")

def increment(x):
    return x + 1

class testClass():
    def process(self):
        process1 = multiprocessing.Process(target=test1)
        process1.start()
        process1.join()
        process2 = multiprocessing.Process(target=self.test2)
        process2.start()
        process2.join()

    def pool(self):
        pool = multiprocessing.Pool(1)
        for answer in pool.imap(increment, range(10)):
            print(answer)
        print
        for answer in pool.imap(self.square, range(10)):
            print(answer)

    def test2(self):
        print("Hello, world 2")

    def square(self, x):
        return x * x

def main():
    c = testClass()
    c.process()
    c.pool()

if __name__ == "__main__":
    main()

输出

Hello, world 1
Hello, world 2
GOT (0, 0, (True, 1))
GOT (0, 1, (True, 2))
GOT (0, 2, (True, 3))
GOT (0, 3, (True, 4))
GOT (0, 4, (True, 5))
 1GOT (0, 5, (True, 6))

GOT (0, 6, (True, 7))
2
GOT (0, 7, (True, 8))
3
 GOT (0, 8, (True, 9))
GOT (0, 9, (True, 10))
4
5
6
7
8
9
10

GOT (1, 0, (True, 0))
0
GOT (1, 1, (True, 1))
1
GOT (1, 2, (True, 4))
4
GOT (1, 3, (True, 9))
9
 GOT (1, 4, (True, 16))
16
GOT (1, 5, (True, 25))
25
 GOT (1, 6, (True, 36))
36
 GOT (1, 7, (True, 49))
49
 GOT (1, 8, (True, 64))
64
GOT (1, 9, (True, 81))
81
GOT None

还要注意,在Python 3.x中,pickle可以本地腌制实例方法类型,因此这些东西都不再重要了。:)

其他 2022/1/1 18:46:46 有491人围观

撰写回答


你尚未登录,登录后可以

和开发者交流问题的细节

关注并接收问题和回答的更新提醒

参与内容的编辑和改进,让解决方法与时俱进

请先登录

推荐问题


联系我
置顶