spark-dev mailing list archives

From "Sea" <261810...@qq.com>
Subject Exception in using updateStateByKey
Date Mon, 27 Apr 2015 13:32:57 GMT
Hi, all:
I use updateStateByKey in Spark Streaming and need to keep the state for one minute. I set "spark.cleaner.ttl" to 120 (seconds) and the batch duration to 2 seconds, but it throws this exception:

Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: spark/ck/hdfsaudit/receivedData/0/log-1430139541443-1430139601443
        at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
        at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:51)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsUpdateTimes(FSNamesystem.java:1499)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1448)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1428)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1402)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:468)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:269)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59566)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2048)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2042)


        at org.apache.hadoop.ipc.Client.call(Client.java:1347)
        at org.apache.hadoop.ipc.Client.call(Client.java:1300)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
        at com.sun.proxy.$Proxy14.getBlockLocations(Unknown Source)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:188)
        at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
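The name of the missing file can be decoded with a little arithmetic. Assuming the receivedData/0/ directory under the checkpoint path holds receiver write-ahead-log segments named log-<startMillis>-<endMillis> (epoch milliseconds), as Spark's file-based write-ahead log appears to do, the sketch below turns the name into UTC times and the interval it covers. parse_wal_log_name is a hypothetical helper written for this illustration, not a Spark API.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def parse_wal_log_name(name: str) -> Tuple[datetime, datetime, timedelta]:
    """Hypothetical helper: split a name assumed to look like
    'log-<startMillis>-<endMillis>' into UTC start/end times and their span."""
    prefix, start_ms, end_ms = name.split("-")
    if prefix != "log":
        raise ValueError(f"unexpected file name: {name}")
    # Integer milliseconds added to the epoch keep the result exact (no float rounding).
    start = EPOCH + timedelta(milliseconds=int(start_ms))
    end = EPOCH + timedelta(milliseconds=int(end_ms))
    return start, end, end - start


start, end, span = parse_wal_log_name("log-1430139541443-1430139601443")
print(start.isoformat(), end.isoformat(), span.total_seconds())
# 2015-04-27T12:59:01.443000+00:00 2015-04-27T13:00:01.443000+00:00 60.0
```

Under that naming assumption the segment covers exactly 60 seconds, from 12:59:01.443 to 13:00:01.443 UTC on 27 April 2015, roughly half an hour before this message was sent.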



Why?
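On the one-minute requirement: PySpark's updateStateByKey calls the update function once per batch for every key, passing the list of new values for that key (empty if none arrived) and the previous state (None for a new key), and it drops the key when the function returns None. The pure-Python model below mimics that contract to sketch one way state could be kept for about one minute inside the update function itself, by counting idle 2-second batches. update_state_by_key, expiring_update and the count/idle state layout are hypothetical and are not the poster's updateStateFunc. The model runs without Spark and does not reproduce the HDFS error above.

```python
from typing import Any, Callable, Dict, Hashable, Iterable, List, Optional, Tuple

BATCH_SECONDS = 2   # batch interval from StreamingContext(sc, 2)
KEEP_SECONDS = 60   # "keep the state for one minute"
MAX_IDLE_BATCHES = KEEP_SECONDS // BATCH_SECONDS  # 30 batches

State = Dict[Hashable, Any]
UpdateFunc = Callable[[List[Any], Optional[Any]], Optional[Any]]


def update_state_by_key(state: State, batch: Iterable[Tuple[Hashable, Any]],
                        update: UpdateFunc) -> State:
    """Hypothetical pure-Python model of one updateStateByKey step (no Spark)."""
    new_values: Dict[Hashable, List[Any]] = {}
    for key, value in batch:
        new_values.setdefault(key, []).append(value)
    next_state: State = {}
    # Every key already in the state is visited, even when it has no new values.
    for key in set(state) | set(new_values):
        result = update(new_values.get(key, []), state.get(key))
        if result is not None:  # returning None removes the key
            next_state[key] = result
    return next_state


def expiring_update(values: List[Any], old: Optional[dict]) -> Optional[dict]:
    """Hypothetical update function: count values per key and drop the key
    after MAX_IDLE_BATCHES consecutive batches without new values."""
    if values:
        count = (old["count"] if old else 0) + len(values)
        return {"count": count, "idle": 0}
    # No new values means the key came from the previous state, so old is not None.
    idle = old["idle"] + 1
    if idle >= MAX_IDLE_BATCHES:
        return None
    return {"count": old["count"], "idle": idle}
```

With 2-second batches, a key that stops receiving values is removed on the 30th consecutive empty batch, i.e. 60 seconds after its last update, while a key that keeps receiving values is never expired.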


My code is:


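    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    ssc = StreamingContext(sc, 2)  # 2-second batches
    # updateStateByKey needs a checkpoint directory set via ssc.checkpoint(...) (call not shown here)
    kvs = KafkaUtils.createStream(ssc, zkQuorum, group, {topic: 1})
    # 60-second window, sliding every 2 seconds; the windowed stream feeds updateStateByKey
    kvs.window(60, 2).map(lambda x: analyzeMessage(x[1])) \
        .filter(lambda x: x[1] is not None) \
        .updateStateByKey(updateStateFunc) \
        .filter(lambda x: x[1]['isExisted'] != 1) \
        .foreachRDD(lambda rdd: rdd.foreachPartition(insertIntoDb))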
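The window arithmetic in this pipeline can be checked in plain Python. Assuming, as Spark's windowed DStream appears to do, that the window ending at batch time t contains the batches from t - 60 + 2 through t, every window holds 30 two-second batches and every batch lands in 30 consecutive windows, so under this model each Kafka record reaches updateStateByKey up to 30 times. covered_batches and windows_containing are hypothetical helpers, not Spark APIs.

```python
from typing import List

BATCH_SECONDS = 2    # StreamingContext(sc, 2)
WINDOW_SECONDS = 60  # window length in kvs.window(60, 2)
SLIDE_SECONDS = 2    # slide interval in kvs.window(60, 2)

# Spark requires both durations to be multiples of the batch interval.
assert WINDOW_SECONDS % BATCH_SECONDS == 0 and SLIDE_SECONDS % BATCH_SECONDS == 0


def covered_batches(window_end: int) -> List[int]:
    """Hypothetical helper: batch times (seconds) in the window ending at window_end,
    assumed to run from window_end - WINDOW_SECONDS + BATCH_SECONDS to window_end."""
    first = window_end - WINDOW_SECONDS + BATCH_SECONDS
    return list(range(first, window_end + 1, BATCH_SECONDS))


def windows_containing(batch_time: int, horizon: int) -> List[int]:
    """Hypothetical helper: end times of the windows (one per slide, up to horizon)
    whose contents include the batch at batch_time."""
    ends = range(SLIDE_SECONDS, horizon + 1, SLIDE_SECONDS)
    return [end for end in ends if batch_time in covered_batches(end)]


print(len(covered_batches(100)))         # 30
print(len(windows_containing(10, 200)))  # 30
```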