Mailing-List: contact user-help@spark.apache.org; run by ezmlm
Delivered-To: mailing list user@spark.apache.org
MIME-Version: 1.0
References: <653A07AF9AFBE94DA495244AAC54A146DD95C0A3C0@AUSX7MCPC101.AMER.DELL.COM>
From: Rabin Banerjee
Date: Wed, 20 Jul 2016 16:36:07 +0530
Subject: Re: Storm HDFS bolt equivalent in Spark Streaming.
To: Deepak Sharma
Cc: Rajesh_Kalluri@dellteam.com, spark users, user-help@spark.apache.org
Content-Type: multipart/alternative; boundary=001a1144ac042b928605380f307e

--001a1144ac042b928605380f307e
Content-Type: text/html; charset=UTF-8
Content-Transfer-Encoding: quoted-printable

++Deepak,

There is also an option to use saveAsHadoopFile and saveAsNewAPIHadoopFile, with which you can customize the file name and many other things about the way the output is saved. :)
Happy Sparking !!!!

Regards,
Rabin Banerjee
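
A note on what that customization can look like: with saveAsHadoopFile, one common route (an assumption here, not something stated in this thread) is a custom OutputFormat, such as a subclass of Hadoop's MultipleTextOutputFormat, whose generateFileNameForKeyValue(key, value, name) hook decides the output path for each record. The sketch below shows only that naming logic in plain Java, with no Spark or Hadoop dependency. The class OutputNameSketch, the method fileNameFor, and the key/date/hour layout are all hypothetical.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of Spark or Hadoop: pure naming logic only.
class OutputNameSketch {
    // Formats a timestamp as "yyyy-MM-dd/HH" in UTC, giving one folder per day and hour.
    private static final DateTimeFormatter DAY_AND_HOUR =
            DateTimeFormatter.ofPattern("yyyy-MM-dd/HH").withZone(ZoneOffset.UTC);

    // Builds "<key>/<yyyy-MM-dd>/<HH>/<defaultName>".
    // defaultName stands for the name the framework would have used (e.g. "part-00000").
    static String fileNameFor(String key, long batchTimeMs, String defaultName) {
        // Replace characters that are awkward in paths with underscores.
        String safeKey = key.replaceAll("[^A-Za-z0-9._-]", "_");
        return safeKey + "/" + DAY_AND_HOUR.format(Instant.ofEpochMilli(batchTimeMs))
                + "/" + defaultName;
    }

    public static void main(String[] args) {
        System.out.println(fileNameFor("web logs", 1469012767378L, "part-00000"));
    }
}
```

In a real job the same string-building would sit inside the output format's naming hook, so the decision about where each record goes stays in one place.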

On Wed, Jul 20, 2016 at 10:01 AM, Deepak Sharma <deepakmca05@gmail.com> wrote:

In Spark Streaming, you have to decide the duration of the micro-batches to run.
Once you get a micro-batch, transform it as per your logic, and then you can use saveAsTextFiles on your final DStream (or saveAsTextFile on each batch's RDD) to write it to HDFS.

Thanks
Deepak
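
To make the micro-batch point concrete: saveAsTextFiles(prefix, suffix) writes one output directory per batch interval, named prefix-TIME_IN_MS[.suffix], so the batch duration, not a size threshold, decides how often new output appears. The plain-Java sketch below only reproduces that naming rule to show what ends up on HDFS. BatchOutputNames and its methods are hypothetical helpers, and the path and the 10-second interval are made-up values.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that mirrors the "prefix-TIME_IN_MS[.suffix]" naming rule.
class BatchOutputNames {
    // Name of the output directory for a single micro-batch.
    static String outputDir(String prefix, String suffix, long batchTimeMs) {
        String base = prefix + "-" + batchTimeMs;
        return (suffix == null || suffix.isEmpty()) ? base : base + "." + suffix;
    }

    // Names of the output directories for `count` consecutive micro-batches.
    static List<String> outputDirs(String prefix, String suffix,
                                   long firstBatchMs, long batchIntervalMs, int count) {
        List<String> dirs = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            dirs.add(outputDir(prefix, suffix, firstBatchMs + i * batchIntervalMs));
        }
        return dirs;
    }

    public static void main(String[] args) {
        // Three batches of 10 seconds each produce three separate directories.
        for (String dir : outputDirs("hdfs://localhost:8020/foo/events", "txt",
                                     1469012760000L, 10_000L, 3)) {
            System.out.println(dir);
        }
    }
}
```

A short interval therefore means many small directories, which is the main difference from the size-based rotation of the Storm bolt.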


On 20 Jul 2016 9:49 am, <Rajesh_Kalluri@dellteam.com> wrote:

Dell - Internal Use - Confidential

While writing to Kafka from Storm, the HDFS bolt provides a nice way to batch the messages, rotate files, apply a file naming convention, etc., as shown below.

Do you know of something similar in Spark Streaming, or do we have to roll our own? If anyone has attempted this, can you share some pointers?

Every other streaming solution, like Flume and NiFi, handles logic like that shown below.

https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.3.6/bk_storm-user-guide/content/writing-data-with-storm-hdfs-connector.html

// use "|" instead of "," for field delimiter
RecordFormat format = new DelimitedRecordFormat()
        .withFieldDelimiter("|");

// Synchronize the filesystem after every 1000 tuples
SyncPolicy syncPolicy = new CountSyncPolicy(1000);

// Rotate data files when they reach 5 MB
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);

// Use default, Storm-generated file names
FileNameFormat fileNameFormat = new DefaultFileNameFormat()
        .withPath("/foo/");

// Instantiate the HdfsBolt
HdfsBolt bolt = new HdfsBolt()
        .withFsUrl("hdfs://localhost:8020")
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(format)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy);
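
If it does come down to rolling your own, the policies configured above reduce to a few counters. The following is a rough sketch, not a Spark or Storm API: a hypothetical RotatingWriter that joins fields with "|", flushes every N records (in the spirit of CountSyncPolicy), and starts a new file once the current one reaches a byte limit (in the spirit of FileSizeRotationPolicy). It writes to the local filesystem through java.nio purely to stay self-contained. An HDFS version would need the Hadoop FileSystem API instead, and the part-N.txt naming is an assumption.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical roll-your-own writer; local files stand in for HDFS here.
class RotatingWriter implements AutoCloseable {
    private final Path dir;
    private final long maxBytes;   // rotate once the current file reaches this size
    private final int syncEvery;   // flush after this many records
    private BufferedWriter out;    // null until the next record needs a file
    private long bytesInFile;
    private int sinceSync;
    private int filesOpened;

    RotatingWriter(Path dir, long maxBytes, int syncEvery) throws IOException {
        this.dir = Files.createDirectories(dir);
        this.maxBytes = maxBytes;
        this.syncEvery = syncEvery;
    }

    // Writes one "|"-delimited line, then applies the sync and rotation policies.
    void write(String... fields) throws IOException {
        if (out == null) {
            // Open files lazily so a rotation never leaves an empty file behind.
            Path file = dir.resolve("part-" + filesOpened++ + ".txt");
            out = Files.newBufferedWriter(file, StandardCharsets.UTF_8);
            bytesInFile = 0;
            sinceSync = 0;
        }
        String line = String.join("|", fields) + "\n";
        out.write(line);
        bytesInFile += line.getBytes(StandardCharsets.UTF_8).length;
        if (++sinceSync >= syncEvery) {
            out.flush();
            sinceSync = 0;
        }
        if (bytesInFile >= maxBytes) {
            out.close();
            out = null;
        }
    }

    int getFilesOpened() {
        return filesOpened;
    }

    @Override
    public void close() throws IOException {
        if (out != null) {
            out.close();
            out = null;
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("rotating-writer-demo");
        // 5 MB rotation and a flush every 1000 records, matching the Storm settings.
        try (RotatingWriter writer = new RotatingWriter(dir, 5L * 1024 * 1024, 1000)) {
            writer.write("2016-07-20", "user42", "click");
        }
        System.out.println("wrote to " + dir);
    }
}
```

One plausible place to drive such a writer from Spark Streaming is inside foreachRDD with foreachPartition, using one writer per partition so that file handles are never shared across tasks.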




--001a1144ac042b928605380f307e--