samza-dev mailing list archives

From sgg <sggraha...@gmail.com>
Subject Re: Example Initable Task
Date Tue, 10 Dec 2013 01:45:16 GMT
OK, thanks Chris.  But I still need to figure out why the init() method is not getting called;
in fact, the entire Samza job fails.  The job runs if the task implements StreamTask (obviously
without invoking init()).  When I change it to implement InitableTask as shown, the job fails.
Samza seems unable to instantiate the task object.  I was hoping for a runnable example that
would show me what a correct startup sequence looks like.

sgg
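
One possible explanation, offered as an assumption rather than a confirmed diagnosis: the SimpleSamzaTask quoted below implements only InitableTask, and in Samza's API InitableTask is a separate interface that does not extend StreamTask. If the container treats the configured task class as a StreamTask at startup, a class that implements only InitableTask would fail at that point. The sketch below illustrates the idea with hypothetical stand-in interfaces (simplified signatures, not the real org.apache.samza.task types) and a hypothetical load() helper that stands in for the container's startup logic.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-ins so the sketch compiles on its own. The real
// StreamTask.process takes an envelope, a collector and a coordinator, and
// the real InitableTask.init takes a Config and a TaskContext.
interface StreamTask {
    void process(String message);
}

interface InitableTask {
    void init(Map<String, String> config);
}

// Mirrors the task from the thread: it has init() but is not a StreamTask.
class InitOnlyTask implements InitableTask {
    public void init(Map<String, String> config) {
        System.out.println("in init");
    }
}

// Implements both interfaces, so it can be processed and initialized.
class BothInterfacesTask implements StreamTask, InitableTask {
    boolean initialized = false;

    public void init(Map<String, String> config) {
        initialized = true;
        System.out.println("in init");
    }

    public void process(String message) {
        System.out.println("hello: " + message);
    }
}

class TaskLoadingSketch {
    // Assumed startup sequence: instantiate the class through its no-arg
    // constructor, treat it as a StreamTask, then call init() if available.
    static StreamTask load(Class<?> taskClass, Map<String, String> config) {
        Object task;
        try {
            task = taskClass.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate " + taskClass, e);
        }
        // Throws ClassCastException when the class is not a StreamTask.
        StreamTask streamTask = (StreamTask) task;
        if (task instanceof InitableTask) {
            ((InitableTask) task).init(config);
        }
        return streamTask;
    }

    public static void main(String[] args) {
        Map<String, String> config =
            Collections.singletonMap("task.foo.bar", "someVal");
        load(BothInterfacesTask.class, config).process("test message");
        try {
            load(InitOnlyTask.class, config);
        } catch (ClassCastException e) {
            System.out.println("InitOnlyTask is not a StreamTask, so loading fails");
        }
    }
}
```

If this assumption holds for the real container, declaring the task as `implements StreamTask, InitableTask` would be the change to try first.
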
On Dec 9, 2013, at 12:27 PM, Chris Riccomini <criccomini@linkedin.com> wrote:

> Hi there,
> 
> Your task looks good. Your init() method receives two parameters: Config,
> and TaskContext. The config object has *all* config properties defined in
> your job's config. If you were to write:
> 
> @Override
> public void init(Config config, TaskContext context) throws Exception {
>   System.out.println(config.get("task.foo.bar"));
> }
> 
> 
> And you had task.foo.bar=someVal, you'd then expect to see your task print
> "someVal" once for each partition that the Samza container is responsible
> for. For example, if you had a single Samza container
> (yarn.container.count=1, or using LocalJobFactory), and you had defined a
> single input stream that had 4 partitions, your logs would show "someVal"
> printed four times (one for each partition that the Samza container is
> responsible for processing).
> 
> Cheers,
> Chris
> 
> On 12/7/13 4:18 AM, "sgg" <sggraham64@gmail.com> wrote:
> 
>> Does anyone have a working example of a Samza job using an InitableTask?
>> 
>> It wasn't clear from the documentation how to specify the input
>> parameters to the init() method.  Is it a matter of adding lines to the
>> config file that look like:
>> task.foo.bar = someVal
>> 
>> ?
>> 
>> Also, when I tried to run a simple example with an InitableTask, I got an
>> error.  There seems to be a problem when Samza attempts to instantiate
>> the class.
>> 
>> 
>> Here is the sample task I am trying to run:
>> public class SimpleSamzaTask implements InitableTask {
>>   private static final SystemStream OUTPUT_STREAM =
>>       new SystemStream("kafka", "simpleout");
>>
>>   public SimpleSamzaTask() {
>>     super();
>>   }
>>
>>   public void process(IncomingMessageEnvelope envelope,
>>       MessageCollector collector, TaskCoordinator coordinator) {
>>     System.out.println("hello world");
>>     collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM,
>>         "hello: " + envelope.getMessage()));
>>   }
>>
>>   @Override
>>   public void init(Config arg0, TaskContext arg1) throws Exception {
>>     // TODO Auto-generated method stub
>>     System.out.println("in init");
>>   }
>> }
>> 
>> Thoughts?
> 
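
The per-partition behaviour described in Chris's reply can be sketched as a small simulation. This is a minimal illustration under stated assumptions, not Samza code: ConfigReadingTask and startTasks() are hypothetical names, the config is a plain Map rather than a Samza Config, and the partition number is passed directly instead of coming from a TaskContext.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a task with an init() hook.
class ConfigReadingTask {
    String fooBar; // value read from the job config during init()

    void init(Map<String, String> config, int partition) {
        fooBar = config.get("task.foo.bar");
        System.out.println("partition " + partition + ": " + fooBar);
    }
}

class PerPartitionInitSketch {
    // Assumed container behaviour, following the reply in this thread: one
    // task instance per input partition, each initialized once with the
    // job's full config.
    static List<ConfigReadingTask> startTasks(Map<String, String> config,
                                              int partitionCount) {
        List<ConfigReadingTask> tasks = new ArrayList<>();
        for (int partition = 0; partition < partitionCount; partition++) {
            ConfigReadingTask task = new ConfigReadingTask();
            task.init(config, partition);
            tasks.add(task);
        }
        return tasks;
    }

    public static void main(String[] args) {
        // Stands in for the line task.foo.bar=someVal in the job's config file.
        Map<String, String> config =
            Collections.singletonMap("task.foo.bar", "someVal");
        // One container, one input stream with 4 partitions.
        startTasks(config, 4);
    }
}
```

Running main prints four lines, one per partition, each containing "someVal", which matches the four log entries the reply predicts for a single container reading a four-partition stream.
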

