Alex正在电脑前面作冥思苦想状,这时Tony悄悄地走到Alex的身后,观察了一会儿…

Tony : 看来今天我们要讨论同步问题了。
Alex : (惊奇地回头)。Hey Man , you scared me! 你说的没错,我正在学习同步这一块儿呢,有什么高见不妨说出来吧,我洗耳恭听!
Tony : 不敢不敢。关于进程和线程同步的问题,W. Richard Stevens在他的那本经典的“UNIX Network Programming Volume 2”中有过详尽的讲解,你不妨仔细阅读一下。
Alex : 远水解不了近渴。你还是大概跟我说说吧!
Tony : OK, 我们就拿一个最简单例子来探讨一下吧。在拿出例子之前我们来回顾一下同步的由来。Alex你说说为什么要同步呢?
Alex : 有共享就要同步,就好比超市的POS,如果没有好的同步顾客活动的策略,那超市不就乱了套了么,大家都争着抢着去结账。
Tony : 嗯,没错。mess world is not what we need! 互斥和条件变量是我们经常使用的同步手段,当然更高级的还有信号灯等。
Alex : 逐一说明吧,看来今天又会有不小的收获^_^
Tony : 历史上有个特别有名的问题叫做“生产者-消费者”问题,又叫“有限缓冲区”问题,我们今天的例子大约就是这个样子的。
Alex : (入迷的样子)
Tony : 我们的例子是这样的,我们有“生产者”和“消费者”两个角色,他们共享某一整型变量,规定如下:
       1)生产者发现产品已经被消费了,便生产,即将该共享变量置为1;
       2)消费者发现有产品了,便消费,即将该共享变量置为0;
       很简单吧。我们还是用老办法,由简入难,我们可以使用最简单的手段“互斥锁”来完成这个任务。
Alex : 我知道“互斥锁”,但是了解得并不深,先讲讲理论把!
Tony : 互斥,顾名思义互相排斥,它是最基本的同步手段,一般用来保护“临界区”,“临界区”是一段代码,看起来互斥保护了临界区这段代码的,实质上互斥保护的是“临界区”中被操纵的数据。
Alex : 互斥是不是即可用于线程,也可以用于进程呢?
Tony : 都可以,在我们的例子中我们使用线程,因为线程间共享一个数据空间,实现起来比较容易;进程间要想共享数据就需要额外的支持,比如共享内存等。
Alex : 噢。
Tony : 我们开始吧,按照例子中所述我们应该有两个线程,分别代表生产者和消费者。按照W. Richard Stevens的指导,我们将我们的互斥锁和我们的共享数据放在一个结构体内。

//数据结构定义
#define MAX_COUNT 100

typedef struct sharedata_t{
 pthread_mutex_t lock;
 int val;
}sharedata_t;

sharedata_t shared = {PTHREAD_MUTEX_INITIALIZER};

//主函数
int main(){
 pthread_t producer;
 pthread_t consumer;

 pthread_create(&producer, NULL, produce, NULL);
 pthread_create(&consumer, NULL, consume, NULL);

 pthread_join(producer, NULL);
 pthread_join(consumer, NULL);

 return 0;
}
这些都很简单,关键的是produce和consume两个线程执行函数。
Alex : 如前面所说,produce和consume在访问shared时候一定要先对lock上锁。
Tony : 没错,在任意时刻都只有一个线程在操纵shared变量。代码如下:

void *produce(void *arg){
 int count = 0;
 for( ; ; ){
  pthread_mutex_lock(&shared.lock);
  if(shared.val == 1){//如果已经生产了我就不生产了,直到消费者消费掉
   pthread_mutex_unlock(&shared.lock);
   continue;
  }
  shared.val = 1;
  count++;
  if(count > MAX_COUNT){
   pthread_mutex_unlock(&shared.lock);
   break;
  }
  printf("the %d th produce\n", count);//线程共享进程stdout缓冲区,如果不加以保护,就会被另一个线程的输出刷新
  pthread_mutex_unlock(&shared.lock);
 }
}

void *consume(void *arg){
 int count = 0;
 for( ; ; ){
  pthread_mutex_lock(&shared.lock);
  if(shared.val == 0){//如果还没生产呢,我就暂时不能消费
   pthread_mutex_unlock(&shared.lock);
   continue;
  }
  shared.val = 0;
  count++;
  if(count > MAX_COUNT){
   pthread_mutex_unlock(&shared.lock);
   break;
  }
  printf("the %d th consume\n", count);
  pthread_mutex_unlock(&shared.lock);
 }
}

Tony : 现在这个程序是正确的,但是却不是理想的,他的输出结果肯定是如下的:
the 1 th produce
the 1 th consume
the 2 th produce
the 2 th consume

the 100 th produce
the 100 th consume

Alex : 是producer和consumer交替对吧。
Tony : 没错!运行一下,你感觉如何呢?
Alex : 好像有些慢!我觉得produce和consume两个函数中关于shared.val的值的轮转测试是比较耗时的,而且每次测试前后都要上锁、解锁。
Tony : 说的没错!像这样的轮询是极其浪费CPU时间的。我们不是没有办法解决的,我们可以利用条件变量的方式来解决它,不过针对这个例子来说,使用条件变量从代码上看理解起来就会有些困难了。
Alex : 继续说!
Tony : 条件变量提供一种等待-唤醒机制,可以这样理解如果消费者发现没有产品,它并不继续轮训,而是睡眠,直到生产者生产出产品,并将之唤醒消费。反过来说也一样,那就是如果生产者发现生产出来的产品还没有被消费者消费掉,就同样睡眠,直到消费者将产品消费掉,并将生产者唤醒生产,这样就省下了大量的CPU时间,性能提升可不是一点半点的。不过我们实现起来的时候要更加小心。
Alex : Just go on!
Tony : 要想使用条件变量,我们还需要定一个结构,用来存放我们的条件。

typedef struct conddata_t{
 pthread_mutex_t lock;
 pthread_cond_t cond;
 int ready;
}conddata_t;

conddata_t condd = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 0};

主函数无需改变,修改后的produce和consume如下,由于不好理解,所以我讲上了不少的注释,可以帮助理解。

void *produce(void *arg){
 int count = 0;

 for( ; ; ){
  pthread_mutex_lock(&condd.lock);
  while(condd.ready == 1)//已生产,还没消费完,此时producer休眠
   pthread_cond_wait(&condd.cond, &condd.lock);
  condd.ready = 0;//消费完,还没生产呢,此时consumer休眠
  pthread_mutex_unlock(&condd.lock);

  pthread_mutex_lock(&shared.lock);
  shared.val = 1;
  count++;
  if(count >= MAX_COUNT){
   pthread_mutex_unlock(&shared.lock);
   condd.ready = 1;//生产者生产完最后一个后退出了,告诉消费者
   pthread_cond_signal(&condd.cond);//告诉消费者可以消费最后一个了
   break;
  }
  printf("the %d th produce\n", count);//线程共享进程stdout缓冲区,如果不加以保护,就会被另一个线程的输出刷新
  pthread_mutex_unlock(&shared.lock);

  pthread_mutex_lock(&condd.lock);
  condd.ready = 1;//生产完了,等待消费
  pthread_mutex_unlock(&condd.lock);
  pthread_cond_signal(&condd.cond);//告诉消费者可以消费了
 }
}

void *consume(void *arg){
 int count = 0;
 for( ; ; ){
  pthread_mutex_lock(&condd.lock);
  while(condd.ready == 0)//没有东西可以消费,消费者休眠
   pthread_cond_wait(&condd.cond, &condd.lock);
  condd.ready = 1;//这在消费,请生产者等待,生产者休眠
  pthread_mutex_unlock(&condd.lock);

  pthread_mutex_lock(&shared.lock);
  shared.val = 0;
  count++;
  if(count >= MAX_COUNT){
   pthread_mutex_unlock(&shared.lock);
   break;
  }
  printf("the %d th consume\n", count);
  pthread_mutex_unlock(&shared.lock);

  pthread_mutex_lock(&condd.lock);
  condd.ready = 0;//告诉生产者消费完了,该生产了
  pthread_mutex_unlock(&condd.lock);
  pthread_cond_signal(&condd.cond);
 }

}

代码更长了。有些东西不必要解释,看看注释认真思考一下就能得到答案的。

Alex : 晓得。
Tony : 这回我们来看看性能,第一个实现cpu占用98% 耗时近10秒,而第二个实现几乎瞬间完成。
Alex : 我还在思考,的确不容易理解。
Tony : 还要注意的是资源的释放,这可是一个重要的问题。你慢慢思考吧,我去喝杯coffee。^_^

© 2005, bigwhite. 版权所有.

Related posts:

  1. 从技术到管理的对话-Tony与Alex的对话系列
  2. 一个Xml Parser的TDD开发过程-Tony与Alex的对话系列
  3. CppUnit入门实践-Tony与Alex的对话系列
  4. 一个C++项目的Makefile编写-Tony与Alex的对话系列
  5. 如何编写类中的setter和getter