流与延迟计算

本文最后更新于:2022年6月20日 下午

前言

在此之前,吾辈介绍了如何 从函数中无中生有的构造数据结构。现在,吾辈将使用它构造一个更强大的数据结构:Stream(流)。

或许你曾经见过这张图

stream.drawio.svg

一些数据经过一系列的处理,然后得到结果,而只需要迭代每个元素一次,就像魔法一样。

之前的想法

最初,吾辈的想法是让操作保存起来,只有等待某个显式调用的时候才真正执行。

即调用 map/filter/reduce 时将 action 与参数保存起来,等到调用 value 方法的时候才真正执行,像是 lodash 的 _.chain api 就是这种形式的。

形如

_.chain([1, 2, 3, 4])
  .filter((i) => i % 2 === 0)
  .map((i) => i * 2)
  .value();

于是最初吾辈也是按照这种 api 实现

class Stream<T> {
  private flags: [action: string, ...args: any[]][] = [];
  constructor(private readonly arr: T[]) {}
  map<R>(f: (item: T) => R): Stream<R> {
    this.flags.push(["map", f]);
    return this as any;
  }
  filter(f: (item: T) => boolean): Stream<T> {
    this.flags.push(["filter", f]);
    return this;
  }
  reduce<R>(f: (r: R, item: T) => R, init: R): R {
    this.flags.push(["reduce", f, init]);
    return this.value() as any;
  }
  value() {
    const res: any[] = [];
    for (const item of this.arr) {
      outer: {
        let temp = item;
        for (const [action, ...args] of this.flags) {
          switch (action) {
            case "map":
              temp = args[0](temp);
              break;
            case "filter":
              if (!args[0](temp)) {
                break outer;
              }
              break;
          }
        }
        res.push(temp);
      }
    }
    return res;
  }
}

it("basic", () => {
  const r = new Stream([1, 2, 3, 4])
    .filter((i) => i % 2 === 0)
    .map((i) => i * 2)
    .value();
  expect(r).toEqual([4, 8]);
});

看起来 map/filter 工作的还不错,但 reduce 却存在问题,因为在两层 for 循环中 map 是依赖于临时变量替换,filter 则是依赖于 break label 实现,这种方式显然很难在 value 函数中兼容所有的功能。

不过实际上的解决方案也是延迟执行,只需要转换一下思路即可。下面将使用 cons 来构造流。

延迟计算

首先,可以将流可以看作一系列的值,而且只能顺序依次获取,那么,我们仅需要实现一种在需要的时候能拿到值的数据结构就可以了。

下面将流的每个节点分为两部分,head 部分是一个具体的值,而 tail 部分则是一个产生下一个节点的函数,它可能是空的。

流节点.excalidraw.svg

type Stream<T> = Cons<T, () => Stream<T>> | null;
function makeStream<T>(head: T, tail: () => Stream<T> = () => null) {
  return cons(head, tail);
}
function head<T>(s: Stream<T>): T | null {
  return s ? car(s) : null;
}
function tail<T>(s: Stream<T>): Stream<T> {
  return s ? cdr(s)() : null;
}
function isEmpty(s: Stream<any>): boolean {
  return s === null;
}

现在有了基本的流的构造函数,可以使用最原始的方法构造了一个流了

makeStream(1, () => makeStream(2, () => makeStream(3)));

看起来好像没什么用?这是因为我们还没有动态去构造。例如,我们可以从数组构造一个流,也可以转换回去

function ofArray<T>(arr: T[]): Stream<T> {
  return arr.length === 0
    ? null
    : makeStream(arr[0], () => ofArray(arr.slice(1)));
}
function toArray<T>(s: Stream<T>): T[] {
  return isEmpty(s) ? [] : [head(s)!, ...toArray(tail(s))];
}

const arr = [1, 2, 3, 4];
expect(toArray(ofArray(arr))).toEqual(arr);

map/filter/reduce

当然,我们也可以为它们编写 map/filter/reduce

function map<T, R>(s: Stream<T>, fn: (item: T) => R): Stream<R> {
  return isEmpty(s) ? null : makeStream(fn(head(s)!), () => map(tail(s), fn));
}
function filter<T>(s: Stream<T>, fn: (item: T) => boolean): Stream<T> {
  return isEmpty(s)
    ? null
    : fn(head(s)!)
    ? makeStream(head(s)!, () => filter(tail(s), fn))
    : filter(tail(s), fn);
}
function reduce<T, R>(s: Stream<T>, fn: (res: R, item: T) => R, init: R): R {
  return isEmpty(s) ? init : reduce(tail(s), fn, fn(init, head(s)!));
}

const r = reduce(
  map(
    filter(ofArray([1, 2, 3, 4]), (i) => i % 2 === 0),
    (i) => i * 2
  ),
  (r, i) => r + i,
  0
);
expect(r).toBe(12);

啊咧?奇怪,我们是不是之前在写链表的时候写过类似的代码?像是这样

function map<T, R>(list: List<T>, f: (i: T) => R): List<R> {
  return list === null ? null : (cons(f(car(list)), map(cdr(list)!, f)) as any);
}
function filter<T>(list: List<T>, f: (i: T) => boolean): List<T> {
  return list === null
    ? null
    : f(car(list))
    ? cons(car(list), filter(cdr(list), f))
    : filter(cdr(list), f);
}

是的,它们的代码实现看起来很像,实际上,它们的区别仅在于流的节点的 cdr 部分是一个可以延迟计算的函数,只有在需要时才会被真的计算。在嵌套使用 map/filter 时,不再是先将所有元素通过 map 处理一遍形成新的流,再通过 filter 处理一遍形成新的流,而是在真正转换为链表或数组这种非延时的数据结构时才会被真正计算,而且方向是反的。

我们可以验证它真的是延迟计算的。

const genStream = jest
  .fn()
  .mockImplementation((i) => makeStream(i, () => genStream(i + 1)));
const s = genStream(1);
expect(head(s)).toEqual(1);
expect(genStream.mock.calls.length).toBe(1);
expect(head(tail(s))).toEqual(2);
expect(genStream.mock.calls.length).toBe(2);

缓存

在继续下一步之前,我们首先对流做一个简单的性能优化

继续使用上一个示例,当多次访问 tail 时,每次都会重新计算下一个节点的值,即便已经计算过了,这很浪费,而且会引发一些其他的问题,例如 tail(s) === tail(s) 的结果会是 false,因为这里生成了两次。

it("once before", () => {
  const genStream = jest
    .fn()
    .mockImplementation((i) => makeStream(i, () => genStream(i + 1)));
  const s = genStream(1); // 创建流时会立刻计算 head,1 次
  tail(s); // 1 次
  tail(tail(s)); // 2 次
  tail(tail(tail(s))); // 3 次
  expect(genStream.mock.calls.length).toBe(7);
  expect(tail(s) === tail(s)).toBeFalsy(); // 两个值不相等
});

那么,是否有办法解决呢?
很简单,只需要做个缓存即可。

function once<T extends () => any>(f: T): T {
  let flag: any = null;
  return (() => {
    if (flag === null) {
      flag = f();
    }
    return flag;
  }) as any;
}
function makeStream<T>(head: T, tail: () => Stream<T> = () => null) {
  return cons(head, once(tail));
}
it("once after", () => {
  const genStream = jest
    .fn()
    .mockImplementation((i) => makeStream(i, () => genStream(i + 1)));
  const s = genStream(1); // 创建流时会立刻计算 head,1 次
  tail(tail(tail(s))); // 3 次
  tail(tail(s)); // 直接从缓存读取,0 次
  tail(s); // 直接从缓存读取,0 次
  expect(genStream.mock.calls.length).toBe(4);
  expect(tail(s) === tail(s)).toBeTruthy(); // 两个值相等
});

现在,所有访问 tail 访问下一个节点都会默认缓存,如果已经计算过,将永远不再重复计算。

无限流

流不仅可以表示有限的数据,甚至可以是无限的。

例如可以使用以下函数生成一个从 n 开始的无限整数流

function intFrom(n: number): Stream<number> {
  return makeStream(n, () => intFrom(n + 1));
}

我们可以使用以下代码验证它

const ints = intFrom(1);
expect(head(ints)).toBe(1);
expect(head(tail(ints))).toBe(2);

或者生成一个所有元素都是同一个值的无限流

function ones<T>(i: T): Stream<T> {
  return makeStream(i, () => ones(i));
}

let n = ones(1);
Array(100)
  .fill(0)
  .forEach(() => {
    expect(head(n)).toBe(1);
    n = tail(n);
  });

那么,如何截取流的一部分呢?例如截取前 n 个元素(take)或第 n 个之后的所有元素(drop)呢?
其实很简单,take 只需要生成新的流并在取到第 n 个元素的时候返回 null,drop 则不断使用 tail 获取下一个节点,直到获取到第 n 个节点就返回,而 slice 则是组合这两个函数。

function take<T>(s: Stream<T>, n: number): Stream<T> {
  return n === 0 || isEmpty(s)
    ? null
    : makeStream(head(s)!, () => take(tail(s), n - 1));
}
function drop<T>(s: Stream<T>, n: number): Stream<T> {
  return isEmpty(s) || n === 0 ? s : drop(tail(s), n - 1);
}
function slice<T>(s: Stream<T>, start: number, end: number) {
  return take(drop(s, start), end - start);
}

it("slice", () => {
  const ints = intFrom(1);
  expect(toArray(take(ints, 4))).toEqual([1, 2, 3, 4]);
  expect(head(drop(ints, 4))).toBe(5);
  expect(toArray(slice(ints, 2, 4))).toEqual([3, 4]);
});

链式调用

上面的函数并非是链式调用的,在使用起来会感觉有点繁琐,嵌套很多层的话尤其如此,但可以通过简单的包装实现传统的链式调用。

基本思想是在 Stream 对象中保存当前流,然后每次非终结操作都不断创建新的 Stream 对象,并且在对象的方法中调用之前实现的函数。

class StreamImpl<T> {
  constructor(private readonly s: Stream<T>) {}
  map<R>(fn: (item: T) => R): StreamImpl<R> {
    return new StreamImpl(map(this.s, fn));
  }
  filter(fn: (item: T) => boolean): StreamImpl<T> {
    return new StreamImpl(filter(this.s, fn));
  }
  reduce<R>(fn: (res: R, item: T) => R, init: R): R {
    return reduce(this.s, fn, init);
  }
  take(n: number): StreamImpl<T> {
    return new StreamImpl(take(this.s, n));
  }
  drop(n: number): StreamImpl<T> {
    return new StreamImpl(drop(this.s, n));
  }
  value() {
    return toArray(this.s);
  }
}
function stream<T>(arr: T[]): StreamImpl<T> {
  return new StreamImpl(ofArray(arr));
}

it("stream", () => {
  const s = stream([1, 2, 3, 4, 5, 6, 7, 8, 9])
    .filter((i) => i % 2 === 0)
    .map((i) => i * 2)
    .drop(1)
    .take(2);
  expect(s.value()).toEqual([8, 12]);
  expect(s.reduce((r, i) => r + i, 0)).toBe(20);
});

可以看到,上面的 api 其实已经和 lodash 一样了。

异步的流

流不仅可以处理同步的数据,也可以处理异步的数据。但与同步的数据相比,需要更换流节点的基本模型,从 head 立刻计算、tail 延迟计算更换为整个流节点都是延迟计算的。

一个基本的异步流节点可能是这样的

异步流节点.excalidraw.svg

类型定义

type AsyncStream<T> = Promise<Cons<T, AsyncStream<T>> | null>;

创建一些基本的辅助函数

/** 从数组中创建异步流 */
async function asyncOfArray<T>(arr: T[]): AsyncStream<T> {
  return arr.length === 0 ? null : cons(arr[0], asyncOfArray(arr.slice(1)));
}
/** 将异步流转换为数组 */
async function asyncToArray<T>(s: AsyncStream<T>): Promise<T[]> {
  const r = await s;
  return r === null ? [] : [car(r), ...(await asyncToArray(cdr(r)))];
}

it("basic", async () => {
  const a = [1, 2, 3, 4];
  expect(await asyncToArray(asyncOfArray(a))).toEqual(a);
});

在此基础上实现 map/filter/reduce

async function asyncMap<T, R>(
  s: AsyncStream<T>,
  f: (i: T) => Promise<R>
): AsyncStream<R> {
  const r = await s;
  return r === null ? null : cons((await f(car(r))) as R, asyncMap(cdr(r), f));
}
async function asyncFilter<T>(
  s: AsyncStream<T>,
  f: (i: T) => Promise<boolean>
): AsyncStream<T> {
  const r = await s;
  return r === null
    ? null
    : (await f(car(r)))
    ? cons(car(r), asyncFilter(cdr(r), f))
    : await asyncFilter(cdr(r), f);
}
async function asyncReduce<T, R>(
  s: AsyncStream<T>,
  f: (r: R, i: T) => Promise<R>,
  init: R
): Promise<R> {
  const r = await s;
  return r === null ? init : asyncReduce(cdr(r), f, await f(init, car(r)));
}

it("map/filter/reduce", async () => {
  const s = asyncOfArray([1, 2, 3, 4]);
  const r = await asyncReduce(
    asyncMap(
      asyncFilter(s, async (i) => i % 2 === 0),
      async (i) => i * 2
    ),
    async (r, i) => r + i,
    0
  );
  expect(r).toBe(12);
});

这里也顺便实现一下 flatMap

async function concat<T>(a: AsyncStream<T>, b: AsyncStream<T>): AsyncStream<T> {
  const r = await a;
  return r === null ? b : cons(car(r), concat(cdr(r), b));
}
async function asyncFlatMap<T, R>(
  s: AsyncStream<T>,
  f: (i: T) => AsyncStream<R>
): AsyncStream<R> {
  const r = await s;
  return r === null ? null : concat(f(car(r)), asyncFlatMap(cdr(r), f));
}
async function int(start: number, end: number): AsyncStream<number> {
  return start === end ? null : cons(start, int(start + 1, end));
}
it("flatMap", async () => {
  expect(await asyncToArray(int(1, 4))).toEqual([1, 2, 3]);
  const r = asyncFlatMap(int(1, 4), (i) => int(0, i));
  expect(await asyncToArray(r)).toEqual([0, 0, 1, 0, 1, 2]);
});

可以看到,同步与异步的 map/filter/reduce 两者之间实现的差距相当小,是否有办法兼容呢?
不知道,或许有办法做到,但类型上感觉会有些复杂。

异步数组

下面列出一个真实的例子,在写前端应用时,处理异步操作是一件常见的事情,当需要异步的 map/filter/reduce 时,我们经常会使用 Promise.all 来完成

it("promise.all", async () => {
  const r = (await Promise.all([1, 2, 3, 4].map(async (i) => [i % 2 === 0, i])))
    .filter((r) => r[0])
    .map((r) => r[1]);
  expect(r).toEqual([2, 4]);
});

但是也可以看到,这种代码非常丑陋,因此吾辈之前曾经写过 AsyncArray 这种异步数组工具类,参考 https://github.com/rxliuli/liuli-tools/blob/master/libs/async/src/AsyncArray.ts

下面基于 asyncStream 来实现它,可以看到都是直接转发到异步操作上,这没什么复杂的。有一点比较比较有趣的是这里还实现了 PromiseLike 接口,这让我们可以在不使用 .value 这种显式调用的情况下将流转换为 Promise<any[]>,仅使用 await 就像等待一个 Promise 一样执行了 then 方法,这只是个 Promise 的技巧罢了。

class AsyncArray<T> implements PromiseLike<T[]> {
  static reduce<T, R>(
    arr: T[],
    fn: (res: R, item: T) => Promise<R>,
    res: R
  ): Promise<R> {
    return asyncReduce(asyncOfArray(arr), fn, res);
  }
  static map<T, R>(arr: T[], fn: (item: T) => Promise<R>): Promise<R[]> {
    return asyncToArray(asyncMap(asyncOfArray(arr), fn));
  }
  static filter<T>(arr: T[], fn: (item: T) => Promise<boolean>): Promise<T[]> {
    return asyncToArray(asyncFilter(asyncOfArray(arr), fn));
  }
  static flatMap<T, R>(arr: T[], fn: (item: T) => Promise<R[]>): Promise<R[]> {
    return asyncToArray(
      asyncFlatMap(asyncOfArray(arr), async (i) => asyncOfArray(await fn(i)))
    );
  }
  static async forEach<T>(
    arr: T[],
    fn: (item: T) => Promise<void>
  ): Promise<void> {
    await this.map(arr, fn);
  }

  private s: AsyncStream<T>;
  constructor(arr: T[]) {
    this.s = asyncOfArray(arr);
  }

  private static of<T>(s: AsyncStream<T>): AsyncArray<T> {
    const r = new AsyncArray<T>([]);
    r.s = s;
    return r;
  }

  map<R>(fn: (item: T) => Promise<R>): AsyncArray<R> {
    return AsyncArray.of(asyncMap(this.s, fn));
  }
  flatMap<R>(fn: (item: T) => Promise<R[]>): AsyncArray<R> {
    return AsyncArray.of(
      asyncFlatMap(this.s, async (i) => asyncOfArray(await fn(i)))
    );
  }
  filter(fn: (item: T) => Promise<boolean>): AsyncArray<T> {
    return AsyncArray.of(asyncFilter(this.s, fn));
  }
  async forEach<R>(fn: (item: T) => Promise<R>): Promise<void> {
    await this.map(fn);
  }
  reduce<R>(fn: (res: R, item: T) => Promise<R>, res: R): Promise<R> {
    return asyncReduce(this.s, fn, res);
  }
  then<TResult1 = T[], TResult2 = never>(
    onfulfilled?: ((value: T[]) => TResult1 | PromiseLike<TResult1>) | null,
    onrejected?: ((reason: any) => TResult2 | PromiseLike<TResult2>) | null
  ): PromiseLike<TResult1> {
    return asyncToArray(this.s) as any;
  }
}

it("AsyncArray", async () => {
  const r = await new AsyncArray([1, 2, 3, 4])
    .filter(async (i) => i % 2 === 0)
    .map(async (i) => i * 2)
    .reduce(async (r, i) => r + i, 0);
  expect(r).toBe(12);
});

结语

流是一个很有用的数据抽象,尤其是在处理集合方面,几乎是无可比肩的,之前曾经非常流行的大数据处理框架 Spark 就非常强调 map/filter/reduce,这种数据结构约定了接口,然后就可以抽象出各种通用的处理了,延迟计算的想法更是天才,几乎仅仅只修改了 makeStream,就能够实现集合处理的性能优化了。虽然吾辈了解和实现这些知识有些晚,但就像有人说过 种一棵树最好的时间是十年前,其次是现在,亡羊补牢,尤未迟也,只要在不断的变得更好,便已然足够了。


流与延迟计算
https://blog.rxliuli.com/p/74ee9e8c552b421680a9cd967e2638ad/
作者
rxliuli
发布于
2022年6月9日
许可协议