-
Notifications
You must be signed in to change notification settings - Fork 325
Description
I was looking at Postgres network traffic and usually every query lifecycle is followed by a CommandComplete response which marks the end of the query.
Weirdly enough the first time I use Drizzle which internally uses this library it runs this query below which never receives its CommandComplete response from the Postgres server.
Is there any reason for that?
Are we doing something freaky with this particular feature that is causing Postgres to step outside of its protocol?
SELECT b.oid, b.typarray
FROM pg_catalog.pg_type a
LEFT JOIN pg_catalog.pg_type b
ON b.oid = a.typelem
WHERE a.typcategory = 'A'
GROUP BY b.oid, b.typarray
ORDER BY b.oid;
I have a small script to patch and debug the library to replicate the issue for anyone who wants to see. Just run this code and then see the output, you will see there are two queries made - the one I explicitly run, the other one that the library runs to fetch types. The one I explicitly run gets its CommandComplete but the fetch types query that the library runs does not get its CommandComplete.
Note - Don't forget to replace the DATABASE, USERNAME & PASSWORD
const postgres = require("postgres");
const net = require("net");
// Store original methods
const originalSocketOn = net.Socket.prototype.on;
const originalSocketWrite = net.Socket.prototype.write;
// Message type codes in PostgreSQL protocol
const MESSAGE_TYPES = {
0x43: "CommandComplete",
0x44: "DataRow",
0x45: "ErrorResponse",
0x49: "EmptyQueryResponse",
0x4e: "NoticeResponse",
0x52: "Authentication",
0x53: "ParameterStatus",
0x54: "RowDescription",
0x5a: "ReadyForQuery",
0x31: "ParseComplete",
0x32: "BindComplete",
0x6e: "NoData",
0x73: "PortalSuspended",
0x74: "ParameterDescription",
};
// Track queries to correlate with responses
let queryCounter = 0;
let activeQueries = new Map();
// Patch Socket.write to intercept outgoing queries
net.Socket.prototype.write = function (data, encoding, callback) {
if (Buffer.isBuffer(data) && data.length > 0) {
console.log("\n" + "▼".repeat(60));
console.log("[SENDING] Data to PostgreSQL");
console.log("Length:", data.length, "bytes");
// Parse outgoing messages
let offset = 0;
while (offset < data.length) {
const msgType = data[offset];
const msgTypeChar = String.fromCharCode(msgType);
// Common outgoing message types
const outgoingTypes = {
Q: "Query (Simple)",
P: "Parse",
B: "Bind",
E: "Execute",
D: "Describe",
C: "Close",
S: "Sync",
X: "Terminate",
};
console.log(
`\n[OUTGOING] Type: ${
outgoingTypes[msgTypeChar] || "Unknown"
} ('${msgTypeChar}' = 0x${msgType.toString(16)})`
);
if (offset + 4 <= data.length) {
const msgLength = data.readInt32BE(offset + 1);
console.log("Message length:", msgLength, "bytes");
if (offset + 1 + msgLength <= data.length) {
const msgData = data.slice(offset + 5, offset + 1 + msgLength);
// Decode the query text for Query messages
if (msgTypeChar === "Q") {
const query = msgData.toString("utf8", 0, msgData.indexOf(0x00));
console.log(">>> QUERY TEXT:", query);
}
// Decode for Parse messages (prepared statements)
if (msgTypeChar === "P") {
const nullPos = msgData.indexOf(0x00);
const stmtName = msgData.toString("utf8", 0, nullPos);
const queryStart = nullPos + 1;
const queryEnd = msgData.indexOf(0x00, queryStart);
const query = msgData.toString("utf8", queryStart, queryEnd);
console.log(">>> PREPARED STATEMENT:", stmtName || "(unnamed)");
console.log(">>> QUERY TEXT:", query);
}
// Show decoded text for other messages
const fullText = msgData.toString("utf8").replace(/\0/g, " | ");
console.log("Full decoded:", fullText.substring(0, 200));
offset += 1 + msgLength;
} else {
break;
}
} else {
break;
}
}
console.log("▲".repeat(60));
}
return originalSocketWrite.call(this, data, encoding, callback);
};
// Patch Socket to intercept all PostgreSQL protocol messages
net.Socket.prototype.on = function (event, handler) {
if (event === "data") {
const wrappedHandler = function (chunk) {
console.log("\n" + "=".repeat(60));
console.log("[RECEIVED] Raw data chunk");
console.log("Length:", chunk.length, "bytes");
console.log(
"Hex:",
chunk.toString("hex").substring(0, 100) +
(chunk.length > 50 ? "..." : "")
);
// Parse PostgreSQL messages
let offset = 0;
while (offset < chunk.length) {
const msgType = chunk[offset];
const msgTypeName =
MESSAGE_TYPES[msgType] || `Unknown(0x${msgType.toString(16)})`;
console.log(
`\n[MESSAGE] Type: ${msgTypeName} (0x${msgType
.toString(16)
.toUpperCase()})`
);
if (offset + 4 <= chunk.length) {
const msgLength = chunk.readInt32BE(offset + 1);
console.log("Message length:", msgLength, "bytes");
// Extract message content
if (offset + 1 + msgLength <= chunk.length) {
const msgData = chunk.slice(offset + 5, offset + 1 + msgLength);
// Special handling for CommandComplete
if (msgType === 0x43) {
const commandTag = msgData.toString(
"utf8",
0,
msgData.indexOf(0x00)
);
console.log(">>> CommandComplete TAG:", commandTag);
}
// Special handling for DataRow
if (msgType === 0x44) {
const fieldCount = msgData.readInt16BE(0);
console.log("Field count:", fieldCount);
}
// Special handling for RowDescription
if (msgType === 0x54) {
const fieldCount = msgData.readInt16BE(0);
console.log("Column count:", fieldCount);
}
// Decode message content as string
if (msgData.length > 0) {
const fullText = msgData.toString("utf8").replace(/\0/g, " | ");
console.log("DECODED TEXT:", fullText);
console.log(
"Raw hex:",
msgData.toString("hex").substring(0, 200)
);
}
offset += 1 + msgLength;
} else {
console.log("Incomplete message, waiting for more data...");
break;
}
} else {
console.log("Incomplete length field, waiting for more data...");
break;
}
}
console.log("=".repeat(60));
// Call original handler
return handler.call(this, chunk);
};
return originalSocketOn.call(this, event, wrappedHandler);
}
return originalSocketOn.call(this, event, handler);
};
// Configuration
const sql = postgres({
host: "localhost",
port: 5432,
database: "<DATABASE>",
username: "<USERNAME>",
password: "<PASSWORD>",
debug: false, // We're doing our own debugging
max: 1, // Use single connection for clearer logging
});
async function testQueries() {
console.log("\n### STARTING PROTOCOL MONITOR ###\n");
try {
console.log(
"\n>>> TEST 1: Simple SELECT query (triggers type loading on first connection)"
);
const result1 = await sql`SELECT 1;`;
console.log("\n[APPLICATION] Query result:", result1);
} catch (error) {
console.error("\n[ERROR]", error);
} finally {
await sql.end();
console.log("\n### PROTOCOL MONITOR ENDED ###\n");
}
}
testQueries();