refactor
This commit is contained in:
parent
497b061b92
commit
2301c5d631
5 changed files with 203 additions and 202 deletions
|
|
@ -10,6 +10,14 @@
|
||||||
"exclude": ["no-explicit-any", "require-await"]
|
"exclude": ["no-explicit-any", "require-await"]
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
"compilerOptions": {
|
||||||
|
"lib": [
|
||||||
|
"deno.ns",
|
||||||
|
"esnext",
|
||||||
|
"dom",
|
||||||
|
"dom.iterable"
|
||||||
|
]
|
||||||
|
},
|
||||||
"imports": {
|
"imports": {
|
||||||
"@std/assert": "jsr:@std/assert@1"
|
"@std/assert": "jsr:@std/assert@1"
|
||||||
}
|
}
|
||||||
|
|
|
||||||
153
src/html.ts
153
src/html.ts
|
|
@ -3,102 +3,127 @@
|
||||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import { type Chunk, ChunkedWriter } from "./http.ts";
|
import { type Chunk } from "./http.ts";
|
||||||
import { ChunkedStream } from "./stream.ts";
|
import { ChunkedStream } from "./stream.ts";
|
||||||
|
|
||||||
type Attrs = Record<string, string | number | boolean>;
|
type Attrs = Record<string, string | number | boolean>;
|
||||||
|
|
||||||
const SELF_CLOSING_TAGS = new Set([
|
const VOID_TAGS = new Set([
|
||||||
|
"area",
|
||||||
|
"base",
|
||||||
"br",
|
"br",
|
||||||
|
"col",
|
||||||
"embed",
|
"embed",
|
||||||
"hr",
|
"hr",
|
||||||
"img",
|
"img",
|
||||||
"input",
|
"input",
|
||||||
"link",
|
"link",
|
||||||
"meta",
|
"meta",
|
||||||
"track",
|
"param",
|
||||||
"source",
|
"source",
|
||||||
|
"track",
|
||||||
|
"wbr",
|
||||||
]);
|
]);
|
||||||
|
|
||||||
|
const ESCAPE_MAP: Record<string, string> = {
|
||||||
|
"&": "&",
|
||||||
|
"<": "<",
|
||||||
|
">": ">",
|
||||||
|
'"': """,
|
||||||
|
"'": "'",
|
||||||
|
};
|
||||||
|
|
||||||
export function escape(input: string): string {
|
export function escape(input: string): string {
|
||||||
return input
|
let result = "";
|
||||||
.replace(/&/g, "&")
|
let lastIndex = 0;
|
||||||
.replace(/</g, "<")
|
|
||||||
.replace(/>/g, ">")
|
for (let i = 0; i < input.length; i++) {
|
||||||
.replace(/"/g, """)
|
const replacement = ESCAPE_MAP[input[i]];
|
||||||
.replace(/'/g, "'");
|
if (replacement) {
|
||||||
|
result += input.slice(lastIndex, i) + replacement;
|
||||||
|
lastIndex = i + 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return lastIndex ? result + input.slice(lastIndex) : input;
|
||||||
}
|
}
|
||||||
|
|
||||||
function attrsToString(
|
function serialize(attrs: Attrs | undefined): string {
|
||||||
attrs: Attrs | undefined,
|
|
||||||
escape: (str: string) => string,
|
|
||||||
): string {
|
|
||||||
if (!attrs) return "";
|
if (!attrs) return "";
|
||||||
const pairs = Object.entries(attrs)
|
let output = "";
|
||||||
.filter(([_, value]) => value !== undefined && value !== null)
|
|
||||||
.map(([key, value]) => {
|
for (const key in attrs) {
|
||||||
if (value === true) return key;
|
const val = attrs[key];
|
||||||
if (value === false) return "";
|
if (val == null || val === false) continue;
|
||||||
return `${key}="${escape(String(value))}"`;
|
output += " ";
|
||||||
})
|
output += val === true ? key : `${key}="${escape(String(val))}"`;
|
||||||
.filter(Boolean);
|
}
|
||||||
return pairs.length ? " " + pairs.join(" ") : "";
|
|
||||||
|
return output;
|
||||||
}
|
}
|
||||||
|
|
||||||
type TagFunction = {
|
type TagRes = void | Promise<void>;
|
||||||
(attrs: Attrs, ...children: Chunk[]): Promise<void>;
|
|
||||||
(...children: Chunk[]): Promise<void>;
|
type TagFn = {
|
||||||
|
(attrs: Attrs, ...children: Chunk[]): TagRes;
|
||||||
|
(...children: Chunk[]): TagRes;
|
||||||
|
(template: TemplateStringsArray, ...values: Chunk[]): TagRes;
|
||||||
|
(fn: () => any): TagRes;
|
||||||
};
|
};
|
||||||
|
|
||||||
type HtmlBuilder = {
|
type HtmlProxy = { [K in keyof HTMLElementTagNameMap]: TagFn } & {
|
||||||
[K: string]: TagFunction;
|
[key: string]: TagFn;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const isTemplateLiteral = (arg: any): arg is TemplateStringsArray =>
|
||||||
|
Array.isArray(arg) && "raw" in arg;
|
||||||
|
|
||||||
|
const isAttributes = (arg: any): arg is Record<string, any> =>
|
||||||
|
arg && typeof arg === "object" && !isTemplateLiteral(arg);
|
||||||
|
|
||||||
|
async function render(child: any): Promise<string> {
|
||||||
|
if (child == null) return "";
|
||||||
|
if (typeof child === "string" || typeof child === "number") {
|
||||||
|
return String(child);
|
||||||
|
}
|
||||||
|
if (child instanceof Promise) return render(await child);
|
||||||
|
if (Array.isArray(child)) {
|
||||||
|
return (await Promise.all(child.map(render))).join("");
|
||||||
|
}
|
||||||
|
if (typeof child === "function") return render(await child());
|
||||||
|
return String(child);
|
||||||
|
}
|
||||||
|
|
||||||
export function html(
|
export function html(
|
||||||
chunks: ChunkedStream<string>,
|
chunks: ChunkedStream<string>,
|
||||||
write: ChunkedWriter,
|
): HtmlProxy {
|
||||||
): HtmlBuilder {
|
const cache = new Map<string, TagFn>();
|
||||||
const tags = new Map<string, (...args: any[]) => Promise<void>>();
|
const write = (buf: string) => !chunks.closed && chunks.write(buf);
|
||||||
|
|
||||||
const handler: ProxyHandler<Record<string, TagFunction>> = {
|
const handler: ProxyHandler<Record<string, TagFn>> = {
|
||||||
get(_, tag: string) {
|
get(_, tag: string) {
|
||||||
if (tags.has(tag)) {
|
let fn = cache.get(tag);
|
||||||
return tags.get(tag);
|
if (fn) return fn;
|
||||||
|
|
||||||
|
fn = async (...args: any[]) => {
|
||||||
|
const attrs = isAttributes(args[0]) ? args.shift() : undefined;
|
||||||
|
|
||||||
|
const isVoid = VOID_TAGS.has(tag.toLowerCase());
|
||||||
|
const attributes = serialize(attrs);
|
||||||
|
|
||||||
|
write(`<${tag}${attributes}${isVoid ? "/" : ""}>`);
|
||||||
|
if (isVoid) return;
|
||||||
|
|
||||||
|
for (const child of args) {
|
||||||
|
write(await render(child));
|
||||||
}
|
}
|
||||||
|
|
||||||
const fn = async (...args: (Chunk | Attrs)[]) => {
|
write(`</${tag}>`);
|
||||||
const isTemplate = args.length === 1 && Array.isArray(args[0]) &&
|
|
||||||
"raw" in args[0];
|
|
||||||
const hasAttrs = !isTemplate && args.length &&
|
|
||||||
typeof args[0] === "object";
|
|
||||||
|
|
||||||
const attrs: Attrs | undefined = hasAttrs
|
|
||||||
? args.shift() as Attrs
|
|
||||||
: undefined;
|
|
||||||
const children: Chunk[] = args as Chunk[];
|
|
||||||
|
|
||||||
const attributes = attrsToString(attrs, escape);
|
|
||||||
const isSelfClosing = SELF_CLOSING_TAGS.has(tag.toLowerCase());
|
|
||||||
if (!isSelfClosing && !children.length) return;
|
|
||||||
chunks.write(`<${tag}${attributes}${isSelfClosing ? " /" : ""}>`);
|
|
||||||
|
|
||||||
if (!isSelfClosing) {
|
|
||||||
if (isTemplate) {
|
|
||||||
await (write as any)(...children);
|
|
||||||
} else {
|
|
||||||
for (const child of children) {
|
|
||||||
typeof child === "function"
|
|
||||||
? await (child as any)()
|
|
||||||
: chunks.write(String(await child));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
chunks.write(`</${tag}>`);
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
tags.set(tag, fn);
|
return cache.set(tag, fn), fn;
|
||||||
return fn;
|
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
return new Proxy({}, handler);
|
|
||||||
|
return new Proxy({}, handler) as HtmlProxy;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
73
src/http.ts
73
src/http.ts
|
|
@ -21,17 +21,17 @@ async function* normalize(
|
||||||
value: Chunk | undefined | null,
|
value: Chunk | undefined | null,
|
||||||
): AsyncIterable<string> {
|
): AsyncIterable<string> {
|
||||||
if (value == null) return;
|
if (value == null) return;
|
||||||
|
|
||||||
if (typeof value === "string") {
|
if (typeof value === "string") {
|
||||||
|
3;
|
||||||
yield value;
|
yield value;
|
||||||
} else if (value instanceof Promise) {
|
} else if (value instanceof Promise) {
|
||||||
const resolved = await value;
|
const resolved = await value;
|
||||||
if (resolved != null) yield String(resolved);
|
if (resolved != null) yield String(resolved);
|
||||||
} else if (
|
} else if (Symbol.asyncIterator in value || Symbol.iterator in value) {
|
||||||
typeof value === "object" &&
|
|
||||||
(Symbol.asyncIterator in value || Symbol.iterator in value)
|
|
||||||
) {
|
|
||||||
for await (const chunk of value as AsyncIterable<string>) {
|
for await (const chunk of value as AsyncIterable<string>) {
|
||||||
if (chunk != null) yield String(chunk);
|
if (chunk != null) yield String(chunk);
|
||||||
|
1;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
yield String(value);
|
yield String(value);
|
||||||
|
|
@ -43,66 +43,65 @@ export type ChunkedWriter = (
|
||||||
...values: Chunk[]
|
...values: Chunk[]
|
||||||
) => Promise<void>;
|
) => Promise<void>;
|
||||||
|
|
||||||
export function makeChunkWriter(stream: ChunkedStream<string>): ChunkedWriter {
|
export const makeChunkWriter =
|
||||||
const emit = (chunk: string) => {
|
(stream: ChunkedStream<string>): ChunkedWriter =>
|
||||||
if (stream.closed) return;
|
async (strings, ...values) => {
|
||||||
chunk === "EOF" ? stream.close() : stream.write(chunk);
|
const emit = (chunk: string) =>
|
||||||
};
|
!stream.closed &&
|
||||||
|
(chunk === "EOF" ? stream.close() : stream.write(chunk));
|
||||||
|
|
||||||
return async function (strings: TemplateStringsArray, ...values: Chunk[]) {
|
|
||||||
for (let i = 0; i < strings.length; i++) {
|
for (let i = 0; i < strings.length; i++) {
|
||||||
if (strings[i]) emit(strings[i]);
|
strings[i] && emit(strings[i]);
|
||||||
for await (const chunk of normalize(values[i])) {
|
for await (const chunk of normalize(values[i])) {
|
||||||
emit(chunk);
|
emit(chunk);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
|
||||||
|
|
||||||
export function chunkedHtml(): {
|
export function chunkedHtml() {
|
||||||
chunks: ChunkedStream<string>;
|
|
||||||
stream: ReadableStream<Uint8Array>;
|
|
||||||
} {
|
|
||||||
const encoder = new TextEncoder();
|
|
||||||
const chunks = new ChunkedStream<string>();
|
const chunks = new ChunkedStream<string>();
|
||||||
|
|
||||||
const stream = new ReadableStream<Uint8Array>({
|
const stream = new ReadableStream<Uint8Array>({
|
||||||
async pull(controller) {
|
async start(controller) {
|
||||||
if (chunks.closed) return;
|
const encoder = new TextEncoder();
|
||||||
const result = await chunks.next();
|
for await (const chunk of chunks) {
|
||||||
result.done
|
controller.enqueue(encoder.encode(chunk));
|
||||||
? controller.close()
|
}
|
||||||
: controller.enqueue(encoder.encode(result.value));
|
controller.close();
|
||||||
},
|
|
||||||
cancel() {
|
|
||||||
chunks.close();
|
|
||||||
},
|
},
|
||||||
|
cancel: chunks.close,
|
||||||
});
|
});
|
||||||
|
|
||||||
return { chunks, stream };
|
return { chunks, stream };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const DOCUMENT_TYPE = "<!DOCTYPE html>";
|
||||||
|
const HTML_BEGIN = (lang: string) =>
|
||||||
|
`<html lang="${lang}"><head><meta charset="utf-8"><meta name="viewport" content="width=device-width, initial-scale=1.0">`;
|
||||||
|
const HEAD_END = "</head><body";
|
||||||
|
const BODY_END = ">";
|
||||||
|
const HTML_END = "</body></html>";
|
||||||
|
|
||||||
export async function createHtmlStream(options: StreamOptions = {}) {
|
export async function createHtmlStream(options: StreamOptions = {}) {
|
||||||
const { chunks, stream } = chunkedHtml();
|
const { chunks, stream } = chunkedHtml();
|
||||||
const writer = makeChunkWriter(chunks);
|
const writer = makeChunkWriter(chunks);
|
||||||
|
|
||||||
await writer`<!DOCTYPE html>
|
chunks.write(DOCUMENT_TYPE);
|
||||||
<html lang="${options.lang || "en"}">
|
chunks.write(HTML_BEGIN(options.lang || "en"));
|
||||||
<head>
|
options.headContent && chunks.write(options.headContent);
|
||||||
<meta charset="utf-8">
|
chunks.write(HEAD_END);
|
||||||
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
options.bodyAttributes && chunks.write(" " + options.bodyAttributes);
|
||||||
${options.headContent || ""}
|
chunks.write(BODY_END);
|
||||||
</head>
|
|
||||||
<body ${options.bodyAttributes || ""}>`;
|
|
||||||
|
|
||||||
return {
|
return {
|
||||||
write: writer,
|
write: writer,
|
||||||
stream,
|
blob: stream,
|
||||||
chunks,
|
chunks,
|
||||||
close() {
|
close() {
|
||||||
if (chunks.closed) return;
|
if (!chunks.closed) {
|
||||||
chunks.write("</body></html>");
|
chunks.write(HTML_END);
|
||||||
chunks.close();
|
chunks.close();
|
||||||
|
}
|
||||||
},
|
},
|
||||||
get response(): Response {
|
get response(): Response {
|
||||||
return new Response(stream, {
|
return new Response(stream, {
|
||||||
|
|
|
||||||
|
|
@ -10,19 +10,18 @@ Deno.serve({
|
||||||
port: 8080,
|
port: 8080,
|
||||||
async handler() {
|
async handler() {
|
||||||
const stream = await createHtmlStream({ lang: "en" });
|
const stream = await createHtmlStream({ lang: "en" });
|
||||||
const { h1, p, li } = html(stream.chunks, stream.write);
|
const { h1, ol, p, li } = html(stream.chunks);
|
||||||
|
|
||||||
await h1`<h1>Normal Streaming Page</h1>`;
|
await h1`Normal Streaming Page`;
|
||||||
await p({ class: "oh hey" }, "meowing chunk by chunk");
|
await p({ class: "oh hey" }, "meowing chunk by chunk");
|
||||||
|
|
||||||
(async () => {
|
ol(async () => {
|
||||||
const fruits = ["Apple", "Banana", "Cherry"];
|
const fruits = ["Apple", "Banana", "Cherry"];
|
||||||
|
|
||||||
for (const fruit of fruits) {
|
for (const fruit of fruits) {
|
||||||
await new Promise((r) => setTimeout(r, 500));
|
await new Promise((r) => setTimeout(r, 500));
|
||||||
await li(fruit);
|
await li(fruit);
|
||||||
}
|
}
|
||||||
})();
|
});
|
||||||
|
|
||||||
return stream.response;
|
return stream.response;
|
||||||
},
|
},
|
||||||
|
|
|
||||||
140
src/stream.ts
140
src/stream.ts
|
|
@ -5,38 +5,37 @@
|
||||||
|
|
||||||
export class ChunkedStream<T> implements AsyncIterable<T> {
|
export class ChunkedStream<T> implements AsyncIterable<T> {
|
||||||
private readonly chunks: T[] = [];
|
private readonly chunks: T[] = [];
|
||||||
private readonly queue: ((result: IteratorResult<T>) => void)[] = [];
|
private readonly resolvers: ((result: IteratorResult<T>) => void)[] = [];
|
||||||
private _closed = false;
|
private _closed = false;
|
||||||
|
|
||||||
get closed(): boolean {
|
get closed(): boolean {
|
||||||
return this._closed;
|
return this._closed;
|
||||||
}
|
}
|
||||||
|
|
||||||
write(chunk: T): void {
|
write(chunk: T) {
|
||||||
if (this._closed) throw new Error("Cannot write to closed stream");
|
if (this._closed) throw new Error("Cannot write to closed stream");
|
||||||
this.queue.shift()?.({ value: chunk, done: false }) ??
|
|
||||||
|
const resolver = this.resolvers.shift();
|
||||||
|
if (resolver) {
|
||||||
|
resolver({ value: chunk, done: false });
|
||||||
|
} else {
|
||||||
this.chunks.push(chunk);
|
this.chunks.push(chunk);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
close(): void {
|
close(): void {
|
||||||
this._closed = true;
|
this._closed = true;
|
||||||
this.queue.splice(0).forEach((r) => r({ value: undefined, done: true }));
|
while (this.resolvers.length) {
|
||||||
|
this.resolvers.shift()!({ value: undefined! as any, done: true });
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
next(): Promise<IteratorResult<T>> {
|
async next(): Promise<IteratorResult<T>> {
|
||||||
if (this.chunks.length) {
|
if (this.chunks.length) {
|
||||||
return Promise.resolve({
|
return { value: this.chunks.shift()!, done: false };
|
||||||
value: this.chunks.shift()!,
|
|
||||||
done: false,
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
if (this._closed) {
|
if (this._closed) return { value: undefined as any, done: true };
|
||||||
return Promise.resolve({
|
return new Promise((resolve) => this.resolvers.push(resolve));
|
||||||
value: undefined as any,
|
|
||||||
done: true,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
return new Promise((resolve) => this.queue.push(resolve));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
[Symbol.asyncIterator](): AsyncIterableIterator<T> {
|
[Symbol.asyncIterator](): AsyncIterableIterator<T> {
|
||||||
|
|
@ -44,94 +43,65 @@ export class ChunkedStream<T> implements AsyncIterable<T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function* mapStream<T, U>(
|
export const mapStream = <T, U>(
|
||||||
source: AsyncIterable<T>,
|
fn: (chunk: T, i: number) => U | Promise<U>,
|
||||||
fn: (chunk: T, index: number) => U | Promise<U>,
|
) =>
|
||||||
): AsyncIterable<U> {
|
async function* (source: AsyncIterable<T>): AsyncIterable<U> {
|
||||||
let index = 0;
|
let i = 0;
|
||||||
|
for await (const chunk of source) yield await fn(chunk, i++);
|
||||||
|
};
|
||||||
|
|
||||||
|
export const filterStream = <T>(
|
||||||
|
pred: (chunk: T, i: number) => boolean | Promise<boolean>,
|
||||||
|
) =>
|
||||||
|
async function* (source: AsyncIterable<T>): AsyncIterable<T> {
|
||||||
|
let i = 0;
|
||||||
for await (const chunk of source) {
|
for await (const chunk of source) {
|
||||||
yield await fn(chunk, index++);
|
if (await pred(chunk, i++)) yield chunk;
|
||||||
}
|
}
|
||||||
}
|
};
|
||||||
|
|
||||||
export async function* filterStream<T>(
|
export const takeStream = <T>(count: number) =>
|
||||||
source: AsyncIterable<T>,
|
async function* (source: AsyncIterable<T>): AsyncIterable<T> {
|
||||||
predicate: (chunk: T, index: number) => boolean | Promise<boolean>,
|
|
||||||
): AsyncIterable<T> {
|
|
||||||
let index = 0;
|
|
||||||
for await (const chunk of source) {
|
|
||||||
if (await predicate(chunk, index++)) {
|
|
||||||
yield chunk;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
export async function* composeStreams<T>(
|
|
||||||
...sources: AsyncIterable<T>[]
|
|
||||||
): AsyncIterable<T> {
|
|
||||||
for (const source of sources) {
|
|
||||||
yield* source;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
export async function* takeStream<T>(
|
|
||||||
source: AsyncIterable<T>,
|
|
||||||
count: number,
|
|
||||||
): AsyncIterable<T> {
|
|
||||||
let taken = 0;
|
let taken = 0;
|
||||||
for await (const chunk of source) {
|
for await (const chunk of source) {
|
||||||
if (taken++ >= count) break;
|
if (taken++ >= count) return;
|
||||||
yield chunk;
|
yield chunk;
|
||||||
}
|
}
|
||||||
}
|
};
|
||||||
|
|
||||||
export async function* skipStream<T>(
|
export const skipStream = <T>(count: number) =>
|
||||||
source: AsyncIterable<T>,
|
async function* (source: AsyncIterable<T>): AsyncIterable<T> {
|
||||||
count: number,
|
let i = 0;
|
||||||
): AsyncIterable<T> {
|
|
||||||
let skipped = 0;
|
|
||||||
for await (const chunk of source) {
|
for await (const chunk of source) {
|
||||||
if (skipped++ >= count) {
|
if (i++ >= count) yield chunk;
|
||||||
yield chunk;
|
|
||||||
}
|
}
|
||||||
}
|
};
|
||||||
}
|
|
||||||
|
|
||||||
export async function* batchStream<T>(
|
export const batchStream = <T>(size: number) =>
|
||||||
source: AsyncIterable<T>,
|
async function* (source: AsyncIterable<T>): AsyncIterable<T[]> {
|
||||||
size: number,
|
|
||||||
): AsyncIterable<T[]> {
|
|
||||||
let batch: T[] = [];
|
let batch: T[] = [];
|
||||||
|
|
||||||
for await (const chunk of source) {
|
for await (const chunk of source) {
|
||||||
batch.push(chunk);
|
batch.push(chunk);
|
||||||
if (batch.length >= size) {
|
if (batch.length >= size) {
|
||||||
yield batch, batch = [];
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (batch.length > 0) {
|
|
||||||
yield batch;
|
yield batch;
|
||||||
|
batch = [];
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
export async function* flatMapStream<T, U>(
|
|
||||||
source: AsyncIterable<T>,
|
|
||||||
fn: (chunk: T, index: number) => AsyncIterable<U> | Iterable<U>,
|
|
||||||
): AsyncIterable<U> {
|
|
||||||
let index = 0;
|
|
||||||
for await (const chunk of source) {
|
|
||||||
yield* fn(chunk, index++);
|
|
||||||
}
|
}
|
||||||
}
|
batch.length && (yield batch);
|
||||||
|
};
|
||||||
|
|
||||||
export async function* tapStream<T>(
|
export const tapStream = <T>(
|
||||||
source: AsyncIterable<T>,
|
fn: (chunk: T, i: number) => void | Promise<void>,
|
||||||
fn: (chunk: T, index: number) => void | Promise<void>,
|
) =>
|
||||||
): AsyncIterable<T> {
|
async function* (source: AsyncIterable<T>): AsyncIterable<T> {
|
||||||
let index = 0;
|
let i = 0;
|
||||||
for await (const chunk of source) {
|
for await (const chunk of source) {
|
||||||
yield chunk;
|
yield chunk;
|
||||||
await fn(chunk, index++);
|
await fn(chunk, i++);
|
||||||
}
|
}
|
||||||
}
|
};
|
||||||
|
|
||||||
|
export const pipe =
|
||||||
|
<T>(...fns: Array<(src: AsyncIterable<T>) => AsyncIterable<any>>) =>
|
||||||
|
(source: AsyncIterable<T>) => fns.reduce((acc, fn) => fn(acc), source);
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue