skynet的actor对等调度分析
skynet的actor对等调度
- 一、actor对等调度
- 二、调度流程源码分析
- 2.1、thread_worker()
- 2.2、struct skynet_context
- 2.3、skynet_context_message_dispatch()
- 2.4、dispatch_message()
- 三、c语言到lua的调用过程分析
- 总结
- 后言
一、actor对等调度
actor的调度由线程池来调度。actor是被调度对象,skynet把所有活跃的actor通过链表串联起来,线程池从actor中取出相等数量的消息进行执行,实现公平调度。
但是,actor消息队列长度可能不一致,会出现部分actor "饿死"现象,skynet通过对线程池的工作线程赋予不同权重来规避这个问题。
二、调度流程源码分析
actor=隔离的运行环境+回调函数+消息队列。
2.1、thread_worker()
actor是由线程调度,所以从线程入口函数thread_worker开始。线程作为消费者,会不断循环从消息队列中取消息,如果没有消息就进入等待(pthread_cond_wait)。skynet_context_message_dispatch()用来取出消息和消费消息。
/skynet-src/skynet_start.c
static void *
thread_worker(void *p) {struct worker_parm *wp = p;int id = wp->id;int weight = wp->weight;struct monitor *m = wp->m;struct skynet_monitor *sm = m->m[id];skynet_initthread(THREAD_WORKER);struct message_queue * q = NULL;while (!m->quit) {q = skynet_context_message_dispatch(sm, q, weight);if (q == NULL) {if (pthread_mutex_lock(&m->mutex) == 0) {++ m->sleep;// "spurious wakeup" is harmless,// because skynet_context_message_dispatch() can be call at any time.if (!m->quit)pthread_cond_wait(&m->cond, &m->mutex);-- m->sleep;if (pthread_mutex_unlock(&m->mutex)) {fprintf(stderr, "unlock mutex error");exit(1);}}}}return NULL;
}
2.2、struct skynet_context
struct skynet_context保存的是actor的上下文信息。
- instance是lua虚拟机。
- cb是自己的回调函数。
- queue是消息队列,回调函数会从消息队列中取出消息作为参数传给回调函数,从而驱动actor运行;这个消息队列是专属于该actor的消息队列。
/skynet-src/skynet_server.c
struct skynet_context {void * instance;struct skynet_module * mod;void * cb_ud;skynet_cb cb;struct message_queue *queue;ATOM_POINTER logfile;uint64_t cpu_cost; // in microsecuint64_t cpu_start; // in microsecchar result[32];uint32_t handle;int session_id;ATOM_INT ref;int message_count;bool init;bool endless;bool profile;CHECKCALLING_DECL
};
/skynet-src/skynet_mq.c
struct message_queue {struct spinlock lock;uint32_t handle;int cap;int head;int tail;int release;int in_global;int overload;int overload_threshold;struct skynet_message *queue;struct message_queue *next;
};
2.3、skynet_context_message_dispatch()
- 通过skynet_globalmq_pop()从全局的消息队列中获取活跃消息队列(活跃actor的队列);
- 然后skynet_mq_handle()获取消息队列的句柄,即actor的唯一标识ID;
- skynet_handle_grab()获取actor的上下文。
- for循环开始执行调度,注意变量n的初始值是1,它代表着权重,因为skynet默认前面几个线程只消费一个消息。
- 通过skynet_mq_pop()先从actor的专属消息队列中取出消息,然后调用dispatch_message()分发消息。
- 权重变化 n >>= weight。
/skynet-src/skynet_server.c
struct message_queue *
skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {if (q == NULL) {q = skynet_globalmq_pop();if (q==NULL)return NULL;}uint32_t handle = skynet_mq_handle(q);struct skynet_context * ctx = skynet_handle_grab(handle);if (ctx == NULL) {struct drop_t d = { handle };skynet_mq_release(q, drop_message, &d);return skynet_globalmq_pop();}int i,n=1;struct skynet_message msg;for (i=0;i<n;i++) {if (skynet_mq_pop(q,&msg)) {skynet_context_release(ctx);return skynet_globalmq_pop();} else if (i==0 && weight >= 0) {n = skynet_mq_length(q);n >>= weight;}int overload = skynet_mq_overload(q);if (overload) {skynet_error(ctx, "May overload, message queue length = %d", overload);}skynet_monitor_trigger(sm, msg.source , handle);if (ctx->cb == NULL) {skynet_free(msg.data);} else {dispatch_message(ctx, &msg);}skynet_monitor_trigger(sm, 0,0);}assert(q == ctx->queue);struct message_queue *nq = skynet_globalmq_pop();if (nq) {// If global mq is not empty , push q back, and return next queue (nq)// Else (global mq is empty or block, don't push q back, and return q again (for next dispatch)skynet_globalmq_push(q);q = nq;}skynet_context_release(ctx);return q;
}
2.4、dispatch_message()
dispatch_message()本质上调用回调函数来处理消息,消息内容作为参数;这里就是真正的运行actor了。
/skynet-src/skynet_server.c
static void
dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) {assert(ctx->init);CHECKCALLING_BEGIN(ctx)pthread_setspecific(G_NODE.handle_key, (void *)(uintptr_t)(ctx->handle));int type = msg->sz >> MESSAGE_TYPE_SHIFT;size_t sz = msg->sz & MESSAGE_TYPE_MASK;FILE *f = (FILE *)ATOM_LOAD(&ctx->logfile);if (f) {skynet_log_output(f, msg->source, type, msg->session, msg->data, sz);}++ctx->message_count;int reserve_msg;if (ctx->profile) {ctx->cpu_start = skynet_thread_time();reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz);uint64_t cost_time = skynet_thread_time() - ctx->cpu_start;ctx->cpu_cost += cost_time;} else {reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz);}if (!reserve_msg) {skynet_free(msg->data);}CHECKCALLING_END(ctx)
}
三、c语言到lua的调用过程分析
了解完调度流程,那么如果在c语言的callback函数跳到lua层执行actor呢?
lua可以调用c语言,c语言需要导入一个方法给lua使用,skynet中lcallback()就是c语言导出给lua使用的方法。
在lcallback设置回调函数:skynet_callback()。
/lualib-src/lua-skynet.c
static int
lcallback(lua_State *L) {struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));int forward = lua_toboolean(L, 2);luaL_checktype(L,1,LUA_TFUNCTION);lua_settop(L,1);struct callback_context *cb_ctx = (struct callback_context *)lua_newuserdata(L, sizeof(*cb_ctx));cb_ctx->L = lua_newthread(L);lua_pushcfunction(cb_ctx->L, traceback);lua_setuservalue(L, -2);lua_setfield(L, LUA_REGISTRYINDEX, "callback_context");lua_xmove(L, cb_ctx->L, 1);if (forward) {skynet_callback(context, cb_ctx, forward_cb);} else {skynet_callback(context, cb_ctx, _cb);}return 0;
}// ...LUAMOD_API int
luaopen_skynet_core(lua_State *L) {luaL_checkversion(L);luaL_Reg l[] = {{ "send" , lsend },{ "genid", lgenid },{ "redirect", lredirect },{ "command" , lcommand },{ "intcommand", lintcommand },{ "addresscommand", laddresscommand },{ "error", lerror },{ "harbor", lharbor },{ "callback", lcallback },{ "trace", ltrace },{ NULL, NULL },};// functions without skynet_contextluaL_Reg l2[] = {{ "tostring", ltostring },{ "pack", luaseri_pack },{ "unpack", luaseri_unpack },{ "packstring", lpackstring },{ "trash" , ltrash },{ "now", lnow },{ "hpc", lhpc }, // getHPCounter{ NULL, NULL },};lua_createtable(L, 0, sizeof(l)/sizeof(l[0]) + sizeof(l2)/sizeof(l2[0]) -2);lua_getfield(L, LUA_REGISTRYINDEX, "skynet_context");struct skynet_context *ctx = lua_touserdata(L,-1);if (ctx == NULL) {return luaL_error(L, "Init skynet context first");}luaL_setfuncs(L,l,1);luaL_setfuncs(L,l2,0);return 1;
}
当执行lua的skynet.start时会调用c.callback()设置回调函数skynet.dispatch_message,skynet.dispatch_message是一个lua方法;每次消息到来就会调用lua的skynet.dispatch_message,通过它分发消息。
/lualib/skynet.lua
function skynet.start(start_func)c.callback(skynet.dispatch_message)init_thread = skynet.timeout(0, function()skynet.init_service(start_func)init_thread = nilend)
end
总结
后言
本专栏知识点是通过<零声教育>的系统学习,进行梳理总结写下文章,对c/c++linux系统提升感兴趣的读者,可以点击链接查看详细的服务:C/C++服务器开发 。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.dgrt.cn/a/299944.html
如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!相关文章:

skynet的actor对等调度分析
skynet的actor对等调度一、actor对等调度二、调度流程源码分析2.1、thread_worker()2.2、struct skynet_context2.3、skynet_context_message_dispatch()2.4、dispatch_message()三、c语言到lua的调用过程分析总结后言一、actor对等调度 actor的调度由线程池来调度。actor是被…...

01背包问题以及有关题目
一、01背包问题详解 确定dp数组以及下标的含义 使用二维数组 dp[i] [j] 表示从下标为[0-i]的物品里任意取,放进容量为j的背包,价值总和最大是多少。 确定递推公式 dp数组的初始化 首先从dp[i][j] 的定义出发,如果背包容量j为0的话&#…...

SSM 学习管理系统
SSM 学习管理系统 SSM 学习管理系统 功能介绍 首页 图片轮播展示 网站公告 学生注册 教师注册 课程资料 视频学习 友情链接 资料详情 学习进度 评论 收藏 后台管理 登录 管理员管理 修改密码 网站公告管理 友情链接管理 轮播图管理 学生管理 班级管理 我的班级管理 教师管理…...

前端按钮/组件权限管理
最近项目中遇到了按钮权限管理的需求,整理了一下目前的方案,有不对的地方望大家指出~ 方案1:数组自定义指令 把权限放到数组中,通过vue的自定义指令来判断是否拥有该权限,有则显示,反之则不显…...

[JavaScript]使用opencv.js实现基于傅里叶变换的频域水印(隐水印)
PS:查了多方资料,都没有提到用 JavaScript 来实现频域水印的教程,故经过笔者的实践,遂写一篇教程来简单介绍。 通过了解频域水印的相关知识,我理解了频域水印就是先将图片进行傅里叶变换,得到频域图&#x…...

P3916 图的遍历——反向建边dfs
图的遍历 题目描述 给出 NNN 个点,MMM 条边的有向图,对于每个点 vvv,求 A(v)A(v)A(v) 表示从点 vvv 出发,能到达的编号最大的点。 输入格式 第 111 行 222 个整数 N,MN,MN,M,表示点数和边数。 接下来 MMM 行&…...

vpp hash源码分析
概述 vpp的hash结构分为hash头、桶(_hash_create或hash_resize申请)和桶下元素(clib_mem_realloc申请),总共3个部分组成。 根据元素key的hash值不同,分配到不同的桶下,与其他hash表原理相同。 …...

Linux系统部署
Linux系统部署 下载vmware centos7 xshell6 xftp6新建虚拟机,注意设置网络连接,设置登录名:root,密码:root,等待登录,输入用户名和密码(注意密码输入不显示)登录成功,执行命令Ifc…...

单链表翻转-链表篇
leetcode206单链表的翻转 题目: 给你单链表的头节点 head ,请你反转链表,并返回反转后的链表。 示例 1: 输入:head [1,2,3,4,5] 输出:[5,4,3,2,1] 示例 2: 输入:head [1,2] 输出…...

Resnet18训练CIFAR10 准确率95%
准确率 95.31% 几个关键点: 1、改模型:原始的resnet18首层使用的7x7的卷积核,CIFAR10图片太小不适合,要改成3x3的,步长和padding都要一并改成1。因为图太小,最大池化层也同样没用,删掉。最后一…...

Hadoop回收站trash
Hadoop回收站trash,默认是关闭的。 1.修改conf/core-site.xml,增加 Xml代码 <property> <name>fs.trash.interval</name> <value>1440</value> <description>Number of minutes between trash checkpoints. …...

Pig 安装
Pig 的安装 1.下载文件 在官方上下载下来 http://pig.apache.org/releases.html#Download 我个人下载的 版本是pig-0.11.0.tar.gz 2.安装 上传到服务器指定位置 由于我个人是新创建了一个pig用户来创建的,所以上传到了 /home/pig/这个目录 (用…...

pig入门学习
个人目前理解pig是对mapreduce的一种封装扩展,使写mapreduce简单化,可维护性更高一点,可透明性更清晰一点,操作数据更简单一点吧。 1. Pig中的模式 pig中模式就是说pig数据的数据格式是什么样的。 比如当执行 grunt> de…...

Linux Vim使用
高级一些的编辑器,都会包含宏功能,vim当然不能缺少了,在vim中使用宏是非常方便的: :qx 开始记录宏,并将结果存入寄存器xq 退出记录模式x 播放记录在x寄存器中的宏命令稍微解释一下,当在normal模…...

pig 指定行分割符和列分隔符号
由于我们的hdfs上抽取的数据是存储行分隔符和列分割符不是用的\n和\t。所以就想能看看是否能指定行分隔符,查了半天没查到。。可能是查找能力有限,呵呵,后来下载下来pig-0.11.0的源码看了一下PigStorage的类,输入inputFormat类指定…...

pig基础实例运算
基础运算 加减乘除( 、-、*、/、bincond ) 查看一下简单的文本内容 grunt> cat A; 0,1,2 1,3,4 grunt> a load A usingPigStorage(,)as(c1:int,c2:double,c3:float); grunt> b foreach a generate $0$1 asc1_c2; grunt>dump b; (…...

修改linux 系统编码为utf-8
vi /etc/sysconfig/i18n LANG"zh_CN.GBK" 修改为LANG"zh_CN.UTF-8".保存退出source /etc/sysconfig/i18n 检查编码:locale...

linux开发常用的命令
学习资料:http://download.csdn.net/detail/ruishenh/6586391 查看端口信息 netstat -tln 或者-a lsof -i:8080 查看占用端口的程序netstat -apn | grep 8083tcp 0 0 192.168.2.17:8083 0.0.0.0:* LISTEN 387…...

pig关系操作符实例
cogroup 对两个对象模式,分别按指定的字段进行分组,然后按照指定的key列来分组 grunt> cat A; 0,1,2 1,3,4 grunt> cat B; 0,5,2 1,7,8 grunt> b load B usingPigStorage(,) as (c1:int,c2:int,c3:int); grunt> a load A usingPig…...

mapreduce之组件,join,排序原理
第一部分:重要的组件Combiner•什么是Combiner•combine函数把一个map函数产生的<key,value>对(多个key, value)合并成一个新的<key2,value2>. 将新的<key2,value2>作为输入到reduce函数中,其格式与reduce函数相…...