samza-dev mailing list archives

From Chris Riccomini <criccom...@apache.org>
Subject Re: RocksDBException: IO error: directory: Invalid argument
Date Tue, 17 Feb 2015 21:02:40 GMT
Hey Lukas,

I'm wondering if this is a filesystem permission issue? This exception:

  org.rocksdb.RocksDBException: IO error: directory: Invalid argument

Looks like it's coming from this line:


https://github.com/facebook/rocksdb/blob/868bfa40336b99005beb9f4fc9cf2acc0d330ae1/util/env_posix.cc#L1016

Which seems to be trying to fsync data to disk. According to:

  http://docs.vagrantup.com/v2/synced-folders/basic_usage.html

It sounds like the sync folder is set to be owned by the default Vagrant
SSH user.

1. Is this the user that you're running the Samza job as?
2. Could you check the file permissions for /vagrant and all of its
subdirectories, and make sure that they match up with what you expect (+rw
for the Samza job's user)?
3. If you try running the job outside of the VM, does it work?
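
To make checks 1 and 2 concrete, here is a minimal Java sketch, not part of Samza or RocksDB. It walks the state directory, lists every path the current user cannot read and write, and then attempts an fsync on the directory itself, on the assumption that the failing call really is a directory fsync as the line above suggests. The class name StateDirCheck and its method names are made up for illustration; the default path is the one from your log. Opening a directory through FileChannel works on Linux JVMs but is platform dependent, so treat a failure as a hint rather than proof.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for diagnosing the state directory; not a Samza class.
class StateDirCheck {

    // Collects directories the current user cannot read and write,
    // plus any path the walk could not visit at all.
    static List<Path> inaccessiblePaths(Path root) throws IOException {
        List<Path> bad = new ArrayList<>();
        Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) {
                if (!Files.isReadable(dir) || !Files.isWritable(dir)) {
                    bad.add(dir);
                }
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult visitFileFailed(Path file, IOException exc) {
                bad.add(file); // e.g. permission denied while opening it
                return FileVisitResult.CONTINUE;
            }
        });
        return bad;
    }

    // Tries to fsync the directory itself. Returns false if the JVM or the
    // underlying filesystem rejects the call.
    static boolean canFsyncDir(Path dir) {
        try (FileChannel ch = FileChannel.open(dir, StandardOpenOption.READ)) {
            ch.force(true);
            return true;
        } catch (IOException e) {
            System.err.println("fsync failed for " + dir + ": " + e);
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        // Default taken from the "Got storage engine base directory" log line.
        Path root = Paths.get(args.length > 0 ? args[0]
                : "/vagrant/SamzaJobs/deploy/samza/state");
        System.out.println("user: " + System.getProperty("user.name"));
        System.out.println("inaccessible paths: " + inaccessiblePaths(root));
        System.out.println("directory fsync works: " + canFsyncDir(root));
    }
}
```

Run it as the same user that runs the Samza job, once against the state directory under /vagrant and once against a directory on the VM's own disk. If only the /vagrant run reports a problem, that points at the synced folder rather than at Samza.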

Cheers,
Chris

On Tue, Feb 17, 2015 at 12:57 PM, Lukas Steiblys <lukas@doubledutch.me>
wrote:

> Yeah, I made sure the state is clean. This is the first time I'm trying to
> use RocksDB. I haven't tried LevelDB yet though.
>
> Lukas
>
> -----Original Message----- From: Chris Riccomini
> Sent: Tuesday, February 17, 2015 12:34 PM
> To: dev@samza.apache.org
> Cc: Chris Riccomini
>
> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>
> Hey Lukas,
>
> Strange. Having a more detailed look at your logs.
>
> Note: /vagrant is a synced folder, and I think it *does* persist between VM
> restarts. But, if you've deleted /vagrant/SamzaJobs/deploy, then the state
> should be empty.
>
> Cheers,
> Chris
>
> On Tue, Feb 17, 2015 at 12:13 PM, Lukas Steiblys <lukas@doubledutch.me>
> wrote:
>
>> It starts out with a fresh FS. I deleted all the state, but the job still
>> fails on the first get.
>>
>> Lukas
>>
>> -----Original Message----- From: Chris Riccomini
>> Sent: Tuesday, February 17, 2015 12:12 PM
>> To: Chris Riccomini
>> Cc: dev@samza.apache.org
>>
>> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>>
>> Hey Lukas,
>>
>>> This happens every time even if I spin up a new VM.
>>
>> Ah I might have misunderstood. Are your VMs started with a fresh FS? You're
>> not using EBS or anything like that, are you?
>>
>> I want to see if you're getting hit by that setErrorIfExists line. If you:
>>
>> 1. Stop your job.
>> 2. Clear the state from the FS.
>> 3. Start your job.
>>
>> Does it work?
>>
>> Cheers,
>> Chris
>>
>> On Tue, Feb 17, 2015 at 12:07 PM, Chris Riccomini <criccomini@apache.org>
>> wrote:
>>
>>> Hey Lukas,
>>>
>>> Could you try clearing out the state, and starting the job?
>>>
>>> Cheers,
>>> Chris
>>>
>>> On Tue, Feb 17, 2015 at 11:33 AM, Lukas Steiblys <lukas@doubledutch.me>
>>> wrote:
>>>
>>>> This happens every time even if I spin up a new VM. Happens after a
>>>> restart as well.
>>>>
>>>> Lukas
>>>>
>>>> -----Original Message----- From: Chris Riccomini
>>>> Sent: Tuesday, February 17, 2015 11:01 AM
>>>> To: dev@samza.apache.org
>>>> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>>>>
>>>> Hey Lukas,
>>>>
>>>> Interesting. Does this happen only after restarting your job? Or does it
>>>> happen the first time, as well? I'm wondering if this is the problem:
>>>>
>>>>    options.setErrorIfExists(true)
>>>>
>>>> In RocksDbKeyValueStore.scala. I think this is set under the assumption
>>>> that the job is run in YARN. If you run locally, it seems to me that the
>>>> directory would continue to exist after a job is restarted. If you delete
>>>> your state directory, and restart your job, does the problem temporarily go
>>>> away until a subsequent restart happens?
>>>>
>>>> Cheers,
>>>> Chris
>>>>
>>>> On Tue, Feb 17, 2015 at 10:55 AM, Lukas Steiblys <lukas@doubledutch.me>
>>>> wrote:
>>>>
>>>>> Hi Chris,
>>>>>
>>>>> 1. We're running locally using ProcessJobFactory
>>>>> 2. CentOS 7 x86_64
>>>>> 3.
>>>>>    startup.log: https://gist.github.com/imbusy/0592a9c52a96fcce48db
>>>>>    engaged-users.log: https://gist.github.com/imbusy/0b3d264a40ddf34ab8e7
>>>>>    engaged-users.properties: https://gist.github.com/imbusy/d0019db29d7b68c60bfc
>>>>>
>>>>>    Also note that the properties file sets the default offset to oldest,
>>>>>    but the log file says that it's setting the offset to largest:
>>>>>    "2015-02-17 18:46:32 GetOffset [INFO] Got reset of type largest."
>>>>>
>>>>> 4. From the log file: "2015-02-17 18:45:57 SamzaContainer$ [INFO] Got
>>>>> storage engine base directory: /vagrant/SamzaJobs/deploy/samza/state"
>>>>>    I checked the directory and it actually exists:
>>>>>
>>>>> du -h /vagrant/SamzaJobs/deploy/samza/state
>>>>>
>>>>> 16K    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 0
>>>>> 0    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 1
>>>>> 0    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 2
>>>>> 16K    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 3
>>>>> 36K    /vagrant/SamzaJobs/deploy/samza/state/engaged-store
>>>>> 36K    /vagrant/SamzaJobs/deploy/samza/state
>>>>>
>>>>> Lukas
>>>>>
>>>>> -----Original Message----- From: Chris Riccomini
>>>>> Sent: Monday, February 16, 2015 5:53 PM
>>>>> To: dev@samza.apache.org
>>>>> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>>>>>
>>>>>
>>>>> Hey Lukas,
>>>>>
>>>>> It looks like the exception is actually thrown on get, not put:
>>>>>
>>>>>          at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)
>>>>>
>>>>> 1. Are you running your job under YARN, or as a local job
>>>>> (ThreadJobFactory/ProcessJobFactory)?
>>>>> 2. What OS are you running on?
>>>>> 3. Could you post a full copy of your logs somewhere (GitHub gist,
>>>>> pasteboard, or something)?
>>>>> 4. Also, what does this line say in your logs:
>>>>>
>>>>>    info("Got storage engine base directory: %s" format storeBaseDir)
>>>>>
>>>>> It sounds like something is getting messed up with the directory where the
>>>>> RocksDB store is trying to keep its data.
>>>>>
>>>>> Cheers,
>>>>> Chris
>>>>>
>>>>> On Mon, Feb 16, 2015 at 3:50 PM, Lukas Steiblys <lukas@doubledutch.me>
>>>>> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I was setting up the key-value storage engine in Samza and ran into an
>>>>>> exception when querying the data.
>>>>>>
>>>>>> I added these properties to the config:
>>>>>>
>>>>>>
>>>>>>     stores.engaged-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
>>>>>>     stores.engaged-store.changelog=kafka.engaged-store-changelog
>>>>>>     # a custom data type with an appropriate Serde
>>>>>>     stores.engaged-store.key.serde=UserAppPair
>>>>>>     # wrote a Serde for Long using ByteBuffer
>>>>>>     stores.engaged-store.msg.serde=Long
>>>>>>
>>>>>> I have no trouble initializing the storage engine with:
>>>>>>
>>>>>>     val store = context.getStore("engaged-store").asInstanceOf[KeyValueStore[UserAppPair, Long]];
>>>>>>
>>>>>> but when I query by the key when processing messages, it’s throwing an
>>>>>> exception:
>>>>>>
>>>>>>     val key = new UserAppPair(userId, appId);
>>>>>>     val value = store.get(key);
>>>>>>
>>>>>> Here’s the log:
>>>>>>
>>>>>>     2015-02-16 23:30:18 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
>>>>>>     2015-02-16 23:30:18 BrokerProxy [WARN] It appears that we received an invalid or empty offset None for [Follows,0]. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues.
>>>>>>     2015-02-16 23:30:18 GetOffset [INFO] Checking if auto.offset.reset is defined for topic Follows
>>>>>>     2015-02-16 23:30:18 GetOffset [INFO] Got reset of type largest.
>>>>>>     2015-02-16 23:30:23 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
>>>>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Entering run loop.
>>>>>>     2015-02-16 23:30:23 EngagedUsersTask [INFO] about to query for key in rocksdb.
>>>>>>     2015-02-16 23:30:23 SamzaContainer [ERROR] Caught exception in process loop.
>>>>>>     org.rocksdb.RocksDBException: IO error: directory: Invalid argument
>>>>>>         at org.rocksdb.RocksDB.open(Native Method)
>>>>>>         at org.rocksdb.RocksDB.open(RocksDB.java:133)
>>>>>>         at org.apache.samza.storage.kv.RocksDbKeyValueStore.db$lzycompute(RocksDbKeyValueStore.scala:85)
>>>>>>         at org.apache.samza.storage.kv.RocksDbKeyValueStore.db(RocksDbKeyValueStore.scala:85)
>>>>>>         at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:92)
>>>>>>         at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:80)
>>>>>>         at org.apache.samza.storage.kv.LoggedStore.get(LoggedStore.scala:41)
>>>>>>         at org.apache.samza.storage.kv.SerializedKeyValueStore.get(SerializedKeyValueStore.scala:36)
>>>>>>         at org.apache.samza.storage.kv.CachedStore.get(CachedStore.scala:90)
>>>>>>         at org.apache.samza.storage.kv.NullSafeKeyValueStore.get(NullSafeKeyValueStore.scala:36)
>>>>>>         at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)
>>>>>>         at me.doubledutch.analytics.task.EngagedUsersTask.engaged(EngagedUsersTask.scala:66)
>>>>>>         at me.doubledutch.analytics.task.EngagedUsersTask.process(EngagedUsersTask.scala:100)
>>>>>>         at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:137)
>>>>>>         at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
>>>>>>         at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:136)
>>>>>>         at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:93)
>>>>>>         at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
>>>>>>         at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
>>>>>>         at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
>>>>>>         at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
>>>>>>         at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
>>>>>>         at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
>>>>>>         at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
>>>>>>         at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>>>>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down.
>>>>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down consumer multiplexer.
>>>>>>     2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down BrokerProxy for localhost:9092
>>>>>>     2015-02-16 23:30:23 DefaultFetchSimpleConsumer [WARN] Reconnect due to socket error: null
>>>>>>     2015-02-16 23:30:23 BrokerProxy [INFO] Got closed by interrupt exception in broker proxy thread.
>>>>>>     2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down due to interrupt.
>>>>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down producer multiplexer.
>>>>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stream tasks.
>>>>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stores.
>>>>>>
>>>>>>
>>>>>> The same exception is thrown if I try to put a value in RocksDB. Has
>>>>>> anyone run into this problem before, or have any pointers for solving it?
>>>>>>
>>>>>> Lukas
