Datastax Cassandra Driver Analyst (4)-Configuration-TimestampGenerator

TimestampGenerator主要是标示请求的先后顺序,分本地和远程产生两种方式。

功能

功能是为了resolve conflicts,也为了识别数据新旧等。API DOC如下:

* Generates client-side, microsecond-precision query timestamps.
* Given that Cassandra uses those timestamps to resolve conflicts, implementations should generate
* incrementing timestamps for successive implementations.

 

查询timestamp可以使用下列CQL语法:writetime()

select id,writetime(id),ttl(id) FROM table;

分类

timestamp

 

Monotonic的意思可以参考网上找的一篇文章:

clock-realtime

代表机器上可以理解为当前的我们所常看的时间,其当time-of-day 被修改的时候而改变,这包括NTP对它的修改。

clock-monotonic

代表从过去某个固定的时间点开始的绝对的逝去时间,它不受任何系统time-of-day时钟修改的影响。
一共分为3种:
ServerSideTimestampGenerator:  客户端不产生,让服务器端自己产生维护;
AtomicMonotonicTimestampGenerator:客户端产生:使用的是CAS乐观锁;
private AtomicLong lastRef = new AtomicLong(0);

@Override
public long next() {
while (true) {
long last = lastRef.get();
long next = computeNext(last);
if (lastRef.compareAndSet(last, next))
return next;
}
}
ThreadLocalMonotonicTimestampGenerator:客户端产生:使用的是thread local产生.
  private final ThreadLocal<Long> lastRef = new ThreadLocal<Long>();

 @Override
 public long next() {
 Long last = this.lastRef.get();
 if (last == null)
 last = 0L;

 long next = computeNext(last);

 this.lastRef.set(next);
 return next;
 }
其中后两者复用如下代码:即在1ms之内能区分1000个请求,如果超过1000就区分不了。

protected long computeNext(long last) {
long millis = last / 1000;
long counter = last % 1000;

long now = clock.currentTime();

// System.currentTimeMillis can go backwards on an NTP resync, hence the ">" below
if (millis >= now) {
if (counter == 999)
logger.warn("Sub-millisecond counter overflowed, some query timestamps will not be distinct");
else
counter += 1;
} else {
millis = now;
counter = 0;
}

return millis * 1000 + counter;
}
使用
(1)组装请求:
 使用TimeStamp的代码:
long defaultTimestamp = Long.MIN_VALUE;
if (cluster.manager.protocolVersion().compareTo(ProtocolVersion.V3) >= 0) {
defaultTimestamp = statement.getDefaultTimestamp();
if (defaultTimestamp == Long.MIN_VALUE)
defaultTimestamp = cluster.getConfiguration().getPolicies().getTimestampGenerator().next();
}
可见:
 1)使用的协议版本要>=V3;
 2)   假设没有为statement设置timestamp,那么默认为 private volatile long defaultTimestamp = Long.MIN_VALUE; 然后会使用默认TimeStampGenerator去产生timestamp.即让server端自己产生。
public static TimestampGenerator defaultTimestampGenerator() {
return ServerSideTimestampGenerator.INSTANCE;
}

(2)编码请求:

 

第一步:初始化请求


 if (defaultTimestamp != Long.MIN_VALUE)
 flags.add(QueryFlag.DEFAULT_TIMESTAMP);

第二步:编码PDU

case V3:
 if (version == ProtocolVersion.V3 && flags.contains(QueryFlag.DEFAULT_TIMESTAMP))
 dest.writeLong(defaultTimestamp);

总结:
1 V3版本协议是可以为单独的Statement设置timestamp的;
com.datastax.driver.core.Statement.setDefaultTimestamp(long)
2 V3和V3之后的版本,如果不单独设置,默认采用的是让server自己产生的方式。
3 V3之前的版本client不做任何处理,让Server自己产生。
4 timestamp可以帮助跟踪某数据是否被更新。

 

Datastax Cassandra Driver Analyst (3)-Configuration-ProtocolOptions

ProtocolOptions是与协议有关的一些配置,例如端口,压缩算法,SSL,授权等。具体包括以下6部分。

protocal

(1)port:  要建立连接的cassandra服务器的服务端口,默认9042

(2)sslOptions:是否采用ssl, 默认null不使用。

(3)compression: 传输压缩算法,一共有3种:NONE/SNAPPY/LZ4,默认为NONE,不采用任何压缩算法。

(4)authProvider:授权方式,一共有2种,NONE和PlainTextAuthProvider(用户名+密码),默认不授权。

(5)initialProtocolVersion:协议版本,一共有3种,默认为null,当采用默认值时:

control connection采用V2,

// This happens for the first control connection because the protocol version has not been
// negociated yet.

其他connection采用v3.

V1(“1.2.0”, 1),
V2(“2.0.0”, 2),
V3(“2.1.0”, 3),

协议版本主要有2个功能:

1)在增加Node或初始化加载所有Node时,会检查本地的版本与远程版本的兼容性(如果为null,则默认都兼容),当不兼容时,会忽略Node:

logger.warn(“Detected added or restarted Cassandra host {} but ignoring it since it does not support the version {} of the native ”
+ “protocol which is currently in use. If you want to force the use of a particular version of the native protocol, use ”
+ “Cluster.Builder#usingProtocolVersion() when creating the Cluster instance.”, host, version);

远程版本的获取:

test@cqlsh> select peer,release_version from system.peers;

peer | release_version
—————-+—————–
10.253.160.247 | 2.1.0
10.89.108.186 | 2.1.0
10.140.200.227 | 2.1.1-SNAPSHOT

2)决定了client采用不同的connection pool去处理请求。

(6)maxSchemaAgreementWaitSeconds    让schema一致的最长超时时间,默认10S. 主要通过执行2个CQL,比较schema version然后将结果加入一个set,进而判断set的size<=1以决策是否已经达到了一致。

test@cqlsh> SELECT schema_version FROM system.local WHERE key=’local’;

schema_version

————————————–

ef249049-9b7c-36c1-ad5d-335bdf02410f

 

test@cqlsh> SELECT peer, rpc_address, schema_version FROM system.peers;

peer           | rpc_address    | schema_version

—————-+—————-+————————————–

10.253.160.247 | 10.253.160.247 | ef249049-9b7c-36c1-ad5d-335bdf02410f

10.224.57.165 |  10.224.57.165 | ef249049-9b7c-36c1-ad5d-335bdf02410f

10.253.160.245 | 10.253.160.245 | ef249049-9b7c-36c1-ad5d-335bdf02410f

 

在以下二种情况下,会要求schema做一致性准备:

1)SCHEMA_CHANGE : 如果在超时时间外还不一致,会记录日志:

logger.warn(“No schema agreement from live replicas after {} s. The schema may not be up to date on some nodes.”, configuration.getProtocolOptions().getMaxSchemaAgreementWaitSeconds());

2)Node Up/Add: 如果在超时时间外还不一致,不做进一步处理。

 

总结:这个选项和开发者关系最大的是授权和SSL启用与否,其他貌似没有太多牵连,直接使用默认值即可。如果不使用默认值,需要综合考虑一些因素,例如是否启用传输压缩要考虑:带宽+压缩速度。