beam-commits mailing list archives

From "Guillaume Balaine (JIRA)" <>
Subject [jira] [Commented] (BEAM-793) JdbcIO can create a deadlock when parallelism is greater than 1
Date Thu, 21 Sep 2017 07:37:00 GMT


Guillaume Balaine commented on BEAM-793:

Hello Jean-Baptiste, thanks for being so responsive.
I copied over JdbcIO and am trying to make it work. The use case is a bounded data pipeline,
so it does not matter if we add a few seconds of processing time. I use the same strategy as
the Google Datastore implementation, using BackOffUtils to make the batch flush fn sleep for
increasingly long intervals when a deadlock occurs.
Unfortunately, my pipeline then seems to terminate before all batches can go through,
perhaps because of the @Teardown, which in the current JdbcIO impl closes the statement
regardless of whether a retry loop is still ongoing.
I'll let you know if that works.
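For reference, the retry-with-backoff pattern described above can be sketched in plain Java. This is a simplified illustration only: plain Thread.sleep stands in for the Sleeper/BackOffUtils machinery the Datastore connector uses, and the method and parameter names are hypothetical, not Beam API.

```java
import java.util.concurrent.Callable;

public class BackoffFlush {
    // Retry a flush up to maxAttempts times, sleeping with exponential
    // backoff between attempts (e.g. after a deadlock SQLException).
    // Names here are illustrative; Beam's Datastore IO uses
    // BackOffUtils.next(sleeper, backoff) for the same idea.
    static <T> T retryWithBackoff(Callable<T> flush, int maxAttempts,
                                  long initialBackoffMillis) throws Exception {
        long backoff = initialBackoffMillis;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return flush.call();
            } catch (Exception e) {
                last = e;              // remember the failure, e.g. a deadlock
                if (attempt == maxAttempts) {
                    break;             // retries exhausted
                }
                Thread.sleep(backoff); // wait before trying again
                backoff *= 2;          // double the wait each round
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        // Simulate a flush that deadlocks twice before succeeding.
        int[] calls = {0};
        String result = retryWithBackoff(() -> {
            if (++calls[0] < 3) throw new RuntimeException("deadlock");
            return "committed";
        }, 5, 10);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Note that for this to be safe in a DoFn, the statement must stay open until the retry loop finishes, which is exactly the @Teardown ordering concern mentioned above.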

> JdbcIO can create a deadlock when parallelism is greater than 1
> ---------------------------------------------------------------
>                 Key: BEAM-793
>                 URL:
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-extensions
>            Reporter: Jean-Baptiste Onofré
>            Assignee: Jean-Baptiste Onofré
>             Fix For: 2.2.0
> With the following JdbcIO configuration, if the parallelism is greater than 1, we can have a {{Deadlock found when trying to get lock; try restarting transaction}}.
> {code}
>         MysqlDataSource dbCfg = new MysqlDataSource();
>         dbCfg.setDatabaseName("db");
>         dbCfg.setUser("user");
>         dbCfg.setPassword("pass");
>         dbCfg.setServerName("localhost");
>         dbCfg.setPortNumber(3306);
>         p.apply(Create.of(data))
>                 .apply(JdbcIO.<Tuple5<Integer, Integer, ByteString, Long, Long>>write()
>                         .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dbCfg))
>                         .withStatement("INSERT INTO smth(loc,event_type,hash,begin_date,end_date) VALUES(?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE event_type=VALUES(event_type),end_date=VALUES(end_date)")
>                         .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<Tuple5<Integer, Integer, ByteString, Long, Long>>() {
>                             public void setParameters(Tuple5<Integer, Integer, ByteString, Long, Long> element, PreparedStatement statement)
>                                     throws Exception {
>                                 statement.setInt(1, element.f0);
>                                 statement.setInt(2, element.f1);
>                                 statement.setBytes(3, element.f2.toByteArray());
>                                 statement.setLong(4, element.f3);
>                                 statement.setLong(5, element.f4);
>                             }
>                         }));
> {code}
> This can happen due to the {{autocommit}}. I'm going to investigate.
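The autocommit angle can be sketched as follows: with autocommit on, each statement commits on its own and concurrent writers hold row locks across many small transactions, which is where the MySQL deadlock tends to surface; committing once per batch keeps the lock window bounded. The Tx interface below is a hand-rolled stand-in for java.sql.Connection/PreparedStatement so the shape is testable without a database; it is not the JdbcIO implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchCommitSketch {
    // Minimal stand-in for the JDBC calls involved; real code would use
    // java.sql.Connection (with setAutoCommit(false)) and PreparedStatement.
    interface Tx {
        void addBatch(String row);
        void executeBatch();
        void commit();
    }

    // Flush rows in batches of batchSize, committing once per batch rather
    // than relying on autocommit's statement-by-statement commits.
    static void flush(List<String> rows, int batchSize, Tx tx) {
        int pending = 0;
        for (String row : rows) {
            tx.addBatch(row);
            if (++pending == batchSize) {
                tx.executeBatch();
                tx.commit();        // one commit per full batch
                pending = 0;
            }
        }
        if (pending > 0) {          // drain the last partial batch
            tx.executeBatch();
            tx.commit();
        }
    }

    public static void main(String[] args) {
        // Record the call sequence to show commits happen per batch.
        List<String> log = new ArrayList<>();
        Tx tx = new Tx() {
            public void addBatch(String r) { log.add("add:" + r); }
            public void executeBatch()     { log.add("exec"); }
            public void commit()           { log.add("commit"); }
        };
        flush(List.of("a", "b", "c", "d", "e"), 2, tx);
        System.out.println("commits: "
            + java.util.Collections.frequency(log, "commit"));
    }
}
```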

This message was sent by Atlassian JIRA
