mirror of
https://github.com/hcengineering/platform.git
synced 2025-05-16 20:40:36 +00:00
fix: storage adapter throw error on stat (#8893)
Signed-off-by: Alexander Onnikov <Alexander.Onnikov@xored.com>
This commit is contained in:
parent
3295adee57
commit
94f8b55908
@ -1943,8 +1943,8 @@ importers:
|
||||
specifier: ^2.2.0
|
||||
version: 2.8.0(webpack@5.97.1)
|
||||
minio:
|
||||
specifier: ^8.0.0
|
||||
version: 8.0.0
|
||||
specifier: ^8.0.5
|
||||
version: 8.0.5
|
||||
mongodb:
|
||||
specifier: ^6.12.0
|
||||
version: 6.12.0(gcp-metadata@5.3.0(encoding@0.1.13))(snappy@7.2.2)(socks@2.8.3)
|
||||
@ -4404,7 +4404,7 @@ packages:
|
||||
version: 0.0.0
|
||||
|
||||
'@rush-temp/minio@file:projects/minio.tgz':
|
||||
resolution: {integrity: sha512-3yNUVfYUNDuDMcvX19iY2li2h8zCTFOkSevtKXox8JtVhaBkuZuRUKXlQ7VxdOpSCdXodgzbLbzqqeLD7Ag29Q==, tarball: file:projects/minio.tgz}
|
||||
resolution: {integrity: sha512-yhBypcBYWAGiAuROzVI+dfzlZ9e/+Qv12QV62+b9OwZ5MfqWvrxXIjfNA0gflsOdl82ZqGq4ETATs3kN1gMKAg==, tarball: file:projects/minio.tgz}
|
||||
version: 0.0.0
|
||||
|
||||
'@rush-temp/model-achievement@file:projects/model-achievement.tgz':
|
||||
@ -9000,10 +9000,6 @@ packages:
|
||||
fast-uri@3.0.1:
|
||||
resolution: {integrity: sha512-MWipKbbYiYI0UC7cl8m/i/IWTqfC8YXsqjzybjddLsFjStroQzsHXkc73JutMvBiXmOvapk+axIl79ig5t55Bw==}
|
||||
|
||||
fast-xml-parser@4.3.4:
|
||||
resolution: {integrity: sha512-utnwm92SyozgA3hhH2I8qldf2lBqm6qHOICawRNRFu1qMe3+oqr+GcXjGqTmXTMGE5T4eC03kr/rlh5C1IRdZA==}
|
||||
hasBin: true
|
||||
|
||||
fast-xml-parser@4.4.1:
|
||||
resolution: {integrity: sha512-xkjOecfnKGkSsOwtZ5Pz7Us/T6mrbPQrq0nh+aCO5V9nk5NLWmasAHumTKjiPJPWANe+kAZ84Jc8ooJkzZ88Sw==}
|
||||
hasBin: true
|
||||
@ -10856,8 +10852,8 @@ packages:
|
||||
minimist@1.2.8:
|
||||
resolution: {integrity: sha512-2yyAR8qBkN3YuheJanUpWC5U3bb5osDywNB8RzDVlDwDHbocAJveqqj1u8+SVD7jkWT4yvsHCpWqqWqAxb0zCA==}
|
||||
|
||||
minio@8.0.0:
|
||||
resolution: {integrity: sha512-GkM/lk+Gzwd4fAQvLlB+cy3NV3PRADe0tNXnH9JD5BmdAHKIp+5vypptbjdkU85xWBIQsa2xK35GpXjmYXBBYA==}
|
||||
minio@8.0.5:
|
||||
resolution: {integrity: sha512-/vAze1uyrK2R/DSkVutE4cjVoAowvIQ18RAwn7HrqnLecLlMazFnY0oNBqfuoAWvu7mZIGX75AzpuV05TJeoHg==}
|
||||
engines: {node: ^16 || ^18 || >=20}
|
||||
|
||||
minipass-collect@1.0.2:
|
||||
@ -20088,7 +20084,6 @@ snapshots:
|
||||
'@rush-temp/minio@file:projects/minio.tgz(@babel/core@7.23.9)(@jest/types@29.6.3)(babel-jest@29.7.0(@babel/core@7.23.9))(esbuild@0.24.2)(ts-node@10.9.2(@types/node@20.11.19)(typescript@5.3.3))':
|
||||
dependencies:
|
||||
'@types/jest': 29.5.12
|
||||
'@types/minio': 7.0.18
|
||||
'@types/node': 20.11.19
|
||||
'@typescript-eslint/eslint-plugin': 6.21.0(@typescript-eslint/parser@6.21.0(eslint@8.56.0)(typescript@5.3.3))(eslint@8.56.0)(typescript@5.3.3)
|
||||
'@typescript-eslint/parser': 6.21.0(eslint@8.56.0)(typescript@5.3.3)
|
||||
@ -20098,7 +20093,7 @@ snapshots:
|
||||
eslint-plugin-n: 15.7.0(eslint@8.56.0)
|
||||
eslint-plugin-promise: 6.1.1(eslint@8.56.0)
|
||||
jest: 29.7.0(@types/node@20.11.19)(ts-node@10.9.2(@types/node@20.11.19)(typescript@5.3.3))
|
||||
minio: 8.0.0
|
||||
minio: 8.0.5
|
||||
prettier: 3.2.5
|
||||
ts-jest: 29.1.2(@babel/core@7.23.9)(@jest/types@29.6.3)(babel-jest@29.7.0(@babel/core@7.23.9))(esbuild@0.24.2)(jest@29.7.0(@types/node@20.11.19)(ts-node@10.9.2(@types/node@20.11.19)(typescript@5.3.3)))(typescript@5.3.3)
|
||||
typescript: 5.3.3
|
||||
@ -32218,10 +32213,6 @@ snapshots:
|
||||
|
||||
fast-uri@3.0.1: {}
|
||||
|
||||
fast-xml-parser@4.3.4:
|
||||
dependencies:
|
||||
strnum: 1.0.5
|
||||
|
||||
fast-xml-parser@4.4.1:
|
||||
dependencies:
|
||||
strnum: 1.0.5
|
||||
@ -33236,7 +33227,7 @@ snapshots:
|
||||
|
||||
is-typed-array@1.1.13:
|
||||
dependencies:
|
||||
which-typed-array: 1.1.14
|
||||
which-typed-array: 1.1.15
|
||||
|
||||
is-typedarray@1.0.0: {}
|
||||
|
||||
@ -34468,14 +34459,14 @@ snapshots:
|
||||
|
||||
minimist@1.2.8: {}
|
||||
|
||||
minio@8.0.0:
|
||||
minio@8.0.5:
|
||||
dependencies:
|
||||
async: 3.2.5
|
||||
block-stream2: 2.1.0
|
||||
browser-or-node: 2.1.1
|
||||
buffer-crc32: 1.0.0
|
||||
eventemitter3: 5.0.1
|
||||
fast-xml-parser: 4.3.4
|
||||
fast-xml-parser: 4.4.1
|
||||
ipaddr.js: 2.1.0
|
||||
lodash: 4.17.21
|
||||
mime-types: 2.1.35
|
||||
@ -37372,7 +37363,7 @@ snapshots:
|
||||
is-arguments: 1.1.1
|
||||
is-generator-function: 1.0.10
|
||||
is-typed-array: 1.1.13
|
||||
which-typed-array: 1.1.14
|
||||
which-typed-array: 1.1.15
|
||||
|
||||
utila@0.4.0: {}
|
||||
|
||||
@ -37665,7 +37656,7 @@ snapshots:
|
||||
isarray: 2.0.5
|
||||
which-boxed-primitive: 1.0.2
|
||||
which-collection: 1.0.1
|
||||
which-typed-array: 1.1.14
|
||||
which-typed-array: 1.1.15
|
||||
|
||||
which-collection@1.0.1:
|
||||
dependencies:
|
||||
|
@ -1529,16 +1529,20 @@ async function rebuildSizeInfo (
|
||||
|
||||
const addFileSize = async (file: string | undefined | null): Promise<void> => {
|
||||
if (file != null) {
|
||||
const sz = sizeInfo[file]
|
||||
const fileSize = sz ?? (await storage.stat(file))
|
||||
if (sz === undefined) {
|
||||
sizeInfo[file] = fileSize
|
||||
processed++
|
||||
if (processed % 10 === 0) {
|
||||
ctx.info('Calculate size processed', { processed, size: Math.round(result.backupSize / (1024 * 1024)) })
|
||||
try {
|
||||
const sz = sizeInfo[file]
|
||||
const fileSize = sz ?? (await storage.stat(file))
|
||||
if (sz === undefined) {
|
||||
sizeInfo[file] = fileSize
|
||||
processed++
|
||||
if (processed % 10 === 0) {
|
||||
ctx.info('Calculate size processed', { processed, size: Math.round(result.backupSize / (1024 * 1024)) })
|
||||
}
|
||||
}
|
||||
result.backupSize += fileSize
|
||||
} catch (err: any) {
|
||||
ctx.error('failed to calculate size', { file, err })
|
||||
}
|
||||
result.backupSize += fileSize
|
||||
}
|
||||
}
|
||||
|
||||
@ -1610,9 +1614,13 @@ export async function backupSize (storage: BackupStorage): Promise<void> {
|
||||
console.log('workspace:', backupInfo.workspace ?? '', backupInfo.version)
|
||||
const addFileSize = async (file: string | undefined | null): Promise<void> => {
|
||||
if (file != null && (await storage.exists(file))) {
|
||||
const fileSize = await storage.stat(file)
|
||||
console.log(file, fileSize)
|
||||
size += fileSize
|
||||
try {
|
||||
const fileSize = await storage.stat(file)
|
||||
console.log(file, fileSize)
|
||||
size += fileSize
|
||||
} catch (err: any) {
|
||||
console.error('failed to calculate size', { file, err })
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1665,20 +1673,24 @@ export async function backupDownload (storage: BackupStorage, storeIn: string):
|
||||
const serverSize: number | undefined = sizeInfo[file]
|
||||
|
||||
if (!existsSync(target) || force || (serverSize !== undefined && serverSize !== statSync(target).size)) {
|
||||
const fileSize = serverSize ?? (await storage.stat(file))
|
||||
console.log('downloading', file, fileSize)
|
||||
const readStream = await storage.load(file)
|
||||
const outp = createWriteStream(target)
|
||||
try {
|
||||
const fileSize = serverSize ?? (await storage.stat(file))
|
||||
console.log('downloading', file, fileSize)
|
||||
const readStream = await storage.load(file)
|
||||
const outp = createWriteStream(target)
|
||||
|
||||
readStream.pipe(outp)
|
||||
await new Promise<void>((resolve) => {
|
||||
readStream.on('end', () => {
|
||||
readStream.destroy()
|
||||
outp.close()
|
||||
resolve()
|
||||
readStream.pipe(outp)
|
||||
await new Promise<void>((resolve) => {
|
||||
readStream.on('end', () => {
|
||||
readStream.destroy()
|
||||
outp.close()
|
||||
resolve()
|
||||
})
|
||||
})
|
||||
})
|
||||
size += fileSize
|
||||
size += fileSize
|
||||
} catch (err: any) {
|
||||
console.error('failed to calculate size', { file, err })
|
||||
}
|
||||
} else {
|
||||
console.log('file-same', file)
|
||||
}
|
||||
|
@ -46,22 +46,37 @@ export interface DatalakeConfig extends StorageConfig {
|
||||
/**
|
||||
* @public
|
||||
*/
|
||||
export function createDatalakeClient (opt: DatalakeConfig, token: string): DatalakeClient {
|
||||
const endpoint = Number.isInteger(opt.port) ? `${opt.endpoint}:${opt.port}` : opt.endpoint
|
||||
export function createDatalakeClient (cfg: DatalakeConfig, token: string): DatalakeClient {
|
||||
const endpoint = Number.isInteger(cfg.port) ? `${cfg.endpoint}:${cfg.port}` : cfg.endpoint
|
||||
return new DatalakeClient(endpoint, token)
|
||||
}
|
||||
|
||||
export const CONFIG_KIND = 'datalake'
|
||||
|
||||
/**
|
||||
* @public
|
||||
*/
|
||||
export interface DatalakeClientOptions {
|
||||
retryCount?: number
|
||||
retryInterval?: number
|
||||
}
|
||||
|
||||
/**
|
||||
* @public
|
||||
*/
|
||||
export class DatalakeService implements StorageAdapter {
|
||||
private readonly client: DatalakeClient
|
||||
private readonly retryCount: number
|
||||
private readonly retryInterval: number
|
||||
|
||||
constructor (readonly opt: DatalakeConfig) {
|
||||
constructor (
|
||||
readonly cfg: DatalakeConfig,
|
||||
readonly options: DatalakeClientOptions = {}
|
||||
) {
|
||||
const token = generateToken(systemAccountUuid, undefined, { service: 'datalake' })
|
||||
this.client = createDatalakeClient(opt, token)
|
||||
this.client = createDatalakeClient(cfg, token)
|
||||
this.retryCount = options.retryCount ?? 5
|
||||
this.retryInterval = options.retryInterval ?? 50
|
||||
}
|
||||
|
||||
async initialize (ctx: MeasureContext, wsIds: WorkspaceIds): Promise<void> {}
|
||||
@ -86,7 +101,7 @@ export class DatalakeService implements StorageAdapter {
|
||||
async remove (ctx: MeasureContext, wsIds: WorkspaceIds, objectNames: string[]): Promise<void> {
|
||||
await Promise.all(
|
||||
objectNames.map(async (objectName) => {
|
||||
await this.client.deleteObject(ctx, wsIds.uuid, objectName)
|
||||
await this.retry(ctx, () => this.client.deleteObject(ctx, wsIds.uuid, objectName))
|
||||
})
|
||||
)
|
||||
}
|
||||
@ -106,7 +121,7 @@ export class DatalakeService implements StorageAdapter {
|
||||
next: async () => {
|
||||
try {
|
||||
while (hasMore && buffer.length < 50) {
|
||||
const res = await this.client.listObjects(ctx, wsIds.uuid, cursor)
|
||||
const res = await this.retry(ctx, () => this.client.listObjects(ctx, wsIds.uuid, cursor))
|
||||
hasMore = res.cursor !== undefined
|
||||
cursor = res.cursor
|
||||
|
||||
@ -116,7 +131,7 @@ export class DatalakeService implements StorageAdapter {
|
||||
_class: core.class.Blob,
|
||||
etag: blob.etag,
|
||||
size: (typeof blob.size === 'string' ? parseInt(blob.size) : blob.size) ?? 0,
|
||||
provider: this.opt.name,
|
||||
provider: this.cfg.name,
|
||||
space: core.space.Configuration,
|
||||
modifiedBy: core.account.System,
|
||||
modifiedOn: 0
|
||||
@ -134,32 +149,26 @@ export class DatalakeService implements StorageAdapter {
|
||||
|
||||
@withContext('stat')
|
||||
async stat (ctx: MeasureContext, wsIds: WorkspaceIds, objectName: string): Promise<Blob | undefined> {
|
||||
return await withRetry(ctx, 5, async () => {
|
||||
try {
|
||||
const result = await this.client.statObject(ctx, wsIds.uuid, objectName)
|
||||
if (result !== undefined) {
|
||||
return {
|
||||
provider: '',
|
||||
_class: core.class.Blob,
|
||||
_id: objectName as Ref<Blob>,
|
||||
contentType: result.type,
|
||||
size: result.size ?? 0,
|
||||
etag: result.etag ?? '',
|
||||
space: core.space.Configuration,
|
||||
modifiedBy: core.account.System,
|
||||
modifiedOn: result.lastModified,
|
||||
version: null
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
ctx.error('failed to stat object', { error: err, objectName, workspaceId: wsIds.uuid })
|
||||
const result = await this.retry(ctx, () => this.client.statObject(ctx, wsIds.uuid, objectName))
|
||||
if (result !== undefined) {
|
||||
return {
|
||||
provider: '',
|
||||
_class: core.class.Blob,
|
||||
_id: objectName as Ref<Blob>,
|
||||
contentType: result.type,
|
||||
size: result.size ?? 0,
|
||||
etag: result.etag ?? '',
|
||||
space: core.space.Configuration,
|
||||
modifiedBy: core.account.System,
|
||||
modifiedOn: result.lastModified,
|
||||
version: null
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@withContext('get')
|
||||
async get (ctx: MeasureContext, wsIds: WorkspaceIds, objectName: string): Promise<Readable> {
|
||||
return await this.client.getObject(ctx, wsIds.uuid, objectName)
|
||||
return await this.retry(ctx, () => this.client.getObject(ctx, wsIds.uuid, objectName))
|
||||
}
|
||||
|
||||
@withContext('put')
|
||||
@ -178,7 +187,7 @@ export class DatalakeService implements StorageAdapter {
|
||||
}
|
||||
|
||||
const { etag } = await ctx.with('put', {}, (ctx) =>
|
||||
withRetry(ctx, 5, () => this.client.putObject(ctx, wsIds.uuid, objectName, stream, params))
|
||||
this.retry(ctx, () => this.client.putObject(ctx, wsIds.uuid, objectName, stream, params))
|
||||
)
|
||||
|
||||
return {
|
||||
@ -189,7 +198,7 @@ export class DatalakeService implements StorageAdapter {
|
||||
|
||||
@withContext('read')
|
||||
async read (ctx: MeasureContext, wsIds: WorkspaceIds, objectName: string): Promise<Buffer[]> {
|
||||
const data = await this.client.getObject(ctx, wsIds.uuid, objectName)
|
||||
const data = await this.retry(ctx, () => this.client.getObject(ctx, wsIds.uuid, objectName))
|
||||
const chunks: Buffer[] = []
|
||||
|
||||
for await (const chunk of data) {
|
||||
@ -207,12 +216,16 @@ export class DatalakeService implements StorageAdapter {
|
||||
offset: number,
|
||||
length?: number
|
||||
): Promise<Readable> {
|
||||
return await this.client.getPartialObject(ctx, wsIds.uuid, objectName, offset, length)
|
||||
return await this.retry(ctx, () => this.client.getPartialObject(ctx, wsIds.uuid, objectName, offset, length))
|
||||
}
|
||||
|
||||
async getUrl (ctx: MeasureContext, wsIds: WorkspaceIds, objectName: string): Promise<string> {
|
||||
return this.client.getObjectUrl(ctx, wsIds.uuid, objectName)
|
||||
}
|
||||
|
||||
async retry<T>(ctx: MeasureContext, op: () => Promise<T>): Promise<T> {
|
||||
return await withRetry(ctx, this.retryCount, op, this.retryInterval)
|
||||
}
|
||||
}
|
||||
|
||||
export function processConfigFromEnv (storageConfig: StorageConfiguration): string | undefined {
|
||||
@ -244,7 +257,7 @@ async function withRetry<T> (
|
||||
} catch (err: any) {
|
||||
error = err
|
||||
ctx.error('error', { err })
|
||||
if (retries !== 0) {
|
||||
if (retries !== 0 && delay > 0) {
|
||||
await new Promise((resolve) => setTimeout(resolve, delay))
|
||||
}
|
||||
}
|
||||
|
@ -31,13 +31,12 @@
|
||||
"@types/node": "~20.11.16",
|
||||
"jest": "^29.7.0",
|
||||
"ts-jest": "^29.1.1",
|
||||
"@types/jest": "^29.5.5",
|
||||
"@types/minio": "~7.0.11"
|
||||
"@types/jest": "^29.5.5"
|
||||
},
|
||||
"dependencies": {
|
||||
"@hcengineering/core": "^0.6.32",
|
||||
"@hcengineering/platform": "^0.6.11",
|
||||
"@hcengineering/server-core": "^0.6.1",
|
||||
"minio": "^8.0.0"
|
||||
"minio": "^8.0.5"
|
||||
}
|
||||
}
|
||||
|
@ -106,7 +106,7 @@ export class MinioService implements StorageAdapter {
|
||||
if (!(await this.client.bucketExists(this.opt.rootBucket))) {
|
||||
return []
|
||||
}
|
||||
const stream = this.client.listObjects(this.opt.rootBucket, '', false)
|
||||
const stream = this.client.listObjectsV2(this.opt.rootBucket, '', false)
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
stream.on('end', () => {
|
||||
stream.destroy()
|
||||
@ -210,7 +210,7 @@ export class MinioService implements StorageAdapter {
|
||||
try {
|
||||
if (stream === undefined && !done) {
|
||||
const rprefix = rootPrefix ?? ''
|
||||
stream = this.client.listObjects(this.getBucketId(wsIds), rprefix, true)
|
||||
stream = this.client.listObjectsV2(this.getBucketId(wsIds), rprefix, true)
|
||||
stream.on('end', () => {
|
||||
stream?.destroy()
|
||||
done = true
|
||||
@ -299,13 +299,14 @@ export class MinioService implements StorageAdapter {
|
||||
err?.code === 'NoSuchKey' ||
|
||||
err?.code === 'NotFound' ||
|
||||
err?.message === 'No such key' ||
|
||||
err?.Code === 'NoSuchKey' ||
|
||||
err?.code === 'ECONNRESET'
|
||||
err?.Code === 'NoSuchKey'
|
||||
) {
|
||||
// Do not print error in this case
|
||||
return
|
||||
}
|
||||
ctx.error('no object found', { error: err, objectName, wsIds })
|
||||
|
||||
ctx.error('failed to stat object', { error: err, objectName, wsIds })
|
||||
throw err
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -325,6 +325,7 @@ export class S3Service implements StorageAdapter {
|
||||
} catch (err: any) {
|
||||
if (err?.$metadata?.httpStatusCode !== 404) {
|
||||
ctx.warn('no object found', { error: err, objectName, wsIds })
|
||||
throw err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user