Merge remote-tracking branch 'origin/develop'

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-08-26 14:21:04 +07:00
commit 5370b1bd19
No known key found for this signature in database
GPG Key ID: BD80F68D68D8F7F2
7 changed files with 324 additions and 121 deletions

View File

@ -991,7 +991,8 @@ export function devTool (
.command('move-files') .command('move-files')
.option('-w, --workspace <workspace>', 'Selected workspace only', '') .option('-w, --workspace <workspace>', 'Selected workspace only', '')
.option('-bl, --blobLimit <blobLimit>', 'A blob size limit in megabytes (default 50mb)', '50') .option('-bl, --blobLimit <blobLimit>', 'A blob size limit in megabytes (default 50mb)', '50')
.action(async (cmd: { workspace: string, blobLimit: string }) => { .option('-c, --concurrency <concurrency>', 'Number of files being processed concurrently', '10')
.action(async (cmd: { workspace: string, blobLimit: string, concurrency: string }) => {
const { mongodbUri } = prepareTools() const { mongodbUri } = prepareTools()
await withDatabase(mongodbUri, async (db, client) => { await withDatabase(mongodbUri, async (db, client) => {
await withStorage(mongodbUri, async (adapter) => { await withStorage(mongodbUri, async (adapter) => {
@ -1010,7 +1011,10 @@ export function devTool (
} }
const wsId = getWorkspaceId(workspace.workspace) const wsId = getWorkspaceId(workspace.workspace)
await moveFiles(toolCtx, wsId, exAdapter, parseInt(cmd.blobLimit)) await moveFiles(toolCtx, wsId, exAdapter, {
blobSizeLimitMb: parseInt(cmd.blobLimit),
concurrency: parseInt(cmd.concurrency)
})
} }
} catch (err: any) { } catch (err: any) {
console.error(err) console.error(err)

View File

@ -13,7 +13,7 @@
// limitations under the License. // limitations under the License.
// //
import { type Blob, type MeasureContext, type WorkspaceId } from '@hcengineering/core' import { type Blob, type MeasureContext, type WorkspaceId, RateLimiter } from '@hcengineering/core'
import { type StorageAdapterEx } from '@hcengineering/server-core' import { type StorageAdapterEx } from '@hcengineering/server-core'
import { PassThrough } from 'stream' import { PassThrough } from 'stream'
@ -21,7 +21,10 @@ export async function moveFiles (
ctx: MeasureContext, ctx: MeasureContext,
workspaceId: WorkspaceId, workspaceId: WorkspaceId,
exAdapter: StorageAdapterEx, exAdapter: StorageAdapterEx,
blobSizeLimitMb: number params: {
blobSizeLimitMb: number
concurrency: number
}
): Promise<void> { ): Promise<void> {
if (exAdapter.adapters === undefined) return if (exAdapter.adapters === undefined) return
@ -35,7 +38,11 @@ export async function moveFiles (
for (const [name, adapter] of exAdapter.adapters.entries()) { for (const [name, adapter] of exAdapter.adapters.entries()) {
if (name === target) continue if (name === target) continue
console.log('moving from', name) console.log('moving from', name, 'limit', params.blobSizeLimitMb, 'concurrency', params.concurrency)
let time = Date.now()
const rateLimiter = new RateLimiter(params.concurrency)
const iterator = await adapter.listStream(ctx, workspaceId) const iterator = await adapter.listStream(ctx, workspaceId)
while (true) { while (true) {
@ -46,29 +53,37 @@ export async function moveFiles (
if (blob === undefined) continue if (blob === undefined) continue
if (blob.provider === target) continue if (blob.provider === target) continue
if (blob.size > blobSizeLimitMb * 1024 * 1024) { if (blob.size > params.blobSizeLimitMb * 1024 * 1024) {
console.log('skipping large blob', name, data._id, Math.round(blob.size / 1024 / 1024)) console.log('skipping large blob', name, data._id, Math.round(blob.size / 1024 / 1024))
continue continue
} }
try { await rateLimiter.exec(async () => {
await retryOnFailure( try {
ctx, await retryOnFailure(
5, ctx,
async () => { 5,
await moveFile(ctx, exAdapter, workspaceId, blob) async () => {
}, await moveFile(ctx, exAdapter, workspaceId, blob)
50 },
) 50
} catch (err) { )
console.error('failed to process blob', name, data._id, err) } catch (err) {
} console.error('failed to process blob', name, data._id, err)
}
})
count += 1 count += 1
if (count % 100 === 0) { if (count % 100 === 0) {
console.log('...moved: ', count) await rateLimiter.waitProcessing()
const duration = Date.now() - time
time = Date.now()
console.log('...moved: ', count, Math.round(duration / 1000))
} }
} }
await rateLimiter.waitProcessing()
await iterator.close() await iterator.close()
} }

View File

@ -13,13 +13,14 @@
// limitations under the License. // limitations under the License.
// //
import { saveCollaborativeDoc, takeCollaborativeDocSnapshot } from '@hcengineering/collaboration' import { saveCollaborativeDoc } from '@hcengineering/collaboration'
import core, { import core, {
DOMAIN_BLOB, DOMAIN_BLOB,
DOMAIN_DOC_INDEX_STATE, DOMAIN_DOC_INDEX_STATE,
DOMAIN_STATUS, DOMAIN_STATUS,
DOMAIN_TX, DOMAIN_TX,
MeasureMetricsContext, MeasureMetricsContext,
RateLimiter,
collaborativeDocParse, collaborativeDocParse,
coreId, coreId,
generateId, generateId,
@ -188,7 +189,10 @@ async function processMigrateContentFor (
storageAdapter: StorageAdapter, storageAdapter: StorageAdapter,
iterator: MigrationIterator<Doc> iterator: MigrationIterator<Doc>
): Promise<void> { ): Promise<void> {
const rateLimiter = new RateLimiter(10)
let processed = 0 let processed = 0
while (true) { while (true) {
const docs = await iterator.next(1000) const docs = await iterator.next(1000)
if (docs === null || docs.length === 0) { if (docs === null || docs.length === 0) {
@ -201,45 +205,36 @@ async function processMigrateContentFor (
const operations: { filter: MigrationDocumentQuery<Doc>, update: MigrateUpdate<Doc> }[] = [] const operations: { filter: MigrationDocumentQuery<Doc>, update: MigrateUpdate<Doc> }[] = []
for (const doc of docs) { for (const doc of docs) {
const update: MigrateUpdate<Doc> = {} await rateLimiter.exec(async () => {
const update: MigrateUpdate<Doc> = {}
for (const attribute of attributes) { for (const attribute of attributes) {
const collaborativeDoc = makeCollaborativeDoc(doc._id, attribute.name, revisionId) const collaborativeDoc = makeCollaborativeDoc(doc._id, attribute.name, revisionId)
const value = (doc as any)[attribute.name] as string const value = (doc as any)[attribute.name] as string
if (value != null && value.startsWith('{')) { if (value != null && value.startsWith('{')) {
const { documentId } = collaborativeDocParse(collaborativeDoc) const { documentId } = collaborativeDocParse(collaborativeDoc)
const blob = await storageAdapter.stat(ctx, client.workspaceId, documentId) const blob = await storageAdapter.stat(ctx, client.workspaceId, documentId)
// only for documents not in storage // only for documents not in storage
if (blob === undefined) { if (blob === undefined) {
const ydoc = markupToYDoc(value, attribute.name) const ydoc = markupToYDoc(value, attribute.name)
await saveCollaborativeDoc(storageAdapter, client.workspaceId, collaborativeDoc, ydoc, ctx) await saveCollaborativeDoc(storageAdapter, client.workspaceId, collaborativeDoc, ydoc, ctx)
await takeCollaborativeDocSnapshot( }
storageAdapter,
client.workspaceId, update[attribute.name] = collaborativeDoc
collaborativeDoc, } else if (value == null) {
ydoc, update[attribute.name] = makeCollaborativeDoc(doc._id, attribute.name, revisionId)
{
versionId: revisionId,
name: 'Migration to storage',
createdBy: core.account.System,
createdOn: Date.now()
},
ctx
)
} }
update[attribute.name] = collaborativeDoc
} else if (value == null) {
update[attribute.name] = makeCollaborativeDoc(doc._id, attribute.name, revisionId)
} }
}
if (Object.keys(update).length > 0) { if (Object.keys(update).length > 0) {
operations.push({ filter: { _id: doc._id }, update }) operations.push({ filter: { _id: doc._id }, update })
} }
})
} }
await rateLimiter.waitProcessing()
if (operations.length > 0) { if (operations.length > 0) {
await client.bulk(domain, operations) await client.bulk(domain, operations)
} }

View File

@ -21,7 +21,7 @@ import { setMetadata } from '@hcengineering/platform'
import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage' import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage'
import serverToken, { generateToken } from '@hcengineering/server-token' import serverToken, { generateToken } from '@hcengineering/server-token'
import tracker from '@hcengineering/tracker' import tracker from '@hcengineering/tracker'
import { Installation } from '@octokit/webhooks-types' import { Installation, type InstallationCreatedEvent, type InstallationUnsuspendEvent } from '@octokit/webhooks-types'
import { Collection } from 'mongodb' import { Collection } from 'mongodb'
import { App, Octokit } from 'octokit' import { App, Octokit } from 'octokit'
@ -38,12 +38,15 @@ import { registerLoaders } from './loaders'
import { createNotification } from './notifications' import { createNotification } from './notifications'
import { errorToObj } from './sync/utils' import { errorToObj } from './sync/utils'
import { GithubIntegrationRecord, GithubUserRecord } from './types' import { GithubIntegrationRecord, GithubUserRecord } from './types'
import { UserManager } from './users'
import { GithubWorker, syncUser } from './worker' import { GithubWorker, syncUser } from './worker'
export interface InstallationRecord { export interface InstallationRecord {
installationName: string installationName: string
login: string login: string
loginNodeId: string loginNodeId: string
repositories?: InstallationCreatedEvent['repositories'] | InstallationUnsuspendEvent['repositories']
type: 'Bot' | 'User' | 'Organization' type: 'Bot' | 'User' | 'Organization'
octokit: Octokit octokit: Octokit
} }
@ -60,12 +63,14 @@ export class PlatformWorker {
mongoRef!: MongoClientReference mongoRef!: MongoClientReference
integrationCollection!: Collection<GithubIntegrationRecord> integrationCollection!: Collection<GithubIntegrationRecord>
usersCollection!: Collection<GithubUserRecord>
periodicTimer: any periodicTimer: any
periodicSyncPromise: Promise<void> | undefined periodicSyncPromise: Promise<void> | undefined
canceled = false canceled = false
userManager!: UserManager
private constructor ( private constructor (
readonly ctx: MeasureContext, readonly ctx: MeasureContext,
readonly app: App, readonly app: App,
@ -82,7 +87,8 @@ export class PlatformWorker {
const db = mongoClient.db(config.ConfigurationDB) const db = mongoClient.db(config.ConfigurationDB)
this.integrationCollection = db.collection<GithubIntegrationRecord>('installations') this.integrationCollection = db.collection<GithubIntegrationRecord>('installations')
this.usersCollection = db.collection<GithubUserRecord>('users')
this.userManager = new UserManager(db.collection<GithubUserRecord>('users'))
const storageConfig = storageConfigFromEnv() const storageConfig = storageConfigFromEnv()
this.storageAdapter = buildStorageFromConfig(storageConfig, config.MongoURL) this.storageAdapter = buildStorageFromConfig(storageConfig, config.MongoURL)
@ -165,7 +171,7 @@ export class PlatformWorker {
} }
private async findUsersWorkspaces (): Promise<Map<string, GithubUserRecord[]>> { private async findUsersWorkspaces (): Promise<Map<string, GithubUserRecord[]>> {
const i = this.usersCollection.find({}) const i = this.userManager.getAllUsers()
const workspaces = new Map<string, GithubUserRecord[]>() const workspaces = new Map<string, GithubUserRecord[]>()
while (await i.hasNext()) { while (await i.hasNext()) {
const userInfo = await i.next() const userInfo = await i.next()
@ -178,19 +184,16 @@ export class PlatformWorker {
} }
} }
} }
await i.close()
return workspaces return workspaces
} }
public async getUsers (workspace: string): Promise<GithubUserRecord[]> { public async getUsers (workspace: string): Promise<GithubUserRecord[]> {
return await this.usersCollection return await this.userManager.getUsers(workspace)
.find<GithubUserRecord>({
[`accounts.${workspace}`]: { $exists: true }
})
.toArray()
} }
public async getUser (login: string): Promise<GithubUserRecord | undefined> { public async getUser (login: string): Promise<GithubUserRecord | undefined> {
return (await this.usersCollection.find<GithubUserRecord>({ _id: login }).toArray()).shift() return await this.userManager.getAccount(login)
} }
async mapInstallation ( async mapInstallation (
@ -263,8 +266,8 @@ export class PlatformWorker {
private async removeInstallationFromWorkspace (client: Client, installationId: number): Promise<void> { private async removeInstallationFromWorkspace (client: Client, installationId: number): Promise<void> {
const wsIntegerations = await client.findAll(github.class.GithubIntegration, { installationId }) const wsIntegerations = await client.findAll(github.class.GithubIntegration, { installationId })
const ops = new TxOperations(client, core.account.System)
for (const intValue of wsIntegerations) { for (const intValue of wsIntegerations) {
const ops = new TxOperations(client, core.account.System)
await ops.remove<GithubIntegration>(intValue) await ops.remove<GithubIntegration>(intValue)
} }
} }
@ -347,12 +350,13 @@ export class PlatformWorker {
scope: resultJson.scope, scope: resultJson.scope,
accounts: { [payload.workspace]: payload.accountId } accounts: { [payload.workspace]: payload.accountId }
} }
const [existingUser] = await this.usersCollection.find({ _id: user.data.login }).toArray() await this.userManager.updateUser(dta)
if (existingUser === undefined) { const existingUser = await this.userManager.getAccount(user.data.login)
await this.usersCollection.insertOne(dta) if (existingUser == null) {
await this.userManager.insertUser(dta)
} else { } else {
dta.accounts = { ...existingUser.accounts, [payload.workspace]: payload.accountId } dta.accounts = { ...existingUser.accounts, [payload.workspace]: payload.accountId }
await this.usersCollection.updateOne({ _id: dta._id }, { $set: dta } as any) await this.userManager.updateUser(dta)
} }
// Update workspace client login info. // Update workspace client login info.
@ -520,17 +524,17 @@ export class PlatformWorker {
auth.refreshTokenExpiresIn = dta.refreshTokenExpiresIn auth.refreshTokenExpiresIn = dta.refreshTokenExpiresIn
auth.scope = dta.scope auth.scope = dta.scope
await this.usersCollection.updateOne({ _id: dta._id }, { $set: dta } as any) await this.userManager.updateUser(dta)
} }
} }
} }
async getAccount (login: string): Promise<GithubUserRecord | undefined> { async getAccount (login: string): Promise<GithubUserRecord | undefined> {
return (await this.usersCollection.findOne({ _id: login })) ?? undefined return await this.userManager.getAccount(login)
} }
async getAccountByRef (workspace: string, ref: Ref<Account>): Promise<GithubUserRecord | undefined> { async getAccountByRef (workspace: string, ref: Ref<Account>): Promise<GithubUserRecord | undefined> {
return (await this.usersCollection.findOne({ [`accounts.${workspace}`]: ref })) ?? undefined return await this.userManager.getAccountByRef(workspace, ref)
} }
private async updateInstallation (installationId: number): Promise<void> { private async updateInstallation (installationId: number): Promise<void> {
@ -544,6 +548,24 @@ export class PlatformWorker {
type: tinst.account?.type ?? 'User', type: tinst.account?.type ?? 'User',
installationName: `${tinst.account?.html_url ?? ''}` installationName: `${tinst.account?.html_url ?? ''}`
} }
this.updateInstallationRecord(installationId, val)
}
}
private updateInstallationRecord (installationId: number, val: InstallationRecord): void {
const current = this.installations.get(installationId)
if (current !== undefined) {
if (val.octokit !== undefined) {
current.octokit = val.octokit
}
current.login = val.login
current.loginNodeId = val.loginNodeId
current.type = val.type
current.installationName = val.installationName
if (val.repositories !== undefined) {
current.repositories = val.repositories
}
} else {
this.installations.set(installationId, val) this.installations.set(installationId, val)
} }
} }
@ -558,7 +580,7 @@ export class PlatformWorker {
type: tinst.account?.type ?? 'User', type: tinst.account?.type ?? 'User',
installationName: `${tinst.account?.html_url ?? ''}` installationName: `${tinst.account?.html_url ?? ''}`
} }
this.installations.set(install.installation.id, val) this.updateInstallationRecord(install.installation.id, val)
ctx.info('Found installation', { ctx.info('Found installation', {
installationId: install.installation.id, installationId: install.installation.id,
url: install.installation.account?.html_url ?? '' url: install.installation.account?.html_url ?? ''
@ -566,16 +588,22 @@ export class PlatformWorker {
} }
} }
async handleInstallationEvent (install: Installation, enabled: boolean): Promise<void> { async handleInstallationEvent (
install: Installation,
repositories: InstallationCreatedEvent['repositories'] | InstallationUnsuspendEvent['repositories'],
enabled: boolean
): Promise<void> {
this.ctx.info('handle integration add', { installId: install.id, name: install.html_url }) this.ctx.info('handle integration add', { installId: install.id, name: install.html_url })
const okit = await this.app.getInstallationOctokit(install.id) const okit = await this.app.getInstallationOctokit(install.id)
const iName = `${install.account.html_url ?? ''}` const iName = `${install.account.html_url ?? ''}`
this.installations.set(install.id, {
this.updateInstallationRecord(install.id, {
octokit: okit, octokit: okit,
login: install.account.login, login: install.account.login,
type: install.account?.type ?? 'User', type: install.account?.type ?? 'User',
loginNodeId: install.account.node_id, loginNodeId: install.account.node_id,
installationName: iName installationName: iName,
repositories
}) })
const worker = this.getWorker(install.id) const worker = this.getWorker(install.id)
@ -612,6 +640,9 @@ export class PlatformWorker {
if (integeration !== undefined) { if (integeration !== undefined) {
integeration.enabled = false integeration.enabled = false
integeration.synchronized = new Set() integeration.synchronized = new Set()
await this.removeInstallationFromWorkspace(worker._client, installId)
await worker._client.remove(integeration.integration) await worker._client.remove(integeration.integration)
} }
worker.integrations.delete(installId) worker.integrations.delete(installId)
@ -797,11 +828,11 @@ export class PlatformWorker {
if (event.payload.action === 'revoked') { if (event.payload.action === 'revoked') {
const sender = event.payload.sender const sender = event.payload.sender
const records = await this.usersCollection.find({ _id: sender.login }).toArray() const record = await this.getAccount(sender.login)
for (const r of records) { if (record !== undefined) {
await this.revokeUserAuth(r) await this.revokeUserAuth(record)
await this.userManager.removeUser(sender.login)
} }
await this.usersCollection.deleteOne({ _id: sender.login })
} }
}) })
@ -902,7 +933,7 @@ export class PlatformWorker {
case 'created': case 'created':
case 'unsuspend': { case 'unsuspend': {
catchEventError( catchEventError(
this.handleInstallationEvent(payload.installation, true), this.handleInstallationEvent(payload.installation, payload.repositories, true),
payload.action, payload.action,
name, name,
id, id,
@ -912,7 +943,7 @@ export class PlatformWorker {
} }
case 'suspend': { case 'suspend': {
catchEventError( catchEventError(
this.handleInstallationEvent(payload.installation, false), this.handleInstallationEvent(payload.installation, payload.repositories, false),
payload.action, payload.action,
name, name,
id, id,

View File

@ -4,9 +4,14 @@
// //
import core, { Doc, DocData, DocumentUpdate, MeasureContext, TxOperations, generateId } from '@hcengineering/core' import core, { Doc, DocData, DocumentUpdate, MeasureContext, TxOperations, generateId } from '@hcengineering/core'
import { Endpoints } from '@octokit/types'
import { Repository, RepositoryEvent } from '@octokit/webhooks-types'
import github, { DocSyncInfo, GithubIntegrationRepository, GithubProject } from '@hcengineering/github' import github, { DocSyncInfo, GithubIntegrationRepository, GithubProject } from '@hcengineering/github'
import { Endpoints } from '@octokit/types'
import {
Repository,
RepositoryEvent,
type InstallationCreatedEvent,
type InstallationUnsuspendEvent
} from '@octokit/webhooks-types'
import { App } from 'octokit' import { App } from 'octokit'
import { DocSyncManager, ExternalSyncField, IntegrationContainer, IntegrationManager } from '../types' import { DocSyncManager, ExternalSyncField, IntegrationContainer, IntegrationManager } from '../types'
import { collectUpdate } from './utils' import { collectUpdate } from './utils'
@ -34,8 +39,67 @@ export class RepositorySyncMapper implements DocSyncManager {
return {} return {}
} }
async reloadRepositories (integration: IntegrationContainer): Promise<void> { async reloadRepositories (
integration: IntegrationContainer,
repositories?: InstallationCreatedEvent['repositories'] | InstallationUnsuspendEvent['repositories']
): Promise<void> {
integration.synchronized.delete(syncReposKey) integration.synchronized.delete(syncReposKey)
if (repositories !== undefined) {
// We have a list of repositories, so we could create them if they are missing.
// Need to find all repositories, not only active, so passed repositories are not work.
const allRepositories = (
await this.provider.liveQuery.queryFind(github.class.GithubIntegrationRepository, {})
).filter((it) => it.attachedTo === integration.integration._id)
const allRepos: GithubIntegrationRepository[] = [...allRepositories]
for (const repository of repositories) {
const integrationRepo: GithubIntegrationRepository | undefined = allRepos.find(
(it) => it.repositoryId === repository.id
)
if (integrationRepo === undefined) {
// No integration repository found, we need to push one.
await this.client.addCollection(
github.class.GithubIntegrationRepository,
integration.integration.space,
integration.integration._id,
integration.integration._class,
'repositories',
{
nodeId: repository.node_id,
name: repository.name,
url: integration.installationName + '/' + repository.name,
repositoryId: repository.id,
enabled: true,
deleted: false,
archived: false,
fork: false,
forks: 0,
hasDiscussions: false,
hasDownloads: false,
hasIssues: false,
hasPages: false,
hasProjects: false,
hasWiki: false,
openIssues: 0,
private: repository.private,
size: 0,
stargazers: 0,
watchers: 0,
visibility: repository.private ? 'private' : 'public'
},
undefined, // id
Date.now(),
integration.integration.createdBy
)
this.ctx.info('Creating repository info document...', {
url: repository.full_name,
workspace: this.provider.getWorkspaceId().name
})
}
}
}
} }
async handleEvent<T>(integration: IntegrationContainer, derivedClient: TxOperations, evt: T): Promise<void> { async handleEvent<T>(integration: IntegrationContainer, derivedClient: TxOperations, evt: T): Promise<void> {
@ -191,6 +255,7 @@ export class RepositorySyncMapper implements DocSyncManager {
installationId: integration.installationId, installationId: integration.installationId,
workspace: this.provider.getWorkspaceId().name workspace: this.provider.getWorkspaceId().name
}) })
const iterable = this.app.eachRepository.iterator({ installationId: integration.installationId }) const iterable = this.app.eachRepository.iterator({ installationId: integration.installationId })
// Need to find all repositories, not only active, so passed repositories are not work. // Need to find all repositories, not only active, so passed repositories are not work.

View File

@ -0,0 +1,69 @@
import type { Account, Ref } from '@hcengineering/core'
import type { Collection, FindCursor } from 'mongodb'
import type { GithubUserRecord } from './types'
export class UserManager {
userCache = new Map<string, GithubUserRecord>()
refUserCache = new Map<string, GithubUserRecord>()
constructor (readonly usersCollection: Collection<GithubUserRecord>) {}
public async getUsers (workspace: string): Promise<GithubUserRecord[]> {
return await this.usersCollection
.find<GithubUserRecord>({
[`accounts.${workspace}`]: { $exists: true }
})
.toArray()
}
async getAccount (login: string): Promise<GithubUserRecord | undefined> {
let res = this.userCache.get(login)
if (res !== undefined) {
return res
}
res = (await this.usersCollection.findOne({ _id: login })) ?? undefined
if (res !== undefined) {
if (this.userCache.size > 1000) {
this.userCache.clear()
}
this.userCache.set(login, res)
}
return res
}
async getAccountByRef (workspace: string, ref: Ref<Account>): Promise<GithubUserRecord | undefined> {
const key = `${workspace}.${ref}`
let rec = this.refUserCache.get(key)
if (rec !== undefined) {
return rec
}
rec = (await this.usersCollection.findOne({ [`accounts.${workspace}`]: ref })) ?? undefined
if (rec !== undefined) {
if (this.refUserCache.size > 1000) {
this.refUserCache.clear()
}
this.refUserCache.set(key, rec)
}
return rec
}
async updateUser (dta: GithubUserRecord): Promise<void> {
this.userCache.clear()
this.refUserCache.clear()
await this.usersCollection.updateOne({ _id: dta._id }, { $set: dta } as any)
}
async insertUser (dta: GithubUserRecord): Promise<void> {
await this.usersCollection.insertOne(dta)
}
async removeUser (login: string): Promise<void> {
this.userCache.clear()
this.refUserCache.clear()
await this.usersCollection.deleteOne({ _id: login })
}
getAllUsers (): FindCursor<GithubUserRecord> {
return this.usersCollection.find({})
}
}

View File

@ -31,6 +31,7 @@ import core, {
concatLink, concatLink,
generateId, generateId,
groupByArray, groupByArray,
reduceCalls,
toIdMap, toIdMap,
type Blob, type Blob,
type MigrationState type MigrationState
@ -189,9 +190,11 @@ export class GithubWorker implements IntegrationManager {
if (v.octokit === undefined) { if (v.octokit === undefined) {
continue continue
} }
const project = await this.liveQuery.findOne<GithubProject>(github.mixin.GithubProject, { const project = (
_id: space as Ref<GithubProject> await this.liveQuery.queryFind<GithubProject>(github.mixin.GithubProject, {
}) _id: space as Ref<GithubProject>
})
).shift()
if (project !== undefined) { if (project !== undefined) {
const repositories = await this.liveQuery.queryFind<GithubIntegrationRepository>( const repositories = await this.liveQuery.queryFind<GithubIntegrationRepository>(
github.class.GithubIntegrationRepository, github.class.GithubIntegrationRepository,
@ -299,7 +302,7 @@ export class GithubWorker implements IntegrationManager {
person, person,
role: AccountRole.User role: AccountRole.User
}) })
const acc = await this._client.findOne(contact.class.PersonAccount, { _id: id }) const acc = await this.liveQuery.findOne(contact.class.PersonAccount, { _id: id })
return acc return acc
} }
} }
@ -391,12 +394,12 @@ export class GithubWorker implements IntegrationManager {
let person: Ref<Person> | undefined let person: Ref<Person> | undefined
// try to find by account. // try to find by account.
if (userInfo.email != null) { if (userInfo.email != null) {
const personAccount = await this.client.findOne(contact.class.PersonAccount, { email: userInfo.email }) const personAccount = await this.liveQuery.findOne(contact.class.PersonAccount, { email: userInfo.email })
person = personAccount?.person person = personAccount?.person
} }
if (person === undefined) { if (person === undefined) {
const channel = await this.client.findOne(contact.class.Channel, { const channel = await this.liveQuery.findOne(contact.class.Channel, {
provider: contact.channelProvider.GitHub, provider: contact.channelProvider.GitHub,
value: userInfo.login value: userInfo.login
}) })
@ -475,14 +478,19 @@ export class GithubWorker implements IntegrationManager {
async syncUserData (ctx: MeasureContext, users: GithubUserRecord[]): Promise<void> { async syncUserData (ctx: MeasureContext, users: GithubUserRecord[]): Promise<void> {
// Let's sync information about users and send some details // Let's sync information about users and send some details
const accounts = await this._client.findAll(contact.class.PersonAccount, {
email: { $in: users.map((it) => `github:${it._id}`) }
})
const userAuths = await this._client.findAll(github.class.GithubAuthentication, {})
const persons = await this._client.findAll(contact.class.Person, { _id: { $in: accounts.map((it) => it.person) } })
for (const record of users) { for (const record of users) {
if (record.error !== undefined) { if (record.error !== undefined) {
// Skip accounts with error // Skip accounts with error
continue continue
} }
const account = await this._client.findOne(contact.class.PersonAccount, { email: `github:${record._id}` }) const account = accounts.find((it) => it.email === `github:${record._id}`)
const userAuth = await this._client.findOne(github.class.GithubAuthentication, { login: record._id }) const userAuth = userAuths.find((it) => it.login === record._id)
const person = await this._client.findOne(contact.class.Person, { _id: account?.person }) const person = persons.find((it) => account?.person)
if (account === undefined || userAuth === undefined || person === undefined) { if (account === undefined || userAuth === undefined || person === undefined) {
continue continue
} }
@ -517,12 +525,11 @@ export class GithubWorker implements IntegrationManager {
let record = await this.platform.getAccountByRef(this.workspace.name, account) let record = await this.platform.getAccountByRef(this.workspace.name, account)
// const accountRef = this.accounts.find((it) => it._id === account) // const accountRef = this.accounts.find((it) => it._id === account)
const accountRef = await this._client.findOne(contact.class.PersonAccount, { _id: account }) const accountRef = await this.liveQuery.findOne(contact.class.PersonAccount, { _id: account })
if (record === undefined) { if (record === undefined) {
if (accountRef !== undefined) { if (accountRef !== undefined) {
const accounts = await this._client.findAll(contact.class.PersonAccount, {}) const accounts = this._client.getModel().getAccountByPersonId(accountRef.person)
const allAccounts = accounts.filter((it) => it.person === accountRef.person) for (const aa of accounts) {
for (const aa of allAccounts) {
record = await this.platform.getAccountByRef(this.workspace.name, aa._id) record = await this.platform.getAccountByRef(this.workspace.name, aa._id)
if (record !== undefined) { if (record !== undefined) {
break break
@ -532,7 +539,7 @@ export class GithubWorker implements IntegrationManager {
} }
// Check and refresh token if required. // Check and refresh token if required.
if (record !== undefined) { if (record !== undefined) {
this.ctx.info('get octokit', { account, recordId: record._id }) this.ctx.info('get octokit', { account, recordId: record._id, workspace: this.workspace.name })
await this.platform.checkRefreshToken(record) await this.platform.checkRefreshToken(record)
return new Octokit({ return new Octokit({
auth: record.token, auth: record.token,
@ -543,9 +550,9 @@ export class GithubWorker implements IntegrationManager {
// We need to inform user, he need to authorize this account with github. // We need to inform user, he need to authorize this account with github.
if (accountRef !== undefined) { if (accountRef !== undefined) {
const person = await this.client.findOne(contact.class.Person, { _id: accountRef.person }) const person = await this.liveQuery.findOne(contact.class.Person, { _id: accountRef.person })
if (person !== undefined) { if (person !== undefined) {
const personSpace = await this.client.findOne(contact.class.PersonSpace, { person: person._id }) const personSpace = await this.liveQuery.findOne(contact.class.PersonSpace, { person: person._id })
if (personSpace !== undefined) { if (personSpace !== undefined) {
await createNotification(this._client, person, { await createNotification(this._client, person, {
user: account, user: account,
@ -556,17 +563,16 @@ export class GithubWorker implements IntegrationManager {
} }
} }
} }
this.ctx.info('get octokit: return bot', { account }) this.ctx.info('get octokit: return bot', { account, workspace: this.workspace.name })
} }
async isPlatformUser (account: Ref<PersonAccount>): Promise<boolean> { async isPlatformUser (account: Ref<PersonAccount>): Promise<boolean> {
let record = await this.platform.getAccountByRef(this.workspace.name, account) let record = await this.platform.getAccountByRef(this.workspace.name, account)
const accountRef = await this._client.findOne(contact.class.PersonAccount, { _id: account }) const accountRef = await this.liveQuery.findOne(contact.class.PersonAccount, { _id: account })
if (record === undefined) { if (record === undefined) {
if (accountRef !== undefined) { if (accountRef !== undefined) {
const accounts = await this._client.findAll(contact.class.PersonAccount, {}) const accounts = this._client.getModel().getAccountByPersonId(accountRef.person)
const allAccounts = accounts.filter((it) => it.person === accountRef.person) for (const aa of accounts) {
for (const aa of allAccounts) {
record = await this.platform.getAccountByRef(this.workspace.name, aa._id) record = await this.platform.getAccountByRef(this.workspace.name, aa._id)
if (record !== undefined) { if (record !== undefined) {
break break
@ -588,11 +594,11 @@ export class GithubWorker implements IntegrationManager {
integrationsRaw: GithubIntegration[] = [] integrationsRaw: GithubIntegration[] = []
async getProjectType (type: Ref<ProjectType>): Promise<ProjectType | undefined> { async getProjectType (type: Ref<ProjectType>): Promise<ProjectType | undefined> {
return await this._client.findOne(task.class.ProjectType, { _id: type }) return (await this.liveQuery.queryFind(task.class.ProjectType, { _id: type })).shift()
} }
async getTaskType (type: Ref<TaskType>): Promise<TaskType | undefined> { async getTaskType (type: Ref<TaskType>): Promise<TaskType | undefined> {
return await this._client.findOne(task.class.TaskType, { _id: type }) return (await this.liveQuery.queryFind(task.class.TaskType, { _id: type })).shift()
} }
async getTaskTypeOf (project: Ref<ProjectType>, ofClass: Ref<Class<Doc>>): Promise<TaskType | undefined> { async getTaskTypeOf (project: Ref<ProjectType>, ofClass: Ref<Class<Doc>>): Promise<TaskType | undefined> {
@ -687,13 +693,16 @@ export class GithubWorker implements IntegrationManager {
}) })
}) })
const userRecords = await this.platform.getUsers(this.workspace.name)
await this.syncUserData(this.ctx, userRecords)
this.triggerRequests = 1 this.triggerRequests = 1
this.updateRequests = 1 this.updateRequests = 1
this.syncPromise = this.syncAndWait() this.syncPromise = this.syncAndWait()
const userRecords = await this.platform.getUsers(this.workspace.name)
try {
await this.syncUserData(this.ctx, userRecords)
} catch (err: any) {
Analytics.handleError(err)
}
} }
projects: GithubProject[] = [] projects: GithubProject[] = []
@ -756,7 +765,7 @@ export class GithubWorker implements IntegrationManager {
syncLock: new Map() syncLock: new Map()
} }
this.integrations.set(it.installationId, current) this.integrations.set(it.installationId, current)
await this.repositoryManager.reloadRepositories(current) await this.repositoryManager.reloadRepositories(current, inst.repositories)
} catch (err: any) { } catch (err: any) {
Analytics.handleError(err) Analytics.handleError(err)
this.ctx.error('Error', { err }) this.ctx.error('Error', { err })
@ -768,7 +777,7 @@ export class GithubWorker implements IntegrationManager {
continue continue
} }
current.integration = it current.integration = it
await this.repositoryManager.reloadRepositories(current) await this.repositoryManager.reloadRepositories(current, inst.repositories)
} }
} }
} }
@ -919,7 +928,14 @@ export class GithubWorker implements IntegrationManager {
continue continue
} }
this.ctx.info('External Syncing', { name: repo.name, prj: prj.name, field, version, docs: docs.length }) this.ctx.info('External Syncing', {
name: repo.name,
prj: prj.name,
field,
version,
docs: docs.length,
workspace: this.workspace.name
})
const byClass = this.groupByClass(docs) const byClass = this.groupByClass(docs)
for (const [_class, _docs] of byClass.entries()) { for (const [_class, _docs] of byClass.entries()) {
@ -1043,7 +1059,7 @@ export class GithubWorker implements IntegrationManager {
if (this.updateRequests > 0) { if (this.updateRequests > 0) {
this.updateRequests = 0 // Just in case this.updateRequests = 0 // Just in case
await this.updateIntegrations() await this.updateIntegrations()
await this.performFullSync() void this.performFullSync()
} }
const { projects, repositories } = await this.collectActiveProjects() const { projects, repositories } = await this.collectActiveProjects()
@ -1072,7 +1088,7 @@ export class GithubWorker implements IntegrationManager {
if (!hadExternalChanges && !hadSyncChanges && !hadDerivedChanges) { if (!hadExternalChanges && !hadSyncChanges && !hadDerivedChanges) {
if (this.previousWait !== 0) { if (this.previousWait !== 0) {
this.ctx.info('Wait for changes:', { previousWait: this.previousWait }) this.ctx.info('Wait for changes:', { previousWait: this.previousWait, workspace: this.workspace.name })
this.previousWait = 0 this.previousWait = 0
} }
// Wait until some sync documents will be modified, updated. // Wait until some sync documents will be modified, updated.
@ -1125,7 +1141,7 @@ export class GithubWorker implements IntegrationManager {
if (docs.length > 0) { if (docs.length > 0) {
this.previousWait += docs.length this.previousWait += docs.length
this.ctx.info('Syncing', { docs: docs.length }) this.ctx.info('Syncing', { docs: docs.length, workspace: this.workspace.name })
await this.doSyncFor(docs) await this.doSyncFor(docs)
} }
@ -1239,7 +1255,7 @@ export class GithubWorker implements IntegrationManager {
const existing = externalDocs.find((it) => it._id === info._id) const existing = externalDocs.find((it) => it._id === info._id)
const mapper = this.mappers.find((it) => it._class.includes(info.objectClass))?.mapper const mapper = this.mappers.find((it) => it._class.includes(info.objectClass))?.mapper
if (mapper === undefined) { if (mapper === undefined) {
this.ctx.info('No mapper for class', { objectClass: info.objectClass }) this.ctx.info('No mapper for class', { objectClass: info.objectClass, workspace: this.workspace.name })
await derivedClient.update<DocSyncInfo>(info, { await derivedClient.update<DocSyncInfo>(info, {
needSync: githubSyncVersion needSync: githubSyncVersion
}) })
@ -1333,7 +1349,11 @@ export class GithubWorker implements IntegrationManager {
private async waitChanges (): Promise<void> { private async waitChanges (): Promise<void> {
if (this.triggerRequests > 0 || this.updateRequests > 0) { if (this.triggerRequests > 0 || this.updateRequests > 0) {
this.ctx.info('Trigger check pending:', { requests: this.triggerRequests, updates: this.updateRequests }) this.ctx.info('Trigger check pending:', {
requests: this.triggerRequests,
updates: this.updateRequests,
workspace: this.workspace.name
})
this.triggerRequests = 0 this.triggerRequests = 0
return return
} }
@ -1347,7 +1367,7 @@ export class GithubWorker implements IntegrationManager {
triggerTimeout = setTimeout(() => { triggerTimeout = setTimeout(() => {
triggerTimeout = undefined triggerTimeout = undefined
if (was0) { if (was0) {
this.ctx.info('Sync triggered', { request: this.triggerRequests }) this.ctx.info('Sync triggered', { request: this.triggerRequests, workspace: this.workspace.name })
} }
resolve() resolve()
}, 50) // Small timeout to aggregate few bulk changes. }, 50) // Small timeout to aggregate few bulk changes.
@ -1361,7 +1381,7 @@ export class GithubWorker implements IntegrationManager {
updateTimeout = setTimeout(() => { updateTimeout = setTimeout(() => {
updateTimeout = undefined updateTimeout = undefined
if (was0) { if (was0) {
this.ctx.info('Sync update triggered', { requests: this.updateRequests }) this.ctx.info('Sync update triggered', { requests: this.updateRequests, workspace: this.workspace.name })
} }
resolve() resolve()
}, 50) // Small timeout to aggregate few bulk changes. }, 50) // Small timeout to aggregate few bulk changes.
@ -1378,7 +1398,11 @@ export class GithubWorker implements IntegrationManager {
this.triggerSync() this.triggerSync()
} }
async performFullSync (): Promise<void> { performFullSync = reduceCalls(async () => {
await this._performFullSync()
})
async _performFullSync (): Promise<void> {
// Wait previous active sync // Wait previous active sync
for (const integration of this.integrations.values()) { for (const integration of this.integrations.values()) {
await this.ctx.withLog('external sync', { installation: integration.installationName }, async () => { await this.ctx.withLog('external sync', { installation: integration.installationName }, async () => {