Deploying a Cloudflare Worker to Forward Traffic to a REST API

This guide explains how to create and deploy a Cloudflare Worker that intercepts incoming requests, passes them through to the origin, and forwards the captured request/response data (tagged with a generated reqId UUID) to a specified backend REST API.

What You’ll Do

  • Create a Cloudflare Worker using the dashboard UI.
  • Replace the default code with a custom forwarding script.
  • Deploy the Worker to production.

The Kafka REST Proxy must be accessible from Cloudflare's network.
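Before deploying, it can help to verify that the REST Proxy is reachable. A minimal sketch, runnable in Node 18+ (which provides a global fetch); the host and port are taken from the sample config below and are assumptions — substitute your own REST Proxy address:

```javascript
// Sketch: build a probe request against the Kafka REST Proxy's /topics
// endpoint, which lists available topics. The base URL below mirrors the
// sample config in this guide and is an assumption.
function buildProbe(baseUrl) {
  return {
    url: `${baseUrl}/topics`,
    options: {
      method: "GET",
      headers: { Accept: "application/vnd.kafka.v2+json" },
    },
  };
}

const probe = buildProbe("http://rest.netfein.com:8084");
// To actually send it:
//   const res = await fetch(probe.url, probe.options);
//   console.log(res.status, await res.json());
```

If the request times out or is refused, fix network access before continuing — the Worker fails silently (it only logs a warning) when the proxy is unreachable.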

Step 1: Log in to Cloudflare

Step 2: Navigate to Workers & Pages

  • In the left-hand sidebar, click Compute (Workers).
  • Click the Create button on the upper right.

Step 3: Choose Create Worker

  • Select the Worker tab (not Pages).
  • Click Create Worker to proceed.

Step 4: Replace the Default Code

Once you're in the online editor:

1. Delete the sample code in the editor.

2. Paste the custom Cloudflare Worker script (provided below).

JavaScript
const config = {
  collectorAgentUrl: "http://rest.netfein.com:8084/topics/test-cloudflare",
  responseBodyLimit: 1 * 1024 * 1024, // max bytes to read from the response body (1 MB)
  requestBodyLimit: 1 * 1024 * 1024, // max bytes to read from the request body (1 MB)
  apiCatalogId: "e9aaf8fd-976a-4a56-9628-ee5e290cf50a",
  source: "cloudflare",
  timeout: 10000,
  staticExtensions: [
      ".css", ".js", ".jpg", ".jpeg", ".png", ".gif", ".svg", ".ico", ".woff",
      ".woff2", ".eot", ".ttf", ".otf", ".webp", ".avif", ".mp4", ".webm"
  ]
};

addEventListener("fetch", event => {
  event.passThroughOnException();

  const shouldProcess = !isStatic(new URL(event.request.url));
  const reqId = crypto.randomUUID();

  const originalRequest = event.request;
  const clonedRequest = originalRequest.clone();

  const responsePromise = fetch(originalRequest, {
      signal: AbortSignal.timeout(config.timeout)
  });

  event.respondWith(
      responsePromise.then(response => {
          const responseClone = response.clone();

          if (shouldProcess) {
              event.waitUntil(processCombined(reqId, clonedRequest, responseClone));
          }

          return response;
      }).catch(error => {
          console.warn("Error fetching request:", error);
          throw error;
      })
  );
});

async function processCombined(reqId, request, response) {
  const url = new URL(request.url);

  const reqContentLength = parseInt(request.headers.get("content-length") || "0", 10);
  let reqBody;
  if (reqContentLength > config.requestBodyLimit) {
    reqBody = `[Request body too large: ${reqContentLength} bytes - skipped]`;
  } else {
    const reqArrayBuffer = await Promise.race([
      request.clone().arrayBuffer(),
      new Promise((_, reject) =>
          setTimeout(() => reject(new Error('Request body read timeout')), config.timeout)
      )
    ]);

    const reqContentType = request.headers.get("content-type") || "";
    if (isTextContentType(reqContentType)) {
        reqBody = arrayBufferToUtf8(reqArrayBuffer);
    } else {
        reqBody = arrayBufferToBase64(reqArrayBuffer);
    }
  }

  const reqHeadersObj = headersToObject(request.headers);
  const rspHeadersObj = headersToObject(response.headers);

  let rspBody = "";
  const rspContentLength = parseInt(response.headers.get("content-length") || "0", 10);
  if (response.body && config.responseBodyLimit > 0) {
    if (rspContentLength > config.responseBodyLimit) {
      rspBody = `[Response body too large: ${rspContentLength} bytes - skipped]`;
    } else {
      const chunks = [];
      let totalLen = 0;
      const reader = response.body.getReader();
      try {
          const timeoutPromise = new Promise((_, reject) =>
              setTimeout(() => reject(new Error('Response body read timeout')), config.timeout)
          );

          while (true) {
              const { done, value } = await Promise.race([
                  reader.read(),
                  timeoutPromise
              ]);
              if (done || totalLen >= config.responseBodyLimit) break;
              chunks.push(value);
              totalLen += value.length;
          }

          const rspArrayBuffer = concatenateUint8Arrays(chunks).buffer;
          const rspContentType = response.headers.get("content-type") || "";

          if (isTextContentType(rspContentType)) {
              rspBody = arrayBufferToUtf8(rspArrayBuffer);
          } else {
              rspBody = arrayBufferToBase64(rspArrayBuffer);
          }
      } finally {
          reader.releaseLock();
      }
    }
  }

  const payload = {
      reqId: reqId,
      path: url.pathname + url.search,
      method: request.method,
      requestHeaders: JSON.stringify(reqHeadersObj),
      requestPayload: reqBody,
      responseHeaders: JSON.stringify(rspHeadersObj),
      responsePayload: rspBody,
      apiCatalogId: config.apiCatalogId,
      source: config.source,
      time: Math.floor(Date.now() / 1000).toString(),
      statusCode: response.status.toString(),
      status: response.statusText,
      ip: request.headers.get("CF-Connecting-IP") || "",
      type: request.headers.get("Upgrade") ? "WebSocket" : "HTTP/1.1"
  };

  try {
      await fetch(config.collectorAgentUrl, {
          method: "POST",
          headers: { "Content-Type": "application/vnd.kafka.json.v2+json" },
          body: JSON.stringify({
              records: [
                  { value: payload }
              ]
          }),
          signal: AbortSignal.timeout(config.timeout)
      });
  } catch (e) {
      console.warn("Failed to send to Rest Proxy:", e);
  }
}

function arrayBufferToBase64(buffer) {
  let binary = '';
  const bytes = new Uint8Array(buffer);
  const len = bytes.byteLength;
  for (let i = 0; i < len; i++) {
      binary += String.fromCharCode(bytes[i]);
  }
  return btoa(binary);
}

function arrayBufferToUtf8(buffer) {
  try {
      return new TextDecoder().decode(buffer);
  } catch (e) {
      console.warn("Failed to decode buffer as UTF-8:", e);
      return null;
  }
}

function concatenateUint8Arrays(chunks) {
  const totalLength = chunks.reduce((sum, c) => sum + c.length, 0);
  const result = new Uint8Array(totalLength);
  let offset = 0;
  for (const chunk of chunks) {
      result.set(chunk, offset);
      offset += chunk.length;
  }
  return result;
}

function isStatic(url) {
  // Case-insensitive match so paths like "/logo.PNG" are also skipped.
  return config.staticExtensions.some(ext => url.pathname.toLowerCase().endsWith(ext));
}

function headersToObject(headers) {
  const obj = {};
  headers.forEach((value, key) => { obj[key] = value; });
  return obj;
}

function isTextContentType(contentType) {
  return contentType.startsWith('text/') ||
      contentType.includes('json') ||
      contentType.includes('xml') ||
      contentType.includes('javascript') ||
      contentType.includes('css');
}
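The helper functions above decide how a captured body is serialized: text-like content types are decoded as UTF-8, and everything else falls back to Base64. A small standalone sketch of that branching, runnable in Node 18+ (where TextEncoder, TextDecoder, and btoa are globals):

```javascript
// Same serialization rule as the Worker script: text-like bodies are
// decoded as UTF-8 strings, binary bodies are Base64-encoded.
function isTextContentType(contentType) {
  return contentType.startsWith("text/") ||
      contentType.includes("json") ||
      contentType.includes("xml") ||
      contentType.includes("javascript") ||
      contentType.includes("css");
}

function serializeBody(buffer, contentType) {
  if (isTextContentType(contentType)) {
    return new TextDecoder().decode(buffer);
  }
  const bytes = new Uint8Array(buffer);
  let binary = "";
  for (const b of bytes) binary += String.fromCharCode(b);
  return btoa(binary);
}

const json = new TextEncoder().encode('{"uuid":"abc"}').buffer;
console.log(serializeBody(json, "application/json"));  // decoded as text
console.log(serializeBody(new Uint8Array([0, 255, 16]).buffer, "image/png")); // Base64
```

This is why binary responses (images, fonts) show up Base64-encoded in the collected payloads while JSON and HTML arrive as plain text.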
3. Update the following variables in the script:

  • collectorAgentUrl: The URL of the REST API endpoint that forwards data to the relevant Kafka topic.

  • apiCatalogId: The application ID associated with the domain, defined earlier in ApiFort.
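The Worker wraps each captured payload in the Kafka REST Proxy v2 JSON envelope before POSTing it to collectorAgentUrl. A sketch of the body shape — all field values here are illustrative placeholders, not real traffic:

```javascript
// Illustrative envelope for the Kafka REST Proxy v2 JSON API: records are
// wrapped as { records: [ { value: ... } ] } and sent with the
// application/vnd.kafka.json.v2+json content type.
const payload = {
  reqId: "00000000-0000-4000-8000-000000000000", // placeholder UUID
  path: "/v1/users?id=42",
  method: "GET",
  statusCode: "200",
  apiCatalogId: "e9aaf8fd-976a-4a56-9628-ee5e290cf50a",
  source: "cloudflare",
};

const body = JSON.stringify({ records: [{ value: payload }] });
console.log(body);
```

Knowing this shape makes it easy to confirm delivery by consuming a few messages from the target topic.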

4. Make sure ApiFort is configured to accept the incoming traffic from Cloudflare:

  • Check the SWITCH variables and the TRAFFIC_SOURCE parameter in docker-compose.yml on the ApiFort server:
YAML
  SWITCH_MIRROR: ${SWITCH_MIRROR:-true}
  SWITCH_KAFKA: ${SWITCH_KAFKA:-true}

  TRAFFIC_SOURCE: ...,cloudflare|kafka|kafka:9092|apifort_consumers|apifort.api.cloudflare.logs,...

Step 5: Save and Deploy

  • Click the Save and Deploy button in the top-right corner of the editor.

Step 6: Add Routes

  • Navigate to the Settings tab of your worker
  • In the Routes section, click Add Route
  • You can define routes in the following ways:

      • To capture all traffic on a specific domain: example.com/*
      • To capture specific API paths: api.example.com/v1/*
      • To capture all subdomains: *.example.com/*

  • Once you've defined your routes, click Save
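Cloudflare matches routes against the request's hostname and path, with * acting as a wildcard. As a rough illustration only — this hypothetical helper approximates the behavior and is not Cloudflare's actual matcher:

```javascript
// Hypothetical helper: turn a wildcard route pattern into a regular
// expression. Approximates Cloudflare route matching for illustration only.
function routeToRegExp(pattern) {
  const escaped = pattern.replace(/[.+?^${}()|[\]\\]/g, "\\$&"); // escape regex chars
  return new RegExp("^" + escaped.replace(/\*/g, ".*") + "$");
}

const allSubdomains = routeToRegExp("*.example.com/*");
console.log(allSubdomains.test("api.example.com/v1/users")); // → true
console.log(allSubdomains.test("example.com/v1/users"));     // → false (no subdomain)
```

Note that in this reading, *.example.com/* does not match the bare apex domain — add a separate example.com/* route if you want to capture it too.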