1 run as daemon with jmx port enable
JMX_PORT=9999 bin/kafka-server-start.sh -daemon config/server.properties
2 manage tool
https://github.com/yahoo/kafka-manager
./sbt clean dist
bin/kafka-manager
default port: 8000
1 run as daemon with jmx port enable
JMX_PORT=9999 bin/kafka-server-start.sh -daemon config/server.properties
2 manage tool
https://github.com/yahoo/kafka-manager
./sbt clean dist
bin/kafka-manager
default port: 8000
之前一直想把经常搞的性能测试的公共部分(压力控制部分)抽取出来作为一个公共的部分(jar),这样一方面能让开发测试者都集中在”测试性能的case”编写上,另外一个方面使用同一标准和同一实现有利于”团队”内部标准化。
实际上,现在大多直接使用jmeter来控制压力,也能达到效果,但是jmeter本身到底如何控制的,不去熟读代码很难理解,实际使用中,假设case需要调用java代码等时,还需要学习bean shell等,所以总结起来就是自由度不够大,不够透明,所以试用一段时间后,觉得不如自己实现一套,自由度大的,更广泛通用可控的,于是有了:
https://github.com/jiafu1115/performance-test-tool
直接看如何使用(基本使用方式):
compile exec:java -Dexec.mainClass="com.test.performance.PerfTool" -Dexec.args="-t com.test.performance.demo.DemoTestCaseImpl -duration 20 -thread 5 -tps 30"
(1)控制3个参数:1 持续多久 -duration 20 2 使用多少线程 -thread 5 3 TPS期待多少 -tps 30 实际使用,可以只指定线程数,让每个线程loop去发,也可以单独设置tps不设置线程数来尽量达到预期TPS.
(2)提供2种方式:1 测试Case实现类:-t com.test.performance.demo.DemoTestCaseImpl 2 收集测试结果类: -r com.test.performance.result.impl.InfluxdbCollectMethodImpl或自己提供
(3)提供3种运行信息:1 -program MyProgramName 2 -testname TestWebService 3 -runid ThisRunId
(4)提供4种case辅助: 1 before test 2 after test 3 prepare environment 4 destroy environment.
这样基本完成单机压力控制和实现,然后默认提供了influxdb的收集结果的方式和日志输出的方式可供选择,从而使用者只需要专注用例实现和结果收集即可。
结合这个单机的压力控制,还要完成三件事情:
(1) 并发控制: 可以采用jenkins的multi config项目来控制多个机器并发。
效果图:
(2) 结果收集: 可以采用influxdb等来收集,同时需要收集被测试机器的性能,可以在机器上部署collectd,然后发到influxdb,这样数据结果包含2个部分:性能测试数据和系统性能。
(3) 结果分析: 可以直接使用grafana来展示即可,而对于server的数据收集可采用collectd + grafana.
效果图:
结果应该至少提供3个维度:
(1)测试的性能数据, TPS, 响应时间(分布), 成功率
(2)被测机器的系统性能: cpu, memory, io, etc
(3)被测应用的性能数据: TPS, 响应时间(分布),成功率
总结: 经过剥离变化,就解决了共同的问题,然后使得性能测试者只关注自身测试用例和测试结果的收集和展示,这样就轻松了许多。
附:
1 使用的组件的安装:
1.1 influxdb 安装:
wget https://dl.influxdata.com/influxdb/releases/influxdb-1.5.0.x86_64.rpm sudo yum localinstall influxdb-1.5.0.x86_64.rpm service influxdb start
1.1版本后无web界面了,别找了。
1.2 grafana 安装
wget https://s3-us-west-2.amazonaws.com/grafana-releases/release/grafana-5.0.3-1.x86_64.rpm sudo yum localinstall grafana-5.0.3-1.x86_64.rpm service grafana-server start
1.3 collectd 安装
wget http://mirrors.163.com/.help/CentOS6-Base-163.repo yum install epel-release yum install collectd service collectd start
2. 使用的组件的配置:
2.1 influxdb + collectd收集系统信息需要的配置:
influxdb配置:
开启collectd数据收集:
[[collectd]] enabled = true bind-address = ":25826" database = "collectd"
启动会报错: /usr/share/collectd/types.db
所以influx上也要装上collectd可以解决这个问题。
2.2 collectd配置: server指向influxdb
Hostname "10.224.82.92" Interval 2 ReadThreads 5 LoadPlugin cpu LoadPlugin load LoadPlugin memory LoadPlugin swap LoadPlugin battery LoadPlugin network <Plugin "network"> Server "10.224.2.147" "25826" </Plugin>
问题: 产品应用中,使用了TCP长连接+自定义PDU的方式来提速业务处理,其中负载均衡使用NetScaler,并采用最小连接数优先原则,这样能保证每次新建连接时,都按负载均衡的趋势去建立。但是在使用硬件负载均衡器处理这种场景时,难免缺乏一定的灵活性和自由度,分析其缺点,主要有以下:
(1)正常使用中,服务方某台服务down或者重新部署时,这个机器所维持的长连接,被”驱赶”到其他服务机器。但是这台机器重启后,谁能用到这台机器?
(2)需要扩容时,需要修改netscaler上的配置,增加一台,修改一次。
(3)单点瓶颈,所有请求经由负载局衡器,虽然请求量的无限增加,势必有单点瓶颈。
针对存在的问题,当前的解决办法是:
(1)每建立一个连接,存入数据库
(2)服务发现方定时轮巡所有连接情况,发现不均衡,通知消费方调节。
分析:
总结以上问题,实际上是一个服务发现与注册的问题,而采用当前的解决方法,具有以下问题:
(1)只要是轮巡,基本都是可有优化的地方:轮训时间长,则效果不及时,轮巡时间短,实际上一年也发生不了太多次,浪费系统资源
(2)消费方和服务方耦合性提高,因为服务方会直接通知消费方重建连接来均衡。
解决:
实际上,这是一个“服务发现与注册”的典型案例,可以采用zookeeper或者spring cloud的Eureka等解决,这里拿zookeeper来展示下,如何解决这个问题:
(1)服务方启动时,创建一个服务“根节点”(假设不存在),然后注册一个子节点到这个根节点
(2)消费方监听“根节点”的“子节点”变化。
再说说几个要点:
(1)创建根节点时,需要检查是否已经存在, 可以自己实现(直接创建然后忽略NodeExisted异常,或者提前判断是否存在再创建),也可以直接使用下面方法:
ZKPaths.mkdirs(zooKeeper, "/Service");
(2)创建子节点使用临时节点方式:CreateMode.EPHEMERAL, 这样就可以将服务的生命周期(session)和注册的信息生命周期绑定起来。
String nodePath = ZKPaths.makePath("Service", localIp); zooKeeper.create(nodePath , nodeInfo, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
(3) 设置session timeout时间
临时节点与session timeout息息相关,假设这个节点忽然断电,则zookeeper何时去删除这个临时节点而通知到消费方此节点已不可用,所以需要设置下session timeout来控制,设置的越短,检查(意外)失效越及时,但是如果过短,考虑网络波动,则可能被zookeeper误认为session失效,所以过大过小,都不合适,默认zookeeper使用的时间是2*ticktime(4)到20*ticktime(40s),即如果客户端设置在这个范围,则生效,如果设置的过小,所以最小值2*ticktime,如果设置过大值,使用最大值20*ticktime。
客户端设置:
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().connectString("10.224.2.116:2181") .retryPolicy(retryPolicy) .connectionTimeoutMs(3000) .sessionTimeoutMs(5000) //设置5s,则可以容忍5s网络波动,同时意外down机,也需要5s才能移除这个节点。 .build();
server处理使用的默认值:
public void setMinSessionTimeout(int min) { this.minSessionTimeout = min == -1 ? tickTime * 2 : min; LOG.info("minSessionTimeout set to {}", this.minSessionTimeout); } public void setMaxSessionTimeout(int max) { this.maxSessionTimeout = max == -1 ? tickTime * 20 : max; LOG.info("maxSessionTimeout set to {}", this.maxSessionTimeout); }
server处理:
int minSessionTimeout = getMinSessionTimeout(); if (sessionTimeout < minSessionTimeout) { sessionTimeout = minSessionTimeout; } int maxSessionTimeout = getMaxSessionTimeout(); if (sessionTimeout > maxSessionTimeout) { sessionTimeout = maxSessionTimeout; } cnxn.setSessionTimeout(sessionTimeout);
(4)重连
假设在以上session timeout内发生网络波动,重新连接后,session仍然生效,且是同一个,这样节点并不会移除,并不会通知到服务方这个节点不可用,服务预期,但是假设网络不可达超过session timeout时间,则session绑定的节点会被server移除,这个时候,假设网络恢复,连接重新建立(zkclient会后台一直重试断掉的连接),并没有人去重新注册这个服务节点,所以需要一个重新注册的逻辑:
需要使用curatorFramework封装的功能(关系上zkclient -> curatorClient – > curatorFramework)
curatorFramework.getConnectionStateListenable().addListener(new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework arg0, ConnectionState connectionState) { if(connectionState == ConnectionState.RECONNECTED){ //判断是否发生了重新连接且成功状态 //重新注册服务节点 } } });
(5)提供更多的信息
一般提供一个ip信息就够用了,但是为了扩展需要,可以将子节点的信息设计成json格式等,提供更多的信息,例如payload, health check信息。考虑一种case(参考下图):假设zk和服务方的网络断了,zk和消费方以及消费方和服务方网络都没有断,实际上这种情况下,服务仍然可以继续服务,但是zk因为网络问题会将服务方节点全部删除,这个时候,可以让消费方使用节点中的health check信息去确认下,是否真的不可达了,以应对这种场景,但是这种方案需要事先规划好,假设一个服务就是不想被访问,所以主动移除了,但是使用Health check肯定还是可以正常工作的,所以整体设计要考虑各种场景,但是不管如何,提供更多的信息,才方便以后的扩展和决策。
(6)watch是一次性的
zk的监听是一次性的,即触发处理后,下次再发生变化,并不会触发处理。所以这要求在事件处理后重新注册watch,
public static class ReceiveNodesChangeWatcher implements Watcher{ private ZooKeeper zooKeeper; private String servicePath; @Override public void process(WatchedEvent event) { try { List<String> nodes = zooKeeper.getChildren(servicePath, this); //重新注册下 } catch (Exception e) { } } }
总结:
经过以上设计就可完成一个服务发现与注册的案例,解决了最开始提出的问题。将路由工作从netscaler移到消费方来处理。而zookeeper仅仅在节点变动时,通知变化。增加节点时,0配置,自动使用上,比较智能!
Redis Cluster发布历史:
3.0Beta1(2014.2) ->
3.0.0rc(2014.10) ->
3.0GA(2015.4) ->
3.2.11(2017.9)
本文主要分两部分:一部分是综述下一些互联网公司总结的问题;另外是个人总结使用redis的规则
(1)互联网公司实践
a.美团
目前(2016)百余集群,数千节点,主要用于缓存和存储,单个集群内存容量1TB+,15亿+的键,百万级QPS吞吐量
b.唯品会
目前(2016)在线有生产几十个cluster集群,约2千个instances,单个集群最大达到250+instances,主要是后端业务的存储,没有作为cache使用的场景。
c.大街网
command/day 20亿+, Instance 300+, Servers: 几十台, Memory:1T
d.饿了么
e.南航
分享的问题:
(1)主从节点不要在同一个机器部署:不跨机房、要跨机架、可以在一个机柜
(2)误判节点Fail进行切换: 因为单进程单线程,所以单个耗时操作时间,超过cluster-node-timeout或许会引发误判从而进行failover.
(3)某个节点内存占用飙升
(4)不响应请求或停止的情况
(5)安全漏洞
(6)周期性connection timeout
(7)内存被挤爆,系统除了报Cluster down外, 无明显错误提示
(8)AOF文件过大占满磁盘空间
(9)主库重启冲掉从库所有数据
点评:
(1)容易被忽视
(2)考虑单个耗时操作包括哪些?命令本身随着keys数目增多而耗时时间增大,例如flushall,在4.0之前是同步操作。所以存储数据特别大时,可能执行时间较长,导致引发failover。
(3)这个节点被执行了monitor命令, 定位线索的方法:info信息里面的client_longest_output_list是否太大?
(4)磁盘空间不足: 假设仅仅使用rdb,原则上rdb不会超过内存的max memory设置,但是开启aof后,不在受限于此。所以关闭rdb/aof仅使用cache时,其实还是会产生rdb,因为默认全盘同步时,先要产生rdb,然后同步。后期提出的socket直接传输,测试还是产生了rdb,这个问题没有细查。但是保险起见,仅开启rdb时,> max memory些应该问题不大。
(5)分享提到:一个是Redis未添加认证,第二个是Redis以root用户启动,一般不会出现这种问题。
(6)类似(2): 单进程单线程,遇到慢操作
(7)查出为设置max memory,没啥好说的了。
(8)磁盘要不小了,要不aof重写规则设置的不够合理。
(9)redis官方说明也提到了这点,自动拉起挂了的禁用rdb的redis server,重启很快,还没有来得及cluster-node-timeout检测认为fail,所以不会主动切换,从而重新和slave连接后,自己的数据由于没有落地所以为空,直接冲掉backup的所有数据。
结合最近的研究和体会,总结下使用redis的一些“规则”:
(1)部署上避免鸡蛋放一箩筐:master与master之间,master与slave之间
(2)设置cluster-require-full-coverage no:避免某个master挂了,引发整个cluster瘫痪
(3)避免一切耗时操作:keys, flushall, monitor, transaction, etc.
(4)保护好自身:一定要设置max memory,同时设置好daemon,确保自动恢复。
(5)确保内存充足,避免使用swap:设置max memory < 1/2 hardware memory,以保证dump时的copy of write(cow)
(6)不要在产线上,随便执行操作,例如monitor命令
(7)使用尽量多的机器(<1000),而不是使用更大的内存:机器越少,内存越大导致每个机器的重要性更大,同时大内存不易于管理。
(8)不要在同一集群里混用存储和缓存,除非可控大小,否则不要当存储。
(9)考虑“黑马”: 热点数据或者巨大数据
(10)节约内存:尽量短的ttl;尽早删除数据保证内存释放,而不依赖“随机删除”;压缩;数据结构调整
(11)制定好规则:key命名规则;数据库(0-16,cluster不支持)使用规则
(12)使用SSD,加快全复制时间,加快启动恢复速度等等
(13)Redis Cluster一出,别想读写分离了。
(14)做好“真实”的容量评估:内存10G时,dump出的数据一定是某个值么?使用的业务数据到底是什么结构和数量级
(15)自动daemon重启+无落地的Redis,重启过快会冲掉所有数据
(16)准备好系统:关闭Transparent HugePages(在运行时动态分配内存的,所以会带来在运行时内存分配延误);ulimit -n 65535;vm.overcommit_memory = 1(对内存申请来者不拒)
最后总结下trouble shooting的一些方法(不考虑操作系统和应用层):
(1)日志:
loglevel verbose
logfile “/var/redis/log/redis.log”
(2)工具:
2.1 command: https://redis.io/commands#server
cluster info;
info;
role;
127.0.0.1:7001> role 1) "slave" 2) "10.224.91.234" 3) (integer) 7001 4) "connected" 5) (integer) 23871
cluster keyslot “key”;
127.0.0.1:7001> cluster keyslot “key”; (integer) 13592
cluster nodes;
cluster slaves 2776d678f92e1a5ee5cdefd4c81df7615c799265
127.0.0.1:7001> cluster slaves 2776d678f92e1a5ee5cdefd4c81df7615c799265 1) "45f14c44358f77d0e1a754863000460dad847fae 10.224.91.233:7001 slave 2776d678f92e1a5ee5cdefd4c81df7615c799265 0 1519982493781 6 connected"
slowlog
monitor
127.0.0.1:7001>monitor OK 1519981033.980566 [0 10.224.91.234:7001] "PING" 1519981044.015475 [0 10.224.91.234:7001] "PING" 1519981044.838077 [0 127.0.0.1:37727] "AUTH" "P@ss123"
client list
127.0.0.1:7001> CLIENT LIST id=8 addr=10.224.91.234:7001 fd=7 name= age=16484 idle=3 flags=M db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=ping id=277 addr=127.0.0.1:37725 fd=18 name= age=475 idle=3 flags=O db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=monitor id=278 addr=127.0.0.1:37727 fd=19 name= age=454 idle=369 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=ping id=281 addr=127.0.0.1:37741 fd=20 name= age=331 idle=0 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=32768 obl=0 oll=0 omem=0 events=r cmd=client
2.2 trouble shooting tool:
redis-check-rdb
./redis-check-rdb /opt/redis/dump.rdb
[offset 0] Checking RDB file /opt/redis/dump.rdb
[offset 26] AUX FIELD redis-ver = ‘3.2.8’
[offset 40] AUX FIELD redis-bits = ’64’
[offset 52] AUX FIELD ctime = ‘1519965014’
[offset 67] AUX FIELD used-mem = ‘2511240’
[offset 76] Checksum OK
[offset 76] \o/ RDB looks OK! \o/
[info] 0 keys read
[info] 0 expires
[info] 0 already expired
redis-check-aof
redis-trib.rb (info/check)
2.3 debug tool for learn:
debug
127.0.0.1:7001> debug sleep 1 OK (1.00s)
redis/jedis虽然代码比较少,但是很难一篇文章概况所有设计及实现细节,所以还是抓取日常工作中可能的疑问,带着问题去翻代码:
1 假设当前内存占用是10G,那么落到磁盘rdb时,是否一定也是10G?
不一定,因为一方面可以设置压缩(默认开启:rdbcompression yes),另外一方面,假设内存中存在大量已过期数据,则也过滤掉这部分数据, dump的实现上,会遍历所有数据库(0-默认16),遍历所有数据然后保存:
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime, long long now) { /* Save the expire time */ if (expiretime != -1) { /* If this key is already expired skip it */ if (expiretime < now) return 0; if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1; if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1; } /* Save type, key, value */ if (rdbSaveObjectType(rdb,val) == -1) return -1; if (rdbSaveStringObject(rdb,key) == -1) return -1; if (rdbSaveObject(rdb,val) == -1) return -1; return 1; }
2 monitor实现
void monitorCommand(client *c) { /* ignore MONITOR if already slave or in monitor mode */ if (c->flags & CLIENT_SLAVE) return; c->flags |= (CLIENT_SLAVE|CLIENT_MONITOR); listAddNodeTail(server.monitors,c); //把整个客户端增加到server.monitors里面去 addReply(c,shared.ok); }
执行命令时:
void call(client *c, int flags) { long long dirty, start, duration; int client_old_flags = c->flags; /* Sent the command to clients in MONITOR mode, only if the commands are * not generated from reading an AOF. */ if (listLength(server.monitors) && !server.loading && !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN))) { replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc); //把命令发到client去 }
可以根据info来查询输出list的最大值。或者根据client list也可以排查问题
client_longes_output_list信息获取的方法,遍历所有client,获许对应的值,取最大值。
void getClientsMaxBuffers(unsigned long *longest_output_list, unsigned long *biggest_input_buffer) { client *c; listNode *ln; listIter li; unsigned long lol = 0, bib = 0; listRewind(server.clients,&li); while ((ln = listNext(&li)) != NULL) { c = listNodeValue(ln); if (listLength(c->reply) > lol) lol = listLength(c->reply); if (sdslen(c->querybuf) > bib) bib = sdslen(c->querybuf); } *longest_output_list = lol; *biggest_input_buffer = bib; }