Уменьшите пару "ключ-значение" в пару из списка ключей с помощью Apache Spark

Я пишу приложение Spark и хочу объединить набор пар Key-Value (K, V1), (K, V2), ..., (K, Vn) в одну пару Key-Multivalue (K, [V1, V2, ..., Vn]). Мне кажется, что я должен это сделать, используя функцию reduceByKey с чем-то вроде:

My_KMV = My_KV.reduce(lambda a, b: a.append([b]))

Ошибка, которую я получаю, когда это происходит:

Объект NoneType не имеет атрибута 'append'.

Мои ключи являются целыми числами, а значения V1,..., Vn являются кортежами. Моя цель - создать одну пару с ключом и список значений (кортежи).

+33
источник поделиться
9 ответов

Карта и ReduceByKey

Тип ввода и тип вывода reduce должны быть одинаковыми, поэтому, если вы хотите скопировать список, вы должны map вводить список. Затем вы объединяете списки в один список.

Объединение списков

Вам понадобится метод объединения списков в один список. Phyton предоставляет несколько методов для объединения списков.

append изменяет первый список и всегда будет возвращать None.

x = [1, 2, 3]
x.append([4, 5])
# x is [1, 2, 3, [4, 5]]

extend делает то же самое, но распаковывает списки:

x = [1, 2, 3]
x.extend([4, 5])
# x is [1, 2, 3, 4, 5]

Оба метода возвращают None, но вам понадобится метод, который возвращает объединенный список, поэтому просто использовать знак плюса.

x = [1, 2, 3] + [4, 5]
# x is [1, 2, 3, 4, 5]

Спарк

file = spark.textFile("hdfs://...")
counts = file.flatMap(lambda line: line.split(" ")) \
         .map(lambda actor: (actor.split(",")[0], actor)) \ 

         # transform each value into a list
         .map(lambda nameTuple: (nameTuple[0], [ nameTuple[1] ])) \

         # combine lists: ([1,2,3] + [4,5]) becomes [1,2,3,4,5]
         .reduceByKey(lambda a, b: a + b)

CombineByKey

Это также можно решить с помощью combineByKey, который используется внутри для реализации reduceByKey, но он более сложный и ", используя один из специализированные комбайнеры с ключом в Spark могут быть намного быстрее" . Ваш вариант использования достаточно прост для верхнего решения.

GroupByKey

Это также можно решить с помощью groupByKey, но это уменьшает распараллеливание и, следовательно, может быть намного медленнее для больших наборов данных.

+36
источник

Я немного опаздываю на разговор, но здесь мое предложение:

>>> foo = sc.parallelize([(1, ('a','b')), (2, ('c','d')), (1, ('x','y'))])
>>> foo.map(lambda (x,y): (x, [y])).reduceByKey(lambda p,q: p+q).collect()
[(1, [('a', 'b'), ('x', 'y')]), (2, [('c', 'd')])]
+10
источник
другие ответы

Связанные вопросы


Похожие вопросы

Вы можете использовать метод RDD groupByKey.

Input:

data = [(1, 'a'), (1, 'b'), (2, 'c'), (2, 'd'), (2, 'e'), (3, 'f')]
rdd = sc.parallelize(data)
result = rdd.groupByKey().collect()

Вывод:

[(1, ['a', 'b']), (2, ['c', 'd', 'e']), (3, ['f'])]
+9
источник

tl; dr Если вам действительно нужна операция, используйте groupByKey как предложено @MariusIon. Каждое предлагаемое здесь решение является либо неэффективным, либо, по меньшей мере, субоптимальным по сравнению с прямой группировкой.

reduceByKey со списком конкатенаций не является приемлемым решением, потому что:

  • Требуется инициализация списков O (N).
  • Каждое приложение + для пары списков требует полной копии обоих списков (O (N)), эффективно увеличивая общую сложность до O (N 2).
  • Не затрагивает ни одну из проблем, связанных с groupByKey. Количество данных, которые необходимо перетасовать, а также размер окончательной структуры, одинаковы.
  • В отличие от предложенного одним из ответов, нет разницы в уровне parallelism между реализацией с использованием reduceByKey и groupByKey.

combineByKey с list.extend является субоптимальным решением, потому что:

  • Создает объекты списка O (N) в MergeValue (это можно оптимизировать, используя list.append непосредственно в новом элементе).
  • Если он оптимизирован с помощью list.append, он в точности эквивалентен старой (Spark & ​​lt; = 1.3) реализации groupByKey и игнорирует все оптимизаторы, введенные SPARK-3074, которые обеспечивают внешнюю (на диске) группировку структуры большей, чем памяти.
+7
источник

Если вы хотите сделать reduceByKey, где тип в уменьшенных пар KV отличается от типа исходных пар KV, тогда можно использовать функцию combineByKey. То, что делает функция, - это взять пары KV и объединить их (по Key) в пары KC, где C - другой тип, чем V.

Один определяет 3 функции, createCombiner, mergeValue, mergeCombiners. Первый указывает, как преобразовать тип V в тип C, второй описывает, как объединить тип C с типом V, а последний указывает, как объединить тип C с другим типом C. Мой код создает пары KV:

Определите три функции следующим образом:

def Combiner(a):    #Turns value a (a tuple) into a list of a single tuple.
    return [a]

def MergeValue(a, b): #a is the new type [(,), (,), ..., (,)] and b is the old type (,)
    a.extend([b])
    return a

def MergeCombiners(a, b): #a is the new type [(,),...,(,)] and so is b, combine them
    a.extend(b)
    return a

Тогда My_KMV = My_KV.combineByKey(Combiner, MergeValue, MergeCombiners)

Лучший ресурс, который я нашел при использовании этой функции: http://abshinn.github.io/python/apache-spark/2014/10/11/using-combinebykey-in-apache-spark/

Как указывали другие, a.append(b) или a.extend(b) return None. Таким образом, reduceByKey(lambda a, b: a.append(b)) возвращает None в первой паре пар KV, а затем не работает во второй паре, потому что None.append(b) терпит неудачу. Вы можете обойти это, указав отдельную функцию:

 def My_Extend(a,b):
      a.extend(b)
      return a

Затем вызовите reduceByKey(lambda a, b: My_Extend(a,b)) (использование лямбда-функции здесь может быть ненужным, но я не проверял этот случай.)

+2
источник

Ok. Надеюсь, у меня все получилось. Ваш ввод выглядит примерно так:

kv_input = [("a", 1), ("a", 2), ("a", 3), ("b", 1), ("b", 5)]

и вы хотите получить что-то вроде этого:

kmv_output = [("a", [1, 2, 3]), ("b", [1, 5])]

Тогда это может сделать работу (см. здесь):

d = dict()
for k, v in kv_input:
    d.setdefault(k, list()).append(v)
kmv_output = list(d.items())

Если я ошибаюсь, скажите, пожалуйста, чтобы я мог настроить это на ваши нужды.

P.S.: a.append([b]) всегда возвращается None. Вы можете наблюдать либо [b], либо a, но не результат append.

0
источник

Сообщение об ошибке связано с типом "a" в вашем закрытии.

 My_KMV = My_KV.reduce(lambda a, b: a.append([b]))

Пусть pySpark явно оценивает a как список. Например,

My_KMV = My_KV.reduceByKey(lambda a,b:[a].extend([b]))

Во многих случаях reduceByKey будет предпочтительнее для groupByKey, обратитесь к: http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html

0
источник

Я ударил эту страницу, ища пример Java для той же проблемы. (Если ваш случай похож, вот мой пример)

Трюк - вам нужно группировать ключи.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class SparkMRExample {

    public static void main(String[] args) {
        // spark context initialisation
        SparkConf conf = new SparkConf()
                .setAppName("WordCount")
                .setMaster("local");
        JavaSparkContext context = new JavaSparkContext(conf);

        //input for testing;
        List<String> input = Arrays.asList("Lorem Ipsum is simply dummy text of the printing and typesetting industry.",
                "Lorem Ipsum has been the industry standard dummy text ever since the 1500s, when an unknown printer took a galley of type and scrambled it to make a type specimen book.",
                "It has survived not only for centuries, but also the leap into electronic typesetting, remaining essentially unchanged.",
                "It was popularised in the 1960s with the release of Letraset sheets containing Lorem Ipsum passages, and more recently with desktop publishing");
        JavaRDD<String> inputRDD = context.parallelize(input);


        // the map phase of word count example
        JavaPairRDD<String, Integer> mappedRDD =
                inputRDD.flatMapToPair( line ->                      // for this input, each string is a line
                        Arrays.stream(line.split("\\s+"))            // splitting into words, converting into stream
                                .map(word -> new Tuple2<>(word, 1))  // each word is assigned with count 1
                                .collect(Collectors.toList()));      // stream to iterable

        // group the tuples by key
        // (String,Integer) -> (String, Iterable<Integer>)
        JavaPairRDD<String, Iterable<Integer>> groupedRDD = mappedRDD.groupByKey();

        // the reduce phase of word count example
        //(String, Iterable<Integer>) -> (String,Integer)
        JavaRDD<Tuple2<String, Integer>> resultRDD =
                groupedRDD.map(group ->                                      //input is a tuple (String, Iterable<Integer>)
                        new Tuple2<>(group._1,                              // the output key is same as input key
                        StreamSupport.stream(group._2.spliterator(), true)  // converting to stream
                                .reduce(0, (f, s) -> f + s)));              // the sum of counts
        //collecting the RRD so that we can print
        List<Tuple2<String, Integer>> result = resultRDD.collect();
        // print each tuple
        result.forEach(System.out::println);
    }
}
-1
источник

Я попробовал с combByKey, вот мои шаги

combineddatardd=sc.parallelize([("A", 3), ("A", 9), ("A", 12),("B", 4), ("B", 10), ("B", 11)])

combineddatardd.combineByKey(lambda v:[v],lambda x,y:x+[y],lambda x,y:x+y).collect()

Вывод:

[('A', [3, 9, 12]), ('B', [4, 10, 11])]
  • Определите функцию для объединителя, которая устанавливает аккумулятор в первую пару значений ключа, которую он встречает внутри раздела, преобразует значение, указанное на этом этапе.

  • Определите функцию, которая объединяет новое значение того же ключа с значением аккумулятора, которое было записано на шаге 1. Примечание: -конвертируйте значение для списка в этой функции, так как значение аккумулятора было преобразовано в список на первом шаге

  • Определить функцию для объединения выходов комбинаторов отдельных разделов.

-1
источник

Посмотрите другие вопросы по меткам или Задайте вопрос