UBER-1086: Fixed Elastic scroll contexts overflow issue, added tests for Elastic (#4124)

This commit is contained in:
Pete Anøther 2023-12-01 22:26:09 -03:00 committed by GitHub
parent 27740f3f35
commit 676abfb67c
10 changed files with 114 additions and 43 deletions
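
For context on the fix: each Elasticsearch scroll search opens a server-side scroll context, and the cluster caps how many may be open at once via search.max_open_scroll_context (500 by default). An iterator that starts scrolls but never calls clearScroll leaks one context per pass until new searches are rejected. A minimal TypeScript sketch of the pattern, illustrative only and not the project's code (the scrollAll name, the match_all query, and the one-minute keep-alive are assumptions):

import { Client } from '@elastic/elasticsearch'

// Drain an index with the scroll API, always releasing the scroll context.
async function scrollAll (client: Client, index: string, handle: (hit: unknown) => void): Promise<void> {
  let resp = await client.search({ index, scroll: '1m', body: { query: { match_all: {} } } })
  try {
    while (resp.body.hits.hits.length > 0) {
      for (const hit of resp.body.hits.hits) handle(hit)
      resp = await client.scroll({ scroll_id: resp.body._scroll_id, scroll: '1m' })
    }
  } finally {
    // Without this clearScroll call the context lives until its keep-alive
    // expires -- the accumulation this commit fixes in the backup adapter.
    await client.clearScroll({ scroll_id: resp.body._scroll_id })
  }
}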

View File

@@ -135,7 +135,6 @@ jobs:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Cache build results
uses: actions/cache@v3
env:
@@ -144,13 +143,19 @@ jobs:
path: ${{ env.CacheFolders}}
key: ${{ runner.os }}-build-${{ env.cache-name }}-${{ github.sha }}
restore-keys: ${{ runner.os }}-build-${{ env.cache-name }}-${{ github.sha }}
- name: Start MongoDB
uses: supercharge/mongodb-github-action@v1.10.0
with:
mongodb-version: 5.0
- name: Docker build
run: node common/scripts/install-run-rush.js docker:build
env:
DOCKER_CLI_HINTS: false
- name: Prepare server
run: |
cd ./tests
./prepare.sh
- name: Testing...
run: node common/scripts/install-run-rush.js test --verbose
env:
ELASTIC_URL: 'http://localhost:9201'
MONGO_URL: 'mongodb://localhost:27018'
uitest:
needs: build
runs-on: ubuntu-latest
@@ -166,12 +171,10 @@ jobs:
path: ${{ env.CacheFolders}}
key: ${{ runner.os }}-build-${{ env.cache-name }}-${{ github.sha }}
restore-keys: ${{ runner.os }}-build-${{ env.cache-name }}-${{ github.sha }}
- name: Docker build
run: node common/scripts/install-run-rush.js docker:build
env:
DOCKER_CLI_HINTS: false
- name: Prepare server
run: |
cd ./tests

View File

@@ -25,7 +25,7 @@ const DB_NAME = 'test_accounts'
const methods = getMethods(version, builder().getTxes(), migrateOperations)
describe('server', () => {
const dbUri = process.env.MONGODB_URI ?? 'mongodb://localhost:27017'
const dbUri = process.env.MONGO_URL ?? 'mongodb://localhost:27017'
let conn: MongoClient
let db: Db
let workspace: string = 'ws-' + randomBytes(8).toString('hex')

View File

@@ -376,8 +376,8 @@ export async function backup (
backupIndex = '0' + backupIndex
}
for (const c of domains) {
console.log('dumping domain...', c)
for (const domain of domains) {
console.log('dumping domain...', domain)
const changes: Snapshot = {
added: new Map(),
@@ -404,7 +404,7 @@ export async function backup (
}
// Cumulative digest
const digest = await loadDigest(storage, backupInfo.snapshots, c)
const digest = await loadDigest(storage, backupInfo.snapshots, domain)
let idx: number | undefined
@@ -419,7 +419,7 @@ export async function backup (
// Load all digest from collection.
while (true) {
try {
const it = await connection.loadChunk(c, idx)
const it = await connection.loadChunk(domain, idx)
idx = it.idx
const needRetrieve: Ref<Doc>[] = []
@@ -467,7 +467,7 @@ export async function backup (
console.log('Retrieve chunk:', needRetrieve.length)
let docs: Doc[] = []
try {
docs = await connection.loadDocs(c, needRetrieve)
docs = await connection.loadDocs(domain, needRetrieve)
} catch (err: any) {
console.log(err)
// Put back.
@@ -482,12 +482,12 @@ export async function backup (
addedDocuments = 0
if (changed > 0) {
snapshot.domains[c] = domainInfo
snapshot.domains[domain] = domainInfo
domainInfo.added += processedChanges.added.size
domainInfo.updated += processedChanges.updated.size
domainInfo.removed += processedChanges.removed.length
const snapshotFile = join(backupIndex, `${c}-${snapshot.date}-${snapshotIndex}.snp.gz`)
const snapshotFile = join(backupIndex, `${domain}-${snapshot.date}-${snapshotIndex}.snp.gz`)
snapshotIndex++
domainInfo.snapshots = [...(domainInfo.snapshots ?? []), snapshotFile]
await writeChanges(storage, snapshotFile, processedChanges)
@@ -501,8 +501,8 @@ export async function backup (
if (_pack === undefined) {
_pack = pack()
stIndex++
const storageFile = join(backupIndex, `${c}-data-${snapshot.date}-${stIndex}.tar.gz`)
console.log('storing from domain', c, storageFile)
const storageFile = join(backupIndex, `${domain}-data-${snapshot.date}-${stIndex}.tar.gz`)
console.log('storing from domain', domain, storageFile)
domainInfo.storage = [...(domainInfo.storage ?? []), storageFile]
const dataStream = await storage.write(storageFile)
const storageZip = createGzip()
@@ -553,12 +553,12 @@ export async function backup (
}
if (changed > 0) {
snapshot.domains[c] = domainInfo
snapshot.domains[domain] = domainInfo
domainInfo.added += processedChanges.added.size
domainInfo.updated += processedChanges.updated.size
domainInfo.removed += processedChanges.removed.length
const snapshotFile = join(backupIndex, `${c}-${snapshot.date}-${snapshotIndex}.snp.gz`)
const snapshotFile = join(backupIndex, `${domain}-${snapshot.date}-${snapshotIndex}.snp.gz`)
snapshotIndex++
domainInfo.snapshots = [...(domainInfo.snapshots ?? []), snapshotFile]
await writeChanges(storage, snapshotFile, processedChanges)

View File

@@ -2,6 +2,6 @@ module.exports = {
preset: 'ts-jest',
testEnvironment: 'node',
testMatch: ['**/?(*.)+(spec|test).[jt]s?(x)'],
roots: ["./src"],
coverageReporters: ["text-summary", "html"]
roots: ['./src'],
coverageReporters: ['text-summary', 'html']
}

View File

@@ -4,9 +4,6 @@
"main": "lib/index.js",
"author": "Anticrm Platform Contributors",
"license": "EPL-2.0",
"#override": [
"test"
],
"scripts": {
"init": "ts-node src/__init.ts",
"build": "tsc",
@@ -14,7 +11,7 @@
"lint:fix": "eslint --fix src",
"lint": "eslint src",
"format": "format src",
"test": ""
"test": "jest --passWithNoTests --silent --forceExit"
},
"devDependencies": {
"@hcengineering/platform-rig": "^0.6.0",

View File

@@ -15,17 +15,30 @@
//
import { Account, Class, Doc, getWorkspaceId, MeasureMetricsContext, Ref, Space } from '@hcengineering/core'
import type { IndexedDoc } from '@hcengineering/server-core'
import type { FullTextAdapter, IndexedDoc } from '@hcengineering/server-core'
import { createElasticAdapter } from '../adapter'
describe('client', () => {
it('should create document', async () => {
const adapter = await createElasticAdapter(
'http://localhost:9200/',
describe('Elastic Adapter', () => {
let adapter: FullTextAdapter
beforeEach(async () => {
adapter = await createElasticAdapter(
process.env.ELASTIC_URL ?? 'http://localhost:9200/',
getWorkspaceId('ws1', ''),
new MeasureMetricsContext('-', {})
)
})
afterEach(async () => {
await adapter.close()
})
it('should init', () => {
expect(adapter).toBeTruthy()
})
it('should create document', async () => {
const doc: IndexedDoc = {
id: 'doc1' as Ref<Doc>,
_class: 'class1' as Ref<Class<Doc>>,
@@ -40,11 +53,6 @@ describe('client', () => {
})
it('should find document with raw search', async () => {
const adapter = await createElasticAdapter(
'http://localhost:9200/',
getWorkspaceId('ws1', ''),
new MeasureMetricsContext('-', {})
)
const result = await adapter.searchString(
{
query: 'hey'

View File

@@ -0,0 +1,57 @@
import { DbAdapter } from '@hcengineering/server-core'
import { Domain, getWorkspaceId, Hierarchy } from '@hcengineering/core'
import { createElasticBackupDataAdapter } from '../backup'
import { Client } from '@elastic/elasticsearch'
describe('Elastic Data Adapter', () => {
const url = process.env.ELASTIC_URL ?? 'http://localhost:9200/'
const domain = 'test' as Domain
let adapter: DbAdapter
beforeEach(async () => {
adapter = await createElasticBackupDataAdapter(new Hierarchy(), url, getWorkspaceId('ws1', ''))
})
afterEach(async () => {
await adapter.close()
})
it('should init', () => {
expect(adapter).toBeTruthy()
})
describe('Scroll Contexts', () => {
let client: Client
beforeEach(async () => {
client = new Client({ node: url })
await client.cluster.putSettings({
body: {
persistent: { 'search.max_open_scroll_context': '2' },
transient: { 'search.max_open_scroll_context': '2' }
}
})
})
// Use afterEach() to make sure we clean up even if a test fails
afterEach(async () => {
await client.cluster.putSettings({
body: {
persistent: { 'search.max_open_scroll_context': null },
transient: { 'search.max_open_scroll_context': null }
}
})
await client.close()
})
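// With the cluster capped at 2 open scroll contexts, the loop below opens 4
// cursors in turn; it only passes if close() releases each scroll context.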
it('should get properly closed', async () => {
for (let i = 0; i <= 3; i++) {
const cursor = adapter.find(domain)
await cursor.next()
await cursor.close()
}
})
})
})

View File

@@ -477,7 +477,7 @@ export async function createElasticAdapter (
url: string,
workspaceId: WorkspaceId,
metrics: MeasureContext
): Promise<FullTextAdapter & { close: () => Promise<void> }> {
): Promise<FullTextAdapter> {
const client = new Client({
node: url
})

View File

@@ -36,6 +36,7 @@ import core, {
import { PlatformError, unknownStatus } from '@hcengineering/platform'
import { DbAdapter, IndexedDoc } from '@hcengineering/server-core'
import { createHash } from 'node:crypto'
import { SearchResponse } from '@elastic/elasticsearch/api/types'
class ElasticDataAdapter implements DbAdapter {
constructor (
@@ -67,7 +68,7 @@ class ElasticDataAdapter implements DbAdapter {
let listReceived = false
let pos = 0
let buffer: { _id: string, data: IndexedDoc }[] = []
let resp: ApiResponse
let resp: ApiResponse | null = null
let finished = false
return {
next: async () => {
@@ -99,10 +100,9 @@
}
listReceived = true
}
if (pos === buffer.length && !finished) {
if (resp !== null && pos === buffer.length && !finished) {
const params = {
scrollId: resp.body._scroll_id as string,
scroll: '23h'
scroll_id: (resp.body as SearchResponse)._scroll_id
}
resp = await this.client.scroll(params, { maxRetries: 5 })
if (resp.statusCode !== 200) {
@@ -137,7 +137,13 @@
throw new PlatformError(e)
}
},
close: async () => {}
close: async () => {
if (resp !== null) {
await this.client.clearScroll({
scroll_id: (resp.body as SearchResponse)._scroll_id
})
}
}
}
}
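
With close() now clearing the scroll context, callers of adapter.find(domain) are expected to exhaust or explicitly close the cursor, as the new test does. A usage sketch under the assumption that next() resolves undefined once the iterator is exhausted (illustrative only):

const cursor = adapter.find(domain)
try {
  let doc = await cursor.next()
  while (doc !== undefined) {
    // ... process doc ...
    doc = await cursor.next()
  }
} finally {
  await cursor.close() // releases the server-side scroll context
}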

View File

@@ -79,7 +79,7 @@ async function createNullContentTextAdapter (): Promise<ContentTextAdapter> {
}
describe('mongo operations', () => {
const mongodbUri: string = process.env.MONGODB_URI ?? 'mongodb://localhost:27017'
const mongodbUri: string = process.env.MONGO_URL ?? 'mongodb://localhost:27017'
let mongoClient!: MongoClient
let dbId: string = generateId()
let hierarchy: Hierarchy