SealHuang +

使用Python进程池消费流数据

在数据处理工作中,经常会遇到这样一个场景:

  1. 从一个数据源接收流数据(比如利用Kafka等框架建立的数据流管道,或是从一个大文件里分块读取到的数据);

  2. 对取得的数据进行处理;

  3. 将处理结果发送到另一个地方。

如果数据处理的计算量很小,并且你也不太在意处理的速度,当然可以对接收到的数据顺序地逐个处理。但当你想要尽快完成这些计算,你也许就该考虑使用多进程了(multiprocessing)。这样你的程序可以同时处理多个数据,或是在处理一组数据的同时,还可以下载新的数据,并将处理好的结果发出去。

利用Python自带的multiprocessing库,可以很容易地实现这个过程。使用已经包装好的进程池(Pool)和队列(Queue)类,让你不必太在意底层的实现细节。

下面是一个简单的实现实例。

  1. import multiprocessing as mp
  2. import time
  3. def worker(msg, out_queue):
  4. print('Processing %s (MP: %s) ' % (msg, mp.current_process().name))
  5. time.sleep(0.01)
  6. out_queue.put('Job finished - %s'%(msg))
  7. def sender(out_queue):
  8. while True:
  9. if not out_queue.empty():
  10. msg = out_queue.get()
  11. print('Kafka sending %s' % msg)
  12. else:
  13. time.sleep(0.01)
  14. def writer(q):
  15. # Write to the in-queue
  16. count = 30
  17. for ii in range(0, count):
  18. print('Writing %s' % (ii))
  19. # Write 'count' numbers into the queue
  20. q.put(ii)
  21. time.sleep(0.001)
  22. if __name__=='__main__':
  23. # initialize queues
  24. data_manager = mp.Manager()
  25. # this is where we are going to store input data
  26. in_queue = data_manager.Queue(10)
  27. # this where we are gonna push them out
  28. out_queue = data_manager.Queue(10)
  29. # initialize a message writer
  30. # the writer would generate data for data stream
  31. mp.Process(target=writer, args=(in_queue,)).start()
  32. # initialize a message sender
  33. # the sender would get the processed results out
  34. mp.Process(target=sender, args=(out_queue,)).start()
  35. # create processes pool to consume the data
  36. # the process in the pool would take input data and
  37. # out-queue as arguments
  38. # the data will get out through the out-queue
  39. pool = mp.Pool(3)
  40. # daemonize it
  41. while True:
  42. if not in_queue.empty():
  43. # consume the input data
  44. msg = in_queue.get()
  45. pool.apply_async(worker, (msg, out_queue))
  46. else:
  47. time.sleep(0.01)

在上面这个例子里,我们利用writer向数据管道(in_queue)中写入数据,模拟接收数据的过程,收到的数据会被送到进程池,分配一个进程进行处理,再将结果写入到出队列(out_queue)里,sender会不断检测出队列是否有数据,如果有数据就将其发送出去。

这里唯一需要注意的地方是,在使用Pool时,如果要使用与其搭配的队列,需要从multiprocessing.Manager中进行实例化,它是一个资源管理器,具体介绍可以查阅官方手册。

Blog

Technique

Theory