//
// Copyright © 2022 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//

import { Attachment } from '@hcengineering/attachment'
import core, {
  AttachedDoc,
  Class,
  Doc,
  DocumentQuery,
  Domain,
  DOMAIN_TX,
  FindOptions,
  FindResult,
  generateId,
  Hierarchy,
  MeasureMetricsContext,
  metricsToString,
  ModelDb,
  newMetrics,
  Ref,
  ServerStorage,
  StorageIterator,
  Tx,
  TxCollectionCUD,
  TxCreateDoc,
  TxMixin,
  TxProcessor,
  TxRemoveDoc,
  TxResult,
  TxUpdateDoc
} from '@hcengineering/core'
import { createElasticAdapter } from '@hcengineering/elastic'
import { DOMAIN_ATTACHMENT } from '@hcengineering/model-attachment'
import { createMongoAdapter, createMongoTxAdapter } from '@hcengineering/mongo'
import { addLocation } from '@hcengineering/platform'
import { serverAttachmentId } from '@hcengineering/server-attachment'
import { serverCalendarId } from '@hcengineering/server-calendar'
import { serverChunterId } from '@hcengineering/server-chunter'
import { serverContactId } from '@hcengineering/server-contact'
import {
  createServerStorage,
  DbAdapter,
  DbConfiguration,
  FullTextAdapter,
  FullTextIndex,
  IndexedDoc,
  TxAdapter
} from '@hcengineering/server-core'
import { serverGmailId } from '@hcengineering/server-gmail'
import { serverInventoryId } from '@hcengineering/server-inventory'
import { serverLeadId } from '@hcengineering/server-lead'
import { serverNotificationId } from '@hcengineering/server-notification'
import { serverRecruitId } from '@hcengineering/server-recruit'
import { serverSettingId } from '@hcengineering/server-setting'
import { serverTagsId } from '@hcengineering/server-tags'
import { serverTaskId } from '@hcengineering/server-task'
import { serverTrackerId } from '@hcengineering/server-tracker'
import { serverTelegramId } from '@hcengineering/server-telegram'
import { serverHrId } from '@hcengineering/server-hr'
import { Client as ElasticClient } from '@elastic/elasticsearch'
import { Client } from 'minio'
import { Db, MongoClient } from 'mongodb'
import { listMinioObjects } from './minio'

/**
 * Drops the Elastic index named `dbName` (if it exists) and then rebuilds it
 * by replaying the stored transactions and re-indexing the MinIO objects.
 *
 * @param mongoUrl - Mongo connection URL.
 * @param dbName - Name used for the Mongo database, the Elastic index and the MinIO bucket.
 * @param minio - MinIO client used to list and read stored objects.
 * @param elasticUrl - Elastic node URL.
 */
export async function rebuildElastic (
  mongoUrl: string,
  dbName: string,
  minio: Client,
  elasticUrl: string
): Promise<void> {
  await dropElastic(elasticUrl, dbName)
  return await restoreElastic(mongoUrl, dbName, minio, elasticUrl)
}

/**
 * Deletes the Elastic index named `dbName` when it exists.
 *
 * The Elastic client is closed whether or not the operation succeeds, and a
 * failed `exists`/`delete` call rejects without touching `result`.
 */
async function dropElastic (elasticUrl: string, dbName: string): Promise<void> {
  console.log('drop existing elastic docment')
  const client = new ElasticClient({
    node: elasticUrl
  })
  try {
    await new Promise((resolve, reject) => {
      client.indices.exists(
        {
          index: dbName
        },
        (err: any, result: any) => {
          if (err != null) {
            // Stop here: `result` must not be inspected once the call has failed.
            reject(err)
            return
          }
          if (result.body === true) {
            client.indices.delete(
              {
                index: dbName
              },
              (err: any, result: any) => {
                if (err != null) {
                  reject(err)
                  return
                }
                resolve(result)
              }
            )
          } else {
            resolve(result)
          }
        }
      )
    })
  } finally {
    await client.close()
  }
}

/**
 * Bundles the Mongo, Elastic and server-storage connections needed to
 * re-index a workspace, plus helpers to index attachments stored in MinIO.
 */
export class ElasticTool {
  mongoClient: MongoClient
  elastic!: FullTextAdapter & { close: () => Promise<void> }
  storage!: ServerStorage
  db!: Db
  fulltext!: FullTextIndex

  /**
   * Registers the server resource locations and creates (but does not
   * connect) the Mongo client. Call {@link ElasticTool.connect} before use.
   */
  constructor (
    readonly mongoUrl: string,
    readonly dbName: string,
    readonly minio: Client,
    readonly elasticUrl: string
  ) {
    addLocation(serverAttachmentId, () => import('@hcengineering/server-attachment-resources'))
    addLocation(serverContactId, () => import('@hcengineering/server-contact-resources'))
    addLocation(serverNotificationId, () => import('@hcengineering/server-notification-resources'))
    addLocation(serverChunterId, () => import('@hcengineering/server-chunter-resources'))
    addLocation(serverInventoryId, () => import('@hcengineering/server-inventory-resources'))
    addLocation(serverLeadId, () => import('@hcengineering/server-lead-resources'))
    addLocation(serverRecruitId, () => import('@hcengineering/server-recruit-resources'))
    addLocation(serverSettingId, () => import('@hcengineering/server-setting-resources'))
    addLocation(serverTaskId, () => import('@hcengineering/server-task-resources'))
    addLocation(serverTrackerId, () => import('@hcengineering/server-tracker-resources'))
    addLocation(serverTagsId, () => import('@hcengineering/server-tags-resources'))
    addLocation(serverCalendarId, () => import('@hcengineering/server-calendar-resources'))
    addLocation(serverGmailId, () => import('@hcengineering/server-gmail-resources'))
    addLocation(serverTelegramId, () => import('@hcengineering/server-telegram-resources'))
    addLocation(serverHrId, () => import('@hcengineering/server-hr-resources'))
    this.mongoClient = new MongoClient(mongoUrl)
  }

  /**
   * Connects to Mongo, creates the Elastic adapter, the server storage and
   * the full-text index.
   *
   * @returns A function that closes the Mongo client, the Elastic adapter and
   * the storage, in that order.
   */
  async connect (): Promise<() => Promise<void>> {
    await this.mongoClient.connect()

    this.db = this.mongoClient.db(this.dbName)
    this.elastic = await createElasticAdapter(this.elasticUrl, this.dbName)
    this.storage = await createStorage(this.mongoUrl, this.elasticUrl, this.dbName)
    this.fulltext = new FullTextIndex(this.storage.hierarchy, this.elastic, this.storage, true)

    return async () => {
      await this.mongoClient.close()
      await this.elastic.close()
      await this.storage.close()
    }
  }

  /**
   * Indexes the content of the MinIO object `name`, provided an attachment
   * document with `file === name` exists; otherwise does nothing.
   */
  async indexAttachment (name: string): Promise<void> {
    const doc: Attachment | null = await this.db.collection<Attachment>(DOMAIN_ATTACHMENT).findOne({ file: name })
    if (doc == null) return

    const buffer = await this.readMinioObject(name)
    await this.indexAttachmentDoc(doc, buffer)
  }

  /**
   * Sends an indexed document carrying the base64-encoded `buffer` to Elastic.
   * The document gets a freshly generated id suffixed with `/attachments/`.
   */
  async indexAttachmentDoc (doc: Attachment, buffer: Buffer): Promise<void> {
    const id: Ref<Doc> = (generateId() + '/attachments/') as Ref<Doc>

    const indexedDoc: IndexedDoc = {
      id,
      _class: doc._class,
      space: doc.space,
      modifiedOn: doc.modifiedOn,
      modifiedBy: core.account.System,
      attachedTo: doc.attachedTo,
      data: buffer.toString('base64')
    }

    await this.elastic.index(indexedDoc)
  }

  /**
   * Reads the whole object `name` from the bucket named `dbName` into memory.
   *
   * Rejects when the stream emits `'error'`, so a failed download surfaces to
   * the caller instead of leaving the promise pending forever.
   */
  private async readMinioObject (name: string): Promise<Buffer> {
    const data = await this.minio.getObject(this.dbName, name)
    const chunks: Buffer[] = []
    await new Promise<void>((resolve, reject) => {
      data.on('readable', () => {
        let chunk
        while ((chunk = data.read()) !== null) {
          const b = chunk as Buffer
          chunks.push(b)
        }
      })

      data.on('end', () => {
        resolve()
      })

      data.on('error', (err) => {
        reject(err)
      })
    })
    return Buffer.concat(chunks)
  }
}

/**
 * Replays the stored transactions into Elastic and re-indexes MinIO objects.
 *
 * Steps, in order:
 * 1. create transactions, rebuilt from the current snapshot of each document;
 * 2. collection create transactions, rebuilt the same way;
 * 3. mixin transactions, skipped for documents found missing in step 1;
 * 4. every object of the `dbName` bucket, when that bucket exists.
 *
 * Per-transaction and per-object failures are logged and do not abort the run.
 * The connections opened by the tool are always closed at the end.
 */
async function restoreElastic (mongoUrl: string, dbName: string, minio: Client, elasticUrl: string): Promise<void> {
  const tool = new ElasticTool(mongoUrl, dbName, minio, elasticUrl)
  const done = await tool.connect()
  try {
    const data = await tool.db.collection<Tx>(DOMAIN_TX).find().toArray()
    const m = newMetrics()
    const metricsCtx = new MeasureMetricsContext('elastic', {}, m)

    const isCreateTx = (tx: Tx): boolean => tx._class === core.class.TxCreateDoc
    const isCollectionCreateTx = (tx: Tx): boolean =>
      tx._class === core.class.TxCollectionCUD &&
      (tx as TxCollectionCUD<Doc, AttachedDoc>).tx._class === core.class.TxCreateDoc
    const isMixinTx = (tx: Tx): boolean =>
      tx._class === core.class.TxMixin ||
      (tx._class === core.class.TxCollectionCUD &&
        (tx as TxCollectionCUD<Doc, AttachedDoc>).tx._class === core.class.TxMixin)

    const createTxes = data.filter((tx) => isCreateTx(tx))
    const collectionTxes = data.filter((tx) => isCollectionCreateTx(tx))
    const mixinTxes = data.filter((tx) => isMixinTx(tx))

    // Ids of documents whose create transaction no longer has a stored snapshot.
    const removedDocument = new Set<Ref<Doc>>()

    const startCreate = Date.now()
    console.log('replay elastic create transactions', createTxes.length)
    await Promise.all(
      createTxes.map(async (tx) => {
        const createTx = tx as TxCreateDoc<Doc>
        try {
          const docSnapshot = (
            await tool.storage.findAll(metricsCtx, createTx.objectClass, { _id: createTx.objectId }, { limit: 1 })
          ).shift()
          if (docSnapshot !== undefined) {
            // Index the document as it is stored now, not as it was at creation time.
            const { _class, _id, modifiedBy, modifiedOn, space, ...docData } = docSnapshot
            try {
              const newTx: TxCreateDoc<Doc> = {
                ...createTx,
                attributes: docData,
                modifiedBy,
                modifiedOn,
                objectSpace: space // <- it could be moved, let's take actual one.
              }
              await tool.fulltext.tx(metricsCtx, newTx)
            } catch (err: any) {
              console.error('failed to replay tx', tx, err.message)
            }
          } else {
            // No snapshot: the document was removed, nothing to index for it.
            removedDocument.add(createTx.objectId)
          }
        } catch (e) {
          console.error('failed to find object', tx, e)
        }
      })
    )
    console.log('replay elastic create transactions done', Date.now() - startCreate)

    const startCollection = Date.now()
    console.log('replay elastic collection transactions', collectionTxes.length)
    await Promise.all(
      collectionTxes.map(async (tx) => {
        const collTx = tx as TxCollectionCUD<Doc, AttachedDoc>
        const createTx = collTx.tx as unknown as TxCreateDoc<AttachedDoc>
        try {
          const docSnapshot = (
            await tool.storage.findAll(metricsCtx, createTx.objectClass, { _id: createTx.objectId }, { limit: 1 })
          ).shift() as AttachedDoc | undefined
          if (docSnapshot !== undefined) {
            // A missing snapshot means the document was removed; such transactions are skipped.
            const { _class, _id, modifiedBy, modifiedOn, space, ...data } = docSnapshot
            try {
              const newTx: TxCreateDoc<AttachedDoc> = {
                ...createTx,
                attributes: data,
                modifiedBy,
                modifiedOn,
                objectSpace: space // <- it could be moved, let's take actual one.
              }
              collTx.tx = newTx
              collTx.modifiedBy = modifiedBy
              collTx.modifiedOn = modifiedOn
              collTx.objectSpace = space
              await tool.fulltext.tx(metricsCtx, collTx)
            } catch (err: any) {
              console.error('failed to replay tx', tx, err.message)
            }
          }
        } catch (e) {
          console.error('failed to find object', tx, e)
        }
      })
    )
    console.log('replay elastic collection transactions done', Date.now() - startCollection)

    const startMixin = Date.now()
    console.log('replay elastic mixin transactions', mixinTxes.length)
    await Promise.all(
      mixinTxes.map(async (tx) => {
        try {
          let deleted = false
          if (tx._class === core.class.TxMixin) {
            deleted = removedDocument.has((tx as TxMixin<Doc, Doc>).objectId)
          }
          if (
            tx._class === core.class.TxCollectionCUD &&
            (tx as TxCollectionCUD<Doc, AttachedDoc>).tx._class === core.class.TxMixin
          ) {
            deleted = removedDocument.has((tx as TxCollectionCUD<Doc, AttachedDoc>).tx.objectId)
          }
          if (!deleted) {
            await tool.storage.tx(metricsCtx, tx)
          }
        } catch (err: any) {
          console.error('failed to replay tx', tx, err.message)
        }
      })
    )
    console.log('replay elastic mixin transactions done', Date.now() - startMixin)

    let apos = 0
    if (await minio.bucketExists(dbName)) {
      const minioObjects = await listMinioObjects(minio, dbName)
      console.log('reply elastic documents', minioObjects.length)
      for (const d of minioObjects) {
        apos++
        try {
          await tool.indexAttachment(d.name)
        } catch (err: any) {
          console.error(err)
        }
        if (apos % 100 === 0) {
          console.log('replay minio documents', apos, minioObjects.length)
        }
      }
    }
    console.log('replay elastic transactions done')
    console.log(metricsToString(m))
  } finally {
    console.log('Elastic restore done')
    await done()
  }
}

/**
 * Creates a server storage whose Mongo adapters are wrapped in the read-only
 * adapters defined below and whose full-text adapter is Elastic.
 */
async function createStorage (mongoUrl: string, elasticUrl: string, workspace: string): Promise<ServerStorage> {
  const conf: DbConfiguration = {
    domains: {
      [DOMAIN_TX]: 'MongoTx'
    },
    defaultAdapter: 'Mongo',
    adapters: {
      MongoTx: {
        factory: createMongoReadOnlyTxAdapter,
        url: mongoUrl
      },
      Mongo: {
        factory: createMongoReadOnlyAdapter,
        url: mongoUrl
      }
    },
    fulltextAdapter: {
      factory: createElasticAdapter,
      url: elasticUrl
    },
    workspace
  }
  return await createServerStorage(conf, { skipUpdateAttached: true })
}

/** Adapter factory: a Mongo adapter wrapped in {@link MongoReadOnlyAdapter}. */
async function createMongoReadOnlyAdapter (
  hierarchy: Hierarchy,
  url: string,
  dbName: string,
  modelDb: ModelDb
): Promise<DbAdapter> {
  const adapter = await createMongoAdapter(hierarchy, url, dbName, modelDb)
  return new MongoReadOnlyAdapter(adapter)
}

/** Adapter factory: a Mongo tx adapter wrapped in {@link MongoReadOnlyTxAdapter}. */
async function createMongoReadOnlyTxAdapter (
  hierarchy: Hierarchy,
  url: string,
  dbName: string,
  modelDb: ModelDb
): Promise<TxAdapter> {
  const adapter = await createMongoTxAdapter(hierarchy, url, dbName, modelDb)
  return new MongoReadOnlyTxAdapter(adapter)
}

/**
 * Wrapper around a {@link DbAdapter} that forwards `init`, `findAll` and
 * `close` to the wrapped adapter and never forwards writes: `tx` resolves to
 * an empty result, `find` yields nothing, `load` returns an empty list, and
 * `upload`/`clean` do nothing.
 */
class MongoReadOnlyAdapter extends TxProcessor implements DbAdapter {
  constructor (protected readonly adapter: DbAdapter) {
    super()
  }

  protected txCreateDoc (tx: TxCreateDoc<Doc>): Promise<TxResult> {
    throw new Error('Method not implemented.')
  }

  protected txUpdateDoc (tx: TxUpdateDoc<Doc>): Promise<TxResult> {
    throw new Error('Method not implemented.')
  }

  protected txRemoveDoc (tx: TxRemoveDoc<Doc>): Promise<TxResult> {
    throw new Error('Method not implemented.')
  }

  protected txMixin (tx: TxMixin<Doc, Doc>): Promise<TxResult> {
    throw new Error('Method not implemented.')
  }

  async init (model: Tx[]): Promise<void> {
    return await this.adapter.init(model)
  }

  async findAll<T extends Doc>(
    _class: Ref<Class<T>>,
    query: DocumentQuery<T>,
    options?: FindOptions<T>
  ): Promise<FindResult<T>> {
    return await this.adapter.findAll(_class, query, options)
  }

  /** Accepts any transaction and resolves to an empty result without using the wrapped adapter. */
  override async tx (tx: Tx): Promise<TxResult> {
    return {}
  }

  async close (): Promise<void> {
    await this.adapter.close()
  }

  find (domain: Domain): StorageIterator {
    return {
      next: async () => undefined,
      close: async () => {}
    }
  }

  async load (domain: Domain, docs: Ref<Doc>[]): Promise<Doc[]> {
    return []
  }

  async upload (domain: Domain, docs: Doc[]): Promise<void> {}

  async clean (domain: Domain, docs: Ref<Doc>[]): Promise<void> {}
}

/** Read-only wrapper for a {@link TxAdapter}; `getModel` is forwarded to the wrapped adapter. */
class MongoReadOnlyTxAdapter extends MongoReadOnlyAdapter implements TxAdapter {
  constructor (protected readonly adapter: TxAdapter) {
    super(adapter)
  }

  async getModel (): Promise<Tx[]> {
    return await this.adapter.getModel()
  }
}