Datastax Cassandra Driver Analyst (7)-Data Structure and Codec

这一小节主要讨论两个问题,一是描述Datastax Cassandra Driver为请求和响应定义的的数据结构;二是在收发数据都经过了哪些编解码。

一、数据结构:

driver发送/接收数据的结构分为两种版本:V1和V2共用的版本和V3使用的版本(如下图):

 

 

(1)version, 不解释,V3还是V1/V2

(2)flags, 分两种,使用序列化方式压缩进数据帧内


(header.writeByte(Header.Flag.serialize(frame.header.flags));)

a) TRACING:  Returns whether tracing is enabled for this query or not.

b) COMPRESSED 是否压缩

(3)stream :很关键,标示数据帧的id. 收发一致以保持数据的发送和接收以一个对应起来。

(4)opcode: 编解码方式,例如对于请求,Query类型的opcode是7,从而服务器在接收到Driver发送的请求后,根据7的Requests.Query.coder进行解码,因为明显不同类型的请求的编解码方式是不同的。

 


STARTUP (1, Requests.Startup.coder),
CREDENTIALS (4, Requests.Credentials.coder),
OPTIONS (5, Requests.Options.coder),
QUERY (7, Requests.Query.coder),
PREPARE (9, Requests.Prepare.coder),
EXECUTE (10, Requests.Execute.coder),
REGISTER (11, Requests.Register.coder),
BATCH (13, Requests.Batch.coder),
AUTH_RESPONSE (15, Requests.AuthResponse.coder);

(5) length: 数据帧Body内容的总长度;

(6)body: 例如对于仅有HashMap属性的Credentials请求类型:

body分为2部分:第一部分为:总的长度(值为下图中x,区别于5);第二部分:数据内容(如下)

ds2

总体代码如下:


header.writeByte(frame.header.version.toInt());
header.writeByte(Header.Flag.serialize(frame.header.flags));
writeStreamId(frame.header.streamId, header, protocolVersion);
header.writeByte(frame.header.opcode);
header.writeInt(frame.body.readableBytes());
return ChannelBuffers.wrappedBuffer(header, frame.body);

二、编解码Codec

了解完数据帧的编码后,我们来了解下netty对收发数据的编解码,首先可以从以下代码可以看出收发数据编解码的过程和顺序:


pipeline.addLast("frameDecoder", new Frame.Decoder());
pipeline.addLast("frameEncoder", frameEncoder);

if (compressor != null) {
pipeline.addLast("frameDecompressor", new Frame.Decompressor(compressor));
pipeline.addLast("frameCompressor", new Frame.Compressor(compressor));
}

pipeline.addLast("messageDecoder", messageDecoder);

pipeline.addLast("messageEncoder", messageEncoderFor(protocolVersion));

pipeline.addLast("dispatcher", connection.dispatcher);

根据netty对收发数据处理的流程(如下图)可知:

20130909221201781

 

datastax的转化流程图如下:

其中Frame.Decoder()为LengthFieldBasedFrameDecoder;dispatcher为SimpleChannelUpstreamHandler,其他都为OneToOneDecoder

 

DatastaxDriver-Codec (1)

总结:Driver 对于数据帧的定义方式非常典型:有identify的标记位,有影响后面body处理的压缩等flag,有长度字段和编解码方式字段;同时编解码也挺齐全:有压缩,也有不同请求的封装。

从driver对netty的使用来看,netty确实是强大的网络程序构架框架,相比较自己去一步一步码nio的server/client程序要高效的多。

Datastax Cassandra Driver Analyst (3)-Configuration-ProtocolOptions

ProtocolOptions是与协议有关的一些配置,例如端口,压缩算法,SSL,授权等。具体包括以下6部分。

protocal

(1)port:  要建立连接的cassandra服务器的服务端口,默认9042

(2)sslOptions:是否采用ssl, 默认null不使用。

(3)compression: 传输压缩算法,一共有3种:NONE/SNAPPY/LZ4,默认为NONE,不采用任何压缩算法。

(4)authProvider:授权方式,一共有2种,NONE和PlainTextAuthProvider(用户名+密码),默认不授权。

(5)initialProtocolVersion:协议版本,一共有3种,默认为null,当采用默认值时:

control connection采用V2,

// This happens for the first control connection because the protocol version has not been
// negociated yet.

其他connection采用v3.

V1(“1.2.0”, 1),
V2(“2.0.0”, 2),
V3(“2.1.0”, 3),

协议版本主要有2个功能:

1)在增加Node或初始化加载所有Node时,会检查本地的版本与远程版本的兼容性(如果为null,则默认都兼容),当不兼容时,会忽略Node:

logger.warn(“Detected added or restarted Cassandra host {} but ignoring it since it does not support the version {} of the native ”
+ “protocol which is currently in use. If you want to force the use of a particular version of the native protocol, use ”
+ “Cluster.Builder#usingProtocolVersion() when creating the Cluster instance.”, host, version);

远程版本的获取:

test@cqlsh> select peer,release_version from system.peers;

peer | release_version
—————-+—————–
10.253.160.247 | 2.1.0
10.89.108.186 | 2.1.0
10.140.200.227 | 2.1.1-SNAPSHOT

2)决定了client采用不同的connection pool去处理请求。

(6)maxSchemaAgreementWaitSeconds    让schema一致的最长超时时间,默认10S. 主要通过执行2个CQL,比较schema version然后将结果加入一个set,进而判断set的size<=1以决策是否已经达到了一致。

test@cqlsh> SELECT schema_version FROM system.local WHERE key=’local’;

schema_version

————————————–

ef249049-9b7c-36c1-ad5d-335bdf02410f

 

test@cqlsh> SELECT peer, rpc_address, schema_version FROM system.peers;

peer           | rpc_address    | schema_version

—————-+—————-+————————————–

10.253.160.247 | 10.253.160.247 | ef249049-9b7c-36c1-ad5d-335bdf02410f

10.224.57.165 |  10.224.57.165 | ef249049-9b7c-36c1-ad5d-335bdf02410f

10.253.160.245 | 10.253.160.245 | ef249049-9b7c-36c1-ad5d-335bdf02410f

 

在以下二种情况下,会要求schema做一致性准备:

1)SCHEMA_CHANGE : 如果在超时时间外还不一致,会记录日志:

logger.warn(“No schema agreement from live replicas after {} s. The schema may not be up to date on some nodes.”, configuration.getProtocolOptions().getMaxSchemaAgreementWaitSeconds());

2)Node Up/Add: 如果在超时时间外还不一致,不做进一步处理。

 

总结:这个选项和开发者关系最大的是授权和SSL启用与否,其他貌似没有太多牵连,直接使用默认值即可。如果不使用默认值,需要综合考虑一些因素,例如是否启用传输压缩要考虑:带宽+压缩速度。