NodeJS,mysql2/promise,连接池执行,将数据复制到另一个数据库

背景

产品开发了新版本,新版本的数据结构比旧版本复杂,两者的数据互不兼容。目前旧版本仍是在线运行的系统,因此需要把旧版本的数据转换后迁移到新版本的数据库中。数据库使用 MySQL,这里选择用 Node.js 编写迁移脚本来完成。

使用的版本:

"mysql": "^2.18.1",
"mysql2": "^2.3.3"

工具代码

需要的依赖

npm install mysql --save
npm install mysql2 --save

案例一,单表的数据同步:

database\mysql\mysql.tools.js

const mysql = require('mysql');

/**
 * Create a connection with the `mysql` driver and start connecting.
 *
 * The connection object is returned right away; `connect` reports its outcome
 * later through the callback below.
 *
 * @param {string} host - database host.
 * @param {number} port - database port.
 * @param {string} database - schema name.
 * @param {string} user - login user.
 * @param {string} password - login password.
 * @returns {object} the object returned by `mysql.createConnection`.
 */
const createConnection = (host, port, database, user, password) => {
  const connection = mysql.createConnection({
    host, user, password, port, database,
  });
  connection.connect((error) => {
    if (error) {
      // A failed connect used to be dropped silently, which left later query
      // failures without any hint of the real cause.
      console.error('[mysql][connection connect] failed:', error);
    }
  });
  return connection;
};


/**
 * Close a connection by calling its `end` method.
 *
 * @param {object} connection - connection exposing `end(callback)`.
 * @returns {void}
 */
const closeConnection = (connection) => {
  connection.end((error) => {
    if (error) {
      // Report the failure instead of dropping it silently.
      console.error('[mysql][connection end] failed:', error);
    }
  });
};

module.exports = { createConnection, closeConnection }

database\mysql2\mysql2.tools.js

const mysql = require('mysql2');

/**
 * Create a connection through `mysql.createConnection` with multi-statement
 * queries enabled.
 *
 * @param {string} host - database host.
 * @param {number} port - database port.
 * @param {string} database - schema name.
 * @param {string} user - login user.
 * @param {string} password - login password.
 * @returns {object} whatever `mysql.createConnection` returns.
 */
const createConnection = (host, port, database, user, password) =>
  mysql.createConnection({
    host,
    user,
    password,
    database,
    port,
    multipleStatements: true,
  });

/**
 * Close a connection by calling its `end` method.
 *
 * @param {object} connection - connection exposing `end(callback)`.
 * @returns {void}
 */
const closeConnection = (connection) => {
  connection.end((error) => {
    if (error) {
      // Report the failure instead of dropping it silently.
      console.error('[mysql2][connection end] failed:', error);
    }
  });
};

module.exports = {
  createConnection, closeConnection
}

syncdata\00.base\syncData.js

// Connection settings of the database the data is copied from.
const source = {
  host: '127.0.0.1',
  port: 3306,
  database: 'db1',
  user: 'root',
  password: '123456',
};

// Connection settings of the database the data is copied into.
const target = {
  host: '127.0.0.1',
  port: 3306,
  database: 'db2',
  user: 'root',
  password: '123456',
};

// Subject ID and subject code written into every copied row.
const subject_id = '1001';
const subject_code = 'ABCDEFG';

module.exports = { source, target, subject_id, subject_code }

syncdata\01.manufactor\main.manufactor.js

const { createConnection, closeConnection } = require('../../database/mysql/mysql.tools');
const { source, target, subject_id, subject_code } = require('../00.base/syncData');


// Open one connection per database (source first, then target) and log both.
let [sourceConnection, targetConnection] = [source, target].map((db) =>
  createConnection(db.host, db.port, db.database, db.user, db.password)
);
console.log("sourceConnection", sourceConnection);
console.log("targetConnection", targetConnection);


/**
 * Copy every row of `health_manufactor` (read through `sourceConnection`) into
 * `goods_manufactor` (written through `targetConnection`).
 *
 * Columns without a counterpart in the source row get fixed values: phone '',
 * sorted 1, remark '', status 1, logical_deleted 0, create_uid 0,
 * create/modified user 'system' and the current time.
 *
 * @param {object} sourceConnection - connection whose `query(sql, callback)` reads the source table.
 * @param {object} targetConnection - connection whose `query(sql, params, callback)` runs the inserts.
 * @param {Function} closeConnection - called with no arguments exactly once:
 *   after the last insert has succeeded, or right away when the source table is empty.
 * @throws Query errors are rethrown from inside the driver callbacks.
 */
function executeCopyData(sourceConnection, targetConnection, closeConnection) {
  const selectSql = "select * from health_manufactor";
  const addSql = `INSERT INTO goods_manufactor(
    id,subject_id,subject_code,manufactor_code,manufactor_name,address,phone,
    sorted,remark,status,logical_deleted,create_uid,
    create_user,create_time,modified_user,modified_time
  )
  VALUES(
    ?,?,?,?,?,
    ?,?,?,?,?,
    ?,?,?,?,?,
    ?
  )`;

  // 1. Read all source rows.
  sourceConnection.query(selectSql, function (error, result) {
    if (error) {
      throw error;
    }
    if (result.length === 0) {
      // Nothing to copy: still release the connections so the process can exit.
      closeConnection();
      return;
    }

    // Counts finished inserts. It starts at 0 so the close callback fires only
    // after ALL inserts are done (starting at 1 closed one insert too early and
    // never closed for a single-row table).
    let completed = 0;

    // 2. Insert each row into the target table.
    result.forEach((item) => {
      console.log("item", item);
      const now = new Date();
      const addSqlParams = [
        item.id, subject_id, subject_code, item.manufactor_code, item.manufactor_name,
        item.address, '', 1, '', 1,
        0, 0, 'system', now, 'system',
        now,
      ];
      targetConnection.query(addSql, addSqlParams, function (addErr, addResult) {
        if (addErr) {
          console.log(addErr);
          throw addErr;
        }
        console.log('addResult.insertId:', addResult.insertId);
        completed += 1;
        if (completed === result.length) {
          closeConnection();
        }
      });
    });
  });
}

syncdata\02.brand\main.brand.js

const { createConnection, closeConnection } = require("../../database/mysql2/mysql2.tools");
const { source, target, subject_id, subject_code } = require('../00.base/syncData');


// Open one connection per database (source first, then target) and log both.
let [sourceConnection, targetConnection] = [source, target].map((db) =>
  createConnection(db.host, db.port, db.database, db.user, db.password)
);
console.log("sourceConnection", sourceConnection);
console.log("targetConnection", targetConnection);


/**
 * Copy every row of `health_brand` (read through `sourceConnection`) into
 * `goods_brand` (written through `targetConnection`).
 *
 * For each brand the matching `goods_manufactor` row is looked up in the
 * target database to obtain its `manufactor_code`. Other columns without a
 * source counterpart get fixed values: sorted 1, remark '', logical_deleted 0,
 * create_uid 0, create/modified user 'system' and the current time.
 *
 * @param {object} sourceConnection - connection whose `query(sql, callback)` reads the source table.
 * @param {object} targetConnection - connection whose `query(sql, params, callback)` runs the lookup and the inserts.
 * @param {Function} closeConnection - called with no arguments exactly once:
 *   after the last insert has succeeded, or right away when the source table is empty.
 * @throws Query errors, and an Error naming the missing manufacturer when the
 *   lookup returns no row, are thrown from inside the driver callbacks.
 */
function executeCopyData(sourceConnection, targetConnection, closeConnection) {
  const selectSql = "SELECT * FROM health_brand";
  const selectOneSql = "select * from goods_manufactor where id = ?";
  const insertSql = `insert into goods_brand(
    id,subject_id,subject_code,manufactor_id,manufactor_code,
    brand_code,brand_name,sorted,remark,status,
    logical_deleted,create_uid,create_user,create_time,modified_user,
    modified_time
  ) values(
    ?,?,?,?,?,
    ?,?,?,?,?,
    ?,?,?,?,?,
    ?
  )`;

  // 1. Read all source rows.
  sourceConnection.query(selectSql, function (error, result) {
    if (error) {
      throw error;
    }
    if (result.length === 0) {
      // Nothing to copy: still release the connections so the process can exit.
      closeConnection();
      return;
    }

    // Counts finished inserts. It starts at 0 so the close callback fires only
    // after ALL inserts are done.
    let completed = 0;

    result.forEach((brand) => {
      // 2. Look up the manufacturer this brand refers to.
      targetConnection.query(selectOneSql, [brand.brand_manufactor_id], function (error1, result1) {
        if (error1) {
          throw error1;
        }
        console.log('result1', result1);
        if (result1.length === 0) {
          // Previously this surfaced as an opaque TypeError on result1[0].
          throw new Error(
            `goods_manufactor row ${brand.brand_manufactor_id} not found (referenced by health_brand row ${brand.id})`
          );
        }

        // 3. Insert the brand into the target table.
        const now = new Date();
        const insertSqlParams = [
          brand.id, subject_id, subject_code, brand.brand_manufactor_id, result1[0].manufactor_code,
          brand.brand_code, brand.brand_name, 1, '', brand.status,
          0, 0, 'system', now, 'system',
          now,
        ];
        targetConnection.query(insertSql, insertSqlParams, function (error2, result2) {
          if (error2) {
            console.log(error2);
            throw error2;
          }
          console.log('result2.insertId:', result2.insertId);
          completed += 1;
          if (completed === result.length) {
            closeConnection();
          }
        });
      });
    });
  });
}


// Run the copy; the completion callback logs both connections, then closes
// the source connection followed by the target connection.
executeCopyData(sourceConnection, targetConnection, () => {
  console.log('close sourceConnection ', sourceConnection);
  console.log('close targetConnection', targetConnection);
  [sourceConnection, targetConnection].forEach((connection) => {
    closeConnection(connection);
  });
});

syncdata\03.category\main.category.js

const { createConnectionPool, createConnection, closeConnectionPool } = require("../../database/mysql2/mysql2.promise.tools");
const { source, target, subject_id, subject_code } = require('../00.base/syncData');


// Create one pool per database (source first, then target) and log both.
let [sourceConnectionPool, targetConnectionPool] = [source, target].map((db) =>
  createConnectionPool(db.host, db.port, db.database, db.user, db.password)
);
console.log("sourceConnectionPool", sourceConnectionPool);
console.log("targetConnectionPool", targetConnectionPool);


/**
 * Copy every row of `health_category` (read from `sourceConnectionPool`) into
 * `goods_category` (written to `targetConnectionPool`).
 *
 * Columns without a source counterpart get fixed values: parent_id -1,
 * parent_code '', tree_level 1, parent_full_path '', sorted 1, remark '',
 * status 1, logical_deleted 0, create_uid 0, create/modified user 'system'
 * and the current time.
 *
 * @param {object} sourceConnectionPool - pool whose `execute(sql)` resolves to `[rows, ...]`.
 * @param {object} targetConnectionPool - pool whose `execute(sql, params)` runs the inserts.
 * @param {Function} closeConnectionPoolHandle - called with no arguments exactly
 *   once when the copy has finished, whether it succeeded, failed, or had nothing to copy.
 * @returns {Promise<void>} settles after all inserts have finished; rejects with
 *   the first query error.
 */
async function executeCopyData(sourceConnectionPool, targetConnectionPool, closeConnectionPoolHandle) {
  const selectSql = "SELECT * FROM health_category";
  const insertSql = `insert into goods_category(
    id,subject_id,subject_code,category_code,category_name,
    parent_id,parent_code,tree_level,parent_full_path,sorted,
    remark,status,logical_deleted,create_uid,create_user,
    create_time,modified_user,modified_time
  ) values(
    ?,?,?,?,?,
    ?,?,?,?,?,
    ?,?,?,?,?,
    ?,?,?
  )`;

  try {
    // 1. Read all source rows.
    const [res] = await sourceConnectionPool.execute(selectSql);
    console.log("res", res);

    // 2. Insert the rows. Promise.all (instead of an async forEach callback)
    // lets this function actually wait for the inserts and see their errors.
    await Promise.all(res.map(async (item) => {
      const now = new Date();
      const insertSqlParams = [
        item.id, subject_id, subject_code, item.category_code, item.category_name,
        -1, '', 1, '', 1,
        '', 1, 0, 0, "system",
        now, 'system', now,
      ];
      const [results] = await targetConnectionPool.execute(insertSql, insertSqlParams);
      console.log("results", results);
    }));
  } finally {
    // Always release the pools; otherwise open connections keep the process alive.
    closeConnectionPoolHandle();
  }
}

// Run the copy; the completion callback logs both pools, then closes the
// source pool followed by the target pool.
executeCopyData(sourceConnectionPool, targetConnectionPool, function () {
  console.log('close sourceConnectionPool ', sourceConnectionPool);
  console.log('close targetConnectionPool', targetConnectionPool);
  [sourceConnectionPool, targetConnectionPool].forEach((pool) => {
    closeConnectionPool(pool);
  });
});

案例二,关联表的数据同步:

database\mysql2\mysql2.promise.tools.js

const mysql = require('mysql2/promise');

/**
 * Create a connection pool through `mysql.createPool`.
 *
 * @param {string} host - database host.
 * @param {number} port - database port.
 * @param {string} database - schema name.
 * @param {string} user - login user.
 * @param {string} password - login password.
 * @returns {object} whatever `mysql.createPool` returns.
 */
const createConnectionPool = (host, port, database, user, password) =>
  mysql.createPool({
    host,
    user,
    password,
    database,
    port,
    // whether to wait when all connections are in use
    waitForConnections: true,
    // maximum number of connections
    connectionLimit: 5,
    // number of requests allowed to wait for a connection
    queueLimit: 0,
  });

/**
 * Create a single connection through `mysql.createConnection` with
 * multi-statement queries enabled.
 *
 * NOTE(review): `mysql` here is `mysql2/promise`, so the returned value is
 * presumably a Promise of the connection rather than the connection itself;
 * confirm against the mysql2 documentation before using it synchronously.
 *
 * @param {string} host - database host.
 * @param {number} port - database port.
 * @param {string} database - schema name.
 * @param {string} user - login user.
 * @param {string} password - login password.
 * @returns {*} whatever `mysql.createConnection` returns.
 */
const createConnection = (host, port, database, user, password) =>
  mysql.createConnection({
    host,
    user,
    password,
    database,
    port,
    multipleStatements: true,
  });

/**
 * Close a connection pool and report any failure.
 *
 * The pools in this file come from `mysql2/promise`, whose `end()` reports its
 * outcome through the returned promise rather than a callback. The previous
 * callback-only version therefore never saw errors and left a rejected promise
 * unhandled. Both styles are handled here so that a callback-style pool keeps
 * working as well.
 *
 * @param {object} connectionPool - pool exposing `end()`.
 * @returns {Promise<void>} resolves once the pool has been closed or the
 *   failure has been logged; it never rejects.
 */
const closeConnectionPool = (connectionPool) => {
  const reportFailure = (error) => {
    console.error('[mysql2][pool end] failed:', error);
  };
  const ended = connectionPool.end((error) => {
    if (error) {
      reportFailure(error);
    }
  });
  // Promise.resolve also covers callback-style pools, which return no promise.
  return Promise.resolve(ended).then(() => undefined, reportFailure);
};

module.exports = { createConnectionPool, createConnection, closeConnectionPool };

syncdata\04.general_name\main.general_name.js

const { createConnectionPool, createConnection, closeConnectionPool } = require("../../database/mysql2/mysql2.promise.tools");
const { source, target, subject_id, subject_code } = require('../00.base/syncData');

// Create one pool per database (source first, then target) and log both.
let [sourceConnectionPool, targetConnectionPool] = [source, target].map((db) =>
  createConnectionPool(db.host, db.port, db.database, db.user, db.password)
);
console.log("sourceConnectionPool", sourceConnectionPool);
console.log("targetConnectionPool", targetConnectionPool);

/**
 * Copy every row of `health_icd_general_name` (read from `sourceConnectionPool`)
 * into `goods_generic_name` (written to `targetConnectionPool`).
 *
 * `category_code` is resolved from the target table `goods_category` and
 * `parent_code` from the parent's `general_code` among the source rows; either
 * is '' when the referenced row does not exist. Remaining columns get fixed
 * values: remark '', status 0, sorted 1, logical_deleted 0, create_uid 0,
 * create/modified user 'system' and the current time.
 *
 * @param {object} sourceConnectionPool - pool whose `execute(sql)` resolves to `[rows, ...]`.
 * @param {object} targetConnectionPool - pool whose `execute(sql[, params])` runs the category query and the inserts.
 * @param {Function} closeConnectionPoolHandle - called with no arguments exactly
 *   once when the copy has finished, whether it succeeded, failed, or had nothing to copy.
 * @returns {Promise<void>} settles after all inserts have finished; rejects with
 *   the first query error.
 */
async function executeCopyData(sourceConnectionPool, targetConnectionPool, closeConnectionPoolHandle) {
  const insertSql = `insert into goods_generic_name(
    id,subject_id,subject_code,name,code,
    category_id,category_code,parent_id,parent_code,remark,
    status,sorted,logical_deleted,create_uid,create_user,
    create_time,modified_user,modified_time
  ) values(
    ?,?,?,?,?,
    ?,?,?,?,?,
    ?,?,?,?,?,
    ?,?,?
  )`;

  try {
    // 1. Read the source rows and index them by id for the parent lookup.
    const [itemList] = await sourceConnectionPool.execute("SELECT * FROM health_icd_general_name");
    const itemMap = new Map(itemList.map((item) => [item.id, item]));

    // 2. Read the already-migrated categories and index them by id.
    const [categoryRows] = await targetConnectionPool.execute("select * from goods_category");
    console.log("res", categoryRows);
    const categoryMap = new Map(categoryRows.map((row) => [row.id, row]));

    // 3. Insert the rows. Promise.all (instead of an async forEach callback)
    // lets this function actually wait for the inserts and see their errors.
    await Promise.all(itemList.map(async (item) => {
      const category = categoryMap.get(item.category_id);
      const parent = itemMap.get(item.parent_id);
      const category_code = category ? category.category_code : '';
      const parent_code = parent ? parent.general_code : '';
      const now = new Date();
      // NOTE(review): the earlier version computed a local parent_id (-1 when
      // item.id == 0) but never used it; the inserted value is still
      // item.parent_id. Confirm whether root rows should be stored with -1.
      const insertSqlParams = [
        item.id, subject_id, subject_code, item.general_name, item.general_code,
        item.category_id, category_code, item.parent_id, parent_code, '',
        0, 1, 0, 0, 'system',
        now, 'system', now,
      ];
      console.log("insertSql", insertSql);
      console.log("insertSqlParams", insertSqlParams);
      await targetConnectionPool.execute(insertSql, insertSqlParams);
    }));
  } finally {
    // Always release the pools; otherwise open connections keep the process alive.
    closeConnectionPoolHandle();
  }
}

// Run the copy; the completion callback logs both pools, then closes the
// source pool followed by the target pool.
executeCopyData(sourceConnectionPool, targetConnectionPool, function () {
  console.log('close sourceConnectionPool ', sourceConnectionPool);
  console.log('close targetConnectionPool', targetConnectionPool);
  [sourceConnectionPool, targetConnectionPool].forEach((pool) => {
    closeConnectionPool(pool);
  });
});

syncdata\05.goods\main.goods.js

const { createConnectionPool, createConnection, closeConnectionPool } = require("../../database/mysql2/mysql2.promise.tools");
const { source, target, subject_id, subject_code } = require('../00.base/syncData');

// Create one pool per database (source first, then target) and log both.
let [sourceConnectionPool, targetConnectionPool] = [source, target].map((db) =>
  createConnectionPool(db.host, db.port, db.database, db.user, db.password)
);
console.log("sourceConnectionPool", sourceConnectionPool);
console.log("targetConnectionPool", targetConnectionPool);


/**
 * Copy every row of `health_goods` (read from `sourceConnectionPool`) into
 * `goods_object` (written to `targetConnectionPool`).
 *
 * The manufacturer, brand, category and generic-name codes are resolved from
 * the target tables `goods_manufactor`, `goods_brand`, `goods_category` and
 * `goods_generic_name`, which are loaded in full and indexed by id.
 *
 * @param {object} sourceConnectionPool - pool whose `execute(sql)` resolves to `[rows, ...]`.
 * @param {object} targetConnectionPool - pool whose `execute(sql[, params])` runs the lookups and the inserts.
 * @param {Function} closeConnectionPoolHandle - called with no arguments exactly
 *   once when the copy has finished, whether it succeeded, failed, or had nothing to copy.
 * @returns {Promise<void>} settles after all inserts have finished; rejects with
 *   the first query error, or with an Error naming the missing row when a goods
 *   row references a manufacturer, brand, category or generic name that does
 *   not exist in the target database.
 */
async function executeCopyData(sourceConnectionPool, targetConnectionPool, closeConnectionPoolHandle) {
  const insertSql = `insert into goods_object(
    id,subject_id,subject_code,manufacturer_id,manufacturer_code,
    brand_id,brand_code,category_id,category_code,general_name_first_id,
    general_name_first_code,general_name_id,general_name_code,name,code,
    product_name,short_code,bar_code,product_type,instructions,
    main_graph_url,introduce_url,details,introduce,simple_spelling,
    dosage_form,apparatus_model,preservation_method,quality_guarantee_period,status,
    dosage,logical_deleted,create_uid,create_user,create_time,
    modified_user,modified_time
  ) values(
    ?,?,?,?,?,
    ?,?,?,?,?,
    ?,?,?,?,?,
    ?,?,?,?,?,
    ?,?,?,?,?,
    ?,?,?,?,?,
    ?,?,?,?,?,
    ?,?
  )`;

  // Load every row of one target table.
  const loadRows = async (sql) => {
    const [rows] = await targetConnectionPool.execute(sql);
    return rows;
  };
  // Index rows by their id column.
  const indexById = (rows) => new Map(rows.map((row) => [row.id, row]));
  // Fetch a referenced row, failing with a message that names the broken reference.
  const requireRow = (rowsById, id, table, goodsId) => {
    const row = rowsById.get(id);
    if (!row) {
      throw new Error(`${table} row ${id} not found (referenced by health_goods row ${goodsId})`);
    }
    return row;
  };

  try {
    // 1. Read all source rows.
    const [goodsList] = await sourceConnectionPool.execute("SELECT * FROM health_goods");

    // 2. Load the lookup tables; the four queries are independent of each other.
    const [manufactorList, brandList, categoryList, genericNameList] = await Promise.all([
      loadRows("SELECT * FROM goods_manufactor"),
      loadRows("SELECT * FROM goods_brand"),
      loadRows("SELECT * FROM goods_category"),
      loadRows("SELECT * FROM goods_generic_name"),
    ]);
    console.log("manufactorList", manufactorList.length);
    console.log("brandList", brandList.length);
    console.log("categoryList", categoryList.length);
    // Logs the count like its siblings (the full list used to be dumped here).
    console.log("genericNameList", genericNameList.length);
    const manufactorMap = indexById(manufactorList);
    const brandMap = indexById(brandList);
    const categoryMap = indexById(categoryList);
    const genericNameMap = indexById(genericNameList);

    // 3. Insert the rows. The async callback turns a failed lookup into a
    // rejection that Promise.all reports, so this function can wait for the
    // inserts and surface their errors.
    await Promise.all(goodsList.map(async (item) => {
      const manufacturer_id = item.manufactor_id;
      const brand_id = item.brand_id;
      const category_id = item.category_id;
      const general_name_id = item.goods_general_name_id;
      const manufacturer_code = requireRow(manufactorMap, manufacturer_id, 'goods_manufactor', item.id).manufactor_code;
      const brand_code = requireRow(brandMap, brand_id, 'goods_brand', item.id).brand_code;
      const category_code = requireRow(categoryMap, category_id, 'goods_category', item.id).category_code;
      const general_name_code = requireRow(genericNameMap, general_name_id, 'goods_generic_name', item.id).code;
      const now = new Date();
      const insertSqlParams = [
        item.id, subject_id, subject_code, manufacturer_id, manufacturer_code,
        brand_id, brand_code, category_id, category_code, 0,
        '', general_name_id, general_name_code, item.goods_name, item.goods_code,
        item.product_name, item.short_code, item.goods_bar_code, item.goods_type, item.goods_instructions,
        item.goods_img, '', '', item.goods_instructions, item.simple_spelling,
        item.goods_dosageform, '', item.preservation_method, item.quality_guarantee_period, 1,
        '', 0, 0, 'system', now,
        'system', now,
      ];
      const [results] = await targetConnectionPool.execute(insertSql, insertSqlParams);
      console.log("results", results);
    }));
  } finally {
    // Always release the pools; otherwise open connections keep the process alive.
    closeConnectionPoolHandle();
  }
}


// Run the copy; the completion callback logs both pools, then closes the
// source pool followed by the target pool.
executeCopyData(sourceConnectionPool, targetConnectionPool, function () {
  console.log('close sourceConnectionPool ', sourceConnectionPool);
  console.log('close targetConnectionPool', targetConnectionPool);
  [sourceConnectionPool, targetConnectionPool].forEach((pool) => {
    closeConnectionPool(pool);
  });
});

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注