sqoop-dev mailing list archives

From "Jarek Jarcec Cecho (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SQOOP-1803) JobManager and Execution Engine changes: Support for a injecting and pulling out configs and job output in connectors
Date Tue, 03 Mar 2015 18:57:04 GMT

    [ https://issues.apache.org/jira/browse/SQOOP-1803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14345504#comment-14345504 ]

Jarek Jarcec Cecho commented on SQOOP-1803:
-------------------------------------------

I was thinking about this one a bit myself. I have a couple of thoughts on getting data back
from the execution engine and I'm wondering what others think. Please don't hesitate to
chime in if I missed any approach.

1) DistributedCache

In addition to [~gwenshap]'s comments about supportability (and/or debuggability) of {{DistributedCache}},
to the best of my knowledge it can only be used to distribute data from the launcher (Sqoop Server)
to child mapreduce tasks. I do not believe that it can be used the other way around
to get files or data from individual tasks back to the launcher. Looking at the [latest javadocs|https://hadoop.apache.org/docs/current/api/org/apache/hadoop/filecache/DistributedCache.html],
this still seems to be the case, as the documentation contains a note about immutability of the cache once
the job is submitted:

{quote}
DistributedCache tracks modification timestamps of the cache files. Clearly the cache files
should not be modified by the application or externally while the job is executing.
{quote}
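
To make the directionality concrete, here is a minimal sketch (the path is made up) using the
non-deprecated {{Job}} equivalent of the {{DistributedCache}} calls: the launcher attaches files
before submission and tasks can only read them; there is no call for a task to publish a file back.

{code:java}
import java.net.URI;
import org.apache.hadoop.mapreduce.Job;

public class CacheDirectionSketch {
  public static void configure(Job job) throws Exception {
    // Launcher (Sqoop Server) side: files are registered *before* submission
    // and are treated as immutable for the lifetime of the job.
    job.addCacheFile(new URI("/sqoop/shared/job-config.properties"));
    job.submit();
    // A task would read the file via context.getCacheFiles(); there is no
    // corresponding call for a task to hand a file back to the launcher.
  }
}
{code}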

*Summary:* I believe that this solution is disqualified for retrieving data back from the
execution engine.

2) Counters

[Counters|https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapred/Counters.html]
are a nice technique for getting insight into what the mapreduce job is doing. Multiple mappers
can update the counters in parallel and it's mapreduce's responsibility to ensure that
counters from all child tasks are summed up correctly. The limitation of this solution
is that counters can only be {{long}} based (e.g. no {{String}}, {{Date}}, ...). Also, the
counters are cumulative in nature, so it might be a bit difficult to retrieve discrete values
- we would either need to ensure that only certain mappers/reducers update a given counter whereas
others don't, or we would need to figure out a way to decode a single value when multiple
mappers/reducers update one counter at the same time.
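
Just to illustrate both limitations, a small sketch (the counter name is hypothetical) of how a
counter would be incremented in a mapper and read back on the launcher side:

{code:java}
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

public class CounterSketch {
  // Hypothetical counter - only numeric (long) values are possible.
  enum SqoopStats { ROWS_EXTRACTED }

  public static class ExtractMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      // Each task can only increment; no String "max value", no Date.
      context.getCounter(SqoopStats.ROWS_EXTRACTED).increment(1);
    }
  }

  public static void readBack(Job job) throws Exception {
    // Launcher side: the value is the sum over all tasks, so discrete
    // per-task values would have to be encoded into a single long somehow.
    long rows = job.getCounters().findCounter(SqoopStats.ROWS_EXTRACTED).getValue();
    System.out.println("Rows extracted: " + rows);
  }
}
{code}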

*Summary:* While it's not impossible to use counters to retrieve data from the execution
engine, it seems that this solution would impose limitations and would be "difficult" to implement
and maintain.

3) HDFS Files

Looking into how others are solving this problem, [Oozie|http://oozie.apache.org] launcher
tasks (= one-map mapreduce jobs) generate files on HDFS in a predefined directory from
where the Oozie server picks them up to read arbitrary values. This is a neat solution
as it allows us to retrieve any value of any type from any part of the workflow (all processes
can create their own files if needed). The downside is that we would need to agree on a certain
location where the Server and the mapreduce job will exchange files - this directory must
exist and must be accessible by both Sqoop (running under the system user) and the mapreduce
job itself (most likely running as the end user). I believe that HDFS ACLs can easily be used
to accomplish this task.

We would need to be careful here with edge conditions - we would need to make sure that we're
cleaning up old and unused files (job failures, ...) and that we are not leaking any sensitive
information to HDFS.
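
A minimal sketch of such an exchange (the directory, file name and property key are made up for
illustration): the task side writes a small properties file under the agreed-upon location and
the server side reads it back and cleans it up afterwards.

{code:java}
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsExchangeSketch {
  // Hypothetical agreed-upon directory; it must exist and be accessible to both
  // the Sqoop system user and the end user running the mapreduce job (HDFS ACLs).
  private static final Path EXCHANGE_DIR = new Path("/sqoop/job-output");

  // Task side: publish arbitrary typed values for one job run.
  public static void publish(Configuration conf, String jobId, String maxValue) throws Exception {
    FileSystem fs = FileSystem.get(conf);
    Properties props = new Properties();
    props.setProperty("incremental.max.value", maxValue);
    try (OutputStream out = fs.create(new Path(EXCHANGE_DIR, jobId + ".properties"))) {
      props.store(out, "state produced by job " + jobId);
    }
  }

  // Server side: pick the file up and delete it so nothing old or sensitive is left behind.
  public static String collect(Configuration conf, String jobId) throws Exception {
    FileSystem fs = FileSystem.get(conf);
    Path file = new Path(EXCHANGE_DIR, jobId + ".properties");
    Properties props = new Properties();
    try (InputStream in = fs.open(file)) {
      props.load(in);
    } finally {
      fs.delete(file, false);
    }
    return props.getProperty("incremental.max.value");
  }
}
{code}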

*Summary:* A possible solution that will support all our use cases, but will be a bit harder
to implement.

4) Server side exchange only

I was also looking into how things work currently in the server and I've realized something
that made me think about this proposal. Back when we were defining the workflow, the intention
was that only the {{Initializer}} is allowed to generate state, whereas all other parts of the
workflow - {{Partitioner}}, {{Extractor}}, {{Loader}} and {{Destroyer}} - should not generate
any state and should only reuse the one that was pre-prepared in the initializer. The reason for that
is that the {{Initializer}} is run only once, whereas all other parts of the workflow run in
parallel and/or do not run on the Sqoop server itself; hence by allowing state to be generated
only in the {{Initializer}} we don't have to deal with synchronizing the parallel pieces or deal
with limitations in various execution engines. The intention is reflected in the API: the
{{Initializer}} is given a {{MutableContext}} where the connector developer can set any properties
that will be shared with the rest of the workflow (~ the state), whereas all other parts are given
only an {{ImmutableContext}} that doesn't allow any changes to the shared properties. I have
to say that we have a small exception in the code base, because the {{Partitioner}} class generates
{{Partition}} objects that can carry some context as well. However, as the {{Partition}} objects
are not available in the {{Destroyer}}, the connector developer still needs to persist state that
is required through the entire workflow inside the {{Initializer}}.
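
To make that concrete, a minimal sketch of the intended pattern (the connector config classes,
the helper and the property key are hypothetical, and the other {{Initializer}} methods are
assumed to be left at their defaults):

{code:java}
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;

// Hypothetical connector config classes, only to make the sketch self-contained.
class MyLinkConfig { }
class MyFromJobConfig { }

public class MyInitializer extends Initializer<MyLinkConfig, MyFromJobConfig> {
  @Override
  public void initialize(InitializerContext context,
                         MyLinkConfig linkConfig,
                         MyFromJobConfig jobConfig) {
    // The only place in the workflow where shared state should be generated;
    // e.g. the "max value" of an incremental import would be locked here, exactly once.
    String maxValue = determineMaxValue(linkConfig, jobConfig); // hypothetical helper
    context.getContext().setString("incremental.max.value", maxValue);
  }

  private String determineMaxValue(MyLinkConfig link, MyFromJobConfig job) {
    return "..."; // placeholder
  }

  // Extractor, Loader and Destroyer only see an ImmutableContext, e.g.:
  //   String maxValue = extractorContext.getContext().getString("incremental.max.value");
}
{code}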

Having said that, another option seems to be to simply not retrieve anything from the execution
engine and let the connector update the configuration objects based on info that the connector
generated in the {{Initializer}} - assuming that the job finished correctly. Looking at current
connectors this should work well, as we need to update and persist state that is 'locked'
at the {{Initializer}} stage. For database-based connectors the "max value" should be determined
in the initializer (it's currently not, though), and the same goes for Kafka and other connectors. The
beauty of this approach is that it's simple to implement and can actually be easily extended
in the future to include data coming from the execution engine should there be a need for it
(via approach 3), for example).
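
Sketched very roughly (all class and method names below are made up, just to illustrate the flow
I have in mind):

{code:java}
// Purely hypothetical sketch of option 4: nothing is read back from the execution
// engine; after the job succeeds, the state that was locked in the Initializer is
// folded back into the connector's config object and persisted in the repository.
public class ServerSideUpdateSketch {

  public void onJobSuccess(String maxValueFromInitializerContext,
                           HypotheticalFromJobConfig fromJobConfig,
                           HypotheticalRepository repository) {
    fromJobConfig.incremental.lastValue = maxValueFromInitializerContext;
    repository.updateJobConfig(fromJobConfig);
  }

  // Placeholder types so the sketch is self-contained.
  static class HypotheticalIncremental { String lastValue; }
  static class HypotheticalFromJobConfig {
    HypotheticalIncremental incremental = new HypotheticalIncremental();
  }
  interface HypotheticalRepository {
    void updateJobConfig(HypotheticalFromJobConfig config);
  }
}
{code}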

> JobManager and Execution Engine changes: Support for a injecting and pulling out configs
> and job output in connectors
> ----------------------------------------------------------------------------------------------------------------------
>
>                 Key: SQOOP-1803
>                 URL: https://issues.apache.org/jira/browse/SQOOP-1803
>             Project: Sqoop
>          Issue Type: Sub-task
>            Reporter: Veena Basavaraj
>            Assignee: Veena Basavaraj
>             Fix For: 1.99.6
>
>
> The details are in the design wiki, as the implementation happens more discussions can
> happen here.
> https://cwiki.apache.org/confluence/display/SQOOP/Delta+Fetch+And+Merge+Design#DeltaFetchAndMergeDesign-Howtogetoutputfromconnectortosqoop?
> The goal is to dynamically inject a IncrementalConfig instance into the FromJobConfiguration.
> The current MFromConfig and MToConfig can already hold a list of configs, and a strong sentiment
> was expressed to keep it as a list, why not for the first time actually make use of it and
> group the incremental related configs in one config object
> This task will prepare the FromJobConfiguration from the job config data, ExtractorContext
> with the relevant values from the prev job run
> This task will prepare the ToJobConfiguration from the job config data, LoaderContext
> with the relevant values from the prev job run if any
> We will use DistributedCache to get State information from the Extractor and Loader out
> and finally persist it into the sqoop repository depending on SQOOP-1804 once the outputcommitter
> commit is called



