Быстрый в изучении - мощный в программировании
>> Telegram ЧАТ для Python Программистов

Свободное общение и помощь советом и решением проблем с кодом! Заходите в наш TELEGRAM ЧАТ!

>> Python Форум Помощи!

Мы создали форум где отвечаем на все вопросы связанные с языком программирования Python. Ждем вас там!

>> Python Канал в Telegram

Обучающие статьи, видео и новости из мира Python. Подпишитесь на наш TELEGRAM КАНАЛ!

Модуль multiprocessing на примерах

2 февраля 2018 г. Archy Просмотров: 140158 RSS 6
Примеры Python » Общие вопросы , , , , ,
Модуль multiprocessing на примерах

Модуль multiprocessing был добавлен в Python версии 2.6. Изначально он был определен в PEP 371 Джесси Ноллером и Ричардом Одкерком. Модуль multiprocessing позволяет вам создавать процессы таким же образом, как при создании потоков при помощи модуля threading. Суть в том, что, в связи с тем, что мы теперь создаем процессы, вы можете обойти GIL (Global Interpreter Lock) и воспользоваться возможностью использования нескольких процессоров на компьютере. Пакет multiprocessing также включает ряд API, которых вообще нет в модуле threading. Например, есть очень удобный класс Pool, который вы можете использовать для параллельного выполнения функции между несколькими входами. Мы рассмотрим Pool немного позже. Мы начнем с класса Process модуля multiprocessing.

Приступим к работе с Multiprocessing

Класс Process очень похож на класс Thread модуля threading. Давайте попробуем создать несколько процессов, которые вызывают одну и ту же функцию, и посмотрим, как это сработает:

import os
from multiprocessing import Process
 
 
def doubler(number):
    """
    Функция умножитель на два
    """
    result = number * 2
    proc = os.getpid()
    print('{0} doubled to {1} by process id: {2}'.format(
        number, result, proc))
 
 
if __name__ == '__main__':
    numbers = [5, 10, 15, 20, 25]
    procs = []
    
    for index, number in enumerate(numbers):
        proc = Process(target=doubler, args=(number,))
        procs.append(proc)
        proc.start()
    
    for proc in procs:
        proc.join()

Для этого примера мы импортируем Process и создаем функцию doubler. Внутри функции, мы дублируем число, которое мы ей передали. Мы также используем модуль os, чтобы получить ID нынешнего процесса. Это скажет нам, какой именно процесс вызывает функцию. Далее, в нижнем блоке кода, мы создаем несколько Процессов и начинаем их.

Самый последний цикл только вызывает метод join() для каждого из процессов, что говорит Python подождать, пока процесс завершится. Если вам нужно остановить процесс, вы можете вызвать метод terminate(). Когда вы запустите этот код, вы получите выдачу, на подобие этой:

5 doubled to 10 by process id: 10468
10 doubled to 20 by process id: 10469
15 doubled to 30 by process id: 10470
20 doubled to 40 by process id: 10471
25 doubled to 50 by process id: 10472

Все же иногда приятно иметь читабельное название процессов. К счастью, класс Process дает возможность вам получить доступ к названию вашего процесса. Давайте посмотрим:

import os
from multiprocessing import Process, current_process
 
 
def doubler(number):
    result = number * 2
    proc_name = current_process().name
    print('{0} doubled to {1} by: {2}'.format(
        number, result, proc_name))
 
 
if __name__ == '__main__':
    numbers = [5, 10, 15, 20, 25]
    procs = []
    proc = Process(target=doubler, args=(5,))
    
    for index, number in enumerate(numbers):
        proc = Process(target=doubler, args=(number,))
        procs.append(proc)
        proc.start()
    
    proc = Process(target=doubler, name='Test', args=(2,))
    proc.start()
    procs.append(proc)
    
    for proc in procs:
        proc.join()

На этот раз мы импортируем кое-что дополнительно: current_process. Это примерно то же самое, что и current_thread модуля threading. Мы используем его для того, чтобы получить имя потока, который вызывает нашу функцию. Обратите внимание на то, что мы не указывали название первых пяти процессов. И только шестой мы назвали Test. Давайте посмотрим, какую выдачу мы получим:

5 doubled to 10 by: Process-2
10 doubled to 20 by: Process-3
15 doubled to 30 by: Process-4
20 doubled to 40 by: Process-5
25 doubled to 50 by: Process-6
2 doubled to 4 by: Test

Выдача показывает, что модуль multiprocessing назначает номер каждому процессу, как часть его названия по умолчанию. Конечно, когда мы лично определяем название, модуль не будет добавлять число к нашему названию

Замки (Locks)

Модуль multiprocessing поддерживает замки так же, как и модуль threading. Все что вам нужно, это импортировать Lock, повесить его, сделать что-нибудь и снять его. Давайте посмотрим:

from multiprocessing import Process, Lock
 
 
def printer(item, lock):
    """
    Выводим то что передали
    """
    lock.acquire()
    try:
        print(item)
    finally:
        lock.release()
 
 
if __name__ == '__main__':
    lock = Lock()
    items = ['tango', 'foxtrot', 10]
    
    for item in items:
        p = Process(target=printer, args=(item, lock))
        p.start()

Здесь мы создали простую функцию вывода, которая выводит все, что вы ей передаете. Чтобы не дать процессам конфликтовать друг с другом, мы используем объект Lock. Этот код зациклится над нашим списком трех объектов и создаст процесс для каждого из них. Каждый процесс будет вызывать нашу функцию, и передавать её одному из объектов. Так как мы используем замки, следующий процесс в строке будет ждать, пока замок не снимается, после чего он сможет продолжить.

Логирование (Logging)

Логирование процессов немного отличается от логирования потоков. Причина в том, что пакет logging не использует замки, предназначенные для процессов, так что в итоге вы можете получить результат, который состоит из кучи перемешанных между собой процессов. Давайте попробуем добавить базовый логгинг к предыдущему примеру. Вот код:

import logging
import multiprocessing
from multiprocessing import Process, Lock
 
 
def printer(item, lock):
    """
    Выводим то что передали
    """
    lock.acquire()
    try:
        print(item)
    finally:
        lock.release()
 
 
if __name__ == '__main__':
    lock = Lock()
    items = ['tango', 'foxtrot', 10]
    multiprocessing.log_to_stderr()
    
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)
    
    for item in items:
        p = Process(target=printer, args=(item, lock))
        p.start()

Простейший способ вести журнал, это отправить все на stderr. Мы можем сделать это, вызвав функцию log_to_stderr(). Далее мы вызываем функцию get_logger для получения доступа к логгеру и настраиваем его уровень логгинга на INFO. Остальная часть кода остается такой же, какой и была. Обратите внимание на то, что я не вызываю метод join() здесь. Вместо этого, поток parent (другими словами, ваш скрипт) вызовет join() лично. Когда вы сделаете это, вы получите что-то на подобие:

[INFO/Process-1] child process calling self.run()
tango
[INFO/Process-1] process shutting down
[INFO/Process-1] process exiting with exitcode 0
[INFO/Process-2] child process calling self.run()
[INFO/MainProcess] process shutting down
foxtrot
[INFO/Process-2] process shutting down
[INFO/Process-3] child process calling self.run()
[INFO/Process-2] process exiting with exitcode 0
10
[INFO/MainProcess] calling join() for process Process-3
[INFO/Process-3] process shutting down
[INFO/Process-3] process exiting with exitcode 0
[INFO/MainProcess] calling join() for process Process-2

Давайте пойдем дальше, и рассмотрим класс Pool поближе

Класс Pool

Класс Pool используется для показа пула рабочих процессов. Он включает в себя методы, которые позволяют вам разгружать задачи к рабочим процессам. Давайте посмотрим на простейший пример:

from multiprocessing import Pool
 
 
def doubler(number):
    return number * 2
 
 
if __name__ == '__main__':
    numbers = [5, 10, 20]
    pool = Pool(processes=3)
    print(pool.map(doubler, numbers))

Здесь мы создали экземпляр Pool и указали ему создать три рабочих процесса. Далее мы используем метод map для отображения функции для каждого процесса. Наконец мы выводим результат, что в нашем случае является списком: [10, 20, 40]. Вы также можете получить результат вашего процесса в пуле, используя метод apply_async:

from multiprocessing import Pool
 
 
def doubler(number):
    return number * 2
 
 
if __name__ == '__main__':
    pool = Pool(processes=3)
    result = pool.apply_async(doubler, (25,))
    print(result.get(timeout=1))

Так мы можем запросить результат процесса. В этом суть работы функции get. Она пытается получить наши результаты. Обратите внимание на то, что мы также настроили обратный отсчет, на тот случай, если что-нибудь произойдет с вызываемой нами функцией. Мы не хотим, чтобы она была заблокирована.

Связь между процессами

Когда речь заходит о связи между процессами, модули нашего multiprocessing включают в себя два главных метода: Queue и Pipe. Работа Queue защищена как от процессов, так и от потоков. Давайте взглянем на достаточно простой пример:

from multiprocessing import Process, Queue
 
 
sentinel = -1
 
def creator(data, q):
    """
    Creates data to be consumed and waits for the consumer
    to finish processing
    """
    print('Creating data and putting it on the queue')
    for item in data:
        q.put(item)
 
 
def my_consumer(q):
    """
    Consumes some data and works on it
    In this case, all it does is double the input
    """
    while True:
        data = q.get()
        print('data found to be processed: {}'.format(data))
    
        processed = data * 2
        print(processed)
    
        if data is sentinel:
            break
 
 
if __name__ == '__main__':
    q = Queue()
    data = [5, 10, 13, -1]
    
    process_one = Process(target=creator, args=(data, q))
    process_two = Process(target=my_consumer, args=(q,))
    
    process_one.start()
    process_two.start()
    
    q.close()
    q.join_thread()
    
    process_one.join()
    process_two.join()

Здесь нам только и нужно, что импортировать Process и Queue. Далее мы создаем две функции, одна для создания данных и добавления их в очередь, и вторая для использования данных и обработки их. Добавление данных в Queue выполняется при помощи метода put(), в то время как получение данных из Queue выполняется через метод get. Последний кусок кода только создает объект Queue и несколько экземпляров Process, после чего возвращает их. Обратите внимание на то, что мы вызываем join() в наших объектах process больше, чем Queue.

Подведем итоги

Здесь мы прошли через достаточно большое количество материала. Вы узнали много чего нового о модуле multiprocessing для направления обычных функций, связи между процессами при помощи Queue, наименований потоков и многого другого. Разумеется, в документации Python предоставлено намного больше развернутой информации, которую я даже не начинал затрагивать в данной статье, так что настоятельно рекомендую с ней ознакомиться. Тем не менее, вы все-таки узнали много чего о том, как усилить мощность обработки вашего компьютера при помощи Python!

Комментариев: 6
  1. Спасибо, такой код я применил когда создавал свой проект https://spin-city-online.com/azartnye-igry

  2. Андрей | 2019-06-26 в 20:21:32

    Просто копирую второй код и вставляю себе, не выводит ничего. Проверил с помощью принта - не заходит в функцию, выводит принт на всех этапах кроме принта в функции

  3. Отличная подборка, хорошо подойдет для онлайн игр.

  4. Спасибо за нормальные примеры.

    А то некоторые:"Вот давайте факториалы в классах считать", засоряют суть кода. Для новичка это просто катастрофа.

    Хотелось бы ещё узнать, как Multiprocessing относится к кроссплатформености, Android например.

  5. Сергей | 2020-05-05 в 16:44:23

    Реально помогло, подшаманил немного https://gpbet-casino.com/vulcan-casino - теперь еще удобней искать казино и крутить барабаны автоматов

  6. Всё бы хорошо, только не указали, что этот модуль не подходит для третьего питона