From 63418d67a1d5eb7543570093d4fd36c079bc6883 Mon Sep 17 00:00:00 2001 From: Alexander Onnikov Date: Tue, 10 Dec 2024 14:45:08 +0700 Subject: [PATCH] UBERF-8842 Improve datalake performance logs (#7406) Signed-off-by: Alexander Onnikov --- workers/datalake/src/blob.ts | 88 ++++++++------ workers/datalake/src/db.ts | 112 +++++++++++------- workers/datalake/src/image.ts | 10 +- workers/datalake/src/index.ts | 44 ++++--- .../datalake/src/{measure.ts => metrics.ts} | 95 ++++++++------- workers/datalake/src/multipart.ts | 18 +-- workers/datalake/src/s3.ts | 16 ++- workers/datalake/src/sign.ts | 23 +++- workers/datalake/wrangler.toml | 2 +- 9 files changed, 246 insertions(+), 162 deletions(-) rename workers/datalake/src/{measure.ts => metrics.ts} (67%) diff --git a/workers/datalake/src/blob.ts b/workers/datalake/src/blob.ts index 3fd6fcd16f..1c97a1675b 100644 --- a/workers/datalake/src/blob.ts +++ b/workers/datalake/src/blob.ts @@ -14,15 +14,14 @@ // import { error, json } from 'itty-router' -import { type Sql } from 'postgres' -import db, { withPostgres } from './db' +import { type BlobDB, withPostgres } from './db' import { cacheControl, hashLimit } from './const' import { toUUID } from './encodings' import { getSha256 } from './hash' import { selectStorage } from './storage' import { type BlobRequest, type WorkspaceRequest, type UUID } from './types' import { copyVideo, deleteVideo } from './video' -import { measure, LoggedCache } from './measure' +import { type MetricsContext, LoggedCache } from './metrics' interface BlobMetadata { lastModified: number @@ -36,20 +35,24 @@ export function getBlobURL (request: Request, workspace: string, name: string): return new URL(path, request.url).toString() } -export async function handleBlobGet (request: BlobRequest, env: Env, ctx: ExecutionContext): Promise { +export async function handleBlobGet ( + request: BlobRequest, + env: Env, + ctx: ExecutionContext, + metrics: MetricsContext +): Promise { const { workspace, name } = request - const cache = new LoggedCache(caches.default) + const cache = new LoggedCache(caches.default, metrics) const cached = await cache.match(request) if (cached !== undefined) { - console.log({ message: 'cache hit' }) return cached } const { bucket } = selectStorage(env, workspace) - const blob = await withPostgres(env, ctx, (sql) => { - return db.getBlob(sql, { workspace, name }) + const blob = await withPostgres(env, ctx, metrics, (db) => { + return db.getBlob({ workspace, name }) }) if (blob === null || blob.deleted) { return error(404) @@ -72,19 +75,25 @@ export async function handleBlobGet (request: BlobRequest, env: Env, ctx: Execut const response = new Response(object?.body, { headers, status }) if (response.status === 200) { - ctx.waitUntil(cache.put(request, response.clone())) + const clone = metrics.withSync('response.clone', () => response.clone()) + ctx.waitUntil(cache.put(request, clone)) } return response } -export async function handleBlobHead (request: BlobRequest, env: Env, ctx: ExecutionContext): Promise { +export async function handleBlobHead ( + request: BlobRequest, + env: Env, + ctx: ExecutionContext, + metrics: MetricsContext +): Promise { const { workspace, name } = request const { bucket } = selectStorage(env, workspace) - const blob = await withPostgres(env, ctx, (sql) => { - return db.getBlob(sql, { workspace, name }) + const blob = await withPostgres(env, ctx, metrics, (db) => { + return db.getBlob({ workspace, name }) }) if (blob === null || blob.deleted) { return error(404) @@ -99,12 +108,17 @@ 
export async function handleBlobHead (request: BlobRequest, env: Env, ctx: Execu return new Response(null, { headers, status: 200 }) } -export async function handleBlobDelete (request: BlobRequest, env: Env, ctx: ExecutionContext): Promise { +export async function handleBlobDelete ( + request: BlobRequest, + env: Env, + ctx: ExecutionContext, + metrics: MetricsContext +): Promise { const { workspace, name } = request try { - await withPostgres(env, ctx, (sql) => { - return Promise.all([db.deleteBlob(sql, { workspace, name }), deleteVideo(env, workspace, name)]) + await withPostgres(env, ctx, metrics, (db) => { + return Promise.all([db.deleteBlob({ workspace, name }), deleteVideo(env, workspace, name)]) }) return new Response(null, { status: 204 }) @@ -118,7 +132,8 @@ export async function handleBlobDelete (request: BlobRequest, env: Env, ctx: Exe export async function handleUploadFormData ( request: WorkspaceRequest, env: Env, - ctx: ExecutionContext + ctx: ExecutionContext, + metrics: MetricsContext ): Promise { const contentType = request.headers.get('Content-Type') if (contentType === null || !contentType.includes('multipart/form-data')) { @@ -130,7 +145,7 @@ export async function handleUploadFormData ( let formData: FormData try { - formData = await measure('fetch formdata', () => request.formData()) + formData = await metrics.with('request.formData', () => request.formData()) } catch (err: any) { const message = err instanceof Error ? err.message : String(err) console.error({ error: 'failed to parse form data', message }) @@ -146,8 +161,8 @@ export async function handleUploadFormData ( files.map(async ([file, key]) => { const { name, type, lastModified } = file try { - const metadata = await withPostgres(env, ctx, (sql) => { - return saveBlob(env, sql, file.stream(), file.size, type, workspace, name, lastModified) + const metadata = await withPostgres(env, ctx, metrics, (db) => { + return saveBlob(env, db, file.stream(), file.size, type, workspace, name, lastModified) }) // TODO this probably should happen via queue, let it be here for now @@ -170,7 +185,7 @@ export async function handleUploadFormData ( export async function saveBlob ( env: Env, - sql: Sql, + db: BlobDB, stream: ReadableStream, size: number, type: string, @@ -187,17 +202,15 @@ export async function saveBlob ( const [hashStream, uploadStream] = stream.tee() const hash = await getSha256(hashStream) - const data = await db.getData(sql, { hash, location }) + const data = await db.getData({ hash, location }) if (data !== null) { // Lucky boy, nothing to upload, use existing blob - await db.createBlob(sql, { workspace, name, hash, location }) + await db.createBlob({ workspace, name, hash, location }) } else { await bucket.put(filename, uploadStream, { httpMetadata }) - await sql.begin((sql) => [ - db.createData(sql, { hash, location, filename, type, size }), - db.createBlob(sql, { workspace, name, hash, location }) - ]) + await db.createData({ hash, location, filename, type, size }) + await db.createBlob({ workspace, name, hash, location }) } return { type, size, lastModified, name } @@ -205,17 +218,15 @@ export async function saveBlob ( // For large files we cannot calculate checksum beforehead // upload file with unique filename and then obtain checksum const { hash } = await uploadLargeFile(bucket, stream, filename, { httpMetadata }) - const data = await db.getData(sql, { hash, location }) + const data = await db.getData({ hash, location }) if (data !== null) { // We found an existing blob with the same hash // we can 
safely remove the existing blob from storage - await Promise.all([bucket.delete(filename), db.createBlob(sql, { workspace, name, hash, location })]) + await Promise.all([bucket.delete(filename), db.createBlob({ workspace, name, hash, location })]) } else { // Otherwise register a new hash and blob - await sql.begin((sql) => [ - db.createData(sql, { hash, location, filename, type, size }), - db.createBlob(sql, { workspace, name, hash, location }) - ]) + await db.createData({ hash, location, filename, type, size }) + await db.createBlob({ workspace, name, hash, location }) } return { type, size, lastModified, name } @@ -225,6 +236,7 @@ export async function saveBlob ( export async function handleBlobUploaded ( env: Env, ctx: ExecutionContext, + metrics: MetricsContext, workspace: string, name: string, filename: UUID @@ -238,18 +250,16 @@ export async function handleBlobUploaded ( const hash = object.checksums.md5 !== undefined ? digestToUUID(object.checksums.md5) : (crypto.randomUUID() as UUID) - await withPostgres(env, ctx, async (sql) => { - const data = await db.getData(sql, { hash, location }) + await withPostgres(env, ctx, metrics, async (db) => { + const data = await db.getData({ hash, location }) if (data !== null) { - await Promise.all([bucket.delete(filename), db.createBlob(sql, { workspace, name, hash, location })]) + await Promise.all([bucket.delete(filename), db.createBlob({ workspace, name, hash, location })]) } else { const size = object.size const type = object.httpMetadata?.contentType ?? 'application/octet-stream' - await sql.begin((sql) => [ - db.createData(sql, { hash, location, filename, type, size }), - db.createBlob(sql, { workspace, name, hash, location }) - ]) + await db.createData({ hash, location, filename, type, size }) + await db.createBlob({ workspace, name, hash, location }) } }) } diff --git a/workers/datalake/src/db.ts b/workers/datalake/src/db.ts index 6aad3e3be8..8af838fdb9 100644 --- a/workers/datalake/src/db.ts +++ b/workers/datalake/src/db.ts @@ -14,7 +14,7 @@ // import postgres from 'postgres' -import { measure, measureSync } from './measure' +import { type MetricsContext } from './metrics' import { type Location, type UUID } from './types' export interface BlobDataId { @@ -46,78 +46,91 @@ export interface BlobRecordWithFilename extends BlobRecord { export async function withPostgres ( env: Env, ctx: ExecutionContext, - fn: (sql: postgres.Sql) => Promise + metrics: MetricsContext, + fn: (db: BlobDB) => Promise ): Promise { - const sql = measureSync('db.connect', () => { - return postgres(env.HYPERDRIVE.connectionString) + const sql = metrics.withSync('db.connect', () => { + return postgres(env.HYPERDRIVE.connectionString, { + connection: { + application_name: 'datalake' + } + }) }) + const db = new LoggedDB(new PostgresDB(sql), metrics) + try { - return await fn(sql) + return await fn(db) } finally { - measureSync('db.close', () => { + metrics.withSync('db.disconnect', () => { ctx.waitUntil(sql.end({ timeout: 0 })) }) } } export interface BlobDB { - getData: (sql: postgres.Sql, dataId: BlobDataId) => Promise - createData: (sql: postgres.Sql, data: BlobDataRecord) => Promise - getBlob: (sql: postgres.Sql, blobId: BlobId) => Promise - createBlob: (sql: postgres.Sql, blob: Omit) => Promise - deleteBlob: (sql: postgres.Sql, blob: BlobId) => Promise + getData: (dataId: BlobDataId) => Promise + createData: (data: BlobDataRecord) => Promise + getBlob: (blobId: BlobId) => Promise + createBlob: (blob: Omit) => Promise + deleteBlob: (blob: BlobId) => Promise } 
-const db: BlobDB = { - async getData (sql: postgres.Sql, dataId: BlobDataId): Promise { +export class PostgresDB implements BlobDB { + constructor (private readonly sql: postgres.Sql) {} + + async getData (dataId: BlobDataId): Promise { const { hash, location } = dataId - const rows = await sql` + const rows = await this.sql` SELECT hash, location, filename, size, type FROM blob.data WHERE hash = ${hash} AND location = ${location} ` return rows.length > 0 ? rows[0] : null - }, + } - async createData (sql: postgres.Sql, data: BlobDataRecord): Promise { + async createData (data: BlobDataRecord): Promise { const { hash, location, filename, size, type } = data - await sql` + await this.sql` UPSERT INTO blob.data (hash, location, filename, size, type) VALUES (${hash}, ${location}, ${filename}, ${size}, ${type}) ` - }, + } - async getBlob (sql: postgres.Sql, blobId: BlobId): Promise { + async getBlob (blobId: BlobId): Promise { const { workspace, name } = blobId - const rows = await sql` - SELECT b.workspace, b.name, b.hash, b.location, b.deleted, d.filename - FROM blob.blob AS b - JOIN blob.data AS d ON b.hash = d.hash AND b.location = d.location - WHERE b.workspace = ${workspace} AND b.name = ${name} - ` + try { + const rows = await this.sql` + SELECT b.workspace, b.name, b.hash, b.location, b.deleted, d.filename + FROM blob.blob AS b + JOIN blob.data AS d ON b.hash = d.hash AND b.location = d.location + WHERE b.workspace = ${workspace} AND b.name = ${name} + ` - if (rows.length > 0) { - return rows[0] + if (rows.length > 0) { + return rows[0] + } + } catch (err) { + console.error(err) } return null - }, + } - async createBlob (sql: postgres.Sql, blob: Omit): Promise { + async createBlob (blob: Omit): Promise { const { workspace, name, hash, location } = blob - await sql` + await this.sql` UPSERT INTO blob.blob (workspace, name, hash, location, deleted) VALUES (${workspace}, ${name}, ${hash}, ${location}, false) ` - }, + } - async deleteBlob (sql: postgres.Sql, blob: BlobId): Promise { + async deleteBlob (blob: BlobId): Promise { const { workspace, name } = blob - await sql` + await this.sql` UPDATE blob.blob SET deleted = true WHERE workspace = ${workspace} AND name = ${name} @@ -125,12 +138,29 @@ const db: BlobDB = { } } -export const measuredDb: BlobDB = { - getData: (sql, dataId) => measure('db.getData', () => db.getData(sql, dataId)), - createData: (sql, data) => measure('db.createData', () => db.createData(sql, data)), - getBlob: (sql, blobId) => measure('db.getBlob', () => db.getBlob(sql, blobId)), - createBlob: (sql, blob) => measure('db.createBlob', () => db.createBlob(sql, blob)), - deleteBlob: (sql, blob) => measure('db.deleteBlob', () => db.deleteBlob(sql, blob)) -} +export class LoggedDB implements BlobDB { + constructor ( + private readonly db: BlobDB, + private readonly ctx: MetricsContext + ) {} -export default measuredDb + async getData (dataId: BlobDataId): Promise { + return await this.ctx.with('db.getData', () => this.db.getData(dataId)) + } + + async createData (data: BlobDataRecord): Promise { + await this.ctx.with('db.createData', () => this.db.createData(data)) + } + + async getBlob (blobId: BlobId): Promise { + return await this.ctx.with('db.getBlob', () => this.db.getBlob(blobId)) + } + + async createBlob (blob: Omit): Promise { + await this.ctx.with('db.createBlob', () => this.db.createBlob(blob)) + } + + async deleteBlob (blob: BlobId): Promise { + await this.ctx.with('db.deleteBlob', () => this.db.deleteBlob(blob)) + } +} diff --git a/workers/datalake/src/image.ts 
b/workers/datalake/src/image.ts index 1293e5c89f..e39646a3fc 100644 --- a/workers/datalake/src/image.ts +++ b/workers/datalake/src/image.ts @@ -14,11 +14,17 @@ // import { getBlobURL } from './blob' +import { type MetricsContext } from './metrics' import { type BlobRequest } from './types' const prefferedImageFormats = ['webp', 'avif', 'jpeg', 'png'] -export async function handleImageGet (request: BlobRequest): Promise { +export async function handleImageGet ( + request: BlobRequest, + env: Env, + ctx: ExecutionContext, + metrics: MetricsContext +): Promise { const { workspace, name, @@ -48,5 +54,5 @@ export async function handleImageGet (request: BlobRequest): Promise { const blobURL = getBlobURL(request, workspace, name) const imageRequest = new Request(blobURL, { headers: { Accept } }) - return await fetch(imageRequest, { cf: { image, cacheTtl: 3600 } }) + return await metrics.with('image.transform', () => fetch(imageRequest, { cf: { image, cacheTtl: 3600 } })) } diff --git a/workers/datalake/src/index.ts b/workers/datalake/src/index.ts index 4dcf33df06..054ee11cae 100644 --- a/workers/datalake/src/index.ts +++ b/workers/datalake/src/index.ts @@ -14,11 +14,11 @@ // import { WorkerEntrypoint } from 'cloudflare:workers' -import { type IRequestStrict, type RequestHandler, Router, error, html } from 'itty-router' +import { type IRequest, type IRequestStrict, type RequestHandler, Router, error, html } from 'itty-router' import { handleBlobDelete, handleBlobGet, handleBlobHead, handleUploadFormData } from './blob' import { cors } from './cors' -import { LoggedKVNamespace, LoggedR2Bucket, requestTimeAfter, requestTimeBefore } from './measure' +import { LoggedKVNamespace, LoggedR2Bucket, MetricsContext } from './metrics' import { handleImageGet } from './image' import { handleS3Blob } from './s3' import { handleVideoMetaGet } from './video' @@ -36,8 +36,8 @@ const { preflight, corsify } = cors({ }) const router = Router({ - before: [preflight, requestTimeBefore], - finally: [corsify, requestTimeAfter] + before: [preflight], + finally: [corsify] }) const withWorkspace: RequestHandler = (request: WorkspaceRequest) => { @@ -88,21 +88,29 @@ router .all('*', () => error(404)) export default class DatalakeWorker extends WorkerEntrypoint { - constructor (ctx: ExecutionContext, env: Env) { - env = { - ...env, - datalake_blobs: new LoggedKVNamespace(env.datalake_blobs), - DATALAKE_APAC: new LoggedR2Bucket(env.DATALAKE_APAC), - DATALAKE_EEUR: new LoggedR2Bucket(env.DATALAKE_EEUR), - DATALAKE_WEUR: new LoggedR2Bucket(env.DATALAKE_WEUR), - DATALAKE_ENAM: new LoggedR2Bucket(env.DATALAKE_ENAM), - DATALAKE_WNAM: new LoggedR2Bucket(env.DATALAKE_WNAM) - } - super(ctx, env) - } + async fetch (request: IRequest): Promise { + const start = performance.now() + const context = new MetricsContext() - async fetch (request: Request): Promise { - return await router.fetch(request, this.env, this.ctx).catch(error) + const env = { + ...this.env, + datalake_blobs: new LoggedKVNamespace(this.env.datalake_blobs, context), + DATALAKE_APAC: new LoggedR2Bucket(this.env.DATALAKE_APAC, context), + DATALAKE_EEUR: new LoggedR2Bucket(this.env.DATALAKE_EEUR, context), + DATALAKE_WEUR: new LoggedR2Bucket(this.env.DATALAKE_WEUR, context), + DATALAKE_ENAM: new LoggedR2Bucket(this.env.DATALAKE_ENAM, context), + DATALAKE_WNAM: new LoggedR2Bucket(this.env.DATALAKE_WNAM, context) + } + + try { + return await router.fetch(request, env, this.ctx, context).catch(error) + } finally { + const total = performance.now() - start + const ops = 
context.metrics + const url = `${request.method} ${request.url}` + const message = `total=${total} ` + context.toString() + console.log({ message, total, ops, url }) + } } async getBlob (workspace: string, name: string): Promise { diff --git a/workers/datalake/src/measure.ts b/workers/datalake/src/metrics.ts similarity index 67% rename from workers/datalake/src/measure.ts rename to workers/datalake/src/metrics.ts index 385f3b44e8..589bbb396f 100644 --- a/workers/datalake/src/measure.ts +++ b/workers/datalake/src/metrics.ts @@ -13,42 +13,47 @@ // limitations under the License. // -import { type IRequest, type ResponseHandler, type RequestHandler } from 'itty-router' +export interface MetricsData { + name: string + time: number +} -export async function measure (label: string, fn: () => Promise): Promise { - const start = performance.now() - try { - return await fn() - } finally { - const duration = performance.now() - start - console.log({ stage: label, duration }) +export class MetricsContext { + metrics: Array = [] + + async with(name: string, fn: () => Promise): Promise { + const start = performance.now() + try { + return await fn() + } finally { + const time = performance.now() - start + this.metrics.push({ name, time }) + } } -} -export function measureSync (label: string, fn: () => T): T { - const start = performance.now() - try { - return fn() - } finally { - const duration = performance.now() - start - console.log({ stage: label, duration }) + withSync(name: string, fn: () => T): T { + const start = performance.now() + try { + return fn() + } finally { + const time = performance.now() - start + this.metrics.push({ name, time }) + } } -} -export const requestTimeBefore: RequestHandler = async (request: IRequest) => { - request.startTime = performance.now() -} - -export const requestTimeAfter: ResponseHandler = async (response: Response, request: IRequest) => { - const duration = performance.now() - request.startTime - console.log({ stage: 'total', duration }) + toString (): string { + return this.metrics.map((p) => `${p.name}=${p.time}`).join(' ') + } } export class LoggedR2Bucket implements R2Bucket { - constructor (private readonly bucket: R2Bucket) {} + constructor ( + private readonly bucket: R2Bucket, + private readonly ctx: MetricsContext + ) {} async head (key: string): Promise { - return await measure('r2.head', () => this.bucket.head(key)) + return await this.ctx.with('r2.head', () => this.bucket.head(key)) } async get ( @@ -57,7 +62,7 @@ export class LoggedR2Bucket implements R2Bucket { onlyIf?: R2Conditional | Headers } ): Promise { - return await measure('r2.get', () => this.bucket.get(key, options)) + return await this.ctx.with('r2.get', () => this.bucket.get(key, options)) } async put ( @@ -67,28 +72,31 @@ export class LoggedR2Bucket implements R2Bucket { onlyIf?: R2Conditional | Headers } ): Promise { - return await measure('r2.put', () => this.bucket.put(key, value, options)) + return await this.ctx.with('r2.put', () => this.bucket.put(key, value, options)) } async createMultipartUpload (key: string, options?: R2MultipartOptions): Promise { - return await measure('r2.createMultipartUpload', () => this.bucket.createMultipartUpload(key, options)) + return await this.ctx.with('r2.createMultipartUpload', () => this.bucket.createMultipartUpload(key, options)) } resumeMultipartUpload (key: string, uploadId: string): R2MultipartUpload { - return measureSync('r2.resumeMultipartUpload', () => this.bucket.resumeMultipartUpload(key, uploadId)) + return 
this.ctx.withSync('r2.resumeMultipartUpload', () => this.bucket.resumeMultipartUpload(key, uploadId)) } async delete (keys: string | string[]): Promise { - await measure('r2.delete', () => this.bucket.delete(keys)) + await this.ctx.with('r2.delete', () => this.bucket.delete(keys)) } async list (options?: R2ListOptions): Promise { - return await measure('r2.list', () => this.bucket.list(options)) + return await this.ctx.with('r2.list', () => this.bucket.list(options)) } } export class LoggedKVNamespace implements KVNamespace { - constructor (private readonly kv: KVNamespace) {} + constructor ( + private readonly kv: KVNamespace, + private readonly ctx: MetricsContext + ) {} get (key: string, options?: Partial>): Promise get (key: string, type: 'text'): Promise @@ -100,7 +108,7 @@ export class LoggedKVNamespace implements KVNamespace { get (key: string, options?: KVNamespaceGetOptions<'arrayBuffer'>): Promise get (key: string, options?: KVNamespaceGetOptions<'stream'>): Promise async get (key: string, options?: any): Promise { - return await measure('kv.get', () => this.kv.get(key, options)) + return await this.ctx.with('kv.get', () => this.kv.get(key, options)) } getWithMetadata( @@ -140,11 +148,11 @@ export class LoggedKVNamespace implements KVNamespace { options?: KVNamespaceGetOptions<'stream'> ): Promise> async getWithMetadata (key: string, options?: any): Promise { - return await measure('kv.getWithMetadata', () => this.kv.getWithMetadata(key, options)) + return await this.ctx.with('kv.getWithMetadata', () => this.kv.getWithMetadata(key, options)) } async list(options?: KVNamespaceListOptions): Promise> { - return await measure('kv.list', () => this.kv.list(options)) + return await this.ctx.with('kv.list', () => this.kv.list(options)) } async put ( @@ -152,26 +160,29 @@ export class LoggedKVNamespace implements KVNamespace { value: string | ArrayBuffer | ArrayBufferView | ReadableStream, options?: KVNamespacePutOptions ): Promise { - await measure('kv.put', () => this.kv.put(key, value)) + await this.ctx.with('kv.put', () => this.kv.put(key, value)) } async delete (key: string): Promise { - await measure('kv.delete', () => this.kv.delete(key)) + await this.ctx.with('kv.delete', () => this.kv.delete(key)) } } export class LoggedCache implements Cache { - constructor (private readonly cache: Cache) {} + constructor ( + private readonly cache: Cache, + private readonly ctx: MetricsContext + ) {} async match (request: RequestInfo, options?: CacheQueryOptions): Promise { - return await measure('cache.match', () => this.cache.match(request, options)) + return await this.ctx.with('cache.match', () => this.cache.match(request, options)) } async delete (request: RequestInfo, options?: CacheQueryOptions): Promise { - return await measure('cache.delete', () => this.cache.delete(request, options)) + return await this.ctx.with('cache.delete', () => this.cache.delete(request, options)) } async put (request: RequestInfo, response: Response): Promise { - await measure('cache.put', () => this.cache.put(request, response)) + await this.ctx.with('cache.put', () => this.cache.put(request, response)) } } diff --git a/workers/datalake/src/multipart.ts b/workers/datalake/src/multipart.ts index 6b5558a816..92993be97e 100644 --- a/workers/datalake/src/multipart.ts +++ b/workers/datalake/src/multipart.ts @@ -14,9 +14,10 @@ // import { error, json } from 'itty-router' -import db, { withPostgres } from './db' +import { withPostgres } from './db' import { cacheControl } from './const' import { toUUID } from 
'./encodings' +import { type MetricsContext } from './metrics' import { selectStorage } from './storage' import { type BlobRequest, type UUID } from './types' @@ -82,7 +83,8 @@ export async function handleMultipartUploadPart ( export async function handleMultipartUploadComplete ( request: BlobRequest, env: Env, - ctx: ExecutionContext + ctx: ExecutionContext, + metrics: MetricsContext ): Promise { const { workspace, name } = request @@ -105,17 +107,15 @@ export async function handleMultipartUploadComplete ( const size = object.size ?? 0 const filename = multipartKey as UUID - await withPostgres(env, ctx, async (sql) => { - const data = await db.getData(sql, { hash, location }) + await withPostgres(env, ctx, metrics, async (db) => { + const data = await db.getData({ hash, location }) if (data !== null) { // blob already exists - await Promise.all([bucket.delete(filename), db.createBlob(sql, { workspace, name, hash, location })]) + await Promise.all([bucket.delete(filename), db.createBlob({ workspace, name, hash, location })]) } else { // Otherwise register a new hash and blob - await sql.begin((sql) => [ - db.createData(sql, { hash, location, filename, type, size }), - db.createBlob(sql, { workspace, name, hash, location }) - ]) + await db.createData({ hash, location, filename, type, size }) + await db.createBlob({ workspace, name, hash, location }) } }) diff --git a/workers/datalake/src/s3.ts b/workers/datalake/src/s3.ts index 062b7714e3..2f1e1d1f47 100644 --- a/workers/datalake/src/s3.ts +++ b/workers/datalake/src/s3.ts @@ -15,8 +15,9 @@ import { AwsClient } from 'aws4fetch' import { error, json } from 'itty-router' -import db, { withPostgres } from './db' +import { withPostgres } from './db' import { saveBlob } from './blob' +import { type MetricsContext } from './metrics' import { type BlobRequest } from './types' export interface S3UploadPayload { @@ -35,16 +36,21 @@ function getS3Client (payload: S3UploadPayload): AwsClient { }) } -export async function handleS3Blob (request: BlobRequest, env: Env, ctx: ExecutionContext): Promise { +export async function handleS3Blob ( + request: BlobRequest, + env: Env, + ctx: ExecutionContext, + metrics: MetricsContext +): Promise { const { workspace, name } = request const payload = await request.json() const client = getS3Client(payload) - return await withPostgres(env, ctx, async (sql) => { + return await withPostgres(env, ctx, metrics, async (db) => { // Ensure the blob does not exist - const blob = await db.getBlob(sql, { workspace, name }) + const blob = await db.getBlob({ workspace, name }) if (blob !== null) { return new Response(null, { status: 200 }) } @@ -65,7 +71,7 @@ export async function handleS3Blob (request: BlobRequest, env: Env, ctx: Executi const contentLength = Number.parseInt(contentLengthHeader) const lastModified = lastModifiedHeader !== null ? 
new Date(lastModifiedHeader).getTime() : Date.now() - const result = await saveBlob(env, sql, object.body, contentLength, contentType, workspace, name, lastModified) + const result = await saveBlob(env, db, object.body, contentLength, contentType, workspace, name, lastModified) return json(result) }) } diff --git a/workers/datalake/src/sign.ts b/workers/datalake/src/sign.ts index 1ac7c6206c..be6fcfa8d9 100644 --- a/workers/datalake/src/sign.ts +++ b/workers/datalake/src/sign.ts @@ -17,8 +17,9 @@ import { AwsClient } from 'aws4fetch' import { error } from 'itty-router' import { handleBlobUploaded } from './blob' +import { type MetricsContext } from './metrics' +import { type Storage, selectStorage } from './storage' import { type BlobRequest, type UUID } from './types' -import { selectStorage, type Storage } from './storage' const S3_SIGNED_LINK_TTL = 3600 @@ -39,7 +40,12 @@ function getS3Client (storage: Storage): AwsClient { }) } -export async function handleSignCreate (request: BlobRequest, env: Env, ctx: ExecutionContext): Promise { +export async function handleSignCreate ( + request: BlobRequest, + env: Env, + ctx: ExecutionContext, + metrics: MetricsContext +): Promise { const { workspace, name } = request const storage = selectStorage(env, workspace) const accountId = env.R2_ACCOUNT_ID @@ -57,7 +63,9 @@ export async function handleSignCreate (request: BlobRequest, env: Env, ctx: Exe try { const client = getS3Client(storage) - signed = await client.sign(new Request(url, { method: 'PUT' }), { aws: { signQuery: true } }) + signed = await metrics.with('s3.sign', () => { + return client.sign(new Request(url, { method: 'PUT' }), { aws: { signQuery: true } }) + }) } catch (err: any) { console.error({ error: 'failed to generate signed url', message: `${err}` }) return error(500, 'failed to generate signed url') @@ -73,7 +81,12 @@ export async function handleSignCreate (request: BlobRequest, env: Env, ctx: Exe return new Response(signed.url, { status: 200, headers }) } -export async function handleSignComplete (request: BlobRequest, env: Env, ctx: ExecutionContext): Promise { +export async function handleSignComplete ( + request: BlobRequest, + env: Env, + ctx: ExecutionContext, + metrics: MetricsContext +): Promise { const { workspace, name } = request const { bucket } = selectStorage(env, workspace) @@ -96,7 +109,7 @@ export async function handleSignComplete (request: BlobRequest, env: Env, ctx: E } try { - await handleBlobUploaded(env, ctx, workspace, name, uuid) + await handleBlobUploaded(env, ctx, metrics, workspace, name, uuid) } catch (err) { const message = err instanceof Error ? err.message : String(err) console.error({ error: message, workspace, name, uuid }) diff --git a/workers/datalake/wrangler.toml b/workers/datalake/wrangler.toml index 0b6989c4b0..1008edbbe5 100644 --- a/workers/datalake/wrangler.toml +++ b/workers/datalake/wrangler.toml @@ -95,7 +95,7 @@ r2_buckets = [ ] hyperdrive = [ - { binding = "HYPERDRIVE", id = "055e968f3067414eaa30467d8a9c5021" } + { binding = "HYPERDRIVE", id = "055e968f3067414eaa30467d8a9c5021", localConnectionString = "postgresql://root:roach@localhost:26257/datalake" } ] [env.dev.vars]
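
A minimal usage sketch (not part of the patch) of the per-request metrics pattern this change introduces. It assumes the new workers/datalake/src/metrics.ts module added above; handleRequest and the 'db.getBlob' label are hypothetical stand-ins for the real router handlers and the wrapped R2/KV/DB calls.

import { MetricsContext } from './metrics'

async function handleRequest (request: Request): Promise<Response> {
  const start = performance.now()
  const metrics = new MetricsContext()

  try {
    // Each expensive step is wrapped, mirroring LoggedDB / LoggedR2Bucket:
    // the callback result is returned unchanged and its duration is recorded
    // in the context instead of being logged immediately.
    const blob = await metrics.with('db.getBlob', async () => {
      return await Promise.resolve({ name: 'example' }) // stand-in for a real query
    })
    return new Response(JSON.stringify(blob), { status: 200 })
  } finally {
    // One structured log entry per request, as in DatalakeWorker.fetch:
    // total time plus the per-operation breakdown collected by the context.
    const total = performance.now() - start
    const url = `${request.method} ${request.url}`
    console.log({ message: `total=${total} ` + metrics.toString(), total, ops: metrics.metrics, url })
  }
}

The per-request context replaces the old measure/measureSync helpers, so timings from one request are grouped into a single log entry rather than emitted as separate log lines that could interleave across concurrent requests.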