metric driven (6) – common arch solutions

TIG:

1 install

1.1 create /etc/yum.repos.d/influxdb.repo:

[influxdb]
name = InfluxDB Repository – RHEL \$releasever
baseurl = https://repos.influxdata.com/rhel/\$releasever/\$basearch/stable
enabled = 1
gpgcheck = 1
gpgkey = https://repos.influxdata.com/influxdb.key

1.2 sudo yum install telegraf

1.3 startup

sudo service telegraf start
Or if your operating system is using systemd (CentOS 7+, RHEL 7+):
sudo systemctl start telegraf

2 Config:

默认配置文件为/etc/telegraf/telegraf.conf,也可以查看https://github.com/influxdata/telegraf/blob/master/etc/telegraf.conf, telegraf是通过输入、转化,输出插件方式来管理的。

所以默认什么都不做修改时的,telegraf收集的是如下信息:

inputs.disk inputs.diskio inputs.kernel inputs.mem inputs.processes inputs.swap inputs.system inputs.cpu

而输出采用的是influxdb方式。这点可以通过启动日志来观察到:

2018/09/17 01:31:19 I! Using config file: /etc/telegraf/telegraf.conf
2018-09-17T01:31:19Z W! [outputs.influxdb] when writing to [http://localhost:8086]: database “telegraf” creation failed: Post http://localhost:8086/query: dial tcp 127.0.0.1:8086: connect: connection refused
2018-09-17T01:31:19Z I! Starting Telegraf v1.7.4
2018-09-17T01:31:19Z I! Loaded inputs: inputs.disk inputs.diskio inputs.kernel inputs.mem inputs.processes inputs.swap inputs.system inputs.cpu
2018-09-17T01:31:19Z I! Loaded aggregators:
2018-09-17T01:31:19Z I! Loaded processors:
2018-09-17T01:31:19Z I! Loaded outputs: influxdb
2018-09-17T01:31:19Z I! Tags enabled: host=appOne
2018-09-17T01:31:19Z I! Agent Config: Interval:10s, Quiet:false, Hostname:”telegraf”, Flush Interval:10s
2018-09-17T01:31:30Z E! [outputs.influxdb]: when writing to [http://localhost:8086]: Post http://localhost:8086/write?db=telegraf: dial tcp 127.0.0.1:8086

所以如果需要修改或者定制可以直接修改/etc/telegraf/telegraf.conf达到目标,但是默认配置里面有太多冗余插件信息去注释掉,所以telegraf提供了一种简洁的方式来产生配置文件。

#telegraf –input-filter redis:cpu:mem:net:swap –output-filter influxdb:kafka config //采集多个指标
#telegraf –input-filter redis –output-filter influxdb config //采集一个指标

例如,产生一个redis.conf的配置:

#telegraf -sample-config -input-filter redis:mem -output-filter influxdb > redis.conf

产生后的配置内容如下:

###############################################################################
# INPUT PLUGINS #
###############################################################################

# Read metrics about memory usage
[[inputs.mem]]
# no configuration

[[inputs.redis]]
## specify servers via a url matching:
## [protocol://][:password]@address[:port]
## e.g.
## tcp://localhost:6379
## tcp://:password@192.168.99.100
## unix:///var/run/redis.sock
##
## If no servers are specified, then localhost is used as the host.
## If no port is specified, 6379 is used
servers = [“tcp://localhost:6379”]

###############################################################################
# OUTPUT PLUGINS #
###############################################################################

# Configuration for sending metrics to InfluxDB
[[outputs.influxdb]]
## The full HTTP or UDP URL for your InfluxDB instance.
##
## Multiple URLs can be specified for a single cluster, only ONE of the
## urls will be written to each interval.
# urls = [“unix:///var/run/influxdb.sock”]
# urls = [“udp://127.0.0.1:8089”]
# urls = [“http://127.0.0.1:8086”]

## The target database for metrics; will be created as needed.
# database = “telegraf”
# username = “telegraf”
# password = “metricsmetricsmetricsmetrics”

然后以这个文件作为启动配置文件启动:

#telegraf –config /etc/telegraf/redis.conf

[root@telegraf ~]# telegraf –config /etc/telegraf/redis.conf
2018-09-17T02:43:08Z I! Starting Telegraf v1.7.4
2018-09-17T02:43:08Z I! Loaded inputs: inputs.redis inputs.mem
2018-09-17T02:43:08Z I! Loaded aggregators:
2018-09-17T02:43:08Z I! Loaded processors:
2018-09-17T02:43:08Z I! Loaded outputs: influxdb
2018-09-17T02:43:08Z I! Tags enabled: host=telegraf
2018-09-17T02:43:08Z I! Agent Config: Interval:10s, Quiet:false, Hostname:”telegraf “, Flush Interval:10s

此时,influxdb会受到请求:

2018-09-17T02:43:08.060799Z info Executing query {“log_id”: “0AaMBDO0000”, “service”: “query”, “query”: “CREATE DATABASE telegraf”}
[httpd] 127.0.0.1 – – [17/Sep/2018:02:43:08 +0000] “POST /query HTTP/1.1” 200 57 “-” “telegraf” 68dafe05-ba23-11e8-8001-000000000000 108642
[httpd] 127.0.0.1 – – [17/Sep/2018:02:43:20 +0000] “POST /write?db=telegraf HTTP/1.1” 204 0 “-” “telegraf” 7026fecd-ba23-11e8-8002-000000000000 595855
[httpd] 127.0.0.1 – – [17/Sep/2018:02:43:30 +0000] “POST /write?db=telegraf HTTP/1.1” 204 0 “-” “telegraf” 761ceb12-ba23-11e8-8003-000000000000 149522
[httpd] 127.0.0.1 – – [17/Sep/2018:02:43:40 +0000] “POST /write?db=telegraf HTTP/1.1” 204 0 “-” “telegraf” 7c12cd50-ba23-11e8-8004-000000000000 326783
[httpd] 127.0.0.1 – – [17/Sep/2018:02:43:50 +0000] “POST /write?db=telegraf HTTP/1.1” 204 0 “-” “telegraf” 820892ba-ba23-11e8-8005-000000000000 101009
[httpd] 127.0.0.1 – – [17/Sep/2018:02:44:00 +0000] “POST /write?db=telegraf HTTP/1.1” 204 0 “-” “telegraf” 87fe77d9-ba23-11e8-8006-000000000000 86017
[httpd] 127.0.0.1 – – [17/Sep/2018:02:44:10 +0000] “POST /write?db=telegraf HTTP/1.1” 204 0 “-” “telegraf” 8df464b0-ba23-11e8-8007-000000000000 85689

通过influxdb的client命令就可以查询到收集到的信息了,非常简单方便:

[root@influx ~]# influx
Connected to http://localhost:8086 version 1.6.2
InfluxDB shell version: 1.6.2
> show databases
name: databases
name
—-
_internal
telegraf
> use telegraf
Using database telegraf
>
> show measurements
name: measurements
name
—-
mem
redis

> select * from redis limit 1;
name: redis
time aof_current_rewrite_time_sec aof_enabled aof_last_bgrewrite_status aof_last_rewrite_time_sec aof_last_write_status aof_rewrite_in_progress aof_rewrite_scheduled blocked_clients client_biggest_input_buf client_longest_output_list clients cluster_enabled connected_slaves evicted_keys expired_keys host instantaneous_input_kbps instantaneous_ops_per_sec instantaneous_output_kbps keyspace_hitrate keyspace_hits keyspace_misses latest_fork_usec loading lru_clock master_repl_offset maxmemory maxmemory_policy mem_fragmentation_ratio migrate_cached_sockets port pubsub_channels pubsub_patterns rdb_bgsave_in_progress rdb_changes_since_last_save rdb_current_bgsave_time_sec rdb_last_bgsave_status rdb_last_bgsave_time_sec rdb_last_save_time rdb_last_save_time_elapsed redis_version rejected_connections repl_backlog_active repl_backlog_first_byte_offset repl_backlog_histlen repl_backlog_size replication_role server slave0 sync_full sync_partial_err sync_partial_ok total_commands_processed total_connections_received total_net_input_bytes total_net_output_bytes total_system_memory uptime used_cpu_sys used_cpu_sys_children used_cpu_user used_cpu_user_children used_memory used_memory_lua used_memory_peak used_memory_rss
—- —————————- ———– ————————- ————————- ——————— ———————– ——————— ————— ———————— ————————– ——- ————— —————- ———— ———— —- ———————— ————————- ————————- —————- ————- ————— —————- ——- ——— —————— ——— —————- ———————– ———————- —- ————— ————— ———————- ————————— ————————— ———————- ———————— —————— ————————– ————- ——————– ——————- —————————— ——————– —————– —————- —— —— ——— —————- ————— ———————— ————————– ——————— ———————- ——————- —— ———— ——————— ————- ———————- ———– ————— —————- —————
1537152190000000000 -1 0 ok -1 ok 0 0 0 0 0 41 1 1 0 778 telegraf 0.09 2 0.01 1 188 0 379 0 10425533 16473380 8000000000 allkeys-lru 1.17 0 7001 0 0 0 856 -1 ok 1 1530088772 7063418 3.2.8 0 1 15424805 1048576 1048576 master 10.224.91.231 ip=10.224.91.234,port=7001,state=online,offset=16473380,lag=1 2 0 0 19620365 1239692 500589135 885305642 33670017024 11549541 15528.8 0 8857.04 0 4476504 37888 5601248 5259264
>

> select * from mem limit 1;
name: mem
time active available available_percent buffered cached free host inactive slab total used used_percent wired
—- —— ——— —————– ——– —— —- —- ——– —- —– —- ———— —–
1537152190000000000 771219456 7859949568 93.83099562612006 422666240 890130432 6547152896 telegraf 860303360 142872576 8376709120 516759552 6.169004373879942 0
>
>

ELKK

common performance tool and solution

之前一直想把经常搞的性能测试的公共部分(压力控制部分)抽取出来作为一个公共的部分(jar),这样一方面能让开发测试者都集中在”测试性能的case”编写上,另外一个方面使用同一标准和同一实现有利于”团队”内部标准化。

实际上,现在大多直接使用jmeter来控制压力,也能达到效果,但是jmeter本身到底如何控制的,不去熟读代码很难理解,实际使用中,假设case需要调用java代码等时,还需要学习bean shell等,所以总结起来就是自由度不够大,不够透明,所以试用一段时间后,觉得不如自己实现一套,自由度大的,更广泛通用可控的,于是有了:

https://github.com/jiafu1115/performance-test-tool

直接看如何使用(基本使用方式):

compile exec:java -Dexec.mainClass="com.test.performance.PerfTool" -Dexec.args="-t com.test.performance.demo.DemoTestCaseImpl -duration 20 -thread 5 -tps 30"

(1)控制3个参数:1 持续多久 -duration 20 2 使用多少线程 -thread 5 3 TPS期待多少 -tps 30 实际使用,可以只指定线程数,让每个线程loop去发,也可以单独设置tps不设置线程数来尽量达到预期TPS.
(2)提供2种方式:1 测试Case实现类:-t com.test.performance.demo.DemoTestCaseImpl 2 收集测试结果类: -r com.test.performance.result.impl.InfluxdbCollectMethodImpl或自己提供
(3)提供3种运行信息:1 -program MyProgramName 2 -testname TestWebService 3 -runid ThisRunId
(4)提供4种case辅助: 1 before test 2 after test 3 prepare environment 4 destroy environment.

这样基本完成单机压力控制和实现,然后默认提供了influxdb的收集结果的方式和日志输出的方式可供选择,从而使用者只需要专注用例实现和结果收集即可。

结合这个单机的压力控制,还要完成三件事情:

(1) 并发控制: 可以采用jenkins的multi config项目来控制多个机器并发。

效果图:

(2) 结果收集: 可以采用influxdb等来收集,同时需要收集被测试机器的性能,可以在机器上部署collectd,然后发到influxdb,这样数据结果包含2个部分:性能测试数据和系统性能。
(3) 结果分析: 可以直接使用grafana来展示即可,而对于server的数据收集可采用collectd + grafana.

效果图:

结果应该至少提供3个维度:
(1)测试的性能数据, TPS, 响应时间(分布), 成功率
(2)被测机器的系统性能: cpu, memory, io, etc
(3)被测应用的性能数据: TPS, 响应时间(分布),成功率

总结: 经过剥离变化,就解决了共同的问题,然后使得性能测试者只关注自身测试用例和测试结果的收集和展示,这样就轻松了许多。

附:

1 使用的组件的安装:

1.1 influxdb 安装:

wget https://dl.influxdata.com/influxdb/releases/influxdb-1.5.0.x86_64.rpm
sudo yum localinstall influxdb-1.5.0.x86_64.rpm
service influxdb start

1.1版本后无web界面了,别找了。

1.2 grafana 安装

wget https://s3-us-west-2.amazonaws.com/grafana-releases/release/grafana-5.0.3-1.x86_64.rpm
sudo yum localinstall grafana-5.0.3-1.x86_64.rpm
service grafana-server start

1.3 collectd 安装

wget http://mirrors.163.com/.help/CentOS6-Base-163.repo
yum install epel-release
yum install collectd
service collectd start

2. 使用的组件的配置:

2.1 influxdb + collectd收集系统信息需要的配置:

influxdb配置:

开启collectd数据收集:

[[collectd]]
    enabled = true
    bind-address = ":25826"
    database = "collectd"

启动会报错: /usr/share/collectd/types.db
所以influx上也要装上collectd可以解决这个问题。

2.2 collectd配置: server指向influxdb

Hostname "10.224.82.92"
 
Interval 2
ReadThreads 5

LoadPlugin cpu
LoadPlugin load
LoadPlugin memory
LoadPlugin swap
LoadPlugin battery

LoadPlugin network
<Plugin "network">
Server "10.224.2.147" "25826"
</Plugin>

zookeeper analyst(1)- service discovery-to improve load balance of TCP connections by NetScaler

问题: 产品应用中,使用了TCP长连接+自定义PDU的方式来提速业务处理,其中负载均衡使用NetScaler,并采用最小连接数优先原则,这样能保证每次新建连接时,都按负载均衡的趋势去建立。但是在使用硬件负载均衡器处理这种场景时,难免缺乏一定的灵活性和自由度,分析其缺点,主要有以下:

(1)正常使用中,服务方某台服务down或者重新部署时,这个机器所维持的长连接,被”驱赶”到其他服务机器。但是这台机器重启后,谁能用到这台机器?
(2)需要扩容时,需要修改netscaler上的配置,增加一台,修改一次。
(3)单点瓶颈,所有请求经由负载局衡器,虽然请求量的无限增加,势必有单点瓶颈。

针对存在的问题,当前的解决办法是:
(1)每建立一个连接,存入数据库
(2)服务发现方定时轮巡所有连接情况,发现不均衡,通知消费方调节。

分析:
总结以上问题,实际上是一个服务发现与注册的问题,而采用当前的解决方法,具有以下问题:
(1)只要是轮巡,基本都是可有优化的地方:轮训时间长,则效果不及时,轮巡时间短,实际上一年也发生不了太多次,浪费系统资源
(2)消费方和服务方耦合性提高,因为服务方会直接通知消费方重建连接来均衡。

解决:
实际上,这是一个“服务发现与注册”的典型案例,可以采用zookeeper或者spring cloud的Eureka等解决,这里拿zookeeper来展示下,如何解决这个问题:

(1)服务方启动时,创建一个服务“根节点”(假设不存在),然后注册一个子节点到这个根节点
(2)消费方监听“根节点”的“子节点”变化。

再说说几个要点:

(1)创建根节点时,需要检查是否已经存在, 可以自己实现(直接创建然后忽略NodeExisted异常,或者提前判断是否存在再创建),也可以直接使用下面方法:

ZKPaths.mkdirs(zooKeeper, "/Service");

(2)创建子节点使用临时节点方式:CreateMode.EPHEMERAL, 这样就可以将服务的生命周期(session)和注册的信息生命周期绑定起来。

String nodePath = ZKPaths.makePath("Service", localIp);
zooKeeper.create(nodePath , nodeInfo, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

(3) 设置session timeout时间
临时节点与session timeout息息相关,假设这个节点忽然断电,则zookeeper何时去删除这个临时节点而通知到消费方此节点已不可用,所以需要设置下session timeout来控制,设置的越短,检查(意外)失效越及时,但是如果过短,考虑网络波动,则可能被zookeeper误认为session失效,所以过大过小,都不合适,默认zookeeper使用的时间是2*ticktime(4)到20*ticktime(40s),即如果客户端设置在这个范围,则生效,如果设置的过小,所以最小值2*ticktime,如果设置过大值,使用最大值20*ticktime。

客户端设置:

	    CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().connectString("10.224.2.116:2181")
            .retryPolicy(retryPolicy)
            .connectionTimeoutMs(3000)
            .sessionTimeoutMs(5000) //设置5s,则可以容忍5s网络波动,同时意外down机,也需要5s才能移除这个节点。
            .build();

server处理使用的默认值:

  public void setMinSessionTimeout(int min) {
        this.minSessionTimeout = min == -1 ? tickTime * 2 : min;
        LOG.info("minSessionTimeout set to {}", this.minSessionTimeout);
    }

    public void setMaxSessionTimeout(int max) {
        this.maxSessionTimeout = max == -1 ? tickTime * 20 : max;
        LOG.info("maxSessionTimeout set to {}", this.maxSessionTimeout);
    }

server处理:

        int minSessionTimeout = getMinSessionTimeout();
        if (sessionTimeout < minSessionTimeout) {
            sessionTimeout = minSessionTimeout;
        }
        int maxSessionTimeout = getMaxSessionTimeout();
        if (sessionTimeout > maxSessionTimeout) {
            sessionTimeout = maxSessionTimeout;
        }
        cnxn.setSessionTimeout(sessionTimeout);

(4)重连
假设在以上session timeout内发生网络波动,重新连接后,session仍然生效,且是同一个,这样节点并不会移除,并不会通知到服务方这个节点不可用,服务预期,但是假设网络不可达超过session timeout时间,则session绑定的节点会被server移除,这个时候,假设网络恢复,连接重新建立(zkclient会后台一直重试断掉的连接),并没有人去重新注册这个服务节点,所以需要一个重新注册的逻辑:

需要使用curatorFramework封装的功能(关系上zkclient -> curatorClient – > curatorFramework)


   curatorFramework.getConnectionStateListenable().addListener(new ConnectionStateListener() {
				
				@Override
				public void stateChanged(CuratorFramework arg0, ConnectionState connectionState) {
 					if(connectionState == ConnectionState.RECONNECTED){ //判断是否发生了重新连接且成功状态
 			 		  //重新注册服务节点
					}
 				}
			});

(5)提供更多的信息
一般提供一个ip信息就够用了,但是为了扩展需要,可以将子节点的信息设计成json格式等,提供更多的信息,例如payload, health check信息。考虑一种case(参考下图):假设zk和服务方的网络断了,zk和消费方以及消费方和服务方网络都没有断,实际上这种情况下,服务仍然可以继续服务,但是zk因为网络问题会将服务方节点全部删除,这个时候,可以让消费方使用节点中的health check信息去确认下,是否真的不可达了,以应对这种场景,但是这种方案需要事先规划好,假设一个服务就是不想被访问,所以主动移除了,但是使用Health check肯定还是可以正常工作的,所以整体设计要考虑各种场景,但是不管如何,提供更多的信息,才方便以后的扩展和决策。

(6)watch是一次性的

zk的监听是一次性的,即触发处理后,下次再发生变化,并不会触发处理。所以这要求在事件处理后重新注册watch,


	public static class ReceiveNodesChangeWatcher implements Watcher{
		
		private ZooKeeper zooKeeper;
		private String servicePath;

		@Override
		public void process(WatchedEvent event) {
			try {
				List<String> nodes = zooKeeper.getChildren(servicePath, this); //重新注册下
			} catch (Exception e) {
			} 			
		}
		
	}

总结:
经过以上设计就可完成一个服务发现与注册的案例,解决了最开始提出的问题。将路由工作从netscaler移到消费方来处理。而zookeeper仅仅在节点变动时,通知变化。增加节点时,0配置,自动使用上,比较智能!

deep into http client issues on production

项目稳定运行快一年,没有过关于发http请求的问题,但是最近由于第三方频繁”变动”,项目集中爆发apache httpclient相关的一些“事故”,所以事后整理做个记录:

1 关于keepalive和DNS cache
问题描述: 产线某个组件使用了DNS切换的方式来做Failover, 基本流程Primary->Backup,然后从Backup->Primary(区别在于Failback回来时,并无关闭或者重启Backup),
当Failback回来时,一个feature就不work了。

问题跟踪: 拿到这个问题时,首先定位到原因: Failback时,连接仍然连接在Backup。而刚好由于第三方某个bug,当一个业务的所有请求不完全落在一个DC时,就会不work, 虽然说fix这个问题,功能就可以正常,但是不符合优先选择primary的策略(假设确实有权重的话)。

拿到这个问题后,第一怀疑是DNS Cache问题,因为failover是使用DNS的方式,所以第一怀疑到这个上面:

(1)排除主机DNS Cache: 直接使用ping或者traceroute:

[root@jiafu conf]# traceroute www.baidu.com
traceroute to www.baidu.com (104.193.88.123), 30 hops max, 60 byte packets
 1  10.224.2.1 (10.224.2.1)  0.502 ms  0.783 ms  0.782 ms
 2  1.1.1.1 (1.1.1.1)  0.899 ms  1.042 ms  1.116 ms

(2) 排除应用层Cache: 我们知道JVM内部也是有dns cache的。

但是使用下面程序“嵌入”到服务器代码中测试了下,并无任何缓存:

import java.lang.reflect.Field;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
public class DNSCache {
  public static void main(String[] args) throws Exception {
    InetAddress.getByName("www.google.com");
    try {
        InetAddress.getByName("nowhere.example.com");
    } catch (UnknownHostException e) {

    }

    String addressCache = "addressCache";
    System.out.println(addressCache);
    printDNSCache(addressCache);
    String negativeCache = "negativeCache";
    System.out.println(negativeCache);
    printDNSCache(negativeCache);
  }
  private static void printDNSCache(String cacheName) throws Exception {
    Class<InetAddress> klass = InetAddress.class;
    Field acf = klass.getDeclaredField(cacheName);
    acf.setAccessible(true);
    Object addressCache = acf.get(null);
    Class cacheKlass = addressCache.getClass();
    Field cf = cacheKlass.getDeclaredField("cache");
    cf.setAccessible(true);
    Map<String, Object> cache = (Map<String, Object>) cf.get(addressCache);
    for (Map.Entry<String, Object> hi : cache.entrySet()) {
        Object cacheEntry = hi.getValue();
        Class cacheEntryKlass = cacheEntry.getClass();
        Field expf = cacheEntryKlass.getDeclaredField("expiration");
        expf.setAccessible(true);
        long expires = (Long) expf.get(cacheEntry);

        Field af = cacheEntryKlass.getDeclaredField("address");
        af.setAccessible(true);
        InetAddress[] addresses = (InetAddress[]) af.get(cacheEntry);
        List<String> ads = new ArrayList<String>(addresses.length);
        for (InetAddress address : addresses) {
            ads.add(address.getHostAddress());
        }

        System.out.println(hi.getKey() + " "+new Date(expires) +" " +ads);
    }
  }
}

排查后,并无任何缓存记录。继续跟踪了下:原来项目中在启动参数使用-Dsun.net.inetaddr.ttl=0里面关闭了JVM的DNS cache:

sun.net.InetAddressCachePolicy


    // Controls the cache policy for successful lookups only
    private static final String cachePolicyProp = "networkaddress.cache.ttl";
    private static final String cachePolicyPropFallback =
        "sun.net.inetaddr.ttl";


    public static final int FOREVER = -1;
    public static final int NEVER = 0;

    /* default value for positive lookups */
    public static final int DEFAULT_POSITIVE = 30;  //默认30s

        Integer tmp = java.security.AccessController.doPrivileged(
          new PrivilegedAction<Integer>() {
            public Integer run() {
                try { 
                    String tmpString = Security.getProperty(cachePolicyProp); //判断有没有设置networkaddress.cache.ttl
                    if (tmpString != null) {
                        return Integer.valueOf(tmpString);
                    }
                } catch (NumberFormatException ignored) {
                    // Ignore
                }

                try {
                    String tmpString = System.getProperty(cachePolicyPropFallback); //判断有没有设置sun.net.inetaddr.ttl
                    if (tmpString != null) {
                        return Integer.decode(tmpString);
                    }
                } catch (NumberFormatException ignored) {
                    // Ignore
                }
                return null;
            }
          });

        if (tmp != null) {
            cachePolicy = tmp.intValue();
            if (cachePolicy < 0) {
                cachePolicy = FOREVER; //永久cache
            }
            propertySet = true;
        } else {
            /* No properties defined for positive caching. If there is no
             * security manager then use the default positive cache value.
             */
            if (System.getSecurityManager() == null) {
                cachePolicy = DEFAULT_POSITIVE; //设置成默认30s
            }
        }

每次DNS解析后,都调用上面代码设置的policy存cache,例如30s或者不存等等:

java.net.InetAddress


   /**
         * Add an entry to the cache. If there's already an
         * entry then for this host then the entry will be
         * replaced.
         */
        public Cache put(String host, InetAddress[] addresses) {
            int policy = getPolicy();
            if (policy == InetAddressCachePolicy.NEVER) {
                return this;
            }

            // purge any expired entries

            if (policy != InetAddressCachePolicy.FOREVER) {

                // As we iterate in insertion order we can
                // terminate when a non-expired entry is found.
                LinkedList<String> expired = new LinkedList<>();
                long now = System.currentTimeMillis();
                for (String key : cache.keySet()) {
                    CacheEntry entry = cache.get(key);

                    if (entry.expiration >= 0 && entry.expiration < now) {
                        expired.add(key);
                    } else {
                        break;
                    }
                }

                for (String key : expired) {
                    cache.remove(key);
                }
            }

            // create new entry and add it to the cache
            // -- as a HashMap replaces existing entries we
            //    don't need to explicitly check if there is
            //    already an entry for this host.
            long expiration;
            if (policy == InetAddressCachePolicy.FOREVER) {
                expiration = -1;
            } else {
                expiration = System.currentTimeMillis() + (policy * 1000);
            }
            CacheEntry entry = new CacheEntry(addresses, expiration);
            cache.put(host, entry);
            return this;
        }

(3)锁定长连接问题:
查看协议,在http1.1协议中,默认就是长连接,除非显式加上header: Connection: close. 所以apache http client在拿到一个响应时,默认是按照长连接来处理的,所以除非打断连接(例如重启机器或者LB).否则无法重新做连接。

基本逻辑:涉及2个策略:reuseStrategy控制是否重用,keepAliveStrategy控制重用多久。

整体逻辑实现:org.apache.http.impl.execchain.MainClientExec:

   // The connection is in or can be brought to a re-usable state.
                if (reuseStrategy.keepAlive(response, context)) {
                    // Set the idle duration of this connection
                    final long duration = keepAliveStrategy.getKeepAliveDuration(response, context);
                    if (this.log.isDebugEnabled()) {
                        final String s;
                        if (duration > 0) {
                            s = "for " + duration + " " + TimeUnit.MILLISECONDS;
                        } else {
                            s = "indefinitely";
                        }
                        this.log.debug("Connection can be kept alive " + s);
                    }
                    connHolder.setValidFor(duration, TimeUnit.MILLISECONDS);
                    connHolder.markReusable();
                } else {
                    connHolder.markNonReusable();
                }

org.apache.http.impl.DefaultConnectionReuseStrategy 策略决定是否keepalive

  HeaderIterator hit = response.headerIterator(HTTP.CONN_DIRECTIVE);
        if (hit.hasNext()) {
            try {
                TokenIterator ti = createTokenIterator(hit);
                boolean keepalive = false;
                while (ti.hasNext()) {
                    final String token = ti.nextToken();
                    if (HTTP.CONN_CLOSE.equalsIgnoreCase(token)) {  //对端显示返回Connection:close
                        return false;
                    } else if (HTTP.CONN_KEEP_ALIVE.equalsIgnoreCase(token)) {
                        // continue the loop, there may be a "close" afterwards
                        keepalive = true;
                    }
                }
                if (keepalive)
                    return true;
                // neither "close" nor "keep-alive", use default policy

            } catch (ParseException px) {
                // invalid connection header means no persistent connection
                // we don't have logging in HttpCore, so the exception is lost
                return false;
            }
        }

org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy 策略决定keepalive多久


public class DefaultConnectionKeepAliveStrategy implements ConnectionKeepAliveStrategy {

    public long getKeepAliveDuration(HttpResponse response, HttpContext context) {
        if (response == null) {
            throw new IllegalArgumentException("HTTP response may not be null");
        }
        HeaderElementIterator it = new BasicHeaderElementIterator(
                response.headerIterator(HTTP.CONN_KEEP_ALIVE));
        while (it.hasNext()) {
            HeaderElement he = it.nextElement();
            String param = he.getName();
            String value = he.getValue();
            if (value != null && param.equalsIgnoreCase("timeout")) { //http协议草案,大多实现,但是不是强制要求
                try {
                    return Long.parseLong(value) * 1000;
                } catch(NumberFormatException ignore) {
                }
            }
        }
        return -1; //永久连接。
    }

}

org.apache.http.impl.client.DefaultClientConnectionReuseStrategy另外一个子策略的继承类,加了“请求端connection header的判断”:

    @Override
    public boolean keepAlive(final HttpResponse response, final HttpContext context) {

        final HttpRequest request = (HttpRequest) context.getAttribute(HttpCoreContext.HTTP_REQUEST);
        if (request != null) {
            final Header[] connHeaders = request.getHeaders(HttpHeaders.CONNECTION);  //处理request的
            if (connHeaders.length != 0) {
                final TokenIterator ti = new BasicTokenIterator(new BasicHeaderIterator(connHeaders, null));
                while (ti.hasNext()) {
                    final String token = ti.nextToken();
                    if (HTTP.CONN_CLOSE.equalsIgnoreCase(token)) {
                        return false;
                    }
                }
            }
        }
        return super.keepAlive(response, context); //调用上层策略,即response的header的处理
    }

解决方案:
(1)在服务器端的响应中,添加header: Connection:close,本地测试通过,但是上线通过VIP后失效,VIP直接将这个头扔掉
(2)在客户端的请求中,添加header:Connection: close,本地和上线都能通过,这种实际上把长连接主动变成了短连接,失去了长连接的优势
(3)仅仅将长连接的“时长”控制在一个时间范围内,比如3分钟。这样每到3分钟后,就自动重新连接,既一定程度保留了长连接的优势,也兼顾了可以做“切换”DNS的可能性。
但是每个人都会问同一个问题:设置几分钟合理?所以最好是由server来告知,但是这个在http协议中并无规定如何实现,所以http draft中层提及keepalive: timeout,这种确实在apache http client中支持,但是在当前比较流行的ok http client并不支持。

所以综合起来看,方案3是最合理的方案,同时在服务器应用设计时,应该让所有客户端提前知道这件事情,不然很容易“犯错”而产生质疑:“你没有遵循或者协商过要求我主动断连接,我遵循协议还有错?”

2 关于SNI支持

屋漏更糟连夜雨,还没来得及改完,另外的问题又来了。

问题描述: 某第三方从rackspace迁移至aws.在迁移后,应用直接报证书不批配错误:

Caused by: javax.net.ssl.SSLException: hostname in certificate didn't match: <xxxx.com> != <.yyy.com>
at org.apache.http.conn.ssl.AbstractVerifier.verify(AbstractVerifier.java:227)
at org.apache.http.conn.ssl.BrowserCompatHostnameVerifier.verify(BrowserCompatHostnameVerifier.java:54)
at org.apache.http.conn.ssl.AbstractVerifier.verify(AbstractVerifier.java:147)
at org.apache.http.conn.ssl.AbstractVerifier.verify(AbstractVerifier.java:128)
at org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:439)
at org.apache.http.impl.conn.DefaultClientConnectionOperator.openConnection(DefaultClientConnectionOperator.java:180)
at org.apache.http.impl.conn.ManagedClientConnectionImpl.open(ManagedClientConnectionImpl.java:294)
at org.apache.http.impl.client.DefaultRequestDirector.tryConnect(DefaultRequestDirector.java:643)
at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:479)
at org.apache.http.impl.client.AbstractHttpClient.execute(AbstractHttpClient.java:906)
at org.apache.http.impl.client.AbstractHttpClient.execute(AbstractHttpClient.java:805)
at org.jboss.resteasy.client.jaxrs.engines.ApacheHttpClient4Engine.invoke(ApacheHttpClient4Engine.java:283

问题跟踪:

本来怀疑比较多,想下我方应用并无改动,肯定不是我方问题,估计是对方证书什么搞错了,后来也翻看了下源码,验证了下想法:

1 final Certificate[] certs = session.getPeerCertificates();
2 final String cn = DefaultHostnameVerifier.extractCN(subjectPrincipal.getName(X500Principal.RFC2253));
3 void verify(String host, String[] cns, String[] subjectAlts)

基本就是访问那个主机,拿到证书,和自己访问的url可匹配。后来仔细了解了背景,这个服务部署做了迁移,做了SNI的支持,即支持不同域名来访问,然后客户端在拿证书时,需要带上自己访问的域名,以返回正确的证书,而不带的话,返回默认的。

使用openssl演示下:

openssl s_client -connect 130.59.223.53:443
CONNECTED(00000003)
depth=3 C = US, O = "The Go Daddy Group, Inc.", OU = Go Daddy Class 2 Certification Authority
verify return:1
depth=0 OU = Domain Control Validated, CN = *.yy.com
verify return:1

带上主机名后:

openssl s_client -connect 130.59.223.53:443 -servername xx.com
CONNECTED(00000003)
depth=3 C = US, O = "The Go Daddy Group, Inc.", OU = Go Daddy Class 2 Certification Authority
verify return:1
depth=0 OU = Domain Control Validated, CN = *.xx.com //返回了正确的证书
verify return:1

问题解决:
了解基本情况后,google了httpclient的SNI支持,在4.3.2+才支持,而自己用的版本略低(4.2.6),查看4.3.2的release时间是2014年,刚好是项目启动之时,悲剧。
所以直接升级httpclient,但是测试发现仍然不work,原来,虽然升级了,但是原有代码的使用方式是不支持sni的deprecated的code方式。这种情况特别容易发生在间接调用的情况下(例如我这里的案例,外面套了一层jboss的rest client,调用了httpclient的deprecated code)。

https://issues.apache.org/jira/browse/HTTPCLIENT-1119

sun.security.ssl.Handshaker


    /**
     * Sets the server name indication of the handshake.
     */
    void setSNIServerNames(List<SNIServerName> serverNames) {
        // The serverNames parameter is unmodifiable.
        this.serverNames = serverNames;
    }

sun.security.ssl.ClientHandshaker

  // add server_name extension
        if (enableSNIExtension) {
            if (session != null) {
                requestedServerNames = session.getRequestedServerNames();
            } else {
                requestedServerNames = serverNames;
            }

            if (!requestedServerNames.isEmpty()) {
                clientHelloMessage.addSNIExtension(requestedServerNames);
            }
        }

tcpdump可以知道,支持SNI的请求第一步”client hello”会带上servername:

3 关于连接失效检测
当完成第二步升级之后,基本对httpclient的源码有个大体了解,想起之前在升级前和另外一个第三方做集成时,每次都做stable check失败。所以继续翻阅了httpclient的代码。发现一个升级后潜在的问题,最后测试验证确实如此。

(1)为什么要做连接检测:
httpclient使用的是传统的io模式,所以一旦长连接建立并且使用完后,归回connection manager,这个时候,它既不处于读也不处于写操作状态,因为也没有必要,因为http的是请求-》响应模式,根据content-length或者chunk方式读完后,没有必要继续等待数据。所以这个时候归还connection manager给接下的请求使用。但是就是因为它闲置了,所以没有办法检测到对端连接关闭,下次使用时直接会报错。
而对于nio,引用一段话:

The only time you need a selector to detect a closed channel is the case where the peer closes it, in which case select() will trigger with OP_READ/isReadable(), and a subsequent read() will return -1.

所以无论使用IO还是NIO,只要不读的时候,即使底层有通知,也需要上层来处理。

(2)怎么做连接检测:
连接检测主要就是担心连接失效。所以可以预防着做:例如连接使用5分钟就自动不用了(expire),或者另外一种方式,闲置了5分钟就直接不要了(idle),而不需要做什么检测。

				setConnectionTimeToLive(3, TimeUnit.MINUTES).
				evictExpiredConnections().
				evictIdleConnections(1, TimeUnit.MINUTES).

但是使用这种方式都是预防性的方式,还有主动性的方式:例如升级前的httpclient在每个请求复用旧连接时,都会检查一次是否失效了。但是这样效率比较低,因为事先如果就已经约定好是长连接的话,何必又处处设防于每次请求都浪费时间做check,明显并不高效。所以新版本的httpclient做了改善,deprecate了老版本的每个请求检查,使用了一个validateAfterInactivity参数来控制,一个连接使用时,发现已经过了某个时间期就来检测一次。从而减少检查次数。

了解基本原理后,考虑一个情况:假设一个第三方不遵循http1.1的协议,告诉你是个长连接,但是响应你的请求后,立马又断连,可能会出现什么情况:

拿到一个连接后,还没有到检查时候,也不符合idle和expire条件时,这个时候,直接就fail了。除非把默认关闭的“废弃的”stable check打开。

Caused by: org.apache.http.NoHttpResponseException: The target server failed to respond
at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:143)
at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57)
at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:261)
at org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:165)
at org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:167)
at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:272)
at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:124)
at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:271)
at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:184)
at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:184)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:55)
at org.jboss.resteasy.client.jaxrs.engines.ApacheHttpClient4Engine.invoke(ApacheHttpClient4Engine.java:312)

(3)如何做检查

org.apache.http.impl.BHttpConnectionBase


   @Override
    public boolean isStale() {
        if (!isOpen()) {
            return true;
        }
        try {
            final int bytesRead = fillInputBuffer(1);
            return bytesRead < 0;
        } catch (final SocketTimeoutException ex) {
            return false;
        } catch (final IOException ex) {
            return true;
        }
    }


    private int fillInputBuffer(final int timeout) throws IOException {
        final Socket socket = this.socketHolder.get();
        final int oldtimeout = socket.getSoTimeout(); //取出原有设置的socket timeout时间
        try {
            socket.setSoTimeout(timeout); //用上面设置的1ms来检查connection是否损坏,如果正常断开,立马返回-1.然后正常,会阻塞1ms.
            return this.inbuffer.fillBuffer();
        } finally {
            socket.setSoTimeout(oldtimeout);  //设置回去。
        }
    }

  public int fillBuffer() throws IOException {
        // compact the buffer if necessary
        if (this.bufferpos > 0) {
            final int len = this.bufferlen - this.bufferpos;
            if (len > 0) {
                System.arraycopy(this.buffer, this.bufferpos, this.buffer, 0, len);
            }
            this.bufferpos = 0;
            this.bufferlen = len;
        }
        final int l;
        final int off = this.bufferlen;
        final int len = this.buffer.length - off;
        l = streamRead(this.buffer, off, len);
        if (l == -1) {
            return -1;
        } else {
            this.bufferlen = off + l;
            this.metrics.incrementBytesTransferred(l);
            return l;
        }
    }
 

问题解决:

方案1: setStaleConnectionCheckEnabled(true),这个时候实际上等于把不推荐的方法开启了,和validateAfterInactivity这种控制有重复。
方案2: 要求对方遵循http规范: 处理完请求立马断连接应该加上header: Connection: close
方案3: 既然已经知道对方不遵循,对于这种应用,主动改成短连接。

setConnectionReuseStrategy(isKeepalive? DefaultConnectionReuseStrategy.INSTANCE: NoConnectionReuseStrategy.INSTANCE).
					if(!isKeepalive) {
						headers.put(HttpHeaderConstants.Keys.HEADER_CONNECTION, Arrays.asList("close"));
					}

题外话:
(1)对于别人主动关闭自己的情况,自己会处于close wait状态(关闭需要双向关闭,主动关闭,会处于timewait),这个时候除非主动发现并关闭,否则一直处于这个状态,直至tcp层的keepalive,而tcp层默认是2小时。所以可以通过缩短这个时间来做一个保护:

sudo sysctl -w net.ipv4.tcp_keepalive_time=120

(2)文档说check isstable会浪费30ms时间,实际看源码不应该,因为也就阻塞等了1ms,所以用jprofiler测试了下,结果符合预期,基本都在2ms以内,不超过5ms:


问题4: 关于超时设置。

问题: 偶然发现代码setConnectTimeout(2 * 1000)后,竟然有出现超时4s的情况:

现象:

Caused by: java.net.SocketTimeoutException: connect timed out //4s后才超时,而不是设置的2s
        at java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.net.Socket.connect(Socket.java:589)
        at org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:337)
        at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:141)
        ... 46 more

原因: DNS可能会解析出多笔记录,然后在连接时,挨个尝试。所以会出现超过设置的connection timeout情况。
org.apache.http.impl.conn.DefaultHttpClientConnectionOperator

    @Override
    public void connect(
            final ManagedHttpClientConnection conn,
            final HttpHost host,
            final InetSocketAddress localAddress,
            final int connectTimeout,
            final SocketConfig socketConfig,
            final HttpContext context) throws IOException {
        final Lookup<ConnectionSocketFactory> registry = getSocketFactoryRegistry(context);
        final ConnectionSocketFactory sf = registry.lookup(host.getSchemeName());
        if (sf == null) {
            throw new UnsupportedSchemeException(host.getSchemeName() +
                    " protocol is not supported");
        }
        final InetAddress[] addresses = host.getAddress() != null ?
                new InetAddress[] { host.getAddress() } : this.dnsResolver.resolve(host.getHostName()); //解析出多个记录。
        final int port = this.schemePortResolver.resolve(host);
        for (int i = 0; i < addresses.length; i++) {
            final InetAddress address = addresses[i];
            final boolean last = i == addresses.length - 1;  //判断是不是最后一个

            Socket sock = sf.createSocket(context);
            sock.setSoTimeout(socketConfig.getSoTimeout());
            sock.setReuseAddress(socketConfig.isSoReuseAddress());
            sock.setTcpNoDelay(socketConfig.isTcpNoDelay());
            sock.setKeepAlive(socketConfig.isSoKeepAlive());
            if (socketConfig.getRcvBufSize() > 0) {
                sock.setReceiveBufferSize(socketConfig.getRcvBufSize());
            }
            if (socketConfig.getSndBufSize() > 0) {
                sock.setSendBufferSize(socketConfig.getSndBufSize());
            }

            final int linger = socketConfig.getSoLinger();
            if (linger >= 0) {
                sock.setSoLinger(true, linger);
            }
            conn.bind(sock);

            final InetSocketAddress remoteAddress = new InetSocketAddress(address, port);
            if (this.log.isDebugEnabled()) {
                this.log.debug("Connecting to " + remoteAddress);
            }
            try {
                sock = sf.connectSocket(
                        connectTimeout, sock, host, remoteAddress, localAddress, context);
                conn.bind(sock);
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Connection established " + conn);
                }
                return;  //连上主机后,就退出了
            } catch (final SocketTimeoutException ex) {
                if (last) {
                    throw new ConnectTimeoutException(ex, host, addresses);
                }
            } catch (final ConnectException ex) {
                if (last) { //连不上,假设也是最后一个可以尝试主机了,就退出了。
                    final String msg = ex.getMessage();
                    if ("Connection timed out".equals(msg)) {
                        throw new ConnectTimeoutException(ex, host, addresses);
                    } else {
                        throw new HttpHostConnectException(ex, host, addresses);
                    }
                }
            } catch (final NoRouteToHostException ex) {
                if (last) {
                    throw ex;
                }
            }
            if (this.log.isDebugEnabled()) { //尝试下一个主机
                this.log.debug("Connect to " + remoteAddress + " timed out. " +
                        "Connection will be retried using another IP address");
            }
        }

总结:
1 尽可能熟悉自己所用的开源组件,并了解其版本变更, 有可能的话升级下老版本。
2 升级第三方库要仔细测试,单纯看文档,参考最佳实践完成的代码不见得实际生效,实际情况往往远比理论情况来的复杂。
3 不应该假设所有组件都严格遵循协议,应该以实际实现为准。