高并发服务器经编程,需要操作共享数据,为了保护数据的正确性,有一种有效的方法就是加锁机制,但这种方式存在以下一些缺点:
在多线程竞争下,加锁、释放锁会导致比较多的上下文切换和调度延时,引起性能问题 一个线程持有锁会导致其他所有需要此锁的线程挂起为了解决多线程并行情况下使用锁造成性能损耗的问题。 引入CAS 操作:该包含三个操作数 —— 内存位置(V)、预期原值(A)和新值(B)。 如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值 。否则,处理器不做任何操作。现在几乎所有的CPU指令都支持CAS的原子操作,X86下对应的是CMPXCHG 汇编指令。有了这个原子操作,我们就可以用其来实现各种无锁(lock free)的数据结构。
实现一、
#include <iostream> #include <string> #include <vector> #include <thread> #include <cstring> #include <cstdlib> #include <cstdio> #include <assert.h> using namespace std; template <typename T> class stack { public: stack():top(NULL){} void push(const T& data); T pop(); bool empty()const; private: struct Node{ T data; Node* next; Node(const T& d) : data(d), next(NULL) { } }; Node *top; }; template <typename T> void stack<T>::push(const T& data) { Node *n = new Node(data); while (1) { n->next = top; if (__sync_bool_compare_and_swap(&top, n->next, n)) { // CAS break; } } } template <typename T> T stack<T>::pop( ) { while (1) { Node* oldval = top; assert(oldval != NULL); if (top && __sync_bool_compare_and_swap(&top, oldval, oldval->next)) { // CAS return oldval->data; } } } template <class T> bool stack<T>::empty()const{ if (top == NULL) { return true; } return false; } class people{ public: char name[20]; int age; unsigned int tid; }; stack<people> ss; void pthread_func(){ people tmp; int age; for(int i = 0; i < 100; ++i){ age = rand() % 100; memset(tmp.name, 0, 20); sprintf(tmp.name, "xm_%d", age); tmp.age = age; tmp.tid = pthread_self(); ss.push(tmp); } return ; } int main(int argc, char* argv[]) { std::vector<std::thread> ths; for (int i=0; i<5; i++) ths.push_back(std::thread(pthread_func)); for (int i=0; i<5; i++) ths[i].join(); int count = 0; while(!ss.empty()){ count++; people tmp = ss.pop(); cout<<"tid : "<<tmp.tid<<",name : "<<tmp.name<<", age:"<<tmp.age<<endl; } cout<<count<<endl; }实现二、
#include <iostream> #include <cstring> #include <cstdlib> #include <string> #include <cstdio> #include <assert.h> #include <pthread.h> #include <unistd.h> using namespace std; template <class T> class stack{ public: stack(int size = 10) : mtop(0), msize(size){ mdata = new T[msize]; } ~stack(){ delete[]mdata; } void push(T& val); void pop(); bool empty()const; T& top(); bool full()const; private: T* mdata; int msize; int mtop; }; template <class T> void stack<T>::push(T& val){ T* ptmp = mdata; /* ptmp 暂存 mdata*/ while (1){ while (mtop >= msize){ // 多线程自旋 if (ptmp != nullptr){ if (mtop == msize) {// 防止多次扩容 if (__sync_bool_compare_and_swap(&mdata, ptmp, nullptr)){ T* pChange = new T[msize * 2]; for (int i = 0; i < mtop; ++i){ pChange[i] = ptmp[i]; } delete ptmp; mdata = pChange; msize *= 2; } } } } do{ int tmp = mtop; // 循环拿到最新的栈顶元素 if (mtop >= msize){ // 多线程同时进行push,某个线程push后栈满 break; } if (__sync_bool_compare_and_swap(&mtop, tmp, tmp + 1)) {// CAS memcpy(&mdata[tmp], &val, sizeof(T)); return; // push over } } while (1); // 自旋 } } template <class T> void stack<T>::pop(){ if (empty()){ return; } do{ int tmp = mtop; if (tmp == 0){ break; } if (__sync_bool_compare_and_swap(&mtop, tmp, tmp - 1)){// CAS ,满足条件将mtop更新为tmp - 1 return; } } while (1); } template <class T> bool stack<T>::empty()const{ return mtop == 0; } template <class T> bool stack<T>::full()const{ return mtop == msize; } template <class T> T& stack<T>::top(){ assert(!empty()); return mdata[mtop - 1]; } class people{ public: char name[20]; int age; unsigned int tid; }; stack<people> ss; void *pthread_func(void *arg){ pthread_detach(pthread_self()); people tmp; int age; for(int i = 0; i < 100; ++i){ age = rand() % 100; memset(tmp.name, 0, 20); sprintf(tmp.name, "xm_%d", age); tmp.age = age; tmp.tid = pthread_self(); ss.push(tmp); } } int main(int argc, char* argv[]) { pthread_t tidp; for(int i = 0; i < 5; i++) { //创建线程pthread if ((pthread_create(&tidp, NULL, pthread_func, NULL)) == -1) { printf("create error!\n"); return 1; } } sleep(3); int count = 0; while(!ss.empty()){ count++; cout<<"tid : "<<ss.top().tid<<",name : "<<ss.top().name<<", age:"<<ss.top().age<<endl; ss.pop(); } cout<<count<<endl; }