Narzędzia użytkownika

Narzędzia witryny


aktywny_obiekt

Wzorzec Projektowy Aktywny Obiekt

autor: Krzysztof Rogala

Problem

Wyobraźmy sobie bazę danych (już istniejącą). Jedni użytkownicy mogą czytać z niej dane, a innni je zmieniać. Ci pierwsi (dalej: czytelnicy) są niezadowoleni, ponieważ czasem z bazy odczytują nieaktualne lub bezsensowne dane. Ci drudzy (dalej: pisarze) także są niezadowoleni, ponieważ podczas oczekiwania na wprowadzenie poprawek do bazy, nic nie mogą robić. Rozwiązaniem tego problemu jest synchronizacja dostępu. Nie może być tak, że jednocześnie ktoś pisze do bazy i z niej czyta. Preferowani w takiej sytuacji będą pisarze. Ponadto założyłem także, że na raz tylko jeden czytelnik korzysta z bazy. Co więcej, wprowadzamy rozdzielenie wywołania metody i jej wykonania, co pozwala pisarzom natychmiast przejść do innych zadań.

include'y i stałe globalne

#include <queue>
#include <ctime>
#include <cstdlib>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/xtime.hpp>
#include <iostream>
using namespace std; 
 
/*przydatne stałe, oraz mutex do synchronizacji wyjścia*/
const int SZ = 10;
enum {UP = 1, RD = 2};
boost::mutex io_mutex;

Budowa "aktywnego obiektu"

Wzorzec „aktywny obiekt” wykorzystuje następujące klasy: Proxy - klasa widoczna dla programisty, która zawiera interfejs obiektu Servant - klasa implementująca metody Scheduler - klasa odpowiedzialna za ustawienie oczekujących metod w kolejce i wyboru odpowiedniej do zdjęcia z kolejki Request - klasa bazowa reprezentująca żądanie wywołania metody. Dla każdej metody będziemy mieć klasę z niej dziedziczącą Kolejka - można ewentualnie stworzyć własną kolejkę do przechowywania żądań wywołania metody Future - klasa zawierająca wartości zwracane z funkcji, jeżeli jakieś są. Obiekt tej klasy jest natychmiast zwracany do wywołującego, który może później poczekać, aż pojawi się w nim faktyczna wartość zwrócona.

/*to jest nasz Servant*/
class Database;
/*proxy*/
class DBProxy;
/*planista*/
class Scheduler;
/*reprezentacja żądania wywołania metody*/
class Request;
/*dwie klasy reprezentujące żądania konkretnych metod*/
class Update;
class Read;
/*klasa opakowująca zwracaną wartość*/
class RetVal;
/*a to już klasy reprezentujące czytelnika i pisarza*/
class Writer;
class Reader;

Implementacja klas

Baza danych

Klasa reprezentująca faktyczną bazę danych, oraz implementująca metody na niej wywoływane

class Database {
public:
	Database() {
		for(int i=0; i<SZ; ++i) {
			base[i] = 0;
		}
	}
	void update(int num, int cell) {
		//na wypadek przekroczenia zasięgu
		base[cell%SZ] = num;
	}
	int read(int cell) {
		return base[cell%SZ];
	}
private:
	int base[SZ];
};

Wartości zwracane

Klasa ta opakowuje wartość int, zwracaną przez funkcję read. Umożliwia ona automatyczną lecz blkującą konwersję na int, lub na pobranie wartości za pomocą funkcji, jeżeli ta jest już gotowa.

class RetVal {
public:
	RetVal() {
		ready_ = 0;
		val = -1;
	}
	RetVal(const RetVal& r) {
		ready_ = r.ready_;
		val = r.val;
	}
	RetVal& operator=(const RetVal& r) {
		ready_ = r.ready_;
		val = r.val;
		return *this;
	}
	int getVal() const {
		return val;
	}
	/*funkcja ustawiająca wartość zwróconą*/
	void set(int num) {
		ready_ = 1;
		val = num;
	}
	/*funkja do sprawdzenia, czy wartośc została już przekazana*/
	int ready() const {
		return ready_;
	}
	/*blokujący operator rzutowania na int*/
	operator int() {
		while(!ready_) {
			;
		}
		return val;
	}
private:
	int ready_;
	int val;
};

Żądanie

Implementujemy żądanie wywołania metody. Zawiera ono funkcję guard służącą do zapewnienia spełnienia dodatkowych warunków synchronizacji, oraz call, wywołującą właściwą metodę

class Request {
public:
	/*w tym przykładzie funkcja guard nic nie będzie robić.
	Standardowo jednak może ona zawierać elementy synchronizacji
	np jeżeli chcielibyśmy blokować czytanie z pustej kolejki*/
	virtual bool guard() const = 0;
	/*funkcja ta wywołuje faktyczną metodę z bazy danych*/
	virtual void call() = 0;
};

I dwie klasy reprezentujące konkretne metody:

class Update : public Request {
public:
	Update(Database* d, int n, int c) : database(d), num(n), cell(c) {}
	bool guard() const {
		return true;
	}
	void call() {
		database->update(num, cell);
	}
public:
	Database *database;
	int num;
	int cell;
};
 
class Read : public Request {
public:
	Read(Database* d, RetVal* rv, int c) : database(d), cell(c), ret(rv) {}
	bool guard() const {
		return true;
	}
	/*tutaj jeszcze trzeba zapamiętać wynik*/
	void call() {
		int num = database->read(cell);
		ret->set(num);
	}
private:
	Database* database;
	int cell;
	RetVal* ret;
};

Planista

Planista odpowiadający za uszeregowanie metod, korzysta z wewnętrznej kolejki std::queue

class Scheduler {
public:
	Scheduler() : upQueue(new queue<Request*>()), rdQueue(new queue<Request*>), sem_u(new boost::mutex())
		, sem_r(new boost::mutex()) {
			thrd = new boost::thread(*this);
	}
	/*funkcja wstawiająca elementy do kolejki
	zauważmy, że jest tu mały problem producent-konsumer,
	tak więc potrzebna jest synchronizacja*/
	void enqueue(Request* req, int type) {
		if(type == UP) {
			boost::mutex::scoped_lock lock_i(*sem_u);
			upQueue->push(req);
		}
		else {
			boost::mutex::scoped_lock lock_i(*sem_r);
			rdQueue->push(req);
		}
	}
	/*ta funkcja działa w nieskończoność w osobnym wątku
	pisarz ma priorytet nad czytelnikiem, dlatego są dwie kolejki
	i kolejka z pisarzami obsługiwana jest jako pierwsza
	zakładam sekwencyjny dostęp do bazy danych również dla czytelników,
	więc zdejmuję ich z kolejki pojedynczo*/
	void dequeue() {
		int flag = 0;
		while(true) {
			flag = 0;
			/*tu powstaje dodatkowy zasięg, po to aby jak najszybciej zniszczyć obiekt lock_i*/
			{
				boost::mutex::scoped_lock lock_i(*sem_u);
				if(!upQueue->empty()) {
					flag = 1;
					Request* req = upQueue->front();
					upQueue->pop();
					req->call();
				}
			}
			if(!flag) {
				boost::mutex::scoped_lock lock_i(*sem_r);
				if(!rdQueue->empty()) {
					Request* req = rdQueue->front();
					rdQueue->pop();
					req->call();
				}
			}
		}
	}
	void operator()(void) {
		this->dequeue();
	}
private:
	queue<Request*>* upQueue;
	queue<Request*>* rdQueue;
	boost::mutex* sem_u;
	boost::mutex* sem_r;
	boost::thread* thrd;
	/*do konstruktora thread można podać obiekt funkcyjny,
	powstaje kopia danego obiektu, a ponieważ składowe są wskaźnikami
	to operujemy wciąż na tych samych kolejkach i semaforach*/
};

Proxy

Klasa będąca interfejsem dla programisty. Tworzy ona żądania wywołania do metod, a do wątku wywołującego metodę natychmiast zwraca wartość.

class DBProxy {
public:
	/*tutaj faktycznie powstają planista oraz baza danych*/
	DBProxy() : scheduler(new Scheduler()), database(new Database()) {}
	/*przy wywołaniu metody, tworzone jest żądanie i odkładane na kolejkę*/
	void update(int num, int cell) {
		Request* req = new Update(database, num, cell);
		scheduler->enqueue(req, UP);
	}
	/*tutaj dodatkowo powstaje obiekt przechowujący wartość zwracaną z funkcji*/
	RetVal& read(int cell) {
		RetVal* ret = new RetVal();
		Request* req = new Read(database, ret, cell);
		scheduler->enqueue(req, RD);
		return *ret;
	}
private:
	Database* database;
	Scheduler* scheduler;
};

Tworzymy instancję bazy danych, globalna i wspólną dla wszystkich

DBProxy base;

Przykład użycia

Pisarz i czytelnik

To prosta implementacja pisarza i czytelnika. Jedyne co robią to wpisują/czytają dane z bazy i spią. Można jednak wyobrazić sobie jak np pisarz wymyśla dane (np wyszukuje), a czytelnik jakoś odczytane dane przetwarza. Każdy będzie wywoływał się w osobnym wątku. Do podziału na wątki wykorzystałem boost::thread. Warto też zwrócić uwagę na utworzenie z wypisywania sekcji krytycznej i synchronizacji przy pomocy mutexa.

class Writer {
public:
	Writer(int n) : num(n) {
		thrd = new boost::thread(*this);
	}
	void update() {
		int val, cell;
		val = rand()%100 + 1;
		cell = (rand()%10 * num) % SZ;
		base.update(val, cell);
		boost::mutex::scoped_lock lock_i(io_mutex);
		cout << "wpisano " << val << " do " << cell << endl;
	}
	void join_() {
		thrd->join();
	}
	void operator()(void) {
		boost::xtime xt;
		while(true) {
			srand(time(0));
			this->update();
			boost::xtime_get(&xt, boost::TIME_UTC);
			xt.sec += rand()%10+1;
			boost::thread::sleep(xt);
		}
	}
private:
	boost::thread* thrd;
	int val;
	int num;
	int seed;
};
 
class Reader {
public:
	Reader(int s) : seed(s) {
		thrd = new boost::thread(*this);
	}
	void read() {
		int cell = rand()%3;
		/*można też poprosić o obiekt RetVal, aby nie blokować wątku
		wtedy można np co jakiś czas sprawdzać, czy wartość już jest
		ale wymaga to ręcznego niszczenia tych obiektów*/
		int result = base.read(cell);
		boost::mutex::scoped_lock lock_i(io_mutex);
		cout << "Odczytano: " << result << " z " << cell << endl;
	}
	void join_() {
		thrd->join();
	}
	void operator()(void) {
		boost::xtime xt;
		while(true) {
			boost::xtime_get(&xt, boost::TIME_UTC);
			xt.sec += rand()%2+1;
			boost::thread::sleep(xt);
			this->read();
		}
	}
private:
	boost::thread* thrd;
	int seed;
};

Do testowania tworzę dwóch pisarzy i trzech czytelników, którzy w swoich wątkach pracują na bazie danych. Występuje tutaj problem z liczbami losowymi - w każdym wątku będzie losowana ta sama liczba przy każdym kolejnym wywołaniu rand(). Możnaby tego uniknąć stosując funkcję rand_r(), ale rozwiązanie to jest nieprzenośne na system windows.

int main() {
	Writer w1(1), w2(2);
	Reader r1(srand(time(0)), r2(srand(time(0)), r3(srand(time(0));
	w1.join_();
	w2.join_();
	r1.join_();
	r2.join_();
	r3.join_();
	return 0;
}
aktywny_obiekt.txt · ostatnio zmienione: 2008/04/15 20:43 przez krogala