Li Kai

  • 主页
  • 所有文章
文章分类 友情链接 关于我

Li Kai

  • 主页
  • 所有文章

Golang线程池实现百万级高并发

2021-03-22

本文基于Golang实现线程池,从而可以达到百万级别的高并发请求。本文实现的代码在https://github.com/lk668/threadpool可见。

1. Golang并发简介

Golang原生的goroutine可以很轻松实现并发编程。Go语言的并发是基于用户态的并发,这种并发方式就变得非常轻量,能够轻松运行几万并发逻辑。Go 的并发属于 CSP 并发模型的一种实现,CSP 并发模型的核心概念是:不要通过共享内存来通信,而应该通过通信来共享内存。

2. 并发方案演进

2.1 直接使用goroutine

直接使用goroutine启动一个新的线程来进行任务的执行

1
go handler(request)

2.2 缓冲队列

利用channel实现一个缓冲队列,每次请求先放入缓冲队列,然后从缓冲队列读取数据,一次执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type Job interface{
Run()
}

// 长度为1000的缓冲队列
jobQueue := make(chan Job, 1000)

// 启动一个协程来读取缓冲队列的数据
go func(){
for {
select {
case job := <- jobQueue:
job.Run()
}
}
}()

// 请求发送
job := Job{}
jobQueue <- job

该方案在请求量低于缓冲队列长度时,可以应对并发请求。但是当并发请求量大于缓冲队列长度时,channel会出现阻塞情况。

2.3 线程池实现百万级高并发

更好的实现方案是利用job队列+线程池来实现,具体如下所示:

有个全局JobQueue,用来存储需要执行的Job,有个WorkerPool的线程池,用来存储空闲的Worker。当JobQueue中有Job时,从JobQueue获取Job对象,从WorkerPool获取空闲Worker,将Job对象发送给Worker,进行执行。每个Worker都是一个独立的Goroutine。从而真正意义上实现高并发。

代码实现主要分为三部分

2.3.1 Job定义

Job是一个interface,其下有个函数RunTask,用户定制化的任务,需要实现RunTask函数。JobChan是一个Job channel结构。Job是高并发需要执行的任务

1
2
3
4
5
type Job interface {
RunTask(request interface{})
}

type JobChan chan Job

2.3.2 Worker定义

Worker就是高并发里面的一个线程,启动的时候是一个Goroutine。Worker结构一需要一个JobChan,用来接收从全局JobQueue里面获取的Job对象。有个Quit的channel,用来接收退出信号。需要一个Start函数,将自己注册到WorkerPool,然后监听Job,有Job传入时,处理Job的RunTask,处理完成之后,重新将自己添加回WorkerPool。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// Worker结构体
// WorkerPool随机选取一个Worker,将Job发送给Worker去执行
type Worker struct {
// 不需要带缓冲的任务队列
JobQueue JobChan
//退出标志
Quit chan bool
}

// 创建一个新的Worker对象
func NewWorker() Worker {
return Worker{
make(JobChan),
make(chan bool),
}
}

// 启动一个Worker,来监听Job事件
// 执行完任务,需要将自己重新发送到WorkerPool
func (w Worker) Start(workerPool *WorkerPool) {
// 需要启动一个新的协程,从而不会阻塞
go func() {
for {
// 将worker注册到线程池
workerPool.WorkerQueue <- &w
select {
case job := <-w.JobQueue:
job.RunTask(nil)
// 终止当前worker
case <-w.Quit:
return
}
}
}()
}

2.3.3 WorkerPool定义

WorkerPool是一个线程池,用来存储Worker。所以需要一个Size变量,用来表示这个Pool存储的Worker的个数。需要一个JobQueue,用来充当全局JobQueue的作用,所有的job先存储到该全局JobQueue中。有个WorkerQueue是个channel,用来存储空闲的Worker,有Job来的时候,从WorkerQueue里面取出一个Worker,执行相应的任务,执行完成以后,重新将Worker放回WorkerQueue。

WorkerPool需要一个启动函数,一个是来启动Size数量的Worker线程,一个是需要启动一个新的线程,来接收Job。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 线程池
type WorkerPool struct {
// 线程池大小
Size int
// 不带缓冲的任务队列,任务到达后,从workerQueue随机选取一个Worker来执行Job
JobQueue JobChan
WorkerQueue chan *Worker
}

func NewWorkerPool(poolSize, jobQueueLen int) *WorkerPool {
return &WorkerPool{
poolSize,
make(JobChan, jobQueueLen),
make(chan *Worker, poolSize),
}
}

func (wp *WorkerPool) Start() {

// 将所有worker启动
for i := 0; i < wp.Size; i++ {
worker := NewWorker()
worker.Start(wp)
}

// 监听JobQueue,如果接收到请求,随机取一个Worker,然后将Job发送给该Worker的JobQueue
// 需要启动一个新的协程,来保证不阻塞
go func() {
for {
select {
case job := <-wp.JobQueue:
worker := <-wp.WorkerQueue
worker.JobQueue <- job
}
}
}()

}

2.3.4 代码调用举例

接下来,举例分析如何使用该线程池。首先需要定义你要执行的任务,实现RunTask函数。然后初始化一个WorkerPool,将模拟百万请求的数据发送给全局JobQueue。交给线程池进行任务处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
//需要执行任务的结构体
type Task struct {
Number int
}

// 实现Job这个interface的RunTask函数
func (t Task) RunTask(request interface{}) {
fmt.Println("This is task: ", t.Number)
//设置个等待时间
time.Sleep(1 * time.Second)
}

func main() {

// 设置线程池的大小
poolNum := 100 * 100 * 20
jobQueueNum := 100
workerPool := threadpool.NewWorkerPool(poolNum, jobQueueNum)
workerPool.Start()

// 模拟百万请求
dataNum := 100 * 100 * 100

go func() {
for i := 0; i < dataNum; i++ {
task := Task{Number: i}
workerPool.JobQueue <- task
}
}()

// 阻塞主线程
for {
fmt.Println("runtime.NumGoroutine() :", runtime.NumGoroutine())
time.Sleep(2 * time.Second)
}
}

参考

  • https://blog.csdn.net/jkwanga/article/details/110948445
  • http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/
  • golang

扫一扫,分享到微信

微信分享二维码
手把手教你如何用golang实现一个timewheel时间轮
基于ECMP的多活负载均衡策略
  1. 1. 1. Golang并发简介
  2. 2. 2. 并发方案演进
    1. 2.1. 2.1 直接使用goroutine
    2. 2.2. 2.2 缓冲队列
    3. 2.3. 2.3 线程池实现百万级高并发
      1. 2.3.1. 2.3.1 Job定义
      2. 2.3.2. 2.3.2 Worker定义
      3. 2.3.3. 2.3.3 WorkerPool定义
      4. 2.3.4. 2.3.4 代码调用举例
收藏文章
登录
表情删除后不可恢复,是否删除
取消
确定
图片正在上传,请稍后...
取消上传
评论内容为空!
  • 评论
1人参与,1条评论
  • 最新评论
2021年12月1日 3:51 淡年华 [上海市网友]

写的很好,但是缺少一个等待所有线程结束后退出的一个操作

举报回复
该评论已关闭!
likai博客正在使用畅言云评
去社区看看吧
去热评看看吧
  • 手把手教你如何用golang实现一个timewheel时间轮 | LiKai的博客
    手把手教你如何用golang实现一个timewheel时间轮 | LiKai的博客
  • Golang高级教程(二)类型断言 | LiKai的博客
    Golang高级教程(二)类型断言 | LiKai的博客
  • 算法(二)深度优先搜索DFS | LiKai的博客
    算法(二)深度优先搜索DFS | LiKai的博客
  • 数据结构-队列 | LiKai的博客
    数据结构-队列 | LiKai的博客
  • 手把手教你如何用golang实现一个timewheel时间轮 | LiKai的博客
  • Golang高级教程(二)类型断言 | LiKai的博客
  • 算法(二)深度优先搜索DFS | LiKai的博客
  • 数据结构-队列 | LiKai的博客
热评话题
  • 数据结构-队列 | LiKai的博客
  • 算法(五)回溯 | LiKai的博客
  • SQL基础教程(四) | LiKai的博客
  • Golang高级教程(一)for-range, switch, select使用 | LiKai的博客
  • 算法(八)最短路径 | LiKai的博客
  • Python高级用法 | LiKai的博客
  • 算法(三)二分查找 | LiKai的博客
关闭
按钮 内容不能为空!
立刻说两句吧! 查看1条评论
  • 你收到0条新通知
  • 你有0条评论收到赞同
  • 你有0条新回复
  • 本日畅言云评热评新鲜出炉啦!
  • 你有0个任务已完成
  • 你收获0个畅言云评足迹
© 2021 Li Kai
Hexo Theme Yilia by Litten
  • 文章分类
  • 友情链接
  • 关于我

tag:

  • test
  • 数据结构
  • shell
  • 计算机网络
  • 数据库
  • 负载均衡
  • ovn
  • 算法
  • linux
  • kubernetes
  • sql
  • python
  • openstack
  • designate
  • neutron
  • golang
  • timewheel

    缺失模块。
    1、请确保node版本大于6.2
    2、在博客根目录(注意不是yilia根目录)执行以下命令:
    npm i hexo-generator-json-content --save

    3、在根目录_config.yml里添加配置:

      jsonContent:
        meta: false
        pages: false
        posts:
          title: true
          date: true
          path: true
          text: false
          raw: false
          content: false
          slug: false
          updated: false
          comments: false
          link: false
          permalink: false
          excerpt: false
          categories: false
          tags: true
    

  • 手把手教你如何用golang实现一个timewheel时间轮

    2021-04-04

    #算法#golang#timewheel

  • Golang线程池实现百万级高并发

    2021-03-21

    #golang

  • 基于ECMP的多活负载均衡策略

    2020-12-12

    #负载均衡

  • neutron metadata 服务详解

    2020-10-11

    #openstack#neutron

  • ovn为外部主机提供dhcp服务

    2020-09-20

    #ovn

  • Neutron网络troubleshooting

    2020-09-09

    #openstack#neutron

  • kubernetes中pod的自动伸缩

    2020-04-05

    #kubernetes

  • Golang高级教程(二)类型断言

    2020-02-04

    #golang

  • Golang高级教程(一)for-range, switch, select使用

    2020-02-02

    #golang

  • Kubernetes容器安全之Apparmor

    2020-01-04

    #kubernetes

  • 网络性能测试

    2019-09-19

    #计算机网络

  • 算法(十一)刷题常用语法python/go

    2019-07-18

    #算法#python#golang

  • 算法(十)快速排序

    2019-07-10

    #算法#python#golang

  • 算法(九)记忆化搜索

    2019-06-08

    #算法

  • linux常用指令

    2019-03-31

    #linux

  • OpenStack Designate简介

    2019-03-03

    #openstack#designate

  • 算法(八)最短路径

    2018-12-09

    #算法

  • 算法(七)并查集

    2018-10-21

    #算法

  • 算法(六)二叉树

    2018-09-19

    #算法

  • 算法(五)回溯

    2018-07-19

    #算法

  • Python yield使用

    2018-07-09

    #python

  • 算法(四)前缀和

    2018-07-05

    #算法

  • 算法(三)二分查找

    2018-05-19

    #算法

  • 算法(二)深度优先搜索DFS

    2018-04-20

    #算法

  • Python装饰器

    2018-04-19

    #python

  • Python高级用法

    2018-04-09

    #python

  • 算法(一)单调栈

    2018-03-19

    #算法

  • Python单例模式

    2018-03-08

    #python

  • Python编程规范

    2017-05-09

    #python

  • SQL基础教程(四)

    2017-04-19

    #数据库#sql

  • SQL基础教程(三)

    2017-04-11

    #数据库#sql

  • SQL基础教程(二)

    2017-03-24

    #数据库#sql

  • SQL基础教程(一)

    2017-03-19

    #数据库#sql

  • 9.计算机网络-QOS

    2016-08-19

    #计算机网络

  • 8.计算机网络-网络安全(下)

    2016-08-08

    #计算机网络

  • 7.计算机网络-网络安全(上)

    2016-08-01

    #计算机网络

  • 6.计算机网络-应用层

    2016-07-27

    #计算机网络

  • 5.计算机网络-传输层

    2016-07-24

    #计算机网络

  • 4.计算机网络-网络层

    2016-07-22

    #计算机网络

  • 3.计算机网络-数据链路层

    2016-07-17

    #计算机网络

  • 2.计算机网络-物理层

    2016-07-14

    #计算机网络

  • 1.计算机网络概述

    2016-07-09

    #计算机网络

  • Hash算法实现之MD5

    2016-04-09

    #算法

  • 数据结构-链表

    2016-04-02

    #数据结构

  • 数据结构-队列

    2016-04-01

    #数据结构

  • 数据结构-栈

    2016-03-31

    #数据结构

  • Shell基础

    2016-03-27

    #shell

  • Hello World

    2016-03-24

    #test

  • 我的博客园
  • 我的CSDN1
  • 我的CSDN2
李凯
研究生毕业于北邮
现就职于腾讯

一名奋斗在
OpenStack(Python)
Kubernetes(Golang)
的搬运工