Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions lib/types/Socket.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ export default class Socket extends EventEmitter<SocketEvents & ReadableEvents,
private _pending;
/** @private */
private _destroyed;
/** @private */
private _writableEnded;
/** @private */
private _readableEnded;
/** @type {'opening' | 'open' | 'readOnly' | 'writeOnly'} @private */
private _readyState;
/** @type {{ id: number; data: string; }[]} @private */
Expand Down
21 changes: 21 additions & 0 deletions src/FrameCodec.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { Transform } from 'stream';
import { Buffer } from 'buffer';

export class FrameEncoder extends Transform {
constructor();
_transform(chunk: any, encoding: string, callback: (error?: Error | null) => void): void;
}

export class FrameDecoder extends Transform {
constructor();
_transform(chunk: any, encoding: string, callback: (error?: Error | null) => void): void;
static _nextId: number;
}

export function createLibp2pStream(socket: any): {
source: AsyncGenerator<any, void, unknown>;
sink: (src: AsyncIterable<any>) => Promise<void>;
[Symbol.asyncIterator]: () => AsyncIterator<any>;
};

export function encodeFrame(buffer: Buffer | string): Buffer;
141 changes: 141 additions & 0 deletions src/FrameCodec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
const { Transform } = require('stream');
const {
FrameEncoder,
createLibp2pStreamFactory,
encodeFrame
} = require('./FrameCodecShared');

const DEBUG_FRAME_DECODER = process.env.FRAME_DECODER_DEBUG === '1';

class FrameDecoder extends Transform {
constructor() {
super({ readableObjectMode: true }); // object mode ensures zero-length payloads surface as readable chunks
this._q = [];
this._l = 0;
this._e = null;
this._vlen = 0;
this._id = FrameDecoder._nextId++;
}

_log(event, details) {
if (!DEBUG_FRAME_DECODER) return;
const prefix = `FrameDecoder#${this._id}`;
if (details) {
console.log(`${prefix} ${event}`, details);
} else {
console.log(`${prefix} ${event}`);
}
}

_transform(chunk, encoding, cb) {
try {
if (!Buffer.isBuffer(chunk)) chunk = Buffer.from(chunk);
this._q.push(chunk);
this._l += chunk.length;
this._log('chunk', { chunkLength: chunk.length, buffered: this._l });

while (this._l > 0) {
this._log('loop', { buffered: this._l, expectedPayload: this._e, varintBytes: this._vlen });
if (this._e === null) {
const decoded = this._dv();
if (!decoded) {
this._log('await_varint', { buffered: this._l });
break;
}
this._e = decoded.value;
this._vlen = decoded.bytes;
this._log('varint_ready', { payloadLength: this._e, headerBytes: this._vlen, buffered: this._l });
}

if (this._e !== null) {
const need = this._vlen + this._e;
if (this._l < need) {
this._log('await_payload', { need, buffered: this._l });
break;
}
this._take(this._vlen, 'varint');
const payload = this._take(this._e, 'payload');
this._log('frame_emitted', { payloadLength: this._e, buffered: this._l });
this._e = null;
this._vlen = 0;
this.push(payload);
}
}

cb();
} catch (err) {
cb(err);
}
}

_dv() {
let r = 0, s = 0, p = 0, i = 0, o = 0;
while (p < this._l) {
if (i >= this._q.length) break;
const buf = this._q[i];
if (o >= buf.length) {
i++;
o = 0;
continue;
}
const v = buf[o++];
r |= (v & 0x7f) << s;
p++;
this._log('varint_byte', { byte: v, shift: s, partialValue: r, bytesRead: p });
if ((v & 0x80) === 0) {
this._log('varint_complete', { value: r, bytes: p });
return { value: r, bytes: p };
}
s += 7;
if (s > 53) break;
}
this._log('varint_incomplete', { bytesScanned: p, buffered: this._l });
return null;
}

_take(n, label = 'bytes') {
this._log('take_start', { label, bytes: n, buffered: this._l });

// Zero-copy fast path: single chunk contains all needed bytes
if (this._q.length > 0 && this._q[0].length >= n) {
const head = this._q[0];
const slice = head.slice(0, n);
this._l -= n;
if (n === head.length) {
this._q.shift();
} else {
this._q[0] = head.slice(n);
}
this._log('take_complete', { label, bytes: n, buffered: this._l, zeroCopy: true });
return slice;
}

// Multi-chunk path: allocate and copy
const f = Buffer.allocUnsafe(n);
let w = 0;
while (w < n && this._q.length > 0) {
const next = this._q[0];
const t = Math.min(next.length, n - w);
next.copy(f, w, 0, t);
w += t;
this._l -= t;
if (t === next.length) {
this._q.shift();
} else {
this._q[0] = next.slice(t);
}
this._log('take_progress', { label, copied: t, written: w, buffered: this._l });
}
this._log('take_complete', { label, bytes: n, buffered: this._l, zeroCopy: false });
return f;
}
}

FrameDecoder._nextId = 1;

module.exports = {
FrameEncoder,
FrameDecoder,
createLibp2pStream: createLibp2pStreamFactory(() => new FrameDecoder()),
encodeFrame
};
164 changes: 164 additions & 0 deletions src/FrameCodecCirc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
const { Transform } = require('stream');
const {
FrameEncoder,
createLibp2pStreamFactory,
encodeFrame
} = require('./FrameCodecShared');

const DEBUG_FRAME_DECODER = process.env.FRAME_DECODER_DEBUG === '1';

class FrameDecoderCirc extends Transform {
constructor(bufferSize = 16384) {
super({ readableObjectMode: true });
this._buf = Buffer.allocUnsafe(bufferSize);
this._head = 0;
this._tail = 0;
this._size = bufferSize;
this._e = null;
this._vlen = 0;
this._id = FrameDecoderCirc._nextId++;
}

_log(event, details) {
if (!DEBUG_FRAME_DECODER) return;
const prefix = `FrameDecoderCirc#${this._id}`;
if (details) {
console.log(`${prefix} ${event}`, details);
} else {
console.log(`${prefix} ${event}`);
}
}

_available() {
return (this._tail - this._head + this._size) % this._size;
}

_transform(chunk, encoding, cb) {
try {
if (!Buffer.isBuffer(chunk)) chunk = Buffer.from(chunk);

const avail = this._available();
const needed = chunk.length;

if (needed > this._size - avail - 1) {
const newSize = Math.max(this._size * 2, this._size + needed);
const newBuf = Buffer.allocUnsafe(newSize);
const used = avail;

if (this._head <= this._tail) {
this._buf.copy(newBuf, 0, this._head, this._tail);
} else {
const firstPart = this._size - this._head;
this._buf.copy(newBuf, 0, this._head, this._size);
this._buf.copy(newBuf, firstPart, 0, this._tail);
}

this._buf = newBuf;
this._head = 0;
this._tail = used;
this._size = newSize;
}

let written = 0;
while (written < chunk.length) {
const contiguous = this._tail < this._head
? this._head - this._tail - 1
: this._size - this._tail - (this._head === 0 ? 1 : 0);
const toWrite = Math.min(chunk.length - written, contiguous);
chunk.copy(this._buf, this._tail, written, written + toWrite);
this._tail = (this._tail + toWrite) % this._size;
written += toWrite;
}

this._log('chunk', { chunkLength: chunk.length, buffered: this._available() });

while (this._available() > 0) {
this._log('loop', { buffered: this._available(), expectedPayload: this._e, varintBytes: this._vlen });

if (this._e === null) {
const decoded = this._dv();
if (!decoded) {
this._log('await_varint', { buffered: this._available() });
break;
}
this._e = decoded.value;
this._vlen = decoded.bytes;
this._log('varint_ready', { payloadLength: this._e, headerBytes: this._vlen, buffered: this._available() });
}

if (this._e !== null) {
const need = this._vlen + this._e;
if (this._available() < need) {
this._log('await_payload', { need, buffered: this._available() });
break;
}
this._consume(this._vlen);
const payload = this._take(this._e);
this._log('frame_emitted', { payloadLength: this._e, buffered: this._available() });
this._e = null;
this._vlen = 0;
this.push(payload);
}
}

cb();
} catch (err) {
cb(err);
}
}

_dv() {
let r = 0, s = 0, p = 0;
const avail = this._available();
let pos = this._head;

while (p < avail) {
const v = this._buf[pos];
pos = (pos + 1) % this._size;
r |= (v & 0x7f) << s;
p++;
this._log('varint_byte', { byte: v, shift: s, partialValue: r, bytesRead: p });
if ((v & 0x80) === 0) {
this._log('varint_complete', { value: r, bytes: p });
return { value: r, bytes: p };
}
s += 7;
if (s > 53) break;
}
this._log('varint_incomplete', { bytesScanned: p, buffered: avail });
return null;
}

_consume(n) {
this._head = (this._head + n) % this._size;
}

_take(n) {
const f = Buffer.allocUnsafe(n);
let w = 0;
this._log('take_start', { bytes: n, buffered: this._available() });

while (w < n) {
const contiguous = this._head < this._tail
? this._tail - this._head
: this._size - this._head;
const toCopy = Math.min(n - w, contiguous);
this._buf.copy(f, w, this._head, this._head + toCopy);
this._head = (this._head + toCopy) % this._size;
w += toCopy;
this._log('take_progress', { copied: toCopy, written: w, buffered: this._available() });
}

this._log('take_complete', { bytes: n, buffered: this._available() });
return f;
}
}

FrameDecoderCirc._nextId = 1;

module.exports = {
FrameEncoder,
FrameDecoderCirc,
createLibp2pStream: createLibp2pStreamFactory(() => new FrameDecoderCirc()),
encodeFrame
};
Loading