Skip to content
7 changes: 7 additions & 0 deletions lib/internal/webstreams/adapters.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ const {
Buffer,
} = require('buffer');

const {
isArrayBuffer,
} = require('internal/util/types');

const {
AbortError,
ErrnoException,
Expand Down Expand Up @@ -213,6 +217,9 @@ function newWritableStreamFromStreamWritable(streamWritable) {
start(c) { controller = c; },

write(chunk) {
if (!streamWritable.writableObjectMode && isArrayBuffer(chunk)) {
chunk = new Uint8Array(chunk);
}
if (streamWritable.writableNeedDrain || !streamWritable.write(chunk)) {
backpressurePromise = PromiseWithResolvers();
return SafePromisePrototypeFinally(
Expand Down
42 changes: 42 additions & 0 deletions test/parallel/test-webstreams-compression-buffer-source.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
'use strict';
require('../common');
const assert = require('assert');
const test = require('node:test');
const { DecompressionStream, CompressionStream } = require('stream/web');

// Minimal gzip-compressed bytes for "hello"
// Layout: 10-byte gzip header (0x1f 0x8b magic, method 8 = deflate),
// deflate-compressed payload, then CRC32 + uncompressed-size trailer.
const compressedGzip = new Uint8Array([
31, 139, 8, 0, 0, 0, 0, 0, 0, 3,
203, 72, 205, 201, 201, 7, 0, 134, 166, 16, 54, 5, 0, 0, 0,
]);

// A raw ArrayBuffer (not a view) must be accepted by the writable side of
// DecompressionStream and decode back to the original text.
test('DecompressionStream accepts ArrayBuffer chunks', async () => {
  const ds = new DecompressionStream('gzip');
  const writer = ds.writable.getWriter();

  const writePromise = writer.write(compressedGzip.buffer);
  // Start closing but do not await yet: close() cannot settle until the
  // readable side is drained. Keep the promise so a rejection is not lost.
  const closePromise = writer.close();

  const chunks = await Array.fromAsync(ds.readable);
  await writePromise;
  await closePromise;
  const out = Buffer.concat(chunks.map((c) => Buffer.from(c)));
  assert.strictEqual(out.toString(), 'hello');
});

// Round-trip: compress an ArrayBuffer chunk, pipe into a decompressor, and
// verify the original text comes back out.
test('CompressionStream round-trip with ArrayBuffer input', async () => {
  const cs = new CompressionStream('gzip');
  const ds = new DecompressionStream('gzip');

  const csWriter = cs.writable.getWriter();

  const input = new TextEncoder().encode('hello').buffer;

  await csWriter.write(input);
  // Begin closing; it settles only once the compressed output has been
  // consumed by pipeTo below. Keep the promise so a rejection is observed.
  const closePromise = csWriter.close();

  await cs.readable.pipeTo(ds.writable);
  await closePromise;

  const out = await Array.fromAsync(ds.readable);
  const result = Buffer.concat(out.map((c) => Buffer.from(c)));
  assert.strictEqual(result.toString(), 'hello');
});
32 changes: 32 additions & 0 deletions test/parallel/test-whatwg-webstreams-adapters-to-writablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -165,3 +165,35 @@ class TestWritable extends Writable {
const writer = writableStream.getWriter();
writer.closed.then(common.mustCall());
}

{
  // In binary (non-objectMode) streams, an ArrayBuffer chunk written through
  // the adapter should arrive at the Node stream as a Buffer with the same
  // byte contents.
  const duplex = new PassThrough();
  const writableStream = newWritableStreamFromStreamWritable(duplex);
  const encoder = new TextEncoder();
  const arrayBuffer = encoder.encode('hello').buffer;

  const writer = writableStream.getWriter();
  writer.write(arrayBuffer).then(common.mustCall());

  duplex.on('data', common.mustCall((chunk) => {
    assert(chunk instanceof Buffer);
    assert(chunk.equals(Buffer.from('hello')));
  }));
}

{
  // In objectMode the adapter must not coerce the chunk: the very same
  // ArrayBuffer instance is expected to pass through untouched.
  const duplex = new PassThrough({ objectMode: true });
  const writableStream = newWritableStreamFromStreamWritable(duplex);
  const encoder = new TextEncoder();
  const arrayBuffer = encoder.encode('hello').buffer;

  const writer = writableStream.getWriter();
  writer.write(arrayBuffer).then(common.mustCall());

  duplex.on('data', common.mustCall((chunk) => {
    assert(chunk instanceof ArrayBuffer);
    assert.strictEqual(chunk, arrayBuffer);
  }));
}
3 changes: 0 additions & 3 deletions test/wpt/status/compression.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@
"decompression-bad-chunks.tentative.any.js": {
"skip": "Execution \"hangs\"; ArrayBuffer and TypedArray are not accepted and throw, instead of rejecting, during writer.write"
},
"decompression-buffersource.tentative.any.js": {
"skip": "ArrayBuffer and TypedArray is not accepted and throws, instead of rejects during writer.write"
},
"compression-with-detach.tentative.window.js": {
"requires": ["crypto"]
},
Expand Down
Loading