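Problem description:
The `list` holds 154,725 relationship records, and the start node and end node of every relationship already exist.
I process these ~150k records in batches of 10,000. This code does open a new connection for every batch; I did that on purpose to test whether each connection gets closed properly, and over half an hour of testing every connection was closed normally.
Steps to reproduce:
After each run that creates the relationships, I delete the data through the UI and then rerun this method to insert it again. After repeating this 10 times, the 8 GB of memory on the Linux server is full and the JVM does nothing but full GC.
Heap state attached: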
[root@node1 logs]# jmap -heap 29849
Attaching to process ID 29849, please wait…
Debugger attached successfully.
Server compiler detected.
JVM version is 25.144-b01
using thread-local object allocation.
Garbage-First (G1) GC with 23 thread(s)

Heap Configuration:
   MinHeapFreeRatio         = 40
   MaxHeapFreeRatio         = 70
   MaxHeapSize              = 8589934592 (8192.0MB)
   NewSize                  = 1363144 (1.2999954223632812MB)
   MaxNewSize               = 5150605312 (4912.0MB)
   OldSize                  = 5452592 (5.1999969482421875MB)
   NewRatio                 = 2
   SurvivorRatio            = 8
   MetaspaceSize            = 21807104 (20.796875MB)
   CompressedClassSpaceSize = 1073741824 (1024.0MB)
   MaxMetaspaceSize         = 17592186044415 MB
   G1HeapRegionSize         = 4194304 (4.0MB)

Heap Usage:
G1 Heap:
   regions  = 2048
   capacity = 8589934592 (8192.0MB)
   used     = 6181947672 (5895.564720153809MB)
   free     = 2407986920 (2296.4352798461914MB)
   71.96734277531505% used
G1 Young Generation:
    import java.util
    import scala.collection.mutable

    // COMMIT_SIZE, log, Neo4jRelationShip and Neo4jManager are defined elsewhere in the project.
    def batchSaveCarImsiR(neo4jConf: mutable.Map[String, String], list: util.ArrayList[Neo4jRelationShip]): Unit = {
      if (!list.isEmpty) {
        val batchCount = list.size() / COMMIT_SIZE
        val mod = list.size() % COMMIT_SIZE
        log.info(s"batchCount:$batchCount;mod=$mod,sum=${list.size()}")
        // Full batches: each one is interpolated into the statement text as a literal list.
        for (i <- 0 until batchCount) {
          val subList = list.subList(i * COMMIT_SIZE, (i + 1) * COMMIT_SIZE)
          log.info(s"$i:${subList.size()}")
          val sql = s"UNWIND ${subList.toString} as row MATCH (a:car {code:row.startNode}) MATCH(b:imsi {code:row.endNode}) CREATE (a)-[:car_imsi{date:row.date,rate:row.rate,type:row.relationType,merge:row.merge}]->(b)"
          Neo4jManager.execute(neo4jConf, sql)
        }
        // Trailing partial batch, if any.
        if (mod > 0) {
          val subList = list.subList(batchCount * COMMIT_SIZE, list.size())
          log.info(s"$mod:${subList.size()}")
          val sql = s"UNWIND ${subList.toString} as row MATCH (a:car {code:row.startNode}) MATCH(b:imsi {code:row.endNode}) CREATE (a)-[:car_imsi{date:row.date,rate:row.rate,type:row.relationType,merge:row.merge}]->(b)"
          Neo4jManager.execute(neo4jConf, sql)
        }
        log.info(s"batchSaveImsiMacR:${list.size()}")
      }
    }
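For reference, the split computed above (15 full batches of 10,000 plus a remainder of 4,725 for 154,725 records) can also be expressed with `grouped`, which yields the trailing partial batch without a separate `mod` branch. This is only a sketch: `COMMIT_SIZE = 10000` is assumed from the description, `batchSizes` is a hypothetical helper, and plain integers stand in for the `Neo4jRelationShip` rows.

```scala
object BatchSplitSketch {
  // Assumed value: the post says each batch holds 10,000 records.
  val COMMIT_SIZE = 10000

  // Hypothetical helper: sizes of the batches that grouped() produces.
  def batchSizes[A](rows: Seq[A], size: Int = COMMIT_SIZE): List[Int] =
    rows.grouped(size).map(_.size).toList

  def main(args: Array[String]): Unit = {
    val rows = (1 to 154725).toVector // stand-ins for the relationship rows
    val sizes = batchSizes(rows)
    // prints: batches=16, last=4725, sum=154725
    println(s"batches=${sizes.size}, last=${sizes.last}, sum=${sizes.sum}")
  }
}
```

With the `util.ArrayList` from the post, the list would first need to be viewed as a Scala collection through the standard Java/Scala collection converters.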
Connecting to Neo4j and executing the Cypher statement:

    /**
     * Execute a statement.
     * @param conf Neo4j configuration
     * @param sql  the statement to execute as one batch
     */
    def execute(conf: mutable.Map[String, String], sql: String): Unit = {
      Class.forName(conf("neo4j.driver"))
      val conn = DriverManager.getConnection(conf("neo4j.url"), conf("neo4j.user"), conf("neo4j.password"))
      log.debug(conn.getAutoCommit)
      val stmt = conn.createStatement
      try {
        stmt.execute(sql)
        stmt.close()
        // conn.commit()
        conn.close()
      } catch {
        case ex: Exception =>
          ex.printStackTrace()
          throw ex
      } finally {
        if (stmt != null) stmt.close()
        if (conn != null) conn.close()
      }
    }
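In `execute`, the statement and connection are closed at the end of the `try` block and again in `finally`, and `createStatement` runs before the `try`, so a failure there would skip the `finally` and leave the connection open. One way to close each resource exactly once is a small loan helper. This is a sketch, not the author's code: `withResource` and `FakeResource` are hypothetical names, and the fake stands in for a JDBC `Connection` or `Statement` (both are `AutoCloseable`) so the example runs without a database.

```scala
object LoanSketch {
  // Hypothetical helper: run `use` on a resource and always close it once.
  def withResource[R <: AutoCloseable, A](open: => R)(use: R => A): A = {
    val resource = open // if opening throws, there is nothing to close yet
    try use(resource)
    finally resource.close()
  }

  // Stand-in for a JDBC Connection/Statement; counts close() calls.
  final class FakeResource(val name: String) extends AutoCloseable {
    var closeCount = 0
    override def close(): Unit = closeCount += 1
  }

  def main(args: Array[String]): Unit = {
    val conn = new FakeResource("conn")
    val stmt = new FakeResource("stmt")
    // Inner resource is closed first, then the outer one.
    val result = withResource(conn) { c =>
      withResource(stmt) { s => s"${c.name}+${s.name}" }
    }
    // prints: conn+stmt closed: conn=1, stmt=1
    println(s"$result closed: conn=${conn.closeCount}, stmt=${stmt.closeCount}")
  }
}
```

With real JDBC objects the same shape would wrap `DriverManager.getConnection(...)` on the outside and `conn.createStatement` on the inside. On Scala 2.13 or later, `scala.util.Using` offers the same pattern in the standard library.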
Problem solved. I took a different approach: I gave up on UNWIND and did it with plain JDBC batching instead.

      log.info("start")
      val sqls = "MATCH (a:car {code:?}) MATCH(b:imsi {code:?}) CREATE (a)-[:car_imsi{date:?,rate:?,type:?,merge:?}]->(b)"
      Class.forName(neo4jConf("neo4j.driver"))
      val conn = DriverManager.getConnection(neo4jConf("neo4j.url"), neo4jConf("neo4j.user"), neo4jConf("neo4j.password"))
      conn.setAutoCommit(false)
      log.debug(conn.getAutoCommit)
      val stmt = conn.prepareStatement(sqls)
      try {
        for (i <- 0 until list.size()) {
          val rel = list.get(i)
          stmt.setString(1, rel.startNode)
          stmt.setString(2, rel.endNode)
          stmt.setString(3, rel.date)
          stmt.setString(4, rel.rate)
          stmt.setInt(5, rel.relationType)
          stmt.setInt(6, rel.merge)
          stmt.addBatch()
          // Flush and commit after every COMMIT_SIZE rows.
          if ((i + 1) % COMMIT_SIZE == 0) {
            stmt.executeBatch()
            conn.commit()
            stmt.clearBatch()
          }
        }
        // Flush the trailing partial batch, if any.
        if (list.size() % COMMIT_SIZE > 0) {
          stmt.executeBatch()
          conn.commit()
        }
        stmt.close()
        conn.close()
      } catch {
        case ex: Exception =>
          ex.printStackTrace()
          conn.rollback()
          throw ex
      } finally {
        if (stmt != null) stmt.close()
        if (conn != null) conn.close()
      }
      log.info("end")
    } // closes the enclosing method, whose signature is not shown in the post
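One visible difference between the two versions is the statement text itself: the UNWIND version interpolates all rows of a batch into the string, so every batch produces a different and very large statement, while the JDBC version reuses one short constant text and passes the values as parameters. The post does not establish that this caused the heap growth, so treat the following only as an illustration of that difference. `Row` and its `toString` format are assumptions (the real `Neo4jRelationShip.toString` is not shown), and the relationship properties are omitted for brevity.

```scala
object StatementTextSketch {
  // Hypothetical stand-in for Neo4jRelationShip; the real toString is not shown in the post.
  final case class Row(startNode: String, endNode: String) {
    override def toString: String = s"{startNode:'$startNode',endNode:'$endNode'}"
  }

  // Interpolated form: every row of the batch is embedded in the statement text.
  def unwindText(rows: Seq[Row]): String = {
    val literal = rows.mkString("[", ", ", "]")
    s"UNWIND $literal as row MATCH (a:car {code:row.startNode}) MATCH (b:imsi {code:row.endNode}) CREATE (a)-[:car_imsi]->(b)"
  }

  // Parameterised form: the text is identical for every row and every batch.
  val preparedText: String =
    "MATCH (a:car {code:?}) MATCH (b:imsi {code:?}) CREATE (a)-[:car_imsi]->(b)"

  def main(args: Array[String]): Unit = {
    val batch1 = (0 until 10000).map(i => Row(s"car$i", s"imsi$i"))
    val batch2 = (10000 until 20000).map(i => Row(s"car$i", s"imsi$i"))
    // Each batch yields its own distinct statement text.
    println(s"interpolated texts differ: ${unwindText(batch1) != unwindText(batch2)}")
    // One 10,000-row batch is already hundreds of thousands of characters long.
    println(s"interpolated length > 100000: ${unwindText(batch1).length > 100000}")
    println(s"prepared length: ${preparedText.length}")
  }
}
```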