Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions lib/internal/blob.js
Original file line number Diff line number Diff line change
Expand Up @@ -403,9 +403,7 @@ function createBlobFromFilePath(path, options) {
return lazyDOMException('The blob could not be read', 'NotReadableError');
}
const { 0: blob, 1: length } = maybeBlob;
const res = createBlob(blob, length, options?.type);
res[kNotCloneable] = true;
return res;
return createBlob(blob, length, options?.type);
}

function arrayBuffer(blob) {
Expand Down
24 changes: 13 additions & 11 deletions src/dataqueue/queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class NonIdempotentDataQueueReader;

class EntryImpl : public DataQueue::Entry {
public:
virtual std::shared_ptr<DataQueue::Reader> get_reader() = 0;
std::shared_ptr<DataQueue::Reader> get_reader(Environment* env = nullptr) override = 0;
};

class DataQueueImpl final : public DataQueue,
Expand Down Expand Up @@ -183,7 +183,7 @@ class DataQueueImpl final : public DataQueue,
return !backpressure_listeners_.empty();
}

std::shared_ptr<Reader> get_reader() override;
std::shared_ptr<Reader> get_reader(Environment* env = nullptr) override;
SET_MEMORY_INFO_NAME(DataQueue)
SET_SELF_SIZE(DataQueueImpl)

Expand Down Expand Up @@ -521,7 +521,7 @@ class NonIdempotentDataQueueReader final
bool pull_pending_ = false;
};

std::shared_ptr<DataQueue::Reader> DataQueueImpl::get_reader() {
std::shared_ptr<DataQueue::Reader> DataQueueImpl::get_reader(Environment* env) {
if (is_idempotent()) {
return std::make_shared<IdempotentDataQueueReader>(shared_from_this());
}
Expand Down Expand Up @@ -573,7 +573,7 @@ class EmptyEntry final : public EntryImpl {
EmptyEntry& operator=(const EmptyEntry&) = delete;
EmptyEntry& operator=(EmptyEntry&&) = delete;

std::shared_ptr<DataQueue::Reader> get_reader() override {
std::shared_ptr<DataQueue::Reader> get_reader(Environment* env) override {
return std::make_shared<EmptyReader>();
}

Expand Down Expand Up @@ -661,7 +661,7 @@ class InMemoryEntry final : public EntryImpl {
InMemoryEntry& operator=(const InMemoryEntry&) = delete;
InMemoryEntry& operator=(InMemoryEntry&&) = delete;

std::shared_ptr<DataQueue::Reader> get_reader() override {
std::shared_ptr<DataQueue::Reader> get_reader(Environment* env) override {
return std::make_shared<InMemoryReader>(*this);
}

Expand Down Expand Up @@ -732,8 +732,8 @@ class DataQueueEntry : public EntryImpl {
DataQueueEntry& operator=(const DataQueueEntry&) = delete;
DataQueueEntry& operator=(DataQueueEntry&&) = delete;

std::shared_ptr<DataQueue::Reader> get_reader() override {
return std::make_shared<ReaderImpl>(data_queue_->get_reader());
std::shared_ptr<DataQueue::Reader> get_reader(Environment* env) override {
return std::make_shared<ReaderImpl>(data_queue_->get_reader(env));
}

std::unique_ptr<Entry> slice(
Expand Down Expand Up @@ -844,8 +844,8 @@ class FdEntry final : public EntryImpl {
CHECK_LE(start, end);
}

std::shared_ptr<DataQueue::Reader> get_reader() override {
return ReaderImpl::Create(this);
std::shared_ptr<DataQueue::Reader> get_reader(Environment* env) override {
return ReaderImpl::Create(this, env);
}

std::unique_ptr<Entry> slice(
Expand Down Expand Up @@ -901,7 +901,8 @@ class FdEntry final : public EntryImpl {
public StreamListener,
public std::enable_shared_from_this<ReaderImpl> {
public:
static std::shared_ptr<ReaderImpl> Create(FdEntry* entry) {
static std::shared_ptr<ReaderImpl> Create(FdEntry* entry,
Environment* env) {
uv_fs_t req;
auto cleanup = OnScopeLeave([&] { uv_fs_req_cleanup(&req); });
int file =
Expand All @@ -910,7 +911,8 @@ class FdEntry final : public EntryImpl {
uv_fs_close(nullptr, &req, file, nullptr);
return nullptr;
}
Realm* realm = entry->env()->principal_realm();
Environment* reader_env = env ? env : entry->env();
Realm* realm = reader_env->principal_realm();
return std::make_shared<ReaderImpl>(
BaseObjectPtr<fs::FileHandle>(
fs::FileHandle::New(realm->GetBindingData<fs::BindingData>(),
Expand Down
6 changes: 5 additions & 1 deletion src/dataqueue/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ class DataQueue : public MemoryRetainer {
// idempotent and cannot preserve that quality, subsequent reads
// must fail with an error when a variance is detected.
virtual bool is_idempotent() const = 0;

// Create a reader for this entry. If `env` is provided, it may be used
// when the entry needs to create native resources in the current realm.
virtual std::shared_ptr<Reader> get_reader(Environment* env = nullptr) = 0;
};

// Creates an idempotent DataQueue with a pre-established collection
Expand Down Expand Up @@ -228,7 +232,7 @@ class DataQueue : public MemoryRetainer {
// any number of readers can be created, all of which are guaranteed
// to provide the same data. Otherwise, only a single reader is
// permitted.
virtual std::shared_ptr<Reader> get_reader() = 0;
virtual std::shared_ptr<Reader> get_reader(Environment* env = nullptr) = 0;

// Append a single new entry to the queue. Appending is only allowed
// when is_idempotent() is false. std::nullopt will be returned
Expand Down
95 changes: 69 additions & 26 deletions src/node_blob.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
#include "v8.h"

#include <algorithm>
#include <cstdlib>
#include <mutex>
#include <optional>
#include <unordered_map>

namespace node {

Expand Down Expand Up @@ -103,6 +107,48 @@ void Concat(const FunctionCallbackInfo<Value>& args) {
args.GetReturnValue().Set(ArrayBuffer::New(isolate, std::move(store)));
}

// A single `blob:` URL registration. Entries are stored in the process-wide
// registry below so that a URL created in one Environment (e.g. the main
// thread) can be resolved from another (e.g. a worker).
struct BlobURLEntry {
  // Backing data for the blob; shared so the entry can outlive the Blob
  // wrapper object that registered it.
  std::shared_ptr<DataQueue> data_queue;
  // Length (in bytes) reported when the URL was stored.
  size_t length;
  // MIME type string supplied at registration time (may be empty).
  std::string type;
};

// Process-wide registry of `blob:` URL entries, keyed by uuid. Guarded by
// blob_url_registry_mutex because entries are stored/resolved/revoked from
// different worker threads.
// TODO(review): consider ExclusiveAccess from node_mutex.h so the map and
// its mutex are bound together in the type system.
static std::mutex blob_url_registry_mutex;
static std::unordered_map<std::string, BlobURLEntry> blob_url_registry;
Comment on lines +116 to +117
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be a good use case for ExclusiveAccess from node_mutex.h


static void RevokeBlobURLEntry(const std::string& uuid);

// Environment cleanup hook registered by StoreBlobURLEntry: revokes the
// blob URL entry for the given uuid and frees the heap-allocated uuid copy
// that was smuggled through `arg`.
static void BlobURLCleanupHook(void* arg) {
  // Take ownership immediately (RAII) so the string is freed on every path,
  // rather than relying on a trailing manual `delete`.
  std::unique_ptr<std::string> uuid(static_cast<std::string*>(arg));
  RevokeBlobURLEntry(*uuid);
}

// Registers (or overwrites) a blob URL entry under `uuid` and arranges for
// it to be revoked when `env` is torn down.
static void StoreBlobURLEntry(const std::string& uuid,
                              std::shared_ptr<DataQueue> data_queue,
                              size_t length,
                              std::string type,
                              Environment* env) {
  // Keep the critical section narrow: the mutex protects only the registry,
  // so release it before calling back into the Environment below.
  {
    std::lock_guard<std::mutex> lock(blob_url_registry_mutex);
    blob_url_registry[uuid] =
        BlobURLEntry{std::move(data_queue), length, std::move(type)};
  }

  // The cleanup hook takes ownership of the heap-allocated uuid copy and
  // erases the registry entry when `env` exits.
  // NOTE(review): this ties the URL's lifetime to the creating Environment;
  // if that env is a worker that exits (possibly via an uncaught exception),
  // other threads can no longer resolve the URL — confirm this is the
  // intended semantics.
  auto uuid_copy = std::make_unique<std::string>(uuid);
  env->AddCleanupHook(BlobURLCleanupHook, uuid_copy.release());
}

// Looks up the entry registered under `uuid`. Returns std::nullopt when no
// such URL has been stored (or it has already been revoked). The entry is
// returned by value so the caller never holds a reference into the map
// after the lock is released.
static std::optional<BlobURLEntry> GetBlobURLEntry(const std::string& uuid) {
  std::lock_guard<std::mutex> lock(blob_url_registry_mutex);
  if (auto found = blob_url_registry.find(uuid);
      found != blob_url_registry.end()) {
    return found->second;
  }
  return std::nullopt;
}

// Removes the entry registered under `uuid`, if any. Erasing an absent key
// is a harmless no-op, so revoking twice (or revoking an unknown URL) is
// safe.
static void RevokeBlobURLEntry(const std::string& uuid) {
  const std::scoped_lock lock(blob_url_registry_mutex);
  blob_url_registry.erase(uuid);
}

void BlobFromFilePath(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
BufferValue path(env->isolate(), args[0]);
Expand Down Expand Up @@ -302,7 +348,7 @@ Blob::Reader::Reader(Environment* env,
Local<Object> obj,
BaseObjectPtr<Blob> strong_ptr)
: AsyncWrap(env, obj, AsyncWrap::PROVIDER_BLOBREADER),
inner_(strong_ptr->data_queue_->get_reader()),
inner_(strong_ptr->data_queue_->get_reader(env)),
strong_ptr_(std::move(strong_ptr)) {
MakeWeak();
}
Expand Down Expand Up @@ -453,7 +499,6 @@ void Blob::StoreDataObject(const FunctionCallbackInfo<Value>& args) {
CHECK(args[2]->IsUint32()); // Length
CHECK(args[3]->IsString()); // Type

BlobBindingData* binding_data = realm->GetBindingData<BlobBindingData>();
Isolate* isolate = realm->isolate();

Utf8Value key(isolate, args[0]);
Expand All @@ -463,10 +508,8 @@ void Blob::StoreDataObject(const FunctionCallbackInfo<Value>& args) {
size_t length = args[2].As<Uint32>()->Value();
Utf8Value type(isolate, args[3]);

binding_data->store_data_object(
key.ToString(),
BlobBindingData::StoredDataObject(
BaseObjectPtr<Blob>(blob), length, type.ToString()));
StoreBlobURLEntry(key.ToString(), blob->data_queue_, length,
type.ToString(), realm->env());
}

// Note: applying the V8 Fast API to the following function does not produce
Expand All @@ -475,7 +518,6 @@ void Blob::RevokeObjectURL(const FunctionCallbackInfo<Value>& args) {
CHECK_GE(args.Length(), 1);
CHECK(args[0]->IsString());
Realm* realm = Realm::GetCurrent(args);
BlobBindingData* binding_data = realm->GetBindingData<BlobBindingData>();
Isolate* isolate = realm->isolate();

Utf8Value input(isolate, args[0].As<String>());
Expand All @@ -492,37 +534,38 @@ void Blob::RevokeObjectURL(const FunctionCallbackInfo<Value>& args) {
auto end_index = pathname.find(':', start_index + 1);
if (end_index == std::string_view::npos) {
auto id = std::string(pathname.substr(start_index + 1));
binding_data->revoke_data_object(id);
RevokeBlobURLEntry(id);
}
}
}

void Blob::GetDataObject(const FunctionCallbackInfo<Value>& args) {
CHECK(args[0]->IsString());
Realm* realm = Realm::GetCurrent(args);
BlobBindingData* binding_data = realm->GetBindingData<BlobBindingData>();
Isolate* isolate = realm->isolate();

Utf8Value key(isolate, args[0]);
std::optional<BlobURLEntry> stored = GetBlobURLEntry(key.ToString());
if (!stored.has_value()) return;

Environment* env = realm->env();
BaseObjectPtr<Blob> blob = Blob::Create(env, stored->data_queue);
if (!blob) return;

Local<Value> type;
if (!String::NewFromUtf8(isolate,
stored->type.c_str(),
NewStringType::kNormal,
static_cast<int>(stored->type.length()))
.ToLocal(&type)) {
return;
}

BlobBindingData::StoredDataObject stored =
binding_data->get_data_object(key.ToString());
if (stored.blob) {
Local<Value> type;
if (!String::NewFromUtf8(isolate,
stored.type.c_str(),
NewStringType::kNormal,
static_cast<int>(stored.type.length()))
.ToLocal(&type)) {
return;
}

Local<Value> values[] = {stored.blob->object(),
Uint32::NewFromUnsigned(isolate, stored.length),
type};
Local<Value> values[] = {blob->object(),
Uint32::NewFromUnsigned(isolate, stored->length),
type};

args.GetReturnValue().Set(Array::New(isolate, values, arraysize(values)));
}
args.GetReturnValue().Set(Array::New(isolate, values, arraysize(values)));
}

void BlobBindingData::StoredDataObject::MemoryInfo(
Expand Down
11 changes: 4 additions & 7 deletions test/parallel/test-blob-file-backed.js
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,9 @@ writeFileSync(testfile5, '');
})().then(common.mustCall());

(async () => {
// We currently do not allow File-backed blobs to be cloned or transferred
// across worker threads. This is largely because the underlying FdEntry
// is bound to the Environment/Realm under which it was created.
const blob = await openAsBlob(__filename);
assert.throws(() => structuredClone(blob), {
code: 'ERR_INVALID_STATE',
message: 'Invalid state: File-backed Blobs are not cloneable'
});
const clone = structuredClone(blob);
assert.strictEqual(clone.size, blob.size);
assert.strictEqual(clone.type, blob.type);
assert.strictEqual(await clone.text(), await blob.text());
})().then(common.mustCall());
30 changes: 30 additions & 0 deletions test/parallel/test-blob-url-worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
'use strict';

// Verifies that a blob: URL created on the main thread can be resolved and
// read from a worker thread.

const common = require('../common');
const assert = require('assert');
const { isMainThread, parentPort, Worker } = require('worker_threads');
const { Blob } = require('buffer');

if (isMainThread) {
  const blob = new Blob(['hello world']);
  const url = URL.createObjectURL(blob);

  const worker = new Worker(__filename);
  worker.once('message', common.mustCall((value) => {
    assert.deepStrictEqual(value, { size: 11, type: '', text: 'hello world' });
    // Deliberately no worker.terminate() here: terminate() forces exit code
    // 1 when it wins the race against the worker's natural exit, which would
    // break the exit-code assertion below. After replying, the worker has no
    // remaining listeners, so it drains and exits on its own with code 0.
  }));
  worker.once('error', common.mustNotCall());
  worker.once('exit', common.mustCall((code) => {
    assert.strictEqual(code, 0);
  }));

  worker.postMessage(url);
} else {
  const { resolveObjectURL } = require('buffer');
  parentPort.once('message', async (url) => {
    const blob = resolveObjectURL(url);
    const text = await blob.text();
    parentPort.postMessage({ size: blob.size, type: blob.type, text });
  });
}
Loading