refactored stream transforms

This commit is contained in:
Danny Coates 2018-07-18 16:39:14 -07:00
parent f58b6194ce
commit f12d3abe79
No known key found for this signature in database
GPG key ID: 4C442633C62E00CB
5 changed files with 65 additions and 80 deletions

View file

@ -1,35 +1,36 @@
/* global TransformStream ReadableStream */
import { createReadableStreamWrapper } from '@mattiasbuelens/web-streams-adapter';
import {
TransformStream as TransformStreamPony,
ReadableStream as ReadableStreamPony
} from 'web-streams-ponyfill';
/* global ReadableStream */
const toNativeReadable = createReadableStreamWrapper(ReadableStream);
const toPonyReadable = createReadableStreamWrapper(ReadableStreamPony);
export function transform(readable, transformer) {
const reader = readable.getReader();
const tstream = new ReadableStream({
start(controller) {
if (transformer.start) {
return transformer.start(controller);
}
},
async pull(controller) {
let enqueued = false;
const c = {
enqueue(d) {
enqueued = true;
controller.enqueue(d);
}
};
while (!enqueued) {
const x = await reader.read();
if (x.done) {
if (transformer.flush) {
await transformer.flush(controller);
}
return controller.close();
}
await transformer.transform(x.value, c);
}
},
cancel() {
readable.cancel();
}
});
export let TStream;
if (typeof TransformStream === 'function') {
TStream = TransformStream;
} else {
TStream = TransformStreamPony;
TStream.prototype.isPony = true;
}
export let RStream = ReadableStream;
try {
new ReadableStream().pipeThrough(new TransformStream());
} catch (e) {
RStream = ReadableStreamPony;
RStream.prototype.isPony = true;
RStream.prototype.toNative = function() {
return toNativeReadable(this);
};
}
export function wrapReadable(stream) {
if (RStream === ReadableStream) {
return stream;
}
return toPonyReadable(stream);
return tstream;
}