samza-dev mailing list archives

From sgg <sggraha...@gmail.com>
Subject Re: Example Initable Task
Date Tue, 10 Dec 2013 03:50:43 GMT
yep, see 'em now.  Thanks again.
On Dec 9, 2013, at 10:42 PM, Chris Riccomini <criccomini@linkedin.com> wrote:

> Hey sgg,
> 
> If you're using YARN, which it sounds like you are, they should be piped
> to the stdout file of the container (not the ApplicationMaster). The way
> to find the container is by going to the YARN web UI, and clicking on the
> ApplicationMaster link for your Samza job. This will lead you to the
> ApplicationMaster's web UI (confusing, I know), which will have a link to
> your Samza job's containers. If you're playing with hello-samza, it'll be
> something like container_12345678_1234536_2. That last "2" is the
> container number for your Samza job: 1 is the AM, and 2 is the Samza
> container that's running your StreamTasks.
> 
> Cheers,
> Chris
> 
> On 12/9/13 7:37 PM, "sgg" <sggraham64@gmail.com> wrote:
> 
>> Hi Chris:
>> DOH!  that was the problem.  Made the change to implement both interfaces
>> and things ran fine!   Thanks for the pointer!
>> 
>> BTW, where does the output from the System.out.println statements appear?
>> i.e. where is stdout being piped to? I had expected to see it in the logs
>> facility in the Yarn web console, but these print statements don't appear
>> in the yarn stdout log.  Where should I be looking?
>> 
>> sgg
>> On Dec 9, 2013, at 10:14 PM, Chris Riccomini <criccomini@linkedin.com>
>> wrote:
>> 
>>> Hey sgg,
>>> 
>>> Ah, I didn't notice, but your code does not implement StreamTask, just
>>> InitableTask.
>>> 
>>> These interfaces are like mix-ins. You MUST implement StreamTask. Try:
>>> 
>>> public class SimpleSamzaTask implements StreamTask, InitableTask {
>>> 
>>>   ...
>>> }
>>> 
>>> Sorry about that.
>>> 
>>> Cheers,
>>> Chris
>>> 
>>> On 12/9/13 5:45 PM, "sgg" <sggraham64@gmail.com> wrote:
>>> 
>>>> Ok thanks Chris.  But I still need to figure out why the init() method
>>>> is not getting called; in fact, the entire Samza job fails.  It works if
>>>> the task implements StreamTask (obviously not invoking init()), but at
>>>> least the task runs.  When I change it to implement InitableTask as
>>>> shown, the job fails.  It seems to not be able to instantiate the task
>>>> object.  I was hoping there would be an example that runs that would
>>>> allow me to see what a correct startup sequence looks like.
>>>> 
>>>> sgg
>>>> On Dec 9, 2013, at 12:27 PM, Chris Riccomini <criccomini@linkedin.com>
>>>> wrote:
>>>> 
>>>>> Hi there,
>>>>> 
>>>>> Your task looks good. Your init() method receives two parameters:
>>>>> Config and TaskContext. The config object has *all* config properties
>>>>> defined in your job's config. If you were to write:
>>>>> 
>>>>> @Override
>>>>> public void init(Config config, TaskContext context) throws Exception {
>>>>>   System.out.println(config.get("task.foo.bar"));
>>>>> }
>>>>> 
>>>>> 
>>>>> And you had task.foo.bar=someVal, you'd then expect to see your task
>>>>> print "someVal" once for each partition that the Samza container is
>>>>> responsible for. For example, if you had a single Samza container
>>>>> (yarn.container.count=1, or using LocalJobFactory), and you had defined
>>>>> a single input stream that had 4 partitions, your logs would show
>>>>> "someVal" printed four times (once for each partition that the Samza
>>>>> container is responsible for processing).
>>>>> 
>>>>> Cheers,
>>>>> Chris
>>>>> 
>>>>> On 12/7/13 4:18 AM, "sgg" <sggraham64@gmail.com> wrote:
>>>>> 
>>>>>> Does anyone have a working example of a Samza job using an
>>>>>> InitableTask?
>>>>>> 
>>>>>> It wasn't clear from the documentation how to specify the input
>>>>>> parameters to the init() method.  Is it a matter of adding lines in
>>>>>> the config file that look like:
>>>>>> task.foo.bar = someVal
>>>>>> 
>>>>>> ?
>>>>>> 
>>>>>> Also, when I tried to run a simple example with an InitableTask, I
>>>>>> get an error.  It seems there is some problem when Samza attempts to
>>>>>> instantiate the class.
>>>>>> 
>>>>>> 
>>>>>> Here is the sample task I am trying to run:
>>>>>> public class SimpleSamzaTask implements InitableTask {
>>>>>> private static final SystemStream OUTPUT_STREAM =
>>>>>>     new SystemStream("kafka", "simpleout");
>>>>>> 
>>>>>> public SimpleSamzaTask(){
>>>>>> 	  super();
>>>>>> }
>>>>>> 
>>>>>> public void process(IncomingMessageEnvelope envelope,
>>>>>>     MessageCollector collector, TaskCoordinator coordinator) {
>>>>>> 	  System.out.println("hello world");
>>>>>> 	  collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM,
>>>>>> 	      "hello: " + envelope.getMessage()));
>>>>>> }
>>>>>> 
>>>>>> @Override
>>>>>> public void init(Config arg0, TaskContext arg1) throws Exception {
>>>>>> 	  // TODO Auto-generated method stub
>>>>>> 	  System.out.println("in init");
>>>>>> }
>>>>>> }
>>>>>> 
>>>>>> Thoughts?
>>>>> 
>>>> 
>>> 
>> 
> 
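
For anyone finding this thread later, here is a minimal sketch of the two points above: the task class implements both interfaces, and init() runs once per partition. It deliberately does not use the real Samza classes. StreamTaskSketch, InitableTaskSketch, SimpleTaskSketch and InitSketchDemo are hypothetical stand-ins with simplified signatures (a plain Map instead of Config, a List instead of MessageCollector), and the driver loop is only an assumption about how a container behaves, so that the example runs without Samza on the classpath.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Stand-in for Samza's StreamTask (hypothetical, simplified signature).
interface StreamTaskSketch {
    void process(String message, List<String> collector);
}

// Stand-in for Samza's InitableTask (hypothetical, simplified signature).
interface InitableTaskSketch {
    void init(Map<String, String> config) throws Exception;
}

// The task implements BOTH interfaces, as suggested in the thread.
class SimpleTaskSketch implements StreamTaskSketch, InitableTaskSketch {
    private String fooBar;

    @Override
    public void init(Map<String, String> config) {
        // Read a value that would come from task.foo.bar=someVal in the job config.
        fooBar = config.get("task.foo.bar");
        System.out.println(fooBar);
    }

    @Override
    public void process(String message, List<String> collector) {
        collector.add("hello: " + message);
    }
}

class InitSketchDemo {
    // Hypothetical driver: one task instance per partition, init() called on each.
    static List<String> run(int partitions, Map<String, String> config) {
        List<String> collector = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            SimpleTaskSketch task = new SimpleTaskSketch();
            task.init(config);
            task.process("partition " + p, collector);
        }
        return collector;
    }

    public static void main(String[] args) {
        Map<String, String> config =
            Collections.singletonMap("task.foo.bar", "someVal");
        // With 4 partitions, init() prints the configured value four times.
        System.out.println(run(4, config));
    }
}
```

In a real job the class would implement org.apache.samza.task.StreamTask and InitableTask with the signatures quoted in the thread, and the println output would end up in the container's stdout file as Chris describes.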

