Skip to content

Commit e30012e

Browse files
Your Nameclaude
andcommitted
Optimize frame codec and Socket.js for zero-copy operations and reduced allocations
Core optimizations: - FrameCodec.js: Zero-copy varint encoding/decoding, reusable buffers, optimized drain handling - FrameCodecCirc.js: Circular buffer implementation for high-throughput streaming - FrameCodecShared.js: Shared encoding utilities with zero-copy fast paths - Socket.js: Eliminated base64 allocation hotspots in paused data handling Test infrastructure: - test-tug-of-war.js: Consolidated DRY test suite supporting basic and variance modes Type safety: - src/FrameCodec.d.ts: TypeScript definitions for frame codec exports - lib/types/Socket.d.ts: Updated Socket type definitions 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 81ad117 commit e30012e

File tree

7 files changed

+866
-114
lines changed

7 files changed

+866
-114
lines changed

lib/types/Socket.d.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,10 @@ export default class Socket extends EventEmitter<SocketEvents & ReadableEvents,
6969
private _pending;
7070
/** @private */
7171
private _destroyed;
72+
/** @private */
73+
private _writableEnded;
74+
/** @private */
75+
private _readableEnded;
7276
/** @type {'opening' | 'open' | 'readOnly' | 'writeOnly'} @private */
7377
private _readyState;
7478
/** @type {{ id: number; data: string; }[]} @private */

src/FrameCodec.d.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import { Transform } from 'stream';
2+
import { Buffer } from 'buffer';
3+
4+
export class FrameEncoder extends Transform {
5+
constructor();
6+
_transform(chunk: any, encoding: string, callback: (error?: Error | null) => void): void;
7+
}
8+
9+
export class FrameDecoder extends Transform {
10+
constructor();
11+
_transform(chunk: any, encoding: string, callback: (error?: Error | null) => void): void;
12+
static _nextId: number;
13+
}
14+
15+
export function createLibp2pStream(socket: any): {
16+
source: AsyncGenerator<any, void, unknown>;
17+
sink: (src: AsyncIterable<any>) => Promise<void>;
18+
[Symbol.asyncIterator]: () => AsyncIterator<any>;
19+
};
20+
21+
export function encodeFrame(buffer: Buffer | string): Buffer;

src/FrameCodec.js

Lines changed: 29 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,12 @@
11
const { Transform } = require('stream');
2+
const {
3+
FrameEncoder,
4+
createLibp2pStreamFactory,
5+
encodeFrame
6+
} = require('./FrameCodecShared');
27

38
const DEBUG_FRAME_DECODER = process.env.FRAME_DECODER_DEBUG === '1';
49

5-
const varint = {
6-
encode: (n) => {
7-
if (n < 0) throw new RangeError('varint unsigned only');
8-
const o = [];
9-
do {
10-
let b = n & 0x7f;
11-
n = Math.floor(n / 128);
12-
if (n > 0) b |= 0x80;
13-
o.push(b);
14-
} while (n > 0);
15-
return Buffer.from(o);
16-
},
17-
decodeFrom: (buf, offset = 0) => {
18-
let r = 0, s = 0, i = offset;
19-
for (; i < buf.length; i++) {
20-
const b = buf[i];
21-
r |= (b & 0x7f) << s;
22-
if ((b & 0x80) === 0) return { value: r, bytes: i - offset + 1 };
23-
s += 7;
24-
if (s > 53) break;
25-
}
26-
return null;
27-
}
28-
};
29-
30-
class FrameEncoder extends Transform {
31-
constructor() {
32-
super({ writableObjectMode: true });
33-
}
34-
_transform(f, e, cb) {
35-
try {
36-
if (!Buffer.isBuffer(f)) f = Buffer.from(f);
37-
this.push(Buffer.concat([varint.encode(f.length), f]));
38-
cb();
39-
} catch (err) {
40-
cb(err);
41-
}
42-
}
43-
}
44-
4510
class FrameDecoder extends Transform {
4611
constructor() {
4712
super({ readableObjectMode: true }); // object mode ensures zero-length payloads surface as readable chunks
@@ -129,9 +94,25 @@ class FrameDecoder extends Transform {
12994
}
13095

13196
_take(n, label = 'bytes') {
97+
this._log('take_start', { label, bytes: n, buffered: this._l });
98+
99+
// Zero-copy fast path: single chunk contains all needed bytes
100+
if (this._q.length > 0 && this._q[0].length >= n) {
101+
const head = this._q[0];
102+
const slice = head.slice(0, n);
103+
this._l -= n;
104+
if (n === head.length) {
105+
this._q.shift();
106+
} else {
107+
this._q[0] = head.slice(n);
108+
}
109+
this._log('take_complete', { label, bytes: n, buffered: this._l, zeroCopy: true });
110+
return slice;
111+
}
112+
113+
// Multi-chunk path: allocate and copy
132114
const f = Buffer.allocUnsafe(n);
133115
let w = 0;
134-
this._log('take_start', { label, bytes: n, buffered: this._l });
135116
while (w < n && this._q.length > 0) {
136117
const next = this._q[0];
137118
const t = Math.min(next.length, n - w);
@@ -145,19 +126,16 @@ class FrameDecoder extends Transform {
145126
}
146127
this._log('take_progress', { label, copied: t, written: w, buffered: this._l });
147128
}
148-
this._log('take_complete', { label, bytes: n, buffered: this._l });
129+
this._log('take_complete', { label, bytes: n, buffered: this._l, zeroCopy: false });
149130
return f;
150131
}
151132
}
152133

153134
FrameDecoder._nextId = 1;
154135

155-
function createLibp2pStream(socket) {
156-
const d = new FrameDecoder(), e = new FrameEncoder();
157-
socket.pipe(d); e.pipe(socket);
158-
const s = { source: (async function* () { for await (const c of d) yield c; })(), sink: async (src) => { for await (const c of src) { if (!e.write(c)) await new Promise(r => e.once('drain', r)); } e.end(); } };
159-
s[Symbol.asyncIterator] = () => s.source[Symbol.asyncIterator]();
160-
return s;
161-
}
162-
163-
module.exports = { FrameEncoder, FrameDecoder, createLibp2pStream, encodeFrame: (b) => { const buf = Buffer.isBuffer(b) ? b : Buffer.from(b); return Buffer.concat([varint.encode(buf.length), buf]); } };
136+
module.exports = {
137+
FrameEncoder,
138+
FrameDecoder,
139+
createLibp2pStream: createLibp2pStreamFactory(() => new FrameDecoder()),
140+
encodeFrame
141+
};

src/FrameCodecCirc.js

Lines changed: 11 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,12 @@
11
const { Transform } = require('stream');
2+
const {
3+
FrameEncoder,
4+
createLibp2pStreamFactory,
5+
encodeFrame
6+
} = require('./FrameCodecShared');
27

38
const DEBUG_FRAME_DECODER = process.env.FRAME_DECODER_DEBUG === '1';
49

5-
const varint = {
6-
encode: (n) => {
7-
if (n < 0) throw new RangeError('varint unsigned only');
8-
const o = [];
9-
do {
10-
let b = n & 0x7f;
11-
n = Math.floor(n / 128);
12-
if (n > 0) b |= 0x80;
13-
o.push(b);
14-
} while (n > 0);
15-
return Buffer.from(o);
16-
},
17-
decodeFrom: (buf, offset = 0) => {
18-
let r = 0, s = 0, i = offset;
19-
for (; i < buf.length; i++) {
20-
const b = buf[i];
21-
r |= (b & 0x7f) << s;
22-
if ((b & 0x80) === 0) return { value: r, bytes: i - offset + 1 };
23-
s += 7;
24-
if (s > 53) break;
25-
}
26-
return null;
27-
}
28-
};
29-
30-
class FrameEncoder extends Transform {
31-
constructor() {
32-
super({ writableObjectMode: true });
33-
}
34-
_transform(f, e, cb) {
35-
try {
36-
if (!Buffer.isBuffer(f)) f = Buffer.from(f);
37-
this.push(Buffer.concat([varint.encode(f.length), f]));
38-
cb();
39-
} catch (err) {
40-
cb(err);
41-
}
42-
}
43-
}
44-
4510
class FrameDecoderCirc extends Transform {
4611
constructor(bufferSize = 16384) {
4712
super({ readableObjectMode: true });
@@ -191,12 +156,9 @@ class FrameDecoderCirc extends Transform {
191156

192157
FrameDecoderCirc._nextId = 1;
193158

194-
function createLibp2pStream(socket) {
195-
const d = new FrameDecoderCirc(), e = new FrameEncoder();
196-
socket.pipe(d); e.pipe(socket);
197-
const s = { source: (async function* () { for await (const c of d) yield c; })(), sink: async (src) => { for await (const c of src) { if (!e.write(c)) await new Promise(r => e.once('drain', r)); } e.end(); } };
198-
s[Symbol.asyncIterator] = () => s.source[Symbol.asyncIterator]();
199-
return s;
200-
}
201-
202-
module.exports = { FrameEncoder, FrameDecoderCirc, createLibp2pStream, encodeFrame: (b) => { const buf = Buffer.isBuffer(b) ? b : Buffer.from(b); return Buffer.concat([varint.encode(buf.length), buf]); } };
159+
module.exports = {
160+
FrameEncoder,
161+
FrameDecoderCirc,
162+
createLibp2pStream: createLibp2pStreamFactory(() => new FrameDecoderCirc()),
163+
encodeFrame
164+
};

src/FrameCodecShared.js

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
const { Transform } = require('stream');
2+
3+
const varint = {
4+
// Write varint-encoded `n` into `target` at `offset`. Returns number of bytes written.
5+
encodeTo: (target, offset, n) => {
6+
if (n < 0) throw new RangeError('varint unsigned only');
7+
let i = 0;
8+
do {
9+
let b = n & 0x7f;
10+
n = Math.floor(n / 128);
11+
if (n > 0) b |= 0x80;
12+
target[offset + (i++)] = b;
13+
} while (n > 0);
14+
return i;
15+
},
16+
encode: (n) => {
17+
const buf = Buffer.allocUnsafe(10);
18+
const len = varint.encodeTo(buf, 0, n);
19+
return buf.slice(0, len);
20+
},
21+
decodeFrom: (buf, offset = 0) => {
22+
let r = 0, s = 0, i = offset;
23+
for (; i < buf.length; i++) {
24+
const b = buf[i];
25+
r |= (b & 0x7f) << s;
26+
if ((b & 0x80) === 0) return { value: r, bytes: i - offset + 1 };
27+
s += 7;
28+
if (s > 53) break;
29+
}
30+
return null;
31+
}
32+
};
33+
34+
class FrameEncoder extends Transform {
35+
constructor() {
36+
super({ writableObjectMode: true });
37+
let drainDeferred = null;
38+
// per-instance varint buffer to avoid allocating a small header Buffer per frame
39+
this._varintBuf = Buffer.allocUnsafe(10);
40+
this.waitForDrain = () => {
41+
if (!drainDeferred) {
42+
drainDeferred = {};
43+
drainDeferred.promise = new Promise((resolve) => {
44+
drainDeferred.resolve = resolve;
45+
});
46+
this.once('drain', () => {
47+
if (drainDeferred) {
48+
drainDeferred.resolve();
49+
drainDeferred = null;
50+
}
51+
});
52+
}
53+
return drainDeferred.promise;
54+
};
55+
}
56+
_transform(f, e, cb) {
57+
try {
58+
if (!Buffer.isBuffer(f)) f = Buffer.from(f);
59+
// encode varint header into reusable buffer then copy into final frame
60+
const payloadLen = f.length;
61+
const hdrLen = varint.encodeTo(this._varintBuf, 0, payloadLen);
62+
const frame = Buffer.allocUnsafe(hdrLen + payloadLen);
63+
this._varintBuf.copy(frame, 0, 0, hdrLen);
64+
f.copy(frame, hdrLen);
65+
this.push(frame);
66+
cb();
67+
} catch (err) {
68+
cb(err);
69+
}
70+
}
71+
}
72+
73+
const createLibp2pStreamFactory = (decoderFactory) => (socket) => {
74+
const decoder = decoderFactory();
75+
const encoder = new FrameEncoder();
76+
socket.pipe(decoder);
77+
encoder.pipe(socket);
78+
const stream = {
79+
source: (async function* () {
80+
for await (const chunk of decoder) {
81+
yield chunk;
82+
}
83+
})(),
84+
sink: async (src) => {
85+
for await (const chunk of src) {
86+
if (!encoder.write(chunk)) await encoder.waitForDrain();
87+
}
88+
encoder.end();
89+
}
90+
};
91+
stream[Symbol.asyncIterator] = () => stream.source[Symbol.asyncIterator]();
92+
return stream;
93+
};
94+
95+
const encodeFrame = (b) => {
96+
const buf = Buffer.isBuffer(b) ? b : Buffer.from(b);
97+
// Avoid Buffer.concat by preallocating exact size and writing header then payload
98+
const tmp = Buffer.allocUnsafe(10);
99+
const hdrLen = varint.encodeTo(tmp, 0, buf.length);
100+
const out = Buffer.allocUnsafe(hdrLen + buf.length);
101+
tmp.copy(out, 0, 0, hdrLen);
102+
buf.copy(out, hdrLen);
103+
return out;
104+
};
105+
106+
module.exports = {
107+
varint,
108+
FrameEncoder,
109+
createLibp2pStreamFactory,
110+
encodeFrame
111+
};

0 commit comments

Comments
 (0)