Dataloader 的批处理能力,可以实现在一个周期内将多次数据请求进行合并,最终只向远程数据源发送一次请求。这个周期不是固定的,Dataloader 的默认实现是 NodeJS 事件循环的一个循环周期,用户也可以自定义周期。

如下是 Dataloader 的构造函数的签名:

constructor(batchLoadFn: DataLoader.BatchLoadFn<K, V>, options?: DataLoader.Options<K, V, C>);
1

首先是一个批量请求函数 batchLoadFn,告诉 Dataloader 如何批量发起请求。其次 options 参数中有是三个配置项关于批处理的:

  • batch 是否启用批量请求,false 为不启用,每次数据请求都会向远程数据源发起请求。默认为 true
  • maxBatchSize 一次最多可以请求多少个数据,超过这个数字的请求会放到下一批请求中。默认不限制。
  • batchScheduleFn 批量调度函数,告诉 Dataloader 什么时候发起批量请求。

Dataloader 提供了两个函数来获取数据:

  • load(key: K): Promise<V>;
  • loadMany(keys: ArrayLike<K>): Promise<Array<V | Error>>;

显而易见,一个是一次获取一个,一个是一次获取多个。

在讲这两个函数的具体实现之前,我们先来了解下 Dataloader 中的基本概念。

什么是 Batch

前面说过,Dataloader 会搜集一个周期内的所有数据调用,然后合并成一个数据请求。Batch 对象就是这个能力的载体。

type Batch<K, V> = {
  hasDispatched: boolean;
  keys: Array<K>;
  callbacks: Array<{
    resolve: (value: V) => void;
    reject: (error: Error) => void;
  }>;
  cacheHits?: Array<() => void>;
};
1
2
3
4
5
6
7
8
9

这是 Batch 的类型定义,hasDispatched 表示当前批的请求是否已经发出。keys 中存放着需要通过请求获取数据的 key 列表。callbacks 是对应于每一个 key 的请求回调,且是一一对应的关系。cacheHits 是命中缓存的数据列表。

Dataloader 会尽量利用缓存不重复请求相同的数据。也就是说,假如一个周期内有 10 个数据请求,其中有两个会命中缓存,那么 keys 的长度就是 8,cacheHits 的长度是 2。

举个例子,假如我们需要查询用户的好友的好友。用户 A 有好友 B 和 C。用户 D 有好友 C 和 E。假设我们有一个 loaderUser 的接口查询用户信息。那么通常情况下,需要查询 6(3 + 3)次。使用了 Dataloader 的话,只需要请求 5(3 + 2)次,因为 A 和 D 有个共同好友 C,第二次查询用户 C 的时候会命中缓存。

Dataloader 会将一个周期内的数据请求都存放在一个 Batch 对象内(如果不超过 maxBatchSize 的话),之后通过调用构造函数传入的 batchLoadFn 一次查询所有的数据。

batchLoadFn 批量请求函数

先看一个批量函数的 demo。

async function batchFunction(keys) {
  const results = await db.fetchAllKeys(keys);
  return keys.map((key) => results[key] || new Error(`No result for ${key}`));
}

const loader = new DataLoader(batchFunction);
1
2
3
4
5
6

Dataloader 对 batchLoadFn 函数有两个约束:

  • 函数的返回数组长度必须与 keys 数组长度一致。
  • 函数的返回数组内的元素必须与 keys 内的元素一一对应。

也就是说,假如请求的 keys 列表为 [2, 9, 6, 1],如果数据接口只返回了

{ id: 9, name: 'Chicago' }
{ id: 1, name: 'New York' }
{ id: 2, name: 'San Francisco' }
1
2
3

那么我们的 batchLoadFn 需要返回:

[
  { id: 2, name: 'San Francisco' },
  { id: 9, name: 'Chicago' },
  null, // 或者 `new Error()`
  { id: 1, name: 'New York' },
];
1
2
3
4
5
6

注意,batchLoadFn 需要返回一个 Promise。

load 函数

load 函数的签名是 load(key: K): Promise<V>,关键代码如下:

load(key: K): Promise<V> {
  var batch = getCurrentBatch(this);
  var cacheMap = this._cacheMap;
  var cacheKey = this._cacheKeyFn(key);

  // If caching and there is a cache-hit, return cached Promise.
  if (cacheMap) {
    var cachedPromise = cacheMap.get(cacheKey);
    if (cachedPromise) {
      var cacheHits = batch.cacheHits || (batch.cacheHits = []);
      return new Promise(resolve => {
        cacheHits.push(() => {
          resolve(cachedPromise);
        });
      });
    }
  }

  // Otherwise, produce a new Promise for this key, and enqueue it to be
  // dispatched along with the current batch.
  batch.keys.push(key);
  var promise = new Promise((resolve, reject) => {
    batch.callbacks.push({ resolve, reject });
  });

  // If caching, cache this promise.
  if (cacheMap) {
    cacheMap.set(cacheKey, promise);
  }

  return promise;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

我们可以看到,load 函数首先通过 getCurrentBatch 获取了当前的 Batch 对象。之后检查缓存是否存在,如果缓存存在,并且要请求的这个 key 命中缓存,则将已经缓存的 promise 放到 Batch 对象的 cacheHits 数组中去。load 函数本身会返回一个 Promise,在这个 Promise 中返回被缓存的 promise。

如果没有命中缓存,即需要发送请求获取数据,则将 key 放入到 Batch 对象的 keys 数组中去,同时新建一个 Promise,并将它的 resolvereject 放到 Batch 对象的 callbacks 中。最后在返回这个 Promise 之前,还需要将 Promise 放入到缓存中,这样后面的相同请求可以命中缓存。

需要注意的是,cacheHits 是一组函数,函数中包装了 load 函数返回的 Promise 的 resolvereject 回调。而 callbacks 中保存的是一个记录了 load 函数返回的 Promise 的 resolvereject 回调的对象。这样,当批量函数返回的时候,通过调用 cacheHits 中的函数或者 callbacks 中对象保存的回调,就能改变 load 函数返回的 Promise 的状态了。

getCurrentBatch

我们现在来看 getCurrentBatch 的实现。

getCurrentBatch 函数代码如下:

function getCurrentBatch<K, V>(loader: DataLoader<K, V, any>): Batch<K, V> {
  // If there is an existing batch which has not yet dispatched and is within
  // the limit of the batch size, then return it.
  var existingBatch = loader._batch;
  if (
    existingBatch !== null &&
    !existingBatch.hasDispatched &&
    existingBatch.keys.length < loader._maxBatchSize &&
    (!existingBatch.cacheHits ||
      existingBatch.cacheHits.length < loader._maxBatchSize)
  ) {
    return existingBatch;
  }

  // Otherwise, create a new batch for this loader.
  var newBatch = { hasDispatched: false, keys: [], callbacks: [] };

  // Store it on the loader so it may be reused.
  loader._batch = newBatch;

  // Then schedule a task to dispatch this batch of requests.
  loader._batchScheduleFn(() => {
    dispatchBatch(loader, newBatch);
  });

  return newBatch;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

getCurrentBatch 首先会判断当前 Dataloader 实例上是否已经有 Batch 对象,并且当前 Batch 还没有发送请求,同时还要检查当前需要发请求的 keys 长度或者命中了缓存的请求数组长度小于 maxBatchSize,如果这些条件都成立,则返回已经存在的 Batch 对象,否则新建一个。

新建的 Batch 对象会存放在当前 Dataloader 实例上以便复用。同时,会使用当前 Dataloader 实例的批处理调度函数 batchScheduleFn 来调度新创建的 Batch 对象。

现在汇总一下,load 函数会构建或获取(已存在的)一个 Batch 对象,如果 key 命中缓存,就返回缓存的 Promise。如果没有命中,则新建一个 Promise,然后将设置 Batch 对象的 keyscallbacks,使得在批处理结束后能正确的改变 Promise 的状态,返回数据。

Batch 的创建过程中,Batch 对象会将其自己交给 Dataloader 实例的 batchScheduleFn 来调度,batchScheduleFn 会通过 dispatchBatch 函数来发送批量请求。

dispatchBatch

dispatchBatch 的实现如下:

function dispatchBatch<K, V>(
  loader: DataLoader<K, V, any>,
  batch: Batch<K, V>
) {
  // Mark this batch as having been dispatched.
  batch.hasDispatched = true;

  // If there's nothing to load, resolve any cache hits and return early.
  if (batch.keys.length === 0) {
    resolveCacheHits(batch);
    return;
  }

  // Call the provided batchLoadFn for this loader with the batch's keys and
  // with the loader as the `this` context.
  var batchPromise = loader._batchLoadFn(batch.keys);

  // Await the resolution of the call to batchLoadFn.
  batchPromise
    .then((values) => {
      // Resolve all cache hits in the same micro-task as freshly loaded values.
      resolveCacheHits(batch);

      // Step through values, resolving or rejecting each Promise in the batch.
      for (var i = 0; i < batch.callbacks.length; i++) {
        var value = values[i];
        if (value instanceof Error) {
          batch.callbacks[i].reject(value);
        } else {
          batch.callbacks[i].resolve(value);
        }
      }
    })
    .catch((error) => {
      failedDispatch(loader, batch, error);
    });
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

首先 dispatchBatch 是否需要发送请求,如果 keys 长度为零,即全部命中缓存,则直接调用 resolveCacheHits 并返回。resolveCacheHits 会直接一次调用 Batch 对象中 cacheHits 里面的函数,此时 load 就拿到了缓存的结果(Promise 到了 resolved 的状态)。

function resolveCacheHits(batch: Batch<any, any>) {
  if (batch.cacheHits) {
    for (var i = 0; i < batch.cacheHits.length; i++) {
      batch.cacheHits[i]();
    }
  }
}
1
2
3
4
5
6
7

如果有需要请求的,则使用创建 Dataloader 实例的时候传入的 batchLoadFn 来批量发起请求。

然后,在 batchLoadFn 返回之后,先通过 resolveCacheHits 返回命中的缓存,之后,依次根据 batchLoadFn 返回的结果,如果出错了则调用 callbacks 中对象的 reject,如果正确返回则调用 callbacks 中对象的 resolve

这样就可以将批量请求的结果正确的传递回 load 函数了。

如果批量请求出错了,则会调用 failedDispatch 函数。

function failedDispatch<K, V>(
  loader: DataLoader<K, V, any>,
  batch: Batch<K, V>,
  error: Error
) {
  // Cache hits are resolved, even though the batch failed.
  resolveCacheHits(batch);
  for (var i = 0; i < batch.keys.length; i++) {
    loader.clear(batch.keys[i]);
    batch.callbacks[i].reject(error);
  }
}
1
2
3
4
5
6
7
8
9
10
11
12

failedDispatch 函数会先返回命中的缓存,之后会为每一个需要请求的 key 返回错误信息,同时会通过 clear 函数清理缓存的信息(因为已经发生了错误)。

到这里,我们已经知道在调用 load 函数之后发生的所有事情了。但是有一点还没有说清楚,就是批量请求的调度策略。

批量调度策略

Dataloader 支持自定义调度策略,如果没有指定,则使用默认的,如下。

var enqueuePostPromiseJob =
  typeof process === 'object' && typeof process.nextTick === 'function'
    ? function (fn) {
        if (!resolvedPromise) {
          resolvedPromise = Promise.resolve();
        }
        resolvedPromise.then(() => {
          process.nextTick(fn);
        });
      }
    : setImmediate || setTimeout;

// Private: cached resolved Promise instance
var resolvedPromise;
1
2
3
4
5
6
7
8
9
10
11
12
13
14

首先检测环境,如果 process.nextTick 不存在的话,就直接使用 setImmediate 或者 setTimeout

如果 process.nextTick 存在的话,通过 Promise 的方式起一个微任务,在这个微任务中告诉 NodeJS 在下一个时间周期内发送批量请求。

这样的话,Dataloader 就可以在当前时间周期内尽可能多的收集需要请求的 key 了。

loadMany 函数

load 函数一次只查询一个,loadMany 可以查询多个。下面是 loadMany 函数的代码,可以看到,就是对 load 函数的封装,这里就不在赘述了。

loadMany(keys: $ReadOnlyArray<K>): Promise<Array<V | Error>> {
    // Support ArrayLike by using only minimal property access
    const loadPromises = [];
    for (let i = 0; i < keys.length; i++) {
      loadPromises.push(this.load(keys[i]).catch(error => error));
    }
    return Promise.all(loadPromises);
  }
1
2
3
4
5
6
7
8

关注微信公众号,获取最新推送~