Files
Yajbir Singh f1b860b25c
Some checks failed
check / markdownlint (push) Has been cancelled
check / spellchecker (push) Has been cancelled
updated
2025-12-11 19:03:17 +05:30

356 lines
12 KiB
JavaScript

/*
* (c) Copyright Ascensio System SIA 2010-2024
*
* This program is a free software product. You can redistribute it and/or
* modify it under the terms of the GNU Affero General Public License (AGPL)
* version 3 as published by the Free Software Foundation. In accordance with
* Section 7(a) of the GNU AGPL its Section 15 shall be amended to the effect
* that Ascensio System SIA expressly excludes the warranty of non-infringement
* of any third-party rights.
*
* This program is distributed WITHOUT ANY WARRANTY; without even the implied
* warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. For
* details, see the GNU AGPL at: http://www.gnu.org/licenses/agpl-3.0.html
*
* You can contact Ascensio System SIA at 20A-6 Ernesta Birznieka-Upish
* street, Riga, Latvia, EU, LV-1050.
*
* The interactive user interfaces in modified source and object code versions
* of the Program must display Appropriate Legal Notices, as required under
* Section 5 of the GNU AGPL version 3.
*
* Pursuant to Section 7(b) of the License you must retain the original Product
* logo when distributing the program. Pursuant to Section 7(e) we decline to
* grant you any rights under trademark law for use of our trademarks.
*
* All the Product's GUI elements, including illustrations and icon sets, as
* well as technical writing content are licensed under the terms of the
* Creative Commons Attribution-ShareAlike 4.0 International. See the License
* terms at http://creativecommons.org/licenses/by-sa/4.0/legalcode
*
*/
'use strict';
const sql = require('mssql');
const config = require('config');
const connectorUtilities = require('./connectorUtilities');
const utils = require('../../../Common/sources/utils');
// SQL section of the CoAuthoring service configuration.
const configSql = config.get('services.CoAuthoring.sql');
// Table names and the packet-size limit used when batching bulk inserts.
const cfgTableResult = configSql.get('tableResult');
const cfgTableChanges = configSql.get('tableChanges');
const cfgMaxPacketSize = configSql.get('max_allowed_packet');
// Base connection settings read from configuration.
const connectionConfiguration = {
  user: configSql.get('dbUser'),
  password: configSql.get('dbPass'),
  server: configSql.get('dbHost'),
  // Explicit radix: the port is always parsed as a decimal number.
  port: Number.parseInt(configSql.get('dbPort'), 10),
  database: configSql.get('dbName'),
  pool: {
    max: configSql.get('connectionlimit'),
    min: 0
  }
};
// Extra driver options are deep-cloned first, then merged over the base settings.
const additionalOptions = config.util.cloneDeep(configSql.get('msSqlExtraOptions'));
const configuration = utils.deepMergeObjects({}, connectionConfiguration, additionalOptions);
// Prefix of every generated SQL placeholder name (e.g. `@ph_0`, `@ph_tenant`).
const placeholderPrefix = 'ph_';
/**
 * Writes a query failure to the context logger.
 *
 * The message is logged first. If the error carries a non-empty
 * `precedingErrors` list, each entry is logged; otherwise the error itself is.
 * For every logged error the `originalError` property is preferred, and the
 * error object is used as a fallback so that errors without `originalError`
 * are not logged as `undefined`.
 *
 * @param {string} message - Description of the failed operation.
 * @param {object} error - The caught error.
 * @param {object} ctx - Operation context exposing `logger.error`.
 */
function errorHandle(message, error, ctx) {
  ctx.logger.error(`${message}:`);
  const precedingErrors = error?.precedingErrors;
  if (precedingErrors?.length) {
    precedingErrors.forEach(preceding => ctx.logger.error(preceding?.originalError ?? preceding));
    return;
  }
  ctx.logger.error(error?.originalError ?? error);
}
/**
 * Picks an mssql parameter type from the JavaScript type of a value.
 *
 * - number -> Decimal(18, 0)
 * - string -> NVarChar(MAX)
 * - Date   -> DateTime
 * - anything else (including null and non-Date objects) -> NChar(1)
 *
 * @param {*} value - Value that will be bound to a placeholder.
 * @returns {object} Type produced by the matching `sql.TYPES` factory.
 */
function dataType(value) {
  const kind = typeof value;
  if (kind === 'number') {
    return sql.TYPES.Decimal(18, 0);
  }
  if (kind === 'string') {
    return sql.TYPES.NVarChar(sql.MAX);
  }
  if (kind === 'object' && value instanceof Date) {
    return sql.TYPES.DateTime();
  }
  // Fallback for every other kind of value.
  return sql.TYPES.NChar(1);
}
/**
 * Normalizes query values into an object keyed by placeholder name.
 *
 * - An array becomes `{ph_0: values[0], ph_1: values[1], ...}`. Only indexed
 *   elements are converted; extra or inherited enumerable properties on the
 *   array are ignored, and holes in sparse arrays are skipped.
 * - Any other object is returned as is (same reference, not a copy).
 * - Non-object input yields an empty object.
 *
 * @param {Array|object|*} values - Positional or named placeholder values.
 * @returns {object} Values keyed by placeholder name.
 */
function convertPlaceholdersValues(values) {
  if (!Array.isArray(values)) {
    return values instanceof Object ? values : {};
  }
  const placeholdersObject = {};
  // forEach visits indexed elements only, unlike for...in which walks every enumerable key.
  values.forEach((value, index) => {
    placeholdersObject[`${placeholderPrefix}${index}`] = value;
  });
  return placeholdersObject;
}
/**
 * Declares every placeholder as an input parameter of a statement.
 *
 * When `values._typesMetadata` is present, the explicit types stored there are
 * registered and the `_typesMetadata` property is then removed from `values`.
 * Otherwise each key of `values` is registered with a type inferred by
 * `dataType()` from its value.
 *
 * @param {object} values - Placeholder values, optionally with `_typesMetadata`.
 * @param {object} statement - Object exposing `input(name, type)`.
 */
function registerPlaceholderValues(values, statement) {
  const explicitTypes = values._typesMetadata;
  if (explicitTypes === undefined) {
    // No explicit types supplied: infer one per value.
    Object.keys(values).forEach(name => {
      statement.input(name, dataType(values[name]));
    });
    return;
  }
  Object.entries(explicitTypes).forEach(([name, type]) => {
    statement.input(name, type);
  });
  // Remove the metadata so only real placeholders remain on the object.
  delete values._typesMetadata;
}
/**
 * Callback-style wrapper around `executeQuery()`.
 *
 * On success the optional callback is called as `callback(null, result)`;
 * on failure as `callback(error)`. The returned promise resolves with the
 * callback's return value (or `undefined` when no callback is given), so a
 * query failure does not reject it.
 *
 * @param {object} ctx - Operation context.
 * @param {string} sqlCommand - SQL text to execute.
 * @param {Function} [callbackFunction] - Node-style completion callback.
 * @param {boolean} [opt_noModifyRes=false] - Forwarded to `executeQuery`.
 * @param {boolean} [opt_noLog=false] - Forwarded to `executeQuery`.
 * @param {Array|object} [opt_values={}] - Placeholder values.
 * @returns {Promise<*>} Resolves with the callback's return value.
 */
async function sqlQuery(ctx, sqlCommand, callbackFunction, opt_noModifyRes = false, opt_noLog = false, opt_values = {}) {
  let result;
  try {
    result = await executeQuery(ctx, sqlCommand, opt_values, opt_noModifyRes, opt_noLog);
  } catch (error) {
    return callbackFunction?.(error);
  }
  // Called outside the try block so an exception thrown by the callback is not reported as a query error.
  return callbackFunction?.(null, result);
}
/**
 * Runs a SQL command as a prepared statement.
 *
 * The statement is always unprepared after execution, including when
 * execution fails, so that the resources held by the prepared statement are
 * released. If execution fails, the execution error is the one rethrown; a
 * failure of the follow-up `unprepare()` is only logged.
 *
 * @param {object} ctx - Operation context (used for logging).
 * @param {string} sqlCommand - SQL text with `@placeholder` parameters.
 * @param {Array|object} [values={}] - Positional or named placeholder values.
 * @param {boolean} [noModifyRes=false] - When true, the raw driver result is returned.
 * @param {boolean} [noLog=false] - When true, failures are not logged.
 * @returns {Promise<*>} `{rows: [], affectedRows: 0}` when the driver result has
 *   neither a recordset nor affected-row counts; otherwise the raw result
 *   (`noModifyRes`), the recordset, or `{affectedRows}` taken from the last
 *   affected-rows entry.
 * @throws Rethrows any error raised while connecting, preparing or executing.
 */
async function executeQuery(ctx, sqlCommand, values = {}, noModifyRes = false, noLog = false) {
  try {
    await sql.connect(configuration);
    const statement = new sql.PreparedStatement();
    const placeholders = convertPlaceholdersValues(values);
    registerPlaceholderValues(placeholders, statement);
    await statement.prepare(sqlCommand);

    let result;
    try {
      result = await statement.execute(placeholders);
    } catch (executeError) {
      // Release the prepared statement even though execution failed,
      // but keep the execution error as the one reported to the caller.
      try {
        await statement.unprepare();
      } catch (unprepareError) {
        if (!noLog) {
          errorHandle(`sqlQuery() error while unpreparing query: ${sqlCommand}`, unprepareError, ctx);
        }
      }
      throw executeError;
    }
    await statement.unprepare();

    if (!result.recordset && !result.rowsAffected?.length) {
      return {rows: [], affectedRows: 0};
    }
    let output = result;
    if (!noModifyRes) {
      if (result.recordset) {
        output = result.recordset;
      } else {
        output = {affectedRows: result.rowsAffected.pop()};
      }
    }
    return output;
  } catch (error) {
    if (!noLog) {
      errorHandle(`sqlQuery() error while executing query: ${sqlCommand}`, error, ctx);
    }
    throw error;
  }
}
/**
 * Sends a prepared table to the server as a bulk insert.
 *
 * @param {object} ctx - Operation context (used for logging).
 * @param {object} table - Table object passed to `Request.bulk`; its `name` is used in the log message.
 * @returns {Promise<{affectedRows: number}>} `rowsAffected` reported by the
 *   bulk request, or 0 when that value is missing.
 * @throws Rethrows any connection or bulk error after logging it.
 */
async function executeBulk(ctx, table) {
  let bulkResult;
  try {
    await sql.connect(configuration);
    const request = new sql.Request();
    bulkResult = await request.bulk(table);
  } catch (error) {
    errorHandle(`sqlQuery() error while executing bulk for table ${table.name}`, error, ctx);
    throw error;
  }
  const affectedRows = bulkResult?.rowsAffected ?? 0;
  return {affectedRows};
}
/**
 * Closes the mssql connection by delegating to `sql.close()`.
 *
 * @returns {*} Whatever `sql.close()` returns.
 */
function closePool() {
  const closing = sql.close();
  return closing;
}
/**
 * Adds a named, explicitly typed parameter to a values object.
 *
 * The value is stored under `ph_<name>` and its type under
 * `_typesMetadata.ph_<name>`; the `_typesMetadata` container is created on
 * first use.
 *
 * @param {*} parameter - Value to bind.
 * @param {string} name - Parameter name without the placeholder prefix.
 * @param {object} type - SQL type recorded for the parameter.
 * @param {object} accumulatedObject - Values object that is extended in place.
 * @returns {string} Placeholder reference to embed in SQL, e.g. `@ph_tenant`.
 */
function addSqlParameterObjectBased(parameter, name, type, accumulatedObject) {
  let typesMetadata = accumulatedObject._typesMetadata;
  if (typesMetadata === undefined) {
    typesMetadata = {};
    accumulatedObject._typesMetadata = typesMetadata;
  }
  const placeholder = `${placeholderPrefix}${name}`;
  accumulatedObject[placeholder] = parameter;
  typesMetadata[placeholder] = type;
  return `@${placeholder}`;
}
/**
 * Appends a positional parameter to the values array.
 *
 * @param {*} parameter - Value to bind.
 * @param {Array} accumulatedArray - Values array that is extended in place.
 * @returns {string} Placeholder reference built from the index of the
 *   appended value, e.g. `@ph_0`.
 */
function addSqlParameter(parameter, accumulatedArray) {
  const newLength = accumulatedArray.push(parameter);
  const position = newLength - 1;
  return `@${placeholderPrefix}${position}`;
}
/**
 * Builds a SQL `CONCAT(...)` expression from the given fragments.
 *
 * @param {...string} parameters - SQL fragments, joined with `, `.
 * @returns {string} The `CONCAT(...)` expression text.
 */
function concatParams(...parameters) {
  const argumentList = parameters.join(', ');
  return `CONCAT(${argumentList})`;
}
/**
 * Queries `information_schema.COLUMNS` for the column names of a table in
 * the schema returned by `SCHEMA_NAME()`.
 *
 * @param {object} ctx - Operation context.
 * @param {string} tableName - Table name, bound as a query parameter.
 * @returns {Promise<*>} Result of `executeQuery`.
 */
function getTableColumns(ctx, tableName) {
  const values = [];
  const tableNameParam = addSqlParameter(tableName, values);
  const sqlCommand = [
    'SELECT column_name FROM information_schema.COLUMNS',
    `WHERE TABLE_NAME = ${tableNameParam}`,
    'AND TABLE_SCHEMA = SCHEMA_NAME();'
  ].join(' ');
  return executeQuery(ctx, sqlCommand, values);
}
/**
 * Selects every row of the result table that has at least one row with the
 * same tenant and id in the changes table.
 *
 * @param {object} ctx - Operation context.
 * @returns {Promise<*>} Result of `executeQuery`.
 */
function getDocumentsWithChanges(ctx) {
  const matchingChange = [
    `SELECT TOP(1) id FROM ${cfgTableChanges}`,
    `WHERE tenant=${cfgTableResult}.tenant`,
    `AND id = ${cfgTableResult}.id`
  ].join(' ');
  return executeQuery(ctx, `SELECT * FROM ${cfgTableResult} WHERE EXISTS(${matchingChange});`);
}
/**
 * Selects up to `maxCount` rows of the result table whose `last_open_date`
 * is at least `expireSeconds` old and that have no row with the same tenant
 * and id in the changes table.
 *
 * @param {object} ctx - Operation context.
 * @param {number} maxCount - Maximum number of rows, bound as an Int parameter.
 * @param {number} expireSeconds - Age threshold in seconds.
 * @returns {Promise<*>} Result of `executeQuery`.
 */
function getExpired(ctx, maxCount, expireSeconds) {
  // Threshold date = now minus expireSeconds.
  const threshold = new Date();
  utils.addSeconds(threshold, -expireSeconds);

  const values = {};
  const dateParam = addSqlParameterObjectBased(threshold, 'expireDate', sql.TYPES.DateTime(), values);
  const countParam = addSqlParameterObjectBased(maxCount, 'maxCount', sql.TYPES.Int(), values);

  const pendingChange = [
    `SELECT TOP(1) tenant, id FROM ${cfgTableChanges}`,
    `WHERE ${cfgTableChanges}.tenant = ${cfgTableResult}.tenant`,
    `AND ${cfgTableChanges}.id = ${cfgTableResult}.id`
  ].join(' ');
  const sqlCommand = [
    `SELECT TOP(${countParam}) * FROM ${cfgTableResult}`,
    `WHERE last_open_date <= ${dateParam}`,
    `AND NOT EXISTS(${pendingChange});`
  ].join(' ');
  return executeQuery(ctx, sqlCommand, values);
}
/**
 * Inserts or updates the result-table row of a task with a single MERGE.
 *
 * A row matching the task's tenant and key is updated: `last_open_date` is
 * refreshed, `user_index` is incremented, the callback (when the task has
 * one) is appended to the stored callback text, and `baseurl` is replaced
 * when the task has one. Otherwise a new row is inserted from the task values.
 *
 * @param {object} ctx - Operation context.
 * @param {object} task - Task providing `completeDefaults()` and the fields
 *   tenant, key, status, statusInfo, userIndex, changeId, callback, baseurl.
 * @returns {Promise<{isInsert: boolean, insertId: *}>} `isInsert` is true when
 *   the MERGE reported an INSERT action; `insertId` is the row's resulting
 *   `user_index`.
 */
async function upsert(ctx, task) {
  task.completeDefaults();

  const hasCallback = Boolean(task.callback);
  let callbackForInsert = task.callback;
  if (hasCallback) {
    // For a new row the callback is stored in the UserCallback insert format.
    const wrappedCallback = new connectorUtilities.UserCallback();
    wrappedCallback.fromValues(task.userIndex, task.callback);
    callbackForInsert = wrappedCallback.toSQLInsert();
  }

  const now = new Date();
  const values = {};
  const bind = (value, name, type) => addSqlParameterObjectBased(value, name, type, values);

  // Registration order matches the column order of the INSERT part.
  const tenantParam = bind(task.tenant, 'tenant', sql.TYPES.NVarChar(255));
  const idParam = bind(task.key, 'key', sql.TYPES.NVarChar(255));
  const statusParam = bind(task.status, 'status', sql.TYPES.SmallInt());
  const statusInfoParam = bind(task.statusInfo, 'statusInfo', sql.TYPES.Int());
  const lastOpenDateParam = bind(now, 'dateNow', sql.TYPES.DateTime());
  const userIndexParam = bind(task.userIndex, 'userIndex', sql.TYPES.Decimal(18, 0));
  const changeIdParam = bind(task.changeId, 'changeId', sql.TYPES.Decimal(18, 0));
  const callbackParam = bind(callbackForInsert, 'cbInsert', sql.TYPES.NVarChar(sql.MAX));
  const baseUrlParam = bind(task.baseurl, 'baseurl', sql.TYPES.NVarChar(sql.MAX));

  const insertValues = [
    tenantParam,
    idParam,
    statusParam,
    statusInfoParam,
    lastOpenDateParam,
    userIndexParam,
    changeIdParam,
    callbackParam,
    baseUrlParam
  ].join(', ');

  const columnNames = ['tenant', 'id', 'status', 'status_info', 'last_open_date', 'user_index', 'change_id', 'callback', 'baseurl'];
  const sourceColumns = columnNames.join(', ');
  const sourceValues = columnNames.map(columnName => `source.${columnName}`).join(', ');

  // Assignments for the WHEN MATCHED branch, collected in order.
  const assignments = [`target.last_open_date = ${lastOpenDateParam}`];
  if (hasCallback) {
    const callbackJsonParam = bind(JSON.stringify(task.callback), 'callback', sql.TYPES.NVarChar(sql.MAX));
    // Appends delimiter + {"userIndex":<next index>,"callback":<json>} to the stored callback.
    const appendedCallback = concatParams(
      'target.callback',
      `'${connectorUtilities.UserCallback.prototype.delimiter}{"userIndex":'`,
      '(target.user_index + 1)',
      `',"callback":'`,
      callbackJsonParam,
      `'}'`
    );
    assignments.push(`target.callback = ${appendedCallback}`);
  }
  if (task.baseurl) {
    assignments.push(`target.baseurl = ${baseUrlParam}`);
  }
  assignments.push('target.user_index = target.user_index + 1');
  const updateColumns = assignments.join(', ');

  const sqlMerge = [
    `MERGE INTO ${cfgTableResult} AS target`,
    `USING(VALUES(${insertValues})) AS source(${sourceColumns})`,
    `ON(target.tenant = ${tenantParam} AND target.id = ${idParam})`,
    `WHEN MATCHED THEN UPDATE SET ${updateColumns}`,
    `WHEN NOT MATCHED THEN INSERT(${sourceColumns}) VALUES(${sourceValues})`,
    `OUTPUT $ACTION as action, INSERTED.user_index as insertId;`
  ].join(' ');

  // noModifyRes = true: the raw result is needed to read the OUTPUT row.
  const result = await executeQuery(ctx, sqlMerge, values, true);
  const outputRow = result.recordset[0];
  const insertId = outputRow.insertId;
  const isInsert = outputRow.action === 'INSERT';
  return {isInsert, insertId};
}
/**
 * Callback-style wrapper around `insertChangesAsync()`.
 *
 * The callback receives `(null, result, true)` on success and
 * `(error, null, true)` on failure. Nothing is returned.
 *
 * @param {object} ctx - Operation context.
 * @param {string} tableChanges - Name of the changes table.
 * @param {number} startIndex - Index of the first change to insert.
 * @param {Array} objChanges - Changes to insert.
 * @param {string} docId - Document id.
 * @param {number} index - change_id of the first inserted change.
 * @param {object} user - Author of the changes.
 * @param {Function} callback - Completion callback.
 */
function insertChanges(ctx, tableChanges, startIndex, objChanges, docId, index, user, callback) {
  const onInserted = result => callback(null, result, true);
  const onFailed = error => callback(error, null, true);
  const pending = insertChangesAsync(ctx, tableChanges, startIndex, objChanges, docId, index, user);
  // Two-argument then(): an exception thrown by the success callback is not routed to onFailed.
  pending.then(onInserted, onFailed);
}
/**
 * Bulk-inserts document changes, split into batches by estimated size.
 *
 * Rows are added to a batch while the running size estimate stays within
 * `cfgMaxPacketSize` (the check happens before each row is added). The batch
 * is sent with `executeBulk`, and the remaining changes are handled by a
 * recursive call. `index` is used as the `change_id` of the first inserted
 * row and grows by one per row.
 *
 * @param {object} ctx - Operation context; `ctx.tenant` is stored with each row.
 * @param {string} tableChanges - Name of the changes table.
 * @param {number} startIndex - Index in `objChanges` to start from.
 * @param {Array<{change: string, time: *}>} objChanges - Changes to insert.
 * @param {string} docId - Document id.
 * @param {number} index - change_id of the first inserted row.
 * @param {object} user - Author (`id`, `idOriginal`, `username`).
 * @returns {Promise<{affectedRows: number}>} Total affected rows over all batches.
 */
async function insertChangesAsync(ctx, tableChanges, startIndex, objChanges, docId, index, user) {
  if (startIndex === objChanges.length) {
    return {affectedRows: 0};
  }

  const table = new sql.Table(tableChanges);
  const addTextColumn = columnName => {
    table.columns.add(columnName, sql.TYPES.NVarChar(sql.MAX), {nullable: false, length: 'max'});
  };
  addTextColumn('tenant');
  addTextColumn('id');
  table.columns.add('change_id', sql.TYPES.Int, {nullable: false});
  addTextColumn('user_id');
  addTextColumn('user_id_original');
  addTextColumn('user_name');
  addTextColumn('change_data');
  table.columns.add('change_date', sql.TYPES.DateTime, {nullable: false});

  // Fixed per-row estimate: 4 bytes for the index plus 8 bytes for the time.
  const fixedRowBytes = 4 + 8;
  let batchBytes = 0;
  let position = startIndex;
  let changeId = index;
  while (position < objChanges.length && batchBytes <= cfgMaxPacketSize) {
    const entry = objChanges[position];
    // Text columns are estimated at 4 bytes per character.
    const textLength =
      ctx.tenant.length + docId.length + user.id.length + user.idOriginal.length + user.username.length + entry.change.length;
    batchBytes += fixedRowBytes + 4 * textLength;
    table.rows.add(ctx.tenant, docId, changeId, user.id, user.idOriginal, user.username, entry.change, entry.time);
    position++;
    changeId++;
  }

  const result = await executeBulk(ctx, table);
  if (position < objChanges.length) {
    // Some changes did not fit into this batch: insert the rest and add up the counts.
    const remainder = await insertChangesAsync(ctx, tableChanges, position, objChanges, docId, changeId, user);
    result.affectedRows += remainder.affectedRows;
  }
  return result;
}
// Functions exported by this MS SQL connector module. Helpers that are not
// listed here (e.g. executeQuery, executeBulk, insertChangesAsync) are not exported.
module.exports = {
sqlQuery,
closePool,
addSqlParameter,
concatParams,
getTableColumns,
getDocumentsWithChanges,
getExpired,
upsert,
insertChanges
};