January 13, 2024

Pushing Cloudflare Worker logs to Grafana Loki

Cloudflare can’t stop surprising me. I was searching for a simple solution on how to collect logs from my Cloudflare workers and push them to my Grafana Loki cluster, and I trapped on a new solution developed by Cloudflare precisely for my problem—Tail Workers.

If you don’t want to read the post and want to jump straight away into the code, I open-sourced the Tail Worker code I use for my screenshot API product.

What is a Tail Worker?

Tail Workers receives execution information of other Workers, such as HTTP statuses, data passed to console.log() or uncaught exceptions. The main use cases are:

  • Processing logs;
  • Triggering alerts;
  • Debugging;
  • Analytics;
  • and many more.

Writing such a worker as simple as:

export default {
    async tail(events) {
        fetch("https://example.com/endpoint", {
            method: "POST",
            body: JSON.stringify(events),
        });
    },
};

Events contain execution information of other workers. Let’s see how we can use that in a real-life example.

Pushing logs to Loki

1. Create new Tail Worker

Creating a Tail Worker can’t be simpler than:

npm create cloudflare@latest

Once you have followed all steps and are done, you will have your worker entry point:

export default {
    async tail(events: TraceItem[], env: Env, context: ExecutionContext) {
        // ... code ...
    },
};

2. Push to Grafana Loki via HTTP

Grafana Loki has an HTTP-based API available for pushing logs:

POST /loki/api/v1/push

{
  "streams": [
    {
      "stream": {
        "label": "value"
      },
      "values": [
          [ "<unix epoch in nanoseconds>", "<log line>" ],
          [ "<unix epoch in nanoseconds>", "<log line>" ]
      ]
    }
  ]
}

And then you push logs like:

async tail(events: TraceItem[], env: Env, context: ExecutionContext) {
	const data = this.transformEvents(events);
	if (data.streams.length == 0) {
		return;
	}

	await fetch(env.LOKI_PUSH_URL, {
		method: 'POST',
		headers: {
			authorization: `Basic ${env.LOKI_CREDENTIALS}`,
			'content-type': 'application/json',
		},
		body: JSON.stringify(data),
	});
},

The LOKI_CREDENTIALS credentials variable is a base64-encoded username and password you use to access your Loki instance.

3. Transform Worker events to Loki format

The goal of transformation is to decide what you want to push to Loki and what you want to skip:

function toLogNanoSeconds(timestamp: number) {
	return (timestamp * 1000000).toLocaleString('fullwide', { useGrouping: false });
}


transformEvents(events: TraceItem[]) {
	const streams: { stream: Record<string, string>; values: [string, string][] }[] = [];
	for (const event of events) {
		this.transformEvent(event).forEach((stream) => streams.push(stream));
	}

	return { streams };
},

transformEvent(event: TraceItem) {
	if (!(event.outcome == 'ok' || event.outcome == 'exception') || !event.scriptName) {
		return [];
	}

	const streams: { stream: Record<string, string>; values: [string, string][] }[] = [];

	const logsByLevel: Record<string, [string, string][]> = {};
	for (const log of event.logs) {
		if (!(log.level in logsByLevel)) {
			logsByLevel[log.level] = [];
		}

		const [logMessage] = log.message;
		if (logMessage) {
			logsByLevel[log.level].push([toLogNanoSeconds(log.timestamp), logMessage]);
		}
	}

	for (const [level, logs] of Object.entries(logsByLevel)) {
		if (level == 'debug') {
			continue;
		}

		streams.push({
			stream: {
				level,
				outcome: event.outcome,
				app: event.scriptName,
			},
			values: logs,
		});
	}

	if (event.exceptions.length) {
		streams.push({
			stream: {
				level: 'error',
				outcome: event.outcome,
				app: event.scriptName,
			},
			values: event.exceptions.map((e) => [toLogNanoSeconds(e.timestamp), `${e.name}: ${e.message}`]),
		});
	}

	return streams;
},

4. Configure producer workers

In the workers you want to fetch logs from you need to specify the tail worker we created as a consumer. Here is an example from my screenshot API gateway worker:

name = "gateway"
main = "src/index.ts"
compatibility_date = "2023-03-05"

# ...

tail_consumers = [{ service = "logger" }]

5. Full source code

I decided to open-source the logging worker that I use for ScreenshotOne and it is code available on GitHub. Feel free to use, improve, and share. Or just copy the full worker code:

export interface Env {
    LOKI_PUSH_URL: string;
    LOKI_CREDENTIALS: string;
}

function toLogNanoSeconds(timestamp: number) {
    return (timestamp * 1000000).toLocaleString("fullwide", {
        useGrouping: false,
    });
}

export default {
    async tail(events: TraceItem[], env: Env, context: ExecutionContext) {
        const data = this.transformEvents(events);
        if (data.streams.length == 0) {
            return;
        }

        await fetch(env.LOKI_PUSH_URL, {
            method: "POST",
            headers: {
                authorization: `Basic ${env.LOKI_CREDENTIALS}`,
                "content-type": "application/json",
            },
            body: JSON.stringify(data),
        });
    },

    transformEvents(events: TraceItem[]) {
        const streams: {
            stream: Record<string, string>;
            values: [string, string][];
        }[] = [];
        for (const event of events) {
            this.transformEvent(event).forEach((stream) =>
                streams.push(stream)
            );
        }

        return { streams };
    },

    transformEvent(event: TraceItem) {
        if (
            !(event.outcome == "ok" || event.outcome == "exception") ||
            !event.scriptName
        ) {
            return [];
        }

        const streams: {
            stream: Record<string, string>;
            values: [string, string][];
        }[] = [];

        const logsByLevel: Record<string, [string, string][]> = {};
        for (const log of event.logs) {
            if (!(log.level in logsByLevel)) {
                logsByLevel[log.level] = [];
            }

            const [logMessage] = log.message;
            if (logMessage) {
                logsByLevel[log.level].push([
                    toLogNanoSeconds(log.timestamp),
                    logMessage,
                ]);
            }
        }

        for (const [level, logs] of Object.entries(logsByLevel)) {
            if (level == "debug") {
                continue;
            }

            streams.push({
                stream: {
                    level,
                    outcome: event.outcome,
                    app: event.scriptName,
                },
                values: logs,
            });
        }

        if (event.exceptions.length) {
            streams.push({
                stream: {
                    level: "error",
                    outcome: event.outcome,
                    app: event.scriptName,
                },
                values: event.exceptions.map((e) => [
                    toLogNanoSeconds(e.timestamp),
                    `${e.name}: ${e.message}`,
                ]),
            });
        }

        return streams;
    },
};

Logpush is not always a fit

I usually try not to reimplement the wheel and use established solutions. And Cloudflare has a simple Logpush solution for sending logs. But it was not a good fit for me, since they log in a format that is incompatible with Grafana Loki.

And it is better than sending logs directly from your worker

You can also send logs directly from your workers to Loki or any other logging system, but! If your worker fails due to stack overflow, unhandled exception, and other reasons, you might lose your logs. With Tail Workers you can receive all that information and more.

Also, you could you log in by using Cloudflare worker queues, but they will have the same downsides as logging directly from your worker.

All-in-all, tail workers is a perfect fit for pushing Cloudflare Worker to Grafana Loki.