2023年3月

嗨,老铁!听说你想了解一下工厂模式?没问题,这个话题可是我的拿手好戏,接下来就由我为你详细介绍设计模式中的工厂模式

工厂模式简介

首先,让我们从模式基础知识开始吧。是一种创建型设计模式,它提供了一种方法来封装对象的创建过程。它的基本思想是定义一个抽象工厂接口,该接口具有一个或多个方法用于创建对象,然后由具体工厂类实现该接口并提供实际的实现。客户端代码可以通过这些工厂方法来创建对象。其核心目标是将对象的创建与使用分离。这使得客户端代码可以更加灵活地使用对象,而不必关心对象的创建过程。如果我们需要更改对象的创建方式,我们只需要修改工厂类的实现即可,而不必修改客户端代码。可以帮助我们降低代码的耦合性,并提高代码的可维护性和可扩展性。

举个现实生活中的例子:你需要1-3周岁的宝宝奶粉的时候,只需要找到一个奶粉工厂购买就行,不需要知道牛奶从牛身上挤下来后,又经过了哪些过程,哪些处理才变成了奶粉。假如需要换成4-6周岁只需要换个工厂就行,简单快捷。

在工厂模式中,通常有三个重要的角色:抽象工厂接口、具体工厂类和产品类。抽象工厂接口定义了一个或多个工厂方法,用于创建产品对象。具体工厂类实现了抽象工厂接口,并提供实际的产品创建方法。产品类是工厂方法创建的对象,其实现方式可能因具体工厂类的不同而异。

工厂模式的范例

接下来让我们来看一下使用代码示例。下面是一个简单的工厂模式的示例,我们来看一下它是如何工作的。

/**
 * 上层抽象接口类
 */
public interface Animal {
    void speak();
}
/**
 * 具体动物实现类 狗
 */
public class Dog implements Animal {
    @Override
    public void speak() {
        System.out.println("Dog says: Bark");
    }
}
/**
 * 具体动物实现类 猫
 */
public class Cat implements Animal {
    @Override
    public void speak() {
        System.out.println("Cat says: Meow");
    }
}
/**
 * 动物生产工厂类
 */
public class AnimalFactory {
    public static Animal createAnimal(String type) {
        if ("dog".equalsIgnoreCase(type)) {
            return new Dog();
        } else if ("cat".equalsIgnoreCase(type)) {
            return new Cat();
        }
        return null;
    }
}

public class Main {
    public static void main(String[] args) {
        Animal dog = AnimalFactory.createAnimal("dog");
        dog.speak();

        Animal cat = AnimalFactory.createAnimal("cat");
        cat.speak();
    }
}

在这个示例中,我们定义了一个 Animal 接口和两个实现类:Dog 和 Cat。AnimalFactory 类是一个工厂类,它提供了一个静态方法 createAnimal 来创建 Animal 对象。在 main 方法中,我们调用 createAnimal 方法来创建 Dog 和 Cat 对象,并分别调用它们的 speak 方法。

工厂模式的好处

那么,工厂模式有什么好处呢?首先,它能将对象的创建过程与使用过程分离,使代码更易于维护和扩展。其次,它可以隐藏对象的创建细节,使客户端代码更简洁。还可以根据需要更改对象的创建方式,例如更改对象的实现类、更改对象的初始化参数等。最后,它可以将对象的创建过程集中在一个工厂类中,提高代码的复用性和可维护性。

工厂模式的不足之处

当然,工厂模式并不是万能的,它也有一些缺点。首先,如果工厂类中创建的对象过多,工厂类会变得臃肿和难以维护。其次,如果需要创建的对象有多种不同类型,那么需要为每种类型都定义一个工厂方法,这样会导致工厂类变得复杂和难以理解。最后,工厂模式可能会导致类的数量增加,使代码变得更加复杂。

工厂模式在源码中的运用

工厂模式在开源代码中得到了广泛的应用,例如 Spring Framework、Apache Commons Codec、Joda-Time、MyBatis、Hibernate、Guava 等等。这些开源框架和库都使用了工厂模式来创建对象,使代码更加灵活和易于扩展。比如,在Spring中,有很多地方都使用了工厂模式,例如BeanFactory、ApplicationContext、AutowireCapableBeanFactory等。下面以BeanFactory为例,简要介绍一下Spring源码中的工厂模式实现。 BeanFactory是Spring中最基本的工厂模式,它负责创建和管理应用程序中的对象(也称为bean)。BeanFactory定义了创建和获取bean的接口,具体的实现则由不同的子类完成。在Spring源码中,BeanFactory的主要实现类是DefaultListableBeanFactory。该类实现了BeanFactory接口,并扩展了AbstractAutowireCapableBeanFactory抽象类,提供了创建和管理bean的核心功能。

下面是一个简单的例子,展示了Spring源码中如何使用BeanFactory创建和管理bean:

public class MyBean {
    private String message;

    public void setMessage(String message) {
        this.message = message;
    }

    public String getMessage() {
        return message;
    }
}
// 创建BeanFactory
DefaultListableBeanFactory beanFactory = new DefaultListableBeanFactory();

// 创建bean定义
RootBeanDefinition beanDefinition = new RootBeanDefinition(MyBean.class);
beanDefinition.getPropertyValues().add("message", "Hello, world!");

// 注册bean定义
beanFactory.registerBeanDefinition("myBean", beanDefinition);

// 获取bean实例
MyBean myBean = (MyBean) beanFactory.getBean("myBean");

// 输出bean的消息
System.out.println(myBean.getMessage());

在这个例子中,我们首先创建了一个DefaultListableBeanFactory对象作为BeanFactory。然后,我们使用RootBeanDefinition来创建一个bean定义,指定了bean的类和属性值。接着,我们将该bean定义注册到BeanFactory中,并指定了bean的名称为"myBean"。最后,我们使用BeanFactory的getBean方法来获取bean实例,输出了其消息。

总之,在Spring源码中,工厂模式的应用非常广泛,贯穿了整个框架的核心功能。如果您想深入了解Spring的工厂模式实现,可以进一步研究DefaultListableBeanFactory类及其相关子类。

结尾

总之,工厂模式是一种常见的设计模式,它能将对象的创建过程与使用过程分离,使代码更易于维护和扩展。在实际开发中,我们可以结合其他设计模式和编程技巧来使用工厂模式,例如单例模式、抽象工厂模式、依赖注入等。如果你还不了解这些设计模式,别担心,我们可以在以后的文章中一一为你介绍。
如果觉得好的话请点个赞哟。

MapReduce Shuffle源码解读

相信很多小伙伴都背过shuffle的八股文,但一直不是很理解shuffle的过程,这次我通过源码来解读下shuffle过程,加深对shuffle的理解,但是我自己还是个菜鸟,这篇博客也是参考了很多资料,如果有不对的地方,请指正。

shuffle是Map Task和 Reduce Task之间的一个阶段,本质上是一个
跨节点跨进程间的数据传输
,网上的资料也把MapReduce的过程细分为六个阶段:

  1. Collect 2. Spill 3.Merge 4.Copy 5.Merge 6. Sort

看过源码之后,这几个阶段划分的还是很有道理的,首先看看官网上对shuffle的描述图,有个印象

img

Map

首先,我们先来看看Map阶段的代码,先找到Map Task的入口(org/apache/hadoop/mapred/MapTask.java)的run方法,当map task启动时都会执行这个方法。

@Override
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
  throws IOException, ClassNotFoundException, InterruptedException {
  this.umbilical = umbilical;   // 一个taskAttempt的代理,后面比较多的地方使用

  if (isMapTask()) {
    // If there are no reducers then there won't be any sort. Hence the map 
    // phase will govern the entire attempt's progress.
    if (conf.getNumReduceTasks() == 0) {
      mapPhase = getProgress().addPhase("map", 1.0f);
    } else {
      // If there are reducers then the entire attempt's progress will be 
      // split between the map phase (67%) and the sort phase (33%).
      mapPhase = getProgress().addPhase("map", 0.667f);
      sortPhase  = getProgress().addPhase("sort", 0.333f);
    }
  }

  // 启动任务状态汇报器,其内部有周期性的汇报线程(状态汇报和心跳)
  TaskReporter reporter = startReporter(umbilical);

  boolean useNewApi = job.getUseNewMapper();
  initialize(job, getJobID(), reporter, useNewApi);  // 重要方法,可以认为初始化task启动的一切资源了

  // check if it is a cleanupJobTask
  if (jobCleanup) {
    runJobCleanupTask(umbilical, reporter);
    return;
  }
  if (jobSetup) {
    runJobSetupTask(umbilical, reporter);
    return;
  }
  if (taskCleanup) {
    runTaskCleanupTask(umbilical, reporter);
    return;
  }

  if (useNewApi) {
    runNewMapper(job, splitMetaInfo, umbilical, reporter); // 核心代码,点进去
  } else {
    runOldMapper(job, splitMetaInfo, umbilical, reporter);
  }
  done(umbilical, reporter);
}

这里umbilical比较难理解,我其实也没怎么搞懂,看名字是个协议,这里贴出它的注释

任务子进程用于联系其父进程的协议。父进程是一个守护进程,它轮询中央主进程以获取新的map或reduce Task,并将其作为子进程(Child)运行。孩子和父母之间的所有通信都是通过此协议进行的

看起来是个RPC,这个父进程我不是很清楚,我理解是在v1版本的话,这个可能是taskTracker,如果在v2版本(yarn)可能是ApplicationMaster,如果不对,请大神解答我的疑问。

进入runNewMapper方法

@SuppressWarnings("unchecked")
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,
                  final TaskSplitIndex splitIndex,
                  final TaskUmbilicalProtocol umbilical,
                  TaskReporter reporter
                  ) throws IOException, ClassNotFoundException,
                           InterruptedException {
  // make a task context so we can get the classes  创建Task的上下文环境
  org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
    new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, 
                                                                getTaskID(),
                                                                reporter);
  // make a mapper  通过反射创建mapper
  org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
    (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
      ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
  // make the input format   通过反射创建inputFormat,来读取数据
  org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
    (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
      ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
  // rebuild the input split // 获取切片信息
  org.apache.hadoop.mapreduce.InputSplit split = null;
  split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
      splitIndex.getStartOffset());
  LOG.info("Processing split: " + split);

  org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
    new NewTrackingRecordReader<INKEY,INVALUE>   //通过反射创建RecordReader。InputFormat是通过RecordReader来读取数据,这个也是大学问,在job submit时很关键
      (split, inputFormat, reporter, taskContext);
  
  job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
  org.apache.hadoop.mapreduce.RecordWriter output = null;
  
  // get an output object
  if (job.getNumReduceTasks() == 0) { // 如果没有reduce任务,则直接写入磁盘
    output = 
      new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
  } else { //  核心代码,创建collector收集器  ,点进去
    output = new NewOutputCollector(taskContext, job, umbilical, reporter);
  }

  org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
  mapContext = 
    new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), 
        input, output, 
        committer, 
        reporter, split);

  org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
      mapperContext = 
        new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
            mapContext);

  try {
    input.initialize(split, mapperContext);
    mapper.run(mapperContext);  // 调用我们自己实现的mapper类
    mapPhase.complete();
    setPhase(TaskStatus.Phase.SORT);
    statusUpdate(umbilical);
    input.close();
    input = null;
    output.close(mapperContext);
    output = null;
  } finally {
    closeQuietly(input);
    closeQuietly(output, mapperContext);
  }
}

马上进入
collect
阶段了,点进 NewOutputCollector,看看如何创建Collector

  private class NewOutputCollector<K,V>
    extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
    private final MapOutputCollector<K,V> collector;
    private final org.apache.hadoop.mapreduce.Partitioner<K,V> partitioner;
    private final int partitions;

    @SuppressWarnings("unchecked")
    NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                       JobConf job,
                       TaskUmbilicalProtocol umbilical,
                       TaskReporter reporter
                       ) throws IOException, ClassNotFoundException {
      collector = createSortingCollector(job, reporter);
      partitions = jobContext.getNumReduceTasks();  // partitions数等于reduce任务数
      if (partitions > 1) {
        partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
          ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
      } else {
        partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
          @Override
          public int getPartition(K key, V value, int numPartitions) {
            return partitions - 1;
          }
        };
      }
    }

    @Override
    public void write(K key, V value) throws IOException, InterruptedException {
      collector.collect(key, value, // 向对应分区的环形缓冲区写入(k,v)
                        partitioner.getPartition(key, value, partitions));
    }

    @Override
    public void close(TaskAttemptContext context
                      ) throws IOException,InterruptedException {
      try {
        collector.flush();//核心方法,将数据刷出去。
      } catch (ClassNotFoundException cnf) {
        throw new IOException("can't find class ", cnf);
      }
      collector.close();
    }
  }

点进 creareSortingCollector

@SuppressWarnings("unchecked")
private <KEY, VALUE> MapOutputCollector<KEY, VALUE>  // collector是map 类型
        createSortingCollector(JobConf job, TaskReporter reporter)
  throws IOException, ClassNotFoundException {
  MapOutputCollector.Context context =
    new MapOutputCollector.Context(this, job, reporter);

  Class<?>[] collectorClasses = job.getClasses(  // 获取Map Collector的类型
    JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);  // 说到底还是MapOutputBuffer类型
  int remainingCollectors = collectorClasses.length;
  Exception lastException = null;
  for (Class clazz : collectorClasses) {
    try {
      if (!MapOutputCollector.class.isAssignableFrom(clazz)) {  // MapOutputCollector是不是clazz或者其父类
        throw new IOException("Invalid output collector class: " + clazz.getName() +
          " (does not implement MapOutputCollector)");
      }
      Class<? extends MapOutputCollector> subclazz =
        clazz.asSubclass(MapOutputCollector.class);
      LOG.debug("Trying map output collector class: " + subclazz.getName());
      MapOutputCollector<KEY, VALUE> collector =
        ReflectionUtils.newInstance(subclazz, job); //  创建collector
      collector.init(context);   // 初始化 点进去
      LOG.info("Map output collector class = " + collector.getClass().getName());
      return collector;
    } catch (Exception e) {
      String msg = "Unable to initialize MapOutputCollector " + clazz.getName();
      if (--remainingCollectors > 0) {
        msg += " (" + remainingCollectors + " more collector(s) to try)";
      }
      lastException = e;
      LOG.warn(msg, e);
    }
  }
}

这个
init
方法十分的关键,不仅涉及了
环形缓冲区
,还涉及了
Spill

public void init(MapOutputCollector.Context context    
                 // 这个方法中,主要就是对收集器对象进行一些初始化
                ) throws IOException, ClassNotFoundException {
  job = context.getJobConf();
  reporter = context.getReporter();
  mapTask = context.getMapTask();
  mapOutputFile = mapTask.getMapOutputFile();
  sortPhase = mapTask.getSortPhase();
  spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
  partitions = job.getNumReduceTasks();
  rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();

  //sanity checks
  final float spillper =
    job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);  // 设置环形缓冲区溢写比例为0.8
  final int sortmb = job.getInt(MRJobConfig.IO_SORT_MB,
      MRJobConfig.DEFAULT_IO_SORT_MB);  //  默认环形缓冲区大小为100M
  indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                     INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
  if (spillper > (float)1.0 || spillper <= (float)0.0) {
    throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
        "\": " + spillper);
  }
  if ((sortmb & 0x7FF) != sortmb) {
    throw new IOException(
        "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
  }

  // 排序,默认使用的快排
  // 获取到排序对象,在数据由环形缓冲区溢写到磁盘中前
  // 并且排序是针对索引的,并非对数据进行排序。
  sorter = ReflectionUtils.newInstance(job.getClass(
               MRJobConfig.MAP_SORT_CLASS, QuickSort.class,
               IndexedSorter.class), job);
  // buffers and accounting
  // 对环形缓冲区初始化,大名鼎鼎的环形缓冲区本质上是个byte数组
  int maxMemUsage = sortmb << 20;  // 将MB转换为Bytes
  // 一对kv数据有四个元数据MATE,分别是valstart,keystart,partitions,vallen,都是int类型
  // METASIZE 就是4个int转换成byte就是4*4
  maxMemUsage -= maxMemUsage % METASIZE;  // 计算METE数据存储的大小
  kvbuffer = new byte[maxMemUsage]; // 元数据数组  以byte为单位
  bufvoid = kvbuffer.length;
  kvmeta = ByteBuffer.wrap(kvbuffer)
     .order(ByteOrder.nativeOrder())
     .asIntBuffer();  // 将byte单位的kvbuffer转换成int单位的kvmeta
  setEquator(0);
  bufstart = bufend = bufindex = equator;
  kvstart = kvend = kvindex;
  // kvmeta中存放元数据实体的最大个数
  maxRec = kvmeta.capacity() / NMETA;
  softLimit = (int)(kvbuffer.length * spillper); // buffer 溢写的阈值
  bufferRemaining = softLimit;
  LOG.info(JobContext.IO_SORT_MB + ": " + sortmb);
  LOG.info("soft limit at " + softLimit);
  LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
  LOG.info("kvstart = " + kvstart + "; length = " + maxRec);

  // k/v serialization
  comparator = job.getOutputKeyComparator();
  keyClass = (Class<K>)job.getMapOutputKeyClass();
  valClass = (Class<V>)job.getMapOutputValueClass();
  serializationFactory = new SerializationFactory(job);
  keySerializer = serializationFactory.getSerializer(keyClass);
  keySerializer.open(bb);  // 将key写入bb中 blockingbuffer
  valSerializer = serializationFactory.getSerializer(valClass);
  valSerializer.open(bb); // 将value写入bb中

  // output counters
  mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
  mapOutputRecordCounter =
    reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
  fileOutputByteCounter = reporter
      .getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);

  // compression  压缩器,减少shuffle数据量
  if (job.getCompressMapOutput()) {
    Class<? extends CompressionCodec> codecClass =
      job.getMapOutputCompressorClass(DefaultCodec.class);
    codec = ReflectionUtils.newInstance(codecClass, job);
  } else {
    codec = null;
  }

  // combiner
  // combiner  map端的reduce
  final Counters.Counter combineInputCounter =
    reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
  combinerRunner = CombinerRunner.create(job, getTaskID(), 
                                         combineInputCounter,
                                         reporter, null);
  if (combinerRunner != null) {
    final Counters.Counter combineOutputCounter =
      reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
    combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
  } else {
    combineCollector = null;
  }
  // 溢写线程
  spillInProgress = false;
  minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
  spillThread.setDaemon(true); //  是个守护线程
  spillThread.setName("SpillThread"); //
  spillLock.lock();
  try {
    spillThread.start();  // 启动一个spill线程
    while (!spillThreadRunning) {
      spillDone.await();
    }
  } catch (InterruptedException e) {
    throw new IOException("Spill thread failed to initialize", e);
  } finally {
    spillLock.unlock();
  }
  if (sortSpillException != null) {
    throw new IOException("Spill thread failed to initialize",
        sortSpillException);
  }
}

从这个类,我们可以看到
环形缓冲区
的一些初始化过程,如大小为100M,开始溢写的比例是0.8,实际上,Collector是一个宏观的概念,本质上就是一个MapOutputBuffer对象。

后面还启动了
Spill
线程,不过如果是第一次进去会被阻塞这里我们先按下不表。

至此,一些map开始之前的工作已经准备好了,至于它是怎么工作的我们可以从我们写的mapper中write方法debug进去,发现其实还是
NewOutputCollector
中定义的write方法,点进去是
MapOutputBuffer
的collect方法

public synchronized void collect(K key, V value, final int partition
                                 ) throws IOException {
  reporter.progress();
  if (key.getClass() != keyClass) {
    throw new IOException("Type mismatch in key from map: expected "
                          + keyClass.getName() + ", received "
                          + key.getClass().getName());
  }
  if (value.getClass() != valClass) {
    throw new IOException("Type mismatch in value from map: expected "
                          + valClass.getName() + ", received "
                          + value.getClass().getName());
  }
  if (partition < 0 || partition >= partitions) {
    throw new IOException("Illegal partition for " + key + " (" +
        partition + ")");
  }
  checkSpillException();
  bufferRemaining -= METASIZE;  // 新数据collect时,先将元数据长度前去,之后判断
  if (bufferRemaining <= 0) { // 说明已经超过阈值了
    // start spill if the thread is not running and the soft limit has been
    // reached
    spillLock.lock();
    try {
      do {
        // 首次spill时,spillInProgress是false
        if (!spillInProgress) {
          final int kvbidx = 4 * kvindex; // 单位是byte
          final int kvbend = 4 * kvend;  // 单位是byte
          // serialized, unspilled bytes always lie between kvindex and
          // bufindex, crossing the equator. Note that any void space
          // created by a reset must be included in "used" bytes
          final int bUsed = distanceTo(kvbidx, bufindex);  // 剩下可以写入的空间大小
          final boolean bufsoftlimit = bUsed >= softLimit;  // true说明已经超过softLimit了
          if ((kvbend + METASIZE) % kvbuffer.length !=
              equator - (equator % METASIZE)) {
            // spill finished, reclaim space
            resetSpill();
            bufferRemaining = Math.min(
                distanceTo(bufindex, kvbidx) - 2 * METASIZE,
                softLimit - bUsed) - METASIZE;  // 这里是重新选择equator吧,但是计算方式不了解
            continue;
          } else if (bufsoftlimit && kvindex != kvend) {
            // spill records, if any collected; check latter, as it may
            // be possible for metadata alignment to hit spill pcnt
            startSpill();  //开始溢写,里面唤醒spill线程  
            final int avgRec = (int)
              (mapOutputByteCounter.getCounter() /
              mapOutputRecordCounter.getCounter());
            // leave at least half the split buffer for serialization data
            // ensure that kvindex >= bufindex
            final int distkvi = distanceTo(bufindex, kvbidx);
            final int newPos = (bufindex +
              Math.max(2 * METASIZE - 1,
                      Math.min(distkvi / 2,
                               distkvi / (METASIZE + avgRec) * METASIZE)))
              % kvbuffer.length;
            setEquator(newPos);
            bufmark = bufindex = newPos;
            final int serBound = 4 * kvend;
            // bytes remaining before the lock must be held and limits
            // checked is the minimum of three arcs: the metadata space, the
            // serialization space, and the soft limit
            bufferRemaining = Math.min(
                // metadata max
                distanceTo(bufend, newPos),
                Math.min(
                  // serialization max
                  distanceTo(newPos, serBound),
                  // soft limit
                  softLimit)) - 2 * METASIZE;
          }
        }
      } while (false);   // 这是什么写法?????
    } finally {
      spillLock.unlock();
    }
  }
  // 直接写入buffer,不涉及spill
  try {
    // serialize key bytes into buffer
    int keystart = bufindex;
    keySerializer.serialize(key);
    // key所占空间被bufvoid分隔,则移动key,
    // 将其值放在连续的空间中便于sort时key的对比
    if (bufindex < keystart) {
      // wrapped the key; must make contiguous
      bb.shiftBufferedKey();
      keystart = 0;
    }
    // serialize value bytes into buffer
    final int valstart = bufindex;
    valSerializer.serialize(value);
    // It's possible for records to have zero length, i.e. the serializer
    // will perform no writes. To ensure that the boundary conditions are
    // checked and that the kvindex invariant is maintained, perform a
    // zero-length write into the buffer. The logic monitoring this could be
    // moved into collect, but this is cleaner and inexpensive. For now, it
    // is acceptable.
    bb.write(b0, 0, 0);

    // the record must be marked after the preceding write, as the metadata
    // for this record are not yet written
    int valend = bb.markRecord();

    mapOutputRecordCounter.increment(1);
    mapOutputByteCounter.increment(
        distanceTo(keystart, valend, bufvoid)); //计数器+1

    // write accounting info
    kvmeta.put(kvindex + PARTITION, 
              );
    kvmeta.put(kvindex + KEYSTART, keystart);
    kvmeta.put(kvindex + VALSTART, valstart);
    kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));
    // advance kvindex
    kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
  } catch (MapBufferTooSmallException e) {
    LOG.info("Record too large for in-memory buffer: " + e.getMessage());
    spillSingleRecord(key, value, partition);  // 长record就直接写入磁盘
    mapOutputRecordCounter.increment(1);
    return;
  }
}

这里首先最重要的方法就是第46行的startSpill()方法,这里点进去会发现一个spillReady.signal(),这就是唤醒之前因spillReady.await()方法阻塞的spill线程,这里的spillReady就是可重入锁,这里spill开始正式工作,这里涉及了环形缓冲区如何写和如何读,会比较抽象,我之后再写一篇关于环形缓冲区的文章。

这里代码就是
Collect
,本质上就是map端将输出的(k,v)数据和它的元数据写入MapOutputBuffer中。

此外,这个代码里也有唤醒spill线程的代码,找到SpillThread的run方法,很明显里面有个很重要的方法
sortAndSpill

private void sortAndSpill() throws IOException, ClassNotFoundException,
                                   InterruptedException {
  //approximate the length of the output file to be the length of the
  //buffer + header lengths for the partitions
  final long size = distanceTo(bufstart, bufend, bufvoid) +
              partitions * APPROX_HEADER_LENGTH;  // 写出长度
  FSDataOutputStream out = null;
  FSDataOutputStream partitionOut = null;
  try {
    // create spill file
    final SpillRecord spillRec = new SpillRecord(partitions);
    final Path filename =
        mapOutputFile.getSpillFileForWrite(numSpills, size);// 默认是output/spillx.out
    out = rfs.create(filename);// 创建分区文件

    final int mstart = kvend / NMETA;
    final int mend = 1 + // kvend is a valid record
      (kvstart >= kvend
      ? kvstart
      : kvmeta.capacity() + kvstart) / NMETA;
    // 对元数据进行排序,先按照partition进行排序,再按照key值进行排序
    // 二次排序,排的是元数据部分
    sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);
    int spindex = mstart;
    final IndexRecord rec = new IndexRecord();
    final InMemValBytes value = new InMemValBytes();
    for (int i = 0; i < partitions; ++i) {//循环分区
      // 溢写时的临时文件 类型是IFile
      IFile.Writer<K, V> writer = null;
      try {
        long segmentStart = out.getPos();
        partitionOut = CryptoUtils.wrapIfNecessary(job, out, false);
        writer = new Writer<K, V>(job, partitionOut, keyClass, valClass, codec,
                                  spilledRecordsCounter);
        if (combinerRunner == null) {
          // spill directly
          DataInputBuffer key = new DataInputBuffer();
          // 写入相同的partition数据
          while (spindex < mend &&
              kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
            final int kvoff = offsetFor(spindex % maxRec);
            int keystart = kvmeta.get(kvoff + KEYSTART);
            int valstart = kvmeta.get(kvoff + VALSTART);
            key.reset(kvbuffer, keystart, valstart - keystart);
            getVBytesForOffset(kvoff, value);
            writer.append(key, value);
            ++spindex;
          }
        } else {    // 进行combiner,避免小文件问题
          int spstart = spindex;
          while (spindex < mend &&
              kvmeta.get(offsetFor(spindex % maxRec)
                        + PARTITION) == i) {
            ++spindex;
          }
          // Note: we would like to avoid the combiner if we've fewer
          // than some threshold of records for a partition
          if (spstart != spindex) {
            combineCollector.setWriter(writer);
            RawKeyValueIterator kvIter =
              new MRResultIterator(spstart, spindex);
            combinerRunner.combine(kvIter, combineCollector);
          }
        }

        // close the writer
        writer.close();  ///  将文件写入本地磁盘中,不是HDFS上
        if (partitionOut != out) {
          partitionOut.close();
          partitionOut = null;
        }

        // record offsets
        // 记录当前partition i的信息写入索文件rec中
        rec.startOffset = segmentStart;
        rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
        rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
        //spillRec中存放了spill中partition的信息
        spillRec.putIndex(rec, i);

        writer = null;
      } finally {
        if (null != writer) writer.close();
      }
    }

    if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
      // create spill index file
      Path indexFilename =
          mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
              * MAP_OUTPUT_INDEX_RECORD_LENGTH);
      spillRec.writeToFile(indexFilename, job);  // 将内存中的index文件写入磁盘
    } else {
      indexCacheList.add(spillRec);
      totalIndexCacheMemory +=
        spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
    }
    LOG.info("Finished spill " + numSpills);
    ++numSpills;
  } finally {
    if (out != null) out.close();
    if (partitionOut != null) {
      partitionOut.close();
    }
  }
}

很明显,spill有两个临时文件生成,一个是(k,v)文件,它保存在默认路径是output/spill{x}.out文件中,注意,这段代码里并没有明显的将(k,v)文件写入磁盘的代码,这些代码在writer.close()中实现。而另一个明显写入磁盘的是
spillRec.writeToFile(indexFilename, job)
,这个存放的每个partition的index。

在SpillThread在辛辛苦苦进行sortAndSpill工作时,map Task 也不断地产生新(k,v)写入MapOutputBuffer中,环形缓冲区的读线程和写线程同时工作!!怎么避免冲突呢?答案是反向写。

红色箭头是写(k,v)数据,蓝色箭头是写元数据,紫色是预留的百分之20的空间不能写,绿色是已经写入的数据部分,正在被spill线程读取操作。

至此,
spill

sort
阶段算是大功告成,那么还有个疑问,如果MapOutPutBuffer还有部分数据,但这部分数据并没有达到spill的标准,怎么办呢?还是回到
NewOutputCollector
部分中
close
方法,里面有MapOutputBuffer的flush方法会解决这个问题。

最后就是Map Task中Shuffle过程的最后一个阶段
Merge
,这部分有点多就不贴代码了,感兴趣的同学可以查看MapOutputBuffer中mergeParts方法,这个方法在上面的flush方法里调用,该作用是合并spill阶段产生出来的out文件和index文件。

Merge
过程目的很简单,但是过程确实很复杂。首先,
Merge
过程会扫描目录获取out文件的地址,存放一个数组中,同时也会获得index文件,存放到另一个数组中。好奇的同学可能再想
既然又要读入到内存中,当初为啥要刷进磁盘里呢,这不是闲着没事干嘛
,确实,这是MapReduce的缺陷,太过于批处理了,磁盘IO也限制了它的其他可能性,比如机器学习需要反复迭代,MapReduce就做不了这个,但是这一步确实很有必要的,因为早期内存很贵,不是每个人都是土豪的,考虑到OOM的风险,把所有的(K,V)数据和index数据刷进磁盘是非常有必要的,但是后面又可以全读入内存,那是因为
缓存缓冲区
这个大东西已经不再使用,内存就富裕起来了。

同时,
Merge
过程还涉及到
归并算法
,这个并不是简单的
归并
过程,而是一个很复杂的过程,因为考虑到一个partition并不只存在一种key,所以源码里有着相当复杂的过程同时注释也很迷惑人,注释里有优先队列和Heap的字样,看代码的时候可能以为采用了堆排序,有兴趣的同学可以看看,并不是太重要(ps我也看得一知半解)。

Reduce

Reduce部分我就长话短说,只看重点了。

同样,第一步就是查看 Reduce Task的run方法,这是启动redduce逻辑的自动过程

 public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
   throws IOException, InterruptedException, ClassNotFoundException {
   job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());

   if (isMapOrReduce()) { // reduce的三个阶段
     copyPhase = getProgress().addPhase("copy");
     sortPhase  = getProgress().addPhase("sort");
     reducePhase = getProgress().addPhase("reduce");
   }
   // start thread that will handle communication with parent
   // 启动任务状态汇报器,其内部有周期性的汇报线程(状态汇报和心跳)
   TaskReporter reporter = startReporter(umbilical);
   
   boolean useNewApi = job.getUseNewReducer();
   initialize(job, getJobID(), reporter, useNewApi);//核心代码,初始化任务

   // check if it is a cleanupJobTask
   if (jobCleanup) {
     runJobCleanupTask(umbilical, reporter);
     return;
   }
   if (jobSetup) {
     runJobSetupTask(umbilical, reporter);
     return;
   }
   if (taskCleanup) {
     runTaskCleanupTask(umbilical, reporter);
     return;
   }
   
   // Initialize the codec
   codec = initCodec();
   RawKeyValueIterator rIter = null;
   ShuffleConsumerPlugin shuffleConsumerPlugin = null;
   
   Class combinerClass = conf.getCombinerClass();
   CombineOutputCollector combineCollector = 
     (null != combinerClass) ? 
    new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;

   Class<? extends ShuffleConsumerPlugin> clazz =
         job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
// 设置shuffle插件
   shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
   LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);

   ShuffleConsumerPlugin.Context shuffleContext = 
     new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical, 
                 super.lDirAlloc, reporter, codec, 
                 combinerClass, combineCollector, 
                 spilledRecordsCounter, reduceCombineInputCounter,
                 shuffledMapsCounter,
                 reduceShuffleBytes, failedShuffleCounter,
                 mergedMapOutputsCounter,
                 taskStatus, copyPhase, sortPhase, this,
                 mapOutputFile, localMapFiles);
   shuffleConsumerPlugin.init(shuffleContext);
   // 执行shuffle过程中的远程数据拉取,在拉取的过程中
   // 内部 启动 map-completion event fetch线程 获取map端完成的event信息
   // 在开启默认5个的fetch 线程 拉取数据,里面核心函数就是一直点进去是doShuffle,有两种一种是in-memory另一种就是on-disk
   // 超出shuffle内存就merge到disk
   // shuffle插件内部有个mergeMangager 会在合适的时候就是快超过shuffle内存缓存的时候,启动merge线程

   // 这个表面是一次网络IO,本质上是一个RPC,通过umbilical代理获取已经完成的MapTask任务的taskAttempt的ID,存入schedule中,为后面shuffle做准备

   rIter = shuffleConsumerPlugin.run();

   // free up the data structures
   // 一个sort set,是TreeSet数据结构·
   mapOutputFilesOnDisk.clear();
   
   sortPhase.complete();                         // sort is complete
   setPhase(TaskStatus.Phase.REDUCE); 
   statusUpdate(umbilical);
   Class keyClass = job.getMapOutputKeyClass();
   Class valueClass = job.getMapOutputValueClass();
   RawComparator comparator = job.getOutputValueGroupingComparator();

   if (useNewApi) {
     runNewReducer(job, umbilical, reporter, rIter, comparator, 
                   keyClass, valueClass); // 执行reduce操作,(用户定义的逻辑)
   } else {
     runOldReducer(job, umbilical, reporter, rIter, comparator, 
                   keyClass, valueClass);
   }

   shuffleConsumerPlugin.close();
   done(umbilical, reporter);
 }

Reduce Task的重点比较清晰,就是57行的初始化
shuffleConsumerPlugin
这个Shuffle插件,以及66行运行这个插件,让他拉取数据。

初始化shuffle插件过程中,有两个组件一个是schedule调度器,另一个就是MergeManager,这个MergeManger有大用处。

接下来查看run方法

public RawKeyValueIterator run() throws IOException, InterruptedException {
  // Scale the maximum events we fetch per RPC call to mitigate OOM issues
  // on the ApplicationMaster when a thundering herd of reducers fetch events
  // TODO: This should not be necessary after HADOOP-8942
  int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH,
      MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks());
  int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);

  // Start the map-completion events fetcher thread
  // 启动 一个 event fetcher线程 获取map端完成的event信息
  final EventFetcher<K,V> eventFetcher = 
    new EventFetcher<K,V>(reduceId, umbilical, scheduler, this,
        maxEventsToFetch);
  eventFetcher.start();
  
  // Start the map-output fetcher threads  启动fetch线程
  // fetch 线程 远程从map端拉取对应partition的数据
  boolean isLocal = localMapFiles != null;
  final int numFetchers = isLocal ? 1 :
    jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
  Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
  if (isLocal) {
    fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
        merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
        localMapFiles);
    fetchers[0].start();
  } else {
    for (int i=0; i < numFetchers; ++i) {
      fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger, 
                                     reporter, metrics, this, 
                                     reduceTask.getShuffleSecret());
      fetchers[i].start();
    }
  }
  
  // Wait for shuffle to complete successfully
  while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
    reporter.progress();
    
    synchronized (this) {
      if (throwable != null) {
        throw new ShuffleError("error in shuffle in " + throwingThreadName,
                               throwable);
      }
    }
  }

  // Stop the event-fetcher thread
  eventFetcher.shutDown();
  
  // Stop the map-output fetcher threads
  for (Fetcher<K,V> fetcher : fetchers) {
    fetcher.shutDown();
  }
  
  // stop the scheduler
  scheduler.close();

  copyPhase.complete(); // copy is already complete
  taskStatus.setPhase(TaskStatus.Phase.SORT);
  reduceTask.statusUpdate(umbilical);

  // Finish the on-going merges...
  RawKeyValueIterator kvIter = null;
  try {
    kvIter = merger.close();
  } catch (Throwable e) {
    throw new ShuffleError("Error while doing final merge " , e);
  }

  // Sanity check
  synchronized (this) {
    if (throwable != null) {
      throw new ShuffleError("error in shuffle in " + throwingThreadName,
                             throwable);
    }
  }
  
  return kvIter;
}

重点就是两

线程,一种是Event fetch,另一种是fetch线程

首先,event fetch线程的作用是获取TaskAttempt的ID等信息,存入schedule中,方面以后Shuffle尤其是sort时使用,本质上这是个RPC,注意看event fetch初始化时的参数里有个
umbilical
代理对象。

而fetch线程的工作原理是通过HTTP向各个Map任务拖取它所需要的数据(至于HTTP和RPC的区别有兴趣的同学可以查查),里面最核心的方法是
doShuffle
(一直点进去才能找到这个),在Copy的同时还会MergeSort。doShuffle它有两个实现,一个是In-memory,另一个是On-disk有两个实现(同样的,Merge也分为这两种)。是基于考虑到拉取相同的key值可能有很大的数据量,那么有必要写入磁盘中了,但为了减少这种情况,在达到
缓存区
(默认是64K)阈值的时候会将数据merge(如果太大的话就在磁盘中merge),Merge的工作就是交给Shuffle插件的MergeManager管理。

所以,copy和Merge和Sort是重叠过程的。

至此,Shuffle部分的源码基本讲解完成。

参考资料

  1. MapReduce ReduceTask源码解析

  2. MapReduce中的shuffle详解

  3. 环形缓冲区

本文先引入一个例子,星期天你和女朋友去逛街,看到一家奶茶店。女朋友想喝奶茶了,你就去买了杯奶茶,然后你问了一下价格。店员说奶茶0.9元一杯。然后你给了1元钱。这个时候你忽然问了一下女友。服务员该找我们多少钱呢?女友说你个小傻瓜当然是0.1元啊。作为一个”严谨“的程序猿,这时你拿起电脑写了个简单计算如下:

看到计算结果不是0.1这个时候你有点慌了,女朋友怎么会错呢?但是你没有犹豫女朋友说的怎么会错呢,女朋友说的一定是对的。一定是电脑计算错了,然后你毫不犹豫把电脑扔了。说了一句对对对该找0.1元。【PS奶茶目前当然不可能1元钱,本文为了演示计算效果所以定义奶茶1元,因为不是所有的浮点运算都会有这样的问题】。

千万别小看这些一点点的计算误差,假如我说假如全国人民一人送你0.1元你能实现财富自由了【呸,别做白日梦了好好搬砖去吧】

好了聊文章的主题BigDecimal这个知识点,这个知识点应该很多人都知道了,我感觉很有有用就聊一下。阿里巴巴开发手册中提到:

使用BigDecimal来定义值,再进行浮点数的运算操作。

注意:BigDecimal(double)存在精度损失的风险,在精确计算或比较的场景中可能会导致业务逻辑异常。如:BigDecimal g = new BigDecimal(0.1f)。实际存储的值为:0.10000000149011611938 。

      float a = 1.0f -0.9f;float b = 0.9f-0.8f;
System.out.println(a);
System.out.println(b);
System.out.println(a
==b);

为什么浮点数 float 或 double 运算的时候会有精度丢失的风险呢?

这个和计算机保存浮点数的机制有很大关系。我们知道计算机是二进制的,而且计算机在表示一个数字时,宽度是有限的,无限循环的小数存储在计算机时,只能被截断,所以就会导致小数精度发生损失的情况。

那怎么解决精度丢失的问题呢?那就是本文要说的BigDecimal。用它可以实现对浮点的计算,解决精度丢失的问题。如下用差不多相同的代码结果就是对的【PS一定要记住女朋友说的就是对的就是0.1】。

BigDecimal创建方法,请先看注意事项:

阿里官方:【强制】禁止使用构造方法BigDecimal(double)的方式把double值转换为BigDecimal对象

正确的创建方法:优先推荐入参为String的构造方法,或者使用BigDecimal的valueOf方法,此方法内部其实执行了Double的toString。而Double的toString按double实际能表达的精度对尾数进行了拦截。如下推荐使用:

BigDecimal a = new BigDecimal("0.1");

BigDecimal  b = BigDecimal.valueOf(0.1);

开发中用到计算也就是加减乘除,BigDecimal中相关方法如下:

add 方法用于将两个 BigDecimal 对象相加。subtract 方法用于将两个 BigDecimal 对象相减。multiply 方法用于将两个 BigDecimal 对象相乘,divide 方法用于将两个 BigDecimal 对象相除。

这里需要注意的是,在我们使用 divide 方法的时候尽量使用 3 个参数方法。其中 scale 表示要保留几位小数,roundingMode 代表保留规则。具体可以看源码,源码中介绍的很清楚了,并且给了各种例子如下:

大小的比较

a.compareTo(b) : 返回 -1 说明 a 小于 b,0 说明a 等于 b , 1 说明 a 大于 b。用法如下:

注意:BigDecimal的比较不能使用equals进行比较【原因还是因为精度的问题】,一定要使用compareTo比较,如下:

BigDecilmal计算相关的工具类。

add方法:

public staticBigDecimal add(Number v1, Number v2) {return add(newNumber[]{v1, v2});
}
public staticBigDecimal add(Number... values) {if(ArrayUtil.isEmpty(values)) {returnBigDecimal.ZERO;
}
Number value
= values[0];
BigDecimal result
= new BigDecimal(null == value ? "0": value.toString());for (int i = 1; i < values.length; i++) {
value
=values[i];if (null !=value) {
result
= result.add(newBigDecimal(value.toString()));
}
}
returnresult;
}

substract方法:

public staticBigDecimal sub(Number v1, Number v2) {return sub(newNumber[]{v1, v2});
}
public staticBigDecimal sub(Number... values) {if(ArrayUtil.isEmpty(values)) {returnBigDecimal.ZERO;
}
Number value
= values[0];
BigDecimal result
= new BigDecimal(null == value ? "0": value.toString());for (int i = 1; i < values.length; i++) {
value
=values[i];if (null !=value) {
result
= result.subtract(newBigDecimal(value.toString()));
}
}
returnresult;
}

multiply

乘法:

public staticBigDecimal mul(Number v1, Number v2) {return mul(newNumber[]{v1, v2});
}
public staticBigDecimal mul(Number... values) {if(ArrayUtil.isEmpty(values)) {returnBigDecimal.ZERO;
}
Number value
= values[0];
BigDecimal result
= new BigDecimal(null == value ? "0": value.toString());for (int i = 1; i < values.length; i++) {
value
=values[i];
result
= result.multiply(new BigDecimal(null == value ? "0": value.toString()));
}
returnresult;
}

divide 除法:

public staticBigDecimal div(BigDecimal v1, BigDecimal v2,Integer scale,RoundingMode roundingMode) {if (null ==v1) {returnBigDecimal.ZERO;
}
if (scale < 0) {
scale
= -scale;
}
returnv1.divide(v2, scale, roundingMode);
}

谢谢点赞,也欢迎你分享给其他的开发者,让更多的人知道。当然也欢迎未关注的小伙伴关注。

11

数据库服务器 CPU 近 100% 问题几乎每年都要发生一次,上次发生在
去年1月31日
,每次都是通过主备切换或者重启实例解决,数据库服务用的是阿里云 RDS SQL Server 2016 标准版。

今年这个问题真会找时间,在园子非常困难的时期,在昨天刚刚因
疯狂蜘蛛袭击
被整得精疲力尽之后,在星期天早上难得睡个懒觉的时候,在今天早上 8:30 左右来袭。

非常抱歉!今天 8:30-10:00 期间,由于数据库服务器 CPU 近 100% 问题引发全站故障,再加上我们没有及时处理,由此给大家带来了很大的麻烦,请大家谅解。

1天之内连发2个故障公告,也是很少见的情况,这段时间真是困难重重难上见难。

当前园子正处于难关,我们正在努力过关斩难。巧合的是,
十年前的3月
,园子刚刚搬上阿里云时也经历了一次很大的难关。

A
bp(net core)+easyui+efcore实现仓储管理系统目录

一、简介

微软从.NET 5开始进行.NET 统一计划,.NET 5是继3.1之后.NET Core的下一个主要版本。微软从名称中删除了“Core”,是为了强调这是 .NET未来的主要实现。与 .NET Core 或 .NET Framework 相比,.NET 5 会支持类型更多的应用和平台。.NET 5不会替换 .NET Framework。

.NET 6 提供 .NET 统一计划的最终部分,该计划在 .NET 5中启动。 .NET 6在移动、桌面、IoT 和云应用之间统一了SDK、基础库和运行时。除了这方面的统一以外,.NET 6 生态系统还提供了以下功能:

简化开发:轻松入门。 C# 10 中的新语言功能可减少需要编写的代码量。 利用 Web 堆栈和最小 API 的投资,可以轻松地快速编写更小、更快速的微服务。

更佳的性能:.NET 6 是最快的完整堆栈 Web 框架,如果在云中运行,则会降低计算成本。

终极工作效率:.Net 6 和 Visual Studio 2022 提供热重载、新的 git 工具、智能代码编辑、可靠的诊断和测试工具以及更好的团队协作。

.NET 7已经在2022年11月发布了,.NET8已经在路上了,预计将在今年的11月份发布。.NET小步快走,每年都在进步。

从2019年5月至2020年12月,花了一年半时间写了
abp(net core)+easyui+efcore实现仓储管理系统
系列文章。本系列是介绍基于ABP+EasyUI的Web开发框架的实现一个仓储管理系统的实例,主要包括一些ABP的介绍,ASP.NET MVC Core技术、EasyUI技术、JQuery技术、WebAPI 技术,以及一些我对整体框架改造的基础性东西,力求更加稳定、通用、高效、简洁,用最少的代码做尽可能多的事情。当时我所使用的ABP版本是4.3,是基于ASP.NET CORE 2.X的版本。

2年时间过去了,ASP.NTE CORE 2.x微软已经不在对其进行支持,已经结束了支持周期。我准备将ABP升级到7.3,这是基于NET6的一个版本,NET6是一个长期支持版本,其支持周期结束时间在2024年11月份。easyui升级到1.10.12。

做为IT从业人员,我们在不同的公司或相同的公司基于不同的需求创建着各种应用,这些应用都有一些通用和相似的结构。这些通用的结构包括授权,验证,异常处理,日志,本地化,数据库连接管理,设置管理,审计日志等。

我们在创建与实现各种应用时,都会试着应用各种新的最佳实践,比如分层和模块化架构,领域驱动设计(DDD),依赖注入等等。

ABP是一个开源的且文档友好的应用框架,它不仅仅是一个框架,更提供了一个基于DDD和最佳实践的健壮的体系模型。

接下来我们进行升级,
在浏览器中输入
https://aspnetboilerplate.com/Templates
。然后
依次按下图选择,输入验证码,之后点击“create my project”按钮。下载项目模板。

至于你是用新的项目模板替换旧的项目模板,还是将项目中的代码文件拷贝到新的项目中,这个就由你自己决定了。

二、升级过程遇到的问题

接下来介绍一下升级过程中遇到的问题。

第一个问题,原来代码中我们使用的ABP基类提供的GetAll()、Create()、Update()此类方法 ,在ABP7.3中都已经不提供了,现在ABP7.3中提供的是异步方法,在以上方法上加上Async后缀。即GetAll()变为了GetAllAsync、Create()变为了CreateAsync、Update()变为了UpdateAsync。

第二个问题,用Visual Studio 2022打开了我们的ABP.TPLMS项目,在解决方案资源管理器中,将ABP.TPLMS.WEB.MVC项目设置为启动项目,按F5启动,浏览器中呈现的登录页面没有样式。如下图。整个登录界面在页面的左上角,不在页面的正中间,而且没有css。

这是由于ABP的客户端库,需要我们自己进行还原。在Visual Studio 2022的解决方案资源管理器中,找到ABP.TPLMS.WEB.MVC项目,在这个项目中有一个libman.json文件。

使用鼠标左键选中这个文件,然后单击鼠标右键,在弹出菜单中选择“还原客户端库”,如下图。在这个还原过程中,需要一直保持网络畅通。请对照此文件中的内容,查看wwwroot\libs目录下的所还原的客户端库是否完整。如果不完整,请再次进行还原。

客户端库还原成功之后,在Visual Studio 2022的解决方案资源管理器中,将ABP.TPLMS.WEB.MVC项目设置为启动项目,按F5键运行项目,浏览器中呈现的登录页面如下图。

第三个问题:AutoMapper 9版本之后取消了静态方法,所以造成了以下错误。

我首先想到的解决方法就是注入,将IMapper注入到这个类中,于是我在Visual Studio 2022 的解决方案资源管理器中,找到ABP.TPLMS.Application项目中的Modules文件夹中的ModuleAppService.cs文件,修改了ModuleAppService的构造方法 ,将IMapper注入。将代码中原来的Mapper.Map都修改为m_map.Map。编译是通过了,但是在后续的升级过程中却遇到了另外的一个问题,一个错误,这个问题在后续来解决。先按编译通过的方式来修改代码,具体代码如下:

public classModuleAppService : ApplicationService, IModuleAppService
{
private readonly IRepository<Module>_moduleRepository;

AutoMapper.IMapper m_map;
public ModuleAppService(IRepository<Module>moduleRepository, IMapper map)
{

_moduleRepository
=moduleRepository;

m_map
=map;

}

}