RT

LRU原理

LRU 是一种经典的缓存淘汰策略,其原理以及实现可以查看我之前的博客LRU Cache 解析及实现。本文主要解析Leveldb LRU Cache。

Leveldb 实现

Leveldb 实现的LRUCache 使用自己实现的简单hashtable存储键值对,循环双链表记录每个元素的访问时间,为了提升多线程环境下的读写性能,Leveldb 内部使用LRUCache 数组对外提供服务。

Cache Entry

struct LRUHandle {
  void* value;
  void (*deleter)(const Slice&, void* value);
  // next_hash 代表在hash桶中下一个元素的位置
  LRUHandle* next_hash;
  // 双链表中下个元素的位置
  LRUHandle* next;
  // 双链表中上个元素的位置
  LRUHandle* prev;
  size_t charge;      // TODO(opt): Only allow uint32_t?
  size_t key_length;
  // 使用次数
  uint32_t refs;
  // key 的hash值用于在LRUCache数组以及hash表中定位
  uint32_t hash;      // Hash of key(); used for fast sharding and comparisons
  char key_data[1];   // Beginning of key

  Slice key() const {
    // For cheaper lookups, we allow a temporary Handle object
    // to store a pointer to a key in "value".
    if (next == this) {
      return *(reinterpret_cast<Slice*>(value));
    } else {
      return Slice(key_data, key_length);
    }
  }
};

简单hash表

Leveldb LRU Cache 实现了线程安全的拉链式的hashtable

class HandleTable {
 public:
  HandleTable() : length_(0), elems_(0), list_(NULL) { Resize(); }
  ~HandleTable() { delete[] list_; }
  // 查找,直接调用FindPointer方法
  LRUHandle* Lookup(const Slice& key, uint32_t hash) {
    return *FindPointer(key, hash);
  }
 // 插入,使用FindPointer方法找到插入位置,在hashtable 对位位置插入,当hashtable 中的元素个数大于桶的个数时候触发hashtable 的扩容
  LRUHandle* Insert(LRUHandle* h) {
    LRUHandle** ptr = FindPointer(h->key(), h->hash);
    LRUHandle* old = *ptr;
    h->next_hash = (old == NULL ? NULL : old->next_hash);
    *ptr = h;
    if (old == NULL) {
      ++elems_;
      if (elems_ > length_) {
        // Since each cache entry is fairly large, we aim for a small
        // average linked list length (<= 1).
        Resize();
      }
    }
    return old;
  }
// 删除hashtable中的元素,同样使用FindPointer方法定位到在hashtable 中的位置
  LRUHandle* Remove(const Slice& key, uint32_t hash) {
    LRUHandle** ptr = FindPointer(key, hash);
    LRUHandle* result = *ptr;
    if (result != NULL) {
      *ptr = result->next_hash;
      --elems_;
    }
    return result;
  }

 private:
  // The table consists of an array of buckets where each bucket is
  // a linked list of cache entries that hash into the bucket.
  uint32_t length_;
  uint32_t elems_;
  LRUHandle** list_;

  // Return a pointer to slot that points to a cache entry that
  // matches key/hash.  If there is no such cache entry, return a
  // pointer to the trailing slot in the corresponding linked list.
  // 找到hash 元素的地址
  LRUHandle** FindPointer(const Slice& key, uint32_t hash) {
	//找到hash 桶的位置,由于length_是2的指数次幂,所以使用 按位与替换mod操作进行加速
    LRUHandle** ptr = &list_[hash & (length_ - 1)];
    // 遍历hash桶中的单链表直到找到相应节点或单链表结束
    while (*ptr != NULL &&
           ((*ptr)->hash != hash || key != (*ptr)->key())) {
      ptr = &(*ptr)->next_hash;
    }
    return ptr;
  }
  // hash 表扩容
  void Resize() {
	// 每次扩容都是翻倍,保证hash桶的个数是2的指数次幂
    uint32_t new_length = 4;
    while (new_length < elems_) {
      new_length *= 2;
    }
    // 创建一个新的hashtable
    LRUHandle** new_list = new LRUHandle*[new_length];
    memset(new_list, 0, sizeof(new_list[0]) * new_length);
    uint32_t count = 0;
    // 逐个桶迁移到新的hashtable
    for (uint32_t i = 0; i < length_; i++) {
      LRUHandle* h = list_[i];
      while (h != NULL) {
        LRUHandle* next = h->next_hash;
        uint32_t hash = h->hash;
        LRUHandle** ptr = &new_list[hash & (new_length - 1)];
        h->next_hash = *ptr;
        *ptr = h;
        h = next;
        count++;
      }
    }
    assert(elems_ == count);
    // 释放掉老的hashtable
    delete[] list_;
    // 替换为新的hashtable
    list_ = new_list;
    length_ = new_length;
  }
};

单个LRUCache实现

LRUCache 类声明

class LRUCache {
 public:
  LRUCache();
  ~LRUCache();

  // Separate from constructor so caller can easily make an array of LRUCache
  void SetCapacity(size_t capacity) { capacity_ = capacity; }

  // Like Cache methods, but with an extra "hash" parameter.
  Cache::Handle* Insert(const Slice& key, uint32_t hash,
                        void* value, size_t charge,
                        void (*deleter)(const Slice& key, void* value));
  Cache::Handle* Lookup(const Slice& key, uint32_t hash);
  void Release(Cache::Handle* handle);
  void Erase(const Slice& key, uint32_t hash);
  void Prune();
  size_t TotalCharge() const {
    MutexLock l(&mutex_);
    return usage_;
  }

 private:
  void LRU_Remove(LRUHandle* e);
  void LRU_Append(LRUHandle* e);
  void Unref(LRUHandle* e);

  // Initialized before use.
  // LRUCache 存储的元素个数,超过此个数触发淘汰机制
  size_t capacity_;

  // mutex_ protects the following state.
  mutable port::Mutex mutex_;
  size_t usage_;

  // Dummy head of LRU list.
  // lru.prev is newest entry, lru.next is oldest entry.
  // 按照访问时间顺序存储的双向循环链表
  LRUHandle lru_;
  // 自己实现的拉链式的hash table
  HandleTable table_;
};

LRUCache 插入

Cache::Handle* LRUCache::Insert(
    const Slice& key, uint32_t hash, void* value, size_t charge,
    void (*deleter)(const Slice& key, void* value)) {
  MutexLock l(&mutex_);
  // 构造一个新的LRUHandle 对象
  LRUHandle* e = reinterpret_cast<LRUHandle*>(
      malloc(sizeof(LRUHandle)-1 + key.size()));
  e->value = value;
  e->deleter = deleter;
  e->charge = charge;
  e->key_length = key.size();
  e->hash = hash;
  e->refs = 2;  // One from LRUCache, one for the returned handle
  memcpy(e->key_data, key.data(), key.size());
  // 插入到双向链表表头的位置
  LRU_Append(e);
  usage_ += charge;

  LRUHandle* old = table_.Insert(e);
  // 替换掉LRUCache中重复的元素
  if (old != NULL) {
    LRU_Remove(old);
    Unref(old);
  }
  // 触发LRU 淘汰
  while (usage_ > capacity_ && lru_.next != &lru_) {
    // 淘汰双向链表最后的元素
    LRUHandle* old = lru_.next;
    // 删除双向链表中的元素
    LRU_Remove(old);
    // 删除hashtable中的元素
    table_.Remove(old->key(), old->hash);
    Unref(old);
  }
  // 返回刚刚插入的Handle对象,此对象引用计数为2
  return reinterpret_cast<Cache::Handle*>(e);
}

LRUCache 删除

void LRUCache::Erase(const Slice& key, uint32_t hash) {
  MutexLock l(&mutex_);
  // 删除hashtable中对应的元素,
  // 如果该元素存在在hashtable中则在双向链表中进行删除
  LRUHandle* e = table_.Remove(key, hash);
  if (e != NULL) {
    LRU_Remove(e);
    Unref(e);
  }
}

LRUCache Prune

删除LRUCache中ref为1 的元素,最初插入的元素的引用为2,外部消费者对使用结束的元素ref减一。LRUCache中ref 为1 的handle表示此元素只在LRUCache中出现,外部并不使用。此方法的作用就是删除这样的元素。

void LRUCache::Prune() {
  MutexLock l(&mutex_);
  for (LRUHandle* e = lru_.next; e != &lru_; ) {
    LRUHandle* next = e->next;
    if (e->refs == 1) {
      table_.Remove(e->key(), e->hash);
      LRU_Remove(e);
      Unref(e);
    }
    e = next;
  }
}

LRU_Remove

删除双向链表中一个节点

void LRUCache::LRU_Remove(LRUHandle* e) {
  e->next->prev = e->prev;
  e->prev->next = e->next;
}

LRU_Append

插入到双向链表表头位置

void LRUCache::LRU_Append(LRUHandle* e) {
  // Make "e" newest entry by inserting just before lru_
  e->next = &lru_;
  e->prev = lru_.prev;
  e->prev->next = e;
  e->next->prev = e;
}

ShardedLRUCache

static const int kNumShardBits = 4;
// 16个 lru cache,目的在多线程的环境下,每个线程的访问都会锁住缓冲区
// 如果缓冲区比较大的话, 多线程情况下的性能比较差,
// 所以使用hash 的方式将一块cache 缓冲区 划分为多个小块的缓冲区
static const int kNumShards = 1 << kNumShardBits;
class ShardedLRUCache : public Cache {
 private:
  // LRUCache 数组,每个LRUCache对象使用自己的锁
  LRUCache shard_[kNumShards];
  // 这个锁只保护id
  port::Mutex id_mutex_;
  uint64_t last_id_;

  static inline uint32_t HashSlice(const Slice& s) {
    return Hash(s.data(), s.size(), 0);
  }
  // Shard 函数,根据key计算的hash 值定位LRUCache数组的位置
  static uint32_t Shard(uint32_t hash) {
    // 不需要使用mod运算,此数值一定是在0~kNumShards这个区间
    return hash >> (32 - kNumShardBits);
  }

 public:
  explicit ShardedLRUCache(size_t capacity)
      : last_id_(0) {
    // 向上取整,为每个LRUCache 分配容量
    const size_t per_shard = (capacity + (kNumShards - 1)) / kNumShards;
    for (int s = 0; s < kNumShards; s++) {
      shard_[s].SetCapacity(per_shard);
    }
  }
  virtual ~ShardedLRUCache() { }
  // 插入时,先使用Shard方法定位到相应的LRUCache
  virtual Handle* Insert(const Slice& key, void* value, size_t charge,
                         void (*deleter)(const Slice& key, void* value)) {
    const uint32_t hash = HashSlice(key);
    return shard_[Shard(hash)].Insert(key, hash, value, charge, deleter);
  }
  virtual Handle* Lookup(const Slice& key) {
    const uint32_t hash = HashSlice(key);
    return shard_[Shard(hash)].Lookup(key, hash);
  }
  virtual void Release(Handle* handle) {
    LRUHandle* h = reinterpret_cast<LRUHandle*>(handle);
    shard_[Shard(h->hash)].Release(handle);
  }
  virtual void Erase(const Slice& key) {
    const uint32_t hash = HashSlice(key);
    shard_[Shard(hash)].Erase(key, hash);
  }
  virtual void* Value(Handle* handle) {
    return reinterpret_cast<LRUHandle*>(handle)->value;
  }
  virtual uint64_t NewId() {
    MutexLock l(&id_mutex_);
    return ++(last_id_);
  }
  virtual void Prune() {
    for (int s = 0; s < kNumShards; s++) {
      shard_[s].Prune();
    }
  }
  virtual size_t TotalCharge() const {
    size_t total = 0;
    for (int s = 0; s < kNumShards; s++) {
      total += shard_[s].TotalCharge();
    }
    return total;
  }
};

总结

Leveldb 实现的LRUCahe 基于hashtable以及双向链表实现,为了提升性能以及保证跨平台特性,Leveldb 实现了一个线程安全的拉链式hashtable,此hashtable 的缺点是当触发扩容操作的时候需要将老hashtable 的全部元素复制到新的hashtable 中,这会期间会一直占用锁,此时多线程环境下的读写会阻塞。这一点Memcached 中实现的hashtable是有一个线程专门负责扩展,每次只扩展一个元素。
Leveldb 为了提升多线程环境下的读写性能,将固定容量的Cache分摊到多个LRUCache,每个LRUCache保证自己的线程安全,这就降低了多线程环境下锁的竞争。这种做法与分段锁的思想类似。