autor: Krzysztof Rogala
Wyobraźmy sobie bazę danych (już istniejącą). Jedni użytkownicy mogą czytać z niej dane, a innni je zmieniać. Ci pierwsi (dalej: czytelnicy) są niezadowoleni, ponieważ czasem z bazy odczytują nieaktualne lub bezsensowne dane. Ci drudzy (dalej: pisarze) także są niezadowoleni, ponieważ podczas oczekiwania na wprowadzenie poprawek do bazy, nic nie mogą robić. Rozwiązaniem tego problemu jest synchronizacja dostępu. Nie może być tak, że jednocześnie ktoś pisze do bazy i z niej czyta. Preferowani w takiej sytuacji będą pisarze. Ponadto założyłem także, że na raz tylko jeden czytelnik korzysta z bazy. Co więcej, wprowadzamy rozdzielenie wywołania metody i jej wykonania, co pozwala pisarzom natychmiast przejść do innych zadań.
#include <queue> #include <ctime> #include <cstdlib> #include <boost/thread/thread.hpp> #include <boost/thread/mutex.hpp> #include <boost/thread/xtime.hpp> #include <iostream> using namespace std; /*przydatne stałe, oraz mutex do synchronizacji wyjścia*/ const int SZ = 10; enum {UP = 1, RD = 2}; boost::mutex io_mutex;
Wzorzec „aktywny obiekt” wykorzystuje następujące klasy: Proxy - klasa widoczna dla programisty, która zawiera interfejs obiektu Servant - klasa implementująca metody Scheduler - klasa odpowiedzialna za ustawienie oczekujących metod w kolejce i wyboru odpowiedniej do zdjęcia z kolejki Request - klasa bazowa reprezentująca żądanie wywołania metody. Dla każdej metody będziemy mieć klasę z niej dziedziczącą Kolejka - można ewentualnie stworzyć własną kolejkę do przechowywania żądań wywołania metody Future - klasa zawierająca wartości zwracane z funkcji, jeżeli jakieś są. Obiekt tej klasy jest natychmiast zwracany do wywołującego, który może później poczekać, aż pojawi się w nim faktyczna wartość zwrócona.
/*to jest nasz Servant*/ class Database; /*proxy*/ class DBProxy; /*planista*/ class Scheduler; /*reprezentacja żądania wywołania metody*/ class Request; /*dwie klasy reprezentujące żądania konkretnych metod*/ class Update; class Read; /*klasa opakowująca zwracaną wartość*/ class RetVal; /*a to już klasy reprezentujące czytelnika i pisarza*/ class Writer; class Reader;
Klasa reprezentująca faktyczną bazę danych, oraz implementująca metody na niej wywoływane
class Database { public: Database() { for(int i=0; i<SZ; ++i) { base[i] = 0; } } void update(int num, int cell) { //na wypadek przekroczenia zasięgu base[cell%SZ] = num; } int read(int cell) { return base[cell%SZ]; } private: int base[SZ]; };
Klasa ta opakowuje wartość int, zwracaną przez funkcję read. Umożliwia ona automatyczną lecz blkującą konwersję na int, lub na pobranie wartości za pomocą funkcji, jeżeli ta jest już gotowa.
class RetVal { public: RetVal() { ready_ = 0; val = -1; } RetVal(const RetVal& r) { ready_ = r.ready_; val = r.val; } RetVal& operator=(const RetVal& r) { ready_ = r.ready_; val = r.val; return *this; } int getVal() const { return val; } /*funkcja ustawiająca wartość zwróconą*/ void set(int num) { ready_ = 1; val = num; } /*funkja do sprawdzenia, czy wartośc została już przekazana*/ int ready() const { return ready_; } /*blokujący operator rzutowania na int*/ operator int() { while(!ready_) { ; } return val; } private: int ready_; int val; };
Implementujemy żądanie wywołania metody. Zawiera ono funkcję guard służącą do zapewnienia spełnienia dodatkowych warunków synchronizacji, oraz call, wywołującą właściwą metodę
class Request { public: /*w tym przykładzie funkcja guard nic nie będzie robić. Standardowo jednak może ona zawierać elementy synchronizacji np jeżeli chcielibyśmy blokować czytanie z pustej kolejki*/ virtual bool guard() const = 0; /*funkcja ta wywołuje faktyczną metodę z bazy danych*/ virtual void call() = 0; };
I dwie klasy reprezentujące konkretne metody:
class Update : public Request { public: Update(Database* d, int n, int c) : database(d), num(n), cell(c) {} bool guard() const { return true; } void call() { database->update(num, cell); } public: Database *database; int num; int cell; }; class Read : public Request { public: Read(Database* d, RetVal* rv, int c) : database(d), cell(c), ret(rv) {} bool guard() const { return true; } /*tutaj jeszcze trzeba zapamiętać wynik*/ void call() { int num = database->read(cell); ret->set(num); } private: Database* database; int cell; RetVal* ret; };
Planista odpowiadający za uszeregowanie metod, korzysta z wewnętrznej kolejki std::queue
class Scheduler { public: Scheduler() : upQueue(new queue<Request*>()), rdQueue(new queue<Request*>), sem_u(new boost::mutex()) , sem_r(new boost::mutex()) { thrd = new boost::thread(*this); } /*funkcja wstawiająca elementy do kolejki zauważmy, że jest tu mały problem producent-konsumer, tak więc potrzebna jest synchronizacja*/ void enqueue(Request* req, int type) { if(type == UP) { boost::mutex::scoped_lock lock_i(*sem_u); upQueue->push(req); } else { boost::mutex::scoped_lock lock_i(*sem_r); rdQueue->push(req); } } /*ta funkcja działa w nieskończoność w osobnym wątku pisarz ma priorytet nad czytelnikiem, dlatego są dwie kolejki i kolejka z pisarzami obsługiwana jest jako pierwsza zakładam sekwencyjny dostęp do bazy danych również dla czytelników, więc zdejmuję ich z kolejki pojedynczo*/ void dequeue() { int flag = 0; while(true) { flag = 0; /*tu powstaje dodatkowy zasięg, po to aby jak najszybciej zniszczyć obiekt lock_i*/ { boost::mutex::scoped_lock lock_i(*sem_u); if(!upQueue->empty()) { flag = 1; Request* req = upQueue->front(); upQueue->pop(); req->call(); } } if(!flag) { boost::mutex::scoped_lock lock_i(*sem_r); if(!rdQueue->empty()) { Request* req = rdQueue->front(); rdQueue->pop(); req->call(); } } } } void operator()(void) { this->dequeue(); } private: queue<Request*>* upQueue; queue<Request*>* rdQueue; boost::mutex* sem_u; boost::mutex* sem_r; boost::thread* thrd; /*do konstruktora thread można podać obiekt funkcyjny, powstaje kopia danego obiektu, a ponieważ składowe są wskaźnikami to operujemy wciąż na tych samych kolejkach i semaforach*/ };
Klasa będąca interfejsem dla programisty. Tworzy ona żądania wywołania do metod, a do wątku wywołującego metodę natychmiast zwraca wartość.
class DBProxy { public: /*tutaj faktycznie powstają planista oraz baza danych*/ DBProxy() : scheduler(new Scheduler()), database(new Database()) {} /*przy wywołaniu metody, tworzone jest żądanie i odkładane na kolejkę*/ void update(int num, int cell) { Request* req = new Update(database, num, cell); scheduler->enqueue(req, UP); } /*tutaj dodatkowo powstaje obiekt przechowujący wartość zwracaną z funkcji*/ RetVal& read(int cell) { RetVal* ret = new RetVal(); Request* req = new Read(database, ret, cell); scheduler->enqueue(req, RD); return *ret; } private: Database* database; Scheduler* scheduler; };
Tworzymy instancję bazy danych, globalna i wspólną dla wszystkich
DBProxy base;
To prosta implementacja pisarza i czytelnika. Jedyne co robią to wpisują/czytają dane z bazy i spią. Można jednak wyobrazić sobie jak np pisarz wymyśla dane (np wyszukuje), a czytelnik jakoś odczytane dane przetwarza. Każdy będzie wywoływał się w osobnym wątku. Do podziału na wątki wykorzystałem boost::thread. Warto też zwrócić uwagę na utworzenie z wypisywania sekcji krytycznej i synchronizacji przy pomocy mutexa.
class Writer { public: Writer(int n) : num(n) { thrd = new boost::thread(*this); } void update() { int val, cell; val = rand()%100 + 1; cell = (rand()%10 * num) % SZ; base.update(val, cell); boost::mutex::scoped_lock lock_i(io_mutex); cout << "wpisano " << val << " do " << cell << endl; } void join_() { thrd->join(); } void operator()(void) { boost::xtime xt; while(true) { srand(time(0)); this->update(); boost::xtime_get(&xt, boost::TIME_UTC); xt.sec += rand()%10+1; boost::thread::sleep(xt); } } private: boost::thread* thrd; int val; int num; int seed; }; class Reader { public: Reader(int s) : seed(s) { thrd = new boost::thread(*this); } void read() { int cell = rand()%3; /*można też poprosić o obiekt RetVal, aby nie blokować wątku wtedy można np co jakiś czas sprawdzać, czy wartość już jest ale wymaga to ręcznego niszczenia tych obiektów*/ int result = base.read(cell); boost::mutex::scoped_lock lock_i(io_mutex); cout << "Odczytano: " << result << " z " << cell << endl; } void join_() { thrd->join(); } void operator()(void) { boost::xtime xt; while(true) { boost::xtime_get(&xt, boost::TIME_UTC); xt.sec += rand()%2+1; boost::thread::sleep(xt); this->read(); } } private: boost::thread* thrd; int seed; };
Do testowania tworzę dwóch pisarzy i trzech czytelników, którzy w swoich wątkach pracują na bazie danych. Występuje tutaj problem z liczbami losowymi - w każdym wątku będzie losowana ta sama liczba przy każdym kolejnym wywołaniu rand(). Możnaby tego uniknąć stosując funkcję rand_r(), ale rozwiązanie to jest nieprzenośne na system windows.
int main() { Writer w1(1), w2(2); Reader r1(srand(time(0)), r2(srand(time(0)), r3(srand(time(0)); w1.join_(); w2.join_(); r1.join_(); r2.join_(); r3.join_(); return 0; }