Deploying a Cloudflare Worker to Forward Traffic to a REST API¶
This guide explains how to create and deploy a Cloudflare Worker that listens for incoming POST requests containing a uuid, enriches the request with metadata, and forwards it to a specified backend REST API.
What You’ll Do¶
- Create a Cloudflare Worker using the dashboard UI.
- Replace the default code with a custom forwarding script.
- Deploy the Worker to production.
Kafka Rest must be accesible from Cloudflare.
Step 1: Log in to Cloudflare¶
- Visit https://dash.cloudflare.com/
- Log in using your Cloudflare account credentials.
Step 2: Navigate to Workers & Pages¶
- In the left-hand sidebar, click Compute (Workers).
- Click the Create button on the upper right.
Step 3: Choose Create Worker¶
- Select the Worker tab (not Pages).
- Click Create Worker to proceed.
Step 4: Replace the Default Code¶
Once you're in the online editor:
1. Delete the sample code in the editor.
2. Paste the custom Cloudflare Worker script (provided below).
const config = {
collectorAgentUrl: "http://rest.netfein.com:8084/topics/test-cloudflare",
responseBodyLimit: 1 * 1024 * 1024, // max bytes to read from response body
requestBodyLimit: 1 * 1024 * 1024, // max bytes to read from request body (10 MB)
apiCatalogId: "e9aaf8fd-976a-4a56-9628-ee5e290cf50a",
source: "cloudflare",
timeout: 10000,
staticExtensions: [
".css", ".js", ".jpg", ".jpeg", ".png", ".gif", ".svg", ".ico", ".woff",
".woff2", ".eot", ".ttf", ".otf", ".webp", ".avif", ".mp4", ".webm"
]
};
addEventListener("fetch", event => {
event.passThroughOnException();
const shouldProcess = !isStatic(new URL(event.request.url));
const reqId = crypto.randomUUID();
const originalRequest = event.request;
const clonedRequest = originalRequest.clone();
const responsePromise = fetch(originalRequest, {
signal: AbortSignal.timeout(config.timeout)
});
event.respondWith(
responsePromise.then(response => {
const responseClone = response.clone();
if (shouldProcess) {
event.waitUntil(processCombined(reqId, clonedRequest, responseClone));
}
return response;
}).catch(error => {
console.warn("Error fetching request:", error);
throw error;
})
);
});
async function processCombined(reqId, request, response) {
const url = new URL(request.url);
const reqContentLength = parseInt(request.headers.get("content-length") || "0", 10);
let reqBody;
if (reqContentLength > config.requestBodyLimit) {
reqBody = `[Request body too large: ${reqContentLength} bytes - skipped]`;
} else {
const reqArrayBuffer = await Promise.race([
request.clone().arrayBuffer(),
new Promise((_, reject) =>
setTimeout(() => reject(new Error('Request body read timeout')), config.timeout)
)
]);
const reqContentType = request.headers.get("content-type") || "";
if (isTextContentType(reqContentType)) {
reqBody = arrayBufferToUtf8(reqArrayBuffer);
} else {
reqBody = arrayBufferToBase64(reqArrayBuffer);
}
}
const reqHeadersObj = headersToObject(request.headers);
const rspHeadersObj = headersToObject(response.headers);
let rspBody = "";
const rspContentLength = parseInt(response.headers.get("content-length") || "0", 10);
if (response.body && config.responseBodyLimit > 0) {
if (rspContentLength > config.responseBodyLimit) {
rspBody = `[Response body too large: ${rspContentLength} bytes - skipped]`;
} else {
const chunks = [];
let totalLen = 0;
const reader = response.body.getReader();
try {
const timeoutPromise = new Promise((_, reject) =>
setTimeout(() => reject(new Error('Response body read timeout')), config.timeout)
);
while (true) {
const { done, value } = await Promise.race([
reader.read(),
timeoutPromise
]);
if (done || totalLen >= config.responseBodyLimit) break;
chunks.push(value);
totalLen += value.length;
}
const rspArrayBuffer = concatenateUint8Arrays(chunks).buffer;
const rspContentType = response.headers.get("content-type") || "";
if (isTextContentType(rspContentType)) {
rspBody = arrayBufferToUtf8(rspArrayBuffer);
} else {
rspBody = arrayBufferToBase64(rspArrayBuffer);
}
} finally {
reader.releaseLock();
}
}
}
const payload = {
reqId: reqId,
path: url.pathname + url.search,
method: request.method,
requestHeaders: JSON.stringify(reqHeadersObj),
requestPayload: reqBody,
responseHeaders: JSON.stringify(rspHeadersObj),
responsePayload: rspBody,
apiCatalogId: config.apiCatalogId,
source: config.source,
time: Math.floor(Date.now() / 1000).toString(),
statusCode: response.status.toString(),
status: response.statusText,
ip: request.headers.get("CF-Connecting-IP") || "",
type: request.headers.get("Upgrade") ? "WebSocket" : "HTTP/1.1"
};
try {
await fetch(config.collectorAgentUrl, {
method: "POST",
headers: { "Content-Type": "application/vnd.kafka.json.v2+json" },
body: JSON.stringify({
records: [
{ value: payload }
]
}),
signal: AbortSignal.timeout(config.timeout)
});
} catch (e) {
console.warn("Failed to send to Rest Proxy:", e);
}
}
function arrayBufferToBase64(buffer) {
let binary = '';
const bytes = new Uint8Array(buffer);
const len = bytes.byteLength;
for (let i = 0; i < len; i++) {
binary += String.fromCharCode(bytes[i]);
}
return btoa(binary);
}
function arrayBufferToUtf8(buffer) {
try {
return new TextDecoder().decode(buffer);
} catch (e) {
console.warn("Failed to decode buffer as UTF-8:", e);
return null;
}
}
function concatenateUint8Arrays(chunks) {
const totalLength = chunks.reduce((sum, c) => sum + c.length, 0);
const result = new Uint8Array(totalLength);
let offset = 0;
for (const chunk of chunks) {
result.set(chunk, offset);
offset += chunk.length;
}
return result;
}
function isStatic(url) {
return config.staticExtensions.some(ext => url.pathname.endsWith(ext));
}
function headersToObject(headers) {
const obj = {};
headers.forEach((value, key) => { obj[key] = value; });
return obj;
}
function isTextContentType(contentType) {
return contentType.startsWith('text/') ||
contentType.includes('json') ||
contentType.includes('xml') ||
contentType.includes('javascript') ||
contentType.includes('css');
}
-
collectorAgentUrl: The URL of the REST API endpoint that forwards data to the relevant Kafka topic. -
apiCatalogId: The application ID associated with the domain, defined earlier in ApiFort.
4. Make sure the Apifort configured for the incoming traffic from Cloudflare:
- Check the SWITCH variables and TRAFFIC_SOURCE parameters from docker-compose.yml in Apifort Server
SWITCH_MIRROR: ${SWITCH_MIRROR:-true}
SWITCH_KAFKA: ${SWITCH_KAFKA:-true}
TRAFFIC_SOURCE: ...,cloudflare|kafka|kafka:9092|apifort_consumers|apifort.api.cloudflare.logs,...
Step 5: Save and Deploy¶
- Click the Save and Deploy button in the top-right corner of the editor.
Step 6: Add Routes¶
- Navigate to the Settings tab of your worker
- In the Routes section, click Add Route
-
You can define routes following ways:
-
To capture all traffic on a specific domain:
example.com/* - To capture specific API paths:
api.example.com/v1/* -
To capture all subdomains:
*.example.com/* -
Once you've defined your routes, click Save