Mirror of https://github.com/hcengineering/platform.git (synced 2025-06-09 09:20:54 +00:00)

UBERF-8425: Improved pg/acc/ws error handling (#9144)

Signed-off-by: Alexey Zinoviev <alexey.zinoviev@xored.com>

This commit is contained in:
parent 635304c4af
commit 89f81fd14a
@@ -195,13 +195,13 @@ async function assignEmployeeRoles (client: MigrationClient): Promise<void> {
   client.logger.log('assigning roles to employees...', {})

   const wsMembers = await client.accountClient.getWorkspaceMembers()
-  const persons = await client.traverse<Person>(DOMAIN_CONTACT, {
+  const personsIterator = await client.traverse<Person>(DOMAIN_CONTACT, {
     _class: contact.class.Person
   })

   try {
     while (true) {
-      const docs = await persons.next(50)
+      const docs = await personsIterator.next(50)
       if (docs === null || docs?.length === 0) {
         break
       }
@@ -236,7 +236,7 @@ async function assignEmployeeRoles (client: MigrationClient): Promise<void> {
       }
     }
   } finally {
-    await persons.close()
+    await personsIterator.close()
     client.logger.log('finished assigning roles to employees...', {})
   }
 }
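The migration hunks above rename the traverse cursor and make sure it is always closed in a finally block, so a failing batch no longer leaks the iterator. A minimal sketch of that iterate-in-batches pattern, assuming an iterator with only `next(count)` and `close()` (the interface below is illustrative, not the platform's exact type):

```ts
// Minimal sketch of the batch-iteration pattern used in these migrations.
// Only next(count) and close() are assumed on the iterator.
interface BatchIterator<T> {
  next: (count: number) => Promise<T[] | null>
  close: () => Promise<void>
}

async function forEachBatch<T> (
  iterator: BatchIterator<T>,
  batchSize: number,
  handle: (docs: T[]) => Promise<void>
): Promise<void> {
  try {
    while (true) {
      const docs = await iterator.next(batchSize)
      if (docs === null || docs.length === 0) break
      await handle(docs)
    }
  } finally {
    // Runs on both success and failure, so the cursor is never leaked.
    await iterator.close()
  }
}
```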
@@ -402,7 +402,7 @@ async function ensureGlobalPersonsForLocalAccounts (client: MigrationClient): Pr
 async function createUserProfiles (client: MigrationClient): Promise<void> {
   client.logger.log('creating user profiles for persons...', {})

-  const persons = await client.traverse<Person>(DOMAIN_CONTACT, {
+  const personsIterator = await client.traverse<Person>(DOMAIN_CONTACT, {
     _class: contact.class.Person,
     profile: { $exists: false }
   })
@@ -418,7 +418,7 @@ async function createUserProfiles (client: MigrationClient): Promise<void> {

   try {
     while (true) {
-      const docs = await persons.next(200)
+      const docs = await personsIterator.next(200)
       if (docs === null || docs?.length === 0) {
         break
       }
@@ -452,7 +452,7 @@ async function createUserProfiles (client: MigrationClient): Promise<void> {
       }
     }
   } finally {
-    await persons.close()
+    await personsIterator.close()
     client.logger.log('finished creating user profiles for persons...', {})
   }
 }
@@ -460,27 +460,31 @@ async function createUserProfiles (client: MigrationClient): Promise<void> {
 async function fixSocialIdCase (client: MigrationClient): Promise<void> {
   client.logger.log('Fixing social id case...', {})

-  const socialIds = await client.traverse<SocialIdentity>(DOMAIN_CHANNEL, {
+  const socialIdsIterator = await client.traverse<SocialIdentity>(DOMAIN_CHANNEL, {
     _class: contact.class.SocialIdentity
   })

   let updated = 0
-  while (true) {
-    const docs = await socialIds.next(200)
-    if (docs === null || docs?.length === 0) {
-      break
-    }
-
-    for (const d of docs) {
-      const newKey = d.key.toLowerCase()
-      const newVal = d.value.toLowerCase()
-      if (newKey !== d.key || newVal !== d.value) {
-        await client.update(DOMAIN_CHANNEL, { _id: d._id }, { key: newKey, value: newVal })
-        updated++
-      }
-    }
-  }
-
-  client.logger.log('Finished fixing social id case. Total updated:', { updated })
+  try {
+    while (true) {
+      const docs = await socialIdsIterator.next(200)
+      if (docs === null || docs?.length === 0) {
+        break
+      }
+
+      for (const d of docs) {
+        const newKey = d.key.toLowerCase()
+        const newVal = d.value.toLowerCase()
+        if (newKey !== d.key || newVal !== d.value) {
+          await client.update(DOMAIN_CHANNEL, { _id: d._id }, { key: newKey, value: newVal })
+          updated++
+        }
+      }
+    }
+  } finally {
+    await socialIdsIterator.close()
+    client.logger.log('Finished fixing social id case. Total updated:', { updated })
+  }
 }

 export const contactOperation: MigrateOperation = {
@@ -270,31 +270,35 @@ export async function migrateBackupMixins (client: MigrationClient): Promise<voi
   const txIterator = await client.traverse<TxMixin<Doc, AttachedDoc>>(DOMAIN_TX, { _class: core.class.TxMixin })

-  while (true) {
-    const mixinOps = await txIterator.next(500)
-    if (mixinOps === null || mixinOps.length === 0) break
-    const _classes = groupByArray(mixinOps, (it) => it.objectClass)
-
-    for (const [_class, ops] of _classes.entries()) {
-      const domain = hierarchy.findDomain(_class)
-      if (domain === undefined) continue
-      let docs = await client.find(domain, { _id: { $in: ops.map((it) => it.objectId) } })
-
-      docs = docs.filter((it) => {
-        // Check if mixin is last operation by modifiedOn
-        const mops = ops.filter((mi) => mi.objectId === it._id)
-        if (mops.length === 0) return false
-        return mops.some((mi) => mi.modifiedOn === it.modifiedOn && mi.modifiedBy === it.modifiedBy)
-      })
-
-      if (docs.length > 0) {
-        // Check if docs has mixins from list
-        const toUpdate = docs.filter((it) => hierarchy.findAllMixins(it).length > 0)
-        if (toUpdate.length > 0) {
-          await client.update(domain, { _id: { $in: toUpdate.map((it) => it._id) } }, { '%hash%': curHash })
-        }
-      }
-    }
-  }
+  try {
+    while (true) {
+      const mixinOps = await txIterator.next(500)
+      if (mixinOps === null || mixinOps.length === 0) break
+      const _classes = groupByArray(mixinOps, (it) => it.objectClass)
+
+      for (const [_class, ops] of _classes.entries()) {
+        const domain = hierarchy.findDomain(_class)
+        if (domain === undefined) continue
+        let docs = await client.find(domain, { _id: { $in: ops.map((it) => it.objectId) } })
+
+        docs = docs.filter((it) => {
+          // Check if mixin is last operation by modifiedOn
+          const mops = ops.filter((mi) => mi.objectId === it._id)
+          if (mops.length === 0) return false
+          return mops.some((mi) => mi.modifiedOn === it.modifiedOn && mi.modifiedBy === it.modifiedBy)
+        })
+
+        if (docs.length > 0) {
+          // Check if docs has mixins from list
+          const toUpdate = docs.filter((it) => hierarchy.findAllMixins(it).length > 0)
+          if (toUpdate.length > 0) {
+            await client.update(domain, { _id: { $in: toUpdate.map((it) => it._id) } }, { '%hash%': curHash })
+          }
+        }
+      }
+    }
+  } finally {
+    await txIterator.close()
+  }
 }
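The loop above first groups the mixin transactions by `objectClass` before visiting each domain. `groupByArray` comes from the platform core; a local stand-in, shown only to illustrate the shape the `for (const [_class, ops] of _classes.entries())` loop relies on, could look like this:

```ts
// Illustrative stand-in for groupByArray: all the caller needs is a Map
// keyed by the grouping function, so that .entries() can be iterated.
function groupByArray<T, K> (items: T[], key: (item: T) => K): Map<K, T[]> {
  const groups = new Map<K, T[]>()
  for (const item of items) {
    const k = key(item)
    const bucket = groups.get(k)
    if (bucket !== undefined) {
      bucket.push(item)
    } else {
      groups.set(k, [item])
    }
  }
  return groups
}
```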
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 //
-import { Sql } from 'postgres'
+import { Sql, TransactionSql } from 'postgres'
 import {
   type Data,
   type Version,
@@ -249,11 +249,12 @@ implements DbCollection<T> {
   }

   async unsafe (sql: string, values: any[], client?: Sql): Promise<any[]> {
-    if (this.options.withRetryClient !== undefined) {
+    if (client !== undefined) {
+      return await client.unsafe(sql, values)
+    } else if (this.options.withRetryClient !== undefined) {
       return await this.options.withRetryClient((_client) => _client.unsafe(sql, values))
     } else {
-      const _client = client ?? this.client
-      return await _client.unsafe(sql, values)
+      return await this.client.unsafe(sql, values)
     }
   }

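The reworked `unsafe` now prefers an explicitly passed client (for example a transaction handle), then the collection's retry wrapper, and only then the shared pool connection. A hedged usage sketch of that precedence; the table, columns, and caller below are hypothetical and only show how a transaction handle would be threaded through:

```ts
// Hypothetical caller: both statements run on the same transaction handle
// because it is passed to unsafe() explicitly; omitting it would fall back
// to the retry wrapper or the shared pool connection.
async function renameEntity (
  collection: { unsafe: (sql: string, values: any[], client?: any) => Promise<any[]> },
  tx: any
): Promise<void> {
  await collection.unsafe('UPDATE example_table SET name = $1 WHERE uuid = $2', ['New name', 'some-uuid'], tx)
  await collection.unsafe('UPDATE example_table SET url = $1 WHERE uuid = $2', ['new-name', 'some-uuid'], tx)
}
```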
@@ -696,7 +697,7 @@ export class PostgresAccountDB implements AccountDB {
     }
   }

-  withRetry = async <T>(callback: (client: Sql) => Promise<T>): Promise<T> => {
+  withRetry = async <T>(callback: (client: TransactionSql) => Promise<T>): Promise<T> => {
     let attempt = 0
     let delay = this.retryOptions.initialDelayMs

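With this change `withRetry` hands the callback a `TransactionSql`, so each wrapped account-DB operation runs inside its own transaction and can be retried as a unit. The diff only shows the signature and the first two lines of the body; a simplified sketch of the retry-with-backoff shape, assuming postgres.js `sql.begin`, is below. It is not the actual implementation:

```ts
import { type Sql, type TransactionSql } from 'postgres'

interface RetryOptions {
  maxAttempts: number
  initialDelayMs: number
  maxDelayMs: number
}

// Sketch only: run the callback in a transaction, retry retryable failures
// with exponential backoff, rethrow everything else.
async function withRetry<T> (
  sql: Sql,
  options: RetryOptions,
  isRetryable: (err: any) => boolean,
  callback: (client: TransactionSql) => Promise<T>
): Promise<T> {
  let attempt = 0
  let delay = options.initialDelayMs

  while (true) {
    attempt++
    try {
      const result: any = await sql.begin(async (tx) => await callback(tx))
      return result
    } catch (err: any) {
      if (!isRetryable(err) || attempt >= options.maxAttempts) throw err
      await new Promise((resolve) => setTimeout(resolve, delay))
      delay = Math.min(delay * 2, options.maxDelayMs)
    }
  }
}
```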
@@ -723,14 +724,16 @@ export class PostgresAccountDB implements AccountDB {
     return (
       err.code === '40001' || // Retry transaction
       err.code === '55P03' || // Lock not available
+      err.code === 'CONNECTION_CLOSED' || // This error is thrown if the connection was closed without an error.
+      err.code === 'CONNECTION_DESTROYED' || // This error is thrown for any queries that were pending when the timeout to sql.end({ timeout: X }) was reached. If the DB client is being closed completely retry will result in CONNECTION_ENDED which is not retried so should be fine.
       msg.includes('RETRY_SERIALIZABLE')
     )
   }

   async createWorkspace (data: WorkspaceData, status: WorkspaceStatusData): Promise<WorkspaceUuid> {
-    return await this.client.begin(async (client) => {
-      const workspaceUuid = await this.workspace.insertOne(data, client)
-      await this.workspaceStatus.insertOne({ ...status, workspaceUuid }, client)
+    return await this.withRetry(async (rTx) => {
+      const workspaceUuid = await this.workspace.insertOne(data, rTx)
+      await this.workspaceStatus.insertOne({ ...status, workspaceUuid }, rTx)

       return workspaceUuid
     })
@@ -742,8 +745,10 @@ export class PostgresAccountDB implements AccountDB {
   }

   async assignWorkspace (accountUuid: AccountUuid, workspaceUuid: WorkspaceUuid, role: AccountRole): Promise<void> {
-    await this
-      .client`INSERT INTO ${this.client(this.getWsMembersTableName())} (workspace_uuid, account_uuid, role) VALUES (${workspaceUuid}, ${accountUuid}, ${role})`
+    await this.withRetry(
+      async (rTx) =>
+        await rTx`INSERT INTO ${this.client(this.getWsMembersTableName())} (workspace_uuid, account_uuid, role) VALUES (${workspaceUuid}, ${accountUuid}, ${role})`
+    )
   }

   async batchAssignWorkspace (data: [AccountUuid, WorkspaceUuid, AccountRole][]): Promise<void> {
@@ -756,41 +761,51 @@ export class PostgresAccountDB implements AccountDB {
       VALUES ${placeholders}
     `

-    await this.client.unsafe(sql, values)
+    await this.withRetry(async (rTx) => await rTx.unsafe(sql, values))
   }

   async unassignWorkspace (accountUuid: AccountUuid, workspaceUuid: WorkspaceUuid): Promise<void> {
-    await this
-      .client`DELETE FROM ${this.client(this.getWsMembersTableName())} WHERE workspace_uuid = ${workspaceUuid} AND account_uuid = ${accountUuid}`
+    await this.withRetry(
+      async (rTx) =>
+        await rTx`DELETE FROM ${this.client(this.getWsMembersTableName())} WHERE workspace_uuid = ${workspaceUuid} AND account_uuid = ${accountUuid}`
+    )
   }

   async updateWorkspaceRole (accountUuid: AccountUuid, workspaceUuid: WorkspaceUuid, role: AccountRole): Promise<void> {
-    await this
-      .client`UPDATE ${this.client(this.getWsMembersTableName())} SET role = ${role} WHERE workspace_uuid = ${workspaceUuid} AND account_uuid = ${accountUuid}`
+    await this.withRetry(
+      async (rTx) =>
+        await rTx`UPDATE ${this.client(this.getWsMembersTableName())} SET role = ${role} WHERE workspace_uuid = ${workspaceUuid} AND account_uuid = ${accountUuid}`
+    )
   }

   async getWorkspaceRole (accountUuid: AccountUuid, workspaceUuid: WorkspaceUuid): Promise<AccountRole | null> {
-    const res = await this
-      .client`SELECT role FROM ${this.client(this.getWsMembersTableName())} WHERE workspace_uuid = ${workspaceUuid} AND account_uuid = ${accountUuid}`
-
-    return res[0]?.role ?? null
+    return await this.withRetry(async (rTx) => {
+      const res =
+        await rTx`SELECT role FROM ${this.client(this.getWsMembersTableName())} WHERE workspace_uuid = ${workspaceUuid} AND account_uuid = ${accountUuid}`
+
+      return res[0]?.role ?? null
+    })
   }

   async getWorkspaceRoles (accountUuid: AccountUuid): Promise<Map<WorkspaceUuid, AccountRole>> {
-    const res = await this
-      .client`SELECT workspace_uuid, role FROM ${this.client(this.getWsMembersTableName())} WHERE account_uuid = ${accountUuid}`
-
-    return new Map(res.map((it) => [it.workspace_uuid as WorkspaceUuid, it.role]))
+    return await this.withRetry(async (rTx) => {
+      const res =
+        await rTx`SELECT workspace_uuid, role FROM ${this.client(this.getWsMembersTableName())} WHERE account_uuid = ${accountUuid}`
+
+      return new Map(res.map((it) => [it.workspace_uuid as WorkspaceUuid, it.role]))
+    })
   }

   async getWorkspaceMembers (workspaceUuid: WorkspaceUuid): Promise<WorkspaceMemberInfo[]> {
-    const res: any = await this
-      .client`SELECT account_uuid, role FROM ${this.client(this.getWsMembersTableName())} WHERE workspace_uuid = ${workspaceUuid}`
-
-    return res.map((p: any) => ({
-      person: p.account_uuid,
-      role: p.role
-    }))
+    return await this.withRetry(async (rTx) => {
+      const res: any =
+        await rTx`SELECT account_uuid, role FROM ${this.client(this.getWsMembersTableName())} WHERE workspace_uuid = ${workspaceUuid}`
+
+      return res.map((p: any) => ({
+        person: p.account_uuid,
+        role: p.role
+      }))
+    })
   }

   async getAccountWorkspaces (accountUuid: AccountUuid): Promise<WorkspaceInfoWithStatus[]> {
@@ -824,15 +839,17 @@ export class PostgresAccountDB implements AccountDB {
       ORDER BY s.last_visit DESC
     `

-    const res: any = await this.client.unsafe(sql, [accountUuid])
+    return await this.withRetry(async (rTx) => {
+      const res: any = await rTx.unsafe(sql, [accountUuid])

-    for (const row of res) {
-      row.created_on = convertTimestamp(row.created_on)
-      row.status.last_processing_time = convertTimestamp(row.status.last_processing_time)
-      row.status.last_visit = convertTimestamp(row.status.last_visit)
-    }
+      for (const row of res) {
+        row.created_on = convertTimestamp(row.created_on)
+        row.status.last_processing_time = convertTimestamp(row.status.last_processing_time)
+        row.status.last_visit = convertTimestamp(row.status.last_visit)
+      }

-    return convertKeysToCamelCase(res)
+      return convertKeysToCamelCase(res)
+    })
   }

   async getPendingWorkspace (
@@ -929,31 +946,34 @@ export class PostgresAccountDB implements AccountDB {
     // Note: SKIP LOCKED is supported starting from Postgres 9.5 and CockroachDB v22.2.1
     sqlChunks.push('FOR UPDATE SKIP LOCKED')

-    // We must have all the conditions in the DB query and we cannot filter anything in the code
-    // because of possible concurrency between account services.
-    let res: any | undefined
-    await this.client.begin(async (client) => {
-      res = await client.unsafe(sqlChunks.join(' '), values)
+    return await this.withRetry(async (rTx) => {
+      // We must have all the conditions in the DB query and we cannot filter anything in the code
+      // because of possible concurrency between account services.
+      const res: any = await rTx.unsafe(sqlChunks.join(' '), values)

       if ((res.length ?? 0) > 0) {
-        await client.unsafe(
+        await rTx.unsafe(
           `UPDATE ${this.workspaceStatus.getTableName()} SET processing_attempts = processing_attempts + 1, "last_processing_time" = $1 WHERE workspace_uuid = $2`,
           [Date.now(), res[0].uuid]
         )
       }
-    })

-    return convertKeysToCamelCase(res[0]) as WorkspaceInfoWithStatus
+      return convertKeysToCamelCase(res[0]) as WorkspaceInfoWithStatus
+    })
   }

   async setPassword (accountUuid: AccountUuid, hash: Buffer, salt: Buffer): Promise<void> {
-    await this
-      .client`UPSERT INTO ${this.client(this.account.getPasswordsTableName())} (account_uuid, hash, salt) VALUES (${accountUuid}, ${hash.buffer as any}::bytea, ${salt.buffer as any}::bytea)`
+    await this.withRetry(
+      async (rTx) =>
+        await rTx`UPSERT INTO ${this.client(this.account.getPasswordsTableName())} (account_uuid, hash, salt) VALUES (${accountUuid}, ${hash.buffer as any}::bytea, ${salt.buffer as any}::bytea)`
+    )
   }

   async resetPassword (accountUuid: AccountUuid): Promise<void> {
-    await this
-      .client`DELETE FROM ${this.client(this.account.getPasswordsTableName())} WHERE account_uuid = ${accountUuid}`
+    await this.withRetry(
+      async (rTx) =>
+        await rTx`DELETE FROM ${this.client(this.account.getPasswordsTableName())} WHERE account_uuid = ${accountUuid}`
+    )
   }

   protected getMigrations (): [string, string][] {
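In `getPendingWorkspace` the whole claim now runs inside the retried transaction: the query selects one candidate with `FOR UPDATE SKIP LOCKED`, bumps its processing counters on the same handle, and returns it, so concurrent account services never grab the same row. A reduced sketch of that claim-one-row pattern; the table and column names below are hypothetical, and the real query is assembled from `sqlChunks` with far more conditions:

```ts
import { type TransactionSql } from 'postgres'

// Hedged sketch of the SKIP LOCKED claim pattern: lock one unclaimed row,
// update its bookkeeping columns, and return it, all in one transaction.
// Table and column names are illustrative only.
async function claimPendingTask (tx: TransactionSql): Promise<any | undefined> {
  const rows = await tx.unsafe(
    `SELECT uuid FROM pending_tasks
     WHERE processing_attempts < 3
     ORDER BY last_processing_time ASC NULLS FIRST
     LIMIT 1
     FOR UPDATE SKIP LOCKED`,
    []
  )
  if (rows.length === 0) return undefined

  await tx.unsafe(
    'UPDATE pending_tasks SET processing_attempts = processing_attempts + 1, last_processing_time = $1 WHERE uuid = $2',
    [Date.now(), rows[0].uuid]
  )
  return rows[0]
}
```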
@@ -66,7 +66,11 @@ import { isAdminEmail } from './admin'
 export const GUEST_ACCOUNT = 'b6996120-416f-49cd-841e-e4a5d2e49c9b'
 export const READONLY_GUEST_ACCOUNT = '83bbed9a-0867-4851-be32-31d49d1d42ce'

-export async function getAccountDB (uri: string, dbNs?: string): Promise<[AccountDB, () => void]> {
+export async function getAccountDB (
+  uri: string,
+  dbNs?: string,
+  appName: string = 'account'
+): Promise<[AccountDB, () => void]> {
   const isMongo = uri.startsWith('mongodb://')

   if (isMongo) {
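`getAccountDB` now takes an `appName` (defaulting to `'account'`) that ends up as the Postgres `application_name`, so each service's connections are distinguishable in `pg_stat_activity`. A hedged usage sketch following the signature in the hunk above; the import is omitted and the connection string is a placeholder:

```ts
// getAccountDB as declared in the hunk above; its import path is omitted here.
declare function getAccountDB (
  uri: string,
  dbNs?: string,
  appName?: string
): Promise<[unknown, () => void]>

// Hypothetical caller: the third argument labels this service's connections
// via application_name so they show up distinctly in pg_stat_activity.
const [accountDB, closeAccountDB] = await getAccountDB(
  'postgresql://user:pass@localhost:5432/account',
  undefined,
  'workspace'
)
try {
  // ... run account queries through accountDB ...
} finally {
  closeAccountDB()
}
```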
@@ -85,7 +89,7 @@ export async function getAccountDB (uri: string, dbNs?: string): Promise<[Accoun
   } else {
     setDBExtraOptions({
       connection: {
-        application_name: 'account'
+        application_name: appName
       }
     })
     const client = getDBClient(sharedPipelineContextVars, uri)
@@ -223,7 +223,7 @@ class ConnectionMgr {
         await client.execute('ROLLBACK;')
         console.error({ message: 'failed to process tx', error: err.message, cause: err })

-        if (err.code !== '40001' || tries === maxTries) {
+        if (!this.isRetryableError(err) || tries === maxTries) {
           return err
         } else {
           console.log('Transaction failed. Retrying.')
@@ -267,7 +267,7 @@ class ConnectionMgr {
         return { result: await fn(client) }
       } catch (err: any) {
         console.error({ message: 'failed to process sql', error: err.message, cause: err })
-        if (err.code !== '40001' || tries === maxTries) {
+        if (!this.isRetryableError(err) || tries === maxTries) {
           return err
         } else {
           console.log('Read Transaction failed. Retrying.')
@@ -330,6 +330,18 @@ class ConnectionMgr {
     }
     return conn
   }

+  private isRetryableError (err: any): boolean {
+    const msg: string = err?.message ?? ''
+
+    return (
+      err.code === '40001' || // Retry transaction
+      err.code === '55P03' || // Lock not available
+      err.code === 'CONNECTION_CLOSED' || // This error is thrown if the connection was closed without an error.
+      err.code === 'CONNECTION_DESTROYED' || // This error is thrown for any queries that were pending when the timeout to sql.end({ timeout: X }) was reached. If the DB client is being closed completely retry will result in CONNECTION_ENDED which is not retried so should be fine.
+      msg.includes('RETRY_SERIALIZABLE')
+    )
+  }
 }

 class ValuesVariables {
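Both the account DB and `ConnectionMgr` now share the same notion of a retryable failure: serialization conflicts (`40001`, `RETRY_SERIALIZABLE`), lock timeouts (`55P03`), and the postgres.js `CONNECTION_CLOSED` / `CONNECTION_DESTROYED` conditions. A small sketch of how such a predicate combines with a bounded retry loop; the names are illustrative, the real loop lives in `ConnectionMgr`:

```ts
// Illustrative retry loop built around an isRetryableError-style predicate:
// transient errors are retried up to maxTries, everything else is rethrown.
async function retryOnTransientErrors<T> (
  maxTries: number,
  isRetryableError: (err: any) => boolean,
  op: () => Promise<T>
): Promise<T> {
  for (let tries = 1; ; tries++) {
    try {
      return await op()
    } catch (err: any) {
      if (!isRetryableError(err) || tries === maxTries) {
        throw err
      }
      console.log('Transaction failed. Retrying.')
    }
  }
}
```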
@@ -323,7 +323,7 @@ export async function upgradeModel (
       modelDb,
       pipeline,
       async (value) => {
-        await progress(90 + (Math.min(value, 100) / 100) * 10)
+        await progress(10 + (Math.min(value, 100) / 100) * 10)
       },
       wsIds.uuid
     )
@@ -52,6 +52,7 @@ import {
   createPostgreeDestroyAdapter,
   createPostgresAdapter,
   createPostgresTxAdapter,
+  setDBExtraOptions,
   shutdownPostgres
 } from '@hcengineering/postgres'
 import { doBackupWorkspace, doRestoreWorkspace } from '@hcengineering/server-backup'
@@ -107,6 +108,7 @@ export type WorkspaceOperation = 'create' | 'upgrade' | 'all' | 'all+backup'
 export class WorkspaceWorker {
   runningTasks: number = 0
   resolveBusy: (() => void) | null = null
+  id = randomUUID().slice(-8)

   constructor (
     readonly workspaceQueue: PlatformQueueProducer<QueueWorkspaceMessage>,
@@ -146,6 +148,7 @@ export class WorkspaceWorker {
     this.wakeup = this.defaultWakeup
     const token = generateToken(systemAccountUuid, undefined, { service: 'workspace' })

+    ctx.info(`Starting workspace service worker ${this.id} with limit ${this.limit}...`)
     ctx.info('Sending a handshake to the account service...')
     const accountClient = getAccountClient(this.accountsUrl, token)

@@ -158,12 +161,14 @@ export class WorkspaceWorker {
         )
         break
       } catch (err: any) {
-        ctx.error('error', { err })
+        ctx.error('error during handshake', { err })
       }
     }

     ctx.info('Successfully connected to the account service')

+    setDBExtraOptions({ connection: { application_name: `workspace-${this.id}` } })
+
     registerTxAdapterFactory('mongodb', createMongoTxAdapter)
     registerAdapterFactory('mongodb', createMongoAdapter)
     registerDestroyFactory('mongodb', createMongoDestroyAdapter)
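Each `WorkspaceWorker` now derives a short id from `randomUUID()` and feeds it into `application_name`, so individual workers can be told apart on the database side. A minimal sketch of that labelling idea; `setDBExtraOptions` is the helper imported from `@hcengineering/postgres` in the hunk above, the rest is illustrative:

```ts
import { randomUUID } from 'crypto'
import { setDBExtraOptions } from '@hcengineering/postgres'

// Short per-process identifier, as in WorkspaceWorker above.
const workerId = randomUUID().slice(-8)

// Label this worker's Postgres connections so that
// `SELECT application_name FROM pg_stat_activity` shows which worker owns them.
setDBExtraOptions({ connection: { application_name: `workspace-${workerId}` } })
```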
@@ -187,6 +192,7 @@ export class WorkspaceWorker {
         }
       })
       if (workspace == null) {
+        // no workspaces available, sleep before another attempt
         await this.doSleep(ctx, opt)
       } else {
         void this.exec(async () => {
@@ -196,9 +202,11 @@ export class WorkspaceWorker {
             await this.doWorkspaceOperation(opContext, workspace, opt)
           } catch (err: any) {
             Analytics.handleError(err)
-            ctx.error('error', { err })
+            opContext.error('error', { err })
           }
         })
+        // sleep for a little bit to avoid bombarding the account service, also add jitter to avoid simultaneous requests from multiple workspace services
+        await new Promise((resolve) => setTimeout(resolve, Math.random() * 400 + 200))
       }
     }
   }
@@ -286,7 +294,7 @@ export class WorkspaceWorker {

       await this.workspaceQueue.send(ws.uuid, [workspaceEvents.created()])
     } catch (err: any) {
-      await opt.errorHandler(ws, err)
+      void opt.errorHandler(ws, err)

       logger.log('error', err)

@@ -385,7 +393,7 @@ export class WorkspaceWorker {
       })
       await this.workspaceQueue.send(ws.uuid, [workspaceEvents.upgraded()])
     } catch (err: any) {
-      await opt.errorHandler(ws, err)
+      void opt.errorHandler(ws, err)

       logger.log('error', err)

@@ -720,8 +728,10 @@ export class WorkspaceWorker {
       resolve()
       this.wakeup = this.defaultWakeup
     }
-    // sleep for 5 seconds for the next operation, or until a wakeup event
-    const sleepHandle = setTimeout(wakeup, opt.waitTimeout)
+    // sleep for N (5 by default) seconds for the next operation, or until a wakeup event
+    // add jitter to avoid simultaneous requests from multiple workspace services
+    const maxJitter = opt.waitTimeout * 0.2
+    const sleepHandle = setTimeout(wakeup, opt.waitTimeout + Math.random() * maxJitter)

     this.wakeup = () => {
       clearTimeout(sleepHandle)
@@ -168,7 +168,8 @@ export async function createWorkspace (

     await handleWsEvent?.('create-done', version, 100, '')
   } catch (err: any) {
-    await handleWsEvent?.('ping', version, 0, `Create failed: ${err.message}`)
+    void handleWsEvent?.('ping', version, 0, `Create failed: ${err.message}`)
+    throw err
   } finally {
     await pipeline.close()
     await storageAdapter.close()
@@ -350,7 +351,7 @@ export async function upgradeWorkspaceWith (
     await handleWsEvent?.('upgrade-done', version, 100, '')
   } catch (err: any) {
     ctx.error('upgrade-failed', { message: err.message })
-    await handleWsEvent?.('ping', version, 0, `Upgrade failed: ${err.message}`)
+    void handleWsEvent?.('ping', version, 0, `Upgrade failed: ${err.message}`)
     throw err
   } finally {
     clearInterval(updateProgressHandle)
@@ -160,7 +160,7 @@ async function migrateFixMissingDocSyncInfo (client: MigrationClient): Promise<v
     [github.mixin.GithubProject]: { $exists: true }
   })
   for (const p of projects) {
-    const issues = await client.traverse<Issue>(
+    const issuesIterator = await client.traverse<Issue>(
       DOMAIN_TASK,
       {
         _class: tracker.class.Issue,
@@ -178,52 +178,56 @@ async function migrateFixMissingDocSyncInfo (client: MigrationClient): Promise<v
       }
     )
     let counter = 0
-    while (true) {
-      const docs = await issues.next(1000)
-      if (docs === null || docs.length === 0) {
-        break
-      }
-      const infos = await client.find(
-        DOMAIN_GITHUB,
-        {
-          _class: github.class.DocSyncInfo,
-          _id: { $in: docs.map((it) => it._id as unknown as Ref<DocSyncInfo>) }
-        },
-        {
-          projection: {
-            _id: 1
-          }
-        }
-      )
-      const infoIds = toIdMap(infos)
-      let repository: Ref<GithubIntegrationRepository> | null = null
-      for (const issue of docs) {
-        if (!infoIds.has(issue._id)) {
-          if (client.hierarchy.hasMixin(issue, github.mixin.GithubIssue)) {
-            repository = client.hierarchy.as(issue, github.mixin.GithubIssue).repository
-          }
-          counter++
-          // Missing
-          await client.create<DocSyncInfo>(DOMAIN_GITHUB, {
-            _class: github.class.DocSyncInfo,
-            _id: issue._id as any,
-            url: '',
-            githubNumber: 0,
-            repository,
-            objectClass: issue._class,
-            externalVersion: '#', // We need to put this one to handle new documents.
-            needSync: '',
-            derivedVersion: '',
-            attachedTo: issue.attachedTo ?? tracker.ids.NoParent,
-            space: issue.space,
-            modifiedBy: issue.modifiedBy,
-            modifiedOn: issue.modifiedOn
-          })
-        }
-      }
-    }
-    if (counter > 0) {
-      console.log('Created', counter, 'DocSyncInfos')
-    }
+    try {
+      while (true) {
+        const docs = await issuesIterator.next(1000)
+        if (docs === null || docs.length === 0) {
+          break
+        }
+        const infos = await client.find(
+          DOMAIN_GITHUB,
+          {
+            _class: github.class.DocSyncInfo,
+            _id: { $in: docs.map((it) => it._id as unknown as Ref<DocSyncInfo>) }
+          },
+          {
+            projection: {
+              _id: 1
+            }
+          }
+        )
+        const infoIds = toIdMap(infos)
+        let repository: Ref<GithubIntegrationRepository> | null = null
+        for (const issue of docs) {
+          if (!infoIds.has(issue._id)) {
+            if (client.hierarchy.hasMixin(issue, github.mixin.GithubIssue)) {
+              repository = client.hierarchy.as(issue, github.mixin.GithubIssue).repository
+            }
+            counter++
+            // Missing
+            await client.create<DocSyncInfo>(DOMAIN_GITHUB, {
+              _class: github.class.DocSyncInfo,
+              _id: issue._id as any,
+              url: '',
+              githubNumber: 0,
+              repository,
+              objectClass: issue._class,
+              externalVersion: '#', // We need to put this one to handle new documents.
+              needSync: '',
+              derivedVersion: '',
+              attachedTo: issue.attachedTo ?? tracker.ids.NoParent,
+              space: issue.space,
+              modifiedBy: issue.modifiedBy,
+              modifiedOn: issue.modifiedOn
+            })
+          }
+        }
+      }
+    } finally {
+      await issuesIterator.close()
+      if (counter > 0) {
+        console.log('Created', counter, 'DocSyncInfos')
+      }
    }
   }
 }