drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [drill] dbw9580 commented on a change in pull request #2084: [WIP] DRILL-7745: Add storage plugin for IPFS
Date Mon, 22 Jun 2020 18:44:22 GMT

dbw9580 commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r443756373



##########
File path: contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSJSONRecordReader.java
##########
@@ -0,0 +1,230 @@
+/*
+ * Copyright (c) 2018-2020 Bowen Ding, Yuedong Xu, Liang Wang
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.easy.json.JsonProcessor;
+import org.apache.drill.exec.store.easy.json.reader.CountingJsonReader;
+import org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut;
+import org.apache.drill.exec.vector.BaseValueVector;
+import org.apache.drill.exec.vector.complex.fn.JsonReader;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+public class IPFSJSONRecordReader extends AbstractRecordReader {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IPFSJSONRecordReader.class);
+
+  public static final long DEFAULT_ROWS_PER_BATCH = BaseValueVector.INITIAL_VALUE_ALLOCATION;
+
+  private FragmentContext fragmentContext;
+  private IPFSContext ipfsContext;
+  private String subScanSpec;
+  private List<SchemaPath> columnList;
+  private JsonProcessor jsonReader;
+  private InputStream stream;
+  private int recordCount;
+  private long runningRecordCount = 0;
+  private final boolean enableAllTextMode;
+  private final boolean enableNanInf;
+  private final boolean enableEscapeAnyChar;
+  private final boolean readNumbersAsDouble;
+  private final boolean unionEnabled;
+  private long parseErrorCount;
+  private final boolean skipMalformedJSONRecords;
+  private final boolean printSkippedMalformedJSONRecordLineNumber;
+  private JsonProcessor.ReadState write = null;
+  private VectorContainerWriter writer;
+
+  public IPFSJSONRecordReader(FragmentContext fragmentContext, IPFSContext ipfsContext, String scanSpec, List<SchemaPath> columns) {
+    this.fragmentContext = fragmentContext;
+    this.ipfsContext = ipfsContext;
+    this.subScanSpec = scanSpec;
+    this.columnList = columns;
+    setColumns(columns);
+    this.fragmentContext = fragmentContext;
+    // only enable all text mode if we aren't using embedded content mode.
+    this.enableAllTextMode = fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR);
+    this.enableEscapeAnyChar = fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_ESCAPE_ANY_CHAR_VALIDATOR);
+    this.enableNanInf = fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_NAN_INF_NUMBERS_VALIDATOR);
+    this.readNumbersAsDouble = fragmentContext.getOptions().getOption(ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE_VALIDATOR);
+    this.unionEnabled = fragmentContext.getOptions().getBoolean(ExecConstants.ENABLE_UNION_TYPE_KEY);
+    this.skipMalformedJSONRecords = fragmentContext.getOptions().getOption(ExecConstants.JSON_SKIP_MALFORMED_RECORDS_VALIDATOR);
+    this.printSkippedMalformedJSONRecordLineNumber = fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_PRINT_INVALID_RECORDS_LINE_NOS_FLAG_VALIDATOR);
+
+  }
+
+  @Override
+  public String toString() {
+    return super.toString()
+        + ", recordCount = " + recordCount
+        + ", parseErrorCount = " + parseErrorCount
+        + ", runningRecordCount = " + runningRecordCount + ", ...]";
+  }
+
+  @Override
+  public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {

Review comment:
       Refactored in 97b4a7d.

##########
File path: contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSPeer.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Copyright (c) 2018-2020 Bowen Ding, Yuedong Xu, Liang Wang
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.multiaddr.MultiAddress;
+import io.ipfs.multihash.Multihash;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+public class IPFSPeer {
+  private IPFSHelper helper;
+
+  private Multihash id;
+  private List<MultiAddress> addrs;
+  private boolean isDrillReady;
+  private boolean isDrillReadyChecked = false;
+  private Optional<String> drillbitAddress = Optional.empty();
+  private boolean drillbitAddressChecked = false;
+
+
+  public IPFSPeer(IPFSHelper helper, Multihash id) {
+    this.helper = helper;
+    this.id = id;
+  }
+
+  IPFSPeer(IPFSHelper helper, Multihash id, List<MultiAddress> addrs) {
+    this.helper = helper;
+    this.id = id;
+    this.addrs = addrs;
+    this.isDrillReady = helper.isDrillReady(id);
+    this.isDrillReadyChecked = true;
+    this.drillbitAddress = IPFSHelper.pickPeerHost(addrs);
+    this.drillbitAddressChecked = true;
+  }
+
+  public boolean isDrillReady() {
+    if (!isDrillReadyChecked) {
+      isDrillReady = helper.isDrillReady(id);
+      isDrillReadyChecked = true;
+    }
+    return isDrillReady;
+  }
+
+  public boolean hasDrillbitAddress() {

Review comment:
       Fixed in 160a909.

##########
File path: contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSPeer.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Copyright (c) 2018-2020 Bowen Ding, Yuedong Xu, Liang Wang
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.multiaddr.MultiAddress;
+import io.ipfs.multihash.Multihash;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+public class IPFSPeer {
+  private IPFSHelper helper;
+
+  private Multihash id;
+  private List<MultiAddress> addrs;
+  private boolean isDrillReady;
+  private boolean isDrillReadyChecked = false;
+  private Optional<String> drillbitAddress = Optional.empty();
+  private boolean drillbitAddressChecked = false;
+
+
+  public IPFSPeer(IPFSHelper helper, Multihash id) {
+    this.helper = helper;
+    this.id = id;
+  }
+
+  IPFSPeer(IPFSHelper helper, Multihash id, List<MultiAddress> addrs) {
+    this.helper = helper;
+    this.id = id;
+    this.addrs = addrs;
+    this.isDrillReady = helper.isDrillReady(id);
+    this.isDrillReadyChecked = true;
+    this.drillbitAddress = IPFSHelper.pickPeerHost(addrs);
+    this.drillbitAddressChecked = true;
+  }
+
+  public boolean isDrillReady() {
+    if (!isDrillReadyChecked) {
+      isDrillReady = helper.isDrillReady(id);
+      isDrillReadyChecked = true;
+    }
+    return isDrillReady;
+  }
+
+  public boolean hasDrillbitAddress() {
+    findDrillbitAddress();
+    return drillbitAddress.isPresent();
+  }
+
+  public Optional<String> getDrillbitAddress() {
+    findDrillbitAddress();
+    return drillbitAddress;
+  }
+
+  public List<MultiAddress> getMultiAddresses() {
+    findDrillbitAddress();
+    return addrs;
+  }
+
+  public Multihash getId() {
+    return id;
+  }
+
+
+  private void findDrillbitAddress() {
+    if (!drillbitAddressChecked) {

Review comment:
       Changed in 160a909. The `IOException` is no longer relevant now that `IPFSCompat` has been refactored.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



Mime
View raw message