Flink程序富函数中使用定时任务查询mysql出现内存堆积
问题描述
flink程序在跑的过程中, 发现跑几天就停掉了, 看日志发现是代码中的ResultSet和Statement堆积的太多,引起的内存溢出,但是代码中确实已经关闭了ResultSet和Statement,但是还是一直出现OOM的问题
具体案例
在open方法中的案例
override def open(parameters: Configuration): Unit = {
parameterTool = getRuntimeContext.getExecutionConfig.getGlobalJobParameters.asInstanceOf[ParameterTool]
queryMysql()
TimerUtil.schedule(30000, 30000, queryMysql())
}
说明
定时任务中链接问题:
1.connection链接 ,这个放在close方法里面进行关闭即可;
2.ResultSet链接 , 即用即关;
3.Statement链接 ,即用即关;
例如1:
def getMysqlEvent(): Map[Int, mutable.Set[String]] = {
val sql = "SELECT * FROM test WHERE status = false"
var statement: Statement = null
var result: ResultSet = null
try {
statement = connection.createStatement()
result = statement.executeQuery(sql)
val map: mutable.Map[Int, mutable.Set[String]] = mutable.Map()
while (result.next()) {
val projectId = result.getInt(1)
}
eventMap.toMap
} catch {
case e: Exception =>
e.printStackTrace()
throw new Exception("error", e)
} finally {
DbUtils.close(result)
DbUtils.close(statement)
}
}
例如2
(重点关注代码中的while循环那块,会产生内存堆积就在这块):文章来源地址https://www.toymoban.com/news/detail-628077.html
def getStudents(): util.HashMap[Int, util.List[Student]] = {
val sql = s"""select id from test01""".stripMargin
var eventState: Statement = null
var eventResult: ResultSet = null
val map = new util.HashMap[Int, util.List[Student]]()
val a11 = new util.ArrayList[Student]()
val bbb = new util.ArrayList[Mater]
try {
eventState = connection.createStatement()
eventResult = eventState.executeQuery(sql)
while (eventResult.next()) {
var attrStatement: Statement = null
var resultSet: ResultSet = null
try {
val id = eventResult.getInt(1)
val projectId = eventResult.getInt(2)
val attrSql =
s"""select * from aaa """.stripMargin
attrStatement = connection.createStatement()
resultSet = attrStatement.executeQuery(attrSql)
while (resultSet.next()) {
val id = resultSet.getInt(1)
}
} catch {
case e: Exception => throw new Exception("Error ", e)
}
finally {
DbUtils.close(resultSet)
DbUtils.close(attrStatement)
}
}
ttt
}
catch {
case e: Exception => throw new Exception("Error ", e)
}
finally {
DbUtils.close(eventResult)
DbUtils.close(eventState)
}
}
解释:
在例1中 ,整个方法中只有一次sql查询,所以直接在代码最后面关闭即可
在例2中,整个方法中有两次sql查询,第二次sql查询会把第一次sql查询的结果进行循环遍历,然后再进行查询,,注意 while循环 , 这块会产生多个Statement和ResultSet对象,所以需要再代码中每一次查询结束后就会进行一次数据的查询
文章来源:https://www.toymoban.com/news/detail-628077.html
到了这里,关于Flink程序富函数中使用定时任务查询mysql的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!