samza-dev mailing list archives

From "Lukas Steiblys" <lu...@doubledutch.me>
Subject Re: RocksDBException: IO error: directory: Invalid argument
Date Tue, 17 Feb 2015 18:55:39 GMT
Hi Chris,

1. We're running locally using ProcessJobFactory
2. CentOS 7 x86_64
3.
    startup.log: https://gist.github.com/imbusy/0592a9c52a96fcce48db
    engaged-users.log: https://gist.github.com/imbusy/0b3d264a40ddf34ab8e7
    engaged-users.properties: https://gist.github.com/imbusy/d0019db29d7b68c60bfc

    Also note that the properties file sets the default offset to oldest,
but the log file says that it's setting the offset to largest:
"2015-02-17 18:46:32 GetOffset [INFO] Got reset of type largest."

4. From the log file: "2015-02-17 18:45:57 SamzaContainer$ [INFO] Got storage engine base directory: /vagrant/SamzaJobs/deploy/samza/state"
    I checked the directory and it actually exists:

du -h /vagrant/SamzaJobs/deploy/samza/state

16K    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 0
0    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 1
0    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 2
16K    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 3
36K    /vagrant/SamzaJobs/deploy/samza/state/engaged-store
36K    /vagrant/SamzaJobs/deploy/samza/state
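
Since `du` only shows that the directories exist, a slightly stronger check is whether the process can actually create files inside them. The following is a minimal sketch, not anything Samza or RocksDB provides: the function name `check_store_dir` and the probe file name are hypothetical, chosen only for illustration.

```shell
#!/bin/sh
# Hypothetical helper (not part of Samza or RocksDB): checks that a store
# directory exists, is writable and searchable, and accepts new files.
check_store_dir() {
    dir="$1"
    if [ ! -d "$dir" ]; then
        echo "missing: $dir"
        return 1
    fi
    if [ ! -w "$dir" ] || [ ! -x "$dir" ]; then
        echo "not writable/searchable: $dir"
        return 1
    fi
    # Create and remove a probe file; permission bits alone can be
    # misleading on some mounts, so exercise a real write.
    probe="$dir/.store-probe.$$"
    if ! touch "$probe" 2>/dev/null; then
        echo "cannot create files in: $dir"
        return 1
    fi
    rm -f "$probe"
    echo "ok: $dir"
}
```

It could be run against each partition directory, quoting the path because the names contain a space, for example `check_store_dir "/vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 0"`. On a GNU/Linux host such as CentOS, `df -T <dir>` additionally reports the filesystem type, which may be relevant if the directory sits on a shared or network mount.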

Lukas

-----Original Message----- 
From: Chris Riccomini
Sent: Monday, February 16, 2015 5:53 PM
To: dev@samza.apache.org
Subject: Re: RocksDBException: IO error: directory: Invalid argument

Hey Lukas,

It looks like the exception is actually thrown on get, not put:

          at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)

1. Are you running your job under YARN, or as a local job
(ThreadJobFactory/ProcessJobFactory)?
2. What OS are you running on?
3. Could you post a full copy of your logs somewhere (github gist, pasteboard,
or something)?
4. Also, what does this line say in your logs:

    info("Got storage engine base directory: %s" format storeBaseDir)

It sounds like something is getting messed up with the directory where the
RocksDB store is trying to keep its data.

Cheers,
Chris

On Mon, Feb 16, 2015 at 3:50 PM, Lukas Steiblys <lukas@doubledutch.me>
wrote:

> Hello,
>
> I was setting up the key-value storage engine in Samza and ran into an
> exception when querying the data.
>
> I added these properties to the config:
>
>     stores.engaged-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
>     stores.engaged-store.changelog=kafka.engaged-store-changelog
>     # a custom data type with an appropriate Serde
>     stores.engaged-store.key.serde=UserAppPair
>     # wrote a Serde for Long using ByteBuffer
>     stores.engaged-store.msg.serde=Long
>
> I have no trouble initializing the storage engine with:
>
>     val store = context.getStore("engaged-store").asInstanceOf[KeyValueStore[UserAppPair, Long]];
>
> but when I query by the key when processing messages, it’s throwing an
> exception:
>
>     val key = new UserAppPair(userId, appId);
>     val value = store.get(key);
>
> Here’s the log:
>
>     2015-02-16 23:30:18 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
>     2015-02-16 23:30:18 BrokerProxy [WARN] It appears that we received an invalid or empty offset None for [Follows,0]. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues.
>     2015-02-16 23:30:18 GetOffset [INFO] Checking if auto.offset.reset is defined for topic Follows
>     2015-02-16 23:30:18 GetOffset [INFO] Got reset of type largest.
>     2015-02-16 23:30:23 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
>     2015-02-16 23:30:23 SamzaContainer [INFO] Entering run loop.
>     2015-02-16 23:30:23 EngagedUsersTask [INFO] about to query for key in rocksdb.
>     2015-02-16 23:30:23 SamzaContainer [ERROR] Caught exception in process loop.
>     org.rocksdb.RocksDBException: IO error: directory: Invalid argument
>         at org.rocksdb.RocksDB.open(Native Method)
>         at org.rocksdb.RocksDB.open(RocksDB.java:133)
>         at org.apache.samza.storage.kv.RocksDbKeyValueStore.db$lzycompute(RocksDbKeyValueStore.scala:85)
>         at org.apache.samza.storage.kv.RocksDbKeyValueStore.db(RocksDbKeyValueStore.scala:85)
>         at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:92)
>         at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:80)
>         at org.apache.samza.storage.kv.LoggedStore.get(LoggedStore.scala:41)
>         at org.apache.samza.storage.kv.SerializedKeyValueStore.get(SerializedKeyValueStore.scala:36)
>         at org.apache.samza.storage.kv.CachedStore.get(CachedStore.scala:90)
>         at org.apache.samza.storage.kv.NullSafeKeyValueStore.get(NullSafeKeyValueStore.scala:36)
>         at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)
>         at me.doubledutch.analytics.task.EngagedUsersTask.engaged(EngagedUsersTask.scala:66)
>         at me.doubledutch.analytics.task.EngagedUsersTask.process(EngagedUsersTask.scala:100)
>         at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:137)
>         at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
>         at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:136)
>         at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:93)
>         at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
>         at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
>         at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
>         at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
>         at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
>         at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
>         at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
>         at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down.
>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down consumer multiplexer.
>     2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down BrokerProxy for localhost:9092
>     2015-02-16 23:30:23 DefaultFetchSimpleConsumer [WARN] Reconnect due to socket error: null
>     2015-02-16 23:30:23 BrokerProxy [INFO] Got closed by interrupt exception in broker proxy thread.
>     2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down due to interrupt.
>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down producer multiplexer.
>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stream tasks.
>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stores.
>
>
> The same exception is thrown if I try to put a value in RocksDB. Has anyone
> run into this problem before, or have any pointers on solving it?
>
> Lukas 

