{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# **Paradygmaty programowania**\n", "## Materiały przygotowawcze do Laboratorium nr. 4\n", "\n", "**Temat**: Przetwarzanie równoległe i mechanizmy synchronizacji \n", "\n", "\n", "### Wprowadzenie\n", "\n", "Obliczenia równoległe jest to sposób wykonywania zadań obliczeniowych, w którym wiele instrukcji jest wykonywanych jednocześnie.\n", "\n", "W języku *Python* dostępne są następujęce mechanizmy wykonywania zadań równolegle:\n", "\n", "1. wątki – moduł **threading**, \n", "2. procesy – moduł **multiprocessing**,\n", "3. obliczenia rozproszone – np: moduł **ipyparallel**,\n", "4. programowanie asynchroniczne – moduł **asyncio** (wykonywanie, korordynacja oraz przełącznie zadań dokonuje się w ramach pojedynczego procesu) \n", "\n", "Na zajęciach laboratoryjnych będą Państwo wykorzystywali tylko trzy pierwsze mechanizmy. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Wątki\n", "\n", "Moduł **threading** ([dokumentacja](https://docs.python.org/3/library/threading.html#)) – dostarcza definicję klasy **Thread**, która zapewnia pełną funkcjonalność dla pojedynczego wątku.\n", "\n", "Aby móc korzystać z tej funkcjonalności, należy w pierwszym kroku zaimportować do skryptu odpowiednie klasy:" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "from threading import Thread" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Najprostszą metodą wskazania zadania do wykonania w ramach wątku jest:\n", "\n", "* zdefiniowanie funkcji, która będzie wykonana w ramach wątku,\n", "* przekazanie tej funkcji jako argumentu do konstruktora tworzonego obiektu nowego wątku.\n", "\n", "Zdefiniujmy przykładową funkcję:" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "import time\n", "def work(number):\n", " print (\"Thread number:\",number)\n", " current_time = time.time()\n", " while (time.time() < current_time+2):\n", " pass" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "teraz zdefiniujemy grupę wątków:" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "threads = [Thread(target=work, args=(number,)) for number in range(5)]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Spowoduje to utworzenie listy pięciu obiektów typu `Thread`. Każdemu z nich w momencie utworzenia zostanie \"przydzielona\" do wykonania funkcja `work` oraz lista argumentów tej funkcji – tutaj jednoelementowa, zawierająca numer wątku, czyli kolejną liczbę całkowitą z zakresu [0, 5).\n", "\n", "Dodatkowo, żeby zablokować główny wątek wywołujący do momentu zakończenia działania konkretnego wątku, należy wywołać metodę **join** na rzecz tego wątku" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Thread number: 0\n", "Thread number: 1\n", "Thread number: 2\n", "Thread number: 3\n", "Thread number: 4\n" ] } ], "source": [ "for thread in threads:\n", " thread.start()\n", "for thread in threads:\n", " thread.join()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "> Uwaga:\n", "\n", "> W najpowszechniej używanym interpreterze Pythona *Cpython*, z powodu istnienia mechanizmu [Global Interpreter Lock](https://docs.python.org/3/glossary.html#term-global-interpreter-lock), tylko jeden wątek może wykonywać Pythonowy `bytecode`. Dlatego też mechanizm ten (w przypadku Pythona) używany jest głównie do zrównoleglania operacji I/O (wejścia/wyjscia).\n", "\n", "Ponieważ wątki współdzielą pamięć, w przypadku wykorzystania mechanizmu wątków pojawia się problem synchronizacji.\n", "\n", "Poniższy skrypt obrazuje zjawisko tzw \"wyścigów\":\n", "\n", "* w linii nr. 4 zdefiniowana została zmienna globalna **data** – wspólna dla wszystkich wątków\n", "* funkcja **work** wykonuje 100000 razy inkrementację zmiennej globalnej\n", "* funkcja **main** - zeruje wartość zmiennej globalnej i uruchamia cztery wątki wykonujace zadanie zdefiniowane w funkcji **work**\n", "* głowne ciało funkcji wykonuje dziesięć razy funkcję **main** i wyświetla końcową wartość zmiennej globalnej **data**\n", "\n", "Tip: Numerację linii kodu można włączyć naciskająć kombinację: `Shift+l`" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Iteration 1: x = 311324\n", "Iteration 2: x = 400000\n", "Iteration 3: x = 349781\n", "Iteration 4: x = 327742\n", "Iteration 5: x = 400000\n", "Iteration 6: x = 368896\n", "Iteration 7: x = 333892\n", "Iteration 8: x = 364395\n", "Iteration 9: x = 259823\n", "Iteration 10: x = 365273\n" ] } ], "source": [ "from threading import Thread \n", "\n", "# global variable \n", "data = 0\n", "\n", "def work():\n", " global data \n", " for _ in range(100000):\n", " data +=1\n", "\n", "def main(): \n", " global data \n", " data = 0\n", " threads=[Thread(target=work) for _ in range(4)]\n", " for thread in threads:\n", " thread.start() \n", " for thread in threads:\n", " thread.join() \n", "\n", "if __name__ == \"__main__\": \n", " for i in range(10): \n", " main() \n", " print(\"Iteration {0}: x = {1}\".format(i+1,data)) " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Jak widać, końcowa wartość zmiennej globalnej (dla każdej iteracji) nie zawsze równa się oczekiwanej wartości (4 x 100000=>400000), tylko przyjmuje \"różne wartości\". Zjawisko to wynika z braku synchronizacji dostępu do zmiennej globalnej.\n", "\n", "Po to, by wyeliminować to zjawisko, należy zastosować jakiś mechanizm synchronizacji. Najprostrzym jest tzw. lock.\n", "\n", "W kolejnym skrypcie w wierszu nr 5 zdefiniowany został – wspólny dla wszystkich wątków – **obiekt blokady** typu `Lock`. Jest on czymś w rodzaju pałeczki w sztafecie, w której zawodnikami są wątki. Jeśli obiekt blokady jest dostępny, pierwszy wątek, który wywoła jego metodę `acquire`, przejmuje go na wyłączność (wiersz 9). Późniejsze wywołania metody `acquire` przez inne wątki powodują wstrzymanie ich pracy do momentu, w którym obiekt blokady zostanie zwolniony przez posiadający go wątek. Służy do tego metoda `release` (wiersz 11).\"\n", "\n", "Dzięki temu fragment kodu realizujący inkrementację zmiennej globalnej jest realizowany zawsze tylko przez **jeden** wątek." ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Iteration 1: x = 400000\n", "Iteration 2: x = 400000\n", "Iteration 3: x = 400000\n", "Iteration 4: x = 400000\n", "Iteration 5: x = 400000\n", "Iteration 6: x = 400000\n", "Iteration 7: x = 400000\n", "Iteration 8: x = 400000\n", "Iteration 9: x = 400000\n", "Iteration 10: x = 400000\n" ] } ], "source": [ "from threading import Thread, Lock\n", "\n", "# global variable \n", "data = 0\n", "lock = Lock()\n", "def work():\n", " global data\n", " for _ in range(100000):\n", " lock.acquire()\n", " data +=1\n", " lock.release()\n", "\n", "def main(): \n", " global data \n", " data = 0\n", " threads=[Thread(target=work) for _ in range(4)]\n", " for thread in threads:\n", " thread.start() \n", " for thread in threads:\n", " thread.join() \n", "\n", "if __name__ == \"__main__\": \n", " for i in range(10): \n", " main() \n", " print(\"Iteration {0}: x = {1}\".format(i+1,data)) " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Multiprocessing\n", "\n", "Obliczenia równoległe można realizować także poprzez uruchamianie wielu procesów. Będą one działały niezależnie od siebie (nie współdzielą obszarów pamięci) i dlatego, aby zapewnić współpracę między nimi, stosuje się komunikację [IPC](https://pl.qaz.wiki/wiki/Inter-process_communication).\n", "\n", "W języku *Python* dostępny jest pakiet *processing* ([dokumentacja](https://docs.python.org/3/library/multiprocessing.html)), który udostępnia funkcjonalność uruchamiania oddzielnych procesów, a jego **API** jest bardzo podobne do tego używanego w ramach modułu *threading*.\n", "\n", "Aby móc korzystać z tej funkcjonalności, należy w pierwszym kroku zaimportować do skryptu pdpowiednie klasy:" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "from multiprocessing import Process" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "najprostszą metodą wskazania zadania do wykonania w ramach procesu jest:\n", "\n", "* zdefiniowanie funkcji, która będzie wykonana w ramach procesu,\n", "* przekazanie tej funkcji jako argumentu do konstruktora tworzonego obiektu nowego procesu.\n", "\n", "Zdefiniujmy przykładową funkcję:" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "import os\n", "\n", "def work(name):\n", " print(\"Process:\",name,\"; Process id:\",os.getpid(), \"; Parent process id:\",os.getppid())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "będzie ona wyświetlała kolejno: *numer procesu*, jego *process id*, oraz *process id* jego \"rodzica\".\n", "\n", "Teraz zdefiniujemy grupę procesów:" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "processes=[Process(target=work, args=(i,)) for i in range(3)]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Następnie – podobnie jak w przypadku wątków – dla każdego obiektu procesu należy wywołać metodę **start**. \n", "Dodatkowo, żeby zablokować główny wątek wywołujący do momentu zakończenia działania wszystkich potomnych procesów, należy wywołać metodę **join** (dla każdego procesu)." ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Process: 0 ; Process id: 34303 ; Parent process id: 34151\n", "Process: 1 ; Process id: 34306 ; Parent process id: 34151\n", "Process: 2 ; Process id: 34309 ; Parent process id: 34151\n" ] } ], "source": [ "for process in processes:\n", " process.start() \n", "for process in processes:\n", " process.join() " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Jednym ze sposobów komunikacji międzyprocesowej (IPC) jest wykorzystanie bezpiecznych kolejek **FIFO** lub **LIFO** (są one bezpieczne w tym sensie, że w danej chwili tylko jeden proces może modyfikować zawartość kolejki). Pakiet *multiprocessing* dostarcza definicję klas *Queue* (FIFO), *LifoQueue* oraz *PriorityQueue*.\n", "\n", "Udostępniają one API, które składa się miedzy innnymi z następujących metod:\n", "\n", "* *put()* – wkłada dowolny obiekt do kolejki,\n", "* *get()* – usuwa z kolejki i zwraca dostępny obiekt.\n", "\n", "Kolejny skrypt zademonstruje sposób współpracy między wątkami:\n", "\n", "Mamy w nim dwa obiekty – procesy producentów, które wkładają dane do kolejki **FIFO** oraz dwa obiekty – procesy konsumentów, które odczytują dane. Kolejka ma maksymalny rozmiar **3**, a każdy producent ma wyprodukować po **5** danych (tutaj musi zajść synchronizacja działań procesów). Do kolejki można wstawiać dowolne obiekty – w tym przypadku wstawiane są listy dwuelementowe, zawierające nazwę producenta oraz numer wytworzonego przez niego obiektu. W celu zasygnalizowania procesowi konsumenta o zakończeniu generacji danych przez producenta, do kolejki wstawiany jest obiekt listy, której pierwszym elementem jest ciąg znaków **END** (takie rozwiązanie wymusza by liczba producentów i konsumentów była identyczna).\n" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "producer:1 putting data 0 ; queue size before put operation = 0\n", "END OF MAIN\n", "producer:1 putting data 1 ; queue size before put operation = 1\n", "producer:2 putting data 0 ; queue size before put operation = 0\n", "producer:1 putting data 2 ; queue size before put operation = 2\n", "producer:2 putting data 1 ; queue size before put operation = 1\n", "consumer: 2 getting data: 0 from producer:2 ; queue size after get operation = 1\n", "producer:2 putting data 2 ; queue size before put operation = 2\n", "consumer: 2 getting data: 1 from producer:1 ; queue size after get operation = 1\n", "producer:2 putting data 3 ; queue size before put operation = 2\n", "consumer: 2 getting data: 1 from producer:2 ; queue size after get operation = 1\n", "consumer: 1 getting data: 0 from producer:1 ; queue size after get operation = 0\n", "producer:2 putting data 4 ; queue size before put operation = 2\n", "consumer: 2 getting data: 2 from producer:2 ; queue size after get operation = 1\n", "consumer: 2 getting data: 3 from producer:2 ; queue size after get operation = 2\n", "consumer: 1 getting data: 4 from producer:2 ; queue size after get operation = 1\n", "END signal from producer: 2\n", "producer:1 putting data 4 ; queue size before put operation = 1\n", "consumer: 1 getting data: 2 from producer:1 ; queue size after get operation = 0\n", "producer:1 putting data 3 ; queue size before put operation = 1\n", "consumer: 1 getting data: 3 from producer:1 ; queue size after get operation = 0\n", "consumer: 1 getting data: 4 from producer:1 ; queue size after get operation = 1\n", "END signal from producer: 1\n" ] } ], "source": [ "from multiprocessing import Process, Queue\n", "\n", "def producer(name,q):\n", " for i in range(5):\n", " item=[\"producer:\"+str(name),i] \n", " print(item[0],\"putting data\",item[1],\"; queue size before put operation =\",q.qsize())\n", " q.put(item)\n", " item=[\"END\",name]\n", " q.put(item)\n", "\n", "def consumer(name,q):\n", " while True:\n", " item=q.get()\n", " if item[0]==\"END\":\n", " print(\"END signal from producer:\",item[1])\n", " break\n", " else:\n", " print (\"consumer:\",name,\"getting data:\",item[1], \"from\",item[0],\"; queue size after get operation =\",q.qsize())\n", " \n", "def main():\n", " queue=Queue(3)\n", " producers=[Process(target=producer, args=(i+1,queue)) for i in range(2)]\n", " consumers=[Process(target=consumer, args=(i+1,queue)) for i in range(2)]\n", " for producer_process in producers:\n", " producer_process.start()\n", " for consumer_process in consumers:\n", " consumer_process.start()\n", " print (\"END OF MAIN\")\n", " \n", "if __name__ == \"__main__\": \n", " main() " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Proszę zauważyć, że w funkcji **main** nie ma wywołania metod **join** dla poszczególnych procesów producentów i konsumentów, w związku z czym główny proces skryptu kończy swoje działanie **przed** zakończeniem działania procesów potomnych." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Pule procesów\n", "\n", "Istnieje grupa problemów obliczeniowych dla których ich zrównoleglenie nie wymaga współdzielenia obszarów pamięci jak i komunikacji pomiędzy poszczególnymi zadaniami obliczeniowymi.\n", "\n", "Przykładowo problem numerycznego obliczania całek oznaczonych, należy do tej grupy z uwagi na własność:\n", "\n", "$$I=\\int_a^b f(x)dx= \\int_a^k f(x)dx + \\int_k^b f(x)dx, \\; i \\; a