主要讲的是《Implementing Lock-Free Queues》的论点,具体直接看论文最好。这里总结些要点。
CAS就是Compare And Swap。gcc可以调用:
1 __sync_bool_compare_and_swap
这段代码讲出无锁的两个关键手段:
1 EnQueue(x) //进队列 2 { 3 //准备新加入的结点数据 4 q = new record(); 5 q->value = x; 6 q->next = NULL; 7 8 do { 9 p = tail; //取链表尾指针的快照10 } while( CAS(p->next, NULL, q) != TRUE); //如果没有把结点链在尾指针上,再试11 12 CAS(tail, p, q); //置尾结点13 }
一个就是CAS,一个就是Retry-Loop。
然后,还有一个ABA问题,就是在被抢占之前和之后数据地址没变,但是数据内容可能变了。解决思路就是加一个额外的状态来记录(也可以用记数)。
一个无锁队列的实现:
1 #include2 #include 3 #include 4 5 class CircleQueue { 6 public: 7 CircleQueue(int queue_count) { 8 this->queue_size_ = queue_count * sizeof(void*) + sizeof(unsigned long*) * 2; 9 this->queue_count_ = queue_count;10 shm_ = mmap(NULL, queue_size_, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS , -1, 0);11 memset(shm_, 0, queue_size_);12 head_ = (unsigned long*)shm_;13 tail_ = (unsigned long*)((char*)shm_ + sizeof(unsigned long));14 queue_ = (void**)((char*)shm_ + sizeof(unsigned long) * 2);15 }16 17 ~CircleQueue() {18 munmap(shm_, queue_size_);19 }20 21 int PushOneThread(void* ap) {22 if((int)(*head_ - *tail_) >= queue_count_) {23 return 1;//full24 }25 queue_[ *head_ % queue_count_ ] = ap;26 *head_ = *head_ + 1;27 return 0;28 }29 30 bool IsEmpty() {31 if(*head_ == *tail_) {32 return true;33 }34 return false;35 }36 37 int PopOneThread(void** ap) {38 if(*head_ == *tail_) {39 return 1;40 }41 42 if (ap == NULL) {43 return -1;44 }45 *ap = *(queue_ + *tail_ % queue_count_);46 *tail_ = *tail_ + 1;47 return 0;48 }49 50 int PopMultiThread(void** ap, int retry) {51 *ap = NULL;52 for(int i = 0; i < retry; ++i) {53 volatile unsigned long tail = *tail_;54 if(*head_ == tail) {55 return 1;56 }57 58 int ret = __sync_bool_compare_and_swap(tail_, tail, tail + 1);59 if(ret) {60 *ap = *( queue_ + (tail % queue_count_));61 return 0;62 }63 }64 return -1;65 }66 private:67 unsigned long* head_;68 unsigned long* tail_;69 void** queue_;70 void* shm_;71 int queue_size_;72 int queue_count_;73 };
这里的场景主要是一写多读。写的时候操作的是head,不需要加锁。读的时候,tail在改变的时候用上了CAS和retry-loop。 注意这里tail_只有在写的时候改变,而且只会往上加,所以不存在ABA问题。
p.s. 多进程通信的一个关键点,就是在于共享内存的时候,要确保所有的地址都是多进程共享的。所以这里存的指针,如果是多进程环境下,也应该是从共享内存中分配的。