Datastax Cassandra Driver Analyst (10) - how to support DC-level failover

The DCAwareRoundRobinPolicy hides an interesting DC-level failover capability. It is disabled by default, probably for two reasons:

(1) If remote DC-level failover is enabled and the entire local DC goes down, all requests are sent to a remote DC. Handling those requests will certainly be slower; can the application still meet its requirements?

(2) When a local consistency level such as LOCAL_QUORUM is used, remote DC-level failover temporarily breaks the consistency-level guarantee.

Assuming we do not care about these two issues and want to maximize availability, let's look at how to enable DC-level failover.

(1) How to enable it?

To enable it, do not use the default DCAwareRoundRobinPolicy constructor; use this one instead:

public DCAwareRoundRobinPolicy(String localDc, int usedHostsPerRemoteDc, boolean allowRemoteDCsForLocalConsistencyLevel)

allowRemoteDCsForLocalConsistencyLevel allows the consistency guarantee to be broken. For non-local consistency levels it does not matter.

usedHostsPerRemoteDc: note that this is the number of hosts used per remote DC.
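For illustration, a minimal sketch of building a Cluster with this constructor; the contact point address and the DC name "DC1" are placeholders, not values from the original setup:

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;

Cluster cluster = Cluster.builder()
        .addContactPoint("10.0.0.1")                      // placeholder contact point
        .withLoadBalancingPolicy(
                // local DC "DC1", 1 host per remote DC, remote DCs allowed even for LOCAL_* consistency levels
                new DCAwareRoundRobinPolicy("DC1", 1, true))
        .build();
Session session = cluster.connect();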

In the implementation, a batch of data connections is established when the connection is initialized. With these parameters set, e.g. usedHostsPerRemoteDc = 1 and allowRemoteDCsForLocalConsistencyLevel = true, one connection is created to each remote DC.

The code below shows that with these parameters enabled, all nodes are split into three categories, LOCAL, REMOTE and IGNORED, whereas without them there are only two, LOCAL and IGNORED.


@Override
public HostDistance distance(Host host) {
    String dc = dc(host);
    if (dc == UNSET || dc.equals(localDc))
        return HostDistance.LOCAL;

    CopyOnWriteArrayList<Host> dcHosts = perDcLiveHosts.get(dc);
    if (dcHosts == null || usedHostsPerRemoteDc == 0)
        return HostDistance.IGNORED;

    // We need to clone, otherwise our subList call is not thread safe
    dcHosts = cloneList(dcHosts);
    return dcHosts.subList(0, Math.min(dcHosts.size(), usedHostsPerRemoteDc)).contains(host)
        ? HostDistance.REMOTE
        : HostDistance.IGNORED;
}

Then, when the cluster connections are initialized, a connection pool is created to every host except the IGNORED ones. So with these parameters, the remote DCs also get connections, usedHostsPerRemoteDc per remote DC:


// Returns whether there was problem creating the pool
ListenableFuture<Boolean> forceRenewPool(final Host host, ListeningExecutorService executor) {
    final HostDistance distance = cluster.manager.loadBalancingPolicy().distance(host);
    if (distance == HostDistance.IGNORED) // so the driver will connect to local and remote nodes
        return Futures.immediateFuture(true);
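To see this LOCAL/REMOTE/IGNORED classification at runtime, one can simply ask the policy for each host's distance; a minimal sketch, assuming a Cluster instance named cluster built as above:

import com.datastax.driver.core.Host;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.policies.LoadBalancingPolicy;

LoadBalancingPolicy policy = cluster.getConfiguration().getPolicies().getLoadBalancingPolicy();
for (Host host : cluster.getMetadata().getAllHosts()) {
    HostDistance distance = policy.distance(host);
    // LOCAL and REMOTE hosts get connection pools; IGNORED hosts do not.
    System.out.println(host.getAddress() + " (" + host.getDatacenter() + ") -> " + distance);
}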

(2) How does the driver support it?

The implementation is simple: once the two parameters above are set, the query plan takes some remote DC nodes into account, and if every node in the local DC has failed, the remote DC nodes start being used.


com.datastax.driver.core.policies.DCAwareRoundRobinPolicy.newQueryPlan(String, Statement)

if (remainingLocal > 0) {
    remainingLocal--;
    int c = idx++ % hosts.size();
    if (c < 0) {
        c += hosts.size();
    }
    return hosts.get(c);
}

if (localSuspected == null) {
    List<Host> l = perDcSuspectedHosts.get(localDc);
    localSuspected = l == null ? Collections.<Host>emptySet().iterator() : l.iterator();
}

while (localSuspected.hasNext()) {
    Host h = localSuspected.next();
    waitOnReconnection(h);
    if (h.isUp())
        return h;
}

ConsistencyLevel cl = statement.getConsistencyLevel() == null
    ? configuration.getQueryOptions().getConsistencyLevel()
    : statement.getConsistencyLevel();

if (dontHopForLocalCL && cl.isDCLocal()) // when remote DCs are not allowed, the plan ends here and never tries a remote DC
    return endOfData();

if (remoteDcs == null) {
    Set<String> copy = new HashSet<String>(perDcLiveHosts.keySet());
    copy.remove(localDc);
    remoteDcs = copy.iterator();
}

while (true) {
    if (currentDcHosts != null && currentDcRemaining > 0) {
        currentDcRemaining--;
        int c = idx++ % currentDcHosts.size();
        if (c < 0) {
            c += currentDcHosts.size();
        }
        return currentDcHosts.get(c);
    }

    if (currentDcSuspected != null) {
        while (currentDcSuspected.hasNext()) {
            Host h = currentDcSuspected.next();
            waitOnReconnection(h);
            if (h.isUp())
                return h;
        }
    }

    if (!remoteDcs.hasNext())
        break;

    String nextRemoteDc = remoteDcs.next();
    CopyOnWriteArrayList<Host> nextDcHosts = perDcLiveHosts.get(nextRemoteDc);
    if (nextDcHosts != null) {
        // Clone for thread safety
        List<Host> dcHosts = cloneList(nextDcHosts);
        currentDcHosts = dcHosts.subList(0, Math.min(dcHosts.size(), usedHostsPerRemoteDc));
        currentDcRemaining = currentDcHosts.size();
    }
    List<Host> suspectedList = perDcSuspectedHosts.get(nextRemoteDc);
    currentDcSuspected = suspectedList == null ? null : suspectedList.iterator();
}
return endOfData();
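For reference, the consistency level checked by the dontHopForLocalCL branch above comes from the statement itself, or from the QueryOptions default; a minimal sketch, with a placeholder query string:

import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;

// LOCAL_QUORUM is DC-local, so with allowRemoteDCsForLocalConsistencyLevel = false
// (i.e. dontHopForLocalCL = true) the query plan above stops at endOfData().
Statement stmt = new SimpleStatement("SELECT * FROM ks.tbl WHERE id = 1")
        .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);

// The cluster-wide default can be set via QueryOptions when building the Cluster:
// new QueryOptions().setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM)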

 

(3) What changes does it bring?

Once enabled, it really does provide DC-level failover, and a powerful one: if DC1 goes down, requests are sent to DC2; if DC2 also goes down, requests are sent to DC3. It also brings some side effects (the discussion below assumes usedHostsPerRemoteDc = 1):

a  More data connections are established up front:

The driver does not manage data connections on demand or lazily; it creates, right at startup, every data connection it might use later (the connected node only plays the coordinator role, it is not necessarily a node that stores the data: even if a keyspace is replicated only to DC2 and DC3, a connection to DC1 is still created). So once this feature is enabled, the number of long-lived TCP connections visibly increases. The extra count is (number of DCs - 1) * usedHostsPerRemoteDc, regardless of the keyspace replication strategy.

For example, without the feature, the connection count is 3 (local DC nodes) + 1: three data connections plus one control connection.

(screenshot: netstat output on 10.224.89.38 showing the 3 + 1 established connections)

With the feature enabled, the connection count becomes 3 + 1 + (usedHostsPerRemoteDc extra connections per remote DC).

(screenshot: netstat output on 10.224.89.38 showing the additional remote-DC connections)

Maintaining this many data connections raises a few questions:

(1) If the connected node in a remote DC goes down, will the driver switch to another node?

Yes. When it goes down (or cannot be connected in the first place), onDown is triggered: the pool for that host is removed, and then the created pools are updated, which brings up a connection to another node in that remote DC.


com.datastax.driver.core.SessionManager.onDown(Host)

void onDown(Host host) throws InterruptedException, ExecutionException {
    removePool(host).force().get();  // remove the pool of the down host
    updateCreatedPools(MoreExecutors.sameThreadExecutor());  // update the pools and add a new connection for the remote DC
}

The key log lines:


10:19:50,143 DEBUG Connection:274 - Defuncting connection to /10.194.250.245:9042

com.datastax.driver.core.TransportException: [/10.194.250.245:9042] Channel has been closed  // channelClosed is triggered when the connection goes down

at com.datastax.driver.core.Connection$Dispatcher.channelClosed(Connection.java:750)

10:19:50,143 DEBUG Cluster:1526 - Host /10.194.250.245:9042 is Suspected

10:19:50,159 DEBUG Connection:449 - Connection[/10.194.250.245:9042-1, inFlight=0, closed=true] closing connection

10:19:51,720 DEBUG Connection:103 - Connection[/10.194.250.245:9042-2, inFlight=0, closed=false] Error connecting to /10.194.250.245:9042 (Connection refused: no further information: /10.194.250.245:9042)

09:25:52,166 DEBUG Cluster:1592 - Host /10.194.250.245:9042 is DOWN

09:25:53,249 DEBUG SingleConnectionPool:84 - Created connection pool to host /10.194.250.156:9042  // switched to a new node

09:25:55,965 DEBUG Cluster:1691 - Failed reconnection to /10.194.250.245:9042 ([/10.194.250.245:9042] Cannot connect), scheduling retry in 2000 milliseconds  // the down node keeps being retried

Effect: the remote-DC connection switched from 10.194.250.245:9042 to 10.194.250.156:9042.

# netstat -nap|grep 9042

tcp        0      0 10.224.89.38:17748          10.224.57.166:9042          ESTABLISHED 3104/java

tcp        0      0 10.224.89.38:60873          10.224.57.168:9042          ESTABLISHED 3104/java

tcp        0      0 10.224.89.38:35391          10.89.108.187:9042          ESTABLISHED 3104/java  

tcp        0      0 10.224.89.38:27454          10.194.250.245:9042         ESTABLISHED 3104/java  //the down node in dc x

tcp        0      0 10.224.89.38:39537          10.224.57.207:9042          ESTABLISHED 3104/java

tcp        0      0 10.224.89.38:60875          10.224.57.168:9042          ESTABLISHED 3104/java

tcp        0      0 10.224.89.38:19961          10.224.57.167:9042          ESTABLISHED 3104/java

[root@hf3dsa002 ~]# netstat -nap|grep 9042

tcp        0      0 10.224.89.38:49385          10.194.250.156:9042         ESTABLISHED 3104/java  // other live node in dc x

tcp        0      0 10.224.89.38:17748          10.224.57.166:9042          ESTABLISHED 3104/java

tcp        0      0 10.224.89.38:60873          10.224.57.168:9042          ESTABLISHED 3104/java

tcp        0      0 10.224.89.38:35391          10.89.108.187:9042          ESTABLISHED 3104/java

tcp        0      0 10.224.89.38:39537          10.224.57.207:9042          ESTABLISHED 3104/java

tcp        0      0 10.224.89.38:60875          10.224.57.168:9042          ESTABLISHED 3104/java

tcp        0      0 10.224.89.38:19961          10.224.57.167:9042          ESTABLISHED 3104/java

(2) If the connected node in a remote DC goes down, will that node be retried?

Yes, and it is retried until it succeeds; see the log above.


09:25:53,249 DEBUG SingleConnectionPool:84 - Created connection pool to host /10.194.250.156:9042  // switched to the new node

09:25:55,965 DEBUG Cluster:1691 - Failed reconnection to /10.194.250.245:9042 ([/10.194.250.245:9042] Cannot connect), scheduling retry in 2000 milliseconds  // retrying the down node

 

(3) If the remote DC node goes down and the driver has already connected to a new node, does the retry of the down node continue?

Yes, which raises a new question: if the node is not just under routine maintenance (stopped, then restarted later) but is actually removed from the cluster, is there still a retry, and does it last forever? From the code, a real remove does not trigger retries (comparing the handling of down and remove, down only adds the scheduling of reconnection attempts). But if a node is simply shut down and never brought back, the driver retries forever.


com.datastax.driver.core.Cluster.Manager.onDown(Host, boolean, boolean)

logger.debug("{} is down, scheduling connection retries", host);
startPeriodicReconnectionAttempt(host, isHostAddition);

(4) If the remote DC node goes down and later comes back up, will the driver reconnect to it?

No, and there is no need to. From the code, after onUp() is called, no new connection is created for it.


loadBalancingPolicy().onUp(host);  // appends the host to the end of its DC's host list
controlConnection.onUp(host);

logger.trace("Adding/renewing host pools for newly UP host {}", host);

List<ListenableFuture<Boolean>> futures = new ArrayList<ListenableFuture<Boolean>>(sessions.size());
for (SessionManager s : sessions)
    futures.add(s.forceRenewPool(host, poolCreationExecutor));  // this looks like it reconnects...

// ...but in fact the host's computed distance is no longer REMOTE but IGNORED: the newly UP host
// sits at the end of dcHosts, and usedHostsPerRemoteDc is only 1, so it is ignored.

@Override
public HostDistance distance(Host host) {
    String dc = dc(host);
    if (dc == UNSET || dc.equals(localDc))
        return HostDistance.LOCAL;

    CopyOnWriteArrayList<Host> dcHosts = perDcLiveHosts.get(dc);
    if (dcHosts == null || usedHostsPerRemoteDc == 0)
        return HostDistance.IGNORED;

    // We need to clone, otherwise our subList call is not thread safe
    dcHosts = cloneList(dcHosts);
    return dcHosts.subList(0, Math.min(dcHosts.size(), usedHostsPerRemoteDc)).contains(host)
        ? HostDistance.REMOTE
        : HostDistance.IGNORED;
}

The key question, though, is: indeed no connection is created afterwards, but wasn't a connection already created when the retry succeeded? Normally, when a node goes down and the retries eventually succeed, that connection should still exist even if it is never used later, since it came from the retry. Where did it go? The code shows that it is closed immediately after the reconnection succeeds; if it were not closed, it would violate the usedHostsPerRemoteDc limit:


protected Connection tryReconnect() throws ConnectionException, InterruptedException, UnsupportedProtocolVersionException, ClusterNameMismatchException {
    return connectionFactory.open(host);
}

protected void onReconnection(Connection connection) {
    // We don't use that first connection so close it.
    // TODO: this is a bit wasteful, we should consider passing it to onAdd/onUp so
    // we use it for the first HostConnectionPool created
    connection.closeAsync();

 

Summary:

From the above:

(1) With this powerful feature enabled, more long-lived TCP connections are established up front; the extra count is (number of DCs - 1) * usedHostsPerRemoteDc, so it scales with the size of the Cassandra cluster.

(2) The retry-until-success approach cannot cope with a node that goes down without a proper remove and never comes back up. In that case, unless the application is restarted, the driver ends up retrying forever at a 10-minute interval.

(3) After DC-level failover kicks in, can performance still meet the requirements? If not, how is the system different from one that is simply down?

So it is best to introduce a flag that can turn this feature off, to keep the three issues above under control.
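A minimal sketch of such a switch, assuming a hypothetical application flag app.enableDcFailover and the placeholder DC name "DC1":

import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
import com.datastax.driver.core.policies.LoadBalancingPolicy;

// Hypothetical flag: read from a system property (could equally come from any config source).
boolean enableDcFailover = Boolean.getBoolean("app.enableDcFailover");
LoadBalancingPolicy policy = enableDcFailover
        ? new DCAwareRoundRobinPolicy("DC1", 1, true)  // DC-level failover on: 1 host per remote DC
        : new DCAwareRoundRobinPolicy("DC1");          // off: local DC only, remote DCs ignored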

 

 

Datastax Cassandra Driver Analyst (9) - understand contact points

This part answers the following questions:

(1) How do contact points work?

(2) How should contact points be configured?

For example, if DCAwareRoundRobinPolicy is configured to allow remote DCs, do remote DC nodes need to be configured as contact points?

1 How do contact points work?

Contact points are used to obtain all the information about the Cassandra cluster: nodes, tables and so on; they play the role of a messenger. From reading the code, during cluster initialization the contact points are used for the following:

a   Acquire the contact points, then try them one by one until one succeeds (the ones that failed before the successful one are retried forever; once one succeeds, the remaining ones are not tried at all).

b   Use the contact point to register for events:


List<ProtocolEvent.Type> evs = Arrays.asList(
    ProtocolEvent.Type.TOPOLOGY_CHANGE,
    ProtocolEvent.Type.STATUS_CHANGE,
    ProtocolEvent.Type.SCHEMA_CHANGE
);

 

c  Use the contact point to refresh the schema, the node list and the token map.

d  Use the contact point to get all hosts (see the metadata sketch below).
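The node list, token map and schema fetched through the control connection are exposed via the Cluster metadata. A minimal sketch of inspecting it, assuming a Cluster instance named cluster has already been built:

import com.datastax.driver.core.Host;
import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.Metadata;

Metadata metadata = cluster.getMetadata();
System.out.println("cluster: " + metadata.getClusterName());
for (Host host : metadata.getAllHosts())                 // node list refreshed via the control connection
    System.out.println(host.getAddress() + " in DC " + host.getDatacenter());
for (KeyspaceMetadata ks : metadata.getKeyspaces())      // schema refreshed via the control connection
    System.out.println("keyspace: " + ks.getName());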

So the contact points are the "key" that opens the door to the Cassandra cluster. Consider the following cases:

(1) Only one contact point is configured and it is not working at startup: the driver fails to start:


 com.datastax.driver.core.Cluster.Manager.init()

catch (NoHostAvailableException e) { // no contact point could be connected
    close();
    throw e;
}

 Exception in thread "main" com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /10.224.57.163:9042 (com.datastax.driver.core.TransportException: [/10.224.57.163:9042] Cannot connect))

 at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:223)

 at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:78)

 at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1230)

 at com.datastax.driver.core.Cluster.init(Cluster.java:157)

 at com.datastax.driver.core.Cluster.connect(Cluster.java:245)

 at com.datastax.driver.core.Cluster.connect(Cluster.java:278)

 

(2) If two or more contact points are configured and the first one works, will the driver go on to connect to the ones configured after it?

The answer is no; the code:


com.datastax.driver.core.ControlConnection.reconnectInternal(Iterator<Host>, boolean)

while (iter.hasNext()) {
    host = iter.next();
    try {
        return tryConnect(host, isInitialConnection);   // try one by one
    } catch (ConnectionException e) {
        errors = logError(host, e, errors, iter);
        if (isInitialConnection) {
            // Mark the host down right away so that we don't try it again during the initialization process.
            // We don't call cluster.triggerOnDown because it does a bunch of other things we don't want to do here (notify LBP, etc.)
            host.setDown();
            cluster.startPeriodicReconnectionAttempt(host, true); // schedule retries for the failed contact point
        }
    }
}

 

(3) If two or more contact points are configured, the second one works but the first one does not, will the first one be retried?

Yes. See the code in (2): when a host cannot be connected, a retry is scheduled. The retry policy is configurable; the default is:


private static final ReconnectionPolicy DEFAULT_RECONNECTION_POLICY = new ExponentialReconnectionPolicy(1000, 10 * 60 * 1000);  // retries start at 1 s, then 2 s, 4 s, 8 s and so on, capped at 10 minutes; after that it keeps retrying every 10 minutes
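The reconnection schedule is configurable when building the Cluster; a minimal sketch that replaces the exponential default with a fixed 30-second interval (the contact point address is a placeholder):

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.policies.ConstantReconnectionPolicy;

Cluster cluster = Cluster.builder()
        .addContactPoint("10.0.0.1")                                         // placeholder contact point
        .withReconnectionPolicy(new ConstantReconnectionPolicy(30 * 1000L))  // retry a down host every 30 s
        .build();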

 

After the retries, if the first contact point recovers, will the driver switch the control connection to it? No:


protected void onReconnection(Connection connection) {
    // We don't use that first connection so close it.
    // TODO: this is a bit wasteful, we should consider passing it to onAdd/onUp so
    // we use it for the first HostConnectionPool created
    connection.closeAsync();
    // Make sure we have up-to-date infos on that host before adding it (so we typically
    // catch that an upgraded node uses a new cassandra version).
    if (controlConnection.refreshNodeInfo(host)) {
        logger.debug("Successful reconnection to {}, setting host UP", host);
        try {
            if (isHostAddition)
                onAdd(host);
            else
                onUp(host);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            logger.error("Unexpected error while setting node up", e);
        }
    } else {
        logger.debug("Not enough info for {}, ignoring host", host);
    }
}

From the three questions above we know that no matter how many contact points are configured, there is only ever one control connection. That leads to question 4:

(4) What happens if the control connection goes down?

Intuitively, if the control node goes down, the driver would fall back to the other configured contact points. The code shows this is not the case at all: it uses the nodes from the query plan instead. What is the query plan? It is the set of nodes bound to the load-balancing policy. For example, with DCAwareRoundRobinPolicy and DC1 as the local DC, the query plan contains all the nodes of DC1; if the policy allows remote DCs with a limit of 1, the query plan also contains one node from every other DC. It is called a query plan because it holds multiple nodes so that queries can be retried.

So when the control node goes down, its job is handed over to the nodes of the load-balancing policy, i.e. the coordinators that handle requests.


com.datastax.driver.core.Connection.Dispatcher.channelClosed(ChannelHandlerContext, ChannelStateEvent) // triggered when the connection goes down

com.datastax.driver.core.ControlConnection.backgroundReconnect(long)

protected Connection tryReconnect() throws ConnectionException {
    try {
        return reconnectInternal(queryPlan(), false);  // uses the nodes from the query plan
    } catch (NoHostAvailableException e) {
        throw new ConnectionException(null, e.getMessage());
    } catch (UnsupportedProtocolVersionException e) {
        // reconnectInternal only propagate those if we've not decided on the protocol version yet,
        // which should only happen on the initial connection and thus in connect() but never here.
        throw new AssertionError();
    }
}

 

(5) Does the control connection reuse a data (request) connection?

No. For example, in the figure below, 10.224.57.168 has two connections: one control connection and one data connection.

(screenshot: netstat output showing the two established connections to 10.224.57.168)

(6) Among the configured contact points, is the first one tried first?

Not necessarily. This is a somewhat frustrating answer, but both the code and actual tests confirm it.

When connecting, the contact point information used is the values() view of a ConcurrentHashMap, not an ordered list. In other words, the Cassandra driver does not prioritize contact points at all.

 

Conclusion:

From the analysis above, the key points are:

(1) No matter how many contact points are configured, there is only one control connection.

(2) When remote DCs are enabled in DCAwareRoundRobinPolicy (allowRemoteDCsForLocalConsistencyLevel), there is no need to add a remote DC node to the contact points: if the entire DC of the control node goes down, the query plan adapts (all subsequent requests go to other DCs), and since the control connection is rebuilt from that same query plan, it follows along naturally.

(Note: if the application must be able to start while every node in the local DC is already down, then a remote DC node really is needed as a contact point, otherwise the driver cannot start at all. But is starting up in that situation really desirable? It may just hide problems that surface later.)

In theory, the more contact points the better. From another angle, configuring too many is pointless: as long as one of them works and the driver starts, every later scenario is handled, because after enough down/up transitions the driver may end up using none of the configured contact points at all.

In summary: from the code and from test results, whether to configure a remote DC node as a contact point is a trade-off in the current driver implementation. If you configure one, it may end up as the control connection (being listed last does not mean it is tried last), so node change and schema change events may be processed a bit more slowly (fortunately these are not very time sensitive). If you do not configure one, the application cannot start when the entire local DC is down.

So the conclusion is: if the application must still start when no node in the local DC is working, configure at least one remote DC node as a contact point; otherwise configure only local DC nodes, the more the better.
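A minimal sketch of that recommendation; every address and the DC name "DC1" are placeholders:

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;

Cluster cluster = Cluster.builder()
        .addContactPoints("10.0.1.1", "10.0.1.2", "10.0.1.3")  // as many local-DC nodes as practical
        .addContactPoint("10.0.2.1")                           // one remote-DC node, only if the app must start while the whole local DC is down
        .withLoadBalancingPolicy(new DCAwareRoundRobinPolicy("DC1", 1, true))
        .build();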

 

PS: I always thought that the lack of contact point prioritization was something worth optimizing. Looking through the code history on GitHub, later versions not only do not add such prioritization, they deliberately shuffle the contact points. From the driver designers' point of view this avoids a hotspot: many clients calling connect() at startup would otherwise all pick the same node.

Reference: https://github.com/datastax/java-driver/commit/e6b28bcca1882b198064594d82696fc247ffac1f