Generate Web ReadableStream by converting Node.js Readable
This commit is contained in:
@@ -2,7 +2,7 @@ import fs from 'node:fs';
|
||||
|
||||
import * as mm from '../lib/index.js';
|
||||
import type { IAudioMetadata, IOptions } from '../lib/index.js';
|
||||
import { makeReadableByteFileStream } from './util.js';
|
||||
import { makeByteReadableStreamFromFile } from './util.js';
|
||||
|
||||
type ParseFileMethod = (skipTest: () => void, filePath: string, mimeType?: string, options?: IOptions) => Promise<IAudioMetadata>;
|
||||
|
||||
@@ -39,7 +39,7 @@ export const Parsers: IParser[] = [
|
||||
description: 'parseWebStream',
|
||||
webStream: true,
|
||||
initParser: async (skipTest, filePath: string, mimeType?: string, options?: IOptions) => {
|
||||
const webStream = await makeReadableByteFileStream(filePath);
|
||||
const webStream = await makeByteReadableStreamFromFile(filePath);
|
||||
try {
|
||||
return await mm.parseWebStream(webStream.stream, {mimeType: mimeType, size: webStream.fileSize}, options);
|
||||
} finally {
|
||||
|
||||
+61
-41
@@ -3,8 +3,9 @@
|
||||
import { Readable } from 'node:stream';
|
||||
import path from 'node:path';
|
||||
import { fileURLToPath } from 'node:url';
|
||||
import { createReadStream } from 'node:fs';
|
||||
import fs from 'node:fs/promises';
|
||||
import { ReadableStream } from 'node:stream/web';
|
||||
import type {ReadableByteStreamController, ReadableStreamBYOBRequest} from 'node:stream/web';
|
||||
|
||||
const filename = fileURLToPath(import.meta.url);
|
||||
const dirname = path.dirname(filename);
|
||||
@@ -24,52 +25,71 @@ export class SourceStream extends Readable {
|
||||
}
|
||||
}
|
||||
|
||||
export async function makeReadableByteFileStream(filename: string, delay = 0): Promise<{ fileSize: number, stream: ReadableStream<Uint8Array>, closeFile: () => Promise<void> }> {
|
||||
export async function makeByteReadableStreamFromFile(filename: string): Promise<{ fileSize: number, stream: ReadableStream<Uint8Array>, closeFile: () => Promise<void> }> {
|
||||
|
||||
let position = 0;
|
||||
const fileInfo = await fs.stat(filename);
|
||||
const fileHandle = await fs.open(filename, 'r');
|
||||
const nodeStream = createReadStream(filename);
|
||||
|
||||
return {
|
||||
fileSize: fileInfo.size,
|
||||
stream: new ReadableStream({
|
||||
type: 'bytes',
|
||||
|
||||
async pull(controller) {
|
||||
|
||||
// @ts-ignore
|
||||
const view = controller.byobRequest.view;
|
||||
|
||||
setTimeout(async () => {
|
||||
try {
|
||||
const {bytesRead} = await fileHandle.read(view, 0, view.byteLength, position);
|
||||
if (bytesRead === 0) {
|
||||
await fileHandle.close();
|
||||
controller.close();
|
||||
// @ts-ignore
|
||||
controller.byobRequest.respond(0);
|
||||
} else {
|
||||
position += bytesRead;
|
||||
// @ts-ignore
|
||||
controller.byobRequest.respond(bytesRead);
|
||||
}
|
||||
} catch (err) {
|
||||
controller.error(err);
|
||||
await fileHandle.close();
|
||||
}
|
||||
}, delay);
|
||||
},
|
||||
|
||||
cancel() {
|
||||
return fileHandle.close();
|
||||
},
|
||||
|
||||
autoAllocateChunkSize: 1024
|
||||
}),
|
||||
closeFile: () => {
|
||||
return fileHandle.close();
|
||||
}
|
||||
stream: makeByteReadableStreamFromNodeReadable(nodeStream),
|
||||
closeFile: () => Promise.resolve()
|
||||
};
|
||||
}
|
||||
|
||||
function makeByteReadableStreamFromNodeReadable(nodeReadable: Readable): ReadableStream<Uint8Array> {
|
||||
return new ReadableStream<Uint8Array>({
|
||||
type: 'bytes',
|
||||
start(controller: ReadableByteStreamController) {
|
||||
const onData = (chunk: Buffer) => {
|
||||
if (controller.byobRequest) {
|
||||
const view = (controller.byobRequest as ReadableStreamBYOBRequest).view;
|
||||
const bytesToCopy = Math.min(view.byteLength, chunk.byteLength);
|
||||
|
||||
new Uint8Array(view.buffer, view.byteOffset, view.byteLength)
|
||||
.set(new Uint8Array(chunk.buffer, chunk.byteOffset, bytesToCopy));
|
||||
|
||||
(controller.byobRequest as ReadableStreamBYOBRequest).respond(bytesToCopy);
|
||||
|
||||
if (bytesToCopy < chunk.byteLength) {
|
||||
controller.enqueue(chunk.subarray(bytesToCopy));
|
||||
}
|
||||
} else {
|
||||
controller.enqueue(new Uint8Array(chunk));
|
||||
}
|
||||
};
|
||||
|
||||
const onEnd = () => {
|
||||
controller.close();
|
||||
cleanup();
|
||||
};
|
||||
|
||||
const onError = (err: Error) => {
|
||||
controller.error(err);
|
||||
cleanup();
|
||||
};
|
||||
|
||||
const cleanup = () => {
|
||||
nodeReadable.off('data', onData);
|
||||
nodeReadable.off('end', onEnd);
|
||||
nodeReadable.off('error', onError);
|
||||
};
|
||||
|
||||
nodeReadable.on('data', onData);
|
||||
nodeReadable.on('end', onEnd);
|
||||
nodeReadable.on('error', onError);
|
||||
nodeReadable.resume();
|
||||
},
|
||||
pull(controller) {
|
||||
if (nodeReadable.isPaused()) {
|
||||
nodeReadable.resume();
|
||||
}
|
||||
},
|
||||
cancel(reason) {
|
||||
nodeReadable.destroy(reason);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
export const samplePath = path.join(dirname, 'samples');
|
||||
|
||||
Reference in New Issue
Block a user