UBERF-11432 Use tx queue for blobs events (#9175)
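Replace the direct HTTP call to the transcoding service with blob events on the platform transaction queue: the datalake now takes a PlatformQueueProducer<Tx>, publishes a Tx for every blob create and delete, and drops the STREAM_URL config together with the requestHLS handler.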

Signed-off-by: Alexander Onnikov <Alexander.Onnikov@xored.com>
Alexander Onnikov authored on 2025-06-05 13:33:09 +07:00 (committed by GitHub)
parent 27a35e2c93
commit 852137c213
8 changed files with 88 additions and 75 deletions

View File

@@ -535,9 +535,9 @@ services:
       - SECRET=secret
       - ACCOUNTS_URL=http://huly.local:3000
       - STATS_URL=http://huly.local:4900
-      - STREAM_URL=http://huly.local:1080/transcoding
       - DB_URL=${DB_CR_URL}
       - BUCKETS=blobs,eu|http://minio:9000?accessKey=minioadmin&secretKey=minioadmin
+      - QUEUE_CONFIG=${QUEUE_CONFIG}
     restart: unless-stopped
   hulykvs:
     image: hardcoreeng/hulykvs
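With STREAM_URL gone, the service is pointed at the shared message queue instead. The variable's value is not shown in this diff; assuming the stock Kafka broker used elsewhere in this stack, it would plausibly be the broker address:

QUEUE_CONFIG=kafka:9092   # assumption: Kafka bootstrap address, as consumed by getPlatformQueue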

View File

@@ -63,6 +63,7 @@
     "@hcengineering/server-core": "^0.6.1",
     "@hcengineering/server-client": "^0.6.0",
     "@hcengineering/core": "^0.6.32",
+    "@hcengineering/kafka": "^0.6.0",
     "@hcengineering/platform": "^0.6.11",
     "@hcengineering/account-client": "^0.6.0",
     "cors": "^2.8.5",

View File

@@ -26,7 +26,6 @@ export interface Config {
   Port: number
   Secret: string
   AccountsUrl: string
-  StreamUrl?: string
   DbUrl: string
   Buckets: BucketConfig[]
   CleanupInterval: number
@@ -78,7 +77,6 @@ const config: Config = (() => {
     Secret: process.env.SECRET,
     AccountsUrl: process.env.ACCOUNTS_URL,
     DbUrl: process.env.DB_URL,
-    StreamUrl: process.env.STREAM_URL,
     Buckets: parseBucketsConfig(process.env.BUCKETS)
   }

View File

@@ -13,19 +13,21 @@
 // limitations under the License.
 //
 
-import { type MeasureContext } from '@hcengineering/core'
+import { type MeasureContext, type Tx } from '@hcengineering/core'
+import { PlatformQueueProducer } from '@hcengineering/server-core'
 import { Readable } from 'stream'
 
 import { type BlobDB } from './db'
 import { digestToUUID, stringToUUID } from './encodings'
 import { type BlobHead, type BlobBody, type BlobList, type BlobStorage, type Datalake, type Location } from './types'
-import { requestHLS } from '../handlers/video'
 import { type S3Bucket } from '../s3'
+import { blobEvents } from './queue'
 
 export class DatalakeImpl implements Datalake {
   constructor (
     private readonly db: BlobDB,
     private readonly buckets: Array<{ location: Location, bucket: S3Bucket }>,
+    private readonly producer: PlatformQueueProducer<Tx>,
     private readonly options: {
       cacheControl: string
     }
@@ -108,6 +110,13 @@ export class DatalakeImpl implements Datalake {
     } else {
       await this.db.deleteBlob(ctx, { workspace, name })
     }
+
+    try {
+      const events = Array.isArray(name) ? name.map((n) => blobEvents.deleted(n)) : [blobEvents.deleted(name)]
+      await this.producer.send(workspace, events)
+    } catch (err) {
+      ctx.error('failed to send blob deleted event', { err })
+    }
   }
 
   async put (
@@ -138,6 +147,14 @@
     if (data !== null) {
       // Lucky boy, nothing to upload, use existing blob
       await this.db.createBlob(ctx, { workspace, name, hash, location })
+
+      try {
+        const event = blobEvents.created(name, { contentType, lastModified, size, etag: hash })
+        await this.producer.send(workspace, [event])
+      } catch (err) {
+        ctx.error('failed to send blob created event', { err })
+      }
+
       return { name, size, contentType, lastModified, etag: hash }
     } else {
       const putOptions = {
@@ -148,9 +165,14 @@
       }
       await bucket.put(ctx, filename, body, putOptions)
       await this.db.createBlobData(ctx, { workspace, name, hash, location, filename, size, type: contentType })
-      if (contentType.startsWith('video/')) {
-        void requestHLS(ctx, workspace, name)
+
+      try {
+        const event = blobEvents.created(name, { contentType, lastModified, size, etag: hash })
+        await this.producer.send(workspace, [event])
+      } catch (err) {
+        ctx.error('failed to send blob created event', { err })
       }
+
       return { name, size, contentType, lastModified, etag: hash }
     }
   }
@@ -175,10 +197,6 @@
       await this.db.createBlobData(ctx, { workspace, name, hash, location, filename, size, type: contentType })
     }
 
-    if (contentType.startsWith('video/')) {
-      void requestHLS(ctx, workspace, name)
-    }
-
     return { name, size, contentType, lastModified, etag: hash }
   }
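The send-and-log-on-failure pattern above is repeated at each call site so that a queue outage never fails the blob operation itself. A hypothetical helper (not part of this commit) could collapse those three blocks into one line each:

async function sendBlobEvents (
  ctx: MeasureContext,
  producer: PlatformQueueProducer<Tx>,
  workspace: string,
  events: Tx[]
): Promise<void> {
  try {
    // Blob storage must stay available even if the queue is down,
    // so delivery failures are logged rather than rethrown.
    await producer.send(workspace, events)
  } catch (err) {
    ctx.error('failed to send blob event', { err })
  }
}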

View File

@@ -0,0 +1,46 @@
+//
+// Copyright © 2025 Hardcore Engineering Inc.
+//
+// Licensed under the Eclipse Public License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License. You may
+// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+import core, { TxFactory, type Blob, type Ref, type Tx } from '@hcengineering/core'
+
+const factory = new TxFactory(core.account.System)
+
+export const blobEvents = {
+  created: function blobCreated (
+    name: string,
+    data: {
+      size: number
+      etag: string
+      contentType: string
+      lastModified: number
+    }
+  ): Tx {
+    const { lastModified, ...rest } = data
+    return factory.createTxCreateDoc(
+      core.class.Blob,
+      core.space.Configuration,
+      {
+        provider: 'datalake',
+        version: '',
+        ...rest
+      },
+      name as Ref<Blob>,
+      lastModified
+    )
+  },
+
+  deleted: function blobDeleted (name: string): Tx {
+    return factory.createTxRemoveDoc(core.class.Blob, core.space.Configuration, name as Ref<Blob>)
+  }
+}
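For reference, producing one of these events is an ordinary Tx send; a minimal sketch based on the factory above (the blob id and metadata are placeholder values):

const tx = blobEvents.created('00000000-0000-0000-0000-000000000000', {
  size: 1024,
  etag: 'deadbeef',
  contentType: 'video/mp4',
  lastModified: Date.now()
})
await producer.send(workspace, [tx]) // producer: PlatformQueueProducer<Tx>, wired up in server.ts below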

View File

@@ -1,59 +0,0 @@
-//
-// Copyright © 2025 Hardcore Engineering Inc.
-//
-// Licensed under the Eclipse Public License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License. You may
-// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-//
-// See the License for the specific language governing permissions and
-// limitations under the License.
-//
-
-import { Analytics } from '@hcengineering/analytics'
-import { systemAccountUuid, MeasureContext } from '@hcengineering/core'
-import { generateToken } from '@hcengineering/server-token'
-
-import config from '../config'
-
-interface StreamRequest {
-  source: string
-  format: string
-  workspace: string
-  metadata?: Record<string, string>
-}
-
-export async function requestHLS (ctx: MeasureContext, workspace: string, name: string): Promise<void> {
-  try {
-    ctx.info('request for hls', { workspace, name })
-    await postTranscodingTask(ctx, workspace, name)
-  } catch (err: any) {
-    Analytics.handleError(err)
-    ctx.error('can not schedule a task', { err })
-  }
-}
-
-async function postTranscodingTask (ctx: MeasureContext, workspace: string, name: string): Promise<void> {
-  if (config.StreamUrl === undefined) {
-    return
-  }
-
-  const streamReq: StreamRequest = { format: 'hls', source: name, workspace }
-
-  const token = generateToken(systemAccountUuid, undefined, { iss: 'datalake', aud: 'stream' })
-
-  const request = new Request(config.StreamUrl, {
-    method: 'POST',
-    headers: {
-      Authorization: `Bearer ${token}`,
-      'Content-Type': 'application/json'
-    },
-    body: JSON.stringify(streamReq)
-  })
-
-  const resp = await fetch(request)
-  if (!resp.ok) {
-    ctx.error(resp.statusText)
-  }
-}
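With this handler deleted, nothing in the datalake talks to the transcoding service anymore; a downstream consumer of the Tx topic is expected to pick up blob-created events instead. The subscription API is not part of this diff, but the filtering such a consumer would need is straightforward (a sketch, assuming only the core Tx model):

import core, { type Blob, type Tx, type TxCreateDoc } from '@hcengineering/core'

function isVideoBlobCreated (tx: Tx): boolean {
  // blobEvents.created produces a TxCreateDoc<Blob>, so a transcoding
  // consumer only needs to match on class and content type.
  if (tx._class !== core.class.TxCreateDoc) return false
  const createTx = tx as TxCreateDoc<Blob>
  return createTx.objectClass === core.class.Blob && createTx.attributes.contentType.startsWith('video/')
}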

View File

@@ -16,6 +16,7 @@
 import { Analytics } from '@hcengineering/analytics'
 import { configureAnalytics, SplitLogger } from '@hcengineering/analytics-service'
 import { MeasureMetricsContext, newMetrics } from '@hcengineering/core'
+import { getPlatformQueue } from '@hcengineering/kafka'
 import { setMetadata } from '@hcengineering/platform'
 import { initStatisticsContext } from '@hcengineering/server-core'
 import serverToken from '@hcengineering/server-token'
@@ -48,7 +49,9 @@
     )
   })
 
-  const { app, close } = await createServer(metricsContext, config)
+  const queue = getPlatformQueue('datalake')
+  const { app, close } = await createServer(metricsContext, queue, config)
+
   const server = listen(app, config.Port)
 
   const shutdown = (): void => {
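The shutdown handler is cut off in this hunk; since the process now owns a queue client, that client should be closed alongside the HTTP server. A sketch of what this plausibly looks like (assuming PlatformQueue exposes an async shutdown; the body is not shown in this diff):

const shutdown = (): void => {
  close()
  void queue.shutdown() // assumption: flushes and releases Kafka connections
  server.close(() => process.exit())
}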

View File

@@ -14,8 +14,8 @@
 //
 import { Analytics } from '@hcengineering/analytics'
-import { MeasureContext, metricsAggregate } from '@hcengineering/core'
-import { getCPUInfo, getMemoryInfo } from '@hcengineering/server-core'
+import { MeasureContext, Tx, metricsAggregate } from '@hcengineering/core'
+import { PlatformQueue, QueueTopic, getCPUInfo, getMemoryInfo } from '@hcengineering/server-core'
 import { decodeToken, TokenError } from '@hcengineering/server-token'
 import cors from 'cors'
@@ -97,7 +97,11 @@
   }
 }
 
-export async function createServer (ctx: MeasureContext, config: Config): Promise<{ app: Express, close: () => void }> {
+export async function createServer (
+  ctx: MeasureContext,
+  queue: PlatformQueue,
+  config: Config
+): Promise<{ app: Express, close: () => void }> {
   const buckets: Array<{ location: Location, bucket: S3Bucket }> = []
   for (const bucket of config.Buckets) {
     const location = bucket.location as Location
@@ -115,8 +119,10 @@ export async function createServer (ctx: MeasureContext, config: Config): Promis
     }
   }
 
+  const producer = queue.getProducer<Tx>(ctx.newChild('queue', {}), QueueTopic.Tx)
+
   const db = await createDb(ctx, config.DbUrl)
-  const datalake = new DatalakeImpl(db, buckets, { cacheControl })
+  const datalake = new DatalakeImpl(db, buckets, producer, { cacheControl })
   const tempDir = new TemporaryDir(ctx, 'datalake-', config.CleanupInterval)
 
   const app = express()