programing

Python 다중 처리 Pickling Error:절일 수 없다

firstcheck 2022. 12. 27. 21:54
반응형

Python 다중 처리 Pickling Error:절일 수 없다

더 간단한 예로는 오류를 재현할 수 없고, 제 코드가 너무 복잡해서 게시할 수 없어서 죄송합니다.일반 Python이 아닌 IPython 쉘에서 프로그램을 실행하면 잘 됩니다.

나는 이 문제에 대한 이전의 노트를 찾아보았다.모두 클래스 함수 내에서 정의된 함수를 호출하기 위해 풀을 사용한 것이 원인입니다.하지만 나는 그렇지 않다.

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

어떤 도움이라도 주시면 감사하겠습니다.

업데이트: 피클 기능이 모듈의 최상위 레벨에 정의되어 있습니다.중첩된 함수를 포함하는 함수를 호출하지만,f()g()h()네스트된 함수를 가지고 있다.i()전화드렸습니다.pool.apply_async(f).f(),g(),h()모두 최상위 수준에서 정의됩니다.이 패턴으로 간단한 예를 들어봤는데 효과가 있어요.

여기 절일 수 있는 것의 목록이 있습니다.특히 함수는 모듈의 최상위 수준에서 정의된 경우에만 선택할 수 있습니다.

이 코드:

import multiprocessing as mp

class Foo():
    @staticmethod
    def work(self):
        pass

if __name__ == '__main__':   
    pool = mp.Pool()
    foo = Foo()
    pool.apply_async(foo.work)
    pool.close()
    pool.join()

는, 투고한 에러와 거의 같은 에러를 생성합니다.

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

문제는 이 시스템이pool모든 메서드는mp.SimpleQueue작업을 작업자 프로세스에 전달합니다.이 모든 것을 통해mp.SimpleQueue선택 가능해야 합니다.foo.work는 모듈의 최상위 수준에서 정의되어 있지 않기 때문에 선택할 수 없습니다.

최상위 레벨에서 함수를 정의함으로써 수정할 수 있습니다.이 함수는foo.work():

def work(foo):
    foo.work()

pool.apply_async(work,args=(foo,))

주의해 주세요foo선택할 수 있습니다.Foo최상위 레벨로 정의되어 있습니다.foo.__dict__선택가능합니다.

나는 사용하고 싶다pathos.multiprocesssing대신multiprocessing.pathos.multiprocessing의 포크이다.multiprocessing는 를 사용합니다.dill.dillpython으로 거의 모든 것을 직렬화할 수 있기 때문에 더 많은 것을 병렬로 전송할 수 있습니다.pathosfork에는 클래스 메서드에 필요한 여러 인수 함수를 직접 사용할 수도 있습니다.

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
>>> 
>>> class Foo(object):
...   @staticmethod
...   def work(self, x):
...     return x+1
... 
>>> f = Foo()
>>> p.apipe(f.work, f, 100)
<processing.pool.ApplyResult object at 0x10504f8d0>
>>> res = _
>>> res.get()
101

얻다pathos(그리고 당신이 원한다면)dill)는 이쪽: https://github.com/uqfoundation

이 문제가 발생했을 때multiprocessing간단한 해결책은 에서 바꾸는 것이다.Pool로.ThreadPool이것은, Import 이외에는 코드를 변경하지 않고 실시할 수 있습니다.

from multiprocessing.pool import ThreadPool as Pool

이것은 ThreadPool이 새로운 프로세스를 작성하는 것이 아니라 메인 스레드와 메모리를 공유하기 때문에 기능합니다.즉, 피클링은 불필요합니다.

이 방법의 단점은 python이 스레드 처리에서 가장 뛰어난 언어가 아니라는 것입니다.이것은 Global Interpreter Lock이라고 불리는 것을 사용하여 스레드를 안전하게 유지함으로써, 여기서의 사용 사례를 늦출 수 있습니다.그러나 주로 다른 시스템(HTTP 명령 실행, 데이터베이스와의 대화, 파일 시스템 쓰기)과 대화하는 경우 코드가 CPU에 의해 구속되지 않고 큰 타격을 받지 않을 수 있습니다.실제로 HTTP/HTTPS 벤치마크를 작성할 때 새로운 프로세스를 작성하는 데 드는 오버헤드가 새로운 스레드 작성에 드는 오버헤드보다 훨씬 높고 프로그램이 HTTP 응답을 기다리고 있기 때문에 여기서 사용하는 스레드 모델은 오버헤드와 지연이 적다는 것을 알게 되었습니다.

따라서 python 사용자 공간에서 많은 양의 데이터를 처리하는 경우에는 이 방법이 최선의 방법이 아닐 수 있습니다.

이 말하듯이multiprocessingPython 객체를 피클할 수 있는 워커 프로세스에만 전송할 수 있습니다.한 대로 할 수 없는 unutbu를 사용할 수 .dill데이터(특히 코드 데이터)를 전송하기 위한 확장 피클링/언픽링 기능을 다음에 나타냅니다.

에서는, .dill.pathos:

import os
from multiprocessing import Pool

import dill


def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    return fun(*args)


def apply_async(pool, fun, args):
    payload = dill.dumps((fun, args))
    return pool.apply_async(run_dill_encoded, (payload,))


if __name__ == "__main__":

    pool = Pool(processes=5)

    # asyn execution of lambda
    jobs = []
    for i in range(10):
        job = apply_async(pool, lambda a, b: (a, b, a * b), (i, i + 1))
        jobs.append(job)

    for job in jobs:
        print job.get()
    print

    # async execution of static method

    class O(object):

        @staticmethod
        def calc():
            return os.getpid()

    jobs = []
    for i in range(10):
        job = apply_async(pool, O.calc, ())
        jobs.append(job)

    for job in jobs:
        print job.get()

또, 완전하게 동작하고 있는 코드에 프로파일러를 사용하면, 에러 출력을 정확하게 생성할 수 있는 것을 알았습니다.

이것은 Windows(포킹이 다소 우아하지 않은 경우)에서 사용되었습니다.

나는 달리고 있었다:

python -m profile -o output.pstats <script> 

프로파일링을 삭제하면 오류가 제거되고 프로파일링을 실행하면 복구됩니다.나도 그 암호가 먹혀들었다는 걸 알았기 때문에 날 미치게 만들었지뭔가 pool.py를 업데이트 한 게 있나 확인하고 있었는데... 뭔가 가라앉은 느낌이 들어서 프로파일링을 없앴어요. 그게 다예요.

다른 사람이 마주칠 경우를 대비해서 기록 보관소에 글을 올립니다

Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

이 오류는 비동기 작업에 전달된 모델 개체 내에 내장된 함수가 있는 경우에도 발생합니다.

따라서 전달된 모델 객체에 기능이 내장되어 있지 않은지 확인하십시오.(당사의 경우,FieldTracker()특정 필드를 추적하기 위한 모델 내 django-model-sums의 기능).관련 GitHub 문제 링크입니다.

이 솔루션에서는 dill 설치만 필요하며 다른 라이브러리는 pathos로 설치할 필요가 없습니다.

def apply_packed_function_for_map((dumped_function, item, args, kwargs),):
    """
    Unpack dumped function as target function and call it with arguments.

    :param (dumped_function, item, args, kwargs):
        a tuple of dumped function and its arguments
    :return:
        result of target function
    """
    target_function = dill.loads(dumped_function)
    res = target_function(item, *args, **kwargs)
    return res


def pack_function_for_map(target_function, items, *args, **kwargs):
    """
    Pack function and arguments to object that can be sent from one
    multiprocessing.Process to another. The main problem is:
        «multiprocessing.Pool.map*» or «apply*»
        cannot use class methods or closures.
    It solves this problem with «dill».
    It works with target function as argument, dumps it («with dill»)
    and returns dumped function with arguments of target function.
    For more performance we dump only target function itself
    and don't dump its arguments.
    How to use (pseudo-code):

        ~>>> import multiprocessing
        ~>>> images = [...]
        ~>>> pool = multiprocessing.Pool(100500)
        ~>>> features = pool.map(
        ~...     *pack_function_for_map(
        ~...         super(Extractor, self).extract_features,
        ~...         images,
        ~...         type='png'
        ~...         **options,
        ~...     )
        ~... )
        ~>>>

    :param target_function:
        function, that you want to execute like  target_function(item, *args, **kwargs).
    :param items:
        list of items for map
    :param args:
        positional arguments for target_function(item, *args, **kwargs)
    :param kwargs:
        named arguments for target_function(item, *args, **kwargs)
    :return: tuple(function_wrapper, dumped_items)
        It returs a tuple with
            * function wrapper, that unpack and call target function;
            * list of packed target function and its' arguments.
    """
    dumped_function = dill.dumps(target_function)
    dumped_items = [(dumped_function, item, args, kwargs) for item in items]
    return apply_packed_function_for_map, dumped_items

Numpy 어레이에서도 사용할 수 있습니다.

@rocksportrocker 솔루션을 기반으로 하기 때문에 결과를 전송 및 RECV할 때 딜링하는 것이 좋습니다.

import dill
import itertools
def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    res = fun(*args)
    res = dill.dumps(res)
    return res

def dill_map_async(pool, fun, args_list,
                   as_tuple=True,
                   **kw):
    if as_tuple:
        args_list = ((x,) for x in args_list)

    it = itertools.izip(
        itertools.cycle([fun]),
        args_list)
    it = itertools.imap(dill.dumps, it)
    return pool.map_async(run_dill_encoded, it, **kw)

if __name__ == '__main__':
    import multiprocessing as mp
    import sys,os
    p = mp.Pool(4)
    res = dill_map_async(p, lambda x:[sys.stdout.write('%s\n'%os.getpid()),x][-1],
                  [lambda x:x+1]*10,)
    res = res.get(timeout=100)
    res = map(dill.loads,res)
    print(res)

빠른 해결책은 기능을 글로벌하게 만드는 것입니다.

from multiprocessing import Pool


class Test:
    def __init__(self, x):
        self.x = x
    
    @staticmethod
    def test(x):
        return x**2


    def test_apply(self, list_):
        global r
        def r(x):
            return Test.test(x + self.x)

        with Pool() as p:
            l = p.map(r, list_)

        return l



if __name__ == '__main__':
    o = Test(2)
    print(o.test_apply(range(10)))

@penky Suresh가 이 답변에서 제시한 바와 같이 삽입 키워드를 사용하지 마십시오.

★★★★★★★★★★★★★★.args 시 입니다.


class TTS:
    def __init__(self):
        pass

    def process_and_render_items(self):
        multiprocessing_args = [{"a": "b", "c": "d"}, {"e": "f", "g": "h"}]

        with ProcessPoolExecutor(max_workers=10) as executor:
          # Using args here is fine. 
            future_processes = {
              executor.submit(TTS.process_and_render_item, args)
                for args in multiprocessing_args
            }

            for future in as_completed(future_processes):
                try:
                    data = future.result()
                except Exception as exc:
                    print(f"Generated an exception: {exc}")
                else:
                   print(f"Generated data for comment process: {future}")
 

    # Dont use 'args' here. It seems to be a built-in keyword.
    # Changing 'args' to 'arg' worked for me.
    def process_and_render_item(arg):
        print(arg)
      # This will print {"a": "b", "c": "d"} for the first process
      # and {"e": "f", "g": "h"} for the second process.



PS: 탭/스페이스가 조금 어긋날 수 있습니다.

언급URL : https://stackoverflow.com/questions/8804830/python-multiprocessing-picklingerror-cant-pickle-type-function

반응형