samza-dev mailing list archives

From "Lukas Steiblys" <lu...@doubledutch.me>
Subject RocksDBException: IO error: directory: Invalid argument
Date Mon, 16 Feb 2015 23:50:15 GMT
Hello,

I was setting up the key-value storage engine in Samza and ran into an exception when querying
the data. 

I added these properties to the config:

    stores.engaged-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
    stores.engaged-store.changelog=kafka.engaged-store-changelog
    # a custom data type with an appropriate Serde
    stores.engaged-store.key.serde=UserAppPair
    # wrote a Serde for Long using ByteBuffer
    stores.engaged-store.msg.serde=Long
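
The Long Serde itself is not shown in this message. As a rough sketch only, a ByteBuffer-based Serde for Long might look something like the following. The `Serde` trait here is a local stand-in that mirrors the shape of Samza's serializer interface (an assumption, so the snippet compiles on its own), and `LongSerdeSketch` is a hypothetical name, not the class used in this job.

```scala
import java.nio.ByteBuffer

// Local stand-in mirroring the shape of Samza's Serde interface (assumption),
// so this sketch compiles without Samza on the classpath.
trait Serde[T] {
  def toBytes(obj: T): Array[Byte]
  def fromBytes(bytes: Array[Byte]): T
}

// Hypothetical name; not the class used in the job described here.
class LongSerdeSketch extends Serde[Long] {
  // Encode the value as 8 bytes in ByteBuffer's default (big-endian) order.
  def toBytes(obj: Long): Array[Byte] =
    ByteBuffer.allocate(8).putLong(obj).array()

  // Decode the first 8 bytes of the array back into a Long.
  def fromBytes(bytes: Array[Byte]): Long =
    ByteBuffer.wrap(bytes).getLong
}
```

A round trip such as `new LongSerdeSketch().fromBytes(new LongSerdeSketch().toBytes(42L))` should return the original value, which is a quick way to rule out the Serde as the source of a storage problem.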

I have no trouble initializing the storage engine with:

    val store = context.getStore("engaged-store").asInstanceOf[KeyValueStore[UserAppPair, Long]];

but when I query by key while processing messages, it throws an exception:

    val key = new UserAppPair(userId, appId);
    val value = store.get(key);

Here’s the log:

    2015-02-16 23:30:18 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
    2015-02-16 23:30:18 BrokerProxy [WARN] It appears that we received an invalid or empty offset None for [Follows,0]. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues.
    2015-02-16 23:30:18 GetOffset [INFO] Checking if auto.offset.reset is defined for topic Follows
    2015-02-16 23:30:18 GetOffset [INFO] Got reset of type largest.
    2015-02-16 23:30:23 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
    2015-02-16 23:30:23 SamzaContainer [INFO] Entering run loop.
    2015-02-16 23:30:23 EngagedUsersTask [INFO] about to query for key in rocksdb.
    2015-02-16 23:30:23 SamzaContainer [ERROR] Caught exception in process loop.
    org.rocksdb.RocksDBException: IO error: directory: Invalid argument
        at org.rocksdb.RocksDB.open(Native Method)
        at org.rocksdb.RocksDB.open(RocksDB.java:133)
        at org.apache.samza.storage.kv.RocksDbKeyValueStore.db$lzycompute(RocksDbKeyValueStore.scala:85)
        at org.apache.samza.storage.kv.RocksDbKeyValueStore.db(RocksDbKeyValueStore.scala:85)
        at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:92)
        at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:80)
        at org.apache.samza.storage.kv.LoggedStore.get(LoggedStore.scala:41)
        at org.apache.samza.storage.kv.SerializedKeyValueStore.get(SerializedKeyValueStore.scala:36)
        at org.apache.samza.storage.kv.CachedStore.get(CachedStore.scala:90)
        at org.apache.samza.storage.kv.NullSafeKeyValueStore.get(NullSafeKeyValueStore.scala:36)
        at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)
        at me.doubledutch.analytics.task.EngagedUsersTask.engaged(EngagedUsersTask.scala:66)
        at me.doubledutch.analytics.task.EngagedUsersTask.process(EngagedUsersTask.scala:100)
        at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:137)
        at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
        at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:136)
        at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:93)
        at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
        at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
        at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
        at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
        at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
        at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
        at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
        at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
    2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down.
    2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down consumer multiplexer.
    2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down BrokerProxy for localhost:9092
    2015-02-16 23:30:23 DefaultFetchSimpleConsumer [WARN] Reconnect due to socket error: null
    2015-02-16 23:30:23 BrokerProxy [INFO] Got closed by interrupt exception in broker proxy thread.
    2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down due to interrupt.
    2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down producer multiplexer.
    2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stream tasks.
    2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stores.


The same exception is thrown if I try to put a value into RocksDB. Has anyone run into this
problem before, or does anyone have pointers for solving it?

Lukas