背景
产品开发了新版本,数据不兼容,新版本的数据结构比旧版本复杂一些,目前,旧版本是在线系统,需要将旧版本程序的数据适配至新版本程序。数据库是MySQL,这里选择使用NodeJS来完成。
使用的版本:
"mysql": "^2.18.1",
"mysql2": "^2.3.3"
工具代码
需要的依赖
npm install mysql --save
npm install mysql2 --save
案例一,单表的数据同步:
database\mysql\mysql.tools.js
const mysql = require('mysql');
// 创建连接
const createConnection = (host, port, database, user, password) => {
let connection = mysql.createConnection({
host, user, password, port, database,
});
connection.connect(function (error) {
if (error) {
// console.log('[query] - :' + error);
return;
}
// console.log('[src][connection connect] succeed!');
});
return connection;
};
// 关闭连接
const closeConnection = (connection) => {
connection.end(function (error) {
if (error) {
return;
}
// console.log('[src][connection end] succeed!');
});
};
module.exports = { createConnection, closeConnection }
database\mysql2\mysql2.tools.js
const mysql = require('mysql2');
// 创建连接
const createConnection = (host, port, database, user, password) => {
let connection = mysql.createConnection({
host: host,
user: user,
password: password,
database: database,
port: port,
multipleStatements: true
});
return connection;
};
// 关闭连接
const closeConnection = (connection) => {
connection.end(function (error) {
if (error) {
return;
}
// console.log('[src][connection end] succeed!');
});
};
module.exports = {
createConnection, closeConnection
}
syncdata\00.base\syncData.js
let source = {
host: "127.0.0.1",
port: 3306,
database: "db1",
user: "root",
password: "123456"
};
let target = {
host: "127.0.0.1",
port: 3306,
database: "db2",
user: "root",
password: "123456"
};
// 主体ID和主体编号
const subject_id = '1001';
const subject_code = 'ABCDEFG';
module.exports = { source, target, subject_id, subject_code }
syncdata\01.manufactor\main.manufactor.js
const { createConnection, closeConnection } = require('../../database/mysql/mysql.tools');
const { source, target, subject_id, subject_code } = require('../00.base/syncData');
// 创建连接
let sourceConnection = createConnection(source.host, source.port, source.database, source.user, source.password);
let targetConnection = createConnection(target.host, target.port, target.database, target.user, target.password);
console.log("sourceConnection", sourceConnection);
console.log("targetConnection", targetConnection);
// 拷贝数据
function executeCopyData(sourceConnection, targetConnection, closeConnection) {
// ====================================================================//
// 1、查询数据
// ====================================================================//
let selectSql = "select * from health_manufactor";
sourceConnection.query(selectSql, function (error, result, fields) {
if (error) {
throw error;
}
if (result.length > 0) {
let count = 1;
result.forEach(item => {
console.log("item", item);
// ====================================================================//
// 2、插入数据
// ====================================================================//
let addSql = `INSERT INTO goods_manufactor(
id,subject_id,subject_code,manufactor_code,manufactor_name,address,phone,
sorted,remark,status,logical_deleted,create_uid,
create_user,create_time,modified_user,modified_time
)
VALUES(
?,?,?,?,?,
?,?,?,?,?,
?,?,?,?,?,
?
)`;
let addSqlParams = [
item.id, subject_id, subject_code, item.manufactor_code, item.manufactor_name,
item.address, '', 1, '', 1,
0, 0, 'system', new Date(), 'system',
new Date()
];
targetConnection.query(addSql, addSqlParams, function (addErr, addResult) {
if (addErr) {
console.log(addErr);
throw addErr;
}
console.log('addResult.insertId:', addResult.insertId);
count++;
if (count == result.length) {
closeConnection();
}
});
});
}
});
}
syncdata\02.brand\main.brand.js
const { createConnection, closeConnection } = require("../../database/mysql2/mysql2.tools");
const { source, target, subject_id, subject_code } = require('../00.base/syncData');
// 创建连接
let sourceConnection = createConnection(source.host, source.port, source.database, source.user, source.password);
let targetConnection = createConnection(target.host, target.port, target.database, target.user, target.password);
console.log("sourceConnection", sourceConnection);
console.log("targetConnection", targetConnection);
// 拷贝数据
function executeCopyData(sourceConnection, targetConnection, closeConnection) {
// ====================================================================//
// 1、查询数据
// ====================================================================//
let selectSql = "SELECT * FROM health_brand";
sourceConnection.query(selectSql, function (error, result, fields) {
if (error) {
throw error;
}
if (result.length > 0) {
let count = 1;
for (let i = 0; i < result.length; i++) {
// ====================================================================//
// 2、查询一条数据
// ====================================================================//
let selectOneSql = "select * from goods_manufactor where id = ?";
let selectOneSqlParams = [result[i].brand_manufactor_id];
targetConnection.query(selectOneSql, selectOneSqlParams, function (error1, result1, fields1) {
if (error1) {
throw error1;
}
console.log('result1', result1);
// ====================================================================//
// 3、插入数据
// ====================================================================//
let insertSql = `insert into goods_brand(
id,subject_id,subject_code,manufactor_id,manufactor_code,
brand_code,brand_name,sorted,remark,status,
logical_deleted,create_uid,create_user,create_time,modified_user,
modified_time
) values(
?,?,?,?,?,
?,?,?,?,?,
?,?,?,?,?,
?
)`;
let insertSqlParams = [
result[i].id, subject_id, subject_code, result[i].brand_manufactor_id, result1[0].manufactor_code,
result[i].brand_code, result[i].brand_name, 1, '', result[i].status,
0, 0, 'system', new Date(), 'system',
new Date()
];
targetConnection.query(insertSql, insertSqlParams, function (error2, result2) {
if (error2) {
console.log(error2);
throw error2;
}
console.log('result2.insertId:', result2.insertId);
count++;
if (count == result.length) {
closeConnection();
}
});
// ====================================================================//
// The end
// ====================================================================//
});
}
}
});
}
// 执行拷贝数据
executeCopyData(sourceConnection, targetConnection, function () {
console.log('close sourceConnection ', sourceConnection);
console.log('close targetConnection', targetConnection);
closeConnection(sourceConnection);
closeConnection(targetConnection);
});
syncdata\03.category\main.category.js
const { createConnectionPool, createConnection, closeConnectionPool } = require("../../database/mysql2/mysql2.promise.tools");
const { source, target, subject_id, subject_code } = require('../00.base/syncData');
let sourceConnectionPool = createConnectionPool(source.host, source.port, source.database, source.user, source.password);
let targetConnectionPool = createConnectionPool(target.host, target.port, target.database, target.user, target.password);
console.log("sourceConnectionPool", sourceConnectionPool);
console.log("targetConnectionPool", targetConnectionPool);
// 拷贝数据
async function executeCopyData(sourceConnectionPool, targetConnectionPool, closeConnectionPoolHandle) {
// ====================================================================//
// 1、查询数据
// ====================================================================//
let selectSql = "SELECT * FROM health_category";
const [res] = await sourceConnectionPool.execute(selectSql);
console.log("res", res);
let count = 1;
res.forEach(async (item) => {
// ====================================================================//
// 2、插入数据
// ====================================================================//
let insertSql = `insert into goods_category(
id,subject_id,subject_code,category_code,category_name,
parent_id,parent_code,tree_level,parent_full_path,sorted,
remark,status,logical_deleted,create_uid,create_user,
create_time,modified_user,modified_time
) values(
?,?,?,?,?,
?,?,?,?,?,
?,?,?,?,?,
?,?,?
)`;
let insertSqlParams = [
item.id, subject_id, subject_code, item.category_code, item.category_name,
-1, '', 1, '', 1,
'', 1, 0, 0, "system",
new Date(), 'system', new Date()
];
const [results] = await targetConnectionPool.execute(insertSql, insertSqlParams);
console.log("results", results);
// 关闭
count++;
if (count == res.length) {
closeConnectionPoolHandle();
}
});
}
// 拷贝数据
executeCopyData(sourceConnectionPool, targetConnectionPool, () => {
console.log('close sourceConnectionPool ', sourceConnectionPool);
console.log('close targetConnectionPool', targetConnectionPool);
closeConnectionPool(sourceConnectionPool);
closeConnectionPool(targetConnectionPool);
});
案例二,关联表的数据同步:
database\mysql2\mysql2.promise.tools.js
const mysql = require('mysql2/promise');
// 创建连接池
const createConnectionPool = (host, port, database, user, password) => {
const pool = mysql.createPool({
host: host,
user: user,
password: password,
database: database,
port: port,
//连接超额时是否等待
waitForConnections: true,
//连接的最多的个数
connectionLimit: 5,
//可以等待的连接的个数
queueLimit: 0
});
return pool;
}
// 创建连接
const createConnection = (host, port, database, user, password) => {
let connection = mysql.createConnection({
host: host,
user: user,
password: password,
database: database,
port: port,
multipleStatements: true
});
return connection;
};
// 关闭连接池
const closeConnectionPool = (connectionPool) => {
connectionPool.end(function (error) {
if (error) {
return;
}
// console.log('[src][connection end] succeed!');
});
};
module.exports = { createConnectionPool, createConnection, closeConnectionPool };
syncdata\04.general_name\main.general_name.js
const { createConnectionPool, createConnection, closeConnectionPool } = require("../../database/mysql2/mysql2.promise.tools");
const { source, target, subject_id, subject_code } = require('../00.base/syncData');
let sourceConnectionPool = createConnectionPool(source.host, source.port, source.database, source.user, source.password);
let targetConnectionPool = createConnectionPool(target.host, target.port, target.database, target.user, target.password);
console.log("sourceConnectionPool", sourceConnectionPool);
console.log("targetConnectionPool", targetConnectionPool);
// 拷贝数据
async function executeCopyData(sourceConnectionPool, targetConnectionPool, closeConnectionPoolHandle) {
// ====================================================================//
// 1、查询数据
// ====================================================================//
let selectSql = "SELECT * FROM health_icd_general_name";
sourceConnectionPool.execute(selectSql).then(data => {
const [res] = data;
// console.log("res", res);
let itemMap = new Map();
let categroySet = new Set();
res.forEach(item => {
itemMap.set(item.id, item);
categroySet.add(item.category_id);
});
let resolveData = {
itemList: res,
itemMap: itemMap,
categroyList: Array.from(categroySet),
};
return new Promise(function (resolve, reject) {
resolve(resolveData);
});
}).then(async (data) => {
const { itemList, itemMap, categroyList } = data;
// console.log("itemMap", itemMap);
// console.log("categroyList", categroyList);
// ====================================================================//
// 2、查询数据
// ====================================================================//
let selectSql = "select * from goods_category";
// let selectSql2 = "select * from goods_category where id in (?)";
let sekectSqlParam = [categroyList];
const [res] = await targetConnectionPool.execute(selectSql);
// const [res] = await targetConnectionPool.execute(selectSql, sekectSqlParam);
console.log("res", res);
let categroyMap = new Map();
res.forEach(item => {
categroyMap.set(item.id, item);
});
let resolveData = {
itemList: itemList,
itemMap: itemMap,
categroyList: categroyList,
categroyMap: categroyMap,
};
return new Promise(function (resolve, reject) {
resolve(resolveData);
});
}).then(data => {
const { itemList, itemMap, categroyList, categroyMap } = data;
//console.log("itemList", itemList);
//console.log("itemMap", itemMap);
//console.log("categroyList", categroyList);
//console.log("categroyMap", categroyMap);
let count = 1;
itemList.forEach(async (item) => {
// ====================================================================//
// 3、插入数据
// ====================================================================//
let insertSql = `insert into goods_generic_name(
id,subject_id,subject_code,name,code,
category_id,category_code,parent_id,parent_code,remark,
status,sorted,logical_deleted,create_uid,create_user,
create_time,modified_user,modified_time
) values(
?,?,?,?,?,
?,?,?,?,?,
?,?,?,?,?,
?,?,?
)`;
// 分类编号
let category_code = '';
if (categroyMap.get(item.category_id)) {
category_code = categroyMap.get(item.category_id).category_code;
}
// 父通用名编号
let parent_code = '';
if (itemMap.get(item.parent_id)) {
parent_code = itemMap.get(item.parent_id).general_code;
}
let parent_id = item.parent_id;
if (item.id == 0) {
parent_id = -1;
}
let insertSqlParams = [
item.id, subject_id, subject_code, item.general_name, item.general_code,
item.category_id, category_code, item.parent_id, parent_code, '',
0, 1, 0, 0, 'system',
new Date(), 'system', new Date()
];
console.log("insertSql", insertSql);
console.log("insertSqlParams", insertSqlParams);
//
const [results] = await targetConnectionPool.execute(insertSql, insertSqlParams);
//console.log("results", results);
//
count++;
if (count == itemList.length) {
closeConnectionPoolHandle();
}
});
});
}
// 拷贝数据
executeCopyData(sourceConnectionPool, targetConnectionPool, () => {
console.log('close sourceConnectionPool ', sourceConnectionPool);
console.log('close targetConnectionPool', targetConnectionPool);
closeConnectionPool(sourceConnectionPool);
closeConnectionPool(targetConnectionPool);
});
syncdata\05.goods\main.goods.js
const { createConnectionPool, createConnection, closeConnectionPool } = require("../../database/mysql2/mysql2.promise.tools");
const { source, target, subject_id, subject_code } = require('../00.base/syncData');
let sourceConnectionPool = createConnectionPool(source.host, source.port, source.database, source.user, source.password);
let targetConnectionPool = createConnectionPool(target.host, target.port, target.database, target.user, target.password);
console.log("sourceConnectionPool", sourceConnectionPool);
console.log("targetConnectionPool", targetConnectionPool);
// 拷贝数据
async function executeCopyData(sourceConnectionPool, targetConnectionPool, closeConnectionPoolHandle) {
// ====================================================================//
// 1、查询数据
// ====================================================================//
let selectSql = "SELECT * FROM health_goods";
sourceConnectionPool.execute(selectSql).then(data => {
const [res] = data;
// console.log("res", res);
const resolveData = {
goodsList: res
};
return new Promise(function (resolve, reject) {
resolve(resolveData);
});
}).then(async (data) => {
let selectSql = "SELECT * FROM goods_manufactor";
let [res] = await targetConnectionPool.execute(selectSql);
// console.log("res", JSON.stringify(res));
let resolveData = Object.assign(data, { manufactorList: res });
// console.log("resolveData", JSON.stringify(resolveData));
return new Promise(function (resolve, reject) {
resolve(resolveData);
});
}).then(async (data) => {
let selectSql = "SELECT * FROM goods_brand";
let [res] = await targetConnectionPool.execute(selectSql);
//console.log("res", JSON.stringify(res));
let resolveData = Object.assign(data, { brandList: res });
// console.log("resolveData", resolveData);
return new Promise(function (resolve, reject) {
resolve(resolveData);
});
}).then(async (data) => {
let selectSql = "SELECT * FROM goods_category";
let [res] = await targetConnectionPool.execute(selectSql);
//console.log("res", JSON.stringify(res));
let resolveData = Object.assign(data, { categoryList: res });
// console.log("resolveData", resolveData);
return new Promise(function (resolve, reject) {
resolve(resolveData);
});
}).then(async (data) => {
let selectSql = "SELECT * FROM goods_generic_name";
let [res] = await targetConnectionPool.execute(selectSql);
//console.log("res", JSON.stringify(res));
let resolveData = Object.assign(data, { genericNameList: res });
// console.log("resolveData", resolveData);
return new Promise(function (resolve, reject) {
resolve(resolveData);
});
}).then(async (data) => {
const { goodsList, manufactorList, brandList, categoryList, genericNameList } = data;
// console.log("goodsList", goodsList.length);
console.log("manufactorList", manufactorList.length);
console.log("brandList", brandList.length);
console.log("categoryList", categoryList.length);
console.log("genericNameList", genericNameList);
// console.log("goodsList", goodsList);
let manufactorMap = new Map();
manufactorList.forEach((item) => {
manufactorMap.set(item.id, item);
});
let brandMap = new Map();
brandList.forEach((item) => {
brandMap.set(item.id, item);
});
let categoryMap = new Map();
categoryList.forEach((item) => {
categoryMap.set(item.id, item);
});
let genericNameMap = new Map();
genericNameList.forEach((item) => {
genericNameMap.set(item.id, item);
});
// ====================================================================//
// 插入数据
// ====================================================================//
let count = 1;
goodsList.forEach(async (item) => {
let insertSql = `insert into goods_object(
id,subject_id,subject_code,manufacturer_id,manufacturer_code,
brand_id,brand_code,category_id,category_code,general_name_first_id,
general_name_first_code,general_name_id,general_name_code,name,code,
product_name,short_code,bar_code,product_type,instructions,
main_graph_url,introduce_url,details,introduce,simple_spelling,
dosage_form,apparatus_model,preservation_method,quality_guarantee_period,status,
dosage,logical_deleted,create_uid,create_user,create_time,
modified_user,modified_time
) values(
?,?,?,?,?,
?,?,?,?,?,
?,?,?,?,?,
?,?,?,?,?,
?,?,?,?,?,
?,?,?,?,?,
?,?,?,?,?,
?,?
)`;
// 厂家信息
let manufacturer_id = item.manufactor_id;
let manufacturer_code = manufactorMap.get(manufacturer_id).manufactor_code;
// 品牌信息
let brand_id = item.brand_id;
let brand_code = brandMap.get(brand_id).brand_code;
// 分类信息
let category_id = item.category_id;
let category_code = categoryMap.get(category_id).category_code;
// 通用名信息
let general_name_id = item.goods_general_name_id;
let general_name_code = genericNameMap.get(general_name_id).code;
// 插入参数
let insertSqlParams = [
item.id, subject_id, subject_code, manufacturer_id, manufacturer_code,
brand_id, brand_code, category_id, category_code, 0,
'', general_name_id, general_name_code, item.goods_name, item.goods_code,
item.product_name, item.short_code, item.goods_bar_code, item.goods_type, item.goods_instructions,
item.goods_img, '', '', item.goods_instructions, item.simple_spelling,
item.goods_dosageform, '', item.preservation_method, item.quality_guarantee_period, 1,
'', 0, 0, 'system', new Date(),
'system', new Date()
];
//
const [results] = await targetConnectionPool.execute(insertSql, insertSqlParams);
console.log("results", results);
//
count++;
if (count == goodsList.length) {
closeConnectionPoolHandle();
}
});
});
}
// 拷贝数据
executeCopyData(sourceConnectionPool, targetConnectionPool, () => {
console.log('close sourceConnectionPool ', sourceConnectionPool);
console.log('close targetConnectionPool', targetConnectionPool);
closeConnectionPool(sourceConnectionPool);
closeConnectionPool(targetConnectionPool);
});