mirror of
https://github.com/hcengineering/platform.git
synced 2025-04-24 01:07:50 +00:00
fix: remove prefixes from listStream method parameters (#6480)
Signed-off-by: Alexander Onnikov <Alexander.Onnikov@xored.com>
This commit is contained in:
parent
3be2de033a
commit
60444ebdab
@ -337,7 +337,7 @@ async function migrateBlobData (exAdapter: StorageAdapterEx, client: MigrationCl
|
|||||||
if (!(await adapter.exists(ctx, client.workspaceId))) {
|
if (!(await adapter.exists(ctx, client.workspaceId))) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
const blobs = await adapter.listStream(ctx, client.workspaceId, '')
|
const blobs = await adapter.listStream(ctx, client.workspaceId)
|
||||||
const bulk = new Map<Ref<Blob>, Blob>()
|
const bulk = new Map<Ref<Blob>, Blob>()
|
||||||
try {
|
try {
|
||||||
const push = async (force: boolean): Promise<void> => {
|
const push = async (force: boolean): Promise<void> => {
|
||||||
|
@ -45,7 +45,7 @@ export interface StorageAdapter {
|
|||||||
|
|
||||||
listBuckets: (ctx: MeasureContext) => Promise<BucketInfo[]>
|
listBuckets: (ctx: MeasureContext) => Promise<BucketInfo[]>
|
||||||
remove: (ctx: MeasureContext, workspaceId: WorkspaceId, objectNames: string[]) => Promise<void>
|
remove: (ctx: MeasureContext, workspaceId: WorkspaceId, objectNames: string[]) => Promise<void>
|
||||||
listStream: (ctx: MeasureContext, workspaceId: WorkspaceId, prefix?: string) => Promise<BlobStorageIterator>
|
listStream: (ctx: MeasureContext, workspaceId: WorkspaceId) => Promise<BlobStorageIterator>
|
||||||
stat: (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string) => Promise<Blob | undefined>
|
stat: (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string) => Promise<Blob | undefined>
|
||||||
get: (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string) => Promise<Readable>
|
get: (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string) => Promise<Readable>
|
||||||
put: (
|
put: (
|
||||||
@ -114,15 +114,11 @@ export class DummyStorageAdapter implements StorageAdapter, StorageAdapterEx {
|
|||||||
|
|
||||||
async remove (ctx: MeasureContext, workspaceId: WorkspaceId, objectNames: string[]): Promise<void> {}
|
async remove (ctx: MeasureContext, workspaceId: WorkspaceId, objectNames: string[]): Promise<void> {}
|
||||||
|
|
||||||
async list (ctx: MeasureContext, workspaceId: WorkspaceId, prefix?: string | undefined): Promise<ListBlobResult[]> {
|
async list (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<ListBlobResult[]> {
|
||||||
return []
|
return []
|
||||||
}
|
}
|
||||||
|
|
||||||
async listStream (
|
async listStream (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<BlobStorageIterator> {
|
||||||
ctx: MeasureContext,
|
|
||||||
workspaceId: WorkspaceId,
|
|
||||||
prefix?: string | undefined
|
|
||||||
): Promise<BlobStorageIterator> {
|
|
||||||
return {
|
return {
|
||||||
next: async (): Promise<ListBlobResult | undefined> => {
|
next: async (): Promise<ListBlobResult | undefined> => {
|
||||||
return undefined
|
return undefined
|
||||||
@ -203,11 +199,10 @@ export async function removeAllObjects (
|
|||||||
export async function objectsToArray (
|
export async function objectsToArray (
|
||||||
ctx: MeasureContext,
|
ctx: MeasureContext,
|
||||||
storage: StorageAdapter,
|
storage: StorageAdapter,
|
||||||
workspaceId: WorkspaceId,
|
workspaceId: WorkspaceId
|
||||||
prefix?: string
|
|
||||||
): Promise<ListBlobResult[]> {
|
): Promise<ListBlobResult[]> {
|
||||||
// We need to list all files and delete them
|
// We need to list all files and delete them
|
||||||
const iterator = await storage.listStream(ctx, workspaceId, prefix)
|
const iterator = await storage.listStream(ctx, workspaceId)
|
||||||
const bulk: ListBlobResult[] = []
|
const bulk: ListBlobResult[] = []
|
||||||
while (true) {
|
while (true) {
|
||||||
const obj = await iterator.next()
|
const obj = await iterator.next()
|
||||||
|
@ -52,11 +52,7 @@ export class MemStorageAdapter implements StorageAdapter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async listStream (
|
async listStream (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<BlobStorageIterator> {
|
||||||
ctx: MeasureContext,
|
|
||||||
workspaceId: WorkspaceId,
|
|
||||||
prefix?: string | undefined
|
|
||||||
): Promise<BlobStorageIterator> {
|
|
||||||
const files = Array.from(this.files.values()).filter((it) => it.workspace === workspaceId.name)
|
const files = Array.from(this.files.values()).filter((it) => it.workspace === workspaceId.name)
|
||||||
return {
|
return {
|
||||||
next: async () => {
|
next: async () => {
|
||||||
|
@ -224,14 +224,8 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
|
|||||||
await this.dbAdapter.clean(ctx, workspaceId, DOMAIN_BLOB, objectNames as Ref<Blob>[])
|
await this.dbAdapter.clean(ctx, workspaceId, DOMAIN_BLOB, objectNames as Ref<Blob>[])
|
||||||
}
|
}
|
||||||
|
|
||||||
async listStream (
|
async listStream (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<BlobStorageIterator> {
|
||||||
ctx: MeasureContext,
|
const data = await this.dbAdapter.findStream<Blob>(ctx, workspaceId, DOMAIN_BLOB, {})
|
||||||
workspaceId: WorkspaceId,
|
|
||||||
prefix?: string | undefined
|
|
||||||
): Promise<BlobStorageIterator> {
|
|
||||||
const data = await this.dbAdapter.findStream<Blob>(ctx, workspaceId, DOMAIN_BLOB, {
|
|
||||||
_id: { $regex: `${prefix ?? ''}.*` }
|
|
||||||
})
|
|
||||||
return {
|
return {
|
||||||
next: async (): Promise<ListBlobResult | undefined> => {
|
next: async (): Promise<ListBlobResult | undefined> => {
|
||||||
return await data.next()
|
return await data.next()
|
||||||
|
@ -73,11 +73,7 @@ export class DatalakeService implements StorageAdapter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@withContext('listStream')
|
@withContext('listStream')
|
||||||
async listStream (
|
async listStream (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<BlobStorageIterator> {
|
||||||
ctx: MeasureContext,
|
|
||||||
workspaceId: WorkspaceId,
|
|
||||||
prefix?: string | undefined
|
|
||||||
): Promise<BlobStorageIterator> {
|
|
||||||
throw new Error('not supported')
|
throw new Error('not supported')
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -182,11 +182,7 @@ export class MinioService implements StorageAdapter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@withContext('listStream')
|
@withContext('listStream')
|
||||||
async listStream (
|
async listStream (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<BlobStorageIterator> {
|
||||||
ctx: MeasureContext,
|
|
||||||
workspaceId: WorkspaceId,
|
|
||||||
prefix?: string | undefined
|
|
||||||
): Promise<BlobStorageIterator> {
|
|
||||||
let hasMore = true
|
let hasMore = true
|
||||||
let stream: BucketStream<BucketItem> | undefined
|
let stream: BucketStream<BucketItem> | undefined
|
||||||
let done = false
|
let done = false
|
||||||
@ -199,7 +195,7 @@ export class MinioService implements StorageAdapter {
|
|||||||
next: async (): Promise<ListBlobResult | undefined> => {
|
next: async (): Promise<ListBlobResult | undefined> => {
|
||||||
try {
|
try {
|
||||||
if (stream === undefined && !done) {
|
if (stream === undefined && !done) {
|
||||||
const rprefix = rootPrefix !== undefined ? rootPrefix + (prefix ?? '') : prefix ?? ''
|
const rprefix = rootPrefix ?? ''
|
||||||
stream = this.client.listObjects(this.getBucketId(workspaceId), rprefix, true)
|
stream = this.client.listObjects(this.getBucketId(workspaceId), rprefix, true)
|
||||||
stream.on('end', () => {
|
stream.on('end', () => {
|
||||||
stream?.destroy()
|
stream?.destroy()
|
||||||
|
@ -232,11 +232,7 @@ export class S3Service implements StorageAdapter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@withContext('listStream')
|
@withContext('listStream')
|
||||||
async listStream (
|
async listStream (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<BlobStorageIterator> {
|
||||||
ctx: MeasureContext,
|
|
||||||
workspaceId: WorkspaceId,
|
|
||||||
prefix?: string | undefined
|
|
||||||
): Promise<BlobStorageIterator> {
|
|
||||||
let hasMore = true
|
let hasMore = true
|
||||||
const buffer: ListBlobResult[] = []
|
const buffer: ListBlobResult[] = []
|
||||||
let token: string | undefined
|
let token: string | undefined
|
||||||
@ -248,7 +244,7 @@ export class S3Service implements StorageAdapter {
|
|||||||
if (hasMore && buffer.length === 0) {
|
if (hasMore && buffer.length === 0) {
|
||||||
const res = await this.client.listObjectsV2({
|
const res = await this.client.listObjectsV2({
|
||||||
Bucket: this.getBucketId(workspaceId),
|
Bucket: this.getBucketId(workspaceId),
|
||||||
Prefix: rootPrefix !== undefined ? rootPrefix + (prefix ?? '') : prefix ?? '',
|
Prefix: rootPrefix ?? '',
|
||||||
ContinuationToken: token
|
ContinuationToken: token
|
||||||
})
|
})
|
||||||
if (res.IsTruncated === true) {
|
if (res.IsTruncated === true) {
|
||||||
@ -273,7 +269,7 @@ export class S3Service implements StorageAdapter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (err: any) {
|
} catch (err: any) {
|
||||||
ctx.error('Failed to get list', { error: err, workspaceId: workspaceId.name, prefix })
|
ctx.error('Failed to get list', { error: err, workspaceId: workspaceId.name })
|
||||||
}
|
}
|
||||||
if (buffer.length > 0) {
|
if (buffer.length > 0) {
|
||||||
return buffer.shift()
|
return buffer.shift()
|
||||||
|
Loading…
Reference in New Issue
Block a user