samza-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] rmatharu commented on a change in pull request #915: Consolidating offset read and write for store-offsets and side-inputs, maintaining backward compatbility
Date Tue, 12 Feb 2019 07:48:58 GMT
rmatharu commented on a change in pull request #915: Consolidating offset read and write for
store-offsets and side-inputs, maintaining backward compatbility
URL: https://github.com/apache/samza/pull/915#discussion_r255834501
 
 

 ##########
 File path: samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
 ##########
 @@ -129,26 +173,35 @@ public static boolean storeExists(File storeDir) {
    * Read and return the contents of the offset file.
    *
    * @param storagePartitionDir the base directory of the store
-   * @param offsetFileName name of the offset file
    * @return the content of the offset file if it exists for the store, null otherwise.
    */
-  public static String readOffsetFile(File storagePartitionDir, String offsetFileName) {
-    String offset = null;
-    File offsetFileRef = new File(storagePartitionDir, offsetFileName);
+  public static Map<SystemStreamPartition, String> readOffsetFile(File storagePartitionDir,
Set<SystemStreamPartition> storeSSPs) {
+    Map<SystemStreamPartition, String> offsets = null;
+    String fileContents;
+    File offsetFileRef = new File(storagePartitionDir, OFFSET_FILE_NAME);
     String storePath = storagePartitionDir.getPath();
 
     if (offsetFileRef.exists()) {
       LOG.info("Found offset file in storage partition directory: {}", storePath);
       try {
-        offset = FileUtil.readWithChecksum(offsetFileRef);
+        fileContents = FileUtil.readWithChecksum(offsetFileRef);
+        try {
+          offsets = OBJECT_MAPPER.readValue(fileContents, OFFSETS_TYPE_REFERENCE);
+        } catch (JsonParseException | JsonMappingException e) {
+          LOG.info("Exception in json-parsing offset file {} {}, reading as string offset-value",
storagePartitionDir.toPath(), OFFSET_FILE_NAME);
+          offsets = (storeSSPs.size() == 1) ? storeSSPs.stream().collect(Collectors.toMap(x
-> x, y -> fileContents)) : null;
+        } catch (IOException e) {
+          LOG.info("Exception in json-parsing offset file {} {}, reading as string offset-value",
storagePartitionDir.toPath(), OFFSET_FILE_NAME);
+          offsets = (storeSSPs.size() == 1) ? storeSSPs.stream().collect(Collectors.toMap(x
-> x, y -> fileContents)) : null;
 
 Review comment:
   No if we're unable to read the file then we'd get to the blanket catch below. 
   These will only be thrown when the file's contents are not proper json (existing offset
format) in which case we fall-back to older store-offset format.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message