Merge remote-tracking branch 'origin/develop' into staging

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
Andrey Sobolev 2024-12-05 16:32:45 +07:00
commit ac5feea726
No known key found for this signature in database
GPG Key ID: BD80F68D68D8F7F2
16 changed files with 686 additions and 58 deletions

View File

@@ -150,6 +150,8 @@
"@hcengineering/telegram": "^0.6.21",
"@hcengineering/tracker": "^0.6.24",
"@hcengineering/collaboration": "^0.6.0",
"@hcengineering/datalake": "^0.6.0",
"@hcengineering/s3": "^0.6.0",
"commander": "^8.1.0",
"csv-parse": "~5.1.0",
"email-addresses": "^5.0.0",

View File

@@ -118,8 +118,10 @@ import { changeConfiguration } from './configuration'
import { moveAccountDbFromMongoToPG, moveFromMongoToPG, moveWorkspaceFromMongoToPG } from './db'
import { fixMixinForeignAttributes, showMixinForeignAttributes } from './mixin'
import { fixAccountEmails, renameAccount } from './renameAccount'
import { moveFiles, showLostFiles } from './storage'
import { copyToDatalake, moveFiles, showLostFiles } from './storage'
import { getModelVersion } from '@hcengineering/model-all'
import { type DatalakeConfig, DatalakeService, createDatalakeClient } from '@hcengineering/datalake'
import { S3Service, type S3Config } from '@hcengineering/s3'
const colorConstants = {
colorRed: '\u001b[31m',
@@ -1184,6 +1186,54 @@ export function devTool (
await storageAdapter.close()
})
program
.command('copy-s3-datalake')
.description('migrate files from s3 to datalake')
.option('-w, --workspace <workspace>', 'Selected workspace only', '')
.option('-c, --concurrency <concurrency>', 'Number of files being processed concurrently', '10')
.action(async (cmd: { workspace: string, concurrency: string }) => {
const params = {
concurrency: parseInt(cmd.concurrency)
}
const storageConfig = storageConfigFromEnv(process.env.STORAGE)
const storages = storageConfig.storages.filter((p) => p.kind === S3Service.config) as S3Config[]
if (storages.length === 0) {
throw new Error('S3 storage config is required')
}
const datalakeConfig = storageConfig.storages.find((p) => p.kind === DatalakeService.config)
if (datalakeConfig === undefined) {
throw new Error('Datalake storage config is required')
}
const datalake = createDatalakeClient(datalakeConfig as DatalakeConfig)
let workspaces: Workspace[] = []
const { dbUrl } = prepareTools()
await withDatabase(dbUrl, async (db) => {
workspaces = await listWorkspacesPure(db)
workspaces = workspaces
.filter((p) => p.mode !== 'archived')
.filter((p) => cmd.workspace === '' || p.workspace === cmd.workspace)
.sort((a, b) => b.lastVisit - a.lastVisit)
})
const count = workspaces.length
let index = 0
for (const workspace of workspaces) {
index++
toolCtx.info('processing workspace', { workspace: workspace.workspace, index, count })
const workspaceId = getWorkspaceId(workspace.workspace)
for (const config of storages) {
const storage = new S3Service(config)
await copyToDatalake(toolCtx, workspaceId, config, storage, datalake, params)
}
}
})
program
.command('confirm-email <email>')
.description('confirm user email')

View File

@@ -14,8 +14,17 @@
//
import { type Attachment } from '@hcengineering/attachment'
import { type Blob, type MeasureContext, type Ref, type WorkspaceId, RateLimiter } from '@hcengineering/core'
import {
type Blob,
type MeasureContext,
type Ref,
type WorkspaceId,
concatLink,
RateLimiter
} from '@hcengineering/core'
import { type DatalakeClient } from '@hcengineering/datalake'
import { DOMAIN_ATTACHMENT } from '@hcengineering/model-attachment'
import { type S3Config, type S3Service } from '@hcengineering/s3'
import {
type ListBlobResult,
type StorageAdapter,
@@ -249,3 +258,127 @@ async function retryOnFailure<T> (
}
throw lastError
}
export interface CopyDatalakeParams {
concurrency: number
}
export async function copyToDatalake (
ctx: MeasureContext,
workspaceId: WorkspaceId,
config: S3Config,
adapter: S3Service,
datalake: DatalakeClient,
params: CopyDatalakeParams
): Promise<void> {
console.log('copying from', config.name, 'concurrency:', params.concurrency)
const exists = await adapter.exists(ctx, workspaceId)
if (!exists) {
console.log('no files to copy')
return
}
let time = Date.now()
let processedCnt = 0
let skippedCnt = 0
function printStats (): void {
const duration = Date.now() - time
console.log('...processed', processedCnt, 'skipped', skippedCnt, Math.round(duration / 1000) + 's')
time = Date.now()
}
const rateLimiter = new RateLimiter(params.concurrency)
const iterator = await adapter.listStream(ctx, workspaceId)
try {
while (true) {
const batch = await iterator.next()
if (batch.length === 0) break
for (const blob of batch) {
const objectName = blob._id
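// skip derived blobs (preview and size variants, '#history' keys); they are not copied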
if (objectName.includes('%preview%') || objectName.includes('%size%') || objectName.endsWith('#history')) {
skippedCnt++
continue
}
await rateLimiter.add(async () => {
try {
await retryOnFailure(
ctx,
5,
async () => {
await copyBlobToDatalake(ctx, workspaceId, blob, config, adapter, datalake)
processedCnt += 1
},
50
)
} catch (err) {
console.error('failed to process blob', objectName, err)
}
})
}
await rateLimiter.waitProcessing()
printStats()
}
await rateLimiter.waitProcessing()
printStats()
} finally {
await iterator.close()
}
}
export async function copyBlobToDatalake (
ctx: MeasureContext,
workspaceId: WorkspaceId,
blob: ListBlobResult,
config: S3Config,
adapter: S3Service,
datalake: DatalakeClient
): Promise<void> {
const objectName = blob._id
const stat = await datalake.statObject(ctx, workspaceId, objectName)
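// skip blobs that already exist in the datalake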
if (stat !== undefined) {
return
}
if (blob.size < 1024 * 1024 * 64) {
// Handle small file
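// the datalake service pulls the object from S3 itself via its /upload/s3 endpoint (see uploadFromS3)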
const { endpoint, accessKey: accessKeyId, secretKey: secretAccessKey, region } = config
const bucketId = adapter.getBucketId(workspaceId)
const objectId = adapter.getDocumentKey(workspaceId, encodeURIComponent(objectName))
const url = concatLink(endpoint, `${bucketId}/${objectId}`)
const params = { url, accessKeyId, secretAccessKey, region }
await datalake.uploadFromS3(ctx, workspaceId, objectName, params)
} else {
// Handle huge file
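// large blobs are streamed out of S3 and re-uploaded through the datalake multipart API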
const stat = await adapter.stat(ctx, workspaceId, objectName)
if (stat !== undefined) {
const metadata = {
lastModified: stat.modifiedOn,
name: objectName,
type: stat.contentType,
size: stat.size
}
const readable = await adapter.get(ctx, workspaceId, objectName)
try {
readable.on('end', () => {
readable.destroy()
})
console.log('uploading huge blob', objectName, Math.round(stat.size / 1024 / 1024), 'MB')
const stream = readable.pipe(new PassThrough())
await datalake.uploadMultipart(ctx, workspaceId, objectName, stream, metadata)
console.log('done', objectName)
} finally {
readable.destroy()
}
}
}
}

View File

@@ -423,20 +423,6 @@ export const coreOperation: MigrateOperation = {
await client.update(DOMAIN_SPACE, { '%hash%': { $exists: true } }, { $set: { '%hash%': null } })
}
},
{
state: 'remove-github-patches',
func: async (client) => {
await client.update(
DOMAIN_TX,
{
objectClass: 'tracker:class:Issue',
collection: 'pullRequests',
'tx.attributes.patch': { $exists: true }
},
{ $unset: { 'tx.attributes.patch': 1 } }
)
}
},
{
state: 'remove-collection-txes',
func: async (client) => {

View File

@@ -258,8 +258,8 @@ async function uploadFileWithSignedUrl (file: File, uuid: string, uploadUrl: str
method: 'PUT',
headers: {
'Content-Type': file.type,
'Content-Length': file.size.toString(),
'x-amz-meta-last-modified': file.lastModified.toString()
'Content-Length': file.size.toString()
// 'x-amz-meta-last-modified': file.lastModified.toString()
}
})

View File

@@ -15,7 +15,7 @@
import { type MeasureContext, type WorkspaceId, concatLink } from '@hcengineering/core'
import FormData from 'form-data'
import fetch, { type RequestInit, type Response } from 'node-fetch'
import fetch, { type RequestInfo, type RequestInit, type Response } from 'node-fetch'
import { Readable } from 'stream'
import { DatalakeError, NetworkError, NotFoundError } from './error'
@@ -49,8 +49,18 @@ interface BlobUploadSuccess {
type BlobUploadResult = BlobUploadSuccess | BlobUploadError
interface MultipartUpload {
key: string
uploadId: string
}
interface MultipartUploadPart {
partNumber: number
etag: string
}
/** @public */
export class Client {
export class DatalakeClient {
constructor (private readonly endpoint: string) {}
getObjectUrl (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): string {
@@ -186,7 +196,7 @@ export class Client {
}
}
private async uploadWithFormData (
async uploadWithFormData (
ctx: MeasureContext,
workspace: WorkspaceId,
objectName: string,
@@ -221,7 +231,35 @@ export class Client {
}
}
private async uploadWithSignedURL (
async uploadMultipart (
ctx: MeasureContext,
workspace: WorkspaceId,
objectName: string,
stream: Readable | Buffer | string,
metadata: ObjectMetadata
): Promise<void> {
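// split the payload into 10 MB parts and upload them sequentially; any failure aborts the whole multipart upload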
const chunkSize = 10 * 1024 * 1024
const multipart = await this.multipartUploadStart(ctx, workspace, objectName, metadata)
try {
const parts: MultipartUploadPart[] = []
let partNumber = 1
for await (const chunk of getChunks(stream, chunkSize)) {
const part = await this.multipartUploadPart(ctx, workspace, objectName, multipart, partNumber, chunk)
parts.push(part)
partNumber++
}
await this.multipartUploadComplete(ctx, workspace, objectName, multipart, parts)
} catch (err: any) {
await this.multipartUploadAbort(ctx, workspace, objectName, multipart)
throw err
}
}
async uploadWithSignedURL (
ctx: MeasureContext,
workspace: WorkspaceId,
objectName: string,
@@ -236,8 +274,8 @@ export class Client {
method: 'PUT',
headers: {
'Content-Type': metadata.type,
'Content-Length': metadata.size?.toString() ?? '0',
'x-amz-meta-last-modified': metadata.lastModified.toString()
'Content-Length': metadata.size?.toString() ?? '0'
// 'x-amz-meta-last-modified': metadata.lastModified.toString()
}
})
} catch (err) {
@@ -249,6 +287,30 @@ export class Client {
await this.signObjectComplete(ctx, workspace, objectName)
}
async uploadFromS3 (
ctx: MeasureContext,
workspace: WorkspaceId,
objectName: string,
params: {
url: string
region: string
accessKeyId: string
secretAccessKey: string
}
): Promise<void> {
const path = `/upload/s3/${workspace.name}/${encodeURIComponent(objectName)}`
const url = concatLink(this.endpoint, path)
await fetchSafe(ctx, url, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify(params)
})
}
// Signed URL
private async signObjectSign (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<string> {
try {
const url = this.getSignObjectUrl(workspace, objectName)
@@ -284,9 +346,123 @@ export class Client {
const path = `/upload/signed-url/${workspace.name}/${encodeURIComponent(objectName)}`
return concatLink(this.endpoint, path)
}
// Multipart
private async multipartUploadStart (
ctx: MeasureContext,
workspace: WorkspaceId,
objectName: string,
metadata: ObjectMetadata
): Promise<MultipartUpload> {
const path = `/upload/multipart/${workspace.name}/${encodeURIComponent(objectName)}`
const url = concatLink(this.endpoint, path)
try {
const headers = {
'Content-Type': metadata.type,
'Content-Length': metadata.size?.toString() ?? '0',
'Last-Modified': new Date(metadata.lastModified).toUTCString()
}
const response = await fetchSafe(ctx, url, { method: 'POST', headers })
return (await response.json()) as MultipartUpload
} catch (err: any) {
ctx.error('failed to start multipart upload', { workspace, objectName, err })
throw new DatalakeError('Failed to start multipart upload')
}
}
private async multipartUploadPart (
ctx: MeasureContext,
workspace: WorkspaceId,
objectName: string,
multipart: MultipartUpload,
partNumber: number,
body: Readable | Buffer | string
): Promise<MultipartUploadPart> {
const path = `/upload/multipart/${workspace.name}/${encodeURIComponent(objectName)}/part`
const url = new URL(concatLink(this.endpoint, path))
url.searchParams.append('key', multipart.key)
url.searchParams.append('uploadId', multipart.uploadId)
url.searchParams.append('partNumber', partNumber.toString())
try {
const response = await fetchSafe(ctx, url, { method: 'POST', body })
return (await response.json()) as MultipartUploadPart
} catch (err: any) {
ctx.error('failed to upload multipart part', { workspace, objectName, err })
throw new DatalakeError('Failed to upload multipart part')
}
}
private async multipartUploadComplete (
ctx: MeasureContext,
workspace: WorkspaceId,
objectName: string,
multipart: MultipartUpload,
parts: MultipartUploadPart[]
): Promise<void> {
const path = `/upload/multipart/${workspace.name}/${encodeURIComponent(objectName)}/complete`
const url = new URL(concatLink(this.endpoint, path))
url.searchParams.append('key', multipart.key)
url.searchParams.append('uploadId', multipart.uploadId)
try {
await fetchSafe(ctx, url, { method: 'POST', body: JSON.stringify({ parts }) })
} catch (err: any) {
ctx.error('failed to complete multipart upload', { workspace, objectName, err })
throw new DatalakeError('Failed to complete multipart upload')
}
}
private async multipartUploadAbort (
ctx: MeasureContext,
workspace: WorkspaceId,
objectName: string,
multipart: MultipartUpload
): Promise<void> {
const path = `/upload/multipart/${workspace.name}/${encodeURIComponent(objectName)}/abort`
const url = new URL(concatLink(this.endpoint, path))
url.searchParams.append('key', multipart.key)
url.searchParams.append('uploadId', multipart.uploadId)
try {
await fetchSafe(ctx, url, { method: 'POST' })
} catch (err: any) {
ctx.error('failed to abort multipart upload', { workspace, objectName, err })
throw new DatalakeError('Failed to abort multipart upload')
}
}
}
async function fetchSafe (ctx: MeasureContext, url: string, init?: RequestInit): Promise<Response> {
async function * getChunks (data: Buffer | string | Readable, chunkSize: number): AsyncGenerator<Buffer> {
if (Buffer.isBuffer(data)) {
let offset = 0
while (offset < data.length) {
yield data.subarray(offset, offset + chunkSize)
offset += chunkSize
}
} else if (typeof data === 'string') {
const buffer = Buffer.from(data)
yield * getChunks(buffer, chunkSize)
} else if (data instanceof Readable) {
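// accumulate stream data and emit fixed-size chunks; the trailing partial chunk is flushed after the stream ends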
let buffer = Buffer.alloc(0)
for await (const chunk of data) {
buffer = Buffer.concat([buffer, chunk])
while (buffer.length >= chunkSize) {
yield buffer.subarray(0, chunkSize)
buffer = buffer.subarray(chunkSize)
}
}
if (buffer.length > 0) {
yield buffer
}
}
}
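For reference, a minimal sketch of the chunking contract (getChunks is module-local, the sizes here are illustrative):
const data = Buffer.alloc(25)
const sizes: number[] = []
for await (const chunk of getChunks(data, 10)) {
  sizes.push(chunk.length)
}
// sizes is now [10, 10, 5]: fixed-size chunks plus a smaller trailing chunk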
async function fetchSafe (ctx: MeasureContext, url: RequestInfo, init?: RequestInit): Promise<Response> {
let response
try {
response = await fetch(url, init)

View File

@@ -24,22 +24,34 @@ import {
type UploadedObjectInfo
} from '@hcengineering/server-core'
import { type Readable } from 'stream'
import { type ObjectMetadata, Client } from './client'
import { type ObjectMetadata, DatalakeClient } from './client'
export { DatalakeClient }
/**
* @public
*/
export interface DatalakeConfig extends StorageConfig {
kind: 'datalake'
}
/**
* @public
*/
export function createDatalakeClient (opt: DatalakeConfig): DatalakeClient {
const endpoint = Number.isInteger(opt.port) ? `${opt.endpoint}:${opt.port}` : opt.endpoint
return new DatalakeClient(endpoint)
}
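A minimal usage sketch from a consumer's perspective (the endpoint and config fields are illustrative; real values come from storageConfigFromEnv):
import { type DatalakeConfig, createDatalakeClient } from '@hcengineering/datalake'

// hypothetical datalake storage config
const config = { kind: 'datalake', name: 'datalake', endpoint: 'https://datalake.example.com' } as DatalakeConfig
const client = createDatalakeClient(config)
// client.getObjectUrl / uploadMultipart / uploadFromS3 can now be called directly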
/**
* @public
*/
export class DatalakeService implements StorageAdapter {
static config = 'datalake'
private readonly client: Client
private readonly client: DatalakeClient
constructor (readonly opt: DatalakeConfig) {
const endpoint = Number.isInteger(opt.port) ? `${opt.endpoint}:${opt.port}` : opt.endpoint
this.client = new Client(endpoint)
this.client = createDatalakeClient(opt)
}
async initialize (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<void> {}

View File

@@ -237,6 +237,13 @@ abstract class MongoAdapterBase implements DbAdapter {
cursor = cursor.sort(sort)
}
}
if (options?.projection !== undefined) {
const projection: Projection<T> = {}
for (const key in options.projection ?? []) {
projection[key] = options.projection[key]
}
cursor = cursor.project(projection)
}
return await cursor.toArray()
}

View File

@@ -214,6 +214,7 @@ const eventSchema: Schema = {
export function addSchema (domain: string, schema: Schema): void {
domainSchemas[translateDomain(domain)] = schema
domainSchemaFields.set(domain, createSchemaFields(schema))
}
export function translateDomain (domain: string): string {

View File

@@ -393,7 +393,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
async rawFindAll<T extends Doc>(_domain: Domain, query: DocumentQuery<T>, options?: FindOptions<T>): Promise<T[]> {
const domain = translateDomain(_domain)
const select = `SELECT * FROM ${domain}`
const select = `SELECT ${this.getProjection(domain, options?.projection, [])} FROM ${domain}`
const sqlChunks: string[] = []
sqlChunks.push(`WHERE ${this.buildRawQuery(domain, query, options)}`)
if (options?.sort !== undefined) {
@@ -564,7 +564,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
toClass: undefined
})
}
const select = `SELECT ${this.getProjection(_class, domain, options?.projection, joins)} FROM ${domain}`
const select = `SELECT ${this.getProjection(domain, options?.projection, joins)} FROM ${domain}`
const secJoin = this.addSecurity(query, domain, ctx.contextData)
if (secJoin !== undefined) {
sqlChunks.push(secJoin)
@@ -1232,7 +1232,6 @@ abstract class PostgresAdapterBase implements DbAdapter {
}
private getProjection<T extends Doc>(
_class: Ref<Class<T>>,
baseDomain: string,
projection: Projection<T> | undefined,
joins: JoinProps[]

View File

@@ -16,17 +16,13 @@
import { error, json } from 'itty-router'
import postgres from 'postgres'
import * as db from './db'
import { cacheControl, hashLimit } from './const'
import { toUUID } from './encodings'
import { getSha256 } from './hash'
import { selectStorage } from './storage'
import { type BlobRequest, type WorkspaceRequest, type UUID } from './types'
import { copyVideo, deleteVideo } from './video'
const expires = 86400
const cacheControl = `public,max-age=${expires}`
// 1MB hash limit
const HASH_LIMIT = 1 * 1024 * 1024
interface BlobMetadata {
lastModified: number
type: string
@@ -143,7 +139,7 @@ export async function handleUploadFormData (request: WorkspaceRequest, env: Env)
files.map(async ([file, key]) => {
const { name, type, lastModified } = file
try {
const metadata = await saveBlob(env, sql, file, type, workspace, name, lastModified)
const metadata = await saveBlob(env, sql, file.stream(), file.size, type, workspace, name, lastModified)
// TODO this probably should happen via queue, let it be here for now
if (type.startsWith('video/')) {
@@ -163,10 +159,11 @@ export async function handleUploadFormData (request: WorkspaceRequest, env: Env)
return json(result)
}
async function saveBlob (
export async function saveBlob (
env: Env,
sql: postgres.Sql,
file: File,
stream: ReadableStream,
size: number,
type: string,
workspace: string,
name: string,
@@ -174,18 +171,19 @@ async function saveBlob (
): Promise<BlobMetadata> {
const { location, bucket } = selectStorage(env, workspace)
const size = file.size
const httpMetadata = { contentType: type, cacheControl }
const httpMetadata = { contentType: type, cacheControl, lastModified }
const filename = getUniqueFilename()
if (file.size <= HASH_LIMIT) {
const hash = await getSha256(file)
if (size <= hashLimit) {
const [hashStream, uploadStream] = stream.tee()
const hash = await getSha256(hashStream)
const data = await db.getData(sql, { hash, location })
if (data !== null) {
// Lucky boy, nothing to upload, use existing blob
await db.createBlob(sql, { workspace, name, hash, location })
} else {
await bucket.put(filename, file, { httpMetadata })
await bucket.put(filename, uploadStream, { httpMetadata })
await sql.begin((sql) => [
db.createData(sql, { hash, location, filename, type, size }),
db.createBlob(sql, { workspace, name, hash, location })
@@ -196,7 +194,7 @@ async function saveBlob (
} else {
// For large files we cannot calculate the checksum beforehand,
// so upload the file under a unique filename and obtain the checksum afterwards
const { hash } = await uploadLargeFile(bucket, file, filename, { httpMetadata })
const { hash } = await uploadLargeFile(bucket, stream, filename, { httpMetadata })
const data = await db.getData(sql, { hash, location })
if (data !== null) {
// We found an existing blob with the same hash
@@ -239,14 +237,13 @@ export async function handleBlobUploaded (env: Env, workspace: string, name: str
async function uploadLargeFile (
bucket: R2Bucket,
file: File,
stream: ReadableStream,
filename: string,
options: R2PutOptions
): Promise<{ hash: UUID }> {
const digestStream = new crypto.DigestStream('SHA-256')
const fileStream = file.stream()
const [digestFS, uploadFS] = fileStream.tee()
const [digestFS, uploadFS] = stream.tee()
const digestPromise = digestFS.pipeTo(digestStream)
const uploadPromise = bucket.put(filename, uploadFS, options)
@@ -262,14 +259,6 @@ function getUniqueFilename (): UUID {
return crypto.randomUUID() as UUID
}
async function getSha256 (file: File): Promise<UUID> {
const digestStream = new crypto.DigestStream('SHA-256')
await file.stream().pipeTo(digestStream)
const digest = await digestStream.digest
return digestToUUID(digest)
}
function digestToUUID (digest: ArrayBuffer): UUID {
return toUUID(new Uint8Array(digest))
}

View File

@@ -0,0 +1,19 @@
//
// Copyright © 2024 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
export const expires = 86400
export const cacheControl = `public,max-age=${expires}`
export const hashLimit = 1 * 1024 * 1024

View File

@@ -0,0 +1,25 @@
//
// Copyright © 2024 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import { toUUID } from './encodings'
import { type UUID } from './types'
export async function getSha256 (stream: ReadableStream): Promise<UUID> {
const digestStream = new crypto.DigestStream('SHA-256')
await stream.pipeTo(digestStream)
const digest = await digestStream.digest
return toUUID(new Uint8Array(digest))
}
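getSha256 consumes the stream it hashes, so callers that also need to upload the same bytes tee the stream first, as saveBlob now does; a minimal sketch assuming a ReadableStream named stream:
const [hashStream, uploadStream] = stream.tee()
const hash = await getSha256(hashStream)
// uploadStream stays readable and can still be passed to bucket.put(filename, uploadStream, { httpMetadata })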

View File

@@ -19,8 +19,15 @@ import { type IRequestStrict, type RequestHandler, Router, error, html } from 'i
import { handleBlobDelete, handleBlobGet, handleBlobHead, handleUploadFormData } from './blob'
import { cors } from './cors'
import { handleImageGet } from './image'
import { handleS3Blob } from './s3'
import { handleVideoMetaGet } from './video'
import { handleSignAbort, handleSignComplete, handleSignCreate } from './sign'
import {
handleMultipartUploadStart,
handleMultipartUploadPart,
handleMultipartUploadComplete,
handleMultipartUploadAbort
} from './multipart'
import { type BlobRequest, type WorkspaceRequest } from './types'
const { preflight, corsify } = cors({
@@ -64,6 +71,13 @@ router
.post('/upload/signed-url/:workspace/:name', withBlob, handleSignCreate)
.put('/upload/signed-url/:workspace/:name', withBlob, handleSignComplete)
.delete('/upload/signed-url/:workspace/:name', withBlob, handleSignAbort)
// Multipart
.post('/upload/multipart/:workspace/:name', withBlob, handleMultipartUploadStart)
.post('/upload/multipart/:workspace/:name/part', withBlob, handleMultipartUploadPart)
.post('/upload/multipart/:workspace/:name/complete', withBlob, handleMultipartUploadComplete)
.post('/upload/multipart/:workspace/:name/abort', withBlob, handleMultipartUploadAbort)
// S3
.post('/upload/s3/:workspace/:name', withBlob, handleS3Blob)
.all('/', () =>
html(
`Huly&reg; Datalake&trade; <a href="https://huly.io">https://huly.io</a>

View File

@@ -0,0 +1,144 @@
//
// Copyright © 2024 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import { error, json } from 'itty-router'
import postgres from 'postgres'
import * as db from './db'
import { cacheControl } from './const'
import { toUUID } from './encodings'
import { selectStorage } from './storage'
import { type BlobRequest, type UUID } from './types'
export interface MultipartUpload {
key: string
uploadId: string
}
export interface MultipartUploadPart {
partNumber: number
etag: string
}
export interface MultipartUploadCompleteRequest {
parts: MultipartUploadPart[]
}
export async function handleMultipartUploadStart (
request: BlobRequest,
env: Env,
ctx: ExecutionContext
): Promise<Response> {
const { workspace } = request
const { bucket } = selectStorage(env, workspace)
const contentType = request.headers.get('content-type') ?? 'application/octet-stream'
const lastModifiedHeader = request.headers.get('last-modified')
const lastModified = lastModifiedHeader !== null ? new Date(lastModifiedHeader).getTime() : Date.now()
const httpMetadata = { contentType, cacheControl, lastModified }
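// upload under a random object key; the blob is linked to the requested name when the multipart upload completes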
const uuid = crypto.randomUUID() as UUID
const multipart = await bucket.createMultipartUpload(uuid, { httpMetadata })
return json({ key: multipart.key, uploadId: multipart.uploadId })
}
export async function handleMultipartUploadPart (
request: BlobRequest,
env: Env,
ctx: ExecutionContext
): Promise<Response> {
const { workspace } = request
const multipartKey = request.query?.key
const multipartUploadId = request.query?.uploadId
const partNumber = request.query?.partNumber
if (typeof multipartKey !== 'string' || typeof multipartUploadId !== 'string' || typeof partNumber !== 'string') {
return error(400, 'missing key or uploadId or partNumber')
}
if (request.body === null) {
return error(400, 'missing body')
}
const { bucket } = selectStorage(env, workspace)
const upload = bucket.resumeMultipartUpload(multipartKey, multipartUploadId)
const part = await upload.uploadPart(Number.parseInt(partNumber), request.body)
return json({ partNumber: part.partNumber, etag: part.etag })
}
export async function handleMultipartUploadComplete (
request: BlobRequest,
env: Env,
ctx: ExecutionContext
): Promise<Response> {
const sql = postgres(env.HYPERDRIVE.connectionString)
const { workspace, name } = request
const multipartKey = request.query?.key
const multipartUploadId = request.query?.uploadId
if (typeof multipartKey !== 'string' || typeof multipartUploadId !== 'string') {
return error(400, 'missing key or uploadId')
}
const { parts } = await request.json<MultipartUploadCompleteRequest>()
const { bucket, location } = selectStorage(env, workspace)
const upload = bucket.resumeMultipartUpload(multipartKey, multipartUploadId)
const object = await upload.complete(parts)
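// multipart objects may not expose an md5 checksum; fall back to a random UUID so the data row can still be keyed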
const hash =
object.checksums.md5 !== undefined ? toUUID(new Uint8Array(object.checksums.md5)) : (crypto.randomUUID() as UUID)
const type = object.httpMetadata?.contentType ?? 'application/octet-stream'
const size = object.size ?? 0
const filename = multipartKey as UUID
const data = await db.getData(sql, { hash, location })
if (data !== null) {
// blob already exists
await Promise.all([bucket.delete(filename), db.createBlob(sql, { workspace, name, hash, location })])
} else {
// Otherwise register a new hash and blob
await sql.begin((sql) => [
db.createData(sql, { hash, location, filename, type, size }),
db.createBlob(sql, { workspace, name, hash, location })
])
}
return new Response(null, { status: 204 })
}
export async function handleMultipartUploadAbort (
request: BlobRequest,
env: Env,
ctx: ExecutionContext
): Promise<Response> {
const { workspace } = request
const multipartKey = request.query?.key
const multipartUploadId = request.query?.uploadId
if (typeof multipartKey !== 'string' || typeof multipartUploadId !== 'string') {
return error(400, 'missing key or uploadId')
}
const { bucket } = selectStorage(env, workspace)
const upload = bucket.resumeMultipartUpload(multipartKey, multipartUploadId)
await upload.abort()
return new Response(null, { status: 204 })
}

View File

@@ -0,0 +1,71 @@
//
// Copyright © 2024 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import { AwsClient } from 'aws4fetch'
import { error, json } from 'itty-router'
import postgres from 'postgres'
import * as db from './db'
import { saveBlob } from './blob'
import { type BlobRequest } from './types'
export interface S3UploadPayload {
url: string
region: string
accessKeyId: string
secretAccessKey: string
}
function getS3Client (payload: S3UploadPayload): AwsClient {
return new AwsClient({
service: 's3',
region: payload.region,
accessKeyId: payload.accessKeyId,
secretAccessKey: payload.secretAccessKey
})
}
export async function handleS3Blob (request: BlobRequest, env: Env, ctx: ExecutionContext): Promise<Response> {
const { workspace, name } = request
const sql = postgres(env.HYPERDRIVE.connectionString)
const payload = await request.json<S3UploadPayload>()
const client = getS3Client(payload)
// If the blob already exists there is nothing to upload
const blob = await db.getBlob(sql, { workspace, name })
if (blob !== null) {
return new Response(null, { status: 200 })
}
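// fetch the object straight from S3 using a SigV4-signed request (aws4fetch)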
const object = await client.fetch(payload.url)
if (!object.ok || object.status !== 200) {
return error(object.status)
}
if (object.body === null) {
return error(400)
}
const contentType = object.headers.get('content-type') ?? 'application/octet-stream'
const contentLengthHeader = object.headers.get('content-length') ?? '0'
const lastModifiedHeader = object.headers.get('last-modified')
const contentLength = Number.parseInt(contentLengthHeader)
const lastModified = lastModifiedHeader !== null ? new Date(lastModifiedHeader).getTime() : Date.now()
const result = await saveBlob(env, sql, object.body, contentLength, contentType, workspace, name, lastModified)
return json(result)
}