sqoop-user mailing list archives

From KayVajj <vajjalak...@gmail.com>
Subject Sqoop Postgres Direct option issue
Date Tue, 19 May 2015 18:37:46 GMT
Hi Sqoopers,

I'm trying to compare the direct option with the JDBC (default) option
against a Postgres database. I'm able to pull data using both approaches, but
I'm having issues getting the Hadoop counters when using the direct option.
The sqoop command is run via Oozie. Below is the relevant snippet of the
Oozie workflow (some of the names are obfuscated for obvious reasons):

  <start to="checkDirectImport"/>

  <decision name="checkDirectImport">
    <switch>
      <case to="directImportFromGP">${directImport eq 'true'}</case>
      <default to="importFromGP"/>
    </switch>
  </decision>

  <action name="directImportFromGP">
    <sqoop xmlns="uri:oozie:sqoop-action:0.3">
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <configuration>
        <property>
          <name>mapreduce.map.speculative</name>
          <value>false</value>
        </property>
        <property>
          <name>mapreduce.reduce.speculative</name>
          <value>false</value>
        </property>
        <property>
          <name>mapreduce.map.log.level</name>
          <value>DEBUG</value>
        </property>
        <property>
          <name>org.apache.sqoop.log.level</name>
          <value>DEBUG</value>
        </property>
      </configuration>
      <arg>import</arg>
      <arg>--options-file</arg>
      <arg>gp.sqoop.db.props</arg>
      <arg>--connection-manager</arg>
      <arg>org.apache.sqoop.manager.DirectPostgresqlManager</arg>
      <arg>--table</arg>
      <arg>${gpTableName}</arg>
      <arg>--direct</arg>
      <arg>--where</arg>
      <arg>
        date(datetime) between '${beginDt}'::date and '${endDt}'::date and org_id='${orgId}'
      </arg>
      <arg>--split-by</arg>
      <arg>event_id</arg>
      <arg>--boundary-query</arg>
      <arg>
        select min(col1), max(col1)
        from table_name
        where date(datetime) between '${beginDt}'::date and '${endDt}'::date and org_id='${orgId}'
      </arg>
      <arg>--target-dir</arg>
      <arg>${targetDir}</arg>
      <arg>--delete-target-dir</arg>
      <arg>--jar-file</arg>
      <arg>gp-record.jar</arg>
      <arg>--class-name</arg>
      <arg>com...<someclassname></arg>
      <!-- <arg>--as-sequencefile</arg> -->
      <arg>--compress</arg>
      <arg>--compression-codec</arg>
      <arg>gzip</arg>
      <!-- <arg>--hive-drop-import-delims</arg> -->
      <arg>--fields-terminated-by</arg>
      <arg>\0001</arg>
      <arg>--null-string</arg>
      <arg>\\N</arg>
      <arg>--null-non-string</arg>
      <arg>\\N</arg>
      <arg>--escaped-by</arg>
      <arg>\\</arg>
      <arg>--optionally-enclosed-by</arg>
      <arg>\"</arg>
      <arg>--verbose</arg>
      <file>${some_path}/gp.sqoop.db.props</file>
      <file>${some_lib_path}/gp-record.jar</file>
    </sqoop>
    <ok to="processImportStats"/>
    <error to="kill"/>
  </action>

  <action name="importFromGP">
    <sqoop xmlns="uri:oozie:sqoop-action:0.3">
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <configuration>
        <property>
          <name>mapreduce.map.speculative</name>
          <value>false</value>
        </property>
        <property>
          <name>mapreduce.reduce.speculative</name>
          <value>false</value>
        </property>
        <property>
          <name>mapreduce.map.log.level</name>
          <value>DEBUG</value>
        </property>
        <property>
          <name>org.apache.sqoop.log.level</name>
          <value>DEBUG</value>
        </property>
      </configuration>
      <arg>import</arg>
      <arg>--options-file</arg>
      <arg>gp.sqoop.db.props</arg>
      <arg>--table</arg>
      <arg>${gpTableName}</arg>
      <arg>--where</arg>
      <arg>
        date(datetime) between '${beginDt}'::date and '${endDt}'::date and org_id='${orgId}'
      </arg>
      <arg>--split-by</arg>
      <arg>event_id</arg>
      <arg>--boundary-query</arg>
      <arg>
        select min(col1), max(col1)
        from table_name
        where date(datetime) between '${beginDt}'::date and '${endDt}'::date and org_id='${orgId}'
      </arg>
      <arg>--target-dir</arg>
      <arg>${targetDir}</arg>
      <arg>--delete-target-dir</arg>
      <arg>--jar-file</arg>
      <arg>gp-record.jar</arg>
      <arg>--class-name</arg>
      <arg>com...<someclassname></arg>
      <arg>--as-sequencefile</arg>
      <arg>--compress</arg>
      <arg>--compression-codec</arg>
      <arg>gzip</arg>
      <arg>--hive-drop-import-delims</arg>
      <arg>--fields-terminated-by</arg>
      <arg>\0001</arg>
      <arg>--null-string</arg>
      <arg>\\N</arg>
      <arg>--null-non-string</arg>
      <arg>\\N</arg>
      <arg>--escaped-by</arg>
      <arg>\\</arg>
      <arg>--optionally-enclosed-by</arg>
      <arg>\"</arg>
      <arg>--verbose</arg>
      <file>${somepath_path}/gp.sqoop.db.props</file>
      <file>${somelib_path}/gp-record.jar</file>
    </sqoop>
    <ok to="processImportStats"/>
    <error to="kill"/>
  </action>

  <action name="processImportStats">
    <java>
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <prepare>
      </prepare>
      <main-class>com.someclass</main-class>
      <arg>--gp-record-count</arg>
      <arg>${recordCount}</arg>
      <arg>--direct-import</arg>
      <arg>${directImport}</arg>
      <arg>--import-job-status</arg>
      <arg>${firstNotNull(firstNotNull(wf:actionExternalStatus('importFromGP'), wf:actionExternalStatus('directImportFromGP')), 'DID_NOT_RUN')}</arg>
      <arg>--import-input-count</arg>
      <arg>
        ${
          (firstNotNull(firstNotNull(wf:actionExternalId('importFromGP'),
              wf:actionExternalId('directImportFromGP')), 'DID_NOT_RUN') eq 'DID_NOT_RUN')
          ? 'UNKNOWN'
          : ((firstNotNull(wf:actionExternalId('importFromGP'),
                wf:actionExternalId('directImportFromGP')) eq wf:actionExternalId('importFromGP'))
             ? ((firstNotNull(wf:actionExternalStatus('importFromGP'), 'FAILED') eq 'SUCCEEDED')
                ? hadoop:counters("importFromGP")["org.apache.hadoop.mapred.Task$Counter"]["MAP_INPUT_RECORDS"]
                : 'UNKNOWN')
             : ((firstNotNull(wf:actionExternalStatus('directImportFromGP'), 'FAILED') eq 'SUCCEEDED')
                ? hadoop:counters("directImportFromGP")["org.apache.hadoop.mapred.Task$Counter"]["MAP_INPUT_RECORDS"]
                : 'UNKNOWN'))
        }
      </arg>
      <arg>--import-output-count</arg>
      <arg>
        ${
          (firstNotNull(firstNotNull(wf:actionExternalId('importFromGP'),
              wf:actionExternalId('directImportFromGP')), 'DID_NOT_RUN') eq 'DID_NOT_RUN')
          ? 'UNKNOWN'
          : ((firstNotNull(wf:actionExternalId('importFromGP'),
                wf:actionExternalId('directImportFromGP')) eq wf:actionExternalId('importFromGP'))
             ? ((firstNotNull(wf:actionExternalStatus('importFromGP'), 'FAILED') eq 'SUCCEEDED')
                ? hadoop:counters("importFromGP")["org.apache.hadoop.mapred.Task$Counter"]["MAP_OUTPUT_RECORDS"]
                : 'UNKNOWN')
             : ((firstNotNull(wf:actionExternalStatus('directImportFromGP'), 'FAILED') eq 'SUCCEEDED')
                ? hadoop:counters("directImportFromGP")["org.apache.hadoop.mapred.Task$Counter"]["MAP_OUTPUT_RECORDS"]
                : 'UNKNOWN'))
        }
      </arg>
      <capture-output/>
    </java>
    <ok to="generateReport"/>
    <error to="kill"/>
  </action>

When I run in the default mode,
hadoop:counters("importFromGP")["org.apache.hadoop.mapred.Task$Counter"]["MAP_OUTPUT_RECORDS"]
resolves correctly, but when I run in direct mode,
hadoop:counters("directImportFromGP")["org.apache.hadoop.mapred.Task$Counter"]["MAP_OUTPUT_RECORDS"]
does not. Oozie throws an EL error; the relevant log snippet is below:

2015-05-19 11:01:09,773 WARN org.apache.oozie.command.wf.ActionStartXCommand: SERVER[quickstart.cloudera] USER[cloudera] GROUP[-] TOKEN[] APP[migrate--2014-11-01] JOB[0000027-150516144541141-oozie-oozi-W] ACTION[0000027-150516144541141-oozie-oozi-W@processImportStats] Exception in ActionStartXCommand
java.lang.NullPointerException
	at org.apache.oozie.action.hadoop.HadoopELFunctions.hadoop_counters(HadoopELFunctions.java:56)
	at sun.reflect.GeneratedMethodAccessor67.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.commons.el.FunctionInvocation.evaluate(FunctionInvocation.java:172)
	at org.apache.commons.el.ComplexValue.evaluate(ComplexValue.java:140)
	at org.apache.commons.el.ConditionalExpression.evaluate(ConditionalExpression.java:153)
	at org.apache.commons.el.ConditionalExpression.evaluate(ConditionalExpression.java:155)
	at org.apache.commons.el.ConditionalExpression.evaluate(ConditionalExpression.java:155)
	at org.apache.commons.el.ExpressionString.evaluate(ExpressionString.java:114)
	at org.apache.commons.el.ExpressionEvaluatorImpl.evaluate(ExpressionEvaluatorImpl.java:274)
	at org.apache.commons.el.ExpressionEvaluatorImpl.evaluate(ExpressionEvaluatorImpl.java:190)
	at org.apache.oozie.util.ELEvaluator.evaluate(ELEvaluator.java:203)
	at org.apache.oozie.command.wf.ActionStartXCommand.execute(ActionStartXCommand.java:188)
	at org.apache.oozie.command.wf.ActionStartXCommand.execute(ActionStartXCommand.java:63)
	at org.apache.oozie.command.XCommand.call(XCommand.java:281)
	at org.apache.oozie.command.XCommand.call(XCommand.java:351)
	at org.apache.oozie.command.wf.SignalXCommand.execute(SignalXCommand.java:394)
	at org.apache.oozie.command.wf.SignalXCommand.execute(SignalXCommand.java:73)
	at org.apache.oozie.command.XCommand.call(XCommand.java:281)
	at org.apache.oozie.command.XCommand.call(XCommand.java:351)
	at org.apache.oozie.command.wf.ActionEndXCommand.execute(ActionEndXCommand.java:273)
	at org.apache.oozie.command.wf.ActionEndXCommand.execute(ActionEndXCommand.java:60)
	at org.apache.oozie.command.XCommand.call(XCommand.java:281)
	at org.apache.oozie.command.XCommand.call(XCommand.java:351)
	at org.apache.oozie.command.wf.ActionCheckXCommand.execute(ActionCheckXCommand.java:241)
	at org.apache.oozie.command.wf.ActionCheckXCommand.execute(ActionCheckXCommand.java:55)
	at org.apache.oozie.command.XCommand.call(XCommand.java:281)
	at org.apache.oozie.service.CallableQueueService$CallableWrapper.run(CallableQueueService.java:174)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
2015-05-19 11:01:09,774 WARN org.apache.oozie.command.wf.ActionStartXCommand: SERVER[quickstart.cloudera] USER[cloudera] GROUP[-] TOKEN[] APP[migrate--2014-11-01] JOB[0000027-150516144541141-oozie-oozi-W] ACTION[0000027-150516144541141-oozie-oozi-W@processImportStats] Failing Job due to failed action [processImportStats]

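As a stopgap I am considering short-circuiting the direct path inside the EL
expression itself, so that hadoop:counters() is only ever evaluated for the
JDBC action. A sketch, built from the same EL functions used above (the
'UNKNOWN' fallback is my own placeholder):

      <!-- Sketch: gate on the same ${directImport} flag the decision node
           uses, since only the JDBC action appears to publish counters. -->
      <arg>--import-output-count</arg>
      <arg>
        ${directImport eq 'true'
          ? 'UNKNOWN'
          : ((firstNotNull(wf:actionExternalStatus('importFromGP'), 'FAILED') eq 'SUCCEEDED')
             ? hadoop:counters("importFromGP")["org.apache.hadoop.mapred.Task$Counter"]["MAP_OUTPUT_RECORDS"]
             : 'UNKNOWN')}
      </arg>

That avoids the NullPointerException, but it also throws away the count for
every direct run, which brings me to my questions.
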
The questions I have are:

   1. A direct-mode run from the command line doesn't create an MR job,
   while under Oozie there is a single mapper running Sqoop. I checked that
   mapper's logs and do not see any Hadoop counters. Doesn't direct mode
   generate Hadoop counters when running under Oozie? (A possible
   branch-around for the stats step is sketched after this list.)
   2. The default mode splits the job across 4 (or another configured
   number of) mappers using the boundary query, but I do not see the same
   happening in direct mode. I have read the user guide a couple of times
   and did not find anything about this. Why is that, and does it mean the
   Postgres COPY command is much more efficient than a parallelized MR
   import?
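
The branch-around mentioned in question 1 would look something like the
following. checkStatsSource and processImportStatsNoCounters are hypothetical
node names; the latter would be a copy of processImportStats with the two
counter arguments hard-coded to 'UNKNOWN':

  <!-- Sketch: route direct-mode runs to a stats step that never evaluates
       hadoop:counters() at all. -->
  <decision name="checkStatsSource">
    <switch>
      <case to="processImportStatsNoCounters">${directImport eq 'true'}</case>
      <default to="processImportStats"/>
    </switch>
  </decision>

That would keep the EL evaluator away from the missing counters, but
duplicating the java action just to drop two arguments feels clumsy.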


Thanks
Kay
