Skip to content

'fetch types' functionality causing weird Postgres behaviour #1136

@nilansaha

Description

@nilansaha

I was looking at Postgres network traffic and usually every query lifecycle is followed by a CommandComplete response which marks the end of the query.

Weirdly enough the first time I use Drizzle which internally uses this library it runs this query below which never receives its CommandComplete response from the Postgres server.

Is there any reason for that?
Are we doing something freaky with this particular feature that is causing Postgres to step outside of its protocol?

SELECT b.oid, b.typarray
FROM pg_catalog.pg_type a
LEFT JOIN pg_catalog.pg_type b 
       ON b.oid = a.typelem
WHERE a.typcategory = 'A'
GROUP BY b.oid, b.typarray
ORDER BY b.oid;

I have a small script to patch and debug the library to replicate the issue for anyone who wants to see. Just run this code and then see the output, you will see there are two queries made - the one I explicitly run, the other one that the library runs to fetch types. The one I explicitly run gets its CommandComplete but the fetch types query that the library runs does not get its CommandComplete.

Note - Don't forget to replace the DATABASE, USERNAME & PASSWORD

const postgres = require("postgres");
const net = require("net");

// Store original methods
const originalSocketOn = net.Socket.prototype.on;
const originalSocketWrite = net.Socket.prototype.write;

// Message type codes in PostgreSQL protocol
const MESSAGE_TYPES = {
  0x43: "CommandComplete",
  0x44: "DataRow",
  0x45: "ErrorResponse",
  0x49: "EmptyQueryResponse",
  0x4e: "NoticeResponse",
  0x52: "Authentication",
  0x53: "ParameterStatus",
  0x54: "RowDescription",
  0x5a: "ReadyForQuery",
  0x31: "ParseComplete",
  0x32: "BindComplete",
  0x6e: "NoData",
  0x73: "PortalSuspended",
  0x74: "ParameterDescription",
};

// Track queries to correlate with responses
let queryCounter = 0;
let activeQueries = new Map();

// Patch Socket.write to intercept outgoing queries
net.Socket.prototype.write = function (data, encoding, callback) {
  if (Buffer.isBuffer(data) && data.length > 0) {
    console.log("\n" + "▼".repeat(60));
    console.log("[SENDING] Data to PostgreSQL");
    console.log("Length:", data.length, "bytes");

    // Parse outgoing messages
    let offset = 0;
    while (offset < data.length) {
      const msgType = data[offset];
      const msgTypeChar = String.fromCharCode(msgType);

      // Common outgoing message types
      const outgoingTypes = {
        Q: "Query (Simple)",
        P: "Parse",
        B: "Bind",
        E: "Execute",
        D: "Describe",
        C: "Close",
        S: "Sync",
        X: "Terminate",
      };

      console.log(
        `\n[OUTGOING] Type: ${
          outgoingTypes[msgTypeChar] || "Unknown"
        } ('${msgTypeChar}' = 0x${msgType.toString(16)})`
      );

      if (offset + 4 <= data.length) {
        const msgLength = data.readInt32BE(offset + 1);
        console.log("Message length:", msgLength, "bytes");

        if (offset + 1 + msgLength <= data.length) {
          const msgData = data.slice(offset + 5, offset + 1 + msgLength);

          // Decode the query text for Query messages
          if (msgTypeChar === "Q") {
            const query = msgData.toString("utf8", 0, msgData.indexOf(0x00));
            console.log(">>> QUERY TEXT:", query);
          }

          // Decode for Parse messages (prepared statements)
          if (msgTypeChar === "P") {
            const nullPos = msgData.indexOf(0x00);
            const stmtName = msgData.toString("utf8", 0, nullPos);
            const queryStart = nullPos + 1;
            const queryEnd = msgData.indexOf(0x00, queryStart);
            const query = msgData.toString("utf8", queryStart, queryEnd);
            console.log(">>> PREPARED STATEMENT:", stmtName || "(unnamed)");
            console.log(">>> QUERY TEXT:", query);
          }

          // Show decoded text for other messages
          const fullText = msgData.toString("utf8").replace(/\0/g, " | ");
          console.log("Full decoded:", fullText.substring(0, 200));

          offset += 1 + msgLength;
        } else {
          break;
        }
      } else {
        break;
      }
    }
    console.log("▲".repeat(60));
  }

  return originalSocketWrite.call(this, data, encoding, callback);
};

// Patch Socket to intercept all PostgreSQL protocol messages
net.Socket.prototype.on = function (event, handler) {
  if (event === "data") {
    const wrappedHandler = function (chunk) {
      console.log("\n" + "=".repeat(60));
      console.log("[RECEIVED] Raw data chunk");
      console.log("Length:", chunk.length, "bytes");
      console.log(
        "Hex:",
        chunk.toString("hex").substring(0, 100) +
          (chunk.length > 50 ? "..." : "")
      );

      // Parse PostgreSQL messages
      let offset = 0;
      while (offset < chunk.length) {
        const msgType = chunk[offset];
        const msgTypeName =
          MESSAGE_TYPES[msgType] || `Unknown(0x${msgType.toString(16)})`;

        console.log(
          `\n[MESSAGE] Type: ${msgTypeName} (0x${msgType
            .toString(16)
            .toUpperCase()})`
        );

        if (offset + 4 <= chunk.length) {
          const msgLength = chunk.readInt32BE(offset + 1);
          console.log("Message length:", msgLength, "bytes");

          // Extract message content
          if (offset + 1 + msgLength <= chunk.length) {
            const msgData = chunk.slice(offset + 5, offset + 1 + msgLength);

            // Special handling for CommandComplete
            if (msgType === 0x43) {
              const commandTag = msgData.toString(
                "utf8",
                0,
                msgData.indexOf(0x00)
              );
              console.log(">>> CommandComplete TAG:", commandTag);
            }

            // Special handling for DataRow
            if (msgType === 0x44) {
              const fieldCount = msgData.readInt16BE(0);
              console.log("Field count:", fieldCount);
            }

            // Special handling for RowDescription
            if (msgType === 0x54) {
              const fieldCount = msgData.readInt16BE(0);
              console.log("Column count:", fieldCount);
            }

            // Decode message content as string
            if (msgData.length > 0) {
              const fullText = msgData.toString("utf8").replace(/\0/g, " | ");
              console.log("DECODED TEXT:", fullText);
              console.log(
                "Raw hex:",
                msgData.toString("hex").substring(0, 200)
              );
            }

            offset += 1 + msgLength;
          } else {
            console.log("Incomplete message, waiting for more data...");
            break;
          }
        } else {
          console.log("Incomplete length field, waiting for more data...");
          break;
        }
      }
      console.log("=".repeat(60));

      // Call original handler
      return handler.call(this, chunk);
    };
    return originalSocketOn.call(this, event, wrappedHandler);
  }
  return originalSocketOn.call(this, event, handler);
};

// Configuration
const sql = postgres({
  host: "localhost",
  port: 5432,
  database: "<DATABASE>",
  username: "<USERNAME>",
  password: "<PASSWORD>",
  debug: false, // We're doing our own debugging
  max: 1, // Use single connection for clearer logging
});

async function testQueries() {
  console.log("\n### STARTING PROTOCOL MONITOR ###\n");

  try {
    console.log(
      "\n>>> TEST 1: Simple SELECT query (triggers type loading on first connection)"
    );
    const result1 = await sql`SELECT 1;`;
    console.log("\n[APPLICATION] Query result:", result1);
  } catch (error) {
    console.error("\n[ERROR]", error);
  } finally {
    await sql.end();
    console.log("\n### PROTOCOL MONITOR ENDED ###\n");
  }
}

testQueries();

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions