Skip to content

Commit f94598d

Browse files
Your Nameclaude
andcommitted
Optimize frame codec and Socket.js for zero-copy operations and reduced allocations
Core optimizations: - FrameCodec.js: Zero-copy varint encoding/decoding, reusable buffers, optimized drain handling - FrameCodecCirc.js: Circular buffer implementation for high-throughput streaming - FrameCodecShared.js: Shared encoding utilities with zero-copy fast paths - Socket.js: Eliminated base64 allocation hotspots in paused data handling Test infrastructure: - test-tug-of-war.js: Consolidated DRY test suite supporting basic and variance modes Type safety: - src/FrameCodec.d.ts: TypeScript definitions for frame codec exports - lib/types/Socket.d.ts: Updated Socket type definitions 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 4c6d55e commit f94598d

File tree

7 files changed

+1153
-14
lines changed

7 files changed

+1153
-14
lines changed

lib/types/Socket.d.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,10 @@ export default class Socket extends EventEmitter<SocketEvents & ReadableEvents,
6969
private _pending;
7070
/** @private */
7171
private _destroyed;
72+
/** @private */
73+
private _writableEnded;
74+
/** @private */
75+
private _readableEnded;
7276
/** @type {'opening' | 'open' | 'readOnly' | 'writeOnly'} @private */
7377
private _readyState;
7478
/** @type {{ id: number; data: string; }[]} @private */

src/FrameCodec.d.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import { Transform } from 'stream';
2+
import { Buffer } from 'buffer';
3+
4+
export class FrameEncoder extends Transform {
5+
constructor();
6+
_transform(chunk: any, encoding: string, callback: (error?: Error | null) => void): void;
7+
}
8+
9+
export class FrameDecoder extends Transform {
10+
constructor();
11+
_transform(chunk: any, encoding: string, callback: (error?: Error | null) => void): void;
12+
static _nextId: number;
13+
}
14+
15+
export function createLibp2pStream(socket: any): {
16+
source: AsyncGenerator<any, void, unknown>;
17+
sink: (src: AsyncIterable<any>) => Promise<void>;
18+
[Symbol.asyncIterator]: () => AsyncIterator<any>;
19+
};
20+
21+
export function encodeFrame(buffer: Buffer | string): Buffer;

src/FrameCodec.js

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
const { Transform } = require('stream');
2+
const {
3+
FrameEncoder,
4+
createLibp2pStreamFactory,
5+
encodeFrame
6+
} = require('./FrameCodecShared');
7+
8+
const DEBUG_FRAME_DECODER = process.env.FRAME_DECODER_DEBUG === '1';
9+
10+
class FrameDecoder extends Transform {
11+
constructor() {
12+
super({ readableObjectMode: true }); // object mode ensures zero-length payloads surface as readable chunks
13+
this._q = [];
14+
this._l = 0;
15+
this._e = null;
16+
this._vlen = 0;
17+
this._id = FrameDecoder._nextId++;
18+
}
19+
20+
_log(event, details) {
21+
if (!DEBUG_FRAME_DECODER) return;
22+
const prefix = `FrameDecoder#${this._id}`;
23+
if (details) {
24+
console.log(`${prefix} ${event}`, details);
25+
} else {
26+
console.log(`${prefix} ${event}`);
27+
}
28+
}
29+
30+
_transform(chunk, encoding, cb) {
31+
try {
32+
if (!Buffer.isBuffer(chunk)) chunk = Buffer.from(chunk);
33+
this._q.push(chunk);
34+
this._l += chunk.length;
35+
this._log('chunk', { chunkLength: chunk.length, buffered: this._l });
36+
37+
while (this._l > 0) {
38+
this._log('loop', { buffered: this._l, expectedPayload: this._e, varintBytes: this._vlen });
39+
if (this._e === null) {
40+
const decoded = this._dv();
41+
if (!decoded) {
42+
this._log('await_varint', { buffered: this._l });
43+
break;
44+
}
45+
this._e = decoded.value;
46+
this._vlen = decoded.bytes;
47+
this._log('varint_ready', { payloadLength: this._e, headerBytes: this._vlen, buffered: this._l });
48+
}
49+
50+
if (this._e !== null) {
51+
const need = this._vlen + this._e;
52+
if (this._l < need) {
53+
this._log('await_payload', { need, buffered: this._l });
54+
break;
55+
}
56+
this._take(this._vlen, 'varint');
57+
const payload = this._take(this._e, 'payload');
58+
this._log('frame_emitted', { payloadLength: this._e, buffered: this._l });
59+
this._e = null;
60+
this._vlen = 0;
61+
this.push(payload);
62+
}
63+
}
64+
65+
cb();
66+
} catch (err) {
67+
cb(err);
68+
}
69+
}
70+
71+
_dv() {
72+
let r = 0, s = 0, p = 0, i = 0, o = 0;
73+
while (p < this._l) {
74+
if (i >= this._q.length) break;
75+
const buf = this._q[i];
76+
if (o >= buf.length) {
77+
i++;
78+
o = 0;
79+
continue;
80+
}
81+
const v = buf[o++];
82+
r |= (v & 0x7f) << s;
83+
p++;
84+
this._log('varint_byte', { byte: v, shift: s, partialValue: r, bytesRead: p });
85+
if ((v & 0x80) === 0) {
86+
this._log('varint_complete', { value: r, bytes: p });
87+
return { value: r, bytes: p };
88+
}
89+
s += 7;
90+
if (s > 53) break;
91+
}
92+
this._log('varint_incomplete', { bytesScanned: p, buffered: this._l });
93+
return null;
94+
}
95+
96+
_take(n, label = 'bytes') {
97+
this._log('take_start', { label, bytes: n, buffered: this._l });
98+
99+
// Zero-copy fast path: single chunk contains all needed bytes
100+
if (this._q.length > 0 && this._q[0].length >= n) {
101+
const head = this._q[0];
102+
const slice = head.slice(0, n);
103+
this._l -= n;
104+
if (n === head.length) {
105+
this._q.shift();
106+
} else {
107+
this._q[0] = head.slice(n);
108+
}
109+
this._log('take_complete', { label, bytes: n, buffered: this._l, zeroCopy: true });
110+
return slice;
111+
}
112+
113+
// Multi-chunk path: allocate and copy
114+
const f = Buffer.allocUnsafe(n);
115+
let w = 0;
116+
while (w < n && this._q.length > 0) {
117+
const next = this._q[0];
118+
const t = Math.min(next.length, n - w);
119+
next.copy(f, w, 0, t);
120+
w += t;
121+
this._l -= t;
122+
if (t === next.length) {
123+
this._q.shift();
124+
} else {
125+
this._q[0] = next.slice(t);
126+
}
127+
this._log('take_progress', { label, copied: t, written: w, buffered: this._l });
128+
}
129+
this._log('take_complete', { label, bytes: n, buffered: this._l, zeroCopy: false });
130+
return f;
131+
}
132+
}
133+
134+
FrameDecoder._nextId = 1;
135+
136+
module.exports = {
137+
FrameEncoder,
138+
FrameDecoder,
139+
createLibp2pStream: createLibp2pStreamFactory(() => new FrameDecoder()),
140+
encodeFrame
141+
};

src/FrameCodecCirc.js

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
const { Transform } = require('stream');
2+
const {
3+
FrameEncoder,
4+
createLibp2pStreamFactory,
5+
encodeFrame
6+
} = require('./FrameCodecShared');
7+
8+
const DEBUG_FRAME_DECODER = process.env.FRAME_DECODER_DEBUG === '1';
9+
10+
class FrameDecoderCirc extends Transform {
11+
constructor(bufferSize = 16384) {
12+
super({ readableObjectMode: true });
13+
this._buf = Buffer.allocUnsafe(bufferSize);
14+
this._head = 0;
15+
this._tail = 0;
16+
this._size = bufferSize;
17+
this._e = null;
18+
this._vlen = 0;
19+
this._id = FrameDecoderCirc._nextId++;
20+
}
21+
22+
_log(event, details) {
23+
if (!DEBUG_FRAME_DECODER) return;
24+
const prefix = `FrameDecoderCirc#${this._id}`;
25+
if (details) {
26+
console.log(`${prefix} ${event}`, details);
27+
} else {
28+
console.log(`${prefix} ${event}`);
29+
}
30+
}
31+
32+
_available() {
33+
return (this._tail - this._head + this._size) % this._size;
34+
}
35+
36+
_transform(chunk, encoding, cb) {
37+
try {
38+
if (!Buffer.isBuffer(chunk)) chunk = Buffer.from(chunk);
39+
40+
const avail = this._available();
41+
const needed = chunk.length;
42+
43+
if (needed > this._size - avail - 1) {
44+
const newSize = Math.max(this._size * 2, this._size + needed);
45+
const newBuf = Buffer.allocUnsafe(newSize);
46+
const used = avail;
47+
48+
if (this._head <= this._tail) {
49+
this._buf.copy(newBuf, 0, this._head, this._tail);
50+
} else {
51+
const firstPart = this._size - this._head;
52+
this._buf.copy(newBuf, 0, this._head, this._size);
53+
this._buf.copy(newBuf, firstPart, 0, this._tail);
54+
}
55+
56+
this._buf = newBuf;
57+
this._head = 0;
58+
this._tail = used;
59+
this._size = newSize;
60+
}
61+
62+
let written = 0;
63+
while (written < chunk.length) {
64+
const contiguous = this._tail < this._head
65+
? this._head - this._tail - 1
66+
: this._size - this._tail - (this._head === 0 ? 1 : 0);
67+
const toWrite = Math.min(chunk.length - written, contiguous);
68+
chunk.copy(this._buf, this._tail, written, written + toWrite);
69+
this._tail = (this._tail + toWrite) % this._size;
70+
written += toWrite;
71+
}
72+
73+
this._log('chunk', { chunkLength: chunk.length, buffered: this._available() });
74+
75+
while (this._available() > 0) {
76+
this._log('loop', { buffered: this._available(), expectedPayload: this._e, varintBytes: this._vlen });
77+
78+
if (this._e === null) {
79+
const decoded = this._dv();
80+
if (!decoded) {
81+
this._log('await_varint', { buffered: this._available() });
82+
break;
83+
}
84+
this._e = decoded.value;
85+
this._vlen = decoded.bytes;
86+
this._log('varint_ready', { payloadLength: this._e, headerBytes: this._vlen, buffered: this._available() });
87+
}
88+
89+
if (this._e !== null) {
90+
const need = this._vlen + this._e;
91+
if (this._available() < need) {
92+
this._log('await_payload', { need, buffered: this._available() });
93+
break;
94+
}
95+
this._consume(this._vlen);
96+
const payload = this._take(this._e);
97+
this._log('frame_emitted', { payloadLength: this._e, buffered: this._available() });
98+
this._e = null;
99+
this._vlen = 0;
100+
this.push(payload);
101+
}
102+
}
103+
104+
cb();
105+
} catch (err) {
106+
cb(err);
107+
}
108+
}
109+
110+
_dv() {
111+
let r = 0, s = 0, p = 0;
112+
const avail = this._available();
113+
let pos = this._head;
114+
115+
while (p < avail) {
116+
const v = this._buf[pos];
117+
pos = (pos + 1) % this._size;
118+
r |= (v & 0x7f) << s;
119+
p++;
120+
this._log('varint_byte', { byte: v, shift: s, partialValue: r, bytesRead: p });
121+
if ((v & 0x80) === 0) {
122+
this._log('varint_complete', { value: r, bytes: p });
123+
return { value: r, bytes: p };
124+
}
125+
s += 7;
126+
if (s > 53) break;
127+
}
128+
this._log('varint_incomplete', { bytesScanned: p, buffered: avail });
129+
return null;
130+
}
131+
132+
_consume(n) {
133+
this._head = (this._head + n) % this._size;
134+
}
135+
136+
_take(n) {
137+
const f = Buffer.allocUnsafe(n);
138+
let w = 0;
139+
this._log('take_start', { bytes: n, buffered: this._available() });
140+
141+
while (w < n) {
142+
const contiguous = this._head < this._tail
143+
? this._tail - this._head
144+
: this._size - this._head;
145+
const toCopy = Math.min(n - w, contiguous);
146+
this._buf.copy(f, w, this._head, this._head + toCopy);
147+
this._head = (this._head + toCopy) % this._size;
148+
w += toCopy;
149+
this._log('take_progress', { copied: toCopy, written: w, buffered: this._available() });
150+
}
151+
152+
this._log('take_complete', { bytes: n, buffered: this._available() });
153+
return f;
154+
}
155+
}
156+
157+
FrameDecoderCirc._nextId = 1;
158+
159+
module.exports = {
160+
FrameEncoder,
161+
FrameDecoderCirc,
162+
createLibp2pStream: createLibp2pStreamFactory(() => new FrameDecoderCirc()),
163+
encodeFrame
164+
};

0 commit comments

Comments
 (0)