Cassandra之Token

有一个多月没有更新过blog了,有点惭愧。不管何种理由,不管工作生活有何种变动,有一些我们内心真正追求的东西,不能放弃。昨天晚上,世界杯大幕拉开,在等待揭幕战的过程中,看了一段Cassandra关于dht部分的源代码。要在生产系统中运维,则数据如何分布不得不做周详细致的考虑。

将Cassandra用于实际的生成环境,一个必须要考虑的关键问题是Token的选择。Token决定了每个节点存储的数据的分布范围,每个节点保存的数据的key在(前一个节点Token,本节点Token]的半开半闭区间内,所有的节点形成一个首尾相接的环,所以第一个节点保存的是大于最大Token小于等于最小Token之间的数据。

根据采用的分区策略的不同,Token的类型和设置原则也有所不同。 Cassandra (0.6版本)本身支持三种分区策略:

RandomPartitioner:随机分区是一种hash分区策略,使用的Token是大整数型(BigInteger),范围为0~2^127,因此极端情况下,一个采用随机分区策略的Cassandra集群的节点可以达到2^127+1个节点。嗯,为什么是2^127?因为Cassandra采用了MD5作为hash函数,其结果是128位的整数值(其中一位是符号位,Token取绝对值为结果)。采用随机分区策略的集群无法支持针对Key的范围查询。假如集群有N个节点,每个节点的hash空间采取平均分布的话,那么第i个节点的Token可以设置为:

 i * ( 2 ^ 127 / N )

下面的测试程序是从org.apache.cassandra.utils.FBUtilities类抽取出来的计算MD5值的函数,输入任何字符都可以得到其对应的MD5的整数值,利用该值和节点的Token对比即可知道该Key对应的数据归属于哪个节点:

import java.io.*;
import java.util.*;
import java.math.BigInteger;
import java.security.MessageDigest;

class get_md5{
  static final Scanner cin=new Scanner(System.in);

  public static byte[] hash(String type, byte[]... data){
    byte[] result = null;
    try{
      MessageDigest messageDigest = MessageDigest.getInstance(type);
      for(byte[] block : data)
        messageDigest.update(block);
      result = messageDigest.digest();
    }
    catch (Exception e){
      throw new RuntimeException(e);
    }
    return result;
  }

  public static BigInteger hash(String data){
    byte[] result = hash("MD5", data.getBytes());
    BigInteger hash = new BigInteger(result);
    return hash.abs();
  }

  public static void main(String[] args){
    while(cin.hasNext()){
      String str1=cin.next();
      BigInteger a= hash(str1);
      System.out.println(a);
    }
  }
}
D:>java get_md5
ningoo
100335222541762605209205022078301814192
江枫
48295316926871024838894171432474082043

OrderPreservingPartitioner:如果要支持针对Key的范围查询,那么可以选择这种有序分区策略。该策略采用的是字符串类型的Token。每个节点的具体选择需要根据Key的情况来确定。如果没有指定InitialToken,则系统会使用一个长度为16的随机字符串作为Token,字符串包含大小写字符和数字。

CollatingOrderPreservingPartitioner:和OrderPreservingPartitioner一样是有序分区策略。只是排序的方式不一样,采用的是字节型Token,支持设置不同语言环境的排序方式,代码中默认是en_US。

分区策略和每个节点的Token(Initial Token)都可以在storage-conf.xml配置文件中设置:

<Partitioner>org.apache.cassandra.dht.RandomPartitioner</Partitioner>
<InitialToken>10633823966279300000000000000000000000</InitialToken>

节点初始化完成以后,Token值做为元数据会保留在system keyspace中,每次启动会以该值为准,即使再改动配置文件中的InitialToken也不会产生任何影响。

Saved Token found: 10633823966279300000000000000000000000

通过nodetool的ring命令,可以查看集群各个节点的Token,这些Token值最好备份下来,当出现节点彻底顺坏时,可以重新设置同样的Token,确保数据分布可以不受节点损坏的影响。

 nodetool -h test ring
Address       Status     Load          Range                                      Ring
                                       85070591730234600000000000000000000000
192.168.0.1 Up         0 bytes       10633823966279300000000000000000000000     |<--|
192.168.0.2 Up         0 bytes       85070591730234600000000000000000000000     |-->|

PS: 在我的0.6.2的一个测试集群中,使用nodetool时不小心连到了9160端口,结果每次都会把节点搞挂,百试百灵。而且直接telnet到9160端口,随便发送个字符,也会把节点搞崩溃。不知道是我的测试环境的原因,还是Thrift有bug,这样节点的健壮性就有问题了,这个端口只能接受协议格式内的信息。对Java和Thrift都不太了解,把这个问题抛出来,希望有大牛能帮忙找到原因。

Update:之前贴的nodetool错连9160端口的报错可能有点误导大家,因为jmx用的默认的8080端口,连9160端口jmx报错是正常的,问题是节点不应该崩溃的。看了/var/log/cassandra/system.log中记录的节点错误信息,报的是OOM,Cassandra的java进程都消失了。调整了一下jvm参数,将heap的最小内存从默认的256MB设置到1G(-Xms1G),还是有同样的问题。另外,我的java环境是jre1.6.0_18。

ERROR [pool-1-thread-1] 2010-06-12 16:49:40,459 CassandraDaemon.java (line 78)
Fatal exception in thread Thread[pool-1-thread-1,5,main]
java.lang.OutOfMemoryError: Java heap space
        at org.apache.thrift.protocol.TBinaryProtocol.readStringBody(TBinaryProtocol.java:296)
        at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:203)
        at org.apache.cassandra.thrift.Cassandra$Processor.process(Cassandra.java:1113)
        at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:253)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)

Google了一把这个错误,也有人碰到过,并且发现Thrift确实有类似的bug:
https://issues.apache.org/jira/browse/THRIFT-601

参考文档:
http://wiki.apache.org/cassandra/Operations

Cassandra Commitlog

上一篇blog中,大致介绍了一下Cassandra的存储机制,通过将最新的写操作放在内存中的Memtable,然后定期刷新到磁盘持久化为SSTable,Cassandra将随机写操作转换成了顺序写操作,这可以提升IO性能。

最新写入的脏数据是在内存Memtable表中,因此必须有机制来确保异常情况下,能够将内存中的数据恢复出来。和关系型数据库系统一样,Cassandra也是采用的先写日志再写数据的方式,其日志称之为Commitlog。

和Memtable/SSTable不一样的是,Commitlog是server级别的,不是Column Family级别的。每个Commitlog文件的大小是固定的,称之为一个Commitlog Segment,目前版本(0.5.1)中,这个大小是128MB,这是硬编码在代码(src\java\org\apache\cassandra\db\Commitlog.java)中的。当一个Commitlog文件写满以后,会新建一个的文件。当旧的Commitlog文件不再需要时,会自动清除。

每个Commitlog文件(Segment)都有一个固定大小(大小根据Column Family的数目而定)的CommitlogHeader结构,其中有两个重要的数组,每一个Column Family在这两个数组中都存在一个对应的元素。其中一个是位图数组(BitSet dirty),如果Column Family对应的Memtable中有脏数据,则置为1,否则为0,这在恢复的时候可以指出哪些Column Family是需要利用Commitlog进行恢复的。另外一个是整数数组(int[] lastFlushedAt),保存的是Column Family在上一次Flush时日志的偏移位置,恢复时则可以从这个位置读取Commitlog记录。通过这两个数组结构,Cassandra可以在异常重启服务的时候根据持久化的SSTable和Commitlog重构内存中Memtable的内容,也就是类似Oracle等关系型数据库的实例恢复。

当Memtable flush到磁盘的SStable时,会将所有Commitlog文件的dirty数组对应的位清零,而在Commitlog达到大小限制创建新的文件时,dirty数组会从上一个文件中继承过来。如果一个Commitlog文件的dirty数组全部被清零,则表示这个Commitlog在恢复的时候不再需要,可以被清除。因此,在恢复的时候,所有的磁盘上存在的Commitlog文件都是需要的。

参考文章:
[1].http://wiki.apache.org/cassandra/ArchitectureCommitLog

Cassandra存储机制

在2009年兴起的NoSQL运动中,Cassandra是其中重要的一个分布式key-value数据库产品,由Facebook在2008年开源,目前是Apache的顶级项目。最近twitter的一篇声明,表示将从MySQL迁移到Cassandra,更让其声名大振。Cassandra是结合了Google Bigtable的数据模型和Amazon Dynamo高可用框架的一个产品。其数据模型可以参考张瑞的blog

值得说一下的是Cassandra的存储机制,也是借鉴了Bigtable的设计,采用Memtable和SSTable的方式。和关系数据库一样,Cassandra在写数据之前,也需要先记录日志,称之为commitlog,然后数据才会写入到Column Family对应的Memtable中,并且Memtable中的内容是按照key排序好的。Memtable是一种内存结构,满足一定条件后批量刷新到磁盘上,存储为SSTable。这种机制,相当于缓存写回机制(Write-back Cache),优势在于将随机IO写变成顺序IO写,降低大量的写操作对于存储系统的压力。SSTable一旦完成写入,就不可变更,只能读取。下一次Memtable需要刷新到一个新的SSTable文件中。所以对于Cassandra来说,可以认为只有顺序写,没有随机写操作。

因为SSTable数据不可更新,可能导致同一个Column Family的数据存储在多个SSTable中,这时查询数据时,需要去合并读取Column Family所有的SSTable和Memtable,这样到一个Column Family的数量很大的时候,可能导致查询效率严重下降。因此需要有一种机制能快速定位查询的Key落在哪些SSTable中,而不需要去读取合并所有的SSTable。Cassandra采用的是Bloom Filter算法,通过多个hash函数将key映射到一个位图中,来快速判断这个key属于哪个SSTable。关于Bloom Filter,有兴趣的可以去看看参考文章4,5和6。

为了避免大量SSTable带来的性能影响,Cassandra也提供一种定期将多个SSTable合并成一个新的SSTable的机制,因为每个SSTable中的key都是已经排序好的,因此只需要做一次合并排序就可以完成该任务,代价还是可以接受的。所以在Cassandra的数据存储目录中,可以看到三种类型的文件,格式类似于:

  • Column Family Name-序号-Data.db
  • Column Family Name-序号-Filter.db
  • Column Family Name-序号-index.db

其中Data.db文件是SSTable数据文件,SSTable是Sorted Strings Table的缩写,按照key排序后存储key/value键值字符串。index.db是索引文件,保存的是每个key在数据文件中的偏移位置,而Filter.db则是Bloom Filter算法生产的映射文件。

参考文章:
[1].http://wiki.apache.org/cassandra/ArchitectureOverview
[2].http://wiki.apache.org/cassandra/MemtableSSTable
[3].http://wiki.apache.org/cassandra/ArchitectureSSTable
[4].http://blog.csdn.net/jiaomeng/archive/2007/01/27/1495500.aspx
[5].http://www.hellodba.net/2009/04/bloom_filter.html
[6].http://www.googlechinablog.com/2007/07/bloom-filter.html
[7].http://labs.google.com/papers/bigtable.html