diff --git a/deno.json b/deno.json index 5a45916..a026964 100644 --- a/deno.json +++ b/deno.json @@ -1,26 +1,30 @@ { - "tasks": { - "dev": "deno run --allow-net src/main.ts" - }, - "imports": { - "interest/jsx-runtime": "./src/jsx.ts" - }, - "lint": { - "rules": { - "tags": [ - "recommended" - ], - "exclude": ["no-explicit-any", "require-await"] - } - }, - "compilerOptions": { - "jsx": "react-jsx", - "jsxImportSource": "interest", - "lib": [ - "deno.ns", - "esnext", - "dom", - "dom.iterable" - ] - } + "tasks": { + "dev": "deno run --allow-net src/main.ts" + }, + "imports": { + "interest/jsx-runtime": "./src/jsx.ts" + }, + "fmt": { + "useTabs": true, + "semiColons": true + }, + "lint": { + "rules": { + "tags": [ + "recommended" + ], + "exclude": ["no-explicit-any", "require-await"] + } + }, + "compilerOptions": { + "jsx": "react-jsx", + "jsxImportSource": "interest", + "lib": [ + "deno.ns", + "esnext", + "dom", + "dom.iterable" + ] + } } diff --git a/scripts/copyright.ts b/scripts/copyright.ts index 56b7afd..41a786a 100755 --- a/scripts/copyright.ts +++ b/scripts/copyright.ts @@ -15,17 +15,17 @@ const copyrightHeader = `/** const dir = "./"; for await ( - const entry of walk(dir, { - exts: [".ts", ".tsx"], - includeDirs: false, - skip: [/node_modules/, /copyright\.ts$/], - }) + const entry of walk(dir, { + exts: [".ts", ".tsx"], + includeDirs: false, + skip: [/node_modules/, /copyright\.ts$/], + }) ) { - const filePath = entry.path; - const content = await Deno.readTextFile(filePath); + const filePath = entry.path; + const content = await Deno.readTextFile(filePath); - if (!content.startsWith(copyrightHeader)) { - await Deno.writeTextFile(filePath, copyrightHeader + "\n" + content); - console.log(`Added header to ${filePath}`); - } + if (!content.startsWith(copyrightHeader)) { + await Deno.writeTextFile(filePath, copyrightHeader + "\n" + content); + console.log(`Added header to ${filePath}`); + } } diff --git a/src/app.tsx b/src/app.tsx index a558e06..243b45c 100644 --- a/src/app.tsx +++ b/src/app.tsx @@ -6,21 +6,22 @@ import { close, open } from "interest/jsx-runtime"; async function* Fruits() { - const fruits = ["TSX", "Apple", "Banana", "Cherry"]; - yield open("ol"); - for (const fruit of fruits) { - await new Promise((r) => setTimeout(r, 500)); - yield
  • {fruit}
  • ; - } - yield close("ol"); + const fruits = ["TSX", "Apple", "Banana", "Cherry"]; + + yield open("ol"); + for (const fruit of fruits) { + await new Promise((r) => setTimeout(r, 500)); + yield
  • {fruit}
  • ; + } + yield close("ol"); } export default function App() { - return ( - <> -

    JSX Page

    -

    meowing chunk by chunk

    - - - ); + return ( + <> +

    JSX Page

    +

    meowing chunk by chunk

    + + + ); } diff --git a/src/global.d.ts b/src/global.d.ts index 3e1a77b..1f6ab92 100644 --- a/src/global.d.ts +++ b/src/global.d.ts @@ -8,34 +8,34 @@ import type { JsxElement } from "interest/jsx-runtime"; type HTMLAttributeMap = Partial< - Omit & { - style?: string; - class?: string; - children?: any; - [key: `data-${string}`]: string | number | boolean | null | undefined; - [key: `aria-${string}`]: string | number | boolean | null | undefined; - } + Omit & { + style?: string; + class?: string; + children?: any; + [key: `data-${string}`]: string | number | boolean | null | undefined; + [key: `aria-${string}`]: string | number | boolean | null | undefined; + } >; declare global { - namespace JSX { - type Element = JsxElement; + namespace JSX { + type Element = JsxElement; - export interface ElementChildrenAttribute { - // deno-lint-ignore ban-types - children: {}; - } + export interface ElementChildrenAttribute { + // deno-lint-ignore ban-types + children: {}; + } - export type IntrinsicElements = - & { - [K in keyof HTMLElementTagNameMap]: HTMLAttributeMap< - HTMLElementTagNameMap[K] - >; - } - & { - [K in keyof SVGElementTagNameMap]: HTMLAttributeMap< - SVGElementTagNameMap[K] - >; - }; - } + export type IntrinsicElements = + & { + [K in keyof HTMLElementTagNameMap]: HTMLAttributeMap< + HTMLElementTagNameMap[K] + >; + } + & { + [K in keyof SVGElementTagNameMap]: HTMLAttributeMap< + SVGElementTagNameMap[K] + >; + }; + } } diff --git a/src/html.ts b/src/html.ts index f84ff32..d8eab54 100644 --- a/src/html.ts +++ b/src/html.ts @@ -7,131 +7,131 @@ import { type Chunk } from "./http.ts"; import { ChunkedStream } from "./stream.ts"; export type Attrs = Record< - string, - string | number | boolean | null | undefined + string, + string | number | boolean | null | undefined >; export const VOID_TAGS = new Set([ - "area", - "base", - "br", - "col", - "embed", - "hr", - "img", - "input", - "link", - "meta", - "param", - "source", - "track", - "wbr", + "area", + "base", + "br", + "col", + "embed", + "hr", + "img", + "input", + "link", + "meta", + "param", + "source", + "track", + "wbr", ]); const ESCAPE_MAP: Record = { - "&": "&", - "<": "<", - ">": ">", - '"': """, - "'": "'", + "&": "&", + "<": "<", + ">": ">", + '"': """, + "'": "'", }; export function escape(input: string): string { - let result = ""; - let lastIndex = 0; + let result = ""; + let lastIndex = 0; - for (let i = 0; i < input.length; i++) { - const replacement = ESCAPE_MAP[input[i]]; - if (replacement) { - result += input.slice(lastIndex, i) + replacement; - lastIndex = i + 1; - } - } + for (let i = 0; i < input.length; i++) { + const replacement = ESCAPE_MAP[input[i]]; + if (replacement) { + result += input.slice(lastIndex, i) + replacement; + lastIndex = i + 1; + } + } - return lastIndex ? result + input.slice(lastIndex) : input; + return lastIndex ? result + input.slice(lastIndex) : input; } function serialize(attrs: Attrs | undefined): string { - if (!attrs) return ""; - let output = ""; + if (!attrs) return ""; + let output = ""; - for (const key in attrs) { - const val = attrs[key]; - if (val == null || val === false) continue; - output += " "; - output += val === true ? key : `${key}="${escape(String(val))}"`; - } + for (const key in attrs) { + const val = attrs[key]; + if (val == null || val === false) continue; + output += " "; + output += val === true ? key : `${key}="${escape(String(val))}"`; + } - return output; + return output; } type TagRes = void | Promise; type TagFn = { - (attrs: Attrs, ...children: Chunk[]): TagRes; - (attrs: Attrs, fn: () => any): TagRes; - (...children: Chunk[]): TagRes; - (template: TemplateStringsArray, ...values: Chunk[]): TagRes; - (fn: () => any): TagRes; + (attrs: Attrs, ...children: Chunk[]): TagRes; + (attrs: Attrs, fn: () => any): TagRes; + (...children: Chunk[]): TagRes; + (template: TemplateStringsArray, ...values: Chunk[]): TagRes; + (fn: () => any): TagRes; }; export type HtmlProxy = { [K in keyof HTMLElementTagNameMap]: TagFn } & { - [key: string]: TagFn; + [key: string]: TagFn; }; const isTemplateLiteral = (arg: any): arg is TemplateStringsArray => - Array.isArray(arg) && "raw" in arg; + Array.isArray(arg) && "raw" in arg; const isAttributes = (arg: any): arg is Record => - arg && typeof arg === "object" && !isTemplateLiteral(arg) && - !Array.isArray(arg) && !(arg instanceof Promise); + arg && typeof arg === "object" && !isTemplateLiteral(arg) && + !Array.isArray(arg) && !(arg instanceof Promise); async function render(child: unknown): Promise { - if (child == null) return ""; + if (child == null) return ""; - if (typeof child === "string") return escape(child); - if (typeof child === "number") return String(child); - if (typeof child === "boolean") return String(Number(child)); + if (typeof child === "string") return escape(child); + if (typeof child === "number") return String(child); + if (typeof child === "boolean") return String(Number(child)); - if (child instanceof Promise) return render(await child); + if (child instanceof Promise) return render(await child); - if (Array.isArray(child)) { - return (await Promise.all(child.map(render))).join(""); - } + if (Array.isArray(child)) { + return (await Promise.all(child.map(render))).join(""); + } - if (typeof child === "function") return render(await child()); - return escape(String(child)); + if (typeof child === "function") return render(await child()); + return escape(String(child)); } export function html(chunks: ChunkedStream): HtmlProxy { - const cache = new Map(); - const write = (buf: string) => !chunks.closed && chunks.write(buf); + const cache = new Map(); + const write = (buf: string) => !chunks.closed && chunks.write(buf); - const handler: ProxyHandler> = { - get(_, tag: string) { - let fn = cache.get(tag); - if (fn) return fn; + const handler: ProxyHandler> = { + get(_, tag: string) { + let fn = cache.get(tag); + if (fn) return fn; - fn = async (...args: unknown[]) => { - const attrs = isAttributes(args[0]) ? args.shift() : undefined; + fn = async (...args: unknown[]) => { + const attrs = isAttributes(args[0]) ? args.shift() : undefined; - const isVoid = VOID_TAGS.has(tag.toLowerCase()); - const attributes = serialize(attrs as Attrs); + const isVoid = VOID_TAGS.has(tag.toLowerCase()); + const attributes = serialize(attrs as Attrs); - write(`<${tag}${attributes}${isVoid ? "/" : ""}>`); - if (isVoid) return; + write(`<${tag}${attributes}${isVoid ? "/" : ""}>`); + if (isVoid) return; - for (const child of args) { - write(await render(child)); - } + for (const child of args) { + write(await render(child)); + } - write(``); - }; + write(``); + }; - return cache.set(tag, fn), fn; - }, - }; + return cache.set(tag, fn), fn; + }, + }; - const proxy = new Proxy({}, handler) as HtmlProxy; - return proxy; + const proxy = new Proxy({}, handler) as HtmlProxy; + return proxy; } diff --git a/src/http.ts b/src/http.ts index c27a95e..7d811f7 100644 --- a/src/http.ts +++ b/src/http.ts @@ -6,127 +6,127 @@ import { ChunkedStream } from "./stream.ts"; export interface StreamOptions { - headContent?: string; - bodyAttributes?: string; - lang?: string; + headContent?: string; + bodyAttributes?: string; + lang?: string; } export type Chunk = - | string - | AsyncIterable - | Promise - | Iterable - | null - | undefined; + | string + | AsyncIterable + | Promise + | Iterable + | null + | undefined; async function* normalize( - value: Chunk | undefined | null, + value: Chunk | undefined | null, ): AsyncIterable { - if (value == null) return; + if (value == null) return; - if (typeof value === "string") { - yield value; - } else if (value instanceof Promise) { - const resolved = await value; - if (resolved != null) yield String(resolved); - } else if (Symbol.asyncIterator in value || Symbol.iterator in value) { - for await (const chunk of value as AsyncIterable) { - if (chunk != null) yield String(chunk); - } - } else { - yield String(value); - } + if (typeof value === "string") { + yield value; + } else if (value instanceof Promise) { + const resolved = await value; + if (resolved != null) yield String(resolved); + } else if (Symbol.asyncIterator in value || Symbol.iterator in value) { + for await (const chunk of value as AsyncIterable) { + if (chunk != null) yield String(chunk); + } + } else { + yield String(value); + } } export type ChunkedWriter = ( - strings: TemplateStringsArray, - ...values: Chunk[] + strings: TemplateStringsArray, + ...values: Chunk[] ) => Promise; export const makeChunkWriter = - (stream: ChunkedStream): ChunkedWriter => - async (strings, ...values) => { - const emit = (chunk: string) => - !stream.closed && - (chunk === "EOF" ? stream.close() : stream.write(chunk)); + (stream: ChunkedStream): ChunkedWriter => + async (strings, ...values) => { + const emit = (chunk: string) => + !stream.closed && + (chunk === "EOF" ? stream.close() : stream.write(chunk)); - for (let i = 0; i < strings.length; i++) { - strings[i] && emit(strings[i]); + for (let i = 0; i < strings.length; i++) { + strings[i] && emit(strings[i]); - for await (const chunk of normalize(values[i])) { - emit(chunk); - } - } - }; + for await (const chunk of normalize(values[i])) { + emit(chunk); + } + } + }; export function chunkedHtml() { - const chunks = new ChunkedStream(); + const chunks = new ChunkedStream(); - const stream = new ReadableStream({ - async start(controller) { - const encoder = new TextEncoder(); - try { - for await (const chunk of chunks) { - controller.enqueue(encoder.encode(chunk)); - } - controller.close(); - } catch (error) { - controller.error(error); - } - }, - cancel: chunks.close, - }); + const stream = new ReadableStream({ + async start(controller) { + const encoder = new TextEncoder(); + try { + for await (const chunk of chunks) { + controller.enqueue(encoder.encode(chunk)); + } + controller.close(); + } catch (error) { + controller.error(error); + } + }, + cancel: chunks.close, + }); - return { chunks, stream }; + return { chunks, stream }; } const DOCUMENT_TYPE = ""; const HTML_BEGIN = (lang: string) => - ``; + ``; const HEAD_END = "; - chunks: ChunkedStream; - close(): void; - error(err: Error): void; - readonly response: Response; + write: ChunkedWriter; + blob: ReadableStream; + chunks: ChunkedStream; + close(): void; + error(err: Error): void; + readonly response: Response; } export async function createHtmlStream( - options: StreamOptions = {}, + options: StreamOptions = {}, ): Promise { - const { chunks, stream } = chunkedHtml(); - const writer = makeChunkWriter(chunks); + const { chunks, stream } = chunkedHtml(); + const writer = makeChunkWriter(chunks); - chunks.write(DOCUMENT_TYPE); - chunks.write(HTML_BEGIN(options.lang || "en")); - options.headContent && chunks.write(options.headContent); - chunks.write(HEAD_END); - options.bodyAttributes && chunks.write(" " + options.bodyAttributes); - chunks.write(BODY_END); + chunks.write(DOCUMENT_TYPE); + chunks.write(HTML_BEGIN(options.lang || "en")); + options.headContent && chunks.write(options.headContent); + chunks.write(HEAD_END); + options.bodyAttributes && chunks.write(" " + options.bodyAttributes); + chunks.write(BODY_END); - return { - write: writer, - blob: stream, - chunks, - close() { - if (!chunks.closed) { - chunks.write(HTML_END); - chunks.close(); - } - }, - error: chunks.error, - response: new Response(stream, { - headers: { - "Content-Type": "text/html; charset=utf-8", - "Transfer-Encoding": "chunked", - "Cache-Control": "no-cache", - "Connection": "keep-alive", - }, - }), - }; + return { + write: writer, + blob: stream, + chunks, + close() { + if (!chunks.closed) { + chunks.write(HTML_END); + chunks.close(); + } + }, + error: chunks.error, + response: new Response(stream, { + headers: { + "Content-Type": "text/html; charset=utf-8", + "Transfer-Encoding": "chunked", + "Cache-Control": "no-cache", + "Connection": "keep-alive", + }, + }), + }; } diff --git a/src/jsx.ts b/src/jsx.ts index cb28938..16453cf 100644 --- a/src/jsx.ts +++ b/src/jsx.ts @@ -12,111 +12,111 @@ type Props = Attrs & { children?: any }; type Component = (props: Props) => JsxElement | AsyncGenerator; export type JsxElement = - | ((chunks: ChunkedStream) => Promise) - | AsyncGenerator; + | ((chunks: ChunkedStream) => Promise) + | AsyncGenerator; async function render( - child: any, - chunks: ChunkedStream, - context: ReturnType, + child: any, + chunks: ChunkedStream, + context: ReturnType, ): Promise { - if (child == null || child === false || child === true) return; + if (child == null || child === false || child === true) return; - if (typeof child === "string") return chunks.write(escape(child)); - if (typeof child === "number") return chunks.write(String(child)); + if (typeof child === "string") return chunks.write(escape(child)); + if (typeof child === "number") return chunks.write(String(child)); - if (typeof child === "function") return await child(chunks); - if (child instanceof Promise) { - return await render(await child, chunks, context); - } + if (typeof child === "function") return await child(chunks); + if (child instanceof Promise) { + return await render(await child, chunks, context); + } - if (typeof child === "object" && Symbol.asyncIterator in child) { - (async () => { - for await (const item of child) { - await render(item, chunks, context); - } - })(); - return; - } + if (typeof child === "object" && Symbol.asyncIterator in child) { + (async () => { + for await (const item of child) { + await render(item, chunks, context); + } + })(); + return; + } - if (Array.isArray(child)) { - for (const item of child) await render(item, chunks, context); - return; - } + if (Array.isArray(child)) { + for (const item of child) await render(item, chunks, context); + return; + } - chunks.write(escape(String(child))); + chunks.write(escape(String(child))); } export function jsx( - tag: string | Component | typeof Fragment, - props: Props | null = {}, + tag: string | Component | typeof Fragment, + props: Props | null = {}, ): JsxElement { - props ||= {}; + props ||= {}; - return async (chunks: ChunkedStream) => { - const context = html(chunks); - const { children, ...attrs } = props; + return async (chunks: ChunkedStream) => { + const context = html(chunks); + const { children, ...attrs } = props; - if (tag === Fragment) { - if (!Array.isArray(children)) { - return await render([children], chunks, context); - } - for (const child of children) { - await render(child, chunks, context); - } - return; - } + if (tag === Fragment) { + if (!Array.isArray(children)) { + return await render([children], chunks, context); + } + for (const child of children) { + await render(child, chunks, context); + } + return; + } - if (typeof tag === "function") { - return await render(tag(props), chunks, context); - } + if (typeof tag === "function") { + return await render(tag(props), chunks, context); + } - const childr = children == null ? [] : [].concat(children); - const attributes = Object.keys(attrs).length ? attrs : {}; + const childr = children == null ? [] : [].concat(children); + const attributes = Object.keys(attrs).length ? attrs : {}; - if (!childr.length || VOID_TAGS.has(tag)) { - await context[tag](childr); - } else { - await context[tag](attributes, async () => { - for (const child of childr) { - await render(child, chunks, context); - } - }); - } - }; + if (!childr.length || VOID_TAGS.has(tag)) { + await context[tag](childr); + } else { + await context[tag](attributes, async () => { + for (const child of childr) { + await render(child, chunks, context); + } + }); + } + }; } export const jsxs = jsx; async function renderJsx( - element: JsxElement | JsxElement[], - chunks: ChunkedStream, + element: JsxElement | JsxElement[], + chunks: ChunkedStream, ): Promise { - if (Array.isArray(element)) { - for (const el of element) { - await renderJsx(el, chunks); - } - return; - } - if (typeof element === "object" && Symbol.asyncIterator in element) { - for await (const item of element) { - await renderJsx(item, chunks); - } - return; - } - if (typeof element === "function") { - await element(chunks); - } + if (Array.isArray(element)) { + for (const el of element) { + await renderJsx(el, chunks); + } + return; + } + if (typeof element === "object" && Symbol.asyncIterator in element) { + for await (const item of element) { + await renderJsx(item, chunks); + } + return; + } + if (typeof element === "function") { + await element(chunks); + } } export const raw = - (html: string): JsxElement => async (chunks: ChunkedStream) => - void (!chunks.closed && chunks.write(html)); + (html: string): JsxElement => async (chunks: ChunkedStream) => + void (!chunks.closed && chunks.write(html)); export const open = (tag: K) => - raw(`<${tag}>`); + raw(`<${tag}>`); export const close = (tag: K) => - raw(``); + raw(``); export { renderJsx as render }; diff --git a/src/main.ts b/src/main.ts index 5423546..5c1dbf4 100644 --- a/src/main.ts +++ b/src/main.ts @@ -8,10 +8,10 @@ import App from "./app.tsx"; import { render } from "interest/jsx-runtime"; Deno.serve({ - port: 8080, - async handler() { - const stream = await createHtmlStream({ lang: "en" }); - await render(App(), stream.chunks); - return stream.response; - }, + port: 8080, + async handler() { + const stream = await createHtmlStream({ lang: "en" }); + await render(App(), stream.chunks); + return stream.response; + }, }); diff --git a/src/stream.ts b/src/stream.ts index 58c7eaa..729d947 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -4,141 +4,141 @@ */ export class ChunkedStream implements AsyncIterable { - private readonly chunks: T[] = []; + private readonly chunks: T[] = []; - private readonly resolvers: ((result: IteratorResult) => void)[] = []; - private readonly rejectors: ((error: Error) => void)[] = []; + private readonly resolvers: ((result: IteratorResult) => void)[] = []; + private readonly rejectors: ((error: Error) => void)[] = []; - private _error: Error | null = null; - private _closed = false; + private _error: Error | null = null; + private _closed = false; - get closed(): boolean { - return this._closed; - } + get closed(): boolean { + return this._closed; + } - write(chunk: T) { - if (this._closed) throw new Error("Cannot write to closed stream"); + write(chunk: T) { + if (this._closed) throw new Error("Cannot write to closed stream"); - const resolver = this.resolvers.shift(); - if (resolver) { - this.rejectors.shift(); - resolver({ value: chunk, done: false }); - } else { - this.chunks.push(chunk); - } - } + const resolver = this.resolvers.shift(); + if (resolver) { + this.rejectors.shift(); + resolver({ value: chunk, done: false }); + } else { + this.chunks.push(chunk); + } + } - close(): void { - this._closed = true; - while (this.resolvers.length) { - this.rejectors.shift(); - this.resolvers.shift()!({ value: undefined! as any, done: true }); - } - } + close(): void { + this._closed = true; + while (this.resolvers.length) { + this.rejectors.shift(); + this.resolvers.shift()!({ value: undefined! as any, done: true }); + } + } - error(err: Error): void { - if (this._closed) return; + error(err: Error): void { + if (this._closed) return; - this._error = err; - this._closed = true; + this._error = err; + this._closed = true; - while (this.rejectors.length) { - this.rejectors.shift()!(err); - this.resolvers.shift(); - } - } + while (this.rejectors.length) { + this.rejectors.shift()!(err); + this.resolvers.shift(); + } + } - async next(): Promise> { - if (this._error) { - throw this._error; - } + async next(): Promise> { + if (this._error) { + throw this._error; + } - if (this.chunks.length) { - return { value: this.chunks.shift()!, done: false }; - } - if (this._closed) return { value: undefined as any, done: true }; + if (this.chunks.length) { + return { value: this.chunks.shift()!, done: false }; + } + if (this._closed) return { value: undefined as any, done: true }; - return new Promise((resolve, reject) => { - this.resolvers.push(resolve); - this.rejectors.push(reject); - }); - } + return new Promise((resolve, reject) => { + this.resolvers.push(resolve); + this.rejectors.push(reject); + }); + } - [Symbol.asyncIterator](): AsyncIterableIterator { - return this; - } + [Symbol.asyncIterator](): AsyncIterableIterator { + return this; + } } export const mapStream = ( - fn: (chunk: T, index: number) => U | Promise, + fn: (chunk: T, index: number) => U | Promise, ) => - async function* (source: AsyncIterable): AsyncIterable { - let index = 0; - for await (const chunk of source) yield await fn(chunk, index++); - }; + async function* (source: AsyncIterable): AsyncIterable { + let index = 0; + for await (const chunk of source) yield await fn(chunk, index++); + }; export const filterStream = ( - pred: (chunk: T, index: number) => boolean | Promise, + pred: (chunk: T, index: number) => boolean | Promise, ) => - async function* (source: AsyncIterable): AsyncIterable { - let index = 0; - for await (const chunk of source) { - if (await pred(chunk, index++)) yield chunk; - } - }; + async function* (source: AsyncIterable): AsyncIterable { + let index = 0; + for await (const chunk of source) { + if (await pred(chunk, index++)) yield chunk; + } + }; export const takeStream = (count: number) => - async function* (source: AsyncIterable): AsyncIterable { - let taken = 0; - for await (const chunk of source) { - if (taken++ >= count) return; - yield chunk; - } - }; + async function* (source: AsyncIterable): AsyncIterable { + let taken = 0; + for await (const chunk of source) { + if (taken++ >= count) return; + yield chunk; + } + }; export const skipStream = (count: number) => - async function* (source: AsyncIterable): AsyncIterable { - let index = 0; - for await (const chunk of source) { - if (index++ >= count) yield chunk; - } - }; + async function* (source: AsyncIterable): AsyncIterable { + let index = 0; + for await (const chunk of source) { + if (index++ >= count) yield chunk; + } + }; export const batchStream = (size: number) => - async function* (source: AsyncIterable): AsyncIterable { - let batch: T[] = []; - for await (const chunk of source) { - batch.push(chunk); - if (batch.length >= size) { - yield batch; - batch = []; - } - } - if (batch.length) yield batch; - }; + async function* (source: AsyncIterable): AsyncIterable { + let batch: T[] = []; + for await (const chunk of source) { + batch.push(chunk); + if (batch.length >= size) { + yield batch; + batch = []; + } + } + if (batch.length) yield batch; + }; export const tapStream = ( - fn: (chunk: T, index: number) => void | Promise, + fn: (chunk: T, index: number) => void | Promise, ) => - async function* (source: AsyncIterable): AsyncIterable { - let index = 0; - for await (const chunk of source) { - yield chunk; - await fn(chunk, index++); - } - }; + async function* (source: AsyncIterable): AsyncIterable { + let index = 0; + for await (const chunk of source) { + yield chunk; + await fn(chunk, index++); + } + }; export const catchStream = ( - handler: (error: Error) => void | Promise, + handler: (error: Error) => void | Promise, ) => - async function* (source: AsyncIterable): AsyncIterable { - try { - for await (const chunk of source) yield chunk; - } catch (err) { - await handler(err instanceof Error ? err : new Error(String(err))); - } - }; + async function* (source: AsyncIterable): AsyncIterable { + try { + for await (const chunk of source) yield chunk; + } catch (err) { + await handler(err instanceof Error ? err : new Error(String(err))); + } + }; export const pipe = - (...fns: Array<(src: AsyncIterable) => AsyncIterable>) => - (source: AsyncIterable) => fns.reduce((acc, fn) => fn(acc), source); + (...fns: Array<(src: AsyncIterable) => AsyncIterable>) => + (source: AsyncIterable) => fns.reduce((acc, fn) => fn(acc), source);