UBERF-8842 Improve datalake performance logs (#7406)

Signed-off-by: Alexander Onnikov <Alexander.Onnikov@xored.com>
Alexander Onnikov 2024-12-10 14:45:08 +07:00 committed by GitHub
parent fcfaba492e
commit 63418d67a1
GPG Key ID: B5690EEEBB952194
9 changed files with 246 additions and 162 deletions

View File

@@ -14,15 +14,14 @@
 //
 import { error, json } from 'itty-router'
-import { type Sql } from 'postgres'
-import db, { withPostgres } from './db'
+import { type BlobDB, withPostgres } from './db'
 import { cacheControl, hashLimit } from './const'
 import { toUUID } from './encodings'
 import { getSha256 } from './hash'
 import { selectStorage } from './storage'
 import { type BlobRequest, type WorkspaceRequest, type UUID } from './types'
 import { copyVideo, deleteVideo } from './video'
-import { measure, LoggedCache } from './measure'
+import { type MetricsContext, LoggedCache } from './metrics'

 interface BlobMetadata {
   lastModified: number
@@ -36,20 +35,24 @@ export function getBlobURL (request: Request, workspace: string, name: string):
   return new URL(path, request.url).toString()
 }

-export async function handleBlobGet (request: BlobRequest, env: Env, ctx: ExecutionContext): Promise<Response> {
+export async function handleBlobGet (
+  request: BlobRequest,
+  env: Env,
+  ctx: ExecutionContext,
+  metrics: MetricsContext
+): Promise<Response> {
   const { workspace, name } = request

-  const cache = new LoggedCache(caches.default)
+  const cache = new LoggedCache(caches.default, metrics)
   const cached = await cache.match(request)
   if (cached !== undefined) {
-    console.log({ message: 'cache hit' })
     return cached
   }

   const { bucket } = selectStorage(env, workspace)

-  const blob = await withPostgres(env, ctx, (sql) => {
-    return db.getBlob(sql, { workspace, name })
+  const blob = await withPostgres(env, ctx, metrics, (db) => {
+    return db.getBlob({ workspace, name })
   })
   if (blob === null || blob.deleted) {
     return error(404)
@@ -72,19 +75,25 @@ export async function handleBlobGet (request: BlobRequest, env: Env, ctx: Execut
   const response = new Response(object?.body, { headers, status })

   if (response.status === 200) {
-    ctx.waitUntil(cache.put(request, response.clone()))
+    const clone = metrics.withSync('response.clone', () => response.clone())
+    ctx.waitUntil(cache.put(request, clone))
   }

   return response
 }

-export async function handleBlobHead (request: BlobRequest, env: Env, ctx: ExecutionContext): Promise<Response> {
+export async function handleBlobHead (
+  request: BlobRequest,
+  env: Env,
+  ctx: ExecutionContext,
+  metrics: MetricsContext
+): Promise<Response> {
   const { workspace, name } = request
   const { bucket } = selectStorage(env, workspace)

-  const blob = await withPostgres(env, ctx, (sql) => {
-    return db.getBlob(sql, { workspace, name })
+  const blob = await withPostgres(env, ctx, metrics, (db) => {
+    return db.getBlob({ workspace, name })
   })
   if (blob === null || blob.deleted) {
     return error(404)
@@ -99,12 +108,17 @@ export async function handleBlobHead (request: BlobRequest, env: Env, ctx: Execu
   return new Response(null, { headers, status: 200 })
 }

-export async function handleBlobDelete (request: BlobRequest, env: Env, ctx: ExecutionContext): Promise<Response> {
+export async function handleBlobDelete (
+  request: BlobRequest,
+  env: Env,
+  ctx: ExecutionContext,
+  metrics: MetricsContext
+): Promise<Response> {
   const { workspace, name } = request

   try {
-    await withPostgres(env, ctx, (sql) => {
-      return Promise.all([db.deleteBlob(sql, { workspace, name }), deleteVideo(env, workspace, name)])
+    await withPostgres(env, ctx, metrics, (db) => {
+      return Promise.all([db.deleteBlob({ workspace, name }), deleteVideo(env, workspace, name)])
     })

     return new Response(null, { status: 204 })
@@ -118,7 +132,8 @@ export async function handleBlobDelete (request: BlobRequest, env: Env, ctx: Exe
 export async function handleUploadFormData (
   request: WorkspaceRequest,
   env: Env,
-  ctx: ExecutionContext
+  ctx: ExecutionContext,
+  metrics: MetricsContext
 ): Promise<Response> {
   const contentType = request.headers.get('Content-Type')
   if (contentType === null || !contentType.includes('multipart/form-data')) {
@@ -130,7 +145,7 @@ export async function handleUploadFormData (

   let formData: FormData
   try {
-    formData = await measure('fetch formdata', () => request.formData())
+    formData = await metrics.with('request.formData', () => request.formData())
   } catch (err: any) {
     const message = err instanceof Error ? err.message : String(err)
     console.error({ error: 'failed to parse form data', message })
@@ -146,8 +161,8 @@ export async function handleUploadFormData (
     files.map(async ([file, key]) => {
       const { name, type, lastModified } = file
       try {
-        const metadata = await withPostgres(env, ctx, (sql) => {
-          return saveBlob(env, sql, file.stream(), file.size, type, workspace, name, lastModified)
+        const metadata = await withPostgres(env, ctx, metrics, (db) => {
+          return saveBlob(env, db, file.stream(), file.size, type, workspace, name, lastModified)
         })

         // TODO this probably should happen via queue, let it be here for now
@@ -170,7 +185,7 @@ export async function handleUploadFormData (

 export async function saveBlob (
   env: Env,
-  sql: Sql,
+  db: BlobDB,
   stream: ReadableStream,
   size: number,
   type: string,
@@ -187,17 +202,15 @@ export async function saveBlob (
     const [hashStream, uploadStream] = stream.tee()
     const hash = await getSha256(hashStream)

-    const data = await db.getData(sql, { hash, location })
+    const data = await db.getData({ hash, location })
     if (data !== null) {
       // Lucky boy, nothing to upload, use existing blob
-      await db.createBlob(sql, { workspace, name, hash, location })
+      await db.createBlob({ workspace, name, hash, location })
     } else {
       await bucket.put(filename, uploadStream, { httpMetadata })
-      await sql.begin((sql) => [
-        db.createData(sql, { hash, location, filename, type, size }),
-        db.createBlob(sql, { workspace, name, hash, location })
-      ])
+      await db.createData({ hash, location, filename, type, size })
+      await db.createBlob({ workspace, name, hash, location })
     }

     return { type, size, lastModified, name }
@@ -205,17 +218,15 @@ export async function saveBlob (
   // For large files we cannot calculate checksum beforehead
   // upload file with unique filename and then obtain checksum
   const { hash } = await uploadLargeFile(bucket, stream, filename, { httpMetadata })
-  const data = await db.getData(sql, { hash, location })
+  const data = await db.getData({ hash, location })
   if (data !== null) {
     // We found an existing blob with the same hash
     // we can safely remove the existing blob from storage
-    await Promise.all([bucket.delete(filename), db.createBlob(sql, { workspace, name, hash, location })])
+    await Promise.all([bucket.delete(filename), db.createBlob({ workspace, name, hash, location })])
   } else {
     // Otherwise register a new hash and blob
-    await sql.begin((sql) => [
-      db.createData(sql, { hash, location, filename, type, size }),
-      db.createBlob(sql, { workspace, name, hash, location })
-    ])
+    await db.createData({ hash, location, filename, type, size })
+    await db.createBlob({ workspace, name, hash, location })
   }

   return { type, size, lastModified, name }
@@ -225,6 +236,7 @@ export async function saveBlob (
 export async function handleBlobUploaded (
   env: Env,
   ctx: ExecutionContext,
+  metrics: MetricsContext,
   workspace: string,
   name: string,
   filename: UUID
@@ -238,18 +250,16 @@ export async function handleBlobUploaded (

   const hash = object.checksums.md5 !== undefined ? digestToUUID(object.checksums.md5) : (crypto.randomUUID() as UUID)

-  await withPostgres(env, ctx, async (sql) => {
-    const data = await db.getData(sql, { hash, location })
+  await withPostgres(env, ctx, metrics, async (db) => {
+    const data = await db.getData({ hash, location })
     if (data !== null) {
-      await Promise.all([bucket.delete(filename), db.createBlob(sql, { workspace, name, hash, location })])
+      await Promise.all([bucket.delete(filename), db.createBlob({ workspace, name, hash, location })])
     } else {
       const size = object.size
       const type = object.httpMetadata?.contentType ?? 'application/octet-stream'
-      await sql.begin((sql) => [
-        db.createData(sql, { hash, location, filename, type, size }),
-        db.createBlob(sql, { workspace, name, hash, location })
-      ])
+      await db.createData({ hash, location, filename, type, size })
+      await db.createBlob({ workspace, name, hash, location })
     }
   })
 }
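
Every handler above follows the same two-part change: a trailing metrics: MetricsContext parameter is threaded through, and the withPostgres callback now receives a ready-made BlobDB handle instead of a raw sql connection. A minimal sketch of the resulting calling pattern (illustrative only; the real wiring lives in the worker entry point further down):

  // one context per request; handlers and wrapped bindings all record into it
  const metrics = new MetricsContext()
  const response = await handleBlobGet(request, env, ctx, metrics)
  // metrics.metrics now holds a { name, time } sample for every
  // cache.*, db.* and r2.* operation the handler performed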

View File

@@ -14,7 +14,7 @@
 //
 import postgres from 'postgres'

-import { measure, measureSync } from './measure'
+import { type MetricsContext } from './metrics'
 import { type Location, type UUID } from './types'

 export interface BlobDataId {
@@ -46,52 +46,62 @@ export interface BlobRecordWithFilename extends BlobRecord {
 export async function withPostgres<T> (
   env: Env,
   ctx: ExecutionContext,
-  fn: (sql: postgres.Sql) => Promise<T>
+  metrics: MetricsContext,
+  fn: (db: BlobDB) => Promise<T>
 ): Promise<T> {
-  const sql = measureSync('db.connect', () => {
-    return postgres(env.HYPERDRIVE.connectionString)
+  const sql = metrics.withSync('db.connect', () => {
+    return postgres(env.HYPERDRIVE.connectionString, {
+      connection: {
+        application_name: 'datalake'
+      }
+    })
   })
+
+  const db = new LoggedDB(new PostgresDB(sql), metrics)
   try {
-    return await fn(sql)
+    return await fn(db)
   } finally {
-    measureSync('db.close', () => {
+    metrics.withSync('db.disconnect', () => {
       ctx.waitUntil(sql.end({ timeout: 0 }))
     })
   }
 }

 export interface BlobDB {
-  getData: (sql: postgres.Sql, dataId: BlobDataId) => Promise<BlobDataRecord | null>
-  createData: (sql: postgres.Sql, data: BlobDataRecord) => Promise<void>
-  getBlob: (sql: postgres.Sql, blobId: BlobId) => Promise<BlobRecordWithFilename | null>
-  createBlob: (sql: postgres.Sql, blob: Omit<BlobRecord, 'filename' | 'deleted'>) => Promise<void>
-  deleteBlob: (sql: postgres.Sql, blob: BlobId) => Promise<void>
+  getData: (dataId: BlobDataId) => Promise<BlobDataRecord | null>
+  createData: (data: BlobDataRecord) => Promise<void>
+  getBlob: (blobId: BlobId) => Promise<BlobRecordWithFilename | null>
+  createBlob: (blob: Omit<BlobRecord, 'filename' | 'deleted'>) => Promise<void>
+  deleteBlob: (blob: BlobId) => Promise<void>
 }

-const db: BlobDB = {
-  async getData (sql: postgres.Sql, dataId: BlobDataId): Promise<BlobDataRecord | null> {
+export class PostgresDB implements BlobDB {
+  constructor (private readonly sql: postgres.Sql) {}
+
+  async getData (dataId: BlobDataId): Promise<BlobDataRecord | null> {
     const { hash, location } = dataId
-    const rows = await sql<BlobDataRecord[]>`
+    const rows = await this.sql<BlobDataRecord[]>`
       SELECT hash, location, filename, size, type
      FROM blob.data
      WHERE hash = ${hash} AND location = ${location}
    `
     return rows.length > 0 ? rows[0] : null
-  },
+  }

-  async createData (sql: postgres.Sql, data: BlobDataRecord): Promise<void> {
+  async createData (data: BlobDataRecord): Promise<void> {
     const { hash, location, filename, size, type } = data
-    await sql`
+    await this.sql`
       UPSERT INTO blob.data (hash, location, filename, size, type)
      VALUES (${hash}, ${location}, ${filename}, ${size}, ${type})
    `
-  },
+  }

-  async getBlob (sql: postgres.Sql, blobId: BlobId): Promise<BlobRecordWithFilename | null> {
+  async getBlob (blobId: BlobId): Promise<BlobRecordWithFilename | null> {
     const { workspace, name } = blobId
-    const rows = await sql<BlobRecordWithFilename[]>`
+    try {
+      const rows = await this.sql<BlobRecordWithFilename[]>`
       SELECT b.workspace, b.name, b.hash, b.location, b.deleted, d.filename
      FROM blob.blob AS b
      JOIN blob.data AS d ON b.hash = d.hash AND b.location = d.location
@@ -101,23 +111,26 @@ const db: BlobDB = {
       if (rows.length > 0) {
         return rows[0]
       }
+    } catch (err) {
+      console.error(err)
+    }
     return null
-  },
+  }

-  async createBlob (sql: postgres.Sql, blob: Omit<BlobRecord, 'filename' | 'deleted'>): Promise<void> {
+  async createBlob (blob: Omit<BlobRecord, 'filename' | 'deleted'>): Promise<void> {
     const { workspace, name, hash, location } = blob
-    await sql`
+    await this.sql`
       UPSERT INTO blob.blob (workspace, name, hash, location, deleted)
      VALUES (${workspace}, ${name}, ${hash}, ${location}, false)
    `
-  },
+  }

-  async deleteBlob (sql: postgres.Sql, blob: BlobId): Promise<void> {
+  async deleteBlob (blob: BlobId): Promise<void> {
     const { workspace, name } = blob
-    await sql`
+    await this.sql`
       UPDATE blob.blob
      SET deleted = true
      WHERE workspace = ${workspace} AND name = ${name}
@@ -125,12 +138,29 @@ const db: BlobDB = {
     `
   }
 }

-export const measuredDb: BlobDB = {
-  getData: (sql, dataId) => measure('db.getData', () => db.getData(sql, dataId)),
-  createData: (sql, data) => measure('db.createData', () => db.createData(sql, data)),
-  getBlob: (sql, blobId) => measure('db.getBlob', () => db.getBlob(sql, blobId)),
-  createBlob: (sql, blob) => measure('db.createBlob', () => db.createBlob(sql, blob)),
-  deleteBlob: (sql, blob) => measure('db.deleteBlob', () => db.deleteBlob(sql, blob))
-}
-
-export default measuredDb
+export class LoggedDB implements BlobDB {
+  constructor (
+    private readonly db: BlobDB,
+    private readonly ctx: MetricsContext
+  ) {}
+
+  async getData (dataId: BlobDataId): Promise<BlobDataRecord | null> {
+    return await this.ctx.with('db.getData', () => this.db.getData(dataId))
+  }
+
+  async createData (data: BlobDataRecord): Promise<void> {
+    await this.ctx.with('db.createData', () => this.db.createData(data))
+  }
+
+  async getBlob (blobId: BlobId): Promise<BlobRecordWithFilename | null> {
+    return await this.ctx.with('db.getBlob', () => this.db.getBlob(blobId))
+  }
+
+  async createBlob (blob: Omit<BlobRecord, 'filename' | 'deleted'>): Promise<void> {
+    await this.ctx.with('db.createBlob', () => this.db.createBlob(blob))
+  }
+
+  async deleteBlob (blob: BlobId): Promise<void> {
+    await this.ctx.with('db.deleteBlob', () => this.db.deleteBlob(blob))
+  }
+}
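
This file now separates query execution (PostgresDB) from instrumentation (LoggedDB), a plain decorator: both implement BlobDB, so withPostgres hands callers the wrapped instance and every query is timed without call sites changing. One behavioral consequence worth noting: the old sql.begin transaction around the createData/createBlob pair is gone, so those two UPSERTs now run as separate statements; each is idempotent, but the pair is no longer atomic. The decorator idea reduced to a single generic helper (a hypothetical timed function, not part of this commit):

  // hypothetical: wrap any async function so it reports into a MetricsContext
  function timed<A extends unknown[], R> (
    ctx: MetricsContext,
    name: string,
    fn: (...args: A) => Promise<R>
  ): (...args: A) => Promise<R> {
    return async (...args) => await ctx.with(name, () => fn(...args))
  }

  // usage sketch: const getData = timed(metrics, 'db.getData', (id: BlobDataId) => pg.getData(id))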

View File

@@ -14,11 +14,17 @@
 //
 import { getBlobURL } from './blob'
+import { type MetricsContext } from './metrics'
 import { type BlobRequest } from './types'

 const prefferedImageFormats = ['webp', 'avif', 'jpeg', 'png']

-export async function handleImageGet (request: BlobRequest): Promise<Response> {
+export async function handleImageGet (
+  request: BlobRequest,
+  env: Env,
+  ctx: ExecutionContext,
+  metrics: MetricsContext
+): Promise<Response> {
   const {
     workspace,
     name,
@@ -48,5 +54,5 @@ export async function handleImageGet (request: BlobRequest): Promise<Response> {
   const blobURL = getBlobURL(request, workspace, name)
   const imageRequest = new Request(blobURL, { headers: { Accept } })

-  return await fetch(imageRequest, { cf: { image, cacheTtl: 3600 } })
+  return await metrics.with('image.transform', () => fetch(imageRequest, { cf: { image, cacheTtl: 3600 } }))
 }

View File

@@ -14,11 +14,11 @@
 //
 import { WorkerEntrypoint } from 'cloudflare:workers'
-import { type IRequestStrict, type RequestHandler, Router, error, html } from 'itty-router'
+import { type IRequest, type IRequestStrict, type RequestHandler, Router, error, html } from 'itty-router'
 import { handleBlobDelete, handleBlobGet, handleBlobHead, handleUploadFormData } from './blob'
 import { cors } from './cors'
-import { LoggedKVNamespace, LoggedR2Bucket, requestTimeAfter, requestTimeBefore } from './measure'
+import { LoggedKVNamespace, LoggedR2Bucket, MetricsContext } from './metrics'
 import { handleImageGet } from './image'
 import { handleS3Blob } from './s3'
 import { handleVideoMetaGet } from './video'
@@ -36,8 +36,8 @@ const { preflight, corsify } = cors({
 })

 const router = Router<IRequestStrict, [Env, ExecutionContext], Response>({
-  before: [preflight, requestTimeBefore],
-  finally: [corsify, requestTimeAfter]
+  before: [preflight],
+  finally: [corsify]
 })

 const withWorkspace: RequestHandler<WorkspaceRequest> = (request: WorkspaceRequest) => {
@@ -88,21 +88,29 @@ router
   .all('*', () => error(404))

 export default class DatalakeWorker extends WorkerEntrypoint<Env> {
-  constructor (ctx: ExecutionContext, env: Env) {
-    env = {
-      ...env,
-      datalake_blobs: new LoggedKVNamespace(env.datalake_blobs),
-      DATALAKE_APAC: new LoggedR2Bucket(env.DATALAKE_APAC),
-      DATALAKE_EEUR: new LoggedR2Bucket(env.DATALAKE_EEUR),
-      DATALAKE_WEUR: new LoggedR2Bucket(env.DATALAKE_WEUR),
-      DATALAKE_ENAM: new LoggedR2Bucket(env.DATALAKE_ENAM),
-      DATALAKE_WNAM: new LoggedR2Bucket(env.DATALAKE_WNAM)
-    }
-    super(ctx, env)
-  }
+  async fetch (request: IRequest): Promise<Response> {
+    const start = performance.now()
+    const context = new MetricsContext()
+
+    const env = {
+      ...this.env,
+      datalake_blobs: new LoggedKVNamespace(this.env.datalake_blobs, context),
+      DATALAKE_APAC: new LoggedR2Bucket(this.env.DATALAKE_APAC, context),
+      DATALAKE_EEUR: new LoggedR2Bucket(this.env.DATALAKE_EEUR, context),
+      DATALAKE_WEUR: new LoggedR2Bucket(this.env.DATALAKE_WEUR, context),
+      DATALAKE_ENAM: new LoggedR2Bucket(this.env.DATALAKE_ENAM, context),
+      DATALAKE_WNAM: new LoggedR2Bucket(this.env.DATALAKE_WNAM, context)
+    }

-  async fetch (request: Request): Promise<Response> {
-    return await router.fetch(request, this.env, this.ctx).catch(error)
+    try {
+      return await router.fetch(request, env, this.ctx, context).catch(error)
+    } finally {
+      const total = performance.now() - start
+      const ops = context.metrics
+      const url = `${request.method} ${request.url}`
+      const message = `total=${total} ` + context.toString()
+      console.log({ message, total, ops, url })
+    }
   }

   async getBlob (workspace: string, name: string): Promise<ArrayBuffer> {
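
The upshot of the entry-point change: instead of one console.log per operation scattered across the request, the worker emits a single structured entry as the response is finalized. Illustratively (all timings invented), a cache-miss blob GET might log something like:

  {
    message: 'total=48.2 cache.match=2.9 db.connect=0.7 db.getBlob=12.4 r2.get=27.6 response.clone=0.1 db.disconnect=0.3',
    total: 48.2,
    ops: [{ name: 'cache.match', time: 2.9 }, /* ... */],
    url: 'GET https://datalake.example.com/blob/my-workspace/my-file'
  }

Since cache.put runs under ctx.waitUntil, its sample may land after this line is written and so may be missing from the logged list.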

View File

@@ -13,42 +13,47 @@
 // limitations under the License.
 //
-import { type IRequest, type ResponseHandler, type RequestHandler } from 'itty-router'
+export interface MetricsData {
+  name: string
+  time: number
+}

-export async function measure<T> (label: string, fn: () => Promise<T>): Promise<T> {
+export class MetricsContext {
+  metrics: Array<MetricsData> = []
+
+  async with<T>(name: string, fn: () => Promise<T>): Promise<T> {
     const start = performance.now()
     try {
       return await fn()
     } finally {
-      const duration = performance.now() - start
-      console.log({ stage: label, duration })
+      const time = performance.now() - start
+      this.metrics.push({ name, time })
     }
   }

-export function measureSync<T> (label: string, fn: () => T): T {
+  withSync<T>(name: string, fn: () => T): T {
     const start = performance.now()
     try {
       return fn()
     } finally {
-      const duration = performance.now() - start
-      console.log({ stage: label, duration })
+      const time = performance.now() - start
+      this.metrics.push({ name, time })
     }
   }

-export const requestTimeBefore: RequestHandler<IRequest> = async (request: IRequest) => {
-  request.startTime = performance.now()
-}
-
-export const requestTimeAfter: ResponseHandler<Response> = async (response: Response, request: IRequest) => {
-  const duration = performance.now() - request.startTime
-  console.log({ stage: 'total', duration })
-}
+  toString (): string {
+    return this.metrics.map((p) => `${p.name}=${p.time}`).join(' ')
+  }
+}

 export class LoggedR2Bucket implements R2Bucket {
-  constructor (private readonly bucket: R2Bucket) {}
+  constructor (
+    private readonly bucket: R2Bucket,
+    private readonly ctx: MetricsContext
+  ) {}

   async head (key: string): Promise<R2Object | null> {
-    return await measure('r2.head', () => this.bucket.head(key))
+    return await this.ctx.with('r2.head', () => this.bucket.head(key))
   }

   async get (
@@ -57,7 +62,7 @@ export class LoggedR2Bucket implements R2Bucket {
       onlyIf?: R2Conditional | Headers
     }
   ): Promise<R2ObjectBody | null> {
-    return await measure('r2.get', () => this.bucket.get(key, options))
+    return await this.ctx.with('r2.get', () => this.bucket.get(key, options))
   }

   async put (
@@ -67,28 +72,31 @@ export class LoggedR2Bucket implements R2Bucket {
       onlyIf?: R2Conditional | Headers
     }
   ): Promise<R2Object> {
-    return await measure('r2.put', () => this.bucket.put(key, value, options))
+    return await this.ctx.with('r2.put', () => this.bucket.put(key, value, options))
   }

   async createMultipartUpload (key: string, options?: R2MultipartOptions): Promise<R2MultipartUpload> {
-    return await measure('r2.createMultipartUpload', () => this.bucket.createMultipartUpload(key, options))
+    return await this.ctx.with('r2.createMultipartUpload', () => this.bucket.createMultipartUpload(key, options))
   }

   resumeMultipartUpload (key: string, uploadId: string): R2MultipartUpload {
-    return measureSync('r2.resumeMultipartUpload', () => this.bucket.resumeMultipartUpload(key, uploadId))
+    return this.ctx.withSync('r2.resumeMultipartUpload', () => this.bucket.resumeMultipartUpload(key, uploadId))
   }

   async delete (keys: string | string[]): Promise<void> {
-    await measure('r2.delete', () => this.bucket.delete(keys))
+    await this.ctx.with('r2.delete', () => this.bucket.delete(keys))
   }

   async list (options?: R2ListOptions): Promise<R2Objects> {
-    return await measure('r2.list', () => this.bucket.list(options))
+    return await this.ctx.with('r2.list', () => this.bucket.list(options))
   }
 }

 export class LoggedKVNamespace implements KVNamespace {
-  constructor (private readonly kv: KVNamespace) {}
+  constructor (
+    private readonly kv: KVNamespace,
+    private readonly ctx: MetricsContext
+  ) {}

   get (key: string, options?: Partial<KVNamespaceGetOptions<undefined>>): Promise<string | null>
   get (key: string, type: 'text'): Promise<string | null>
@@ -100,7 +108,7 @@ export class LoggedKVNamespace implements KVNamespace {
   get (key: string, options?: KVNamespaceGetOptions<'arrayBuffer'>): Promise<ArrayBuffer | null>
   get (key: string, options?: KVNamespaceGetOptions<'stream'>): Promise<ReadableStream | null>
   async get (key: string, options?: any): Promise<any> {
-    return await measure('kv.get', () => this.kv.get(key, options))
+    return await this.ctx.with('kv.get', () => this.kv.get(key, options))
   }

   getWithMetadata<Metadata = unknown>(
@@ -140,11 +148,11 @@ export class LoggedKVNamespace implements KVNamespace {
     options?: KVNamespaceGetOptions<'stream'>
   ): Promise<KVNamespaceGetWithMetadataResult<ReadableStream, Metadata>>
   async getWithMetadata (key: string, options?: any): Promise<any> {
-    return await measure('kv.getWithMetadata', () => this.kv.getWithMetadata(key, options))
+    return await this.ctx.with('kv.getWithMetadata', () => this.kv.getWithMetadata(key, options))
   }

   async list<Metadata = unknown>(options?: KVNamespaceListOptions): Promise<KVNamespaceListResult<Metadata, string>> {
-    return await measure('kv.list', () => this.kv.list(options))
+    return await this.ctx.with('kv.list', () => this.kv.list(options))
   }

   async put (
@@ -152,26 +160,29 @@ export class LoggedKVNamespace implements KVNamespace {
     value: string | ArrayBuffer | ArrayBufferView | ReadableStream,
     options?: KVNamespacePutOptions
   ): Promise<void> {
-    await measure('kv.put', () => this.kv.put(key, value))
+    await this.ctx.with('kv.put', () => this.kv.put(key, value))
   }

   async delete (key: string): Promise<void> {
-    await measure('kv.delete', () => this.kv.delete(key))
+    await this.ctx.with('kv.delete', () => this.kv.delete(key))
   }
 }

 export class LoggedCache implements Cache {
-  constructor (private readonly cache: Cache) {}
+  constructor (
+    private readonly cache: Cache,
+    private readonly ctx: MetricsContext
+  ) {}

   async match (request: RequestInfo, options?: CacheQueryOptions): Promise<Response | undefined> {
-    return await measure('cache.match', () => this.cache.match(request, options))
+    return await this.ctx.with('cache.match', () => this.cache.match(request, options))
   }

   async delete (request: RequestInfo, options?: CacheQueryOptions): Promise<boolean> {
-    return await measure('cache.delete', () => this.cache.delete(request, options))
+    return await this.ctx.with('cache.delete', () => this.cache.delete(request, options))
   }

   async put (request: RequestInfo, response: Response): Promise<void> {
-    await measure('cache.put', () => this.cache.put(request, response))
+    await this.ctx.with('cache.put', () => this.cache.put(request, response))
   }
 }
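
MetricsContext itself is deliberately tiny: an array of { name, time } samples plus two wrappers that time a callback. Because each sample is pushed in a finally block, failed operations are recorded as well and the original error still propagates. A self-contained usage sketch:

  const metrics = new MetricsContext()

  // async: the wrapped return value passes through unchanged
  const res = await metrics.with('fetch.example', () => fetch('https://example.com'))

  // sync variant
  const answer = metrics.withSync('compute', () => 6 * 7)

  console.log(metrics.toString()) // e.g. 'fetch.example=132.5 compute=0.01'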

View File

@@ -14,9 +14,10 @@
 //
 import { error, json } from 'itty-router'
-import db, { withPostgres } from './db'
+import { withPostgres } from './db'
 import { cacheControl } from './const'
 import { toUUID } from './encodings'
+import { type MetricsContext } from './metrics'
 import { selectStorage } from './storage'
 import { type BlobRequest, type UUID } from './types'
@@ -82,7 +83,8 @@ export async function handleMultipartUploadPart (
 export async function handleMultipartUploadComplete (
   request: BlobRequest,
   env: Env,
-  ctx: ExecutionContext
+  ctx: ExecutionContext,
+  metrics: MetricsContext
 ): Promise<Response> {
   const { workspace, name } = request
@@ -105,17 +107,15 @@ export async function handleMultipartUploadComplete (
   const size = object.size ?? 0
   const filename = multipartKey as UUID

-  await withPostgres(env, ctx, async (sql) => {
-    const data = await db.getData(sql, { hash, location })
+  await withPostgres(env, ctx, metrics, async (db) => {
+    const data = await db.getData({ hash, location })
     if (data !== null) {
       // blob already exists
-      await Promise.all([bucket.delete(filename), db.createBlob(sql, { workspace, name, hash, location })])
+      await Promise.all([bucket.delete(filename), db.createBlob({ workspace, name, hash, location })])
     } else {
       // Otherwise register a new hash and blob
-      await sql.begin((sql) => [
-        db.createData(sql, { hash, location, filename, type, size }),
-        db.createBlob(sql, { workspace, name, hash, location })
-      ])
+      await db.createData({ hash, location, filename, type, size })
+      await db.createBlob({ workspace, name, hash, location })
     }
   })

View File

@@ -15,8 +15,9 @@
 import { AwsClient } from 'aws4fetch'
 import { error, json } from 'itty-router'

-import db, { withPostgres } from './db'
+import { withPostgres } from './db'
 import { saveBlob } from './blob'
+import { type MetricsContext } from './metrics'
 import { type BlobRequest } from './types'

 export interface S3UploadPayload {
@@ -35,16 +36,21 @@ function getS3Client (payload: S3UploadPayload): AwsClient {
   })
 }

-export async function handleS3Blob (request: BlobRequest, env: Env, ctx: ExecutionContext): Promise<Response> {
+export async function handleS3Blob (
+  request: BlobRequest,
+  env: Env,
+  ctx: ExecutionContext,
+  metrics: MetricsContext
+): Promise<Response> {
   const { workspace, name } = request

   const payload = await request.json<S3UploadPayload>()
   const client = getS3Client(payload)

-  return await withPostgres(env, ctx, async (sql) => {
+  return await withPostgres(env, ctx, metrics, async (db) => {
     // Ensure the blob does not exist
-    const blob = await db.getBlob(sql, { workspace, name })
+    const blob = await db.getBlob({ workspace, name })
     if (blob !== null) {
       return new Response(null, { status: 200 })
     }
@@ -65,7 +71,7 @@ export async function handleS3Blob (request: BlobRequest, env: Env, ctx: Executi
     const contentLength = Number.parseInt(contentLengthHeader)
     const lastModified = lastModifiedHeader !== null ? new Date(lastModifiedHeader).getTime() : Date.now()

-    const result = await saveBlob(env, sql, object.body, contentLength, contentType, workspace, name, lastModified)
+    const result = await saveBlob(env, db, object.body, contentLength, contentType, workspace, name, lastModified)
     return json(result)
   })
 }

View File

@@ -17,8 +17,9 @@ import { AwsClient } from 'aws4fetch'
 import { error } from 'itty-router'

 import { handleBlobUploaded } from './blob'
+import { type MetricsContext } from './metrics'
+import { type Storage, selectStorage } from './storage'
 import { type BlobRequest, type UUID } from './types'
-import { selectStorage, type Storage } from './storage'

 const S3_SIGNED_LINK_TTL = 3600
@@ -39,7 +40,12 @@ function getS3Client (storage: Storage): AwsClient {
   })
 }

-export async function handleSignCreate (request: BlobRequest, env: Env, ctx: ExecutionContext): Promise<Response> {
+export async function handleSignCreate (
+  request: BlobRequest,
+  env: Env,
+  ctx: ExecutionContext,
+  metrics: MetricsContext
+): Promise<Response> {
   const { workspace, name } = request
   const storage = selectStorage(env, workspace)
   const accountId = env.R2_ACCOUNT_ID
@@ -57,7 +63,9 @@ export async function handleSignCreate (request: BlobRequest, env: Env, ctx: Exe
   try {
     const client = getS3Client(storage)
-    signed = await client.sign(new Request(url, { method: 'PUT' }), { aws: { signQuery: true } })
+    signed = await metrics.with('s3.sign', () => {
+      return client.sign(new Request(url, { method: 'PUT' }), { aws: { signQuery: true } })
+    })
   } catch (err: any) {
     console.error({ error: 'failed to generate signed url', message: `${err}` })
     return error(500, 'failed to generate signed url')
@@ -73,7 +81,12 @@ export async function handleSignCreate (request: BlobRequest, env: Env, ctx: Exe
   return new Response(signed.url, { status: 200, headers })
 }

-export async function handleSignComplete (request: BlobRequest, env: Env, ctx: ExecutionContext): Promise<Response> {
+export async function handleSignComplete (
+  request: BlobRequest,
+  env: Env,
+  ctx: ExecutionContext,
+  metrics: MetricsContext
+): Promise<Response> {
   const { workspace, name } = request
   const { bucket } = selectStorage(env, workspace)
@@ -96,7 +109,7 @@ export async function handleSignComplete (request: BlobRequest, env: Env, ctx: E
   }

   try {
-    await handleBlobUploaded(env, ctx, workspace, name, uuid)
+    await handleBlobUploaded(env, ctx, metrics, workspace, name, uuid)
   } catch (err) {
     const message = err instanceof Error ? err.message : String(err)
     console.error({ error: message, workspace, name, uuid })

View File

@@ -95,7 +95,7 @@ r2_buckets = [
 ]

 hyperdrive = [
-  { binding = "HYPERDRIVE", id = "055e968f3067414eaa30467d8a9c5021" }
+  { binding = "HYPERDRIVE", id = "055e968f3067414eaa30467d8a9c5021", localConnectionString = "postgresql://root:roach@localhost:26257/datalake" }
 ]

 [env.dev.vars]
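
The added localConnectionString is what wrangler dev substitutes for the Hyperdrive binding during local development; remote deployments still resolve the Hyperdrive id above. The 26257 port and root:roach credentials suggest a local single-node CockroachDB, which would also match the UPSERT syntax used in the database queries.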